[BCB] [WIP] Overhaul

React at the core of Ratchet, refs #6
Removed Commands (except WAMP), refs #22
Updated Guzzle to 2.4 branch, refs #20
Fixed some Hixie bugs, refs #21
This commit is contained in:
Chris Boden 2012-05-07 18:49:13 -04:00
parent af35aab345
commit d30c8358ef
28 changed files with 449 additions and 1121 deletions

View File

@ -2,7 +2,7 @@
#Ratchet
A PHP 5.3 (PSR-0 compliant) component library for serving sockets and building socket based applications.
A PHP 5.3 (PSR-0) component library for serving sockets and building socket based applications.
Build up your application through simple interfaces using the decorator and command patterns.
Re-use your application without changing any of its code just by combining different components.
@ -32,57 +32,46 @@ See https://github.com/cboden/Ratchet-examples for some out-of-the-box working d
```php
<?php
namespace MyApps;
use Ratchet\Component\MessageComponentInterface;
use Ratchet\Resource\ConnectionInterface;
use Ratchet\Component\Server\IOServerComponent;
use Ratchet\Component\WebSocket\WebSocketComponent;
use Ratchet\Resource\Command\Composite as Cmds;
use Ratchet\Resource\Command\Action\SendMessage;
use Ratchet\Resource\Command\Action\CloseConnection;
use Ratchet\MessageInterface;
use Ratchet\ConnectionInterface;
use Ratchet\Server\IoServer;
use Ratchet\WebSocket\WsServer;
/**
* chat.php
* Send any incoming messages to all connected clients (except sender)
*/
class Chat implements MessageComponentInterface {
protected $_clients;
class Chat implements MessageInterface {
protected $clients;
public function __construct(MessageComponentInterface $app = null) {
$this->_clients = new \SplObjectStorage;
public function __construct() {
$this->clients = new \SplObjectStorage;
}
public function onOpen(ConnectionInterface $conn) {
$this->_clients->attach($conn);
$this->clients->attach($conn);
}
public function onMessage(ConnectionInterface $from, $msg) {
$commands = new Cmds;
foreach ($this->_clients as $client) {
foreach ($this->clients as $client) {
if ($from != $client) {
$msg_cmd = new SendMessage($client);
$msg_cmd->setMessage($msg);
$commands->enqueue($msg_cmd);
$client->send($msg);
}
}
return $commands;
}
public function onClose(ConnectionInterface $conn) {
$this->_clients->detach($conn);
$this->clients->detach($conn);
}
public function onError(ConnectionInterface $conn, \Exception $e) {
return new CloseConnection($conn);
$conn->close();
}
}
// Run the server application through the WebSocket protocol
$server = new IOServerComponent(new WebSocketComponent(new Chat));
$server->run(8000);
// Run the server application through the WebSocket protocol
$server = new IoServer(new WsServer(new Chat));
$server->run(8000);
```
# php chat.php

View File

@ -23,5 +23,26 @@
"php": ">=5.3.2"
, "guzzle/guzzle": "2.4.*"
, "symfony/http-foundation": "2.1.*"
, "evenement/evenement": "dev-master"
, "cboden/react": "dev-master"
}
, "repositories": [
{
"type": "package"
, "package": {
"name": "cboden/react"
, "version": "dev-master"
, "autoload": {
"psr-0": {
"React": "src"
}
}
, "source": {
"url": "git@github.com:cboden/SocketServer.git"
, "type": "git"
, "reference": "ratchet"
}
}
}
]
}

19
composer.lock generated
View File

