Socket Buffering
Server now buffers incoming messages until it (thinks it) receives the full message. Slight tweak of HyBi-10: spacing, FIN indicator, continuation frame recognition Sockets close() if container is destroyed
This commit is contained in:
		
							parent
							
								
									bf0787b7cd
								
							
						
					
					
						commit
						ad258e6eaa
					
				@ -36,70 +36,82 @@ class HyBi10 implements VersionInterface {
 | 
				
			|||||||
     */
 | 
					     */
 | 
				
			||||||
    public function unframe($message) {
 | 
					    public function unframe($message) {
 | 
				
			||||||
        $data        = $message;
 | 
					        $data        = $message;
 | 
				
			||||||
		$mask        = $payloadLength = $unmaskedPayload = '';
 | 
					        $mask        = $payloadLength = $unmaskedPayload = '';
 | 
				
			||||||
		$decodedData = array();
 | 
					        $decodedData = array();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// estimate frame type:
 | 
					        // estimate frame type:
 | 
				
			||||||
		$firstByteBinary  = sprintf('%08b', ord($data[0]));		
 | 
					        $firstByteBinary  = sprintf('%08b', ord($data[0]));
 | 
				
			||||||
		$secondByteBinary = sprintf('%08b', ord($data[1]));
 | 
					        $finIndicator     = bindec(substr($firstByteBinary, 0, 1));
 | 
				
			||||||
		$opcode           = bindec(substr($firstByteBinary, 4, 4));
 | 
					        $opcode           = bindec(substr($firstByteBinary, 4, 4));
 | 
				
			||||||
		$isMasked         = ($secondByteBinary[0] == '1') ? true : false;
 | 
					 | 
				
			||||||
		$payloadLength    = ord($data[1]) & 127;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// close connection if unmasked frame is received:
 | 
					        $secondByteBinary = sprintf('%08b', ord($data[1]));
 | 
				
			||||||
		if($isMasked === false) {
 | 
					        $isMasked         = ($secondByteBinary[0] == '1') ? true : false;
 | 
				
			||||||
 | 
					        $payloadLength    = ord($data[1]) & 127;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // close connection if unmasked frame is received:
 | 
				
			||||||
 | 
					        if($isMasked === false) {
 | 
				
			||||||
            throw new \UnexpectedValueException('Masked byte is false');
 | 
					            throw new \UnexpectedValueException('Masked byte is false');
 | 
				
			||||||
		}
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		switch($opcode) {
 | 
					        switch($opcode) {
 | 
				
			||||||
			// text frame:
 | 
					            // continuation frame
 | 
				
			||||||
			case 1:
 | 
					            case 0:
 | 
				
			||||||
				$decodedData['type'] = 'text';				
 | 
					                $decodedData['type'] = 'text'; // incomplete
 | 
				
			||||||
			break;
 | 
					            break;
 | 
				
			||||||
			
 | 
					
 | 
				
			||||||
			// connection close frame:
 | 
					            // text frame:
 | 
				
			||||||
			case 8:
 | 
					            case 1:
 | 
				
			||||||
				$decodedData['type'] = 'close';
 | 
					                $decodedData['type'] = 'text';  
 | 
				
			||||||
			break;
 | 
					            break;
 | 
				
			||||||
			
 | 
					
 | 
				
			||||||
			// ping frame:
 | 
					            // binary data frame
 | 
				
			||||||
			case 9:
 | 
					            case 2:
 | 
				
			||||||
				$decodedData['type'] = 'ping';				
 | 
					                $decodedData['type'] = 'binary';
 | 
				
			||||||
			break;
 | 
					            break;
 | 
				
			||||||
			
 | 
					
 | 
				
			||||||
			// pong frame:
 | 
					            // connection close frame:
 | 
				
			||||||
			case 10:
 | 
					            case 8:
 | 
				
			||||||
				$decodedData['type'] = 'pong';
 | 
					                $decodedData['type'] = 'close';
 | 
				
			||||||
			break;
 | 
					            break;
 | 
				
			||||||
			
 | 
					
 | 
				
			||||||
			default:
 | 
					            // ping frame:
 | 
				
			||||||
				// Close connection on unknown opcode:
 | 
					            case 9:
 | 
				
			||||||
 | 
					                $decodedData['type'] = 'ping';                
 | 
				
			||||||
 | 
					            break;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            // pong frame:
 | 
				
			||||||
 | 
					            case 10:
 | 
				
			||||||
 | 
					                $decodedData['type'] = 'pong';
 | 
				
			||||||
 | 
					            break;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            default:
 | 
				
			||||||
 | 
					                // Close connection on unknown opcode:
 | 
				
			||||||
                throw new \UnexpectedValueException("Unknown opcode ({$opcode})");
 | 
					                throw new \UnexpectedValueException("Unknown opcode ({$opcode})");
 | 
				
			||||||
			break;
 | 
					            break;
 | 
				
			||||||
		}
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if($payloadLength === 126) {
 | 
					        if($payloadLength === 126) {
 | 
				
			||||||
		   $mask = substr($data, 4, 4);
 | 
					           $mask = substr($data, 4, 4);
 | 
				
			||||||
		   $payloadOffset = 8;
 | 
					           $payloadOffset = 8;
 | 
				
			||||||
		} elseif($payloadLength === 127) {
 | 
					        } elseif($payloadLength === 127) {
 | 
				
			||||||
			$mask = substr($data, 10, 4);
 | 
					            $mask = substr($data, 10, 4);
 | 
				
			||||||
			$payloadOffset = 14;
 | 
					            $payloadOffset = 14;
 | 
				
			||||||
		} else {
 | 
					        } else {
 | 
				
			||||||
			$mask = substr($data, 2, 4);	
 | 
					            $mask = substr($data, 2, 4);    
 | 
				
			||||||
			$payloadOffset = 6;
 | 
					            $payloadOffset = 6;
 | 
				
			||||||
		}
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		$dataLength = strlen($data);
 | 
					        $dataLength = strlen($data);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if($isMasked === true) {
 | 
					        if($isMasked === true) { // This will always pass...
 | 
				
			||||||
			for($i = $payloadOffset; $i < $dataLength; $i++) {
 | 
					            for($i = $payloadOffset; $i < $dataLength; $i++) {
 | 
				
			||||||
				$j = $i - $payloadOffset;
 | 
					                $j = $i - $payloadOffset;
 | 
				
			||||||
				$unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
 | 
					                $unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
 | 
				
			||||||
			}
 | 
					            }
 | 
				
			||||||
			$decodedData['payload'] = $unmaskedPayload;
 | 
					            $decodedData['payload'] = $unmaskedPayload;
 | 
				
			||||||
		}
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return $decodedData;
 | 
					        return $decodedData;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /**
 | 
					    /**
 | 
				
			||||||
@ -114,74 +126,74 @@ class HyBi10 implements VersionInterface {
 | 
				
			|||||||
        $type    = 'text';
 | 
					        $type    = 'text';
 | 
				
			||||||
        $masked  = true;
 | 
					        $masked  = true;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		$frameHead = array();
 | 
					        $frameHead = array();
 | 
				
			||||||
		$frame = '';
 | 
					        $frame = '';
 | 
				
			||||||
		$payloadLength = strlen($payload);
 | 
					        $payloadLength = strlen($payload);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		switch($type) {
 | 
					        switch($type) {
 | 
				
			||||||
			case 'text':
 | 
					            case 'text':
 | 
				
			||||||
				// first byte indicates FIN, Text-Frame (10000001):
 | 
					                // first byte indicates FIN, Text-Frame (10000001):
 | 
				
			||||||
				$frameHead[0] = 129;				
 | 
					                $frameHead[0] = 129;                
 | 
				
			||||||
			break;			
 | 
					            break;            
 | 
				
			||||||
		
 | 
					 | 
				
			||||||
			case 'close':
 | 
					 | 
				
			||||||
				// first byte indicates FIN, Close Frame(10001000):
 | 
					 | 
				
			||||||
				$frameHead[0] = 136;
 | 
					 | 
				
			||||||
			break;
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
			case 'ping':
 | 
					 | 
				
			||||||
				// first byte indicates FIN, Ping frame (10001001):
 | 
					 | 
				
			||||||
				$frameHead[0] = 137;
 | 
					 | 
				
			||||||
			break;
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
			case 'pong':
 | 
					 | 
				
			||||||
				// first byte indicates FIN, Pong frame (10001010):
 | 
					 | 
				
			||||||
				$frameHead[0] = 138;
 | 
					 | 
				
			||||||
			break;
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
		// set mask and payload length (using 1, 3 or 9 bytes) 
 | 
					 | 
				
			||||||
		if($payloadLength > 65535) {
 | 
					 | 
				
			||||||
			$payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
 | 
					 | 
				
			||||||
			$frameHead[1] = ($masked === true) ? 255 : 127;
 | 
					 | 
				
			||||||
			for($i = 0; $i < 8; $i++) {
 | 
					 | 
				
			||||||
				$frameHead[$i+2] = bindec($payloadLengthBin[$i]);
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			// most significant bit MUST be 0 (return false if to much data)
 | 
					 | 
				
			||||||
			if($frameHead[2] > 127) {
 | 
					 | 
				
			||||||
				return false;
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		} elseif($payloadLength > 125) {
 | 
					 | 
				
			||||||
			$payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
 | 
					 | 
				
			||||||
			$frameHead[1] = ($masked === true) ? 254 : 126;
 | 
					 | 
				
			||||||
			$frameHead[2] = bindec($payloadLengthBin[0]);
 | 
					 | 
				
			||||||
			$frameHead[3] = bindec($payloadLengthBin[1]);
 | 
					 | 
				
			||||||
		} else {
 | 
					 | 
				
			||||||
			$frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength;
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// convert frame-head to string:
 | 
					            case 'close':
 | 
				
			||||||
		foreach(array_keys($frameHead) as $i) {
 | 
					                // first byte indicates FIN, Close Frame(10001000):
 | 
				
			||||||
			$frameHead[$i] = chr($frameHead[$i]);
 | 
					                $frameHead[0] = 136;
 | 
				
			||||||
		} if($masked === true) {
 | 
					            break;
 | 
				
			||||||
			// generate a random mask:
 | 
					 | 
				
			||||||
			$mask = array();
 | 
					 | 
				
			||||||
			for($i = 0; $i < 4; $i++)
 | 
					 | 
				
			||||||
			{
 | 
					 | 
				
			||||||
				$mask[$i] = chr(rand(0, 255));
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			
 | 
					 | 
				
			||||||
			$frameHead = array_merge($frameHead, $mask);			
 | 
					 | 
				
			||||||
		}						
 | 
					 | 
				
			||||||
		$frame = implode('', $frameHead);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// append payload to frame:
 | 
					            case 'ping':
 | 
				
			||||||
		$framePayload = array();
 | 
					                // first byte indicates FIN, Ping frame (10001001):
 | 
				
			||||||
		for($i = 0; $i < $payloadLength; $i++) {		
 | 
					                $frameHead[0] = 137;
 | 
				
			||||||
			$frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
 | 
					            break;
 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return $frame;
 | 
					            case 'pong':
 | 
				
			||||||
 | 
					                // first byte indicates FIN, Pong frame (10001010):
 | 
				
			||||||
 | 
					                $frameHead[0] = 138;
 | 
				
			||||||
 | 
					            break;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // set mask and payload length (using 1, 3 or 9 bytes) 
 | 
				
			||||||
 | 
					        if($payloadLength > 65535) {
 | 
				
			||||||
 | 
					            $payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
 | 
				
			||||||
 | 
					            $frameHead[1] = ($masked === true) ? 255 : 127;
 | 
				
			||||||
 | 
					            for($i = 0; $i < 8; $i++) {
 | 
				
			||||||
 | 
					                $frameHead[$i+2] = bindec($payloadLengthBin[$i]);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            // most significant bit MUST be 0 (return false if to much data)
 | 
				
			||||||
 | 
					            if($frameHead[2] > 127) {
 | 
				
			||||||
 | 
					                return false;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        } elseif($payloadLength > 125) {
 | 
				
			||||||
 | 
					            $payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
 | 
				
			||||||
 | 
					            $frameHead[1] = ($masked === true) ? 254 : 126;
 | 
				
			||||||
 | 
					            $frameHead[2] = bindec($payloadLengthBin[0]);
 | 
				
			||||||
 | 
					            $frameHead[3] = bindec($payloadLengthBin[1]);
 | 
				
			||||||
 | 
					        } else {
 | 
				
			||||||
 | 
					            $frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // convert frame-head to string:
 | 
				
			||||||
 | 
					        foreach(array_keys($frameHead) as $i) {
 | 
				
			||||||
 | 
					            $frameHead[$i] = chr($frameHead[$i]);
 | 
				
			||||||
 | 
					        } if($masked === true) {
 | 
				
			||||||
 | 
					            // generate a random mask:
 | 
				
			||||||
 | 
					            $mask = array();
 | 
				
			||||||
 | 
					            for($i = 0; $i < 4; $i++)
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                $mask[$i] = chr(rand(0, 255));
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            $frameHead = array_merge($frameHead, $mask);            
 | 
				
			||||||
 | 
					        }                        
 | 
				
			||||||
 | 
					        $frame = implode('', $frameHead);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // append payload to frame:
 | 
				
			||||||
 | 
					        $framePayload = array();
 | 
				
			||||||
 | 
					        for($i = 0; $i < $payloadLength; $i++) {        
 | 
				
			||||||
 | 
					            $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return $frame;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /**
 | 
					    /**
 | 
				
			||||||
 | 
				
			|||||||
@ -8,8 +8,14 @@ use Ratchet\Command\CommandInterface;
 | 
				
			|||||||
 * Creates an open-ended socket to listen on a port for incomming connections.  Events are delegated through this to attached applications
 | 
					 * Creates an open-ended socket to listen on a port for incomming connections.  Events are delegated through this to attached applications
 | 
				
			||||||
 * @todo Consider using _connections as master reference and passing iterator_to_array(_connections) to socket_select
 | 
					 * @todo Consider using _connections as master reference and passing iterator_to_array(_connections) to socket_select
 | 
				
			||||||
 * @todo Currently passing Socket object down the decorated chain - should be sending reference to it instead; Receivers do not interact with the Socket directly, they do so through the Command pattern
 | 
					 * @todo Currently passing Socket object down the decorated chain - should be sending reference to it instead; Receivers do not interact with the Socket directly, they do so through the Command pattern
 | 
				
			||||||
 | 
					 * @todo With all these options for the server I should probably use a DIC
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
class Server implements SocketObserver, \IteratorAggregate {
 | 
					class Server implements SocketObserver, \IteratorAggregate {
 | 
				
			||||||
 | 
					    /**
 | 
				
			||||||
 | 
					     * This is probably temporary
 | 
				
			||||||
 | 
					     */
 | 
				
			||||||
 | 
					    const RECV_BYTES = 1024;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /**
 | 
					    /**
 | 
				
			||||||
     * The master socket, receives all connections
 | 
					     * The master socket, receives all connections
 | 
				
			||||||
     * @type Socket
 | 
					     * @type Socket
 | 
				
			||||||
@ -65,6 +71,9 @@ class Server implements SocketObserver, \IteratorAggregate {
 | 
				
			|||||||
        set_time_limit(0);
 | 
					        set_time_limit(0);
 | 
				
			||||||
        ob_implicit_flush();
 | 
					        ob_implicit_flush();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        $this->_master->set_nonblock();
 | 
				
			||||||
 | 
					        declare(ticks = 1);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (false === ($this->_master->bind($address, (int)$port))) {
 | 
					        if (false === ($this->_master->bind($address, (int)$port))) {
 | 
				
			||||||
            throw new Exception($this->_master);
 | 
					            throw new Exception($this->_master);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@ -73,9 +82,6 @@ class Server implements SocketObserver, \IteratorAggregate {
 | 
				
			|||||||
            throw new Exception($this->_master);
 | 
					            throw new Exception($this->_master);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        $this->_master->set_nonblock();
 | 
					 | 
				
			||||||
        declare(ticks = 1); 
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        do {
 | 
					        do {
 | 
				
			||||||
            try {
 | 
					            try {
 | 
				
			||||||
                $changed     = $this->_resources;
 | 
					                $changed     = $this->_resources;
 | 
				
			||||||
@ -86,13 +92,24 @@ class Server implements SocketObserver, \IteratorAggregate {
 | 
				
			|||||||
                        $res = $this->onOpen($this->_master);
 | 
					                        $res = $this->onOpen($this->_master);
 | 
				
			||||||
                    } else {
 | 
					                    } else {
 | 
				
			||||||
                        $conn  = $this->_connections[$resource];
 | 
					                        $conn  = $this->_connections[$resource];
 | 
				
			||||||
                        $data  = null;
 | 
					                        $data  = $buf = '';
 | 
				
			||||||
                        $bytes = $conn->recv($data, 4096, 0);
 | 
					
 | 
				
			||||||
 | 
					                        $bytes = $conn->recv($buf, static::RECV_BYTES, 0);
 | 
				
			||||||
 | 
					                        if ($bytes > 0) {
 | 
				
			||||||
 | 
					                            $data = $buf;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                            // This idea works* but...
 | 
				
			||||||
 | 
					                            // 1) A single DDOS attack will block the entire application (I think)
 | 
				
			||||||
 | 
					                            // 2) What if the last message in the frame is equal to RECV_BYTES?  Would loop until another msg is sent
 | 
				
			||||||
 | 
					                            // Need to 1) proc_open the recv() calls.  2) ???
 | 
				
			||||||
 | 
					                            while ($bytes === static::RECV_BYTES) {
 | 
				
			||||||
 | 
					                                $bytes = $conn->recv($buf, static::RECV_BYTES, 0);
 | 
				
			||||||
 | 
					                                $data .= $buf;
 | 
				
			||||||
 | 
					                            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        if ($bytes == 0) {
 | 
					 | 
				
			||||||
                            $res = $this->onClose($conn);
 | 
					 | 
				
			||||||
                        } else {
 | 
					 | 
				
			||||||
                            $res = $this->onRecv($conn, $data);
 | 
					                            $res = $this->onRecv($conn, $data);
 | 
				
			||||||
 | 
					                        } else {
 | 
				
			||||||
 | 
					                            $res = $this->onClose($conn);
 | 
				
			||||||
                        }
 | 
					                        }
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -34,6 +34,10 @@ class Socket implements SocketInterface {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public function __destruct() {
 | 
				
			||||||
 | 
					        @socket_close($this->_resource);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /**
 | 
					    /**
 | 
				
			||||||
     * @return resource (Socket)
 | 
					     * @return resource (Socket)
 | 
				
			||||||
     */
 | 
					     */
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user