From c5cdd68cef508fb46809295ed43b245ae6fb7fd5 Mon Sep 17 00:00:00 2001 From: Chris Boden Date: Tue, 17 Jul 2012 17:06:35 -0400 Subject: [PATCH] [WAMP] Topic manager Separated functionality of Ratchet component and topic manager --- src/Ratchet/Wamp/Topic.php | 4 +- src/Ratchet/Wamp/TopicManager.php | 103 ++++++++++++++++++++++++++++++ src/Ratchet/Wamp/WampServer.php | 78 +++++----------------- 3 files changed, 121 insertions(+), 64 deletions(-) create mode 100644 src/Ratchet/Wamp/TopicManager.php diff --git a/src/Ratchet/Wamp/Topic.php b/src/Ratchet/Wamp/Topic.php index 5af61fa..83529a3 100644 --- a/src/Ratchet/Wamp/Topic.php +++ b/src/Ratchet/Wamp/Topic.php @@ -37,14 +37,14 @@ class Topic implements \IteratorAggregate, \Countable { /** * @param WampConnection */ - public function subscribed(WampConnection $conn) { + public function add(WampConnection $conn) { $this->subscribers->attach($conn); } /** * @param WampConnection */ - public function unsubscribed(WampConnection $conn) { + public function remove(WampConnection $conn) { if ($this->subscribers->contains($conn)) { $this->subscribers->detach($conn); } diff --git a/src/Ratchet/Wamp/TopicManager.php b/src/Ratchet/Wamp/TopicManager.php new file mode 100644 index 0000000..1946b1b --- /dev/null +++ b/src/Ratchet/Wamp/TopicManager.php @@ -0,0 +1,103 @@ +app = $app; + } + + /** + * {@inheritdoc} + */ + public function onOpen(ConnectionInterface $conn) { + $conn->WAMP->topics = new \SplObjectStorage; + $this->app->onOpen($conn); + } + + /** + * {@inheritdoc} + */ + public function onCall(ConnectionInterface $conn, $id, $topic, array $params) { + $this->app->onCall($conn, $id, $this->getTopic($topic), $params); + } + + /** + * {@inheritdoc} + */ + public function onSubscribe(ConnectionInterface $conn, $topic) { + $topicObj = $this->getTopic($topic); + + $conn->WAMP->topics->attach($topicObj); + $this->app->onSubscribe($conn, $topicObj); + } + + /** + * {@inheritdoc} + */ + public function onUnsubscribe(ConnectionInterface $conn, $topic) { + $topicObj = $this->getTopic($topic); + + if ($conn->WAMP->topics->contains($topicobj) { + $conn->WAMP->topics->remove($topicObj); + } + + $this->topicLookup[$topic]->remove($conn); + $this->app->onUnsubscribe($conn, $topicObj); + } + + /** + * {@inheritdoc} + */ + public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude = array(), array $eligible = array()) { + $this->app->onPublish($conn, $this->getTopic($topic), $event, $exclude, $eligible); + } + + /** + * {@inheritdoc} + */ + public function onClose(ConnectionInterface $conn) { + $this->app->onClose($conn); + + foreach ($this->topicLookup as $topic) { + $topic->remove($conn); + } + } + + /** + * {@inheritdoc} + */ + public function onError(ConnectionInterface $conn, \Exception $e) { + $this->app->onError($conn, $e); + } + + /** + * {@inheritdoc} + */ + public function getSubProtocols() { + if ($this->app instanceof WsServerInterface) { + return $this->app->getSubProtocols(); + } + + return array(); + } + + protected function getTopic($topic) { + if (!array_key_exists($topic, $this->topicLookup)) { + $this->topicLookup[$topic] = new Topic($topic); + } + + return $this->topicLookup[$topic]; + } +} \ No newline at end of file diff --git a/src/Ratchet/Wamp/WampServer.php b/src/Ratchet/Wamp/WampServer.php index a63bbdc..dd45a27 100644 --- a/src/Ratchet/Wamp/WampServer.php +++ b/src/Ratchet/Wamp/WampServer.php @@ -1,100 +1,54 @@ protocol = new ServerProtocol($this); + $this->wampProtocol = new ServerProtocol(new TopicManager($app)); } /** * {@inheritdoc} */ public function onOpen(ConnectionInterface $conn) { - if ($conn instanceof WampConnection) { - $this->app->onOpen($conn); - } else { - $conn->WAMP->topics = new \SplObjectStorage; - $this->protocol->onOpen($conn); - } + $this->wampProtocol->onOpen($conn); } /** * {@inheritdoc} */ - public function onCall(ConnectionInterface $conn, $id, $topic, array $params) { - $this->app->onCall($conn, $id, $this->getTopic($topic), $params); - } - - /** - * {@inheritdoc} - */ - public function onSubscribe(ConnectionInterface $conn, $topic) { - $topicObj = $this->getTopic($topic); - - $conn->WAMP->topics->attach($topicObj); - $this->app->onSubscribe($conn, $topicObj); - } - - /** - * {@inheritdoc} - */ - public function onUnsubscribe(ConnectionInterface $conn, $topic) { - $topicObj = $this->getTopic($topic); - - if ($conn->WAMP->topics->contains($topicobj) { - $conn->WAMP->topics->detach($topicObj); - } - - $this->topicLookup[$topic]->remove($conn); - $this->app->onUnsubscribe($conn, $topicObj); - } - - /** - * {@inheritdoc} - */ - public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude = array(), array $eligible = array()) { - $this->app->onPublish($conn, $this->getTopic($topic), $event, $exclude, $eligible); + public function onMessage(ConnectionInterface $conn, $msg) { + $this->wampProtocol->onMessage($conn, $msg); } /** * {@inheritdoc} */ public function onClose(ConnectionInterface $conn) { - $this->app->onClose($conn); - - foreach ($this->topicLookup as $topic) { - $topic->remove($conn); - } + $this->wampProtocol->onClose($conn); } /** * {@inheritdoc} */ public function onError(ConnectionInterface $conn, \Exception $e) { - $this->app->onError($conn, $e); + $this->wampProtocol->onError($conn, $e); } - protected function getTopic($topic) { - if (!array_key_exists($topic, $this->topicLookup)) { - $this->topicLookup[$topic] = new Topic($topic); - } - - return $this->topicLookup[$topic]; + /** + * {@inheritdoc} + */ + public function getSubProtocols() { + return $this->wampProtocol->getSubProtocols(); } } \ No newline at end of file