1 /*******************************************************************************
2 
3         copyright:      Copyright (c) 2004 Kris Bell. All rights reserved
4 
5         license:        BSD style: $(LICENSE)
6 
7         version:        Mar 2004: Initial release
8         version:        Jan 2005: RedShodan patch for timeout query
9         version:        Dec 2006: Outback release
10         version:        Apr 2009: revised for asynchronous IO
11 
12         author:         Kris
13 
14 *******************************************************************************/
15 
16 module tango.net.device.Socket;
17 
18 private import tango.sys.Common;
19 
20 private import tango.io.device.Conduit;
21 
22 package import tango.net.device.Berkeley;
23 
24 /*******************************************************************************
25 
26 *******************************************************************************/
27 
28 version (Windows)
29 {
30          private import tango.sys.win32.WsaSock;
31 }
32 
/*******************************************************************************

        A wrapper around the Berkeley API to implement the IConduit 
        abstraction and add stream-specific functionality.

        Synchronous IO goes through the wrapped Berkeley socket, optionally
        guarded by a select() based timeout (see wait). Where a scheduler
        is present (version TangoRuntime) the IO calls are routed to the
        async* methods instead.

*******************************************************************************/

class Socket : Conduit, ISelectable
{
        public alias native socket;             // backward compatibility

        private SocketSet pending;              // synchronous timeouts   
        private Berkeley  berkeley;             // wrap a berkeley socket


        /***********************************************************************

                Deprecated: assign to the timeout property instead.

                The argument is multiplied by 1000 and truncated to a uint 
                before being stored as the conduit timeout (presumably a 
                seconds to milliseconds conversion)

        ***********************************************************************/

        deprecated void setTimeout (double t) 
        {
                timeout = cast(uint) (t * 1000);
        }

        /***********************************************************************

                Deprecated: retained for backward compatibility only, and 
                always returns false

        ***********************************************************************/

        deprecated bool hadTimeout ()
        {
                return false;
        }

        /***********************************************************************
        
                Create a streaming Internet socket (INET, STREAM, TCP)

        ***********************************************************************/

