From ad258e6eaa7d2686626bf553348649fac51d641f Mon Sep 17 00:00:00 2001 From: Chris Boden Date: Thu, 10 Nov 2011 20:07:02 -0500 Subject: [PATCH] Socket Buffering Server now buffers incoming messages until it (thinks it) receives the full message. Slight tweak of HyBi-10: spacing, FIN indicator, continuation frame recognition Sockets close() if container is destroyed --- .../Protocol/WebSocket/Version/HyBi10.php | 250 +++++++++--------- lib/Ratchet/Server.php | 33 ++- lib/Ratchet/Socket.php | 4 + 3 files changed, 160 insertions(+), 127 deletions(-) diff --git a/lib/Ratchet/Protocol/WebSocket/Version/HyBi10.php b/lib/Ratchet/Protocol/WebSocket/Version/HyBi10.php index c0e1000..7238f6b 100644 --- a/lib/Ratchet/Protocol/WebSocket/Version/HyBi10.php +++ b/lib/Ratchet/Protocol/WebSocket/Version/HyBi10.php @@ -36,70 +36,82 @@ class HyBi10 implements VersionInterface { */ public function unframe($message) { $data = $message; - $mask = $payloadLength = $unmaskedPayload = ''; - $decodedData = array(); + $mask = $payloadLength = $unmaskedPayload = ''; + $decodedData = array(); - // estimate frame type: - $firstByteBinary = sprintf('%08b', ord($data[0])); - $secondByteBinary = sprintf('%08b', ord($data[1])); - $opcode = bindec(substr($firstByteBinary, 4, 4)); - $isMasked = ($secondByteBinary[0] == '1') ? true : false; - $payloadLength = ord($data[1]) & 127; + // estimate frame type: + $firstByteBinary = sprintf('%08b', ord($data[0])); + $finIndicator = bindec(substr($firstByteBinary, 0, 1)); + $opcode = bindec(substr($firstByteBinary, 4, 4)); - // close connection if unmasked frame is received: - if($isMasked === false) { + $secondByteBinary = sprintf('%08b', ord($data[1])); + $isMasked = ($secondByteBinary[0] == '1') ? true : false; + $payloadLength = ord($data[1]) & 127; + + // close connection if unmasked frame is received: + if($isMasked === false) { throw new \UnexpectedValueException('Masked byte is false'); - } + } - switch($opcode) { - // text frame: - case 1: - $decodedData['type'] = 'text'; - break; - - // connection close frame: - case 8: - $decodedData['type'] = 'close'; - break; - - // ping frame: - case 9: - $decodedData['type'] = 'ping'; - break; - - // pong frame: - case 10: - $decodedData['type'] = 'pong'; - break; - - default: - // Close connection on unknown opcode: + switch($opcode) { + // continuation frame + case 0: + $decodedData['type'] = 'text'; // incomplete + break; + + // text frame: + case 1: + $decodedData['type'] = 'text'; + break; + + // binary data frame + case 2: + $decodedData['type'] = 'binary'; + break; + + // connection close frame: + case 8: + $decodedData['type'] = 'close'; + break; + + // ping frame: + case 9: + $decodedData['type'] = 'ping'; + break; + + // pong frame: + case 10: + $decodedData['type'] = 'pong'; + break; + + default: + // Close connection on unknown opcode: throw new \UnexpectedValueException("Unknown opcode ({$opcode})"); - break; - } + break; + } - if($payloadLength === 126) { - $mask = substr($data, 4, 4); - $payloadOffset = 8; - } elseif($payloadLength === 127) { - $mask = substr($data, 10, 4); - $payloadOffset = 14; - } else { - $mask = substr($data, 2, 4); - $payloadOffset = 6; - } + if($payloadLength === 126) { + $mask = substr($data, 4, 4); + $payloadOffset = 8; + } elseif($payloadLength === 127) { + $mask = substr($data, 10, 4); + $payloadOffset = 14; + } else { + $mask = substr($data, 2, 4); + $payloadOffset = 6; + } - $dataLength = strlen($data); + $dataLength = strlen($data); - if($isMasked === true) { - for($i = $payloadOffset; $i < $dataLength; $i++) { - $j = $i - $payloadOffset; - $unmaskedPayload .= $data[$i] ^ $mask[$j % 4]; - } - $decodedData['payload'] = $unmaskedPayload; - } + if($isMasked === true) { // This will always pass... + for($i = $payloadOffset; $i < $dataLength; $i++) { + $j = $i - $payloadOffset; + $unmaskedPayload .= $data[$i] ^ $mask[$j % 4]; + } + $decodedData['payload'] = $unmaskedPayload; + } - return $decodedData; + return $decodedData; } /** @@ -114,74 +126,74 @@ class HyBi10 implements VersionInterface { $type = 'text'; $masked = true; - $frameHead = array(); - $frame = ''; - $payloadLength = strlen($payload); + $frameHead = array(); + $frame = ''; + $payloadLength = strlen($payload); - switch($type) { - case 'text': - // first byte indicates FIN, Text-Frame (10000001): - $frameHead[0] = 129; - break; - - case 'close': - // first byte indicates FIN, Close Frame(10001000): - $frameHead[0] = 136; - break; - - case 'ping': - // first byte indicates FIN, Ping frame (10001001): - $frameHead[0] = 137; - break; - - case 'pong': - // first byte indicates FIN, Pong frame (10001010): - $frameHead[0] = 138; - break; - } - - // set mask and payload length (using 1, 3 or 9 bytes) - if($payloadLength > 65535) { - $payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8); - $frameHead[1] = ($masked === true) ? 255 : 127; - for($i = 0; $i < 8; $i++) { - $frameHead[$i+2] = bindec($payloadLengthBin[$i]); - } - // most significant bit MUST be 0 (return false if to much data) - if($frameHead[2] > 127) { - return false; - } - } elseif($payloadLength > 125) { - $payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8); - $frameHead[1] = ($masked === true) ? 254 : 126; - $frameHead[2] = bindec($payloadLengthBin[0]); - $frameHead[3] = bindec($payloadLengthBin[1]); - } else { - $frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength; - } + switch($type) { + case 'text': + // first byte indicates FIN, Text-Frame (10000001): + $frameHead[0] = 129; + break; - // convert frame-head to string: - foreach(array_keys($frameHead) as $i) { - $frameHead[$i] = chr($frameHead[$i]); - } if($masked === true) { - // generate a random mask: - $mask = array(); - for($i = 0; $i < 4; $i++) - { - $mask[$i] = chr(rand(0, 255)); - } - - $frameHead = array_merge($frameHead, $mask); - } - $frame = implode('', $frameHead); + case 'close': + // first byte indicates FIN, Close Frame(10001000): + $frameHead[0] = 136; + break; - // append payload to frame: - $framePayload = array(); - for($i = 0; $i < $payloadLength; $i++) { - $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i]; - } + case 'ping': + // first byte indicates FIN, Ping frame (10001001): + $frameHead[0] = 137; + break; - return $frame; + case 'pong': + // first byte indicates FIN, Pong frame (10001010): + $frameHead[0] = 138; + break; + } + + // set mask and payload length (using 1, 3 or 9 bytes) + if($payloadLength > 65535) { + $payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8); + $frameHead[1] = ($masked === true) ? 255 : 127; + for($i = 0; $i < 8; $i++) { + $frameHead[$i+2] = bindec($payloadLengthBin[$i]); + } + // most significant bit MUST be 0 (return false if to much data) + if($frameHead[2] > 127) { + return false; + } + } elseif($payloadLength > 125) { + $payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8); + $frameHead[1] = ($masked === true) ? 254 : 126; + $frameHead[2] = bindec($payloadLengthBin[0]); + $frameHead[3] = bindec($payloadLengthBin[1]); + } else { + $frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength; + } + + // convert frame-head to string: + foreach(array_keys($frameHead) as $i) { + $frameHead[$i] = chr($frameHead[$i]); + } if($masked === true) { + // generate a random mask: + $mask = array(); + for($i = 0; $i < 4; $i++) + { + $mask[$i] = chr(rand(0, 255)); + } + + $frameHead = array_merge($frameHead, $mask); + } + $frame = implode('', $frameHead); + + // append payload to frame: + $framePayload = array(); + for($i = 0; $i < $payloadLength; $i++) { + $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i]; + } + + return $frame; } /** diff --git a/lib/Ratchet/Server.php b/lib/Ratchet/Server.php index d15caab..b8762e9 100644 --- a/lib/Ratchet/Server.php +++ b/lib/Ratchet/Server.php @@ -8,8 +8,14 @@ use Ratchet\Command\CommandInterface; * Creates an open-ended socket to listen on a port for incomming connections. Events are delegated through this to attached applications * @todo Consider using _connections as master reference and passing iterator_to_array(_connections) to socket_select * @todo Currently passing Socket object down the decorated chain - should be sending reference to it instead; Receivers do not interact with the Socket directly, they do so through the Command pattern + * @todo With all these options for the server I should probably use a DIC */ class Server implements SocketObserver, \IteratorAggregate { + /** + * This is probably temporary + */ + const RECV_BYTES = 1024; + /** * The master socket, receives all connections * @type Socket @@ -65,6 +71,9 @@ class Server implements SocketObserver, \IteratorAggregate { set_time_limit(0); ob_implicit_flush(); + $this->_master->set_nonblock(); + declare(ticks = 1); + if (false === ($this->_master->bind($address, (int)$port))) { throw new Exception($this->_master); } @@ -73,9 +82,6 @@ class Server implements SocketObserver, \IteratorAggregate { throw new Exception($this->_master); } - $this->_master->set_nonblock(); - declare(ticks = 1); - do { try { $changed = $this->_resources; @@ -86,13 +92,24 @@ class Server implements SocketObserver, \IteratorAggregate { $res = $this->onOpen($this->_master); } else { $conn = $this->_connections[$resource]; - $data = null; - $bytes = $conn->recv($data, 4096, 0); + $data = $buf = ''; + + $bytes = $conn->recv($buf, static::RECV_BYTES, 0); + if ($bytes > 0) { + $data = $buf; + + // This idea works* but... + // 1) A single DDOS attack will block the entire application (I think) + // 2) What if the last message in the frame is equal to RECV_BYTES? Would loop until another msg is sent + // Need to 1) proc_open the recv() calls. 2) ??? + while ($bytes === static::RECV_BYTES) { + $bytes = $conn->recv($buf, static::RECV_BYTES, 0); + $data .= $buf; + } - if ($bytes == 0) { - $res = $this->onClose($conn); - } else { $res = $this->onRecv($conn, $data); + } else { + $res = $this->onClose($conn); } } diff --git a/lib/Ratchet/Socket.php b/lib/Ratchet/Socket.php index 0ee614d..6a7beb2 100644 --- a/lib/Ratchet/Socket.php +++ b/lib/Ratchet/Socket.php @@ -34,6 +34,10 @@ class Socket implements SocketInterface { } } + public function __destruct() { + @socket_close($this->_resource); + } + /** * @return resource (Socket) */