Skip to content
This repository has been archived by the owner on Oct 24, 2023. It is now read-only.
/ camel Public archive
forked from apache/camel

Netty, connect with empty payload #2

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,6 @@ public boolean process(final Exchange exchange, AsyncCallback callback) {
Object body;
try {
body = getRequestBody(exchange);
if (body == null) {
noReplyLogger.log("No payload to send for exchange: " + exchange);
callback.done(true);
return true;
}
return processWithBody(exchange, body, new BodyReleaseCallback(callback, body));
} catch (Exception e) {
exchange.setException(e);
Expand Down Expand Up @@ -345,47 +340,51 @@ public void onComplete(Exchange exchange) {
}

// write body
NettyHelper.writeBodyAsync(LOG, channel, remoteAddress, body, exchange, new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
LOG.trace("Operation complete {}", channelFuture);
if (!channelFuture.isSuccess()) {
// no success then exit, (any exception has been handled by ClientChannelHandler#exceptionCaught)
return;
}
if (body != null) {
NettyHelper.writeBodyAsync(LOG, channel, remoteAddress, body, exchange, new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
LOG.trace("Operation complete {}", channelFuture);
if (!channelFuture.isSuccess()) {
// no success then exit, (any exception has been handled by ClientChannelHandler#exceptionCaught)
return;
}

// if we do not expect any reply then signal callback to continue routing
if (!configuration.isSync()) {
try {
// should channel be closed after complete?
Boolean close;
if (ExchangeHelper.isOutCapable(exchange)) {
close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE,
Boolean.class);
} else {
close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
}
// if we do not expect any reply then signal callback to continue routing
if (!configuration.isSync()) {
try {
// should channel be closed after complete?
Boolean close;
if (ExchangeHelper.isOutCapable(exchange)) {
close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE,
Boolean.class);
} else {
close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
}

// should we disconnect, the header can override the configuration
boolean disconnect = getConfiguration().isDisconnect();
if (close != null) {
disconnect = close;
}
// should we disconnect, the header can override the configuration
boolean disconnect = getConfiguration().isDisconnect();
if (close != null) {
disconnect = close;
}

// we should not close if we are reusing the channel
if (!configuration.isReuseChannel() && disconnect) {
if (LOG.isTraceEnabled()) {
LOG.trace("Closing channel when complete at address: {}",
getEndpoint().getConfiguration().getAddress());
// we should not close if we are reusing the channel
if (!configuration.isReuseChannel() && disconnect) {
if (LOG.isTraceEnabled()) {
LOG.trace("Closing channel when complete at address: {}",
getEndpoint().getConfiguration().getAddress());
}
NettyHelper.close(channel);
}
NettyHelper.close(channel);
} finally {
// signal callback to continue routing
producerCallback.done(false);
}
} finally {
// signal callback to continue routing
producerCallback.done(false);
}
}
}
});
});
} else {
noReplyLogger.log("Connected. No payload was sent: " + exchange);
}

}

Expand Down