1 /*******************************************************************************
2 
3         copyright:      Copyright (c) 2005 Kris Bell. All rights reserved
4 
5         license:        BSD style: $(LICENSE)
6 
7         version:        Mar 2005: Initial release
8 
9         author:         Kris
10 
11 *******************************************************************************/
12 
13 module tango.net.util.MemCache;
14 
15 private import  tango.io.Console;
16 
17 private import  Thr = tango.core.Thread, /* Because the druntime folks in their infinite wisdom decided to publically import core.time in thread */
18                 tango.core.Exception,
19                 tango.core.Time;
20 
21 private import  tango.io.stream.Lines,
22                 tango.io.stream.Buffered;
23 
24 private import  tango.net.device.Socket,
25                 tango.net.InternetAddress;
26 
27 private import  Integer = tango.text.convert.Integer;
28 
29 
30 /******************************************************************************
31 
32 ******************************************************************************/
33 
class MemCache : Thr.Thread
{
        private shared(Connection)[] hosts;     // one connection per configured server
        private bool            active;         // watchdog loop runs while this is true
        private uint            watchdog;       // seconds between reconnect sweeps

        /**********************************************************************

                Create one connection per entry in hosts (address strings
                such as "192.168.111.224:11211"), attempt to connect each
                of them, and start the watchdog thread which retries the
                connections every watchdog seconds

        **********************************************************************/

        this (const(char[])[] hosts, uint watchdog = 3)
        {
                super (&run);
                setHosts (hosts);

                // save configuration
                this.watchdog = watchdog;

                // start the watchdog
                active = true;
                super.start();
        }

        /**********************************************************************

                Close every server connection and drop the host list.
                Subsequent cache operations return false until setHosts()
                is called again

                NOTE(review): active is never cleared anywhere, so the
                watchdog thread keeps running after close() - confirm
                whether that is intended

        **********************************************************************/

        final void close ()
        {
                if (hosts)
                   {
                   foreach (shared Connection server; hosts)
                            server.close();
                   hosts = null;
                   }
        }

        /**********************************************************************

                Store the key and value

                Returns false when no server is configured or the chosen
                server did not confirm the store

        **********************************************************************/

        final bool set (const(void)[][] key, const(void)[][] value, int flags=0, int timeout=0)
        {
                auto server = select (key);
                return server !is null && server.put ("set", key, value, flags, timeout);
        }

        /**********************************************************************

                Store the value if key does not already exist

        **********************************************************************/

        final bool add (const(void)[][] key, const(void)[][] value, int flags=0, int timeout=0)
        {
                auto server = select (key);
                return server !is null && server.put ("add", key, value, flags, timeout);
        }

        /**********************************************************************

                Store the value only if key exists

        **********************************************************************/

        final bool replace (const(void)[][] key, const(void)[][] value, int flags=0, int timeout=0)
        {
                auto server = select (key);
                return server !is null && server.put ("replace", key, value, flags, timeout);
        }

        /**********************************************************************

                Remove the specified key and make key "invalid" for the
                duration of timeout, causing add(), get() and remove() on
                the same key to fail within that period

        **********************************************************************/

        final bool remove (const(void)[][] key, int timeout=0)
        {
                auto server = select (key);
                return server !is null && server.remove (key, timeout);
        }

        /**********************************************************************

                Fetch the value stored under key into buffer, growing the
                buffer when required. The server reply has the form

                VALUE <key> <flags> <bytes>\r\n
                <data block>\r\n

                Returns true when a value was retrieved; the content is
                then available via buffer.get()

        **********************************************************************/

        final bool get (const(void)[][] key, Buffer buffer)
        {
                auto server = select (key);
                return server !is null && server.get (key, buffer);
        }

        /**********************************************************************

                Increment the counter stored under key by value,
                discarding the updated counter

        **********************************************************************/

        final bool incr (const(void)[][] key, uint value)
        {
                uint result;
                return incr (key, value, result);
        }

        /**********************************************************************

                Decrement the counter stored under key by value,
                discarding the updated counter

        **********************************************************************/

        final bool decr (const(void)[][] key, uint value)
        {
                uint result;
                return decr (key, value, result);
        }

        /**********************************************************************

                Increment the counter stored under key by value. On
                success the updated counter is placed in result

        **********************************************************************/

