diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index c6b3627536d..49608095653 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -70,6 +70,8 @@ import org.apache.pinot.core.util.ListenerConfigUtil; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener; +import org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory; import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import org.apache.pinot.spi.services.ServiceRole; @@ -124,6 +126,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable { protected HelixManager _participantHelixManager; // Handles the server routing stats. protected ServerRoutingStatsManager _serverRoutingStatsManager; + protected BrokerQueryEventListener _brokerQueryEventListener; @Override public void init(PinotConfiguration brokerConf) @@ -284,6 +287,10 @@ public void start() TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX); NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX); + LOGGER.info("Initializing Broker Event Listener Factory"); + _brokerQueryEventListener = PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener( + _brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX)); + // Create Broker request handler. String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId()); String brokerRequestHandlerType = @@ -292,16 +299,18 @@ public void start() if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) { singleStageBrokerRequestHandler = new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager, - tableCache, _brokerMetrics, null); + tableCache, _brokerMetrics, null, _brokerQueryEventListener); } else { // default request handler type, e.g. netty if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) { singleStageBrokerRequestHandler = new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, - queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, tlsDefaults, _serverRoutingStatsManager); + queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, tlsDefaults, _serverRoutingStatsManager, + _brokerQueryEventListener); } else { singleStageBrokerRequestHandler = new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, - queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, null, _serverRoutingStatsManager); + queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, null, _serverRoutingStatsManager, + _brokerQueryEventListener); } } @@ -312,7 +321,7 @@ public void start() // TODO: decouple protocol and engine selection. multiStageBrokerRequestHandler = new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, - queryQuotaManager, tableCache, _brokerMetrics); + queryQuotaManager, tableCache, _brokerMetrics, _brokerQueryEventListener); } _brokerRequestHandler = new BrokerRequestHandlerDelegate(brokerId, singleStageBrokerRequestHandler, diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 420dc15eb2b..47ffeddedf9 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -90,6 +90,7 @@ import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener; import org.apache.pinot.spi.exception.BadQueryRequestException; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.trace.Tracing; @@ -138,6 +139,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { protected final long _brokerTimeoutMs; protected final int _queryResponseLimit; protected final QueryLogger _queryLogger; + protected final BrokerQueryEventListener _brokerQueryEventListener; private final boolean _disableGroovy; private final boolean _useApproximateFunction; @@ -148,7 +150,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, - BrokerMetrics brokerMetrics) { + BrokerMetrics brokerMetrics, BrokerQueryEventListener brokerQueryEventListener) { _brokerId = brokerId; _brokerIdGenerator = new BrokerRequestIdGenerator(brokerId); _config = config; @@ -172,6 +174,7 @@ public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, Brok boolean enableQueryCancellation = Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); _queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null; + _brokerQueryEventListener = brokerQueryEventListener; LOGGER.info( "Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps, " + "enabling query cancellation: {}", _brokerId, _brokerTimeoutMs, _queryResponseLimit, @@ -222,8 +225,8 @@ public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, Htt // Unexpected server responses are collected and returned as exception. if (status != 200 && status != 404) { String responseString = EntityUtils.toString(httpRequestResponse.getResponse().getEntity()); - throw new Exception(String.format("Unexpected status=%d and response='%s' from uri='%s'", status, - responseString, uri)); + throw new Exception( + String.format("Unexpected status=%d and response='%s' from uri='%s'", status, responseString, uri)); } if (serverResponses != null) { serverResponses.put(uri.getHost() + ":" + uri.getPort(), status); @@ -251,19 +254,23 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption throws Exception { requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis()); + long requestId = _brokerIdGenerator.get(); + requestContext.setRequestId(requestId); + // First-stage access control to prevent unauthenticated requests from using up resources. Secondary table-level // check comes later. boolean hasAccess = _accessControlFactory.create().hasAccess(requesterIdentity); if (!hasAccess) { _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE); + _brokerQueryEventListener.onQueryCompletion(requestContext); throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN); } - long requestId = _brokerIdGenerator.get(); - requestContext.setRequestId(requestId); JsonNode sql = request.get(Broker.Request.SQL); if (sql == null) { + requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); + _brokerQueryEventListener.onQueryCompletion(requestContext); throw new BadQueryRequestException("Failed to find 'sql' in the request: " + request); } String query = sql.asText(); @@ -282,6 +289,7 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption brokerResponse.setRequestId(String.valueOf(requestId)); brokerResponse.setBrokerId(_brokerId); brokerResponse.setBrokerReduceTimeMs(requestContext.getReduceTimeMillis()); + _brokerQueryEventListener.onQueryCompletion(requestContext); return brokerResponse; } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index 56be2ea4fb1..b2e36d16eff 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -42,6 +42,7 @@ import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener; import org.apache.pinot.spi.trace.RequestContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,8 +62,9 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { // TODO: Support TLS public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, - BrokerMetrics brokerMetrics, TlsConfig tlsConfig) { - super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + BrokerMetrics brokerMetrics, TlsConfig tlsConfig, BrokerQueryEventListener brokerQueryEventListener) { + super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics, + brokerQueryEventListener); LOGGER.info("Using Grpc BrokerRequestHandler."); _grpcConfig = GrpcConfig.buildGrpcQueryConfig(config); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index e352ce906e1..7fb9b24c14b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -62,6 +62,7 @@ import org.apache.pinot.query.type.TypeFactory; import org.apache.pinot.query.type.TypeSystem; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -77,10 +78,13 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { private final MailboxService _mailboxService; private final QueryDispatcher _queryDispatcher; - public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, - AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, - BrokerMetrics brokerMetrics) { - super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + + public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, + BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, + QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics, + BrokerQueryEventListener brokerQueryEventListener) { + super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, + brokerMetrics, brokerQueryEventListener); LOGGER.info("Using Multi-stage BrokerRequestHandler."); String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME); int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT)); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index d78aa5ffe87..3ad86484426 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -50,6 +50,7 @@ import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -72,8 +73,10 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl public SingleConnectionBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics, NettyConfig nettyConfig, - TlsConfig tlsConfig, ServerRoutingStatsManager serverRoutingStatsManager) { - super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics); + TlsConfig tlsConfig, ServerRoutingStatsManager serverRoutingStatsManager, + BrokerQueryEventListener brokerQueryEventListener) { + super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics, + brokerQueryEventListener); LOGGER.info("Using Netty BrokerRequestHandler."); _brokerReduceService = new BrokerReduceService(_config); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java index f2ee4cd03eb..e38db6e020b 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java @@ -210,7 +210,8 @@ public void testCancelQuery() BaseBrokerRequestHandler requestHandler = new BaseBrokerRequestHandler(config, "testBrokerId", routingManager, new AllowAllAccessControlFactory(), queryQuotaManager, tableCache, - new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet())) { + new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, + Collections.emptySet()), null) { @Override public void start() { } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java index 78c059c696c..749afeb233b 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java @@ -31,6 +31,7 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory; import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.trace.Tracing; @@ -183,7 +184,8 @@ public void testBrokerRequestHandler() SingleConnectionBrokerRequestHandler requestHandler = new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY, null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), - null, null, mock(ServerRoutingStatsManager.class)); + null, null, mock(ServerRoutingStatsManager.class), + PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener()); long randNum = RANDOM.nextLong(); byte[] randBytes = new byte[12]; @@ -211,7 +213,8 @@ public void testBrokerRequestHandlerWithAsFunction() SingleConnectionBrokerRequestHandler requestHandler = new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY, null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), - null, null, mock(ServerRoutingStatsManager.class)); + null, null, mock(ServerRoutingStatsManager.class), + PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener()); long currentTsMin = System.currentTimeMillis(); JsonNode request = JsonUtils.stringToJsonNode( "{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}"); @@ -418,7 +421,8 @@ public void testExplainPlanLiteralOnly() SingleConnectionBrokerRequestHandler requestHandler = new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY, null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), - null, null, mock(ServerRoutingStatsManager.class)); + null, null, mock(ServerRoutingStatsManager.class), + PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener()); // Test 1: select constant JsonNode request = JsonUtils.stringToJsonNode("{\"sql\":\"EXPLAIN PLAN FOR SELECT 1.5, 'test'\"}"); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java index fe37953ae0a..45d41ffc6f9 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java @@ -29,6 +29,7 @@ import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory; import org.apache.pinot.spi.trace.DefaultRequestContext; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.utils.CommonConstants; @@ -65,8 +66,9 @@ public void setUp() { _config.setProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, "12345"); _accessControlFactory = new AllowAllAccessControlFactory(); _requestHandler = - new MultiStageBrokerRequestHandler(_config, "Broker_localhost", _routingManager, _accessControlFactory, - _queryQuotaManager, _tableCache, _brokerMetrics); + new MultiStageBrokerRequestHandler(_config, "Broker_localhost", _routingManager, + _accessControlFactory, _queryQuotaManager, _tableCache, _brokerMetrics, + PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener()); } @Test diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/BrokerQueryEventListener.java b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/BrokerQueryEventListener.java new file mode 100644 index 00000000000..ea07a2df64c --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/BrokerQueryEventListener.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.eventlistener.query; + +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.trace.RequestContext; + + +public interface BrokerQueryEventListener { + + void init(PinotConfiguration eventListenerConfiguration); + void onQueryCompletion(RequestContext requestContext); +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/NoOpBrokerQueryEventListener.java b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/NoOpBrokerQueryEventListener.java new file mode 100644 index 00000000000..5b0fbd3a40d --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/NoOpBrokerQueryEventListener.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.eventlistener.query; + +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.trace.RequestContext; + + +public class NoOpBrokerQueryEventListener implements BrokerQueryEventListener { + + @Override + public void init(PinotConfiguration eventListenerConfiguration) { + // Not implemented method + } + + @Override + public void onQueryCompletion(RequestContext requestContext) { + // Not implemented method + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java new file mode 100644 index 00000000000..828f214a16b --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/eventlistener/query/PinotBrokerQueryEventListenerFactory.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.eventlistener.query; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Collections; +import java.util.Optional; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME; +import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME; + + +public class PinotBrokerQueryEventListenerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotBrokerQueryEventListenerFactory.class); + private static BrokerQueryEventListener _brokerQueryEventListener = null; + + private PinotBrokerQueryEventListenerFactory() { + } + + /** + * Initialize the BrokerQueryEventListener and registers the eventListener + */ + @VisibleForTesting + public synchronized static void init(PinotConfiguration eventListenerConfiguration) { + // Initializes BrokerQueryEventListener. + initializeBrokerQueryEventListener(eventListenerConfiguration); + } + + /** + * Initializes PinotBrokerQueryEventListener with event-listener configurations. + * @param eventListenerConfiguration The subset of the configuration containing the event-listener-related keys + */ + private static void initializeBrokerQueryEventListener(PinotConfiguration eventListenerConfiguration) { + String brokerQueryEventListenerClassName = + eventListenerConfiguration.getProperty(CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME, + DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME); + LOGGER.info("{} will be initialized as the PinotBrokerQueryEventListener", brokerQueryEventListenerClassName); + + Optional> clazzFound; + try { + clazzFound = Optional.of(Class.forName(brokerQueryEventListenerClassName)); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Failed to initialize BrokerQueryEventListener. " + + "Please check if any pinot-event-listener related jar is actually added to the classpath."); + } + + clazzFound.ifPresent(clazz -> { + try { + BrokerQueryEventListener brokerQueryEventListener = (BrokerQueryEventListener) clazz.newInstance(); + brokerQueryEventListener.init(eventListenerConfiguration); + registerBrokerEventListener(brokerQueryEventListener); + } catch (Exception e) { + LOGGER.error("Caught exception while initializing event listener registry: {}, skipping it", clazz, e); + } + }); + + Preconditions.checkState(_brokerQueryEventListener != null, "Failed to initialize BrokerQueryEventListener. " + + "Please check if any pinot-event-listener related jar is actually added to the classpath."); + } + + /** + * Registers a broker event listener. + */ + private static void registerBrokerEventListener(BrokerQueryEventListener brokerQueryEventListener) { + LOGGER.info("Registering broker event listener : {}", brokerQueryEventListener.getClass().getName()); + _brokerQueryEventListener = brokerQueryEventListener; + } + + /** + * Returns the brokerQueryEventListener. If the BrokerQueryEventListener is null, + * first creates and initializes the BrokerQueryEventListener. + * @param eventListenerConfiguration event-listener configs + */ + public static synchronized BrokerQueryEventListener getBrokerQueryEventListener( + PinotConfiguration eventListenerConfiguration) { + if (_brokerQueryEventListener == null) { + init(eventListenerConfiguration); + } + return _brokerQueryEventListener; + } + + @VisibleForTesting + public static BrokerQueryEventListener getBrokerQueryEventListener() { + return getBrokerQueryEventListener(new PinotConfiguration(Collections.emptyMap())); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 56e6dcf9992..80c1816702f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -43,8 +43,11 @@ private CommonConstants() { public static final String UNKNOWN = "unknown"; public static final String CONFIG_OF_METRICS_FACTORY_CLASS_NAME = "factory.className"; + public static final String CONFIG_OF_BROKER_EVENT_LISTENER_CLASS_NAME = "factory.className"; public static final String DEFAULT_METRICS_FACTORY_CLASS_NAME = "org.apache.pinot.plugin.metrics.yammer.YammerMetricsFactory"; + public static final String DEFAULT_BROKER_EVENT_LISTENER_CLASS_NAME = + "org.apache.pinot.spi.eventlistener.query.NoOpBrokerQueryEventListener"; public static final String SWAGGER_AUTHORIZATION_KEY = "oauth"; public static final String CONFIG_OF_SWAGGER_RESOURCES_PATH = "META-INF/resources/webjars/swagger-ui/5.1.0/"; @@ -205,6 +208,7 @@ public static class Broker { public static final String ROUTING_TABLE_CONFIG_PREFIX = "pinot.broker.routing.table"; public static final String ACCESS_CONTROL_CONFIG_PREFIX = "pinot.broker.access.control"; public static final String METRICS_CONFIG_PREFIX = "pinot.broker.metrics"; + public static final String EVENT_LISTENER_CONFIG_PREFIX = "pinot.broker.event.listener"; public static final String CONFIG_OF_METRICS_NAME_PREFIX = "pinot.broker.metrics.prefix"; public static final String DEFAULT_METRICS_NAME_PREFIX = "pinot.broker.";