1 /******************************************************************************* 2 3 copyright: Copyright (c) 2004 Kris Bell. All rights reserved 4 5 license: BSD style: $(LICENSE) 6 7 version: Mar 2004: Initial release 8 version: Jan 2005: RedShodan patch for timeout query 9 version: Dec 2006: Outback release 10 version: Apr 2009: revised for asynchronous IO 11 12 author: Kris 13 14 *******************************************************************************/ 15 16 module tango.net.device.Socket; 17 18 private import tango.sys.Common; 19 20 private import tango.io.device.Conduit; 21 22 package import tango.net.device.Berkeley; 23 24 /******************************************************************************* 25 26 *******************************************************************************/ 27 28 version (Windows) 29 { 30 private import tango.sys.win32.WsaSock; 31 } 32 33 /******************************************************************************* 34 35 A wrapper around the Berkeley API to implement the IConduit 36 abstraction and add stream-specific functionality. 37 38 *******************************************************************************/ 39 40 class Socket : Conduit, ISelectable 41 { 42 public alias native socket; // backward compatibility 43 44 private SocketSet pending; // synchronous timeouts 45 private Berkeley berkeley; // wrap a berkeley socket 46 47 48 /// see super.timeout(int) 49 deprecated void setTimeout (double t) 50 { 51 timeout = cast(uint) (t * 1000); 52 } 53 54 deprecated bool hadTimeout () 55 { 56 return false; 57 } 58 59 /*********************************************************************** 60 61 Create a streaming Internet socket 62 63 ***********************************************************************/ 64 65 this () 66 { 67 this (AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP); 68 } 69 70 /*********************************************************************** 71 72 Create an Internet Socket with the provided characteristics 73 74 ***********************************************************************/ 75 76 this (Address addr) 77 { 78 this (addr.addressFamily, SocketType.STREAM, ProtocolType.TCP); 79 } 80 81 /*********************************************************************** 82 83 Create an Internet socket 84 85 ***********************************************************************/ 86 87 this (AddressFamily family, SocketType type, ProtocolType protocol) 88 { 89 berkeley.open (family, type, protocol); 90 version (Windows) version(TangoRuntime) 91 if (scheduler) 92 scheduler.open (fileHandle, toString); 93 } 94 95 /*********************************************************************** 96 97 Return the name of this device 98 99 ***********************************************************************/ 100 101 override string toString() 102 { 103 return "<socket>"; 104 } 105 106 /*********************************************************************** 107 108 Models a handle-oriented device. 109 110 TODO: figure out how to avoid exposing this in the general 111 case 112 113 ***********************************************************************/ 114 115 @property Handle fileHandle () 116 { 117 return cast(Handle) berkeley.sock; 118 } 119 120 /*********************************************************************** 121 122 Return the socket wrapper 123 124 ***********************************************************************/ 125 126 @property Berkeley* native () 127 { 128 return &berkeley; 129 } 130 131 /*********************************************************************** 132 133 Return a preferred size for buffering conduit I/O 134 135 ***********************************************************************/ 136 137 @property override const size_t bufferSize () 138 { 139 return 1024 * 8; 140 } 141 142 /*********************************************************************** 143 144 Connect to the provided endpoint 145 146 ***********************************************************************/ 147 148 Socket connect (const(char)[] address, uint port) 149 { 150 assert(port < ushort.max); 151 scope addr = new IPv4Address (address, cast(ushort) port); 152 return connect (addr); 153 } 154 155 /*********************************************************************** 156 157 Connect to the provided endpoint 158 159 ***********************************************************************/ 160 161 Socket connect (Address addr) 162 { 163 version (TangoRuntime) 164 { 165 if (scheduler) 166 { 167 asyncConnect (addr); 168 return this; 169 } 170 } 171 native.connect (addr); 172 173 return this; 174 } 175 176 /*********************************************************************** 177 178 Bind this socket. This is typically used to configure a 179 listening socket (such as a server or multicast socket). 180 The address given should describe a local adapter, or 181 specify the port alone (ADDR_ANY) to have the OS assign 182 a local adapter address. 183 184 ***********************************************************************/ 185 186 Socket bind (Address address) 187 { 188 berkeley.bind (address); 189 return this; 190 } 191 192 /*********************************************************************** 193 194 Inform other end of a connected socket that we're no longer 195 available. In general, this should be invoked before close() 196 197 The shutdown function shuts down the connection of the socket: 198 199 - stops receiving data for this socket. If further data 200 arrives, it is rejected. 201 202 - stops trying to transmit data from this socket. Also 203 discards any data waiting to be sent. Stop looking for 204 acknowledgement of data already sent; don't retransmit 205 if any data is lost. 206 207 ***********************************************************************/ 208 209 Socket shutdown () 210 { 211 berkeley.shutdown (SocketShutdown.BOTH); 212 return this; 213 } 214 215 /*********************************************************************** 216 217 Release this Socket 218 219 Note that one should always disconnect a Socket under 220 normal conditions, and generally invoke shutdown on all 221 connected sockets beforehand 222 223 ***********************************************************************/ 224 225 override void detach () 226 { 227 berkeley.detach(); 228 } 229 230 /*********************************************************************** 231 232 Read content from the socket. Note that the operation 233 may timeout if method setTimeout() has been invoked with 234 a non-zero value. 235 236 Returns the number of bytes read from the socket, or 237 IConduit.Eof where there's no more content available. 238 239 ***********************************************************************/ 240 241 override size_t read (void[] dst) 242 { 243 version (TangoRuntime) 244 if (scheduler) 245 return asyncRead (dst); 246 247 auto x = Eof; 248 if (wait (true)) 249 { 250 x = native.receive (dst); 251 if (x <= 0) 252 x = Eof; 253 } 254 return x; 255 } 256 257 /*********************************************************************** 258 259 ***********************************************************************/ 260 261 override size_t write (const(void)[] src) 262 { 263 version (TangoRuntime) 264 if (scheduler) 265 return asyncWrite (src); 266 267 auto x = Eof; 268 if (wait (false)) 269 { 270 x = native.send (src); 271 if (x < 0) 272 x = Eof; 273 } 274 return x; 275 } 276 277 /*********************************************************************** 278 279 Transfer the content of another conduit to this one. Returns 280 the dst OutputStream, or throws IOException on failure. 281 282 Does optimized transfers 283 284 ***********************************************************************/ 285 286 override OutputStream copy (InputStream src, size_t max = -1) 287 { 288 auto x = cast(ISelectable) src; 289 290 version (TangoRuntime) 291 { 292 if (scheduler && x){ 293 asyncCopy (x.fileHandle); 294 return this; 295 } 296 } 297 298 super.copy (src, max); 299 return this; 300 } 301 302 /*********************************************************************** 303 304 Manage socket IO under a timeout 305 306 ***********************************************************************/ 307 308 package final bool wait (bool reading) 309 { 310 // did user enable timeout checks? 311 if (timeout != -1) 312 { 313 SocketSet read, write; 314 315 // yes, ensure we have a SocketSet 316 if (pending is null) 317 pending = new SocketSet (1); 318 pending.reset().add (native.sock); 319 320 // wait until IO is available, or a timeout occurs 321 if (reading) 322 read = pending; 323 else 324 write = pending; 325 int i = pending.select (read, write, null, timeout * 1000); 326 if (i <= 0) 327 { 328 if (i is 0) 329 super.error ("Socket :: request timeout"); 330 return false; 331 } 332 } 333 return true; 334 } 335 336 /*********************************************************************** 337 338 Throw an IOException noting the last error 339 340 ***********************************************************************/ 341 342 final void error () 343 { 344 super.error (this.toString() ~ " :: " ~ SysError.lastMsg); 345 } 346 347 /*********************************************************************** 348 349 ***********************************************************************/ 350 351 version (Win32) 352 { 353 private OVERLAPPED overlapped; 354 355 /*************************************************************** 356 357 Connect to the provided endpoint 358 359 ***************************************************************/ 360 361 private void asyncConnect (Address addr) 362 { 363 IPv4Address.sockaddr_in local; 364 365 auto handle = berkeley.sock; 366 .bind (handle, cast(Address.sockaddr*)&local, local.sizeof); 367 368 ConnectEx (handle, addr.name, addr.nameLen, null, 0, null, &overlapped); 369 version(TangoRuntime) 370 wait (scheduler.Type.Connect); 371 patch (handle, SO_UPDATE_CONNECT_CONTEXT); 372 } 373 374 /*************************************************************** 375 376 ***************************************************************/ 377 378 private void asyncCopy (Handle handle) 379 { 380 TransmitFile (berkeley.sock, cast(HANDLE) handle, 381 0, 0, &overlapped, null, 0); 382 version(TangoRuntime) 383 if (wait (scheduler.Type.Transfer) is Eof) 384 berkeley.exception ("Socket.copy :: "); 385 } 386 387 /*************************************************************** 388 389 Read a chunk of bytes from the file into the provided 390 array. Returns the number of bytes read, or Eof where 391 there is no further data. 392 393 Operates asynchronously where the hosting thread is 394 configured in that manner. 395 396 ***************************************************************/ 397 398 private size_t asyncRead (void[] dst) 399 { 400 DWORD flags; 401 DWORD bytes; 402 WSABUF buf = {dst.length, dst.ptr}; 403 404 WSARecv (cast(HANDLE) berkeley.sock, &buf, 1, &bytes, &flags, &overlapped, null); 405 version(TangoRuntime) 406 if ((bytes = wait (scheduler.Type.Read, bytes)) is Eof) 407 return Eof; 408 409 // read of zero means Eof 410 if (bytes is 0 && dst.length > 0) 411 return Eof; 412 return bytes; 413 } 414 415 /*************************************************************** 416 417 Write a chunk of bytes to the file from the provided 418 array. Returns the number of bytes written, or Eof if 419 the output is no longer available. 420 421 Operates asynchronously where the hosting thread is 422 configured in that manner. 423 424 ***************************************************************/ 425 426 private size_t asyncWrite (const(void)[] src) 427 { 428 DWORD bytes; 429 WSABUF buf = {src.length, cast(void*)src.ptr}; 430 431 WSASend (cast(HANDLE) berkeley.sock, &buf, 1, &bytes, 0, &overlapped, null); 432 version(TangoRuntime) 433 if ((bytes = wait (scheduler.Type.Write, bytes)) is Eof) 434 return Eof; 435 return bytes; 436 } 437 438 /*************************************************************** 439 440 ***************************************************************/ 441 442 version(TangoRuntime) 443 { 444 private size_t wait (scheduler.Type type, uint bytes=0) 445 { 446 while (true) 447 { 448 auto code = WSAGetLastError; 449 if (code is ERROR_HANDLE_EOF || 450 code is ERROR_BROKEN_PIPE) 451 return Eof; 452 453 if (scheduler) 454 { 455 if (code is ERROR_SUCCESS || 456 code is ERROR_IO_PENDING || 457 code is ERROR_IO_INCOMPLETE) 458 { 459 DWORD flags; 460 461 if (code is ERROR_IO_INCOMPLETE) 462 super.error ("timeout"); 463 464 auto handle = fileHandle; 465 scheduler.await (handle, type, timeout); 466 if (WSAGetOverlappedResult (handle, &overlapped, &bytes, false, &flags)) 467 return bytes; 468 } 469 else 470 error; 471 } 472 else 473 if (code is ERROR_SUCCESS) 474 return bytes; 475 else 476 error; 477 } 478 // should never get here 479 assert (false); 480 } 481 } 482 483 /*************************************************************** 484 485 ***************************************************************/ 486 487 private static void patch (socket_t dst, uint how, socket_t* src=null) 488 { 489 auto len = src ? src.sizeof : 0; 490 if (setsockopt (dst, SocketOptionLevel.SOCKET, how, src, len)) 491 berkeley.exception ("patch :: "); 492 } 493 } 494 495 496 /*********************************************************************** 497 498 ***********************************************************************/ 499 500 version (Posix) 501 { 502 /*************************************************************** 503 504 Connect to the provided endpoint 505 506 ***************************************************************/ 507 508 private void asyncConnect (Address addr) 509 { 510 assert (false); 511 } 512 513 /*************************************************************** 514 515 ***************************************************************/ 516 517 Socket asyncCopy (Handle file) 518 { 519 assert (false); 520 } 521 522 /*************************************************************** 523 524 Read a chunk of bytes from the file into the provided 525 array. Returns the number of bytes read, or Eof where 526 there is no further data. 527 528 Operates asynchronously where the hosting thread is 529 configured in that manner. 530 531 ***************************************************************/ 532 533 private size_t asyncRead (void[] dst) 534 { 535 assert (false); 536 } 537 538 /*************************************************************** 539 540 Write a chunk of bytes to the file from the provided 541 array. Returns the number of bytes written, or Eof if 542 the output is no longer available. 543 544 Operates asynchronously where the hosting thread is 545 configured in that manner. 546 547 ***************************************************************/ 548 549 private size_t asyncWrite (const(void)[] src) 550 { 551 assert (false); 552 } 553 } 554 } 555 556 557 558 /******************************************************************************* 559 560 561 *******************************************************************************/ 562 563 class ServerSocket : Socket 564 { 565 /*********************************************************************** 566 567 ***********************************************************************/ 568 569 this (uint port, int backlog=32, bool reuse=false) 570 { 571 scope addr = new IPv4Address (cast(ushort) port); 572 this (addr, backlog, reuse); 573 } 574 575 /*********************************************************************** 576 577 ***********************************************************************/ 578 579 this (Address addr, int backlog=32, bool reuse=false) 580 { 581 super (addr); 582 berkeley.addressReuse(reuse).bind(addr).listen(backlog); 583 } 584 585 /*********************************************************************** 586 587 Return the name of this device 588 589 ***********************************************************************/ 590 591 override string toString() 592 { 593 return "<accept>"; 594 } 595 596 /*********************************************************************** 597 598 ***********************************************************************/ 599 600 Socket accept (Socket recipient = null) 601 { 602 if (recipient is null) 603 recipient = new Socket; 604 605 version (TangoRuntime) 606 { 607 if (scheduler) 608 asyncAccept(recipient); 609 else 610 berkeley.accept(recipient.berkeley); 611 } 612 else 613 berkeley.accept(recipient.berkeley); 614 615 recipient.timeout = timeout; 616 return recipient; 617 } 618 619 /*********************************************************************** 620 621 ***********************************************************************/ 622 623 version (Windows) 624 { 625 /*************************************************************** 626 627 ***************************************************************/ 628 629 private void asyncAccept (Socket recipient) 630 { 631 byte[128] tmp; 632 DWORD bytes; 633 DWORD flags; 634 635 auto target = recipient.berkeley.sock; 636 AcceptEx (berkeley.sock, target, tmp.ptr, 0, 64, 64, &bytes, &overlapped); 637 version(TangoRuntime) 638 wait (scheduler.Type.Accept); 639 patch (target, SO_UPDATE_ACCEPT_CONTEXT, &berkeley.sock); 640 } 641 } 642 643 /*********************************************************************** 644 645 ***********************************************************************/ 646 647 version (Posix) 648 { 649 /*************************************************************** 650 651 ***************************************************************/ 652 653 private void asyncAccept (Socket recipient) 654 { 655 assert (false); 656 } 657 } 658 } 659