<?php
/**
 * Connection pool class. Contains the SocketPool.
 *
 * Keeps track of all ConnectionHandler instances, hands out their IDs and
 * drives the read / write / accept cycle through select().
 *
 * @author jpt
 * @package Connection
 * @depends Socket
 */
class Connection_ConnectionPool {

    /**
     * SocketPool instance.
     * @var Socket_SocketPool
     */
    protected $socketPool;

    /**
     * Contains all ConnectionHandler instances.
     * Keys are not guaranteed to be consecutive, removeConnectionHandler() leaves gaps.
     * @var array
     */
    protected $connectionHandlers;

    /**
     * @var int Next ID for a new ConnectionHandler
     */
    protected $nextID;

    /**
     * Creates an instance of SocketPool and starts with an empty handler list.
     * The first ConnectionHandler will get the ID 1.
     * @return void
     */
    public function __construct() {
        $this->connectionHandlers = array();
        $this->socketPool = new Socket_SocketPool();
        $this->nextID = 1;
    }

    /**
     * Destroys the SocketPool
     * @return void
     */
    public function __destruct() {
        unset($this->socketPool);
    }

    /**
     * Creates a new TcpConnection and adds its handler to the pool.
     * @param string $group Group name handed to the new ConnectionHandler.
     * @param string $protocol Protocol identifier handed to the new ConnectionHandler.
     * @param boolean $IPv6 will determine whether the socket uses IPv4 or IPv6.
     * @return Connection_ConnectionHandler
     */
    public function createTcpConnection($group = "", $protocol = "RAW", $IPv6 = FALSE) {
        $socket = $this->socketPool->createTcpSocket($IPv6);
        $connectionHandler = new Connection_ConnectionHandler($socket, $this->nextID, $group, $protocol);
        $connectionHandler->setIPv6($IPv6);
        $connectionHandler->injectConnectionPool($this);
        $this->addConnectionHandler($connectionHandler);
        return $connectionHandler;
    }

    /**
     * Returns the array with the current connectionHandlers.
     * @return array
     */
    public function getConnectionHandlers() {
        return $this->connectionHandlers;
    }

    /**
     * Adds a ConnectionHandler to the pool.
     * Every call also advances the ID that the next handler created by this pool will get.
     * @param Connection_ConnectionHandler $addConnectionHandler
     * @return void
     */
    public function addConnectionHandler($addConnectionHandler) {
        array_push($this->connectionHandlers, $addConnectionHandler);
        $this->nextID++;
    }

    /**
     * Removes a ConnectionHandler from the pool.
     * The socket of the handler is removed from the SocketPool and the handler gets closed.
     * Nothing happens when the handler is not part of the pool.
     * @param Connection_ConnectionHandler $removeConnectionHandler
     * @return void
     */
    public function removeConnectionHandler($removeConnectionHandler) {
        foreach($this->connectionHandlers as $key => $connectionHandler) {
            if($connectionHandler === $removeConnectionHandler) {
                $this->socketPool->removeSocket($removeConnectionHandler->getSocket());
                $removeConnectionHandler->close();
                unset($this->connectionHandlers[$key]);
            }
        }
    }

    /**
     * Returns ConnectionHandler for the given socket resource.
     * @param resource $socketRessource
     * @return Connection_ConnectionHandler|null NULL when no handler in the pool uses this socket.
     */
    protected function getConnectionHandlerForSocketRessource($socketRessource) {
        foreach($this->connectionHandlers as $connectionHandler) {
            if($connectionHandler->getSocket() === $socketRessource) {
                return $connectionHandler;
            }
        }
        return NULL;
    }

