1 /*******************************************************************************
2   copyright:   Copyright (c) 2006 Juan Jose Comellas. All rights reserved
3   license:     BSD style: $(LICENSE)
4   author:      Juan Jose Comellas $(EMAIL juanjo@comellas.com.ar)
5 *******************************************************************************/
6 
7 module tango.io.selector.SelectSelector;
8 
9 
10 
11 public import tango.io.model.IConduit;
12 
13 private import Time = tango.core.Time;
14 public import tango.io.selector.model.ISelector;
15 
16 private import tango.io.selector.AbstractSelector;
17 private import tango.io.selector.SelectorException;
18 private import tango.sys.Common;
19 
20 private import tango.stdc.errno;
21 
22 debug (selector)
23 {
24     private import tango.io.Stdout;
25     private import tango.text.convert.Integer;
26 }
27 
28 
version (Windows)
{
    private import tango.core.Thread;

    /**
     * Placeholder for the native handle set type. It is declared without
     * members because this module only ever passes pointers to it.
     */
    private struct fd_set
    {
    }

    /**
     * Native select() entry point, declared with the Windows calling
     * convention.
     */
    private extern (Windows) int select(int nfds,
                                        fd_set* readfds,
                                        fd_set* writefds,
                                        fd_set* errorfds,
                                        timeval* timeout);
}
44 
45 version (Posix)
46 {
47     private import tango.core.BitArray;
48 }
49 
50 
51 /**
52  * Selector that uses the select() system call to receive I/O events for
53  * the registered conduits. To use this class you would normally do
54  * something like this:
55  *
56  * Examples:
57  * ---
58  * import tango.io.selector.SelectSelector;
59  *
60  * Socket socket;
61  * ISelector selector = new SelectSelector();
62  *
63  * selector.open(100, 10);
64  *
65  * // Register to read from socket
66  * selector.register(socket, Event.Read);
67  *
68  * int eventCount = selector.select(0.1); // 0.1 seconds
69  * if (eventCount > 0)
70  * {
71  *     // We can now read from the socket
72  *     socket.read();
73  * }
74  * else if (eventCount == 0)
75  * {
76  *     // Timeout
77  * }
78  * else if (eventCount == -1)
79  * {
80  *     // Another thread called the wakeup() method.
81  * }
82  * else
83  * {
84  *     // Error: should never happen.
85  * }
86  *
87  * selector.close();
88  * ---
89  */
public class SelectSelector: AbstractSelector
{
    /**
     * Alias for the select() method as we're not reimplementing it in
     * this class.
     */
    alias AbstractSelector.select select;

    /** Capacity requested through open(); used to size the handle sets. */
    uint _size;
    /** Registered selection keys, indexed by the conduit's handle. */
    private SelectionKey[ISelectable.Handle] _keys;
    /** Handles registered for Read/Hangup events. */
    private HandleSet _readSet;
    /** Handles registered for Write events. */
    private HandleSet _writeSet;
    /** Handles registered for Error events. */
    private HandleSet _exceptionSet;
    /** Working copies of the three sets that are handed to the native select(). */
    private HandleSet _selectedReadSet;
    private HandleSet _selectedWriteSet;
    private HandleSet _selectedExceptionSet;
    /** Value returned by the most recent native select() call. */
    int _eventCount;
    version (Posix)
    {
        /** Largest handle registered so far, or -1 when there is none. */
        private ISelectable.Handle _maxfd = cast(ISelectable.Handle) -1;

        /**
         * Default number of SelectionKey's that will be handled by the
         * SelectSelector.
         */
        public enum uint DefaultSize = 1024;
    }
    else
    {
        /**
         * Default number of SelectionKey's that will be handled by the
         * SelectSelector.
         */
        public enum uint DefaultSize = 63;
    }

    /**
     * Open the select()-based selector.
     *
     * Params:
     * size         = maximum amount of conduits that will be registered;
     *                it will grow dynamically if needed.
     * maxEvents    = maximum amount of conduit events that will be
     *                returned in the selection set per call to select();
     *                this value is currently not used by this selector.
     */
    override public void open(uint size = DefaultSize, uint maxEvents = DefaultSize)
    in
    {
        assert(size > 0);
    }
    body
    {
        _size = size;
    }

