Skip to content

Commit

Permalink
Add broker query event listener (#11437)
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 authored Sep 26, 2023
1 parent ff264a3 commit 1748be4
Show file tree
Hide file tree
Showing 12 changed files with 231 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
import org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.services.ServiceRole;
Expand Down Expand Up @@ -124,6 +126,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
protected HelixManager _participantHelixManager;
// Handles the server routing stats.
protected ServerRoutingStatsManager _serverRoutingStatsManager;
protected BrokerQueryEventListener _brokerQueryEventListener;

@Override
public void init(PinotConfiguration brokerConf)
Expand Down Expand Up @@ -284,6 +287,10 @@ public void start()
TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX);
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);

LOGGER.info("Initializing Broker Event Listener Factory");
_brokerQueryEventListener = PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener(
_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));

// Create Broker request handler.
String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
String brokerRequestHandlerType =
Expand All @@ -292,16 +299,18 @@ public void start()
if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
singleStageBrokerRequestHandler =
new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager,
tableCache, _brokerMetrics, null);
tableCache, _brokerMetrics, null, _brokerQueryEventListener);
} else { // default request handler type, e.g. netty
if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, tlsDefaults, _serverRoutingStatsManager);
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, tlsDefaults, _serverRoutingStatsManager,
_brokerQueryEventListener);
} else {
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, null, _serverRoutingStatsManager);
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, null, _serverRoutingStatsManager,
_brokerQueryEventListener);
}
}

Expand All @@ -312,7 +321,7 @@ public void start()
// TODO: decouple protocol and engine selection.
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics);
queryQuotaManager, tableCache, _brokerMetrics, _brokerQueryEventListener);
}

_brokerRequestHandler = new BrokerRequestHandlerDelegate(brokerId, singleStageBrokerRequestHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
Expand Down Expand Up @@ -138,6 +139,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
protected final long _brokerTimeoutMs;
protected final int _queryResponseLimit;
protected final QueryLogger _queryLogger;
protected final BrokerQueryEventListener _brokerQueryEventListener;

private final boolean _disableGroovy;
private final boolean _useApproximateFunction;
Expand All @@ -148,7 +150,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {

public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics) {
BrokerMetrics brokerMetrics, BrokerQueryEventListener brokerQueryEventListener) {
_brokerId = brokerId;
_brokerIdGenerator = new BrokerRequestIdGenerator(brokerId);
_config = config;
Expand All @@ -172,6 +174,7 @@ public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, Brok
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
_queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
_brokerQueryEventListener = brokerQueryEventListener;
LOGGER.info(
"Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps, "
+ "enabling query cancellation: {}", _brokerId, _brokerTimeoutMs, _queryResponseLimit,
Expand Down Expand Up @@ -222,8 +225,8 @@ public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, Htt
// Unexpected server responses are collected and returned as exception.
if (status != 200 && status != 404) {
String responseString = EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
throw new Exception(String.format("Unexpected status=%d and response='%s' from uri='%s'", status,
responseString, uri));
throw new Exception(
String.format("Unexpected status=%d and response='%s' from uri='%s'", status, responseString, uri));
}
if (serverResponses != null) {
serverResponses.put(uri.getHost() + ":" + uri.getPort(), status);
Expand Down Expand Up @@ -251,19 +254,23 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
throws Exception {
requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());

long requestId = _brokerIdGenerator.get();
requestContext.setRequestId(requestId);

// First-stage access control to prevent unauthenticated requests from using up resources. Secondary table-level
// check comes later.
boolean hasAccess = _accessControlFactory.create().hasAccess(requesterIdentity);
if (!hasAccess) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
_brokerQueryEventListener.onQueryCompletion(requestContext);
throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN);
}

long requestId = _brokerIdGenerator.get();
requestContext.setRequestId(requestId);
JsonNode sql = request.get(Broker.Request.SQL);
if (sql == null) {
requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
_brokerQueryEventListener.onQueryCompletion(requestContext);
throw new BadQueryRequestException("Failed to find 'sql' in the request: " + request);
}
String query = sql.asText();
Expand All @@ -282,6 +289,7 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
brokerResponse.setRequestId(String.valueOf(requestId));
brokerResponse.setBrokerId(_brokerId);
brokerResponse.setBrokerReduceTimeMs(requestContext.getReduceTimeMillis());
_brokerQueryEventListener.onQueryCompletion(requestContext);
return brokerResponse;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
import org.apache.pinot.spi.trace.RequestContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -61,8 +62,9 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
// TODO: Support TLS
public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics, TlsConfig tlsConfig) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
BrokerMetrics brokerMetrics, TlsConfig tlsConfig, BrokerQueryEventListener brokerQueryEventListener) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics,
brokerQueryEventListener);
LOGGER.info("Using Grpc BrokerRequestHandler.");
_grpcConfig = GrpcConfig.buildGrpcQueryConfig(config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.pinot.query.type.TypeFactory;
import org.apache.pinot.query.type.TypeSystem;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
Expand All @@ -77,10 +78,13 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final MailboxService _mailboxService;
private final QueryDispatcher _queryDispatcher;

public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);

