Merge branch 'refs/heads/topic-retain'
This commit is contained in:
commit
0a501fef5d
@ -6,6 +6,13 @@ use Ratchet\ConnectionInterface;
|
|||||||
* A topic/channel containing connections that have subscribed to it
|
* A topic/channel containing connections that have subscribed to it
|
||||||
*/
|
*/
|
||||||
class Topic implements \IteratorAggregate, \Countable {
|
class Topic implements \IteratorAggregate, \Countable {
|
||||||
|
/**
|
||||||
|
* If true the TopicManager will destroy this object if it's ever empty of connections
|
||||||
|
* @deprecated in v0.4
|
||||||
|
* @type bool
|
||||||
|
*/
|
||||||
|
public $autoDelete = false;
|
||||||
|
|
||||||
private $id;
|
private $id;
|
||||||
|
|
||||||
private $subscribers;
|
private $subscribers;
|
||||||
|
@ -54,13 +54,12 @@ class TopicManager implements WsServerInterface, WampServerInterface {
|
|||||||
public function onUnsubscribe(ConnectionInterface $conn, $topic) {
|
public function onUnsubscribe(ConnectionInterface $conn, $topic) {
|
||||||
$topicObj = $this->getTopic($topic);
|
$topicObj = $this->getTopic($topic);
|
||||||
|
|
||||||
if ($conn->WAMP->subscriptions->contains($topicObj)) {
|
if (!$conn->WAMP->subscriptions->contains($topicObj)) {
|
||||||
$conn->WAMP->subscriptions->detach($topicObj);
|
|
||||||
} else {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->topicLookup[$topic]->remove($conn);
|
$this->cleanTopic($topicObj, $conn);
|
||||||
|
|
||||||
$this->app->onUnsubscribe($conn, $topicObj);
|
$this->app->onUnsubscribe($conn, $topicObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -77,11 +76,8 @@ class TopicManager implements WsServerInterface, WampServerInterface {
|
|||||||
public function onClose(ConnectionInterface $conn) {
|
public function onClose(ConnectionInterface $conn) {
|
||||||
$this->app->onClose($conn);
|
$this->app->onClose($conn);
|
||||||
|
|
||||||
foreach ($this->topicLookup as $topic => $storage) {
|
foreach ($this->topicLookup as $topic) {
|
||||||
$storage->remove($conn);
|
$this->cleanTopic($topic, $conn);
|
||||||
if (0 === $storage->count()) {
|
|
||||||
unset($this->topicLookup[$topic]);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -114,4 +110,16 @@ class TopicManager implements WsServerInterface, WampServerInterface {
|
|||||||
|
|
||||||
return $this->topicLookup[$topic];
|
return $this->topicLookup[$topic];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected function cleanTopic(Topic $topic, ConnectionInterface $conn) {
|
||||||
|
if ($conn->WAMP->subscriptions->contains($topic)) {
|
||||||
|
$conn->WAMP->subscriptions->detach($topic);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->topicLookup[$topic->getId()]->remove($conn);
|
||||||
|
|
||||||
|
if ($topic->autoDelete && 0 === $topic->count()) {
|
||||||
|
unset($this->topicLookup[$topic->getId()]);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -159,9 +159,7 @@ class TopicManagerTest extends \PHPUnit_Framework_TestCase {
|
|||||||
$this->mngr->onClose($this->conn);
|
$this->mngr->onClose($this->conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testConnIsRemovedFromTopicOnClose() {
|
protected function topicProvider($name) {
|
||||||
$name = 'State testing';
|
|
||||||
|
|
||||||
$class = new \ReflectionClass('Ratchet\Wamp\TopicManager');
|
$class = new \ReflectionClass('Ratchet\Wamp\TopicManager');
|
||||||
$method = $class->getMethod('getTopic');
|
$method = $class->getMethod('getTopic');
|
||||||
$method->setAccessible(true);
|
$method->setAccessible(true);
|
||||||
@ -171,14 +169,42 @@ class TopicManagerTest extends \PHPUnit_Framework_TestCase {
|
|||||||
|
|
||||||
$topic = $method->invokeArgs($this->mngr, array($name));
|
$topic = $method->invokeArgs($this->mngr, array($name));
|
||||||
|
|
||||||
|
return array($topic, $attribute);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testConnIsRemovedFromTopicOnClose() {
|
||||||
|
$name = 'State Testing';
|
||||||
|
list($topic, $attribute) = $this->topicProvider($name);
|
||||||
|
|
||||||
$this->assertCount(1, $attribute->getValue($this->mngr));
|
$this->assertCount(1, $attribute->getValue($this->mngr));
|
||||||
|
|
||||||
$this->mngr->onSubscribe($this->conn, $name);
|
$this->mngr->onSubscribe($this->conn, $name);
|
||||||
$this->mngr->onClose($this->conn);
|
$this->mngr->onClose($this->conn);
|
||||||
|
|
||||||
$this->assertFalse($topic->has($this->conn));
|
$this->assertFalse($topic->has($this->conn));
|
||||||
|
}
|
||||||
|
|
||||||
$this->assertCount(0, $attribute->getValue($this->mngr));
|
public static function topicConnExpectationProvider() {
|
||||||
|
return array(
|
||||||
|
array(true, 'onClose', 0)
|
||||||
|
, array(true, 'onUnsubscribe', 0)
|
||||||
|
, array(false, 'onClose', 1)
|
||||||
|
, array(false, 'onUnsubscribe', 1)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @dataProvider topicConnExpectationProvider
|
||||||
|
*/
|
||||||
|
public function testTopicRetentionFromLeavingConnections($autoDelete, $methodCall, $expectation) {
|
||||||
|
$topicName = 'checkTopic';
|
||||||
|
list($topic, $attribute) = $this->topicProvider($topicName);
|
||||||
|
$topic->autoDelete = $autoDelete;
|
||||||
|
|
||||||
|
$this->mngr->onSubscribe($this->conn, $topicName);
|
||||||
|
call_user_func_array(array($this->mngr, $methodCall), array($this->conn, $topicName));
|
||||||
|
|
||||||
|
$this->assertCount($expectation, $attribute->getValue($this->mngr));
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testOnErrorBubbles() {
|
public function testOnErrorBubbles() {
|
||||||
|
Loading…
Reference in New Issue
Block a user