1 /**
2  * The semaphore module provides a general use semaphore for synchronization.
3  *
4  * Copyright: Copyright (C) 2005-2006 Sean Kelly.  All rights reserved.
5  * License:   BSD style: $(LICENSE)
6  * Author:    Sean Kelly
7  */
8 module tango.core.sync.Semaphore;
9 
10 
11 public import tango.core.Exception : SyncException;
12 
13 import Time = tango.core.Time;
14 
15 version( Win32 )
16 {
17     private import tango.sys.win32.UserGdi;
18 }
19 else version( Posix )
20 {
21     private import tango.core.sync.Config;
22     private import tango.stdc.errno;
23     private import tango.stdc.posix.pthread;
24     private import tango.stdc.posix.semaphore;
25 }
26 
27 
28 ////////////////////////////////////////////////////////////////////////////////
29 // Semaphore
30 //
// void wait();
// bool wait( double period );
// void notify();
// bool tryWait();
34 ////////////////////////////////////////////////////////////////////////////////
35 
36 
37 /**
 * This class represents a general counting semaphore as conceived by Edsger
39  * Dijkstra.  As per Mesa type monitors however, "signal" has been replaced
40  * with "notify" to indicate that control is not transferred to the waiter when
41  * a notification is sent.
42  */
class Semaphore
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a semaphore object with the specified initial count.
     *
     * Params:
     *  count = The initial count for the semaphore.
     *
     * Throws:
     *  SyncException on error.
     */
    this( uint count = 0 )
    {
        version( Win32 )
        {
            // Unnamed semaphore with default security and a maximum count of
            // int.max.
            m_hndl = CreateSemaphoreA( null, count, int.max, null );
            if( m_hndl == m_hndl.init )
                throw new SyncException( "Unable to create semaphore" );
        }
        else version(OSX){
            // The creating task is remembered because semaphore_destroy
            // needs it again in the destructor.
            creatingTask=mach_task_self();
            auto rc=semaphore_create(creatingTask,
                                   &m_hndl,
                                   MACH_SYNC_POLICY.SYNC_POLICY_FIFO,
                                   count);
            if( rc )
                throw new SyncException( "Unable to create semaphore" );
        }
        else version( Posix )
        {
            // Second argument 0: the semaphore is not shared between
            // processes.
            int rc = sem_init( &m_hndl, 0, count );
            if( rc )
                throw new SyncException( "Unable to create semaphore" );
        }
    }


    /**
     * Releases the underlying operating system semaphore.  Failure is only
     * reported through an assertion, never through an exception.
     */
    ~this()
    {
        version( Win32 )
        {
            BOOL rc = CloseHandle( m_hndl );
            assert( rc, "Unable to destroy semaphore" );
        }
        else version(OSX)
        {
            auto rc=semaphore_destroy(creatingTask, m_hndl);
            assert( !rc, "Unable to destroy semaphore" );
        }
        else version( Posix )
        {
            int rc = sem_destroy( &m_hndl );
            assert( !rc, "Unable to destroy semaphore" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until the current count is above zero, then atomically decrement
     * the count by one and return.
     *
     * Throws:
     *  SyncException on error.
     */
    void wait()
    {
        version( Win32 )
        {
            DWORD rc = WaitForSingleObject( m_hndl, INFINITE );
            if( rc != WAIT_OBJECT_0 )
                throw new SyncException( "Unable to wait for semaphore" );
        }
        else version(OSX){
            while (true){
                auto rc=semaphore_wait(m_hndl);
                if (rc==KERN_RETURN.ABORTED){
                    // NOTE(review): the status of semaphore_wait is carried by
                    // its return code; confirm that errno is meaningful here.
                    if( errno != EINTR )
                        throw new SyncException( "Unable to wait for semaphore (abort)" );
                    // interrupted: wait again
                } else if (rc!=0) {
                    throw new SyncException( "Unable to wait for semaphore" );
                } else {
                    break;
                }
            }
        }
        else version( Posix )
        {
            // Retry for as long as the wait is merely interrupted by a signal.
            while( true )
            {
                if( !sem_wait( &m_hndl ) )
                    return;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }
    }


    /**
     * Suspends the calling thread until the current count moves above zero or
     * until the supplied time period has elapsed.  If the count moves above
     * zero in this interval, then atomically decrement the count by one and
     * return true.  Otherwise, return false.  The supplied period is given in
     * seconds and may be up to a maximum of (uint.max - 1) milliseconds.
     *
     * Params:
     *  period = The number of seconds to wait.
     *
     * In:
     *  period must be less than (uint.max - 1) milliseconds.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     *
     * Throws:
     *  SyncException on error.
     */
    bool wait( double period )
    in
    {
        assert( period * 1000 + 0.1 < uint.max - 1);
    }
    body
    {
        version( Win32 )
        {
            // Convert seconds to milliseconds; the 0.1 offsets floating point
            // truncation in the cast.
            DWORD t = cast(DWORD)(period * 1000 + 0.1);
            switch( WaitForSingleObject( m_hndl, t ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncException( "Unable to wait for semaphore" );
            }
        }
        else version(OSX){
            timespec t;

            // t starts out default-initialized and only has the period
            // applied, unlike the Posix branch, which first reads the current
            // time.
            adjTimespec( t, period );
            auto rc=semaphore_timedwait(m_hndl,t);
            if (rc==0){
                return true;
            } else if (rc==KERN_RETURN.OPERATION_TIMED_OUT){
                return false;
            } else if (rc==KERN_RETURN.ABORTED) {
                // NOTE(review): the status of semaphore_timedwait is carried by
                // its return code; confirm that errno is meaningful here.
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore (abort)" );
                return false; // wait can be too short, wait is not resumed
            } else {
                throw new SyncException( "Unable to wait for semaphore" );
            }
        }
        else version( Posix )
        {
            timespec t;

            // Build the deadline: current time plus the requested period.
            getTimespec( t );
            adjTimespec( t, period );

            // The same deadline is reused when the wait is interrupted, so
            // retries do not extend the overall timeout.
            while( true )
            {
                if( !sem_timedwait( &m_hndl, &t ) )
                    return true;
                if( errno == ETIMEDOUT )
                    return false;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }

        // -w trip
        //return false;
    }


    /**
     * Atomically increment the current count by one.  This will notify one
     * waiter, if there are any in the queue.
     *
     * Throws:
     *  SyncException on error.
     */
    void notify()
    {
        version( Win32 )
        {
            if( !ReleaseSemaphore( m_hndl, 1, null ) )
                throw new SyncException( "Unable to notify semaphore" );
        }
        else version(OSX){
            // A non-zero return code signals failure, as with
            // semaphore_create; report it instead of silently dropping it.
            auto rc=semaphore_signal(m_hndl);
            if( rc )
                throw new SyncException( "Unable to notify semaphore" );
        }
        else version( Posix )
        {
            int rc = sem_post( &m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify semaphore" );
        }
    }


    /**
     * If the current count is equal to zero, return false.  Otherwise,
     * atomically decrement the count by one and return true.  This call does
     * not wait for the count to change.
     *
     * Returns:
     *  true if the count was above zero and false if not.
     *
     * Throws:
     *  SyncException on error.
     */
    bool tryWait()
    {
        version( Win32 )
        {
            // A zero timeout turns the wait into a poll.
            switch( WaitForSingleObject( m_hndl, 0 ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                throw new SyncException( "Unable to wait for semaphore" );
            }
        }
        else version(OSX){
            // Implemented as a timed wait with a zero period.
            return wait(0.0);
        }
        else version( Posix )
        {
            while( true )
            {
                if( !sem_trywait( &m_hndl ) )
                    return true;
                if( errno == EAGAIN )
                    return false;
                if( errno != EINTR )
                    throw new SyncException( "Unable to wait for semaphore" );
            }
        }

        // -w trip
        //return false;
    }


private:
    version( Win32 )
    {
        HANDLE  m_hndl;         // handle returned by CreateSemaphoreA
    }
    else version(OSX){
        task_t creatingTask;    // task that created m_hndl, needed to destroy it
        semaphore_t m_hndl;     // semaphore created by semaphore_create
    }
    else version( Posix )
    {
        sem_t   m_hndl;         // semaphore initialized by sem_init
    }
}
316 
317 
318 ////////////////////////////////////////////////////////////////////////////////
319 // Unit Tests
320 ////////////////////////////////////////////////////////////////////////////////
321 
322 
323 debug( UnitTest )
324 {
325     private import tango.core.Thread;
326 
327 
328     void testWait()
329     {
330         auto semaphore    = new Semaphore;
331         int  numToProduce = 10;
332         bool allProduced  = false;
333         auto synProduced  = new Object;
334         int  numConsumed  = 0;
335         auto synConsumed  = new Object;
336         int  numConsumers = 10;
337         int  numComplete  = 0;
338         auto synComplete  = new Object;
339 
340         void consumer()
341         {
342             while( true )
343             {
344                 semaphore.wait();
345 
346                 synchronized( synProduced )
347                 {
348                     if( allProduced )
349                         break;
350                 }
351 
352                 synchronized( synConsumed )
353                 {
354                     ++numConsumed;
355                 }
356             }
357 
358             synchronized( synComplete )
359             {
360                 ++numComplete;
361             }
362         }
363 
364         void producer()
365         {
366             assert( !semaphore.tryWait() );
367 
368             for( int i = 0; i < numToProduce; ++i )
369             {
370                 semaphore.notify();
371                 Thread.yield();
372             }
373             Thread.sleep(Time.seconds(1));
374             synchronized( synProduced )
375             {
376                 allProduced = true;
377             }
378 
379             for( int i = 0; i < numConsumers; ++i )
380             {
381                 semaphore.notify();
382                 Thread.yield();
383             }
384 
385             for( int i = numConsumers * 10000; i > 0; --i )
386             {
387                 synchronized( synComplete )
388                 {
389                     if( numComplete == numConsumers )
390                         break;
391                 }
392                 Thread.yield();
393             }
394 
395             synchronized( synComplete )
396             {
397                 assert( numComplete == numConsumers );
398             }
399 
400             synchronized( synConsumed )
401             {
402                 assert( numConsumed == numToProduce );
403             }
404 
405             assert( !semaphore.tryWait() );
406             semaphore.notify();
407             assert( semaphore.tryWait() );
408             assert( !semaphore.tryWait() );
409         }
410 
411         auto group = new ThreadGroup;
412 
413         for( int i = 0; i < numConsumers; ++i )
414             group.create( &consumer );
415         group.create( &producer );
416         group.joinAll();
417     }
418 
419 
420     void testWaitTimeout()
421     {
422         auto synReady   = new Object;
423         auto semReady   = new Semaphore;
424         bool waiting    = false;
425         bool alertedOne = true;
426         bool alertedTwo = true;
427 
428         void waiter()
429         {
430             synchronized( synReady )
431             {
432                 waiting    = true;
433             }
434             alertedOne = semReady.wait( 0.1 );
435             alertedTwo = semReady.wait( 0.1 );
436         }
437 
438         auto thread = new Thread( &waiter );
439         thread.start();
440 
441         while( true )
442         {
443             synchronized( synReady )
444             {
445                 if( waiting )
446                 {
447                     semReady.notify();
448                     break;
449                 }
450             }
451             Thread.yield();
452         }
453         thread.join();
454         assert( waiting && alertedOne && !alertedTwo );
455     }
456 
457 
458     unittest
459     {
460         testWait();
461         testWaitTimeout();
462     }
463 }