Replace ContextInterface with callable's

Replace ContextInterface with callable's
Move message/frame validation back into streamer
Always return frame objects from check
Move close code validation to validator, not base element
This commit is contained in:
Chris Boden 2015-06-01 22:39:17 -04:00
parent d8babac7e7
commit 59464f855c
6 changed files with 195 additions and 277 deletions

View File

@ -71,7 +71,7 @@ class Frame implements FrameInterface {
* @param string|null $payload * @param string|null $payload
* @param bool $final * @param bool $final
* @param int $opcode * @param int $opcode
* @param callable $ufExceptionFactory<\UnderflowException> * @param callable<\UnderflowException> $ufExceptionFactory
*/ */
public function __construct($payload = null, $final = true, $opcode = 1, callable $ufExceptionFactory = null) { public function __construct($payload = null, $final = true, $opcode = 1, callable $ufExceptionFactory = null) {
$this->ufeg = $ufExceptionFactory ?: function($msg = '') { $this->ufeg = $ufExceptionFactory ?: function($msg = '') {
@ -449,7 +449,6 @@ class Frame implements FrameInterface {
/** /**
* Sometimes clients will concatenate more than one frame over the wire * Sometimes clients will concatenate more than one frame over the wire
* This method will take the extra bytes off the end and return them * This method will take the extra bytes off the end and return them
* @todo Consider returning new Frame
* @return string * @return string
*/ */
public function extractOverflow() { public function extractOverflow() {
@ -467,34 +466,4 @@ class Frame implements FrameInterface {
return ''; return '';
} }
/**
* Determine if a close code is valid
* @param int|string
* @return bool
*/
public function isValidCloseCode($val) {
if (in_array($val, [
static::CLOSE_NORMAL,
static::CLOSE_GOING_AWAY,
static::CLOSE_PROTOCOL,
static::CLOSE_BAD_DATA,
//static::CLOSE_NO_STATUS,
//static::CLOSE_ABNORMAL,
static::CLOSE_BAD_PAYLOAD,
static::CLOSE_POLICY,
static::CLOSE_TOO_BIG,
static::CLOSE_MAND_EXT,
static::CLOSE_SRV_ERR,
//static::CLOSE_TLS,
])) {
return true;
}
if ($val >= 3000 && $val <= 4999) {
return true;
}
return false;
}
} }

View File

@ -35,7 +35,7 @@ class Message implements \IteratorAggregate, MessageInterface {
} }
public function offsetUnset($index) { public function offsetUnset($index) {
throw new \DomainException('Frame access in messages is read-only'); unset($this->_frames[$index]);
} }
/** /**

View File

@ -1,37 +0,0 @@
<?php
namespace Ratchet\RFC6455\Messaging\Streaming;
use Ratchet\RFC6455\Messaging\Protocol\MessageInterface;
use Ratchet\RFC6455\Messaging\Protocol\FrameInterface;
interface ContextInterface {
/**
* @param FrameInterface $frame
* @return FrameInterface
*/
public function setFrame(FrameInterface $frame = null);
/**
* @return \Ratchet\RFC6455\Messaging\Protocol\FrameInterface
*/
public function getFrame();
/**
* @param MessageInterface $message
* @return MessageInterface
*/
public function setMessage(MessageInterface $message = null);
/**
* @return \Ratchet\RFC6455\Messaging\Protocol\MessageInterface
*/
public function getMessage();
public function onMessage(MessageInterface $msg);
public function onPing(FrameInterface $frame);
public function onPong(FrameInterface $frame);
/**
* @param $code int
*/
public function onClose($code);
}

View File

@ -1,86 +1,220 @@
<?php <?php
namespace Ratchet\RFC6455\Messaging\Streaming; namespace Ratchet\RFC6455\Messaging\Streaming;
use Ratchet\RFC6455\Encoding\ValidatorInterface; use Ratchet\RFC6455\Encoding\ValidatorInterface;
use Ratchet\RFC6455\Messaging\Protocol\Frame; use Ratchet\RFC6455\Messaging\Protocol\MessageInterface;
use Ratchet\RFC6455\Messaging\Protocol\FrameInterface;
use Ratchet\RFC6455\Messaging\Protocol\Message; use Ratchet\RFC6455\Messaging\Protocol\Message;
use Ratchet\RFC6455\Messaging\Validation\MessageValidator; use Ratchet\RFC6455\Messaging\Protocol\Frame;
class MessageStreamer { class MessageStreamer {
/** /**
* @var MessageValidator * @var \Ratchet\RFC6455\Encoding\ValidatorInterface
*/ */
private $validator; private $validator;
/**
* @var callable
*/
private $exceptionFactory; private $exceptionFactory;
function __construct(ValidatorInterface $encodingValidator, $expectMask = false) { /**
$this->validator = new MessageValidator($encodingValidator, !$expectMask); * @var bool
*/
private $checkForMask;
/**
* @var array
*/
private $validCloseCodes;
function __construct(ValidatorInterface $encodingValidator, $expectMask = true) {
$this->validator = $encodingValidator;
$this->checkForMask = (bool)$expectMask;
$exception = new \UnderflowException; $exception = new \UnderflowException;
$this->exceptionFactory = function() use ($exception) { $this->exceptionFactory = function() use ($exception) {
return $exception; return $exception;
}; };
$this->noop = function() {};
$this->validCloseCodes = [
Frame::CLOSE_NORMAL,
Frame::CLOSE_GOING_AWAY,
Frame::CLOSE_PROTOCOL,
Frame::CLOSE_BAD_DATA,
Frame::CLOSE_BAD_PAYLOAD,
Frame::CLOSE_POLICY,
Frame::CLOSE_TOO_BIG,
Frame::CLOSE_MAND_EXT,
Frame::CLOSE_SRV_ERR,
];
} }
/**
public function onData($data, ContextInterface $context) { * @param $data
* @param mixed $context
* @param MessageInterface $message
* @param callable(MessageInterface) $onMessage
* @param callable(FrameInterface) $onControl
* @return MessageInterface
*/
public function onData($data, $context, MessageInterface $message = null, callable $onMessage, callable $onControl = null) {
$overflow = ''; $overflow = '';
$message = $context->getMessage() ?: $context->setMessage($this->newMessage()); $onControl ?: $this->noop;
$frame = $context->getFrame() ?: $context->setFrame($this->newFrame()); $message ?: $message = $this->newMessage();
$prevFrame = null;
$frameCount = count($message);
if ($frameCount > 0) {
$frame = $message[$frameCount - 1];
if ($frame->isCoalesced()) {
$prevFrame = $frame;
$frame = $this->newFrame();
$message->addFrame($frame);
$frameCount++;
} elseif ($frameCount > 1) {
$prevFrame = $message[$frameCount - 2];
}
} else {
$frame = $this->newFrame();
$message->addFrame($frame);
$frameCount++;
}
$frame->addBuffer($data); $frame->addBuffer($data);
if ($frame->isCoalesced()) { if ($frame->isCoalesced()) {
$frameCount = $message->count(); $frame = $this->frameCheck($frame, $prevFrame);
$prevFrame = $frameCount > 0 ? $message[$frameCount - 1] : null;
$frameStatus = $this->validator->validateFrame($frame, $prevFrame);
if (0 !== $frameStatus) {
return $context->onClose($frameStatus);
}
$opcode = $frame->getOpcode(); $opcode = $frame->getOpcode();
if ($opcode > 2) { if ($opcode > 2) {
switch ($opcode) { $onControl($frame, $context);
case Frame::OP_PING: unset($message[$frameCount - 1]);
$context->onPing($frame);
break;
case Frame::OP_PONG:
$context->onPong($frame);
break;
}
$overflow = $frame->extractOverflow(); $overflow = $frame->extractOverflow();
$context->setFrame(null);
if (strlen($overflow) > 0) { if (strlen($overflow) > 0) {
$this->onData($overflow, $context); $message = $this->onData($overflow, $context, $message, $onMessage, $onControl);
} }
return; return $message;
} }
$overflow = $frame->extractOverflow(); $overflow = $frame->extractOverflow();
$frame->unMaskPayload(); $frame->unMaskPayload();
$message->addFrame($frame);
$context->setFrame(null);
} }
if ($message->isCoalesced()) { if ($message->isCoalesced()) {
$msgCheck = $this->validator->checkMessage($message); $msgCheck = $this->checkMessage($message);
if (true !== $msgCheck) { if (true !== $msgCheck) {
return $context->onClose($msgCheck); $onControl($this->newCloseFrame($msgCheck), $context);
return $this->newMessage();
} }
$context->onMessage($message); $onMessage($message, $context);
$context->setMessage(null); $message = $this->newMessage();
} }
if (strlen($overflow) > 0) { if (strlen($overflow) > 0) {
$this->onData($overflow, $context); $this->onData($overflow, $context, $message, $onMessage, $onControl);
} }
return $message;
}
/**
* Check a frame and previous frame in a message; returns the frame that should be dealt with
* @param \Ratchet\RFC6455\Messaging\Protocol\FrameInterface|FrameInterface $frame
* @param \Ratchet\RFC6455\Messaging\Protocol\FrameInterface|FrameInterface $previousFrame
* @return \Ratchet\RFC6455\Messaging\Protocol\FrameInterface|FrameInterface
*/
public function frameCheck(FrameInterface $frame, FrameInterface $previousFrame = null) {
if (false !== $frame->getRsv1() ||
false !== $frame->getRsv2() ||
false !== $frame->getRsv3()
) {
return $this->newCloseFrame(Frame::CLOSE_PROTOCOL);
}
if ($this->checkForMask && !$frame->isMasked()) {
return $this->newCloseFrame(Frame::CLOSE_PROTOCOL);
}
$opcode = $frame->getOpcode();
if ($opcode > 2) {
if ($frame->getPayloadLength() > 125 || !$frame->isFinal()) {
return $this->newCloseFrame(Frame::CLOSE_PROTOCOL);
}
switch ($opcode) {
case Frame::OP_CLOSE:
$closeCode = 0;
$bin = $frame->getPayload();
if (empty($bin)) {
return $this->newCloseFrame(Frame::CLOSE_NORMAL);
}
if (strlen($bin) == 1) {
return $this->newCloseFrame(Frame::CLOSE_PROTOCOL);
}
if (strlen($bin) >= 2) {
list($closeCode) = array_merge(unpack('n*', substr($bin, 0, 2)));
}
if (!$this->isValidCloseCode($closeCode)) {
return $this->newCloseFrame(Frame::CLOSE_PROTOCOL);
}
if (!$this->validator->checkEncoding(substr($bin, 2), 'UTF-8')) {
return $this->newCloseFrame(Frame::CLOSE_BAD_PAYLOAD);
}
return $this->newCloseFrame(Frame::CLOSE_NORMAL);
break;
case Frame::OP_PING:
case Frame::OP_PONG:
break;
default:
return $this->newCloseFrame(Frame::CLOSE_PROTOCOL);
break;
}
return $frame;
}
if (Frame::OP_CONTINUE === $frame->getOpcode() && null === $previousFrame) {
return $this->newCloseFrame(Frame::CLOSE_PROTOCOL);
}
if (null !== $previousFrame && Frame::OP_CONTINUE != $frame->getOpcode()) {
return $this->newCloseFrame(Frame::CLOSE_PROTOCOL);
}
return $frame;
}
/**
* Determine if a message is valid
* @param \Ratchet\RFC6455\Messaging\Protocol\MessageInterface
* @return bool|int true if valid - false if incomplete - int of recommended close code
*/
public function checkMessage(MessageInterface $message) {
if (!$message->isBinary()) {
if (!$this->validator->checkEncoding($message->getPayload(), 'UTF-8')) {
return Frame::CLOSE_BAD_PAYLOAD;
}
}
return true;
} }
/** /**
@ -99,4 +233,12 @@ class MessageStreamer {
public function newFrame($payload = null, $final = null, $opcode = null) { public function newFrame($payload = null, $final = null, $opcode = null) {
return new Frame($payload, $final, $opcode, $this->exceptionFactory); return new Frame($payload, $final, $opcode, $this->exceptionFactory);
} }
public function newCloseFrame($code) {
return $this->newFrame(pack('n', $code), true, Frame::OP_CLOSE);
}
public function isValidCloseCode($val) {
return ($val >= 3000 && $val <= 4999) || in_array($val, $this->validCloseCodes);
}
} }

View File

@ -1,110 +0,0 @@
<?php
namespace Ratchet\RFC6455\Messaging\Validation;
use Ratchet\RFC6455\Encoding\ValidatorInterface;
use Ratchet\RFC6455\Messaging\Protocol\Frame;
use Ratchet\RFC6455\Messaging\Protocol\FrameInterface;
use Ratchet\RFC6455\Messaging\Protocol\MessageInterface;
class MessageValidator {
public $checkForMask;
private $validator;
public function __construct(ValidatorInterface $validator, $checkForMask = true) {
$this->validator = $validator;
$this->checkForMask = $checkForMask;
}
/**
* Determine if a message is valid
* @param \Ratchet\RFC6455\Messaging\Protocol\MessageInterface
* @return bool|int true if valid - false if incomplete - int of recommended close code
*/
public function checkMessage(MessageInterface $message) {
$frame = $message[0];
if (!$message->isBinary()) {
$parsed = $message->getPayload();
if (!$this->validator->checkEncoding($parsed, 'UTF-8')) {
return $frame::CLOSE_BAD_PAYLOAD;
}
}
return true;
}
/**
* @param FrameInterface $frame
* @param FrameInterface $previousFrame
* @return int Return 0 if everything is good, an integer close code if not
*/
public function validateFrame(FrameInterface $frame, FrameInterface $previousFrame = null) {
if (false !== $frame->getRsv1() ||
false !== $frame->getRsv2() ||
false !== $frame->getRsv3()
) {
return Frame::CLOSE_PROTOCOL;
}
// Should be checking all frames
if ($this->checkForMask && !$frame->isMasked()) {
return Frame::CLOSE_PROTOCOL;
}
$opcode = $frame->getOpcode();
if ($opcode > 2) {
if ($frame->getPayloadLength() > 125 || !$frame->isFinal()) {
return Frame::CLOSE_PROTOCOL;
}
switch ($opcode) {
case Frame::OP_CLOSE:
$closeCode = 0;
$bin = $frame->getPayload();
if (empty($bin)) {
return Frame::CLOSE_NORMAL;
}
if (strlen($bin) == 1) {
return Frame::CLOSE_PROTOCOL;
}
if (strlen($bin) >= 2) {
list($closeCode) = array_merge(unpack('n*', substr($bin, 0, 2)));
}
if (!$frame->isValidCloseCode($closeCode)) {
return Frame::CLOSE_PROTOCOL;
}
if (!$this->validator->checkEncoding(substr($bin, 2), 'UTF-8')) {
return Frame::CLOSE_BAD_PAYLOAD;
}
return Frame::CLOSE_NORMAL;
break;
case Frame::OP_PING:
case Frame::OP_PONG:
break;
default:
return Frame::CLOSE_PROTOCOL;
break;
}
return 0;
}
if (Frame::OP_CONTINUE === $frame->getOpcode() && null === $previousFrame) {
return Frame::CLOSE_PROTOCOL;
}
if (null !== $previousFrame && Frame::OP_CONTINUE != $frame->getOpcode()) {
return Frame::CLOSE_PROTOCOL;
}
return 0;
}
}

View File

@ -1,65 +1,10 @@
<?php <?php
use Ratchet\RFC6455\Messaging\Protocol\MessageInterface;
use Ratchet\RFC6455\Messaging\Protocol\FrameInterface;
use Ratchet\RFC6455\Messaging\Protocol\Frame; use Ratchet\RFC6455\Messaging\Protocol\Frame;
require_once __DIR__ . "/../bootstrap.php"; require_once __DIR__ . "/../bootstrap.php";
class ConnectionContext implements Ratchet\RFC6455\Messaging\Streaming\ContextInterface {
private $_frame;
private $_message;
/**
* @var \React\Http\Response
*/
private $_conn;
public function __construct(\React\Http\Response $connectionContext) {
$this->_conn = $connectionContext;
}
public function setFrame(\Ratchet\RFC6455\Messaging\Protocol\FrameInterface $frame = null) {
$this->_frame = $frame;
return $frame;
}
public function getFrame() {
return $this->_frame;
}
public function setMessage(\Ratchet\RFC6455\Messaging\Protocol\MessageInterface $message = null) {
$this->_message = $message;
return $message;
}
public function getMessage() {
return $this->_message;
}
public function onMessage(\Ratchet\RFC6455\Messaging\Protocol\MessageInterface $msg) {
$this->_conn->write($msg->getContents());
}
public function onPing(\Ratchet\RFC6455\Messaging\Protocol\FrameInterface $frame) {
$pong = new Frame($frame->getPayload(), true, Frame::OP_PONG);
$this->_conn->write($pong->getContents());
}
public function onPong(\Ratchet\RFC6455\Messaging\Protocol\FrameInterface $msg) {
// TODO: Implement onPong() method.
}
public function onClose($code = 1000) {
$frame = new Frame(
pack('n', $code),
true,
Frame::OP_CLOSE
);
$this->_conn->end($frame->getContents());
}
}
$loop = \React\EventLoop\Factory::create(); $loop = \React\EventLoop\Factory::create();
$socket = new \React\Socket\Server($loop); $socket = new \React\Socket\Server($loop);
$server = new \React\Http\Server($socket); $server = new \React\Http\Server($socket);
@ -69,9 +14,6 @@ $negotiator = new \Ratchet\RFC6455\Handshake\Negotiator($encodingValidator);
$ms = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer($encodingValidator); $ms = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer($encodingValidator);
$server->on('request', function (\React\Http\Request $request, \React\Http\Response $response) use ($negotiator, $ms) { $server->on('request', function (\React\Http\Request $request, \React\Http\Response $response) use ($negotiator, $ms) {
$conn = new ConnectionContext($response);
// make the React Request a Psr7 request (not perfect)
$psrRequest = new \GuzzleHttp\Psr7\Request($request->getMethod(), $request->getPath(), $request->getHeaders()); $psrRequest = new \GuzzleHttp\Psr7\Request($request->getMethod(), $request->getPath(), $request->getHeaders());
$negotiatorResponse = $negotiator->handshake($psrRequest); $negotiatorResponse = $negotiator->handshake($psrRequest);
@ -89,8 +31,20 @@ $server->on('request', function (\React\Http\Request $request, \React\Http\Respo
return; return;
} }
$request->on('data', function ($data) use ($ms, $conn) { $msg = null;
$ms->onData($data, $conn); $request->on('data', function($data) use ($ms, $response, &$msg) {
$msg = $ms->onData($data, $response, $msg, function(MessageInterface $msg, \React\Http\Response $conn) {
$conn->write($msg->getContents());
}, function(FrameInterface $frame, \React\Http\Response $conn) use ($ms) {
switch ($frame->getOpCode()) {
case Frame::OP_CLOSE:
$conn->end($frame->getContents());
break;
case Frame::OP_PING:
$conn->write($ms->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents());
break;
}
});
}); });
}); });
$socket->listen(9001, '0.0.0.0'); $socket->listen(9001, '0.0.0.0');