From 8944361dbe69c82da22009b267950b97c92f1650 Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Tue, 10 Dec 2019 19:22:57 -0500 Subject: [PATCH] Update to latest react/socket and drop react/http for tests --- composer.json | 5 +-- tests/ab/clientRunner.php | 78 +++++++++++++++++++------------------ tests/ab/startServer.php | 81 ++++++++++++++++++++++----------------- 3 files changed, 89 insertions(+), 75 deletions(-) diff --git a/composer.json b/composer.json index 224066b..d9ff35d 100644 --- a/composer.json +++ b/composer.json @@ -25,8 +25,7 @@ "guzzlehttp/psr7": "^1.0" }, "require-dev": { - "react/http": "^0.4.1", - "react/socket-client": "^0.4.3", - "phpunit/phpunit": "4.8.*" + "phpunit/phpunit": "4.8.*", + "react/socket": "^1.3" } } diff --git a/tests/ab/clientRunner.php b/tests/ab/clientRunner.php index 0c5578a..274f82d 100644 --- a/tests/ab/clientRunner.php +++ b/tests/ab/clientRunner.php @@ -1,7 +1,14 @@ createCached('8.8.8.8', $loop); - -$factory = new \React\SocketClient\Connector($loop, $dnsResolver); +$connector = new Connector($loop); function echoStreamerFactory($conn) { - return new \Ratchet\RFC6455\Messaging\MessageBuffer( - new \Ratchet\RFC6455\Messaging\CloseFrameChecker, - function (\Ratchet\RFC6455\Messaging\MessageInterface $msg) use ($conn) { + return new MessageBuffer( + new CloseFrameChecker, + function (MessageInterface $msg) use ($conn) { /** @var Frame $frame */ foreach ($msg as $frame) { $frame->maskPayload(); } $conn->write($msg->getContents()); }, - function (\Ratchet\RFC6455\Messaging\FrameInterface $frame) use ($conn) { + function (FrameInterface $frame) use ($conn) { switch ($frame->getOpcode()) { case Frame::OP_PING: return $conn->write((new Frame($frame->getPayload(), true, Frame::OP_PONG))->maskPayload()->getContents()); @@ -42,22 +46,22 @@ function echoStreamerFactory($conn) } function getTestCases() { - global $factory; global $testServer; + global $connector; $deferred = new Deferred(); - $factory->create($testServer, 9001)->then(function (\React\Stream\Stream $stream) use ($deferred) { - $cn = new \Ratchet\RFC6455\Handshake\ClientNegotiator(); + $connector->connect($testServer . ':9001')->then(function (ConnectionInterface $connection) use ($deferred) { + $cn = new ClientNegotiator(); $cnRequest = $cn->generateRequest(new Uri('ws://127.0.0.1:9001/getCaseCount')); $rawResponse = ""; $response = null; - /** @var \Ratchet\RFC6455\Messaging\Streaming\MessageBuffer $ms */ + /** @var MessageBuffer $ms */ $ms = null; - $stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { + $connection->on('data', function ($data) use ($connection, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { if ($response === null) { $rawResponse .= $data; $pos = strpos($rawResponse, "\r\n\r\n"); @@ -67,14 +71,14 @@ function getTestCases() { $response = \GuzzleHttp\Psr7\parse_response($rawResponse); if (!$cn->validateResponse($cnRequest, $response)) { - $stream->end(); + $connection->end(); $deferred->reject(); } else { - $ms = new \Ratchet\RFC6455\Messaging\MessageBuffer( - new \Ratchet\RFC6455\Messaging\CloseFrameChecker, - function (\Ratchet\RFC6455\Messaging\MessageInterface $msg) use ($deferred, $stream) { + $ms = new MessageBuffer( + new CloseFrameChecker, + function (MessageInterface $msg) use ($deferred, $connection) { $deferred->resolve($msg->getPayload()); - $stream->close(); + $connection->close(); }, null, false @@ -89,7 +93,7 @@ function getTestCases() { } }); - $stream->write(\GuzzleHttp\Psr7\str($cnRequest)); + $connection->write(\GuzzleHttp\Psr7\str($cnRequest)); }); return $deferred->promise(); @@ -97,15 +101,15 @@ function getTestCases() { function runTest($case) { - global $factory; + global $connector; global $testServer; $casePath = "/runCase?case={$case}&agent=" . AGENT; $deferred = new Deferred(); - $factory->create($testServer, 9001)->then(function (\React\Stream\Stream $stream) use ($deferred, $casePath, $case) { - $cn = new \Ratchet\RFC6455\Handshake\ClientNegotiator(); + $connector->connect($testServer . ':9001')->then(function (ConnectionInterface $connection) use ($deferred, $casePath, $case) { + $cn = new ClientNegotiator(); $cnRequest = $cn->generateRequest(new Uri('ws://127.0.0.1:9001' . $casePath)); $rawResponse = ""; @@ -113,7 +117,7 @@ function runTest($case) $ms = null; - $stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { + $connection->on('data', function ($data) use ($connection, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { if ($response === null) { $rawResponse .= $data; $pos = strpos($rawResponse, "\r\n\r\n"); @@ -123,10 +127,10 @@ function runTest($case) $response = \GuzzleHttp\Psr7\parse_response($rawResponse); if (!$cn->validateResponse($cnRequest, $response)) { - $stream->end(); + $connection->end(); $deferred->reject(); } else { - $ms = echoStreamerFactory($stream); + $ms = echoStreamerFactory($connection); } } } @@ -137,34 +141,34 @@ function runTest($case) } }); - $stream->on('close', function () use ($deferred) { + $connection->on('close', function () use ($deferred) { $deferred->resolve(); }); - $stream->write(\GuzzleHttp\Psr7\str($cnRequest)); + $connection->write(\GuzzleHttp\Psr7\str($cnRequest)); }); return $deferred->promise(); } function createReport() { - global $factory; + global $connector; global $testServer; $deferred = new Deferred(); - $factory->create($testServer, 9001)->then(function (\React\Stream\Stream $stream) use ($deferred) { + $connector->connect($testServer . ':9001')->then(function (ConnectionInterface $connection) use ($deferred) { $reportPath = "/updateReports?agent=" . AGENT . "&shutdownOnComplete=true"; - $cn = new \Ratchet\RFC6455\Handshake\ClientNegotiator(); + $cn = new ClientNegotiator(); $cnRequest = $cn->generateRequest(new Uri('ws://127.0.0.1:9001' . $reportPath)); $rawResponse = ""; $response = null; - /** @var \Ratchet\RFC6455\Messaging\MessageBuffer $ms */ + /** @var MessageBuffer $ms */ $ms = null; - $stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { + $connection->on('data', function ($data) use ($connection, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { if ($response === null) { $rawResponse .= $data; $pos = strpos($rawResponse, "\r\n\r\n"); @@ -174,12 +178,12 @@ function createReport() { $response = \GuzzleHttp\Psr7\parse_response($rawResponse); if (!$cn->validateResponse($cnRequest, $response)) { - $stream->end(); + $connection->end(); $deferred->reject(); } else { - $ms = new \Ratchet\RFC6455\Messaging\MessageBuffer( - new \Ratchet\RFC6455\Messaging\CloseFrameChecker, - function (\Ratchet\RFC6455\Messaging\MessageInterface $msg) use ($deferred, $stream) { + $ms = new MessageBuffer( + new CloseFrameChecker, + function (MessageInterface $msg) use ($deferred, $stream) { $deferred->resolve($msg->getPayload()); $stream->close(); }, @@ -196,7 +200,7 @@ function createReport() { } }); - $stream->write(\GuzzleHttp\Psr7\str($cnRequest)); + $connection->write(\GuzzleHttp\Psr7\str($cnRequest)); }); return $deferred->promise(); diff --git a/tests/ab/startServer.php b/tests/ab/startServer.php index b256ec2..4baf884 100644 --- a/tests/ab/startServer.php +++ b/tests/ab/startServer.php @@ -7,49 +7,60 @@ require_once __DIR__ . "/../bootstrap.php"; $loop = \React\EventLoop\Factory::create(); -$socket = new \React\Socket\Server($loop); -$server = new \React\Http\Server($socket); +$socket = new \React\Socket\Server('127.0.0.1:9001', $loop); $closeFrameChecker = new \Ratchet\RFC6455\Messaging\CloseFrameChecker; $negotiator = new \Ratchet\RFC6455\Handshake\ServerNegotiator(new \Ratchet\RFC6455\Handshake\RequestVerifier); $uException = new \UnderflowException; -$server->on('request', function (\React\Http\Request $request, \React\Http\Response $response) use ($negotiator, $closeFrameChecker, $uException) { - $psrRequest = new \GuzzleHttp\Psr7\Request($request->getMethod(), $request->getPath(), $request->getHeaders()); - - $negotiatorResponse = $negotiator->handshake($psrRequest); - - $response->writeHead( - $negotiatorResponse->getStatusCode(), - array_merge( - $negotiatorResponse->getHeaders(), - ["Content-Length" => "0"] - ) - ); - - if ($negotiatorResponse->getStatusCode() !== 101) { - $response->end(); - return; - } - - $parser = new \Ratchet\RFC6455\Messaging\MessageBuffer($closeFrameChecker, function(MessageInterface $message) use ($response) { - $response->write($message->getContents()); - }, function(FrameInterface $frame) use ($response, &$parser) { - switch ($frame->getOpCode()) { - case Frame::OP_CLOSE: - $response->end($frame->getContents()); - break; - case Frame::OP_PING: - $response->write($parser->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents()); - break; +$socket->on('connection', function (React\Socket\ConnectionInterface $connection) use ($negotiator, $closeFrameChecker, $uException) { + $headerComplete = false; + $buffer = ''; + $parser = null; + $connection->on('data', function ($data) use ($connection, &$parser, &$headerComplete, &$buffer, $negotiator, $closeFrameChecker, $uException) { + if ($headerComplete) { + $parser->onData($data); + return; } - }, true, function() use ($uException) { - return $uException; - }); - $request->on('data', [$parser, 'onData']); + $buffer .= $data; + $parts = explode("\r\n\r\n", $buffer); + if (count($parts) < 2) { + return; + } + $headerComplete = true; + $psrRequest = \GuzzleHttp\Psr7\parse_request($parts[0] . "\r\n\r\n"); + $negotiatorResponse = $negotiator->handshake($psrRequest); + + $negotiatorResponse = $negotiatorResponse->withAddedHeader("Content-Length", "0"); + + $connection->write(\GuzzleHttp\Psr7\str($negotiatorResponse)); + + if ($negotiatorResponse->getStatusCode() !== 101) { + $connection->end(); + return; + } + + $parser = new \Ratchet\RFC6455\Messaging\MessageBuffer($closeFrameChecker, + function (MessageInterface $message) use ($connection) { + $connection->write($message->getContents()); + }, function (FrameInterface $frame) use ($connection, &$parser) { + switch ($frame->getOpCode()) { + case Frame::OP_CLOSE: + $connection->end($frame->getContents()); + break; + case Frame::OP_PING: + $connection->write($parser->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents()); + break; + } + }, true, function () use ($uException) { + return $uException; + }); + + array_shift($parts); + $parser->onData(implode("\r\n\r\n", $parts)); + }); }); -$socket->listen(9001, '0.0.0.0'); $loop->run();