Merge branch 'mb-psr7' into psr7

This commit is contained in:
Chris Boden 2015-05-26 18:05:35 -04:00
commit 3fb32b2827
5 changed files with 134 additions and 81 deletions

View File

@ -377,6 +377,7 @@ class Frame implements FrameInterface {
$byte_length = $this->getNumPayloadBytes();
if ($this->bytesRecvd < 1 + $byte_length) {
$this->defPayLen = -1;
throw new \UnderflowException('Not enough data buffered to determine payload length');
}

View File

@ -124,8 +124,6 @@ class Message implements \IteratorAggregate, MessageInterface {
$buffer .= $frame->getPayload();
}
echo "Reassembled " . strlen($buffer) . " bytes in " . $this->_frames->count() . " frames\n";
return $buffer;
}

View File

@ -0,0 +1,67 @@
<?php
class AbConnectionContext implements Ratchet\RFC6455\Messaging\Streaming\ContextInterface {
private $_frame;
private $_message;
protected $maskPayload;
/**
* @var \React\Stream\Stream
*/
protected $_conn;
public function __construct(\React\Stream\Stream $connectionContext, $maskPayload = false) {
$this->_conn = $connectionContext;
$this->maskPayload = $maskPayload;
}
public function setFrame(\Ratchet\RFC6455\Messaging\Protocol\FrameInterface $frame = null) {
$this->_frame = $frame;
}
public function getFrame() {
return $this->_frame;
}
public function setMessage(\Ratchet\RFC6455\Messaging\Protocol\MessageInterface $message = null) {
$this->_message = $message;
}
public function getMessage() {
return $this->_message;
}
public function onMessage(\Ratchet\RFC6455\Messaging\Protocol\MessageInterface $msg) {
$frame = new \Ratchet\RFC6455\Messaging\Protocol\Frame($msg->getPayload(), true, $msg[0]->getOpcode());
if ($this->maskPayload) {
$frame->maskPayload();
}
$this->_conn->write($frame->getContents());
}
public function onPing(\Ratchet\RFC6455\Messaging\Protocol\FrameInterface $frame) {
$pong = new \Ratchet\RFC6455\Messaging\Protocol\Frame($frame->getPayload(), true, \Ratchet\RFC6455\Messaging\Protocol\Frame::OP_PONG);
if ($this->maskPayload) {
$pong->maskPayload();
}
$this->_conn->write($pong->getContents());
}
public function onPong(\Ratchet\RFC6455\Messaging\Protocol\FrameInterface $msg) {
// TODO: Implement onPong() method.
}
public function onClose($code = 1000) {
$frame = new \Ratchet\RFC6455\Messaging\Protocol\Frame(
pack('n', $code),
true,
\Ratchet\RFC6455\Messaging\Protocol\Frame::OP_CLOSE
);
if ($this->maskPayload) {
$frame->maskPayload();
}
$this->_conn->end($frame->getContents());
}
}

View File