    /**
     * Close the selector, dropping every registered key and handle set.
     *
     * Remarks:
     * It can be called multiple times without harmful side-effects.
     */
    override public void close()
    {
        _size = 0;
        _keys = null;
        // Forget the last select() result so that selectedSet() does not hand
        // out a selection set for a selector that has been closed.
        _eventCount = 0;
        _readSet = HandleSet.init;
        _writeSet = HandleSet.init;
        _exceptionSet = HandleSet.init;
        _selectedReadSet = HandleSet.init;
        _selectedWriteSet = HandleSet.init;
        _selectedExceptionSet = HandleSet.init;

        version (Posix)
        {
            // Without this a reopened selector would keep passing the stale
            // maximum to the native select() together with freshly sized sets.
            _maxfd = cast(ISelectable.Handle) -1;
        }
    }

    /**
     * Lazily set up a handle set and its working copy with the current
     * capacity, and return a pointer to the handle set.
     */
    private HandleSet *allocateSet(ref HandleSet set, ref HandleSet selectedSet)
    {
        if(!set.initialized)
        {
            set.setup(_size);
            selectedSet.setup(_size);
        }
        return &set;
    }

    /**
     * Add the handle to a handle set when it is wanted there; otherwise make
     * sure it is not present (nothing to do if the set was never set up).
     */
    private void track(ref HandleSet handles, ref HandleSet selectedHandles,
                       ISelectable.Handle handle, bool wanted)
    {
        if (wanted)
        {
            allocateSet(handles, selectedHandles).set(handle);
        }
        else if (handles.initialized)
        {
            handles.clear(handle);
        }
    }

    version (Posix)
    {
        /**
         * Grow an initialized handle set so that it holds at least
         * _maxfd + 1 bits.
         *
         * The native select() examines the first nfds descriptors of every
         * set it receives, and the sets grow independently of each other, so
         * a set that never saw the largest handle may be too small.
         */
        private void coverMaxfd(ref HandleSet handles)
        {
            if (!handles.initialized || _maxfd < 0)
                return;

            // Rounded up to a multiple of 64 bits so the storage ends on a
            // long-word boundary. NOTE(review): assumes the native call may
            // access the sets in units of that size - confirm per platform.
            uint needed = (cast(uint) _maxfd + 64u) & ~63u;

            if (handles._buffer.length < needed)
                handles._buffer.length = needed;
        }
    }

    /**
     * Associate a conduit to the selector and track specific I/O events.
     * If a conduit is already associated with the selector, the events and
     * attachment are updated.
     *
     * Params:
     * conduit      = conduit that will be associated to the selector;
     *                must be a valid conduit (i.e. not null and open).
     * events       = bit mask of Event values that represent the events
     *                that will be tracked for the conduit.
     * attachment   = optional object with application-specific data that
     *                will be available when an event is triggered for the
     *                conduit
     *
     * Remarks:
     * Event.Hangup is tracked through the read set, exactly like Event.Read.
     * Registering an already registered conduit does not throw; it replaces
     * the tracked events and the attachment.
     *
     * Examples:
     * ---
     * selector.register(conduit, Event.Read | Event.Write, object);
     * ---
     */
    override public void register(ISelectable conduit, Event events, Object attachment = null)
    in
    {
        assert(conduit !is null && conduit.fileHandle());
    }
    body
    {
        ISelectable.Handle handle = conduit.fileHandle();

        debug (selector)
            Stdout.format("--- SelectSelector.register(handle={0}, events=0x{1:x})\n",
                          cast(int) handle, cast(uint) events);

        bool wantRead = (events & Event.Read) || (events & Event.Hangup);
        bool wantWrite = (events & Event.Write) != Event.None;
        bool wantError = (events & Event.Error) != Event.None;

        // For a new handle the "remove" half of track() is a no-op, so the
        // same code serves both first-time and repeated registrations.
        track(_readSet, _selectedReadSet, handle, wantRead);
        track(_writeSet, _selectedWriteSet, handle, wantWrite);
        track(_exceptionSet, _selectedExceptionSet, handle, wantError);

        version (Posix)
        {
            if (handle > _maxfd)
                _maxfd = handle;
        }

        if (SelectionKey* key = (handle in _keys))
        {
            key.events = events;
            key.attachment = attachment;
        }
        else
        {
            // Keep record of the Conduits for whom we're tracking events.
            _keys[handle] = SelectionKey(conduit, events, attachment);
        }
    }

