Skip to content

Commit

Permalink
IGNITE-20382 SQL Calcite: Add metrics for CalciteQueryExecutor thread…
Browse files Browse the repository at this point in the history
… pool - Fixes #10933.

Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
  • Loading branch information
alex-plekhanov committed Sep 25, 2023
1 parent 29c4b6e commit ad02c96
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@

package org.apache.ignite.internal.processors.query.calcite.exec;

import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;

import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.THREAD_POOLS;

/**
* TODO use {@link StripedExecutor}, registered in core pols.
* Query task executor.
*/
public class QueryTaskExecutorImpl extends AbstractService implements QueryTaskExecutor, Thread.UncaughtExceptionHandler {
/** */
public static final String THREAD_POOL_NAME = "CalciteQueryExecutor";

/** */
private IgniteStripedThreadPoolExecutor stripedThreadPoolExecutor;

Expand Down Expand Up @@ -82,16 +84,18 @@ public void exceptionHandler(Thread.UncaughtExceptionHandler eHnd) {
@Override public void onStart(GridKernalContext ctx) {
exceptionHandler(ctx.uncaughtExceptionHandler());

CalciteQueryProcessor proc = Objects.requireNonNull(Commons.lookupComponent(ctx, CalciteQueryProcessor.class));

stripedThreadPoolExecutor(new IgniteStripedThreadPoolExecutor(
IgniteStripedThreadPoolExecutor executor = new IgniteStripedThreadPoolExecutor(
ctx.config().getQueryThreadPoolSize(),
ctx.igniteInstanceName(),
"calciteQry",
this,
false,
0
));
);

stripedThreadPoolExecutor(executor);

executor.registerMetrics(ctx.metric().registry(metricName(THREAD_POOLS, THREAD_POOL_NAME)));
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.calcite.Query;
import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand All @@ -73,6 +75,7 @@
import static org.apache.ignite.internal.processors.authentication.AuthenticationProcessorSelfTest.withSecurityContextOnAllNodes;
import static org.apache.ignite.internal.processors.authentication.User.DFAULT_USER_NAME;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.cleanPerformanceStatisticsDir;
import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.startCollectStatistics;
import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.stopCollectStatisticsAndRead;
Expand Down Expand Up @@ -302,6 +305,23 @@ public void testUserQueriesMetrics() throws Exception {
assertEquals(0, ((LongMetric)mreg1.findMetric("canceled")).value());
}

/** */
@Test
public void testThreadPoolMetrics() {
String regName = metricName(PoolProcessor.THREAD_POOLS, QueryTaskExecutorImpl.THREAD_POOL_NAME);
MetricRegistry mreg = client.context().metric().registry(regName);

LongMetric tasksCnt = mreg.findMetric("CompletedTaskCount");

tasksCnt.reset();

assertEquals(0, tasksCnt.value());

sql("SELECT 'test'");

assertTrue(tasksCnt.value() > 0);
}

/** */
@Test
public void testPerformanceStatistics() throws Exception {
Expand Down

0 comments on commit ad02c96

Please sign in to comment.