/*******************************************************************************

        copyright:      Copyright (c) 2008 Steven Schveighoffer.
                        All rights reserved

        license:        BSD style: $(LICENSE)

        version:        Jun 2008: Initial release

        author:         schveiguy

*******************************************************************************/

module tango.io.device.ThreadPipe;

private import tango.core.Exception;

private import tango.io.device.Conduit;

private import tango.core.sync.Condition;

/**
 * Conduit to support a data stream between 2 threads.  One creates a
 * ThreadPipe, then uses the OutputStream and the InputStream from it to
 * communicate.  All traffic is automatically synchronized, so one just uses
 * the streams like they were normal device streams.
 *
 * It works by maintaining a circular buffer, where data is written to, and
 * read from, in a FIFO fashion.
 *
 * The write end is shut down with stop(); detach() is intentionally a no-op
 * in this class, so stop() is what lets a blocked reader observe Eof.
 * ---
 * auto tc = new ThreadPipe;
 * void outFunc()
 * {
 *   Stdout.copy(tc.input);
 * }
 *
 * auto t = new Thread(&outFunc);
 * t.start();
 * tc.write("hello, thread!");
 * tc.stop();
 * t.join();
 * ---
 */
class ThreadPipe : Conduit
{
    // set once by stop(); never cleared afterwards
    private bool _closed;
    // _readIdx: index of the oldest unread byte in _buf
    // _remaining: number of unread bytes currently stored in _buf
    private size_t _readIdx, _remaining;
    // backing storage of the circular buffer
    private void[] _buf;
    // guards every field above
    private Mutex _mutex;
    // signalled whenever _remaining or _closed changes
    private Condition _condition;

    /**
     * Create a new ThreadPipe with the given buffer size.
     *
     * Params:
     * bufferSize = The size to allocate the buffer.  A request for zero
     *              bytes is raised to one byte, because a zero-length
     *              buffer could never accept data and every non-empty
     *              write() would block until stop() is called.
     */
    this(size_t bufferSize=(1024*16))
    {
        //
        // a zero-length ring can never hold a byte, so keep at least one
        // byte of storage to avoid blocking writers forever
        //
        if(bufferSize == 0)
            bufferSize = 1;

        _buf = new ubyte[bufferSize];
        _closed = false;
        _readIdx = _remaining = 0;
        _mutex = new Mutex;
        _condition = new Condition(_mutex);
    }

    /**
     * Implements IConduit.bufferSize.
     *
     * Returns the appropriate buffer size that should be used to buffer the
     * ThreadPipe.  Note that this is simply the length of the internal
     * circular buffer, and since all the ThreadPipe data is in memory,
     * buffering doesn't make much sense.
     */
    override const size_t bufferSize()
    {
        return _buf.length;
    }

    /**
     * Implements IConduit.toString
     *
     * Returns "<threadpipe>"
     */
    override string toString()
    {
        return "<threadpipe>";
    }

    /**
     * Returns true if the write end has not been stopped, or if there is
     * still data left to be read.  Returns false only once the pipe has
     * been stopped and the buffer is empty.
     */
    override const bool isAlive()
    {
        synchronized(_mutex)
        {
            return !_closed || _remaining != 0;
        }
    }

    /**
     * Return the number of bytes remaining to be read in the circular buffer.
     */
    size_t remaining()
    {
        synchronized(_mutex)
            return _remaining;
    }

    /**
     * Return the number of bytes that can be written to the circular buffer.
     */
    size_t writable()
    {
        synchronized(_mutex)
            return _buf.length - _remaining;
    }

    /**
     * Close the write end of the conduit.  Writing to the conduit after it is
     * closed will return Eof.
     *
     * The read end is not closed until the buffer is empty.
     */
    void stop()
    {
        //
        // close write end.  The read end can stay open until the remaining
        // bytes are read.
        //
        synchronized(_mutex)
        {
            _closed = true;
            // wake every thread waiting in read() or write() so it can
            // re-check _closed
            _condition.notifyAll();
        }
    }

    /**
     * This does nothing because we have no clue whether the members have been
     * collected, and detach is run in the destructor.  To stop communications,
     * use stop().
     *
     * TODO: move stop() functionality to detach when it becomes possible to
     * have fully-owned members
     */
    override void detach()
    {
    }

