From 490bbbd01e6fb9ee1c80cc82dcad0a33d972bc84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Fri, 22 Feb 2019 10:58:48 +0100 Subject: [PATCH] Add TCP driver --- src/InfluxDB/Client.php | 7 +- src/InfluxDB/Driver/AbstractSocketDriver.php | 97 ++++++++++++++++++++ src/InfluxDB/Driver/TCP.php | 44 +++++++++ src/InfluxDB/Driver/UDP.php | 79 ++-------------- 4 files changed, 156 insertions(+), 71 deletions(-) create mode 100644 src/InfluxDB/Driver/AbstractSocketDriver.php create mode 100644 src/InfluxDB/Driver/TCP.php diff --git a/src/InfluxDB/Client.php b/src/InfluxDB/Client.php index 0aa14c5..9085fd1 100644 --- a/src/InfluxDB/Client.php +++ b/src/InfluxDB/Client.php @@ -8,6 +8,7 @@ use InfluxDB\Driver\Exception as DriverException; use InfluxDB\Driver\Guzzle; use InfluxDB\Driver\QueryDriverInterface; +use InfluxDB\Driver\TCP; use InfluxDB\Driver\UDP; /** @@ -258,6 +259,7 @@ public function listUsers() * * https+influxdb://username:pass@localhost:8086/databasename * udp+influxdb://username:pass@localhost:4444/databasename + * tcp+influxdb://username:pass@localhost:8094/databasename * * @param string $dsn * @param float $timeout @@ -303,9 +305,12 @@ public static function fromDSN($dsn, $timeout = 0, $verifySSL = false, $connectT $connectTimeout ); - // set the UDP driver when the DSN specifies UDP if ($modifier === 'udp') { + // set the UDP driver when the DSN specifies UDP $client->setDriver(new UDP($connParams['host'], $connParams['port'])); + } elseif ($modifier === 'tcp') { + // set the TCP driver when the DSN specifies TCP + $client->setDriver(new TCP($connParams['host'], $connParams['port'])); } return ($dbName ? $client->selectDB($dbName) : $client); diff --git a/src/InfluxDB/Driver/AbstractSocketDriver.php b/src/InfluxDB/Driver/AbstractSocketDriver.php new file mode 100644 index 0000000..7e4ef6b --- /dev/null +++ b/src/InfluxDB/Driver/AbstractSocketDriver.php @@ -0,0 +1,97 @@ +config['host'] = $host; + $this->config['port'] = $port; + } + + /** + * Close the stream (if created) + */ + public function __destruct() + { + if (isset($this->stream) && is_resource($this->stream)) { + fclose($this->stream); + } + } + + /** + * {@inheritdoc} + */ + public function setParameters(array $parameters) + { + $this->parameters = $parameters; + } + + /** + * {@inheritdoc} + */ + public function getParameters() + { + return $this->parameters; + } + + /** + * {@inheritdoc} + */ + public function write($data = null) + { + if (isset($this->stream) === false) { + $this->createStream(); + } + + $this->doWrite($data); + } + + /** + * {@inheritdoc} + */ + abstract public function isSuccess(); + + /** + * Perform write to socket + * @param mixed|null $data + */ + abstract protected function doWrite($data = null); + + /** + * Create the resource stream + */ + abstract protected function createStream(); + +} diff --git a/src/InfluxDB/Driver/TCP.php b/src/InfluxDB/Driver/TCP.php new file mode 100644 index 0000000..66f6f58 --- /dev/null +++ b/src/InfluxDB/Driver/TCP.php @@ -0,0 +1,44 @@ +result; + } + + /** + * {@inheritdoc} + */ + protected function createStream() + { + $host = sprintf('tcp://%s:%d', $this->config['host'], $this->config['port']); + + // stream the data using TCP and suppress any errors + $this->stream = @stream_socket_client($host); + } + + /** + * {@inheritdoc} + */ + protected function doWrite($data = null) + { + $this->result = false !== fwrite($this->stream, $data); + } +} diff --git a/src/InfluxDB/Driver/UDP.php b/src/InfluxDB/Driver/UDP.php index 0cd3010..6b5da80 100644 --- a/src/InfluxDB/Driver/UDP.php +++ b/src/InfluxDB/Driver/UDP.php @@ -10,76 +10,8 @@ * * @package InfluxDB\Driver */ -class UDP implements DriverInterface +class UDP extends AbstractSocketDriver implements DriverInterface { - /** - * Parameters - * - * @var array - */ - private $parameters; - - /** - * @var array - */ - private $config; - - /** - * - * @var resource - */ - private $stream; - - /** - * @param string $host IP/hostname of the InfluxDB host - * @param int $port Port of the InfluxDB process - */ - public function __construct($host, $port) - { - $this->config['host'] = $host; - $this->config['port'] = $port; - } - - /** - * Close the stream (if created) - */ - public function __destruct() - { - if (isset($this->stream) && is_resource($this->stream)) { - fclose($this->stream); - } - } - - /** - * {@inheritdoc} - */ - public function setParameters(array $parameters) - { - $this->parameters = $parameters; - } - - /** - * {@inheritdoc} - */ - public function getParameters() - { - return $this->parameters; - } - - /** - * {@inheritdoc} - */ - public function write($data = null) - { - if (isset($this->stream) === false) { - $this->createStream(); - } - - @stream_socket_sendto($this->stream, $data); - - return true; - } - /** * {@inheritdoc} */ @@ -89,7 +21,7 @@ public function isSuccess() } /** - * Create the resource stream + * {@inheritdoc} */ protected function createStream() { @@ -99,4 +31,11 @@ protected function createStream() $this->stream = @stream_socket_client($host); } + /** + * {@inheritdoc} + */ + protected function doWrite($data = null) + { + @stream_socket_sendto($this->stream, $data); + } }