From 1c0b8ed32d645bc99e2b8645aa7e98db6ff77375 Mon Sep 17 00:00:00 2001 From: Chris Boden Date: Thu, 27 Oct 2011 18:36:29 -0400 Subject: [PATCH] Restructure Overhaul Server accepts single Observable object (was Chain of Responsibility) WebSocket is decorator of application implementing Observable Observable interface returns Command pattern object Interfaced all the things Code is a mess Unit tests are broken --- lib/Ratchet/Protocol/WebSocket.php | 41 ++++++++++++----- lib/Ratchet/ReceiverInterface.php | 11 +---- lib/Ratchet/Server.php | 44 ++++++++++++++----- .../Server/Command/CloseConnection.php | 15 +++++++ .../Server/Command/CommandInterface.php | 9 ++++ lib/Ratchet/Server/Command/Null.php | 11 +++++ lib/Ratchet/Server/Command/Ping.php | 0 lib/Ratchet/Server/Command/Pong.php | 0 lib/Ratchet/Server/Command/SendMessage.php | 30 +++++++++++++ lib/Ratchet/Socket.php | 10 ++++- lib/Ratchet/SocketCollection.php | 13 ++++++ lib/Ratchet/SocketInterface.php | 10 +++++ lib/Ratchet/SocketObserver.php | 10 +++++ 13 files changed, 172 insertions(+), 32 deletions(-) create mode 100644 lib/Ratchet/Server/Command/CloseConnection.php create mode 100644 lib/Ratchet/Server/Command/CommandInterface.php create mode 100644 lib/Ratchet/Server/Command/Null.php create mode 100644 lib/Ratchet/Server/Command/Ping.php create mode 100644 lib/Ratchet/Server/Command/Pong.php create mode 100644 lib/Ratchet/Server/Command/SendMessage.php create mode 100644 lib/Ratchet/SocketCollection.php create mode 100644 lib/Ratchet/SocketInterface.php create mode 100644 lib/Ratchet/SocketObserver.php diff --git a/lib/Ratchet/Protocol/WebSocket.php b/lib/Ratchet/Protocol/WebSocket.php index 7d007bd..b68dce4 100644 --- a/lib/Ratchet/Protocol/WebSocket.php +++ b/lib/Ratchet/Protocol/WebSocket.php @@ -3,8 +3,8 @@ namespace Ratchet\Protocol; use Ratchet\Server; use Ratchet\Protocol\WebSocket\Client; use Ratchet\Protocol\WebSocket\Version; -use Ratchet\Server\Message; -use Ratchet\Socket; +use Ratchet\SocketInterface; +use Ratchet\SocketObserver; /** * @link http://ca.php.net/manual/en/ref.http.php @@ -20,8 +20,13 @@ class WebSocket implements ProtocolInterface { */ protected $_lookup; - public function __construct() { + /** + */ + protected $_app; + + public function __construct(SocketObserver $application) { $this->_lookup = new \SplObjectStorage; + $this->_app = $application; } /** @@ -47,24 +52,40 @@ class WebSocket implements ProtocolInterface { public function setUp(Server $server) { $this->_server = $server; + $this->_app->setUp($server); } - public function handleConnect(Socket $client) { - $this->_lookup[$client] = new Client; + public function onOpen(SocketInterface $conn) { + $this->_lookup[$conn] = new Client; + return $this->_app->onOpen($conn); } - public function handleMessage($message, Socket $from) { - $headers = $this->getHeaders($message); + public function onRecv(SocketInterface $from, $msg) { $client = $this->_lookup[$from]; if (true !== $client->isHandshakeComplete()) { + $headers = $this->getHeaders($msg); $header = $client->doHandshake($this->getVersion($headers)); - $from->write($header, strlen($header)); +// $from->write($header, strlen($header)); + $to = new \Ratchet\SocketCollection; + $to->enqueue($from); + $cmd = new \Ratchet\Server\Command\SendMessage($to); + $cmd->setMessage($header); + + // call my decorated onRecv() + +$this->_server->log('Returning handshake: ' . $header); + + return $cmd; } + + return $this->_app->onRecv($from, $msg); } - public function handleClose(Socket $client) { - unset($this->_lookup[$client]); + public function onClose(SocketInterface $conn) { + $cmd = $this->_app->onClose($conn); + unset($this->_lookup[$conn]); + return $cmd; } /** diff --git a/lib/Ratchet/ReceiverInterface.php b/lib/Ratchet/ReceiverInterface.php index 8f2d3b3..bc5aae6 100644 --- a/lib/Ratchet/ReceiverInterface.php +++ b/lib/Ratchet/ReceiverInterface.php @@ -1,20 +1,13 @@ _master = $host; $socket = $host->getResource(); $this->_resources[] = $socket; @@ -49,6 +55,9 @@ class Server implements ServerInterface { $this->_log = $logger; $this->_connections = new \ArrayIterator(array()); + + $this->_app = $application; + $this->_app->setUp($this); } /** @@ -67,6 +76,8 @@ class Server implements ServerInterface { /** * @param ReceiverInterface * @return Server + * @deprecated + * @todo Consider making server chain of responsibility, currently 1-1 relation w/ receivers */ public function attatchReceiver(ReceiverInterface $receiver) { $receiver->setUp($this); @@ -76,7 +87,7 @@ class Server implements ServerInterface { } /** - * @return Socket + * @return SocketInterface */ public function getMaster() { return $this->_master; @@ -101,9 +112,11 @@ class Server implements ServerInterface { * @todo Should I make handling open/close/msg an application? */ public function run($address = '127.0.0.1', $port = 1025) { +/* if (count($this->_receivers) == 0) { throw new \RuntimeException("No receiver has been attached to the server"); } +*/ set_time_limit(0); ob_implicit_flush(); @@ -125,7 +138,7 @@ class Server implements ServerInterface { foreach($changed as $resource) { if ($this->_master->getResource() === $resource) { - $this->onConnect($this->_master); + $this->onOpen($this->_master); } else { $conn = $this->_connections[$resource]; $data = null; @@ -134,7 +147,7 @@ class Server implements ServerInterface { if ($bytes == 0) { $this->onClose($conn); } else { - $this->onMessage($data, $conn); + $this->onRecv($conn, $data); // new Message // $this->_receivers->handleMessage($msg, $conn); @@ -143,6 +156,8 @@ class Server implements ServerInterface { } } catch (Exception $e) { $this->_log->error($e->getMessage()); + } catch (\Exception $fuck) { + $this->_log->error('Big uh oh: ' . $e->getMessage()); } } while (true); @@ -150,27 +165,32 @@ class Server implements ServerInterface { // declare(ticks = 1); } - protected function onConnect(Socket $master) { - $new_connection = clone $master; + public function onOpen(SocketInterface $conn) { + $new_connection = clone $conn; $this->_resources[] = $new_connection->getResource(); $this->_connections[$new_connection->getResource()] = $new_connection; $this->_log->note('New connection, ' . count($this->_connections) . ' total'); + $this->_app->onOpen($new_connection)->execute(); // /here $this->_receivers->handleConnection($new_connection); - $this->tmpRIterator('handleConnect', $new_connection); +// $this->tmpRIterator('handleConnect', $new_connection); } - protected function onMessage($msg, Socket $from) { + public function onRecv(SocketInterface $from, $msg) { $this->_log->note('New message "' . $msg . '"'); - $this->tmpRIterator('handleMessage', $msg, $from); + + $this->_app->onRecv($from, $msg)->execute(); +// $this->tmpRIterator('handleMessage', $msg, $from); } - protected function onClose(Socket $conn) { + public function onClose(SocketInterface $conn) { $resource = $conn->getResource(); - $this->tmpRIterator('handleClose', $conn); +// $this->tmpRIterator('handleClose', $conn); // $this->_receivers->handleDisconnect($conn); + $this->_app->onClose($conn)->execute(); + unset($this->_connections[$resource]); unset($this->_resources[array_search($resource, $this->_resources)]); diff --git a/lib/Ratchet/Server/Command/CloseConnection.php b/lib/Ratchet/Server/Command/CloseConnection.php new file mode 100644 index 0000000..999943a --- /dev/null +++ b/lib/Ratchet/Server/Command/CloseConnection.php @@ -0,0 +1,15 @@ +_sockets = $sockets; + } + + function execute() { + $this->_sockets->close(); + } +} \ No newline at end of file diff --git a/lib/Ratchet/Server/Command/CommandInterface.php b/lib/Ratchet/Server/Command/CommandInterface.php new file mode 100644 index 0000000..30c9b16 --- /dev/null +++ b/lib/Ratchet/Server/Command/CommandInterface.php @@ -0,0 +1,9 @@ +_sockets = $sockets; + } + + public function setMessage($msg) { + $this->_message = (string)$msg; + } + + public function getMessage() { + return $this->_message; + } + + public function execute() { + if (empty($this->_message)) { + throw new \UnexpectedValueException("Message is empty"); + } + + foreach ($this->_sockets as $current) { + $current->write($this->_message, strlen($this->_message)); + } + } +} \ No newline at end of file diff --git a/lib/Ratchet/Socket.php b/lib/Ratchet/Socket.php index ddbbe63..5b5d7d6 100644 --- a/lib/Ratchet/Socket.php +++ b/lib/Ratchet/Socket.php @@ -7,7 +7,7 @@ use Ratchet\Protocol\ProtocolInterface; * @author Chris Boden * @todo Needs to be observable, Server needs to know when an applicaiton closes a connection */ -class Socket { +class Socket implements SocketInterface { /** * @type resource */ @@ -80,6 +80,14 @@ class Socket { return $num; } + public function write($buffer, $length = 0) { + return $this->__call('write', array($buffer, $length)); + } + + public function close() { + return $this->__call('close', array()); + } + /** * @see http://ca3.php.net/manual/en/function.socket-recv.php * @param string diff --git a/lib/Ratchet/SocketCollection.php b/lib/Ratchet/SocketCollection.php new file mode 100644 index 0000000..d1934c3 --- /dev/null +++ b/lib/Ratchet/SocketCollection.php @@ -0,0 +1,13 @@ +setIteratorMode(static::IT_MODE_DELETE); + } + + public function enqueue(SocketInterface $value) { + parent::enqueue($value); + } +} \ No newline at end of file diff --git a/lib/Ratchet/SocketInterface.php b/lib/Ratchet/SocketInterface.php new file mode 100644 index 0000000..57d672a --- /dev/null +++ b/lib/Ratchet/SocketInterface.php @@ -0,0 +1,10 @@ +