    /**
     * Remove a conduit from the selector.
     *
     * Params:
     * conduit      = conduit that had been previously associated to the
     *                selector; it can be null.
     *
     * Remarks:
     * Unregistering a null conduit is allowed and no exception is thrown
     * if this happens.
     *
     * Throws:
     * UnregisteredConduitException if the conduit had not been previously
     * registered to the selector.
     */
    override public void unregister(ISelectable conduit)
    {
        if (conduit !is null)
        {
            ISelectable.Handle handle = conduit.fileHandle();

            debug (selector)
                Stdout.format("--- SelectSelector.unregister(handle={0})\n",
                              cast(int) handle);

            SelectionKey* removed = (handle in _keys);

            if (removed !is null)
            {
                // The recorded events tell which sets the handle lives in.
                if (removed.events & Event.Error)
                {
                    _exceptionSet.clear(handle);
                }
                if (removed.events & Event.Write)
                {
                    _writeSet.clear(handle);
                }
                if ((removed.events & Event.Read) || (removed.events & Event.Hangup))
                {
                    _readSet.clear(handle);
                }
                _keys.remove(handle);

                version (Posix)
                {
                    // If we're removing the biggest handle we've entered so far
                    // we need to recalculate this value for the set. The loop
                    // ends with _maxfd == -1 when no handle is left.
                    if (handle == _maxfd)
                    {
                        while (--_maxfd >= 0)
                        {
                            if (_readSet.isSet(_maxfd) ||
                                _writeSet.isSet(_maxfd) ||
                                _exceptionSet.isSet(_maxfd))
                            {
                                break;
                            }
                        }
                    }
                }
            }
            else
            {
                debug (selector)
                    Stdout.format("--- SelectSelector.unregister(handle={0}): conduit was not found\n",
                                  cast(int) conduit.fileHandle());
                throw new UnregisteredConduitException(__FILE__, __LINE__);
            }
        }
    }

