1 /******************************************************************************* 2 copyright: Copyright (c) 2006 Juan Jose Comellas. All rights reserved 3 license: BSD style: $(LICENSE) 4 author: Juan Jose Comellas $(EMAIL juanjo@comellas.com.ar) 5 *******************************************************************************/ 6 7 module tango.io.selector.SelectSelector; 8 9 10 11 public import tango.io.model.IConduit; 12 13 private import Time = tango.core.Time; 14 public import tango.io.selector.model.ISelector; 15 16 private import tango.io.selector.AbstractSelector; 17 private import tango.io.selector.SelectorException; 18 private import tango.sys.Common; 19 20 private import tango.stdc.errno; 21 22 debug (selector) 23 { 24 private import tango.io.Stdout; 25 private import tango.text.convert.Integer; 26 } 27 28 29 version (Windows) 30 { 31 private import tango.core.Thread; 32 33 private 34 { 35 // Opaque struct 36 struct fd_set 37 { 38 } 39 40 extern (Windows) int select(int nfds, fd_set* readfds, fd_set* writefds, 41 fd_set* errorfds, timeval* timeout); 42 } 43 } 44 45 version (Posix) 46 { 47 private import tango.core.BitArray; 48 } 49 50 51 /** 52 * Selector that uses the select() system call to receive I/O events for 53 * the registered conduits. To use this class you would normally do 54 * something like this: 55 * 56 * Examples: 57 * --- 58 * import tango.io.selector.SelectSelector; 59 * 60 * Socket socket; 61 * ISelector selector = new SelectSelector(); 62 * 63 * selector.open(100, 10); 64 * 65 * // Register to read from socket 66 * selector.register(socket, Event.Read); 67 * 68 * int eventCount = selector.select(0.1); // 0.1 seconds 69 * if (eventCount > 0) 70 * { 71 * // We can now read from the socket 72 * socket.read(); 73 * } 74 * else if (eventCount == 0) 75 * { 76 * // Timeout 77 * } 78 * else if (eventCount == -1) 79 * { 80 * // Another thread called the wakeup() method. 81 * } 82 * else 83 * { 84 * // Error: should never happen. 85 * } 86 * 87 * selector.close(); 88 * --- 89 */ 90 public class SelectSelector: AbstractSelector 91 { 92 /** 93 * Alias for the select() method as we're not reimplementing it in 94 * this class. 95 */ 96 alias AbstractSelector.select select; 97 98 uint _size; 99 private SelectionKey[ISelectable.Handle] _keys; 100 private HandleSet _readSet; 101 private HandleSet _writeSet; 102 private HandleSet _exceptionSet; 103 private HandleSet _selectedReadSet; 104 private HandleSet _selectedWriteSet; 105 private HandleSet _selectedExceptionSet; 106 int _eventCount; 107 version (Posix) 108 { 109 private ISelectable.Handle _maxfd = cast(ISelectable.Handle) -1; 110 111 /** 112 * Default number of SelectionKey's that will be handled by the 113 * SelectSelector. 114 */ 115 public enum uint DefaultSize = 1024; 116 } 117 else 118 { 119 /** 120 * Default number of SelectionKey's that will be handled by the 121 * SelectSelector. 122 */ 123 public enum uint DefaultSize = 63; 124 } 125 126 /** 127 * Open the select()-based selector. 128 * 129 * Params: 130 * size = maximum amount of conduits that will be registered; 131 * it will grow dynamically if needed. 132 * maxEvents = maximum amount of conduit events that will be 133 * returned in the selection set per call to select(); 134 * this value is currently not used by this selector. 135 */ 136 override public void open(uint size = DefaultSize, uint maxEvents = DefaultSize) 137 in 138 { 139 assert(size > 0); 140 } 141 body 142 { 143 _size = size; 144 } 145 146 /** 147 * Close the selector. 148 * 149 * Remarks: 150 * It can be called multiple times without harmful side-effects. 151 */ 152 override public void close() 153 { 154 _size = 0; 155 _keys = null; 156 _readSet = HandleSet.init; 157 _writeSet = HandleSet.init; 158 _exceptionSet = HandleSet.init; 159 _selectedReadSet = HandleSet.init; 160 _selectedWriteSet = HandleSet.init; 161 _selectedExceptionSet = HandleSet.init; 162 } 163 164 private HandleSet *allocateSet(ref HandleSet set, ref HandleSet selectedSet) 165 { 166 if(!set.initialized) 167 { 168 set.setup(_size); 169 selectedSet.setup(_size); 170 } 171 return &set; 172 } 173 174 /** 175 * Associate a conduit to the selector and track specific I/O events. 176 * If a conduit is already associated with the selector, the events and 177 * attachment are upated. 178 * 179 * Params: 180 * conduit = conduit that will be associated to the selector; 181 * must be a valid conduit (i.e. not null and open). 182 * events = bit mask of Event values that represent the events 183 * that will be tracked for the conduit. 184 * attachment = optional object with application-specific data that 185 * will be available when an event is triggered for the 186 * conduit 187 * 188 * Throws: 189 * RegisteredConduitException if the conduit had already been 190 * registered to the selector. 191 * 192 * Examples: 193 * --- 194 * selector.register(conduit, Event.Read | Event.Write, object); 195 * --- 196 */ 197 override public void register(ISelectable conduit, Event events, Object attachment = null) 198 in 199 { 200 assert(conduit !is null && conduit.fileHandle()); 201 } 202 body 203 { 204 ISelectable.Handle handle = conduit.fileHandle(); 205 206 debug (selector) 207 Stdout.format("--- SelectSelector.register(handle={0}, events=0x{1:x})\n", 208 cast(int) handle, cast(uint) events); 209 210 SelectionKey *key = (handle in _keys); 211 if (key !is null) 212 { 213 if ((events & Event.Read) || (events & Event.Hangup)) 214 { 215 allocateSet(_readSet, _selectedReadSet).set(handle); 216 } 217 else if (_readSet.initialized) 218 { 219 _readSet.clear(handle); 220 } 221 222 if ((events & Event.Write)) 223 { 224 allocateSet(_writeSet, _selectedWriteSet).set(handle); 225 } 226 else if (_writeSet.initialized) 227 { 228 _writeSet.clear(handle); 229 } 230 231 if (events & Event.Error) 232 { 233 allocateSet(_exceptionSet, _selectedExceptionSet).set(handle); 234 } 235 else if (_exceptionSet.initialized) 236 { 237 _exceptionSet.clear(handle); 238 } 239 240 version (Posix) 241 { 242 if (handle > _maxfd) 243 _maxfd = handle; 244 } 245 246 key.events = events; 247 key.attachment = attachment; 248 } 249 else 250 { 251 // Keep record of the Conduits for whom we're tracking events. 252 _keys[handle] = SelectionKey(conduit, events, attachment); 253 254 if ((events & Event.Read) || (events & Event.Hangup)) 255 { 256 allocateSet(_readSet, _selectedReadSet).set(handle); 257 } 258 259 if (events & Event.Write) 260 { 261 allocateSet(_writeSet, _selectedWriteSet).set(handle); 262 } 263 264 if (events & Event.Error) 265 { 266 allocateSet(_exceptionSet, _selectedExceptionSet).set(handle); 267 } 268 269 version (Posix) 270 { 271 if (handle > _maxfd) 272 _maxfd = handle; 273 } 274 } 275 } 276 277 /** 278 * Remove a conduit from the selector. 279 * 280 * Params: 281 * conduit = conduit that had been previously associated to the 282 * selector; it can be null. 283 * 284 * Remarks: 285 * Unregistering a null conduit is allowed and no exception is thrown 286 * if this happens. 287 * 288 * Throws: 289 * UnregisteredConduitException if the conduit had not been previously 290 * registered to the selector. 291 */ 292 override public void unregister(ISelectable conduit) 293 { 294 if (conduit !is null) 295 { 296 ISelectable.Handle handle = conduit.fileHandle(); 297 298 debug (selector) 299 Stdout.format("--- SelectSelector.unregister(handle={0})\n", 300 cast(int) handle); 301 302 SelectionKey* removed = (handle in _keys); 303 304 if (removed !is null) 305 { 306 if (removed.events & Event.Error) 307 { 308 _exceptionSet.clear(handle); 309 } 310 if (removed.events & Event.Write) 311 { 312 _writeSet.clear(handle); 313 } 314 if ((removed.events & Event.Read) || (removed.events & Event.Hangup)) 315 { 316 _readSet.clear(handle); 317 } 318 _keys.remove(handle); 319 320 version (Posix) 321 { 322 // If we're removing the biggest handle we've entered so far 323 // we need to recalculate this value for the set. 324 if (handle == _maxfd) 325 { 326 while (--_maxfd >= 0) 327 { 328 if (_readSet.isSet(_maxfd) || 329 _writeSet.isSet(_maxfd) || 330 _exceptionSet.isSet(_maxfd)) 331 { 332 break; 333 } 334 } 335 } 336 } 337 } 338 else 339 { 340 debug (selector) 341 Stdout.format("--- SelectSelector.unregister(handle={0}): conduit was not found\n", 342 cast(int) conduit.fileHandle()); 343 throw new UnregisteredConduitException(__FILE__, __LINE__); 344 } 345 } 346 } 347 348 /** 349 * Wait for I/O events from the registered conduits for a specified 350 * amount of time. 351 * 352 * Params: 353 * timeout = TimeSpan with the maximum amount of time that the 354 * selector will wait for events from the conduits; the 355 * amount of time is relative to the current system time 356 * (i.e. just the number of milliseconds that the selector 357 * has to wait for the events). 358 * 359 * Returns: 360 * The amount of conduits that have received events; 0 if no conduits 361 * have received events within the specified timeout; and -1 if the 362 * wakeup() method has been called from another thread. 363 * 364 * Throws: 365 * InterruptedSystemCallException if the underlying system call was 366 * interrupted by a signal and the 'restartInterruptedSystemCall' 367 * property was set to false; SelectorException if there were no 368 * resources available to wait for events from the conduits. 369 */ 370 override public int select(TimeSpan timeout) 371 { 372 fd_set *readfds; 373 fd_set *writefds; 374 fd_set *exceptfds; 375 timeval tv; 376 version (Windows) 377 bool handlesAvailable = false; 378 379 debug (selector) 380 Stdout.format("--- SelectSelector.select(timeout={0} msec)\n", timeout.millis); 381 382 if (_readSet.initialized) 383 { 384 debug (selector) 385 _readSet.dump("_readSet"); 386 387 version (Windows) 388 handlesAvailable = handlesAvailable || (_readSet.length > 0); 389 390 readfds = cast(fd_set*) _selectedReadSet.copy(_readSet); 391 } 392 if (_writeSet.initialized) 393 { 394 debug (selector) 395 _writeSet.dump("_writeSet"); 396 397 version (Windows) 398 handlesAvailable = handlesAvailable || (_writeSet.length > 0); 399 400 writefds = cast(fd_set*) _selectedWriteSet.copy(_writeSet); 401 } 402 if (_exceptionSet.initialized) 403 { 404 debug (selector) 405 _exceptionSet.dump("_exceptionSet"); 406 407 version (Windows) 408 handlesAvailable = handlesAvailable || (_exceptionSet.length > 0); 409 410 exceptfds = cast(fd_set*) _selectedExceptionSet.copy(_exceptionSet); 411 } 412 413 version (Posix) 414 { 415 while (true) 416 { 417 toTimeval(&tv, timeout); 418 419 // FIXME: add support for the wakeup() call. 420 _eventCount = .select(_maxfd + 1, readfds, writefds, exceptfds, timeout is TimeSpan.max ? null : &tv); 421 422 debug (selector) 423 Stdout.format("--- .select() returned {0} (maxfd={1})\n", 424 _eventCount, cast(int) _maxfd); 425 if (_eventCount >= 0) 426 { 427 break; 428 } 429 else 430 { 431 if (errno != EINTR || !_restartInterruptedSystemCall) 432 { 433 // checkErrno() always throws an exception 434 checkErrno(__FILE__, __LINE__); 435 } 436 debug (selector) 437 Stdout.print("--- Restarting select() after being interrupted\n"); 438 } 439 } 440 } 441 else 442 { 443 // Windows returns an error when select() is called with all three 444 // handle sets empty, so we emulate the POSIX behavior by calling 445 // Thread.sleep(). 446 if (handlesAvailable) 447 { 448 toTimeval(&tv, timeout); 449 450 // FIXME: Can a system call be interrupted on Windows? 451 _eventCount = .select(uint.max, readfds, writefds, exceptfds, timeout is TimeSpan.max ? null : &tv); 452 453 debug (selector) 454 Stdout.format("--- .select() returned {0}\n", _eventCount); 455 } 456 else 457 { 458 Thread.sleep(Time.seconds(timeout.interval())); 459 _eventCount = 0; 460 } 461 } 462 return _eventCount; 463 } 464 465 /** 466 * Return the selection set resulting from the call to any of the 467 * select() methods. 468 * 469 * Remarks: 470 * If the call to select() was unsuccessful or it did not return any 471 * events, the returned value will be null. 472 */ 473 override public ISelectionSet selectedSet() 474 { 475 return (_eventCount > 0 ? new SelectSelectionSet(_keys, cast(uint) _eventCount, _selectedReadSet, 476 _selectedWriteSet, _selectedExceptionSet) : null); 477 } 478 479 /** 480 * Return the selection key resulting from the registration of a 481 * conduit to the selector. 482 * 483 * Remarks: 484 * If the conduit is not registered to the selector the returned 485 * value will be null. No exception will be thrown by this method. 486 */ 487 override public SelectionKey key(ISelectable conduit) 488 { 489 if(conduit !is null) 490 { 491 if(auto k = conduit.fileHandle in _keys) 492 { 493 return *k; 494 } 495 } 496 return SelectionKey.init; 497 } 498 499 /** 500 * Return the number of keys resulting from the registration of a conduit 501 * to the selector. 502 */ 503 override public size_t count() 504 { 505 return _keys.length; 506 } 507 508 /** 509 * Iterate through the currently registered selection keys. Note that 510 * you should not erase or add any items from the selector while 511 * iterating, although you can register existing conduits again. 512 */ 513 int opApply(scope int delegate(ref SelectionKey) dg) 514 { 515 int result = 0; 516 foreach(v; _keys) 517 { 518 if((result = dg(v)) != 0) 519 break; 520 } 521 return result; 522 } 523 } 524 525 /** 526 * SelectionSet for the select()-based Selector. 527 */ 528 private class SelectSelectionSet: ISelectionSet 529 { 530 SelectionKey[ISelectable.Handle] _keys; 531 uint _eventCount; 532 HandleSet _readSet; 533 HandleSet _writeSet; 534 HandleSet _exceptionSet; 535 536 this(SelectionKey[ISelectable.Handle] keys, uint eventCount, 537 HandleSet readSet, HandleSet writeSet, HandleSet exceptionSet) 538 { 539 _keys = keys; 540 _eventCount = eventCount; 541 _readSet = readSet; 542 _writeSet = writeSet; 543 _exceptionSet = exceptionSet; 544 } 545 546 @property size_t length() 547 { 548 return _eventCount; 549 } 550 551 int opApply(scope int delegate(ref SelectionKey) dg) 552 { 553 int rc = 0; 554 ISelectable.Handle handle; 555 Event events; 556 557 debug (selector) 558 Stdout.format("--- SelectSelectionSet.opApply() ({0} elements)\n", _eventCount); 559 560 foreach (SelectionKey current; _keys) 561 { 562 handle = current.conduit.fileHandle(); 563 564 if (_readSet.isSet(handle)) 565 events = Event.Read; 566 else 567 events = Event.None; 568 569 if (_writeSet.isSet(handle)) 570 events |= Event.Write; 571 572 if (_exceptionSet.isSet(handle)) 573 events |= Event.Error; 574 575 // Only invoke the delegate if there is an event for the conduit. 576 if (events != Event.None) 577 { 578 current.events = events; 579 580 debug (selector) 581 Stdout.format("--- Calling foreach delegate with selection key ({0}, 0x{1:x})\n", 582 cast(int) handle, cast(uint) events); 583 584 if ((rc = dg(current)) != 0) 585 { 586 break; 587 } 588 } 589 else 590 { 591 debug (selector) 592 Stdout.format("--- Handle {0} doesn't have pending events\n", 593 cast(int) handle); 594 } 595 } 596 return rc; 597 } 598 } 599 600 601 version (Windows) 602 { 603 /** 604 * Helper class used by the select()-based Selector to store handles. 605 * On Windows the handles are kept in an array of uints and the first 606 * element of the array stores the array "length" (i.e. number of handles 607 * in the array). Everything is stored so that the native select() API 608 * can use the HandleSet without additional conversions by just casting it 609 * to a fd_set*. 610 */ 611 private struct HandleSet 612 { 613 /** Default number of handles that will be held in the HandleSet. */ 614 const uint DefaultSize = 63; 615 616 uint[] _buffer; 617 618 /** 619 * Constructor. Sets the initial number of handles that will be held 620 * in the HandleSet. 621 */ 622 void setup(uint size = DefaultSize) 623 { 624 _buffer = new uint[1 + size]; 625 _buffer[0] = 0; 626 } 627 628 /** 629 * return true if this handle set has been initialized. 630 */ 631 @property bool initialized() 632 { 633 return _buffer.length > 0; 634 } 635 636 /** 637 * Return the number of handles present in the HandleSet. 638 */ 639 @property uint length() 640 { 641 return _buffer[0]; 642 } 643 644 /** 645 * Add the handle to the set. 646 */ 647 void set(ISelectable.Handle handle) 648 in 649 { 650 assert(handle); 651 } 652 body 653 { 654 if (!isSet(handle)) 655 { 656 // If we added too many sockets we increment the size of the buffer 657 if (++_buffer[0] >= _buffer.length) 658 { 659 _buffer.length = _buffer[0] + 1; 660 } 661 _buffer[_buffer[0]] = cast(uint) handle; 662 } 663 } 664 665 /** 666 * Remove the handle from the set. 667 */ 668 void clear(ISelectable.Handle handle) 669 { 670 for (uint i = 1; i <= _buffer[0]; ++i) 671 { 672 if (_buffer[i] == cast(uint) handle) 673 { 674 // We don't need to keep the handles in the order in which 675 // they were inserted, so we optimize the removal by 676 // copying the last element to the position of the removed 677 // element. 678 if (i != _buffer[0]) 679 { 680 _buffer[i] = _buffer[_buffer[0]]; 681 } 682 _buffer[0]--; 683 return; 684 } 685 } 686 } 687 688 /** 689 * Copy the contents of the HandleSet into this instance. 690 */ 691 HandleSet copy(HandleSet handleSet) 692 { 693 if(handleSet._buffer.length > _buffer.length) 694 { 695 _buffer.length = handleSet._buffer[0] + 1; 696 } 697 698 699 _buffer[] = handleSet._buffer[0.._buffer.length]; 700 return this; 701 } 702 703 /** 704 * Check whether the handle has been set. 705 */ 706 public bool isSet(ISelectable.Handle handle) 707 { 708 if(_buffer.length == 0) 709 return false; 710 711 uint* start; 712 uint* stop; 713 714 for (start = _buffer.ptr + 1, stop = start + _buffer[0]; start != stop; start++) 715 { 716 if (*start == cast(uint) handle) 717 return true; 718 } 719 return false; 720 } 721 722 /** 723 * Cast the current object to a pointer to an fd_set, to be used with the 724 * select() system call. 725 */ 726 public fd_set* opCast() 727 { 728 return cast(fd_set*) _buffer.ptr; 729 } 730 731 732 debug (selector) 733 { 734 /** 735 * Dump the contents of a HandleSet into stdout. 736 */ 737 void dump(const(char)[] name = null) 738 { 739 if (_buffer !is null && _buffer.length > 0 && _buffer[0] > 0) 740 { 741 const(char)[] handleStr = new char[16]; 742 const(char)[] handleListStr; 743 bool isFirst = true; 744 745 if (name is null) 746 { 747 name = "HandleSet"; 748 } 749 750 for (uint i = 1; i < _buffer[0]; ++i) 751 { 752 if (!isFirst) 753 { 754 handleListStr ~= ", "; 755 } 756 else 757 { 758 isFirst = false; 759 } 760 761 handleListStr ~= itoa(handleStr, _buffer[i]); 762 } 763 764 Stdout.formatln("--- {0}[{1}]: {2}", name, _buffer[0], handleListStr); 765 } 766 } 767 } 768 } 769 } 770 else version (Posix) 771 { 772 private import tango.core.BitManip; 773 774 /** 775 * Helper class used by the select()-based Selector to store handles. 776 * On POSIX-compatible platforms the handles are kept in an array of bits. 777 * Everything is stored so that the native select() API can use the 778 * HandleSet without additional conversions by casting it to a fd_set*. 779 */ 780 private struct HandleSet 781 { 782 /** Default number of handles that will be held in the HandleSet. */ 783 enum uint DefaultSize = 1024; 784 785 BitArray _buffer; 786 787 /** 788 * Constructor. Sets the initial number of handles that will be held 789 * in the HandleSet. 790 */ 791 void setup(uint size = DefaultSize) 792 { 793 if (size < 1024) 794 size = 1024; 795 796 _buffer.length = size; 797 } 798 799 /** 800 * Return true if the handleset has been initialized 801 */ 802 @property bool initialized() 803 { 804 return _buffer.length > 0; 805 } 806 807 /** 808 * Add a handle to the set. 809 */ 810 public void set(ISelectable.Handle handle) 811 { 812 // If we added too many sockets we increment the size of the buffer 813 uint fd = cast(uint)handle; 814 if(fd >= _buffer.length) 815 _buffer.length = fd + 1; 816 _buffer[fd] = true; 817 } 818 819 /** 820 * Remove a handle from the set. 821 */ 822 public void clear(ISelectable.Handle handle) 823 { 824 auto fd = cast(uint)handle; 825 if(fd < _buffer.length) 826 _buffer[fd] = false; 827 } 828 829 /** 830 * Copy the contents of the HandleSet into this instance. 831 */ 832 HandleSet copy(HandleSet handleSet) 833 { 834 // 835 // adjust the length if necessary 836 // 837 if(handleSet._buffer.length != _buffer.length) 838 _buffer.length = handleSet._buffer.length; 839 840 _buffer[] = handleSet._buffer; 841 return this; 842 } 843 844 /** 845 * Check whether the handle has been set. 846 */ 847 bool isSet(ISelectable.Handle handle) 848 { 849 auto fd = cast(uint)handle; 850 if(fd < _buffer.length) 851 return _buffer[fd]; 852 return false; 853 } 854 855 /** 856 * Cast the current object to a pointer to an fd_set, to be used with the 857 * select() system call. 858 */ 859 fd_set* opCast() 860 { 861 return cast(fd_set*) _buffer.ptr; 862 } 863 864 debug (selector) 865 { 866 /** 867 * Dump the contents of a HandleSet into stdout. 868 */ 869 void dump(const(char)[] name = null) 870 { 871 if (_buffer !is null && _buffer.length > 0) 872 { 873 const(char)[] handleStr = new char[16]; 874 const(char)[] handleListStr; 875 bool isFirst = true; 876 877 if (name is null) 878 { 879 name = "HandleSet"; 880 } 881 882 for (uint i = 0; i < _buffer.length * _buffer[0].sizeof; ++i) 883 { 884 if (isSet(cast(ISelectable.Handle) i)) 885 { 886 if (!isFirst) 887 { 888 handleListStr ~= ", "; 889 } 890 else 891 { 892 isFirst = false; 893 } 894 handleListStr ~= itoa(handleStr, i); 895 } 896 } 897 Stdout.formatln("--- {0}: {1}", name, handleListStr); 898 } 899 } 900 } 901 } 902 }