diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index a21ba720ced04..8dbaf85ba70ae 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -58,8 +58,8 @@ import org.apache.ignite.events.SqlQueryExecutionEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.GridProcessorAdapter; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; @@ -119,10 +119,10 @@ import org.apache.ignite.internal.processors.query.calcite.util.Service; import org.apache.ignite.internal.processors.security.SecurityUtils; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.getLong; -import static org.apache.ignite.configuration.TransactionConfiguration.TX_AWARE_QUERIES_SUPPORTED_MODES; import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION; /** */ @@ -605,7 +605,7 @@ private static GridCacheVersion queryTransactionVersion(@Nullable QueryContext q /** */ private void ensureTransactionModeSupported(@Nullable QueryContext qryCtx) { - if (!ctx.config().getTransactionConfiguration().isTxAwareQueriesEnabled()) + if (!U.isTxAwareQueriesEnabled(ctx)) return; GridCacheVersion ver = queryTransactionVersion(qryCtx); @@ -613,12 +613,7 @@ private void ensureTransactionModeSupported(@Nullable QueryContext qryCtx) { if (ver == null) return; - final GridNearTxLocal userTx = ctx.cache().context().tm().tx(ver); - - if (TX_AWARE_QUERIES_SUPPORTED_MODES.contains(userTx.isolation())) - return; - - throw new IllegalStateException("Transaction isolation mode not supported for SQL queries: " + userTx.isolation()); + IgniteTxManager.ensureTransactionModeSupported(ctx.cache().context().tm().tx(ver).isolation()); } /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/ScanQuerySystemView.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/ScanQuerySystemView.java index 8583d358be293..5beb99372ee7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/ScanQuerySystemView.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/ScanQuerySystemView.java @@ -28,7 +28,7 @@ import org.apache.ignite.internal.managers.systemview.walker.ScanQueryViewWalker; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; -import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager.ScanQueryIterator; +import org.apache.ignite.internal.processors.cache.query.ScanQueryIterator; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.spi.IgniteSpiCloseableIterator; @@ -83,7 +83,7 @@ public ScanQuerySystemView(Collection> cctxs) { return new QueryDataIterator(); } - /** Class to iterate through all {@link GridCacheQueryManager.ScanQueryIterator}. */ + /** Class to iterate through all {@link ScanQueryIterator}. */ private class QueryDataIterator implements Iterator { /** Cache contexts iterator. */ private final Iterator> cctxsIter; @@ -95,7 +95,7 @@ private class QueryDataIterator implements Iterator { private Iterator.RequestFutureMap>> nodeQryIter; /** Local query iterator. */ - private Iterator localQryIter; + private Iterator localQryIter; /** Current node id. */ private UUID nodeId; @@ -173,7 +173,7 @@ public QueryDataIterator() { } /** - * @return {@code True} if next {@link GridCacheQueryManager.ScanQueryIterator} found. + * @return {@code True} if next {@link ScanQueryIterator} found. */ private boolean nextScanIter() { try { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java index 2d9474993d0f2..192c8f1b17899 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java @@ -785,7 +785,6 @@ private Collection nodes() throws IgniteCheckedException { top.readLock(); try { - Collection affNodes = nodes(cctx, null, null); List nodes = new ArrayList<>(affNodes); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 9d2e65ac5b6d3..0d386cd2c2cbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -33,7 +33,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Queue; import java.util.Set; import java.util.UUID; @@ -46,7 +45,6 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.QueryIndexType; @@ -76,10 +74,7 @@ import org.apache.ignite.internal.processors.cache.CacheObjectUtils; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; -import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; -import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; @@ -89,13 +84,11 @@ import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate; import org.apache.ignite.internal.processors.datastructures.SetItemKey; -import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.QueryUtils; -import org.apache.ignite.internal.processors.security.SecurityUtils; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.GridBoundedPriorityQueue; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; @@ -795,16 +788,13 @@ private GridCloseableIterator> sharedCacheSetIterator(CacheQ private GridCloseableIterator scanIterator(final CacheQuery qry, IgniteClosure transformer, boolean locNode) throws IgniteCheckedException { - final InternalScanFilter intFilter = internalFilter(qry.scanFilter()); - try { Integer part = qry.partition(); if (part != null && (part < 0 || part >= cctx.affinity().partitions())) return new GridEmptyCloseableIterator() { @Override public void close() throws IgniteCheckedException { - if (intFilter != null) - intFilter.close(); + ScanQueryIterator.closeFilter(qry.scanFilter()); super.close(); } @@ -856,8 +846,7 @@ private GridCloseableIterator scanIterator(final CacheQuery qry, IgniteClosur } ScanQueryIterator iter = new ScanQueryIterator(it, qry, topVer, locPart, - intFilter, - prepareTransformer(transformer), + transformer, locNode, locNode ? locIters : null, cctx, log); if (locNode) { @@ -869,35 +858,7 @@ private GridCloseableIterator scanIterator(final CacheQuery qry, IgniteClosur return iter; } catch (IgniteCheckedException | RuntimeException e) { - if (intFilter != null) - intFilter.close(); - - throw e; - } - } - - /** */ - private @Nullable IgniteClosure prepareTransformer(IgniteClosure transformer) throws IgniteCheckedException { - return SecurityUtils.sandboxedProxy(cctx.kernalContext(), IgniteClosure.class, injectResources(transformer)); - } - - /** */ - private @Nullable InternalScanFilter internalFilter(IgniteBiPredicate keyValFilter) throws IgniteCheckedException { - if (keyValFilter == null) - return null; - - try { - if (keyValFilter instanceof PlatformCacheEntryFilter) - ((PlatformCacheEntryFilter)keyValFilter).cacheContext(cctx); - else - injectResources(keyValFilter); - - keyValFilter = SecurityUtils.sandboxedProxy(cctx.kernalContext(), IgniteBiPredicate.class, keyValFilter); - - return new InternalScanFilter<>(keyValFilter); - } - catch (IgniteCheckedException | RuntimeException e) { - InternalScanFilter.close(keyValFilter); + ScanQueryIterator.closeFilter(qry.scanFilter()); throw e; } @@ -907,7 +868,7 @@ private GridCloseableIterator scanIterator(final CacheQuery qry, IgniteClosur * @param o Object to inject resources to. * @throws IgniteCheckedException If failure occurred while injecting resources. */ - private R injectResources(@Nullable R o) throws IgniteCheckedException { + static R injectResources(@Nullable R o, GridCacheContext cctx) throws IgniteCheckedException { if (o == null) return null; @@ -952,7 +913,7 @@ protected void runFieldsQuery(final GridCacheQueryInfo qryInfo) { try { // Preparing query closures. - final IgniteReducer rdc = injectResources((IgniteReducer)qryInfo.reducer()); + final IgniteReducer rdc = injectResources((IgniteReducer)qryInfo.reducer(), cctx); CacheQuery qry = qryInfo.query(); @@ -1149,7 +1110,8 @@ protected void runQuery(GridCacheQueryInfo qryInfo) { CacheQuery qry = qryInfo.query(); try { - IgniteReducer, Object> rdc = injectResources((IgniteReducer, Object>)qryInfo.reducer()); + IgniteReducer, Object> rdc = + injectResources((IgniteReducer, Object>)qryInfo.reducer(), cctx); int pageSize = qry.pageSize(); @@ -1398,9 +1360,6 @@ protected GridCloseableIterator scanQueryLocal(final CacheQuery qry, final String namex = cctx.name(); - final InternalScanFilter intFilter = qry.scanFilter() != null ? - new InternalScanFilter<>(qry.scanFilter()) : null; - try { assert qry.type() == SCAN; @@ -1419,7 +1378,7 @@ protected GridCloseableIterator scanQueryLocal(final CacheQuery qry, namex, null, null, - intFilter != null ? intFilter.scanFilter() : null, + qry.scanFilter(), null, null, securitySubjectId(cctx), @@ -1433,8 +1392,7 @@ protected GridCloseableIterator scanQueryLocal(final CacheQuery qry, return it; } catch (Exception e) { - if (intFilter != null) - intFilter.close(); + ScanQueryIterator.closeFilter(qry.scanFilter()); if (updateStatistics) cctx.queries().collectMetrics(GridCacheQueryType.SCAN, namex, startTime, @@ -3042,414 +3000,4 @@ public boolean isCanceled(Long key) { } } - /** */ - public static final class ScanQueryIterator extends GridCloseableIteratorAdapter { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final GridDhtCacheAdapter dht; - - /** */ - private final GridDhtLocalPartition locPart; - - /** */ - private final InternalScanFilter intScanFilter; - - /** */ - private final boolean statsEnabled; - - /** */ - private final GridIterator it; - - /** */ - private final GridCacheAdapter cache; - - /** */ - private final AffinityTopologyVersion topVer; - - /** */ - private final boolean keepBinary; - - /** */ - private final boolean readEvt; - - /** */ - private final String cacheName; - - /** */ - private final UUID subjId; - - /** */ - private final String taskName; - - /** */ - private final IgniteClosure transform; - - /** */ - private final CacheObjectContext objCtx; - - /** */ - private final GridCacheContext cctx; - - /** */ - private final IgniteLogger log; - - /** */ - private Object next; - - /** */ - private boolean needAdvance; - - /** */ - private IgniteCacheExpiryPolicy expiryPlc; - - /** */ - private final boolean locNode; - - /** */ - private final boolean incBackups; - - /** */ - private final long startTime; - - /** */ - private final int pageSize; - - /** */ - @Nullable private final GridConcurrentHashSet locIters; - - /** - * @param it Iterator. - * @param qry Query. - * @param topVer Topology version. - * @param locPart Local partition. - * @param intScanFilter Internal scan filter. - * @param transformer Transformer. - * @param locNode Local node flag. - * @param locIters Local iterators set. - * @param cctx Cache context. - * @param log Logger. - */ - ScanQueryIterator( - GridIterator it, - CacheQuery qry, - AffinityTopologyVersion topVer, - GridDhtLocalPartition locPart, - InternalScanFilter intScanFilter, - IgniteClosure transformer, - boolean locNode, - @Nullable GridConcurrentHashSet locIters, - GridCacheContext cctx, - IgniteLogger log) { - assert !locNode || locIters != null : "Local iterators can't be null for local query."; - - this.it = it; - this.topVer = topVer; - this.locPart = locPart; - this.intScanFilter = intScanFilter; - this.cctx = cctx; - - this.log = log; - this.locNode = locNode; - this.locIters = locIters; - - incBackups = qry.includeBackups(); - - statsEnabled = cctx.statisticsEnabled(); - - readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ) && - cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ); - - taskName = readEvt ? cctx.kernalContext().task().resolveTaskName(qry.taskHash()) : null; - - subjId = securitySubjectId(cctx); - - // keep binary for remote scans if possible - keepBinary = (!locNode && intScanFilter == null && transformer == null && !readEvt) || qry.keepBinary(); - transform = transformer; - dht = cctx.isNear() ? cctx.near().dht() : cctx.dht(); - cache = dht != null ? dht : cctx.cache(); - objCtx = cctx.cacheObjectContext(); - cacheName = cctx.name(); - - needAdvance = true; - expiryPlc = this.cctx.cache().expiryPolicy(null); - - startTime = U.currentTimeMillis(); - pageSize = qry.pageSize(); - } - - /** {@inheritDoc} */ - @Override protected Object onNext() { - if (needAdvance) - advance(); - else - needAdvance = true; - - if (next == null) - throw new NoSuchElementException(); - - return next; - } - - /** {@inheritDoc} */ - @Override protected boolean onHasNext() { - if (needAdvance) { - advance(); - - needAdvance = false; - } - - return next != null; - } - - /** {@inheritDoc} */ - @Override protected void onClose() { - if (expiryPlc != null && dht != null) { - dht.sendTtlUpdateRequest(expiryPlc); - - expiryPlc = null; - } - - if (locPart != null) - locPart.release(); - - if (intScanFilter != null) - intScanFilter.close(); - - if (locIters != null) - locIters.remove(this); - } - - /** - * Moves the iterator to the next cache entry. - */ - private void advance() { - long start = statsEnabled ? System.nanoTime() : 0L; - - Object next0 = null; - - while (it.hasNext()) { - CacheDataRow row = it.next(); - - KeyCacheObject key = row.key(); - CacheObject val; - - if (expiryPlc != null) { - try { - CacheDataRow tmp = row; - - while (true) { - try { - GridCacheEntryEx entry = cache.entryEx(key); - - entry.unswap(tmp); - - val = entry.peek(true, true, topVer, expiryPlc); - - entry.touch(); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - tmp = null; - } - } - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to peek value: " + e); - - val = null; - } - - if (dht != null && expiryPlc.readyToFlush(100)) - dht.sendTtlUpdateRequest(expiryPlc); - } - else - val = row.value(); - - // Filter backups for SCAN queries, if it isn't partition scan. - // Other types are filtered in indexing manager. - if (!cctx.isReplicated() && /*qry.partition()*/this.locPart == null && !incBackups && - !cctx.affinity().primaryByKey(cctx.localNode(), key, topVer)) { - if (log.isDebugEnabled()) - log.debug("Ignoring backup element [row=" + row + - ", cacheMode=" + cctx.config().getCacheMode() + ", incBackups=" + incBackups + - ", primary=" + cctx.affinity().primaryByKey(cctx.localNode(), key, topVer) + ']'); - - continue; - } - - if (log.isDebugEnabled()) { - ClusterNode primaryNode = cctx.affinity().primaryByKey(key, - cctx.affinity().affinityTopologyVersion()); - - log.debug(S.toString("Record", - "key", key, true, - "val", val, true, - "incBackups", incBackups, false, - "priNode", primaryNode != null ? U.id8(primaryNode.id()) : null, false, - "node", U.id8(cctx.localNode().id()), false)); - } - - if (val != null) { - K key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, keepBinary, false); - V val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, keepBinary, false); - - if (statsEnabled) { - CacheMetricsImpl metrics = cctx.cache().metrics0(); - - metrics.onRead(true); - - metrics.addGetTimeNanos(System.nanoTime() - start); - } - - if (intScanFilter == null || intScanFilter.apply(key0, val0)) { - if (readEvt) { - cctx.gridEvents().record(new CacheQueryReadEvent<>( - cctx.localNode(), - "Scan query entry read.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.SCAN.name(), - cacheName, - null, - null, - intScanFilter != null ? intScanFilter.scanFilter() : null, - null, - null, - subjId, - taskName, - key0, - val0, - null, - null)); - } - - if (transform != null) { - try { - next0 = transform.apply(new CacheQueryEntry<>(key0, val0)); - } - catch (Throwable e) { - throw new IgniteException(e); - } - } - else - next0 = !locNode ? new T2<>(key0, val0) : - new CacheQueryEntry<>(key0, val0); - - break; - } - } - } - - if ((this.next = next0) == null && expiryPlc != null && dht != null) { - dht.sendTtlUpdateRequest(expiryPlc); - - expiryPlc = null; - } - } - - /** */ - @Nullable public IgniteBiPredicate filter() { - return intScanFilter == null ? null : intScanFilter.scanFilter; - } - - /** */ - public AffinityTopologyVersion topVer() { - return topVer; - } - - /** */ - public GridDhtLocalPartition localPartition() { - return locPart; - } - - /** */ - public IgniteClosure transformer() { - return transform; - } - - /** */ - public long startTime() { - return startTime; - } - - /** */ - public boolean local() { - return locNode; - } - - /** */ - public boolean keepBinary() { - return keepBinary; - } - - /** */ - public UUID subjectId() { - return subjId; - } - - /** */ - public String taskName() { - return taskName; - } - - /** */ - public GridCacheContext cacheContext() { - return cctx; - } - - /** */ - public int pageSize() { - return pageSize; - } - } - - /** - * Wrap scan filter in order to catch unhandled errors. - */ - private static class InternalScanFilter implements IgniteBiPredicate { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final IgniteBiPredicate scanFilter; - - /** - * @param scanFilter User scan filter. - */ - InternalScanFilter(IgniteBiPredicate scanFilter) { - this.scanFilter = scanFilter; - } - - /** {@inheritDoc} */ - @Override public boolean apply(K k, V v) { - try { - return scanFilter == null || scanFilter.apply(k, v); - } - catch (Throwable e) { - throw new IgniteException(e); - } - } - - /** */ - void close() { - close(scanFilter); - } - - /** */ - static void close(IgniteBiPredicate scanFilter) { - if (scanFilter instanceof PlatformCacheEntryFilter) - ((PlatformCacheEntryFilter)scanFilter).onClose(); - } - - /** - * @return Wrapped scan filter. - */ - IgniteBiPredicate scanFilter() { - return scanFilter; - } - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index 3015d335a0671..3129a991e8146 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -452,20 +452,22 @@ private static byte setDataPageScanEnabled(int flags, Boolean enabled) { Marshaller mrsh = ctx.marshaller(); + ClassLoader clsLdr = U.resolveClassLoader(ldr, ctx.gridConfig()); + if (keyValFilterBytes != null && keyValFilter == null) - keyValFilter = U.unmarshal(mrsh, keyValFilterBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + keyValFilter = U.unmarshal(mrsh, keyValFilterBytes, clsLdr); if (rdcBytes != null && rdc == null) rdc = U.unmarshal(mrsh, rdcBytes, ldr); if (transBytes != null && trans == null) - trans = U.unmarshal(mrsh, transBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + trans = U.unmarshal(mrsh, transBytes, clsLdr); if (argsBytes != null && args == null) - args = U.unmarshal(mrsh, argsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + args = U.unmarshal(mrsh, argsBytes, clsLdr); if (idxQryDescBytes != null && idxQryDesc == null) - idxQryDesc = U.unmarshal(mrsh, idxQryDescBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + idxQryDesc = U.unmarshal(mrsh, idxQryDescBytes, clsLdr); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java new file mode 100644 index 0000000000000..777b19f3eabcd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import java.util.NoSuchElementException; +import java.util.UUID; +import javax.cache.Cache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.CacheQueryReadEvent; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheMetricsImpl; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.CacheObjectUtils; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; +import org.apache.ignite.internal.processors.security.SecurityUtils; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.lang.GridIterator; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteClosure; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; +import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager.injectResources; +import static org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId; + +/** */ +public final class ScanQueryIterator extends GridCloseableIteratorAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final GridDhtCacheAdapter dht; + + /** */ + private final GridDhtLocalPartition locPart; + + /** */ + private final IgniteBiPredicate filter; + + /** */ + private final Runnable closeFilterClo; + + /** */ + private final boolean statsEnabled; + + /** */ + private final GridIterator it; + + /** */ + private final GridCacheAdapter cache; + + /** */ + private final AffinityTopologyVersion topVer; + + /** */ + private final boolean keepBinary; + + /** */ + private final boolean readEvt; + + /** */ + private final String cacheName; + + /** */ + private final UUID subjId; + + /** */ + private final String taskName; + + /** */ + private final IgniteClosure, R> transform; + + /** */ + private final CacheObjectContext objCtx; + + /** */ + private final GridCacheContext cctx; + + /** */ + private final IgniteLogger log; + + /** */ + private R next; + + /** */ + private boolean needAdvance; + + /** */ + private IgniteCacheExpiryPolicy expiryPlc; + + /** */ + private final boolean locNode; + + /** */ + private final boolean incBackups; + + /** */ + private final long startTime; + + /** */ + private final int pageSize; + + /** */ + @Nullable private final GridConcurrentHashSet> locIters; + + /** + * @param it Iterator. + * @param qry Query. + * @param topVer Topology version. + * @param locPart Local partition. + * @param transformer Transformer. + * @param locNode Local node flag. + * @param locIters Local iterators set. + * @param cctx Cache context. + * @param log Logger. + */ + ScanQueryIterator( + GridIterator it, + CacheQuery qry, + AffinityTopologyVersion topVer, + GridDhtLocalPartition locPart, + IgniteClosure, R> transformer, + boolean locNode, + @Nullable GridConcurrentHashSet> locIters, + GridCacheContext cctx, + IgniteLogger log) throws IgniteCheckedException { + assert !locNode || locIters != null : "Local iterators can't be null for local query."; + + this.it = it; + this.topVer = topVer; + this.locPart = locPart; + this.cctx = cctx; + + this.log = log; + this.locNode = locNode; + this.locIters = locIters; + + incBackups = qry.includeBackups(); + + statsEnabled = cctx.statisticsEnabled(); + + readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ) && + cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ); + + taskName = readEvt ? cctx.kernalContext().task().resolveTaskName(qry.taskHash()) : null; + + subjId = securitySubjectId(cctx); + + dht = cctx.isNear() ? cctx.near().dht() : cctx.dht(); + cache = dht != null ? dht : cctx.cache(); + objCtx = cctx.cacheObjectContext(); + cacheName = cctx.name(); + + needAdvance = true; + expiryPlc = this.cctx.cache().expiryPolicy(null); + + startTime = U.currentTimeMillis(); + pageSize = qry.pageSize(); + transform = SecurityUtils.sandboxedProxy(cctx.kernalContext(), IgniteClosure.class, injectResources(transformer, cctx)); + closeFilterClo = qry.scanFilter() instanceof PlatformCacheEntryFilter + ? () -> closeFilter(qry.scanFilter()) + : null; + filter = prepareFilter(qry.scanFilter()); + // keep binary for remote scans if possible + keepBinary = (!locNode && filter == null && transformer == null && !readEvt) || qry.keepBinary(); + } + + /** {@inheritDoc} */ + @Override protected R onNext() { + if (needAdvance) + advance(); + else + needAdvance = true; + + if (next == null) + throw new NoSuchElementException(); + + return next; + } + + /** {@inheritDoc} */ + @Override protected boolean onHasNext() { + if (needAdvance) { + advance(); + + needAdvance = false; + } + + return next != null; + } + + /** {@inheritDoc} */ + @Override protected void onClose() { + if (expiryPlc != null && dht != null) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = null; + } + + if (locPart != null) + locPart.release(); + + if (closeFilterClo != null) + closeFilterClo.run(); + + if (locIters != null) + locIters.remove(this); + } + + /** + * Moves the iterator to the next cache entry. + */ + private void advance() { + long start = statsEnabled ? System.nanoTime() : 0L; + + R next0 = null; + + while (it.hasNext()) { + CacheDataRow row = it.next(); + + KeyCacheObject key = row.key(); + CacheObject val; + + if (expiryPlc != null) { + try { + CacheDataRow tmp = row; + + while (true) { + try { + GridCacheEntryEx entry = cache.entryEx(key); + + entry.unswap(tmp); + + val = entry.peek(true, true, topVer, expiryPlc); + + entry.touch(); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + tmp = null; + } + } + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to peek value: " + e); + + val = null; + } + + if (dht != null && expiryPlc.readyToFlush(100)) + dht.sendTtlUpdateRequest(expiryPlc); + } + else + val = row.value(); + + // Filter backups for SCAN queries, if it isn't partition scan. + // Other types are filtered in indexing manager. + if (!cctx.isReplicated() && /*qry.partition()*/this.locPart == null && !incBackups && + !cctx.affinity().primaryByKey(cctx.localNode(), key, topVer)) { + if (log.isDebugEnabled()) + log.debug("Ignoring backup element [row=" + row + + ", cacheMode=" + cctx.config().getCacheMode() + ", incBackups=" + incBackups + + ", primary=" + cctx.affinity().primaryByKey(cctx.localNode(), key, topVer) + ']'); + + continue; + } + + if (log.isDebugEnabled()) { + ClusterNode primaryNode = cctx.affinity().primaryByKey(key, + cctx.affinity().affinityTopologyVersion()); + + log.debug(S.toString("Record", + "key", key, true, + "val", val, true, + "incBackups", incBackups, false, + "priNode", primaryNode != null ? U.id8(primaryNode.id()) : null, false, + "node", U.id8(cctx.localNode().id()), false)); + } + + if (val != null) { + K key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, keepBinary, false); + V val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, keepBinary, false); + + if (statsEnabled) { + CacheMetricsImpl metrics = cctx.cache().metrics0(); + + metrics.onRead(true); + + metrics.addGetTimeNanos(System.nanoTime() - start); + } + + boolean passFilter; + + try { + passFilter = filter == null || filter.apply(key0, val0); + } + catch (Throwable e) { + throw new IgniteException(e); + } + + if (passFilter) { + if (readEvt) { + cctx.gridEvents().record(new CacheQueryReadEvent<>( + cctx.localNode(), + "Scan query entry read.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.SCAN.name(), + cacheName, + null, + null, + filter, + null, + null, + subjId, + taskName, + key0, + val0, + null, + null)); + } + + if (transform != null) { + try { + next0 = transform.apply(new CacheQueryEntry<>(key0, val0)); + } + catch (Throwable e) { + throw new IgniteException(e); + } + } + else + next0 = (R)(!locNode ? new T2<>(key0, val0) : + new CacheQueryEntry<>(key0, val0)); + + break; + } + } + } + + if ((this.next = next0) == null && expiryPlc != null && dht != null) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = null; + } + } + + /** */ + @Nullable public IgniteBiPredicate filter() { + return filter; + } + + /** */ + public AffinityTopologyVersion topVer() { + return topVer; + } + + /** */ + public GridDhtLocalPartition localPartition() { + return locPart; + } + + /** */ + public IgniteClosure, R> transformer() { + return transform; + } + + /** */ + public long startTime() { + return startTime; + } + + /** */ + public boolean local() { + return locNode; + } + + /** */ + public boolean keepBinary() { + return keepBinary; + } + + /** */ + public UUID subjectId() { + return subjId; + } + + /** */ + public String taskName() { + return taskName; + } + + /** */ + public GridCacheContext cacheContext() { + return cctx; + } + + /** */ + public int pageSize() { + return pageSize; + } + + /** */ + private @Nullable IgniteBiPredicate prepareFilter(IgniteBiPredicate filter) throws IgniteCheckedException { + if (filter == null) + return null; + + try { + if (filter instanceof PlatformCacheEntryFilter) + ((PlatformCacheEntryFilter)filter).cacheContext(cctx); + else + injectResources(filter, cctx); + + return SecurityUtils.sandboxedProxy(cctx.kernalContext(), IgniteBiPredicate.class, filter); + } + catch (IgniteCheckedException | RuntimeException e) { + closeFilter(filter); + + throw e; + } + } + + /** */ + public static void closeFilter(IgniteBiPredicate filter) { + if (filter instanceof PlatformCacheEntryFilter) + ((PlatformCacheEntryFilter)filter).onClose(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index e0f7d1c73894f..3920585f00b6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -130,6 +130,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS; +import static org.apache.ignite.configuration.TransactionConfiguration.TX_AWARE_QUERIES_SUPPORTED_MODES; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -3594,4 +3595,15 @@ public boolean uncommitedTx(final DataEntry dataEntry) { public void clearUncommitedStates() { uncommitedTx = Collections.emptySet(); } + + /** + * Checks if transaction mode supported for transaction aware queries. + * @param isolation Transaction isolation to check. + */ + public static void ensureTransactionModeSupported(TransactionIsolation isolation) { + if (TX_AWARE_QUERIES_SUPPORTED_MODES.contains(isolation)) + return; + + throw new IllegalStateException("Transaction isolation mode not supported for SQL queries: " + isolation); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java index 935e209151074..a9eb55043cc42 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; @@ -206,7 +207,7 @@ public JdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpin features = JdbcThinFeature.enumSet(cliFeatures); - if (!ctx.config().getTransactionConfiguration().isTxAwareQueriesEnabled()) + if (!U.isTxAwareQueriesEnabled(ctx)) features.remove(JdbcThinFeature.TX_AWARE_QUERIES); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java index 5314e1cfdf221..c4d4dc5986173 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler; import org.apache.ignite.internal.processors.platform.client.tx.ClientTxContext; import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.THIN_CLIENT; @@ -195,7 +196,7 @@ public ClientProtocolContext currentProtocolContext() { features = ClientBitmaskFeature.enumSet(cliFeatures); - if (!ctx.config().getTransactionConfiguration().isTxAwareQueriesEnabled()) + if (!U.isTxAwareQueriesEnabled(ctx)) features.remove(ClientBitmaskFeature.TX_AWARE_QUERIES); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index fed8ec75d924f..9847d95169a26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -349,8 +349,7 @@ public GridQueryProcessor(GridKernalContext ctx) throws IgniteCheckedException { initQueryEngines(); idxBuildStatusStorage = new IndexBuildStatusStorage(ctx); - - txAwareQueriesEnabled = ctx.config().getTransactionConfiguration().isTxAwareQueriesEnabled(); + txAwareQueriesEnabled = U.isTxAwareQueriesEnabled(ctx); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 824701999351b..28a43fee3dbb5 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -12536,4 +12536,9 @@ public static String extendToLen(String s, int targetLen) { return sb.toString(); } + + /** */ + public static boolean isTxAwareQueriesEnabled(GridKernalContext kctx) { + return kctx.config().getTransactionConfiguration().isTxAwareQueriesEnabled(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/ScanQueryView.java b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/ScanQueryView.java index 571acd9303950..35adca4dae8ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/ScanQueryView.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/ScanQueryView.java @@ -20,7 +20,7 @@ import java.util.UUID; import org.apache.ignite.internal.managers.systemview.walker.Order; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager.ScanQueryIterator; +import org.apache.ignite.internal.processors.cache.query.ScanQueryIterator; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple;