Stability

Added onError hook to observable interface
Handling errors in proper places, no longer a catchall
Temporarily throwing errors on all non-message HyBi-10 frames ("fixes" FF breaking everything)
This commit is contained in:
Chris Boden 2011-11-10 21:23:31 -05:00
parent ad258e6eaa
commit 32d9dda703
6 changed files with 55 additions and 33 deletions

View File

@ -95,13 +95,9 @@ class WebSocket implements ProtocolInterface {
return $comp; return $comp;
} }
try { $msg = $client->getVersion()->unframe($msg);
$msg = $client->getVersion()->unframe($msg); if (is_array($msg)) { // temporary
if (is_array($msg)) { // temporary $msg = $msg['payload'];
$msg = $msg['payload'];
}
} catch (\UnexpectedValueException $e) {
return $this->_factory->newCommand('CloseConnection', $from);
} }
$cmds = $this->_app->onRecv($from, $msg); $cmds = $this->_app->onRecv($from, $msg);
@ -120,6 +116,10 @@ class WebSocket implements ProtocolInterface {
return $cmds; return $cmds;
} }
public function onError(SocketInterface $conn, \Exception $e) {
return $this->_app->onError($conn, $e);
}
/** /**
* @param string * @param string
*/ */

View File

@ -56,6 +56,7 @@ class HyBi10 implements VersionInterface {
switch($opcode) { switch($opcode) {
// continuation frame // continuation frame
case 0: case 0:
throw new \UnexpectedValueException("Opcode CONTINUATION not accepted yet");
$decodedData['type'] = 'text'; // incomplete $decodedData['type'] = 'text'; // incomplete
break; break;
@ -66,21 +67,25 @@ class HyBi10 implements VersionInterface {
// binary data frame // binary data frame
case 2: case 2:
throw new \UnexpectedValueException("Opcode BINARY not accepted yet");
$decodedData['type'] = 'binary'; $decodedData['type'] = 'binary';
break; break;
// connection close frame: // connection close frame:
case 8: case 8:
throw new \UnexpectedValueException("Opcode CLOSE not accepted yet");
$decodedData['type'] = 'close'; $decodedData['type'] = 'close';
break; break;
// ping frame: // ping frame:
case 9: case 9:
throw new \UnexpectedValueException("Opcode PING not accepted yet");
$decodedData['type'] = 'ping'; $decodedData['type'] = 'ping';
break; break;
// pong frame: // pong frame:
case 10: case 10:
throw new \UnexpectedValueException("Opcode PONG not accepted yet");
$decodedData['type'] = 'pong'; $decodedData['type'] = 'pong';
break; break;

View File

@ -11,11 +11,6 @@ use Ratchet\Command\CommandInterface;
* @todo With all these options for the server I should probably use a DIC * @todo With all these options for the server I should probably use a DIC
*/ */
class Server implements SocketObserver, \IteratorAggregate { class Server implements SocketObserver, \IteratorAggregate {
/**
* This is probably temporary
*/
const RECV_BYTES = 1024;
/** /**
* The master socket, receives all connections * The master socket, receives all connections
* @type Socket * @type Socket
@ -68,6 +63,8 @@ class Server implements SocketObserver, \IteratorAggregate {
* @todo Consider making the 4kb listener changable * @todo Consider making the 4kb listener changable
*/ */
public function run($address = '127.0.0.1', $port = 1025) { public function run($address = '127.0.0.1', $port = 1025) {
$recv_bytes = 1024;
set_time_limit(0); set_time_limit(0);
ob_implicit_flush(); ob_implicit_flush();
@ -83,27 +80,34 @@ class Server implements SocketObserver, \IteratorAggregate {
} }
do { do {
try { $changed = $this->_resources;
$changed = $this->_resources;
$num_changed = $this->_master->select($changed, $write = null, $except = null, null);
foreach($changed as $resource) { try {
$num_changed = $this->_master->select($changed, $write = null, $except = null, null);
} catch (Exception $e) {
// master had a problem?...what to do?
continue;
}
foreach($changed as $resource) {
try {
if ($this->_master->getResource() === $resource) { if ($this->_master->getResource() === $resource) {
$res = $this->onOpen($this->_master); $conn = $this->_master;
$res = $this->onOpen($conn);
} else { } else {
$conn = $this->_connections[$resource]; $conn = $this->_connections[$resource];
$data = $buf = ''; $data = $buf = '';
$bytes = $conn->recv($buf, static::RECV_BYTES, 0); $bytes = $conn->recv($buf, $recv_bytes, 0);
if ($bytes > 0) { if ($bytes > 0) {
$data = $buf; $data = $buf;
// This idea works* but... // This idea works* but...
// 1) A single DDOS attack will block the entire application (I think) // 1) A single DDOS attack will block the entire application (I think)
// 2) What if the last message in the frame is equal to RECV_BYTES? Would loop until another msg is sent // 2) What if the last message in the frame is equal to $recv_bytes? Would loop until another msg is sent
// Need to 1) proc_open the recv() calls. 2) ??? // Need to 1) proc_open the recv() calls. 2) ???
while ($bytes === static::RECV_BYTES) { while ($bytes === $recv_bytes) {
$bytes = $conn->recv($buf, static::RECV_BYTES, 0); $bytes = $conn->recv($buf, $recv_bytes, 0);
$data .= $buf; $data .= $buf;
} }
@ -112,22 +116,23 @@ class Server implements SocketObserver, \IteratorAggregate {
$res = $this->onClose($conn); $res = $this->onClose($conn);
} }
} }
} catch (Exception $se) {
$res = $this->onError($se->getSocket(), $se); // Just in case...but I don't think I need to do this
} catch (\Exception $e) {
$res = $this->onError($conn, $e);
}
while ($res instanceof CommandInterface) { while ($res instanceof CommandInterface) {
$res = $res->execute($this); try {
$new_res = $res->execute($this);
} catch (\Exception $e) {
// trigger new error
// $new_res = $this->onError($e->getSocket()); ???
// this is dangerous territory...could get in an infinte loop...Exception might not be Ratchet\Exception...$new_res could be ActionInterface|Composite|NULL...
} }
}
} catch (Exception $se) {
// Instead of logging error, will probably add/trigger onIOError/onError or something in SocketObserver
// temporary, move to application $res = $new_res;
if ($se->getCode() != 35) {
$close = new \Ratchet\Command\Action\CloseConnection($se->getSocket());
$close->execute($this);
} }
} catch (\Exception $e) {
// onError() - but can I determine which is/was the target Socket that threw the exception...?
// $conn->close() ???
} }
} while (true); } while (true);
} }
@ -154,4 +159,8 @@ class Server implements SocketObserver, \IteratorAggregate {
return $cmd; return $cmd;
} }
public function onError(SocketInterface $conn, \Exception $e) {
return $this->_app->onError($conn, $e);
}
} }

View File

@ -29,4 +29,6 @@ interface SocketObserver {
* @return Ratchet\Command\CommandInterface|null * @return Ratchet\Command\CommandInterface|null
*/ */
function onClose(SocketInterface $conn); function onClose(SocketInterface $conn);
function onError(SocketInterface $conn, \Exception $e);
} }

View File

@ -14,4 +14,7 @@ class Application implements SocketObserver {
public function onClose(SocketInterface $conn) { public function onClose(SocketInterface $conn) {
} }
public function onError(SocketInterface $conn, \Exception $e) {
}
} }

View File

@ -28,4 +28,7 @@ class Protocol implements ProtocolInterface {
public function onClose(SocketInterface $conn) { public function onClose(SocketInterface $conn) {
} }
public function onError(SocketInterface $conn, \Exception $e) {
}
} }