/**
 * The read/write mutex module provides a primitive for maintaining shared read
 * access and mutually exclusive write access.
 *
 * Copyright: Copyright (C) 2005-2006 Sean Kelly.  All rights reserved.
 * License:   BSD style: $(LICENSE)
 * Author:    Sean Kelly
 */
module tango.core.sync.ReadWriteMutex;


public import tango.core.Exception : SyncException;
private import tango.core.sync.Condition;
private import tango.core.sync.Mutex;
private import Time = tango.core.Time;

version( Win32 )
{
    private import tango.sys.win32.UserGdi;
}
else version( Posix )
{
    private import tango.stdc.posix.pthread;
}


////////////////////////////////////////////////////////////////////////////////
// ReadWriteMutex
//
// Reader reader();
// Writer writer();
////////////////////////////////////////////////////////////////////////////////


/**
 * A mutex that admits any number of concurrent readers, but blocks every
 * other reader and writer while a writer holds it.
 *
 * This mutex is not recursive and is meant to guard access to data only.
 * No deadlock checking is performed, because doing so would require dynamic
 * memory allocation and would reduce performance by an unacceptable amount.
 * Consequently, any attempt to acquire this mutex recursively may well
 * deadlock the caller, particularly when a write lock is requested while a
 * read lock is held, or vice-versa.  In practice this should rarely matter,
 * since it is uncommon to call deeply into unknown code while holding a lock
 * that simply protects data.
 */