public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics,
BrokerQueryEventListener brokerQueryEventListener) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache,
brokerMetrics, brokerQueryEventListener);
LOGGER.info("Using Multi-stage BrokerRequestHandler.");
String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
Expand All @@ -72,8 +73,10 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
public SingleConnectionBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics, NettyConfig nettyConfig,
TlsConfig tlsConfig, ServerRoutingStatsManager serverRoutingStatsManager) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
TlsConfig tlsConfig, ServerRoutingStatsManager serverRoutingStatsManager,
BrokerQueryEventListener brokerQueryEventListener) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics,
brokerQueryEventListener);
LOGGER.info("Using Netty BrokerRequestHandler.");

_brokerReduceService = new BrokerReduceService(_config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ public void testCancelQuery()
BaseBrokerRequestHandler requestHandler =
new BaseBrokerRequestHandler(config, "testBrokerId", routingManager, new AllowAllAccessControlFactory(),
queryQuotaManager, tableCache,
new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet())) {
new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true,
Collections.emptySet()), null) {
@Override
public void start() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
Expand Down Expand Up @@ -183,7 +184,8 @@ public void testBrokerRequestHandler()
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
null, null, mock(ServerRoutingStatsManager.class));
null, null, mock(ServerRoutingStatsManager.class),
PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener());

long randNum = RANDOM.nextLong();
byte[] randBytes = new byte[12];
Expand Down Expand Up @@ -211,7 +213,8 @@ public void testBrokerRequestHandlerWithAsFunction()
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
null, null, mock(ServerRoutingStatsManager.class));
null, null, mock(ServerRoutingStatsManager.class),
PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener());
long currentTsMin = System.currentTimeMillis();
JsonNode request = JsonUtils.stringToJsonNode(
"{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}");
Expand Down Expand Up @@ -418,7 +421,8 @@ public void testExplainPlanLiteralOnly()
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
null, null, mock(ServerRoutingStatsManager.class));
null, null, mock(ServerRoutingStatsManager.class),
PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener());

// Test 1: select constant
JsonNode request = JsonUtils.stringToJsonNode("{\"sql\":\"EXPLAIN PLAN FOR SELECT 1.5, 'test'\"}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory;
import org.apache.pinot.spi.trace.DefaultRequestContext;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -65,8 +66,9 @@ public void setUp() {
_config.setProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, "12345");
_accessControlFactory = new AllowAllAccessControlFactory();
_requestHandler =
new MultiStageBrokerRequestHandler(_config, "Broker_localhost", _routingManager, _accessControlFactory,
_queryQuotaManager, _tableCache, _brokerMetrics);
new MultiStageBrokerRequestHandler(_config, "Broker_localhost", _routingManager,
_accessControlFactory, _queryQuotaManager, _tableCache, _brokerMetrics,
PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.spi.eventlistener.query;

import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.RequestContext;


public interface BrokerQueryEventListener {

void init(PinotConfiguration eventListenerConfiguration);
void onQueryCompletion(RequestContext requestContext);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.spi.eventlistener.query;

import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.RequestContext;


public class NoOpBrokerQueryEventListener implements BrokerQueryEventListener {

@Override
public void init(PinotConfiguration eventListenerConfiguration) {
// Not implemented method
}

@Override
public void onQueryCompletion(RequestContext requestContext) {
// Not implemented method
}
}
Loading

0 comments on commit 1748be4

Please sign in to comment.