Rework MessageBuffer to better handle large buffers filled with small frames
This commit is contained in:
parent
8944361dbe
commit
8aee220898
@ -37,6 +37,11 @@ class MessageBuffer {
|
|||||||
*/
|
*/
|
||||||
private $checkForMask;
|
private $checkForMask;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
private $leftovers;
|
||||||
|
|
||||||
function __construct(
|
function __construct(
|
||||||
CloseFrameChecker $frameChecker,
|
CloseFrameChecker $frameChecker,
|
||||||
callable $onMessage,
|
callable $onMessage,
|
||||||
@ -53,12 +58,51 @@ class MessageBuffer {
|
|||||||
|
|
||||||
$this->onMessage = $onMessage;
|
$this->onMessage = $onMessage;
|
||||||
$this->onControl = $onControl ?: function() {};
|
$this->onControl = $onControl ?: function() {};
|
||||||
|
|
||||||
|
$this->leftovers = '';
|
||||||
}
|
}
|
||||||
|
|
||||||
public function onData($data) {
|
public function onData($data) {
|
||||||
while (strlen($data) > 0) {
|
$data = $this->leftovers . $data;
|
||||||
$data = $this->processData($data);
|
$dataLen = strlen($data);
|
||||||
|
$spyFrame = new Frame();
|
||||||
|
|
||||||
|
if ($dataLen < 2) {
|
||||||
|
$this->leftovers = $data;
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
$currentByte = 0;
|
||||||
|
$frameStart = 0;
|
||||||
|
$spyFrame->addBuffer($data[$currentByte]);
|
||||||
|
$currentByte++;
|
||||||
|
|
||||||
|
while ($currentByte < $dataLen) {
|
||||||
|
$spyFrame->addBuffer($data[$currentByte]);
|
||||||
|
$currentByte ++;
|
||||||
|
try {
|
||||||
|
$payload_length = $spyFrame->getPayloadLength();
|
||||||
|
$payload_start = $spyFrame->getPayloadStartingByte();
|
||||||
|
} catch (\UnderflowException $e) {
|
||||||
|
if ($currentByte < $dataLen) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
$isCoalesced = $dataLen - $frameStart >= $payload_length + $payload_start;
|
||||||
|
|
||||||
|
|
||||||
|
if (!$isCoalesced) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
$this->processData(substr($data, $frameStart, $payload_length + $payload_start));
|
||||||
|
$spyFrame = new Frame();
|
||||||
|
$currentByte = $frameStart + $payload_length + $payload_start;
|
||||||
|
$frameStart = $currentByte;
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->leftovers = substr($data, $frameStart);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -70,16 +114,12 @@ class MessageBuffer {
|
|||||||
$this->frameBuffer ?: $this->frameBuffer = $this->newFrame();
|
$this->frameBuffer ?: $this->frameBuffer = $this->newFrame();
|
||||||
|
|
||||||
$this->frameBuffer->addBuffer($data);
|
$this->frameBuffer->addBuffer($data);
|
||||||
if (!$this->frameBuffer->isCoalesced()) {
|
|
||||||
return '';
|
|
||||||
}
|
|
||||||
|
|
||||||
$onMessage = $this->onMessage;
|
$onMessage = $this->onMessage;
|
||||||
$onControl = $this->onControl;
|
$onControl = $this->onControl;
|
||||||
|
|
||||||
$this->frameBuffer = $this->frameCheck($this->frameBuffer);
|
$this->frameBuffer = $this->frameCheck($this->frameBuffer);
|
||||||
|
|
||||||
$overflow = $this->frameBuffer->extractOverflow();
|
|
||||||
$this->frameBuffer->unMaskPayload();
|
$this->frameBuffer->unMaskPayload();
|
||||||
|
|
||||||
$opcode = $this->frameBuffer->getOpcode();
|
$opcode = $this->frameBuffer->getOpcode();
|
||||||
@ -108,8 +148,6 @@ class MessageBuffer {
|
|||||||
$onMessage($msgBuffer);
|
$onMessage($msgBuffer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return $overflow;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -230,4 +268,4 @@ class MessageBuffer {
|
|||||||
public function newCloseFrame($code, $reason = '') {
|
public function newCloseFrame($code, $reason = '') {
|
||||||
return $this->newFrame(pack('n', $code) . $reason, true, Frame::OP_CLOSE);
|
return $this->newFrame(pack('n', $code) . $reason, true, Frame::OP_CLOSE);
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user