1 /******************************************************************************* 2 3 copyright: Copyright (C) 2007 Daniel Keep. All rights reserved. 4 5 license: BSD style: $(LICENSE) 6 7 version: Initial release: July 2007 8 9 author: Daniel Keep 10 11 *******************************************************************************/ 12 13 module tango.io.stream.Bzip; 14 15 private import tango.util.compress.c.bzlib; 16 17 private import tango.core.Exception : IOException; 18 19 private import tango.io.device.Conduit : InputFilter, OutputFilter; 20 21 private import tango.io.model.IConduit : InputStream, OutputStream, IConduit; 22 23 private 24 { 25 /* This constant controls the size of the input/output buffers we use 26 * internally. There's no particular reason to pick this size. It might 27 * be an idea to run some benchmarks to work out what a good number is. 28 */ 29 private enum { BUFFER_SIZE = 4*1024 }; 30 31 private enum { DEFAULT_BLOCKSIZE = 9 }; 32 private enum { DEFAULT_WORKFACTOR = 0 }; 33 } 34 35 /******************************************************************************* 36 37 This output filter can be used to perform compression of data into a bzip2 38 stream. 39 40 *******************************************************************************/ 41 42 class BzipOutput : OutputFilter 43 { 44 /*************************************************************************** 45 46 This enumeration represents several pre-defined compression block 47 sizes, measured in hundreds of kilobytes. See the documentation for 48 the BzipOutput class' constructor for more details. 49 50 ***************************************************************************/ 51 52 enum BlockSize : int 53 { 54 Normal = 9, 55 Fast = 1, 56 Best = 9, 57 } 58 59 private 60 { 61 bool bzs_valid = false; 62 bz_stream bzs; 63 ubyte[] out_chunk; 64 size_t _written = 0; 65 } 66 67 /*************************************************************************** 68 69 Constructs a new bzip2 compression filter. You need to pass in the 70 stream that the compression filter will write to. If you are using 71 this filter with a conduit, the idiom to use is: 72 73 --- 74 auto output = new BzipOutput(myConduit.output); 75 output.write(myContent); 76 --- 77 78 blockSize relates to the size of the window bzip2 uses when 79 compressing data and determines how much memory is required to 80 decompress a stream. It is measured in hundreds of kilobytes. 81 82 ccording to the bzip2 documentation, there is no dramatic difference 83 between the various block sizes, so the default should suffice in most 84 cases. 85 86 BlockSize.Normal (the default) is the same as BlockSize.Best 87 (or 9). The blockSize may be any integer between 1 and 9 inclusive. 88 89 ***************************************************************************/ 90 91 this(OutputStream stream, int blockSize = BlockSize.Normal) 92 { 93 init(stream, blockSize); 94 scope(failure) kill_bzs(); 95 96 super(stream); 97 out_chunk = new ubyte[BUFFER_SIZE]; 98 } 99 100 /* 101 * This method performs initialisation for the stream. Note that this may 102 * be called more than once for an instance, provided the instance is 103 * either new or as part of a call to reset. 104 */ 105 private void init(OutputStream stream, int blockSize) 106 { 107 if( blockSize < 1 || blockSize > 9 ) 108 throw new BzipException("bzip2 block size must be between" 109 " 1 and 9"); 110 111 auto ret = BZ2_bzCompressInit(&bzs, blockSize, 0, DEFAULT_WORKFACTOR); 112 if( ret != BZ_OK ) 113 throw new BzipException(ret); 114 115 bzs_valid = true; 116 } 117 118 ~this() 119 { 120 if( bzs_valid ) 121 kill_bzs(); 122 } 123 124 /*************************************************************************** 125 126 Resets and re-initialises this instance. 127 128 If you are creating compression streams inside a loop, you may wish to 129 use this method to re-use a single instance. This prevents the 130 potentially costly re-allocation of internal buffers. 131 132 The stream must have already been closed before calling reset. 133 134 ***************************************************************************/ 135 136 void reset(OutputStream stream, int blockSize = BlockSize.Normal) 137 { 138 // If the stream is still valid, bail. 139 if( bzs_valid ) 140 throw new BzipStillOpenException; 141 142 init(stream, blockSize); 143 } 144 145 /*************************************************************************** 146 147 Compresses the given data to the underlying conduit. 148 149 Returns the number of bytes from src that were compressed, which may 150 be less than given. 151 152 ***************************************************************************/ 153 154 override size_t write(const(void)[] src) 155 { 156 check_valid(); 157 scope(failure) kill_bzs(); 158 159 bzs.avail_in = cast(uint)src.length; 160 bzs.next_in = cast(ubyte*)src.ptr; 161 162 do 163 { 164 bzs.avail_out = cast(uint)out_chunk.length; 165 bzs.next_out = out_chunk.ptr; 166 167 auto ret = BZ2_bzCompress(&bzs, BZ_RUN); 168 if( ret != BZ_RUN_OK ) 169 throw new BzipException(ret); 170 171 // Push the compressed bytes out to the stream, until it's either 172 // written them all, or choked. 173 auto have = out_chunk.length-bzs.avail_out; 174 auto out_buffer = out_chunk[0..have]; 175 do 176 { 177 auto w = sink.write(out_buffer); 178 if( w == IConduit.Eof ) 179 return w; 180 181 out_buffer = out_buffer[w..$]; 182 _written += w; 183 } 184 while( out_buffer.length > 0 ); 185 } 186 // Loop while we are still using up the whole output buffer 187 while( bzs.avail_out == 0 ); 188 189 assert( bzs.avail_in == 0, "failed to compress all provided data" ); 190 191 return src.length; 192 } 193 194 /*************************************************************************** 195 196 This read-only property returns the number of compressed bytes that 197 have been written to the underlying stream. Following a call to 198 either close or commit, this will contain the total compressed size of 199 the input data stream. 200 201 ***************************************************************************/ 202 203 size_t written() 204 { 205 return _written; 206 } 207 208 /*************************************************************************** 209 210 Close the compression stream. This will cause any buffered content to 211 be committed to the underlying stream. 212 213 ***************************************************************************/ 214 215 override void close() 216 { 217 if( bzs_valid ) commit(); 218 super.close(); 219 } 220 221 /*************************************************************************** 222 223 Purge any buffered content. Calling this will implicitly end the 224 bzip2 stream, so it should not be called until you are finished 225 compressing data. Any calls to either write or commit after a 226 compression filter has been committed will throw an exception. 227 228 The only difference between calling this method and calling close is 229 that the underlying stream will not be closed. 230 231 ***************************************************************************/ 232 233 void commit() 234 { 235 check_valid(); 236 scope(failure) kill_bzs(); 237 238 bzs.avail_in = 0; 239 bzs.next_in = null; 240 241 bool finished = false; 242 243 do 244 { 245 bzs.avail_out = cast(uint)out_chunk.length; 246 bzs.next_out = out_chunk.ptr; 247 248 auto ret = BZ2_bzCompress(&bzs, BZ_FINISH); 249 switch( ret ) 250 { 251 case BZ_FINISH_OK: 252 break; 253 254 case BZ_STREAM_END: 255 finished = true; 256 break; 257 258 default: 259 throw new BzipException(ret); 260 } 261 262 auto have = out_chunk.length - bzs.avail_out; 263 auto out_buffer = out_chunk[0..have]; 264 if( have > 0 ) 265 { 266 do 267 { 268 auto w = sink.write(out_buffer); 269 if( w == IConduit.Eof ) 270 return; 271 272 out_buffer = out_buffer[w..$]; 273 _written += w; 274 } 275 while( out_buffer.length > 0 ); 276 } 277 } 278 while( !finished ); 279 280 kill_bzs(); 281 } 282 283 // Disable seeking 284 override long seek(long offset, Anchor anchor = Anchor.Begin) 285 { 286 throw new IOException("BzipOutput does not support seek requests"); 287 } 288 289 // This function kills the stream: it deallocates the internal state, and 290 // unsets the bzs_valid flag. 291 private void kill_bzs() 292 { 293 check_valid(); 294 295 BZ2_bzCompressEnd(&bzs); 296 bzs_valid = false; 297 } 298 299 // Asserts that the stream is still valid and usable (except that this 300 // check doesn't get elided with -release). 301 private void check_valid() 302 { 303 if( !bzs_valid ) 304 throw new BzipClosedException; 305 } 306 } 307 308 /******************************************************************************* 309 310 This input filter can be used to perform decompression of bzip2 streams. 311 312 *******************************************************************************/ 313 314 class BzipInput : InputFilter 315 { 316 private 317 { 318 bool bzs_valid = false; 319 bz_stream bzs; 320 ubyte[] in_chunk; 321 } 322 323 /*************************************************************************** 324 325 Constructs a new bzip2 decompression filter. You need to pass in the 326 stream that the decompression filter will read from. If you are using 327 this filter with a conduit, the idiom to use is: 328 329 --- 330 auto input = new BzipInput(myConduit.input); 331 input.read(myContent); 332 --- 333 334 The small argument, if set to true, instructs bzip2 to perform 335 decompression using half the regular amount of memory, at the cost of 336 running at half speed. 337 338 ***************************************************************************/ 339 340 this(InputStream stream, bool small=false) 341 { 342 init(stream, small); 343 scope(failure) kill_bzs(); 344 345 super(stream); 346 in_chunk = new ubyte[BUFFER_SIZE]; 347 } 348 349 /* 350 * This method performs initialisation for the stream. Note that this may 351 * be called more than once for an instance, provided the instance is 352 * either new or as part of a call to reset. 353 */ 354 private void init(InputStream stream, bool small) 355 { 356 auto ret = BZ2_bzDecompressInit(&bzs, 0, small?1:0); 357 if( ret != BZ_OK ) 358 throw new BzipException(ret); 359 360 bzs_valid = true; 361 } 362 363 ~this() 364 { 365 if( bzs_valid ) 366 kill_bzs(); 367 } 368 369 /*************************************************************************** 370 371 Resets and re-initialises this instance. 372 373 If you are creating compression streams inside a loop, you may wish to 374 use this method to re-use a single instance. This prevents the 375 potentially costly re-allocation of internal buffers. 376 377 The stream must have already been closed before calling reset. 378 379 ***************************************************************************/ 380 381 void reset(InputStream stream, bool small=false) 382 { 383 // If the stream is still valid, bail. 384 if( bzs_valid ) 385 throw new BzipStillOpenException; 386 387 init(stream, small); 388 } 389 390 /*************************************************************************** 391 392 Decompresses data from the underlying conduit into a target array. 393 394 Returns the number of bytes stored into dst, which may be less than 395 requested. 396 397 ***************************************************************************/ 398 399 override size_t read(void[] dst) 400 { 401 if( !bzs_valid ) 402 return IConduit.Eof; 403 404 scope(failure) kill_bzs(); 405 406 bool finished = false; 407 408 bzs.avail_out = cast(uint)dst.length; 409 bzs.next_out = cast(ubyte*)dst.ptr; 410 411 do 412 { 413 if( bzs.avail_in == 0 ) 414 { 415 auto len = source.read(in_chunk); 416 if( len == IConduit.Eof ) 417 return IConduit.Eof; 418 419 bzs.avail_in = cast(uint)len; 420 bzs.next_in = in_chunk.ptr; 421 } 422 423 auto ret = BZ2_bzDecompress(&bzs); 424 if( ret == BZ_STREAM_END ) 425 { 426 kill_bzs(); 427 finished = true; 428 } 429 else if( ret != BZ_OK ) 430 throw new BzipException(ret); 431 } 432 while( !finished && bzs.avail_out > 0 ); 433 434 return dst.length - bzs.avail_out; 435 } 436 437 /*************************************************************************** 438 439 Closes the compression stream. 440 441 ***************************************************************************/ 442 443 override void close() 444 { 445 check_valid(); 446 447 // Kill the stream. Don't deallocate the buffer since the user may 448 // yet reset the stream. 449 kill_bzs(); 450 super.close(); 451 } 452 453 // Disable seeking 454 override long seek(long offset, Anchor anchor = Anchor.Begin) 455 { 456 throw new IOException("BzipOutput does not support seek requests"); 457 } 458 459 // This function kills the stream: it deallocates the internal state, and 460 // unsets the bzs_valid flag. 461 private void kill_bzs() 462 { 463 check_valid(); 464 465 BZ2_bzDecompressEnd(&bzs); 466 bzs_valid = false; 467 } 468 469 // Asserts that the stream is still valid and usable (except that this 470 // check doesn't get elided with -release). 471 private void check_valid() 472 { 473 if( !bzs_valid ) 474 throw new BzipClosedException; 475 } 476 } 477 478 /******************************************************************************* 479 480 This exception is thrown when an error occurs in the underlying bzip2 481 library. 482 483 *******************************************************************************/ 484 485 class BzipException : IOException 486 { 487 this(in int code) 488 { 489 super(codeName(code)); 490 } 491 492 this(string msg) 493 { 494 super(msg); 495 } 496 497 private string codeName(in int code) 498 { 499 string name; 500 501 switch( code ) 502 { 503 case BZ_OK: name = "BZ_OK"; break; 504 case BZ_RUN_OK: name = "BZ_RUN_OK"; break; 505 case BZ_FLUSH_OK: name = "BZ_FLUSH_OK"; break; 506 case BZ_STREAM_END: name = "BZ_STREAM_END"; break; 507 case BZ_SEQUENCE_ERROR: name = "BZ_SEQUENCE_ERROR"; break; 508 case BZ_PARAM_ERROR: name = "BZ_PARAM_ERROR"; break; 509 case BZ_MEM_ERROR: name = "BZ_MEM_ERROR"; break; 510 case BZ_DATA_ERROR: name = "BZ_DATA_ERROR"; break; 511 case BZ_DATA_ERROR_MAGIC: name = "BZ_DATA_ERROR_MAGIC"; break; 512 case BZ_IO_ERROR: name = "BZ_IO_ERROR"; break; 513 case BZ_UNEXPECTED_EOF: name = "BZ_UNEXPECTED_EOF"; break; 514 case BZ_OUTBUFF_FULL: name = "BZ_OUTBUFF_FULL"; break; 515 case BZ_CONFIG_ERROR: name = "BZ_CONFIG_ERROR"; break; 516 default: name = "BZ_UNKNOWN"; 517 } 518 519 return name; 520 } 521 } 522 523 /******************************************************************************* 524 525 This exception is thrown if you attempt to perform a read, write or flush 526 operation on a closed bzip2 filter stream. This can occur if the input 527 stream has finished, or an output stream was flushed. 528 529 *******************************************************************************/ 530 531 class BzipClosedException : IOException 532 { 533 this() 534 { 535 super("cannot operate on closed bzip2 stream"); 536 } 537 } 538 539 /******************************************************************************* 540 541 This exception is thrown if you attempt to reset a compression stream that 542 is still open. You must either close or commit a stream before it can be 543 reset. 544 545 *******************************************************************************/ 546 547 class BzipStillOpenException : IOException 548 { 549 this() 550 { 551 super("cannot reset an open bzip2 stream"); 552 } 553 } 554 555 /* ***************************************************************************** 556 557 This section contains a simple unit test for this module. It is hidden 558 behind a version statement because it introduces additional dependencies. 559 560 ***************************************************************************** */ 561 562 debug(UnitTest): 563 564 import tango.io.device.Array : Array; 565 566 unittest 567 { 568 __gshared immutable immutable(char)[] message = 569 "All dwarfs are by nature dutiful, serious, literate, obedient " 570 "and thoughtful people whose only minor failing is a tendency, " 571 "after one drink, to rush at enemies screaming \"Arrrrrrgh!\" and " 572 "axing their legs off at the knee."; 573 574 __gshared immutable ubyte[] message_z = [ 575 0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26, 576 0x53, 0x59, 0x40, 0x98, 0xbe, 0xaa, 0x00, 0x00, 577 0x16, 0xd5, 0x80, 0x10, 0x00, 0x70, 0x05, 0x20, 578 0x00, 0x3f, 0xef, 0xde, 0xe0, 0x30, 0x00, 0xac, 579 0xd8, 0x8a, 0x3d, 0x34, 0x6a, 0x6d, 0x4c, 0x4f, 580 0x24, 0x31, 0x0d, 0x08, 0x98, 0x9b, 0x48, 0x9a, 581 0x7a, 0x80, 0x00, 0x06, 0xa6, 0xd2, 0xa7, 0xe9, 582 0xaa, 0x37, 0xa8, 0xd4, 0xf5, 0x3f, 0x54, 0x63, 583 0x51, 0xe9, 0x2d, 0x4b, 0x99, 0xe1, 0xcc, 0xca, 584 0xda, 0x75, 0x04, 0x42, 0x14, 0xc8, 0x6a, 0x8e, 585 0x23, 0xc1, 0x3e, 0xb1, 0x8a, 0x16, 0xd2, 0x55, 586 0x9a, 0x3e, 0x56, 0x1a, 0xb1, 0x83, 0x11, 0xa6, 587 0x50, 0x4f, 0xd3, 0xed, 0x21, 0x40, 0xaa, 0xd1, 588 0x95, 0x2c, 0xda, 0xcb, 0xb7, 0x0e, 0xce, 0x65, 589 0xfc, 0x63, 0xf2, 0x88, 0x5b, 0x36, 0xda, 0xf0, 590 0xf5, 0xd2, 0x9c, 0xe6, 0xf1, 0x87, 0x12, 0x87, 591 0xce, 0x56, 0x0c, 0xf5, 0x65, 0x4d, 0x2e, 0xd6, 592 0x27, 0x61, 0x2b, 0x74, 0xcd, 0x5e, 0x3b, 0x02, 593 0x42, 0x4e, 0x0b, 0x80, 0xa8, 0x70, 0x04, 0x48, 594 0xfb, 0x93, 0x4c, 0x41, 0xa8, 0x2a, 0xdf, 0xf2, 595 0x67, 0x37, 0x28, 0xad, 0x38, 0xd4, 0x5c, 0xd6, 596 0x34, 0x8b, 0x49, 0x5e, 0x90, 0xb2, 0x06, 0xce, 597 0x0a, 0x83, 0x29, 0x84, 0x20, 0xd7, 0x5f, 0xc5, 598 0xdc, 0x91, 0x4e, 0x14, 0x24, 0x10, 0x26, 0x2f, 599 0xaa, 0x80]; 600 601 scope cond = new Array(1024, 1024); 602 scope comp = new BzipOutput(cond); 603 comp.write(message); 604 comp.close(); 605 606 assert( comp.written() == message_z.length ); 607 608 assert( message_z == cast(ubyte[])(cond.slice()) ); 609 610 scope decomp = new BzipInput(cond); 611 auto buffer = new ubyte[256]; 612 buffer = buffer[0 .. decomp.read(buffer)]; 613 614 assert( cast(ubyte[])message == buffer ); 615 } 616