1 /*******************************************************************************
2 
3     copyright:  Copyright (C) 2007 Daniel Keep.  All rights reserved.
4 
5     license:    BSD style: $(LICENSE)
6 
7     version:    Initial release: July 2007
8 
9     author:     Daniel Keep
10 
11 *******************************************************************************/
12 
13 module tango.io.stream.Bzip;
14 
15 private import tango.util.compress.c.bzlib;
16 
17 private import tango.core.Exception : IOException;
18 
19 private import tango.io.device.Conduit : InputFilter, OutputFilter;
20 
21 private import tango.io.model.IConduit : InputStream, OutputStream, IConduit;
22 
private
{
    // Size, in bytes, of the scratch buffer each filter allocates for moving
    // data to and from libbzip2.  The figure is arbitrary and has not been
    // tuned; benchmarking might suggest a better value.
    enum BUFFER_SIZE = 4 * 1024;

    // Default block size (1..9, in units of 100k) for compression.
    enum DEFAULT_BLOCKSIZE = 9;

    // Work factor handed to BZ2_bzCompressInit.
    enum DEFAULT_WORKFACTOR = 0;
}
34 
35 /*******************************************************************************
36   
37     This output filter can be used to perform compression of data into a bzip2
38     stream.
39 
40 *******************************************************************************/
41 
class BzipOutput : OutputFilter
{
    /***************************************************************************

        This enumeration represents several pre-defined compression block
        sizes, measured in hundreds of kilobytes.  See the documentation for
        the BzipOutput class' constructor for more details.

    ***************************************************************************/

    enum BlockSize : int
    {
        Normal = 9,
        Fast = 1,
        Best = 9,
    }

    private
    {
        // True while bzs holds live libbzip2 compression state.
        bool bzs_valid = false;
        bz_stream bzs;
        // Scratch buffer libbzip2 compresses into before the bytes are
        // forwarded to the sink.
        ubyte[] out_chunk;
        // Number of compressed bytes the sink has accepted so far.
        size_t _written = 0;
    }

    /***************************************************************************

        Constructs a new bzip2 compression filter.  You need to pass in the
        stream that the compression filter will write to.  If you are using
        this filter with a conduit, the idiom to use is:

        ---
        auto output = new BzipOutput(myConduit.output);
        output.write(myContent);
        ---

        blockSize relates to the size of the window bzip2 uses when
        compressing data and determines how much memory is required to
        decompress a stream.  It is measured in hundreds of kilobytes.

        According to the bzip2 documentation, there is no dramatic difference
        between the various block sizes, so the default should suffice in most
        cases.

        BlockSize.Normal (the default) is the same as BlockSize.Best
        (or 9).  The blockSize may be any integer between 1 and 9 inclusive;
        anything else throws a BzipException.

    ***************************************************************************/

    this(OutputStream stream, int blockSize = BlockSize.Normal)
    {
        init(stream, blockSize);
        scope(failure) kill_bzs();

        super(stream);
        out_chunk = new ubyte[BUFFER_SIZE];
    }

    /*
     * This method performs initialisation for the stream.  Note that this may
     * be called more than once for an instance, provided the instance is
     * either new or as part of a call to reset.  The stream argument is not
     * used here; the sink is attached by the constructor (via super) or by
     * reset.
     */
    private void init(OutputStream stream, int blockSize)
    {
        if( blockSize < 1 || blockSize > 9 )
            throw new BzipException("bzip2 block size must be between" ~
                    " 1 and 9");

        auto ret = BZ2_bzCompressInit(&bzs, blockSize, 0, DEFAULT_WORKFACTOR);
        if( ret != BZ_OK )
            throw new BzipException(ret);

        bzs_valid = true;
    }

    ~this()
    {
        if( bzs_valid )
            kill_bzs();
    }

    /***************************************************************************
        
        Resets and re-initialises this instance.

        If you are creating compression streams inside a loop, you may wish to
        use this method to re-use a single instance.  This prevents the
        potentially costly re-allocation of internal buffers.

        Subsequent output goes to the given stream (a null stream keeps the
        current sink), and the written count restarts from zero.

        The stream must have already been closed before calling reset;
        otherwise a BzipStillOpenException is thrown.

    ***************************************************************************/ 

