From f9b052d85eeaeaf4979f693c04193abbad83a65a Mon Sep 17 00:00:00 2001 From: Chris Boden Date: Wed, 10 Feb 2016 18:52:42 -0500 Subject: [PATCH] New RFC interfaces, heartbeat init Cherrypicked from 6b6a5f0d6d9a10547291a0d8c027584448481daf :-/ --- composer.json | 2 +- src/Ratchet/WebSocket/ConnectionContext.php | 84 -------------- src/Ratchet/WebSocket/WsConnection.php | 6 +- src/Ratchet/WebSocket/WsServer.php | 117 ++++++++++++++------ 4 files changed, 85 insertions(+), 124 deletions(-) delete mode 100644 src/Ratchet/WebSocket/ConnectionContext.php diff --git a/composer.json b/composer.json index 72a8283..a34968e 100644 --- a/composer.json +++ b/composer.json @@ -29,7 +29,7 @@ "php": ">=5.3.9" , "react/socket": "^0.3 || ^0.4" , "guzzlehttp/psr7": "^1.0" - , "ratchet/rfc6455": "dev-psr7" + , "ratchet/rfc6455": "dev-psr7-multi-streamer" , "symfony/http-foundation": "^2.2" , "symfony/routing": "^2.2" } diff --git a/src/Ratchet/WebSocket/ConnectionContext.php b/src/Ratchet/WebSocket/ConnectionContext.php deleted file mode 100644 index 20fb58b..0000000 --- a/src/Ratchet/WebSocket/ConnectionContext.php +++ /dev/null @@ -1,84 +0,0 @@ -conn = $conn; - $this->component = $component; - } - - public function detach() { - $conn = $this->conn; - - $this->frame = null; - $this->message = null; - - $this->component = null; - $this->conn = null; - - return $conn; - } - - public function onError(\Exception $e) { - $this->component->onError($this->conn, $e); - } - - public function setFrame(FrameInterface $frame = null) { - $this->frame = $frame; - - return $frame; - } - - /** - * @return \Ratchet\RFC6455\Messaging\Protocol\FrameInterface - */ - public function getFrame() { - return $this->frame; - } - - public function setMessage(MessageInterface $message = null) { - $this->message = $message; - - return $message; - } - - /** - * @return \Ratchet\RFC6455\Messaging\Protocol\MessageInterface - */ - public function getMessage() { - return $this->message; - } - - public function onMessage(MessageInterface $msg) { - $this->component->onMessage($this->conn, $msg->getPayload()); - } - - public function onPing(FrameInterface $frame) { - $pong = new Frame($frame->getPayload(), true, Frame::OP_PONG); - - $this->conn->send($pong); - } - - public function onPong(FrameInterface $frame) { - // TODO: Implement onPong() method. - } - - /** - * @param $code int - */ - public function onClose($code) { - $this->conn->close($code); - } -} \ No newline at end of file diff --git a/src/Ratchet/WebSocket/WsConnection.php b/src/Ratchet/WebSocket/WsConnection.php index a4adce2..175f152 100644 --- a/src/Ratchet/WebSocket/WsConnection.php +++ b/src/Ratchet/WebSocket/WsConnection.php @@ -1,8 +1,8 @@ WebSocket->closing) { diff --git a/src/Ratchet/WebSocket/WsServer.php b/src/Ratchet/WebSocket/WsServer.php index 8f71538..5228105 100644 --- a/src/Ratchet/WebSocket/WsServer.php +++ b/src/Ratchet/WebSocket/WsServer.php @@ -6,6 +6,11 @@ use Ratchet\Http\HttpServerInterface; use Ratchet\Http\CloseResponseTrait; use Psr\Http\Message\RequestInterface; use GuzzleHttp\Psr7 as gPsr; +use Ratchet\RFC6455\Messaging\FrameInterface; +use Ratchet\RFC6455\Messaging\MessageInterface; +use Ratchet\RFC6455\Messaging\MessageBuffer; +use React\EventLoop\LoopInterface; +use Ratchet\RFC6455\Messaging\Frame; /** * The adapter to handle WebSocket requests/responses @@ -20,7 +25,7 @@ class WsServer implements HttpServerInterface { * Decorated component * @var \Ratchet\MessageComponentInterface */ - public $component; + private $delegate; /** * @var \SplObjectStorage @@ -28,34 +33,34 @@ class WsServer implements HttpServerInterface { protected $connections; /** - * Holder of accepted protocols, implement through WampServerInterface + * @var \Ratchet\RFC6455\Messaging\CloseFrameChecker */ - protected $acceptedSubProtocols = []; + private $closeFrameChecker; /** - * Flag if we have checked the decorated component for sub-protocols - * @var boolean + * @var \Ratchet\RFC6455\Handshake\Negotiator */ - private $isSpGenerated = false; - private $handshakeNegotiator; - private $messageStreamer; + + private $pongReceiver; /** * @param \Ratchet\MessageComponentInterface $component Your application to run with WebSockets * If you want to enable sub-protocols have your component implement WsServerInterface as well */ public function __construct(MessageComponentInterface $component) { - $this->component = $component; + $this->delegate = $component; $this->connections = new \SplObjectStorage; - $encodingValidator = new \Ratchet\RFC6455\Encoding\Validator; - $this->handshakeNegotiator = new \Ratchet\RFC6455\Handshake\Negotiator($encodingValidator); - $this->messageStreamer = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer($encodingValidator); +// $this->encodingValidator = new \Ratchet\RFC6455\Encoding\Validator; + $this->closeFrameChecker = new \Ratchet\RFC6455\Messaging\CloseFrameChecker; + $this->handshakeNegotiator = new \Ratchet\RFC6455\Handshake\ServerNegotiator; if ($component instanceof WsServerInterface) { $this->handshakeNegotiator->setSupportedSubProtocols($component->getSubProtocols()); } + + $this->pongReceiver = function() {}; } /** @@ -68,23 +73,33 @@ class WsServer implements HttpServerInterface { $conn->httpRequest = $request; // This will replace ->WebSocket->request - $conn->WebSocket = new \StdClass; - $conn->WebSocket->closing = false; - $conn->WebSocket->request = $request; // deprecated + $conn->WebSocket = new \StdClass; + $conn->WebSocket->closing = false; + $conn->WebSocket->request = $request; // deprecated $response = $this->handshakeNegotiator->handshake($request)->withHeader('X-Powered-By', \Ratchet\VERSION); $conn->send(gPsr\str($response)); - if (101 != $response->getStatusCode()) { + if (101 !== $response->getStatusCode()) { return $conn->close(); } - $wsConn = new WsConnection($conn); - $context = new ConnectionContext($wsConn, $this->component); - $this->connections->attach($conn, $context); + $wsConn = new WsConnection($conn); - return $this->component->onOpen($wsConn); + $streamer = new MessageBuffer( + $this->closeFrameChecker, + function(MessageInterface $msg) use ($wsConn) { + $this->delegate->onMessage($wsConn, $msg); + }, + function(FrameInterface $frame) use ($wsConn) { + $this->onControlFrame($frame, $wsConn); + } + ); + + $this->connections->attach($conn, [$wsConn, $streamer]); + + return $this->delegate->onOpen($wsConn); } /** @@ -96,8 +111,7 @@ class WsServer implements HttpServerInterface { } $context = $this->connections[$from]; - - $this->messageStreamer->onData($msg, $context); + $context[1]->onData($msg); } /** @@ -105,12 +119,10 @@ class WsServer implements HttpServerInterface { */ public function onClose(ConnectionInterface $conn) { if ($this->connections->contains($conn)) { - $decor = $this->connections[$conn]; + $context = $this->connections[$conn]; $this->connections->detach($conn); - $conn = $decor->detach(); - - $this->component->onClose($conn); + $this->delegate->onClose($context[0]); } } @@ -120,24 +132,57 @@ class WsServer implements HttpServerInterface { public function onError(ConnectionInterface $conn, \Exception $e) { if ($this->connections->contains($conn)) { $context = $this->connections[$conn]; - $context->onError($e); + $this->delegate->onError($context[0], $e); } else { $conn->close(); } } - /** - * Toggle weather to check encoding of incoming messages - * @param bool - * @return WsServer - */ - public function setEncodingChecks($opt) { -// $this->validator->on = (boolean)$opt; - - return $this; + public function onControlFrame(FrameInterface $frame, WsConnection $conn) { + switch ($frame->getOpCode()) { + case Frame::OP_CLOSE: + $conn->close($frame); + break; + case Frame::OP_PING: + $conn->send(new Frame($frame->getPayload(), true, Frame::OP_PONG)); + break; + case Frame::OP_PONG: + $pongReceiver = $this->pongReceiver; + $pongReceiver($frame, $conn); + break; + } } public function setStrictSubProtocolCheck($enable) { $this->handshakeNegotiator->setStrictSubProtocolCheck($enable); } + + public function enableKeepAlive(LoopInterface $loop, $interval = 30) { + $lastPing = null; + $pingedConnections = new \SplObjectStorage; + $splClearer = new \SplObjectStorage; + + $this->pongReceiver = function(FrameInterface $frame, $wsConn) use ($pingedConnections, &$lastPing) { + if ($frame->getPayload() === $lastPing->getPayload()) { + $pingedConnections->detach($wsConn); + } + }; + + $loop->addPeriodicTimer((int)$interval, function() use ($pingedConnections, &$lastPing, $splClearer) { + foreach ($pingedConnections as $wsConn) { + $wsConn->close(); + } + $pingedConnections->removeAllExcept($splClearer); + + $lastPing = new Frame(uniqid(), true, Frame::OP_PING); + + foreach ($this->connections as $key => $conn) { + $context = $this->connections[$conn]; + $wsConn = $context[0]; + + $wsConn->send($lastPing); + $pingedConnections->attach($wsConn); + } + }); + } } \ No newline at end of file