Socket Proxy

Replaced passing SocketInterface everywhere with a proxy object
This commit is contained in:
Chris Boden 2011-11-14 15:56:30 -05:00
parent 551888aa3c
commit f3c7dd4d7f
20 changed files with 220 additions and 154 deletions

View File

@ -1,12 +1,44 @@
<?php
namespace Ratchet\Application;
use Ratchet\ObserverInterface;
//use Ratchet\ObserverInterface;
use Ratchet\Resource\Connection;
interface ApplicationInterface extends ObserverInterface {
interface ApplicationInterface /*extends ObserverInterface*/ {
/**
* Decorator pattern
* @param Ratchet\ObserverInterface Application to wrap in protocol
* @throws UnexpectedValueException
*/
public function __construct(ApplicationInterface $app = null);
/**
* When a new connection is opened it will be passed to this method
* @param SocketInterface The socket/connection that just connected to your application
* @return Ratchet\Resource\Command\CommandInterface|null
*/
function onOpen(Connection $conn);
/**
* Triggered when a client sends data through the socket
* @param SocketInterface The socket/connection that sent the message to your application
* @param string The message received
* @return Ratchet\Resource\Command\CommandInterface|null
*/
function onRecv(Connection $from, $msg);
/**
* This is called before or after a socket is closed (depends on how it's closed). SendMessage to $conn will not result in an error if it has already been closed.
* @param SocketInterface The socket/connection that is closing/closed
* @return Ratchet\Resource\Command\CommandInterface|null
*/
function onClose(Connection $conn);
/**
* If there is an error with one of the sockets, or somewhere in the application where an Exception is thrown,
* the Exception is sent back down the stack, handled by the Server and bubbled back up the application through this method
* @param SocketInterface
* @param \Exception
* @return Ratchet\Resource\Command\CommandInterface|null
*/
function onError(Connection $conn, \Exception $e);
}

View File

@ -2,6 +2,7 @@
namespace Ratchet\Application\Server;
use Ratchet\Application\ApplicationInterface;
use Ratchet\SocketInterface;
use Ratchet\Resource\Connection;
use Ratchet\Resource\Command\CommandInterface;
/**
@ -11,24 +12,18 @@ use Ratchet\Resource\Command\CommandInterface;
* @todo With all these options for the server I should probably use a DIC
*/
class App implements ApplicationInterface {
/**
* The master socket, receives all connections
* @var Socket
*/
protected $_master = null;
/**
* @var array of Socket Resources
*/
protected $_resources = array();
/**
* @var ArrayIterator of Resouces (Socket type) as keys, Ratchet\Socket as values
* @var array of resources/Connections
*/
protected $_connections;
protected $_connections = array();
/**
* @var Ratchet\ObserverInterface
* @var Ratchet\Application\ApplicationInterface
* Maybe temporary?
*/
protected $_app;
@ -41,8 +36,7 @@ class App implements ApplicationInterface {
throw new \UnexpectedValueException("Server requires an application to run off of");
}
$this->_app = $application;
$this->_connections = new \ArrayIterator(array());
$this->_app = $application;
}
/*
@ -54,7 +48,7 @@ class App implements ApplicationInterface {
* @todo Consider making the 4kb listener changable
*/
public function run(SocketInterface $host, $address = '127.0.0.1', $port = 1025) {
$this->_master = $host;
$this->_connections[$host->getResource()] = new Connection($host);
$this->_resources[] = $host->getResource();
$recv_bytes = 1024;
@ -62,22 +56,22 @@ class App implements ApplicationInterface {
set_time_limit(0);
ob_implicit_flush();
$this->_master->set_nonblock();
$host->set_nonblock();
declare(ticks = 1);
if (false === ($this->_master->bind($address, (int)$port))) {
throw new Exception($this->_master);
if (false === ($host->bind($address, (int)$port))) {
throw new Exception($host);
}
if (false === ($this->_master->listen())) {
throw new Exception($this->_master);
if (false === ($host->listen())) {
throw new Exception($host);
}
do {
$changed = $this->_resources;
try {
$num_changed = $this->_master->select($changed, $write = null, $except = null, null);
$num_changed = $host->select($changed, $write = null, $except = null, null);
} catch (Exception $e) {
// master had a problem?...what to do?
continue;
@ -85,14 +79,13 @@ class App implements ApplicationInterface {
foreach($changed as $resource) {
try {
if ($this->_master->getResource() === $resource) {
$conn = $this->_master;
$res = $this->onOpen($conn);
} else {
$conn = $this->_connections[$resource];
$data = $buf = '';
$conn = $this->_connections[$resource];
$bytes = $conn->recv($buf, $recv_bytes, 0);
if ($host->getResource() === $resource) {
$res = $this->onOpen($conn);
} else {
$data = $buf = '';
$bytes = $conn->getSocket()->recv($buf, $recv_bytes, 0);
if ($bytes > 0) {
$data = $buf;
@ -134,30 +127,31 @@ class App implements ApplicationInterface {
} while (true);
}
public function onOpen(SocketInterface $conn) {
$new_connection = clone $conn;
$this->_resources[] = $new_connection->getResource();
$this->_connections[$new_connection->getResource()] = $new_connection;
public function onOpen(Connection $conn) {
$new_socket = clone $conn->getSocket();
$new_connection = new Connection($new_socket);
$this->_resources[] = $new_connection->getSocket()->getResource();
$this->_connections[$new_connection->getSocket()->getResource()] = $new_connection;
return $this->_app->onOpen($new_connection);
}
public function onRecv(SocketInterface $from, $msg) {
public function onRecv(Connection $from, $msg) {
return $this->_app->onRecv($from, $msg);
}
public function onClose(SocketInterface $conn) {
$resource = $conn->getResource();
public function onClose(Connection $conn) {
$resource = $conn->getSocket()->getResource();
$cmd = $this->_app->onClose($conn);
unset($this->_connections[$resource]);
unset($this->_resources[array_search($resource, $this->_resources)]);
unset($this->_connections[$resource], $this->_resources[array_search($resource, $this->_resources)]);
return $cmd;
}
public function onError(SocketInterface $conn, \Exception $e) {
public function onError(Connection $conn, \Exception $e) {
return $this->_app->onError($conn, $e);
}
}

View File

@ -2,9 +2,9 @@
namespace Ratchet\Application\WebSocket;
use Ratchet\Application\WebSocket\Client;
use Ratchet\Application\WebSocket\VersionInterface;
use Ratchet\SocketInterface;
use Ratchet\Application\ApplicationInterface;
use Ratchet\Application\ConfiguratorInterface;
use Ratchet\Resource\Connection;
use Ratchet\Resource\Command\Factory;
use Ratchet\Resource\Command\CommandInterface;
use Ratchet\Resource\Command\Action\SendMessage;
@ -70,11 +70,11 @@ class App implements ApplicationInterface, ConfiguratorInterface {
);
}
public function onOpen(SocketInterface $conn) {
public function onOpen(Connection $conn) {
$this->_clients[$conn] = new Client;
}
public function onRecv(SocketInterface $from, $msg) {
public function onRecv(Connection $from, $msg) {
$client = $this->_clients[$from];
if (true !== $client->isHandshakeComplete()) {
$response = $client->setVersion($this->getVersion($msg))->doHandshake($msg);
@ -109,7 +109,7 @@ class App implements ApplicationInterface, ConfiguratorInterface {
return $this->prepareCommand($cmds);
}
public function onClose(SocketInterface $conn) {
public function onClose(Connection $conn) {
$cmds = $this->prepareCommand($this->_app->onClose($conn));
// $cmds = new Composite if null
@ -121,7 +121,7 @@ class App implements ApplicationInterface, ConfiguratorInterface {
return $cmds;
}
public function onError(SocketInterface $conn, \Exception $e) {
public function onError(Connection $conn, \Exception $e) {
return $this->_app->onError($conn, $e);
}
@ -138,7 +138,7 @@ class App implements ApplicationInterface, ConfiguratorInterface {
*/
protected function prepareCommand(CommandInterface $command = null) {
if ($command instanceof SendMessage) {
$version = $this->_clients[$command->getSocket()]->getVersion();
$version = $this->_clients[$command->getConnection()]->getVersion();
return $command->setMessage($version->frame($command->getMessage()));
}

View File

@ -3,10 +3,8 @@ namespace Ratchet;
/**
* Observable/Observer design pattern interface for handing events on a socket
* @todo Consider an onException method. Since server is running its own loop the app currently doesn't know when a problem is handled
* @todo Consider an onDisconnect method for a server-side close()'ing of a connection - onClose would be client side close()
* @todo Consider adding __construct(ObserverInterface $decorator = null) - on Server move Socket as parameter to run()
* @todo Does this belong in \Ratchet\Server\?
* @todo Is this interface needed anymore?
*/
interface ObserverInterface {
/**

View File

@ -1,8 +1,8 @@
<?php
namespace Ratchet\Resource\Command\Action;
use Ratchet\Resource\Command\ActionTemplate;
use Ratchet\ObserverInterface;
use Ratchet\SocketInterface;
use Ratchet\Application\ApplicationInterface;
use Ratchet\Resource\Connection;
use Ratchet\Resource\Command\CommandInterface;
use Ratchet\Resource\Command\Composite;
@ -10,23 +10,23 @@ use Ratchet\Resource\Command\Composite;
* Close the connection to the sockets passed in the constructor
*/
class CloseConnection extends ActionTemplate {
function execute(ObserverInterface $scope = null) {
function execute(ApplicationInterface $scope = null) {
// All this code allows an application to have its onClose method executed before the socket is actually closed
$ret = $scope->onClose($this->getSocket());
$ret = $scope->onClose($this->getConnection());
if ($ret instanceof CommandInterface) {
$comp = new Composite;
$comp->enqueue($ret);
$rt = new Runtime($this->getSocket());
$rt->setCommand(function(SocketInterface $socket, ObserverInterface $scope) {
$socket->close();
$rt = new Runtime($this->getConnection());
$rt->setCommand(function(Connection $conn, ApplicationInterface $scope) {
$conn->getSocket()->close();
});
$comp->enqueue($rt);
return $comp;
}
$this->getSocket()->close();
$this->getConnection()->getSocket()->close();
}
}

View File

@ -1,7 +1,7 @@
<?php
namespace Ratchet\Resource\Command\Action;
use Ratchet\Resource\Command\ActionTemplate;
use Ratchet\ObserverInterface;
use Ratchet\Application\ApplicationInterface;
class Runtime extends ActionTemplate {
/**
@ -10,14 +10,14 @@ class Runtime extends ActionTemplate {
protected $_command = null;
/**
* Your closure should accept two parameters (\Ratchet\SocketInterface, \Ratchet\ObserverInterface) parameter and return a CommandInterface or NULL
* Your closure should accept two parameters (\Ratchet\Resource\Connection, \Ratchet\Application\ApplicationInterface) parameter and return a CommandInterface or NULL
* @param Closure Your closure/lambda to execute when the time comes
*/
public function setCommand(\Closure $callback) {
$this->_command = $callback;
}
public function execute(ObserverInterface $scope = null) {
return call_user_func($this->_command, $this->getSocket(), $scope);
public function execute(ApplicationInterface $scope = null) {
return call_user_func($this->_command, $this->getConnection(), $scope);
}
}

View File

@ -1,7 +1,8 @@
<?php
namespace Ratchet\Resource\Command\Action;
use Ratchet\Resource\Command\ActionTemplate;
use Ratchet\ObserverInterface;
//use Ratchet\ObserverInterface;
use Ratchet\Application\ApplicationInterface;
/**
* Send text back to the client end of the socket(s)
@ -33,11 +34,11 @@ class SendMessage extends ActionTemplate {
/**
* @throws \UnexpectedValueException if a message was not set with setMessage()
*/
public function execute(ObserverInterface $scope = null) {
public function execute(ApplicationInterface $scope = null) {
if (empty($this->_message)) {
throw new \UnexpectedValueException("Message is empty");
}
$this->getSocket()->write($this->_message, strlen($this->_message));
$this->getConnection()->getSocket()->write($this->_message, strlen($this->_message));
}
}

View File

@ -1,6 +1,6 @@
<?php
namespace Ratchet\Resource\Command;
use Ratchet\SocketInterface;
use Ratchet\Resource\Connection;
/**
* A single command tied to 1 socket connection
@ -8,12 +8,12 @@ use Ratchet\SocketInterface;
interface ActionInterface extends CommandInterface {
/**
* Pass the Sockets to execute the command on
* @param Ratchet\SocketInterface
* @param Ratchet\Resource\Connection
*/
function __construct(SocketInterface $socket);
function __construct(Connection $conn);
/**
* @return Ratchet\SocketInterface
* @return Ratchet\Command\Connection
*/
function getSocket();
function getConnection();
}

View File

@ -1,18 +1,18 @@
<?php
namespace Ratchet\Resource\Command;
use Ratchet\SocketInterface;
use Ratchet\Resource\Connection;
abstract class ActionTemplate implements ActionInterface {
/**
* @var Ratchet\SocketInterface
* @var Ratchet\Resource\Connection
*/
protected $_socket;
protected $_conn;
public function __construct(SocketInterface $socket) {
$this->_socket = $socket;
public function __construct(Connection $conn) {
$this->_conn = $conn;
}
public function getSocket() {
return $this->_socket;
public function getConnection() {
return $this->_conn;
}
}

View File

@ -1,6 +1,7 @@
<?php
namespace Ratchet\Resource\Command;
use Ratchet\ObserverInterface;
//use Ratchet\ObserverInterface;
use Ratchet\Application\ApplicationInterface;
/**
* Socket implementation of the Command Pattern
@ -12,5 +13,5 @@ interface CommandInterface {
* @param Ratchet\ObserverInterface Scope to execute the command under
* @return CommandInterface|NULL
*/
function execute(ObserverInterface $scope = null);
function execute(ApplicationInterface $scope = null);
}

View File

@ -1,6 +1,7 @@
<?php
namespace Ratchet\Resource\Command;
use Ratchet\ObserverInterface;
//use Ratchet\ObserverInterface;
use Ratchet\Application\ApplicationInterface;
class Composite extends \SplQueue implements CommandInterface {
/**
@ -22,7 +23,7 @@ class Composite extends \SplQueue implements CommandInterface {
}
}
public function execute(ObserverInterface $scope = null) {
public function execute(ApplicationInterface $scope = null) {
$this->setIteratorMode(static::IT_MODE_DELETE);
$recursive = new self;

View File

@ -1,6 +1,6 @@
<?php
namespace Ratchet\Resource\Command;
use Ratchet\SocketInterface;
use Ratchet\Resource\Connection;
/**
* A factory pattern class to easily create all the things in the Ratchet\Resource\Command interface
@ -32,7 +32,7 @@ class Factory {
* @return CommandInterface
* @throws UnexpectedValueException
*/
public function newCommand($name, SocketInterface $conn) {
public function newCommand($name, Connection $conn) {
$cmd = null;
foreach ($this->_paths as $path) {
if (class_exists($path . $name)) {

View File

@ -0,0 +1,62 @@
<?php
namespace Ratchet\Resource;
use Ratchet\SocketInterface;
/**
* @todo Consider if this belongs under Application
*/
class Connection {
protected $_data = array();
/**
* @var Ratchet\SocketInterface
*/
protected $_socket;
public function __construct(SocketInterface $socket) {
$this->_socket = $socket;
}
/**
* @return int
*/
public function getID() {
return (int)(string)$this->_socket;
}
/**
* This is here because I couldn't figure out a better/easier way to tie a connection and socket together for the server and commands
* Anyway, if you're here, it's not recommended you use this/directly interact with the socket in your App...
* The command pattern (which is fully flexible, see Runtime) is the safest, desired way to interact with the socket(s).
* @return Ratchet\SocketInterface
*/
public function getSocket() {
return $this->_socket;
}
/**
* Set an attribute to the connection
* @param mixed
* @param mixed
*/
public function __set($name, $value) {
$this->_data[$name] = $value;
}
/**
* Get a previously set attribute bound to the connection
* @return mixed
* @throws \InvalidArgumentException
*/
public function __get($name) {
if (!isset($this->_data[$name])) {
throw new \InvalidArgumentException("Attribute '{$name}' not found in Connection {$this->getID()}");
}
if (is_callable($this->_data[$name])) {
return $this->_data[$name]($this);
} else {
return $this->_data[$name];
}
}
}

View File

@ -1,39 +0,0 @@
<?php
namespace Ratchet\Resource\Connection;
use Ratchet\SocketInterface;
/**
* @todo Should I build the commands into this class? They'd be executed by the Server...
*/
class Connection implements ConnectionInterface {
/**
* @var int
*/
protected $_id;
protected $_data = array();
public function __construct(SocketInterface $socket) {
$this->_id = (string)$socket->getResource();
$this->_id = (int)substr($this->_id, strrpos($this->_id, '#') + 1);
}
/**
* @return int
*/
public function getID() {
return $this->_id;
}
public function set($name, $val) {
$this->_data[$name] = $val;
}
public function get($name) {
if (!isset($this->_data[$name])) {
throw new \UnexpectedValueException("Attribute '{$name}' not found in Connection {$this->getID()}");
}
return $this->_data[$name];
}
}

View File

@ -1,30 +0,0 @@
<?php
namespace Ratchet\Resource\Connection;
use Ratchet\SocketInterface;
interface ConnectionInterface {
/**
* The socket this representative connection is tied to
* @param Ratchet\SocketInterface
*/
function __construct(SocketInterface $socket);
/**
* @return scalar
*/
function getID();
/**
* Set an attribute to the connection
* @param string
* @param mixed
*/
function set($name, $val);
/**
* Get a previously set attribute bound to the connection
* @return mixed
* @throws \UnexpectedValueException
*/
function get($name);
}

View File

@ -39,6 +39,11 @@ class Socket implements SocketInterface {
@socket_close($this->_resource);
}
public function __toString() {
$id = (string)$this->getResource();
return (string)substr($id, strrpos($id, '#') + 1);
}
/**
* @return resource (Socket)
*/

View File

@ -8,7 +8,7 @@ interface SocketInterface {
/**
* @return resource
*/
public function getResource();
function getResource();
/**
* Send text to the client on the other end of the socket
@ -31,4 +31,9 @@ interface SocketInterface {
* Close the open connection to the client/socket
*/
function close();
/**
* Return the unique ID of this socket instance
*/
function __toString();
}

View File

@ -2,21 +2,21 @@
namespace Ratchet\Tests\Mock;
use Ratchet\Application\ApplicationInterface;
use Ratchet\Tests\Mock\Socket as MockSocket;
use Ratchet\SocketInterface;
use Ratchet\Resource\Connection;
class Application implements ApplicationInterface {
public function __construct(ApplicationInterface $app = null) {
}
public function onOpen(SocketInterface $conn) {
public function onOpen(Connection $conn) {
}
public function onRecv(SocketInterface $from, $msg) {
public function onRecv(Connection $from, $msg) {
}
public function onClose(SocketInterface $conn) {
public function onClose(Connection $conn) {
}
public function onError(SocketInterface $conn, \Exception $e) {
public function onError(Connection $conn, \Exception $e) {
}
}

View File

@ -10,6 +10,10 @@ class FakeSocket extends RealSocket {
list($this->_arguments['domain'], $this->_arguments['type'], $this->_arguments['protocol']) = static::getConfig($domain, $type, $protocol);
}
public function __toString() {
return '1';
}
public function accept() {
}

View File

@ -0,0 +1,32 @@
<?php
namespace Ratchet\Tests\Resource;
use Ratchet\Resource\Connection;
use Ratchet\Tests\Mock\FakeSocket;
/**
* @covers Ratchet\Resource\Connection
*/
class ConnectionTest extends \PHPUnit_Framework_TestCase {
protected $_c;
public function setUp() {
$this->_c = new Connection(new FakeSocket);
}
public function testCanGetWhatIsSet() {
$key = 'hello';
$val = 'world';
$this->_c->{$key} = $val;
$this->assertEquals($val, $this->_c->{$key});
}
public function testExceptionThrownOnInvalidGet() {
$this->setExpectedException('InvalidArgumentException');
$ret = $this->_c->faked;
}
public function testLambdaReturnValueOnGet() {
$this->markTestIncomplete();
}
}