[WAMP][BCB] Init new serer component

Moved WampServer to ServerProtocol
New WampServer component
New Topic class
This commit is contained in:
Chris Boden 2012-07-17 16:57:05 -04:00
parent 3d97c76bcb
commit 5a80a67f03
7 changed files with 288 additions and 125 deletions

View File

@ -8,6 +8,11 @@ CHANGELOG
--- ---
* wamp-topics (devel branch)
* BC: Renamed "WampServer" to "ServerProtocol"
* BC: New "WampServer" component manages Topic containers of subscribed Connections
* 0.2 (2012-TBD) * 0.2 (2012-TBD)
* Ratchet passes every non-binary-frame test from the Autobahn Testsuite * Ratchet passes every non-binary-frame test from the Autobahn Testsuite

10
composer.lock generated
View File

@ -25,15 +25,7 @@
}, },
{ {
"package": "symfony/event-dispatcher", "package": "symfony/event-dispatcher",
"version": "dev-master", "version": "v2.1.0-BETA3"
"alias-pretty-version": "2.1.x-dev",
"alias-version": "2.1.9999999.9999999-dev"
},
{
"package": "symfony/event-dispatcher",
"version": "dev-master",
"source-reference": "v2.1.0-BETA3",
"commit-date": "1342347231"
}, },
{ {
"package": "symfony/http-foundation", "package": "symfony/http-foundation",

View File

@ -0,0 +1,153 @@
<?php
namespace Ratchet\Wamp;
use Ratchet\MessageComponentInterface;
use Ratchet\WebSocket\WsServerInterface;
use Ratchet\ConnectionInterface;
/**
* WebSocket Application Messaging Protocol
*
* @link http://wamp.ws/spec
* @link https://github.com/oberstet/AutobahnJS
*
* +--------------+----+------------------+
* | Message Type | ID | DIRECTION |
* |--------------+----+------------------+
* | WELCOME | 0 | Server-to-Client |
* | PREFIX | 1 | Bi-Directional |
* | CALL | 2 | Client-to-Server |
* | CALL RESULT | 3 | Server-to-Client |
* | CALL ERROR | 4 | Server-to-Client |
* | SUBSCRIBE | 5 | Client-to-Server |
* | UNSUBSCRIBE | 6 | Client-to-Server |
* | PUBLISH | 7 | Client-to-Server |
* | EVENT | 8 | Server-to-Client |
* +--------------+----+------------------+
*/
class ServerProtocol implements MessageComponentInterface, WsServerInterface {
const MSG_WELCOME = 0;
const MSG_PREFIX = 1;
const MSG_CALL = 2;
const MSG_CALL_RESULT = 3;
const MSG_CALL_ERROR = 4;
const MSG_SUBSCRIBE = 5;
const MSG_UNSUBSCRIBE = 6;
const MSG_PUBLISH = 7;
const MSG_EVENT = 8;
/**
* @var WampServerInterface
*/
protected $_decorating;
/**
* @var SplObjectStorage
*/
protected $connections;
/**
* @param WampServerInterface An class to propagate calls through
*/
public function __construct(WampServerInterface $server_component) {
$this->_decorating = $server_component;
$this->connections = new \SplObjectStorage;
}
/**
* {@inheritdoc}
*/
public function getSubProtocols() {
if ($this->_decorating instanceof WsServerInterface) {
$subs = $this->_decorating->getSubProtocols();
$subs[] = 'wamp';
return $subs;
} else {
return array('wamp');
}
}
/**
* {@inheritdoc}
*/
public function onOpen(ConnectionInterface $conn) {
$decor = new WampConnection($conn);
$this->connections->attach($conn, $decor);
$this->_decorating->onOpen($decor);
}
/**
* {@inheritdoc}
* @throws Exception
* @throws JsonException
*/
public function onMessage(ConnectionInterface $from, $msg) {
$from = $this->connections[$from];
if (null === ($json = @json_decode($msg, true))) {
throw new JsonException;
}
switch ($json[0]) {
case static::MSG_PREFIX:
$from->WAMP->prefixes[$json[1]] = $json[2];
break;
case static::MSG_CALL:
array_shift($json);
$callID = array_shift($json);
$procURI = array_shift($json);
if (count($json) == 1 && is_array($json[0])) {
$json = $json[0];
}
$this->_decorating->onCall($from, $callID, $procURI, $json);
break;
case static::MSG_SUBSCRIBE:
$this->_decorating->onSubscribe($from, $from->getUri($json[1]));
break;
case static::MSG_UNSUBSCRIBE:
$this->_decorating->onUnSubscribe($from, $from->getUri($json[1]));
break;
case static::MSG_PUBLISH:
$exclude = (array_key_exists(3, $json) ? $json[3] : null);
if (!is_array($exclude)) {
if (true === (boolean)$exclude) {
$exclude = array($from->WAMP->sessionId);
} else {
$exclude = array();
}
}
$eligible = (array_key_exists(4, $json) ? $json[4] : array());
$this->_decorating->onPublish($from, $from->getUri($json[1]), $json[2], $exclude, $eligible);
break;
default:
throw new Exception('Invalid message type');
}
}
/**
* {@inheritdoc}
*/
public function onClose(ConnectionInterface $conn) {
$decor = $this->connections[$conn];
$this->connections->detach($conn);
$this->_decorating->onClose($decor);
}
/**
* {@inheritdoc}
*/
public function onError(ConnectionInterface $conn, \Exception $e) {
return $this->_decorating->onError($this->connections[$conn], $e);
}
}

View File

@ -0,0 +1,66 @@
<?php
namespace Ratchet\Wamp;
/**
* A topic/channel containing connections that have subscribed to it
*/
class Topic implements \IteratorAggregate, \Countable {
private $id;
private $subscribers;
/**
* @param string Unique ID for this object
*/
public function __construct($topicId) {
$this->id = $topicId;
$this->subscribers = new \SplObjectStorage;
}
/**
* @return string
*/
public function getId() {
return $this->id;
}
/**
* Send a message to all the connectiosn in this topic
* @param string
*/
public function broadcast($msg) {
foreach ($thisi->subscribers as $client) {
$client->event($this->id, $msg);
}
}
/**
* @param WampConnection
*/
public function subscribed(WampConnection $conn) {
$this->subscribers->attach($conn);
}
/**
* @param WampConnection
*/
public function unsubscribed(WampConnection $conn) {
if ($this->subscribers->contains($conn)) {
$this->subscribers->detach($conn);
}
}
/**
* {@inheritdoc}
*/
public function getIterator() {
return $this->subscribers;
}
/**
* {@inheritdoc}
*/
public function count() {
return $this->subscribers->count();
}
}

View File

@ -2,7 +2,7 @@
namespace Ratchet\Wamp; namespace Ratchet\Wamp;
use Ratchet\ConnectionInterface; use Ratchet\ConnectionInterface;
use Ratchet\AbstractConnectionDecorator; use Ratchet\AbstractConnectionDecorator;
use Ratchet\Wamp\WampServer as WAMP; use Ratchet\Wamp\ServerProtocol as WAMP;
/** /**
* A ConnectionInterface object wrapper that is passed to your WAMP application * A ConnectionInterface object wrapper that is passed to your WAMP application

View File

@ -1,153 +1,100 @@
<?php <?php
namespace Ratchet\Wamp; namespace Ratchet\Wamp;
use Ratchet\MessageComponentInterface;
use Ratchet\WebSocket\WsServerInterface;
use Ratchet\ConnectionInterface; use Ratchet\ConnectionInterface;
class WampServer implements MessageComponentInterface, WsServerInterface, WampServerInterface {
/** /**
* WebSocket Application Messaging Protocol * @var ServerProtocol
*
* @link http://wamp.ws/spec
* @link https://github.com/oberstet/AutobahnJS
*
* +--------------+----+------------------+
* | Message Type | ID | DIRECTION |
* |--------------+----+------------------+
* | WELCOME | 0 | Server-to-Client |
* | PREFIX | 1 | Bi-Directional |
* | CALL | 2 | Client-to-Server |
* | CALL RESULT | 3 | Server-to-Client |
* | CALL ERROR | 4 | Server-to-Client |
* | SUBSCRIBE | 5 | Client-to-Server |
* | UNSUBSCRIBE | 6 | Client-to-Server |
* | PUBLISH | 7 | Client-to-Server |
* | EVENT | 8 | Server-to-Client |
* +--------------+----+------------------+
*/ */
class WampServer implements MessageComponentInterface, WsServerInterface { protected $protocol;
const MSG_WELCOME = 0;
const MSG_PREFIX = 1;
const MSG_CALL = 2;
const MSG_CALL_RESULT = 3;
const MSG_CALL_ERROR = 4;
const MSG_SUBSCRIBE = 5;
const MSG_UNSUBSCRIBE = 6;
const MSG_PUBLISH = 7;
const MSG_EVENT = 8;
/** /**
* @var WampServerInterface * @var WampServerInterface
*/ */
protected $_decorating; protected $app;
/** /**
* @var SplObjectStorage * @var array
*/ */
protected $connections; protected $topicLookup = array();
/** public function __construct(WampServerInterface $app) {
* @param WampServerInterface An class to propagate calls through $this->protocol = new ServerProtocol($this);
*/
public function __construct(WampServerInterface $server_component) {
$this->_decorating = $server_component;
$this->connections = new \SplObjectStorage;
}
/**
* {@inheritdoc}
*/
public function getSubProtocols() {
if ($this->_decorating instanceof WsServerInterface) {
$subs = $this->_decorating->getSubProtocols();
$subs[] = 'wamp';
return $subs;
} else {
return array('wamp');
}
} }
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function onOpen(ConnectionInterface $conn) { public function onOpen(ConnectionInterface $conn) {
$decor = new WampConnection($conn); if ($conn instanceof WampConnection) {
$this->connections->attach($conn, $decor); $this->app->onOpen($conn);
} else {
$this->_decorating->onOpen($decor); $conn->WAMP->topics = new \SplObjectStorage;
$this->protocol->onOpen($conn);
}
} }
/** /**
* {@inheritdoc} * {@inheritdoc}
* @throws Exception
* @throws JsonException
*/ */
public function onMessage(ConnectionInterface $from, $msg) { public function onCall(ConnectionInterface $conn, $id, $topic, array $params) {
$from = $this->connections[$from]; $this->app->onCall($conn, $id, $this->getTopic($topic), $params);
if (null === ($json = @json_decode($msg, true))) {
throw new JsonException;
} }
switch ($json[0]) { /**
case static::MSG_PREFIX: * {@inheritdoc}
$from->WAMP->prefixes[$json[1]] = $json[2]; */
break; public function onSubscribe(ConnectionInterface $conn, $topic) {
$topicObj = $this->getTopic($topic);
case static::MSG_CALL: $conn->WAMP->topics->attach($topicObj);
array_shift($json); $this->app->onSubscribe($conn, $topicObj);
$callID = array_shift($json);
$procURI = array_shift($json);
if (count($json) == 1 && is_array($json[0])) {
$json = $json[0];
} }
$this->_decorating->onCall($from, $callID, $procURI, $json); /**
break; * {@inheritdoc}
*/
public function onUnsubscribe(ConnectionInterface $conn, $topic) {
$topicObj = $this->getTopic($topic);
case static::MSG_SUBSCRIBE: if ($conn->WAMP->topics->contains($topicobj) {
$this->_decorating->onSubscribe($from, $from->getUri($json[1])); $conn->WAMP->topics->detach($topicObj);
break;
case static::MSG_UNSUBSCRIBE:
$this->_decorating->onUnSubscribe($from, $from->getUri($json[1]));
break;
case static::MSG_PUBLISH:
$exclude = (array_key_exists(3, $json) ? $json[3] : null);
if (!is_array($exclude)) {
if (true === (boolean)$exclude) {
$exclude = array($from->WAMP->sessionId);
} else {
$exclude = array();
}
} }
$eligible = (array_key_exists(4, $json) ? $json[4] : array()); $this->topicLookup[$topic]->remove($conn);
$this->app->onUnsubscribe($conn, $topicObj);
$this->_decorating->onPublish($from, $from->getUri($json[1]), $json[2], $exclude, $eligible);
break;
default:
throw new Exception('Invalid message type');
} }
/**
* {@inheritdoc}
*/
public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude = array(), array $eligible = array()) {
$this->app->onPublish($conn, $this->getTopic($topic), $event, $exclude, $eligible);
} }
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function onClose(ConnectionInterface $conn) { public function onClose(ConnectionInterface $conn) {
$decor = $this->connections[$conn]; $this->app->onClose($conn);
$this->connections->detach($conn);
$this->_decorating->onClose($decor); foreach ($this->topicLookup as $topic) {
$topic->remove($conn);
}
} }
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function onError(ConnectionInterface $conn, \Exception $e) { public function onError(ConnectionInterface $conn, \Exception $e) {
return $this->_decorating->onError($this->connections[$conn], $e); $this->app->onError($conn, $e);
}
protected function getTopic($topic) {
if (!array_key_exists($topic, $this->topicLookup)) {
$this->topicLookup[$topic] = new Topic($topic);
}
return $this->topicLookup[$topic];
} }
} }

