From cd4b7ff7d2d10c930cb2c14cef0bf1e6d282c49a Mon Sep 17 00:00:00 2001 From: He Wang Date: Mon, 16 Oct 2023 19:08:40 +0800 Subject: [PATCH] [Feature][Connector-V2][Jdbc] Add OceanBase catalog (#5439) * feat: add oceanbase jdbc catalog * remove oceanbase-client and disable it cases * update EXCLUDE_SCHEMAS and skip database check in OceanBase Oracle mode --- .../jdbc/catalog/JdbcCatalogOptions.java | 8 + .../oceanbase/OceanBaseCatalogFactory.java | 83 ++++++++ .../oceanbase/OceanBaseMySqlCatalog.java | 39 ++++ .../oceanbase/OceanBaseOracleCatalog.java | 99 ++++++++++ .../jdbc/catalog/oracle/OracleCatalog.java | 2 +- .../seatunnel/jdbc/AbstractJdbcIT.java | 3 +- .../seatunnel/jdbc/JdbcOceanBaseITBase.java | 73 +------ .../seatunnel/jdbc/JdbcOceanBaseMysqlIT.java | 104 +++++++--- .../seatunnel/jdbc/JdbcOceanBaseOracleIT.java | 179 ++++++++++++++---- .../jdbc_oceanbase_mysql_source_and_sink.conf | 4 +- ...jdbc_oceanbase_oracle_source_and_sink.conf | 12 +- 11 files changed, 466 insertions(+), 140 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java index 712eefacb84..fc58a45c28b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java @@ -50,6 +50,14 @@ public interface JdbcCatalogOptions { .withDescription( "for databases that support the schema parameter, give it priority."); + Option COMPATIBLE_MODE = + Options.key("compatibleMode") + .stringType() + .noDefaultValue() + .withDescription( + "The compatible mode of database, required when the database supports multiple compatible modes. " + + "For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'."); + OptionRule.Builder BASE_RULE = OptionRule.builder().required(BASE_URL).required(USERNAME, PASSWORD).optional(SCHEMA); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java new file mode 100644 index 00000000000..aa8b016d087 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase; + +import org.apache.seatunnel.shade.com.google.common.base.Preconditions; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.configuration.util.OptionValidationException; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions; + +import org.apache.commons.lang3.StringUtils; + +import com.google.auto.service.AutoService; + +import java.util.Optional; + +@AutoService(Factory.class) +public class OceanBaseCatalogFactory implements CatalogFactory { + + public static final String IDENTIFIER = "OceanBase"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig options) { + String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL); + Preconditions.checkArgument( + StringUtils.isNoneBlank(urlWithDatabase), + "Miss config ! Please check your config."); + JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase); + Optional defaultDatabase = urlInfo.getDefaultDatabase(); + if (!defaultDatabase.isPresent()) { + throw new OptionValidationException(JdbcCatalogOptions.BASE_URL); + } + + String compatibleMode = options.get(JdbcCatalogOptions.COMPATIBLE_MODE); + Preconditions.checkArgument( + StringUtils.isNoneBlank(compatibleMode), + "Miss config ! Please check your config."); + + if ("oracle".equalsIgnoreCase(compatibleMode.trim())) { + return new OceanBaseOracleCatalog( + catalogName, + options.get(JdbcCatalogOptions.USERNAME), + options.get(JdbcCatalogOptions.PASSWORD), + urlInfo, + options.get(JdbcCatalogOptions.SCHEMA)); + } + return new OceanBaseMySqlCatalog( + catalogName, + options.get(JdbcCatalogOptions.USERNAME), + options.get(JdbcCatalogOptions.PASSWORD), + urlInfo); + } + + @Override + public OptionRule optionRule() { + return JdbcCatalogOptions.BASE_RULE.required(JdbcCatalogOptions.COMPATIBLE_MODE).build(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java new file mode 100644 index 00000000000..58cdb5c4131 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase; + +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog; + +public class OceanBaseMySqlCatalog extends MySqlCatalog { + + static { + SYS_DATABASES.clear(); + SYS_DATABASES.add("information_schema"); + SYS_DATABASES.add("mysql"); + SYS_DATABASES.add("oceanbase"); + SYS_DATABASES.add("LBACSYS"); + SYS_DATABASES.add("ORAAUDITOR"); + SYS_DATABASES.add("SYS"); + } + + public OceanBaseMySqlCatalog( + String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) { + super(catalogName, username, pwd, urlInfo); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java new file mode 100644 index 00000000000..b4ece7db9c2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalog; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull; + +public class OceanBaseOracleCatalog extends OracleCatalog { + + static { + EXCLUDED_SCHEMAS = + Collections.unmodifiableList( + Arrays.asList("oceanbase", "LBACSYS", "ORAAUDITOR", "SYS")); + } + + public OceanBaseOracleCatalog( + String catalogName, + String username, + String pwd, + JdbcUrlUtil.UrlInfo urlInfo, + String defaultSchema) { + super(catalogName, username, pwd, urlInfo, defaultSchema); + } + + @Override + protected String getListDatabaseSql() { + throw new UnsupportedOperationException(); + } + + @Override + public List listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + String dbUrl = getUrlFromDatabaseName(databaseName); + try { + return queryString(dbUrl, getListTableSql(databaseName), this::getTableName); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", catalogName), e); + } + } + + @Override + public boolean tableExists(TablePath tablePath) throws CatalogException { + try { + return listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath)); + } catch (DatabaseNotExistException e) { + return false; + } + } + + @Override + public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + checkNotNull(tablePath, "Table path cannot be null"); + + if (defaultSchema.isPresent()) { + tablePath = + new TablePath( + tablePath.getDatabaseName(), + defaultSchema.get(), + tablePath.getTableName()); + } + + if (tableExists(tablePath)) { + if (ignoreIfExists) { + return; + } + throw new TableAlreadyExistException(catalogName, tablePath); + } + + createTableInternal(tablePath, table); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java index 570f3874113..f72cf2a75dc 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java @@ -54,7 +54,7 @@ public class OracleCatalog extends AbstractJdbcCatalog { private static final OracleDataTypeConvertor DATA_TYPE_CONVERTOR = new OracleDataTypeConvertor(); - private static final List EXCLUDED_SCHEMAS = + protected static List EXCLUDED_SCHEMAS = Collections.unmodifiableList( Arrays.asList( "APPQOSSYS", diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java index 2738f5ccc36..3798c64c177 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java @@ -229,10 +229,11 @@ public String buildTableInfoWithSchema(String schema, String table) { @Override public void startUp() { dbServer = initContainer().withImagePullPolicy(PullPolicy.alwaysPull()); - jdbcCase = getJdbcCase(); Startables.deepStart(Stream.of(dbServer)).join(); + jdbcCase = getJdbcCase(); + given().ignoreExceptions() .await() .atMost(360, TimeUnit.SECONDS) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java index b8202e697a1..50177ef1a86 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java @@ -17,11 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; - -import org.apache.commons.lang3.tuple.Pair; - import org.junit.jupiter.api.Assertions; import org.testcontainers.shaded.org.apache.commons.io.IOUtils; @@ -29,29 +24,18 @@ import java.nio.charset.StandardCharsets; import java.sql.ResultSet; import java.sql.Statement; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; public abstract class JdbcOceanBaseITBase extends AbstractJdbcIT { - private static final String OCEANBASE_DATABASE = "seatunnel"; - private static final String OCEANBASE_SOURCE = "source"; - private static final String OCEANBASE_SINK = "sink"; - - private static final String OCEANBASE_JDBC_TEMPLATE = "jdbc:oceanbase://" + HOST + ":%s"; - private static final String OCEANBASE_DRIVER_CLASS = "com.oceanbase.jdbc.Driver"; - - abstract String imageName(); + protected static final String OCEANBASE_SOURCE = "source"; + protected static final String OCEANBASE_SINK = "sink"; - abstract String host(); + protected static final String OCEANBASE_CATALOG_TABLE = "catalog_table"; - abstract int port(); - - abstract String username(); - - abstract String password(); + protected static final String OCEANBASE_JDBC_TEMPLATE = "jdbc:oceanbase://" + HOST + ":%s/%s"; + protected static final String OCEANBASE_DRIVER_CLASS = "com.oceanbase.jdbc.Driver"; abstract List configFile(); @@ -59,44 +43,14 @@ public abstract class JdbcOceanBaseITBase extends AbstractJdbcIT { abstract String[] getFieldNames(); - @Override - JdbcCase getJdbcCase() { - Map containerEnv = new HashMap<>(); - String jdbcUrl = String.format(OCEANBASE_JDBC_TEMPLATE, port()); - Pair> testDataSet = initTestData(); - String[] fieldNames = testDataSet.getKey(); - - String insertSql = insertTable(OCEANBASE_DATABASE, OCEANBASE_SOURCE, fieldNames); - - return JdbcCase.builder() - .dockerImage(imageName()) - .networkAliases(host()) - .containerEnv(containerEnv) - .driverClass(OCEANBASE_DRIVER_CLASS) - .host(HOST) - .port(port()) - .localPort(port()) - .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE) - .jdbcUrl(jdbcUrl) - .userName(username()) - .password(password()) - .database(OCEANBASE_DATABASE) - .sourceTable(OCEANBASE_SOURCE) - .sinkTable(OCEANBASE_SINK) - .createSql(createSqlTemplate()) - .configFile(configFile()) - .insertSql(insertSql) - .testData(testDataSet) - .build(); - } + abstract String getFullTableName(String tableName); @Override void compareResult() { String sourceSql = - String.format( - "select * from %s.%s order by 1", OCEANBASE_DATABASE, OCEANBASE_SOURCE); + String.format("select * from %s order by 1", getFullTableName(OCEANBASE_SOURCE)); String sinkSql = - String.format("select * from %s.%s order by 1", OCEANBASE_DATABASE, OCEANBASE_SINK); + String.format("select * from %s order by 1", getFullTableName(OCEANBASE_SINK)); try { Statement sourceStatement = connection.createStatement(); Statement sinkStatement = connection.createStatement(); @@ -133,15 +87,4 @@ void compareResult() { String driverUrl() { return "https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar"; } - - @Override - protected void createSchemaIfNeeded() { - String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE; - try { - connection.prepareStatement(sql).executeUpdate(); - } catch (Exception e) { - throw new SeaTunnelRuntimeException( - JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql " + sql, e); - } - } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java index 548fecaee66..b4e0ff1b48f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java @@ -18,6 +18,9 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase.OceanBaseMySqlCatalog; import org.apache.commons.lang3.tuple.Pair; @@ -36,39 +39,70 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; @Disabled("Disabled due to insufficient hardware resources in the CI environment") public class JdbcOceanBaseMysqlIT extends JdbcOceanBaseITBase { - @Override - String imageName() { - return "oceanbase/oceanbase-ce:4.0.0.0"; - } + private static final String IMAGE = "oceanbase/oceanbase-ce:4.1.0.0"; - @Override - String host() { - return "e2e_oceanbase_mysql"; - } + private static final String HOSTNAME = "e2e_oceanbase_mysql"; + private static final int PORT = 2881; + private static final String USERNAME = "root@test"; + private static final String PASSWORD = ""; + private static final String OCEANBASE_DATABASE = "seatunnel"; + private static final String OCEANBASE_CATALOG_DATABASE = "seatunnel_catalog"; @Override - int port() { - return 2881; + List configFile() { + return Lists.newArrayList("/jdbc_oceanbase_mysql_source_and_sink.conf"); } @Override - String username() { - return "root"; - } + JdbcCase getJdbcCase() { + Map containerEnv = new HashMap<>(); + String jdbcUrl = + String.format(OCEANBASE_JDBC_TEMPLATE, dbServer.getMappedPort(PORT), "test"); + Pair> testDataSet = initTestData(); + String[] fieldNames = testDataSet.getKey(); - @Override - String password() { - return ""; + String insertSql = insertTable(OCEANBASE_DATABASE, OCEANBASE_SOURCE, fieldNames); + + return JdbcCase.builder() + .dockerImage(IMAGE) + .networkAliases(HOSTNAME) + .containerEnv(containerEnv) + .driverClass(OCEANBASE_DRIVER_CLASS) + .host(HOST) + .port(PORT) + .localPort(dbServer.getMappedPort(PORT)) + .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE) + .jdbcUrl(jdbcUrl) + .userName(USERNAME) + .password(PASSWORD) + .database(OCEANBASE_DATABASE) + .sourceTable(OCEANBASE_SOURCE) + .sinkTable(OCEANBASE_SINK) + .catalogDatabase(OCEANBASE_CATALOG_DATABASE) + .catalogTable(OCEANBASE_CATALOG_TABLE) + .createSql(createSqlTemplate()) + .configFile(configFile()) + .insertSql(insertSql) + .testData(testDataSet) + .build(); } @Override - List configFile() { - return Lists.newArrayList("/jdbc_oceanbase_mysql_source_and_sink.conf"); + protected void createSchemaIfNeeded() { + String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE; + try { + connection.prepareStatement(sql).executeUpdate(); + } catch (Exception e) { + throw new SeaTunnelRuntimeException( + JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql " + sql, e); + } } @Override @@ -239,18 +273,30 @@ Pair> initTestData() { } @Override - GenericContainer initContainer() { - GenericContainer container = - new GenericContainer<>(imageName()) - .withNetwork(NETWORK) - .withNetworkAliases(host()) - .waitingFor(Wait.forLogMessage(".*boot success!.*", 1)) - .withStartupTimeout(Duration.ofMinutes(5)) - .withLogConsumer( - new Slf4jLogConsumer(DockerLoggerFactory.getLogger(imageName()))); + String getFullTableName(String tableName) { + return buildTableInfoWithSchema(OCEANBASE_DATABASE, tableName); + } - container.setPortBindings(Lists.newArrayList(String.format("%s:%s", port(), port()))); + @Override + GenericContainer initContainer() { + return new GenericContainer<>(IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(HOSTNAME) + .withExposedPorts(PORT) + .waitingFor(Wait.forLogMessage(".*boot success!.*", 1)) + .withStartupTimeout(Duration.ofMinutes(3)) + .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE))); + } - return container; + @Override + protected void initCatalog() { + catalog = + new OceanBaseMySqlCatalog( + "oceanbase", + USERNAME, + PASSWORD, + JdbcUrlUtil.getUrlInfo( + jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost()))); + catalog.open(); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java index 4c3cca5ddc1..f28c26b35f9 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java @@ -17,76 +17,139 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase.OceanBaseOracleCatalog; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; import org.testcontainers.containers.GenericContainer; import com.google.common.collect.Lists; import java.math.BigDecimal; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; import java.sql.Date; +import java.sql.Driver; +import java.sql.SQLException; import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.awaitility.Awaitility.given; +import java.util.Map; +import java.util.Properties; @Disabled("Oracle mode of OceanBase Enterprise Edition does not provide docker environment") public class JdbcOceanBaseOracleIT extends JdbcOceanBaseITBase { - @Override - String imageName() { - return null; - } + private static final String HOSTNAME = "e2e_oceanbase_oracle"; + private static final int PORT = 2883; + private static final String USERNAME = "TESTUSER@test"; + private static final String PASSWORD = ""; + private static final String SCHEMA = "TESTUSER"; @Override - String host() { - return "e2e_oceanbase_oracle"; + List configFile() { + return Lists.newArrayList("/jdbc_oceanbase_oracle_source_and_sink.conf"); } @Override - int port() { - return 2883; + GenericContainer initContainer() { + throw new UnsupportedOperationException(); } + @BeforeAll @Override - String username() { - return "root"; - } + public void startUp() { + jdbcCase = getJdbcCase(); - @Override - String password() { - return ""; + try { + initConnection(); + } catch (Exception e) { + throw new RuntimeException("Failed to initial jdbc connection", e); + } + + createNeededTables(); + insertTestData(); + initCatalog(); } @Override - List configFile() { - return Lists.newArrayList("/jdbc_oceanbase_oracle_source_and_sink.conf"); + public void tearDown() throws SQLException { + if (connection != null) { + connection + .createStatement() + .execute("DROP TABLE " + getFullTableName(OCEANBASE_SOURCE)); + connection.createStatement().execute("DROP TABLE " + getFullTableName(OCEANBASE_SINK)); + } + super.tearDown(); } @Override - GenericContainer initContainer() { - throw new UnsupportedOperationException(); + JdbcCase getJdbcCase() { + Map containerEnv = new HashMap<>(); + String jdbcUrl = String.format(OCEANBASE_JDBC_TEMPLATE, PORT, SCHEMA); + Pair> testDataSet = initTestData(); + String[] fieldNames = testDataSet.getKey(); + + String insertSql = insertTable(SCHEMA, OCEANBASE_SOURCE.toUpperCase(), fieldNames); + + return JdbcCase.builder() + .dockerImage(null) + .networkAliases(HOSTNAME) + .containerEnv(containerEnv) + .driverClass(OCEANBASE_DRIVER_CLASS) + .host(HOST) + .port(PORT) + .localPort(PORT) + .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE) + .jdbcUrl(jdbcUrl) + .userName(USERNAME) + .password(PASSWORD) + .schema(SCHEMA) + .sourceTable(OCEANBASE_SOURCE.toUpperCase()) + .sinkTable(OCEANBASE_SINK.toUpperCase()) + .catalogSchema(SCHEMA) + .catalogTable(OCEANBASE_CATALOG_TABLE) + .createSql(createSqlTemplate()) + .configFile(configFile()) + .insertSql(insertSql) + .testData(testDataSet) + .build(); } - @Override - public void startUp() { - jdbcCase = getJdbcCase(); + private void initConnection() + throws SQLException, ClassNotFoundException, MalformedURLException, + InstantiationException, IllegalAccessException { + URLClassLoader urlClassLoader = + new URLClassLoader( + new URL[] {new URL(driverUrl())}, + JdbcOceanBaseOracleIT.class.getClassLoader()); + Thread.currentThread().setContextClassLoader(urlClassLoader); + Driver driver = (Driver) urlClassLoader.loadClass(jdbcCase.getDriverClass()).newInstance(); + Properties props = new Properties(); - given().ignoreExceptions() - .await() - .atMost(360, TimeUnit.SECONDS) - .untilAsserted(() -> this.initializeJdbcConnection(jdbcCase.getJdbcUrl())); + if (StringUtils.isNotBlank(jdbcCase.getUserName())) { + props.put("user", jdbcCase.getUserName()); + } - createSchemaIfNeeded(); - createNeededTables(); - insertTestData(); + if (StringUtils.isNotBlank(jdbcCase.getPassword())) { + props.put("password", jdbcCase.getPassword()); + } + + connection = driver.connect(jdbcCase.getJdbcUrl().replace(HOST, HOSTNAME), props); + connection.setAutoCommit(false); } @Override @@ -108,8 +171,7 @@ String createSqlTemplate() { + " BINARY_FLOAT_COL binary_float,\n" + " BINARY_DOUBLE_COL binary_double,\n" + " DATE_COL date,\n" - + " TIMESTAMP_WITH_3_FRAC_SEC_COL timestamp(3),\n" - + " TIMESTAMP_WITH_LOCAL_TZ timestamp with local time zone\n" + + " TIMESTAMP_WITH_3_FRAC_SEC_COL timestamp(3)\n" + ")"; } @@ -126,8 +188,7 @@ String[] getFieldNames() { "BINARY_FLOAT_COL", "BINARY_DOUBLE_COL", "DATE_COL", - "TIMESTAMP_WITH_3_FRAC_SEC_COL", - "TIMESTAMP_WITH_LOCAL_TZ" + "TIMESTAMP_WITH_3_FRAC_SEC_COL" }; } @@ -150,7 +211,6 @@ Pair> initTestData() { Float.parseFloat("22.2"), Double.parseDouble("2.2"), Date.valueOf(LocalDate.now()), - Timestamp.valueOf(LocalDateTime.now()), Timestamp.valueOf(LocalDateTime.now()) }); rows.add(row); @@ -158,4 +218,51 @@ Pair> initTestData() { return Pair.of(fieldNames, rows); } + + @Override + String getFullTableName(String tableName) { + return buildTableInfoWithSchema(SCHEMA, tableName.toUpperCase()); + } + + @Override + protected void clearTable(String database, String schema, String table) { + clearTable(schema, table); + } + + @Override + protected String buildTableInfoWithSchema(String database, String schema, String table) { + return buildTableInfoWithSchema(schema, table); + } + + @Override + protected void initCatalog() { + catalog = + new OceanBaseOracleCatalog( + "oceanbase", + USERNAME, + PASSWORD, + JdbcUrlUtil.getUrlInfo(jdbcCase.getJdbcUrl().replace(HOST, HOSTNAME)), + SCHEMA); + catalog.open(); + } + + @Test + @Override + public void testCatalog() { + TablePath sourceTablePath = + new TablePath( + jdbcCase.getDatabase(), jdbcCase.getSchema(), jdbcCase.getSourceTable()); + TablePath targetTablePath = + new TablePath( + jdbcCase.getCatalogDatabase(), + jdbcCase.getCatalogSchema(), + jdbcCase.getCatalogTable()); + + CatalogTable catalogTable = catalog.getTable(sourceTablePath); + catalog.createTable(targetTablePath, catalogTable, false); + Assertions.assertTrue(catalog.tableExists(targetTablePath)); + + catalog.dropTable(targetTablePath, false); + Assertions.assertFalse(catalog.tableExists(targetTablePath)); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf index 098d3ffae26..3025ca79343 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf @@ -31,7 +31,7 @@ source { Jdbc { driver = com.oceanbase.jdbc.Driver url = "jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC" - user = root + user = "root@test" password = "" query = "SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned, c_bigint_30, c_decimal_unsigned_30, c_decimal_30 FROM source" compatible_mode = "mysql" @@ -45,7 +45,7 @@ sink { Jdbc { driver = com.oceanbase.jdbc.Driver url = "jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC" - user = root + user = "root@test" password = "" query = "insert into sink(c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);" compatible_mode = "mysql" diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf index bf2b1ccf067..226a325275e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf @@ -29,11 +29,11 @@ env { source { jdbc{ # This is a example source plugin **only for test and demonstrate the feature source plugin** - url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/seatunnel" + url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/TESTUSER" driver = com.oceanbase.jdbc.Driver - user = "root" + user = "TESTUSER@test" password = "" - query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ FROM source" + query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL FROM SOURCE" compatible_mode = "oracle" } } @@ -43,11 +43,11 @@ transform { sink { jdbc{ - url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/seatunnel" + url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/TESTUSER" driver = com.oceanbase.jdbc.Driver - user = "root" + user = "TESTUSER@test" password = "" - query = "INSERT INTO sink (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)" + query = "INSERT INTO SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?)" compatible_mode = "oracle" } }