New RFC interfaces, heartbeat init

Cherrypicked from 6b6a5f0d6d9a10547291a0d8c027584448481daf

:-/
This commit is contained in:
Chris Boden 2016-02-10 18:52:42 -05:00
parent 46487e756c
commit f9b052d85e
4 changed files with 85 additions and 124 deletions

View File

@ -29,7 +29,7 @@
"php": ">=5.3.9" "php": ">=5.3.9"
, "react/socket": "^0.3 || ^0.4" , "react/socket": "^0.3 || ^0.4"
, "guzzlehttp/psr7": "^1.0" , "guzzlehttp/psr7": "^1.0"
, "ratchet/rfc6455": "dev-psr7" , "ratchet/rfc6455": "dev-psr7-multi-streamer"
, "symfony/http-foundation": "^2.2" , "symfony/http-foundation": "^2.2"
, "symfony/routing": "^2.2" , "symfony/routing": "^2.2"
} }

View File

@ -1,84 +0,0 @@
<?php
namespace Ratchet\WebSocket;
use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface;
use Ratchet\RFC6455\Messaging\Protocol\Frame;
use Ratchet\RFC6455\Messaging\Protocol\FrameInterface;
use Ratchet\RFC6455\Messaging\Protocol\MessageInterface;
use Ratchet\RFC6455\Messaging\Streaming\ContextInterface;
class ConnectionContext implements ContextInterface {
private $message;
private $frame;
private $conn;
private $component;
public function __construct(ConnectionInterface $conn, MessageComponentInterface $component) {
$this->conn = $conn;
$this->component = $component;
}
public function detach() {
$conn = $this->conn;
$this->frame = null;
$this->message = null;
$this->component = null;
$this->conn = null;
return $conn;
}
public function onError(\Exception $e) {
$this->component->onError($this->conn, $e);
}
public function setFrame(FrameInterface $frame = null) {
$this->frame = $frame;
return $frame;
}
/**
* @return \Ratchet\RFC6455\Messaging\Protocol\FrameInterface
*/
public function getFrame() {
return $this->frame;
}
public function setMessage(MessageInterface $message = null) {
$this->message = $message;
return $message;
}
/**
* @return \Ratchet\RFC6455\Messaging\Protocol\MessageInterface
*/
public function getMessage() {
return $this->message;
}
public function onMessage(MessageInterface $msg) {
$this->component->onMessage($this->conn, $msg->getPayload());
}
public function onPing(FrameInterface $frame) {
$pong = new Frame($frame->getPayload(), true, Frame::OP_PONG);
$this->conn->send($pong);
}
public function onPong(FrameInterface $frame) {
// TODO: Implement onPong() method.
}
/**
* @param $code int
*/
public function onClose($code) {
$this->conn->close($code);
}
}

View File