@ -1,10 +1,20 @@
{
"hash": "8cea9930a3f96f47c598f35e2ea96a85",
"hash": "ae2788962867b06b14a9e473cdd71ebd",
"packages": [
{
"package": "cboden/react",
"version": "dev-master",
"source-reference": "ratchet"
},
{
"package": "doctrine/common",
"version": "2.2.x-dev",
"source-reference": "1e0aa60d109c630d19543d999f12e2852ef8f932"
"version": "dev-master",
"source-reference": "717ea940ff8fa0854e84a2249edadfb998f91406"
},
{
"package": "evenement/evenement",
"version": "dev-master",
"source-reference": "808e3aaea8d4f908e455b0e047cc1acc46b38d44"
},
{
"package": "guzzle/guzzle",
@ -23,8 +33,7 @@
},
{
"package": "symfony/validator",
"version": "dev-master",
"source-reference": "dac248b43b62d30023dd9b73ad7e5b7bc1128e5e"
"version": "v2.0.10"
}
],
"packages-dev": null,

View File

@ -1,199 +1,57 @@
<?php
namespace Ratchet\Component\Server;
use Ratchet\Component\MessageComponentInterface;
use Ratchet\Resource\Socket\SocketInterface;
use Ratchet\Resource\Socket\BSDSocket;
use Ratchet\Resource\ConnectionInterface;
use Ratchet\Resource\Connection;
use Ratchet\Resource\Command\CommandInterface;
use React\EventLoop\LoopInterface;
use React\Socket\ServerInterface;
use React\EventLoop\Factory as LoopFactory;
use React\Socket\Server as Reactor;
/**
* Creates an open-ended socket to listen on a port for incomming connections. Events are delegated through this to attached applications
*/
class IOServerComponent implements MessageComponentInterface {
/**
* @var array of Socket Resources
*/
protected $_resources = array();
class IOServerComponent {
protected $loop;
/**
* @var array of resources/Connections
*/
protected $_connections = array();
protected $connections;
/**
* The decorated application to send events to
* @var Ratchet\Component\ComponentInterface
*/
protected $_decorating;
public function __construct(MessageComponentInterface $app, ServerInterface $socket, LoopInterface $loop) {
$this->loop = $loop;
/**
* Number of bytes to read in the TCP buffer at a time
* Default is (currently) 4kb
* @var int
*/
protected $_buffer_size = 4096;
$that = $this;
/**
* After run() is called, the server will loop as long as this is true
* This is here for unit testing purposes
* @var bool
* @internal
*/
protected $_run = true;
$socket->on('connect', function($conn) use ($app, $that) {
$decor = new IoConnection($conn, $that);
public function __construct(MessageComponentInterface $component) {
$this->_decorating = $component;
$decor->resourceId = (int)$conn->socket;
$decor->remoteAddress = '127.0.0.1'; // todo
$app->onOpen($decor);
$conn->on('data', function($data) use ($decor, $app) {
$app->onMessage($decor, $data);
});
$conn->on('error', function($e) use ($decor, $app) {
$app->onError($decor, $e);
});
$conn->on('end', function() use ($decor, $app) {
$app->onClose($decor);
});
});
}
/**
* Set the incoming buffer size in bytes
* @param int
* @return App
* @throws InvalidArgumentException If the parameter is less than 1
*/
public function setBufferSize($recv_bytes) {
if ((int)$recv_bytes < 1) {
throw new \InvalidArgumentException('Invalid number of bytes set, must be more than 0');
public static function factory(MessageComponentInterface $component, $port = 80, $address = '0.0.0.0') {
$loop = LoopFactory::create();
$socket = new Reactor($loop);
$socket->listen($port, $address);
$server = new self($component, $socket, $loop);
return $server;
}
$this->_buffer_size = (int)$recv_bytes;
return $this;
}
/*
* Run the server infinitely
* @param int The port to listen to connections on (make sure to run as root if < 1000)
* @param mixed The address to listen for incoming connections on. "0.0.0.0" to listen from anywhere
* @param Ratchet\Resource\Socket\SocketInterface
* @throws Ratchet\Exception
*/
public function run($port, $address = '0.0.0.0', SocketInterface $host = null) {
if (null === $host) {
$host = new BSDSocket;
$host->set_option(SOL_SOCKET, SO_REUSEADDR, 1);
}
$this->_connections[$host->getResource()] = new Connection($host);
$this->_resources[] = $host->getResource();
gc_enable();
set_time_limit(0);
ob_implicit_flush();
declare(ticks = 1);
$host->set_option(SOL_SOCKET, SO_SNDBUF, $this->_buffer_size);
$host->set_nonblock()->bind($address, (int)$port)->listen();
do {
$this->loop($host);
} while ($this->_run);
}
protected function loop(SocketInterface $host) {
$changed = $this->_resources;
try {
$write = $except = null;
$num_changed = $host->select($changed, $write, $except, null);
} catch (Exception $e) {
// master had a problem?...what to do?
return;
}
foreach($changed as $resource) {
try {
$conn = $this->_connections[$resource];
if ($host->getResource() === $resource) {
$res = $this->onOpen($conn);
} else {
$data = $buf = '';
$bytes = $conn->getSocket()->recv($buf, $this->_buffer_size, MSG_DONTWAIT);
if ($bytes > 0) {
$data = $buf;
// This idea works* but...
// 1) A single DDOS attack will block the entire application (I think)
// 2) What if the last message in the frame is equal to $recv_bytes? Would loop until another msg is sent
// 3) This failed...an intermediary can set their buffer lower and this still propagates a fragment
// Need to 1) proc_open the recv() calls. 2) ???
/*
while ($bytes === $recv_bytes) {
$bytes = $conn->recv($buf, $recv_bytes, 0);
$data .= $buf;
}
*/
$res = $this->onMessage($conn, $data);
} else {
$res = $this->onClose($conn);
}
}
} catch (\Exception $e) {
$res = $this->onError($conn, $e);
}
while ($res instanceof CommandInterface) {
try {
$new_res = $res->execute($this);
} catch (\Exception $e) {
break;
// trigger new error
// $new_res = $this->onError($e->getSocket()); ???
// this is dangerous territory...could get in an infinte loop...Exception might not be Ratchet\Exception...$new_res could be ActionInterface|Composite|NULL...
}
$res = $new_res;
}
}
}
/**
* {@inheritdoc}
*/
public function onOpen(ConnectionInterface $conn) {
$new_socket = clone $conn->getSocket();
$new_socket->set_nonblock();
$new_connection = new Connection($new_socket);
$new_connection->remoteAddress = $new_socket->getRemoteAddress();
$new_connection->resourceId = (int)substr((string)$new_socket->getResource(), strrpos((string)$new_socket->getResource(), '#') + 1);
$this->_resources[] = $new_connection->getSocket()->getResource();
$this->_connections[$new_connection->getSocket()->getResource()] = $new_connection;
return $this->_decorating->onOpen($new_connection);
}
/**
* {@inheritdoc}
*/
public function onMessage(ConnectionInterface $from, $msg) {
return $this->_decorating->onMessage($from, $msg);
}
/**
* {@inheritdoc}
*/
public function onClose(ConnectionInterface $conn) {
$resource = $conn->getSocket()->getResource();
$cmd = $this->_decorating->onClose($conn);
unset($this->_connections[$resource], $this->_resources[array_search($resource, $this->_resources)]);
return $cmd;
}
/**
* {@inheritdoc}
*/
public function onError(ConnectionInterface $conn, \Exception $e) {
return $this->_decorating->onError($conn, $e);
public function run() {
$this->loop->run();
}
}

View File

@ -0,0 +1,40 @@
<?php
namespace Ratchet\Component\Server;
use Ratchet\Resource\ConnectionInterface;
use React\Socket\ConnectionInterface as ReactConn;
/**
* A proxy object representing a connection to the application
* This acts as a container to storm data (in memory) about the connection
*/
class IoConnection implements ConnectionInterface {
/**
* @var Ratchet\Component\Server\IOServer
*/
protected $server;
/**
* @var React\Socket\ConnectionInterface
*/
protected $conn;
public function __construct(ReactConn $conn, IOServerComponent $server) {
$this->conn = $conn;
$this->server = $server;
}
/**
* {@inheritdoc}
*/
public function send($data) {
return $this->conn->write($data);
}
/**
* {@inheritdoc}
*/
public function close() {
$this->server->onClose($this);
$this->conn->end();
}
}

View File

@ -2,7 +2,6 @@
namespace Ratchet\Component\Server;
use Ratchet\Component\MessageComponentInterface;
use Ratchet\Resource\ConnectionInterface;
use Ratchet\Resource\Command\Action\CloseConnection;
class IpBlackListComponent implements MessageComponentInterface {
/**
@ -74,7 +73,7 @@ class IpBlackListComponent implements MessageComponentInterface {
*/
function onOpen(ConnectionInterface $conn) {
if ($this->isBlocked($conn->remoteAddress)) {
return new CloseConnection($conn);
return $conn->close();
}
return $this->_decorating->onOpen($conn);

View File

@ -0,0 +1,28 @@
<?php
namespace Ratchet\Component\WAMP\Resource;
use Ratchet\Resource\AbstractConnectionDecorator;
use Ratchet\Resrouce\ConnectionInterface;
/**
* @property stdClass $WAMP
*/
class Connection extends AbstractConnectionDecorator {
public function __construct() {
// call write() with welcome message
}
public function callResponse() {
}
public function callError() {
}
public function event() {
}
public function write($data) {
}
public function end() {
}
}

View File

@ -1,22 +0,0 @@
<?php
namespace Ratchet\Component\WebSocket\Command\Action;
use Ratchet\Resource\Command\Action\SendMessage;
use Ratchet\Component\ComponentInterface;
/**
* Not yet implemented/completed
*/
class Disconnect extends SendMessage {
protected $_code = 1000;
public function setStatusCode($code) {
$this->_code = (int)$code;
// re-do message based on code
}
public function execute(ComponentInterface $scope = null) {
parent::execute();
$this->_socket->close();
}
}

View File

@ -1,12 +0,0 @@
<?php
namespace Ratchet\Component\WebSocket\Command\Action;
use Ratchet\Resource\Command\Action\ActionTemplate;
use Ratchet\Component\ComponentInterface;
/**
* Not yet implemented/completed
*/
class Ping extends ActionTemplate {
public function execute(ComponentInterface $scope = null) {
}
}

View File

@ -1,12 +0,0 @@
<?php
namespace Ratchet\Component\WebSocket\Command\Action;
use Ratchet\Resource\Command\Action\ActionTemplate;
use Ratchet\Component\ComponentInterface;
/**
* Not yet implemented/completed
*/
class Pong extends ActionTemplate {
public function execute(ComponentInterface $scope = null) {
}
}

View File

@ -1,19 +1,224 @@
<?php
namespace Ratchet\Component\WebSocket\Guzzle\Http\Message;
use Guzzle\Http\Message\RequestFactory as gReqFac;
use Guzzle\Common\Collection;
use Guzzle\Http\EntityBody;
use Guzzle\Http\QueryString;
use Guzzle\Http\Url;
use Guzzle\Http\Message\RequestFactoryInterface;
/**
* Just slighly changing the Guzzle RequestFactory to always return an EntityEnclosingRequest instance instead of Request
* Default HTTP request factory used to create the default
* Guzzle\Http\Message\Request and Guzzle\Http\Message\EntityEnclosingRequest
* objects.
*/
class RequestFactory extends gReqFac {
public static function getInstance() {
static $instance = null;
if (null === $instance) {
$instance = parent::getInstance();
static::$instance->requestClass = static::$instance->entityEnclosingRequestClass;
class RequestFactory implements RequestFactoryInterface
{
/**
* @var Standard request headers
*/
protected static $requestHeaders = array(
'accept', 'accept-charset', 'accept-encoding', 'accept-language',
'authorization', 'cache-control', 'connection', 'cookie',
'content-length', 'content-type', 'date', 'expect', 'from', 'host',
'if-match', 'if-modified-since', 'if-none-match', 'if-range',
'if-unmodified-since', 'max-forwards', 'pragma', 'proxy-authorization',
'range', 'referer', 'te', 'transfer-encoding', 'upgrade', 'user-agent',
'via', 'warning'
);
/**
* @var RequestFactory Singleton instance of the default request factory
*/
protected static $instance;
/**
* @var string Class to instantiate for GET, HEAD, and DELETE requests
*/
protected $requestClass = 'Guzzle\\Http\\Message\\EntityEnclosingRequest';
/**
* @var string Class to instantiate for POST and PUT requests
*/
protected $entityEnclosingRequestClass = 'Guzzle\\Http\\Message\\EntityEnclosingRequest';
/**
* Get a cached instance of the default request factory
*
* @return RequestFactory
*/
public static function getInstance()
{
// @codeCoverageIgnoreStart
if (!self::$instance) {
self::$instance = new self();
}
// @codeCoverageIgnoreEnd
return self::$instance;
}
return $instance;
/**
* {@inheritdoc}
*/
public function parseMessage($message)
{
if (!$message) {
return false;
}
$headers = new Collection();
$scheme = $host = $body = $method = $user = $pass = $query = $port = $version = $protocol = '';
$path = '/';
// Inspired by https://github.com/kriswallsmith/Buzz/blob/message-interfaces/lib/Buzz/Message/Parser/Parser.php#L16
$lines = preg_split('/(\\r?\\n)/', $message, -1, PREG_SPLIT_DELIM_CAPTURE);
for ($i = 0, $c = count($lines); $i < $c; $i += 2) {
$line = $lines[$i];
// If two line breaks were encountered, then this is the body
if (empty($line)) {
$body = implode('', array_slice($lines, $i + 2));
break;
}
// Parse message headers
$matches = array();
if (!$method && preg_match('#^(?P<method>[A-Za-z]+)\s+(?P<path>/.*)\s+(?P<protocol>\w+)/(?P<version>\d\.\d)\s*$#i', $line, $matches)) {
$method = strtoupper($matches['method']);
$protocol = strtoupper($matches['protocol']);
$path = $matches['path'];
$version = $matches['version'];
$scheme = 'http';
} else if (strpos($line, ':')) {
list($key, $value) = explode(':', $line, 2);
$key = trim($key);
// Normalize standard HTTP headers
if (in_array(strtolower($key), self::$requestHeaders)) {
$key = str_replace(' ', '-', ucwords(str_replace('-', ' ', $key)));
}
// Headers are case insensitive
$headers->add($key, trim($value));
}
}
// Check for the Host header
if (isset($headers['Host'])) {
$host = $headers['Host'];
}
if (strpos($host, ':')) {
list($host, $port) = array_map('trim', explode(':', $host));
if ($port == 443) {
$scheme = 'https';
}
} else {
$port = '';
}
// Check for basic authorization
$auth = isset($headers['Authorization']) ? $headers['Authorization'] : '';
if ($auth) {
list($type, $data) = explode(' ', $auth);
if (strtolower($type) == 'basic') {
$data = base64_decode($data);
list($user, $pass) = explode(':', $data);
}
}
// Check if a query is present
$qpos = strpos($path, '?');
if ($qpos) {
$query = substr($path, $qpos);
$path = substr($path, 0, $qpos);
}
return array(
'method' => $method,
'protocol' => $protocol,
'protocol_version' => $version,
'parts' => array(
'scheme' => $scheme,
'host' => $host,
'port' => $port,
'user' => $user,
'pass' => $pass,
'path' => $path,
'query' => $query
),
'headers' => $headers->getAll(),
'body' => $body
);
}
/**
* {@inheritdoc}
*/
public function fromMessage($message)
{
$parsed = $this->parseMessage($message);
if (!$parsed) {
return false;
}
$request = $this->fromParts($parsed['method'], $parsed['parts'],
$parsed['headers'], $parsed['body'], $parsed['protocol'],
$parsed['protocol_version']);
// EntityEnclosingRequest adds an "Expect: 100-Continue" header when
// using a raw request body for PUT or POST requests. This factory
// method should accurately reflect the message, so here we are
// removing the Expect header if one was not supplied in the message.
if (!isset($parsed['headers']['Expect'])) {
$request->removeHeader('Expect');
}
return $request;
}
/**
* {@inheritdoc}
*/
public function fromParts($method, array $parts, $headers = null, $body = null, $protocol = 'HTTP', $protocolVersion = '1.1')
{
return $this->create($method, Url::buildUrl($parts, true), $headers, $body)
->setProtocolVersion($protocolVersion);
}
/**
* {@inheritdoc}
*/
public function create($method, $url, $headers = null, $body = null)
{
if ($method != 'POST' && $method != 'PUT' && $method != 'PATCH') {
$c = $this->requestClass;
$request = new $c($method, $url, $headers);
if ($body) {
$request->setBody(EntityBody::factory($body));
}
} else {
$c = $this->entityEnclosingRequestClass;
$request = new $c($method, $url, $headers);
if ($body) {
if ($method == 'POST' && (is_array($body) || $body instanceof Collection)) {
$request->addPostFields($body);
} else if (is_resource($body) || $body instanceof EntityBody) {
$request->setBody($body, (string) $request->getHeader('Content-Type'));
} else {
$request->setBody((string) $body, (string) $request->getHeader('Content-Type'));
}
}
// Fix chunked transfers based on the passed headers
if (isset($headers['Transfer-Encoding']) && $headers['Transfer-Encoding'] == 'chunked') {
$request->removeHeader('Content-Length')
->setHeader('Transfer-Encoding', 'chunked');
}
}
return $request;
}
}

View File

@ -20,7 +20,7 @@ class Hixie76 implements VersionInterface {
* {@inheritdoc}
*/
public static function isProtocol(RequestInterface $request) {
return !(null === $request->getHeader('Sec-WebSocket-Key2'));
return !(null === $request->getHeader('Sec-WebSocket-Key2', true));
}
/**
@ -28,13 +28,13 @@ class Hixie76 implements VersionInterface {
* @return string
*/
public function handshake(RequestInterface $request) {
$body = $this->sign($request->getHeader('Sec-WebSocket-Key1'), $request->getHeader('Sec-WebSocket-Key2'), $request->getBody());
$body = $this->sign($request->getHeader('Sec-WebSocket-Key1', true), $request->getHeader('Sec-WebSocket-Key2', true), (string)$request->getBody());
$headers = array(
'Upgrade' => 'WebSocket'
, 'Connection' => 'Upgrade'
, 'Sec-WebSocket-Origin' => $request->getHeader('Origin')
, 'Sec-WebSocket-Location' => 'ws://' . $request->getHeader('Host') . $request->getPath()
, 'Sec-WebSocket-Origin' => $request->getHeader('Origin', true)
, 'Sec-WebSocket-Location' => 'ws://' . $request->getHeader('Host', true) . $request->getPath()
);
$response = new Response('101', $headers, $body);

View File

@ -2,15 +2,13 @@
namespace Ratchet\Component\WebSocket;
use Ratchet\Component\MessageComponentInterface;
use Ratchet\Resource\ConnectionInterface;
use Ratchet\Resource\Command\Factory;
use Ratchet\Resource\Command\CommandInterface;
use Ratchet\Resource\Command\Action\SendMessage;
use Guzzle\Http\Message\RequestInterface;
use Ratchet\Component\WebSocket\Guzzle\Http\Message\RequestFactory;
/**
* The adapter to handle WebSocket requests/responses
* This is a mediator between the Server and your application to handle real-time messaging through a web browser
* @todo Separate this class into a two classes: Component and a protocol handler
* @link http://ca.php.net/manual/en/ref.http.php
* @link http://dev.w3.org/html5/websockets/
*/
@ -22,10 +20,9 @@ class WebSocketComponent implements MessageComponentInterface {
protected $_decorating;
/**
* Creates commands/composites instead of calling several classes manually
* @var Ratchet\Resource\Command\Factory
* @var SplObjectStorage
*/
protected $_factory;
protected $connections;
/**
* Re-entrant instances of protocol version classes
@ -48,7 +45,7 @@ class WebSocketComponent implements MessageComponentInterface {
public function __construct(MessageComponentInterface $component) {
$this->_decorating = $component;
$this->_factory = new Factory;
$this->connections = new \SplObjectStorage;
}
/**
@ -96,11 +93,12 @@ class WebSocketComponent implements MessageComponentInterface {
$response->setHeader('X-Powered-By', \Ratchet\Resource\VERSION);
$header = (string)$response;
$comp = $this->_factory->newComposite();
$comp->enqueue($this->_factory->newCommand('SendMessage', $from)->setMessage($header));
$comp->enqueue($this->prepareCommand($this->_decorating->onOpen($from, $msg))); // Need to send headers/handshake to application, let it have the cookies, etc
$from->send($header);
return $comp;
$conn = new WsConnection($from);
$this->connections->attach($from, $conn);
return $this->_decorating->onOpen($conn);
}
if (!isset($from->WebSocket->message)) {
@ -115,6 +113,7 @@ class WebSocketComponent implements MessageComponentInterface {
$from->WebSocket->frame->addBuffer($msg);
if ($from->WebSocket->frame->isCoalesced()) {
if ($from->WebSocket->frame->getOpcode() > 2) {
$from->end();
throw new \UnexpectedValueException('Control frame support coming soon!');
}
// Check frame
@ -127,10 +126,8 @@ class WebSocketComponent implements MessageComponentInterface {
}
if ($from->WebSocket->message->isCoalesced()) {
$cmds = $this->prepareCommand($this->_decorating->onMessage($from, (string)$from->WebSocket->message));
$this->_decorating->onMessage($this->connections[$from], (string)$from->WebSocket->message);
unset($from->WebSocket->message);
return $cmds;
}
}
@ -138,57 +135,23 @@ class WebSocketComponent implements MessageComponentInterface {
* {@inheritdoc}
*/
public function onClose(ConnectionInterface $conn) {
return $this->prepareCommand($this->_decorating->onClose($conn));
// WS::onOpen is not called when the socket connects, it's call when the handshake is done
// The socket could close before WS calls onOpen, so we need to check if we've "opened" it for the developer yet
if ($this->connections->contains($conn)) {
$this->_decorating->onClose($this->connections[$conn]);
$this->connections->detach($conn);
}
}
/**
* {@inheritdoc}
* @todo Shouldn't I be using prepareCommand() on the return? look into this
*/
public function onError(ConnectionInterface $conn, \Exception $e) {
return $this->_decorating->onError($conn, $e);
if ($this->connections->contains($conn)) {
$this->_decorating->onError($this->connections[$conn], $e);
} else {
$conn->close();
}
/**
* Checks if a return Command from your application is a message, if so encode it/them
* @param Ratchet\Resource\Command\CommandInterface|NULL
* @return Ratchet\Resource\Command\CommandInterface|NULL
*/
protected function prepareCommand(CommandInterface $command = null) {
$cache = array();
return $this->mungCommand($command, $cache);
}
/**
* Does the actual work of prepareCommand
* Separated to pass the cache array by reference, so we're not framing the same stirng over and over
* @param Ratchet\Resource\Command\CommandInterface|NULL
* @param array
* @return Ratchet\Resource\Command\CommandInterface|NULL
*/
protected function mungCommand(CommandInterface $command = null, &$cache) {
if ($command instanceof SendMessage) {
if (!isset($command->getConnection()->WebSocket->version)) { // Client could close connection before handshake complete or invalid handshake
return $command;
}
$version = $command->getConnection()->WebSocket->version;
$hash = md5($command->getMessage()) . '-' . spl_object_hash($version);
if (!isset($cache[$hash])) {
$cache[$hash] = $version->frame($command->getMessage(), $this->_mask_payload);
}
return $command->setMessage($cache[$hash]);
}
if ($command instanceof \Traversable) {
foreach ($command as $cmd) {
$cmd = $this->mungCommand($cmd, $cache);
}
}
return $command;
}
/**

View File

@ -0,0 +1,33 @@
<?php
namespace Ratchet\Component\WebSocket;
use Ratchet\Resource\AbstractConnectionDecorator;
use Ratchet\Resrouce\ConnectionInterface;
/**
* @property stdClass $WebSocket
*/
class WsConnection extends AbstractConnectionDecorator {
public function send($data) {
// need frame caching
$data = $this->WebSocket->version->frame($data, false);
$this->getConnection()->send($data);
}
public function close() {
// send close frame
// ???
// profit
$this->getConnection()->close(); // temporary
}
public function ping() {
}
public function pong() {
}
}

View File

@ -1,49 +0,0 @@
<?php
namespace Ratchet\Resource;
use Ratchet\Resource\Socket\SocketInterface;
use Ratchet\Resource\Socket\BSDSocketException;
/**
* A proxy object representing a connection to the application
* This acts as a container to storm data (in memory) about the connection
*/
class Connection implements ConnectionInterface {
/**
* @var Ratchet\Resource\Socket\SocketInterface
*/
protected $_socket;
public function __construct(SocketInterface $socket) {
$this->_socket = $socket;
}
/**
* {@inheritdoc}
*/
public function write($data) {
return $this->_socket->deliver($data);
}
/**
* {@inheritdoc}
*/
public function end() {
try {
$this->_socket->shutdown();
} catch (BSDSocketException $e) {
}
$this->_socket->close();
}
/**
* This is here because I couldn't figure out a better/easier way to tie a connection and socket together for the server and commands
* Anyway, if you're here, it's not recommended you use this/directly interact with the socket in your App...
* The command pattern (which is fully flexible, see Runtime) is the safest, desired way to interact with the socket(s).
* @return Ratchet\SocketInterface
* @todo Figure out a better way to match Socket/Connection in Application and Commands
*/
public function getSocket() {
return $this->_socket;
}
}

View File

@ -8,10 +8,10 @@ interface ConnectionInterface {
* Send data to the connection
* @param string
*/
function write($data);
function send($data);
/**
* End the connection
* Close the connection
*/
function end();
function close();
}

View File

@ -1,245 +0,0 @@
<?php
namespace Ratchet\Resource\Socket;
/**
* A wrapper for the PHP socket_ functions
* @author Chris Boden <shout at chrisboden dot ca>
* @link http://ca2.php.net/manual/en/book.sockets.php
*/
class BSDSocket implements SocketInterface {
/**
* @type resource
*/
protected $_resource;
public static $_defaults = array(
'domain' => AF_INET
, 'type' => SOCK_STREAM
, 'protocol' => SOL_TCP
);
/**
* @param int Specifies the protocol family to be used by the socket.
* @param int The type of communication to be used by the socket
* @param int Sets the specific protocol within the specified domain to be used when communicating on the returned socket
* @throws BSDSocketException
*/
public function __construct($domain = null, $type = null, $protocol = null) {
list($domain, $type, $protocol) = static::getConfig($domain, $type, $protocol);
$this->_resource = @socket_create($domain, $type, $protocol);
if (!is_resource($this->_resource)) {
throw new BSDSocketException($this);
}
}
public function __destruct() {
@socket_close($this->_resource);
}
public function __toString() {
$id = (string)$this->getResource();
return (string)substr($id, strrpos($id, '#') + 1);
}
/**
* @return resource (Socket)
*/
public function getResource() {
return $this->_resource;
}
public function __clone() {
$this->_resource = @socket_accept($this->_resource);
if (false === $this->_resource) {
throw new BSDSocketException($this);
}
}
public function deliver($message) {
$len = strlen($message);
do {
$sent = $this->write($message, $len);
$len -= $sent;
$message = substr($message, $sent);
} while ($len > 0);
}
public function bind($address, $port = 0) {
if (false === @socket_bind($this->getResource(), $address, $port)) {
throw new BSDSocketException($this);
}
return $this;
}
public function close() {
@socket_close($this->getResource());
unset($this->_resource);
}
public function connect($address, $port = 0) {
if (false === @socket_connect($this->getResource(), $address, $port)) {
throw new BSDSocketException($this);
}
return $this;
}
public function getRemoteAddress() {
$address = $port = '';
if (false === @socket_getpeername($this->getResource(), $address, $port)) {
throw new BSDSocketException($this);
}
return $address;
}
public function get_option($level, $optname) {
if (false === ($res = @socket_get_option($this->getResource(), $level, $optname))) {
throw new BSDSocketException($this);
}
return $res;
}
public function listen($backlog = 0) {
if (false === @socket_listen($this->getResource(), $backlog)) {
throw new BSDSocketException($this);
}
return $this;
}
public function read($length, $type = PHP_BINARY_READ) {
if (false === ($res = @socket_read($this->getResource(), $length, $type))) {
throw new BSDSocketException($this);
}
return $res;
}
/**
* @see http://ca3.php.net/manual/en/function.socket-recv.php
* @param string Variable to write data to
* @param int Number of bytes to read
* @param int
* @return int Number of bytes received
* @throws BSDSocketException
*/
public function recv(&$buf, $len, $flags) {
if (false === ($bytes = @socket_recv($this->_resource, $buf, $len, $flags))) {
throw new BSDSocketException($this);
}
return $bytes;
}
/**
* Since PHP is retarded and their golden hammer, the array, doesn't implement any interfaces I have to hackishly overload socket_select
* @see http://ca3.php.net/manual/en/function.socket-select.php
* @param Iterator|array|NULL The sockets listed in the read array will be watched to see if characters become available for reading (more precisely, to see if a read will not block - in particular, a socket resource is also ready on end-of-file, in which case a socket_read() will return a zero length string).
* @param Iterator|array|NULL The sockets listed in the write array will be watched to see if a write will not block.
* @param Iterator|array|NULL The sockets listed in the except array will be watched for exceptions.
* @param int The tv_sec and tv_usec together form the timeout parameter. The timeout is an upper bound on the amount of time elapsed before socket_select() return. tv_sec may be zero , causing socket_select() to return immediately. This is useful for polling. If tv_sec is NULL (no timeout), socket_select() can block indefinitely.
* @param int
* @throws \InvalidArgumentException
* @throws BSDSocketException
*/
public function select(&$read, &$write, &$except, $tv_sec, $tv_usec = 0) {
$read = static::mungForSelect($read);
$write = static::mungForSelect($write);
$except = static::mungForSelect($except);
$num = socket_select($read, $write, $except, $tv_sec, $tv_usec);
if (false === $num) {
throw new BSDSocketException($this);
}
return $num;
}
public function set_block() {
if (false === @socket_set_block($this->getResource())) {
throw new BSDSocketException($this);
}
return $this;
}
public function set_nonblock() {
if (false === @socket_set_nonblock($this->getResource())) {
throw new BSDSocketException($this);
}
return $this;
}
public function set_option($level, $optname, $optval) {
if (false === @socket_set_option($this->getResource(), $level, $optname, $optval)) {
throw new BSDSocketException($this);
}
return $this;
}
public function shutdown($how = 2) {
if (false === @socket_shutdown($this->getResource(), $how)) {
throw new BSDSocketException($this);
}
return $this;
}
public function write($buffer, $length = 0) {
if (false === ($res = @socket_write($this->getResource(), $buffer, $length))) {
throw new BSDSocketException($this);
}
return $res;
}
/**
* @internal
* @param int Specifies the protocol family to be used by the socket.
* @param int The type of communication to be used by the socket
* @param int Sets the specific protocol within the specified domain to be used when communicating on the returned socket
* @return array
*/
protected static function getConfig($domain = null, $type = null, $protocol = null) {
foreach (static::$_defaults as $key => $val) {
if (null === $$key) {
$$key = $val;
}
}
return array($domain, $type, $protocol);
}
/**
* @internal
* @param Iterator|array|NULL
* @return array|NULL
* @throws \InvalidArgumentException
*/
protected static function mungForSelect($collection) {
if (null === $collection || is_array($collection)) {
return $collection;
}
if (!($collection instanceof \Traversable)) {
throw new \InvalidArgumentException('Object pass is not traversable');
}
$return = array();
foreach ($collection as $key => $socket) {
$return[$key] = ($socket instanceof $this ? $socket->getResource() : $socket);
}
return $return;
}
}

View File

@ -1,26 +0,0 @@
<?php
namespace Ratchet\Resource\Socket;
/**
* Uses internal php methods to fill an Exception class (no parameters required)
*/
class BSDSocketException extends \Exception {
/**
* @var BSDSocket
*/
protected $_socket;
public function __construct(BSDSocket $socket) {
$int = socket_last_error();
$msg = socket_strerror($int);
$this->_socket = $socket;
//@socket_clear_error($socket->getResource());
parent::__construct($msg, $int);
}
public function getSocket() {
return $this->_socket;
}
}

View File

@ -1,151 +0,0 @@
<?php
namespace Ratchet\Resource\Socket;
/**
* An object-oriented container for a single socket connection
* @todo Major refactor when socket streams are implemented against this interface
*/
interface SocketInterface {
/**
* @return resource
*/
function getResource();
/**
* Return the unique ID of this socket instance
*/
function __toString();
/**
* Calls socket_accept, duplicating its self
* @throws Exception
*/
function __clone();
/**
* Send a message through the socket. This writes to the buffer until the entire message is delivered
* @param string Your message to send to the socket
* @return null
* @throws Exception
* @see write
*/
function deliver($message);
// Not sure if I'll implement this or leave it only in clone
// function accept();
/**
* Bind the socket instance to an address/port
* @param string
* @param int
* @return SocketInterface
* @throws Exception
*/
function bind($address, $port = 0);
/**
* Close the open connection to the client/socket
*/
function close();
/**
* Initiates a connection to a socket
* @param string
* @param int
* @return SocketInterface
* @throws Exception
*/
function connect($address, $port = 0);
/**
* Get the address the socket connected from
* @return string
* @throws Exception
*/
function getRemoteAddress();
/**
* @param int
* @param int
* @return mixed
* @throws Exception
*/
function get_option($level, $optname);
/**
* Listen for incoming connections on this socket
* @param int
* @return SocketInterface
* @throws Exception
*/
function listen($backlog = 0);
/**
* Read a maximum of length bytes from a socket
* @param int Number of bytes to read
* @param int Flags
* @return string Data read from the socket
* @throws Exception
*/
function read($length, $type = PHP_BINARY_READ);
/**
* Called when the client sends data to the server through the socket
* @param string Variable to write data to
* @param int Number of bytes to read
* @param int
* @return int Number of bytes received
* @throws Exception
* @todo Change the pass by reference
*/
function recv(&$buf, $len, $flags);
/**
* @param array|Iterator
* @param array|Iterator
* @param array|Iterator
* @param int
* @param int
* @return int
* @throws Exception
* @todo Figure out how to break this out to not do pass by reference
*/
function select(&$read, &$write, &$except, $tv_sec, $tv_usec = 0);
/**
* Sets the blocking mode on the socket resource
* Wen an operation (receive, send, connect, accept, etc) is performed after set_block() the script will pause execution until the operation is completed
* @return SocketInterface
* @throws Exception
*/
function set_block();
/**
* Sets nonblocking mode for socket resource
* @return SocketInterface
* @throws Exception
*/
function set_nonblock();
/**
* @param int
* @param int
* @param mixed
* @return SocketInterface
*/
function set_option($level, $optname, $optval);
/**
* @param int
* @return SocketInterface
* @throws Exception
*/
function shutdown($how = 2);
/**
* Send text to the client on the other end of the socket
* @param string
* @param int
*/
function write($buffer, $length = 0);
}

View File

@ -1,65 +0,0 @@
<?php
namespace Ratchet\Tests\Component\Server;
use Ratchet\Component\Server\IOServerComponent;
use Ratchet\Tests\Mock\FakeSocket as Socket;
use Ratchet\Tests\Mock\Component as TestApp;
/**
* @covers Ratchet\Component\Server\IOServerComponent
*/
class IOServerComponentTest extends \PHPUnit_Framework_TestCase {
protected $_catalyst;
protected $_server;
protected $_decorated;
public function setUp() {
$this->_catalyst = new Socket;
$this->_decorated = new TestApp;
$this->_server = new IOServerComponent($this->_decorated);
$ref = new \ReflectionClass('\\Ratchet\\Component\\Server\\IOServerComponent');
$prop = $ref->getProperty('_run');
$prop->setAccessible(true);
$prop->setValue($this->_server, false);
}
protected function getPrivateProperty($class, $name) {
$reflectedClass = new \ReflectionClass($class);
$property = $reflectedClass->getProperty($name);
$property->setAccessible(true);
return $property->getValue($class);
}
protected function getMasterConnection() {
$connections = $this->getPrivateProperty($this->_server, '_connections');
return array_pop($connections);
}
public function testOnOpenPassesClonedSocket() {
$this->_server->run(1025, '127.0.0.1', $this->_catalyst);
$master = $this->getMasterConnection();
$this->_server->onOpen($master);
$clone = $this->_decorated->last['onOpen'][0];
$this->assertEquals($master->resourceId + 1, $clone->resourceId);
}
public function testOnMessageSendsToApp() {
$this->_server->run(1025, '127.0.0.1', $this->_catalyst);
$master = $this->getMasterConnection();
// todo, make FakeSocket better, set data in select, recv to pass data when called, then do this check
// that way can mimic the TCP fragmentation/buffer situation
$this->_server->onOpen($master);
$clone = $this->_decorated->last['onOpen'][0];
// $this->_server->run($this->_catalyst);
$msg = 'Hello World!';
$this->_server->onMessage($clone, $msg);
$this->assertEquals($msg, $this->_decorated->last['onMessage'][1]);
}
}

View File

@ -23,7 +23,7 @@ class IpBlackListComponentTest extends \PHPUnit_Framework_TestCase {
$ret = $this->_comp->onOpen($conn);
$this->assertInstanceOf('\\Ratchet\\Resource\\Command\\Action\\CloseConnection', $ret);
$this->assertTrue($conn->last['close']);
}
public function testAddAndRemoveWithFluentInterfaces() {

View File

@ -1,8 +1,7 @@
<?php
namespace Ratchet\Tests\Component\WAMP;
use Ratchet\Component\WAMP\WAMPServerComponent;
use Ratchet\Resource\Connection;
use Ratchet\Tests\Mock\FakeSocket;
use Ratchet\Tests\Mock\Connection;
use Ratchet\Tests\Mock\WAMPComponent as TestComponent;
use Ratchet\Component\WAMP\Command\Action\CallResult;
use Ratchet\Component\WAMP\Command\Action\CallError;
@ -23,7 +22,7 @@ class WAMPServerComponentTest extends \PHPUnit_Framework_TestCase {
}
protected function newConn() {
return new Connection(new FakeSocket);
return new Connection;
}
public function invalidMessageProvider() {
@ -49,7 +48,7 @@ class WAMPServerComponentTest extends \PHPUnit_Framework_TestCase {
* @covers Ratchet\Component\WAMP\Command\Action\Welcome
*/
public function testWelcomeMessage() {
$conn = new Connection(new FakeSocket);
$conn = new Connection();
$return = $this->_comp->onOpen($conn);
$action = $return->pop();
@ -155,10 +154,7 @@ class WAMPServerComponentTest extends \PHPUnit_Framework_TestCase {
public function testOnErrorPropagation() {
$conn = $this->newConn();
try {
throw new \Exception('Nope');
} catch (\Exception $e) {
}
$e = new \Exception('Nope');
$this->_comp->onError($conn, $e);

View File

@ -4,17 +4,17 @@ use Ratchet\Resource\ConnectionInterface;
class Connection implements ConnectionInterface {
public $last = array(
'write' => ''
, 'end' => false
'send' => ''
, 'close' => false
);
public $remoteAddress = '127.0.0.1';
public function write($data) {
public function send($data) {
$this->last[__FUNCTION__] = $data;
}
public function end() {
public function close() {
$this->last[__FUNCTION__] = true;
}
}

View File

@ -8,15 +8,15 @@ class ConnectionDecorator extends AbstractConnectionDecorator {
, 'end' => false
);
public function write($data) {
public function send($data) {
$this->last[__FUNCTION__] = $data;
$this->getConnection()->write($data);
$this->getConnection()->send($data);
}
public function end() {
public function close() {
$this->last[__FUNCTION__] = true;
$this->getConnection()->end();
$this->getConnection()->close();
}
}

View File

@ -1,100 +0,0 @@
<?php
namespace Ratchet\Tests\Mock;
use Ratchet\Resource\Socket\SocketInterface;
use Ratchet\Resource\Socket\BSDSocket as RealSocket;
class FakeSocket implements SocketInterface {
public $_arguments = array();
public $_options = array();
protected $_id = 1;
public $_last = array();
public function getResource() {
return "#{$this->_id}";
}
public function __toString() {
return (string)$this->_id;
}
public function __construct($domain = null, $type = null, $protocol = null) {
list($this->_arguments['domain'], $this->_arguments['type'], $this->_arguments['protocol']) = array(1, 1, 1);
}
public function __clone() {
$this->_id++;
}
public function deliver($message) {
$this->write($message, strlen($message));
}
public function bind($address, $port = 0) {
$this->_last['bind'] = array($address, $port);
return $this;
}
public function close() {
}
public function connect($address, $port = 0) {
$this->_last['connect'] = array($address, $port = 0);
return $this;
}
public function getRemoteAddress() {
return '127.0.0.1';
}
public function get_option($level, $optname) {
return $this->_options[$level][$optname];
}
public function listen($backlog = 0) {
$this->_last['listen'] = array($backlog);
return $this;
}
public function read($length, $type = PHP_BINARY_READ) {
$this->_last['read'] = array($length, $type);
return 0;
}
public function recv(&$buf, $len, $flags) {
$this->_last['recv'] = array($buf, $len, $flags);
return 0;
}
public function select(&$read, &$write, &$except, $tv_sec, $tv_usec = 0) {
$this->_last['select'] = array($read, $write, $except, $tv_sec, $tv_usec);
return 0;
}
public function set_block() {
return $this;
}
public function set_nonblock() {
return $this;
}
public function set_option($level, $optname, $optval) {
if (!isset($this->_options[$level])) {
$this->_options[$level] = array();
}
$this->_options[$level][$optname] = $optval;
}
public function shutdown($how = 2) {
$this->_last['shutdown'] = array($how);
return $this;
}
public function write($buffer, $length = 0) {
$this->_last['write'] = array($buffer, $length);
return $this;
}
}

View File

@ -1,8 +1,7 @@
<?php
namespace Ratchet\Tests\Resource\Command;
use Ratchet\Resource\Command\Composite;
use Ratchet\Resource\Connection;
use Ratchet\Tests\Mock\FakeSocket;
use Ratchet\Tests\Mock\Connection;
use Ratchet\Resource\Command\Action\Null as NullAction;
/**
@ -16,7 +15,7 @@ class CompositeTest extends \PHPUnit_Framework_TestCase {
}
protected function newNull() {
return new NullAction(new Connection(new FakeSocket));
return new NullAction(new Connection);
}
public function testCanEnqueueNull() {

View File

@ -1,62 +0,0 @@
<?php
namespace Ratchet\Tests\Resource;
use Ratchet\Resource\Connection;
use Ratchet\Tests\Mock\FakeSocket;
/**
* @covers Ratchet\Resource\Connection
*/
class ConnectionTest extends \PHPUnit_Framework_TestCase {
/**
* @var Ratchet\Tests\Mock\FakeSocket
*/
protected $_fs;
/**
* @var Ratchet\Resource\Connection
*/
protected $_c;
public function setUp() {
$this->_fs = new FakeSocket;
$this->_c = new Connection($this->_fs);
}
public static function keyAndValProvider() {
return array(
array('hello', 'world')
, array('herp', 'derp')
, array('depth', array('hell', 'yes'))
, array('moar', array('hellz' => 'yes'))
);
}
public function testGetSocketReturnsWhatIsSetInConstruct() {
$this->assertSame($this->_fs, $this->_c->getSocket());
}
/**
* @dataProvider keyAndValProvider
*/
public function testCanGetWhatIsSet($key, $val) {
$this->_c->{$key} = $val;
$this->assertEquals($val, $this->_c->{$key});
}
/**
* @dataProvider keyAndValProvider
*/
public function testIssetWorksOnOverloadedVariables($key, $val) {
$this->_c->{$key} = $val;
$this->assertTrue(isset($this->_c->{$key}));
}
/**
* @dataProvider keyAndValProvider
*/
public function testUnsetMakesIssetReturnFalse($key, $val) {
$this->_c->{$key} = $val;
unset($this->_c->{$key});
$this->assertFalse(isset($this->_c->{$key}));
}
}

View File

@ -1,68 +0,0 @@
<?php
namespace Ratchet\Tests\Resource\Socket;
use Ratchet\Tests\Mock\FakeSocket as Socket;
use Ratchet\Resource\Socket\BSDSocket as RealSocket;
/**
* @covers Ratchet\Resource\Socket\BSDSocket
*/
class BSDSocketTest extends \PHPUnit_Framework_TestCase {
protected $_socket;
protected static function getMethod($name) {
$class = new \ReflectionClass('\\Ratchet\\Tests\\Mock\\FakeSocket');
$method = $class->getMethod($name);
$method->setAccessible(true);
return $method;
}
public function setUp() {
$this->_socket = new Socket();
}
/* (1): I may or may not re-enable this test (need to add code back to FakeSocket), not sure if I'll keep this feature at all
public function testGetDefaultConfigForConstruct() {
$ref_conf = static::getMethod('getConfig');
$config = $ref_conf->invokeArgs($this->_socket, array());
$this->assertEquals(array_values(Socket::$_defaults), $config);
}
/**/
public function testInvalidConstructorArguments() {
$this->setExpectedException('\\Ratchet\\Resource\\Socket\\BSDSocketException');
$socket = new RealSocket('invalid', 'param', 'derp');
}
public function testConstructAndCallByOpenAndClose() {
$socket = new RealSocket();
$socket->close();
}
public function asArrayProvider() {
return array(
array(array('hello' => 'world'), array('hello' => 'world'))
, array(null, null)
, array(array('hello' => 'world'), new \ArrayObject(array('hello' => 'world')))
);
}
/**
* (1)
* @dataProvider asArrayProvider
* /
public function testMethodMungforselectReturnsExpectedValues($output, $input) {
$method = static::getMethod('mungForSelect');
$return = $method->invokeArgs($this->_socket, array($input));
$this->assertEquals($return, $output);
}
public function NOPEtestMethodMungforselectRejectsNonTraversable() {
$this->setExpectedException('\\InvalidArgumentException');
$method = static::getMethod('mungForSelect');
$method->invokeArgs($this->_socket, array('I am upset with PHP ATM'));
}
*/
}