        this ()
        {
                this (AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
        }

        /***********************************************************************
        
                Create a streaming TCP socket using the address family of 
                the provided address

        ***********************************************************************/

        this (Address addr) 
        { 
                this (addr.addressFamily, SocketType.STREAM, ProtocolType.TCP); 
        }
                                
        /***********************************************************************
        
                Create a socket with the provided family, type and protocol.

                On Windows with a Tango runtime scheduler present, the new 
                handle is also registered with that scheduler

        ***********************************************************************/

        this (AddressFamily family, SocketType type, ProtocolType protocol)
        {
                berkeley.open (family, type, protocol);

                version (Windows)
                {
                        version (TangoRuntime)
                        {
                                if (scheduler)
                                    scheduler.open (fileHandle, toString);
                        }
                }
        }

        /***********************************************************************

                Return the name of this device

        ***********************************************************************/

        override string toString()
        {
                return "<socket>";
        }

        /***********************************************************************

                Models a handle-oriented device: returns the underlying 
                socket descriptor cast to a Handle.

                TODO: figure out how to avoid exposing this in the general
                case

        ***********************************************************************/

        @property Handle fileHandle ()
        {
                return cast(Handle) berkeley.sock;
        }

        /***********************************************************************

                Return a pointer to the wrapped Berkeley socket
                
        ***********************************************************************/

        @property Berkeley* native ()
        {
                return &berkeley;
        }

        /***********************************************************************

                Return a preferred size for buffering conduit I/O (8KB)

        ***********************************************************************/

        @property override const size_t bufferSize ()
        {
                return 1024 * 8;
        }

        /***********************************************************************

                Connect to the provided IPv4 endpoint. 

                The port must fit within a ushort (0 through 65535 
                inclusive). Returns this socket, for call chaining
        
        ***********************************************************************/

        Socket connect (const(char)[] address, uint port)
        {
                // 65535 is a legal port, so the bound is inclusive
                assert (port <= ushort.max);

                scope addr = new IPv4Address (address, cast(ushort) port);
                return connect (addr);
        }

        /***********************************************************************

                Connect to the provided endpoint. 

                Uses asyncConnect where a scheduler is present, and the 
                Berkeley connect otherwise. Returns this socket, for call 
                chaining
        
        ***********************************************************************/

        Socket connect (Address addr)
        {
                version (TangoRuntime)
                {
                    if (scheduler)
                        {
                        asyncConnect (addr);
                        return this;
                        }
                }
                native.connect (addr);
                
                return this;
        }

        /***********************************************************************

                Bind this socket. This is typically used to configure a
                listening socket (such as a server or multicast socket).
                The address given should describe a local adapter, or
                specify the port alone (ADDR_ANY) to have the OS assign
                a local adapter address.

                Returns this socket, for call chaining
        
        ***********************************************************************/

        Socket bind (Address address)
        {
                berkeley.bind (address);
                return this;
        }

        /***********************************************************************

                Inform other end of a connected socket that we're no longer
                available. In general, this should be invoked before close()
        
                The shutdown function shuts down the connection of the socket
                in both directions (SocketShutdown.BOTH): 

                    -   stops receiving data for this socket. If further data 
                        arrives, it is rejected.

                    -   stops trying to transmit data from this socket. Also
                        discards any data waiting to be sent. Stop looking for 
                        acknowledgement of data already sent; don't retransmit 
                        if any data is lost.

                Returns this socket, for call chaining

        ***********************************************************************/

        Socket shutdown ()
        {
                berkeley.shutdown (SocketShutdown.BOTH);
                return this;
        }

        /***********************************************************************

                Release this Socket

                Note that one should always disconnect a Socket under 
                normal conditions, and generally invoke shutdown on all 
                connected sockets beforehand

        ***********************************************************************/

        override void detach ()
        {
                berkeley.detach();
        }
        
        /***********************************************************************

                Read content from the socket. Note that the operation 
                may timeout if a timeout has been configured.

                Returns the number of bytes read from the socket, or
                IConduit.Eof where there's no more content available, 
                where the receive fails, or where wait() reports that 
                the socket is not readable

        ***********************************************************************/

        override size_t read (void[] dst)
        {
                version (TangoRuntime)
                {
                    if (scheduler)
                        return asyncRead (dst);
                }

                if (wait (true))
                   {
                   // inspect the result in its own type, before it is 
                   // widened to size_t, so that the sign test is meaningful
                   auto count = native.receive (dst);
                   if (count > 0)
                       return cast(size_t) count;
                   }

                // zero bytes, a receive failure, or nothing to read
                return Eof;
        }
        
        /***********************************************************************

                Write content to the socket. Note that the operation 
                may timeout if a timeout has been configured.

                Returns the number of bytes sent (which may be zero), or 
                IConduit.Eof where the send fails or where wait() reports 
                that the socket is not writable

        ***********************************************************************/

        override size_t write (const(void)[] src)
        {
                version (TangoRuntime)
                {
                    if (scheduler)
                        return asyncWrite (src);
                }

                if (wait (false))
                   {
                   // inspect the result in its own type, before it is 
                   // widened to size_t, so that the sign test is meaningful
                   auto count = native.send (src);
                   if (count >= 0)
                       return cast(size_t) count;
                   }

                // a send failure, or the socket is not writable
                return Eof;
        }

        /***********************************************************************

                Transfer the content of another conduit to this one. Returns
                the dst OutputStream, or throws IOException on failure.

                Does optimized transfers: where a scheduler is present and 
                the source is ISelectable, the source handle is passed to 
                asyncCopy. Note that 'max' is not passed along on that path;
                it is honoured by the generic super.copy path only

        ***********************************************************************/

        override OutputStream copy (InputStream src, size_t max = -1)
        {
                auto x = cast(ISelectable) src;
                
                version (TangoRuntime)
                {
                    if (scheduler && x){
                        asyncCopy (x.fileHandle);
                        return this;
                    }
                }
                
                super.copy (src, max);
                return this;
        }

        /***********************************************************************
 
                Manage socket IO under a timeout

                Where a timeout is configured (timeout is not -1), select 
                on this socket for readability (reading is true) or for 
                writability (reading is false). 

                Returns true where IO may proceed, or where no timeout is 
                configured. Returns false where select reports a failure. 
                Invokes super.error() where select reports a timeout

        ***********************************************************************/

        package final bool wait (bool reading)
        {
                // did user enable timeout checks?
                if (timeout != -1)
                   {
                   SocketSet read, write;

                   // yes, ensure we have a SocketSet
                   if (pending is null)
                       pending = new SocketSet (1);
                   pending.reset().add (native.sock);

                   // wait until IO is available, or a timeout occurs
                   if (reading)
                       read = pending;
                   else
                      write = pending;

                   // widen before scaling: a 32bit multiply would wrap for
                   // timeouts beyond ~71 minutes (uint.max / 1000 millis).
                   // NOTE(review): assumes select accepts a 64bit interval
                   // in microseconds -- confirm against SocketSet.select
                   int i = pending.select (read, write, null, cast(long) timeout * 1000);
                   if (i <= 0)
                      {
                      if (i is 0)
                          super.error ("Socket :: request timeout");
                      return false;
                      }
                   }       
                return true;
        }

        /***********************************************************************

                Throw an IOException noting the last error
        
        ***********************************************************************/

        final void error ()
        {
                super.error (this.toString() ~ " :: " ~ SysError.lastMsg);
        }

        /***********************************************************************

                Overlapped IO implementation for Windows.

                NOTE(review): this section is guarded by version (Win32), 
                whereas ServerSocket.asyncAccept (which relies upon the 
                'overlapped', 'wait' and 'patch' members declared here) is 
                guarded by version (Windows). Confirm whether these two 
                should agree
 
        ***********************************************************************/

        version (Win32)
        {
                private OVERLAPPED overlapped;
        
                /***************************************************************
        
                        Connect to the provided endpoint

                        Binds the socket to a default-initialized local
                        address, issues ConnectEx, awaits completion via 
                        the scheduler, and then updates the connect context
                
                ***************************************************************/
        
                private void asyncConnect (Address addr)
                {
                        IPv4Address.sockaddr_in local;
        
                        auto handle = berkeley.sock;
                        .bind (handle, cast(Address.sockaddr*)&local, local.sizeof);
        
                        ConnectEx (handle, addr.name, addr.nameLen, null, 0, null, &overlapped);
                        version(TangoRuntime)
                           wait (scheduler.Type.Connect);
                        patch (handle, SO_UPDATE_CONNECT_CONTEXT);
                }
        
                /***************************************************************

                        Send the content of the provided handle through 
                        this socket via TransmitFile, raising an exception 
                        where the transfer yields Eof
        
                ***************************************************************/
        
                private void asyncCopy (Handle handle)
                {
                        TransmitFile (berkeley.sock, cast(HANDLE) handle, 
                                      0, 0, &overlapped, null, 0);
                        version(TangoRuntime)
                        if (wait (scheduler.Type.Transfer) is Eof)
                            berkeley.exception ("Socket.copy :: ");
                }

                /***************************************************************

                        Read a chunk of bytes from the socket into the 
                        provided array. Returns the number of bytes read, or 
                        Eof where there is no further data.

                        Operates asynchronously where the hosting thread is
                        configured in that manner.

                ***************************************************************/

                private size_t asyncRead (void[] dst)
                {
                        DWORD flags;
                        DWORD bytes;
                        WSABUF buf = {dst.length, dst.ptr};

                        WSARecv (cast(HANDLE) berkeley.sock, &buf, 1, &bytes, &flags, &overlapped, null);
                        version(TangoRuntime)
                        if ((bytes = wait (scheduler.Type.Read, bytes)) is Eof)
                             return Eof;

                        // read of zero means Eof
                        if (bytes is 0 && dst.length > 0)
                            return Eof;
                        return bytes;
                }

                /***************************************************************

                        Write a chunk of bytes to the socket from the 
                        provided array. Returns the number of bytes written, 
                        or Eof if the output is no longer available.

                        Operates asynchronously where the hosting thread is
                        configured in that manner.

                ***************************************************************/

                private size_t asyncWrite (const(void)[] src)
                {
                        DWORD bytes;
                        WSABUF buf = {src.length, cast(void*)src.ptr};

                        WSASend (cast(HANDLE) berkeley.sock, &buf, 1, &bytes, 0, &overlapped, null);
                        version(TangoRuntime)
                        if ((bytes = wait (scheduler.Type.Write, bytes)) is Eof)
                             return Eof;
                        return bytes;
                }

                /***************************************************************

                        Wait for the outstanding overlapped request to 
                        complete, driven by the last WSA error code.

                        Returns Eof for ERROR_HANDLE_EOF or ERROR_BROKEN_PIPE.
                        With a scheduler present, awaits the handle and 
                        returns the byte count of the overlapped result, 
                        looping while that result is not yet available. 
                        Without a scheduler, returns 'bytes' on success. 
                        Any other code is reported via error()

                ***************************************************************/

                version(TangoRuntime)
                {
                   private size_t wait (scheduler.Type type, uint bytes=0)
                   {
                           while (true)
                                 {
                                 auto code = WSAGetLastError;
                                 if (code is ERROR_HANDLE_EOF ||
                                     code is ERROR_BROKEN_PIPE)
                                     return Eof;

                                 if (scheduler)
                                    {
                                    if (code is ERROR_SUCCESS || 
                                        code is ERROR_IO_PENDING || 
                                        code is ERROR_IO_INCOMPLETE)
                                       {
                                       DWORD flags;

                                       // still incomplete after a prior 
                                       // await: report it as a timeout
                                       if (code is ERROR_IO_INCOMPLETE)
                                           super.error ("timeout"); 

                                       auto handle = fileHandle;
                                       scheduler.await (handle, type, timeout);
                                       if (WSAGetOverlappedResult (handle, &overlapped, &bytes, false, &flags))
                                           return bytes;
                                       }
                                    else
                                       error;
                                    }
                                 else
                                    if (code is ERROR_SUCCESS)
                                        return bytes;
                                    else
                                       error;
                                 }
                           // should never get here
                           assert (false);
                   }
                }
        
                /***************************************************************

                        Apply socket option 'how' to dst at the SOCKET 
                        level, optionally passing the socket referenced by 
                        src as the option value. Raises an exception where 
                        setsockopt reports a failure
        
                ***************************************************************/
        
                private static void patch (socket_t dst, uint how, socket_t* src=null)
                {
                        // the option length is the size of the value that
                        // src points at, rather than that of the pointer
                        auto len = src ? socket_t.sizeof : 0;
                        if (setsockopt (dst, SocketOptionLevel.SOCKET, how, src, len))
                            berkeley.exception ("patch :: ");
                }
        }


        /***********************************************************************

                Posix placeholders: asynchronous IO is not implemented 
                here, and each of these methods simply asserts
 
        ***********************************************************************/

        version (Posix)
        {
                /***************************************************************
        
                        Connect to the provided endpoint (not implemented)
                
                ***************************************************************/
        
                private void asyncConnect (Address addr)
                {
                        assert (false);
                }
        
                /***************************************************************

                        Transfer from the provided handle (not implemented)

                        NOTE(review): unlike the Win32 variant, this one is 
                        not private and is declared to return a Socket. 
                        Left as is, to avoid altering a visible signature
        
                ***************************************************************/
        
                Socket asyncCopy (Handle file)
                {
                        assert (false);
                }

                /***************************************************************

                        Read a chunk of bytes from the socket into the 
                        provided array (not implemented)

                ***************************************************************/

                private size_t asyncRead (void[] dst)
                {
                        assert (false);
                }

                /***************************************************************

                        Write a chunk of bytes to the socket from the 
                        provided array (not implemented)

                ***************************************************************/

                private size_t asyncWrite (const(void)[] src)
                {
                        assert (false);
                }
        }
}
555 
556 
557 
/*******************************************************************************

        A Socket configured for listening: construction binds the socket 
        to a local address and places it in listen mode, after which 
        accept() hands out a Socket per incoming connection

*******************************************************************************/

class ServerSocket : Socket
{      
        /***********************************************************************

                Listen on the provided IPv4 port. 

                The port must fit within a ushort (0 through 65535 
                inclusive). See the Address based constructor for the 
                remaining arguments

        ***********************************************************************/

