diff --git a/lib/Ratchet/Application/WAMP/App.php b/lib/Ratchet/Application/WAMP/App.php index cf31c92..f726460 100644 --- a/lib/Ratchet/Application/WAMP/App.php +++ b/lib/Ratchet/Application/WAMP/App.php @@ -3,6 +3,9 @@ namespace Ratchet\Application\WAMP; use Ratchet\Application\ApplicationInterface; use Ratchet\Application\WebSocket\WebSocketAppInterface; use Ratchet\Resource\Connection; +use Ratchet\Resource\Command\Composite; +use Ratchet\Resource\Command\CommandInterface; +use Ratchet\Application\WAMP\Command\Action\Prefix; /** * WebSocket Application Messaging Protocol @@ -24,6 +27,13 @@ use Ratchet\Resource\Connection; class App implements WebSocketAppInterface { protected $_app; + /** + * Any server to client prefixes are stored here + * They're taxied along with the next outgoing message + * @var Ratchet\Resource\Command\Composite + */ + protected $_msg_buffer = null; + public function getSubProtocol() { return 'wamp'; } @@ -31,13 +41,20 @@ class App implements WebSocketAppInterface { /** * @todo WAMP spec does not say what to do when there is an error with PREFIX... */ - public function addPrefix(Connection $conn, $curie, $uri) { + public function addPrefix(Connection $conn, $curie, $uri, $from_server = false) { // validate uri // validate curie // make sure the curie is shorter than the uri $conn->WAMP->prefixes[$curie] = $uri; + + if ($from_server) { + $prefix = new Prefix($conn); + $prefix->setPrefix($curie, $uri); + + $this->_msg_buffer->enqueue($prefix); + } } public function onOpen(Connection $conn) { @@ -45,6 +62,11 @@ class App implements WebSocketAppInterface { $conn->WAMP->prefixes = array(); $conn->WAMP->subscriptions = array(); + $wamp = $this; + $conn->WAMP->addPrefix = function($curie, $uri) use ($wamp, $conn) { + $wamp->addPrefix($conn, $curie, $uri, true); + }; + return $this->_app->onOpen($conn); } @@ -60,7 +82,7 @@ class App implements WebSocketAppInterface { switch ($json[0]) { case 1: - return $this->addPrefix($from, $json[1], $json[2]); + $ret = $this->addPrefix($from, $json[1], $json[2]); break; case 2: @@ -72,24 +94,26 @@ class App implements WebSocketAppInterface { $json = $json[0]; } - return $this->_app->onCall($from, $callID, $procURI, $json); + $ret = $this->_app->onCall($from, $callID, $procURI, $json); break; case 5: - return $this->_app->onSubscribe($from, $this->getUri($from, $json[1])); + $ret = $this->_app->onSubscribe($from, $this->getUri($from, $json[1])); break; case 6: - return $this->_app->onUnSubscribe($from, $this->getUri($from, $json[1])); + $ret = $this->_app->onUnSubscribe($from, $this->getUri($from, $json[1])); break; case 7: - return $this->_app->onPublish($from, $this->getUri($from, $json[1]), $json[2]); + $ret = $this->_app->onPublish($from, $this->getUri($from, $json[1]), $json[2]); break; default: throw new Exception('Invalid message type'); } + + return $this->attachStack($ret); } /** @@ -102,8 +126,22 @@ class App implements WebSocketAppInterface { return (isset($conn->WAMP->prefixes[$uri]) ? $conn->WAMP->prefixes[$uri] : $uri); } + /** + * @param Ratchet\Resource\Command\CommandInterface|NULL + * @return Ratchet\Resource\Command\Composite + */ + protected function attachStack(CommandInterface $command = null) { + $stack = $this->_msg_buffer; + $stack->enqueue($command); + + $this->_msg_buffer = new Composite; + + return $stack; + } + public function __construct(ServerInterface $app) { - $this->_app = $app; + $this->_app = $app; + $this->_msg_buffer = new Composite; } public function onClose(Connection $conn) { diff --git a/lib/Ratchet/Application/WAMP/Command/Action/Prefix.php b/lib/Ratchet/Application/WAMP/Command/Action/Prefix.php new file mode 100644 index 0000000..56a5d00 --- /dev/null +++ b/lib/Ratchet/Application/WAMP/Command/Action/Prefix.php @@ -0,0 +1,36 @@ +_curie = $curie; + $this->_uri = $uri; + + return $this->setMessage(json_encode(array(1, $curie, $uri))); + } + + /** + * @return string + */ + public function getCurie() { + return $this->_curie; + } + + /** + * @return string + */ + public function getUri() { + return $this->_uri; + } +}