diff --git a/.travis.yml b/.travis.yml index 09125d8..c65dedb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,12 @@ language: php php: - - 5.4 - - 5.5 - 5.6 - 7.0 - 7.1 - 7.2 + - 7.3 + - 7.4 before_install: - export PATH=$HOME/.local/bin:$PATH diff --git a/composer.json b/composer.json index 224066b..d9ff35d 100644 --- a/composer.json +++ b/composer.json @@ -25,8 +25,7 @@ "guzzlehttp/psr7": "^1.0" }, "require-dev": { - "react/http": "^0.4.1", - "react/socket-client": "^0.4.3", - "phpunit/phpunit": "4.8.*" + "phpunit/phpunit": "4.8.*", + "react/socket": "^1.3" } } diff --git a/src/Messaging/MessageBuffer.php b/src/Messaging/MessageBuffer.php index 22f247c..6b3b440 100644 --- a/src/Messaging/MessageBuffer.php +++ b/src/Messaging/MessageBuffer.php @@ -37,6 +37,11 @@ class MessageBuffer { */ private $checkForMask; + /** + * @var string + */ + private $leftovers; + function __construct( CloseFrameChecker $frameChecker, callable $onMessage, @@ -53,12 +58,47 @@ class MessageBuffer { $this->onMessage = $onMessage; $this->onControl = $onControl ?: function() {}; + + $this->leftovers = ''; } public function onData($data) { - while (strlen($data) > 0) { - $data = $this->processData($data); + $data = $this->leftovers . $data; + $dataLen = strlen($data); + + if ($dataLen < 2) { + $this->leftovers = $data; + return; } + + $frameStart = 0; + while ($frameStart + 2 <= $dataLen) { + $headerSize = 2; + $payload_length = unpack('C', $data[$frameStart + 1] & "\x7f")[1]; + $isMasked = ($data[$frameStart + 1] & "\x80") === "\x80"; + $headerSize += $isMasked ? 4 : 0; + if ($payload_length > 125 && ($dataLen - $frameStart < $headerSize + 125)) { + // no point of checking - this frame is going to be bigger than the buffer is right now + break; + } + if ($payload_length > 125) { + $payloadLenBytes = $payload_length === 126 ? 2 : 8; + $headerSize += $payloadLenBytes; + $bytesToUpack = substr($data, $frameStart + 2, $payloadLenBytes); + $payload_length = $payload_length === 126 + ? unpack('n', $bytesToUpack)[1] + : unpack('J', $bytesToUpack)[1]; + } + + $isCoalesced = $dataLen - $frameStart >= $payload_length + $headerSize; + if (!$isCoalesced) { + break; + } + $this->processData(substr($data, $frameStart, $payload_length + $headerSize)); + $frameStart = $frameStart + $payload_length + $headerSize; + } + + $this->leftovers = substr($data, $frameStart); } /** @@ -70,16 +110,12 @@ class MessageBuffer { $this->frameBuffer ?: $this->frameBuffer = $this->newFrame(); $this->frameBuffer->addBuffer($data); - if (!$this->frameBuffer->isCoalesced()) { - return ''; - } $onMessage = $this->onMessage; $onControl = $this->onControl; $this->frameBuffer = $this->frameCheck($this->frameBuffer); - $overflow = $this->frameBuffer->extractOverflow(); $this->frameBuffer->unMaskPayload(); $opcode = $this->frameBuffer->getOpcode(); @@ -108,8 +144,6 @@ class MessageBuffer { $onMessage($msgBuffer); } } - - return $overflow; } /** @@ -230,4 +264,4 @@ class MessageBuffer { public function newCloseFrame($code, $reason = '') { return $this->newFrame(pack('n', $code) . $reason, true, Frame::OP_CLOSE); } -} +} \ No newline at end of file diff --git a/tests/ab/clientRunner.php b/tests/ab/clientRunner.php index 0c5578a..274f82d 100644 --- a/tests/ab/clientRunner.php +++ b/tests/ab/clientRunner.php @@ -1,7 +1,14 @@ createCached('8.8.8.8', $loop); - -$factory = new \React\SocketClient\Connector($loop, $dnsResolver); +$connector = new Connector($loop); function echoStreamerFactory($conn) { - return new \Ratchet\RFC6455\Messaging\MessageBuffer( - new \Ratchet\RFC6455\Messaging\CloseFrameChecker, - function (\Ratchet\RFC6455\Messaging\MessageInterface $msg) use ($conn) { + return new MessageBuffer( + new CloseFrameChecker, + function (MessageInterface $msg) use ($conn) { /** @var Frame $frame */ foreach ($msg as $frame) { $frame->maskPayload(); } $conn->write($msg->getContents()); }, - function (\Ratchet\RFC6455\Messaging\FrameInterface $frame) use ($conn) { + function (FrameInterface $frame) use ($conn) { switch ($frame->getOpcode()) { case Frame::OP_PING: return $conn->write((new Frame($frame->getPayload(), true, Frame::OP_PONG))->maskPayload()->getContents()); @@ -42,22 +46,22 @@ function echoStreamerFactory($conn) } function getTestCases() { - global $factory; global $testServer; + global $connector; $deferred = new Deferred(); - $factory->create($testServer, 9001)->then(function (\React\Stream\Stream $stream) use ($deferred) { - $cn = new \Ratchet\RFC6455\Handshake\ClientNegotiator(); + $connector->connect($testServer . ':9001')->then(function (ConnectionInterface $connection) use ($deferred) { + $cn = new ClientNegotiator(); $cnRequest = $cn->generateRequest(new Uri('ws://127.0.0.1:9001/getCaseCount')); $rawResponse = ""; $response = null; - /** @var \Ratchet\RFC6455\Messaging\Streaming\MessageBuffer $ms */ + /** @var MessageBuffer $ms */ $ms = null; - $stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { + $connection->on('data', function ($data) use ($connection, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { if ($response === null) { $rawResponse .= $data; $pos = strpos($rawResponse, "\r\n\r\n"); @@ -67,14 +71,14 @@ function getTestCases() { $response = \GuzzleHttp\Psr7\parse_response($rawResponse); if (!$cn->validateResponse($cnRequest, $response)) { - $stream->end(); + $connection->end(); $deferred->reject(); } else { - $ms = new \Ratchet\RFC6455\Messaging\MessageBuffer( - new \Ratchet\RFC6455\Messaging\CloseFrameChecker, - function (\Ratchet\RFC6455\Messaging\MessageInterface $msg) use ($deferred, $stream) { + $ms = new MessageBuffer( + new CloseFrameChecker, + function (MessageInterface $msg) use ($deferred, $connection) { $deferred->resolve($msg->getPayload()); - $stream->close(); + $connection->close(); }, null, false @@ -89,7 +93,7 @@ function getTestCases() { } }); - $stream->write(\GuzzleHttp\Psr7\str($cnRequest)); + $connection->write(\GuzzleHttp\Psr7\str($cnRequest)); }); return $deferred->promise(); @@ -97,15 +101,15 @@ function getTestCases() { function runTest($case) { - global $factory; + global $connector; global $testServer; $casePath = "/runCase?case={$case}&agent=" . AGENT; $deferred = new Deferred(); - $factory->create($testServer, 9001)->then(function (\React\Stream\Stream $stream) use ($deferred, $casePath, $case) { - $cn = new \Ratchet\RFC6455\Handshake\ClientNegotiator(); + $connector->connect($testServer . ':9001')->then(function (ConnectionInterface $connection) use ($deferred, $casePath, $case) { + $cn = new ClientNegotiator(); $cnRequest = $cn->generateRequest(new Uri('ws://127.0.0.1:9001' . $casePath)); $rawResponse = ""; @@ -113,7 +117,7 @@ function runTest($case) $ms = null; - $stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { + $connection->on('data', function ($data) use ($connection, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { if ($response === null) { $rawResponse .= $data; $pos = strpos($rawResponse, "\r\n\r\n"); @@ -123,10 +127,10 @@ function runTest($case) $response = \GuzzleHttp\Psr7\parse_response($rawResponse); if (!$cn->validateResponse($cnRequest, $response)) { - $stream->end(); + $connection->end(); $deferred->reject(); } else { - $ms = echoStreamerFactory($stream); + $ms = echoStreamerFactory($connection); } } } @@ -137,34 +141,34 @@ function runTest($case) } }); - $stream->on('close', function () use ($deferred) { + $connection->on('close', function () use ($deferred) { $deferred->resolve(); }); - $stream->write(\GuzzleHttp\Psr7\str($cnRequest)); + $connection->write(\GuzzleHttp\Psr7\str($cnRequest)); }); return $deferred->promise(); } function createReport() { - global $factory; + global $connector; global $testServer; $deferred = new Deferred(); - $factory->create($testServer, 9001)->then(function (\React\Stream\Stream $stream) use ($deferred) { + $connector->connect($testServer . ':9001')->then(function (ConnectionInterface $connection) use ($deferred) { $reportPath = "/updateReports?agent=" . AGENT . "&shutdownOnComplete=true"; - $cn = new \Ratchet\RFC6455\Handshake\ClientNegotiator(); + $cn = new ClientNegotiator(); $cnRequest = $cn->generateRequest(new Uri('ws://127.0.0.1:9001' . $reportPath)); $rawResponse = ""; $response = null; - /** @var \Ratchet\RFC6455\Messaging\MessageBuffer $ms */ + /** @var MessageBuffer $ms */ $ms = null; - $stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { + $connection->on('data', function ($data) use ($connection, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { if ($response === null) { $rawResponse .= $data; $pos = strpos($rawResponse, "\r\n\r\n"); @@ -174,12 +178,12 @@ function createReport() { $response = \GuzzleHttp\Psr7\parse_response($rawResponse); if (!$cn->validateResponse($cnRequest, $response)) { - $stream->end(); + $connection->end(); $deferred->reject(); } else { - $ms = new \Ratchet\RFC6455\Messaging\MessageBuffer( - new \Ratchet\RFC6455\Messaging\CloseFrameChecker, - function (\Ratchet\RFC6455\Messaging\MessageInterface $msg) use ($deferred, $stream) { + $ms = new MessageBuffer( + new CloseFrameChecker, + function (MessageInterface $msg) use ($deferred, $stream) { $deferred->resolve($msg->getPayload()); $stream->close(); }, @@ -196,7 +200,7 @@ function createReport() { } }); - $stream->write(\GuzzleHttp\Psr7\str($cnRequest)); + $connection->write(\GuzzleHttp\Psr7\str($cnRequest)); }); return $deferred->promise(); diff --git a/tests/ab/startServer.php b/tests/ab/startServer.php index b256ec2..4baf884 100644 --- a/tests/ab/startServer.php +++ b/tests/ab/startServer.php @@ -7,49 +7,60 @@ require_once __DIR__ . "/../bootstrap.php"; $loop = \React\EventLoop\Factory::create(); -$socket = new \React\Socket\Server($loop); -$server = new \React\Http\Server($socket); +$socket = new \React\Socket\Server('127.0.0.1:9001', $loop); $closeFrameChecker = new \Ratchet\RFC6455\Messaging\CloseFrameChecker; $negotiator = new \Ratchet\RFC6455\Handshake\ServerNegotiator(new \Ratchet\RFC6455\Handshake\RequestVerifier); $uException = new \UnderflowException; -$server->on('request', function (\React\Http\Request $request, \React\Http\Response $response) use ($negotiator, $closeFrameChecker, $uException) { - $psrRequest = new \GuzzleHttp\Psr7\Request($request->getMethod(), $request->getPath(), $request->getHeaders()); - - $negotiatorResponse = $negotiator->handshake($psrRequest); - - $response->writeHead( - $negotiatorResponse->getStatusCode(), - array_merge( - $negotiatorResponse->getHeaders(), - ["Content-Length" => "0"] - ) - ); - - if ($negotiatorResponse->getStatusCode() !== 101) { - $response->end(); - return; - } - - $parser = new \Ratchet\RFC6455\Messaging\MessageBuffer($closeFrameChecker, function(MessageInterface $message) use ($response) { - $response->write($message->getContents()); - }, function(FrameInterface $frame) use ($response, &$parser) { - switch ($frame->getOpCode()) { - case Frame::OP_CLOSE: - $response->end($frame->getContents()); - break; - case Frame::OP_PING: - $response->write($parser->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents()); - break; +$socket->on('connection', function (React\Socket\ConnectionInterface $connection) use ($negotiator, $closeFrameChecker, $uException) { + $headerComplete = false; + $buffer = ''; + $parser = null; + $connection->on('data', function ($data) use ($connection, &$parser, &$headerComplete, &$buffer, $negotiator, $closeFrameChecker, $uException) { + if ($headerComplete) { + $parser->onData($data); + return; } - }, true, function() use ($uException) { - return $uException; - }); - $request->on('data', [$parser, 'onData']); + $buffer .= $data; + $parts = explode("\r\n\r\n", $buffer); + if (count($parts) < 2) { + return; + } + $headerComplete = true; + $psrRequest = \GuzzleHttp\Psr7\parse_request($parts[0] . "\r\n\r\n"); + $negotiatorResponse = $negotiator->handshake($psrRequest); + + $negotiatorResponse = $negotiatorResponse->withAddedHeader("Content-Length", "0"); + + $connection->write(\GuzzleHttp\Psr7\str($negotiatorResponse)); + + if ($negotiatorResponse->getStatusCode() !== 101) { + $connection->end(); + return; + } + + $parser = new \Ratchet\RFC6455\Messaging\MessageBuffer($closeFrameChecker, + function (MessageInterface $message) use ($connection) { + $connection->write($message->getContents()); + }, function (FrameInterface $frame) use ($connection, &$parser) { + switch ($frame->getOpCode()) { + case Frame::OP_CLOSE: + $connection->end($frame->getContents()); + break; + case Frame::OP_PING: + $connection->write($parser->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents()); + break; + } + }, true, function () use ($uException) { + return $uException; + }); + + array_shift($parts); + $parser->onData(implode("\r\n\r\n", $parts)); + }); }); -$socket->listen(9001, '0.0.0.0'); $loop->run();