Skip to content

Commit

Permalink
IGNITE-20746 Cache objects transformation never happen on TcpIgniteCl…
Browse files Browse the repository at this point in the history
…ient.putAllConflict()
  • Loading branch information
anton-vinogradov authored Oct 30, 2023
1 parent 7c11d93 commit 8c8c919
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.commons.io.FileUtils;
import org.apache.ignite.DataRegionMetrics;
Expand All @@ -34,8 +36,11 @@
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.client.thin.TcpClientCache;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.metric.LongMetric;
Expand Down Expand Up @@ -285,7 +290,7 @@ private Consumption doTest(int cnt, Function<Integer, Object> keyGen, Function<I

Ignite prim = primaryNode(0, CACHE_NAME);

if (mode == ConsumptionTestMode.THIN_CLIENT) {
if (mode == ConsumptionTestMode.THIN_CLIENT || mode == ConsumptionTestMode.THIN_CLIENT_INTERNAL_API) {
String host = prim.configuration().getLocalHost();
int port = prim.configuration().getClientConnectorConfiguration().getPort();

Expand All @@ -296,7 +301,19 @@ private Consumption doTest(int cnt, Function<Integer, Object> keyGen, Function<I
Object key = keyGen.apply(i);
Object val = valGen.apply(i);

cache.put(key, val);
if (mode == ConsumptionTestMode.THIN_CLIENT)
cache.put(key, val);
else {
assert mode == ConsumptionTestMode.THIN_CLIENT_INTERNAL_API;

Map<Object, T3<Object, GridCacheVersion, Long>> data = new HashMap<>();

GridCacheVersion otherVer = new GridCacheVersion(1, 1, 1, 0);

data.put(key, new T3<>(val, otherVer, 0L));

((TcpClientCache)cache).putAllConflict(data);
}

assertEqualsArraysAware(cache.get(key), val);
}
Expand Down Expand Up @@ -335,7 +352,7 @@ private Consumption doTest(int cnt, Function<Integer, Object> keyGen, Function<I
clNet += reg.<LongMetric>findMetric(SENT_BYTES_METRIC_NAME).value();
clNet += reg.<LongMetric>findMetric(RECEIVED_BYTES_METRIC_NAME).value();

if (mode != ConsumptionTestMode.THIN_CLIENT)
if (mode != ConsumptionTestMode.THIN_CLIENT && mode != ConsumptionTestMode.THIN_CLIENT_INTERNAL_API)
assertEquals(0, clNet);

net += clNet;
Expand Down Expand Up @@ -406,6 +423,9 @@ private enum ConsumptionTestMode {
/** Thin client. */
THIN_CLIENT,

/** Thin client uses internal API. */
THIN_CLIENT_INTERNAL_API,

/** Node + Persistent. */
PERSISTENT
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,10 +627,10 @@ public ClientListenerRequest decode(BinaryReaderExImpl reader) {
return new ClientServiceGetDescriptorRequest(reader);

case OP_DATA_STREAMER_START:
return new ClientDataStreamerStartRequest(reader);
return new ClientDataStreamerStartRequest(reader, ctx);

case OP_DATA_STREAMER_ADD_DATA:
return new ClientDataStreamerAddDataRequest(reader);
return new ClientDataStreamerAddDataRequest(reader, ctx);

case OP_ATOMIC_LONG_CREATE:
return new ClientAtomicLongCreateRequest(reader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public ClientCachePutAllConflictRequest(BinaryReaderExImpl reader, ClientConnect
map = new LinkedHashMap<>(cnt);

for (int i = 0; i < cnt; i++) {
KeyCacheObject key = readCacheObject(reader, true);
CacheObject val = readCacheObject(reader, false);
KeyCacheObject key = readCacheObject(reader, true, ctx);
CacheObject val = readCacheObject(reader, false, ctx);
GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached();
long expireTime = reader.readLong();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public ClientCacheRemoveAllConflictRequest(BinaryReaderExImpl reader) {
map = new LinkedHashMap<>(cnt);

for (int i = 0; i < cnt; i++) {
KeyCacheObject key = readCacheObject(reader, true);
KeyCacheObject key = readCacheObject(reader, true, null);
GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached();

map.put(key, ver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.platform.client.streamer;

import java.util.Collection;

import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
Expand Down Expand Up @@ -48,12 +47,12 @@ public class ClientDataStreamerAddDataRequest extends ClientDataStreamerRequest
*
* @param reader Data reader.
*/
public ClientDataStreamerAddDataRequest(BinaryReaderExImpl reader) {
public ClientDataStreamerAddDataRequest(BinaryReaderExImpl reader, ClientConnectionContext ctx) {
super(reader);

streamerId = reader.readLong();
flags = reader.readByte();
entries = ClientDataStreamerReader.read(reader);
entries = ClientDataStreamerReader.read(reader, ctx);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;

import static org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readCacheObject;

Expand All @@ -34,7 +35,7 @@ class ClientDataStreamerReader {
* @param reader Data reader.
* @return Streamer entry.
*/
public static Collection<DataStreamerEntry> read(BinaryReaderExImpl reader) {
public static Collection<DataStreamerEntry> read(BinaryReaderExImpl reader, ClientConnectionContext ctx) {
int entriesCnt = reader.readInt();

if (entriesCnt == 0)
Expand All @@ -43,8 +44,8 @@ public static Collection<DataStreamerEntry> read(BinaryReaderExImpl reader) {
Collection<DataStreamerEntry> entries = new ArrayList<>(entriesCnt);

for (int i = 0; i < entriesCnt; i++) {
entries.add(new DataStreamerEntry(readCacheObject(reader, true),
readCacheObject(reader, false)));
entries.add(new DataStreamerEntry(readCacheObject(reader, true, ctx),
readCacheObject(reader, false, ctx)));
}

return entries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.platform.client.streamer;

import java.util.Collection;

import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.GridKernalContext;
Expand Down Expand Up @@ -73,7 +72,7 @@ public class ClientDataStreamerStartRequest extends ClientDataStreamerRequest {
*
* @param reader Data reader.
*/
public ClientDataStreamerStartRequest(BinaryReaderExImpl reader) {
public ClientDataStreamerStartRequest(BinaryReaderExImpl reader, ClientConnectionContext ctx) {
super(reader);

cacheId = reader.readInt();
Expand All @@ -82,7 +81,7 @@ public ClientDataStreamerStartRequest(BinaryReaderExImpl reader) {
perThreadBufferSize = reader.readInt();
receiverObj = reader.readObjectDetached();
receiverPlatform = receiverObj == null ? 0 : reader.readByte();
entries = ClientDataStreamerReader.read(reader);
entries = ClientDataStreamerReader.read(reader, ctx);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.ignite.internal.processors.platform.PlatformExtendedException;
import org.apache.ignite.internal.processors.platform.PlatformNativeException;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServiceImpl;
import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
Expand Down Expand Up @@ -1373,8 +1374,9 @@ else if (svcCfg instanceof ServiceConfiguration && svcCfg.getService() instanceo
*
* @param reader Reader.
* @param isKey {@code True} if object is a key.
* @param ctx Client connection context.
*/
public static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey) {
public static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey, ClientConnectionContext ctx) {
BinaryInputStream in = reader.in();

int pos0 = in.position();
Expand All @@ -1393,7 +1395,9 @@ public static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reade

byte[] objBytes = in.readByteArray(pos1 - pos0);

return isKey ? (T)new KeyCacheObjectImpl(obj, objBytes, -1) : (T)new CacheObjectImpl(obj, objBytes);
return isKey ?
(T)new KeyCacheObjectImpl(obj, objBytes, -1) :
(T)new CacheObjectImpl(obj, ctx.kernalContext().transformer() == null ? objBytes : null);
}

/**
Expand Down

0 comments on commit 8c8c919

Please sign in to comment.