        final bool incr (const(void)[][] key, uint value, ref uint result)
        {
                auto server = select (key);
                return server !is null && server.bump ("incr", key, value, result);
        }

        /**********************************************************************

                Decrement the counter stored under key by value. On
                success the updated counter is placed in result

        **********************************************************************/

        final bool decr (const(void)[][] key, uint value, ref uint result)
        {
                auto server = select (key);
                return server !is null && server.bump ("decr", key, value, result);
        }

        /**********************************************************************

                Ask every configured server for its statistics. The
                delegate is handed the server address along with the
                list of lines that server replied with

        **********************************************************************/

        final void status (void delegate (const(char)[], const(char[])[] list) dg)
        {
                foreach (shared Connection server; hosts)
                         server.status (dg);
        }

        /**********************************************************************

                Create a Buffer, of the given initial capacity, suitable
                for passing to get()

        **********************************************************************/

        final Buffer buffer (uint size)
        {
                return new Buffer (size);
        }

        /**********************************************************************

                Replace the set of servers: a new connection is created
                for each address and a connect is attempted on each

                NOTE(review): connections in the previous list are not
                closed here - callers presumably invoke close() first

        **********************************************************************/

        final void setHosts (const(char[])[] hosts)
        {
                auto conn = new shared(Connection) [hosts.length];

                foreach (i, const(char)[] host; hosts)
                         conn[i] = cast(shared) new Connection(host);

                // set new list of connections
                this.hosts = conn;
                connect (conn);
        }

        /**********************************************************************

                Connection watchdog thread: sleeps for the configured
                number of seconds, then attempts to (re)connect every
                connection not currently marked as connected

        **********************************************************************/

        private void run ()
        {
                while (active)
                       try {
                           Thr.Thread.sleep (seconds(watchdog));
                           debug(TangoMemCache) Cout ("testing connections ...").newline;
                           connect (hosts);
                           } catch (Exception e)
                                    debug(TangoMemCache) Cout ("memcache watchdog: ") (e.toString).newline;
        }

        /**********************************************************************

                Pick the server responsible for key, by hashing the key
                content across the host list. Returns null when there
                are no hosts, rather than dividing by zero

        **********************************************************************/

        private shared(Connection) select (const(void)[][] key)
        {
                if (hosts.length is 0)
                    return null;

                return hosts[jhash(key) % hosts.length];
        }

        /**********************************************************************

                Attempt a connect on each of the given connections; those
                already connected are left alone by Connection.connect()

        **********************************************************************/

        private void connect (shared(Connection)[] hosts)
        {
                foreach (Connection c; cast(Connection[])hosts)
                         c.connect();
        }

        /**********************************************************************

                Growable byte container used to receive values from
                get(). The backing store only ever grows; extent tracks
                how much of it holds the most recent value

        **********************************************************************/

        static class Buffer
        {
                private size_t    extent;       // bytes of content currently valid
                private void[]  content;        // backing store

                /**************************************************************

                        Allocate a backing store of size bytes; the
                        valid extent starts out at zero

                **************************************************************/

                private this (size_t size)
                {
                        this.content = new byte [size];
                }

                /**************************************************************

                        Ensure the backing store holds at least size
                        bytes. Always returns true

                **************************************************************/

                bool expand (size_t size)
                {
                        if (size > content.length)
                            content.length = size;
                        return true;
                }

                /**************************************************************

                        Set the valid extent and return that slice of
                        the backing store. The size must not exceed the
                        current capacity: call expand() first

                **************************************************************/

                void[] set (size_t size)
                {
                        extent = size;
                        return get();
                }

                /**************************************************************

                        Return the currently valid content

                **************************************************************/

                void[] get ()
                {
                        return content [0..extent];
                }
        }

