[WebSocket] Refactoring

Updated deps; React Socket notify client of shutdown
Separated core interfaces into many
Removed initial version support out of handshake negotiator
Moved message parser responsibility to each version
Removed __toString method from MessageInterface as to not confuse message from payload
Support for RFC control frames
Support message concatenation
[BCB] (temporary) WsConnection hard coded to RFC version
Handshake checks for \r\n\r\n anywhere, not just at end of string
This commit is contained in:
Chris Boden 2012-06-09 19:38:44 -04:00
parent b27c9700f4
commit 55243550af
13 changed files with 204 additions and 173 deletions

22
composer.lock generated
View File

@ -4,7 +4,7 @@
{
"package": "evenement/evenement",
"version": "dev-master",
"source-reference": "808e3aaea8d4f908e455b0e047cc1acc46b38d44"
"source-reference": "fa966683e7df3e5dd5929d984a44abfbd6bafe8d"
},
{
"package": "guzzle/guzzle",
@ -13,36 +13,28 @@
{
"package": "react/event-loop",
"version": "dev-master",
"source-reference": "cc341b109feae06fa33dff7486aa567e3b9d1406"
"source-reference": "0927a2129394f10cc8534994271c6073ca9e350c"
},
{
"package": "react/socket",
"version": "dev-master",
"source-reference": "6801c6d8653e1999cb34b235cdb4b3a287e4d528"
"source-reference": "b78d96a2cde9a78ab2f923e9aa9a40f778d051df"
},
{
"package": "symfony/event-dispatcher",
"version": "dev-master",
"source-reference": "eb82542e8ec9506096caf7c528564c740a214f56",
"alias-pretty-version": "2.1.x-dev",
"alias-version": "2.1.9999999.9999999-dev"
},
{
"package": "symfony/event-dispatcher",
"version": "dev-master",
"source-reference": "eb82542e8ec9506096caf7c528564c740a214f56"
"source-reference": "30d3f5da80c2aeab15bcdb5a7d448d15bc294b23"
},
{
"package": "symfony/http-foundation",
"version": "dev-master",
"source-reference": "3d9f4ce435f6322b9720c209ad610202526373c0",
"alias-pretty-version": "2.1.x-dev",
"alias-version": "2.1.9999999.9999999-dev"
"source-reference": "d9ef2afd0218415a8c04ea48a2c83bb5b8f0f51c"
},
{
"package": "symfony/http-foundation",
"version": "dev-master",
"source-reference": "3d9f4ce435f6322b9720c209ad610202526373c0"
"alias-pretty-version": "2.1.x-dev",
"alias-version": "2.1.9999999.9999999-dev"
}
],
"packages-dev": null,

View File

@ -1,6 +1,5 @@
<?php
namespace Ratchet;
use Ratchet\ConnectionInterface;
/**
* This is the interface to build a Ratchet application with

View File

@ -1,13 +1,5 @@
<?php
namespace Ratchet;
use Ratchet\ConnectionInterface;
interface MessageComponentInterface extends ComponentInterface {
/**
* Triggered when a client sends data through the socket
* @param Ratchet\ConnectionInterface The socket/connection that sent the message to your application
* @param string The message received
* @throws Exception
*/
function onMessage(ConnectionInterface $from, $msg);
interface MessageComponentInterface extends ComponentInterface, MessageInterface {
}

View File

@ -0,0 +1,12 @@
<?php
namespace Ratchet;
interface MessageInterface {
/**
* Triggered when a client sends data through the socket
* @param Ratchet\ConnectionInterface The socket/connection that sent the message to your application
* @param string The message received
* @throws Exception
*/
function onMessage(ConnectionInterface $from, $msg);
}

View File

