MessageStreamer refactor

Remove notion of context and nested callbacks
Each connection will create an instance of MessageParser to hold message/frame state
This commit is contained in:
Chris Boden 2015-12-22 20:16:55 -05:00
parent 06263cd9a5
commit 3c3588fc8b
6 changed files with 116 additions and 120 deletions

View File

@ -0,0 +1,24 @@
<?php
namespace Ratchet\RFC6455\Messaging\Protocol;
class CloseFrameChecker {
private $validCloseFrames = [];
public function __construct() {
$this->validCloseCodes = [
Frame::CLOSE_NORMAL,
Frame::CLOSE_GOING_AWAY,
Frame::CLOSE_PROTOCOL,
Frame::CLOSE_BAD_DATA,
Frame::CLOSE_BAD_PAYLOAD,
Frame::CLOSE_POLICY,
Frame::CLOSE_TOO_BIG,
Frame::CLOSE_MAND_EXT,
Frame::CLOSE_SRV_ERR,
];
}
public function __invoke($val) {
return ($val >= 3000 && $val <= 4999) || in_array($val, $this->validCloseCodes);
}
}

View File

@ -22,22 +22,6 @@ class Message implements \IteratorAggregate, MessageInterface {
return count($this->_frames); return count($this->_frames);
} }
public function offsetExists($index) {
return $this->_frames->offsetExists($index);
}
public function offsetGet($index) {
return $this->_frames->offsetGet($index);
}
public function offsetSet($index, $newval) {
throw new \DomainException('Frame access in messages is read-only');
}
public function offsetUnset($index) {
unset($this->_frames[$index]);
}
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */

View File