    /**
     * Wait for I/O events from the registered conduits for a specified
     * amount of time.
     *
     * Params:
     * timeout  = TimeSpan with the maximum amount of time that the
     *            selector will wait for events from the conduits; the
     *            amount of time is relative to the current system time
     *            (i.e. just the number of milliseconds that the selector
     *            has to wait for the events). TimeSpan.max waits without
     *            a time limit when there are handles to wait on.
     *
     * Returns:
     * The value reported by the native select() call; 0 if no conduits
     * have received events within the specified timeout.
     *
     * Throws:
     * InterruptedSystemCallException if the underlying system call was
     * interrupted by a signal and the 'restartInterruptedSystemCall'
     * property was set to false; SelectorException if there were no
     * resources available to wait for events from the conduits.
     */
    override public int select(TimeSpan timeout)
    {
        fd_set *readfds;
        fd_set *writefds;
        fd_set *exceptfds;
        timeval tv;
        version (Windows)
            bool handlesAvailable = false;

        debug (selector)
            Stdout.format("--- SelectSelector.select(timeout={0} msec)\n", timeout.millis);

        version (Posix)
        {
            // Every set passed below must be able to hold _maxfd + 1 bits;
            // the working copies take over the size when they are copied.
            coverMaxfd(_readSet);
            coverMaxfd(_writeSet);
            coverMaxfd(_exceptionSet);
        }

        // The native call overwrites the sets it is given, so it always works
        // on a fresh copy of the registered handles.
        if (_readSet.initialized)
        {
            debug (selector)
                _readSet.dump("_readSet");

            version (Windows)
                handlesAvailable = handlesAvailable || (_readSet.length > 0);

            readfds = cast(fd_set*) _selectedReadSet.copy(_readSet);
        }
        if (_writeSet.initialized)
        {
            debug (selector)
                _writeSet.dump("_writeSet");

            version (Windows)
                handlesAvailable = handlesAvailable || (_writeSet.length > 0);

            writefds = cast(fd_set*) _selectedWriteSet.copy(_writeSet);
        }
        if (_exceptionSet.initialized)
        {
            debug (selector)
                _exceptionSet.dump("_exceptionSet");

            version (Windows)
                handlesAvailable = handlesAvailable || (_exceptionSet.length > 0);

            exceptfds = cast(fd_set*) _selectedExceptionSet.copy(_exceptionSet);
        }

        version (Posix)
        {
            while (true)
            {
                // Refilled on every attempt, so a restarted call waits for
                // the full timeout again.
                toTimeval(&tv, timeout);

                // FIXME: add support for the wakeup() call.
                _eventCount = .select(_maxfd + 1, readfds, writefds, exceptfds, timeout is TimeSpan.max ? null : &tv);

                debug (selector)
                    Stdout.format("---   .select() returned {0} (maxfd={1})\n",
                                  _eventCount, cast(int) _maxfd);
                if (_eventCount >= 0)
                {
                    break;
                }
                else
                {
                    if (errno != EINTR || !_restartInterruptedSystemCall)
                    {
                        // checkErrno() always throws an exception
                        checkErrno(__FILE__, __LINE__);
                    }
                    debug (selector)
                        Stdout.print("--- Restarting select() after being interrupted\n");
                }
            }
        }
        else
        {
            // Windows returns an error when select() is called with all three
            // handle sets empty, so we emulate the POSIX behavior by calling
            // Thread.sleep().
            if (handlesAvailable)
            {
                toTimeval(&tv, timeout);

                // FIXME: Can a system call be interrupted on Windows?
                // NOTE(review): a negative result is returned to the caller
                // unchanged; no exception is raised on this path.
                _eventCount = .select(uint.max, readfds, writefds, exceptfds, timeout is TimeSpan.max ? null : &tv);

                debug (selector)
                    Stdout.format("---   .select() returned {0}\n", _eventCount);
            }
            else
            {
                Thread.sleep(Time.seconds(timeout.interval()));
                _eventCount = 0;
            }
        }
        return _eventCount;
    }

    /**
     * Return the selection set resulting from the call to any of the
     * select() methods.
     *
     * Remarks:
     * If the call to select() was unsuccessful or it did not return any
     * events, the returned value will be null. The returned set refers to
     * the selector's registered keys and working handle sets.
     */
    override public ISelectionSet selectedSet()
    {
        return (_eventCount > 0 ? new SelectSelectionSet(_keys, cast(uint) _eventCount, _selectedReadSet,
                                                         _selectedWriteSet, _selectedExceptionSet) : null);
    }

    /**
     * Return the selection key resulting from the registration of a
     * conduit to the selector.
     *
     * Remarks:
     * If the conduit is null or is not registered to the selector the
     * returned value will be SelectionKey.init. No exception will be
     * thrown by this method.
     */
    override public SelectionKey key(ISelectable conduit)
    {
        if(conduit !is null)
        {
            if(auto k = conduit.fileHandle in _keys)
            {
                return *k;
            }
        }
        return SelectionKey.init;
    }

    /**
     * Return the number of keys resulting from the registration of a conduit
     * to the selector.
     */
    override public size_t count()
    {
        return _keys.length;
    }