    void reset(OutputStream stream, int blockSize = BlockSize.Normal)
    {
        // If the stream is still valid, bail.
        if( bzs_valid )
            throw new BzipStillOpenException;

        init(stream, blockSize);

        // Previously the new stream was silently ignored and output kept
        // going to the old sink.
        if( stream !is null )
            this.sink = stream;

        // written() documents a per-stream total, so start counting afresh.
        _written = 0;
    }

    /***************************************************************************

        Compresses the given data to the underlying conduit.

        Returns the number of bytes from src that were compressed (all of
        them on success, 0 for an empty src), or IConduit.Eof if the
        underlying stream reports Eof.  Some compressed output may remain
        buffered inside libbzip2 until a later write, commit or close.

    ***************************************************************************/

    override size_t write(const(void)[] src)
    {
        check_valid();

        // libbzip2 answers a BZ_RUN call that makes no progress with
        // BZ_PARAM_ERROR, so an empty write must never reach it.
        if( src.length == 0 )
            return 0;

        scope(failure) kill_bzs();

        auto input = cast(const(ubyte)[]) src;
        while( input.length > 0 )
        {
            // avail_in is a uint; feed oversized buffers in pieces rather
            // than silently truncating the length.
            auto piece = input.length > uint.max ? uint.max : input.length;
            bzs.avail_in = cast(uint) piece;
            bzs.next_in = cast(ubyte*) input.ptr;

            // Keep calling until libbzip2 has taken the whole piece, draining
            // out_chunk after every call.  Looping on avail_in (rather than on
            // a full output buffer) avoids a final no-progress BZ_RUN call,
            // which libbzip2 rejects with BZ_PARAM_ERROR.
            do
            {
                bzs.avail_out = cast(uint) out_chunk.length;
                bzs.next_out = out_chunk.ptr;

                auto ret = BZ2_bzCompress(&bzs, BZ_RUN);
                if( ret != BZ_RUN_OK )
                    throw new BzipException(ret);

                if( !push_output() )
                    return IConduit.Eof;
            }
            while( bzs.avail_in > 0 );

            input = input[piece .. $];
        }

        return src.length;
    }

    /***************************************************************************

        This read-only property returns the number of compressed bytes that
        have been written to the underlying stream.  Following a call to
        either close or commit, this will contain the total compressed size of
        the input data stream.

    ***************************************************************************/

    size_t written()
    {
        return _written;
    }

    /***************************************************************************

        Close the compression stream.  This will cause any buffered content to
        be committed to the underlying stream.

    ***************************************************************************/

    override void close()
    {
        if( bzs_valid ) commit();
        super.close();
    }

    /***************************************************************************

        Purge any buffered content.  Calling this will implicitly end the
        bzip2 stream, so it should not be called until you are finished
        compressing data.  Any calls to either write or commit after a
        compression filter has been committed will throw an exception.

        The only difference between calling this method and calling close is
        that the underlying stream will not be closed.

        If the underlying stream reports Eof part-way through, this returns
        early and the compression state is left alive.

    ***************************************************************************/

    void commit()
    {
        check_valid();
        scope(failure) kill_bzs();

        bzs.avail_in = 0;
        bzs.next_in = null;

        bool finished = false;

        do
        {
            bzs.avail_out = cast(uint) out_chunk.length;
            bzs.next_out = out_chunk.ptr;

            auto ret = BZ2_bzCompress(&bzs, BZ_FINISH);
            switch( ret )
            {
                case BZ_FINISH_OK:
                    break;

                case BZ_STREAM_END:
                    finished = true;
                    break;

                default:
                    throw new BzipException(ret);
            }

            if( !push_output() )
                return;
        }
        while( !finished );

        kill_bzs();
    }

    // Disable seeking
    override long seek(long offset, Anchor anchor = Anchor.Begin)
    {
        throw new IOException("BzipOutput does not support seek requests");
    }

    // Forwards the bytes libbzip2 just placed in out_chunk (everything
    // before avail_out) to the sink, retrying until all are accepted and
    // counting them in _written.  Returns false if the sink reports Eof, in
    // which case the remaining bytes of this chunk are not written.
    private bool push_output()
    {
        auto pending = out_chunk[0 .. out_chunk.length - bzs.avail_out];
        while( pending.length > 0 )
        {
            auto w = sink.write(pending);
            if( w == IConduit.Eof )
                return false;

            pending = pending[w .. $];
            _written += w;
        }
        return true;
    }

