diff --git a/README.md b/README.md index cf090b3..98387c7 100644 --- a/README.md +++ b/README.md @@ -1,37 +1,100 @@ -### [WIP] WebSocket server for [Pathfinder](https://github.com/exodus4d/pathfinder) +## WebSocket server for [Pathfinder](https://github.com/exodus4d/pathfinder) -####Requirements: -1. A working instance of *[Pathfinder](https://github.com/exodus4d/pathfinder)* **(>= v1.2.0)**. -2. A working installation of *[ØMQ](http://zeromq.org/area:download)* **(>= v4.2.0)**. - Which is a "network library" written in C (very fast) that handles TCP based socket connections - between your existing _Pathfinder_ installation and this WebSocket server extension. [download *ØMQ*](http://zeromq.org/area:download). -3. A new [PHP extension for *ØMQ*](http://zeromq.org/bindings:php) that handles the communication between this WebSocket server and *ØMQ*. **(>= v1.1.3)** +### Requirements +- A working instance of *[Pathfinder](https://github.com/exodus4d/pathfinder)* **(≥ v1.2.0)** +- [_Composer_](https://getcomposer.org/download/) to install packages for the WebSocket server -####Install: -Make sure you meet the requirements before continue with the installation. - -1. Install [Composer](https://getcomposer.org/download/) +### Install +1. Checkout this project in a **new** folder (NOT the install for _Pathfinder_ itself) e.g. `/var/www/websocket.pathfinder` +1. Install [_Composer_](https://getcomposer.org/download/) 2. Install Composer dependencies from `composer.json` file: - - `$ composer install` OR - - `$ php composer.phar install` (change composer.phar path to your Composer directory) -3. Start WebSocket server `php cmd.php` + - `$ cd /var/www/websocket.pathfinder` + - `$ composer install` +3. Start WebSocket server `$ php cmd.php` + +### Configuration + +#### Default -####Default Configuration **Clients (WebBrowser) listen for connections** -- Host:`0.0.0.0.` (=> any client can connect) -- Port:`8020` -- URI:`127.0.0.1:8020` (Your WebServer (e.g. Nginx) should pass all WebSocket connections to this source) +- Host: `0.0.0.0.` (=> any client can connect) +- Port: `8020` +- URI: `127.0.0.1:8020` (Your WebServer (e.g. Nginx) should pass all WebSocket connections to this source) -**TCP Socket connection (Internal use fore WebServer <=> WebSocket communication)** -- Host:`127.0.0.1` (=> Assumed WebServer and WebSocket Server running on the same machine) -- Port:`5555` +**TCP TcpSocket connection (Internal use for WebServer ⇄ WebSocket communication)** +- Host: `127.0.0.1` (=> Assumed WebServer and WebSocket Server running on the same machine) +- Port: `5555` - URI: `tcp://127.0.0.1:5555` + +#### Custom [Optional] -**[Optional]** The default configuration should be fine for most installations. You can change/overwrite the default **Host** and **Port** configuration by adding additional CLI parameters when starting the WebSocket server: -`php cmd.php --pf_listen_host [CLIENTS_HOST] --pf_listen_port [CLIENTS_PORT] --pf_host [TCP_HOST] --pf_port [TCP_PORT]` +`$ php cmd.php --pf_listen_host [CLIENTS_HOST] --pf_listen_port [CLIENTS_PORT] --pf_host [TCP_HOST] --pf_port [TCP_PORT]` + +### Unix Service (systemd) + +#### New Service +It is recommended to wrap the `cmd.php` script in a Unix service, that over control the WebSocket server. +This creates a systemd service on CentOS7: +1. `$ cd /etc/systemd/system` +2. `$ vi websocket.pathfinder.service` +3. Copy script and adjust `ExecStart` and `WorkingDirectory` values: + +``` +[Unit] +Description = WebSocket server (Pathfinder) [LIVE] environment +After = multi-user.target + +[Service] +Type = idle +ExecStart = /usr/bin/php /var/www/websocket.pathfinder/pathfinder_websocket/cmd.php +WorkingDirectory = /var/www/websocket.pathfinder/pathfinder_websocket +TimeoutStopSec = 0 +Restart = always +LimitNOFILE = 10000 +Nice = 10 + +[Install] +WantedBy = multi-user.target +``` + +Now you can use the service to start/stop/restart your WebSocket server +- `$ systemctl start websocket.pathfinder.service` +- `$ systemctl restart websocket.pathfinder.service` +- `$ systemctl stop websocket.pathfinder.service` + +#### Auto-Restart the Service +You can automatically restart your service (e.g. on _EVE-Online_ downtime). Create a new "timer" for the automatic restart. +1. `$ cd /etc/systemd/system` (same dir as before) +2. `$ vi restart.websocket.pathfinder.timer` +3. Copy script: + +``` +[Unit] +Description = Restart timer (EVE downtime) for WebSocket server [LIVE] + +[Timer] +OnCalendar = *-*-* 12:01:00 +Persistent = true + +[Install] +WantedBy = timer.target +``` +Now we need a new "restart service" for the timer: +1. `$ cd /etc/systemd/system` (same dir as before) +2. `$ vi restart.websocket.pathfinder.service` +3. Copy script: + +``` +[Unit] +Description = Restart (periodically) WebSocket server [LIVE] + +[Service] +Type = oneshot +ExecStart = /usr/bin/systemctl try-restart websocket.pathfinder.service +``` -####Info: +### Info - [*Ratchet*](http://socketo.me/) - "WebSockets for PHP" diff --git a/app/Main/MapUpdate.php b/app/Main/MapUpdate.php index b69d172..6c9f3c8 100644 --- a/app/Main/MapUpdate.php +++ b/app/Main/MapUpdate.php @@ -360,7 +360,14 @@ private function getCharacterIdsByConnection(ConnectionInterface $conn){ * @return array */ private function getCharacterIdsByMapId(int $mapId) : array { - return array_keys((array)$this->subscriptions[$mapId]); + $characterIds = []; + if( + array_key_exists($mapId, $this->subscriptions) && + is_array($this->subscriptions[$mapId]) + ){ + $characterIds = array_keys($this->subscriptions[$mapId]); + } + return $characterIds; } /** @@ -485,43 +492,44 @@ private function broadcastData($connections, string $task, $load, array $charact /** * receive data from TCP socket (main App) * -> send response back - * @param $data + * @param string $task + * @param null $load + * @return bool|float|int|null */ - public function receiveData($data){ - $data = (array)json_decode($data, true); - $load = $data['load']; - $task = $data['task']; - $response = false; + public function receiveData(string $task, $load = null){ + $responseLoad = null; switch($task){ + case 'healthCheck': + $this->healthCheckToken = (float)$load; + $responseLoad = $this->healthCheckToken; + break; case 'characterUpdate': - $response = $this->updateCharacterData($load); + $this->updateCharacterData($load); $mapIds = $this->getMapIdsByCharacterId((int)$load['id']); $this->broadcastMapSubscriptions('mapSubscriptions', $mapIds); break; case 'characterLogout': - $response = $this->unSubscribeCharacterIds($load); + $responseLoad = $this->unSubscribeCharacterIds($load); break; case 'mapConnectionAccess': - $response = $this->setConnectionAccess($load); + $responseLoad = $this->setConnectionAccess($load); break; case 'mapAccess': - $response = $this->setAccess($task, $load); + $responseLoad = $this->setAccess($task, $load); break; case 'mapUpdate': - $response = $this->broadcastMapUpdate($task, $load); + $responseLoad = $this->broadcastMapUpdate($task, $load); break; case 'mapDeleted': - $response = $this->deleteMapId($task, $load); - break; - case 'healthCheck': - $this->healthCheckToken = (float)$load; - $response = 'OK'; + $responseLoad = $this->deleteMapId($task, $load); break; case 'logData': $this->handleLogData((array)$load['meta'], (array)$load['log']); break; } + + return $responseLoad; } private function setCharacterData(array $characterData){ diff --git a/app/Socket/TcpSocket.php b/app/Socket/TcpSocket.php new file mode 100644 index 0000000..cf2f229 --- /dev/null +++ b/app/Socket/TcpSocket.php @@ -0,0 +1,549 @@ + affects en/decoding socket data + */ + const DEFAULT_ACCEPT_TYPE = 'json'; + + /** + * default for: wait timeout + * -> timeout until connection gets closed + * timeout should be "reset" right after successful response send to client + */ + const DEFAULT_WAIT_TIMEOUT = 3.0; + + /** + * default for: send response by end() method (rather than write()) + * -> connection will get closed right after successful response send to client + */ + const DEFAULT_END_WITH_RESPONSE = true; + + /** + * default for: add socket statistic to response payload + */ + const DEFAULT_STATS = true; + + /** + * default for: echo debug messages + */ + const DEFAULT_DEBUG = false; + + /** + * global server loop + * @var EventLoop\LoopInterface + */ + private $loop; + + /** + * @var MessageComponentInterface + */ + private $handler; + + /** + * @see TcpSocket::DEFAULT_ACCEPT_TYPE + * @var string + */ + private $acceptType = self::DEFAULT_ACCEPT_TYPE; + + /** + * @see TcpSocket::DEFAULT_WAIT_TIMEOUT + * @var float + */ + private $waitTimeout = self::DEFAULT_WAIT_TIMEOUT; + + /** + * @see TcpSocket::DEFAULT_END_WITH_RESPONSE + * @var bool + */ + private $endWithResponse = self::DEFAULT_END_WITH_RESPONSE; + + /** + * @see TcpSocket::DEFAULT_STATS + * @var bool + */ + private $stats = self::DEFAULT_STATS; + + /** + * storage for all active connections + * -> can be used to get current count of connected clients + * @var \SplObjectStorage + */ + private $connections; + + /** + * max count of concurrent open connections + * -> represents number of active connected clients + * @var int + */ + private $maxConnections = 0; + + /** + * timestamp on startup + * @var int + */ + private $startupTime = 0; + + /** + * TcpSocket constructor. + * @param EventLoop\LoopInterface $loop + * @param MessageComponentInterface $handler + * @param string $acceptType + * @param float $waitTimeout + * @param bool $endWithResponse + */ + public function __construct( + EventLoop\LoopInterface $loop, + MessageComponentInterface $handler, + string $acceptType = self::DEFAULT_ACCEPT_TYPE, + float $waitTimeout = self::DEFAULT_WAIT_TIMEOUT, + bool $endWithResponse = self::DEFAULT_END_WITH_RESPONSE + ){ + $this->loop = $loop; + $this->handler = $handler; + $this->acceptType = $acceptType; + $this->waitTimeout = $waitTimeout; + $this->endWithResponse = $endWithResponse; + $this->connections = new \SplObjectStorage(); + $this->startupTime = time(); + } + + /** + * @param Socket\ConnectionInterface $connection + */ + public function onConnect(Socket\ConnectionInterface $connection){ + $this->debug($connection, __FUNCTION__, '----------------------------------------'); + + if($this->isValidConnection($connection)){ + // connection can be used + // add connection to global connection pool + $this->addConnection($connection); + // set waitTimeout timer for connection + $this->setTimerTimeout($connection, $this->waitTimeout); + + $this->debug($connection, __FUNCTION__); + + // register connection events ... ------------------------------------------------------------------------- + $this->initRead($connection) + ->then($this->initDispatch()) + ->then($this->initResponse($connection)) + ->then( + function(string $message) use ($connection) { + $this->debug($connection, 'DONE', $message); + }, + function(\Exception $e) use ($connection) { + $this->debug($connection, 'ERROR', $e->getMessage()); + $this->connectionError($connection, $e); + }); + + $connection->on('end', function() use ($connection) { + $this->debug($connection, 'onEnd'); + }); + + $connection->on('close', function() use ($connection){ + $this->debug($connection, 'onClose'); + $this->removeConnection($connection); + }); + + $connection->on('error', function(\Exception $e) use ($connection) { + $this->debug($connection, 'onError', $e->getMessage()); + }); + }else{ + // invalid connection -> can not be used + $connection->close(); + } + } + + /** + * @param Socket\ConnectionInterface $connection + * @return Promise\PromiseInterface + */ + protected function initRead(Socket\ConnectionInterface $connection) : Promise\PromiseInterface { + if($connection->isReadable()){ + if('json' == $this->acceptType){ + // new empty stream for processing JSON + $stream = new Stream\ThroughStream(); + $streamDecoded = new NDJson\Decoder($stream, true); + + // promise get resolved on first emit('data') + $promise = Promise\Stream\first($streamDecoded); + + // register on('data') for main input stream + $connection->on('data', function ($chunk) use ($stream) { + // send current data chunk to processing stream -> resolves promise + $stream->emit('data', [$chunk]); + }); + + return $promise; + }else{ + return new Promise\RejectedPromise( + new \InvalidArgumentException( + sprintf(self::ERROR_ACCEPT_TYPE, $this->acceptType) + ) + ); + } + }else{ + return new Promise\RejectedPromise( + new \Exception( + sprintf(self::ERROR_STREAM_NOT_READABLE, $connection->getRemoteAddress()) + ) + ); + } + } + + /** + * init dispatcher for payload + * @return callable + */ + protected function initDispatch() : callable { + return function($payload) : Promise\PromiseInterface { + $task = (string)$payload['task']; + if(!empty($task)){ + $load = $payload['load']; + $deferred = new Promise\Deferred(); + $this->dispatch($deferred, $task, $load); + return $deferred->promise(); + }else{ + return new Promise\RejectedPromise( + new \InvalidArgumentException(self::ERROR_TASK_MISSING) + ); + } + }; + } + + /** + * @param Promise\Deferred $deferred + * @param string $task + * @param null $load + */ + protected function dispatch(Promise\Deferred $deferred, string $task, $load = null) : void { + + switch($task){ + case 'getStats': + $deferred->resolve($this->newPayload($task, null)); + break; + case 'healthCheck': + case 'characterUpdate': + case 'characterLogout': + case 'mapConnectionAccess': + case 'mapAccess': + case 'mapUpdate': + case 'mapDeleted': + case 'logData': + if(method_exists($this->handler, 'receiveData')){ + $deferred->resolve( + $this->newPayload( + $task, + call_user_func_array([$this->handler, 'receiveData'], [$task, $load]) + ) + ); + }else{ + $deferred->reject(new \Exception(sprintf(self::ERROR_METHOD_MISSING, 'receiveData'))); + } + break; + default: + $deferred->reject(new \InvalidArgumentException(sprintf(self::ERROR_TASK_UNKNOWN, $task))); + } + } + + /** + * @param Socket\ConnectionInterface $connection + * @return callable + */ + protected function initResponse(Socket\ConnectionInterface $connection) : callable { + return function(array $payload) use ($connection) : Promise\PromiseInterface { + $this->debug($connection, 'initResponse'); + + $deferred = new Promise\Deferred(); + $this->write($deferred, $connection, $payload); + + return $deferred->promise(); + }; + } + + /** + * @param Promise\Deferred $deferred + * @param Socket\ConnectionInterface $connection + * @param array $payload + */ + protected function write(Promise\Deferred $deferred, Socket\ConnectionInterface $connection, array $payload) : void { + $write = false; + if($connection->isWritable()){ + if('json' == $this->acceptType){ + $connection = new NDJson\Encoder($connection); + } + + // write a new chunk of data to connection stream + $write = $connection->write($payload); + + if($this->endWithResponse){ + // connection should be closed (and removed from this socket server) + $connection->end(); + } + } + + if($write){ + $deferred->resolve('OK'); + }else{ + $deferred->reject(new \Exception( + sprintf(self::ERROR_STREAM_NOT_WRITABLE, $connection->getRemoteAddress()) + )); + } + } + + /** + * $connection has error + * -> if writable -> end() connection with $payload (close() is called by default) + * -> if readable -> close() connection + * @param Socket\ConnectionInterface $connection + * @param \Exception $e + */ + protected function connectionError(Socket\ConnectionInterface $connection, \Exception $e){ + $this->debug($connection, __FUNCTION__, $e->getMessage()); + + if($connection->isWritable()){ + if('json' == $this->acceptType){ + $connection = new NDJson\Encoder($connection); + } + + // send "end" data, then close + $connection->end($this->newPayload('error', $e->getMessage())); + }else{ + // close connection + $connection->close(); + } + } + + /** + * check if $connection is found in global pool + * @param Socket\ConnectionInterface $connection + * @return bool + */ + protected function hasConnection(Socket\ConnectionInterface $connection) : bool { + return $this->connections->contains($connection); + } + + /** + * cancels a previously set timer callback for a $connection + * @param Socket\ConnectionInterface $connection + * @param string $timerName + */ + protected function cancelTimer(Socket\ConnectionInterface $connection, string $timerName){ + if( + $this->hasConnection($connection) && + ($data = (array)$this->connections->offsetGet($connection)) && + isset($data['timers']) && isset($data['timers'][$timerName]) && + ($data['timers'][$timerName] instanceof EventLoop\Timer\TimerInterface) + ){ + $this->loop->cancelTimer($data['timers'][$timerName]); + + unset($data['timers'][$timerName]); + $this->connections->offsetSet($connection, $data); + } + } + + /** + * cancels all previously set timers for a $connection + * @param Socket\ConnectionInterface $connection + */ + protected function cancelTimers(Socket\ConnectionInterface $connection){ + if( + $this->hasConnection($connection) && + ($data = (array)$this->connections->offsetGet($connection)) && + isset($data['timers']) + ){ + foreach((array)$data['timers'] as $timerName => $timer){ + $this->loop->cancelTimer($timer); + } + + $data['timers'] = []; + $this->connections->offsetSet($connection, $data); + } + } + + /** + * @param Socket\ConnectionInterface $connection + * @param string $timerName + * @param float $interval + * @param callable $timerCallback + */ + protected function setTimer(Socket\ConnectionInterface $connection, string $timerName, float $interval, callable $timerCallback){ + if( + $this->hasConnection($connection) && + ($data = (array)$this->connections->offsetGet($connection)) && + isset($data['timers']) + ){ + $data['timers'][$timerName] = $this->loop->addTimer($interval, function() use ($connection, $timerCallback) { + $timerCallback($connection); + }); + + // store new timer to $connection + $this->connections->offsetSet($connection, $data); + } + } + + /** + * cancels and removes previous connection timeout timers + * -> set new connection timeout + * @param Socket\ConnectionInterface $connection + * @param float $waitTimeout + */ + protected function setTimerTimeout(Socket\ConnectionInterface $connection, float $waitTimeout = self::DEFAULT_WAIT_TIMEOUT){ + $this->cancelTimer($connection, 'disconnectTimer'); + $this->setTimer($connection, 'disconnectTimer', $waitTimeout, function(Socket\ConnectionInterface $connection) use ($waitTimeout) { + $errorMessage = sprintf(self::ERROR_WAIT_TIMEOUT, $waitTimeout); + + $this->connectionError( + $connection, + new Promise\Timer\TimeoutException($waitTimeout, $errorMessage) + ); + }); + } + + /** + * add new connection to global pool + * @param Socket\ConnectionInterface $connection + */ + protected function addConnection(Socket\ConnectionInterface $connection){ + if(!$this->hasConnection($connection)){ + $this->connections->attach($connection, [ + 'remoteAddress' => $connection->getRemoteAddress(), + 'timers' => [] + ]); + + // update maxConnections count + $this->maxConnections = max($this->connections->count(), $this->maxConnections); + + $this->debug($connection, __FUNCTION__); + } + } + + /** + * remove $connection from global connection pool + * @param Socket\ConnectionInterface $connection + */ + protected function removeConnection(Socket\ConnectionInterface $connection){ + if($this->hasConnection($connection)){ + $this->debug($connection, __FUNCTION__); + $this->cancelTimers($connection); + $this->connections->detach($connection); + } + } + + /** + * get new payload + * @param string $task + * @param null $load + * @return array + */ + protected function newPayload(string $task, $load = null) : array { + $payload = [ + 'task' => $task, + 'load' => $load + ]; + + if($this->stats){ + // add socket statistics + $payload['stats'] = $this->getStats(); + } + + return $payload; + } + + /** + * check if connection is "valid" and can be used for data transfer + * @param Socket\ConnectionInterface $connection + * @return bool + */ + protected function isValidConnection(Socket\ConnectionInterface $connection) : bool { + return $connection->isReadable() || $connection->isWritable(); + } + + /** + * get socket server statistics + * -> e.g. connected clients count + * @return array + */ + protected function getStats() : array { + return [ + 'startup' => time() - $this->startupTime, + 'connections' => $this->connections->count(), + 'maxConnections' => $this->maxConnections + ]; + } + + /** + * echo debug messages + * @param Socket\ConnectionInterface $connection + * @param string $method + * @param string $message + */ + protected function debug(Socket\ConnectionInterface $connection, string $method, string $message = ''){ + if(self::DEFAULT_DEBUG){ + $data = [ + date('Y-m-d H:i:s'), + __CLASS__ . '()', + $connection->getRemoteAddress(), + $method . '()', + $message + ]; + + echo implode(array_filter($data), ' | ') . PHP_EOL; + } + } +} \ No newline at end of file diff --git a/app/WebSockets.php b/app/WebSockets.php index 6e4ca61..d5b5e0b 100644 --- a/app/WebSockets.php +++ b/app/WebSockets.php @@ -8,52 +8,77 @@ namespace Exodus4D\Socket; + +use Exodus4D\Socket\Socket\TcpSocket; +use React\EventLoop; +use React\Socket; use Ratchet\Server\IoServer; use Ratchet\Http\HttpServer; use Ratchet\WebSocket\WsServer; -use React; - class WebSockets { - protected $dns; + /** + * @var string + */ + protected $dsn; + + /** + * @var int + */ protected $wsListenPort; + + /** + * @var string + */ protected $wsListenHost; - function __construct($dns, $wsListenPort, $wsListenHost){ - $this->dns = $dns; - $this->wsListenPort = (int)$wsListenPort; + /** + * WebSockets constructor. + * @param string $dsn + * @param int $wsListenPort + * @param string $wsListenHost + */ + function __construct(string $dsn, int $wsListenPort, string $wsListenHost){ + $this->dsn = $dsn; + $this->wsListenPort = $wsListenPort; $this->wsListenHost = $wsListenHost; $this->startMapSocket(); } private function startMapSocket(){ - $loop = React\EventLoop\Factory::create(); + // global EventLoop + $loop = EventLoop\Factory::create(); - // Listen for the web server to make a ZeroMQ push after an ajax request - $context = new React\ZMQ\Context($loop); + // global MessageComponent (main app) (handles all business logic) + $mapUpdate = new Main\MapUpdate(); - // Socket for map update data ------------------------------------------------------------- - $pull = $context->getSocket(\ZMQ::SOCKET_PULL); - // Binding to 127.0.0.1 means, the only client that can connect is itself - $pull->bind( $this->dns ); + // TCP Socket ------------------------------------------------------------------------------------------------- + $tcpSocket = new TcpSocket($loop, $mapUpdate); + // TCP Server (WebServer <-> TCPServer <-> TCPSocket communication) + $server = new Socket\Server($this->dsn, $loop, [ + 'tcp' => [ + 'backlog' => 20, + 'so_reuseport' => true + ] + ]); - // main app -> inject socket for response - $mapUpdate = new Main\MapUpdate(); - // "onMessage" listener - $pull->on('message', [$mapUpdate, 'receiveData']); + $server->on('connection', function(Socket\ConnectionInterface $connection) use ($tcpSocket){ + $tcpSocket->onConnect($connection); + }); + + $server->on('error', function(\Exception $e){ + echo 'error: ' . $e->getMessage() . PHP_EOL; + }); - // Socket for log data -------------------------------------------------------------------- - //$pullSocketLogs = $context->getSocket(\ZMQ::SOCKET_PULL); - //$pullSocketLogs->bind( "tcp://127.0.0.1:5555" ); - //$pullSocketLogs->on('message', [$mapUpdate, 'receiveLogData']) ; + // WebSocketServer -------------------------------------------------------------------------------------------- // Binding to 0.0.0.0 means remotes can connect (Web Clients) $webSocketURI = $this->wsListenHost . ':' . $this->wsListenPort; - // Set up our WebSocket server for clients wanting real-time updates - $webSock = new React\Socket\Server($webSocketURI, $loop); + // Set up our WebSocket server for clients subscriptions + $webSock = new Socket\TcpServer($webSocketURI, $loop); new IoServer( new HttpServer( new WsServer( diff --git a/cmd.php b/cmd.php index 7d1c44b..04a282b 100644 --- a/cmd.php +++ b/cmd.php @@ -25,9 +25,9 @@ $host = (!empty($options['pf_host'])) ? $options['pf_host'] : '127.0.0.1' ; $port = (!empty($options['pf_port'])) ? (int)$options['pf_port'] : 5555 ; - $dns = 'tcp://' . $host . ':' . $port; + $dsn = 'tcp://' . $host . ':' . $port; - new Socket\WebSockets($dns, $wsListenPort, $wsListenHost); + new Socket\WebSockets($dsn, $wsListenPort, $wsListenHost); }else{ echo "Script need to be called by CLI!"; } diff --git a/composer.json b/composer.json index 79d635d..6e2c5f9 100644 --- a/composer.json +++ b/composer.json @@ -15,9 +15,13 @@ } }, "require": { - "php-64bit": ">=7.0", - "ext-zmq": ">=1.1.3", + "php-64bit": ">=7.1", + "ext-json": "*", "cboden/ratchet": "0.4.x", - "react/zmq": "0.3.*" + "react/promise-stream": "1.1.*", + "clue/ndjson-react": "1.0.*" + }, + "suggest": { + "ext-event": "If installed, 'ExtEventLoop' class will get used as default event loop. Better performance. https://pecl.php.net/package/event" } } \ No newline at end of file