Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Cursors through API Query Params #14110

Open
wants to merge 20 commits into
base: master
Choose a base branch
from

Conversation

vrajat
Copy link
Collaborator

@vrajat vrajat commented Sep 30, 2024

Cursor support will allow Pinot clients to consume query results in smaller chunks. This feature allows clients to work with lesser resources esp. memory. Application logic is simpler with cursors. For example an app UI paginates through results in a table or a graph. Cursor support has been implemented using APIs.

Design Doc

Implementation for #13185

API

POST /query/sql

A new broker API parameter has been added to trigger pagination.
The API accepts the following new optional query parameters:

  • getCursor(boolean):
  • numRows (int): The number of rows to return in the first page.

The response contains the following extra fields:

Field Description
brokerHost hostname of the processing broker
brokerPort port of the processing broker
offset starting offset of the result table slice in the cursor response
numRows Number of rows in result table slice in the cursor response
cursorResultWriteTimeMs Time taken to write the query response to ResponseStore
cursorFetchTimeMs Time taken to read a slice of the result table from ResponseStore
submissionTime Unix timestamp when the query was submitted
expirationTime Unix timestamp when the response can be deleted from the ResponseStore
bytesWritten number of bytes written to the response store when storing the result table

GET /resultStore/{requestId}/results

This is a broker API that can be used to iterate over the result set of a query submitted using the above API.
The API accepts the following query parameters:

  • offset (int) (required): The start offset of the page of results.
  • numRows (int) (optional): The number of rows in the page. By default it will use the default size.

GET /resultStore/{requestId}/

Returns the BrokerResponse metadata of the query.

GET /resultStore

Lists all the requestIds of all the query results available in the response store.

DELETE /resultStore/{requestId}/

Delete the results of a query.
The API accepts the following query parameters:

  • requestId (required)

SPI

The PR implements a FileSystem ResponseStore and a JSON ResponseSerde.

The feature provides two SPIs to extend the feature to support other implementations:

  • ResponseSerde: Serialize/Deserialize the response.
  • ResponseStore: Store responses in a storage system.
    Both SPIs use Java SPI and the default ServiceLoader to find implementation of the SPIs. All implementation should be annotated with AutoService to help generate files for discovering the implementations.

Configuration

ResponseStore

Configuration Default Description
pinot.broker.cursor.response.store.type file The protocol to use for storage
pinot.broker.cursor.response.store.serde json The Serialization/Deserialization protocol to use for the result set

File Response Store

Configuration Default Description
pinot.broker.cursor.response.store.file.data.dir /tmp/pinot/broker/response_store/data Location where result files will be stored.
pinot.broker.cursor.response.store.file.data.dir file:///tmp/pinot/broker/response_store/data Location where temporary files will be created.
pinot.broker.cursor.response.store.file.extension json The file name extension

Miscellaneous

Configuration Default Description
pinot.broker.cursor.result.size 10000 The result size if numRows is not specified in the API call.
pinot.broker.cursor.response.store.expiration 1h The time before a query result will be deleted.
controller.cluster.response.store.cleaner.frequencyPeriod 1h The frequency of the periodic task that deletes expired query results
controller.cluster.response.store.cursor.cleaner.initialDelay random The initial delay before the first run of the periodic task.

tags: feature, multi-stage, release-notes

@codecov-commenter
Copy link

codecov-commenter commented Sep 30, 2024

Codecov Report

Attention: Patch coverage is 3.73134% with 129 lines in your changes missing coverage. Please review.

Project coverage is 55.36%. Comparing base (59551e4) to head (fb03d62).
Report is 1199 commits behind head on master.

Files with missing lines Patch % Lines
...he/pinot/common/cursors/AbstractResponseStore.java 0.00% 53 Missing ⚠️
...t/common/response/broker/CursorResponseNative.java 0.00% 29 Missing ⚠️
...apache/pinot/spi/cursors/ResponseSerdeService.java 0.00% 22 Missing ⚠️
...apache/pinot/spi/cursors/ResponseStoreService.java 0.00% 22 Missing ⚠️
...e/pinot/common/utils/config/QueryOptionsUtils.java 0.00% 2 Missing ⚠️
...va/org/apache/pinot/spi/utils/CommonConstants.java 0.00% 1 Missing ⚠️

❗ There is a different number of reports uploaded between BASE (59551e4) and HEAD (fb03d62). Click for more details.

