Skip to content
This repository has been archived by the owner on Nov 15, 2022. It is now read-only.

Add TCP driver alongiside UDP one #124

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/InfluxDB/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use InfluxDB\Driver\Exception as DriverException;
use InfluxDB\Driver\Guzzle;
use InfluxDB\Driver\QueryDriverInterface;
use InfluxDB\Driver\TCP;
use InfluxDB\Driver\UDP;

/**
Expand Down Expand Up @@ -258,6 +259,7 @@ public function listUsers()
*
* https+influxdb://username:pass@localhost:8086/databasename
* udp+influxdb://username:pass@localhost:4444/databasename
* tcp+influxdb://username:pass@localhost:8094/databasename
*
* @param string $dsn
* @param float $timeout
Expand Down Expand Up @@ -303,9 +305,12 @@ public static function fromDSN($dsn, $timeout = 0, $verifySSL = false, $connectT
$connectTimeout
);

// set the UDP driver when the DSN specifies UDP
if ($modifier === 'udp') {
// set the UDP driver when the DSN specifies UDP
$client->setDriver(new UDP($connParams['host'], $connParams['port']));
} elseif ($modifier === 'tcp') {
// set the TCP driver when the DSN specifies TCP
$client->setDriver(new TCP($connParams['host'], $connParams['port']));
}

return ($dbName ? $client->selectDB($dbName) : $client);
Expand Down
99 changes: 99 additions & 0 deletions src/InfluxDB/Driver/AbstractSocketDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
<?php
/**
* @author Stephen "TheCodeAssassin" Hoogendijk
*/

namespace InfluxDB\Driver;

/**
* Class UDP
*
* @package InfluxDB\Driver
*/
abstract class AbstractSocketDriver implements DriverInterface
{
/**
* Parameters
*
* @var array
*/
private $parameters;

/**
* @var array
*/
protected $config;

/**
*
* @var resource
*/
protected $stream;

/**
* @param string $host IP/hostname of the InfluxDB host
* @param int $port Port of the InfluxDB process
*/
public function __construct($host, $port)
{
$this->config['host'] = $host;
$this->config['port'] = $port;
}

/**
* Close the stream (if created)
*/
public function __destruct()
{
if (isset($this->stream) && is_resource($this->stream)) {
fclose($this->stream);
}
}

/**
* {@inheritdoc}
*/
public function setParameters(array $parameters)
{
$this->parameters = $parameters;
}

/**
* {@inheritdoc}
*/
public function getParameters()
{
return $this->parameters;
}

/**
* {@inheritdoc}
*/
public function write($data = null)
{
if (isset($this->stream) === false) {
$this->createStream();
}

if ($data !== null) {
$this->doWrite($data);
}
}

/**
* {@inheritdoc}
*/
abstract public function isSuccess();

/**
* Perform write to socket
* @param mixed $data
*/
abstract protected function doWrite($data);

/**
* Create the resource stream
*/
abstract protected function createStream();

}
44 changes: 44 additions & 0 deletions src/InfluxDB/Driver/TCP.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php
/**
* @author Stephen "TheCodeAssassin" Hoogendijk
*/

namespace InfluxDB\Driver;

/**
* Class UDP
*
* @package InfluxDB\Driver
*/
class TCP extends AbstractSocketDriver implements DriverInterface
{

private $result;

/**
* {@inheritdoc}
*/
public function isSuccess()
{
return (bool) $this->result;
}

/**
* {@inheritdoc}
*/
protected function createStream()
{
$host = sprintf('tcp://%s:%d', $this->config['host'], $this->config['port']);

// stream the data using TCP and suppress any errors
$this->stream = @stream_socket_client($host);
}

/**
* {@inheritdoc}
*/
protected function doWrite($data)
{
$this->result = false !== @fwrite($this->stream, "$data\n");
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An issue can occur when multiple messages are written into TCP socket:

telegraf_1_279d9d02c69e | 2019-02-27T10:54:58Z E! [inputs.socket_listener]: Error in plugin: unable to parse incoming line: metric parse error: expected field at offset 49: "xxx,tag=tag_value parsed=0i,processing_time=85.6xxx,tag=tag_value parsed=0i...

Each message has to end with newline.

}
}
79 changes: 9 additions & 70 deletions src/InfluxDB/Driver/UDP.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,76 +10,8 @@
*
* @package InfluxDB\Driver
*/
class UDP implements DriverInterface
class UDP extends AbstractSocketDriver implements DriverInterface
{
/**
* Parameters
*
* @var array
*/
private $parameters;

/**
* @var array
*/
private $config;

/**
*
* @var resource
*/
private $stream;

/**
* @param string $host IP/hostname of the InfluxDB host
* @param int $port Port of the InfluxDB process
*/
public function __construct($host, $port)
{
$this->config['host'] = $host;
$this->config['port'] = $port;
}

/**
* Close the stream (if created)
*/
public function __destruct()
{
if (isset($this->stream) && is_resource($this->stream)) {
fclose($this->stream);
}
}

/**
* {@inheritdoc}
*/
public function setParameters(array $parameters)
{
$this->parameters = $parameters;
}

/**
* {@inheritdoc}
*/
public function getParameters()
{
return $this->parameters;
}

/**
* {@inheritdoc}
*/
public function write($data = null)
{
if (isset($this->stream) === false) {
$this->createStream();
}

@stream_socket_sendto($this->stream, $data);

return true;
}

/**
* {@inheritdoc}
*/
Expand All @@ -89,7 +21,7 @@ public function isSuccess()
}

/**
* Create the resource stream
* {@inheritdoc}
*/
protected function createStream()
{
Expand All @@ -99,4 +31,11 @@ protected function createStream()
$this->stream = @stream_socket_client($host);
}

/**
* {@inheritdoc}
*/
protected function doWrite($data)
{
@stream_socket_sendto($this->stream, $data);
}
}