1 /** 2 * This module provides an implementation of the classical thread-pool model. 3 * 4 * Copyright: Copyright (C) 2007-2008 Anders Halager. All rights reserved. 5 * License: BSD style: $(LICENSE) 6 * Author: Anders Halager 7 */ 8 9 module tango.core.ThreadPool; 10 11 private import tango.core.Thread, 12 tango.core.sync.Atomic, 13 tango.core.sync.Mutex, 14 tango.core.sync.Condition, 15 tango.core.Exception : ThreadPoolException; 16 17 private import tango.stdc.string: memmove; 18 19 private version = Queued; 20 21 /** 22 * A thread pool is a way to process multiple jobs in parallel without creating 23 * a new thread for each job. This way the overhead of creating a thread is 24 * only paid once, and not once for each job and you can limit the maximum 25 * number of threads active at any one point. 26 * 27 * In this case a "job" is simply a delegate and some parameters the delegate 28 * will be called with after having been added to the thread pool's queue. 29 * 30 * Example: 31 * -------------------- 32 * // create a new pool with two threads 33 * auto pool = new ThreadPool!(int)(2); 34 * void delegate(int) f = (int x) { Log(x); }; 35 * 36 * // Now we have three ways of telling the pool to execute our jobs 37 * // First we can say we just want it done at some later point 38 * pool.append(f, 1); 39 * // Secondly we can ask for a job to be done as soon as possible, blocking 40 * // until it is started by some thread 41 * pool.assign(f, 2); 42 * // Finally we can say we either want it done immediately or not at all 43 * if (pool.tryAssign(f, 3)) 44 * Log("Someone took the job!"); 45 * else 46 * Log("No one was available to do the job right now"); 47 * // After giving the pool some jobs to do, we need to give it a chance to 48 * // finish, so we can do one of two things. 49 * // Choice no. 1 is to finish what has already been assigned to the threads, 50 * // but ignore any remaining queued jobs 51 * // pool.shutdown(); 52 * // The other choice is to finish all jobs currently executing or in queue: 53 * pool.finish(); 54 * -------------------- 55 * 56 * If append isn't called there should be no additional heap allocations after 57 * initialization. 58 */ 59 60 class ThreadPool(Args...) 61 { 62 /// An alias for the type of delegates this thread pool considers a job 63 alias void delegate(Args) JobD; 64 65 /** 66 * Create a new ThreadPool. 67 * 68 * Params: 69 * workers = The amount of threads to spawn 70 * q_size = The expected size of the queue (how many elements are 71 * preallocated) 72 */ 73 this(size_t workers, size_t q_size = 0) 74 { 75 // pre-allocate memory for q_size jobs in the queue 76 q.length = q_size; 77 q.length = 0; 78 79 m = new Mutex; 80 poolActivity = new Condition(m); 81 workerActivity = new Condition(m); 82 83 flagSet(priority_job, cast(Job*) null); 84 flagSet(active_jobs, cast(size_t) 0); 85 flagSet(done, false); 86 87 for (size_t i = 0; i < workers; i++) 88 { 89 auto thread = new Thread(&doJob); 90 // Allow the OS to kill the threads if we exit the program without 91 // handling them our selves 92 thread.isDaemon = true; 93 thread.start(); 94 pool ~= thread; 95 } 96 } 97 98 /** 99 Assign the given job to a thread immediately or block until one is 100 available 101 */ 102 void assign(JobD job, Args args) 103 { 104 if(this.pool.length == 0) 105 { 106 throw new ThreadPoolException("No workers available!"); 107 } 108 109 m.lock(); 110 scope(exit) m.unlock(); 111 auto j = Job(job, args); 112 flagSet(priority_job, &j); 113 poolActivity.notify(); 114 // Wait until someone has taken the job 115 while (flagGet(priority_job) !is null) 116 workerActivity.wait(); 117 } 118 119 /** 120 Assign the given job to a thread immediately or return false if none is 121 available. (Returns true if one was available) 122 */ 123 bool tryAssign(JobD job, Args args) 124 { 125 if (flagGet(active_jobs) >= pool.length) 126 return false; 127 assign(job, args); 128 return true; 129 } 130 131 /** 132 Put a job into the pool for eventual execution. 133 134 Warning: Acts as a stack, not a queue as you would expect 135 */ 136 void append(JobD job, Args args) 137 { 138 if(this.pool.length == 0) 139 { 140 throw new ThreadPoolException("No workers available!"); 141 } 142 143 m.lock(); 144 q ~= Job(job, args); 145 m.unlock(); 146 poolActivity.notify(); 147 } 148 149 /// Get the number of jobs waiting to be executed 150 size_t pendingJobs() 151 { 152 m.lock(); scope(exit) m.unlock(); 153 return q.length; 154 } 155 156 /// Get the number of jobs being executed 157 size_t activeJobs() 158 { 159 return flagGet(active_jobs); 160 } 161 162 /// Block until all pending jobs complete, but do not shut down. This allows more tasks to be added later. 163 void wait() 164 { 165 m.lock(); 166 while (q.length > 0 || flagGet(active_jobs) > 0) 167 workerActivity.wait(); 168 m.unlock(); 169 } 170 171 /// Finish currently executing jobs and drop all pending. 172 void shutdown() 173 { 174 flagSet(done, true); 175 m.lock(); 176 q.length = 0; 177 m.unlock(); 178 poolActivity.notifyAll(); 179 foreach (thread; pool) 180 thread.join(); 181 182 pool.length = 0; 183 184 m.lock(); 185 m.unlock(); 186 } 187 188 /// Complete all pending jobs and shutdown. 189 void finish() 190 { 191 wait(); 192 shutdown(); 193 } 194 195 private: 196 // Our list of threads -- only used during startup and shutdown 197 Thread[] pool; 198 struct Job 199 { 200 JobD dg; 201 Args args; 202 } 203 // Used for storing queued jobs that will be executed eventually 204 Job[] q; 205 206 // This is to store a single job for immediate execution, which hopefully 207 // means that any program using only assign and tryAssign wont need any 208 // heap allocations after startup. 209 Job* priority_job; 210 211 // This should be used when accessing the job queue 212 Mutex m; 213 214 // Notify is called on this condition whenever we have activity in the pool 215 // that the workers might want to know about. 216 Condition poolActivity; 217 218 // Worker threads call notify on this when they are done with a job or are 219 // completely done. 220 // This allows a graceful shut down and is necessary since assign has to 221 // wait for a job to become available 222 Condition workerActivity; 223 224 // Are we in the shutdown phase? 225 bool done; 226 227 // Counter for the number of jobs currently being calculated 228 size_t active_jobs; 229 230 // Thread delegate: 231 void doJob() 232 { 233 while (!flagGet(done)) 234 { 235 m.lock(); 236 while (q.length == 0 && flagGet(priority_job) is null && !flagGet(done)) 237 poolActivity.wait(); 238 if (flagGet(done)) { 239 m.unlock(); // not using scope(exit), need to manually unlock 240 break; 241 } 242 Job job; 243 Job* jobPtr = flagGet(priority_job); 244 if (jobPtr !is null) 245 { 246 job = *jobPtr; 247 flagSet(priority_job, cast(Job*)null); 248 workerActivity.notify(); 249 } 250 else 251 { 252 version (Queued) // #1896 253 { 254 job = q[0]; 255 memmove(q.ptr, q.ptr + 1, (q.length - 1) * typeof(*(q.ptr)).sizeof); 256 q.length = q.length - 1; 257 } 258 else 259 { 260 // A stack -- should be a queue 261 job = q[$ - 1]; 262 q.length = q.length - 1; 263 } 264 } 265 266 // Make sure we unlock before we start doing the calculations 267 m.unlock(); 268 269 // Do the actual job 270 flagAdd!(size_t)(active_jobs, 1); 271 try { 272 job.dg(job.args); 273 } catch (Exception ex) { } 274 flagAdd!(size_t)(active_jobs, -1); 275 276 // Tell the pool that we are done with something 277 m.lock(); 278 workerActivity.notify(); 279 m.unlock(); 280 } 281 // Tell the pool that we are now done 282 m.lock(); 283 workerActivity.notify(); 284 m.unlock(); 285 } 286 } 287 288 289 290 /******************************************************************************* 291 292 Invoke as "threadpool 1 2 3 4 5 6 7 10 20" or similar 293 294 *******************************************************************************/ 295 296 debug (ThreadPool) 297 { 298 import tango.util.log.Trace; 299 import Integer = tango.text.convert.Integer; 300 301 void main(char[][] args) 302 { 303 long job(long val) 304 { 305 // a 'big job' 306 Thread.sleep (3.0/val); 307 return val; 308 } 309 310 void hashJob(char[] file) 311 { 312 // If we don't catch exceptions the thread-pool will still 313 // work, but the job will fail silently 314 try { 315 long n = Integer.parse(file); 316 Trace.formatln("job({}) = {}", n, job(n)); 317 } catch (Exception ex) { 318 Trace.formatln("Exception: {}", ex.msg); 319 } 320 } 321 322 // Create new thread pool with one worker thread per file given 323 auto thread_pool = new ThreadPool!(char[])(args.length - 1); 324 325 Thread.sleep(1); 326 Trace.formatln ("starting"); 327 328 foreach (file; args[1 .. args.length]) 329 thread_pool.assign(&hashJob, file); 330 331 thread_pool.finish(); 332 } 333 }