diff --git a/src/Handshake/Negotiator.php b/src/Handshake/Negotiator.php index 3c9c7d0..80075cd 100644 --- a/src/Handshake/Negotiator.php +++ b/src/Handshake/Negotiator.php @@ -99,9 +99,9 @@ class Negotiator implements NegotiatorInterface { return new Response(101, array_merge($headers, [ 'Upgrade' => 'websocket' - , 'Connection' => 'Upgrade' - , 'Sec-WebSocket-Accept' => $this->sign((string)$request->getHeader('Sec-WebSocket-Key')[0]) - , 'X-Powered-By' => 'Ratchet' + , 'Connection' => 'Upgrade' + , 'Sec-WebSocket-Accept' => $this->sign((string)$request->getHeader('Sec-WebSocket-Key')[0]) + , 'X-Powered-By' => 'Ratchet' ])); } diff --git a/src/Messaging/Protocol/CloseFrameChecker.php b/src/Messaging/Protocol/CloseFrameChecker.php new file mode 100644 index 0000000..7556b97 --- /dev/null +++ b/src/Messaging/Protocol/CloseFrameChecker.php @@ -0,0 +1,24 @@ +validCloseCodes = [ + Frame::CLOSE_NORMAL, + Frame::CLOSE_GOING_AWAY, + Frame::CLOSE_PROTOCOL, + Frame::CLOSE_BAD_DATA, + Frame::CLOSE_BAD_PAYLOAD, + Frame::CLOSE_POLICY, + Frame::CLOSE_TOO_BIG, + Frame::CLOSE_MAND_EXT, + Frame::CLOSE_SRV_ERR, + ]; + } + + public function __invoke($val) { + return ($val >= 3000 && $val <= 4999) || in_array($val, $this->validCloseCodes); + } +} diff --git a/src/Messaging/Protocol/Message.php b/src/Messaging/Protocol/Message.php index 0a95f19..1b1ed17 100644 --- a/src/Messaging/Protocol/Message.php +++ b/src/Messaging/Protocol/Message.php @@ -22,22 +22,6 @@ class Message implements \IteratorAggregate, MessageInterface { return count($this->_frames); } - public function offsetExists($index) { - return $this->_frames->offsetExists($index); - } - - public function offsetGet($index) { - return $this->_frames->offsetGet($index); - } - - public function offsetSet($index, $newval) { - throw new \DomainException('Frame access in messages is read-only'); - } - - public function offsetUnset($index) { - unset($this->_frames[$index]); - } - /** * {@inheritdoc} */ diff --git a/src/Messaging/Protocol/MessageInterface.php b/src/Messaging/Protocol/MessageInterface.php index 2913d82..f153686 100644 --- a/src/Messaging/Protocol/MessageInterface.php +++ b/src/Messaging/Protocol/MessageInterface.php @@ -1,7 +1,7 @@ validator = $encodingValidator; + function __construct( + ValidatorInterface $encodingValidator, + CloseFrameChecker $frameChecker, + callable $onMessage, + callable $onControl = null, + $expectMask = true + ) { + $this->validator = $encodingValidator; + $this->closeFrameChecker = $frameChecker; $this->checkForMask = (bool)$expectMask; $exception = new \UnderflowException; @@ -36,104 +64,67 @@ class MessageStreamer { return $exception; }; - $this->noop = function() {}; - - $this->validCloseCodes = [ - Frame::CLOSE_NORMAL, - Frame::CLOSE_GOING_AWAY, - Frame::CLOSE_PROTOCOL, - Frame::CLOSE_BAD_DATA, - Frame::CLOSE_BAD_PAYLOAD, - Frame::CLOSE_POLICY, - Frame::CLOSE_TOO_BIG, - Frame::CLOSE_MAND_EXT, - Frame::CLOSE_SRV_ERR, - ]; + $this->onMessage = $onMessage; + $this->onControl = $onControl ?: function() {}; } /** - * @param $data - * @param mixed $context - * @param MessageInterface $message - * @param callable(MessageInterface) $onMessage - * @param callable(FrameInterface) $onControl - * @return MessageInterface + * @param string $data + * @return null */ - public function onData($data, MessageInterface $message = null, callable $onMessage, callable $onControl = null, $context = null) { - $overflow = ''; + public function onData($data) { + $this->messageBuffer ?: $this->messageBuffer = $this->newMessage(); + $this->frameBuffer ?: $this->frameBuffer = $this->newFrame(); - $onControl ?: $this->noop; - $message ?: $message = $this->newMessage(); + $this->frameBuffer->addBuffer($data); + if (!$this->frameBuffer->isCoalesced()) { + return; + } - $prevFrame = null; - $frameCount = count($message); + $onMessage = $this->onMessage; + $onControl = $this->onControl; - if ($frameCount > 0) { - $frame = $message[$frameCount - 1]; + $this->frameBuffer = $this->frameCheck($this->frameBuffer); - if ($frame->isCoalesced()) { - $prevFrame = $frame; - $frame = $this->newFrame(); - $message->addFrame($frame); - $frameCount++; - } elseif ($frameCount > 1) { - $prevFrame = $message[$frameCount - 2]; + $overflow = $this->frameBuffer->extractOverflow(); + $this->frameBuffer->unMaskPayload(); + + $opcode = $this->frameBuffer->getOpcode(); + + if ($opcode > 2) { + $onControl($this->frameBuffer); + + if (Frame::OP_CLOSE === $opcode) { + return; } } else { - $frame = $this->newFrame(); - $message->addFrame($frame); - $frameCount++; + $this->messageBuffer->addFrame($this->frameBuffer); } - $frame->addBuffer($data); - if ($frame->isCoalesced()) { - $frame = $this->frameCheck($frame, $prevFrame); + $this->frameBuffer = null; - $opcode = $frame->getOpcode(); - if ($opcode > 2) { - $onControl($frame, $context); - unset($message[$frameCount - 1]); - - $overflow = $frame->extractOverflow(); - - if (strlen($overflow) > 0) { - $message = $this->onData($overflow, $message, $onMessage, $onControl, $context); - } - - return $message; - } - - $overflow = $frame->extractOverflow(); - - $frame->unMaskPayload(); - } - - if ($message->isCoalesced()) { - $msgCheck = $this->checkMessage($message); + if ($this->messageBuffer->isCoalesced()) { + $msgCheck = $this->checkMessage($this->messageBuffer); if (true !== $msgCheck) { - $onControl($this->newCloseFrame($msgCheck), $context); - - return $this->newMessage(); + $onControl($this->newCloseFrame($msgCheck)); + } else { + $onMessage($this->messageBuffer); } - $onMessage($message, $context); - $message = $this->newMessage(); + $this->messageBuffer = null; } if (strlen($overflow) > 0) { - $this->onData($overflow, $message, $onMessage, $onControl, $context); + $this->onData($overflow); // PHP doesn't do tail recursion :( } - - return $message; } /** - * Check a frame and previous frame in a message; returns the frame that should be dealt with + * Check a frame to be added to the current message buffer * @param \Ratchet\RFC6455\Messaging\Protocol\FrameInterface|FrameInterface $frame - * @param \Ratchet\RFC6455\Messaging\Protocol\FrameInterface|FrameInterface $previousFrame * @return \Ratchet\RFC6455\Messaging\Protocol\FrameInterface|FrameInterface */ - public function frameCheck(FrameInterface $frame, FrameInterface $previousFrame = null) { + public function frameCheck(FrameInterface $frame) { if (false !== $frame->getRsv1() || false !== $frame->getRsv2() || false !== $frame->getRsv3() @@ -170,7 +161,8 @@ class MessageStreamer { list($closeCode) = array_merge(unpack('n*', substr($bin, 0, 2))); } - if (!$this->isValidCloseCode($closeCode)) { + $checker = $this->closeFrameChecker; + if (!$checker($closeCode)) { return $this->newCloseFrame(Frame::CLOSE_PROTOCOL); } @@ -191,11 +183,11 @@ class MessageStreamer { return $frame; } - if (Frame::OP_CONTINUE === $frame->getOpcode() && null === $previousFrame) { + if (Frame::OP_CONTINUE == $frame->getOpcode() && 0 == count($this->messageBuffer)) { return $this->newCloseFrame(Frame::CLOSE_PROTOCOL); } - if (null !== $previousFrame && Frame::OP_CONTINUE != $frame->getOpcode()) { + if (count($this->messageBuffer) > 0 && Frame::OP_CONTINUE != $frame->getOpcode()) { return $this->newCloseFrame(Frame::CLOSE_PROTOCOL); } @@ -237,8 +229,4 @@ class MessageStreamer { public function newCloseFrame($code) { return $this->newFrame(pack('n', $code), true, Frame::OP_CLOSE); } - - public function isValidCloseCode($val) { - return ($val >= 3000 && $val <= 4999) || in_array($val, $this->validCloseCodes); - } } \ No newline at end of file diff --git a/tests/ab/startServer.php b/tests/ab/startServer.php index 2fa6f8c..6be6e62 100644 --- a/tests/ab/startServer.php +++ b/tests/ab/startServer.php @@ -10,10 +10,10 @@ $socket = new \React\Socket\Server($loop); $server = new \React\Http\Server($socket); $encodingValidator = new \Ratchet\RFC6455\Encoding\Validator; +$closeFrameChecker = new \Ratchet\RFC6455\Messaging\Protocol\CloseFrameChecker; $negotiator = new \Ratchet\RFC6455\Handshake\Negotiator($encodingValidator); -$ms = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer($encodingValidator); -$server->on('request', function (\React\Http\Request $request, \React\Http\Response $response) use ($negotiator, $ms) { +$server->on('request', function (\React\Http\Request $request, \React\Http\Response $response) use ($negotiator, $encodingValidator, $closeFrameChecker) { $psrRequest = new \GuzzleHttp\Psr7\Request($request->getMethod(), $request->getPath(), $request->getHeaders()); $negotiatorResponse = $negotiator->handshake($psrRequest); @@ -31,21 +31,21 @@ $server->on('request', function (\React\Http\Request $request, \React\Http\Respo return; } - $msg = null; - $request->on('data', function($data) use ($ms, $response, &$msg) { - $msg = $ms->onData($data, $msg, function(MessageInterface $msg, \React\Http\Response $conn) { - $conn->write($msg->getContents()); - }, function(FrameInterface $frame, \React\Http\Response $conn) use ($ms) { - switch ($frame->getOpCode()) { - case Frame::OP_CLOSE: - $conn->end($frame->getContents()); + $parser = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer($encodingValidator, $closeFrameChecker, function(MessageInterface $message) use ($response) { + $response->write($message->getContents()); + }, function(FrameInterface $frame) use ($response, &$parser) { + switch ($frame->getOpCode()) { + case Frame::OP_CLOSE: + $response->end($frame->getContents()); break; - case Frame::OP_PING: - $conn->write($ms->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents()); + case Frame::OP_PING: + $response->write($parser->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents()); break; - } - }, $response); + } }); + + $request->on('data', [$parser, 'onData']); }); + $socket->listen(9001, '0.0.0.0'); $loop->run();