1 /*******************************************************************************
2 
3         copyright:      Copyright (c) 2004 Kris Bell. All rights reserved
4 
5         license:        BSD style: $(LICENSE)
6 
7         version:        Mar 2004: Initial release
8 
9         author:         Kris
10 
11 *******************************************************************************/
12 
13 module tango.io.device.Conduit;
14 
15 private import tango.core.Thread,
16                tango.core.Exception;
17 
18 public  import tango.io.model.IConduit;
19 
20 /*******************************************************************************
21 
22         Conduit abstract base-class, implementing interface IConduit.
23         Only the conduit-specific read(), write(), detach() and
24         bufferSize() need to be implemented for a concrete conduit
25         implementation. See File for an example.
26 
27         Conduits provide virtualized access to external content, and
28         represent things like files or Internet connections. Conduits
29         expose a pair of streams, are modelled by tango.io.model.IConduit,
30         and are implemented via classes such as File & SocketConduit.
31 
32         Additional kinds of conduit are easy to construct: either subclass
33         tango.io.device.Conduit, or implement tango.io.model.IConduit. A
34         conduit typically reads and writes via a Buffer in large chunks,
35         typically the entire buffer. Alternatively, one can invoke method
36         read(dst[]) and/or write(src[]) directly.
37 
38 *******************************************************************************/
39 
40 class Conduit : IConduit
41 {
42         version(TangoRuntime)
43         {
44             protected Fiber.Scheduler scheduler;            // optional scheduler
45         }
46         private   uint            duration = -1;        // scheduling timeout
47 
48         /***********************************************************************
49 
50                 Test for asynchronous capability. This will be eligable
51                 for scheduling where (a) it is created within a fiber and
52                 (b) there is a scheduler attached to the fiber at the time
53                 this is invoked.
54 
55                 Note that fibers may schedule just one outstanding I/O
56                 request at a time.
57 
58         ***********************************************************************/
59 
60         this ()
61         {
62                 auto f = Fiber.getThis();
63                 version(TangoRuntime)
64                 {
65                     if (f)
66                         scheduler = f.event.scheduler;
67                 }
68         }
69 
70         /***********************************************************************
71 
72                 Clean up when collected. See method detach().
73 
74         ***********************************************************************/
75 
76         ~this ()
77         {
78                 detach();
79         }
80 
81         /***********************************************************************
82 
83                 Return the name of this conduit.
84 
85         ***********************************************************************/
86 
87         abstract override string toString (); 
88 
89         /***********************************************************************
90 
91                 Return a preferred size for buffering conduit I/O.
92 
93         ***********************************************************************/
94 
95         abstract const size_t bufferSize ();
96 
97         /***********************************************************************
98 
99                 Read from conduit into a target array. The provided dst
100                 will be populated with content from the conduit.
101 
102                 Returns the number of bytes read, which may be less than
103                 requested in dst. Eof is returned whenever an end-of-flow
104                 condition arises.
105 
106         ***********************************************************************/
107 
108         abstract size_t read (void[] dst);
109 
110         /***********************************************************************
111 
112                 Write to conduit from a source array. The provided src
113                 content will be written to the conduit.
114 
115                 Returns the number of bytes written from src, which may
116                 be less than the quantity provided. Eof is returned when
117                 an end-of-flow condition arises.
118 
119         ***********************************************************************/
120 
121         abstract size_t write (const(void)[] src);
122 
123         /***********************************************************************
124 
125                 Disconnect this conduit. Note that this may be invoked
126                 both explicitly by the user, and implicitly by the GC.
127                 Be sure to manage multiple detachment requests correctly:
128                 set a flag, or sentinel value as necessary.
129 
130         ***********************************************************************/
131 
132         abstract void detach ();
133 
134         /***********************************************************************
135 
136                 Set the active timeout period for IO calls (in milliseconds.)
137 
138         ***********************************************************************/
139 
140         @property final void timeout (uint millisec)
141         {
142                 duration = millisec;
143         }
144 
145         /***********************************************************************
146 
147                 Get the active timeout period for IO calls (in milliseconds.)
148 
149         ***********************************************************************/
150 
151         @property final const uint timeout ()
152         {
153                 return duration;
154         }
155 
156         /***********************************************************************
157 
158                 Is the conduit alive? Default behaviour returns true.
159 
160         ***********************************************************************/
161 
162         @property const bool isAlive ()
163         {
164                 return true;
165         }
166 
167         /***********************************************************************
168 
169                 Return the host. This is part of the Stream interface.
170 
171         ***********************************************************************/
172 
173         @property final IConduit conduit ()
174         {
175                 return this;
176         }
177 
178         /***********************************************************************
179 
180                 Emit buffered output or reset buffered input.
181 
182         ***********************************************************************/
183 
184         IOStream flush ()
185         {
186                 return this;
187         }
188 
189         /***********************************************************************
190 
191                 Close this conduit.
192 
193                 Both input and output are detached, and are no longer usable.
194 
195         ***********************************************************************/
196 
197         void close ()
198         {
199                 this.detach();
200         }
201 
202         /***********************************************************************
203 
204                 Throw an IOException, with the provided message.
205 
206         ***********************************************************************/
207 
208         final void error (const(char[]) msg)
209         {
210                 throw new IOException (msg.idup);
211         }
212 
213         /***********************************************************************
214 
215                 Return the input stream.
216 
217         ***********************************************************************/
218 
219         @property final InputStream input ()
220         {
221                 return this;
222         }
223 
224         /***********************************************************************
225 
226                 Return the output stream.
227 
228         ***********************************************************************/
229 
230         @property final OutputStream output ()
231         {
232                 return this;
233         }
234 
235         /***********************************************************************
236 
237                 Emit fixed-length content from 'src' into this conduit,
238                 throwing an IOException upon Eof.
239 
240         ***********************************************************************/
241 
242         final Conduit put (void[] src)
243         {
244                 put (src, this);
245                 return this;
246         }
247 
248         /***********************************************************************
249 
250                 Consume fixed-length content into 'dst' from this conduit,
251                 throwing an IOException upon Eof.
252 
253         ***********************************************************************/
254 
255         final Conduit get (void[] dst)
256         {
257                get (dst, this);
258                return this;
259         }
260 
261         /***********************************************************************
262 
263                 Rewind to beginning of file.
264 
265         ***********************************************************************/
266 
267         final Conduit rewind ()
268         {
269                 seek (0);
270                 return this;
271         }
272 
273         /***********************************************************************
274 
275                 Transfer the content of another conduit to this one. Returns
276                 the dst OutputStream, or throws IOException on failure.
277 
278         ***********************************************************************/
279 
280         OutputStream copy (InputStream src, size_t max = -1)
281         {
282                 transfer (src, this, max);
283                 return this;
284         }
285 
286         /***********************************************************************
287 
288                 Seek on this stream. Source conduits that don't support
289                 seeking will throw an IOException.
290 
291         ***********************************************************************/
292 
293         long seek (long offset, Anchor anchor = Anchor.Begin)
294         {
295                 error (this.toString() ~ " does not support seek requests");
296                 return 0;
297         }
298 
299         /***********************************************************************
300 
301                 Load text from a stream, and return them all in an
302                 array.
303 
304                 Returns an array representing the content, and throws
305                 IOException on error.
306 
307         ***********************************************************************/
308 
309         char[] text(T=char) (size_t max = -1)
310         {
311                 return cast(T[]) load (max);
312         }
313 
314         /***********************************************************************
315 
316                 Load the bits from a stream, and return them all in an
317                 array. The dst array can be provided as an option, which
318                 will be expanded as necessary to consume the input.
319 
320                 Returns an array representing the content, and throws
321                 IOException on error.
322 
323         ***********************************************************************/
324 
325         void[] load (size_t max = -1)
326         {
327                 return load (this, max);
328         }
329 
330         /***********************************************************************
331 
332                 Load the bits from a stream, and return them all in an
333                 array. The dst array can be provided as an option, which
334                 will be expanded as necessary to consume input.
335 
336                 Returns an array representing the content, and throws
337                 IOException on error.
338 
339         ***********************************************************************/
340 
341         static void[] load (InputStream src, size_t max=-1)
342         {
343                 void[]  dst;
344                 size_t  i,
345                         len,
346                         chunk;
347 
348                 if (max != -1)
349                     chunk = max;
350                 else
351                    chunk = src.conduit.bufferSize;
352 
353                 while (len < max)
354                       {
355                       if (dst.length - len is 0)
356                           dst.length = len + chunk;
357 
358                       if ((i = src.read (dst[len .. $])) is Eof)
359                            break;
360                       len += i;
361                       }
362 
363                 return dst [0 .. len];
364         }
365 
366         /***********************************************************************
367 
368                 Emit fixed-length content from 'src' into 'output',
369                 throwing an IOException upon Eof.
370 
371         ***********************************************************************/
372 
373         static void put (void[] src, OutputStream output)
374         {
375                 while (src.length)
376                       {
377                       auto i = output.write (src);
378                       if (i is Eof)
379                           output.conduit.error ("Conduit.put :: eof while writing");
380                       src = src [i..$];
381                       }
382         }
383 
384         /***********************************************************************
385 
386                 Consume fixed-length content into 'dst' from 'input',
387                 throwing an IOException upon Eof.
388 
389         ***********************************************************************/
390 
391         static void get (void[] dst, InputStream input)
392         {
393                 while (dst.length)
394                       {
395                       auto i = input.read (dst);
396                       if (i is Eof)
397                           input.conduit.error ("Conduit.get :: eof while reading");
398                       dst = dst [i..$];
399                       }
400         }
401 
402         /***********************************************************************
403 
404                 Low-level data transfer, where max represents the maximum
405                 number of bytes to transfer.
406 
407                 Returns Eof on failure, number of bytes copied on success.
408 
409         ***********************************************************************/
410 
411         static size_t transfer (InputStream src, OutputStream dst, size_t max=-1)
412         {
413                 byte[8192] tmp;
414                 size_t     done;
415 
416                 while (max)
417                       {
418                       auto len = max;
419                       if (len > tmp.length)
420                           len = tmp.length;
421 
422                       if ((len = src.read(tmp[0 .. len])) is Eof)
423                            max = 0;
424                       else
425                          {
426                          max -= len;
427                          done += len;
428                          auto p = tmp.ptr;
429                          for (size_t j=0; len > 0; len -= j, p += j)
430                               if ((j = dst.write (p[0 .. len])) is Eof)
431                                    return Eof;
432                          }
433                       }
434 
435                 return done;
436         }
437 }
438 
439 
440 /*******************************************************************************
441 
442         Base class for input stream filtering. The provided source stream
443         should generally never be null, though some filters have a need to
444         set this lazily.
445 
446 *******************************************************************************/
447 
448 class InputFilter : InputStream
449 {
450         protected InputStream source;
451 
452         /***********************************************************************
453 
454                 Attach to the provided stream. The provided source stream
455                 should generally never be null, though some filters have a
456                 need to set this lazily.
457 
458         ***********************************************************************/
459 
460         this (InputStream source)
461         {
462                 this.source = source;
463         }
464 
465         /***********************************************************************
466 
467                 Return the hosting conduit.
468 
469         ***********************************************************************/
470 
471         @property IConduit conduit ()
472         {
473                 return source.conduit;
474         }
475 
476         /***********************************************************************
477 
478                 Read from conduit into a target array. The provided dst
479                 will be populated with content from the conduit.
480 
481                 Returns the number of bytes read, which may be less than
482                 requested in dst. Eof is returned whenever an end-of-flow
483                 condition arises.
484 
485         ***********************************************************************/
486 
487         size_t read (void[] dst)
488         {
489                 return source.read (dst);
490         }
491 
492         /***********************************************************************
493 
494                 Load the bits from a stream, and return them all in an
495                 array. The dst array can be provided as an option, which
496                 will be expanded as necessary to consume the input.
497 
498                 Returns an array representing the content, and throws
499                 IOException on error.
500 
501         ***********************************************************************/
502 
503         void[] load (size_t max = -1)
504         {
505                 return Conduit.load (this, max);
506         }
507 
508         /***********************************************************************
509 
510                 Clear any buffered content.
511 
512         ***********************************************************************/
513 
514         IOStream flush ()
515         {
516                 source.flush();
517                 return this;
518         }
519 
520         /***********************************************************************
521 
522                 Seek on this stream. Target conduits that don't support
523                 seeking will throw an IOException.
524 
525         ***********************************************************************/
526 
527         long seek (long offset, Anchor anchor = Anchor.Begin)
528         {
529                 return source.seek (offset, anchor);
530         }
531 
532         /***********************************************************************
533 
534                 Return the upstream host of this filter.
535 
536         ***********************************************************************/
537 
538         @property InputStream input ()
539         {
540                 return source;
541         }
542 
543         /***********************************************************************
544 
545                 Close the input.
546 
547         ***********************************************************************/
548 
549         void close ()
550         {
551                 source.close();
552         }
553 }
554 
555 
556 /*******************************************************************************
557 
558         Base class for output stream filtering. The provided sink stream
559         should generally never be null, though some filters have a need to
560         set this lazily.
561 
562 *******************************************************************************/
563 
564 class OutputFilter : OutputStream
565 {
566         protected OutputStream sink;
567 
568         /***********************************************************************
569 
570                 Attach to the provided stream.
571 
572         ***********************************************************************/
573 
574         this (OutputStream sink)
575         {
576                 this.sink = sink;
577         }
578 
579         /***********************************************************************
580 
581                 Return the hosting conduit.
582 
583         ***********************************************************************/
584 
585         @property IConduit conduit ()
586         {
587                 return sink.conduit;
588         }
589 
590         /***********************************************************************
591 
592                 Write to conduit from a source array. The provided src
593                 content will be written to the conduit.
594 
595                 Returns the number of bytes written from src, which may
596                 be less than the quantity provided. Eof is returned when
597                 an end-of-flow condition arises.
598 
599         ***********************************************************************/
600 
601         size_t write (const(void)[] src)
602         {
603                 return sink.write (src);
604         }
605 
606         /***********************************************************************
607 
608                 Transfer the content of another conduit to this one. Returns
609                 a reference to this class, or throws IOException on failure.
610 
611         ***********************************************************************/
612 
613         OutputStream copy (InputStream src, size_t max = -1)
614         {
615                 Conduit.transfer (src, this, max);
616                 return this;
617         }
618 
619         /***********************************************************************
620 
621                 Emit/purge buffered content.
622 
623         ***********************************************************************/
624 
625         IOStream flush ()
626         {
627                 sink.flush();
628                 return this;
629         }
630 
631         /***********************************************************************
632 
633                 Seek on this stream. Target conduits that don't support
634                 seeking will throw an IOException.
635 
636         ***********************************************************************/
637 
638         long seek (long offset, Anchor anchor = Anchor.Begin)
639         {
640                 return sink.seek (offset, anchor);
641         }
642 
643         /***********************************************************************
644 
645                 Return the upstream host of this filter.
646 
647         ***********************************************************************/
648 
649         @property OutputStream output ()
650         {
651                 return sink;
652         }
653 
654         /***********************************************************************
655 
656                 Close the output.
657 
658         ***********************************************************************/
659 
660         void close ()
661         {
662                 sink.close();
663         }
664 }