diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/converter/DefaultColumnConverter.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/converter/DefaultColumnConverter.java index a16233fb..14f887e1 100644 --- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/converter/DefaultColumnConverter.java +++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/converter/DefaultColumnConverter.java @@ -337,15 +337,6 @@ public String convertToConnectFieldSchema(ColumnDefinition columnDefinition, Sch case Types.DOUBLE: builder.field(fieldName, SchemaBuilder.float64().build()); break; - case Types.DECIMAL: - scale = decimalScale(columnDefinition); - SchemaBuilder fieldBuilder = Decimal.builder(scale); - if (optional) { - fieldBuilder.optional(); - } - builder.field(fieldName, fieldBuilder.build()); - break; - case Types.NUMERIC: if (numericMapping == NumericMapping.PRECISION_ONLY) { log.debug("NUMERIC with precision: '{}' and scale: '{}'", precision, scale); @@ -376,6 +367,14 @@ public String convertToConnectFieldSchema(ColumnDefinition columnDefinition, Sch break; } } + case Types.DECIMAL: + scale = decimalScale(columnDefinition); + SchemaBuilder fieldBuilder = Decimal.builder(scale); + if (optional) { + fieldBuilder.optional(); + } + builder.field(fieldName, fieldBuilder.build()); + break; case Types.CHAR: case Types.VARCHAR: diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingCriteria.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingCriteria.java index 210b03ac..e6cb32c6 100644 --- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingCriteria.java +++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/TimestampIncrementingCriteria.java @@ -19,6 +19,7 @@ import io.openmessaging.connector.api.data.Field; import io.openmessaging.connector.api.data.Schema; import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.connector.api.data.logical.Decimal; import io.openmessaging.connector.api.errors.ConnectException; import java.math.BigDecimal; import java.sql.PreparedStatement; @@ -216,6 +217,9 @@ protected Long extractOffsetIncrementedId( extractedId = ((Number) incrementingColumnValue).longValue(); } else if (isLongFromString(incrementingColumnValue)) { extractedId = Long.parseLong((String) incrementingColumnValue); + } else if (incrementingColumnSchema.getName() != null && incrementingColumnSchema.getName().equals( + Decimal.LOGICAL_NAME)) { + extractedId = extractDecimalId(incrementingColumnValue); } else { throw new ConnectException( "Invalid type for incrementing column: " + incrementingColumnSchema.getFieldType()); @@ -224,6 +228,17 @@ protected Long extractOffsetIncrementedId( return extractedId; } + protected Long extractDecimalId(Object incrementingColumnValue) { + final BigDecimal decimal = ((BigDecimal) incrementingColumnValue); + if (decimal.compareTo(LONG_MAX_VALUE_AS_BIGDEC) > 0) { + throw new ConnectException("Decimal value for incrementing column exceeded Long.MAX_VALUE"); + } + if (decimal.scale() != 0) { + throw new ConnectException("Scale of Decimal value for incrementing column must be 0"); + } + return decimal.longValue(); + } + protected boolean isLongFromString(Object incrementingColumnValue) { if (!(incrementingColumnValue instanceof String)) { return false;