From b5fb95a0e760a1a6706edd1afda5835f9383391f Mon Sep 17 00:00:00 2001 From: vfomin Date: Fri, 21 May 2021 17:51:53 +0300 Subject: [PATCH] Netty, connect with empty payload --- .../camel/component/netty/NettyProducer.java | 77 +++++++++---------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java index bf01b73a957f1..adc8d9f074f9a 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java @@ -211,11 +211,6 @@ public boolean process(final Exchange exchange, AsyncCallback callback) { Object body; try { body = getRequestBody(exchange); - if (body == null) { - noReplyLogger.log("No payload to send for exchange: " + exchange); - callback.done(true); - return true; - } return processWithBody(exchange, body, new BodyReleaseCallback(callback, body)); } catch (Exception e) { exchange.setException(e); @@ -345,47 +340,51 @@ public void onComplete(Exchange exchange) { } // write body - NettyHelper.writeBodyAsync(LOG, channel, remoteAddress, body, exchange, new ChannelFutureListener() { - public void operationComplete(ChannelFuture channelFuture) throws Exception { - LOG.trace("Operation complete {}", channelFuture); - if (!channelFuture.isSuccess()) { - // no success then exit, (any exception has been handled by ClientChannelHandler#exceptionCaught) - return; - } + if (body != null) { + NettyHelper.writeBodyAsync(LOG, channel, remoteAddress, body, exchange, new ChannelFutureListener() { + public void operationComplete(ChannelFuture channelFuture) throws Exception { + LOG.trace("Operation complete {}", channelFuture); + if (!channelFuture.isSuccess()) { + // no success then exit, (any exception has been handled by ClientChannelHandler#exceptionCaught) + return; + } - // if we do not expect any reply then signal callback to continue routing - if (!configuration.isSync()) { - try { - // should channel be closed after complete? - Boolean close; - if (ExchangeHelper.isOutCapable(exchange)) { - close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, - Boolean.class); - } else { - close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); - } + // if we do not expect any reply then signal callback to continue routing + if (!configuration.isSync()) { + try { + // should channel be closed after complete? + Boolean close; + if (ExchangeHelper.isOutCapable(exchange)) { + close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, + Boolean.class); + } else { + close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class); + } - // should we disconnect, the header can override the configuration - boolean disconnect = getConfiguration().isDisconnect(); - if (close != null) { - disconnect = close; - } + // should we disconnect, the header can override the configuration + boolean disconnect = getConfiguration().isDisconnect(); + if (close != null) { + disconnect = close; + } - // we should not close if we are reusing the channel - if (!configuration.isReuseChannel() && disconnect) { - if (LOG.isTraceEnabled()) { - LOG.trace("Closing channel when complete at address: {}", - getEndpoint().getConfiguration().getAddress()); + // we should not close if we are reusing the channel + if (!configuration.isReuseChannel() && disconnect) { + if (LOG.isTraceEnabled()) { + LOG.trace("Closing channel when complete at address: {}", + getEndpoint().getConfiguration().getAddress()); + } + NettyHelper.close(channel); } - NettyHelper.close(channel); + } finally { + // signal callback to continue routing + producerCallback.done(false); } - } finally { - // signal callback to continue routing - producerCallback.done(false); } } - } - }); + }); + } else { + noReplyLogger.log("Connected. No payload was sent: " + exchange); + } }