        /**********************************************************************

                jhash() -- hash a variable-length key into a 32-bit value

                  x     : the key, as a list of segments. The segments are
                          hashed as though they were one contiguous array
                          of bytes, so how a key is split into segments
                          does not influence the result
                  c     : the previous hash, or an arbitrary 4-byte value

                Returns a 32-bit value.  Every bit of the key affects every bit of
                the return value.  Every 1-bit and 2-bit delta achieves avalanche.

                The best hash table sizes are powers of 2.  There is no need to do
                mod a prime (mod is sooo slow!).  If you need less than 32 bits,
                use a bitmask.  For example, if you need only 10 bits, do

                            h = (h & hashmask(10));

                In which case, the hash table should have hashsize(10) elements.

                By Bob Jenkins, 1996.  bob_jenkins@burtleburtle.net.  You may use
                this code any way you wish, private, educational, or commercial.
                It's free.

                See http://burtleburtle.net/bob/hash/evahash.html
                Use for hash table lookup, or anything where one collision in 2^32
                is acceptable. Do NOT use for cryptographic purposes.

                Note that earlier revisions of this function hashed the slice
                descriptors of x (segment lengths and addresses) instead of the
                key bytes, so the key-to-server mapping differs from theirs.

        **********************************************************************/

        static final uint jhash (const(void)[][] x, uint c = 0)
        {
                // the mixing step, applied once per 12-byte block and once at the end
                static void mix (ref uint p, ref uint q, ref uint r)
                {
                        p -= q; p -= r; p ^= (r>>13);
                        q -= r; q -= p; q ^= (p<<8);
                        r -= p; r -= q; r ^= (q>>13);
                        p -= q; p -= r; p ^= (r>>12);
                        q -= r; q -= p; q ^= (p<<16);
                        r -= p; r -= q; r ^= (q>>5);
                        p -= q; p -= r; p ^= (r>>3);
                        q -= r; q -= p; q ^= (p<<10);
                        r -= p; r -= q; r ^= (q>>15);
                }

                // assemble four bytes as a little-endian word, so the result
                // does not depend on host byte order or on alignment
                static uint word (const(ubyte)[] k)
                {
                        return k[0] | (cast(uint)k[1]<<8) | (cast(uint)k[2]<<16) | (cast(uint)k[3]<<24);
                }

                uint      a = 0x9e3779b9,
                          b = 0x9e3779b9;
                ubyte[12] block;                // bytes gathered across segment boundaries
                size_t    fill,                 // number of bytes pending in block
                          total;                // overall key length, in bytes

                // handle most of the key
                foreach (const(void)[] segment; x)
                        {
                        total += segment.length;
                        foreach (ubyte octet; cast(const(ubyte)[]) segment)
                                {
                                block[fill++] = octet;
                                if (fill is block.length)
                                   {
                                   a += word (block[0..4]);
                                   b += word (block[4..8]);
                                   c += word (block[8..12]);
                                   mix (a, b, c);
                                   fill = 0;
                                   }
                                }
                        }

                // handle the last 11 bytes
                const(ubyte)[] k = block[];
                c += cast(uint) total;
                switch (fill)
                       {
                       case 11: c += (cast(uint)k[10]<<24); goto case;
                       case 10: c += (cast(uint)k[9]<<16); goto case;
                       case 9 : c += (cast(uint)k[8]<<8); goto case;
                       case 8 : b += (cast(uint)k[7]<<24); goto case;
                       case 7 : b += (cast(uint)k[6]<<16); goto case;
                       case 6 : b += (cast(uint)k[5]<<8); goto case;
                       case 5 : b += k[4]; goto case;
                       case 4 : a += (cast(uint)k[3]<<24); goto case;
                       case 3 : a += (cast(uint)k[2]<<16); goto case;
                       case 2 : a += (cast(uint)k[1]<<8); goto case;
                       case 1 : a += k[0]; break;
                       default: break;
                       }

                mix (a, b, c);
                return c;
        }
}
381 
382 
383 /******************************************************************************
384 
385 ******************************************************************************/
386 
private class Connection
{
        private alias Lines!(char) Line;

        // The stream members are per-instance. Declaring them __gshared would
        // make them static, leaving every Connection talking to whichever
        // socket was created last.
        private const(char)[]   host;           // original host address
        private Line            line;           // reading lines from server
        private Bin             input;          // input stream
        private Bout            output;         // output stream
        private Socket          conduit;        // socket to server
        private InternetAddress address;        // where server is listening
        private bool            connected;      // currently connected?

        /**********************************************************************

                Set up the socket and its buffered streams for the given
                server address. No connection is attempted here: see
                connect()

        **********************************************************************/