@ -1,8 +1,8 @@
<?php <?php
namespace Ratchet\WebSocket; namespace Ratchet\WebSocket;
use Ratchet\AbstractConnectionDecorator; use Ratchet\AbstractConnectionDecorator;
use Ratchet\RFC6455\Messaging\Protocol\DataInterface; use Ratchet\RFC6455\Messaging\DataInterface;
use Ratchet\RFC6455\Messaging\Protocol\Frame; use Ratchet\RFC6455\Messaging\Frame;
/** /**
* {@inheritdoc} * {@inheritdoc}
@ -25,7 +25,7 @@ class WsConnection extends AbstractConnectionDecorator {
} }
/** /**
* {@inheritdoc} * @param int|\Ratchet\RFC6455\Messaging\Protocol\DataInterface
*/ */
public function close($code = 1000) { public function close($code = 1000) {
if ($this->WebSocket->closing) { if ($this->WebSocket->closing) {

View File

@ -6,6 +6,11 @@ use Ratchet\Http\HttpServerInterface;
use Ratchet\Http\CloseResponseTrait; use Ratchet\Http\CloseResponseTrait;
use Psr\Http\Message\RequestInterface; use Psr\Http\Message\RequestInterface;
use GuzzleHttp\Psr7 as gPsr; use GuzzleHttp\Psr7 as gPsr;
use Ratchet\RFC6455\Messaging\FrameInterface;
use Ratchet\RFC6455\Messaging\MessageInterface;
use Ratchet\RFC6455\Messaging\MessageBuffer;
use React\EventLoop\LoopInterface;
use Ratchet\RFC6455\Messaging\Frame;
/** /**
* The adapter to handle WebSocket requests/responses * The adapter to handle WebSocket requests/responses
@ -20,7 +25,7 @@ class WsServer implements HttpServerInterface {
* Decorated component * Decorated component
* @var \Ratchet\MessageComponentInterface * @var \Ratchet\MessageComponentInterface
*/ */
public $component; private $delegate;
/** /**
* @var \SplObjectStorage * @var \SplObjectStorage
@ -28,34 +33,34 @@ class WsServer implements HttpServerInterface {
protected $connections; protected $connections;
/** /**
* Holder of accepted protocols, implement through WampServerInterface * @var \Ratchet\RFC6455\Messaging\CloseFrameChecker
*/ */
protected $acceptedSubProtocols = []; private $closeFrameChecker;
/** /**
* Flag if we have checked the decorated component for sub-protocols * @var \Ratchet\RFC6455\Handshake\Negotiator
* @var boolean
*/ */
private $isSpGenerated = false;
private $handshakeNegotiator; private $handshakeNegotiator;
private $messageStreamer;
private $pongReceiver;
/** /**
* @param \Ratchet\MessageComponentInterface $component Your application to run with WebSockets * @param \Ratchet\MessageComponentInterface $component Your application to run with WebSockets
* If you want to enable sub-protocols have your component implement WsServerInterface as well * If you want to enable sub-protocols have your component implement WsServerInterface as well
*/ */
public function __construct(MessageComponentInterface $component) { public function __construct(MessageComponentInterface $component) {
$this->component = $component; $this->delegate = $component;
$this->connections = new \SplObjectStorage; $this->connections = new \SplObjectStorage;
$encodingValidator = new \Ratchet\RFC6455\Encoding\Validator; // $this->encodingValidator = new \Ratchet\RFC6455\Encoding\Validator;
$this->handshakeNegotiator = new \Ratchet\RFC6455\Handshake\Negotiator($encodingValidator); $this->closeFrameChecker = new \Ratchet\RFC6455\Messaging\CloseFrameChecker;
$this->messageStreamer = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer($encodingValidator); $this->handshakeNegotiator = new \Ratchet\RFC6455\Handshake\ServerNegotiator;
if ($component instanceof WsServerInterface) { if ($component instanceof WsServerInterface) {
$this->handshakeNegotiator->setSupportedSubProtocols($component->getSubProtocols()); $this->handshakeNegotiator->setSupportedSubProtocols($component->getSubProtocols());
} }
$this->pongReceiver = function() {};
} }
/** /**
@ -68,23 +73,33 @@ class WsServer implements HttpServerInterface {
$conn->httpRequest = $request; // This will replace ->WebSocket->request $conn->httpRequest = $request; // This will replace ->WebSocket->request
$conn->WebSocket = new \StdClass; $conn->WebSocket = new \StdClass;
$conn->WebSocket->closing = false; $conn->WebSocket->closing = false;
$conn->WebSocket->request = $request; // deprecated $conn->WebSocket->request = $request; // deprecated
$response = $this->handshakeNegotiator->handshake($request)->withHeader('X-Powered-By', \Ratchet\VERSION); $response = $this->handshakeNegotiator->handshake($request)->withHeader('X-Powered-By', \Ratchet\VERSION);
$conn->send(gPsr\str($response)); $conn->send(gPsr\str($response));
if (101 != $response->getStatusCode()) { if (101 !== $response->getStatusCode()) {
return $conn->close(); return $conn->close();
} }
$wsConn = new WsConnection($conn); $wsConn = new WsConnection($conn);
$context = new ConnectionContext($wsConn, $this->component);
$this->connections->attach($conn, $context);
return $this->component->onOpen($wsConn); $streamer = new MessageBuffer(
$this->closeFrameChecker,
function(MessageInterface $msg) use ($wsConn) {
$this->delegate->onMessage($wsConn, $msg);
},
function(FrameInterface $frame) use ($wsConn) {
$this->onControlFrame($frame, $wsConn);
}
);
$this->connections->attach($conn, [$wsConn, $streamer]);
return $this->delegate->onOpen($wsConn);
} }
/** /**
@ -96,8 +111,7 @@ class WsServer implements HttpServerInterface {
} }
$context = $this->connections[$from]; $context = $this->connections[$from];
$context[1]->onData($msg);
$this->messageStreamer->onData($msg, $context);
} }
/** /**
@ -105,12 +119,10 @@ class WsServer implements HttpServerInterface {
*/ */
public function onClose(ConnectionInterface $conn) { public function onClose(ConnectionInterface $conn) {
if ($this->connections->contains($conn)) { if ($this->connections->contains($conn)) {
$decor = $this->connections[$conn]; $context = $this->connections[$conn];
$this->connections->detach($conn); $this->connections->detach($conn);
$conn = $decor->detach(); $this->delegate->onClose($context[0]);
$this->component->onClose($conn);
} }
} }
@ -120,24 +132,57 @@ class WsServer implements HttpServerInterface {
public function onError(ConnectionInterface $conn, \Exception $e) { public function onError(ConnectionInterface $conn, \Exception $e) {
if ($this->connections->contains($conn)) { if ($this->connections->contains($conn)) {
$context = $this->connections[$conn]; $context = $this->connections[$conn];
$context->onError($e); $this->delegate->onError($context[0], $e);
} else { } else {
$conn->close(); $conn->close();
} }
} }
/** public function onControlFrame(FrameInterface $frame, WsConnection $conn) {
* Toggle weather to check encoding of incoming messages switch ($frame->getOpCode()) {
* @param bool case Frame::OP_CLOSE:
* @return WsServer $conn->close($frame);
*/ break;
public function setEncodingChecks($opt) { case Frame::OP_PING:
// $this->validator->on = (boolean)$opt; $conn->send(new Frame($frame->getPayload(), true, Frame::OP_PONG));
break;
return $this; case Frame::OP_PONG:
$pongReceiver = $this->pongReceiver;
$pongReceiver($frame, $conn);
break;
}
} }
public function setStrictSubProtocolCheck($enable) { public function setStrictSubProtocolCheck($enable) {
$this->handshakeNegotiator->setStrictSubProtocolCheck($enable); $this->handshakeNegotiator->setStrictSubProtocolCheck($enable);
} }
public function enableKeepAlive(LoopInterface $loop, $interval = 30) {
$lastPing = null;
$pingedConnections = new \SplObjectStorage;
$splClearer = new \SplObjectStorage;
$this->pongReceiver = function(FrameInterface $frame, $wsConn) use ($pingedConnections, &$lastPing) {
if ($frame->getPayload() === $lastPing->getPayload()) {
$pingedConnections->detach($wsConn);
}
};
$loop->addPeriodicTimer((int)$interval, function() use ($pingedConnections, &$lastPing, $splClearer) {
foreach ($pingedConnections as $wsConn) {
$wsConn->close();
}
$pingedConnections->removeAllExcept($splClearer);
$lastPing = new Frame(uniqid(), true, Frame::OP_PING);
foreach ($this->connections as $key => $conn) {
$context = $this->connections[$conn];
$wsConn = $context[0];
$wsConn->send($lastPing);
$pingedConnections->attach($wsConn);
}
});
}
} }