Restructure Overhaul

Server accepts single Observable object (was Chain of Responsibility)
WebSocket is decorator of application implementing Observable
Observable interface returns Command pattern object
Interfaced all the things
Code is a mess
Unit tests are broken
This commit is contained in:
Chris Boden 2011-10-27 18:36:29 -04:00
parent ed1a35ff74
commit 1c0b8ed32d
13 changed files with 172 additions and 32 deletions

View File

@ -3,8 +3,8 @@ namespace Ratchet\Protocol;
use Ratchet\Server; use Ratchet\Server;
use Ratchet\Protocol\WebSocket\Client; use Ratchet\Protocol\WebSocket\Client;
use Ratchet\Protocol\WebSocket\Version; use Ratchet\Protocol\WebSocket\Version;
use Ratchet\Server\Message; use Ratchet\SocketInterface;
use Ratchet\Socket; use Ratchet\SocketObserver;
/** /**
* @link http://ca.php.net/manual/en/ref.http.php * @link http://ca.php.net/manual/en/ref.http.php
@ -20,8 +20,13 @@ class WebSocket implements ProtocolInterface {
*/ */
protected $_lookup; protected $_lookup;
public function __construct() { /**
*/
protected $_app;
public function __construct(SocketObserver $application) {
$this->_lookup = new \SplObjectStorage; $this->_lookup = new \SplObjectStorage;
$this->_app = $application;
} }
/** /**
@ -47,24 +52,40 @@ class WebSocket implements ProtocolInterface {
public function setUp(Server $server) { public function setUp(Server $server) {
$this->_server = $server; $this->_server = $server;
$this->_app->setUp($server);
} }
public function handleConnect(Socket $client) { public function onOpen(SocketInterface $conn) {
$this->_lookup[$client] = new Client; $this->_lookup[$conn] = new Client;
return $this->_app->onOpen($conn);
} }
public function handleMessage($message, Socket $from) { public function onRecv(SocketInterface $from, $msg) {
$headers = $this->getHeaders($message);
$client = $this->_lookup[$from]; $client = $this->_lookup[$from];
if (true !== $client->isHandshakeComplete()) { if (true !== $client->isHandshakeComplete()) {
$headers = $this->getHeaders($msg);
$header = $client->doHandshake($this->getVersion($headers)); $header = $client->doHandshake($this->getVersion($headers));
$from->write($header, strlen($header)); // $from->write($header, strlen($header));
$to = new \Ratchet\SocketCollection;
$to->enqueue($from);
$cmd = new \Ratchet\Server\Command\SendMessage($to);
$cmd->setMessage($header);
// call my decorated onRecv()
$this->_server->log('Returning handshake: ' . $header);
return $cmd;
} }
return $this->_app->onRecv($from, $msg);
} }
public function handleClose(Socket $client) { public function onClose(SocketInterface $conn) {
unset($this->_lookup[$client]); $cmd = $this->_app->onClose($conn);
unset($this->_lookup[$conn]);
return $cmd;
} }
/** /**

View File

@ -1,20 +1,13 @@
<?php <?php
namespace Ratchet; namespace Ratchet;
use Ratchet\Server; use Ratchet\Server;
use Ratchet\Server\Client; use Ratchet\SocketObserver;
use Ratchet\Server\Message;
interface ReceiverInterface { interface ReceiverInterface extends SocketObserver {
/** /**
* @return string * @return string
*/ */
function getName(); function getName();
function setUp(Server $server); function setUp(Server $server);
function handleConnect(Socket $client);
function handleMessage($message, Socket $from);
function handleClose(Socket $client);
} }

View File

