1 /**
2  * This module provides an implementation of the classical thread-pool model.
3  *
4  * Copyright: Copyright (C) 2007-2008 Anders Halager. All rights reserved.
5  * License:   BSD style: $(LICENSE)
6  * Author:    Anders Halager
7  */
8 
9 module tango.core.ThreadPool;
10 
11 private import tango.core.Thread,
12                 tango.core.sync.Atomic,
13                 tango.core.sync.Mutex,
14                 tango.core.sync.Condition,
15                 tango.core.Exception : ThreadPoolException;
16 
17 private import  tango.stdc.string: memmove;
18 
19 private version = Queued;
20 
21 /**
22  * A thread pool is a way to process multiple jobs in parallel without creating
23  * a new thread for each job. This way the overhead of creating a thread is
24  * only paid once, and not once for each job and you can limit the maximum
25  * number of threads active at any one point.
26  *
27  * In this case a "job" is simply a delegate and some parameters the delegate
28  * will be called with after having been added to the thread pool's queue.
29  *
30  * Example:
31  * --------------------
32  * // create a new pool with two threads
33  * auto pool = new ThreadPool!(int)(2);
34  * void delegate(int) f = (int x) { Log(x); };
35  *
36  * // Now we have three ways of telling the pool to execute our jobs
37  * // First we can say we just want it done at some later point
38  * pool.append(f, 1);
39  * // Secondly we can ask for a job to be done as soon as possible, blocking
40  * // until it is started by some thread
41  * pool.assign(f, 2);
42  * // Finally we can say we either want it done immediately or not at all
43  * if (pool.tryAssign(f, 3))
44  *     Log("Someone took the job!");
45  * else
46  *     Log("No one was available to do the job right now");
47  * // After giving the pool some jobs to do, we need to give it a chance to
48  * // finish, so we can do one of two things.
49  * // Choice no. 1 is to finish what has already been assigned to the threads,
50  * // but ignore any remaining queued jobs
51  * //   pool.shutdown();
52  * // The other choice is to finish all jobs currently executing or in queue:
53  * pool.finish();
54  * --------------------
55  *
56  * If append isn't called there should be no additional heap allocations after
57  * initialization.
58  */
59 
60 class ThreadPool(Args...)
61 {
62     /// An alias for the type of delegates this thread pool considers a job
63     alias void delegate(Args) JobD;
64 
65     /**
66      * Create a new ThreadPool.
67      *
68      * Params:
69      *   workers = The amount of threads to spawn
70      *   q_size  = The expected size of the queue (how many elements are
71      *   preallocated)
72      */
73     this(size_t workers, size_t q_size = 0)
74     {
75         // pre-allocate memory for q_size jobs in the queue
76         q.length = q_size;
77         q.length = 0;
78 
79         m = new Mutex;
80         poolActivity = new Condition(m);
81         workerActivity = new Condition(m);
82 
83         flagSet(priority_job, cast(Job*) null);
84         flagSet(active_jobs, cast(size_t) 0);
85         flagSet(done, false);
86 
87         for (size_t i = 0; i < workers; i++)
88         {
89             auto thread = new Thread(&doJob);
90             // Allow the OS to kill the threads if we exit the program without
91             // handling them our selves
92             thread.isDaemon = true;
93             thread.start();
94             pool ~= thread;
95         }
96     }
97 
98     /**
99       Assign the given job to a thread immediately or block until one is
100       available
101      */
102     void assign(JobD job, Args args)
103     {
104         if(this.pool.length == 0)
105         {
106             throw new ThreadPoolException("No workers available!");
107         }
108 
109         m.lock();
110         scope(exit) m.unlock();
111         auto j = Job(job, args);
112         flagSet(priority_job, &j);
113         poolActivity.notify();
114         // Wait until someone has taken the job
115         while (flagGet(priority_job) !is null)
116             workerActivity.wait();
117     }
118 
119     /**
120       Assign the given job to a thread immediately or return false if none is
121       available. (Returns true if one was available)
122      */
123     bool tryAssign(JobD job, Args args)
124     {
125         if (flagGet(active_jobs) >= pool.length)
126             return false;
127         assign(job, args);
128         return true;
129     }
130 
131     /**
132       Put a job into the pool for eventual execution.
133 
134       Warning: Acts as a stack, not a queue as you would expect
135      */
136     void append(JobD job, Args args)
137     {
138         if(this.pool.length == 0)
139         {
140             throw new ThreadPoolException("No workers available!");        
141         }
142 
143         m.lock();
144         q ~= Job(job, args);
145         m.unlock();
146         poolActivity.notify();
147     }
148 
149     /// Get the number of jobs waiting to be executed
150     size_t pendingJobs()
151     {
152         m.lock(); scope(exit) m.unlock();
153         return q.length;
154     }
155 
156     /// Get the number of jobs being executed
157     size_t activeJobs()
158     {
159         return flagGet(active_jobs);
160     }
161 
162     /// Block until all pending jobs complete, but do not shut down.  This allows more tasks to be added later.
163     void wait()
164     {    
165         m.lock();
166         while (q.length > 0 || flagGet(active_jobs) > 0)
167                workerActivity.wait();
168         m.unlock();
169     } 
170 
171     /// Finish currently executing jobs and drop all pending.
172     void shutdown()
173     {
174         flagSet(done, true);
175         m.lock();
176         q.length = 0;
177         m.unlock();
178         poolActivity.notifyAll();
179         foreach (thread; pool)
180             thread.join();
181 
182         pool.length = 0;
183 
184         m.lock();
185         m.unlock();
186     }
187 
188     /// Complete all pending jobs and shutdown.
189     void finish()
190     {
191         wait();
192         shutdown();
193     }
194 
195 private:
196     // Our list of threads -- only used during startup and shutdown
197     Thread[] pool;
198     struct Job
199     {
200         JobD dg;
201         Args args;
202     }
203     // Used for storing queued jobs that will be executed eventually
204     Job[] q;
205 
206     // This is to store a single job for immediate execution, which hopefully
207     // means that any program using only assign and tryAssign wont need any
208     // heap allocations after startup.
209     Job* priority_job;
210 
211     // This should be used when accessing the job queue
212     Mutex m;
213 
214     // Notify is called on this condition whenever we have activity in the pool
215     // that the workers might want to know about.
216     Condition poolActivity;
217 
218     // Worker threads call notify on this when they are done with a job or are
219     // completely done.
220     // This allows a graceful shut down and is necessary since assign has to
221     // wait for a job to become available
222     Condition workerActivity;
223 
224     // Are we in the shutdown phase?
225     bool done;
226 
227     // Counter for the number of jobs currently being calculated
228     size_t active_jobs;
229 
230     // Thread delegate:
231     void doJob()
232     {
233         while (!flagGet(done))
234         {
235             m.lock();
236             while (q.length == 0 && flagGet(priority_job) is null && !flagGet(done))
237                 poolActivity.wait();
238             if (flagGet(done)) {
239                 m.unlock(); // not using scope(exit), need to manually unlock
240                 break;
241             }
242             Job job;
243             Job* jobPtr = flagGet(priority_job);
244             if (jobPtr !is null)
245             {
246                 job = *jobPtr;
247                 flagSet(priority_job, cast(Job*)null);
248                 workerActivity.notify();
249             }
250             else
251             {
252                 version (Queued) // #1896
253                         {
254                         job = q[0];
255                         memmove(q.ptr, q.ptr + 1, (q.length - 1) * typeof(*(q.ptr)).sizeof);
256                         q.length = q.length - 1;
257                         }
258                      else
259                         {
260                         // A stack -- should be a queue
261                         job = q[$ - 1];
262                         q.length = q.length - 1;
263                         }
264             }
265 
266             // Make sure we unlock before we start doing the calculations
267             m.unlock();
268 
269             // Do the actual job
270             flagAdd!(size_t)(active_jobs, 1);
271             try {
272                 job.dg(job.args);
273             } catch (Exception ex) { }
274             flagAdd!(size_t)(active_jobs, -1);
275 
276             // Tell the pool that we are done with something
277             m.lock();
278             workerActivity.notify();
279             m.unlock();
280         }
281         // Tell the pool that we are now done
282         m.lock();
283         workerActivity.notify();
284         m.unlock();
285     }
286 }
287 
288 
289 
290 /*******************************************************************************
291 
292         Invoke as "threadpool 1 2 3 4 5 6 7 10 20" or similar
293 
294 *******************************************************************************/
295 
296 debug (ThreadPool)
297 {
298         import tango.util.log.Trace;
299         import Integer = tango.text.convert.Integer;
300 
301         void main(char[][] args)
302         {
303                 long job(long val)
304                 {
305                         // a 'big job'
306                         Thread.sleep (3.0/val);
307                         return val;
308                 }
309 
310                 void hashJob(char[] file)
311                 {
312                         // If we don't catch exceptions the thread-pool will still
313                         // work, but the job will fail silently
314                         try {
315                             long n = Integer.parse(file);
316                             Trace.formatln("job({}) = {}", n, job(n));
317                             } catch (Exception ex) {
318                                     Trace.formatln("Exception: {}", ex.msg);
319                                     }
320                 }
321 
322                 // Create new thread pool with one worker thread per file given
323                 auto thread_pool = new ThreadPool!(char[])(args.length - 1);
324 
325                 Thread.sleep(1);
326                 Trace.formatln ("starting");
327 
328                 foreach (file; args[1 .. args.length])
329                          thread_pool.assign(&hashJob, file);
330 
331                 thread_pool.finish();
332         }
333 }