From 8944361dbe69c82da22009b267950b97c92f1650 Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Tue, 10 Dec 2019 19:22:57 -0500 Subject: [PATCH 1/5] Update to latest react/socket and drop react/http for tests --- composer.json | 5 +-- tests/ab/clientRunner.php | 78 +++++++++++++++++++------------------ tests/ab/startServer.php | 81 ++++++++++++++++++++++----------------- 3 files changed, 89 insertions(+), 75 deletions(-) diff --git a/composer.json b/composer.json index 224066b..d9ff35d 100644 --- a/composer.json +++ b/composer.json @@ -25,8 +25,7 @@ "guzzlehttp/psr7": "^1.0" }, "require-dev": { - "react/http": "^0.4.1", - "react/socket-client": "^0.4.3", - "phpunit/phpunit": "4.8.*" + "phpunit/phpunit": "4.8.*", + "react/socket": "^1.3" } } diff --git a/tests/ab/clientRunner.php b/tests/ab/clientRunner.php index 0c5578a..274f82d 100644 --- a/tests/ab/clientRunner.php +++ b/tests/ab/clientRunner.php @@ -1,7 +1,14 @@ createCached('8.8.8.8', $loop); - -$factory = new \React\SocketClient\Connector($loop, $dnsResolver); +$connector = new Connector($loop); function echoStreamerFactory($conn) { - return new \Ratchet\RFC6455\Messaging\MessageBuffer( - new \Ratchet\RFC6455\Messaging\CloseFrameChecker, - function (\Ratchet\RFC6455\Messaging\MessageInterface $msg) use ($conn) { + return new MessageBuffer( + new CloseFrameChecker, + function (MessageInterface $msg) use ($conn) { /** @var Frame $frame */ foreach ($msg as $frame) { $frame->maskPayload(); } $conn->write($msg->getContents()); }, - function (\Ratchet\RFC6455\Messaging\FrameInterface $frame) use ($conn) { + function (FrameInterface $frame) use ($conn) { switch ($frame->getOpcode()) { case Frame::OP_PING: return $conn->write((new Frame($frame->getPayload(), true, Frame::OP_PONG))->maskPayload()->getContents()); @@ -42,22 +46,22 @@ function echoStreamerFactory($conn) } function getTestCases() { - global $factory; global $testServer; + global $connector; $deferred = new Deferred(); - $factory->create($testServer, 9001)->then(function (\React\Stream\Stream $stream) use ($deferred) { - $cn = new \Ratchet\RFC6455\Handshake\ClientNegotiator(); + $connector->connect($testServer . ':9001')->then(function (ConnectionInterface $connection) use ($deferred) { + $cn = new ClientNegotiator(); $cnRequest = $cn->generateRequest(new Uri('ws://127.0.0.1:9001/getCaseCount')); $rawResponse = ""; $response = null; - /** @var \Ratchet\RFC6455\Messaging\Streaming\MessageBuffer $ms */ + /** @var MessageBuffer $ms */ $ms = null; - $stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { + $connection->on('data', function ($data) use ($connection, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { if ($response === null) { $rawResponse .= $data; $pos = strpos($rawResponse, "\r\n\r\n"); @@ -67,14 +71,14 @@ function getTestCases() { $response = \GuzzleHttp\Psr7\parse_response($rawResponse); if (!$cn->validateResponse($cnRequest, $response)) { - $stream->end(); + $connection->end(); $deferred->reject(); } else { - $ms = new \Ratchet\RFC6455\Messaging\MessageBuffer( - new \Ratchet\RFC6455\Messaging\CloseFrameChecker, - function (\Ratchet\RFC6455\Messaging\MessageInterface $msg) use ($deferred, $stream) { + $ms = new MessageBuffer( + new CloseFrameChecker, + function (MessageInterface $msg) use ($deferred, $connection) { $deferred->resolve($msg->getPayload()); - $stream->close(); + $connection->close(); }, null, false @@ -89,7 +93,7 @@ function getTestCases() { } }); - $stream->write(\GuzzleHttp\Psr7\str($cnRequest)); + $connection->write(\GuzzleHttp\Psr7\str($cnRequest)); }); return $deferred->promise(); @@ -97,15 +101,15 @@ function getTestCases() { function runTest($case) { - global $factory; + global $connector; global $testServer; $casePath = "/runCase?case={$case}&agent=" . AGENT; $deferred = new Deferred(); - $factory->create($testServer, 9001)->then(function (\React\Stream\Stream $stream) use ($deferred, $casePath, $case) { - $cn = new \Ratchet\RFC6455\Handshake\ClientNegotiator(); + $connector->connect($testServer . ':9001')->then(function (ConnectionInterface $connection) use ($deferred, $casePath, $case) { + $cn = new ClientNegotiator(); $cnRequest = $cn->generateRequest(new Uri('ws://127.0.0.1:9001' . $casePath)); $rawResponse = ""; @@ -113,7 +117,7 @@ function runTest($case) $ms = null; - $stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { + $connection->on('data', function ($data) use ($connection, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { if ($response === null) { $rawResponse .= $data; $pos = strpos($rawResponse, "\r\n\r\n"); @@ -123,10 +127,10 @@ function runTest($case) $response = \GuzzleHttp\Psr7\parse_response($rawResponse); if (!$cn->validateResponse($cnRequest, $response)) { - $stream->end(); + $connection->end(); $deferred->reject(); } else { - $ms = echoStreamerFactory($stream); + $ms = echoStreamerFactory($connection); } } } @@ -137,34 +141,34 @@ function runTest($case) } }); - $stream->on('close', function () use ($deferred) { + $connection->on('close', function () use ($deferred) { $deferred->resolve(); }); - $stream->write(\GuzzleHttp\Psr7\str($cnRequest)); + $connection->write(\GuzzleHttp\Psr7\str($cnRequest)); }); return $deferred->promise(); } function createReport() { - global $factory; + global $connector; global $testServer; $deferred = new Deferred(); - $factory->create($testServer, 9001)->then(function (\React\Stream\Stream $stream) use ($deferred) { + $connector->connect($testServer . ':9001')->then(function (ConnectionInterface $connection) use ($deferred) { $reportPath = "/updateReports?agent=" . AGENT . "&shutdownOnComplete=true"; - $cn = new \Ratchet\RFC6455\Handshake\ClientNegotiator(); + $cn = new ClientNegotiator(); $cnRequest = $cn->generateRequest(new Uri('ws://127.0.0.1:9001' . $reportPath)); $rawResponse = ""; $response = null; - /** @var \Ratchet\RFC6455\Messaging\MessageBuffer $ms */ + /** @var MessageBuffer $ms */ $ms = null; - $stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { + $connection->on('data', function ($data) use ($connection, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { if ($response === null) { $rawResponse .= $data; $pos = strpos($rawResponse, "\r\n\r\n"); @@ -174,12 +178,12 @@ function createReport() { $response = \GuzzleHttp\Psr7\parse_response($rawResponse); if (!$cn->validateResponse($cnRequest, $response)) { - $stream->end(); + $connection->end(); $deferred->reject(); } else { - $ms = new \Ratchet\RFC6455\Messaging\MessageBuffer( - new \Ratchet\RFC6455\Messaging\CloseFrameChecker, - function (\Ratchet\RFC6455\Messaging\MessageInterface $msg) use ($deferred, $stream) { + $ms = new MessageBuffer( + new CloseFrameChecker, + function (MessageInterface $msg) use ($deferred, $stream) { $deferred->resolve($msg->getPayload()); $stream->close(); }, @@ -196,7 +200,7 @@ function createReport() { } }); - $stream->write(\GuzzleHttp\Psr7\str($cnRequest)); + $connection->write(\GuzzleHttp\Psr7\str($cnRequest)); }); return $deferred->promise(); diff --git a/tests/ab/startServer.php b/tests/ab/startServer.php index b256ec2..4baf884 100644 --- a/tests/ab/startServer.php +++ b/tests/ab/startServer.php @@ -7,49 +7,60 @@ require_once __DIR__ . "/../bootstrap.php"; $loop = \React\EventLoop\Factory::create(); -$socket = new \React\Socket\Server($loop); -$server = new \React\Http\Server($socket); +$socket = new \React\Socket\Server('127.0.0.1:9001', $loop); $closeFrameChecker = new \Ratchet\RFC6455\Messaging\CloseFrameChecker; $negotiator = new \Ratchet\RFC6455\Handshake\ServerNegotiator(new \Ratchet\RFC6455\Handshake\RequestVerifier); $uException = new \UnderflowException; -$server->on('request', function (\React\Http\Request $request, \React\Http\Response $response) use ($negotiator, $closeFrameChecker, $uException) { - $psrRequest = new \GuzzleHttp\Psr7\Request($request->getMethod(), $request->getPath(), $request->getHeaders()); - - $negotiatorResponse = $negotiator->handshake($psrRequest); - - $response->writeHead( - $negotiatorResponse->getStatusCode(), - array_merge( - $negotiatorResponse->getHeaders(), - ["Content-Length" => "0"] - ) - ); - - if ($negotiatorResponse->getStatusCode() !== 101) { - $response->end(); - return; - } - - $parser = new \Ratchet\RFC6455\Messaging\MessageBuffer($closeFrameChecker, function(MessageInterface $message) use ($response) { - $response->write($message->getContents()); - }, function(FrameInterface $frame) use ($response, &$parser) { - switch ($frame->getOpCode()) { - case Frame::OP_CLOSE: - $response->end($frame->getContents()); - break; - case Frame::OP_PING: - $response->write($parser->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents()); - break; +$socket->on('connection', function (React\Socket\ConnectionInterface $connection) use ($negotiator, $closeFrameChecker, $uException) { + $headerComplete = false; + $buffer = ''; + $parser = null; + $connection->on('data', function ($data) use ($connection, &$parser, &$headerComplete, &$buffer, $negotiator, $closeFrameChecker, $uException) { + if ($headerComplete) { + $parser->onData($data); + return; } - }, true, function() use ($uException) { - return $uException; - }); - $request->on('data', [$parser, 'onData']); + $buffer .= $data; + $parts = explode("\r\n\r\n", $buffer); + if (count($parts) < 2) { + return; + } + $headerComplete = true; + $psrRequest = \GuzzleHttp\Psr7\parse_request($parts[0] . "\r\n\r\n"); + $negotiatorResponse = $negotiator->handshake($psrRequest); + + $negotiatorResponse = $negotiatorResponse->withAddedHeader("Content-Length", "0"); + + $connection->write(\GuzzleHttp\Psr7\str($negotiatorResponse)); + + if ($negotiatorResponse->getStatusCode() !== 101) { + $connection->end(); + return; + } + + $parser = new \Ratchet\RFC6455\Messaging\MessageBuffer($closeFrameChecker, + function (MessageInterface $message) use ($connection) { + $connection->write($message->getContents()); + }, function (FrameInterface $frame) use ($connection, &$parser) { + switch ($frame->getOpCode()) { + case Frame::OP_CLOSE: + $connection->end($frame->getContents()); + break; + case Frame::OP_PING: + $connection->write($parser->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents()); + break; + } + }, true, function () use ($uException) { + return $uException; + }); + + array_shift($parts); + $parser->onData(implode("\r\n\r\n", $parts)); + }); }); -$socket->listen(9001, '0.0.0.0'); $loop->run(); From 8aee2208987e217933bf30e8a2a02279afb1c454 Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Tue, 10 Dec 2019 19:23:45 -0500 Subject: [PATCH 2/5] Rework MessageBuffer to better handle large buffers filled with small frames --- src/Messaging/MessageBuffer.php | 56 +++++++++++++++++++++++++++------ 1 file changed, 47 insertions(+), 9 deletions(-) diff --git a/src/Messaging/MessageBuffer.php b/src/Messaging/MessageBuffer.php index 22f247c..a88d713 100644 --- a/src/Messaging/MessageBuffer.php +++ b/src/Messaging/MessageBuffer.php @@ -37,6 +37,11 @@ class MessageBuffer { */ private $checkForMask; + /** + * @var string + */ + private $leftovers; + function __construct( CloseFrameChecker $frameChecker, callable $onMessage, @@ -53,12 +58,51 @@ class MessageBuffer { $this->onMessage = $onMessage; $this->onControl = $onControl ?: function() {}; + + $this->leftovers = ''; } public function onData($data) { - while (strlen($data) > 0) { - $data = $this->processData($data); + $data = $this->leftovers . $data; + $dataLen = strlen($data); + $spyFrame = new Frame(); + + if ($dataLen < 2) { + $this->leftovers = $data; + return; } + $currentByte = 0; + $frameStart = 0; + $spyFrame->addBuffer($data[$currentByte]); + $currentByte++; + + while ($currentByte < $dataLen) { + $spyFrame->addBuffer($data[$currentByte]); + $currentByte ++; + try { + $payload_length = $spyFrame->getPayloadLength(); + $payload_start = $spyFrame->getPayloadStartingByte(); + } catch (\UnderflowException $e) { + if ($currentByte < $dataLen) { + continue; + } + break; + } + + $isCoalesced = $dataLen - $frameStart >= $payload_length + $payload_start; + + + if (!$isCoalesced) { + break; + } + $this->processData(substr($data, $frameStart, $payload_length + $payload_start)); + $spyFrame = new Frame(); + $currentByte = $frameStart + $payload_length + $payload_start; + $frameStart = $currentByte; + } + + $this->leftovers = substr($data, $frameStart); + } /** @@ -70,16 +114,12 @@ class MessageBuffer { $this->frameBuffer ?: $this->frameBuffer = $this->newFrame(); $this->frameBuffer->addBuffer($data); - if (!$this->frameBuffer->isCoalesced()) { - return ''; - } $onMessage = $this->onMessage; $onControl = $this->onControl; $this->frameBuffer = $this->frameCheck($this->frameBuffer); - $overflow = $this->frameBuffer->extractOverflow(); $this->frameBuffer->unMaskPayload(); $opcode = $this->frameBuffer->getOpcode(); @@ -108,8 +148,6 @@ class MessageBuffer { $onMessage($msgBuffer); } } - - return $overflow; } /** @@ -230,4 +268,4 @@ class MessageBuffer { public function newCloseFrame($code, $reason = '') { return $this->newFrame(pack('n', $code) . $reason, true, Frame::OP_CLOSE); } -} +} \ No newline at end of file From 58e79971e0abacc7a3d0ba09eaf0948edde0aa14 Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Tue, 10 Dec 2019 19:31:01 -0500 Subject: [PATCH 3/5] New PHP versions for travis --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 09125d8..c65dedb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,12 @@ language: php php: - - 5.4 - - 5.5 - 5.6 - 7.0 - 7.1 - 7.2 + - 7.3 + - 7.4 before_install: - export PATH=$HOME/.local/bin:$PATH From 11a21b762817c75e165c48fe59788399d24f272b Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Tue, 10 Dec 2019 22:23:00 -0500 Subject: [PATCH 4/5] A little faster by not using Frame functions to test for frame sizes --- src/Messaging/MessageBuffer.php | 42 +++++++++++++++------------------ 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/src/Messaging/MessageBuffer.php b/src/Messaging/MessageBuffer.php index a88d713..300b330 100644 --- a/src/Messaging/MessageBuffer.php +++ b/src/Messaging/MessageBuffer.php @@ -65,44 +65,40 @@ class MessageBuffer { public function onData($data) { $data = $this->leftovers . $data; $dataLen = strlen($data); - $spyFrame = new Frame(); if ($dataLen < 2) { $this->leftovers = $data; return; } - $currentByte = 0; - $frameStart = 0; - $spyFrame->addBuffer($data[$currentByte]); - $currentByte++; - while ($currentByte < $dataLen) { - $spyFrame->addBuffer($data[$currentByte]); - $currentByte ++; - try { - $payload_length = $spyFrame->getPayloadLength(); - $payload_start = $spyFrame->getPayloadStartingByte(); - } catch (\UnderflowException $e) { - if ($currentByte < $dataLen) { - continue; - } + $frameStart = 0; + while ($frameStart + 1 <= $dataLen) { + $headerSize = 2; + $payload_length = unpack('C', $data[$frameStart + 1] & "\x7f")[1]; + $isMasked = ($data[$frameStart + 1] & "\x80") === "\x80"; + $headerSize += $isMasked ? 4 : 0; + if ($payload_length > 125 && ($dataLen - $frameStart < $headerSize + 125)) { + // no point of checking - this frame is going to be bigger than the buffer is right now break; } + if ($payload_length > 125) { + $payloadLenBytes = $payload_length === 126 ? 2 : 8; + $headerSize += $payloadLenBytes; + $bytesToUpack = substr($data, $frameStart + 2, $payloadLenBytes); + $payload_length = $payload_length === 126 + ? unpack('n', $bytesToUpack)[1] + : unpack('J', $bytesToUpack)[1]; + } - $isCoalesced = $dataLen - $frameStart >= $payload_length + $payload_start; - - + $isCoalesced = $dataLen - $frameStart >= $payload_length + $headerSize; if (!$isCoalesced) { break; } - $this->processData(substr($data, $frameStart, $payload_length + $payload_start)); - $spyFrame = new Frame(); - $currentByte = $frameStart + $payload_length + $payload_start; - $frameStart = $currentByte; + $this->processData(substr($data, $frameStart, $payload_length + $headerSize)); + $frameStart = $frameStart + $payload_length + $headerSize; } $this->leftovers = substr($data, $frameStart); - } /** From 9c1df6a8e141edb4bc4bb299cfa8c92a62c95270 Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Tue, 10 Dec 2019 23:05:53 -0500 Subject: [PATCH 5/5] Should be framestart + 2 --- src/Messaging/MessageBuffer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Messaging/MessageBuffer.php b/src/Messaging/MessageBuffer.php index 300b330..6b3b440 100644 --- a/src/Messaging/MessageBuffer.php +++ b/src/Messaging/MessageBuffer.php @@ -72,7 +72,7 @@ class MessageBuffer { } $frameStart = 0; - while ($frameStart + 1 <= $dataLen) { + while ($frameStart + 2 <= $dataLen) { $headerSize = 2; $payload_length = unpack('C', $data[$frameStart + 1] & "\x7f")[1]; $isMasked = ($data[$frameStart + 1] & "\x80") === "\x80";