Communication between versions

HyBi-10 and Hixie-76 can now talk to each other!
This commit is contained in:
Chris Boden 2011-11-01 15:19:03 -04:00
parent 6a5c708775
commit a6073a87eb
12 changed files with 122 additions and 100 deletions

View File

@ -13,47 +13,48 @@ Re-use your application without changing any of its code just by wrapping it in
--- ---
###A Quick server example ###A quick server example
```php ```php
<?php <?php
namespace Me; namespace Me;
use Ratchet\SocketObserver, Ratchet\SocketInterface; use Ratchet\SocketObserver, Ratchet\SocketInterface;
use Ratchet\Socket, Ratchet\Server, Ratchet\Protocol\WebSocket; use Ratchet\Socket, Ratchet\Server, Ratchet\Protocol\WebSocket;
use Ratchet\SocketCollection, Ratchet\Command\SendMessage; use Ratchet\Command\Composite, Ratchet\Command\SendMessage;
/** /**
* Send any incoming messages to all connected clients (except sender) * Send any incoming messages to all connected clients (except sender)
*/ */
class Chat implements SocketObserver { class Chat implements SocketObserver {
protected $_clients; protected $_clients;
public function __construct() { public function __construct() {
$this->_clients = new \SplObjectStorage; $this->_clients = new \SplObjectStorage;
}
public function onOpen(SocketInterface $conn) {
$this->_clients->attach($conn);
}
public function onRecv(SocketInterface $from, $msg) {
$stack = new SocketCollection;
foreach ($this->_clients as $client) {
if ($from != $client) {
$stack->enqueue($client);
}
}
$command = new SendMessage($stack);
$command->setMessage($msg);
return $command;
}
public function onClose(SocketInterface $conn) {
$this->_clients->detach($conn);
}
} }
public function onOpen(SocketInterface $conn) {
$this->_clients->attach($conn);
}
public function onRecv(SocketInterface $from, $msg) {
$stack = new Composite;
foreach ($this->_clients as $client) {
if ($from != $client) {
$message = new SendMessage($client);
$message->setMessage($msg);
$stack->enqueue($message);
}
}
return $stack;
}
public function onClose(SocketInterface $conn) {
$this->_clients->detach($conn);
}
}
// Run the server application through the WebSocket protocol // Run the server application through the WebSocket protocol
$server = new Server(new Socket, new WebSocket(new Chat)); $server = new Server(new Socket, new WebSocket(new Chat));
$server->run('0.0.0.0', 80); $server->run('0.0.0.0', 80);

View File

@ -1,23 +1,21 @@
<?php <?php
namespace Ratchet\Command; namespace Ratchet\Command;
use Ratchet\SocketCollection; use Ratchet\SocketInterface;
/** /**
* Close the connection to the sockets passed in the constructor * Close the connection to the sockets passed in the constructor
*/ */
class CloseConnection implements CommandInterface { class CloseConnection implements CommandInterface {
/** /**
* @var SocketCollection * @var SocketInterface
*/ */
protected $_sockets; protected $_socket;
public function __construct(SocketCollection $sockets) { public function __construct(SocketInterface $sockets) {
$this->_sockets = $sockets; $this->_socket = $sockets;
} }
function execute() { function execute() {
foreach ($this->_sockets as $socket) { $this->_socket->close();
$socket->close();
}
} }
} }

View File

