1 /**
2  * The read/write mutex module provides a primitive for maintaining shared read
3  * access and mutually exclusive write access.
4  *
5  * Copyright: Copyright (C) 2005-2006 Sean Kelly.  All rights reserved.
6  * License:   BSD style: $(LICENSE)
7  * Author:    Sean Kelly
8  */
9 module tango.core.sync.ReadWriteMutex;
10 
11 
12 public import tango.core.Exception : SyncException;
13 private import tango.core.sync.Condition;
14 private import tango.core.sync.Mutex;
15 private import Time = tango.core.Time;
16 
17 version( Win32 )
18 {
19     private import tango.sys.win32.UserGdi;
20 }
21 else version( Posix )
22 {
23     private import tango.stdc.posix.pthread;
24 }
25 
26 
27 ////////////////////////////////////////////////////////////////////////////////
28 // ReadWriteMutex
29 //
30 // Reader reader();
31 // Writer writer();
32 ////////////////////////////////////////////////////////////////////////////////
33 
34 
35 /**
36  * This class represents a mutex that allows any number of readers to enter,
37  * but when a writer enters, all other readers and writers are blocked.
38  *
39  * Please note that this mutex is not recursive and is intended to guard access
40  * to data only.  Also, no deadlock checking is in place because doing so would
41  * require dynamic memory allocation, which would reduce performance by an
42  * unacceptable amount.  As a result, any attempt to recursively acquire this
43  * mutex may well deadlock the caller, particularly if a write lock is acquired
44  * while holding a read lock, or vice-versa.  In practice, this should not be
45  * an issue however, because it is uncommon to call deeply into unknown code
46  * while holding a lock that simply protects data.
47  */
48 class ReadWriteMutex
49 {
50     /**
51      * Defines the policy used by this mutex.  Currently, two policies are
52      * defined.
53      *
54      * The first will queue writers until no readers hold the mutex, then
55      * pass the writers through one at a time.  If a reader acquires the mutex
56      * while there are still writers queued, the reader will take precedence.
57      *
58      * The second will queue readers if there are any writers queued.  Writers
59      * are passed through one at a time, and once there are no writers present,
60      * all queued readers will be alerted.
61      *
62      * Future policies may offer a more even balance between reader and writer
63      * precedence.
64      */
65     enum Policy
66     {
67         PREFER_READERS, /// Readers get preference.  This may starve writers.
68         PREFER_WRITERS  /// Writers get preference.  This may starve readers.
69     }
70 
71 
72     ////////////////////////////////////////////////////////////////////////////
73     // Initialization
74     ////////////////////////////////////////////////////////////////////////////
75 
76 
77     /**
78      * Initializes a read/write mutex object with the supplied policy.
79      *
80      * Params:
81      *  policy = The policy to use.
82      *
83      * Throws:
84      *  SyncException on error.
85      */
86     this( Policy policy = Policy.PREFER_WRITERS )
87     {
88         m_commonMutex = new Mutex;
89         if( !m_commonMutex )
90             throw new SyncException( "Unable to initialize mutex" );
91         scope(failure) delete m_commonMutex;
92 
93         m_readerQueue = new Condition( m_commonMutex );
94         if( !m_readerQueue )
95             throw new SyncException( "Unable to initialize mutex" );
96         scope(failure) delete m_readerQueue;
97 
98         m_writerQueue = new Condition( m_commonMutex );
99         if( !m_writerQueue )
100             throw new SyncException( "Unable to initialize mutex" );
101         scope(failure) delete m_writerQueue;
102 
103         m_policy = policy;
104         m_reader = new Reader;
105         m_writer = new Writer;
106     }
107 
108 
109     ////////////////////////////////////////////////////////////////////////////
110     // General Properties
111     ////////////////////////////////////////////////////////////////////////////
112 
113 
114     /**
115      * Gets the policy for the associated mutex.
116      *
117      * Returns:
118      *  The policy used by this mutex.
119      */
120     Policy policy()
121     {
122         return m_policy;
123     }
124 
125 
126     ////////////////////////////////////////////////////////////////////////////
127     // Reader/Writer Handles
128     ////////////////////////////////////////////////////////////////////////////
129 
130 
131     /**
132      * Gets an object representing the reader lock for the associated mutex.
133      *
134      * Returns:
135      *  A reader sub-mutex.
136      */
137     @property Reader reader()
138     {
139         return m_reader;
140     }
141 
142 
143     /**
144      * Gets an object representing the writer lock for the associated mutex.
145      *
146      * Returns:
147      *  A writer sub-mutex.
148      */
149     @property Writer writer()
150     {
151         return m_writer;
152     }
153 
154 
155     ////////////////////////////////////////////////////////////////////////////
156     // Reader
157     ////////////////////////////////////////////////////////////////////////////
158 
159 
160     /**
161      * This class can be considered a mutex in its own right, and is used to
162      * negotiate a read lock for the enclosing mutex.
163      */
164     class Reader :
165         Object.Monitor
166     {
167         /**
168          * Initializes a read/write mutex reader proxy object.
169          */
170         this()
171         {
172             m_proxy.link = this;
173             (cast(void**) this)[1] = &m_proxy;
174         }
175 
176 
177         /**
178          * Acquires a read lock on the enclosing mutex.
179          */
180         void lock()
181         {
182             synchronized( m_commonMutex )
183             {
184                 ++m_numQueuedReaders;
185                 scope(exit) --m_numQueuedReaders;
186 
187                 while( shouldQueueReader() )
188                     m_readerQueue.wait();
189                 ++m_numActiveReaders;
190             }
191         }
192 
193 
194         /**
195          * Releases a read lock on the enclosing mutex.
196          */
197         void unlock()
198         {
199             synchronized( m_commonMutex )
200             {
201                 if( --m_numActiveReaders < 1 )
202                 {
203                     if( m_numQueuedWriters > 0 )
204                         m_writerQueue.notify();
205                 }
206             }
207         }
208 
209 
210         /**
211          * Attempts to acquire a read lock on the enclosing mutex.  If one can
212          * be obtained without blocking, the lock is acquired and true is
213          * returned.  If not, the lock is not acquired and false is returned.
214          *
215          * Returns:
216          *  true if the lock was acquired and false if not.
217          */
218         bool tryLock()
219         {
220             synchronized( m_commonMutex )
221             {
222                 if( shouldQueueReader() )
223                     return false;
224                 ++m_numActiveReaders;
225                 return true;
226             }
227         }
228 
229 
230     private:
231         bool shouldQueueReader()
232         {
233             if( m_numActiveWriters > 0 )
234                 return true;
235 
236             switch( m_policy )
237             {
238             case Policy.PREFER_WRITERS:
239                  return m_numQueuedWriters > 0;
240 
241             case Policy.PREFER_READERS:
242             default:
243                  break;
244             }
245 
246         return false;
247         }
248 
249         struct MonitorProxy
250         {
251             Object.Monitor link;
252         }
253 
254         MonitorProxy    m_proxy;
255     }
256 
257 
258     ////////////////////////////////////////////////////////////////////////////
259     // Writer
260     ////////////////////////////////////////////////////////////////////////////
261 
262 
263     /**
264      * This class can be considered a mutex in its own right, and is used to
265      * negotiate a write lock for the enclosing mutex.
266      */
267     class Writer :
268         Object.Monitor
269     {
270         /**
271          * Initializes a read/write mutex writer proxy object.
272          */
273         this()
274         {
275             m_proxy.link = this;
276             (cast(void**) this)[1] = &m_proxy;
277         }
278 
279 
280         /**
281          * Acquires a write lock on the enclosing mutex.
282          */
283         void lock()
284         {
285             synchronized( m_commonMutex )
286             {
287                 ++m_numQueuedWriters;
288                 scope(exit) --m_numQueuedWriters;
289 
290                 while( shouldQueueWriter() )
291                     m_writerQueue.wait();
292                 ++m_numActiveWriters;
293             }
294         }
295 
296 
297         /**
298          * Releases a write lock on the enclosing mutex.
299          */
300         void unlock()
301         {
302             synchronized( m_commonMutex )
303             {
304                 if( --m_numActiveWriters < 1 )
305                 {
306                     switch( m_policy )
307                     {
308                     default:
309                     case Policy.PREFER_READERS:
310                         if( m_numQueuedReaders > 0 )
311                             m_readerQueue.notifyAll();
312                         else if( m_numQueuedWriters > 0 )
313                             m_writerQueue.notify();
314                         break;
315                     case Policy.PREFER_WRITERS:
316                         if( m_numQueuedWriters > 0 )
317                             m_writerQueue.notify();
318                         else if( m_numQueuedReaders > 0 )
319                             m_readerQueue.notifyAll();
320                     }
321                 }
322             }
323         }
324 
325 
326         /**
327          * Attempts to acquire a write lock on the enclosing mutex.  If one can
328          * be obtained without blocking, the lock is acquired and true is
329          * returned.  If not, the lock is not acquired and false is returned.
330          *
331          * Returns:
332          *  true if the lock was acquired and false if not.
333          */
334         bool tryLock()
335         {
336             synchronized( m_commonMutex )
337             {
338                 if( shouldQueueWriter() )
339                     return false;
340                 ++m_numActiveWriters;
341                 return true;
342             }
343         }
344 
345 
346     private:
347         bool shouldQueueWriter()
348         {
349             if( m_numActiveWriters > 0 ||
350                 m_numActiveReaders > 0 )
351                 return true;
352             switch( m_policy )
353             {
354             case Policy.PREFER_READERS:
355                 return m_numQueuedReaders > 0;
356 
357             case Policy.PREFER_WRITERS:
358             default:
359                  break;
360             }
361 
362         return false;
363         }
364 
365         struct MonitorProxy
366         {
367             Object.Monitor link;
368         }
369 
370         MonitorProxy    m_proxy;
371     }
372 
373 
374 private:
375     Policy      m_policy;
376     Reader      m_reader;
377     Writer      m_writer;
378 
379     Mutex       m_commonMutex;
380     Condition   m_readerQueue;
381     Condition   m_writerQueue;
382 
383     int         m_numQueuedReaders;
384     int         m_numActiveReaders;
385     int         m_numQueuedWriters;
386     int         m_numActiveWriters;
387 }
388 
389 
390 ////////////////////////////////////////////////////////////////////////////////
391 // Unit Tests
392 ////////////////////////////////////////////////////////////////////////////////
393 
394 
395 debug( UnitTest )
396 {
397     private import tango.core.Thread;
398 
399 
400     void testRead( ReadWriteMutex.Policy policy )
401     {
402         auto mutex      = new ReadWriteMutex( policy );
403         auto synInfo    = new Object;
404         int  numThreads = 10;
405         int  numReaders = 0;
406         int  maxReaders = 0;
407 
408         void readerFn()
409         {
410             synchronized( mutex.reader() )
411             {
412                 synchronized( synInfo )
413                 {
414                     if( ++numReaders > maxReaders )
415                         maxReaders = numReaders;
416                 }
417                 Thread.sleep( Time.seconds(0.001) );
418                 synchronized( synInfo )
419                 {
420                     --numReaders;
421                 }
422             }
423         }
424 
425         auto group = new ThreadGroup;
426 
427         for( int i = 0; i < numThreads; ++i )
428         {
429             group.create( &readerFn );
430         }
431         group.joinAll();
432         assert( numReaders < 1 && maxReaders > 1 );
433     }
434 
435 
436     void testReadWrite( ReadWriteMutex.Policy policy )
437     {
438         auto mutex      = new ReadWriteMutex( policy );
439         auto synInfo    = new Object;
440         int  numThreads = 10;
441         int  numReaders = 0;
442         int  numWriters = 0;
443         int  maxReaders = 0;
444         int  maxWriters = 0;
445         int  numTries   = 20;
446 
447         void readerFn()
448         {
449             for( int i = 0; i < numTries; ++i )
450             {
451                 synchronized( mutex.reader() )
452                 {
453                     synchronized( synInfo )
454                     {
455                         if( ++numReaders > maxReaders )
456                             maxReaders = numReaders;
457                     }
458                     Thread.sleep( Time.seconds(0.001) );
459                     synchronized( synInfo )
460                     {
461                         --numReaders;
462                     }
463                 }
464             }
465         }
466 
467         void writerFn()
468         {
469             for( int i = 0; i < numTries; ++i )
470             {
471                 synchronized( mutex.writer() )
472                 {
473                     synchronized( synInfo )
474                     {
475                         if( ++numWriters > maxWriters )
476                             maxWriters = numWriters;
477                     }
478                     Thread.sleep( Time.seconds(0.001) );
479                     synchronized( synInfo )
480                     {
481                         --numWriters;
482                     }
483                 }
484             }
485         }
486 
487         auto group = new ThreadGroup;
488 
489         for( int i = 0; i < numThreads; ++i )
490         {
491             group.create( &readerFn );
492             group.create( &writerFn );
493         }
494         group.joinAll();
495         assert( numReaders < 1 && maxReaders > 1 &&
496                 numWriters < 1 && maxWriters < 2 );
497     }
498 
499 
500     unittest
501     {
502         testRead( ReadWriteMutex.Policy.PREFER_READERS );
503         testRead( ReadWriteMutex.Policy.PREFER_WRITERS );
504         testReadWrite( ReadWriteMutex.Policy.PREFER_READERS );
505         testReadWrite( ReadWriteMutex.Policy.PREFER_WRITERS );
506     }
507 }