        this (const(char)[] host)
        {
                this.host = host;
                conduit = new Socket;
                output = new Bout (conduit);
                input = new Bin (conduit);
                line = new Line (input);
                address = new InternetAddress (host);
        }

        /**********************************************************************

                Try to connect when not already connected. A failure is
                swallowed (and reported in debug builds only) so that the
                watchdog can simply try again later

        **********************************************************************/

        private void connect ()
        {
                if (! connected)
                      try {
                          conduit.connect (address);
                          connected = true;
                          debug(TangoMemCache) Cout ("connected to ") (host).newline;
                          } catch (Throwable th)
                                   debug(TangoMemCache) Cout ("failed to connect to ")(host).newline;
        }

        /**********************************************************************

                Mark the connection as down, and close the socket when it
                was up

        **********************************************************************/

        private synchronized void close ()
        {
                // the monitor is held, so work through an unshared view
                auto self = cast(Connection) this;

                bool alive = self.connected;
                self.connected = false;

                if (alive)
                    self.conduit.close();
        }

        /**********************************************************************

                Recover from an I/O failure: drop the connection and
                prepare a fresh socket for the next connect() attempt

        **********************************************************************/

        private synchronized void error ()
        {
                // close this dead socket
                close();

                // open another one for next attempt to connect
                (cast(Connection) this).conduit.socket.reopen();
        }

        /**********************************************************************

                Append the content of each segment to dst. Handing the
                segment list itself to append() would write the slice
                descriptors rather than the bytes they refer to

        **********************************************************************/

        private static void emit (Bout dst, const(void)[][] segments)
        {
                foreach (const(void)[] segment; segments)
                         dst.append (segment);
        }

        /**********************************************************************

                Issue a storage command (cmd is "set", "add" or
                "replace"), which has the form

                <cmd> <key> <flags> <exptime> <bytes>\r\n
                <data block>\r\n

                Returns true only when the server answers STORED; false
                when not connected, on any other reply, or on I/O failure

        **********************************************************************/

        private synchronized bool put (const(char)[] cmd, const(void)[][] key, const(void)[][] value, int flags, int timeout)
        {
                auto self = cast(Connection) this;

                if (self.connected)
                    try {
                        char[20] tmp;
                        auto     dst = self.output;

                        // the header announces the size of the whole data block
                        size_t bytes = 0;
                        foreach (const(void)[] segment; value)
                                 bytes += segment.length;

                        dst.clear();
                        dst.append (cmd).append (" ");
                        emit (dst, key);

                        // tmp is reused: each number is copied out before the next is formatted
                        dst.append (" ").append (Integer.format (tmp, flags));
                        dst.append (" ").append (Integer.format (tmp, timeout));
                        dst.append (" ").append (Integer.format (tmp, cast(long) bytes));
                        dst.append ("\r\n");

                        emit (dst, value);
                        dst.append ("\r\n").flush();

                        if (self.line.next)
                            return self.line.get() == "STORED";
                        } catch (IOException e)
                                 error();
                return false;
        }

        /**********************************************************************

                Retrieve the value for key into buffer. The reply is

                VALUE <key> <flags> <bytes>\r\n
                <data block>\r\n

                followed by an END line. Returns false when not
                connected, when the key is not present, or on I/O failure

        **********************************************************************/

        private synchronized bool get (const(void)[][] key, MemCache.Buffer buffer)
        {
                auto self = cast(Connection) this;

                if (self.connected)
                    try {
                        auto dst = self.output;

                        dst.clear();
                        dst.append ("get ");
                        emit (dst, key);
                        dst.append ("\r\n").flush();

                        if (self.line.next)
                           {
                           const(char)[] content = self.line.get();
                           if (content.length > 4 && content[0..5] == "VALUE")
                              {
                              // the content-length is the last space-separated field
                              size_t i = content.length;
                              while (i > 0 && content[i-1] != ' ')
                                     --i;

                              // no separator at all: not a usable header
                              if (i is 0)
                                  return false;

                              // parse the incoming content-length
                              i = cast(size_t) Integer.parse (content[i .. $]);

                              // ensure output buffer has enough space
                              buffer.expand (i);
                              void[] data = buffer.set (i);

                              // fill the buffer content. An empty value has nothing
                              // to read, and must not be mistaken for a failure
                              if (i > 0 && ! self.input.fill (data))
                                  return false;

                              // eat the CR and test terminator
                              self.line.next;
                              self.line.next;
                              return self.line.get() == "END";
                              }
                           }
                        } catch (IOException e)
                                 error();
                return false;
        }

