1 /**
2 * The semaphore module provides a general use semaphore for synchronization.
3 *
4 * Copyright: Copyright (C) 2005-2006 Sean Kelly. All rights reserved.
5 * License: BSD style: $(LICENSE)
6 * Author: Sean Kelly
7 */
8 module tango.core.sync.Semaphore;
9
10
11 public import tango.core.Exception : SyncException;
12
13 import Time = tango.core.Time;
14
15 version( Win32 )
16 {
17 private import tango.sys.win32.UserGdi;
18 }
19 else version( Posix )
20 {
21 private import tango.core.sync.Config;
22 private import tango.stdc.errno;
23 private import tango.stdc.posix.pthread;
24 private import tango.stdc.posix.semaphore;
25 }
26
27
28 ////////////////////////////////////////////////////////////////////////////////
29 // Semaphore
30 //
// void wait();
// bool wait( double period );
// void notify();
// bool tryWait();
34 ////////////////////////////////////////////////////////////////////////////////
35
36
/**
 * This class represents a general counting semaphore as conceived by Edsger
 * Dijkstra.  As per Mesa type monitors however, "signal" has been replaced
 * with "notify" to indicate that control is not transferred to the waiter when
 * a notification is sent.
 *
 * The implementation wraps a native handle selected at compile time: a Win32
 * semaphore HANDLE, a Mach semaphore_t on OSX, or a POSIX sem_t otherwise.
 */