    /**
     * Iterate through the currently registered selection keys.  Note that
     * you should not erase or add any items from the selector while
     * iterating, although you can register existing conduits again.
     *
     * Remarks:
     * The delegate receives a copy of each key; iteration stops as soon as
     * the delegate returns a non-zero value, which is then returned.
     */
    int opApply(scope int delegate(ref SelectionKey) dg)
    {
        int result = 0;
        foreach(v; _keys)
        {
            if((result = dg(v)) != 0)
                break;
        }
        return result;
    }
}
524 
525 /**
526  * SelectionSet for the select()-based Selector.
527  */
private class SelectSelectionSet: ISelectionSet
{
    /** Registered keys of the selector that created this set. */
    SelectionKey[ISelectable.Handle] _keys;
    /** Raw count passed in by the selector; only used for debug output. */
    uint _eventCount;
    /** Handle sets as they were filled in by the native select() call. */
    HandleSet _readSet;
    HandleSet _writeSet;
    HandleSet _exceptionSet;

    /**
     * Store the keys, the event count and the three handle sets that
     * describe the outcome of a select() call.
     */
    this(SelectionKey[ISelectable.Handle] keys, uint eventCount,
                   HandleSet readSet, HandleSet writeSet, HandleSet exceptionSet)
    {
        _keys = keys;
        _eventCount = eventCount;
        _readSet = readSet;
        _writeSet = writeSet;
        _exceptionSet = exceptionSet;
    }

    /**
     * Build the Event mask for a handle from its membership in the read,
     * write and exception sets; Event.None when it is in none of them.
     */
    private Event pendingEvents(ISelectable.Handle handle)
    {
        Event events = Event.None;

        if (_readSet.isSet(handle))
            events |= Event.Read;

        if (_writeSet.isSet(handle))
            events |= Event.Write;

        if (_exceptionSet.isSet(handle))
            events |= Event.Error;

        return events;
    }

    /**
     * Number of selection keys that opApply() will visit.
     *
     * Remarks:
     * The keys are counted instead of returning the stored event count,
     * because the native select() counts a handle once for every set it is
     * ready in, which can exceed the number of keys with pending events.
     */
    @property size_t length()
    {
        size_t pending = 0;

        foreach (SelectionKey current; _keys)
        {
            if (pendingEvents(current.conduit.fileHandle()) != Event.None)
                ++pending;
        }
        return pending;
    }

