1 /*******************************************************************************
2 
3         copyright:      Copyright (c) 2008 Steven Schveighoffer.
4                         All rights reserved
5 
6         license:        BSD style: $(LICENSE)
7 
8         version:        Jun 2008: Initial release
9 
10         author:         schveiguy
11 
12 *******************************************************************************/
13 
14 module tango.io.device.ThreadPipe;
15 
16 private import tango.core.Exception;
17 
18 private import tango.io.device.Conduit;
19 
20 private import tango.core.sync.Condition;
21 
22 /**
23  * Conduit to support a data stream between 2 threads.  One creates a
24  * ThreadPipe, then uses the OutputStream and the InputStream from it to
25  * communicate.  All traffic is automatically synchronized, so one just uses
26  * the streams like they were normal device streams.
27  *
28  * It works by maintaining a circular buffer, where data is written to, and
29  * read from, in a FIFO fashion.
30  * ---
31  * auto tc = new ThreadPipe;
32  * void outFunc()
33  * {
34  *   Stdout.copy(tc.input);
35  * }
36  *
37  * auto t = new Thread(&outFunc);
38  * t.start();
39  * tc.write("hello, thread!");
40  * tc.close();
41  * t.join();
42  * ---
43  */
44 class ThreadPipe : Conduit
45 {
46     private bool _closed;
47     private size_t _readIdx, _remaining;
48     private void[] _buf;
49     private Mutex _mutex;
50     private Condition _condition;
51 
52     /**
53      * Create a new ThreadPipe with the given buffer size.
54      *
55      * Params:
56      * bufferSize = The size to allocate the buffer.
57      */
58     this(size_t bufferSize=(1024*16))
59     {
60         _buf = new ubyte[bufferSize];
61         _closed = false;
62         _readIdx = _remaining = 0;
63         _mutex = new Mutex;
64         _condition = new Condition(_mutex);
65     }
66 
67     /**
68      * Implements IConduit.bufferSize.
69      *
70      * Returns the appropriate buffer size that should be used to buffer the
71      * ThreadPipe.  Note that this is simply the buffer size passed in, and
72      * since all the ThreadPipe data is in memory, buffering doesn't make
73      * much sense.
74      */
75     override const size_t bufferSize()
76     {
77         return _buf.length;
78     }
79 
80     /**
81      * Implements IConduit.toString
82      *
83      * Returns "<thread conduit>"
84      */
85     override string toString()
86     {
87         return "<threadpipe>";
88     }
89 
90     /**
91      * Returns true if there is data left to be read, and the write end isn't
92      * closed.
93      */
94     override const bool isAlive()
95     {
96         synchronized(_mutex)
97         {
98             return !_closed || _remaining != 0;
99         }
100     }
101 
102     /**
103      * Return the number of bytes remaining to be read in the circular buffer.
104      */
105     size_t remaining()
106     {
107         synchronized(_mutex)
108             return _remaining;
109     }
110 
111     /**
112      * Return the number of bytes that can be written to the circular buffer.
113      */
114     size_t writable()
115     {
116         synchronized(_mutex)
117             return _buf.length - _remaining;
118     }
119 
120     /**
121      * Close the write end of the conduit.  Writing to the conduit after it is
122      * closed will return Eof.
123      *
124      * The read end is not closed until the buffer is empty.
125      */
126     void stop()
127     {
128         //
129         // close write end.  The read end can stay open until the remaining
130         // bytes are read.
131         //
132         synchronized(_mutex)
133         {
134             _closed = true;
135             _condition.notifyAll();
136         }
137     }
138 
139     /**
140      * This does nothing because we have no clue whether the members have been
141      * collected, and detach is run in the destructor.  To stop communications,
142      * use stop().
143      *
144      * TODO: move stop() functionality to detach when it becomes possible to
145      * have fully-owned members
146      */
147     override void detach()
148     {
149     }
150 
151     /**
152      * Implements InputStream.read.
153      *
154      * Read from the conduit into a target array.  The provided dst will be
155      * populated with content from the stream.
156      *
157      * Returns the number of bytes read, which may be less than requested in
158      * dst. Eof is returned whenever an end-of-flow condition arises.
159      */
160     override size_t read(void[] dst)
161     {
162         //
163         // don't block for empty read
164         //
165         if(dst.length == 0)
166             return 0;
167         synchronized(_mutex)
168         {
169             //
170             // see if any remaining data is present
171             //
172             size_t r;
173             while((r = _remaining) == 0 && !_closed)
174                 _condition.wait();
175 
176             //
177             // read all data that is available
178             //
179             if(r == 0)
180                 return Eof;
181             if(r > dst.length)
182                 r = dst.length;
183 
184             auto result = r;
185 
186             //
187             // handle wrapping
188             //
189             if(_readIdx + r >= _buf.length)
190             {
191                 size_t x = _buf.length - _readIdx;
192                 dst[0..x] = _buf[_readIdx..$];
193                 _readIdx = 0;
194                 _remaining -= x;
195                 r -= x;
196                 dst = dst[x..$];
197             }
198 
199             dst[0..r] = _buf[_readIdx..(_readIdx + r)];
200             _readIdx = (_readIdx + r) % _buf.length;
201             _remaining -= r;
202             _condition.notifyAll();
203             return result;
204         }
205     }
206 
207     /**
208      * Implements InputStream.clear().
209      *
210      * Clear any buffered content.
211      */
212     ThreadPipe clear()
213     {
214         synchronized(_mutex)
215         {
216             if(_remaining != 0)
217             {
218                 /*
219                  * this isn't technically necessary, but we do it because it
220                  * preserves the most recent data first
221                  */
222                 _readIdx = (_readIdx + _remaining) % _buf.length;
223                 _remaining = 0;
224                 _condition.notifyAll();
225             }
226         }
227         return this;
228     }
229 
230     /**
231      * Implements OutputStream.write.
232      *
233      * Write to stream from a source array. The provided src content will be
234      * written to the stream.
235      *
236      * Returns the number of bytes written from src, which may be less than
237      * the quantity provided. Eof is returned when an end-of-flow condition
238      * arises.
239      */
240     override size_t write(const(void)[] src)
241     {
242         //
243         // don't block for empty write
244         //
245         if(src.length == 0)
246             return 0;
247         synchronized(_mutex)
248         {
249             size_t w;
250             while((w = _buf.length - _remaining) == 0 && !_closed)
251                 _condition.wait();
252 
253             if(_closed)
254                 return Eof;
255 
256             if(w > src.length)
257                 w = src.length;
258 
259             auto writeIdx = (_readIdx + _remaining) % _buf.length;
260 
261             auto result = w;
262 
263             if(w + writeIdx >= _buf.length)
264             {
265                 auto x = _buf.length - writeIdx;
266                 _buf[writeIdx..$] = src[0..x];
267                 writeIdx = 0;
268                 w -= x;
269                 _remaining += x;
270                 src = src[x..$];
271             }
272             _buf[writeIdx..(writeIdx + w)] = src[0..w];
273             _remaining += w;
274             _condition.notifyAll();
275             return result;
276         }
277     }
278 }
279 
280 debug(UnitTest)
281 {
282     import tango.core.Thread;
283 
284     unittest
285     {
286         auto source = new uint[1000];
287         foreach(i, ref x; source)
288             x = cast(uint)i;
289 
290         ThreadPipe tp = new ThreadPipe(16);
291         void threadA()
292         {
293             void[] sourceBuf = source;
294             while(sourceBuf.length > 0)
295             {
296                 sourceBuf = sourceBuf[tp.write(sourceBuf)..$];
297             }
298             tp.stop();
299         }
300         Thread a = new Thread(&threadA);
301         a.start();
302         int readval;
303         int last = -1;
304         size_t nread;
305         while((nread = tp.read((&readval)[0..1])) == readval.sizeof)
306         {
307             assert(readval == last + 1);
308             last = readval;
309         }
310         assert(nread == tp.Eof);
311         a.join();
312     }
313 }