@ -1,7 +1,7 @@
<?php <?php
namespace Ratchet\RFC6455\Messaging\Protocol; namespace Ratchet\RFC6455\Messaging\Protocol;
interface MessageInterface extends DataInterface, \Traversable, \ArrayAccess, \Countable { interface MessageInterface extends DataInterface, \Traversable, \Countable {
/** /**
* @param FrameInterface $fragment * @param FrameInterface $fragment
* @return MessageInterface * @return MessageInterface

View File

@ -1,6 +1,7 @@
<?php <?php
namespace Ratchet\RFC6455\Messaging\Streaming; namespace Ratchet\RFC6455\Messaging\Streaming;
use Ratchet\RFC6455\Encoding\ValidatorInterface; use Ratchet\RFC6455\Encoding\ValidatorInterface;
use Ratchet\RFC6455\Messaging\Protocol\CloseFrameChecker;
use Ratchet\RFC6455\Messaging\Protocol\MessageInterface; use Ratchet\RFC6455\Messaging\Protocol\MessageInterface;
use Ratchet\RFC6455\Messaging\Protocol\FrameInterface; use Ratchet\RFC6455\Messaging\Protocol\FrameInterface;
use Ratchet\RFC6455\Messaging\Protocol\Message; use Ratchet\RFC6455\Messaging\Protocol\Message;
@ -12,23 +13,50 @@ class MessageStreamer {
*/ */
private $validator; private $validator;
/**
* @var \Ratchet\RFC6455\Messaging\Protocol\CloseFrameChecker
*/
private $closeFrameChecker;
/** /**
* @var callable * @var callable
*/ */
private $exceptionFactory; private $exceptionFactory;
/**
* @var \Ratchet\RFC6455\Messaging\Protocol\Message
*/
private $messageBuffer = null;
/**
* @var \Ratchet\RFC6455\Messaging\Protocol\Frame
*/
private $frameBuffer;
/**
* @var callable
*/
private $onMessage;
/**
* @var callable
*/
private $onControl;
/** /**
* @var bool * @var bool
*/ */
private $checkForMask; private $checkForMask;
/** function __construct(
* @var array ValidatorInterface $encodingValidator,
*/ CloseFrameChecker $frameChecker,
private $validCloseCodes; callable $onMessage,
callable $onControl = null,
function __construct(ValidatorInterface $encodingValidator, $expectMask = true) { $expectMask = true
) {
$this->validator = $encodingValidator; $this->validator = $encodingValidator;
$this->closeFrameChecker = $frameChecker;
$this->checkForMask = (bool)$expectMask; $this->checkForMask = (bool)$expectMask;
$exception = new \UnderflowException; $exception = new \UnderflowException;
@ -36,104 +64,67 @@ class MessageStreamer {
return $exception; return $exception;
}; };
$this->noop = function() {}; $this->onMessage = $onMessage;
$this->onControl = $onControl ?: function() {};
$this->validCloseCodes = [
Frame::CLOSE_NORMAL,
Frame::CLOSE_GOING_AWAY,
Frame::CLOSE_PROTOCOL,
Frame::CLOSE_BAD_DATA,
Frame::CLOSE_BAD_PAYLOAD,
Frame::CLOSE_POLICY,
Frame::CLOSE_TOO_BIG,
Frame::CLOSE_MAND_EXT,
Frame::CLOSE_SRV_ERR,
];
} }
/** /**
* @param $data * @param string $data
* @param mixed $context * @return null
* @param MessageInterface $message
* @param callable(MessageInterface) $onMessage
* @param callable(FrameInterface) $onControl
* @return MessageInterface
*/ */
public function onData($data, MessageInterface $message = null, callable $onMessage, callable $onControl = null, $context = null) { public function onData($data) {
$overflow = ''; $this->messageBuffer ?: $this->messageBuffer = $this->newMessage();
$this->frameBuffer ?: $this->frameBuffer = $this->newFrame();
$onControl ?: $this->noop; $this->frameBuffer->addBuffer($data);
$message ?: $message = $this->newMessage(); if (!$this->frameBuffer->isCoalesced()) {
return;
}
$prevFrame = null; $onMessage = $this->onMessage;
$frameCount = count($message); $onControl = $this->onControl;
if ($frameCount > 0) { $this->frameBuffer = $this->frameCheck($this->frameBuffer);
$frame = $message[$frameCount - 1];
if ($frame->isCoalesced()) { $overflow = $this->frameBuffer->extractOverflow();
$prevFrame = $frame; $this->frameBuffer->unMaskPayload();
$frame = $this->newFrame();
$message->addFrame($frame); $opcode = $this->frameBuffer->getOpcode();
$frameCount++;
} elseif ($frameCount > 1) { if ($opcode > 2) {
$prevFrame = $message[$frameCount - 2]; $onControl($this->frameBuffer);
if (Frame::OP_CLOSE === $opcode) {
return;
} }
} else { } else {
$frame = $this->newFrame(); $this->messageBuffer->addFrame($this->frameBuffer);
$message->addFrame($frame);
$frameCount++;
} }
$frame->addBuffer($data); $this->frameBuffer = null;
if ($frame->isCoalesced()) {
$frame = $this->frameCheck($frame, $prevFrame);
$opcode = $frame->getOpcode(); if ($this->messageBuffer->isCoalesced()) {
if ($opcode > 2) { $msgCheck = $this->checkMessage($this->messageBuffer);
$onControl($frame, $context);
unset($message[$frameCount - 1]);
$overflow = $frame->extractOverflow();
if (strlen($overflow) > 0) {
$message = $this->onData($overflow, $message, $onMessage, $onControl, $context);
}
return $message;
}
$overflow = $frame->extractOverflow();
$frame->unMaskPayload();
}
if ($message->isCoalesced()) {
$msgCheck = $this->checkMessage($message);
if (true !== $msgCheck) { if (true !== $msgCheck) {
$onControl($this->newCloseFrame($msgCheck), $context); $onControl($this->newCloseFrame($msgCheck));
} else {
return $this->newMessage(); $onMessage($this->messageBuffer);
} }
$onMessage($message, $context); $this->messageBuffer = null;
$message = $this->newMessage();
} }
if (strlen($overflow) > 0) { if (strlen($overflow) > 0) {
$this->onData($overflow, $message, $onMessage, $onControl, $context); $this->onData($overflow); // PHP doesn't do tail recursion :(
} }
return $message;
} }
/** /**
* Check a frame and previous frame in a message; returns the frame that should be dealt with * Check a frame to be added to the current message buffer
* @param \Ratchet\RFC6455\Messaging\Protocol\FrameInterface|FrameInterface $frame * @param \Ratchet\RFC6455\Messaging\Protocol\FrameInterface|FrameInterface $frame
* @param \Ratchet\RFC6455\Messaging\Protocol\FrameInterface|FrameInterface $previousFrame
* @return \Ratchet\RFC6455\Messaging\Protocol\FrameInterface|FrameInterface * @return \Ratchet\RFC6455\Messaging\Protocol\FrameInterface|FrameInterface
*/ */
public function frameCheck(FrameInterface $frame, FrameInterface $previousFrame = null) { public function frameCheck(FrameInterface $frame) {
if (false !== $frame->getRsv1() || if (false !== $frame->getRsv1() ||
false !== $frame->getRsv2() || false !== $frame->getRsv2() ||
false !== $frame->getRsv3() false !== $frame->getRsv3()
@ -170,7 +161,8 @@ class MessageStreamer {
list($closeCode) = array_merge(unpack('n*', substr($bin, 0, 2))); list($closeCode) = array_merge(unpack('n*', substr($bin, 0, 2)));
} }
if (!$this->isValidCloseCode($closeCode)) { $checker = $this->closeFrameChecker;
if (!$checker($closeCode)) {
return $this->newCloseFrame(Frame::CLOSE_PROTOCOL); return $this->newCloseFrame(Frame::CLOSE_PROTOCOL);
} }
@ -191,11 +183,11 @@ class MessageStreamer {
return $frame; return $frame;
} }
if (Frame::OP_CONTINUE === $frame->getOpcode() && null === $previousFrame) { if (Frame::OP_CONTINUE == $frame->getOpcode() && 0 == count($this->messageBuffer)) {
return $this->newCloseFrame(Frame::CLOSE_PROTOCOL); return $this->newCloseFrame(Frame::CLOSE_PROTOCOL);
} }
if (null !== $previousFrame && Frame::OP_CONTINUE != $frame->getOpcode()) { if (count($this->messageBuffer) > 0 && Frame::OP_CONTINUE != $frame->getOpcode()) {
return $this->newCloseFrame(Frame::CLOSE_PROTOCOL); return $this->newCloseFrame(Frame::CLOSE_PROTOCOL);
} }
@ -237,8 +229,4 @@ class MessageStreamer {
public function newCloseFrame($code) { public function newCloseFrame($code) {
return $this->newFrame(pack('n', $code), true, Frame::OP_CLOSE); return $this->newFrame(pack('n', $code), true, Frame::OP_CLOSE);
} }
public function isValidCloseCode($val) {
return ($val >= 3000 && $val <= 4999) || in_array($val, $this->validCloseCodes);
}
} }

View File

@ -10,10 +10,10 @@ $socket = new \React\Socket\Server($loop);
$server = new \React\Http\Server($socket); $server = new \React\Http\Server($socket);
$encodingValidator = new \Ratchet\RFC6455\Encoding\Validator; $encodingValidator = new \Ratchet\RFC6455\Encoding\Validator;
$closeFrameChecker = new \Ratchet\RFC6455\Messaging\Protocol\CloseFrameChecker;
$negotiator = new \Ratchet\RFC6455\Handshake\Negotiator($encodingValidator); $negotiator = new \Ratchet\RFC6455\Handshake\Negotiator($encodingValidator);
$ms = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer($encodingValidator);
$server->on('request', function (\React\Http\Request $request, \React\Http\Response $response) use ($negotiator, $ms) { $server->on('request', function (\React\Http\Request $request, \React\Http\Response $response) use ($negotiator, $encodingValidator, $closeFrameChecker) {
$psrRequest = new \GuzzleHttp\Psr7\Request($request->getMethod(), $request->getPath(), $request->getHeaders()); $psrRequest = new \GuzzleHttp\Psr7\Request($request->getMethod(), $request->getPath(), $request->getHeaders());
$negotiatorResponse = $negotiator->handshake($psrRequest); $negotiatorResponse = $negotiator->handshake($psrRequest);
@ -31,21 +31,21 @@ $server->on('request', function (\React\Http\Request $request, \React\Http\Respo
return; return;
} }
$msg = null; $parser = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer($encodingValidator, $closeFrameChecker, function(MessageInterface $message) use ($response) {
$request->on('data', function($data) use ($ms, $response, &$msg) { $response->write($message->getContents());
$msg = $ms->onData($data, $msg, function(MessageInterface $msg, \React\Http\Response $conn) { }, function(FrameInterface $frame) use ($response, &$parser) {
$conn->write($msg->getContents());
}, function(FrameInterface $frame, \React\Http\Response $conn) use ($ms) {
switch ($frame->getOpCode()) { switch ($frame->getOpCode()) {
case Frame::OP_CLOSE: case Frame::OP_CLOSE:
$conn->end($frame->getContents()); $response->end($frame->getContents());
break; break;
case Frame::OP_PING: case Frame::OP_PING:
$conn->write($ms->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents()); $response->write($parser->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents());
break; break;
} }
}, $response);
}); });
$request->on('data', [$parser, 'onData']);
}); });
$socket->listen(9001, '0.0.0.0'); $socket->listen(9001, '0.0.0.0');
$loop->run(); $loop->run();