app = $app; } /** * {@inheritdoc} */ public function onOpen(ConnectionInterface $conn) { $conn->WAMP->topics = new \SplObjectStorage; $this->app->onOpen($conn); } /** * {@inheritdoc} */ public function onCall(ConnectionInterface $conn, $id, $topic, array $params) { $this->app->onCall($conn, $id, $this->getTopic($topic), $params); } /** * {@inheritdoc} */ public function onSubscribe(ConnectionInterface $conn, $topic) { $topicObj = $this->getTopic($topic); $conn->WAMP->topics->attach($topicObj); $this->app->onSubscribe($conn, $topicObj); } /** * {@inheritdoc} */ public function onUnsubscribe(ConnectionInterface $conn, $topic) { $topicObj = $this->getTopic($topic); if ($conn->WAMP->topics->contains($topicobj)) { $conn->WAMP->topics->remove($topicObj); } $this->topicLookup[$topic]->remove($conn); $this->app->onUnsubscribe($conn, $topicObj); } /** * {@inheritdoc} */ public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude = array(), array $eligible = array()) { $this->app->onPublish($conn, $this->getTopic($topic), $event, $exclude, $eligible); } /** * {@inheritdoc} */ public function onClose(ConnectionInterface $conn) { $this->app->onClose($conn); foreach ($this->topicLookup as $topic) { $topic->remove($conn); } } /** * {@inheritdoc} */ public function onError(ConnectionInterface $conn, \Exception $e) { $this->app->onError($conn, $e); } /** * {@inheritdoc} */ public function getSubProtocols() { if ($this->app instanceof WsServerInterface) { return $this->app->getSubProtocols(); } return array(); } /** * @param string * @return Topic */ protected function getTopic($topic) { if (!array_key_exists($topic, $this->topicLookup)) { $this->topicLookup[$topic] = new Topic($topic); } return $this->topicLookup[$topic]; } }