1 /**
2  * The condition module provides a primitive for synchronized condition
3  * checking.
4  *
5  * Copyright: Copyright (C) 2005-2006 Sean Kelly.  All rights reserved.
6  * License:   BSD style: $(LICENSE)
7  * Author:    Sean Kelly
8  */
9 module tango.core.sync.Condition;
10 
11 
12 public import tango.core.Exception : SyncException;
13 public import tango.core.sync.Mutex;
14 
15 version( Win32 )
16 {
17     private import tango.core.sync.Semaphore;
18     private import tango.sys.win32.UserGdi;
19 }
20 else version( Posix )
21 {
22     private import tango.core.sync.Config;
23     private import tango.stdc.errno;
24     private import tango.stdc.posix.pthread;
25     private import tango.stdc.posix.time;
26 }
27 
28 
29 ////////////////////////////////////////////////////////////////////////////////
30 // Condition
31 //
32 // void wait();
33 // void notify();
34 // void notifyAll();
35 ////////////////////////////////////////////////////////////////////////////////
36 
37 
38 /**
 * This class represents a condition variable as conceived by C.A.R. Hoare.  As
40  * per Mesa type monitors however, "signal" has been replaced with "notify" to
41  * indicate that control is not transferred to the waiter when a notification
42  * is sent.
43  */
class Condition
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a condition object which is associated with the supplied
     * mutex object.
     *
     * On Win32 this creates the two semaphores and the critical section used
     * by the emulation and stores a reference to m.  On Posix it records the
     * address returned by m.handleAddr() and initializes a pthread condition
     * variable with default attributes.
     *
     * Params:
     *  m = The mutex with which this condition will be associated.
     *
     * Throws:
     *  SyncException on error.
     */
    this( Mutex m )
    {
        version( Win32 )
        {
            // The gate: created with one permit available (open).
            m_blockLock = CreateSemaphoreA( null, 1, 1, null );
            if( m_blockLock == m_blockLock.init )
                throw new SyncException( "Unable to initialize condition" );
            // Release the handle again if a later step of construction fails.
            scope(failure) CloseHandle( m_blockLock );

            // The queue waiters block on: created with no permits available.
            m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
            if( m_blockQueue == m_blockQueue.init )
                throw new SyncException( "Unable to initialize condition" );
            scope(failure) CloseHandle( m_blockQueue );

            InitializeCriticalSection( &m_unblockLock );
            m_assocMutex = m;
        }
        else version( Posix )
        {
            m_mutexAddr = m.handleAddr();

            int rc = pthread_cond_init( &m_hndl, null );
            if( rc )
                throw new SyncException( "Unable to initialize condition" );
        }
    }


    /**
     * Releases the operating system resources held by this condition.
     * Failures are only detected through assertions.
     */
    ~this()
    {
        version( Win32 )
        {
            BOOL rc = CloseHandle( m_blockLock );
            assert( rc, "Unable to destroy condition" );
            rc = CloseHandle( m_blockQueue );
            assert( rc, "Unable to destroy condition" );
            DeleteCriticalSection( &m_unblockLock );
        }
        else version( Posix )
        {
            int rc = pthread_cond_destroy( &m_hndl );
            assert( !rc, "Unable to destroy condition" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until notified.
     *
     * Throws:
     *  SyncException on error.
     */
    void wait()
    {
        version( Win32 )
        {
            timedWait( INFINITE );
        }
        else version( Posix )
        {
            int rc = pthread_cond_wait( &m_hndl, m_mutexAddr );
            if( rc )
                throw new SyncException( "Unable to wait for condition" );
        }
    }


    /**
     * Suspends the calling thread until a notification occurs or until the
     * supplied time period has elapsed.  The supplied period may be up to a
     * maximum of (uint.max - 1) milliseconds.
     *
     * Params:
     *  period = The time to wait, in seconds (fractional values are accepted).
     *
     * In:
     *  period must be less than (uint.max - 1) milliseconds.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     *
     * Throws:
     *  SyncException on error.
     */
    bool wait( double period )
    in
    {
        // NOTE: The fractional value added to period is to correct fp error.
        assert( period * 1000 + 0.1 < uint.max - 1 );
    }
    body
    {
        version( Win32 )
        {
            // Convert seconds to whole milliseconds for the Win32 wait call.
            return timedWait( cast(uint)(period * 1000 + 0.1) );
        }
        else version( Posix )
        {
            timespec t;

            // Build the timespec handed to pthread_cond_timedwait from the
            // value produced by getTimespec, adjusted by period.
            getTimespec( t );
            adjTimespec( t, period );
            int rc = pthread_cond_timedwait( &m_hndl, m_mutexAddr, &t );
            if( !rc )
                return true;
            if( rc == ETIMEDOUT )
                return false;
            throw new SyncException( "Unable to wait for condition" );
        }
    }

    /**
     * Notifies one waiter.
     *
     * Throws:
     *  SyncException on error.
     */
    void notify()
    {
        version( Win32 )
        {
            notify( false );
        }
        else version( Posix )
        {
            int rc = pthread_cond_signal( &m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify condition" );
        }
    }


    /**
     * Notifies all waiters.
     *
     * Throws:
     *  SyncException on error.
     */
    void notifyAll()
    {
        version( Win32 )
        {
            notify( true );
        }
        else version( Posix )
        {
            int rc = pthread_cond_broadcast( &m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify condition" );
        }
    }


private:
    version( Win32 )
    {
        /**
         * Win32 implementation shared by both wait overloads.
         *
         * Registers the caller as a blocked waiter, unlocks the associated
         * mutex, blocks on m_blockQueue for at most timeout milliseconds,
         * updates the waiter bookkeeping and finally re-locks the associated
         * mutex, whether the call returns normally or fails.
         *
         * Params:
         *  timeout = The maximum time to block in milliseconds, or INFINITE.
         *
         * Returns:
         *  true if the wait on m_blockQueue was satisfied and false if it
         *  timed out.
         */
        bool timedWait( DWORD timeout )
        {
            int   numSignalsLeft;
            int   numWaitersGone;
            DWORD rc;

            // Pass through the gate to register as a blocked waiter.
            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );

            m_numWaitersBlocked++;

            rc = ReleaseSemaphore( m_blockLock, 1, null );
            assert( rc );

            m_assocMutex.unlock();
            // The associated mutex is re-acquired exactly once on every way
            // out of this function from here on.
            scope(exit) m_assocMutex.lock();

            rc = WaitForSingleObject( m_blockQueue, timeout );
            assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
            bool timedOut = (rc == WAIT_TIMEOUT);

            {
                // The guard is tied to this nested block so that the critical
                // section is left exactly once and is never touched again by
                // a failure that occurs after the block has been exited.
                EnterCriticalSection( &m_unblockLock );
                scope(exit) LeaveCriticalSection( &m_unblockLock );

                if( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
                {
                    if ( timedOut )
                    {
                        // timeout (or canceled)
                        if( m_numWaitersBlocked != 0 )
                        {
                            m_numWaitersBlocked--;
                            // do not unblock next waiter below (already unblocked)
                            numSignalsLeft = 0;
                        }
                        else
                        {
                            // spurious wakeup pending!!
                            m_numWaitersGone = 1;
                        }
                    }
                    if( --m_numWaitersToUnblock == 0 )
                    {
                        if( m_numWaitersBlocked != 0 )
                        {
                            // open the gate
                            rc = ReleaseSemaphore( m_blockLock, 1, null );
                            assert( rc );
                            // do not open the gate below again
                            numSignalsLeft = 0;
                        }
                        else if( (numWaitersGone = m_numWaitersGone) != 0 )
                        {
                            m_numWaitersGone = 0;
                        }
                    }
                }
                else if( ++m_numWaitersGone == int.max / 2 )
                {
                    // timeout/canceled or spurious event :-)
                    rc = WaitForSingleObject( m_blockLock, INFINITE );
                    assert( rc == WAIT_OBJECT_0 );
                    // something is going on here - test of timeouts?
                    m_numWaitersBlocked -= m_numWaitersGone;
                    rc = ReleaseSemaphore( m_blockLock, 1, null );
                    // ReleaseSemaphore reports success with a nonzero value,
                    // so it must not be compared against WAIT_OBJECT_0.
                    assert( rc );
                    m_numWaitersGone = 0;
                }
            }

            if( numSignalsLeft == 1 )
            {
                // better now than spurious later (same as ResetEvent)
                for( ; numWaitersGone > 0; --numWaitersGone )
                {
                    rc = WaitForSingleObject( m_blockQueue, INFINITE );
                    assert( rc == WAIT_OBJECT_0 );
                }
                // open the gate
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                assert( rc );
            }
            else if( numSignalsLeft != 0 )
            {
                // unblock next waiter
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            return !timedOut;
        }


        /**
         * Win32 implementation shared by notify() and notifyAll().
         *
         * Moves either one or all blocked waiters to the set of waiters to
         * unblock.  If no unblocking was in progress, the gate is taken and
         * the first waiter is released; the remaining ones are released in
         * turn by the waiters themselves inside timedWait.
         *
         * Params:
         *  all = true to unblock every blocked waiter, false to unblock one.
         */
        void notify( bool all )
        {
            DWORD rc;
            bool  releaseFirst = false;

            {
                // Scoped guard: the critical section is left exactly once,
                // on every path out of this block.
                EnterCriticalSection( &m_unblockLock );
                scope(exit) LeaveCriticalSection( &m_unblockLock );

                if( m_numWaitersToUnblock != 0 )
                {
                    // An unblocking pass is already in progress; only extend
                    // it, its waiters will release each other.
                    if( m_numWaitersBlocked == 0 )
                        return;
                    if( all )
                    {
                        m_numWaitersToUnblock += m_numWaitersBlocked;
                        m_numWaitersBlocked = 0;
                    }
                    else
                    {
                        m_numWaitersToUnblock++;
                        m_numWaitersBlocked--;
                    }
                }
                else if( m_numWaitersBlocked > m_numWaitersGone )
                {
                    // close the gate
                    rc = WaitForSingleObject( m_blockLock, INFINITE );
                    assert( rc == WAIT_OBJECT_0 );
                    if( 0 != m_numWaitersGone )
                    {
                        m_numWaitersBlocked -= m_numWaitersGone;
                        m_numWaitersGone = 0;
                    }
                    if( all )
                    {
                        m_numWaitersToUnblock = m_numWaitersBlocked;
                        m_numWaitersBlocked = 0;
                    }
                    else
                    {
                        m_numWaitersToUnblock = 1;
                        m_numWaitersBlocked--;
                    }
                    releaseFirst = true;
                }
            }

            if( releaseFirst )
            {
                // Performed after the critical section has been left.
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
        }


        // NOTE: This implementation uses Algorithm 8c as described here:
        //       http://groups.google.com/group/comp.programming.threads/
        //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
        HANDLE              m_blockLock;    // auto-reset event (now semaphore)
        HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
        Mutex               m_assocMutex;   // external mutex/CS
        CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
        int                 m_numWaitersGone        = 0;
        int                 m_numWaitersBlocked     = 0;
        int                 m_numWaitersToUnblock   = 0;
    }
    else version( Posix )
    {
        pthread_cond_t      m_hndl;         // native condition variable
        pthread_mutex_t*    m_mutexAddr;    // handle address of the associated mutex
    }
}
389 
390 
391 ////////////////////////////////////////////////////////////////////////////////
392 // Unit Tests
393 ////////////////////////////////////////////////////////////////////////////////
394 
395 
396 debug( UnitTest )
397 {
398     private import tango.core.Thread;
399     private import tango.core.sync.Mutex;
400     private import tango.core.sync.Semaphore;
401 
402 
403     void testNotify()
404     {
405         auto mutex      = new Mutex;
406         auto condReady  = new Condition( mutex );
407         auto semDone    = new Semaphore;
408         auto synLoop    = new Object;
409         int  numWaiters = 10;
410         int  numTries   = 10;
411         int  numReady   = 0;
412         int  numTotal   = 0;
413         int  numDone    = 0;
414         int  numPost    = 0;
415 
416         void waiter()
417         {
418             for( int i = 0; i < numTries; ++i )
419             {
420                 synchronized( mutex )
421                 {
422                     while( numReady < 1 )
423                     {
424                         condReady.wait();
425                     }
426                     --numReady;
427                     ++numTotal;
428                 }
429 
430                 synchronized( synLoop )
431                 {
432                     ++numDone;
433                 }
434                 semDone.wait();
435             }
436         }
437 
438         auto group = new ThreadGroup;
439 
440         for( int i = 0; i < numWaiters; ++i )
441             group.create( &waiter );
442 
443         for( int i = 0; i < numTries; ++i )
444         {
445             for( int j = 0; j < numWaiters; ++j )
446             {
447                 synchronized( mutex )
448                 {
449                     ++numReady;
450                     condReady.notify();
451                 }
452             }
453             while( true )
454             {
455                 synchronized( synLoop )
456                 {
457                     if( numDone >= numWaiters )
458                         break;
459                 }
460                 Thread.yield();
461             }
462             for( int j = 0; j < numWaiters; ++j )
463             {
464                 semDone.notify();
465             }
466         }
467 
468         group.joinAll();
469         assert( numTotal == numWaiters * numTries );
470     }
471 
472 
473     void testNotifyAll()
474     {
475         auto mutex      = new Mutex;
476         auto condReady  = new Condition( mutex );
477         int  numWaiters = 10;
478         int  numReady   = 0;
479         int  numDone    = 0;
480         bool alert      = false;
481 
482         void waiter()
483         {
484             synchronized( mutex )
485             {
486                 ++numReady;
487                 while( !alert )
488                     condReady.wait();
489                 ++numDone;
490             }
491         }
492 
493         auto group = new ThreadGroup;
494 
495         for( int i = 0; i < numWaiters; ++i )
496             group.create( &waiter );
497 
498         while( true )
499         {
500             synchronized( mutex )
501             {
502                 if( numReady >= numWaiters )
503                 {
504                     alert = true;
505                     condReady.notifyAll();
506                     break;
507                 }
508             }
509             Thread.yield();
510         }
511         group.joinAll();
512         assert( numReady == numWaiters && numDone == numWaiters );
513     }
514 
515 
516     void testWaitTimeout()
517     {
518         auto mutex      = new Mutex;
519         auto condReady  = new Condition( mutex );
520         bool waiting    = false;
521         bool alertedOne = true;
522         bool alertedTwo = true;
523 
524         void waiter()
525         {
526             synchronized( mutex )
527             {
528                 waiting    = true;
529                 alertedOne = condReady.wait( 1 );
530                 alertedTwo = condReady.wait( 1 );
531             }
532         }
533 
534         auto thread = new Thread( &waiter );
535         thread.start();
536 
537         while( true )
538         {
539             synchronized( mutex )
540             {
541                 if( waiting )
542                 {
543                     condReady.notify();
544                     break;
545                 }
546             }
547             Thread.yield();
548         }
549         thread.join();
550         assert( waiting && alertedOne && !alertedTwo );
551     }
552 
553 
554     unittest
555     {
556         testNotify();
557         testNotifyAll();
558         testWaitTimeout();
559     }
560 }