@ -1,17 +1,16 @@
<?php <?php
namespace Ratchet\Command; namespace Ratchet\Command;
use Ratchet\SocketCollection; use Ratchet\SocketInterface;
/** /**
* Socket implementation of the Command Pattern * Socket implementation of the Command Pattern
* User created applications are to return a Command to the server for execution * User created applications are to return a Command to the server for execution
* @todo Bad format - very limited
*/ */
interface CommandInterface { interface CommandInterface {
/** /**
* Pass the Sockets to execute the command on * Pass the Sockets to execute the command on
*/ */
function __construct(SocketCollection $sockets); function __construct(SocketInterface $socket);
/** /**
* The Server class will call the execution * The Server class will call the execution

View File

@ -0,0 +1,39 @@
<?php
namespace Ratchet\Command;
use Ratchet\SocketInterface;
class Composite extends \SplQueue {
/**
* @param string
* @param Ratchet\SocketInterface
* @return CommandInterface
*/
public function create($name, SocketInterface $socket) {
$class = __NAMESPACE__ . "\\{$name}\\";
if (!class_exists($class)) {
throw new \UnexpectedValueException("Command {$name} not found");
}
$cmd = new $class($socket);
if ($cmd instanceof CommandInterface) {
throw new RuntimeException("{$name} is not a valid command");
}
$this->enqueue($cmd);
return $cmd;
}
public function enqueue(CommandInterface $command) {
return parent::enqueue($command);
}
public function execute() {
$this->setIteratorMode(static::IT_MODE_DELETE);
foreach ($this as $command) {
$command->execute();
}
}
}

View File

@ -1,12 +1,12 @@
<?php <?php
namespace Ratchet\Command; namespace Ratchet\Command;
use Ratchet\SocketCollection; use Ratchet\SocketInterface;
/** /**
* Null pattern - execution does nothing, something needs to be passed back though * Null pattern - execution does nothing, something needs to be passed back though
*/ */
class Null implements CommandInterface { class Null implements CommandInterface {
public function __construct(SocketCollection $sockets) { public function __construct(SocketInterface $socket) {
} }
public function execute() { public function execute() {

View File

@ -1,12 +1,12 @@
<?php <?php
namespace Ratchet\Command; namespace Ratchet\Command;
use Ratchet\SocketCollection; use Ratchet\SocketInterface;
/** /**
* @todo Move this command to the WebSocket protocol namespace * @todo Move this command to the WebSocket protocol namespace
*/ */
class Ping implements CommandInterface { class Ping implements CommandInterface {
public function __construct(SocketCollection $sockets) { public function __construct(SocketInterface $socket) {
} }
public function execute() { public function execute() {

View File

@ -1,12 +1,12 @@
<?php <?php
namespace Ratchet\Command; namespace Ratchet\Command;
use Ratchet\SocketCollection; use Ratchet\SocketInterface;
/** /**
* @todo Move this command to the WebSocket protocol namespace * @todo Move this command to the WebSocket protocol namespace
*/ */
class Pong implements CommandInterface { class Pong implements CommandInterface {
public function __construct(SocketCollection $sockets) { public function __construct(SocketInterface $socket) {
} }
public function execute() { public function execute() {

View File

@ -1,20 +1,20 @@
<?php <?php
namespace Ratchet\Command; namespace Ratchet\Command;
use Ratchet\SocketCollection; use Ratchet\SocketInterface;
class Runtime implements CommandInterface { class Runtime implements CommandInterface {
/** /**
* @var SocketCollection * @var SocketInterface
*/ */
protected $_sockets; protected $_socket;
/** /**
* @var Closure * @var Closure
*/ */
protected $_command = null; protected $_command = null;
public function __construct(SocketCollection $sockets) { public function __construct(SocketInterface $socket) {
$this->_socket = $sockets; $this->_socket = $socket;
} }
/** /**
@ -26,8 +26,6 @@ class Runtime implements CommandInterface {
} }
public function execute() { public function execute() {
foreach ($this->_sockets as $socket) { return call_user_func($this->_command, $socket);
return call_user_func($this->_command, $socket);
}
} }
} }

View File

@ -1,23 +1,23 @@
<?php <?php
namespace Ratchet\Command; namespace Ratchet\Command;
use Ratchet\SocketCollection; use Ratchet\SocketInterface;
/** /**
* Send text back to the client end of the socket(s) * Send text back to the client end of the socket(s)
*/ */
class SendMessage implements CommandInterface { class SendMessage implements CommandInterface {
/** /**
* @var SocketCollection * @var SocketInterface
*/ */
protected $_sockets; public $_socket;
/** /**
* @var string * @var string
*/ */
protected $_message = ''; protected $_message = '';
public function __construct(SocketCollection $sockets) { public function __construct(SocketInterface $socket) {
$this->_sockets = $sockets; $this->_socket = $socket;
} }
/** /**
@ -45,8 +45,6 @@ class SendMessage implements CommandInterface {
throw new \UnexpectedValueException("Message is empty"); throw new \UnexpectedValueException("Message is empty");
} }
foreach ($this->_sockets as $current) { $this->_socket->write($this->_message, strlen($this->_message));
$current->write($this->_message, strlen($this->_message));
}
} }
} }

View File

@ -8,6 +8,7 @@ use Ratchet\SocketInterface;
use Ratchet\SocketObserver; use Ratchet\SocketObserver;
use Ratchet\Command\CommandInterface; use Ratchet\Command\CommandInterface;
use Ratchet\Command\SendMessage; use Ratchet\Command\SendMessage;
use Ratchet\Command\Composite;
/** /**
* The adapter to handle WebSocket requests/responses * The adapter to handle WebSocket requests/responses
@ -16,7 +17,6 @@ use Ratchet\Command\SendMessage;
* @todo Make sure this works both ways (client/server) as stack needs to exist on client for framing * @todo Make sure this works both ways (client/server) as stack needs to exist on client for framing
* @todo Clean up Client/Version stuff. This should be a factory making single instances of Version classes, implement chain of reponsibility for version - client should implement an interface? * @todo Clean up Client/Version stuff. This should be a factory making single instances of Version classes, implement chain of reponsibility for version - client should implement an interface?
* @todo Make sure all SendMessage Commands are framed, not just ones received from onRecv * @todo Make sure all SendMessage Commands are framed, not just ones received from onRecv
* @todo Logic is flawed with Command/SocketCollection and framing - framing is done based on the protocol version of the received, not individual receivers...
*/ */
class WebSocket implements ProtocolInterface { class WebSocket implements ProtocolInterface {
/** /**
@ -77,12 +77,12 @@ class WebSocket implements ProtocolInterface {
$header = $response; $header = $response;
} }
$to = new \Ratchet\SocketCollection; $cmds = new Composite;
$to->enqueue($from); $mess = new SendMessage($from);
$cmd = new \Ratchet\Command\SendMessage($to); $mess->setMessage($header);
$cmd->setMessage($header); $cmds->enqueue($mess);
return $cmd; return $cmds;
} }
try { try {
@ -91,19 +91,26 @@ class WebSocket implements ProtocolInterface {
$msg = $msg['payload']; $msg = $msg['payload'];
} }
} catch (\UnexpectedValueException $e) { } catch (\UnexpectedValueException $e) {
$to = new \Ratchet\SocketCollection; $cmd = new Composite;
$to->enqueue($from); $close = new \Ratchet\Command\Close($from);
$cmd = new \Ratchet\Command\Close($to); $cmd->enqueue($close);
return $cmd; return $cmd;
} }
$cmd = $this->_app->onRecv($from, $msg); $cmds = $this->_app->onRecv($from, $msg);
if ($cmd instanceof SendMessage) { if ($cmds instanceof Composite) {
$cmd->setMessage($client->getVersion()->frame($cmd->getMessage())); foreach ($cmds as $cmd) {
if ($cmd instanceof SendMessage) {
$sock = $cmd->_socket;
$clnt = $this->_clients[$sock];
$cmd->setMessage($clnt->getVersion()->frame($cmd->getMessage()));
}
}
} }
return $cmd; return $cmds;
} }
/** /**

View File

@ -4,7 +4,7 @@ use Ratchet\Server\Aggregator;
use Ratchet\Protocol\ProtocolInterface; use Ratchet\Protocol\ProtocolInterface;
use Ratchet\Logging\LoggerInterface; use Ratchet\Logging\LoggerInterface;
use Ratchet\Logging\NullLogger; use Ratchet\Logging\NullLogger;
use Ratchet\Command\CommandInterface; use Ratchet\Command\Composite;
/** /**
* Creates an open-ended socket to listen on a port for incomming connections. Events are delegated through this to attached applications * Creates an open-ended socket to listen on a port for incomming connections. Events are delegated through this to attached applications
@ -108,7 +108,7 @@ class Server implements SocketObserver, \IteratorAggregate {
} }
} }
if ($res instanceof CommandInterface) { if ($res instanceof Composite) {
$res->execute(); $res->execute();
} }
} }

View File

@ -1,18 +0,0 @@
<?php
namespace Ratchet;
/**
* A self-deprecating queue that can only hold Socket objects
*/
class SocketCollection extends \SplQueue {
public function __construct() {
$this->setIteratorMode(static::IT_MODE_DELETE);
}
/**
* @param SocketInterface
*/
public function enqueue(SocketInterface $value) {
parent::enqueue($value);
}
}