[WAMP] Topic manager

Separated functionality of Ratchet component and topic manager
This commit is contained in:
Chris Boden 2012-07-17 17:06:35 -04:00
parent 5a80a67f03
commit c5cdd68cef
3 changed files with 121 additions and 64 deletions

View File

@ -37,14 +37,14 @@ class Topic implements \IteratorAggregate, \Countable {
/** /**
* @param WampConnection * @param WampConnection
*/ */
public function subscribed(WampConnection $conn) { public function add(WampConnection $conn) {
$this->subscribers->attach($conn); $this->subscribers->attach($conn);
} }
/** /**
* @param WampConnection * @param WampConnection
*/ */
public function unsubscribed(WampConnection $conn) { public function remove(WampConnection $conn) {
if ($this->subscribers->contains($conn)) { if ($this->subscribers->contains($conn)) {
$this->subscribers->detach($conn); $this->subscribers->detach($conn);
} }

View File

@ -0,0 +1,103 @@
<?php
namespace Ratchet\Wamp;
use Ratchet\ConnectionInterface;
use Ratchet\WebSocket\WsServerInterface;
class TopicManager implements WsServerInterface, WampServerInterface {
/**
* @var WampServerInterface
*/
protected $app;
/**
* @var array
*/
protected $topicLookup = array();
public function __construct(WampServerInterface $app) {
$this->app = $app;
}
/**
* {@inheritdoc}
*/
public function onOpen(ConnectionInterface $conn) {
$conn->WAMP->topics = new \SplObjectStorage;
$this->app->onOpen($conn);
}
/**
* {@inheritdoc}
*/
public function onCall(ConnectionInterface $conn, $id, $topic, array $params) {
$this->app->onCall($conn, $id, $this->getTopic($topic), $params);
}
/**
* {@inheritdoc}
*/
public function onSubscribe(ConnectionInterface $conn, $topic) {
$topicObj = $this->getTopic($topic);
$conn->WAMP->topics->attach($topicObj);
$this->app->onSubscribe($conn, $topicObj);
}
/**
* {@inheritdoc}
*/
public function onUnsubscribe(ConnectionInterface $conn, $topic) {
$topicObj = $this->getTopic($topic);
if ($conn->WAMP->topics->contains($topicobj) {
$conn->WAMP->topics->remove($topicObj);
}
$this->topicLookup[$topic]->remove($conn);
$this->app->onUnsubscribe($conn, $topicObj);
}
/**
* {@inheritdoc}
*/
public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude = array(), array $eligible = array()) {
$this->app->onPublish($conn, $this->getTopic($topic), $event, $exclude, $eligible);
}
/**
* {@inheritdoc}
*/
public function onClose(ConnectionInterface $conn) {
$this->app->onClose($conn);
foreach ($this->topicLookup as $topic) {
$topic->remove($conn);
}
}
/**
* {@inheritdoc}
*/
public function onError(ConnectionInterface $conn, \Exception $e) {
$this->app->onError($conn, $e);
}
/**
* {@inheritdoc}
*/
public function getSubProtocols() {
if ($this->app instanceof WsServerInterface) {
return $this->app->getSubProtocols();
}
return array();
}
protected function getTopic($topic) {
if (!array_key_exists($topic, $this->topicLookup)) {
$this->topicLookup[$topic] = new Topic($topic);
}
return $this->topicLookup[$topic];
}
}

View File

@ -1,100 +1,54 @@
<?php <?php
namespace Ratchet\Wamp; namespace Ratchet\Wamp;
use Ratchet\MessageComponentInterface;
use Ratchet\WebSocket\WsServerInterface;
use Ratchet\ConnectionInterface; use Ratchet\ConnectionInterface;
class WampServer implements MessageComponentInterface, WsServerInterface, WampServerInterface { class WampServer implements MessageComponentInterface, WsServerInterface {
/** /**
* @var ServerProtocol * @var ServerProtocol
*/ */
protected $protocol; private $wampProtocol;
/** /**
* @var WampServerInterface * {@inheritdoc}
*/ */
protected $app;
/**
* @var array
*/
protected $topicLookup = array();
public function __construct(WampServerInterface $app) { public function __construct(WampServerInterface $app) {
$this->protocol = new ServerProtocol($this); $this->wampProtocol = new ServerProtocol(new TopicManager($app));
} }
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function onOpen(ConnectionInterface $conn) { public function onOpen(ConnectionInterface $conn) {
if ($conn instanceof WampConnection) { $this->wampProtocol->onOpen($conn);
$this->app->onOpen($conn);
} else {
$conn->WAMP->topics = new \SplObjectStorage;
$this->protocol->onOpen($conn);
}
} }
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function onCall(ConnectionInterface $conn, $id, $topic, array $params) { public function onMessage(ConnectionInterface $conn, $msg) {
$this->app->onCall($conn, $id, $this->getTopic($topic), $params); $this->wampProtocol->onMessage($conn, $msg);
}
/**
* {@inheritdoc}
*/
public function onSubscribe(ConnectionInterface $conn, $topic) {
$topicObj = $this->getTopic($topic);
$conn->WAMP->topics->attach($topicObj);
$this->app->onSubscribe($conn, $topicObj);
}
/**
* {@inheritdoc}
*/
public function onUnsubscribe(ConnectionInterface $conn, $topic) {
$topicObj = $this->getTopic($topic);
if ($conn->WAMP->topics->contains($topicobj) {
$conn->WAMP->topics->detach($topicObj);
}
$this->topicLookup[$topic]->remove($conn);
$this->app->onUnsubscribe($conn, $topicObj);
}
/**
* {@inheritdoc}
*/
public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude = array(), array $eligible = array()) {
$this->app->onPublish($conn, $this->getTopic($topic), $event, $exclude, $eligible);
} }
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function onClose(ConnectionInterface $conn) { public function onClose(ConnectionInterface $conn) {
$this->app->onClose($conn); $this->wampProtocol->onClose($conn);
foreach ($this->topicLookup as $topic) {
$topic->remove($conn);
}
} }
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function onError(ConnectionInterface $conn, \Exception $e) { public function onError(ConnectionInterface $conn, \Exception $e) {
$this->app->onError($conn, $e); $this->wampProtocol->onError($conn, $e);
} }
protected function getTopic($topic) { /**
if (!array_key_exists($topic, $this->topicLookup)) { * {@inheritdoc}
$this->topicLookup[$topic] = new Topic($topic); */
} public function getSubProtocols() {
return $this->wampProtocol->getSubProtocols();
return $this->topicLookup[$topic];
} }
} }