@ -5,7 +5,11 @@ use Ratchet\Protocol\ProtocolInterface;
use Ratchet\Logging\LoggerInterface; use Ratchet\Logging\LoggerInterface;
use Ratchet\Logging\NullLogger; use Ratchet\Logging\NullLogger;
class Server implements ServerInterface { /**
* @todo Consider using _connections as master reference and passing iterator_to_array(_connections) to socket_select
* @todo Move SocketObserver methods to separate class, create, wrap class in __construct
*/
class Server implements SocketObserver {
/** /**
* The master socket, receives all connections * The master socket, receives all connections
* @type Socket * @type Socket
@ -33,12 +37,14 @@ class Server implements ServerInterface {
*/ */
protected $_log; protected $_log;
protected $_app = null;
/** /**
* @param Ratchet\Socket * @param Ratchet\Socket
* @param boolean True, enables debug mode and the server doesn't infiniate loop * @param boolean True, enables debug mode and the server doesn't infiniate loop
* @param Logging\LoggerInterface * @param Logging\LoggerInterface
*/ */
public function __construct(Socket $host, LoggerInterface $logger = null) { public function __construct(SocketInterface $host, ReceiverInterface $application, LoggerInterface $logger = null) {
$this->_master = $host; $this->_master = $host;
$socket = $host->getResource(); $socket = $host->getResource();
$this->_resources[] = $socket; $this->_resources[] = $socket;
@ -49,6 +55,9 @@ class Server implements ServerInterface {
$this->_log = $logger; $this->_log = $logger;
$this->_connections = new \ArrayIterator(array()); $this->_connections = new \ArrayIterator(array());
$this->_app = $application;
$this->_app->setUp($this);
} }
/** /**
@ -67,6 +76,8 @@ class Server implements ServerInterface {
/** /**
* @param ReceiverInterface * @param ReceiverInterface
* @return Server * @return Server
* @deprecated
* @todo Consider making server chain of responsibility, currently 1-1 relation w/ receivers
*/ */
public function attatchReceiver(ReceiverInterface $receiver) { public function attatchReceiver(ReceiverInterface $receiver) {
$receiver->setUp($this); $receiver->setUp($this);
@ -76,7 +87,7 @@ class Server implements ServerInterface {
} }
/** /**
* @return Socket * @return SocketInterface
*/ */
public function getMaster() { public function getMaster() {
return $this->_master; return $this->_master;
@ -101,9 +112,11 @@ class Server implements ServerInterface {
* @todo Should I make handling open/close/msg an application? * @todo Should I make handling open/close/msg an application?
*/ */
public function run($address = '127.0.0.1', $port = 1025) { public function run($address = '127.0.0.1', $port = 1025) {
/*
if (count($this->_receivers) == 0) { if (count($this->_receivers) == 0) {
throw new \RuntimeException("No receiver has been attached to the server"); throw new \RuntimeException("No receiver has been attached to the server");
} }
*/
set_time_limit(0); set_time_limit(0);
ob_implicit_flush(); ob_implicit_flush();
@ -125,7 +138,7 @@ class Server implements ServerInterface {
foreach($changed as $resource) { foreach($changed as $resource) {
if ($this->_master->getResource() === $resource) { if ($this->_master->getResource() === $resource) {
$this->onConnect($this->_master); $this->onOpen($this->_master);
} else { } else {
$conn = $this->_connections[$resource]; $conn = $this->_connections[$resource];
$data = null; $data = null;
@ -134,7 +147,7 @@ class Server implements ServerInterface {
if ($bytes == 0) { if ($bytes == 0) {
$this->onClose($conn); $this->onClose($conn);
} else { } else {
$this->onMessage($data, $conn); $this->onRecv($conn, $data);
// new Message // new Message
// $this->_receivers->handleMessage($msg, $conn); // $this->_receivers->handleMessage($msg, $conn);
@ -143,6 +156,8 @@ class Server implements ServerInterface {
} }
} catch (Exception $e) { } catch (Exception $e) {
$this->_log->error($e->getMessage()); $this->_log->error($e->getMessage());
} catch (\Exception $fuck) {
$this->_log->error('Big uh oh: ' . $e->getMessage());
} }
} while (true); } while (true);
@ -150,27 +165,32 @@ class Server implements ServerInterface {
// declare(ticks = 1); // declare(ticks = 1);
} }
protected function onConnect(Socket $master) { public function onOpen(SocketInterface $conn) {
$new_connection = clone $master; $new_connection = clone $conn;
$this->_resources[] = $new_connection->getResource(); $this->_resources[] = $new_connection->getResource();
$this->_connections[$new_connection->getResource()] = $new_connection; $this->_connections[$new_connection->getResource()] = $new_connection;
$this->_log->note('New connection, ' . count($this->_connections) . ' total'); $this->_log->note('New connection, ' . count($this->_connections) . ' total');
$this->_app->onOpen($new_connection)->execute();
// /here $this->_receivers->handleConnection($new_connection); // /here $this->_receivers->handleConnection($new_connection);
$this->tmpRIterator('handleConnect', $new_connection); // $this->tmpRIterator('handleConnect', $new_connection);
} }
protected function onMessage($msg, Socket $from) { public function onRecv(SocketInterface $from, $msg) {
$this->_log->note('New message "' . $msg . '"'); $this->_log->note('New message "' . $msg . '"');
$this->tmpRIterator('handleMessage', $msg, $from);
$this->_app->onRecv($from, $msg)->execute();
// $this->tmpRIterator('handleMessage', $msg, $from);
} }
protected function onClose(Socket $conn) { public function onClose(SocketInterface $conn) {
$resource = $conn->getResource(); $resource = $conn->getResource();
$this->tmpRIterator('handleClose', $conn); // $this->tmpRIterator('handleClose', $conn);
// $this->_receivers->handleDisconnect($conn); // $this->_receivers->handleDisconnect($conn);
$this->_app->onClose($conn)->execute();
unset($this->_connections[$resource]); unset($this->_connections[$resource]);
unset($this->_resources[array_search($resource, $this->_resources)]); unset($this->_resources[array_search($resource, $this->_resources)]);

View File

@ -0,0 +1,15 @@
<?php
namespace Ratchet\Server\Command;
use Ratchet\SocketCollection;
class CloseConnection implements CommandInterface {
protected $_sockets;
public function __construct(SocketCollection $sockets) {
$this->_sockets = $sockets;
}
function execute() {
$this->_sockets->close();
}
}

View File

@ -0,0 +1,9 @@
<?php
namespace Ratchet\Server\Command;
use Ratchet\SocketCollection;
interface CommandInterface {
function __construct(SocketCollection $sockets);
function execute();
}

View File

@ -0,0 +1,11 @@
<?php
namespace Ratchet\Server\Command;
use Ratchet\SocketCollection;
class Null implements CommandInterface {
public function __construct(SocketCollection $sockets) {
}
public function execute() {
}
}

View File

View File

View File

@ -0,0 +1,30 @@
<?php
namespace Ratchet\Server\Command;
use Ratchet\SocketCollection;
class SendMessage implements CommandInterface {
protected $_sockets;
protected $_message = '';
public function __construct(SocketCollection $sockets) {
$this->_sockets = $sockets;
}
public function setMessage($msg) {
$this->_message = (string)$msg;
}
public function getMessage() {
return $this->_message;
}
public function execute() {
if (empty($this->_message)) {
throw new \UnexpectedValueException("Message is empty");
}
foreach ($this->_sockets as $current) {
$current->write($this->_message, strlen($this->_message));
}
}
}

View File

@ -7,7 +7,7 @@ use Ratchet\Protocol\ProtocolInterface;
* @author Chris Boden <shout at chrisboden dot ca> * @author Chris Boden <shout at chrisboden dot ca>
* @todo Needs to be observable, Server needs to know when an applicaiton closes a connection * @todo Needs to be observable, Server needs to know when an applicaiton closes a connection
*/ */
class Socket { class Socket implements SocketInterface {
/** /**
* @type resource * @type resource
*/ */
@ -80,6 +80,14 @@ class Socket {
return $num; return $num;
} }
public function write($buffer, $length = 0) {
return $this->__call('write', array($buffer, $length));
}
public function close() {
return $this->__call('close', array());
}
/** /**
* @see http://ca3.php.net/manual/en/function.socket-recv.php * @see http://ca3.php.net/manual/en/function.socket-recv.php
* @param string * @param string

View File

@ -0,0 +1,13 @@
<?php
namespace Ratchet;
class SocketCollection extends \SplQueue {
public function __construct() {
// parent::__construct();
$this->setIteratorMode(static::IT_MODE_DELETE);
}
public function enqueue(SocketInterface $value) {
parent::enqueue($value);
}
}

View File

@ -0,0 +1,10 @@
<?php
namespace Ratchet;
interface SocketInterface {
function write($buffer, $length = 0);
function recv(&$buf, $len, $flags);
function close();
}

View File

@ -0,0 +1,10 @@
<?php
namespace Ratchet;
interface SocketObserver {
function onOpen(SocketInterface $conn);
function onRecv(SocketInterface $from, $msg);
function onClose(SocketInterface $conn);
}