Merge pull request #30 from jmoo/fix-for-blocking-ops

Null out message buffer before calling onMessage handler
This commit is contained in:
Chris Boden 2019-03-10 13:10:42 -04:00 committed by GitHub
commit c62f7cd95f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 3 deletions

View File

@ -98,13 +98,15 @@ class MessageBuffer {
if ($this->messageBuffer->isCoalesced()) {
$msgCheck = $this->checkMessage($this->messageBuffer);
$msgBuffer = $this->messageBuffer;
$this->messageBuffer = null;
if (true !== $msgCheck) {
$onControl($this->newCloseFrame($msgCheck, 'Ratchet detected an invalid UTF-8 payload'));
} else {
$onMessage($this->messageBuffer);
$onMessage($msgBuffer);
}
$this->messageBuffer = null;
}
return $overflow;

View File

@ -6,6 +6,7 @@ use Ratchet\RFC6455\Messaging\CloseFrameChecker;
use Ratchet\RFC6455\Messaging\Frame;
use Ratchet\RFC6455\Messaging\Message;
use Ratchet\RFC6455\Messaging\MessageBuffer;
use React\EventLoop\Factory;
class MessageBufferTest extends \PHPUnit_Framework_TestCase
{
@ -36,4 +37,36 @@ class MessageBufferTest extends \PHPUnit_Framework_TestCase
$this->assertEquals(1000, $messageCount);
}
public function testProcessingMessagesAsynchronouslyWhileBlockingInMessageHandler() {
$loop = Factory::create();
$frameA = new Frame('a', true, Frame::OP_TEXT);
$frameB = new Frame('b', true, Frame::OP_TEXT);
$bReceived = false;
$messageBuffer = new MessageBuffer(
new CloseFrameChecker(),
function (Message $message) use (&$messageCount, &$bReceived, $loop) {
$payload = $message->getPayload();
$bReceived = $payload === 'b';
if (!$bReceived) {
$loop->run();
}
},
null,
false
);
$loop->addPeriodicTimer(0.1, function () use ($messageBuffer, $frameB, $loop) {
$loop->stop();
$messageBuffer->onData($frameB->getContents());
});
$messageBuffer->onData($frameA->getContents());
$this->assertTrue($bReceived);
}
}