class ReadWriteMutex
{
    /**
     * The scheduling policy applied by this mutex.  Two policies are
     * currently defined.
     *
     * PREFER_READERS queues writers until no readers hold the mutex and then
     * lets the writers through one at a time.  A reader that arrives while
     * writers are still queued takes precedence over them.
     *
     * PREFER_WRITERS queues readers whenever any writer is queued.  Writers
     * are let through one at a time, and once no writers remain all queued
     * readers are alerted.
     *
     * Future policies may offer a more even balance between reader and
     * writer precedence.
     */
    enum Policy
    {
        PREFER_READERS, /// Readers get preference.  This may starve writers.
        PREFER_WRITERS  /// Writers get preference.  This may starve readers.
    }


    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Constructs a read/write mutex that follows the given policy.
     *
     * Params:
     *  policy = The policy to use.
     *
     * Throws:
     *  SyncException on error.
     */
    this( Policy policy = Policy.PREFER_WRITERS )
    {
        // One mutex guards all counters; both condition variables are bound
        // to it.  Each resource is released again if a later step fails.
        m_commonMutex = new Mutex;
        if( !m_commonMutex )
        {
            throw new SyncException( "Unable to initialize mutex" );
        }
        scope(failure) delete m_commonMutex;

        m_readerQueue = new Condition( m_commonMutex );
        if( !m_readerQueue )
        {
            throw new SyncException( "Unable to initialize mutex" );
        }
        scope(failure) delete m_readerQueue;

        m_writerQueue = new Condition( m_commonMutex );
        if( !m_writerQueue )
        {
            throw new SyncException( "Unable to initialize mutex" );
        }
        scope(failure) delete m_writerQueue;

        m_policy = policy;
        m_reader = new Reader;
        m_writer = new Writer;
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Reports the policy this mutex was constructed with.
     *
     * Returns:
     *  The policy used by this mutex.
     */
    Policy policy()
    {
        return m_policy;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Reader/Writer Handles
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Provides the object through which read locks on this mutex are taken.
     *
     * Returns:
     *  A reader sub-mutex.
     */
    @property Reader reader()
    {
        return m_reader;
    }


    /**
     * Provides the object through which write locks on this mutex are taken.
     *
     * Returns:
     *  A writer sub-mutex.
     */
    @property Writer writer()
    {
        return m_writer;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Reader
    ////////////////////////////////////////////////////////////////////////////


    /**
     * A mutex in its own right, used to negotiate a read lock for the
     * enclosing mutex.
     */
    class Reader :
        Object.Monitor
    {
        /**
         * Sets up a read/write mutex reader proxy object.
         */
        this()
        {
            m_proxy.link = this;
            // Writes the proxy's address into the second pointer-sized slot
            // of this object.
            // NOTE(review): presumably the runtime's monitor slot, so that
            // synchronized() on this object calls lock()/unlock() - confirm
            // against the runtime's object layout.
            (cast(void**) this)[1] = &m_proxy;
        }


        /**
         * Takes a read lock on the enclosing mutex, waiting on the reader
         * queue for as long as the policy requires.
         */
        void lock()
        {
            synchronized( m_commonMutex )
            {
                // Count this caller as queued for the whole duration of the
                // call; the count is restored on every exit path.
                ++m_numQueuedReaders;
                scope(exit) --m_numQueuedReaders;

                while( shouldQueueReader() )
                {
                    m_readerQueue.wait();
                }
                ++m_numActiveReaders;
            }
        }


        /**
         * Gives up a read lock on the enclosing mutex.  When the last active
         * reader leaves and a writer is queued, one writer is notified.
         */
        void unlock()
        {
            synchronized( m_commonMutex )
            {
                --m_numActiveReaders;
                if( m_numActiveReaders < 1 && m_numQueuedWriters > 0 )
                {
                    m_writerQueue.notify();
                }
            }
        }


        /**
         * Tries to take a read lock on the enclosing mutex without blocking.
         * If the lock is available it is acquired and true is returned;
         * otherwise nothing is acquired and false is returned.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if( !shouldQueueReader() )
                {
                    ++m_numActiveReaders;
                    return true;
                }
                return false;
            }
        }


    private:
        // Decides whether a reader must wait: always while a writer is
        // active, and under PREFER_WRITERS also while any writer is queued.
        bool shouldQueueReader()
        {
            if( m_numActiveWriters > 0 )
            {
                return true;
            }
            return m_policy == Policy.PREFER_WRITERS &&
                   m_numQueuedWriters > 0;
        }

        // Holder for the monitor reference installed by the constructor.
        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy    m_proxy;
    }


    ////////////////////////////////////////////////////////////////////////////
    // Writer
    ////////////////////////////////////////////////////////////////////////////


    /**
     * A mutex in its own right, used to negotiate a write lock for the
     * enclosing mutex.
     */
    class Writer :
        Object.Monitor
    {
        /**
         * Sets up a read/write mutex writer proxy object.
         */
        this()
        {
            m_proxy.link = this;
            // Writes the proxy's address into the second pointer-sized slot
            // of this object.
            // NOTE(review): presumably the runtime's monitor slot, so that
            // synchronized() on this object calls lock()/unlock() - confirm
            // against the runtime's object layout.
            (cast(void**) this)[1] = &m_proxy;
        }


        /**
         * Takes a write lock on the enclosing mutex, waiting on the writer
         * queue for as long as the policy requires.
         */
        void lock()
        {
            synchronized( m_commonMutex )
            {
                // Count this caller as queued for the whole duration of the
                // call; the count is restored on every exit path.
                ++m_numQueuedWriters;
                scope(exit) --m_numQueuedWriters;

                while( shouldQueueWriter() )
                {
                    m_writerQueue.wait();
                }
                ++m_numActiveWriters;
            }
        }


        /**
         * Gives up a write lock on the enclosing mutex.  Once no writer is
         * active, the preferred side is woken first: all queued readers, or
         * a single queued writer, depending on the policy.
         */
        void unlock()
        {
            synchronized( m_commonMutex )
            {
                if( --m_numActiveWriters < 1 )
                {
                    bool writersWaiting = m_numQueuedWriters > 0;
                    bool readersWaiting = m_numQueuedReaders > 0;

                    if( m_policy == Policy.PREFER_WRITERS )
                    {
                        if( writersWaiting )
                            m_writerQueue.notify();
                        else if( readersWaiting )
                            m_readerQueue.notifyAll();
                    }
                    else
                    {
                        // PREFER_READERS, and any other policy value.
                        if( readersWaiting )
                            m_readerQueue.notifyAll();
                        else if( writersWaiting )
                            m_writerQueue.notify();
                    }
                }
            }
        }


        /**
         * Tries to take a write lock on the enclosing mutex without blocking.
         * If the lock is available it is acquired and true is returned;
         * otherwise nothing is acquired and false is returned.
         *
         * Returns:
         *  true if the lock was acquired and false if not.
         */
        bool tryLock()
        {
            synchronized( m_commonMutex )
            {
                if( !shouldQueueWriter() )
                {
                    ++m_numActiveWriters;
                    return true;
                }
                return false;
            }
        }


    private:
        // Decides whether a writer must wait: always while any reader or
        // writer is active, and under PREFER_READERS also while any reader
        // is queued.
        bool shouldQueueWriter()
        {
            if( m_numActiveWriters > 0 ||
                m_numActiveReaders > 0 )
            {
                return true;
            }
            return m_policy == Policy.PREFER_READERS &&
                   m_numQueuedReaders > 0;
        }

        // Holder for the monitor reference installed by the constructor.
        struct MonitorProxy
        {
            Object.Monitor link;
        }

        MonitorProxy    m_proxy;
    }


private:
    Policy      m_policy;
    Reader      m_reader;
    Writer      m_writer;

    // Guards every counter below; both conditions wait on this mutex.
    Mutex       m_commonMutex;
    Condition   m_readerQueue;
    Condition   m_writerQueue;

    int         m_numQueuedReaders;
    int         m_numActiveReaders;
    int         m_numQueuedWriters;
    int         m_numActiveWriters;
}


////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////


debug( UnitTest )
{
    private import tango.core.Thread;


    // Starts several reader threads and checks that more than one of them
    // held the read lock at the same time.
    void testRead( ReadWriteMutex.Policy policy )
    {
        auto mutex      = new ReadWriteMutex( policy );
        auto synInfo    = new Object;
        int  numThreads = 10;
        int  numReaders = 0;
        int  maxReaders = 0;

        void readerFn()
        {
            synchronized( mutex.reader() )
            {
                // Record the current and peak number of concurrent readers.
                synchronized( synInfo )
                {
                    if( ++numReaders > maxReaders )
                        maxReaders = numReaders;
                }
                Thread.sleep( Time.seconds(0.001) );
                synchronized( synInfo )
                {
                    --numReaders;
                }
            }
        }

        auto group = new ThreadGroup;

        for( int t = 0; t < numThreads; ++t )
        {
            group.create( &readerFn );
        }
        group.joinAll();
        assert( numReaders < 1 && maxReaders > 1 );
    }


    // Starts interleaved reader and writer threads and checks that readers
    // overlapped while at most one writer was ever active.
    void testReadWrite( ReadWriteMutex.Policy policy )
    {
        auto mutex      = new ReadWriteMutex( policy );
        auto synInfo    = new Object;
        int  numThreads = 10;
        int  numReaders = 0;
        int  numWriters = 0;
        int  maxReaders = 0;
        int  maxWriters = 0;
        int  numTries   = 20;

        void readerFn()
        {
            for( int attempt = 0; attempt < numTries; ++attempt )
            {
                synchronized( mutex.reader() )
                {
                    synchronized( synInfo )
                    {
                        if( ++numReaders > maxReaders )
                            maxReaders = numReaders;
                    }
                    Thread.sleep( Time.seconds(0.001) );
                    synchronized( synInfo )
                    {
                        --numReaders;
                    }
                }
            }
        }

        void writerFn()
        {
            for( int attempt = 0; attempt < numTries; ++attempt )
            {
                synchronized( mutex.writer() )
                {
                    synchronized( synInfo )
                    {
                        if( ++numWriters > maxWriters )
                            maxWriters = numWriters;
                    }
                    Thread.sleep( Time.seconds(0.001) );
                    synchronized( synInfo )
                    {
                        --numWriters;
                    }
                }
            }
        }

        auto group = new ThreadGroup;

        for( int t = 0; t < numThreads; ++t )
        {
            group.create( &readerFn );
            group.create( &writerFn );
        }
        group.joinAll();
        assert( numReaders < 1 && maxReaders > 1 &&
                numWriters < 1 && maxWriters < 2 );
    }


    unittest
    {
        testRead( ReadWriteMutex.Policy.PREFER_READERS );
        testRead( ReadWriteMutex.Policy.PREFER_WRITERS );
        testReadWrite( ReadWriteMutex.Policy.PREFER_READERS );
        testReadWrite( ReadWriteMutex.Policy.PREFER_WRITERS );
    }
}