* @param int $keepAliveInterval Seconds between ping calls

This commit is contained in:
Illia Kovalov 2017-10-04 12:03:25 +02:00
parent 467c4552a5
commit e2b7a8f95a
2 changed files with 23 additions and 9 deletions

View File

@ -97,24 +97,21 @@ class App {
* @param ComponentInterface $controller Your application to server for the route. If not specified, assumed to be for a WebSocket * @param ComponentInterface $controller Your application to server for the route. If not specified, assumed to be for a WebSocket
* @param array $allowedOrigins An array of hosts allowed to connect (same host by default), ['*'] for any * @param array $allowedOrigins An array of hosts allowed to connect (same host by default), ['*'] for any
* @param string $httpHost Override the $httpHost variable provided in the __construct * @param string $httpHost Override the $httpHost variable provided in the __construct
* @param int $keepAliveInterval Seconds between ping calls. Works for WsServer only
* @return ComponentInterface|WsServer * @return ComponentInterface|WsServer
*/ */
public function route($path, ComponentInterface $controller, array $allowedOrigins = array(), $httpHost = null, $keepAliveInterval = 30) { public function route($path, ComponentInterface $controller, array $allowedOrigins = array(), $httpHost = null) {
if ($controller instanceof HttpServerInterface || $controller instanceof WsServer) { if ($controller instanceof HttpServerInterface || $controller instanceof WsServer) {
$decorated = $controller; $decorated = $controller;
} elseif ($controller instanceof WampServerInterface) { } elseif ($controller instanceof WampServerInterface) {
$decorated = new WsServer(new WampServer($controller)); $decorated = new WsServer(new WampServer($controller));
$decorated->enableKeepAlive($this->_server->loop);
} elseif ($controller instanceof MessageComponentInterface) { } elseif ($controller instanceof MessageComponentInterface) {
$decorated = new WsServer($controller); $decorated = new WsServer($controller);
$decorated->enableKeepAlive($this->_server->loop);
} else { } else {
$decorated = $controller; $decorated = $controller;
} }
if ($decorated instanceof WsServer) {
$decorated->enableKeepAlive($this->_server->loop, $keepAliveInterval);
}
if ($httpHost === null) { if ($httpHost === null) {
$httpHost = $this->httpHost; $httpHost = $this->httpHost;
} }

View File

@ -61,11 +61,22 @@ class WsServer implements HttpServerInterface {
*/ */
private $msgCb; private $msgCb;
/**
* @var bool
*/
private $keepAliveEnabled;
/**
* @var int
*/
private $keepAliveTimeout = 30;
/** /**
* @param \Ratchet\WebSocket\MessageComponentInterface|\Ratchet\MessageComponentInterface $component Your application to run with WebSockets * @param \Ratchet\WebSocket\MessageComponentInterface|\Ratchet\MessageComponentInterface $component Your application to run with WebSockets
* @param int $keepAliveTimeout
* @note If you want to enable sub-protocols have your component implement WsServerInterface as well * @note If you want to enable sub-protocols have your component implement WsServerInterface as well
*/ */
public function __construct(ComponentInterface $component) { public function __construct(ComponentInterface $component, $keepAliveTimeout = 30) {
if ($component instanceof MessageComponentInterface) { if ($component instanceof MessageComponentInterface) {
$this->msgCb = function(ConnectionInterface $conn, MessageInterface $msg) { $this->msgCb = function(ConnectionInterface $conn, MessageInterface $msg) {
$this->delegate->onMessage($conn, $msg); $this->delegate->onMessage($conn, $msg);
@ -88,6 +99,7 @@ class WsServer implements HttpServerInterface {
$this->closeFrameChecker = new CloseFrameChecker; $this->closeFrameChecker = new CloseFrameChecker;
$this->handshakeNegotiator = new ServerNegotiator(new RequestVerifier); $this->handshakeNegotiator = new ServerNegotiator(new RequestVerifier);
$this->handshakeNegotiator->setStrictSubProtocolCheck(true); $this->handshakeNegotiator->setStrictSubProtocolCheck(true);
$this->keepAliveEnabled = false;
if ($component instanceof WsServerInterface) { if ($component instanceof WsServerInterface) {
$this->handshakeNegotiator->setSupportedSubProtocols($component->getSubProtocols()); $this->handshakeNegotiator->setSupportedSubProtocols($component->getSubProtocols());
@ -99,6 +111,7 @@ class WsServer implements HttpServerInterface {
$this->ueFlowFactory = function() use ($reusableUnderflowException) { $this->ueFlowFactory = function() use ($reusableUnderflowException) {
return $reusableUnderflowException; return $reusableUnderflowException;
}; };
$this->keepAliveTimeout = $keepAliveTimeout;
} }
/** /**
@ -195,7 +208,11 @@ class WsServer implements HttpServerInterface {
$this->handshakeNegotiator->setStrictSubProtocolCheck($enable); $this->handshakeNegotiator->setStrictSubProtocolCheck($enable);
} }
public function enableKeepAlive(LoopInterface $loop, $interval = 30) { public function enableKeepAlive(LoopInterface $loop) {
if ($this->keepAliveEnabled) {
return;
}
$this->keepAliveEnabled = true;
$lastPing = new Frame(uniqid(), true, Frame::OP_PING); $lastPing = new Frame(uniqid(), true, Frame::OP_PING);
$pingedConnections = new \SplObjectStorage; $pingedConnections = new \SplObjectStorage;
$splClearer = new \SplObjectStorage; $splClearer = new \SplObjectStorage;
@ -206,7 +223,7 @@ class WsServer implements HttpServerInterface {
} }
}; };
$loop->addPeriodicTimer((int)$interval, function() use ($pingedConnections, &$lastPing, $splClearer) { $loop->addPeriodicTimer((int)$this->keepAliveTimeout, function() use ($pingedConnections, &$lastPing, $splClearer) {
foreach ($pingedConnections as $wsConn) { foreach ($pingedConnections as $wsConn) {
$wsConn->close(); $wsConn->close();
} }