Skip to content

Commit

Permalink
Fix - Support keep amqp message header's original type. (#909)
Browse files Browse the repository at this point in the history
  • Loading branch information
casuallc authored Jun 30, 2023
1 parent 5287d25 commit 78f3a96
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.qpid.server.protocol.v0_8.transport.BasicGetEmptyBody;
Expand Down Expand Up @@ -110,7 +111,7 @@ public void readEntriesComplete(List<Entry> list, Object o) {
try {
message.complete(Pair.of(index.getPosition(),
MessageConvertUtils.entryToAmqpBody(msg)));
} catch (UnsupportedEncodingException e) {
} catch (UnsupportedEncodingException | DecoderException e) {
log.error("Failed to convert entry to AMQP body", e);
}
consumer.addUnAckMessages(indexMessage.getExchangeName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
Expand All @@ -45,6 +47,7 @@
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.bytebuffer.SingleQpidByteBuffer;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.protocol.v0_8.FieldTable;
import org.apache.qpid.server.protocol.v0_8.FieldTableFactory;
import org.apache.qpid.server.protocol.v0_8.IncomingMessage;
import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
Expand Down Expand Up @@ -85,6 +88,7 @@ public final class MessageConvertUtils {
private static final String PROP_IMMEDIATE = BASIC_PUBLISH_INFO_PRE + "immediate";
private static final String PROP_MANDATORY = BASIC_PUBLISH_INFO_PRE + "mandatory";
public static final String PROP_ROUTING_KEY = BASIC_PUBLISH_INFO_PRE + "routingKey";
public static final String PROP_CUSTOM_PROPERTIES = BASIC_PROP_HEADER_PRE + "custom_properties";

private static final Clock clock = Clock.systemDefaultZone();

Expand Down Expand Up @@ -132,10 +136,8 @@ public static MessageImpl<byte[]> toPulsarMessage(IncomingMessage incomingMessag
setProp(builder, PROP_CLUSTER_ID, props.getClusterIdAsString());
setProp(builder, PROP_PROPERTY_FLAGS, props.getPropertyFlags());

Map<String, Object> headers = props.getHeadersAsMap();
for (Map.Entry<String, Object> entry : headers.entrySet()) {
setProp(builder, BASIC_PROP_HEADER_PRE + entry.getKey(), entry.getValue());
}
byte[] headers = FieldTableFactory.createFieldTable(props.getHeadersAsMap()).getDataAsBytes();
builder.property(PROP_CUSTOM_PROPERTIES, Hex.encodeHexString(headers));
}

setProp(builder, PROP_EXCHANGE, incomingMessage.getMessagePublishInfo().getExchange());
Expand Down Expand Up @@ -188,10 +190,10 @@ public static ByteBuf messageToByteBuf(Message<byte[]> message) {
}

public static Pair<BasicContentHeaderProperties, MessagePublishInfo> getPropertiesFromMetadata(
List<KeyValue> propertiesList) throws UnsupportedEncodingException {
List<KeyValue> propertiesList) throws UnsupportedEncodingException, DecoderException {
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
Map<String, Object> headers = new HashMap<>();
MessagePublishInfo messagePublishInfo = new MessagePublishInfo();
FieldTable headers = null;

for (KeyValue keyValue : propertiesList) {
switch (keyValue.getKey()) {
Expand Down Expand Up @@ -249,14 +251,28 @@ public static Pair<BasicContentHeaderProperties, MessagePublishInfo> getProperti
case PROP_ROUTING_KEY:
messagePublishInfo.setRoutingKey(AMQShortString.createAMQShortString(keyValue.getValue()));
break;
case PROP_CUSTOM_PROPERTIES:
headers = setOriginalProperties(keyValue);
break;
default:
headers.put(keyValue.getKey().substring(BASIC_PROP_HEADER_PRE.length()), keyValue.getValue());
log.warn("unknown property: {}, value: {}", keyValue.getKey(), keyValue.getValue());
break;
}
}
props.setHeaders(FieldTableFactory.createFieldTable(headers));
props.setHeaders(headers);
return Pair.of(props, messagePublishInfo);
}

private static FieldTable setOriginalProperties(KeyValue keyValue) throws DecoderException {
if (keyValue.getValue() == null) {
return null;
}
byte[] bytes = Hex.decodeHex(keyValue.getValue());
try (QpidByteBuffer buffer = QpidByteBuffer.wrap(bytes)) {
return FieldTableFactory.createFieldTable(buffer);
}
}

public static Pair<BasicContentHeaderProperties, MessagePublishInfo> getPropertiesFromMetadata(
Map<String, String> messageProperties) throws UnsupportedEncodingException {
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
Expand Down Expand Up @@ -328,7 +344,7 @@ public static Pair<BasicContentHeaderProperties, MessagePublishInfo> getProperti
}

public static List<AmqpMessageData> entriesToAmqpBodyList(List<Entry> entries)
throws UnsupportedEncodingException {
throws UnsupportedEncodingException, DecoderException {
ImmutableList.Builder<AmqpMessageData> builder = ImmutableList.builder();
// TODO convert bk entries to amqpbody,
// then assemble deliver body with ContentHeaderBody and ContentBody
Expand Down Expand Up @@ -369,7 +385,7 @@ public static List<AmqpMessageData> entriesToAmqpBodyList(List<Entry> entries)
}

public static AmqpMessageData entryToAmqpBody(Entry entry)
throws UnsupportedEncodingException {
throws UnsupportedEncodingException, DecoderException {
AmqpMessageData amqpMessage = null;
// TODO convert bk entries to amqpbody,
// then assemble deliver body with ContentHeaderBody and ContentBody
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.commons.codec.DecoderException;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.bytebuffer.SingleQpidByteBuffer;
Expand All @@ -46,7 +47,7 @@
public class MessageConvertTest {

@Test
private void test() throws UnsupportedEncodingException {
private void test() throws UnsupportedEncodingException, DecoderException {
String exchange = "testExchange";
boolean immediate = true;
boolean mandatory = false;
Expand Down

0 comments on commit 78f3a96

Please sign in to comment.