    // This function kills the stream: it deallocates the internal state, and
    // unsets the bzs_valid flag.
    private void kill_bzs()
    {
        check_valid();

        BZ2_bzCompressEnd(&bzs);
        bzs_valid = false;
    }

    // Asserts that the stream is still valid and usable (except that this
    // check doesn't get elided with -release).
    private void check_valid()
    {
        if( !bzs_valid )
            throw new BzipClosedException;
    }
}
307 
308 /*******************************************************************************
309   
310     This input filter can be used to perform decompression of bzip2 streams.
311 
312 *******************************************************************************/
313 
class BzipInput : InputFilter
{
    private
    {
        // True while bzs holds live libbzip2 decompression state.
        bool bzs_valid = false;
        bz_stream bzs;
        // Scratch buffer holding compressed bytes read from the source.
        ubyte[] in_chunk;
    }

    /***************************************************************************

        Constructs a new bzip2 decompression filter.  You need to pass in the
        stream that the decompression filter will read from.  If you are using
        this filter with a conduit, the idiom to use is:

        ---
        auto input = new BzipInput(myConduit.input);
        input.read(myContent);
        ---

        The small argument, if set to true, instructs bzip2 to perform
        decompression using half the regular amount of memory, at the cost of
        running at half speed.

    ***************************************************************************/

    this(InputStream stream, bool small=false)
    {
        init(stream, small);
        scope(failure) kill_bzs();

        super(stream);
        in_chunk = new ubyte[BUFFER_SIZE];
    }

    /*
     * This method performs initialisation for the stream.  Note that this may
     * be called more than once for an instance, provided the instance is
     * either new or as part of a call to reset.  The stream argument is not
     * used here; the source is attached by the constructor (via super) or by
     * reset.
     */
    private void init(InputStream stream, bool small)
    {
        auto ret = BZ2_bzDecompressInit(&bzs, 0, small?1:0);
        if( ret != BZ_OK )
            throw new BzipException(ret);

        bzs_valid = true;
    }

    ~this()
    {
        if( bzs_valid )
            kill_bzs();
    }

    /***************************************************************************
        
        Resets and re-initialises this instance.

        If you are creating compression streams inside a loop, you may wish to
        use this method to re-use a single instance.  This prevents the
        potentially costly re-allocation of internal buffers.

        Subsequent input is read from the given stream (a null stream keeps
        the current source).  Any unconsumed input buffered from the previous
        stream is discarded.

        The stream must have already been closed before calling reset;
        otherwise a BzipStillOpenException is thrown.

    ***************************************************************************/ 

    void reset(InputStream stream, bool small=false)
    {
        // If the stream is still valid, bail.
        if( bzs_valid )
            throw new BzipStillOpenException;

        init(stream, small);

        // BZ2_bzDecompressInit does not clear avail_in.  Without this, bytes
        // left over from the old source would be fed to the new decompressor.
        bzs.avail_in = 0;
        bzs.next_in = null;

        // Previously the new stream was silently ignored.
        if( stream !is null )
            this.source = stream;
    }

    /***************************************************************************

        Decompresses data from the underlying conduit into a target array.

        Returns the number of bytes stored into dst, which may be less than
        requested.  Returns IConduit.Eof once the bzip2 stream has ended (on
        a later call), or when the source reports Eof before any byte could
        be produced.

    ***************************************************************************/ 

    override size_t read(void[] dst)
    {
        if( !bzs_valid )
            return IConduit.Eof;

        // Nothing to fill; don't pull input from the source for no reason.
        if( dst.length == 0 )
            return 0;

        scope(failure) kill_bzs();

        // avail_out is a uint; never offer more room than it can describe,
        // otherwise the byte count returned below would be wrong.
        auto want = dst.length > uint.max ? uint.max : dst.length;
        bzs.avail_out = cast(uint) want;
        bzs.next_out = cast(ubyte*) dst.ptr;

        bool finished = false;

        do
        {
            if( bzs.avail_in == 0 )
            {
                auto len = source.read(in_chunk);
                if( len == IConduit.Eof )
                {
                    // Don't throw away bytes already decompressed into dst
                    // during this call; the next call consults the source
                    // again.
                    auto produced = want - bzs.avail_out;
                    return produced > 0 ? produced : IConduit.Eof;
                }

                bzs.avail_in = cast(uint) len;
                bzs.next_in = in_chunk.ptr;
            }

            auto ret = BZ2_bzDecompress(&bzs);
            if( ret == BZ_STREAM_END )
            {
                // Release libbzip2 state now; later reads return Eof.
                kill_bzs();
                finished = true;
            }
            else if( ret != BZ_OK )
                throw new BzipException(ret);
        }
        while( !finished && bzs.avail_out > 0 );

        return want - bzs.avail_out;
    }