        this (uint port, int backlog=32, bool reuse=false)
        {
                // reject out-of-range ports, rather than silently 
                // truncating them within the cast below
                assert (port <= ushort.max);

                scope addr = new IPv4Address (cast(ushort) port);
                this (addr, backlog, reuse);
        }

        /***********************************************************************

                Create a streaming socket for the family of the provided 
                address, apply the address-reuse setting, bind to the 
                address, and listen with the provided backlog

        ***********************************************************************/

        this (Address addr, int backlog=32, bool reuse=false)
        {
                super (addr);
                berkeley.addressReuse(reuse).bind(addr).listen(backlog);
        }

        /***********************************************************************

                Return the name of this device

        ***********************************************************************/

        override string toString()
        {
                return "<accept>";
        }

        /***********************************************************************

                Accept an incoming connection into the provided recipient, 
                or into a freshly constructed Socket where recipient is 
                null. The recipient inherits the timeout of this server 
                socket, and is returned.

                Uses asyncAccept where a scheduler is present, and the 
                Berkeley accept otherwise.

                NOTE(review): 'new Socket' opens a socket of its own before
                the accept takes place; confirm that Berkeley.accept 
                releases that handle rather than overwriting it

        ***********************************************************************/

        Socket accept (Socket recipient = null)
        {
                if (recipient is null)
                    recipient = new Socket;
                    
                version (TangoRuntime)
                {
                    if (scheduler)
                        asyncAccept(recipient);
                    else
                        berkeley.accept(recipient.berkeley);
                }
                else
                    berkeley.accept(recipient.berkeley);
                
                recipient.timeout = timeout;
                return recipient;
        }

        /***********************************************************************

                Overlapped accept for Windows

        ***********************************************************************/

        version (Windows)
        {
                /***************************************************************

                        Issue AcceptEx into the socket of the recipient, 
                        await completion via the scheduler, and then update 
                        the accept context of the recipient with this 
                        listening socket

                ***************************************************************/

                private void asyncAccept (Socket recipient)
                {
                        byte[128]      tmp;
                        DWORD          bytes;

                        auto target = recipient.berkeley.sock;
                        AcceptEx (berkeley.sock, target, tmp.ptr, 0, 64, 64, &bytes, &overlapped);
                        version(TangoRuntime)
                           wait (scheduler.Type.Accept);
                        patch (target, SO_UPDATE_ACCEPT_CONTEXT, &berkeley.sock);
                }
        }

        /***********************************************************************

                Posix placeholder

        ***********************************************************************/

        version (Posix)
        {
                /***************************************************************

                        Asynchronous accept is not implemented here, and 
                        this simply asserts

                ***************************************************************/

                private void asyncAccept (Socket recipient)
                {
                        assert (false);
                }
        }
}
659