_master = $host;

        // The listening socket is the first entry of the resource list that run() hands to select()
        $socket = $host->getResource();
        $this->_resources[] = $socket;

        if (null === $logger) {
            $logger = new NullLogger;
        }
        $this->_log = $logger;

        $this->_connections = new \ArrayIterator(array());

        $this->_app = $application;
    }

    /**
     * Expose the collection of currently tracked client connections.
     *
     * @return \ArrayIterator of SocketInterfaces, keyed by each connection's resource
     */
    public function getIterator() {
        return $this->_connections;
    }

    /**
     * Bind the master socket, start listening and dispatch socket activity forever.
     *
     * Each pass waits in select() on every known resource. Activity on the master
     * socket is routed to onOpen(); activity on any other resource is read and routed
     * to onRecv(), or to onClose() when nothing was read. If a handler returns a
     * CommandInterface it is executed immediately.
     *
     * The loop never exits on its own: exceptions raised while handling activity are
     * logged and the loop continues.
     *
     * @param mixed The address to listen for incoming connections on.  "0.0.0.0" to listen from anywhere
     * @param int The port to listen to connections on
     * @throws Exception If binding to or listening on the master socket reports failure
     * @todo Validate address.  Use socket_get_option, if AF_INET must be IP, if AF_UNIX must be path
     * @todo Should I make handling open/close/msg an application?
     */
    public function run($address = '127.0.0.1', $port = 1025) {
        set_time_limit(0);
        ob_implicit_flush();

        if (false === ($this->_master->bind($address, (int)$port))) {
            throw new Exception;
        }

        if (false === ($this->_master->listen())) {
            throw new Exception;
        }

        $this->_master->set_nonblock();

        do {
            try {
                // Work on a copy so the canonical resource list is not modified by select()
                $changed = $this->_resources;

                // Real variables instead of inline `$write = null` expressions, so the call
                // stays valid if select() takes these arguments by reference.
                // NOTE(review): select() presumably mirrors socket_select() - confirm
                $write  = null;
                $except = null;
                $this->_master->select($changed, $write, $except, null);

                foreach ($changed as $resource) {
                    if ($this->_master->getResource() === $resource) {
                        // Activity on the listening socket is treated as a new connection
                        $res = $this->onOpen($this->_master);
                    } else {
                        // NOTE(review): connections are looked up by the raw resource; this relies
                        // on PHP casting the resource to an integer offset - confirm on the target PHP version
                        $conn  = $this->_connections[$resource];
                        $data  = null;
                        $bytes = $conn->recv($data, 4096, 0);

                        // Loose comparison kept on purpose: any "nothing read" result closes the connection
                        if ($bytes == 0) {
                            $res = $this->onClose($conn);
                        } else {
                            $res = $this->onRecv($conn, $data);
                        }
                    }

                    if ($res instanceof CommandInterface) {
                        $res->execute();
                    }
                }
            } catch (Exception $se) {
                $this->_log->error($se->getMessage());
            } catch (\Exception $e) {
                $this->_log->error('Big uh oh: ' . $e->getMessage());
            }
        } while (true);

        // $this->_master->set_nonblock();
        // declare(ticks = 1);
    }

    /**
     * Register a new connection derived from the given socket and notify the application.
     *
     * The new connection's resource is appended to the list watched by run() and the
     * connection itself is stored in the connection collection under that resource.
     *
     * @param SocketInterface The socket the new connection is cloned from (run() passes the master socket)
     * @return mixed Whatever the application's onOpen() returns; run() executes it when it is a CommandInterface
     */
    public function onOpen(SocketInterface $conn) {
        // NOTE(review): cloning the listening socket presumably accepts the pending
        // connection - confirm against the SocketInterface implementation
        $new_connection = clone $conn;

        $this->_resources[] = $new_connection->getResource();
        $this->_connections[$new_connection->getResource()] = $new_connection;

        $this->_log->note('New connection, ' . count($this->_connections) . ' total');

        return $this->_app->onOpen($new_connection);
    }

    /**
     * Log an incoming message and hand it to the application.
     *
     * @param SocketInterface The connection the data was read from
     * @param string The data that was read
     * @return mixed Whatever the application's onRecv() returns; run() executes it when it is a CommandInterface
     */
    public function onRecv(SocketInterface $from, $msg) {
        $this->_log->note('New message "' . trim($msg) . '"');

        return $this->_app->onRecv($from, $msg);
    }

    /**
     * Notify the application that a connection closed, then stop tracking it.
     *
     * The application is called first, while the connection is still registered; the
     * value it returns is handed back to the caller after the connection has been
     * removed from the connection collection and the watched resource list.
     *
     * @param SocketInterface The connection that closed
     * @return mixed Whatever the application's onClose() returns; run() executes it when it is a CommandInterface
     * @todo Make sure it's OK to execute the command after resources have been freed
     */
    public function onClose(SocketInterface $conn) {
        $resource = $conn->getResource();

        $cmd = $this->_app->onClose($conn);

        unset($this->_connections[$resource]);

        // array_search() returns false when the resource is not tracked; using that as an
        // index would unset entry 0 (the master socket), so only remove on a strict match
        $index = array_search($resource, $this->_resources, true);
        if (false !== $index) {
            unset($this->_resources[$index]);
        }

        $this->_log->note('Connection closed, ' . count($this->_connections) . ' connections remain');

        return $cmd;
    }
}