    /***************************************************************************

        Closes the compression stream.  This is safe to call after the bzip2
        stream has been read to its end.

    ***************************************************************************/ 

    override void close()
    {
        // read() already releases the libbzip2 state on BZ_STREAM_END, so
        // only release it here if it is still live.  Don't deallocate the
        // buffer since the user may yet reset the stream.
        if( bzs_valid )
            kill_bzs();
        super.close();
    }

    // Disable seeking
    override long seek(long offset, Anchor anchor = Anchor.Begin)
    {
        throw new IOException("BzipInput does not support seek requests");
    }

    // This function kills the stream: it deallocates the internal state, and
    // unsets the bzs_valid flag.
    private void kill_bzs()
    {
        check_valid();

        BZ2_bzDecompressEnd(&bzs);
        bzs_valid = false;
    }

    // Asserts that the stream is still valid and usable (except that this
    // check doesn't get elided with -release).
    private void check_valid()
    {
        if( !bzs_valid )
            throw new BzipClosedException;
    }
}
477 
478 /*******************************************************************************
479   
480     This exception is thrown when an error occurs in the underlying bzip2
481     library.
482 
483 *******************************************************************************/
484 
class BzipException : IOException
{
    /// Creates an exception whose message is the symbolic name (for example
    /// "BZ_DATA_ERROR") of the given libbzip2 return code.
    this(in int code)
    {
        super(codeName(code));
    }

    /// Creates an exception carrying an arbitrary message.
    this(string msg)
    {
        super(msg);
    }

