From 6628251513b0c977b7ae9ab94b74af8594a7c665 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Wed, 15 Jan 2025 16:20:03 +0100 Subject: [PATCH] feat: add `Client::insertPayload()` --- .github/workflows/ci.yml | 4 ++ .github/workflows/coding-standards.yml | 2 + .github/workflows/static-analysis.yml | 2 + composer.json | 7 ++- src/Client/ClickHouseClient.php | 18 +++++++ src/Client/Http/RequestFactory.php | 27 +++++++--- src/Client/Http/RequestOptions.php | 13 +---- src/Client/Http/RequestSettings.php | 22 +++++++++ src/Client/PsrClickHouseAsyncClient.php | 11 +++-- src/Client/PsrClickHouseClient.php | 57 +++++++++++++++++++-- src/Format/RowBinary.php | 28 +++++++++++ tests/Client/Http/RequestFactoryTest.php | 15 ++++-- tests/Client/Http/RequestOptionsTest.php | 27 ---------- tests/Client/Http/RequestSettingsTest.php | 25 ++++++++++ tests/Client/InsertTest.php | 60 +++++++++++++++++++++++ 15 files changed, 257 insertions(+), 61 deletions(-) create mode 100644 src/Client/Http/RequestSettings.php create mode 100644 src/Format/RowBinary.php delete mode 100644 tests/Client/Http/RequestOptionsTest.php create mode 100644 tests/Client/Http/RequestSettingsTest.php diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b76504b..87343b3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,6 +44,8 @@ jobs: - name: "Install dependencies with Composer" uses: "ramsey/composer-install@v3" + env: + COMPOSER_AUTH: '{"github-oauth": {"github.com": "${{ secrets.COMPOSER_AUTH }}"}}' with: composer-options: "${{ matrix.composer-options }}" dependency-versions: "${{ matrix.dependency-versions }}" @@ -93,6 +95,8 @@ jobs: - name: "Install dependencies with Composer" uses: "ramsey/composer-install@v3" + env: + COMPOSER_AUTH: '{"github-oauth": {"github.com": "${{ secrets.COMPOSER_AUTH }}"}}' with: dependency-versions: "${{ matrix.dependency-versions }}" diff --git a/.github/workflows/coding-standards.yml b/.github/workflows/coding-standards.yml index c79a1ab..fb03674 100644 --- a/.github/workflows/coding-standards.yml +++ b/.github/workflows/coding-standards.yml @@ -30,6 +30,8 @@ jobs: - name: "Install dependencies with Composer" uses: "ramsey/composer-install@v3" + env: + COMPOSER_AUTH: '{"github-oauth": {"github.com": "${{ secrets.COMPOSER_AUTH }}"}}' - name: "Run PHP_CodeSniffer" run: "vendor/bin/phpcs -q --no-colors --report=checkstyle | cs2pr" diff --git a/.github/workflows/static-analysis.yml b/.github/workflows/static-analysis.yml index dccb673..0b0885f 100644 --- a/.github/workflows/static-analysis.yml +++ b/.github/workflows/static-analysis.yml @@ -30,6 +30,8 @@ jobs: - name: "Install dependencies with Composer" uses: "ramsey/composer-install@v3" + env: + COMPOSER_AUTH: '{"github-oauth": {"github.com": "${{ secrets.COMPOSER_AUTH }}"}}' - name: "Run a static analysis with phpstan/phpstan" run: "vendor/bin/phpstan analyse --error-format=checkstyle | cs2pr" diff --git a/composer.json b/composer.json index bdefceb..297a3b4 100644 --- a/composer.json +++ b/composer.json @@ -39,6 +39,7 @@ "require-dev": { "cdn77/coding-standard": "^7.0", "infection/infection": "^0.29.0", + "kafkiansky/phpclick": "dev-bump", "nyholm/psr7": "^1.2", "php-http/message-factory": "^1.1", "phpstan/extension-installer": "^1.1", @@ -58,5 +59,9 @@ "psr-4": { "SimPod\\ClickHouseClient\\Tests\\": "tests/" } - } + }, + "repositories": [{ + "type": "vcs", + "url": "https://github.com/simPod/PHPClick" + }] } diff --git a/src/Client/ClickHouseClient.php b/src/Client/ClickHouseClient.php index 7e8abcc..36a203a 100644 --- a/src/Client/ClickHouseClient.php +++ b/src/Client/ClickHouseClient.php @@ -5,6 +5,7 @@ namespace SimPod\ClickHouseClient\Client; use Psr\Http\Client\ClientExceptionInterface; +use Psr\Http\Message\StreamInterface; use SimPod\ClickHouseClient\Exception\CannotInsert; use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Exception\UnsupportedParamType; @@ -85,4 +86,21 @@ public function insert(string $table, array $values, array|null $columns = null, * @template O of Output */ public function insertWithFormat(string $table, Format $inputFormat, string $data, array $settings = []): void; + + /** + * @param array $settings + * @param list $columns + * @param Format> $inputFormat + * + * @throws ClientExceptionInterface + * @throws CannotInsert + * @throws ServerError + */ + public function insertPayload( + string $table, + Format $inputFormat, + StreamInterface $payload, + array $columns = [], + array $settings = [], + ): void; } diff --git a/src/Client/Http/RequestFactory.php b/src/Client/Http/RequestFactory.php index 4c74502..0c50559 100644 --- a/src/Client/Http/RequestFactory.php +++ b/src/Client/Http/RequestFactory.php @@ -49,11 +49,13 @@ public function __construct( $this->uri = $uri; } - /** @throws UnsupportedParamType */ - public function prepareRequest(RequestOptions $requestOptions): RequestInterface - { + /** @param array $additionalOptions */ + public function initRequest( + RequestSettings $requestSettings, + array $additionalOptions = [], + ): RequestInterface { $query = http_build_query( - $requestOptions->settings, + $requestSettings->settings + $additionalOptions, '', '&', PHP_QUERY_RFC3986, @@ -70,11 +72,20 @@ public function prepareRequest(RequestOptions $requestOptions): RequestInterface } } - $request = $this->requestFactory->createRequest('POST', $uri); + return $this->requestFactory->createRequest('POST', $uri); + } - preg_match_all('~\{([a-zA-Z\d]+):([a-zA-Z\d ]+(\(.+\))?)}~', $requestOptions->sql, $matches); + /** @throws UnsupportedParamType */ + public function prepareSqlRequest( + string $sql, + RequestSettings $requestSettings, + RequestOptions $requestOptions, + ): RequestInterface { + $request = $this->initRequest($requestSettings); + + preg_match_all('~\{([a-zA-Z\d]+):([a-zA-Z\d ]+(\(.+\))?)}~', $sql, $matches); if ($matches[0] === []) { - $body = $this->streamFactory->createStream($requestOptions->sql); + $body = $this->streamFactory->createStream($sql); try { return $request->withBody($body); } catch (InvalidArgumentException) { @@ -93,7 +104,7 @@ static function (array $acc, string|int $k) use ($matches) { [], ); - $streamElements = [['name' => 'query', 'contents' => $requestOptions->sql]]; + $streamElements = [['name' => 'query', 'contents' => $sql]]; foreach ($requestOptions->params as $name => $value) { $type = $paramToType[$name] ?? null; if ($type === null) { diff --git a/src/Client/Http/RequestOptions.php b/src/Client/Http/RequestOptions.php index 1f5e12b..772ffd5 100644 --- a/src/Client/Http/RequestOptions.php +++ b/src/Client/Http/RequestOptions.php @@ -6,20 +6,9 @@ final class RequestOptions { - /** @var array */ - public array $settings; - - /** - * @param array $params - * @param array $defaultSettings - * @param array $querySettings - */ + /** @param array $params */ public function __construct( - public string $sql, public array $params, - array $defaultSettings, - array $querySettings, ) { - $this->settings = $querySettings + $defaultSettings; } } diff --git a/src/Client/Http/RequestSettings.php b/src/Client/Http/RequestSettings.php new file mode 100644 index 0000000..0f99893 --- /dev/null +++ b/src/Client/Http/RequestSettings.php @@ -0,0 +1,22 @@ + */ + public array $settings; + + /** + * @param array $defaultSettings + * @param array $querySettings + */ + public function __construct( + array $defaultSettings, + array $querySettings, + ) { + $this->settings = $querySettings + $defaultSettings; + } +} diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index d1a1007..2cce723 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -12,6 +12,7 @@ use Psr\Http\Message\ResponseInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; +use SimPod\ClickHouseClient\Client\Http\RequestSettings; use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Output\Output; @@ -83,13 +84,15 @@ private function executeRequest( array $settings = [], callable|null $processResponse = null, ): PromiseInterface { - $request = $this->requestFactory->prepareRequest( - new RequestOptions( - $sql, - $params, + $request = $this->requestFactory->prepareSqlRequest( + $sql, + new RequestSettings( $this->defaultSettings, $settings, ), + new RequestOptions( + $params, + ), ); return Create::promiseFor( diff --git a/src/Client/PsrClickHouseClient.php b/src/Client/PsrClickHouseClient.php index e274529..dc61a62 100644 --- a/src/Client/PsrClickHouseClient.php +++ b/src/Client/PsrClickHouseClient.php @@ -5,11 +5,14 @@ namespace SimPod\ClickHouseClient\Client; use DateTimeZone; +use InvalidArgumentException; use Psr\Http\Client\ClientExceptionInterface; use Psr\Http\Client\ClientInterface; use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\StreamInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; +use SimPod\ClickHouseClient\Client\Http\RequestSettings; use SimPod\ClickHouseClient\Exception\CannotInsert; use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Exception\UnsupportedParamType; @@ -198,6 +201,50 @@ public function insertWithFormat(string $table, Format $inputFormat, string $dat } } + public function insertPayload( + string $table, + Format $inputFormat, + StreamInterface $payload, + array $columns = [], + array $settings = [], + ): void { + if ($payload->getSize() === 0) { + throw CannotInsert::noValues(); + } + + $formatSql = $inputFormat::toSql(); + + $table = Escaper::quoteIdentifier($table); + + $columnsSql = $columns === [] ? '' : sprintf('(%s)', implode(',', $columns)); + + $sql = <<requestFactory->initRequest( + new RequestSettings( + $this->defaultSettings, + $settings, + ), + ['query' => $sql], + ); + + try { + $request = $request->withBody($payload); + } catch (InvalidArgumentException) { + absurd(); + } + + $response = $this->client->sendRequest($request); + + if ($response->getStatusCode() !== 200) { + throw ServerError::fromResponse($response); + } + + return; + } + /** * @param array $params * @param array $settings @@ -208,13 +255,15 @@ public function insertWithFormat(string $table, Format $inputFormat, string $dat */ private function executeRequest(string $sql, array $params, array $settings): ResponseInterface { - $request = $this->requestFactory->prepareRequest( - new RequestOptions( - $sql, - $params, + $request = $this->requestFactory->prepareSqlRequest( + $sql, + new RequestSettings( $this->defaultSettings, $settings, ), + new RequestOptions( + $params, + ), ); $response = $this->client->sendRequest($request); diff --git a/src/Format/RowBinary.php b/src/Format/RowBinary.php new file mode 100644 index 0000000..ff3eb55 --- /dev/null +++ b/src/Format/RowBinary.php @@ -0,0 +1,28 @@ +> + */ +final class RowBinary implements Format +{ + public static function output(string $contents): Output + { + /** @var Basic $output */ + $output = new Basic($contents); + + return $output; + } + + public static function toSql(): string + { + return 'FORMAT RowBinary'; + } +} diff --git a/tests/Client/Http/RequestFactoryTest.php b/tests/Client/Http/RequestFactoryTest.php index 04bc6a3..2bbb916 100644 --- a/tests/Client/Http/RequestFactoryTest.php +++ b/tests/Client/Http/RequestFactoryTest.php @@ -10,6 +10,7 @@ use PHPUnit\Framework\Attributes\DataProvider; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; +use SimPod\ClickHouseClient\Client\Http\RequestSettings; use SimPod\ClickHouseClient\Param\ParamValueConverterRegistry; use SimPod\ClickHouseClient\Tests\TestCaseBase; @@ -28,12 +29,16 @@ public function testPrepareRequest(string $uri, string $expectedUri): void $uri, ); - $request = $requestFactory->prepareRequest(new RequestOptions( + $request = $requestFactory->prepareSqlRequest( 'SELECT 1', - [], - ['max_block_size' => 1], - ['database' => 'database'], - )); + new RequestSettings( + ['max_block_size' => 1], + ['database' => 'database'], + ), + new RequestOptions( + [], + ), + ); self::assertSame('POST', $request->getMethod()); self::assertSame( diff --git a/tests/Client/Http/RequestOptionsTest.php b/tests/Client/Http/RequestOptionsTest.php deleted file mode 100644 index 3eb7018..0000000 --- a/tests/Client/Http/RequestOptionsTest.php +++ /dev/null @@ -1,27 +0,0 @@ - 'foo', 'a' => 1], - ['database' => 'bar', 'b' => 2], - ); - - self::assertSame('bar', $requestOptions->settings['database']); - self::assertSame(1, $requestOptions->settings['a']); - self::assertSame(2, $requestOptions->settings['b']); - } -} diff --git a/tests/Client/Http/RequestSettingsTest.php b/tests/Client/Http/RequestSettingsTest.php new file mode 100644 index 0000000..532874a --- /dev/null +++ b/tests/Client/Http/RequestSettingsTest.php @@ -0,0 +1,25 @@ + 'foo', 'a' => 1], + ['database' => 'bar', 'b' => 2], + ); + + self::assertSame('bar', $requestSettings->settings['database']); + self::assertSame(1, $requestSettings->settings['a']); + self::assertSame(2, $requestSettings->settings['b']); + } +} diff --git a/tests/Client/InsertTest.php b/tests/Client/InsertTest.php index 76415d9..91852f5 100644 --- a/tests/Client/InsertTest.php +++ b/tests/Client/InsertTest.php @@ -4,6 +4,10 @@ namespace SimPod\ClickHouseClient\Tests\Client; +use Kafkiansky\Binary\Buffer; +use Kafkiansky\Binary\Endianness; +use Kafkiansky\PHPClick; +use Nyholm\Psr7\Factory\Psr17Factory; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\Attributes\DataProvider; use SimPod\ClickHouseClient\Client\Http\RequestFactory; @@ -12,6 +16,7 @@ use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\JsonCompact; use SimPod\ClickHouseClient\Format\JsonEachRow; +use SimPod\ClickHouseClient\Format\RowBinary; use SimPod\ClickHouseClient\Tests\TestCaseBase; use SimPod\ClickHouseClient\Tests\WithClient; @@ -21,6 +26,7 @@ #[CoversClass(ServerError::class)] #[CoversClass(JsonEachRow::class)] #[CoversClass(JsonCompact::class)] +#[CoversClass(RowBinary::class)] final class InsertTest extends TestCaseBase { use WithClient; @@ -107,6 +113,60 @@ public function testInsertUseColumnsWithTypes(string $tableSql): void self::assertSame($expectedData, $output->data); } + #[DataProvider('providerInsert')] + public function testInsertPayload(string $tableSql): void + { + $data = [ + ['PageViews' => 5, 'UserID' => 4324182021466249494, 'Duration' => 146, 'Sign' => -1], + ['PageViews' => 6, 'UserID' => 4324182021466249494, 'Duration' => 185, 'Sign' => 1], + ]; + + $rows = [ + PHPClick\Row::columns( + PHPClick\Column::uint32(5), + PHPClick\Column::uint64(4324182021466249494), + PHPClick\Column::uint32(146), + PHPClick\Column::int8(-1), + ), + PHPClick\Row::columns( + PHPClick\Column::uint32(6), + PHPClick\Column::uint64(4324182021466249494), + PHPClick\Column::uint32(185), + PHPClick\Column::int8(1), + ), + ]; + + $buffer = Buffer::empty(Endianness::little()); + + foreach ($rows as $row) { + $row->writeToBuffer($buffer); + } + + self::$client->executeQuery($tableSql); + + $psr17Factory = new Psr17Factory(); + + self::$client->insertPayload( + 'UserActivity', + new RowBinary(), + $psr17Factory->createStream( + $buffer->reset(), + ), + ['PageViews', 'UserID', 'Duration', 'Sign'], + ); + + $output = self::$client->select( + <<<'CLICKHOUSE' + SELECT * FROM UserActivity + CLICKHOUSE, + new JsonEachRow(), + ); + + $data[0]['UserID'] = (string) $data[0]['UserID']; + $data[1]['UserID'] = (string) $data[1]['UserID']; + self::assertSame($data, $output->data); + } + public function testInsertEscaping(): void { self::$client->executeQuery(