Skip to content

Commit

Permalink
IGNITE-22767 Remove MarshallingContext (#11442)
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov authored Aug 21, 2024
1 parent 818118e commit 12cb12c
Show file tree
Hide file tree
Showing 17 changed files with 64 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;

Expand Down Expand Up @@ -161,13 +163,13 @@ public Throwable error() {
}

/** {@inheritDoc} */
@Override public void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException {
errBytes = ctx.marshal(err);
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
errBytes = U.marshal(ctx, err);
}

/** {@inheritDoc} */
@Override public void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException {
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (errBytes != null)
err = ctx.unmarshal(errBytes);
err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ctx.gridConfig()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;

Expand Down Expand Up @@ -50,15 +52,16 @@ public GenericValueMessage(Object val) {
}

/** {@inheritDoc} */
@Override public void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException {
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (val != null && serialized == null)
serialized = ctx.marshal(val);
serialized = U.marshal(ctx, val);
}

/** {@inheritDoc} */
@Override public void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException {
if (serialized != null && val == null)
val = ctx.unmarshal(serialized);
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (serialized != null && val == null) {
val = U.unmarshal(ctx, serialized, U.resolveClassLoader(ctx.gridConfig()));
}
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.message;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;

/**
*
Expand All @@ -26,14 +27,14 @@ public interface MarshalableMessage extends CalciteMessage {
/**
* Prepares the message before sending.
*
* @param ctx Marshaling context.
* @param ctx Cache shared context.
*/
void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException;
void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException;

/**
* Prepares the message before processing.
*
* @param ctx Marshaling context.
* @param ctx Cache shared context.
*/
void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException;
void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException;
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,49 +31,47 @@
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.plugin.extensions.communication.Message;

/**
*
*/
public class MessageServiceImpl extends AbstractService implements MessageService, MarshallingContext {
public class MessageServiceImpl extends AbstractService implements MessageService {
/** */
private final GridMessageListener msgLsnr;

/** */
private UUID localNodeId;
private final GridCacheSharedContext<?, ?> ctx;

/** */
private GridIoManager ioManager;
private UUID localNodeId;

/** */
private ClassLoader classLoader;
private final GridIoManager ioManager;

/** */
private QueryTaskExecutor taskExecutor;

/** */
private FailureProcessor failureProcessor;

/** */
private Marshaller marsh;

/** */
private EnumMap<MessageType, MessageListener> lsnrs;

/** */
public MessageServiceImpl(GridKernalContext ctx) {
super(ctx);

this.ctx = ctx.cache().context();
this.ioManager = ctx.io();
msgLsnr = this::onMessage;
}

Expand All @@ -91,32 +89,13 @@ public UUID localNodeId() {
return localNodeId;
}

/**
* @param ioManager IO manager.
*/
public void ioManager(GridIoManager ioManager) {
this.ioManager = ioManager;
}

/**
* @return IO manager.
*/
public GridIoManager ioManager() {
return ioManager;
}

/**
* @param classLoader Class loader.
*/
public void classLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}

/** {@inheritDoc} */
@Override public ClassLoader classLoader() {
return classLoader;
}

/**
* @param taskExecutor Task executor.
*/
Expand All @@ -131,18 +110,6 @@ public QueryTaskExecutor taskExecutor() {
return taskExecutor;
}

/**
* @param marsh Marshaller.
*/
public void marshaller(Marshaller marsh) {
this.marsh = marsh;
}

/** {@inheritDoc} */
@Override public Marshaller marshaller() {
return marsh;
}

/**
* @param failureProcessor Failure processor.
*/
Expand All @@ -160,16 +127,6 @@ public FailureProcessor failureProcessor() {
/** {@inheritDoc} */
@Override public void onStart(GridKernalContext ctx) {
localNodeId(ctx.localNodeId());
classLoader(U.resolveClassLoader(ctx.config()));
ioManager(ctx.io());

@SuppressWarnings("deprecation")
Marshaller marsh0 = ctx.config().getMarshaller();

if (marsh0 == null) // Stubbed context doesn't have a marshaller
marsh0 = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());

marshaller(marsh0);

CalciteQueryProcessor proc = Objects.requireNonNull(Commons.lookupComponent(ctx, CalciteQueryProcessor.class));

Expand Down Expand Up @@ -225,7 +182,7 @@ public FailureProcessor failureProcessor() {
protected void prepareMarshal(Message msg) throws IgniteCheckedException {
try {
if (msg instanceof MarshalableMessage)
((MarshalableMessage)msg).prepareMarshal(this);
((MarshalableMessage)msg).prepareMarshal(ctx);
}
catch (Exception e) {
failureProcessor().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
Expand All @@ -238,7 +195,7 @@ protected void prepareMarshal(Message msg) throws IgniteCheckedException {
protected void prepareUnmarshal(Message msg) throws IgniteCheckedException {
try {
if (msg instanceof MarshalableMessage)
((MarshalableMessage)msg).prepareUnmarshal(this);
((MarshalableMessage)msg).prepareUnmarshal(ctx);
}
catch (Exception e) {
failureProcessor().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
Expand Down Expand Up @@ -108,7 +109,7 @@ public List<Object> rows() {
}

/** {@inheritDoc} */
@Override public void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException {
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (mRows != null || rows == null)
return;

Expand All @@ -126,7 +127,7 @@ public List<Object> rows() {
}

/** {@inheritDoc} */
@Override public void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException {
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (rows != null || mRows == null)
return;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -167,17 +169,17 @@ public long timeout() {
}

/** {@inheritDoc} */
@Override public void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException {
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (paramsBytes == null && params != null)
paramsBytes = ctx.marshal(params);
paramsBytes = U.marshal(ctx, params);

fragmentDesc.prepareMarshal(ctx);
}

/** {@inheritDoc} */
@Override public void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException {
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (params == null && paramsBytes != null)
params = ctx.unmarshal(paramsBytes);
params = U.unmarshal(ctx, paramsBytes, U.resolveClassLoader(ctx.gridConfig()));

fragmentDesc.prepareUnmarshal(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;

Expand Down Expand Up @@ -78,15 +80,15 @@ public Throwable error() {
}

/** {@inheritDoc} */
@Override public void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException {
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (error != null)
errBytes = ctx.marshal(error);
errBytes = U.marshal(ctx, error);
}

/** {@inheritDoc} */
@Override public void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException {
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (errBytes != null)
error = ctx.unmarshal(errBytes);
error = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ctx.gridConfig()));
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.message;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;

/** */
public interface ValueMessage extends MarshalableMessage {
Expand All @@ -27,12 +28,12 @@ public interface ValueMessage extends MarshalableMessage {
Object value();

/** {@inheritDoc} */
@Override default void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException {
@Override default void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
// No-op
}

/** {@inheritDoc} */
@Override default void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException {
@Override default void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
// No-op
}
}
Loading

0 comments on commit 12cb12c

Please sign in to comment.