1 /******************************************************************************* 2 3 copyright: Copyright (c) 2005 Kris Bell. All rights reserved 4 5 license: BSD style: $(LICENSE) 6 7 version: Mar 2005: Initial release 8 9 author: Kris 10 11 *******************************************************************************/ 12 13 module tango.net.util.MemCache; 14 15 private import tango.io.Console; 16 17 private import Thr = tango.core.Thread, /* Because the druntime folks in their infinite wisdom decided to publically import core.time in thread */ 18 tango.core.Exception, 19 tango.core.Time; 20 21 private import tango.io.stream.Lines, 22 tango.io.stream.Buffered; 23 24 private import tango.net.device.Socket, 25 tango.net.InternetAddress; 26 27 private import Integer = tango.text.convert.Integer; 28 29 30 /****************************************************************************** 31 32 ******************************************************************************/ 33 34 class MemCache : Thr.Thread 35 { 36 private shared(Connection)[] hosts; 37 private bool active; 38 private uint watchdog; 39 40 /********************************************************************** 41 42 **********************************************************************/ 43 44 this (const(char[])[] hosts, uint watchdog = 3) 45 { 46 super (&run); 47 setHosts (hosts); 48 49 // save configuration 50 this.watchdog = watchdog; 51 52 // start the watchdog 53 active = true; 54 super.start(); 55 } 56 57 /********************************************************************** 58 59 **********************************************************************/ 60 61 final void close () 62 { 63 if (hosts) 64 { 65 foreach (shared Connection server; hosts) 66 server.close(); 67 hosts = null; 68 } 69 } 70 71 /********************************************************************** 72 73 Store the key and value 74 75 **********************************************************************/ 76 77 final bool set (const(void)[][] key, const(void)[][] value, int flags=0, int timeout=0) 78 { 79 return select(key).put("set", key, value, flags, timeout); 80 } 81 82 /********************************************************************** 83 84 Store the value if key does not already exist 85 86 **********************************************************************/ 87 88 final bool add (const(void)[][] key, const(void)[][] value, int flags=0, int timeout=0) 89 { 90 return select(key).put("add", key, value, flags, timeout); 91 } 92 93 /********************************************************************** 94 95 Store the value only if key exists 96 97 **********************************************************************/ 98 99 final bool replace (const(void)[][] key, const(void)[][] value, int flags=0, int timeout=0) 100 { 101 return select(key).put("replace", key, value, flags, timeout); 102 } 103 104 /********************************************************************** 105 106 Remove the specified key and make key "invalid" for the 107 duration of timeout, causing add(), get() and remove() on 108 the same key to fail within that period 109 110 **********************************************************************/ 111 112 final bool remove (const(void)[][] key, int timeout=0) 113 { 114 return select(key).remove(key, timeout); 115 } 116 117 /********************************************************************** 118 119 VALUE <key> <flags> <bytes>\r\n 120 <data block>\r\n 121 122 **********************************************************************/ 123 124 final bool get (const(void)[][] key, Buffer buffer) 125 { 126 return select(key).get(key, buffer); 127 } 128 129 /********************************************************************** 130 131 **********************************************************************/ 132 133 final bool incr (const(void)[][] key, uint value) 134 { 135 uint result; 136 return incr (key, value, result); 137 } 138 139 /********************************************************************** 140 141 **********************************************************************/ 142 143 final bool decr (const(void)[][] key, uint value) 144 { 145 uint result; 146 return decr (key, value, result); 147 } 148 149 /********************************************************************** 150 151 **********************************************************************/ 152 153 final bool incr (const(void)[][] key, uint value, ref uint result) 154 { 155 return select(key).bump ("incr", key, value, result); 156 } 157 158 /********************************************************************** 159 160 **********************************************************************/ 161 162 final bool decr (const(void)[][] key, uint value, ref uint result) 163 { 164 return select(key).bump ("decr", key, value, result); 165 } 166 167 /********************************************************************** 168 169 **********************************************************************/ 170 171 final void status (void delegate (const(char)[], const(char[])[] list) dg) 172 { 173 foreach (shared Connection server; hosts) 174 server.status (dg); 175 } 176 177 /********************************************************************** 178 179 **********************************************************************/ 180 181 final Buffer buffer (uint size) 182 { 183 return new Buffer (size); 184 } 185 186 /********************************************************************** 187 188 **********************************************************************/ 189 190 final void setHosts (const(char[])[] hosts) 191 { 192 auto conn = new shared(Connection) [hosts.length]; 193 194 foreach (int i, const(char)[] host; hosts) 195 conn[i] = cast(shared) new Connection(host); 196 197 // set new list of connections 198 this.hosts = conn; 199 connect (conn); 200 } 201 202 /********************************************************************** 203 204 Connection watchdog thread 205 206 **********************************************************************/ 207 208 private void run () 209 { 210 while (active) 211 try { 212 Thr.Thread.sleep (seconds(watchdog)); 213 debug(TangoMemCache) Cout ("testing connections ...").newline; 214 connect (hosts); 215 } catch (Exception e) 216 debug(TangoMemCache) Cout ("memcache watchdog: ") (e.toString).newline; 217 } 218 219 /********************************************************************** 220 221 **********************************************************************/ 222 223 private shared(Connection) select (const(void)[][] key) 224 { 225 return hosts[jhash(key) % hosts.length]; 226 } 227 228 /********************************************************************** 229 230 **********************************************************************/ 231 232 private void connect (shared(Connection)[] hosts) 233 { 234 foreach (Connection c; cast(Connection[])hosts) 235 c.connect(); 236 } 237 238 /********************************************************************** 239 240 **********************************************************************/ 241 242 static class Buffer 243 { 244 private size_t extent; 245 private void[] content; 246 247 /************************************************************** 248 249 **************************************************************/ 250 251 private this (size_t size) 252 { 253 this.content = new byte [size]; 254 } 255 256 /************************************************************** 257 258 **************************************************************/ 259 260 bool expand (size_t size) 261 { 262 if (size > content.length) 263 content.length = size; 264 return true; 265 } 266 267 /************************************************************** 268 269 **************************************************************/ 270 271 void[] set (size_t size) 272 { 273 extent = size; 274 return get(); 275 } 276 277 /************************************************************** 278 279 **************************************************************/ 280 281 void[] get () 282 { 283 return content [0..extent]; 284 } 285 } 286 287 /********************************************************************** 288 289 jhash() -- hash a variable-length key into a 32-bit value 290 291 k : the key (the unaligned variable-length array of bytes) 292 len : the length of the key, counting by bytes 293 level : can be any 4-byte value 294 295 Returns a 32-bit value. Every bit of the key affects every bit of 296 the return value. Every 1-bit and 2-bit delta achieves avalanche. 297 298 About 4.3*len + 80 X86 instructions, with excellent pipelining 299 300 The best hash table sizes are powers of 2. There is no need to do 301 mod a prime (mod is sooo slow!). If you need less than 32 bits, 302 use a bitmask. For example, if you need only 10 bits, do 303 304 h = (h & hashmask(10)); 305 306 In which case, the hash table should have hashsize(10) elements. 307 If you are hashing n strings (ub1 **)k, do it like this: 308 309 for (i=0, h=0; i<n; ++i) h = hash( k[i], len[i], h); 310 311 By Bob Jenkins, 1996. bob_jenkins@burtleburtle.net. You may use 312 this code any way you wish, private, educational, or commercial. 313 It's free. 314 315 See http://burlteburtle.net/bob/hash/evahash.html 316 Use for hash table lookup, or anything where one collision in 2^32 317 is acceptable. Do NOT use for cryptographic purposes. 318 319 **********************************************************************/ 320 321 static final uint jhash (const(void)[][] x, uint c = 0) 322 { 323 uint a, 324 b; 325 326 a = b = 0x9e3779b9; 327 328 auto len = x.length; 329 ubyte* k = cast(ubyte *) x.ptr; 330 331 // handle most of the key 332 while (len >= 12) 333 { 334 a += *cast(uint *)(k+0); 335 b += *cast(uint *)(k+4); 336 c += *cast(uint *)(k+8); 337 338 a -= b; a -= c; a ^= (c>>13); 339 b -= c; b -= a; b ^= (a<<8); 340 c -= a; c -= b; c ^= (b>>13); 341 a -= b; a -= c; a ^= (c>>12); 342 b -= c; b -= a; b ^= (a<<16); 343 c -= a; c -= b; c ^= (b>>5); 344 a -= b; a -= c; a ^= (c>>3); 345 b -= c; b -= a; b ^= (a<<10); 346 c -= a; c -= b; c ^= (b>>15); 347 k += 12; len -= 12; 348 } 349 350 // handle the last 11 bytes 351 c += x.length; 352 switch (len) 353 { 354 case 11: c += (cast(uint)k[10]<<24); goto case; 355 case 10: c += (cast(uint)k[9]<<16); goto case; 356 case 9 : c += (cast(uint)k[8]<<8); goto case; 357 case 8 : b += (cast(uint)k[7]<<24); goto case; 358 case 7 : b += (cast(uint)k[6]<<16); goto case; 359 case 6 : b += (cast(uint)k[5]<<8); goto case; 360 case 5 : b += k[4]; goto case; 361 case 4 : a += (cast(uint)k[3]<<24); goto case; 362 case 3 : a += (cast(uint)k[2]<<16); goto case; 363 case 2 : a += (cast(uint)k[1]<<8); goto case; 364 case 1 : a += k[0]; break; 365 default: 366 } 367 368 a -= b; a -= c; a ^= (c>>13); 369 b -= c; b -= a; b ^= (a<<8); 370 c -= a; c -= b; c ^= (b>>13); 371 a -= b; a -= c; a ^= (c>>12); 372 b -= c; b -= a; b ^= (a<<16); 373 c -= a; c -= b; c ^= (b>>5); 374 a -= b; a -= c; a ^= (c>>3); 375 b -= c; b -= a; b ^= (a<<10); 376 c -= a; c -= b; c ^= (b>>15); 377 378 return c; 379 } 380 } 381 382 383 /****************************************************************************** 384 385 ******************************************************************************/ 386 387 private class Connection 388 { 389 private alias Lines!(char) Line; 390 391 private const(char)[] host; // original host address 392 private __gshared Line line; // reading lines from server 393 private __gshared Bin input; // input stream 394 private __gshared Bout output; // output stream 395 private __gshared Socket conduit; // socket to server 396 private InternetAddress address; // where server is listening 397 private bool connected; // currently connected? 398 399 /********************************************************************** 400 401 **********************************************************************/ 402 403 this (const(char)[] host) 404 { 405 this.host = host; 406 conduit = new Socket; 407 output = new Bout (conduit); 408 input = new Bin (conduit); 409 line = new Line (input); 410 address = new InternetAddress (host); 411 } 412 413 /********************************************************************** 414 415 **********************************************************************/ 416 417 private void connect () 418 { 419 if (! connected) 420 try { 421 conduit.connect (address); 422 connected = true; 423 debug(TangoMemCache) Cout ("connected to ") (host).newline; 424 } catch (Throwable th) 425 debug(TangoMemCache) Cout ("failed to connect to ")(host).newline; 426 } 427 428 /********************************************************************** 429 430 **********************************************************************/ 431 432 private synchronized void close () 433 { 434 bool alive = connected; 435 connected = false; 436 437 if (alive) 438 conduit.close(); 439 } 440 441 /********************************************************************** 442 443 **********************************************************************/ 444 445 private synchronized void error () 446 { 447 // close this dead socket 448 close(); 449 450 // open another one for next attempt to connect 451 conduit.socket.reopen(); 452 } 453 454 /********************************************************************** 455 456 **********************************************************************/ 457 458 private synchronized bool put (const(char)[] cmd, const(void)[][] key, const(void)[][] value, int flags, int timeout) 459 { 460 if (connected) 461 try { 462 char[16] tmp; 463 464 output.clear(); 465 output.append ("delete ") 466 .append (key) 467 .append (" ") 468 .append (Integer.format (tmp, timeout)) 469 .append ("\r\n") 470 .flush(); 471 472 if (line.next) 473 return line.get() == "DELETED"; 474 } catch (IOException e) 475 error(); 476 return false; 477 } 478 479 /********************************************************************** 480 481 VALUE <key> <flags> <bytes>\r\n 482 <data block>\r\n 483 484 **********************************************************************/ 485 486 private synchronized bool get (const(void)[][] key, MemCache.Buffer buffer) 487 { 488 if (connected) 489 try { 490 output.clear(); 491 output.append ("get ") 492 .append (key) 493 .append ("\r\n") 494 .flush(); 495 496 if (line.next) 497 { 498 const(char)[] content = line.get(); 499 if (content.length > 4 && content[0..5] == "VALUE") 500 { 501 size_t i = 0; 502 503 // parse the incoming content-length 504 for (i=content.length; content[--i] != ' ';) 505 {} 506 i = cast(size_t) Integer.parse (content[i .. $]); 507 508 // ensure output buffer has enough space 509 buffer.expand (i); 510 void[] dst = buffer.set (i); 511 512 // fill the buffer content 513 if (! input.fill (dst)) 514 return false; 515 516 // eat the CR and test terminator 517 line.next; 518 line.next; 519 return line.get() == "END"; 520 } 521 } 522 } catch (IOException e) 523 error(); 524 return false; 525 } 526 527 /********************************************************************** 528 529 Remove the specified key and make key "invalid" for the 530 duration of timeout, causing add(), get() and remove() on 531 the same key to fail within that period 532 533 **********************************************************************/ 534 535 private synchronized bool remove (const(void)[][] key, int timeout=0) 536 { 537 if (connected) 538 try { 539 char[16] tmp; 540 541 output.clear(); 542 output.append ("delete ") 543 .append (key) 544 .append (" ") 545 .append (Integer.format (tmp, timeout)) 546 .append ("\r\n") 547 .flush(); 548 549 if (line.next) 550 return line.get() == "DELETED"; 551 } catch (IOException e) 552 error(); 553 return false; 554 } 555 556 /********************************************************************** 557 558 **********************************************************************/ 559 560 private synchronized bool bump (const(char)[] cmd, const(void)[][] key, uint value, 561 ref uint result) 562 { 563 if (connected) 564 try { 565 char[16] tmp; 566 567 output.clear(); 568 output.append (cmd) 569 .append (" ") 570 .append (key) 571 .append (" ") 572 .append (Integer.format (tmp, value)) 573 .append ("\r\n") 574 .flush(); 575 576 if (line.next) 577 if (line.get() != "NOT_FOUND") 578 { 579 result = cast(uint)Integer.parse (line.get()); 580 return true; 581 } 582 } catch (IOException e) 583 error(); 584 return false; 585 } 586 587 /********************************************************************** 588 589 **********************************************************************/ 590 591 private synchronized void status (scope void delegate (const(char)[], const(char[])[] list) dg) 592 { 593 if (connected) 594 try { 595 const(char[])[] list; 596 597 output.clear(); 598 output.write ("stats\r\n"); 599 600 while (line.next) 601 if (line.get() == "END") 602 { 603 dg (cast(char[])host, list); 604 break; 605 } 606 else 607 list ~= line.get(); 608 609 } catch (IOException e) 610 error(); 611 } 612 613 } 614 615 616 debug (TangoMemCache) 617 { 618 /****************************************************************************** 619 620 ******************************************************************************/ 621 622 void main() 623 { 624 __gshared const(char[])[] hosts = ["192.168.111.224:11211"]; 625 626 auto cache = new MemCache (hosts); 627 628 cache.set ("foo", "bar"); 629 cache.set ("foo", "wumpus"); 630 631 auto buffer = cache.buffer (1024); 632 if (cache.get ("foo", buffer)) 633 Cout ("value: ") (cast(const(char)[]) buffer.get).newline; 634 635 void stat (const(char)[] host, const(char[])[] list) 636 { 637 foreach (const(char)[] line; list) 638 Cout (host) (" ") (line).newline; 639 } 640 641 while (true) 642 { 643 cache.status (&stat); 644 Thr.Thread.sleep (seconds(1.0)); 645 } 646 Cout ("exiting"); 647 } 648 } 649