Replace evenement with callback interface
Use strict ContextInterface instead of event emitter Keep message/frame within connection, not parser Expect only 1 of specific WebSocket headers Non-UTF-8 server tests passing :-)
This commit is contained in:
parent
de76869847
commit
791ebaeb24
@ -22,8 +22,7 @@
|
|||||||
},
|
},
|
||||||
"require": {
|
"require": {
|
||||||
"php": ">=5.4.2",
|
"php": ">=5.4.2",
|
||||||
"guzzlehttp/psr7": "^1.0",
|
"guzzlehttp/psr7": "^1.0"
|
||||||
"evenement/evenement": "^2.0"
|
|
||||||
},
|
},
|
||||||
"require-dev": {
|
"require-dev": {
|
||||||
"react/http": "^0.4.1"
|
"react/http": "^0.4.1"
|
||||||
|
@ -3,7 +3,7 @@ namespace Ratchet\RFC6455\Encoding;
|
|||||||
|
|
||||||
class NullValidator implements ValidatorInterface {
|
class NullValidator implements ValidatorInterface {
|
||||||
/**
|
/**
|
||||||
* What value to return when checkEncoding is valled
|
* What value to return when checkEncoding is valid
|
||||||
* @var boolean
|
* @var boolean
|
||||||
*/
|
*/
|
||||||
public $validationResponse = true;
|
public $validationResponse = true;
|
||||||
|
@ -55,7 +55,7 @@ class RequestVerifier {
|
|||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
public function verifyRequestURI($val) {
|
public function verifyRequestURI($val) {
|
||||||
if ($val[0] != '/') {
|
if ($val[0] !== '/') {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -81,11 +81,11 @@ class RequestVerifier {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify the Upgrade request to WebSockets.
|
* Verify the Upgrade request to WebSockets.
|
||||||
* @param array $upgradeHeader MUST include "websocket"
|
* @param array $upgradeHeader MUST equal "websocket"
|
||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
public function verifyUpgradeRequest(array $upgradeHeader) {
|
public function verifyUpgradeRequest(array $upgradeHeader) {
|
||||||
return (in_array('websocket', array_map('strtolower', $upgradeHeader)));
|
return (1 === count($upgradeHeader) && 'websocket' === strtolower($upgradeHeader[0]));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -94,7 +94,7 @@ class RequestVerifier {
|
|||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
public function verifyConnection(array $connectionHeader) {
|
public function verifyConnection(array $connectionHeader) {
|
||||||
return in_array('upgrade', array_map('strtolower', $connectionHeader));
|
return (1 === count($connectionHeader) && 'upgrade' === strtolower(($connectionHeader[0])));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -105,7 +105,7 @@ class RequestVerifier {
|
|||||||
* @todo Check the spec to see what the encoding of the key could be
|
* @todo Check the spec to see what the encoding of the key could be
|
||||||
*/
|
*/
|
||||||
public function verifyKey(array $keyHeader) {
|
public function verifyKey(array $keyHeader) {
|
||||||
return in_array(16, array_map('strlen', array_map('base64_decode', $keyHeader)));
|
return (1 === count($keyHeader) && 16 === strlen(base64_decode($keyHeader[0])));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
29
src/Messaging/Streaming/ContextInterface.php
Normal file
29
src/Messaging/Streaming/ContextInterface.php
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
<?php
|
||||||
|
namespace Ratchet\RFC6455\Messaging\Streaming;
|
||||||
|
use Ratchet\RFC6455\Messaging\Protocol\MessageInterface;
|
||||||
|
use Ratchet\RFC6455\Messaging\Protocol\FrameInterface;
|
||||||
|
|
||||||
|
interface ContextInterface {
|
||||||
|
public function setFrame(FrameInterface $frame = null);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return \Ratchet\RFC6455\Messaging\Protocol\FrameInterface
|
||||||
|
*/
|
||||||
|
public function getFrame();
|
||||||
|
|
||||||
|
public function setMessage(MessageInterface $message = null);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return \Ratchet\RFC6455\Messaging\Protocol\MessageInterface
|
||||||
|
*/
|
||||||
|
public function getMessage();
|
||||||
|
|
||||||
|
public function onMessage(MessageInterface $msg);
|
||||||
|
public function onPing(FrameInterface $frame);
|
||||||
|
public function onPong(FrameInterface $frame);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param $code int
|
||||||
|
*/
|
||||||
|
public function onClose($code);
|
||||||
|
}
|
@ -1,114 +1,52 @@
|
|||||||
<?php
|
<?php
|
||||||
|
|
||||||
namespace Ratchet\RFC6455\Messaging\Streaming;
|
namespace Ratchet\RFC6455\Messaging\Streaming;
|
||||||
|
use Ratchet\RFC6455\Encoding\ValidatorInterface;
|
||||||
use Evenement\EventEmitterInterface;
|
|
||||||
use Evenement\EventEmitterTrait;
|
|
||||||
use Ratchet\RFC6455\Encoding\Validator;
|
|
||||||
use Ratchet\RFC6455\Messaging\Protocol\Frame;
|
use Ratchet\RFC6455\Messaging\Protocol\Frame;
|
||||||
use Ratchet\RFC6455\Messaging\Protocol\Message;
|
use Ratchet\RFC6455\Messaging\Protocol\Message;
|
||||||
use Ratchet\RFC6455\Messaging\Validation\MessageValidator;
|
use Ratchet\RFC6455\Messaging\Validation\MessageValidator;
|
||||||
|
|
||||||
class MessageStreamer implements EventEmitterInterface {
|
class MessageStreamer {
|
||||||
use EventEmitterTrait;
|
|
||||||
|
|
||||||
/** @var Frame */
|
|
||||||
private $currentFrame;
|
|
||||||
|
|
||||||
/** @var Message */
|
|
||||||
private $currentMessage;
|
|
||||||
|
|
||||||
/** @var MessageValidator */
|
/** @var MessageValidator */
|
||||||
private $validator;
|
private $validator;
|
||||||
|
|
||||||
/** @var bool */
|
function __construct(ValidatorInterface $encodingValidator, $expectMask = false) {
|
||||||
private $checkForMask;
|
$this->validator = new MessageValidator($encodingValidator, !$expectMask);
|
||||||
|
|
||||||
function __construct($client = false)
|
|
||||||
{
|
|
||||||
$this->checkForMask = !$client;
|
|
||||||
$this->validator = new MessageValidator(new Validator(), $this->checkForMask);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public function onData($data) {
|
public function onData($data, ContextInterface $context) {
|
||||||
$overflow = '';
|
$overflow = '';
|
||||||
|
|
||||||
if (!isset($this->currentMessage)) {
|
$context->getMessage() || $context->setMessage($this->newMessage());
|
||||||
$this->currentMessage = $this->newMessage();
|
$context->getFrame() || $context->setFrame($this->newFrame());
|
||||||
}
|
|
||||||
|
|
||||||
// There is a frame fragment attached to the connection, add to it
|
$frame = $context->getFrame();
|
||||||
if (!isset($this->currentFrame)) {
|
|
||||||
$this->currentFrame = $this->newFrame();
|
|
||||||
}
|
|
||||||
|
|
||||||
$frame = $this->currentFrame;
|
|
||||||
|
|
||||||
$frame->addBuffer($data);
|
$frame->addBuffer($data);
|
||||||
if ($frame->isCoalesced()) {
|
if ($frame->isCoalesced()) {
|
||||||
$validFrame = $this->validator->validateFrame($frame);
|
$validFrame = $this->validator->validateFrame($frame);
|
||||||
if ($validFrame !== true) {
|
if (true !== $validFrame) {
|
||||||
$this->emit('close', [$validFrame]);
|
$context->onClose($validFrame);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$opcode = $frame->getOpcode();
|
$opcode = $frame->getOpcode();
|
||||||
if ($opcode > 2) {
|
if ($opcode > 2) {
|
||||||
if ($frame->getPayloadLength() > 125) {
|
|
||||||
// payload only allowed to 125 on control frames ab 2.5
|
|
||||||
$this->emit('close', [$frame::CLOSE_PROTOCOL]);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
switch ($opcode) {
|
switch ($opcode) {
|
||||||
case $frame::OP_CLOSE:
|
|
||||||
$closeCode = 0;
|
|
||||||
|
|
||||||
$bin = $frame->getPayload();
|
|
||||||
|
|
||||||
if (empty($bin)) {
|
|
||||||
$this->emit('close', [null]);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (strlen($bin) >= 2) {
|
|
||||||
list($closeCode) = array_merge(unpack('n*', substr($bin, 0, 2)));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!$frame->isValidCloseCode($closeCode)) {
|
|
||||||
$this->emit('close', [$frame::CLOSE_PROTOCOL]);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo:
|
|
||||||
//if (!$this->validator->checkEncoding(substr($bin, 2), 'UTF-8')) {
|
|
||||||
// $this->emit('close', [$frame::CLOSE_BAD_PAYLOAD]);
|
|
||||||
// return;
|
|
||||||
//}
|
|
||||||
|
|
||||||
$this->emit('close', [$closeCode]);
|
|
||||||
return;
|
|
||||||
break;
|
|
||||||
case $frame::OP_PING:
|
case $frame::OP_PING:
|
||||||
// this should probably be automatic
|
$context->onPing($frame);
|
||||||
//$from->send($this->newFrame($frame->getPayload(), true, $frame::OP_PONG));
|
break;
|
||||||
$this->emit('ping', [$frame]);
|
|
||||||
break;
|
|
||||||
case $frame::OP_PONG:
|
case $frame::OP_PONG:
|
||||||
$this->emit('pong', [$frame]);
|
$context->onPong($frame);
|
||||||
break;
|
break;
|
||||||
default:
|
|
||||||
$this->emit('close', [$frame::CLOSE_PROTOCOL]);
|
|
||||||
return;
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$overflow = $frame->extractOverflow();
|
$overflow = $frame->extractOverflow();
|
||||||
|
$context->setFrame(null);
|
||||||
unset($this->currentFrame, $frame, $opcode);
|
|
||||||
|
|
||||||
if (strlen($overflow) > 0) {
|
if (strlen($overflow) > 0) {
|
||||||
$this->onData($overflow);
|
$this->onData($overflow, $context);
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
@ -116,32 +54,30 @@ class MessageStreamer implements EventEmitterInterface {
|
|||||||
|
|
||||||
$overflow = $frame->extractOverflow();
|
$overflow = $frame->extractOverflow();
|
||||||
|
|
||||||
$frameAdded = $this->currentMessage->addFrame($this->currentFrame);
|
$frameAdded = $context->getMessage()->addFrame($frame);
|
||||||
if ($frameAdded !== true) {
|
if (true !== $frameAdded) {
|
||||||
$this->emit('close', [$frameAdded]);
|
$context->onClose($frameAdded);
|
||||||
}
|
}
|
||||||
unset($this->currentFrame);
|
$context->setFrame(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($this->currentMessage->isCoalesced()) {
|
if ($context->getMessage()->isCoalesced()) {
|
||||||
$msgCheck = $this->validator->checkMessage($this->currentMessage);
|
$msgCheck = $this->validator->checkMessage($context->getMessage());
|
||||||
if ($msgCheck !== true) {
|
if ($msgCheck !== true) {
|
||||||
if ($msgCheck === false) $msgCheck = null;
|
$context->onClose($msgCheck || null);
|
||||||
$this->emit('close', [$msgCheck]);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
$this->emit('message', [$this->currentMessage]);
|
$context->onMessage($context->getMessage());
|
||||||
//$parsed = $from->WebSocket->message->getPayload();
|
$context->setMessage(null);
|
||||||
unset($this->currentMessage);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strlen($overflow) > 0) {
|
if (strlen($overflow) > 0) {
|
||||||
$this->onData($overflow);
|
$this->onData($overflow, $context);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Message
|
* @return \Ratchet\RFC6455\Messaging\Protocol\MessageInterface
|
||||||
*/
|
*/
|
||||||
public function newMessage() {
|
public function newMessage() {
|
||||||
return new Message;
|
return new Message;
|
||||||
@ -151,7 +87,7 @@ class MessageStreamer implements EventEmitterInterface {
|
|||||||
* @param string|null $payload
|
* @param string|null $payload
|
||||||
* @param bool|null $final
|
* @param bool|null $final
|
||||||
* @param int|null $opcode
|
* @param int|null $opcode
|
||||||
* @return Frame
|
* @return \Ratchet\RFC6455\Messaging\Protocol\FrameInterface
|
||||||
*/
|
*/
|
||||||
public function newFrame($payload = null, $final = null, $opcode = null) {
|
public function newFrame($payload = null, $final = null, $opcode = null) {
|
||||||
return new Frame($payload, $final, $opcode);
|
return new Frame($payload, $final, $opcode);
|
||||||
|
@ -53,6 +53,10 @@ class MessageValidator {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param Frame $frame
|
||||||
|
* @return bool|int Return true if everything is good, an integer close code if not
|
||||||
|
*/
|
||||||
public function validateFrame(Frame $frame) {
|
public function validateFrame(Frame $frame) {
|
||||||
if (false !== $frame->getRsv1() ||
|
if (false !== $frame->getRsv1() ||
|
||||||
false !== $frame->getRsv2() ||
|
false !== $frame->getRsv2() ||
|
||||||
@ -79,7 +83,6 @@ class MessageValidator {
|
|||||||
|
|
||||||
$bin = $frame->getPayload();
|
$bin = $frame->getPayload();
|
||||||
|
|
||||||
|
|
||||||
if (empty($bin)) {
|
if (empty($bin)) {
|
||||||
return $frame::CLOSE_NORMAL;
|
return $frame::CLOSE_NORMAL;
|
||||||
}
|
}
|
||||||
|
@ -7,8 +7,7 @@
|
|||||||
"url": "ws://localhost:9001",
|
"url": "ws://localhost:9001",
|
||||||
"options": {"version": 18}}
|
"options": {"version": 18}}
|
||||||
],
|
],
|
||||||
|
"cases": ["*"],
|
||||||
"cases": ["1.*"],
|
|
||||||
"exclude-cases": ["12.*","13.*"],
|
"exclude-cases": ["12.*","13.*"],
|
||||||
"exclude-agent-cases": {}
|
"exclude-agent-cases": {}
|
||||||
}
|
}
|
||||||
|
@ -1,23 +1,75 @@
|
|||||||
<?php
|
<?php
|
||||||
|
|
||||||
use Ratchet\RFC6455\Messaging\Protocol\Frame;
|
use Ratchet\RFC6455\Messaging\Protocol\Frame;
|
||||||
use Ratchet\RFC6455\Messaging\Protocol\Message;
|
|
||||||
|
|
||||||
require_once __DIR__ . "/../bootstrap.php";
|
require_once __DIR__ . "/../bootstrap.php";
|
||||||
|
|
||||||
$loop = \React\EventLoop\Factory::create();
|
class ConnectionContext implements Ratchet\RFC6455\Messaging\Streaming\ContextInterface {
|
||||||
$socket = new \React\Socket\Server($loop);
|
private $_frame;
|
||||||
|
private $_message;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var \React\Http\Response
|
||||||
|
*/
|
||||||
|
private $_conn;
|
||||||
|
|
||||||
|
public function __construct(\React\Http\Response $connectionContext) {
|
||||||
|
$this->_conn = $connectionContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function setFrame(\Ratchet\RFC6455\Messaging\Protocol\FrameInterface $frame = null) {
|
||||||
|
$this->_frame = $frame;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getFrame() {
|
||||||
|
return $this->_frame;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function setMessage(\Ratchet\RFC6455\Messaging\Protocol\MessageInterface $message = null) {
|
||||||
|
$this->_message = $message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getMessage() {
|
||||||
|
return $this->_message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function onMessage(\Ratchet\RFC6455\Messaging\Protocol\MessageInterface $msg) {
|
||||||
|
$frame = new Frame($msg->getPayload(), true, $msg[0]->getOpcode());
|
||||||
|
$this->_conn->write($frame->getContents());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function onPing(\Ratchet\RFC6455\Messaging\Protocol\FrameInterface $frame) {
|
||||||
|
$pong = new Frame($frame->getPayload(), true, Frame::OP_PONG);
|
||||||
|
$this->_conn->write($pong->getContents());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function onPong(\Ratchet\RFC6455\Messaging\Protocol\FrameInterface $msg) {
|
||||||
|
// TODO: Implement onPong() method.
|
||||||
|
}
|
||||||
|
|
||||||
|
public function onClose($code = 1000) {
|
||||||
|
$frame = new Frame(
|
||||||
|
pack('n', $code),
|
||||||
|
true,
|
||||||
|
Frame::OP_CLOSE
|
||||||
|
);
|
||||||
|
|
||||||
|
$this->_conn->end($frame->getContents());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$loop = \React\EventLoop\Factory::create();
|
||||||
|
$socket = new \React\Socket\Server($loop);
|
||||||
$server = new \React\Http\Server($socket);
|
$server = new \React\Http\Server($socket);
|
||||||
|
|
||||||
$server->on('request', function (\React\Http\Request $request, \React\Http\Response $response) {
|
$server->on('request', function (\React\Http\Request $request, \React\Http\Response $response) {
|
||||||
// saving this for later
|
$conn = new ConnectionContext($response);
|
||||||
$conn = $response;
|
|
||||||
|
$encodingValidator = new \Ratchet\RFC6455\Encoding\Validator;
|
||||||
|
|
||||||
// make the React Request a Psr7 request (not perfect)
|
// make the React Request a Psr7 request (not perfect)
|
||||||
$psrRequest = new \GuzzleHttp\Psr7\Request($request->getMethod(), $request->getPath(), $request->getHeaders());
|
$psrRequest = new \GuzzleHttp\Psr7\Request($request->getMethod(), $request->getPath(), $request->getHeaders());
|
||||||
|
|
||||||
$negotiator = new \Ratchet\RFC6455\Handshake\Negotiator(new \Ratchet\RFC6455\Encoding\NullValidator());
|
$negotiator = new \Ratchet\RFC6455\Handshake\Negotiator($encodingValidator);
|
||||||
|
|
||||||
$negotiatorResponse = $negotiator->handshake($psrRequest);
|
$negotiatorResponse = $negotiator->handshake($psrRequest);
|
||||||
|
|
||||||
@ -34,39 +86,11 @@ $server->on('request', function (\React\Http\Request $request, \React\Http\Respo
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$ms = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer();
|
$ms = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer($encodingValidator);
|
||||||
|
|
||||||
$ms->on('message', function (Message $msg) use ($conn) {
|
$request->on('data', function ($data) use ($ms, $conn) {
|
||||||
$opcode = $msg->isBinary() ? Frame::OP_BINARY : Frame::OP_TEXT;
|
$ms->onData($data, $conn);
|
||||||
$frame = new Frame($msg->getPayload(), true, $opcode);
|
|
||||||
$conn->write($frame->getContents());
|
|
||||||
});
|
|
||||||
|
|
||||||
$ms->on('ping', function (Frame $frame) use ($conn) {
|
|
||||||
$pong = new Frame($frame->getPayload(), true, Frame::OP_PONG);
|
|
||||||
$conn->write($pong->getContents());
|
|
||||||
});
|
|
||||||
|
|
||||||
$ms->on('pong', function (Frame $frame) {
|
|
||||||
echo "got PONG...\n";
|
|
||||||
});
|
|
||||||
|
|
||||||
$ms->on('close', function ($code) use ($conn) {
|
|
||||||
if ($code === null) {
|
|
||||||
$conn->close();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
$frame = new Frame(
|
|
||||||
pack('n', $code),
|
|
||||||
true,
|
|
||||||
Frame::OP_CLOSE
|
|
||||||
);
|
|
||||||
$conn->end($frame->getContents());
|
|
||||||
});
|
|
||||||
|
|
||||||
$request->on('data', function ($data) use ($ms) {
|
|
||||||
$ms->onData($data);
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
$socket->listen(9001);
|
$socket->listen(9001, '0.0.0.0');
|
||||||
$loop->run();
|
$loop->run();
|
||||||
|
Loading…
Reference in New Issue
Block a user