    /**
     * Implements InputStream.read.
     *
     * Read from the conduit into a target array.  The provided dst will be
     * populated with content from the stream.  An empty dst returns 0
     * immediately; otherwise the call waits until at least one byte is
     * buffered or the pipe has been stopped.
     *
     * Returns the number of bytes read, which may be less than requested in
     * dst.  Eof is returned when the pipe has been stopped and the buffer
     * is empty.
     */
    override size_t read(void[] dst)
    {
        //
        // don't block for empty read
        //
        if(dst.length == 0)
            return 0;
        synchronized(_mutex)
        {
            //
            // see if any remaining data is present
            //
            size_t r;
            while((r = _remaining) == 0 && !_closed)
                _condition.wait();

            //
            // read all data that is available
            //
            if(r == 0)
                return Eof;
            if(r > dst.length)
                r = dst.length;

            auto result = r;

            //
            // handle wrapping: copy the tail of the buffer first, then
            // continue from index 0 below
            //
            if(_readIdx + r >= _buf.length)
            {
                size_t x = _buf.length - _readIdx;
                dst[0..x] = _buf[_readIdx..$];
                _readIdx = 0;
                _remaining -= x;
                r -= x;
                dst = dst[x..$];
            }

            dst[0..r] = _buf[_readIdx..(_readIdx + r)];
            _readIdx = (_readIdx + r) % _buf.length;
            _remaining -= r;
            // space was freed; wake any waiting threads
            _condition.notifyAll();
            return result;
        }
    }

    /**
     * Implements InputStream.clear().
     *
     * Clear any buffered content.
     */
    ThreadPipe clear()
    {
        synchronized(_mutex)
        {
            if(_remaining != 0)
            {
                /*
                 * this isn't technically necessary, but advancing the read
                 * index past the discarded bytes keeps the next write
                 * position where it already was
                 */
                _readIdx = (_readIdx + _remaining) % _buf.length;
                _remaining = 0;
                _condition.notifyAll();
            }
        }
        return this;
    }

    /**
     * Implements OutputStream.write.
     *
     * Write to stream from a source array.  The provided src content will be
     * written to the stream.  An empty src returns 0 immediately; otherwise
     * the call waits until at least one byte of space is free or the pipe
     * has been stopped.
     *
     * Returns the number of bytes written from src, which may be less than
     * the quantity provided.  Eof is returned once the pipe has been
     * stopped, even if buffer space is available.
     */
    override size_t write(const(void)[] src)
    {
        //
        // don't block for empty write
        //
        if(src.length == 0)
            return 0;
        synchronized(_mutex)
        {
            size_t w;
            while((w = _buf.length - _remaining) == 0 && !_closed)
                _condition.wait();

            if(_closed)
                return Eof;

            if(w > src.length)
                w = src.length;

            // first free slot follows the last unread byte
            auto writeIdx = (_readIdx + _remaining) % _buf.length;

            auto result = w;

            //
            // handle wrapping: fill the tail of the buffer first, then
            // continue from index 0 below
            //
            if(w + writeIdx >= _buf.length)
            {
                auto x = _buf.length - writeIdx;
                _buf[writeIdx..$] = src[0..x];
                writeIdx = 0;
                w -= x;
                _remaining += x;
                src = src[x..$];
            }
            _buf[writeIdx..(writeIdx + w)] = src[0..w];
            _remaining += w;
            // data was added; wake any waiting threads
            _condition.notifyAll();
            return result;
        }
    }
}

debug(UnitTest)
{
    import tango.core.Thread;

    unittest
    {
        auto source = new uint[1000];
        foreach(i, ref x; source)
            x = cast(uint)i;

        ThreadPipe tp = new ThreadPipe(16);
        void threadA()
        {
            void[] sourceBuf = source;
            while(sourceBuf.length > 0)
            {
                sourceBuf = sourceBuf[tp.write(sourceBuf)..$];
            }
            tp.stop();
        }
        Thread a = new Thread(&threadA);
        a.start();
        int readval;
        int last = -1;
        size_t nread;
        while((nread = tp.read((&readval)[0..1])) == readval.sizeof)
        {
            assert(readval == last + 1);
            last = readval;
        }
        assert(nread == tp.Eof);
        a.join();
    }

    unittest
    {
        //
        // a zero-sized request must still produce a usable pipe instead of
        // one whose writers block forever
        //
        auto tp = new ThreadPipe(0);
        assert(tp.bufferSize() == 1);
        assert(tp.writable() == 1);

        ubyte[1] outByte = [42];
        ubyte[1] inByte;
        assert(tp.write(outByte[]) == 1);
        assert(tp.remaining() == 1);
        assert(tp.read(inByte[]) == 1);
        assert(inByte[0] == 42);

        tp.stop();
        assert(tp.write(outByte[]) == tp.Eof);
        assert(tp.read(inByte[]) == tp.Eof);
        assert(!tp.isAlive());
    }
}