HEAD has 34 uploads less than BASE
Flag BASE (59551e4) HEAD (fb03d62)
integration 7 2
integration2 3 2
temurin 12 5
java-21 7 3
skip-bytebuffers-true 3 1
skip-bytebuffers-false 7 4
unittests 5 3
java-11 5 2
unittests2 3 0
integration1 2 0
custom-integration1 2 0
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14110      +/-   ##
============================================
- Coverage     61.75%   55.36%   -6.40%     
- Complexity      207      791     +584     
============================================
  Files          2436     2078     -358     
  Lines        133233   109585   -23648     
  Branches      20636    17341    -3295     
============================================
- Hits          82274    60668   -21606     
+ Misses        44911    44073     -838     
+ Partials       6048     4844    -1204     
Flag Coverage Δ
custom-integration1 ?
integration 0.00% <ø> (-0.01%) ⬇️
integration1 ?
integration2 0.00% <ø> (ø)
java-11 55.32% <3.73%> (-6.39%) ⬇️
java-21 55.22% <3.73%> (-6.41%) ⬇️
skip-bytebuffers-false 55.35% <3.73%> (-6.40%) ⬇️
skip-bytebuffers-true 55.19% <3.73%> (+27.46%) ⬆️
temurin 55.36% <3.73%> (-6.40%) ⬇️
unittests 55.36% <3.73%> (-6.39%) ⬇️
unittests1 55.36% <3.73%> (+8.47%) ⬆️
unittests2 ?

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Setup CursorRequestHandlerDelegate

Checkpoint - compiles and sanity test works.

Setup CursorRequestHandlerDelegate

Checkpoint - compiles and sanity test works.

add spi for cursors

Checkpoint: testCursorWorkflow works.

Checkpoint: most tests work.

Fix checkstyle

Register ResultStoreCleaner

Redesign SPI. Make cursor integration test work

Add Test for FsResultStore

Add Test for Auth

Remove unnecessary files.

Checkpoint.

Fix tests to use URLs

Undo unnecessary changes.
@vrajat vrajat changed the title Cursors Add support for Cursors through API Query Params Oct 17, 2024
@vrajat vrajat added feature multi-stage Related to the multi-stage query engine labels Oct 17, 2024
@vrajat vrajat marked this pull request as ready for review October 17, 2024 16:27
CommonConstants.CursorConfigs.DEFAULT_QUERY_RESULT_SIZE);
}

if (numRows > CommonConstants.CursorConfigs.MAX_QUERY_RESULT_SIZE) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want to introduce the MAX_QUERY_RESULT_SIZE here if there is no such restriction on the generic queries?

@@ -100,6 +103,9 @@
public class PinotClientRequest {
private static final Logger LOGGER = LoggerFactory.getLogger(PinotClientRequest.class);

@Inject
PinotConfiguration _brokerConf;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't have to inject the broker config here as it's already in the BaseBrokerRequestHandler class.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no getter right now to get the config. I can add that. However is that a better option than injecting the config ?

private static final Logger LOGGER = LoggerFactory.getLogger(ResponseStoreResource.class);

@Inject
private PinotConfiguration _brokerConf;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, you might not need to inject the broker config here.

}

response.setResultTable(
new ResultTable(resultTable.getDataSchema(), resultTable.getRows().subList(offset, sliceEnd)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that mean every time a read request will fetch the whole result table into memory first and then pick the page from the memory?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation does read the whole result table into memory. Does LinkedIn implementation support seeking within the result table ? Do you have suggestions on how can the interface be changed to also support seek ?

* Starts from 0.
* @return current offset.
*/
int getOffset();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put the methods for the same object together, i.e. swapping the setNumRows() and getOffset() methods

x -> new InstanceInfo(x.getInstanceName(), x.getHostName(), Integer.parseInt(x.getPort()))));

try {
Map<String, List<CursorResponseNative>> brokerResponses = getAllQueryResults(brokers, Collections.emptyMap());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, it seems every controller will try to fetch all the results from all the brokers, is it kind of a waste of resources to do that? Could we distribute the cleanup workloads to controllers instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assumed that only the lead controller will run the periodic tasks. Right now, the controller does not have any information about ResponseStores. Only the broker has information.

The major point to discuss - is it a requirement that a ResponseStore should be accessible from all nodes ? Then the default implementation of using the broker local filesystem has to be changed.

The current perioidc task and cleaner assumes that a only a specific broker knows about the responses it has stored.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature multi-stage Related to the multi-stage query engine
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants