diff --git a/connectors/rocketmq-connect-connectors-archetype/README.md b/connectors/rocketmq-connect-connectors-archetype/README.md new file mode 100644 index 00000000..e9fee6f0 --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/README.md @@ -0,0 +1,36 @@ +## How to Use Connnector-Archetype + +1. 进入脚手架文件夹 + + ```shell + cd rocketmq-connect-connectors-archetype/ + ``` + +2. 将脚手架安装到本地 + + ```shell + mvn -e clean install + ``` + +3. 创建connector模版工程 + + ```shell + cd connectors/ + mvn archetype:generate \ + -DarchetypeGroupId=org.apache.rocketmq \ + -DarchetypeArtifactId=rocketmq-connect-connectors-archetype \ + -DarchetypeVersion=1.0-SNAPSHOT \ + -DdatabaseName= + ``` + + 例:创建Clickhouse-Connector + + ```shell + mvn archetype:generate \ + -DarchetypeGroupId=org.apache.rocketmq \ + -DarchetypeArtifactId=rocketmq-connect-connectors-archetype \ + -DarchetypeVersion=1.0-SNAPSHOT \ + -DdatabaseName=clickhouse + ``` + +4. 如上指令将创建一个connector的框架,开发者主要关心`helper/xxxHelperClient`以及`xxxxSourceTask`,`xxxSinkTask`的实现即可,剩余配置可以按需修改。 diff --git a/connectors/rocketmq-connect-connectors-archetype/pom.xml b/connectors/rocketmq-connect-connectors-archetype/pom.xml new file mode 100644 index 00000000..6081f655 --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/pom.xml @@ -0,0 +1,37 @@ + + + 4.0.0 + + org.apache.rocketmq + rocketmq-connect-connectors-archetype + 1.0-SNAPSHOT + maven-archetype + + rocketmq-connect-connectors-archetype + + + + + org.apache.maven.archetype + archetype-packaging + 3.2.1 + + + + + + + maven-archetype-plugin + 3.2.1 + + + + + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml new file mode 100644 index 00000000..ba974e82 --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/META-INF/maven/archetype-metadata.xml @@ -0,0 +1,66 @@ + + + + + + + org.apache.rocketmq + + + rocketmq-connect-${dbNameToLowerCase} + + + 1.0.0-SNAPSHOT + + + org.apache.rocketmq.connect.${dbNameToLowerCase} + + + + + + + ${databaseName.toUpperCase()} + + + + ${databaseName.toLowerCase().substring(0,1).toUpperCase()}${databaseName.toLowerCase().substring(1)} + + + + ${databaseName.toLowerCase()} + + + + + + + + .reviewboardrc + README.md + + + + + + .gitignore + TODO.md + + + + + + .gitignore + + + + src/main/java + + + src/test/java + + + + diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/README.md b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/README.md new file mode 100644 index 00000000..2cd7f28b --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/README.md @@ -0,0 +1,55 @@ +## FIXME: fix document + +##### ${dbNameToCamel}SourceConnector fully-qualified name + +org.apache.rocketmq.connect.${dbNameToLowerCase}.source.${dbNameToCamel}SourceConnector + +**${dbNameToLowerCase}-source-connector** start + +``` +POST http://${runtime-ip}:${runtime-port}/connectors/${dbNameToLowerCase}SourceConnector +{ + "connector.class":"org.apache.rocketmq.connect.${dbNameToLowerCase}.source.${dbNameToCamel}SourceConnector", + "${dbNameToLowerCase}host":"localhost", + "${dbNameToLowerCase}port":8123, + "database":"default", + "username":"default", + "password":"123456", + "table":"tableName", + "topic":"test${dbNameToCamel}Topic", + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +##### ${dbNameToCamel}SinkConnector fully-qualified name + +org.apache.rocketmq.connect.${dbNameToLowerCase}.sink.${dbNameToCamel}SinkConnector + +**${dbNameToLowerCase}-sink-connector** start + +``` +POST http://${runtime-ip}:${runtime-port}/connectors/${dbNameToLowerCase}SinkConnector +{ + "connector.class":"org.apache.rocketmq.connect.${dbNameToLowerCase}.sink.${dbNameToCamel}SinkConnector", + "${dbNameToLowerCase}host":"localhost", + "${dbNameToLowerCase}port":8123, + "database":"clickhouse", + "username":"default", + "password":"123456", + "connect.topicnames":"test${dbNameToCamel}Topic", + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +##### parameter configuration + +| parameter | effect | required | default | +|--------------------------|---------------------------------------------------|-------------------|---------| +| ${dbNameToLowerCase}host | The Host of the ${dbNameToCamel} server | yes | null | +| ${dbNameToLowerCase}port | The Port of the ${dbNameToCamel} server | yes | null | +| database | The database to read or write | yes | null | +| table | The source table to read | yes (source only) | null | +| topic | RocketMQ topic for source connector to write into | yes (source only) | null | +| connect.topicnames | RocketMQ topic for sink connector to read from | yes (sink only) | null | diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/pom.xml b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/pom.xml new file mode 100644 index 00000000..c948d03b --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/pom.xml @@ -0,0 +1,188 @@ + + + 4.0.0 + + ${groupId} + ${artifactId} + ${version} + + connect-${dbNameToLowerCase} + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + jira + https://issues.apache.org/jira/browse/RocketMQ + + + + + + org.codehaus.mojo + versions-maven-plugin + 2.3 + + + org.codehaus.mojo + clirr-maven-plugin + 2.7 + + + maven-compiler-plugin + 3.6.1 + + ${maven.compiler.source} + ${maven.compiler.target} + ${maven.compiler.source} + true + true + + + + maven-surefire-plugin + 2.19.1 + + -Xms512m -Xmx1024m + always + + **/*Test.java + + + + + maven-site-plugin + 3.6 + + en_US + UTF-8 + UTF-8 + + + + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + maven-javadoc-plugin + 2.10.4 + + UTF-8 + en_US + io.openmessaging.internal + + + + aggregate + + aggregate + + site + + + + + maven-resources-plugin + 3.0.2 + + ${project.build.sourceEncoding} + + + + org.codehaus.mojo + findbugs-maven-plugin + 3.0.4 + + + org.apache.rat + apache-rat-plugin + 0.12 + + + README.md + README-CN.md + + + + + maven-assembly-plugin + 3.0.0 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + + 8 + 8 + UTF-8 + + + + io.openmessaging + openmessaging-connector + 0.1.4 + compile + + + + + + org.lz4 + lz4-java + 1.8.0 + + + com.alibaba + fastjson + 1.2.83 + compile + + + junit + junit + RELEASE + test + + + org.slf4j + slf4j-api + 1.7.7 + + + + diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__BaseConfig.java b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__BaseConfig.java new file mode 100644 index 00000000..c13c241c --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__BaseConfig.java @@ -0,0 +1,131 @@ +#set( $symbol_pound = '#' ) +#set( $symbol_dollar = '$' ) +#set( $symbol_escape = '\' ) +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package}.config; + +import io.openmessaging.KeyValue; +import java.lang.reflect.Method; + +public class ${dbNameToCamel}BaseConfig { + + private String ${databaseName}Host; + + private Integer ${databaseName}Port; + + private String database; + + private String userName; + + private String passWord; + + private String topic; + + public String get${dbNameToCamel}Host() { + return ${databaseName}Host; + } + + public void set${dbNameToCamel}Host(String ${databaseName}Host) { + this.${databaseName}Host = ${databaseName}Host; + } + + public Integer get${dbNameToCamel}Port() { + return ${databaseName}Port; + } + + public void set${dbNameToCamel}Port(Integer ${databaseName}Port) { + this.${databaseName}Port = ${databaseName}Port; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassWord() { + return passWord; + } + + public void setPassWord(String passWord) { + this.passWord = passWord; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public void load(KeyValue props) { + properties2Object(props, this); + } + + private void properties2Object(final KeyValue p, final Object object) { + + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(3); + String key = tmp.toLowerCase(); + + String property = p.getString(key); + if (property != null) { + Class[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } +} diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__Constants.java b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__Constants.java new file mode 100644 index 00000000..0973299d --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__Constants.java @@ -0,0 +1,50 @@ +#set( $symbol_pound = '#' ) +#set( $symbol_dollar = '$' ) +#set( $symbol_escape = '\' ) +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package}.config; + +public class ${dbNameToCamel}Constants { + public static final String ${dbNameToUpperCase}_HOST = "${dbNameToLowerCase}host"; + + public static final String ${dbNameToUpperCase}_PORT = "${dbNameToLowerCase}port"; + + public static final String ${dbNameToUpperCase}_DATABASE = "database"; + + public static final String ${dbNameToUpperCase}_USERNAME = "username"; + + public static final String ${dbNameToUpperCase}_PASSWORD = "password"; + + public static final String ${dbNameToUpperCase}_TABLE = "table"; + + public static final String TOPIC = "topic"; + + public static final String ${dbNameToUpperCase}_OFFSET = "OFFSET"; + + public static final String ${dbNameToUpperCase}_PARTITION = "${dbNameToUpperCase}_PARTITION"; + + public static final Integer defaultTimeoutSeconds = 30; + + public static final int MILLI_IN_A_SEC = 1000; + + public static final Integer retryCountDefault = 3; + + public static final int BATCH_SIZE = 2000; + +} diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SinkConfig.java b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SinkConfig.java new file mode 100644 index 00000000..e0a2d632 --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SinkConfig.java @@ -0,0 +1,34 @@ +#set( $symbol_pound = '#' ) +#set( $symbol_dollar = '$' ) +#set( $symbol_escape = '\' ) +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package}.config; + +import java.util.HashSet; +import java.util.Set; + +public class ${dbNameToCamel}SinkConfig extends ${dbNameToCamel}BaseConfig { + public static final Set SINK_REQUEST_CONFIG = new HashSet() { + { + add(${dbNameToCamel}Constants.${dbNameToUpperCase}_HOST); + add(${dbNameToCamel}Constants.${dbNameToUpperCase}_PORT); + // FIXME: add config you need + } + }; +} diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SourceConfig.java b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SourceConfig.java new file mode 100644 index 00000000..4f02861e --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/config/__dbNameToCamel__SourceConfig.java @@ -0,0 +1,45 @@ +#set( $symbol_pound = '#' ) +#set( $symbol_dollar = '$' ) +#set( $symbol_escape = '\' ) +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package}.config; + +import java.util.HashSet; +import java.util.Set; + +public class ${dbNameToCamel}SourceConfig extends ${dbNameToCamel}BaseConfig { + + public static final Set REQUEST_CONFIG = new HashSet() { + { + add(${dbNameToCamel}Constants.${dbNameToUpperCase}_HOST); + add(${dbNameToCamel}Constants.${dbNameToUpperCase}_PORT); + add(${dbNameToCamel}Constants.${dbNameToUpperCase}_TABLE); + add(${dbNameToCamel}Constants.TOPIC); + } + }; + private String table; + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } +} diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__HelperClient.java b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__HelperClient.java new file mode 100644 index 00000000..38623c36 --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__HelperClient.java @@ -0,0 +1,72 @@ +#set( $symbol_pound = '#' ) +#set( $symbol_dollar = '$' ) +#set( $symbol_escape = '\' ) +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package}.helper; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import ${package}.config.${dbNameToCamel}Constants; +import ${package}.config.${dbNameToCamel}BaseConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ${dbNameToCamel}HelperClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(${dbNameToCamel}HelperClient.class); + + private ${dbNameToCamel}BaseConfig config; + private int timeout = ${dbNameToCamel}Constants.defaultTimeoutSeconds * ${dbNameToCamel}Constants.MILLI_IN_A_SEC; + private int retry = ${dbNameToCamel}Constants.retryCountDefault; + + public ${dbNameToCamel}HelperClient(${dbNameToCamel}BaseConfig config) { + this.config = config; + initConnection(); + } + + public void initConnection() { + // FIXME: Write your code here + throw new RuntimeException("Method not implemented"); + } + + public boolean ping() { + // FIXME: Write your code here + throw new RuntimeException("Method not implemented"); + } + + public List query(long offset, int batchSize) { + // FIXME: Write your code here + throw new RuntimeException("Method not implemented"); + } + + public void batchInsert(List) { + // FIXME: Write your code here + throw new RuntimeException("Method not implemented"); + } + + public boolean stop() { + // FIXME: Write your code here + throw new RuntimeException("Method not implemented"); + } +} diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__Record.java b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__Record.java new file mode 100644 index 00000000..fa5c3106 --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/helper/__dbNameToCamel__Record.java @@ -0,0 +1,25 @@ +#set( $symbol_pound = '#' ) +#set( $symbol_dollar = '$' ) +#set( $symbol_escape = '\' ) +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package}.helper; + +public class ${dbNameToCamel}Record { + // FIXME: Write your code here +} diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkConnector.java b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkConnector.java new file mode 100644 index 00000000..a9b23a89 --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkConnector.java @@ -0,0 +1,60 @@ +#set( $symbol_pound = '#' ) +#set( $symbol_dollar = '$' ) +#set( $symbol_escape = '\' ) +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package}.sink; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.sink.SinkConnector; +import java.util.ArrayList; +import java.util.List; +import ${package}.config.${dbNameToCamel}SinkConfig; + +public class ${dbNameToCamel}SinkConnector extends SinkConnector { + + private KeyValue keyValue; + + @Override public List taskConfigs(int maxTasks) { + List configs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { + configs.add(this.keyValue); + } + return configs; + } + + @Override public Class taskClass() { + return ${dbNameToCamel}SinkTask.class; + } + + @Override public void start(KeyValue value) { + + for (String requestKey : ${dbNameToCamel}SinkConfig.SINK_REQUEST_CONFIG) { + if (!value.containsKey(requestKey)) { + throw new RuntimeException("Request config key: " + requestKey); + } + } + + this.keyValue = value; + } + + @Override public void stop() { + this.keyValue = null; + } +} diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkTask.java b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkTask.java new file mode 100644 index 00000000..a4ee54a2 --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/sink/__dbNameToCamel__SinkTask.java @@ -0,0 +1,69 @@ +#set( $symbol_pound = '#' ) +#set( $symbol_dollar = '$' ) +#set( $symbol_escape = '\' ) +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package}.sink; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.connector.api.errors.ConnectException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import ${package}.helper.${dbNameToCamel}HelperClient; +import ${package}.config.${dbNameToCamel}SinkConfig; + +public class ${dbNameToCamel}SinkTask extends SinkTask { + + public ${dbNameToCamel}SinkConfig config; + + private ${dbNameToCamel}HelperClient helperClient; + + @Override public void put(List sinkRecords) throws ConnectException { + if (sinkRecords == null || sinkRecords.size() < 1) { + return; + } + for (ConnectRecord record : sinkRecords) { + String table = record.getSchema().getName(); + final List fields = record.getSchema().getFields(); + final Struct structData = (Struct) record.getData(); + + // FIXME: Write your code here + throw new RuntimeException("Method not implemented"); + } + } + + @Override public void start(KeyValue keyValue) { + this.config = new ${dbNameToCamel}SinkConfig(); + this.config.load(keyValue); + this.helperClient = new ${dbNameToCamel}HelperClient(this.config); + if (!helperClient.ping()) { + throw new RuntimeException("Cannot connect to ${dbNameToLowerCase} server!"); + } + } + + @Override public void stop() { + this.helperClient.stop(); + } +} diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceConnector.java b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceConnector.java new file mode 100644 index 00000000..d1a5daf5 --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceConnector.java @@ -0,0 +1,60 @@ +#set( $symbol_pound = '#' ) +#set( $symbol_dollar = '$' ) +#set( $symbol_escape = '\' ) +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package}.source; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.source.SourceConnector; +import java.util.ArrayList; +import java.util.List; +import ${package}.config.${dbNameToCamel}SourceConfig; + +public class ${dbNameToCamel}SourceConnector extends SourceConnector { + + private KeyValue keyValue; + + @Override public List taskConfigs(int maxTasks) { + List configs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { + configs.add(this.keyValue); + } + return configs; + } + + @Override public Class taskClass() { + return ${dbNameToCamel}SourceTask.class; + } + + @Override public void start(KeyValue config) { + + for (String requestKey : ${dbNameToCamel}SourceConfig.REQUEST_CONFIG) { + if (!config.containsKey(requestKey)) { + throw new RuntimeException("Request config key: " + requestKey); + } + } + this.keyValue = config; + + } + + @Override public void stop() { + this.keyValue = null; + } +} diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceTask.java b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceTask.java new file mode 100644 index 00000000..3a0e3cb4 --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/main/java/source/__dbNameToCamel__SourceTask.java @@ -0,0 +1,167 @@ +#set( $symbol_pound = '#' ) +#set( $symbol_dollar = '$' ) +#set( $symbol_escape = '\' ) +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ${package}.source; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.source.SourceTask; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SchemaBuilder; +import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.internal.DefaultKeyValue; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import ${package}.helper.${dbNameToCamel}HelperClient; +import ${package}.helper.${dbNameToCamel}Record; +import ${package}.config.${dbNameToCamel}Constants; +import ${package}.config.${dbNameToCamel}SourceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ${dbNameToCamel}SourceTask extends SourceTask { + + private static final Logger log = LoggerFactory.getLogger(${dbNameToCamel}SourceTask.class); + + private ${dbNameToCamel}SourceConfig config; + + private ${dbNameToCamel}HelperClient helperClient; + + @Override public List poll() { + List res = new ArrayList<>(); + long offset = readRecordOffset(); + List<${dbNameToCamel}Record> recordList = helperClient.query(offset, ${dbNameToCamel}Constants.BATCH_SIZE); + res = recordList.stream().map(record -> ${dbNameToLowerCase}Record2ConnectRecord(record, offset)).collect(Collectors.toList()); + // FIXME: Write your code here + throw new RuntimeException("Method not implemented"); + + return res; + } + + private long readRecordOffset() { + final RecordOffset positionInfo = this.sourceTaskContext.offsetStorageReader().readOffset(buildRecordPartition(config.getTable())); + if (positionInfo == null) { + return 0; + } + Object offset = positionInfo.getOffset().get(config.getTable() + "_" + ${dbNameToCamel}Constants.${dbNameToUpperCase}_OFFSET); + return offset == null ? 0 : Long.parseLong(offset.toString()); + } + + private ConnectRecord ${dbNameToLowerCase}Record2ConnectRecord(${dbNameToCamel}Record ${dbNameToLowerCase}Record, long offset) + throws NoSuchFieldException, IllegalAccessException { + Schema schema = SchemaBuilder.struct().name(config.getTable()).build(); + final List fields = buildFields(${dbNameToLowerCase}Record); + schema.setFields(fields); + final ConnectRecord connectRecord = new ConnectRecord(buildRecordPartition(config.getTable()), + buildRecordOffset(offset), + System.currentTimeMillis(), + schema, + this.buildPayLoad(fields, schema, ${dbNameToLowerCase}Record)); + connectRecord.setExtensions(this.buildExtensions(${dbNameToLowerCase}Record)); + return connectRecord; + } + + private List buildFields( + ${dbNameToCamel}Record ${dbNameToLowerCase}Record) throws NoSuchFieldException, IllegalAccessException { + List fields = new ArrayList<>(); + + // FIXME: Write your code here + + return fields; + } + + private RecordPartition buildRecordPartition(String partitionValue) { + Map partitionMap = new HashMap<>(); + partitionMap.put(${dbNameToCamel}Constants.${dbNameToUpperCase}_PARTITION, partitionValue); + return new RecordPartition(partitionMap); + } + + private Struct buildPayLoad(List fields, Schema schema, ${dbNameToCamel}Record ${dbNameToLowerCase}Record) { + Struct payLoad = new Struct(schema); + for (int i = 0; i < fields.size(); i++) { + // FIXME: Write your code here + } + return payLoad; + } + + private KeyValue buildExtensions(${dbNameToUpperCase}Record ${dbNameToLowerCase}Record) { + KeyValue keyValue = new DefaultKeyValue(); + String topicName = config.getTopic(); + if (topicName == null || topicName.equals("")) { + String connectorName = this.sourceTaskContext.getConnectorName(); + topicName = config.getTable() + "_" + connectorName; + } + keyValue.put(${dbNameToUpperCase}Constants.TOPIC, topicName); + return keyValue; + } + + private RecordOffset buildRecordOffset(long offset) { + Map offsetMap = new HashMap<>(); + offsetMap.put(config.getTable() + "_" + ${dbNameToCamel}Constants.${dbNameToUpperCase}_OFFSET, offset); + return new RecordOffset(offsetMap); + } + + private static Schema getSchema(Class clazz) { + if (clazz.equals(Byte.class)) { + return SchemaBuilder.int8().build(); + } else if (clazz.equals(Short.class) || clazz.equals(UnsignedByte.class)) { + return SchemaBuilder.int16().build(); + } else if (clazz.equals(Integer.class) || clazz.equals(UnsignedShort.class)) { + return SchemaBuilder.int32().build(); + } else if (clazz.equals(Long.class) || clazz.equals(UnsignedInteger.class)) { + return SchemaBuilder.int64().build(); + } else if (clazz.equals(Float.class)) { + return SchemaBuilder.float32().build(); + } else if (clazz.equals(Double.class)) { + return SchemaBuilder.float64().build(); + } else if (clazz.equals(String.class)) { + return SchemaBuilder.string().build(); + } else if (clazz.equals(Date.class) || clazz.equals(LocalDateTime.class) || clazz.equals(LocalDate.class)) { + return SchemaBuilder.time().build(); + } else if (clazz.equals(Timestamp.class)) { + return SchemaBuilder.timestamp().build(); + } else if (clazz.equals(Boolean.class)) { + return SchemaBuilder.bool().build(); + } + return SchemaBuilder.string().build(); + } + + @Override public void start(KeyValue keyValue) { + this.config = new ${dbNameToCamel}SourceConfig(); + this.config.load(keyValue); + this.helperClient = new ${dbNameToCamel}HelperClient(this.config); + if (!helperClient.ping()) { + throw new RuntimeException("Cannot connect to ${dbNameToLowerCase} server!"); + } + } + + @Override public void stop() { + this.helperClient = null; + } +} diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/sink/__dbNameToCamel__SinkTaskTest.java b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/sink/__dbNameToCamel__SinkTaskTest.java new file mode 100644 index 00000000..4f6d7f59 --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/sink/__dbNameToCamel__SinkTaskTest.java @@ -0,0 +1,86 @@ +#set( $symbol_pound = '#' ) +#set( $symbol_dollar = '$' ) +#set( $symbol_escape = '\' ) +package ${package}.sink; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SchemaBuilder; +import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.internal.DefaultKeyValue; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + + +class ${dbNameToCamel}SinkTaskTest { +// +// private static final String host = "127.0.0.1"; +// private static final String port = "8123"; +// private static final String db = "default"; +// private static final String username = "default"; +// private static final String password = "123456"; +// +// +// +// public static void main(String[] args) { +// List records = new ArrayList<>(); +// // build schema +// Schema schema = SchemaBuilder.struct() +// .name("tableName") +// .field("c1",SchemaBuilder.string().build()) +// .field("c2", SchemaBuilder.string().build()) +// .build(); +// // build record +// String param0 = "1001"; +// Struct struct= new Struct(schema); +// struct.put("c1",param0); +// struct.put("c2",String.format("test-data-%s", param0)); +// +// Schema schema2 = SchemaBuilder.struct() +// .name("t1") +// .field("c1",SchemaBuilder.string().build()) +// .field("c2", SchemaBuilder.string().build()) +// .build(); +// // build record +// Struct struct2= new Struct(schema2); +// struct.put("c1",param0); +// struct.put("c2",String.format("test-data-%s", param0)); +// +// for (int i = 0; i < 4; i++) { +// ConnectRecord record = new ConnectRecord( +// // offset partition +// // offset partition" +// new RecordPartition(new ConcurrentHashMap<>()), +// new RecordOffset(new HashMap<>()), +// System.currentTimeMillis(), +// schema, +// struct +// ); +// records.add(record); +// +// ConnectRecord record2 = new ConnectRecord( +// // offset partition +// // offset partition" +// new RecordPartition(new ConcurrentHashMap<>()), +// new RecordOffset(new HashMap<>()), +// System.currentTimeMillis(), +// schema2, +// struct +// ); +// records.add(record2); +// +// } +// +// ${dbNameToCamel}SinkTask task = new ${dbNameToCamel}SinkTask(); +// KeyValue config = new DefaultKeyValue(); +// task.start(config); +// task.put(records); +// +// } + +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/source/__dbNameToCamel__SourceTaskTest.java b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/source/__dbNameToCamel__SourceTaskTest.java new file mode 100644 index 00000000..66ec034b --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/main/resources/archetype-resources/src/test/java/source/__dbNameToCamel__SourceTaskTest.java @@ -0,0 +1,28 @@ +#set( $symbol_pound = '#' ) +#set( $symbol_dollar = '$' ) +#set( $symbol_escape = '\' ) +package ${package}.source; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.internal.DefaultKeyValue; +import java.util.List; +import junit.framework.TestCase; +import ${package}.config.${dbNameToCamel}Constants; + +import static java.lang.Thread.sleep; + +public class ${dbNameToCamel}SourceTaskTest { + +// private static final String host = "127.0.0.1"; +// private static final String port = "8123"; +// private static final String db = "default"; +// private static final String username = "default"; +// private static final String password = "123456"; +// +// public void testPoll() { +// } +// +// public void testStart() throws InterruptedException { +// } +} \ No newline at end of file diff --git a/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/archetype.properties b/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/archetype.properties new file mode 100644 index 00000000..0d5342b4 --- /dev/null +++ b/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/archetype.properties @@ -0,0 +1,11 @@ +#Mon Aug 14 22:31:14 CST 2023 +package=it.pkg +version=0.1-SNAPSHOT +groupId=archetype.it +artifactId=basic +databaseName=test +dbNameToUpperCase=TEST +dbNameToCamel=Test +dbNameToLowerCase=test +archetype.filteredExtensions=java,sql,yml,xml,properties,factories,ftl,md + diff --git a/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/goal.txt b/connectors/rocketmq-connect-connectors-archetype/src/test/resources/projects/basic/goal.txt new file mode 100644 index 00000000..e69de29b