@ -4,9 +4,34 @@ use Ratchet\RFC6455\Messaging\Protocol\Frame;
use Ratchet\RFC6455\Messaging\Protocol\Message;
require __DIR__ . '/../bootstrap.php';
require __DIR__ . '/AbConnectionContext.php';
define('AGENT', 'RatchetRFC/0.0.0');
$testServer = "127.0.0.1";
class EmConnectionContext extends AbConnectionContext implements \Evenement\EventEmitterInterface, Ratchet\RFC6455\Messaging\Streaming\ContextInterface {
use \Evenement\EventEmitterTrait;
public function onMessage(\Ratchet\RFC6455\Messaging\Protocol\MessageInterface $msg) {
$this->emit('message', [$msg]);
}
public function sendMessage(Frame $frame) {
if ($this->maskPayload) {
$frame->maskPayload();
}
$this->_conn->write($frame->getContents());
}
public function close($closeCode = Frame::CLOSE_NORMAL) {
$closeFrame = new Frame(pack('n', $closeCode), true, Frame::OP_CLOSE);
$closeFrame->maskPayload();
$this->_conn->end($closeFrame->getContents());
}
}
$loop = React\EventLoop\Factory::create();
$dnsResolverFactory = new React\Dns\Resolver\Factory();
@ -16,37 +41,23 @@ $factory = new \React\SocketClient\Connector($loop, $dnsResolver);
function getTestCases() {
global $factory;
global $testServer;
$deferred = new Deferred();
$factory->create('127.0.0.1', 9001)->then(function (\React\Stream\Stream $stream) use ($deferred) {
$factory->create($testServer, 9001)->then(function (\React\Stream\Stream $stream) use ($deferred) {
$cn = new \Ratchet\RFC6455\Handshake\ClientNegotiator("/getCaseCount");
$cnRequest = $cn->getRequest();
$rawResponse = "";
$response = null;
$ms = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer(true);
$ms = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer(new \Ratchet\RFC6455\Encoding\Validator(), true);
$ms->on('message', function (Message $msg) use ($stream, $deferred) {
$deferred->resolve($msg->getPayload());
/** @var EmConnectionContext $context */
$context = null;
$closeFrame = new Frame(pack('n', Frame::CLOSE_NORMAL), true, Frame::OP_CLOSE);
$closeFrame->maskPayload();
$stream->end($closeFrame->getContents());
});
$ms->on('close', function ($code) use ($stream) {
if ($code === null) {
$stream->end();
return;
}
$frame = new Frame(pack('n', $code), true, Frame::OP_CLOSE);
$frame->maskPayload();
$stream->end($frame->getContents());
});
$stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, $ms, $cn, $deferred) {
$stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, $ms, $cn, $deferred, &$context) {
if ($response === null) {
$rawResponse .= $data;
$pos = strpos($rawResponse, "\r\n\r\n");
@ -58,13 +69,20 @@ function getTestCases() {
if (!$cn->validateResponse($response)) {
$stream->end();
$deferred->reject();
} else {
$context = new EmConnectionContext($stream, true);
$context->on('message', function (Message $msg) use ($stream, $deferred, $context) {
$deferred->resolve($msg->getPayload());
$context->close();
});
}
}
}
// feed the message streamer
if ($response) {
$ms->onData($data);
if ($response && $context) {
$ms->onData($data, $context);
}
});
@ -77,46 +95,25 @@ function getTestCases() {
function runTest($case)
{
global $factory;
global $testServer;
$casePath = "/runCase?case={$case}&agent=" . AGENT;
$deferred = new Deferred();
$factory->create('127.0.0.1', 9001)->then(function (\React\Stream\Stream $stream) use ($deferred, $casePath, $case) {
$factory->create($testServer, 9001)->then(function (\React\Stream\Stream $stream) use ($deferred, $casePath, $case) {
$cn = new \Ratchet\RFC6455\Handshake\ClientNegotiator($casePath);
$cnRequest = $cn->getRequest();
$rawResponse = "";
$response = null;
$ms = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer(true);
$ms = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer(new \Ratchet\RFC6455\Encoding\Validator(), true);
$ms->on('message', function (Message $msg) use ($stream, $deferred, $case) {
echo "Got message on case " . $case . "\n";
$opcode = $msg->isBinary() ? Frame::OP_BINARY : Frame::OP_TEXT;
$frame = new Frame($msg->getPayload(), true, $opcode);
$frame->maskPayload();
/** @var AbConnectionContext $context */
$context = null;
$stream->write($frame->getContents());
});
$ms->on('ping', function (Frame $frame) use ($stream) {
$response = new Frame($frame->getPayload(), true, Frame::OP_PONG);
$response->maskPayload();
$stream->write($response->getContents());
});
$ms->on('close', function ($code) use ($stream, $deferred) {
if ($code === null) {
$stream->end();
return;
}
$frame = new Frame(pack('n', $code), true, Frame::OP_CLOSE);
$frame->maskPayload();
$stream->end($frame->getContents());
});
$stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, $ms, $cn, $deferred) {
$stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, $ms, $cn, $deferred, &$context) {
if ($response === null) {
$rawResponse .= $data;
$pos = strpos($rawResponse, "\r\n\r\n");
@ -128,13 +125,15 @@ function runTest($case)
if (!$cn->validateResponse($response)) {
$stream->end();
$deferred->reject();
} else {
$context = new AbConnectionContext($stream, true);
}
}
}
// feed the message streamer
if ($response) {
$ms->onData($data);
if ($response && $context) {
$ms->onData($data, $context);
}
});
@ -148,41 +147,25 @@ function runTest($case)
return $deferred->promise();
}
function createReport() {
global $factory;
global $testServer;
$deferred = new Deferred();
$factory->create('127.0.0.1', 9001)->then(function (\React\Stream\Stream $stream) use ($deferred) {
$factory->create($testServer, 9001)->then(function (\React\Stream\Stream $stream) use ($deferred) {
$cn = new \Ratchet\RFC6455\Handshake\ClientNegotiator('/updateReports?agent=' . AGENT);
$cnRequest = $cn->getRequest();
$rawResponse = "";
$response = null;
$ms = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer(true);
$ms = new \Ratchet\RFC6455\Messaging\Streaming\MessageStreamer(new \Ratchet\RFC6455\Encoding\Validator(), true);
$ms->on('message', function (Message $msg) use ($stream, $deferred) {
$deferred->resolve($msg->getPayload());
/** @var EmConnectionContext $context */
$context = null;
$closeFrame = new Frame(pack('n', Frame::CLOSE_NORMAL), true, Frame::OP_CLOSE);
$closeFrame->maskPayload();
$stream->end($closeFrame->getContents());
});
$ms->on('close', function ($code) use ($stream) {
if ($code === null) {
$stream->end();
return;
}
$frame = new Frame(pack('n', $code), true, Frame::OP_CLOSE);
$frame->maskPayload();
$stream->end($frame->getContents());
});
$stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, $ms, $cn, $deferred) {
$stream->on('data', function ($data) use ($stream, &$rawResponse, &$response, $ms, $cn, $deferred, &$context) {
if ($response === null) {
$rawResponse .= $data;
$pos = strpos($rawResponse, "\r\n\r\n");
@ -194,13 +177,20 @@ function createReport() {
if (!$cn->validateResponse($response)) {
$stream->end();
$deferred->reject();
} else {
$context = new EmConnectionContext($stream, true);
$context->on('message', function (Message $msg) use ($stream, $deferred, $context) {
$deferred->resolve($msg->getPayload());
$context->close();
});
}
}
}
// feed the message streamer
if ($response) {
$ms->onData($data);
if ($response && $context) {
$ms->onData($data, $context);
}
});

View File

@ -4,10 +4,7 @@
"failByDrop": false
}
, "outdir": "./reports/clients"
, "casesa": ["*"]
, "casesj": ["9.3.2", "9.3.3", "9.3.4", "9.4.2", "9.4.3", "9.4.4", "9.7.6", "9.8.6"]
, "cases": ["9.3.1", "9.3.2"]
, "casesx": ["1.*","2.*"]
, "cases": ["*"]
, "exclude-cases": ["12.*", "13.*"]
, "exclude-agent-cases": {}
}