/**
 * The condition module provides a primitive for synchronized condition
 * checking.
 *
 * Copyright: Copyright (C) 2005-2006 Sean Kelly.  All rights reserved.
 * License:   BSD style: $(LICENSE)
 * Author:    Sean Kelly
 */
module tango.core.sync.Condition;


public import tango.core.Exception : SyncException;
public import tango.core.sync.Mutex;

version( Win32 )
{
    private import tango.core.sync.Semaphore;
    private import tango.sys.win32.UserGdi;
}
else version( Posix )
{
    private import tango.core.sync.Config;
    private import tango.stdc.errno;
    private import tango.stdc.posix.pthread;
    private import tango.stdc.posix.time;
}


////////////////////////////////////////////////////////////////////////////////
// Condition
//
// void wait();
// void notify();
// void notifyAll();
////////////////////////////////////////////////////////////////////////////////


/**
 * This class represents a condition variable as conceived by C.A.R. Hoare.  As
 * per Mesa type monitors however, "signal" has been replaced with "notify" to
 * indicate that control is not transferred to the waiter when a notification
 * is sent.
 */
class Condition
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Initializes a condition object which is associated with the supplied
     * mutex object.
     *
     * On Win32 this creates the two semaphores and the critical section used
     * by the emulation and keeps a reference to m.  On Posix it records the
     * address returned by m.handleAddr() and initializes a pthread condition
     * variable with default attributes.
     *
     * Params:
     *  m = The mutex with which this condition will be associated.
     *
     * Throws:
     *  SyncException on error.
     */
    this( Mutex m )
    {
        version( Win32 )
        {
            // The gate semaphore starts open (count 1, maximum 1).
            m_blockLock = CreateSemaphoreA( null, 1, 1, null );
            if( m_blockLock == m_blockLock.init )
                throw new SyncException( "Unable to initialize condition" );
            // Release the handle again if a later step in this scope throws.
            scope(failure) CloseHandle( m_blockLock );

            // The queue semaphore starts empty; waiters block on it.
            m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
            if( m_blockQueue == m_blockQueue.init )
                throw new SyncException( "Unable to initialize condition" );
            scope(failure) CloseHandle( m_blockQueue );

            InitializeCriticalSection( &m_unblockLock );
            m_assocMutex = m;
        }
        else version( Posix )
        {
            m_mutexAddr = m.handleAddr();

            int rc = pthread_cond_init( &m_hndl, null );
            if( rc )
                throw new SyncException( "Unable to initialize condition" );
        }
    }


    /**
     * Releases the operating system resources acquired by the constructor.
     * Failures are reported through assert only, so they are not detected
     * when asserts are compiled out.
     */
    ~this()
    {
        version( Win32 )
        {
            BOOL rc = CloseHandle( m_blockLock );
            assert( rc, "Unable to destroy condition" );
            rc = CloseHandle( m_blockQueue );
            assert( rc, "Unable to destroy condition" );
            DeleteCriticalSection( &m_unblockLock );
        }
        else version( Posix )
        {
            int rc = pthread_cond_destroy( &m_hndl );
            assert( !rc, "Unable to destroy condition" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until notified.
     *
     * The Win32 path unlocks the associated mutex while blocked and locks it
     * again before returning, and the Posix path hands the mutex to
     * pthread_cond_wait, so the caller is expected to hold the associated
     * mutex when calling this routine.
     *
     * Throws:
     *  SyncException on error.
     */
    void wait()
    {
        version( Win32 )
        {
            timedWait( INFINITE );
        }
        else version( Posix )
        {
            int rc = pthread_cond_wait( &m_hndl, m_mutexAddr );
            if( rc )
                throw new SyncException( "Unable to wait for condition" );
        }
    }


    /**
     * Suspends the calling thread until a notification occurs or until the
     * supplied time period has elapsed.  The supplied period may be up to a
     * maximum of (uint.max - 1) milliseconds.
     *
     * Params:
     *  period = The time to wait, in seconds (fractional values are accepted).
     *
     * In:
     *  period must be less than (uint.max - 1) milliseconds.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     *
     * Throws:
     *  SyncException on error.
     */
    bool wait( double period )
    in
    {
        // NOTE: The fractional value added to period is to correct fp error.
        assert( period * 1000 + 0.1 < uint.max - 1 );
    }
    body
    {
        version( Win32 )
        {
            // Convert seconds to whole milliseconds for the Win32 wait call.
            return timedWait( cast(uint)(period * 1000 + 0.1) );
        }
        else version( Posix )
        {
            timespec t;

            // NOTE(review): getTimespec / adjTimespec are not defined in this
            //               module (presumably tango.core.sync.Config); they
            //               appear to build the absolute deadline "now +
            //               period" that pthread_cond_timedwait expects --
            //               confirm against that module.
            getTimespec( t );
            adjTimespec( t, period );
            int rc = pthread_cond_timedwait( &m_hndl, m_mutexAddr, &t );
            if( !rc )
                return true;
            if( rc == ETIMEDOUT )
                return false;
            throw new SyncException( "Unable to wait for condition" );
        }
    }


    /**
     * Notifies one waiter.
     *
     * Throws:
     *  SyncException on error.
     */
    void notify()
    {
        version( Win32 )
        {
            notify( false );
        }
        else version( Posix )
        {
            int rc = pthread_cond_signal( &m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify condition" );
        }
    }


    /**
     * Notifies all waiters.
     *
     * Throws:
     *  SyncException on error.
     */
    void notifyAll()
    {
        version( Win32 )
        {
            notify( true );
        }
        else version( Posix )
        {
            int rc = pthread_cond_broadcast( &m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify condition" );
        }
    }


private:
    version( Win32 )
    {
        /**
         * Win32 implementation shared by wait() and wait( double ).
         *
         * Registers the caller as a blocked waiter, unlocks the associated
         * mutex, blocks on the queue semaphore for at most timeout
         * milliseconds, updates the waiter bookkeeping and finally locks the
         * associated mutex again.
         *
         * Params:
         *  timeout = The maximum time to block, in milliseconds, or INFINITE.
         *
         * Returns:
         *  true if the wait on the queue semaphore completed before the
         *  timeout and false if it timed out.
         */
        bool timedWait( DWORD timeout )
        {
            int   numSignalsLeft;
            int   numWaitersGone;
            DWORD rc;

            // Pass through the gate to register as a blocked waiter.
            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );

            m_numWaitersBlocked++;

            rc = ReleaseSemaphore( m_blockLock, 1, null );
            assert( rc );

            m_assocMutex.unlock();
            // Make sure the caller still owns the mutex if anything below
            // throws.
            scope(failure) m_assocMutex.lock();

            rc = WaitForSingleObject( m_blockQueue, timeout );
            assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
            bool timedOut = (rc == WAIT_TIMEOUT);

            EnterCriticalSection( &m_unblockLock );
            scope(failure) LeaveCriticalSection( &m_unblockLock );

            if( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
            {
                if ( timedOut )
                {
                    // timeout (or canceled)
                    if( m_numWaitersBlocked != 0 )
                    {
                        m_numWaitersBlocked--;
                        // do not unblock next waiter below (already unblocked)
                        numSignalsLeft = 0;
                    }
                    else
                    {
                        // spurious wakeup pending!!
                        m_numWaitersGone = 1;
                    }
                }
                if( --m_numWaitersToUnblock == 0 )
                {
                    if( m_numWaitersBlocked != 0 )
                    {
                        // open the gate
                        rc = ReleaseSemaphore( m_blockLock, 1, null );
                        assert( rc );
                        // do not open the gate below again
                        numSignalsLeft = 0;
                    }
                    else if( (numWaitersGone = m_numWaitersGone) != 0 )
                    {
                        m_numWaitersGone = 0;
                    }
                }
            }
            else if( ++m_numWaitersGone == int.max / 2 )
            {
                // timeout/canceled or spurious event :-)
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                // something is going on here - test of timeouts?
                m_numWaitersBlocked -= m_numWaitersGone;
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                // NOTE: ReleaseSemaphore reports success with a nonzero BOOL,
                //       so the result must not be compared with WAIT_OBJECT_0
                //       (which is zero); check it like every other
                //       ReleaseSemaphore call in this module.
                assert( rc );
                m_numWaitersGone = 0;
            }

            LeaveCriticalSection( &m_unblockLock );

            if( numSignalsLeft == 1 )
            {
                // better now than spurious later (same as ResetEvent)
                for( ; numWaitersGone > 0; --numWaitersGone )
                {
                    rc = WaitForSingleObject( m_blockQueue, INFINITE );
                    assert( rc == WAIT_OBJECT_0 );
                }
                // open the gate
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                assert( rc );
            }
            else if( numSignalsLeft != 0 )
            {
                // unblock next waiter
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            m_assocMutex.lock();
            return !timedOut;
        }


        /**
         * Win32 implementation shared by notify() and notifyAll().
         *
         * Moves either one or all of the currently blocked waiters to the
         * "to unblock" count.  When no unblock sequence is already in
         * progress the gate is closed and the first waiter is released from
         * the queue semaphore; timedWait then releases the remaining ones.
         * Does nothing when there is no waiter to release.
         *
         * Params:
         *  all = true to release every blocked waiter, false to release one.
         */
        void notify( bool all )
        {
            DWORD rc;

            EnterCriticalSection( &m_unblockLock );
            scope(failure) LeaveCriticalSection( &m_unblockLock );

            if( m_numWaitersToUnblock != 0 )
            {
                // An unblock sequence is already running; just add to it.
                if( m_numWaitersBlocked == 0 )
                {
                    LeaveCriticalSection( &m_unblockLock );
                    return;
                }
                if( all )
                {
                    m_numWaitersToUnblock += m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock++;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
            }
            else if( m_numWaitersBlocked > m_numWaitersGone )
            {
                // Close the gate so that no new waiter registers while the
                // current ones are being released.
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                if( 0 != m_numWaitersGone )
                {
                    m_numWaitersBlocked -= m_numWaitersGone;
                    m_numWaitersGone = 0;
                }
                if( all )
                {
                    m_numWaitersToUnblock = m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock = 1;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            else
            {
                // No waiter to release.
                LeaveCriticalSection( &m_unblockLock );
            }
        }


        // NOTE: This implementation uses Algorithm 8c as described here:
        //       http://groups.google.com/group/comp.programming.threads/
        //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
        HANDLE              m_blockLock;    // auto-reset event (now semaphore)
        HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
        Mutex               m_assocMutex;   // external mutex/CS
        CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
        int                 m_numWaitersGone        = 0; // waiters that left without being counted out
        int                 m_numWaitersBlocked     = 0; // waiters registered through the gate
        int                 m_numWaitersToUnblock   = 0; // waiters selected for release by notify
    }
    else version( Posix )
    {
        pthread_cond_t      m_hndl;         // native condition variable
        pthread_mutex_t*    m_mutexAddr;    // address from Mutex.handleAddr()
    }
}


////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////


debug( UnitTest )
{
    private import tango.core.Thread;
    private import tango.core.sync.Mutex;
    private import tango.core.sync.Semaphore;


    /**
     * Checks notify(): in each of numTries rounds the main thread makes
     * numWaiters items available, notifying once per item, and every item
     * must be consumed by a waiter thread.
     */
    void testNotify()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;
        int  numTotal   = 0;
        int  numDone    = 0;

        void waiter()
        {
            for( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex )
                {
                    while( numReady < 1 )
                    {
                        condReady.wait();
                    }
                    --numReady;
                    ++numTotal;
                }

                synchronized( synLoop )
                {
                    ++numDone;
                }
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        for( int i = 0; i < numWaiters; ++i )
            group.create( &waiter );

        for( int i = 0; i < numTries; ++i )
        {
            for( int j = 0; j < numWaiters; ++j )
            {
                synchronized( mutex )
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            // NOTE: numDone is cumulative across rounds, so the target has to
            //       grow with each round.  Comparing against numWaiters alone
            //       would turn this barrier into a no-op after the first
            //       round.
            while( true )
            {
                synchronized( synLoop )
                {
                    if( numDone >= numWaiters * (i + 1) )
                        break;
                }
                Thread.yield();
            }
            for( int j = 0; j < numWaiters; ++j )
            {
                semDone.notify();
            }
        }

        group.joinAll();
        assert( numTotal == numWaiters * numTries );
    }


    /**
     * Checks notifyAll(): once every waiter thread has registered, a single
     * notifyAll() must let all of them finish.
     */
    void testNotifyAll()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized( mutex )
            {
                ++numReady;
                while( !alert )
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        for( int i = 0; i < numWaiters; ++i )
            group.create( &waiter );

        while( true )
        {
            synchronized( mutex )
            {
                if( numReady >= numWaiters )
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert( numReady == numWaiters && numDone == numWaiters );
    }


    /**
     * Checks wait( double ): the first timed wait is expected to be ended by
     * the single notification (returning true) and the second one is expected
     * to time out (returning false).
     */
    void testWaitTimeout()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized( mutex )
            {
                waiting    = true;
                alertedOne = condReady.wait( 1 );
                alertedTwo = condReady.wait( 1 );
            }
        }

        auto thread = new Thread( &waiter );
        thread.start();

        while( true )
        {
            synchronized( mutex )
            {
                if( waiting )
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert( waiting && alertedOne && !alertedTwo );
    }


    unittest
    {
        testNotify();
        testNotifyAll();
        testWaitTimeout();
    }
}