    // Translates a libbzip2 return code into the name of its BZ_* constant,
    // falling back to "BZ_UNKNOWN" for codes not listed here.
    private static string codeName(in int code)
    {
        switch( code )
        {
            case BZ_OK:                 return "BZ_OK";
            case BZ_RUN_OK:             return "BZ_RUN_OK";
            case BZ_FLUSH_OK:           return "BZ_FLUSH_OK";
            case BZ_STREAM_END:         return "BZ_STREAM_END";
            case BZ_SEQUENCE_ERROR:     return "BZ_SEQUENCE_ERROR";
            case BZ_PARAM_ERROR:        return "BZ_PARAM_ERROR";
            case BZ_MEM_ERROR:          return "BZ_MEM_ERROR";
            case BZ_DATA_ERROR:         return "BZ_DATA_ERROR";
            case BZ_DATA_ERROR_MAGIC:   return "BZ_DATA_ERROR_MAGIC";
            case BZ_IO_ERROR:           return "BZ_IO_ERROR";
            case BZ_UNEXPECTED_EOF:     return "BZ_UNEXPECTED_EOF";
            case BZ_OUTBUFF_FULL:       return "BZ_OUTBUFF_FULL";
            case BZ_CONFIG_ERROR:       return "BZ_CONFIG_ERROR";
            default:                    break;
        }

        return "BZ_UNKNOWN";
    }
}
522 
523 /*******************************************************************************
524   
    This exception is thrown if you attempt to operate on a bzip2 filter
    stream whose libbzip2 state has already been released, for example by
    writing to or committing an output stream after it has been committed
    or closed.
528 
529 *******************************************************************************/
530 
class BzipClosedException : IOException
{
    /// Creates the exception with its fixed explanatory message.
    this()
    {
        enum text = "cannot operate on closed bzip2 stream";
        super(text);
    }
}
538 
539 /*******************************************************************************
540   
541     This exception is thrown if you attempt to reset a compression stream that
542     is still open.  You must either close or commit a stream before it can be
543     reset.
544 
545 *******************************************************************************/
546 
class BzipStillOpenException : IOException
{
    /// Creates the exception with its fixed explanatory message.
    this()
    {
        enum text = "cannot reset an open bzip2 stream";
        super(text);
    }
}
554 
555 /* *****************************************************************************
556 
    This section contains a simple unit test for this module.  It is hidden
    behind a debug(UnitTest) condition because it introduces additional
    dependencies.
559 
560 ***************************************************************************** */
561 
562 debug(UnitTest):
563 
564 import tango.io.device.Array : Array;
565 
unittest
{
    // Sample text and its expected bzip2 encoding (the "BZh9" header means
    // block size 9, the BzipOutput default).
    __gshared immutable immutable(char)[] message =
        "All dwarfs are by nature dutiful, serious, literate, obedient " ~
        "and thoughtful people whose only minor failing is a tendency, " ~
        "after one drink, to rush at enemies screaming \"Arrrrrrgh!\" and " ~
        "axing their legs off at the knee.";

    __gshared immutable ubyte[] message_z = [
        0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26,
        0x53, 0x59, 0x40, 0x98, 0xbe, 0xaa, 0x00, 0x00,
        0x16, 0xd5, 0x80, 0x10, 0x00, 0x70, 0x05, 0x20,
        0x00, 0x3f, 0xef, 0xde, 0xe0, 0x30, 0x00, 0xac,
        0xd8, 0x8a, 0x3d, 0x34, 0x6a, 0x6d, 0x4c, 0x4f,
        0x24, 0x31, 0x0d, 0x08, 0x98, 0x9b, 0x48, 0x9a,
        0x7a, 0x80, 0x00, 0x06, 0xa6, 0xd2, 0xa7, 0xe9,
        0xaa, 0x37, 0xa8, 0xd4, 0xf5, 0x3f, 0x54, 0x63,
        0x51, 0xe9, 0x2d, 0x4b, 0x99, 0xe1, 0xcc, 0xca,
        0xda, 0x75, 0x04, 0x42, 0x14, 0xc8, 0x6a, 0x8e,
        0x23, 0xc1, 0x3e, 0xb1, 0x8a, 0x16, 0xd2, 0x55,
        0x9a, 0x3e, 0x56, 0x1a, 0xb1, 0x83, 0x11, 0xa6,
        0x50, 0x4f, 0xd3, 0xed, 0x21, 0x40, 0xaa, 0xd1,
        0x95, 0x2c, 0xda, 0xcb, 0xb7, 0x0e, 0xce, 0x65,
        0xfc, 0x63, 0xf2, 0x88, 0x5b, 0x36, 0xda, 0xf0,
        0xf5, 0xd2, 0x9c, 0xe6, 0xf1, 0x87, 0x12, 0x87,
        0xce, 0x56, 0x0c, 0xf5, 0x65, 0x4d, 0x2e, 0xd6,
        0x27, 0x61, 0x2b, 0x74, 0xcd, 0x5e, 0x3b, 0x02,
        0x42, 0x4e, 0x0b, 0x80, 0xa8, 0x70, 0x04, 0x48,
        0xfb, 0x93, 0x4c, 0x41, 0xa8, 0x2a, 0xdf, 0xf2,
        0x67, 0x37, 0x28, 0xad, 0x38, 0xd4, 0x5c, 0xd6,
        0x34, 0x8b, 0x49, 0x5e, 0x90, 0xb2, 0x06, 0xce,
        0x0a, 0x83, 0x29, 0x84, 0x20, 0xd7, 0x5f, 0xc5,
        0xdc, 0x91, 0x4e, 0x14, 0x24, 0x10, 0x26, 0x2f,
        0xaa, 0x80];

    // Step 1: compress into an in-memory conduit and compare against the
    // reference encoding, both by byte count and by content.
    scope memory = new Array(1024, 1024);
    scope compressor = new BzipOutput(memory);
    compressor.write(message);
    compressor.close();

    assert( compressor.written() == message_z.length );
    assert( message_z == cast(ubyte[])(memory.slice()) );

    // Step 2: decompress those same bytes and check the text round-trips.
    scope decompressor = new BzipInput(memory);
    auto plain = new ubyte[256];
    auto count = decompressor.read(plain);
    plain = plain[0 .. count];

    assert( cast(ubyte[])message == plain );
}
616