1 /******************************************************************************* 2 3 copyright: Copyright (c) 2004 Kris Bell. All rights reserved 4 5 license: BSD style: $(LICENSE) 6 7 version: Mar 2004: Initial release 8 9 author: Kris 10 11 *******************************************************************************/ 12 13 module tango.io.device.Conduit; 14 15 private import tango.core.Thread, 16 tango.core.Exception; 17 18 public import tango.io.model.IConduit; 19 20 /******************************************************************************* 21 22 Conduit abstract base-class, implementing interface IConduit. 23 Only the conduit-specific read(), write(), detach() and 24 bufferSize() need to be implemented for a concrete conduit 25 implementation. See File for an example. 26 27 Conduits provide virtualized access to external content, and 28 represent things like files or Internet connections. Conduits 29 expose a pair of streams, are modelled by tango.io.model.IConduit, 30 and are implemented via classes such as File & SocketConduit. 31 32 Additional kinds of conduit are easy to construct: either subclass 33 tango.io.device.Conduit, or implement tango.io.model.IConduit. A 34 conduit typically reads and writes via a Buffer in large chunks, 35 typically the entire buffer. Alternatively, one can invoke method 36 read(dst[]) and/or write(src[]) directly. 37 38 *******************************************************************************/ 39 40 class Conduit : IConduit 41 { 42 version(TangoRuntime) 43 { 44 protected Fiber.Scheduler scheduler; // optional scheduler 45 } 46 private uint duration = -1; // scheduling timeout 47 48 /*********************************************************************** 49 50 Test for asynchronous capability. This will be eligable 51 for scheduling where (a) it is created within a fiber and 52 (b) there is a scheduler attached to the fiber at the time 53 this is invoked. 54 55 Note that fibers may schedule just one outstanding I/O 56 request at a time. 57 58 ***********************************************************************/ 59 60 this () 61 { 62 auto f = Fiber.getThis(); 63 version(TangoRuntime) 64 { 65 if (f) 66 scheduler = f.event.scheduler; 67 } 68 } 69 70 /*********************************************************************** 71 72 Clean up when collected. See method detach(). 73 74 ***********************************************************************/ 75 76 ~this () 77 { 78 detach(); 79 } 80 81 /*********************************************************************** 82 83 Return the name of this conduit. 84 85 ***********************************************************************/ 86 87 abstract override string toString (); 88 89 /*********************************************************************** 90 91 Return a preferred size for buffering conduit I/O. 92 93 ***********************************************************************/ 94 95 abstract const size_t bufferSize (); 96 97 /*********************************************************************** 98 99 Read from conduit into a target array. The provided dst 100 will be populated with content from the conduit. 101 102 Returns the number of bytes read, which may be less than 103 requested in dst. Eof is returned whenever an end-of-flow 104 condition arises. 105 106 ***********************************************************************/ 107 108 abstract size_t read (void[] dst); 109 110 /*********************************************************************** 111 112 Write to conduit from a source array. The provided src 113 content will be written to the conduit. 114 115 Returns the number of bytes written from src, which may 116 be less than the quantity provided. Eof is returned when 117 an end-of-flow condition arises. 118 119 ***********************************************************************/ 120 121 abstract size_t write (const(void)[] src); 122 123 /*********************************************************************** 124 125 Disconnect this conduit. Note that this may be invoked 126 both explicitly by the user, and implicitly by the GC. 127 Be sure to manage multiple detachment requests correctly: 128 set a flag, or sentinel value as necessary. 129 130 ***********************************************************************/ 131 132 abstract void detach (); 133 134 /*********************************************************************** 135 136 Set the active timeout period for IO calls (in milliseconds.) 137 138 ***********************************************************************/ 139 140 @property final void timeout (uint millisec) 141 { 142 duration = millisec; 143 } 144 145 /*********************************************************************** 146 147 Get the active timeout period for IO calls (in milliseconds.) 148 149 ***********************************************************************/ 150 151 @property final const uint timeout () 152 { 153 return duration; 154 } 155 156 /*********************************************************************** 157 158 Is the conduit alive? Default behaviour returns true. 159 160 ***********************************************************************/ 161 162 @property const bool isAlive () 163 { 164 return true; 165 } 166 167 /*********************************************************************** 168 169 Return the host. This is part of the Stream interface. 170 171 ***********************************************************************/ 172 173 @property final IConduit conduit () 174 { 175 return this; 176 } 177 178 /*********************************************************************** 179 180 Emit buffered output or reset buffered input. 181 182 ***********************************************************************/ 183 184 IOStream flush () 185 { 186 return this; 187 } 188 189 /*********************************************************************** 190 191 Close this conduit. 192 193 Both input and output are detached, and are no longer usable. 194 195 ***********************************************************************/ 196 197 void close () 198 { 199 this.detach(); 200 } 201 202 /*********************************************************************** 203 204 Throw an IOException, with the provided message. 205 206 ***********************************************************************/ 207 208 final void error (const(char[]) msg) 209 { 210 throw new IOException (msg.idup); 211 } 212 213 /*********************************************************************** 214 215 Return the input stream. 216 217 ***********************************************************************/ 218 219 @property final InputStream input () 220 { 221 return this; 222 } 223 224 /*********************************************************************** 225 226 Return the output stream. 227 228 ***********************************************************************/ 229 230 @property final OutputStream output () 231 { 232 return this; 233 } 234 235 /*********************************************************************** 236 237 Emit fixed-length content from 'src' into this conduit, 238 throwing an IOException upon Eof. 239 240 ***********************************************************************/ 241 242 final Conduit put (void[] src) 243 { 244 put (src, this); 245 return this; 246 } 247 248 /*********************************************************************** 249 250 Consume fixed-length content into 'dst' from this conduit, 251 throwing an IOException upon Eof. 252 253 ***********************************************************************/ 254 255 final Conduit get (void[] dst) 256 { 257 get (dst, this); 258 return this; 259 } 260 261 /*********************************************************************** 262 263 Rewind to beginning of file. 264 265 ***********************************************************************/ 266 267 final Conduit rewind () 268 { 269 seek (0); 270 return this; 271 } 272 273 /*********************************************************************** 274 275 Transfer the content of another conduit to this one. Returns 276 the dst OutputStream, or throws IOException on failure. 277 278 ***********************************************************************/ 279 280 OutputStream copy (InputStream src, size_t max = -1) 281 { 282 transfer (src, this, max); 283 return this; 284 } 285 286 /*********************************************************************** 287 288 Seek on this stream. Source conduits that don't support 289 seeking will throw an IOException. 290 291 ***********************************************************************/ 292 293 long seek (long offset, Anchor anchor = Anchor.Begin) 294 { 295 error (this.toString() ~ " does not support seek requests"); 296 return 0; 297 } 298 299 /*********************************************************************** 300 301 Load text from a stream, and return them all in an 302 array. 303 304 Returns an array representing the content, and throws 305 IOException on error. 306 307 ***********************************************************************/ 308 309 char[] text(T=char) (size_t max = -1) 310 { 311 return cast(T[]) load (max); 312 } 313 314 /*********************************************************************** 315 316 Load the bits from a stream, and return them all in an 317 array. The dst array can be provided as an option, which 318 will be expanded as necessary to consume the input. 319 320 Returns an array representing the content, and throws 321 IOException on error. 322 323 ***********************************************************************/ 324 325 void[] load (size_t max = -1) 326 { 327 return load (this, max); 328 } 329 330 /*********************************************************************** 331 332 Load the bits from a stream, and return them all in an 333 array. The dst array can be provided as an option, which 334 will be expanded as necessary to consume input. 335 336 Returns an array representing the content, and throws 337 IOException on error. 338 339 ***********************************************************************/ 340 341 static void[] load (InputStream src, size_t max=-1) 342 { 343 void[] dst; 344 size_t i, 345 len, 346 chunk; 347 348 if (max != -1) 349 chunk = max; 350 else 351 chunk = src.conduit.bufferSize; 352 353 while (len < max) 354 { 355 if (dst.length - len is 0) 356 dst.length = len + chunk; 357 358 if ((i = src.read (dst[len .. $])) is Eof) 359 break; 360 len += i; 361 } 362 363 return dst [0 .. len]; 364 } 365 366 /*********************************************************************** 367 368 Emit fixed-length content from 'src' into 'output', 369 throwing an IOException upon Eof. 370 371 ***********************************************************************/ 372 373 static void put (void[] src, OutputStream output) 374 { 375 while (src.length) 376 { 377 auto i = output.write (src); 378 if (i is Eof) 379 output.conduit.error ("Conduit.put :: eof while writing"); 380 src = src [i..$]; 381 } 382 } 383 384 /*********************************************************************** 385 386 Consume fixed-length content into 'dst' from 'input', 387 throwing an IOException upon Eof. 388 389 ***********************************************************************/ 390 391 static void get (void[] dst, InputStream input) 392 { 393 while (dst.length) 394 { 395 auto i = input.read (dst); 396 if (i is Eof) 397 input.conduit.error ("Conduit.get :: eof while reading"); 398 dst = dst [i..$]; 399 } 400 } 401 402 /*********************************************************************** 403 404 Low-level data transfer, where max represents the maximum 405 number of bytes to transfer. 406 407 Returns Eof on failure, number of bytes copied on success. 408 409 ***********************************************************************/ 410 411 static size_t transfer (InputStream src, OutputStream dst, size_t max=-1) 412 { 413 byte[8192] tmp; 414 size_t done; 415 416 while (max) 417 { 418 auto len = max; 419 if (len > tmp.length) 420 len = tmp.length; 421 422 if ((len = src.read(tmp[0 .. len])) is Eof) 423 max = 0; 424 else 425 { 426 max -= len; 427 done += len; 428 auto p = tmp.ptr; 429 for (size_t j=0; len > 0; len -= j, p += j) 430 if ((j = dst.write (p[0 .. len])) is Eof) 431 return Eof; 432 } 433 } 434 435 return done; 436 } 437 } 438 439 440 /******************************************************************************* 441 442 Base class for input stream filtering. The provided source stream 443 should generally never be null, though some filters have a need to 444 set this lazily. 445 446 *******************************************************************************/ 447 448 class InputFilter : InputStream 449 { 450 protected InputStream source; 451 452 /*********************************************************************** 453 454 Attach to the provided stream. The provided source stream 455 should generally never be null, though some filters have a 456 need to set this lazily. 457 458 ***********************************************************************/ 459 460 this (InputStream source) 461 { 462 this.source = source; 463 } 464 465 /*********************************************************************** 466 467 Return the hosting conduit. 468 469 ***********************************************************************/ 470 471 @property IConduit conduit () 472 { 473 return source.conduit; 474 } 475 476 /*********************************************************************** 477 478 Read from conduit into a target array. The provided dst 479 will be populated with content from the conduit. 480 481 Returns the number of bytes read, which may be less than 482 requested in dst. Eof is returned whenever an end-of-flow 483 condition arises. 484 485 ***********************************************************************/ 486 487 size_t read (void[] dst) 488 { 489 return source.read (dst); 490 } 491 492 /*********************************************************************** 493 494 Load the bits from a stream, and return them all in an 495 array. The dst array can be provided as an option, which 496 will be expanded as necessary to consume the input. 497 498 Returns an array representing the content, and throws 499 IOException on error. 500 501 ***********************************************************************/ 502 503 void[] load (size_t max = -1) 504 { 505 return Conduit.load (this, max); 506 } 507 508 /*********************************************************************** 509 510 Clear any buffered content. 511 512 ***********************************************************************/ 513 514 IOStream flush () 515 { 516 source.flush(); 517 return this; 518 } 519 520 /*********************************************************************** 521 522 Seek on this stream. Target conduits that don't support 523 seeking will throw an IOException. 524 525 ***********************************************************************/ 526 527 long seek (long offset, Anchor anchor = Anchor.Begin) 528 { 529 return source.seek (offset, anchor); 530 } 531 532 /*********************************************************************** 533 534 Return the upstream host of this filter. 535 536 ***********************************************************************/ 537 538 @property InputStream input () 539 { 540 return source; 541 } 542 543 /*********************************************************************** 544 545 Close the input. 546 547 ***********************************************************************/ 548 549 void close () 550 { 551 source.close(); 552 } 553 } 554 555 556 /******************************************************************************* 557 558 Base class for output stream filtering. The provided sink stream 559 should generally never be null, though some filters have a need to 560 set this lazily. 561 562 *******************************************************************************/ 563 564 class OutputFilter : OutputStream 565 { 566 protected OutputStream sink; 567 568 /*********************************************************************** 569 570 Attach to the provided stream. 571 572 ***********************************************************************/ 573 574 this (OutputStream sink) 575 { 576 this.sink = sink; 577 } 578 579 /*********************************************************************** 580 581 Return the hosting conduit. 582 583 ***********************************************************************/ 584 585 @property IConduit conduit () 586 { 587 return sink.conduit; 588 } 589 590 /*********************************************************************** 591 592 Write to conduit from a source array. The provided src 593 content will be written to the conduit. 594 595 Returns the number of bytes written from src, which may 596 be less than the quantity provided. Eof is returned when 597 an end-of-flow condition arises. 598 599 ***********************************************************************/ 600 601 size_t write (const(void)[] src) 602 { 603 return sink.write (src); 604 } 605 606 /*********************************************************************** 607 608 Transfer the content of another conduit to this one. Returns 609 a reference to this class, or throws IOException on failure. 610 611 ***********************************************************************/ 612 613 OutputStream copy (InputStream src, size_t max = -1) 614 { 615 Conduit.transfer (src, this, max); 616 return this; 617 } 618 619 /*********************************************************************** 620 621 Emit/purge buffered content. 622 623 ***********************************************************************/ 624 625 IOStream flush () 626 { 627 sink.flush(); 628 return this; 629 } 630 631 /*********************************************************************** 632 633 Seek on this stream. Target conduits that don't support 634 seeking will throw an IOException. 635 636 ***********************************************************************/ 637 638 long seek (long offset, Anchor anchor = Anchor.Begin) 639 { 640 return sink.seek (offset, anchor); 641 } 642 643 /*********************************************************************** 644 645 Return the upstream host of this filter. 646 647 ***********************************************************************/ 648 649 @property OutputStream output () 650 { 651 return sink; 652 } 653 654 /*********************************************************************** 655 656 Close the output. 657 658 ***********************************************************************/ 659 660 void close () 661 { 662 sink.close(); 663 } 664 }