From 5a80a67f030ac3b1125c58337b65723070a89314 Mon Sep 17 00:00:00 2001 From: Chris Boden Date: Tue, 17 Jul 2012 16:57:05 -0400 Subject: [PATCH] [WAMP][BCB] Init new serer component Moved WampServer to ServerProtocol New WampServer component New Topic class --- CHANGELOG.md | 5 + composer.lock | 10 +- src/Ratchet/Wamp/ServerProtocol.php | 153 ++++++++++++++++ src/Ratchet/Wamp/Topic.php | 66 +++++++ src/Ratchet/Wamp/WampConnection.php | 2 +- src/Ratchet/Wamp/WampServer.php | 169 ++++++------------ ...pServerTest.php => ServerProtocolTest.php} | 8 +- 7 files changed, 288 insertions(+), 125 deletions(-) create mode 100644 src/Ratchet/Wamp/ServerProtocol.php create mode 100644 src/Ratchet/Wamp/Topic.php rename tests/Ratchet/Tests/Wamp/{WampServerTest.php => ServerProtocolTest.php} (97%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c706a9..cfc9741 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,11 @@ CHANGELOG --- +* wamp-topics (devel branch) + + * BC: Renamed "WampServer" to "ServerProtocol" + * BC: New "WampServer" component manages Topic containers of subscribed Connections + * 0.2 (2012-TBD) * Ratchet passes every non-binary-frame test from the Autobahn Testsuite diff --git a/composer.lock b/composer.lock index a3f6490..dbdf524 100644 --- a/composer.lock +++ b/composer.lock @@ -25,15 +25,7 @@ }, { "package": "symfony/event-dispatcher", - "version": "dev-master", - "alias-pretty-version": "2.1.x-dev", - "alias-version": "2.1.9999999.9999999-dev" - }, - { - "package": "symfony/event-dispatcher", - "version": "dev-master", - "source-reference": "v2.1.0-BETA3", - "commit-date": "1342347231" + "version": "v2.1.0-BETA3" }, { "package": "symfony/http-foundation", diff --git a/src/Ratchet/Wamp/ServerProtocol.php b/src/Ratchet/Wamp/ServerProtocol.php new file mode 100644 index 0000000..128ab88 --- /dev/null +++ b/src/Ratchet/Wamp/ServerProtocol.php @@ -0,0 +1,153 @@ +_decorating = $server_component; + $this->connections = new \SplObjectStorage; + } + + /** + * {@inheritdoc} + */ + public function getSubProtocols() { + if ($this->_decorating instanceof WsServerInterface) { + $subs = $this->_decorating->getSubProtocols(); + $subs[] = 'wamp'; + + return $subs; + } else { + return array('wamp'); + } + } + + /** + * {@inheritdoc} + */ + public function onOpen(ConnectionInterface $conn) { + $decor = new WampConnection($conn); + $this->connections->attach($conn, $decor); + + $this->_decorating->onOpen($decor); + } + + /** + * {@inheritdoc} + * @throws Exception + * @throws JsonException + */ + public function onMessage(ConnectionInterface $from, $msg) { + $from = $this->connections[$from]; + + if (null === ($json = @json_decode($msg, true))) { + throw new JsonException; + } + + switch ($json[0]) { + case static::MSG_PREFIX: + $from->WAMP->prefixes[$json[1]] = $json[2]; + break; + + case static::MSG_CALL: + array_shift($json); + $callID = array_shift($json); + $procURI = array_shift($json); + + if (count($json) == 1 && is_array($json[0])) { + $json = $json[0]; + } + + $this->_decorating->onCall($from, $callID, $procURI, $json); + break; + + case static::MSG_SUBSCRIBE: + $this->_decorating->onSubscribe($from, $from->getUri($json[1])); + break; + + case static::MSG_UNSUBSCRIBE: + $this->_decorating->onUnSubscribe($from, $from->getUri($json[1])); + break; + + case static::MSG_PUBLISH: + $exclude = (array_key_exists(3, $json) ? $json[3] : null); + if (!is_array($exclude)) { + if (true === (boolean)$exclude) { + $exclude = array($from->WAMP->sessionId); + } else { + $exclude = array(); + } + } + + $eligible = (array_key_exists(4, $json) ? $json[4] : array()); + + $this->_decorating->onPublish($from, $from->getUri($json[1]), $json[2], $exclude, $eligible); + break; + + default: + throw new Exception('Invalid message type'); + } + } + + /** + * {@inheritdoc} + */ + public function onClose(ConnectionInterface $conn) { + $decor = $this->connections[$conn]; + $this->connections->detach($conn); + + $this->_decorating->onClose($decor); + } + + /** + * {@inheritdoc} + */ + public function onError(ConnectionInterface $conn, \Exception $e) { + return $this->_decorating->onError($this->connections[$conn], $e); + } +} \ No newline at end of file diff --git a/src/Ratchet/Wamp/Topic.php b/src/Ratchet/Wamp/Topic.php new file mode 100644 index 0000000..5af61fa --- /dev/null +++ b/src/Ratchet/Wamp/Topic.php @@ -0,0 +1,66 @@ +id = $topicId; + $this->subscribers = new \SplObjectStorage; + } + + /** + * @return string + */ + public function getId() { + return $this->id; + } + + /** + * Send a message to all the connectiosn in this topic + * @param string + */ + public function broadcast($msg) { + foreach ($thisi->subscribers as $client) { + $client->event($this->id, $msg); + } + } + + /** + * @param WampConnection + */ + public function subscribed(WampConnection $conn) { + $this->subscribers->attach($conn); + } + + /** + * @param WampConnection + */ + public function unsubscribed(WampConnection $conn) { + if ($this->subscribers->contains($conn)) { + $this->subscribers->detach($conn); + } + } + + /** + * {@inheritdoc} + */ + public function getIterator() { + return $this->subscribers; + } + + /** + * {@inheritdoc} + */ + public function count() { + return $this->subscribers->count(); + } +} \ No newline at end of file diff --git a/src/Ratchet/Wamp/WampConnection.php b/src/Ratchet/Wamp/WampConnection.php index e99b8e5..e625060 100644 --- a/src/Ratchet/Wamp/WampConnection.php +++ b/src/Ratchet/Wamp/WampConnection.php @@ -2,7 +2,7 @@ namespace Ratchet\Wamp; use Ratchet\ConnectionInterface; use Ratchet\AbstractConnectionDecorator; -use Ratchet\Wamp\WampServer as WAMP; +use Ratchet\Wamp\ServerProtocol as WAMP; /** * A ConnectionInterface object wrapper that is passed to your WAMP application diff --git a/src/Ratchet/Wamp/WampServer.php b/src/Ratchet/Wamp/WampServer.php index 6b6b8be..a63bbdc 100644 --- a/src/Ratchet/Wamp/WampServer.php +++ b/src/Ratchet/Wamp/WampServer.php @@ -1,153 +1,100 @@ _decorating = $server_component; - $this->connections = new \SplObjectStorage; - } - - /** - * {@inheritdoc} - */ - public function getSubProtocols() { - if ($this->_decorating instanceof WsServerInterface) { - $subs = $this->_decorating->getSubProtocols(); - $subs[] = 'wamp'; - - return $subs; - } else { - return array('wamp'); - } + public function __construct(WampServerInterface $app) { + $this->protocol = new ServerProtocol($this); } /** * {@inheritdoc} */ public function onOpen(ConnectionInterface $conn) { - $decor = new WampConnection($conn); - $this->connections->attach($conn, $decor); - - $this->_decorating->onOpen($decor); + if ($conn instanceof WampConnection) { + $this->app->onOpen($conn); + } else { + $conn->WAMP->topics = new \SplObjectStorage; + $this->protocol->onOpen($conn); + } } /** * {@inheritdoc} - * @throws Exception - * @throws JsonException */ - public function onMessage(ConnectionInterface $from, $msg) { - $from = $this->connections[$from]; + public function onCall(ConnectionInterface $conn, $id, $topic, array $params) { + $this->app->onCall($conn, $id, $this->getTopic($topic), $params); + } - if (null === ($json = @json_decode($msg, true))) { - throw new JsonException; + /** + * {@inheritdoc} + */ + public function onSubscribe(ConnectionInterface $conn, $topic) { + $topicObj = $this->getTopic($topic); + + $conn->WAMP->topics->attach($topicObj); + $this->app->onSubscribe($conn, $topicObj); + } + + /** + * {@inheritdoc} + */ + public function onUnsubscribe(ConnectionInterface $conn, $topic) { + $topicObj = $this->getTopic($topic); + + if ($conn->WAMP->topics->contains($topicobj) { + $conn->WAMP->topics->detach($topicObj); } - switch ($json[0]) { - case static::MSG_PREFIX: - $from->WAMP->prefixes[$json[1]] = $json[2]; - break; + $this->topicLookup[$topic]->remove($conn); + $this->app->onUnsubscribe($conn, $topicObj); + } - case static::MSG_CALL: - array_shift($json); - $callID = array_shift($json); - $procURI = array_shift($json); - - if (count($json) == 1 && is_array($json[0])) { - $json = $json[0]; - } - - $this->_decorating->onCall($from, $callID, $procURI, $json); - break; - - case static::MSG_SUBSCRIBE: - $this->_decorating->onSubscribe($from, $from->getUri($json[1])); - break; - - case static::MSG_UNSUBSCRIBE: - $this->_decorating->onUnSubscribe($from, $from->getUri($json[1])); - break; - - case static::MSG_PUBLISH: - $exclude = (array_key_exists(3, $json) ? $json[3] : null); - if (!is_array($exclude)) { - if (true === (boolean)$exclude) { - $exclude = array($from->WAMP->sessionId); - } else { - $exclude = array(); - } - } - - $eligible = (array_key_exists(4, $json) ? $json[4] : array()); - - $this->_decorating->onPublish($from, $from->getUri($json[1]), $json[2], $exclude, $eligible); - break; - - default: - throw new Exception('Invalid message type'); - } + /** + * {@inheritdoc} + */ + public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude = array(), array $eligible = array()) { + $this->app->onPublish($conn, $this->getTopic($topic), $event, $exclude, $eligible); } /** * {@inheritdoc} */ public function onClose(ConnectionInterface $conn) { - $decor = $this->connections[$conn]; - $this->connections->detach($conn); + $this->app->onClose($conn); - $this->_decorating->onClose($decor); + foreach ($this->topicLookup as $topic) { + $topic->remove($conn); + } } /** * {@inheritdoc} */ public function onError(ConnectionInterface $conn, \Exception $e) { - return $this->_decorating->onError($this->connections[$conn], $e); + $this->app->onError($conn, $e); + } + + protected function getTopic($topic) { + if (!array_key_exists($topic, $this->topicLookup)) { + $this->topicLookup[$topic] = new Topic($topic); + } + + return $this->topicLookup[$topic]; } } \ No newline at end of file diff --git a/tests/Ratchet/Tests/Wamp/WampServerTest.php b/tests/Ratchet/Tests/Wamp/ServerProtocolTest.php similarity index 97% rename from tests/Ratchet/Tests/Wamp/WampServerTest.php rename to tests/Ratchet/Tests/Wamp/ServerProtocolTest.php index ae46143..4dcb28e 100644 --- a/tests/Ratchet/Tests/Wamp/WampServerTest.php +++ b/tests/Ratchet/Tests/Wamp/ServerProtocolTest.php @@ -1,23 +1,23 @@ _app = new TestComponent; - $this->_comp = new WampServer($this->_app); + $this->_comp = new ServerProtocol($this->_app); } protected function newConn() {