diff --git a/src/Messaging/Protocol/Frame.php b/src/Messaging/Protocol/Frame.php index a06d22e..88aa818 100644 --- a/src/Messaging/Protocol/Frame.php +++ b/src/Messaging/Protocol/Frame.php @@ -71,7 +71,7 @@ class Frame implements FrameInterface { * @param string|null $payload * @param bool $final * @param int $opcode - * @param callable $ufExceptionFactory<\UnderflowException> + * @param callable<\UnderflowException> $ufExceptionFactory */ public function __construct($payload = null, $final = true, $opcode = 1, callable $ufExceptionFactory = null) { $this->ufeg = $ufExceptionFactory ?: function($msg = '') { @@ -449,7 +449,6 @@ class Frame implements FrameInterface { /** * Sometimes clients will concatenate more than one frame over the wire * This method will take the extra bytes off the end and return them - * @todo Consider returning new Frame * @return string */ public function extractOverflow() { @@ -467,34 +466,4 @@ class Frame implements FrameInterface { return ''; } - - /** - * Determine if a close code is valid - * @param int|string - * @return bool - */ - public function isValidCloseCode($val) { - if (in_array($val, [ - static::CLOSE_NORMAL, - static::CLOSE_GOING_AWAY, - static::CLOSE_PROTOCOL, - static::CLOSE_BAD_DATA, - //static::CLOSE_NO_STATUS, - //static::CLOSE_ABNORMAL, - static::CLOSE_BAD_PAYLOAD, - static::CLOSE_POLICY, - static::CLOSE_TOO_BIG, - static::CLOSE_MAND_EXT, - static::CLOSE_SRV_ERR, - //static::CLOSE_TLS, - ])) { - return true; - } - - if ($val >= 3000 && $val <= 4999) { - return true; - } - - return false; - } } diff --git a/src/Messaging/Protocol/Message.php b/src/Messaging/Protocol/Message.php index 2ac28a7..0a95f19 100644 --- a/src/Messaging/Protocol/Message.php +++ b/src/Messaging/Protocol/Message.php @@ -35,7 +35,7 @@ class Message implements \IteratorAggregate, MessageInterface { } public function offsetUnset($index) { - throw new \DomainException('Frame access in messages is read-only'); + unset($this->_frames[$index]); } /** diff --git a/src/Messaging/Streaming/ContextInterface.php b/src/Messaging/Streaming/ContextInterface.php deleted file mode 100644 index 0bd101f..0000000 --- a/src/Messaging/Streaming/ContextInterface.php +++ /dev/null @@ -1,37 +0,0 @@ -validator = new MessageValidator($encodingValidator, !$expectMask); + /** + * @var bool + */ + private $checkForMask; + + /** + * @var array + */ + private $validCloseCodes; + + function __construct(ValidatorInterface $encodingValidator, $expectMask = true) { + $this->validator = $encodingValidator; + $this->checkForMask = (bool)$expectMask; $exception = new \UnderflowException; $this->exceptionFactory = function() use ($exception) { return $exception; }; + + $this->noop = function() {}; + + $this->validCloseCodes = [ + Frame::CLOSE_NORMAL, + Frame::CLOSE_GOING_AWAY, + Frame::CLOSE_PROTOCOL, + Frame::CLOSE_BAD_DATA, + Frame::CLOSE_BAD_PAYLOAD, + Frame::CLOSE_POLICY, + Frame::CLOSE_TOO_BIG, + Frame::CLOSE_MAND_EXT, + Frame::CLOSE_SRV_ERR, + ]; } - - public function onData($data, ContextInterface $context) { + /** + * @param $data + * @param mixed $context + * @param MessageInterface $message + * @param callable(MessageInterface) $onMessage + * @param callable(FrameInterface) $onControl + * @return MessageInterface + */ + public function onData($data, $context, MessageInterface $message = null, callable $onMessage, callable $onControl = null) { $overflow = ''; - $message = $context->getMessage() ?: $context->setMessage($this->newMessage()); - $frame = $context->getFrame() ?: $context->setFrame($this->newFrame()); + $onControl ?: $this->noop; + $message ?: $message = $this->newMessage(); + + $prevFrame = null; + $frameCount = count($message); + + if ($frameCount > 0) { + $frame = $message[$frameCount - 1]; + + if ($frame->isCoalesced()) { + $prevFrame = $frame; + $frame = $this->newFrame(); + $message->addFrame($frame); + $frameCount++; + } elseif ($frameCount > 1) { + $prevFrame = $message[$frameCount - 2]; + } + } else { + $frame = $this->newFrame(); + $message->addFrame($frame); + $frameCount++; + } $frame->addBuffer($data); if ($frame->isCoalesced()) { - $frameCount = $message->count(); - $prevFrame = $frameCount > 0 ? $message[$frameCount - 1] : null; - - $frameStatus = $this->validator->validateFrame($frame, $prevFrame); - - if (0 !== $frameStatus) { - return $context->onClose($frameStatus); - } + $frame = $this->frameCheck($frame, $prevFrame); $opcode = $frame->getOpcode(); if ($opcode > 2) { - switch ($opcode) { - case Frame::OP_PING: - $context->onPing($frame); - break; - case Frame::OP_PONG: - $context->onPong($frame); - break; - } + $onControl($frame, $context); + unset($message[$frameCount - 1]); $overflow = $frame->extractOverflow(); - $context->setFrame(null); if (strlen($overflow) > 0) { - $this->onData($overflow, $context); + $message = $this->onData($overflow, $context, $message, $onMessage, $onControl); } - return; + return $message; } $overflow = $frame->extractOverflow(); $frame->unMaskPayload(); - $message->addFrame($frame); - $context->setFrame(null); } if ($message->isCoalesced()) { - $msgCheck = $this->validator->checkMessage($message); + $msgCheck = $this->checkMessage($message); if (true !== $msgCheck) { - return $context->onClose($msgCheck); + $onControl($this->newCloseFrame($msgCheck), $context); + + return $this->newMessage(); } - $context->onMessage($message); - $context->setMessage(null); + $onMessage($message, $context); + $message = $this->newMessage(); } if (strlen($overflow) > 0) { - $this->onData($overflow, $context); + $this->onData($overflow, $context, $message, $onMessage, $onControl); } + + return $message; + } + + /** + * Check a frame and previous frame in a message; returns the frame that should be dealt with + * @param \Ratchet\RFC6455\Messaging\Protocol\FrameInterface|FrameInterface $frame + * @param \Ratchet\RFC6455\Messaging\Protocol\FrameInterface|FrameInterface $previousFrame + * @return \Ratchet\RFC6455\Messaging\Protocol\FrameInterface|FrameInterface + */ + public function frameCheck(FrameInterface $frame, FrameInterface $previousFrame = null) { + if (false !== $frame->getRsv1() || + false !== $frame->getRsv2() || + false !== $frame->getRsv3() + ) { + return $this->newCloseFrame(Frame::CLOSE_PROTOCOL); + } + + if ($this->checkForMask && !$frame->isMasked()) { + return $this->newCloseFrame(Frame::CLOSE_PROTOCOL); + } + + $opcode = $frame->getOpcode(); + + if ($opcode > 2) { + if ($frame->getPayloadLength() > 125 || !$frame->isFinal()) { + return $this->newCloseFrame(Frame::CLOSE_PROTOCOL); + } + + switch ($opcode) { + case Frame::OP_CLOSE: + $closeCode = 0; + + $bin = $frame->getPayload(); + + if (empty($bin)) { + return $this->newCloseFrame(Frame::CLOSE_NORMAL); + } + + if (strlen($bin) == 1) { + return $this->newCloseFrame(Frame::CLOSE_PROTOCOL); + } + + if (strlen($bin) >= 2) { + list($closeCode) = array_merge(unpack('n*', substr($bin, 0, 2))); + } + + if (!$this->isValidCloseCode($closeCode)) { + return $this->newCloseFrame(Frame::CLOSE_PROTOCOL); + } + + if (!$this->validator->checkEncoding(substr($bin, 2), 'UTF-8')) { + return $this->newCloseFrame(Frame::CLOSE_BAD_PAYLOAD); + } + + return $this->newCloseFrame(Frame::CLOSE_NORMAL); + break; + case Frame::OP_PING: + case Frame::OP_PONG: + break; + default: + return $this->newCloseFrame(Frame::CLOSE_PROTOCOL); + break; + } + + return $frame; + } + + if (Frame::OP_CONTINUE === $frame->getOpcode() && null === $previousFrame) { + return $this->newCloseFrame(Frame::CLOSE_PROTOCOL); + } + + if (null !== $previousFrame && Frame::OP_CONTINUE != $frame->getOpcode()) { + return $this->newCloseFrame(Frame::CLOSE_PROTOCOL); + } + + return $frame; + } + + /** + * Determine if a message is valid + * @param \Ratchet\RFC6455\Messaging\Protocol\MessageInterface + * @return bool|int true if valid - false if incomplete - int of recommended close code + */ + public function checkMessage(MessageInterface $message) { + if (!$message->isBinary()) { + if (!$this->validator->checkEncoding($message->getPayload(), 'UTF-8')) { + return Frame::CLOSE_BAD_PAYLOAD; + } + } + + return true; } /** @@ -99,4 +233,12 @@ class MessageStreamer { public function newFrame($payload = null, $final = null, $opcode = null) { return new Frame($payload, $final, $opcode, $this->exceptionFactory); } + + public function newCloseFrame($code) { + return $this->newFrame(pack('n', $code), true, Frame::OP_CLOSE); + } + + public function isValidCloseCode($val) { + return ($val >= 3000 && $val <= 4999) || in_array($val, $this->validCloseCodes); + } } \ No newline at end of file diff --git a/src/Messaging/Validation/MessageValidator.php b/src/Messaging/Validation/MessageValidator.php deleted file mode 100644 index de51a9c..0000000 --- a/src/Messaging/Validation/MessageValidator.php +++ /dev/null @@ -1,110 +0,0 @@ -validator = $validator; - $this->checkForMask = $checkForMask; - } - - /** - * Determine if a message is valid - * @param \Ratchet\RFC6455\Messaging\Protocol\MessageInterface - * @return bool|int true if valid - false if incomplete - int of recommended close code - */ - public function checkMessage(MessageInterface $message) { - $frame = $message[0]; - - if (!$message->isBinary()) { - $parsed = $message->getPayload(); - if (!$this->validator->checkEncoding($parsed, 'UTF-8')) { - return $frame::CLOSE_BAD_PAYLOAD; - } - } - - return true; - } - - /** - * @param FrameInterface $frame - * @param FrameInterface $previousFrame - * @return int Return 0 if everything is good, an integer close code if not - */ - public function validateFrame(FrameInterface $frame, FrameInterface $previousFrame = null) { - if (false !== $frame->getRsv1() || - false !== $frame->getRsv2() || - false !== $frame->getRsv3() - ) { - return Frame::CLOSE_PROTOCOL; - } - - // Should be checking all frames - if ($this->checkForMask && !$frame->isMasked()) { - return Frame::CLOSE_PROTOCOL; - } - - $opcode = $frame->getOpcode(); - - if ($opcode > 2) { - if ($frame->getPayloadLength() > 125 || !$frame->isFinal()) { - return Frame::CLOSE_PROTOCOL; - } - - switch ($opcode) { - case Frame::OP_CLOSE: - $closeCode = 0; - - $bin = $frame->getPayload(); - - if (empty($bin)) { - return Frame::CLOSE_NORMAL; - } - - if (strlen($bin) == 1) { - return Frame::CLOSE_PROTOCOL; - } - - if (strlen($bin) >= 2) { - list($closeCode) = array_merge(unpack('n*', substr($bin, 0, 2))); - } - - if (!$frame->isValidCloseCode($closeCode)) { - return Frame::CLOSE_PROTOCOL; - } - - if (!$this->validator->checkEncoding(substr($bin, 2), 'UTF-8')) { - return Frame::CLOSE_BAD_PAYLOAD; - } - - return Frame::CLOSE_NORMAL; - break; - case Frame::OP_PING: - case Frame::OP_PONG: - break; - default: - return Frame::CLOSE_PROTOCOL; - break; - } - - return 0; - } - - if (Frame::OP_CONTINUE === $frame->getOpcode() && null === $previousFrame) { - return Frame::CLOSE_PROTOCOL; - } - - if (null !== $previousFrame && Frame::OP_CONTINUE != $frame->getOpcode()) { - return Frame::CLOSE_PROTOCOL; - } - - return 0; - } -} diff --git a/tests/ab/startServer.php b/tests/ab/startServer.php index 3cdde1e..b316323 100644 --- a/tests/ab/startServer.php +++ b/tests/ab/startServer.php @@ -1,65 +1,10 @@ _conn = $connectionContext; - } - - public function setFrame(\Ratchet\RFC6455\Messaging\Protocol\FrameInterface $frame = null) { - $this->_frame = $frame; - - return $frame; - } - - public function getFrame() { - return $this->_frame; - } - - public function setMessage(\Ratchet\RFC6455\Messaging\Protocol\MessageInterface $message = null) { - $this->_message = $message; - - return $message; - } - - public function getMessage() { - return $this->_message; - } - - public function onMessage(\Ratchet\RFC6455\Messaging\Protocol\MessageInterface $msg) { - $this->_conn->write($msg->getContents()); - } - - public function onPing(\Ratchet\RFC6455\Messaging\Protocol\FrameInterface $frame) { - $pong = new Frame($frame->getPayload(), true, Frame::OP_PONG); - $this->_conn->write($pong->getContents()); - } - - public function onPong(\Ratchet\RFC6455\Messaging\Protocol\FrameInterface $msg) { - // TODO: Implement onPong() method. - } - - public function onClose($code = 1000) { - $frame = new Frame( - pack('n', $code), - true, - Frame::OP_CLOSE - ); - - $this->_conn->end($frame->getContents()); - } -} - $loop = \React\EventLoop\Factory::create(); $socket = new \React\Socket\Server($loop); $server = new \React\Http\Server($socket); @@ -69,9 +14,6 @@ $negotiator = new \Ratchet\RFC6455\Handshake\Negotiator($encodingValidator); $ms = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer($encodingValidator); $server->on('request', function (\React\Http\Request $request, \React\Http\Response $response) use ($negotiator, $ms) { - $conn = new ConnectionContext($response); - - // make the React Request a Psr7 request (not perfect) $psrRequest = new \GuzzleHttp\Psr7\Request($request->getMethod(), $request->getPath(), $request->getHeaders()); $negotiatorResponse = $negotiator->handshake($psrRequest); @@ -89,8 +31,20 @@ $server->on('request', function (\React\Http\Request $request, \React\Http\Respo return; } - $request->on('data', function ($data) use ($ms, $conn) { - $ms->onData($data, $conn); + $msg = null; + $request->on('data', function($data) use ($ms, $response, &$msg) { + $msg = $ms->onData($data, $response, $msg, function(MessageInterface $msg, \React\Http\Response $conn) { + $conn->write($msg->getContents()); + }, function(FrameInterface $frame, \React\Http\Response $conn) use ($ms) { + switch ($frame->getOpCode()) { + case Frame::OP_CLOSE: + $conn->end($frame->getContents()); + break; + case Frame::OP_PING: + $conn->write($ms->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents()); + break; + } + }); }); }); $socket->listen(9001, '0.0.0.0');