        /**********************************************************************

                Remove the specified key and make key "invalid" for the
                duration of timeout, causing add(), get() and remove() on
                the same key to fail within that period

                Returns true only when the server answers DELETED

        **********************************************************************/

        private synchronized bool remove (const(void)[][] key, int timeout=0)
        {
                auto self = cast(Connection) this;

                if (self.connected)
                    try {
                        char[16] tmp;
                        auto     dst = self.output;

                        dst.clear();
                        dst.append ("delete ");
                        emit (dst, key);
                        dst.append (" ")
                           .append (Integer.format (tmp, timeout))
                           .append ("\r\n")
                           .flush();

                        if (self.line.next)
                            return self.line.get() == "DELETED";
                        } catch (IOException e)
                                 error();
                return false;
        }

        /**********************************************************************

                Adjust the counter stored under key by value (cmd is
                "incr" or "decr"). On success the updated counter is
                placed in result and true is returned. Replies which do
                not begin with a digit (NOT_FOUND and error lines) are
                reported as failure, leaving result untouched

        **********************************************************************/

        private synchronized bool bump (const(char)[] cmd, const(void)[][] key, uint value,
                                        ref uint result)
        {
                auto self = cast(Connection) this;

                if (self.connected)
                    try {
                        char[16] tmp;
                        auto     dst = self.output;

                        dst.clear();
                        dst.append (cmd).append (" ");
                        emit (dst, key);
                        dst.append (" ")
                           .append (Integer.format (tmp, value))
                           .append ("\r\n")
                           .flush();

                        if (self.line.next)
                           {
                           const(char)[] reply = self.line.get();
                           if (reply.length && reply[0] >= '0' && reply[0] <= '9')
                              {
                              result = cast(uint) Integer.parse (reply);
                              return true;
                              }
                           }
                        } catch (IOException e)
                                 error();
                return false;
        }

        /**********************************************************************

                Request the server statistics, gathering reply lines up
                to the END terminator and then handing them to dg along
                with the server address. The delegate is not invoked when
                the terminator is never seen

        **********************************************************************/

        private synchronized void status (scope void delegate (const(char)[], const(char[])[] list) dg)
        {
                auto self = cast(Connection) this;

                if (self.connected)
                    try {
                        const(char[])[] list;
                        auto            dst = self.output;

                        // flush explicitly, as every other command does
                        dst.clear();
                        dst.append ("stats\r\n").flush();

                        while (self.line.next)
                               if (self.line.get() == "END")
                                  {
                                  dg (self.host, list);
                                  break;
                                  }
                               else
                                  // copy: the slice presumably aliases the stream
                                  // buffer, which later reads may overwrite
                                  list ~= self.line.get().dup;

                        } catch (IOException e)
                                 error();
        }

}
614 
615 
616 debug (TangoMemCache)
617 {
618 /******************************************************************************
619 
620 ******************************************************************************/
621 
// Demo driver for debug builds: stores a value twice under the same key,
// reads it back, then prints the server statistics once per second.
void main()
{
        __gshared const(char[])[] hosts = ["192.168.111.224:11211"];

        auto cache = new MemCache (hosts);

        // keys and values are passed as lists of segments, so wrap the literals
        const(void)[][] key    = [cast(const(void)[]) "foo"];
        const(void)[][] first  = [cast(const(void)[]) "bar"];
        const(void)[][] second = [cast(const(void)[]) "wumpus"];

        cache.set (key, first);
        cache.set (key, second);

        auto buffer = cache.buffer (1024);
        if (cache.get (key, buffer))
            Cout ("value: ") (cast(const(char)[]) buffer.get).newline;

        // print each statistics line prefixed with the server it came from
        void stat (const(char)[] host, const(char[])[] list)
        {
                foreach (const(char)[] line; list)
                         Cout (host) (" ") (line).newline;
        }

        // poll forever; the statement after the loop is never reached
        while (true)
              {
              cache.status (&stat);
              Thr.Thread.sleep (seconds(1.0));
              }
        Cout ("exiting");
}
648 }
649