From 7280ddcd19555a326e213ef1f69ff8c94a12ce59 Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Thu, 9 Mar 2017 18:22:42 -0500 Subject: [PATCH] Per-message deflate with options work --- src/Handshake/ClientNegotiator.php | 15 +- src/Handshake/PermessageDeflateOptions.php | 182 ++++++++++++++++++++ src/Handshake/RequestVerifier.php | 23 +++ src/Handshake/ResponseVerifier.php | 28 +++- src/Handshake/ServerNegotiator.php | 32 +++- src/Messaging/Frame.php | 17 ++ src/Messaging/Message.php | 12 ++ src/Messaging/MessageBuffer.php | 185 ++++++++++++++++++++- tests/ab/clientRunner.php | 37 +++-- tests/ab/startServer.php | 46 +++-- 10 files changed, 538 insertions(+), 39 deletions(-) create mode 100644 src/Handshake/PermessageDeflateOptions.php diff --git a/src/Handshake/ClientNegotiator.php b/src/Handshake/ClientNegotiator.php index 70856df..95f49ec 100644 --- a/src/Handshake/ClientNegotiator.php +++ b/src/Handshake/ClientNegotiator.php @@ -16,7 +16,7 @@ class ClientNegotiator { */ private $defaultHeader; - function __construct() { + function __construct($enablePerMessageDeflate = false) { $this->verifier = new ResponseVerifier; $this->defaultHeader = new Request('GET', '', [ @@ -25,6 +25,19 @@ class ClientNegotiator { , 'Sec-WebSocket-Version' => $this->getVersion() , 'User-Agent' => "Ratchet" ]); + + if ($enablePerMessageDeflate && (version_compare(PHP_VERSION, '7.0.15', '<') || version_compare(PHP_VERSION, '7.1.0', '='))) { + $enablePerMessageDeflate = false; + } + if ($enablePerMessageDeflate && !function_exists('deflate_add')) { + $enablePerMessageDeflate = false; + } + + if ($enablePerMessageDeflate) { + $this->defaultHeader = $this->defaultHeader->withAddedHeader( + 'Sec-WebSocket-Extensions', + 'permessage-deflate'); + } } public function generateRequest(UriInterface $uri) { diff --git a/src/Handshake/PermessageDeflateOptions.php b/src/Handshake/PermessageDeflateOptions.php new file mode 100644 index 0000000..6b325d8 --- /dev/null +++ b/src/Handshake/PermessageDeflateOptions.php @@ -0,0 +1,182 @@ +getHeader('Sec-Websocket-Extensions'))); + + $configurationRequests = explode(',', $extHeader); + foreach ($configurationRequests as $configurationRequest) { + $parts = explode(';', $configurationRequest); + if (count($parts) == 0) { + continue; + } + + if ($parts[0] !== 'permessage-deflate') { + continue; + } + + array_shift($parts); + $options = new static(); + $options->deflate = true; + foreach ($parts as $part) { + $kv = explode('=', $part); + $key = $kv[0]; + $value = count($kv) > 1 ? $kv[1] : null; + + $validBits = ['8', '9', '10', '11', '12', '13', '14', '15']; + switch ($key) { + case "server_no_context_takeover": + case "client_no_context_takeover": + if ($value !== null) { + throw new \Exception($key . ' must not have a value.'); + } + $value = true; + break; + case "server_max_window_bits": + if (!in_array($value, $validBits)) { + throw new \Exception($key . ' must have a value between 8 and 15.'); + } + break; + case "client_max_window_bits": + if ($value === null) { + $value = '15'; + } + if (!in_array($value, $validBits)) { + throw new \Exception($key . ' must have no value or a value between 8 and 15.'); + } + break; + default: + throw new \Exception('Option "' . $key . '"is not valid for this extension'); + } + + if ($options->$key !== null) { + throw new \Exception('Key specified more than once. Connection must be declined.'); + } + + $options->$key = $value; + } + + if ($options->getClientMaxWindowBits() === null) { + $options->client_max_window_bits = 15; + } + + if ($options->getServerMaxWindowBits() === null) { + $options->server_max_window_bits = 15; + } + + $optionSets[] = $options; + } + + // always put a disabled on the end + $optionSets[] = new static(); + + return $optionSets; + } + + public static function createDisabled() { + return new static(); + } + + public static function validateResponseToRequest(ResponseInterface $response, RequestInterface $request) { + $requestOptions = static::fromRequestOrResponse($request); + $responseOptions = static::fromRequestOrResponse($response); + } + + /** + * @return mixed + */ + public function getServerNoContextTakeover() + { + return $this->server_no_context_takeover; + } + + /** + * @return mixed + */ + public function getClientNoContextTakeover() + { + return false; // always return false unless we want to conserve resources + return $this->client_no_context_takeover; + } + + /** + * @return mixed + */ + public function getServerMaxWindowBits() + { + return $this->server_max_window_bits; + } + + /** + * @return mixed + */ + public function getClientMaxWindowBits() + { + return $this->client_max_window_bits; + } + + /** + * @return bool + */ + public function getDeflate() + { + return $this->deflate; + } + + /** + * @param ResponseInterface $response + * @return ResponseInterface + */ + public function addHeaderToResponse(ResponseInterface $response) + { + if (!$this->deflate) { + return $response; + } + + $header = 'permessage-deflate'; + if ($this->client_max_window_bits != 15) { + $header .= '; client_max_window_bits='. $this->client_max_window_bits; + } + // this would only be needed if you want to save server resources (no buffer needed) + // worse compression +// if ($this->client_no_context_takeover) { +// $header .= '; client_no_context_takeover'; +// } + if ($this->server_max_window_bits != 15) { + $header .= '; server_max_window_bits=' . $this->server_max_window_bits; + } + if ($this->server_no_context_takeover) { + $header .= '; server_no_context_takeover'; + } + + echo $header . "\n"; + + return $response->withAddedHeader('Sec-Websocket-Extensions', $header); + } +} \ No newline at end of file diff --git a/src/Handshake/RequestVerifier.php b/src/Handshake/RequestVerifier.php index 1ace489..dbce9a9 100644 --- a/src/Handshake/RequestVerifier.php +++ b/src/Handshake/RequestVerifier.php @@ -137,4 +137,27 @@ class RequestVerifier { */ public function verifyExtensions($val) { } + + public function getPermessageDeflateOptions(array $requestHeader, array $responseHeader) { + $deflate = true; + if (!isset($requestHeader['Sec-WebSocket-Extensions']) || count(array_filter($requestHeader['Sec-WebSocket-Extensions'], function ($val) { + return 'permessage-deflate' === substr($val, 0, strlen('permessage-deflate')); + })) === 0) { + $deflate = false; + } + + if (!isset($responseHeader['Sec-WebSocket-Extensions']) || count(array_filter($responseHeader['Sec-WebSocket-Extensions'], function ($val) { + return 'permessage-deflate' === substr($val, 0, strlen('permessage-deflate')); + })) === 0) { + $deflate = false; + } + + return [ + 'deflate' => $deflate, + 'no_context_takeover' => false, + 'max_window_bits' => null, + 'request_no_context_takeover' => false, + 'request_max_window_bits' => null + ]; + } } diff --git a/src/Handshake/ResponseVerifier.php b/src/Handshake/ResponseVerifier.php index de03f53..7dc3921 100644 --- a/src/Handshake/ResponseVerifier.php +++ b/src/Handshake/ResponseVerifier.php @@ -18,8 +18,12 @@ class ResponseVerifier { $request->getHeader('Sec-WebSocket-Protocol') , $response->getHeader('Sec-WebSocket-Protocol') ); + $passes += (int)$this->verifyExtensions( + $request->getHeader('Sec-WebSocket-Extensions') + , $response->getHeader('Sec-WebSocket-Extensions') + ); - return (5 === $passes); + return (6 === $passes); } public function verifyStatus($status) { @@ -49,4 +53,26 @@ class ResponseVerifier { public function verifySubProtocol(array $requestHeader, array $responseHeader) { return 0 === count($responseHeader) || count(array_intersect($responseHeader, $requestHeader)) > 0; } + + public function verifyExtensions(array $requestHeader, array $responseHeader) { + if (in_array('permessage-deflate', $responseHeader)) { + return in_array('permessage-deflate', $requestHeader); + } + + return 1; + } + + public function getPermessageDeflateOptions(array $requestHeader, array $responseHeader) { + if (!$this->verifyExtensions($requestHeader, $responseHeader)) { + return false; + } + + return [ + 'deflate' => in_array('permessage-deflate', $responseHeader), + 'no_context_takeover' => false, + 'max_window_bits' => null, + 'request_no_context_takeover' => false, + 'request_max_window_bits' => null + ]; + } } \ No newline at end of file diff --git a/src/Handshake/ServerNegotiator.php b/src/Handshake/ServerNegotiator.php index 5a0073b..8ff15a9 100644 --- a/src/Handshake/ServerNegotiator.php +++ b/src/Handshake/ServerNegotiator.php @@ -17,8 +17,19 @@ class ServerNegotiator implements NegotiatorInterface { private $_strictSubProtocols = false; - public function __construct(RequestVerifier $requestVerifier) { + private $enablePerMessageDeflate = false; + + public function __construct(RequestVerifier $requestVerifier, $enablePerMessageDeflate = false) { $this->verifier = $requestVerifier; + + if ($enablePerMessageDeflate && (version_compare(PHP_VERSION, '7.0.15', '<') || version_compare(PHP_VERSION, '7.1.0', '='))) { + $enablePerMessageDeflate = false; + } + if ($enablePerMessageDeflate && !function_exists('deflate_add')) { + $enablePerMessageDeflate = false; + } + + $this->enablePerMessageDeflate = $enablePerMessageDeflate; } /** @@ -97,12 +108,23 @@ class ServerNegotiator implements NegotiatorInterface { } } - return new Response(101, array_merge($headers, [ + $response = new Response(101, array_merge($headers, [ 'Upgrade' => 'websocket' - , 'Connection' => 'Upgrade' - , 'Sec-WebSocket-Accept' => $this->sign((string)$request->getHeader('Sec-WebSocket-Key')[0]) - , 'X-Powered-By' => 'Ratchet' + , 'Connection' => 'Upgrade' + , 'Sec-WebSocket-Accept' => $this->sign((string)$request->getHeader('Sec-WebSocket-Key')[0]) + , 'X-Powered-By' => 'Ratchet' ])); + + +// $perMessageDeflate = array_filter($request->getHeader('Sec-WebSocket-Extensions'), function ($x) { +// return 'permessage-deflate' === substr($x, 0, strlen('permessage-deflate')); +// }); + $perMessageDeflateRequest = PermessageDeflateOptions::fromRequestOrResponse($request)[0]; + if ($this->enablePerMessageDeflate && $perMessageDeflateRequest->getDeflate()) { + $response = $perMessageDeflateRequest->addHeaderToResponse($response); + } + + return $response; } /** diff --git a/src/Messaging/Frame.php b/src/Messaging/Frame.php index 40f9eb2..f558554 100644 --- a/src/Messaging/Frame.php +++ b/src/Messaging/Frame.php @@ -149,6 +149,23 @@ class Frame implements FrameInterface { return 128 === ($this->firstByte & 128); } + public function setRsv1($value = true) { + if (strlen($this->data) == 0) { + throw new \UnderflowException("Cannot set Rsv1 because there is no data."); + } + + $this->firstByte = + ($this->isFinal() ? 128 : 0) + + $this->getOpcode() + + ($value ? 64 : 0) + + ($this->getRsv2() ? 32 : 0) + + ($this->getRsv3() ? 16 : 0) + ; + + $this->data[0] = chr($this->firstByte); + return $this; + } + /** * @return boolean * @throws \UnderflowException diff --git a/src/Messaging/Message.php b/src/Messaging/Message.php index 4f3b014..06f3949 100644 --- a/src/Messaging/Message.php +++ b/src/Messaging/Message.php @@ -120,4 +120,16 @@ class Message implements \IteratorAggregate, MessageInterface { return Frame::OP_BINARY === $this->_frames->bottom()->getOpcode(); } + + /** + * @return boolean + */ + public function getRsv1() { + if ($this->_frames->isEmpty()) { + return false; + //throw new \UnderflowException('Not enough data has been received to determine if message is binary'); + } + + return $this->_frames->bottom()->getRsv1(); + } } diff --git a/src/Messaging/MessageBuffer.php b/src/Messaging/MessageBuffer.php index 07ff4f1..2ace8db 100644 --- a/src/Messaging/MessageBuffer.php +++ b/src/Messaging/MessageBuffer.php @@ -1,6 +1,8 @@ closeFrameChecker = $frameChecker; $this->checkForMask = (bool)$expectMask; @@ -53,6 +78,14 @@ class MessageBuffer { $this->onMessage = $onMessage; $this->onControl = $onControl ?: function() {}; + + $this->sender = $sender; + + $this->permessageDeflateOptions = $permessageDeflateOptions ? $permessageDeflateOptions : PermessageDeflateOptions::createDisabled(); + + $this->deflate = $this->permessageDeflateOptions->getDeflate(); + + $this->compressedMessage = false; } public function onData($data) { @@ -85,12 +118,19 @@ class MessageBuffer { $opcode = $this->frameBuffer->getOpcode(); if ($opcode > 2) { - $onControl($this->frameBuffer); + $onControl($this->frameBuffer, $this); if (Frame::OP_CLOSE === $opcode) { return ''; } } else { + if ($this->messageBuffer->count() === 0 && $this->frameBuffer->getRsv1()) { + $this->compressedMessage = true; + } + if ($this->compressedMessage) { + $this->frameBuffer = $this->inflateFrame($this->frameBuffer); + } + $this->messageBuffer->addFrame($this->frameBuffer); } @@ -99,12 +139,17 @@ class MessageBuffer { if ($this->messageBuffer->isCoalesced()) { $msgCheck = $this->checkMessage($this->messageBuffer); if (true !== $msgCheck) { - $onControl($this->newCloseFrame($msgCheck, 'Ratchet detected an invalid UTF-8 payload')); + $onControl($this->newCloseFrame($msgCheck, 'Ratchet detected an invalid UTF-8 payload'), $this); } else { - $onMessage($this->messageBuffer); + $onMessage($this->messageBuffer, $this); } $this->messageBuffer = null; + $this->compressedMessage = false; + + if ($this->permessageDeflateOptions->getServerNoContextTakeover()) { + $this->inflator = null; + } } return $overflow; @@ -116,7 +161,7 @@ class MessageBuffer { * @return \Ratchet\RFC6455\Messaging\FrameInterface|FrameInterface */ public function frameCheck(FrameInterface $frame) { - if (false !== $frame->getRsv1() || + if ((false !== $frame->getRsv1() && !$this->deflate) || false !== $frame->getRsv2() || false !== $frame->getRsv3() ) { @@ -228,4 +273,134 @@ class MessageBuffer { public function newCloseFrame($code, $reason = '') { return $this->newFrame(pack('n', $code) . $reason, true, Frame::OP_CLOSE); } + + + public function sendFrame(Frame $frame) { + if ($this->deflate && + ($frame->getOpcode() === Frame::OP_TEXT || $frame->getOpcode() === Frame::OP_BINARY)) { + $frame = $this->deflateFrame($frame); + } + + if (!$this->checkForMask) { + $frame->maskPayload(); + } + + $sender = $this->sender; + $sender($frame->getContents()); + } + + public function sendMessage($messagePayload, $final = true, $isBinary = false) { + $opCode = $isBinary ? Frame::OP_BINARY : Frame::OP_TEXT; + if ($this->streamingMessageOpCode === -1) { + $this->streamingMessageOpCode = $opCode; + } + + if ($this->streamingMessageOpCode !== $opCode) { + throw new \Exception('Binary and text message parts cannot be streamed together.'); + } + + $frame = $this->newFrame($messagePayload, $final, $opCode); + + $this->sendFrame($frame); + + if ($final) { + // reset deflator if client doesn't remember contexts + if ($this->permessageDeflateOptions->getClientNoContextTakeover()) { + $this->deflator = null; + } + $this->streamingMessageOpCode = -1; + } + } + + private $inflator; + + private function inflateFrame(Frame $frame) { + if ($this->inflator === null) { +// $this->inflator = inflate_init(ZLIB_ENCODING_RAW); + $this->inflator = inflate_init( + ZLIB_ENCODING_RAW, + [ + 'level' => -1, + 'memory' => 8, + 'window' => $this->permessageDeflateOptions->getClientMaxWindowBits(), + 'strategy' => ZLIB_DEFAULT_STRATEGY + ] + ); + } + + $terminator = ''; + if ($frame->isFinal()) { + $terminator = "\x00\x00\xff\xff"; + } + + gc_collect_cycles(); // memory runs away if we don't collect ?? + + return new Frame( + inflate_add($this->inflator, $frame->getPayload() . $terminator), + $frame->isFinal(), + $frame->getOpcode() + ); + } + private $deflator; + + private function deflateFrame(Frame $frame) { + if ($frame->getRsv1()) { + return $frame; // frame is already deflated + } + + if ($this->deflator === null) { +// $this->deflator = deflate_init( +// ZLIB_ENCODING_RAW +// ); + echo "init with " . $this->permessageDeflateOptions->getServerMaxWindowBits(); + $this->deflator = deflate_init( + ZLIB_ENCODING_RAW, + [ + 'level' => -1, + 'memory' => 8, + 'window' => $this->permessageDeflateOptions->getServerMaxWindowBits(), + 'strategy' => ZLIB_DEFAULT_STRATEGY + ] + ); + } + + // there is an issue in the zlib extension for php where + // deflate_add does not check avail_out to see if the buffer filled + // this only seems to be an issue for payloads between 16 and 64 bytes + // This if statement is a hack fix to break the output up allowing us + // to call deflate_add twice which should clear the buffer issue +// if ($frame->getPayloadLength() >= 16 && $frame->getPayloadLength() <= 64) { +// // try processing in 8 byte chunks +// // https://bugs.php.net/bug.php?id=73373 +// $payload = ""; +// $orig = $frame->getPayload(); +// $partSize = 8; +// while (strlen($orig) > 0) { +// $part = substr($orig, 0, $partSize); +// $orig = substr($orig, strlen($part)); +// $flags = strlen($orig) > 0 ? ZLIB_PARTIAL_FLUSH : ZLIB_SYNC_FLUSH; +// $payload .= deflate_add($this->deflator, $part, $flags); +// } +// } else { + $payload = deflate_add( + $this->deflator, + $frame->getPayload(), + ZLIB_SYNC_FLUSH + ); +// } + + $deflatedFrame = new Frame( + substr($payload, 0, $frame->isFinal() ? -4 : strlen($payload)), + $frame->isFinal(), + $frame->getOpcode() + ); + + if ($frame->isFinal()) { + $deflatedFrame->setRsv1(); + } + + gc_collect_cycles(); // memory runs away if we don't collect ?? + + return $deflatedFrame; + } } diff --git a/tests/ab/clientRunner.php b/tests/ab/clientRunner.php index 0c5578a..bfd90a4 100644 --- a/tests/ab/clientRunner.php +++ b/tests/ab/clientRunner.php @@ -1,5 +1,7 @@ createCached('8.8.8.8', $loop); $factory = new \React\SocketClient\Connector($loop, $dnsResolver); -function echoStreamerFactory($conn) +function echoStreamerFactory($conn, $permessageDeflateOptions = null) { + if ($permessageDeflateOptions === null) { + $permessageDeflateOptions = []; + } + return new \Ratchet\RFC6455\Messaging\MessageBuffer( new \Ratchet\RFC6455\Messaging\CloseFrameChecker, - function (\Ratchet\RFC6455\Messaging\MessageInterface $msg) use ($conn) { - /** @var Frame $frame */ - foreach ($msg as $frame) { - $frame->maskPayload(); - } - $conn->write($msg->getContents()); + function (\Ratchet\RFC6455\Messaging\MessageInterface $msg, MessageBuffer $messageBuffer) use ($conn) { + $messageBuffer->sendMessage($msg->getPayload(), true, $msg->isBinary()); }, - function (\Ratchet\RFC6455\Messaging\FrameInterface $frame) use ($conn) { + [$conn, 'write'], + function (\Ratchet\RFC6455\Messaging\FrameInterface $frame, MessageBuffer $messageBuffer) use ($conn) { switch ($frame->getOpcode()) { case Frame::OP_PING: return $conn->write((new Frame($frame->getPayload(), true, Frame::OP_PONG))->maskPayload()->getContents()); @@ -37,7 +40,9 @@ function echoStreamerFactory($conn) break; } }, - false + false, + null, + $permessageDeflateOptions ); } @@ -54,7 +59,7 @@ function getTestCases() { $rawResponse = ""; $response = null; - /** @var \Ratchet\RFC6455\Messaging\Streaming\MessageBuffer $ms */ + /** @var MessageBuffer $ms */ $ms = null; $stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, &$ms, $cn, $deferred, &$context, $cnRequest) { @@ -76,6 +81,7 @@ function getTestCases() { $deferred->resolve($msg->getPayload()); $stream->close(); }, + function () {}, null, false ); @@ -105,7 +111,7 @@ function runTest($case) $deferred = new Deferred(); $factory->create($testServer, 9001)->then(function (\React\Stream\Stream $stream) use ($deferred, $casePath, $case) { - $cn = new \Ratchet\RFC6455\Handshake\ClientNegotiator(); + $cn = new \Ratchet\RFC6455\Handshake\ClientNegotiator(true); $cnRequest = $cn->generateRequest(new Uri('ws://127.0.0.1:9001' . $casePath)); $rawResponse = ""; @@ -126,7 +132,13 @@ function runTest($case) $stream->end(); $deferred->reject(); } else { - $ms = echoStreamerFactory($stream); + $ms = echoStreamerFactory( + $stream, + (new ResponseVerifier())->getPermessageDeflateOptions( + $cnRequest->getHeader('Sec-WebSocket-Extensions'), + $response->getHeader('Sec-WebSocket-Extensions') + ) + ); } } } @@ -183,6 +195,7 @@ function createReport() { $deferred->resolve($msg->getPayload()); $stream->close(); }, + function () {}, null, false ); diff --git a/tests/ab/startServer.php b/tests/ab/startServer.php index b256ec2..f492e19 100644 --- a/tests/ab/startServer.php +++ b/tests/ab/startServer.php @@ -1,4 +1,8 @@ on('request', function (\React\Http\Request $request, \React\Http\Respo return; } - $parser = new \Ratchet\RFC6455\Messaging\MessageBuffer($closeFrameChecker, function(MessageInterface $message) use ($response) { - $response->write($message->getContents()); - }, function(FrameInterface $frame) use ($response, &$parser) { - switch ($frame->getOpCode()) { - case Frame::OP_CLOSE: - $response->end($frame->getContents()); - break; - case Frame::OP_PING: - $response->write($parser->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents()); - break; - } - }, true, function() use ($uException) { - return $uException; - }); + // there is no need to look through the client requests + // we support any valid permessage deflate + $deflateOptions = PermessageDeflateOptions::fromRequestOrResponse($psrRequest)[0]; + + $parser = new \Ratchet\RFC6455\Messaging\MessageBuffer( + $closeFrameChecker, + function (MessageInterface $message, MessageBuffer $messageBuffer) use ($response) { + $messageBuffer->sendMessage($message->getPayload(), true, $message->isBinary()); + }, + [$response, 'write'], + function (FrameInterface $frame) use ($response, &$parser) { + switch ($frame->getOpCode()) { + case Frame::OP_CLOSE: + $response->end($frame->getContents()); + break; + case Frame::OP_PING: + $response->write($parser->newFrame($frame->getPayload(), true, Frame::OP_PONG)->getContents()); + break; + } + }, + true, + function () use ($uException) { + return $uException; + }, + $deflateOptions + ); $request->on('data', [$parser, 'onData']); });