@ -1,12 +1,13 @@
<?php
namespace Ratchet\WebSocket;
use Ratchet\MessageInterface;
use Ratchet\ConnectionInterface;
use Ratchet\WebSocket\Guzzle\Http\Message\RequestFactory;
use Ratchet\WebSocket\Version\VersionInterface;
use Ratchet\WebSocket\Version;
use Guzzle\Http\Message\RequestInterface;
use Guzzle\Http\Message\Response;
class HandshakeNegotiator {
class HandshakeNegotiator implements MessageInterface {
const EOM = "\r\n\r\n";
/**
@ -20,18 +21,10 @@ class HandshakeNegotiator {
protected $versions = array();
public function __construct($autoloadVersions = true) {
if ($autoloadVersions) {
$this->enableVersion(new Version\RFC6455);
$this->enableVersion(new Version\HyBi10);
$this->enableVersion(new Version\Hixie76);
}
}
/**
* @param WsConnection
*/
public function onOpen(WsConnection $conn) {
public function onOpen(ConnectionInterface $conn) {
$conn->WebSocket->handshakeBuffer = '';
}
@ -41,7 +34,7 @@ class HandshakeNegotiator {
* @return Guzzle\Http\Message\Response|null Response object if it's done parsing, null if there's more to be buffered
* @throws HttpException
*/
public function onData(WsConnection $conn, $data) {
public function onMessage(ConnectionInterface $conn, $data) {
$conn->WebSocket->handshakeBuffer .= $data;
if (strlen($conn->WebSocket->handshakeBuffer) >= (int)$this->maxSize) {
@ -66,7 +59,8 @@ class HandshakeNegotiator {
$response = $version->handshake($conn->WebSocket->request);
$response->setHeader('X-Powered-By', \Ratchet\VERSION);
$conn->setVersion($version);
// This needs to be decoupled
$conn->WebSocket->version = $version;
unset($conn->WebSocket->handshakeBuffer);
return $response;
@ -77,10 +71,10 @@ class HandshakeNegotiator {
* Determine if the message has been buffered as per the HTTP specification
* @param string
* @return boolean
* @todo Safari does not send 2xCRLF after the 6 byte body...this will always return false for Hixie
*/
public function isEom($message) {
return (static::EOM === substr($message, 0 - strlen(static::EOM)));
//return (static::EOM === substr($message, 0 - strlen(static::EOM)));
return (boolean)strpos($message, static::EOM);
}
/**

View File

@ -1,47 +0,0 @@
<?php
namespace Ratchet\WebSocket;
class MessageParser {
public function onData(WsConnection $from, $data) {
if (!isset($from->WebSocket->message)) {
$from->WebSocket->message = $from->WebSocket->version->newMessage();
}
// There is a frame fragment attatched to the connection, add to it
if (!isset($from->WebSocket->frame)) {
$from->WebSocket->frame = $from->WebSocket->version->newFrame();
}
$from->WebSocket->frame->addBuffer($data);
if ($from->WebSocket->frame->isCoalesced()) {
// check if masked
// close if not
if ($from->WebSocket->frame->getOpcode() > 2) {
// take action on the control frame
unset($from->WebSocket->frame);
return;
}
// Check frame
// If is control frame, do your thing
// Else, add to message
// Control frames (ping, pong, close) can be sent in between a fragmented message
$nextFrame = $from->WebSocket->version->newFrame();
$nextFrame->addBuffer($from->WebSocket->frame->extractOverflow());
$from->WebSocket->message->addFrame($from->WebSocket->frame);
$from->WebSocket->frame = $nextFrame;
}
if ($from->WebSocket->message->isCoalesced()) {
$parsed = (string)$from->WebSocket->message;
unset($from->WebSocket->message);
return $parsed;
}
}
}

View File

@ -5,11 +5,6 @@ namespace Ratchet\WebSocket\Version;
* @todo Consider making parent interface/composite for Message/Frame with (isCoalesced, getOpcdoe, getPayloadLength, getPayload)
*/
interface MessageInterface {
/**
* @alias getPayload
*/
function __toString();
/**
* @return bool
*/

View File

@ -1,6 +1,10 @@
<?php
namespace Ratchet\WebSocket\Version;
use Ratchet\ConnectionInterface;
use Ratchet\MessageInterface;
use Ratchet\WebSocket\Version\RFC6455\HandshakeVerifier;
use Ratchet\WebSocket\Version\RFC6455\Message;
use Ratchet\WebSocket\Version\RFC6455\Frame;
use Guzzle\Http\Message\RequestInterface;
use Guzzle\Http\Message\Response;
@ -16,8 +20,14 @@ class RFC6455 implements VersionInterface {
*/
protected $_verifier;
public function __construct() {
$this->_verifier = new HandshakeVerifier;
/**
* @var Ratchet\MessageInterface
*/
protected $coalescedCallback;
public function __construct(MessageInterface $coalescedCallback = null) {
$this->_verifier = new HandshakeVerifier;
$this->coalescedCallback = $coalescedCallback;
}
/**
@ -29,6 +39,9 @@ class RFC6455 implements VersionInterface {
return ($this->getVersionNumber() === $version);
}
/**
* {@inheritdoc}
*/
public function getVersionNumber() {
return 13;
}
@ -51,18 +64,96 @@ class RFC6455 implements VersionInterface {
return new Response(101, $headers);
}
/**
* {@inheritdoc}
*/
public function onMessage(ConnectionInterface $from, $data) {
$overflow = '';
if (!isset($from->WebSocket->message)) {
$from->WebSocket->message = $this->newMessage();
}
// There is a frame fragment attatched to the connection, add to it
if (!isset($from->WebSocket->frame)) {
$from->WebSocket->frame = $this->newFrame();
}
$from->WebSocket->frame->addBuffer($data);
if ($from->WebSocket->frame->isCoalesced()) {
$frame = $from->WebSocket->frame;
if (!$frame->isMasked()) {
unset($from->WebSocket->frame);
$from->send($this->newFrame($frame::CLOSE_PROTOCOL, true, $frame::OP_CLOSE));
$from->getConnection()->close();
return;
}
$opcode = $frame->getOpcode();
if ($opcode > 2) {
switch ($opcode) {
case $frame::OP_CLOSE:
$from->send($frame->unMaskPayload());
$from->getConnection()->close();
// $from->send(Frame::create(Frame::CLOSE_NORMAL, true, Frame::OP_CLOSE));
return;
break;
case $frame::OP_PING:
$from->send($this->newFrame($frame->getPayload(), true, $frame::OP_PONG));
break;
case $frame::OP_PONG:
break;
default:
return $from->close($frame::CLOSE_PROTOCOL);
break;
}
$overflow = $from->WebSocket->frame->extractOverflow();
unset($from->WebSocket->frame, $frame, $opcode);
if (strlen($overflow) > 0) {
$this->onMessage($from, $overflow);
}
return;
}
$overflow = $from->WebSocket->frame->extractOverflow();
$from->WebSocket->message->addFrame($from->WebSocket->frame);
unset($from->WebSocket->frame);
}
if ($from->WebSocket->message->isCoalesced()) {
$parsed = $from->WebSocket->message->getPayload();
unset($from->WebSocket->message);
$this->coalescedCallback->onMessage($from, $parsed);
}
if (strlen($overflow) > 0) {
$this->onMessage($from, $overflow);
}
}
/**
* @return RFC6455\Message
*/
public function newMessage() {
return new RFC6455\Message;
return new Message;
}
/**
* @return RFC6455\Frame
*/
public function newFrame() {
return new RFC6455\Frame;
public function newFrame($payload = null, $final = true, $opcode = 1) {
return new Frame($payload, $final, $opcode);
}
/**
@ -71,7 +162,7 @@ class RFC6455 implements VersionInterface {
* @return string
*/
public function frame($message, $mask = true) {
return RFC6455\Frame::create($message)->data;
return $this->newFrame($message)->data;
}
/**

View File

@ -10,6 +10,19 @@ class Frame implements FrameInterface {
const OP_PING = 9;
const OP_PONG = 10;
const CLOSE_NORMAL = 1000;
const CLOSE_GOING_AWAY = 1001;
const CLOSE_PROTOCOL = 1002;
const CLOSE_BAD_DATA = 1003;
const CLOSE_NO_STATUS = 1005;
const CLOSE_ABNORMAL = 1006;
const CLOSE_BAD_PAYLOAD = 1007;
const CLOSE_POLICY = 1008;
const CLOSE_TOO_BIG = 1009;
const CLOSE_MAND_EXT = 1010;
const CLOSE_SRV_ERR = 1011;
const CLOSE_TLS = 1015;
const MASK_LENGTH = 4;
/**
@ -22,7 +35,7 @@ class Frame implements FrameInterface {
* Number of bytes received from the frame
* @var int
*/
public $_bytes_rec = 0;
public $bytesRecvd = 0;
/**
* Number of bytes in the payload (as per framing protocol)
@ -30,20 +43,9 @@ class Frame implements FrameInterface {
*/
protected $_pay_len_def = -1;
/**
* @param string A valid UTF-8 string to send over the wire
* @param bool Is the final frame in a message
* @param int The opcode of the frame, see constants
* @param bool Mask the payload
* @return Frame
* @throws InvalidArgumentException If the payload is not a valid UTF-8 string
* @throws LengthException If the payload is too big
*/
public static function create($payload, $final = true, $opcode = 1) {
$frame = new static();
if (!mb_check_encoding($payload, 'UTF-8')) {
throw new \InvalidArgumentException("Payload is not a valid UTF-8 string");
public function __construct($payload = null, $final = true, $opcode = 1) {
if (null === $payload) {
return;
}
$raw = (int)(boolean)$final . sprintf('%07b', (int)$opcode);
@ -57,9 +59,20 @@ class Frame implements FrameInterface {
$raw .= sprintf('%08b', 127) . sprintf('%064b', $plLen);
}
$frame->addBuffer(static::encode($raw) . $payload);
$this->addBuffer(static::encode($raw) . $payload);
}
return $frame;
/**
* @param string A valid UTF-8 string to send over the wire
* @param bool Is the final frame in a message
* @param int The opcode of the frame, see constants
* @param bool Mask the payload
* @return Frame
* @throws InvalidArgumentException If the payload is not a valid UTF-8 string
* @throws LengthException If the payload is too big
*/
public static function create($payload, $final = true, $opcode = 1) {
return new static($payload, $final, $opcode);
}
/**
@ -93,7 +106,7 @@ class Frame implements FrameInterface {
return false;
}
return $this->_bytes_rec >= $payload_length + $payload_start;
return $this->bytesRecvd >= $payload_length + $payload_start;
}
/**
@ -103,14 +116,14 @@ class Frame implements FrameInterface {
$buf = (string)$buf;
$this->data .= $buf;
$this->_bytes_rec += strlen($buf);
$this->bytesRecvd += strlen($buf);
}
/**
* {@inheritdoc}
*/
public function isFinal() {
if ($this->_bytes_rec < 1) {
if ($this->bytesRecvd < 1) {
throw new \UnderflowException('Not enough bytes received to determine if this is the final frame in message');
}
@ -123,8 +136,8 @@ class Frame implements FrameInterface {
* {@inheritdoc}
*/
public function isMasked() {
if ($this->_bytes_rec < 2) {
throw new \UnderflowException("Not enough bytes received ({$this->_bytes_rec}) to determine if mask is set");
if ($this->bytesRecvd < 2) {
throw new \UnderflowException("Not enough bytes received ({$this->bytesRecvd}) to determine if mask is set");
}
return (boolean)bindec(substr(sprintf('%08b', ord(substr($this->data, 1, 1))), 0, 1));
@ -140,7 +153,7 @@ class Frame implements FrameInterface {
$start = 1 + $this->getNumPayloadBytes();
if ($this->_bytes_rec < $start + static::MASK_LENGTH) {
if ($this->bytesRecvd < $start + static::MASK_LENGTH) {
throw new \UnderflowException('Not enough data buffered to calculate the masking key');
}
@ -186,7 +199,7 @@ class Frame implements FrameInterface {
$this->data = substr_replace($this->data, static::encode(substr_replace($byte, '1', 0, 1)), 1, 1);
$this->data = substr_replace($this->data, $maskingKey, $this->getNumPayloadBytes() + 1, 0);
$this->_bytes_rec += static::MASK_LENGTH;
$this->bytesRecvd += static::MASK_LENGTH;
$this->data = substr_replace($this->data, $this->applyMask($maskingKey), $this->getPayloadStartingByte(), $this->getPayloadLength());
return $this;
@ -209,7 +222,7 @@ class Frame implements FrameInterface {
$this->data = substr_replace($this->data, static::encode(substr_replace($byte, '0', 0, 1)), 1, 1);
$this->data = substr_replace($this->data, '', $this->getNumPayloadBytes() + 1, static::MASK_LENGTH);
$this->_bytes_rec -= static::MASK_LENGTH;
$this->bytesRecvd -= static::MASK_LENGTH;
$this->data = substr_replace($this->data, $this->applyMask($maskingKey), $this->getPayloadStartingByte(), $this->getPayloadLength());
return $this;
@ -236,7 +249,7 @@ class Frame implements FrameInterface {
* {@inheritdoc}
*/
public function getOpcode() {
if ($this->_bytes_rec < 1) {
if ($this->bytesRecvd < 1) {
throw new \UnderflowException('Not enough bytes received to determine opcode');
}
@ -249,7 +262,7 @@ class Frame implements FrameInterface {
* @throws UnderflowException If the buffer doesn't have enough data to determine this
*/
protected function getFirstPayloadVal() {
if ($this->_bytes_rec < 2) {
if ($this->bytesRecvd < 2) {
throw new \UnderflowException('Not enough bytes received');
}
@ -261,7 +274,7 @@ class Frame implements FrameInterface {
* @throws UnderflowException
*/
protected function getNumPayloadBits() {
if ($this->_bytes_rec < 2) {
if ($this->bytesRecvd < 2) {
throw new \UnderflowException('Not enough bytes received');
}
@ -315,7 +328,7 @@ class Frame implements FrameInterface {
}
$byte_length = $this->getNumPayloadBytes();
if ($this->_bytes_rec < 1 + $byte_length) {
if ($this->bytesRecvd < 1 + $byte_length) {
throw new \UnderflowException('Not enough data buffered to determine payload length');
}
@ -365,7 +378,7 @@ class Frame implements FrameInterface {
$endPoint = $this->getPayloadLength();
$endPoint += $this->getPayloadStartingByte();
if ($this->_bytes_rec > $endPoint) {
if ($this->bytesRecvd > $endPoint) {
$overflow = substr($this->data, $endPoint);
$this->data = substr($this->data, 0, $endPoint);

View File

@ -1,11 +1,12 @@
<?php
namespace Ratchet\WebSocket\Version;
use Ratchet\MessageInterface;
use Guzzle\Http\Message\RequestInterface;
/**
* A standard interface for interacting with the various version of the WebSocket protocol
*/
interface VersionInterface {
interface VersionInterface extends MessageInterface {
/**
* Given an HTTP header, determine if this version should handle the protocol
* @param Guzzle\Http\Message\RequestInterface
@ -45,5 +46,5 @@ interface VersionInterface {
* @return string
* @todo Change to use other classes, this will be removed eventually
*/
function frame($message, $mask = true);
//function frame($message, $mask = true);
}

View File

@ -3,17 +3,15 @@ namespace Ratchet\WebSocket;
use Ratchet\ConnectionInterface;
use Ratchet\AbstractConnectionDecorator;
use Ratchet\WebSocket\Version\VersionInterface;
use Ratchet\WebSocket\Version\FrameInterface;
use Ratchet\WebSocket\Version\RFC6455\Frame;
/**
* {@inheritdoc}
* @property stdClass $WebSocket
*/
class WsConnection extends AbstractConnectionDecorator {
/**
* @var Ratchet\WebSocket\Version\VersionInterface
*/
protected $version = null;
public function __construct(ConnectionInterface $conn) {
parent::__construct($conn);
@ -21,7 +19,9 @@ class WsConnection extends AbstractConnectionDecorator {
}
public function send($data) {
if ($this->hasVersion()) {
if ($data instanceof FrameInterface) {
$data = $data->data;
} elseif (isset($this->WebSocket->version)) {
// need frame caching
$data = $this->WebSocket->version->frame($data, false);
}
@ -29,7 +29,12 @@ class WsConnection extends AbstractConnectionDecorator {
$this->getConnection()->send($data);
}
public function close() {
/**
* {@inheritdoc}
* @todo If code is 1000 send close frame - false is close w/o frame...?
*/
public function close($code = 1000) {
$this->send(Frame::create($code, true, Frame::OP_CLOSE));
// send close frame with code 1000
// ???
@ -38,23 +43,4 @@ class WsConnection extends AbstractConnectionDecorator {
$this->getConnection()->close(); // temporary
}
/**
* @return boolean
* @internal
*/
public function hasVersion() {
return (null === $this->version);
}
/**
* Set the WebSocket protocol version to communicate with
* @param Ratchet\WebSocket\Version\VersionInterface
* @internal
*/
public function setVersion(VersionInterface $version) {
$this->WebSocket->version = $version;
return $this;
}
}

View File

@ -2,6 +2,7 @@
namespace Ratchet\WebSocket;
use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;
use Ratchet\WebSocket\Version;
use Guzzle\Http\Message\RequestInterface;
use Ratchet\WebSocket\Guzzle\Http\Message\RequestFactory;
@ -32,11 +33,6 @@ class WsServer implements MessageComponentInterface {
*/
protected $connections;
/**
* @var MessageParser
*/
protected $messager;
/**
* For now, array_push accepted subprotocols to this array
* @deprecated
@ -54,10 +50,15 @@ class WsServer implements MessageComponentInterface {
* @param Ratchet\MessageComponentInterface Your application to run with WebSockets
*/
public function __construct(MessageComponentInterface $component) {
mb_internal_encoding('UTF-8');
//mb_internal_encoding('UTF-8');
$this->handshaker = new HandshakeNegotiator;
$this->messager = new MessageParser;
$this->handshaker = new HandshakeNegotiator();
$this->handshaker
->enableVersion(new Version\RFC6455($component))
->enableVersion(new Version\HyBi10($component))
//->enableVersion(new Version\Hixie76)
;
$this->_decorating = $component;
$this->connections = new \SplObjectStorage;
@ -83,7 +84,7 @@ class WsServer implements MessageComponentInterface {
$conn = $this->connections[$from];
if (true !== $conn->WebSocket->established) {
if (null === ($response = $this->handshaker->onData($conn, $msg))) {
if (null === ($response = $this->handshaker->onMessage($conn, $msg))) {
return;
}
@ -103,9 +104,7 @@ class WsServer implements MessageComponentInterface {
return $this->_decorating->onOpen($conn);
}
if (null !== ($parsed = $this->messager->onData($conn, $msg))) {
$this->_decorating->onMessage($conn, $parsed);
}
$conn->WebSocket->version->onMessage($conn, $msg);
}
/**

View File

@ -22,10 +22,10 @@ class HandshakeNegotiatorTest extends \PHPUnit_Framework_TestCase {
return array(
array(false, "GET / HTTP/1.1\r\nHost: socketo.me\r\n")
, array(true, "GET / HTTP/1.1\r\nHost: socketo.me\r\n\r\n")
, array(false, "GET / HTTP/1.1\r\nHost: socketo.me\r\n\r\n1")
, array(false, "GET / HTTP/1.1\r\nHost: socketo.me\r\n\r\nHixie✖")
, array(true, "GET / HTTP/1.1\r\nHost: socketo.me\r\n\r\n1")
, array(true, "GET / HTTP/1.1\r\nHost: socketo.me\r\n\r\nHixie✖")
, array(true, "GET / HTTP/1.1\r\nHost: socketo.me\r\n\r\nHixie✖\r\n\r\n")
, array(false, "GET / HTTP/1.1\r\nHost: socketo.me\r\n\r\nHixie\r\n")
, array(true, "GET / HTTP/1.1\r\nHost: socketo.me\r\n\r\nHixie\r\n")
);
}
@ -83,6 +83,10 @@ class HandshakeNegotiatorTest extends \PHPUnit_Framework_TestCase {
}
public function testGetSupportedVersionAfterRemoval() {
$this->parser->enableVersion(new RFC6455);
$this->parser->enableVersion(new HyBi10);
$this->parser->enableVersion(new Hixie76);
$this->parser->disableVersion(0);
$values = explode(',', $this->parser->getSupportedVersionString());
@ -97,7 +101,7 @@ class HandshakeNegotiatorTest extends \PHPUnit_Framework_TestCase {
$this->parser->maxSize = 20;
$this->assertNull($this->parser->onData($conn, "GET / HTTP/1.1\r\n"));
$this->assertGreaterThan(400, $this->parser->onData($conn, "Header-Is: Too Big")->getStatusCode());
$this->assertNull($this->parser->onMessage($conn, "GET / HTTP/1.1\r\n"));
$this->assertGreaterThan(400, $this->parser->onMessage($conn, "Header-Is: Too Big")->getStatusCode());
}
}