class Semaphore
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a semaphore object with the specified initial count.
     *
     * Params:
     *  count = The initial count for the semaphore.
     *
     * Throws:
     *  SyncException on error.
     */
    this( uint count = 0 )
    {
        version( Win32 )
        {
            // An unnamed semaphore whose maximum count is int.max.
            m_hndl = CreateSemaphoreA( null, count, int.max, null );
            if( m_hndl == m_hndl.init )
                throw new SyncException( "Unable to create semaphore" );
        }
        else version(OSX)
        {
            // The creating task is remembered because semaphore_destroy
            // is called with it again in the destructor.
            creatingTask = mach_task_self();
            auto rc = semaphore_create( creatingTask,
                                        &m_hndl,
                                        MACH_SYNC_POLICY.SYNC_POLICY_FIFO,
                                        count );
            if( rc )
                throw new SyncException( "Unable to create semaphore" );
        }
        else version( Posix )
        {
            int rc = sem_init( &m_hndl, 0, count );
            if( rc )
                throw new SyncException( "Unable to create semaphore" );
        }
    }


    /**
     * Releases the native semaphore.  A failure to do so is only reported
     * through an assertion; no exception is thrown from the destructor.
     */
    ~this()
    {
        version( Win32 )
        {
            BOOL rc = CloseHandle( m_hndl );
            assert( rc, "Unable to destroy semaphore" );
        }
        else version(OSX)
        {
            auto rc = semaphore_destroy( creatingTask, m_hndl );
            assert( !rc, "Unable to destroy semaphore" );
        }
        else version( Posix )
        {
            int rc = sem_destroy( &m_hndl );
            assert( !rc, "Unable to destroy semaphore" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until the current count is above zero, then atomically decrement
     * the count by one and return.
     *
     * Throws:
     *  SyncException on error.
     */
    void wait()
    {
        version( Win32 )
        {
            DWORD rc = WaitForSingleObject( m_hndl, INFINITE );
            if( rc != WAIT_OBJECT_0 )
                throw new SyncException( "Unable to wait for semaphore" );
        }
        else version(OSX)
        {
            while( true )
            {
                auto rc = semaphore_wait( m_hndl );
                if( rc == KERN_RETURN.ABORTED )
                {
                    // An aborted wait is retried only when errno reports an
                    // interruption; anything else is treated as an error.
                    if( errno != EINTR )
                        throw new SyncException( "Unable to wait for semaphore (abort)" );
                    // wait again
                }
                else if( rc != 0 )
                {
                    throw new SyncException( "Unable to wait for semaphore" );
                }
                else
                {
                    break;
                }
            }
        }
        else version( Posix )
        {
            // Retry for as long as the wait is merely interrupted (EINTR).
            while( true )
            {
                if( !sem_wait( &m_hndl ) )
                    return;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }
    }


    /**
     * Suspends the calling thread until the current count moves above zero or
     * until the supplied time period has elapsed.  If the count moves above
     * zero in this interval, then atomically decrement the count by one and
     * return true.  Otherwise, return false.  The supplied period may be up to
     * a maximum of (uint.max - 1) milliseconds.
     *
     * On OSX an interrupted wait is not resumed, so this call may return
     * false before the full period has elapsed.
     *
     * Params:
     *  period = The number of seconds to wait.
     *
     * In:
     *  period must be less than (uint.max - 1) milliseconds.
     *  NOTE(review): negative periods are not rejected by the contract.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     *
     * Throws:
     *  SyncException on error.
     */
    bool wait( double period )
    in
    {
        assert( period * 1000 + 0.1 < uint.max - 1);
    }
    body
    {
        version( Win32 )
        {
            // Convert seconds to milliseconds for WaitForSingleObject.
            DWORD t = cast(DWORD)(period * 1000 + 0.1);
            switch( WaitForSingleObject( m_hndl, t ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncException( "Unable to wait for semaphore" );
            }
        }
        else version(OSX)
        {
            timespec t;

            // Unlike the Posix branch, the current time is not loaded first:
            // t starts from its initial value and only the period is added.
            adjTimespec( t, period );
            auto rc = semaphore_timedwait( m_hndl, t );
            if( rc == 0 )
            {
                return true;
            }
            else if( rc == KERN_RETURN.OPERATION_TIMED_OUT )
            {
                return false;
            }
            else if( rc == KERN_RETURN.ABORTED )
            {
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore (abort)" );
                return false; // wait can be too short, wait is not resumed
            }
            else
            {
                throw new SyncException( "Unable to wait for semaphore" );
            }
        }
        else version( Posix )
        {
            timespec t;

            // The deadline is computed once, before the loop, and the same
            // value is passed again whenever the wait is retried after EINTR.
            getTimespec( t );
            adjTimespec( t, period );

            while( true )
            {
                if( !sem_timedwait( &m_hndl, &t ) )
                    return true;
                if( errno == ETIMEDOUT )
                    return false;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }

        // -w trip
        //return false;
    }


    /**
     * Atomically increment the current count by one.  This will notify one
     * waiter, if there are any in the queue.
     *
     * Throws:
     *  SyncException on error.
     */
    void notify()
    {
        version( Win32 )
        {
            if( !ReleaseSemaphore( m_hndl, 1, null ) )
                throw new SyncException( "Unable to notify semaphore" );
        }
        else version(OSX)
        {
            // Report failures like the other platforms do instead of
            // silently discarding the result of semaphore_signal.
            auto rc = semaphore_signal( m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify semaphore" );
        }
        else version( Posix )
        {
            int rc = sem_post( &m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify semaphore" );
        }
    }


    /**
     * If the current count is equal to zero, return false.  Otherwise,
     * atomically decrement the count by one and return true.
     *
     * Returns:
     *  true if the count was above zero and false if not.
     *
     * Throws:
     *  SyncException on error.
     */
    bool tryWait()
    {
        version( Win32 )
        {
            // A zero timeout makes the wait return immediately.
            switch( WaitForSingleObject( m_hndl, 0 ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncException( "Unable to wait for semaphore" );
            }
        }
        else version(OSX)
        {
            // Implemented as a timed wait with a zero period.
            return wait( 0.0 );
        }
        else version( Posix )
        {
            while( true )
            {
                if( !sem_trywait( &m_hndl ) )
                    return true;
                if( errno == EAGAIN )
                    return false;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }

        // -w trip
        //return false;
    }


private:
    // Native semaphore handle; its type depends on the target platform.
    version( Win32 )
    {
        HANDLE      m_hndl;
    }
    else version(OSX)
    {
        task_t      creatingTask;   // task passed to semaphore_create/destroy
        semaphore_t m_hndl;
    }
    else version( Posix )
    {
        sem_t       m_hndl;
    }
}
316
317
318 ////////////////////////////////////////////////////////////////////////////////
319 // Unit Tests
320 ////////////////////////////////////////////////////////////////////////////////
321
322
323 debug( UnitTest )
324 {
325 private import tango.core.Thread;
326
327
328 void testWait()
329 {
330 auto semaphore = new Semaphore;
331 int numToProduce = 10;
332 bool allProduced = false;
333 auto synProduced = new Object;
334 int numConsumed = 0;
335 auto synConsumed = new Object;
336 int numConsumers = 10;
337 int numComplete = 0;
338 auto synComplete = new Object;
339
340 void consumer()
341 {
342 while( true )
343 {
344 semaphore.wait();
345
346 synchronized( synProduced )
347 {
348 if( allProduced )
349 break;
350 }
351
352 synchronized( synConsumed )
353 {
354 ++numConsumed;
355 }
356 }
357
358 synchronized( synComplete )
359 {
360 ++numComplete;
361 }
362 }
363
364 void producer()
365 {
366 assert( !semaphore.tryWait() );
367
368 for( int i = 0; i < numToProduce; ++i )
369 {
370 semaphore.notify();
371 Thread.yield();
372 }
373 Thread.sleep(Time.seconds(1));
374 synchronized( synProduced )
375 {
376 allProduced = true;
377 }
378
379 for( int i = 0; i < numConsumers; ++i )
380 {
381 semaphore.notify();
382 Thread.yield();
383 }
384
385 for( int i = numConsumers * 10000; i > 0; --i )
386 {
387 synchronized( synComplete )
388 {
389 if( numComplete == numConsumers )
390 break;
391 }
392 Thread.yield();
393 }
394
395 synchronized( synComplete )
396 {
397 assert( numComplete == numConsumers );
398 }
399
400 synchronized( synConsumed )
401 {
402 assert( numConsumed == numToProduce );
403 }
404
405 assert( !semaphore.tryWait() );
406 semaphore.notify();
407 assert( semaphore.tryWait() );
408 assert( !semaphore.tryWait() );
409 }
410
411 auto group = new ThreadGroup;
412
413 for( int i = 0; i < numConsumers; ++i )
414 group.create( &consumer );
415 group.create( &producer );
416 group.joinAll();
417 }
418
419
420 void testWaitTimeout()
421 {
422 auto synReady = new Object;
423 auto semReady = new Semaphore;
424 bool waiting = false;
425 bool alertedOne = true;
426 bool alertedTwo = true;
427
428 void waiter()
429 {
430 synchronized( synReady )
431 {
432 waiting = true;
433 }
434 alertedOne = semReady.wait( 0.1 );
435 alertedTwo = semReady.wait( 0.1 );
436 }
437
438 auto thread = new Thread( &waiter );
439 thread.start();
440
441 while( true )
442 {
443 synchronized( synReady )
444 {
445 if( waiting )
446 {
447 semReady.notify();
448 break;
449 }
450 }
451 Thread.yield();
452 }
453 thread.join();
454 assert( waiting && alertedOne && !alertedTwo );
455 }
456
457
458 unittest
459 {
460 testWait();
461 testWaitTimeout();
462 }
463 }