    /**
     * Invoke the delegate for every registered key that has pending events.
     * The key handed to the delegate is a copy whose events field holds the
     * events that were detected, not the events that were registered.
     *
     * Returns:
     * 0 when all keys were visited, or the first non-zero value returned by
     * the delegate, which also stops the iteration.
     */
    int opApply(scope int delegate(ref SelectionKey) dg)
    {
        int rc = 0;

        debug (selector)
            Stdout.format("--- SelectSelectionSet.opApply() ({0} elements)\n", _eventCount);

        foreach (SelectionKey current; _keys)
        {
            ISelectable.Handle handle = current.conduit.fileHandle();
            Event events = pendingEvents(handle);

            // Only invoke the delegate if there is an event for the conduit.
            if (events != Event.None)
            {
                current.events = events;

                debug (selector)
                    Stdout.format("---   Calling foreach delegate with selection key ({0}, 0x{1:x})\n",
                                  cast(int) handle, cast(uint) events);

                if ((rc = dg(current)) != 0)
                {
                    break;
                }
            }
            else
            {
                debug (selector)
                    Stdout.format("---   Handle {0} doesn't have pending events\n",
                                  cast(int) handle);
            }
        }
        return rc;
    }
}
599 
600 
version (Windows)
{
    /**
     * Helper class used by the select()-based Selector to store handles.
     * On Windows the handles are kept in an array of uints and the first
     * element of the array stores the array "length" (i.e. number of handles
     * in the array). Everything is stored so that the native select() API
     * can use the HandleSet without additional conversions by just casting it
     * to a fd_set*.
     *
     * NOTE(review): handles are stored as 32-bit values; confirm this layout
     * against the native fd_set before using it on targets with wider handles.
     */
    private struct HandleSet
    {
        /** Default number of handles that will be held in the HandleSet. */
        enum uint DefaultSize = 63;

        /** Slot 0 holds the handle count; slots 1..count hold the handles. */
        uint[] _buffer;

        /**
         * Constructor. Sets the initial number of handles that will be held
         * in the HandleSet.
         */
        void setup(uint size = DefaultSize)
        {
            _buffer = new uint[1 + size];
            _buffer[0] = 0;
        }

        /**
         *  return true if this handle set has been initialized.
         */
        @property bool initialized()
        {
            return _buffer.length > 0;
        }

        /**
         * Return the number of handles present in the HandleSet; 0 when the
         * set has not been initialized.
         */
        @property uint length()
        {
            return initialized ? _buffer[0] : 0;
        }

        /**
         * Add the handle to the set. Adding a handle that is already present
         * has no effect. An uninitialized set is set up with the default size.
         */
        void set(ISelectable.Handle handle)
        in
        {
            assert(handle);
        }
        body
        {
            if (!initialized)
            {
                setup();
            }

            if (!isSet(handle))
            {
                // If we added too many sockets we increment the size of the buffer
                if (++_buffer[0] >= _buffer.length)
                {
                    _buffer.length = _buffer[0] + 1;
                }
                _buffer[_buffer[0]] = cast(uint) handle;
            }
        }

        /**
         * Remove the handle from the set. Nothing happens when the handle is
         * not present or the set has not been initialized.
         */
        void clear(ISelectable.Handle handle)
        {
            if (!initialized)
            {
                return;
            }

            for (uint i = 1; i <= _buffer[0]; ++i)
            {
                if (_buffer[i] == cast(uint) handle)
                {
                    // We don't need to keep the handles in the order in which
                    // they were inserted, so we optimize the removal by
                    // copying the last element to the position of the removed
                    // element.
                    if (i != _buffer[0])
                    {
                        _buffer[i] = _buffer[_buffer[0]];
                    }
                    _buffer[0]--;
                    return;
                }
            }
        }

        /**
         * Copy the contents of the HandleSet into this instance.
         *
         * Only the count and the handles in use are copied; this instance
         * grows when it is too small to hold them. Copying from an
         * uninitialized set leaves this instance empty.
         */
        HandleSet copy(HandleSet handleSet)
        {
            if (!handleSet.initialized)
            {
                if (initialized)
                {
                    _buffer[0] = 0;
                }
                return this;
            }

            // Count slot plus one slot per stored handle.
            uint used = handleSet._buffer[0] + 1;

            if (_buffer.length < used)
            {
                _buffer.length = used;
            }

            _buffer[0 .. used] = handleSet._buffer[0 .. used];
            return this;
        }

        /**
         * Check whether the handle has been set.
         */
        public bool isSet(ISelectable.Handle handle)
        {
            if(_buffer.length == 0)
                return false;

            foreach (uint stored; _buffer[1 .. 1 + _buffer[0]])
            {
                if (stored == cast(uint) handle)
                    return true;
            }
            return false;
        }

        /**
         * Cast the current object to a pointer to an fd_set, to be used with the
         * select() system call.
         */
        public fd_set* opCast()
        {
            return cast(fd_set*) _buffer.ptr;
        }


        debug (selector)
        {
            /**
             * Dump the contents of a HandleSet into stdout. Nothing is
             * printed when the set holds no handles.
             */
            void dump(const(char)[] name = null)
            {
                if (_buffer !is null && _buffer.length > 0 && _buffer[0] > 0)
                {
                    // Scratch space for itoa(); it has to be writable.
                    char[] handleStr = new char[16];
                    const(char)[] handleListStr;
                    bool isFirst = true;

                    if (name is null)
                    {
                        name = "HandleSet";
                    }

                    // Handles live in slots 1.._buffer[0], both inclusive.
                    for (uint i = 1; i <= _buffer[0]; ++i)
                    {
                        if (!isFirst)
                        {
                            handleListStr ~= ", ";
                        }
                        else
                        {
                            isFirst = false;
                        }

                        handleListStr ~= itoa(handleStr, _buffer[i]);
                    }

                    Stdout.formatln("--- {0}[{1}]: {2}", name, _buffer[0], handleListStr);
                }
            }
        }
    }
}
else version (Posix)
{
    private import tango.core.BitManip;

    /**
     * Helper class used by the select()-based Selector to store handles.
     * On POSIX-compatible platforms the handles are kept in an array of bits.
     * Everything is stored so that the native select() API can use the
     * HandleSet without additional conversions by casting it to a fd_set*.
     */
    private struct HandleSet
    {
        /** Default number of handles that will be held in the HandleSet. */
        enum uint DefaultSize     = 1024;

        /** One bit per handle value; a set bit means the handle is present. */
        BitArray _buffer;

        /**
         * Round a bit count up to the next multiple of 64, so that the
         * storage always ends on a long-word boundary.
         * NOTE(review): assumes the native select() may access the sets in
         * units of that size - confirm per platform.
         */
        private static uint roundUp(uint bits)
        {
            return (bits + 63u) & ~63u;
        }

        /**
         * Constructor. Sets the initial number of handles that will be held
         * in the HandleSet; at least 1024 bits are always reserved.
         */
        void setup(uint size = DefaultSize)
        {
            if (size < 1024)
                size = 1024;

            _buffer.length = roundUp(size);
        }

        /**
         * Return true if the handleset has been initialized
         */
        @property bool initialized()
        {
            return _buffer.length > 0;
        }

        /**
         * Add a handle to the set, growing the set when the handle does not
         * fit. The handle must not be negative.
         */
        public void set(ISelectable.Handle handle)
        in
        {
            // A negative handle would wrap around when converted to uint.
            assert(cast(int) handle >= 0);
        }
        body
        {
            // If we added too many sockets we increment the size of the buffer
            uint fd = cast(uint)handle;
            if(fd >= _buffer.length)
                _buffer.length = roundUp(fd + 1);
            _buffer[fd] = true;
        }

        /**
         * Remove a handle from the set. Handles beyond the current size are
         * ignored.
         */
        public void clear(ISelectable.Handle handle)
        {
            auto fd = cast(uint)handle;
            if(fd < _buffer.length)
                _buffer[fd] = false;
        }

        /**
         * Copy the contents of the HandleSet into this instance, which takes
         * over the size of the source.
         */
        HandleSet copy(HandleSet handleSet)
        {
            //
            // adjust the length if necessary
            //
            if(handleSet._buffer.length != _buffer.length)
                _buffer.length = handleSet._buffer.length;

            _buffer[] = handleSet._buffer;
            return this;
        }

        /**
         * Check whether the handle has been set. Handles beyond the current
         * size are reported as not set.
         */
        bool isSet(ISelectable.Handle handle)
        {
            auto fd = cast(uint)handle;
            if(fd < _buffer.length)
                return _buffer[fd];
            return false;
        }

        /**
         * Cast the current object to a pointer to an fd_set, to be used with the
         * select() system call.
         */
        fd_set* opCast()
        {
            return cast(fd_set*) _buffer.ptr;
        }

        debug (selector)
        {
            /**
             * Dump the contents of a HandleSet into stdout. Nothing is
             * printed when the set has not been initialized.
             */
            void dump(const(char)[] name = null)
            {
                if (_buffer.length > 0)
                {
                    // Scratch space for itoa(); it has to be writable.
                    char[] handleStr = new char[16];
                    const(char)[] handleListStr;
                    bool isFirst = true;

                    if (name is null)
                    {
                        name = "HandleSet";
                    }

                    // One candidate handle per bit of the set.
                    for (uint i = 0; i < _buffer.length; ++i)
                    {
                        if (isSet(cast(ISelectable.Handle) i))
                        {
                            if (!isFirst)
                            {
                                handleListStr ~= ", ";
                            }
                            else
                            {
                                isFirst = false;
                            }
                            handleListStr ~= itoa(handleStr, i);
                        }
                    }
                    Stdout.formatln("--- {0}: {1}", name, handleListStr);
                }
            }
        }
    }
}