1 /** 2 * The semaphore module provides a general use semaphore for synchronization. 3 * 4 * Copyright: Copyright (C) 2005-2006 Sean Kelly. All rights reserved. 5 * License: BSD style: $(LICENSE) 6 * Author: Sean Kelly 7 */ 8 module tango.core.sync.Semaphore; 9 10 11 public import tango.core.Exception : SyncException; 12 13 import Time = tango.core.Time; 14 15 version( Win32 ) 16 { 17 private import tango.sys.win32.UserGdi; 18 } 19 else version( Posix ) 20 { 21 private import tango.core.sync.Config; 22 private import tango.stdc.errno; 23 private import tango.stdc.posix.pthread; 24 private import tango.stdc.posix.semaphore; 25 } 26 27 28 //////////////////////////////////////////////////////////////////////////////// 29 // Semaphore 30 // 31 // void wait(); 32 // void notify(); 33 // bool tryWait(); 34 //////////////////////////////////////////////////////////////////////////////// 35 36 37 /** 38 * This class represents a general counting semaphore as concieved by Edsger 39 * Dijkstra. As per Mesa type monitors however, "signal" has been replaced 40 * with "notify" to indicate that control is not transferred to the waiter when 41 * a notification is sent. 42 */ 43 class Semaphore 44 { 45 //////////////////////////////////////////////////////////////////////////// 46 // Initialization 47 //////////////////////////////////////////////////////////////////////////// 48 49 50 /** 51 * Initializes a semaphore object with the specified initial count. 52 * 53 * Params: 54 * count = The initial count for the semaphore. 55 * 56 * Throws: 57 * SyncException on error. 58 */ 59 this( uint count = 0 ) 60 { 61 version( Win32 ) 62 { 63 m_hndl = CreateSemaphoreA( null, count, int.max, null ); 64 if( m_hndl == m_hndl.init ) 65 throw new SyncException( "Unable to create semaphore" ); 66 } 67 else version(OSX){ 68 creatingTask=mach_task_self(); 69 auto rc=semaphore_create(creatingTask, 70 &m_hndl, 71 MACH_SYNC_POLICY.SYNC_POLICY_FIFO, 72 count); 73 if( rc ) 74 throw new SyncException( "Unable to create semaphore" ); 75 } 76 else version( Posix ) 77 { 78 int rc = sem_init( &m_hndl, 0, count ); 79 if( rc ) 80 throw new SyncException( "Unable to create semaphore" ); 81 } 82 } 83 84 85 ~this() 86 { 87 version( Win32 ) 88 { 89 BOOL rc = CloseHandle( m_hndl ); 90 assert( rc, "Unable to destroy semaphore" ); 91 } 92 else version(OSX) 93 { 94 auto rc=semaphore_destroy(creatingTask, m_hndl); 95 assert( !rc, "Unable to destroy semaphore" ); 96 } 97 else version( Posix ) 98 { 99 int rc = sem_destroy( &m_hndl ); 100 assert( !rc, "Unable to destroy semaphore" ); 101 } 102 } 103 104 105 //////////////////////////////////////////////////////////////////////////// 106 // General Actions 107 //////////////////////////////////////////////////////////////////////////// 108 109 110 /** 111 * Wait until the current count is above zero, then atomically decrement 112 * the count by one and return. 113 * 114 * Throws: 115 * SyncException on error. 116 */ 117 void wait() 118 { 119 version( Win32 ) 120 { 121 DWORD rc = WaitForSingleObject( m_hndl, INFINITE ); 122 if( rc != WAIT_OBJECT_0 ) 123 throw new SyncException( "Unable to wait for semaphore" ); 124 } 125 else version(OSX){ 126 while (true){ 127 auto rc=semaphore_wait(m_hndl); 128 if (rc==KERN_RETURN.ABORTED){ 129 if( errno != EINTR ) 130 throw new SyncException( "Unable to wait for semaphore (abort)" ); 131 // wait again 132 } else if (rc!=0) { 133 throw new SyncException( "Unable to wait for semaphore" ); 134 } else { 135 break; 136 } 137 } 138 } 139 else version( Posix ) 140 { 141 while( true ) 142 { 143 if( !sem_wait( &m_hndl ) ) 144 return; 145 if( errno != EINTR ) 146 throw new SyncException( "Unable to wait for semaphore" ); 147 } 148 } 149 } 150 151 152 /** 153 * Suspends the calling thread until the current count moves above zero or 154 * until the supplied time period has elapsed. If the count moves above 155 * zero in this interval, then atomically decrement the count by one and 156 * return true. Otherwise, return false. The supplied period may be up to 157 * a maximum of (uint.max - 1) milliseconds. 158 * 159 * Params: 160 * period = The number of seconds to wait. 161 * 162 * In: 163 * period must be less than (uint.max - 1) milliseconds. 164 * 165 * Returns: 166 * true if notified before the timeout and false if not. 167 * 168 * Throws: 169 * SyncException on error. 170 */ 171 bool wait( double period ) 172 in 173 { 174 assert( period * 1000 + 0.1 < uint.max - 1); 175 } 176 body 177 { 178 version( Win32 ) 179 { 180 DWORD t = cast(DWORD)(period * 1000 + 0.1); 181 switch( WaitForSingleObject( m_hndl, t ) ) 182 { 183 case WAIT_OBJECT_0: 184 return true; 185 case WAIT_TIMEOUT: 186 return false; 187 default: 188 throw new SyncException( "Unable to wait for semaphore" ); 189 } 190 } 191 else version(OSX){ 192 timespec t; 193 194 adjTimespec( t, period ); 195 auto rc=semaphore_timedwait(m_hndl,t); 196 if (rc==0){ 197 return true; 198 } else if (rc==KERN_RETURN.OPERATION_TIMED_OUT){ 199 return false; 200 } else if (rc==KERN_RETURN.ABORTED) { 201 if( errno != EINTR ) 202 throw new SyncException( "Unable to wait for semaphore (abort)" ); 203 return false; // wait can be too short, wait is not resumed 204 } else { 205 throw new SyncException( "Unable to wait for semaphore" ); 206 } 207 } 208 else version( Posix ) 209 { 210 timespec t; 211 212 getTimespec( t ); 213 adjTimespec( t, period ); 214 215 while( true ) 216 { 217 if( !sem_timedwait( &m_hndl, &t ) ) 218 return true; 219 if( errno == ETIMEDOUT ) 220 return false; 221 if( errno != EINTR ) 222 throw new SyncException( "Unable to wait for semaphore" ); 223 } 224 } 225 226 // -w trip 227 //return false; 228 } 229 230 231 /** 232 * Atomically increment the current count by one. This will notify one 233 * waiter, if there are any in the queue. 234 * 235 * Throws: 236 * SyncException on error. 237 */ 238 void notify() 239 { 240 version( Win32 ) 241 { 242 if( !ReleaseSemaphore( m_hndl, 1, null ) ) 243 throw new SyncException( "Unable to notify semaphore" ); 244 } 245 else version(OSX){ 246 semaphore_signal(m_hndl); 247 } 248 else version( Posix ) 249 { 250 int rc = sem_post( &m_hndl ); 251 if( rc ) 252 throw new SyncException( "Unable to notify semaphore" ); 253 } 254 } 255 256 257 /** 258 * If the current count is equal to zero, return. Otherwise, atomically 259 * decrement the count by one and return true. 260 * 261 * Returns: 262 * true if the count was above zero and false if not. 263 * 264 * Throws: 265 * SyncException on error. 266 */ 267 bool tryWait() 268 { 269 version( Win32 ) 270 { 271 switch( WaitForSingleObject( m_hndl, 0 ) ) 272 { 273 case WAIT_OBJECT_0: 274 return true; 275 case WAIT_TIMEOUT: 276 return false; 277 default: 278 throw new SyncException( "Unable to wait for semaphore" ); 279 } 280 } 281 else version(OSX){ 282 return wait(0.0); 283 } 284 else version( Posix ) 285 { 286 while( true ) 287 { 288 if( !sem_trywait( &m_hndl ) ) 289 return true; 290 if( errno == EAGAIN ) 291 return false; 292 if( errno != EINTR ) 293 throw new SyncException( "Unable to wait for semaphore" ); 294 } 295 } 296 297 // -w trip 298 //return false; 299 } 300 301 302 private: 303 version( Win32 ) 304 { 305 HANDLE m_hndl; 306 } 307 else version(OSX){ 308 task_t creatingTask; 309 semaphore_t m_hndl; 310 } 311 else version( Posix ) 312 { 313 sem_t m_hndl; 314 } 315 } 316 317 318 //////////////////////////////////////////////////////////////////////////////// 319 // Unit Tests 320 //////////////////////////////////////////////////////////////////////////////// 321 322 323 debug( UnitTest ) 324 { 325 private import tango.core.Thread; 326 327 328 void testWait() 329 { 330 auto semaphore = new Semaphore; 331 int numToProduce = 10; 332 bool allProduced = false; 333 auto synProduced = new Object; 334 int numConsumed = 0; 335 auto synConsumed = new Object; 336 int numConsumers = 10; 337 int numComplete = 0; 338 auto synComplete = new Object; 339 340 void consumer() 341 { 342 while( true ) 343 { 344 semaphore.wait(); 345 346 synchronized( synProduced ) 347 { 348 if( allProduced ) 349 break; 350 } 351 352 synchronized( synConsumed ) 353 { 354 ++numConsumed; 355 } 356 } 357 358 synchronized( synComplete ) 359 { 360 ++numComplete; 361 } 362 } 363 364 void producer() 365 { 366 assert( !semaphore.tryWait() ); 367 368 for( int i = 0; i < numToProduce; ++i ) 369 { 370 semaphore.notify(); 371 Thread.yield(); 372 } 373 Thread.sleep(Time.seconds(1)); 374 synchronized( synProduced ) 375 { 376 allProduced = true; 377 } 378 379 for( int i = 0; i < numConsumers; ++i ) 380 { 381 semaphore.notify(); 382 Thread.yield(); 383 } 384 385 for( int i = numConsumers * 10000; i > 0; --i ) 386 { 387 synchronized( synComplete ) 388 { 389 if( numComplete == numConsumers ) 390 break; 391 } 392 Thread.yield(); 393 } 394 395 synchronized( synComplete ) 396 { 397 assert( numComplete == numConsumers ); 398 } 399 400 synchronized( synConsumed ) 401 { 402 assert( numConsumed == numToProduce ); 403 } 404 405 assert( !semaphore.tryWait() ); 406 semaphore.notify(); 407 assert( semaphore.tryWait() ); 408 assert( !semaphore.tryWait() ); 409 } 410 411 auto group = new ThreadGroup; 412 413 for( int i = 0; i < numConsumers; ++i ) 414 group.create( &consumer ); 415 group.create( &producer ); 416 group.joinAll(); 417 } 418 419 420 void testWaitTimeout() 421 { 422 auto synReady = new Object; 423 auto semReady = new Semaphore; 424 bool waiting = false; 425 bool alertedOne = true; 426 bool alertedTwo = true; 427 428 void waiter() 429 { 430 synchronized( synReady ) 431 { 432 waiting = true; 433 } 434 alertedOne = semReady.wait( 0.1 ); 435 alertedTwo = semReady.wait( 0.1 ); 436 } 437 438 auto thread = new Thread( &waiter ); 439 thread.start(); 440 441 while( true ) 442 { 443 synchronized( synReady ) 444 { 445 if( waiting ) 446 { 447 semReady.notify(); 448 break; 449 } 450 } 451 Thread.yield(); 452 } 453 thread.join(); 454 assert( waiting && alertedOne && !alertedTwo ); 455 } 456 457 458 unittest 459 { 460 testWait(); 461 testWaitTimeout(); 462 } 463 }