View File

@ -1,23 +1,23 @@
<?php <?php
namespace Ratchet\Tests\Wamp; namespace Ratchet\Tests\Wamp;
use Ratchet\Wamp\WampServer; use Ratchet\Wamp\ServerProtocol;
use Ratchet\Wamp\WampConnection; use Ratchet\Wamp\WampConnection;
use Ratchet\Tests\Mock\Connection; use Ratchet\Tests\Mock\Connection;
use Ratchet\Tests\Mock\WampComponent as TestComponent; use Ratchet\Tests\Mock\WampComponent as TestComponent;
/** /**
* @covers Ratchet\Wamp\WampServer * @covers Ratchet\Wamp\ServerProtocol
* @covers Ratchet\Wamp\WampServerInterface * @covers Ratchet\Wamp\WampServerInterface
* @covers Ratchet\Wamp\WampConnection * @covers Ratchet\Wamp\WampConnection
*/ */
class WampServerTest extends \PHPUnit_Framework_TestCase { class ServerProtocolTest extends \PHPUnit_Framework_TestCase {
protected $_comp; protected $_comp;
protected $_app; protected $_app;
public function setUp() { public function setUp() {
$this->_app = new TestComponent; $this->_app = new TestComponent;
$this->_comp = new WampServer($this->_app); $this->_comp = new ServerProtocol($this->_app);
} }
protected function newConn() { protected function newConn() {