From 6cbf0eb186c34176d18a654a396a0ac382e82664 Mon Sep 17 00:00:00 2001 From: Chris Boden Date: Sat, 9 Jun 2012 19:38:44 -0400 Subject: [PATCH] [WebSocket] Refactoring Updated deps; React Socket notify client of shutdown Separated core interfaces into many Removed initial version support out of handshake negotiator Moved message parser responsibility to each version Removed __toString method from MessageInterface as to not confuse message from payload Support for RFC control frames Support message concatenation [BCB] (temporary) WsConnection hard coded to RFC version Handshake checks for \r\n\r\n anywhere, not just at end of string --- HandshakeNegotiator.php | 24 +++----- MessageParser.php | 47 ---------------- Version/MessageInterface.php | 5 -- Version/RFC6455.php | 103 +++++++++++++++++++++++++++++++++-- Version/RFC6455/Frame.php | 73 +++++++++++++++---------- Version/VersionInterface.php | 5 +- WsConnection.php | 38 ++++--------- WsServer.php | 23 ++++---- 8 files changed, 175 insertions(+), 143 deletions(-) delete mode 100644 MessageParser.php diff --git a/HandshakeNegotiator.php b/HandshakeNegotiator.php index 1bce9a3..1b80b13 100644 --- a/HandshakeNegotiator.php +++ b/HandshakeNegotiator.php @@ -1,12 +1,13 @@ enableVersion(new Version\RFC6455); - $this->enableVersion(new Version\HyBi10); - $this->enableVersion(new Version\Hixie76); - } - } - /** * @param WsConnection */ - public function onOpen(WsConnection $conn) { + public function onOpen(ConnectionInterface $conn) { $conn->WebSocket->handshakeBuffer = ''; } @@ -41,7 +34,7 @@ class HandshakeNegotiator { * @return Guzzle\Http\Message\Response|null Response object if it's done parsing, null if there's more to be buffered * @throws HttpException */ - public function onData(WsConnection $conn, $data) { + public function onMessage(ConnectionInterface $conn, $data) { $conn->WebSocket->handshakeBuffer .= $data; if (strlen($conn->WebSocket->handshakeBuffer) >= (int)$this->maxSize) { @@ -66,7 +59,8 @@ class HandshakeNegotiator { $response = $version->handshake($conn->WebSocket->request); $response->setHeader('X-Powered-By', \Ratchet\VERSION); - $conn->setVersion($version); + // This needs to be decoupled + $conn->WebSocket->version = $version; unset($conn->WebSocket->handshakeBuffer); return $response; @@ -77,10 +71,10 @@ class HandshakeNegotiator { * Determine if the message has been buffered as per the HTTP specification * @param string * @return boolean - * @todo Safari does not send 2xCRLF after the 6 byte body...this will always return false for Hixie */ public function isEom($message) { - return (static::EOM === substr($message, 0 - strlen(static::EOM))); + //return (static::EOM === substr($message, 0 - strlen(static::EOM))); + return (boolean)strpos($message, static::EOM); } /** diff --git a/MessageParser.php b/MessageParser.php deleted file mode 100644 index 51a6941..0000000 --- a/MessageParser.php +++ /dev/null @@ -1,47 +0,0 @@ -WebSocket->message)) { - $from->WebSocket->message = $from->WebSocket->version->newMessage(); - } - - // There is a frame fragment attatched to the connection, add to it - if (!isset($from->WebSocket->frame)) { - $from->WebSocket->frame = $from->WebSocket->version->newFrame(); - } - - $from->WebSocket->frame->addBuffer($data); - if ($from->WebSocket->frame->isCoalesced()) { - // check if masked - // close if not - - if ($from->WebSocket->frame->getOpcode() > 2) { - // take action on the control frame - - unset($from->WebSocket->frame); - - return; - } - - // Check frame - // If is control frame, do your thing - // Else, add to message - // Control frames (ping, pong, close) can be sent in between a fragmented message - - $nextFrame = $from->WebSocket->version->newFrame(); - $nextFrame->addBuffer($from->WebSocket->frame->extractOverflow()); - - $from->WebSocket->message->addFrame($from->WebSocket->frame); - $from->WebSocket->frame = $nextFrame; - } - - if ($from->WebSocket->message->isCoalesced()) { - $parsed = (string)$from->WebSocket->message; - unset($from->WebSocket->message); - - return $parsed; - } - } -} \ No newline at end of file diff --git a/Version/MessageInterface.php b/Version/MessageInterface.php index 4c21114..1dc91fc 100644 --- a/Version/MessageInterface.php +++ b/Version/MessageInterface.php @@ -5,11 +5,6 @@ namespace Ratchet\WebSocket\Version; * @todo Consider making parent interface/composite for Message/Frame with (isCoalesced, getOpcdoe, getPayloadLength, getPayload) */ interface MessageInterface { - /** - * @alias getPayload - */ - function __toString(); - /** * @return bool */ diff --git a/Version/RFC6455.php b/Version/RFC6455.php index c01613b..2f06de8 100644 --- a/Version/RFC6455.php +++ b/Version/RFC6455.php @@ -1,6 +1,10 @@ _verifier = new HandshakeVerifier; + /** + * @var Ratchet\MessageInterface + */ + protected $coalescedCallback; + + public function __construct(MessageInterface $coalescedCallback = null) { + $this->_verifier = new HandshakeVerifier; + $this->coalescedCallback = $coalescedCallback; } /** @@ -29,6 +39,9 @@ class RFC6455 implements VersionInterface { return ($this->getVersionNumber() === $version); } + /** + * {@inheritdoc} + */ public function getVersionNumber() { return 13; } @@ -51,18 +64,96 @@ class RFC6455 implements VersionInterface { return new Response(101, $headers); } + /** + * {@inheritdoc} + */ + public function onMessage(ConnectionInterface $from, $data) { + $overflow = ''; + + if (!isset($from->WebSocket->message)) { + $from->WebSocket->message = $this->newMessage(); + } + + // There is a frame fragment attatched to the connection, add to it + if (!isset($from->WebSocket->frame)) { + $from->WebSocket->frame = $this->newFrame(); + } + + $from->WebSocket->frame->addBuffer($data); + if ($from->WebSocket->frame->isCoalesced()) { + $frame = $from->WebSocket->frame; + + if (!$frame->isMasked()) { + unset($from->WebSocket->frame); + + $from->send($this->newFrame($frame::CLOSE_PROTOCOL, true, $frame::OP_CLOSE)); + $from->getConnection()->close(); + + return; + } + + $opcode = $frame->getOpcode(); + + if ($opcode > 2) { + switch ($opcode) { + case $frame::OP_CLOSE: + $from->send($frame->unMaskPayload()); + $from->getConnection()->close(); +// $from->send(Frame::create(Frame::CLOSE_NORMAL, true, Frame::OP_CLOSE)); + + return; + break; + case $frame::OP_PING: + $from->send($this->newFrame($frame->getPayload(), true, $frame::OP_PONG)); + break; + case $frame::OP_PONG: + break; + default: + return $from->close($frame::CLOSE_PROTOCOL); + break; + } + + $overflow = $from->WebSocket->frame->extractOverflow(); + + unset($from->WebSocket->frame, $frame, $opcode); + + if (strlen($overflow) > 0) { + $this->onMessage($from, $overflow); + } + + return; + } + + $overflow = $from->WebSocket->frame->extractOverflow(); + + $from->WebSocket->message->addFrame($from->WebSocket->frame); + unset($from->WebSocket->frame); + } + + if ($from->WebSocket->message->isCoalesced()) { + $parsed = $from->WebSocket->message->getPayload(); + unset($from->WebSocket->message); + + $this->coalescedCallback->onMessage($from, $parsed); + } + + if (strlen($overflow) > 0) { + $this->onMessage($from, $overflow); + } + } + /** * @return RFC6455\Message */ public function newMessage() { - return new RFC6455\Message; + return new Message; } /** * @return RFC6455\Frame */ - public function newFrame() { - return new RFC6455\Frame; + public function newFrame($payload = null, $final = true, $opcode = 1) { + return new Frame($payload, $final, $opcode); } /** @@ -71,7 +162,7 @@ class RFC6455 implements VersionInterface { * @return string */ public function frame($message, $mask = true) { - return RFC6455\Frame::create($message)->data; + return $this->newFrame($message)->data; } /** diff --git a/Version/RFC6455/Frame.php b/Version/RFC6455/Frame.php index 03601ff..21ad94d 100644 --- a/Version/RFC6455/Frame.php +++ b/Version/RFC6455/Frame.php @@ -10,6 +10,19 @@ class Frame implements FrameInterface { const OP_PING = 9; const OP_PONG = 10; + const CLOSE_NORMAL = 1000; + const CLOSE_GOING_AWAY = 1001; + const CLOSE_PROTOCOL = 1002; + const CLOSE_BAD_DATA = 1003; + const CLOSE_NO_STATUS = 1005; + const CLOSE_ABNORMAL = 1006; + const CLOSE_BAD_PAYLOAD = 1007; + const CLOSE_POLICY = 1008; + const CLOSE_TOO_BIG = 1009; + const CLOSE_MAND_EXT = 1010; + const CLOSE_SRV_ERR = 1011; + const CLOSE_TLS = 1015; + const MASK_LENGTH = 4; /** @@ -22,7 +35,7 @@ class Frame implements FrameInterface { * Number of bytes received from the frame * @var int */ - public $_bytes_rec = 0; + public $bytesRecvd = 0; /** * Number of bytes in the payload (as per framing protocol) @@ -30,20 +43,9 @@ class Frame implements FrameInterface { */ protected $_pay_len_def = -1; - /** - * @param string A valid UTF-8 string to send over the wire - * @param bool Is the final frame in a message - * @param int The opcode of the frame, see constants - * @param bool Mask the payload - * @return Frame - * @throws InvalidArgumentException If the payload is not a valid UTF-8 string - * @throws LengthException If the payload is too big - */ - public static function create($payload, $final = true, $opcode = 1) { - $frame = new static(); - - if (!mb_check_encoding($payload, 'UTF-8')) { - throw new \InvalidArgumentException("Payload is not a valid UTF-8 string"); + public function __construct($payload = null, $final = true, $opcode = 1) { + if (null === $payload) { + return; } $raw = (int)(boolean)$final . sprintf('%07b', (int)$opcode); @@ -57,9 +59,20 @@ class Frame implements FrameInterface { $raw .= sprintf('%08b', 127) . sprintf('%064b', $plLen); } - $frame->addBuffer(static::encode($raw) . $payload); + $this->addBuffer(static::encode($raw) . $payload); + } - return $frame; + /** + * @param string A valid UTF-8 string to send over the wire + * @param bool Is the final frame in a message + * @param int The opcode of the frame, see constants + * @param bool Mask the payload + * @return Frame + * @throws InvalidArgumentException If the payload is not a valid UTF-8 string + * @throws LengthException If the payload is too big + */ + public static function create($payload, $final = true, $opcode = 1) { + return new static($payload, $final, $opcode); } /** @@ -93,7 +106,7 @@ class Frame implements FrameInterface { return false; } - return $this->_bytes_rec >= $payload_length + $payload_start; + return $this->bytesRecvd >= $payload_length + $payload_start; } /** @@ -103,14 +116,14 @@ class Frame implements FrameInterface { $buf = (string)$buf; $this->data .= $buf; - $this->_bytes_rec += strlen($buf); + $this->bytesRecvd += strlen($buf); } /** * {@inheritdoc} */ public function isFinal() { - if ($this->_bytes_rec < 1) { + if ($this->bytesRecvd < 1) { throw new \UnderflowException('Not enough bytes received to determine if this is the final frame in message'); } @@ -123,8 +136,8 @@ class Frame implements FrameInterface { * {@inheritdoc} */ public function isMasked() { - if ($this->_bytes_rec < 2) { - throw new \UnderflowException("Not enough bytes received ({$this->_bytes_rec}) to determine if mask is set"); + if ($this->bytesRecvd < 2) { + throw new \UnderflowException("Not enough bytes received ({$this->bytesRecvd}) to determine if mask is set"); } return (boolean)bindec(substr(sprintf('%08b', ord(substr($this->data, 1, 1))), 0, 1)); @@ -140,7 +153,7 @@ class Frame implements FrameInterface { $start = 1 + $this->getNumPayloadBytes(); - if ($this->_bytes_rec < $start + static::MASK_LENGTH) { + if ($this->bytesRecvd < $start + static::MASK_LENGTH) { throw new \UnderflowException('Not enough data buffered to calculate the masking key'); } @@ -186,7 +199,7 @@ class Frame implements FrameInterface { $this->data = substr_replace($this->data, static::encode(substr_replace($byte, '1', 0, 1)), 1, 1); $this->data = substr_replace($this->data, $maskingKey, $this->getNumPayloadBytes() + 1, 0); - $this->_bytes_rec += static::MASK_LENGTH; + $this->bytesRecvd += static::MASK_LENGTH; $this->data = substr_replace($this->data, $this->applyMask($maskingKey), $this->getPayloadStartingByte(), $this->getPayloadLength()); return $this; @@ -209,7 +222,7 @@ class Frame implements FrameInterface { $this->data = substr_replace($this->data, static::encode(substr_replace($byte, '0', 0, 1)), 1, 1); $this->data = substr_replace($this->data, '', $this->getNumPayloadBytes() + 1, static::MASK_LENGTH); - $this->_bytes_rec -= static::MASK_LENGTH; + $this->bytesRecvd -= static::MASK_LENGTH; $this->data = substr_replace($this->data, $this->applyMask($maskingKey), $this->getPayloadStartingByte(), $this->getPayloadLength()); return $this; @@ -236,7 +249,7 @@ class Frame implements FrameInterface { * {@inheritdoc} */ public function getOpcode() { - if ($this->_bytes_rec < 1) { + if ($this->bytesRecvd < 1) { throw new \UnderflowException('Not enough bytes received to determine opcode'); } @@ -249,7 +262,7 @@ class Frame implements FrameInterface { * @throws UnderflowException If the buffer doesn't have enough data to determine this */ protected function getFirstPayloadVal() { - if ($this->_bytes_rec < 2) { + if ($this->bytesRecvd < 2) { throw new \UnderflowException('Not enough bytes received'); } @@ -261,7 +274,7 @@ class Frame implements FrameInterface { * @throws UnderflowException */ protected function getNumPayloadBits() { - if ($this->_bytes_rec < 2) { + if ($this->bytesRecvd < 2) { throw new \UnderflowException('Not enough bytes received'); } @@ -315,7 +328,7 @@ class Frame implements FrameInterface { } $byte_length = $this->getNumPayloadBytes(); - if ($this->_bytes_rec < 1 + $byte_length) { + if ($this->bytesRecvd < 1 + $byte_length) { throw new \UnderflowException('Not enough data buffered to determine payload length'); } @@ -365,7 +378,7 @@ class Frame implements FrameInterface { $endPoint = $this->getPayloadLength(); $endPoint += $this->getPayloadStartingByte(); - if ($this->_bytes_rec > $endPoint) { + if ($this->bytesRecvd > $endPoint) { $overflow = substr($this->data, $endPoint); $this->data = substr($this->data, 0, $endPoint); diff --git a/Version/VersionInterface.php b/Version/VersionInterface.php index d0839b9..630c62a 100644 --- a/Version/VersionInterface.php +++ b/Version/VersionInterface.php @@ -1,11 +1,12 @@ hasVersion()) { + if ($data instanceof FrameInterface) { + $data = $data->data; + } elseif (isset($this->WebSocket->version)) { // need frame caching $data = $this->WebSocket->version->frame($data, false); } @@ -29,7 +29,12 @@ class WsConnection extends AbstractConnectionDecorator { $this->getConnection()->send($data); } - public function close() { + /** + * {@inheritdoc} + * @todo If code is 1000 send close frame - false is close w/o frame...? + */ + public function close($code = 1000) { + $this->send(Frame::create($code, true, Frame::OP_CLOSE)); // send close frame with code 1000 // ??? @@ -38,23 +43,4 @@ class WsConnection extends AbstractConnectionDecorator { $this->getConnection()->close(); // temporary } - - /** - * @return boolean - * @internal - */ - public function hasVersion() { - return (null === $this->version); - } - - /** - * Set the WebSocket protocol version to communicate with - * @param Ratchet\WebSocket\Version\VersionInterface - * @internal - */ - public function setVersion(VersionInterface $version) { - $this->WebSocket->version = $version; - - return $this; - } } \ No newline at end of file diff --git a/WsServer.php b/WsServer.php index d1e0f2b..20b2a6d 100644 --- a/WsServer.php +++ b/WsServer.php @@ -2,6 +2,7 @@ namespace Ratchet\WebSocket; use Ratchet\MessageComponentInterface; use Ratchet\ConnectionInterface; +use Ratchet\WebSocket\Version; use Guzzle\Http\Message\RequestInterface; use Ratchet\WebSocket\Guzzle\Http\Message\RequestFactory; @@ -32,11 +33,6 @@ class WsServer implements MessageComponentInterface { */ protected $connections; - /** - * @var MessageParser - */ - protected $messager; - /** * For now, array_push accepted subprotocols to this array * @deprecated @@ -54,10 +50,15 @@ class WsServer implements MessageComponentInterface { * @param Ratchet\MessageComponentInterface Your application to run with WebSockets */ public function __construct(MessageComponentInterface $component) { - mb_internal_encoding('UTF-8'); + //mb_internal_encoding('UTF-8'); - $this->handshaker = new HandshakeNegotiator; - $this->messager = new MessageParser; + $this->handshaker = new HandshakeNegotiator(); + + $this->handshaker + ->enableVersion(new Version\RFC6455($component)) + ->enableVersion(new Version\HyBi10($component)) + //->enableVersion(new Version\Hixie76) + ; $this->_decorating = $component; $this->connections = new \SplObjectStorage; @@ -83,7 +84,7 @@ class WsServer implements MessageComponentInterface { $conn = $this->connections[$from]; if (true !== $conn->WebSocket->established) { - if (null === ($response = $this->handshaker->onData($conn, $msg))) { + if (null === ($response = $this->handshaker->onMessage($conn, $msg))) { return; } @@ -103,9 +104,7 @@ class WsServer implements MessageComponentInterface { return $this->_decorating->onOpen($conn); } - if (null !== ($parsed = $this->messager->onData($conn, $msg))) { - $this->_decorating->onMessage($conn, $parsed); - } + $conn->WebSocket->version->onMessage($conn, $msg); } /**