    /**
     * Calls select() on SocketPool and updates the ConnectionHandler.
     * Will also accept incoming connections and add them to the pool.
     * Handlers whose write fails are removed from the pool. Sockets without a
     * handler in the pool are ignored.
     * @return array An array containing the Connection_ConnectionHandler for each Socket with new data.
     * @throws Exception_GeneralException
     * @throws Exception_SocketException
     */
    public function select() {
        $read = array();
        $write = array();
        foreach($this->connectionHandlers as $connectionHandler) {
            $connectionSocket = $connectionHandler->getSocket();
            if($connectionHandler->canWrite() === TRUE && $connectionHandler->isServer() === FALSE) {
                //Selecting for write does not work for freshly connected stuff.
                //This is the fallback - just write the stuff - no matter what happens.
                if($connectionHandler->writeFromBuffer() === FALSE) {
                    //The handler is closed and gone now, its socket must not be selected anymore.
                    $this->removeConnectionHandler($connectionHandler);
                    continue;
                }
                $write[] = $connectionSocket;
            }
            $read[] = $connectionSocket;
        }
        $except = $read;

        //Arrays are prepared, let's have socket_select() take a look and process its results.
        $tempArray = array();
        $selectedSockets = $this->socketPool->select($read, $write, $except);
        foreach($selectedSockets as $selectedType => $selectedArray) { //read, write, except, this loop won't kill performance
            foreach($selectedArray as $socket) {
                $connectionHandler = $this->getConnectionHandlerForSocketRessource($socket);
                if($connectionHandler === NULL) {
                    //The handler has been removed in the meantime (e.g. by a failed write further up).
                    continue;
                }
                switch($selectedType) {
                    case "read":
                        if($connectionHandler->isServer() === TRUE) {
                            $acceptedSocket = $connectionHandler->accept();
                            //NOTE(review): unlike createTcpConnection() the accepted handler does not get injectConnectionPool() - confirm this is intended.
                            $acceptedSocketHandler = new Connection_ConnectionHandler($acceptedSocket, $this->nextID, $connectionHandler->getGroup(), $connectionHandler->getProtocol());
                            $acceptedSocketHandler->hasBeenAccepted();
                            $this->addConnectionHandler($acceptedSocketHandler);
                        } else {
                            $connectionHandler->readToBuffer();
                        }
                        break;
                    case "write":
                        //this might still work on active connections that are "in use" and already received data.
                        //however, it does not for freshly connected ones.
                        if($connectionHandler->writeFromBuffer() === FALSE) {
                            $this->removeConnectionHandler($connectionHandler);
                        }
                        break;
                    case "except":
                        $connectionHandler->handleSocketError();
                        break;
                    default:
                        throw new Exception_GeneralException("Unknown select type: '" . $selectedType . "'", 1289737080);
                }
                //Put the ConnectionHandler into the array. We'll return this for further operations.
                $tempArray[$selectedType][] = $connectionHandler;
            }
        }
        return $tempArray;
    }

    /**
     * Writes the given data to all sockets in $group.
     * Handlers that are servers are skipped.
     * @param string $group
     * @param string $data
     * @throws Exception_SocketException
     * @return void
     */
    public function writeToGroup($group, $data) {
        foreach($this->connectionHandlers as $connectionHandler) {
            if($connectionHandler->getGroup() === $group && $connectionHandler->isServer() === FALSE) {
                $connectionHandler->write($data);
            }
        }
    }

    /**
     * Writes the given data to the socket with $id.
     * Handlers that are servers are skipped. The ID is compared strictly (===).
     * @param int $id
     * @param string $data
     * @return void
     */
    public function writeToID($id, $data) {
        foreach($this->connectionHandlers as $connectionHandler) {
            if($connectionHandler->getID() === $id && $connectionHandler->isServer() === FALSE) {
                $connectionHandler->write($data);
            }
        }
    }

    /**
     * Returns the result of countSockets() of the SocketPool.
     * @see Socket_SocketPool
     * @return int
     */
    public function countSockets() {
        return $this->socketPool->countSockets();
    }

    /**
     * Returns the amount of active connections.
     * A connection is active when the handler is connected.
     * @return int
     */
    public function countActiveConnections() {
        $count = 0;
        foreach($this->connectionHandlers as $connectionHandler) {
            if($connectionHandler->isConnected() === TRUE) {
                $count++;
            }
        }
        return $count;
    }
}
?>