Skip to content

Commit

Permalink
Optimized jdbc connector bug (#524)
Browse files Browse the repository at this point in the history
* fixed null pointer exception #294

* Fix invalid offset submitted by sinktask #310

* optimize jdbc connector

* fixed

* remove unused import

* fixed convertToConnectFieldSchema  bug

* fixed
  • Loading branch information
sunxiaojian authored Aug 17, 2023
1 parent e29c1c1 commit 03adc7a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,15 +337,6 @@ public String convertToConnectFieldSchema(ColumnDefinition columnDefinition, Sch
case Types.DOUBLE:
builder.field(fieldName, SchemaBuilder.float64().build());
break;
case Types.DECIMAL:
scale = decimalScale(columnDefinition);
SchemaBuilder fieldBuilder = Decimal.builder(scale);
if (optional) {
fieldBuilder.optional();
}
builder.field(fieldName, fieldBuilder.build());
break;

case Types.NUMERIC:
if (numericMapping == NumericMapping.PRECISION_ONLY) {
log.debug("NUMERIC with precision: '{}' and scale: '{}'", precision, scale);
Expand Down Expand Up @@ -376,6 +367,14 @@ public String convertToConnectFieldSchema(ColumnDefinition columnDefinition, Sch
break;
}
}
case Types.DECIMAL:
scale = decimalScale(columnDefinition);
SchemaBuilder fieldBuilder = Decimal.builder(scale);
if (optional) {
fieldBuilder.optional();
}
builder.field(fieldName, fieldBuilder.build());
break;

case Types.CHAR:
case Types.VARCHAR:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.openmessaging.connector.api.data.Field;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.data.Struct;
import io.openmessaging.connector.api.data.logical.Decimal;
import io.openmessaging.connector.api.errors.ConnectException;
import java.math.BigDecimal;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -216,6 +217,9 @@ protected Long extractOffsetIncrementedId(
extractedId = ((Number) incrementingColumnValue).longValue();
} else if (isLongFromString(incrementingColumnValue)) {
extractedId = Long.parseLong((String) incrementingColumnValue);
} else if (incrementingColumnSchema.getName() != null && incrementingColumnSchema.getName().equals(
Decimal.LOGICAL_NAME)) {
extractedId = extractDecimalId(incrementingColumnValue);
} else {
throw new ConnectException(
"Invalid type for incrementing column: " + incrementingColumnSchema.getFieldType());
Expand All @@ -224,6 +228,17 @@ protected Long extractOffsetIncrementedId(
return extractedId;
}

protected Long extractDecimalId(Object incrementingColumnValue) {
final BigDecimal decimal = ((BigDecimal) incrementingColumnValue);
if (decimal.compareTo(LONG_MAX_VALUE_AS_BIGDEC) > 0) {
throw new ConnectException("Decimal value for incrementing column exceeded Long.MAX_VALUE");
}
if (decimal.scale() != 0) {
throw new ConnectException("Scale of Decimal value for incrementing column must be 0");
}
return decimal.longValue();
}

protected boolean isLongFromString(Object incrementingColumnValue) {
if (!(incrementingColumnValue instanceof String)) {
return false;
Expand Down

0 comments on commit 03adc7a

Please sign in to comment.