) o;
- for (String userId : userIds) {
- if (null == Longs.tryParse(userId)) {
- throw new ConfigException(key, userId, "Could not parse to long.");
- }
- }
- }
- }
- }
-
- static final ConfigDef.Validator USERID_VALIDATOR = new UserIdValidator();
-
public static ConfigDef conf() {
return new ConfigDef()
- .define(TWITTER_DEBUG_CONF, Type.BOOLEAN, false, Importance.LOW, TWITTER_DEBUG_DOC)
- .define(TWITTER_OAUTH_CONSUMER_KEY_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_CONSUMER_KEY_DOC)
- .define(TWITTER_OAUTH_SECRET_KEY_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_SECRET_KEY_DOC)
- .define(TWITTER_OAUTH_ACCESS_TOKEN_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_ACCESS_TOKEN_DOC)
- .define(TWITTER_OAUTH_ACCESS_TOKEN_SECRET_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_OAUTH_ACCESS_TOKEN_SECRET_DOC)
- .define(FILTER_KEYWORDS_CONF, Type.LIST, Importance.HIGH, FILTER_KEYWORDS_DOC)
- .define(
- ConfigKeyBuilder.of(FILTER_USER_IDS_CONF, Type.LIST)
- .importance(Importance.HIGH)
- .documentation(FILTER_USER_IDS_DOC)
- .defaultValue(Collections.emptyList())
- .validator(USERID_VALIDATOR)
- .build()
- )
- .define(KAFKA_STATUS_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_STATUS_TOPIC_DOC)
- .define(PROCESS_DELETES_CONF, Type.BOOLEAN, Importance.HIGH, PROCESS_DELETES_DOC)
+ .define(TWITTER_BEARER_TOKEN_CONF, Type.PASSWORD, Importance.HIGH, TWITTER_BEARER_TOKEN_DOC)
+ .define(FILTER_RULE_CONF, Type.STRING, null, Importance.HIGH, FILTER_RULE_DOC)
+ .define(TWEET_FIELDS_CONF, Type.STRING, null, Importance.HIGH, TWEET_FIELDS_CONF)
+ .define(KAFKA_TWEETS_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_TWEETS_TOPIC_DOC)
.define(
ConfigKeyBuilder.of(QUEUE_EMPTY_MS_CONF, Type.INT)
.importance(Importance.LOW)
@@ -135,14 +83,4 @@ public static ConfigDef conf() {
);
}
-
- public Configuration configuration() {
- Properties properties = new Properties();
- /*
- Grab all of the key/values that have a key that starts with twitter. This will strip 'twitter.' from beginning of
- each key. This aligns with what the twitter4j framework is expecting.
- */
- properties.putAll(this.originalsWithPrefix("twitter."));
- return new PropertyConfiguration(properties);
- }
}
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceTask.java
index 779a2bc..9a8ddf3 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceTask.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceTask.java
@@ -1,12 +1,12 @@
/**
* Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com)
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,144 +18,214 @@
import com.github.jcustenborder.kafka.connect.utils.VersionUtil;
import com.github.jcustenborder.kafka.connect.utils.data.SourceRecordDeque;
import com.github.jcustenborder.kafka.connect.utils.data.SourceRecordDequeBuilder;
-import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
+import com.twitter.clientlib.ApiException;
+import com.twitter.clientlib.TwitterCredentialsBearer;
+import com.twitter.clientlib.api.TweetsApi;
+import com.twitter.clientlib.api.TwitterApi;
+import com.twitter.clientlib.model.AddOrDeleteRulesRequest;
+import com.twitter.clientlib.model.AddOrDeleteRulesResponse;
+import com.twitter.clientlib.model.AddRulesRequest;
+import com.twitter.clientlib.model.DeleteRulesRequest;
+import com.twitter.clientlib.model.DeleteRulesRequestDelete;
+import com.twitter.clientlib.model.FilteredStreamingTweetResponse;
+import com.twitter.clientlib.model.Get2TweetsSampleStreamResponse;
+import com.twitter.clientlib.model.Rule;
+import com.twitter.clientlib.model.RuleNoId;
+import com.twitter.clientlib.model.Tweet;
+import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import twitter4j.FilterQuery;
-import twitter4j.StallWarning;
-import twitter4j.Status;
-import twitter4j.StatusDeletionNotice;
-import twitter4j.StatusListener;
-import twitter4j.TwitterStream;
-import twitter4j.TwitterStreamFactory;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
-public class TwitterSourceTask extends SourceTask implements StatusListener {
- static final Logger log = LoggerFactory.getLogger(TwitterSourceTask.class);
- SourceRecordDeque messageQueue;
+public class TwitterSourceTask extends SourceTask {
- TwitterStream twitterStream;
- TwitterSourceConnectorConfig config;
+ private static final Logger log = LoggerFactory.getLogger(TwitterSourceTask.class);
+ private static final int RETRIES = 10;
+ private SourceRecordDeque messageQueue;
+
+ private volatile boolean running;
+
+ private TwitterSourceConnectorConfig config;
@Override
public String version() {
- return VersionUtil.version(this.getClass());
+ return VersionUtil.version(getClass());
}
@Override
public void start(Map map) {
- this.config = new TwitterSourceConnectorConfig(map);
- this.messageQueue = SourceRecordDequeBuilder.of()
- .emptyWaitMs(this.config.queueEmptyMs)
- .batchSize(this.config.queueBatchSize)
+ config = new TwitterSourceConnectorConfig(map);
+ messageQueue = SourceRecordDequeBuilder.of()
+ .emptyWaitMs(config.queueEmptyMs)
+ .batchSize(config.queueBatchSize)
.build();
- TwitterStreamFactory twitterStreamFactory = new TwitterStreamFactory(this.config.configuration());
- this.twitterStream = twitterStreamFactory.getInstance();
- String[] keywords = this.config.filterKeywords.toArray(new String[0]);
- if (log.isInfoEnabled()) {
- log.info("Setting up filters. Keywords = {}", Joiner.on(", ").join(keywords));
+ TwitterApi apiInstance = new TwitterApi(new TwitterCredentialsBearer(config.bearerToken.value()));
+ InputStream twitterStream;
+ try {
+ twitterStream = initTweetsStreamProcessing(apiInstance);
+ } catch (ApiException e) {
+ // Api exception can be temporary. We will try to retry it a few times.
+ throw new RetriableException(e);
}
+ running = true;
+ Thread readingThread = new TweetsStreamProcessingThread(apiInstance, twitterStream);
+ readingThread.start();
+ }
+
+ private class TweetsStreamProcessingThread extends Thread {
+
+ private final TwitterApi apiInstance;
+
+ private InputStream twitterStream;
- FilterQuery filterQuery = new FilterQuery();
- filterQuery.track(keywords);
- if (!this.config.filterUserIds.isEmpty()) {
- long[] userIds = this.config.filterUserIds.stream().mapToLong(Long::valueOf).toArray();
- log.info("Setting up filters. userIds = {}", Joiner.on(", ").join(this.config.filterUserIds));
- filterQuery.follow(userIds);
+ public TweetsStreamProcessingThread(TwitterApi apiInstance, InputStream initialTwitterStream) {
+ this.apiInstance = apiInstance;
+ this.twitterStream = initialTwitterStream;
}
- if (log.isInfoEnabled()) {
- log.info("Starting the twitter stream.");
+ @Override
+ public void run() {
+ while (running) {
+ try {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(twitterStream));
+ String line = reader.readLine();
+ while (running && line != null) {
+ if (config.filterRule != null) {
+ processFilteredStreamingTweetResponse(line);
+ } else {
+ processGet2TweetsSampleStreamResponse(line);
+ }
+ line = reader.readLine();
+ }
+ closeTwitterStreamGracefully();
+ } catch (Exception ex) {
+ log.error("Exception during tweets stream processing. Restarting stream processing...", ex);
+ try {
+ closeTwitterStreamGracefully();
+ Thread.sleep(1000);
+ twitterStream = initTweetsStreamProcessing(apiInstance);
+ } catch (Exception exx) {
+ log.error("Exception during restart of stream processing. Stopping job...");
+ throw new RuntimeException(exx);
+ }
+ }
+ }
}
- twitterStream.addListener(this);
- twitterStream.filter(filterQuery);
- }
- @Override
- public List poll() throws InterruptedException {
- return this.messageQueue.getBatch();
+ private void closeTwitterStreamGracefully() {
+ try {
+ twitterStream.close();
+ } catch (IOException ex) {
+ log.error("Exception during tweets stream closing", ex);
+ }
+ }
}
- @Override
- public void stop() {
- if (log.isInfoEnabled()) {
- log.info("Shutting down twitter stream.");
+ private InputStream initTweetsStreamProcessing(TwitterApi apiInstance) throws ApiException {
+ InputStream twitterStream;
+ if (config.filterRule != null) {
+ log.info("Setting up filter rule = {}", config.filterRule);
+ setFilterRule(apiInstance);
+ log.info("Starting tweets search stream.");
+ TweetsApi.APIsearchStreamRequest builder = apiInstance.tweets().searchStream();
+ if (config.tweetFields != null) {
+ log.info("Setting up tweet fields = {}", config.tweetFields);
+ builder = builder.tweetFields(Arrays.stream(config.tweetFields.split(",")).collect(Collectors.toSet()));
+ }
+ twitterStream = builder.execute(RETRIES);
+ } else {
+ log.info("Starting tweets sample stream.");
+ TweetsApi.APIsampleStreamRequest builder = apiInstance.tweets().sampleStream();
+ if (config.tweetFields != null) {
+ log.info("Setting up tweet fields = {}", config.tweetFields);
+ builder = builder.tweetFields(Arrays.stream(config.tweetFields.split(",")).collect(Collectors.toSet()));
+ }
+ twitterStream = builder.execute(RETRIES);
}
- twitterStream.shutdown();
+ return twitterStream;
}
- @Override
- public void onStatus(Status status) {
- try {
- Struct keyStruct = new Struct(StatusConverter.STATUS_SCHEMA_KEY);
- Struct valueStruct = new Struct(StatusConverter.STATUS_SCHEMA);
-
- StatusConverter.convertKey(status, keyStruct);
- StatusConverter.convert(status, valueStruct);
-
- Map sourcePartition = ImmutableMap.of();
- Map sourceOffset = ImmutableMap.of();
-
- SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, this.config.topic, StatusConverter.STATUS_SCHEMA_KEY, keyStruct, StatusConverter.STATUS_SCHEMA, valueStruct);
- this.messageQueue.add(record);
- } catch (Exception ex) {
- if (log.isErrorEnabled()) {
- log.error("Exception thrown", ex);
+ private void setFilterRule(TwitterApi apiInstance) throws ApiException {
+ List currentRules = apiInstance.tweets().getRules().execute(RETRIES).getData();
+ if (currentRules != null && !currentRules.isEmpty()) {
+ List currentNotMatchingRulesIds = currentRules.stream()
+ .filter(rule -> !rule.getValue().equals(config.filterRule))
+ .map(Rule::getId).collect(Collectors.toList());
+ if (!currentNotMatchingRulesIds.isEmpty()) {
+ DeleteRulesRequest delete = new DeleteRulesRequest().delete(new DeleteRulesRequestDelete().ids(currentNotMatchingRulesIds));
+ AddOrDeleteRulesResponse deleteRulesResult = apiInstance.tweets().addOrDeleteRules(new AddOrDeleteRulesRequest(delete)).execute(RETRIES);
+ log.debug("Delete rules result: " + deleteRulesResult);
}
}
+ if (currentRules == null || currentRules.stream().noneMatch(rule -> rule.getValue().equals(config.filterRule))) {
+ RuleNoId rule = new RuleNoId().value(config.filterRule);
+ AddRulesRequest add = new AddRulesRequest().addAddItem(rule);
+ AddOrDeleteRulesResponse addRulesResult = apiInstance.tweets().addOrDeleteRules(new AddOrDeleteRulesRequest(add)).execute(RETRIES);
+ log.debug("Add rules result: " + addRulesResult);
+ } else {
+ log.debug("Filter rule already configured");
+ }
}
- @Override
- public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
- if (!this.config.processDeletes) {
- return;
+ private void processFilteredStreamingTweetResponse(String line) {
+ try {
+ FilteredStreamingTweetResponse tweetResponse = FilteredStreamingTweetResponse.fromJson(line);
+ if (tweetResponse != null) {
+ onTweet(tweetResponse.getData());
+ }
+ } catch (Exception ex) {
+ log.error("Exception during TweetsSampleStreamResponse processing - will be skipped", ex);
}
+ }
+ private void processGet2TweetsSampleStreamResponse(String line) {
try {
- Struct keyStruct = new Struct(StatusConverter.SCHEMA_STATUS_DELETION_NOTICE_KEY);
-
- StatusConverter.convertKey(statusDeletionNotice, keyStruct);
-
- Map sourcePartition = ImmutableMap.of();
- Map sourceOffset = ImmutableMap.of();
-
- SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, this.config.topic, StatusConverter.SCHEMA_STATUS_DELETION_NOTICE_KEY, keyStruct, null, null);
- this.messageQueue.add(record);
- } catch (Exception ex) {
- if (log.isErrorEnabled()) {
- log.error("Exception thrown", ex);
+ Get2TweetsSampleStreamResponse tweetResponse = Get2TweetsSampleStreamResponse.fromJson(line);
+ if (tweetResponse != null) {
+ onTweet(tweetResponse.getData());
}
+ } catch (Exception ex) {
+ log.error("Exception during Get2TweetsSampleStreamResponse processing - will be skipped", ex);
}
}
@Override
- public void onTrackLimitationNotice(int i) {
-
+ public List poll() throws InterruptedException {
+ return messageQueue.getBatch();
}
@Override
- public void onScrubGeo(long l, long l1) {
-
+ public void stop() {
+ log.info("Shutting down twitter stream.");
+ running = false;
}
- @Override
- public void onStallWarning(StallWarning stallWarning) {
- if (log.isWarnEnabled()) {
- log.warn("code = '{}' percentFull = '{}' - {}", stallWarning.getCode(), stallWarning.getPercentFull(), stallWarning.getMessage());
- }
- }
+ public void onTweet(Tweet tweet) {
+ try {
+ Struct value = TweetConverter.convert(tweet);
- @Override
- public void onException(Exception e) {
- if (log.isErrorEnabled()) {
- log.error("onException", e);
+ Map sourcePartition = ImmutableMap.of();
+ Map sourceOffset = ImmutableMap.of();
+
+ SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, config.topic, Schema.STRING_SCHEMA, tweet.getId(), TweetConverter.TWEET_SCHEMA, value);
+ messageQueue.add(record);
+ } catch (Exception ex) {
+ log.error("Exception thrown", ex);
}
}
-}
\ No newline at end of file
+
+}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/twitter/DocumentationTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/twitter/DocumentationTest.java
deleted file mode 100644
index 6eed779..0000000
--- a/src/test/java/com/github/jcustenborder/kafka/connect/twitter/DocumentationTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package com.github.jcustenborder.kafka.connect.twitter;
-
-import com.github.jcustenborder.kafka.connect.utils.BaseDocumentationTest;
-import org.apache.kafka.connect.data.Schema;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class DocumentationTest extends BaseDocumentationTest {
- static Schema schema(Field field) {
- try {
- return (Schema) field.get(null);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- protected List schemas() {
- List schemas = Arrays.stream(StatusConverter.class.getFields())
- .filter(field -> Modifier.isFinal(field.getModifiers()))
- .filter(field -> Modifier.isStatic(field.getModifiers()))
- .filter(field -> Schema.class.equals(field.getType()))
- .map(DocumentationTest::schema)
- .collect(Collectors.toList());
- return schemas;
- }
-}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/twitter/SchemaGeneratorTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/twitter/SchemaGeneratorTest.java
deleted file mode 100644
index a898198..0000000
--- a/src/test/java/com/github/jcustenborder/kafka/connect/twitter/SchemaGeneratorTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-package com.github.jcustenborder.kafka.connect.twitter;
-
-import com.google.common.base.CaseFormat;
-import org.junit.jupiter.api.Test;
-import org.reflections.Reflections;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-import twitter4j.MediaEntity;
-import twitter4j.TweetEntity;
-
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public class SchemaGeneratorTest {
-
- List> list(Reflections reflections, Class cls) {
- List> classes = reflections.getSubTypesOf(cls)
- .stream()
- .filter(aClass -> Modifier.isInterface(aClass.getModifiers()))
- .collect(Collectors.toList());
- classes.sort(Comparator.comparing(Class::getName));
- return classes;
- }
-
- String schema(Class> cls) {
- String result;
-
- if (String.class.equals(cls)) {
- result = "SchemaBuilder.string().optional().doc(\"\").build()";
- } else if (int.class.equals(cls)) {
- result = "SchemaBuilder.int32().optional().doc(\"\").build()";
- } else if (long.class.equals(cls)) {
- result = "SchemaBuilder.int64().optional().doc(\"\").build()";
- } else if (cls.isArray()) {
- String childSchema = schema(cls.getComponentType());
- result = String.format("SchemaBuilder.array(%s).optional().doc(\"\").build()", childSchema);
- } else if (Map.class.isAssignableFrom(cls)) {
- result = "SchemaBuilder.map(Schema.STRING_SCHEMA, SCHEMA_MEDIA_ENTITY_SIZE)";
-
- } else {
- result = "SCHEMA_" + CaseFormat.UPPER_CAMEL.to(CaseFormat.UPPER_UNDERSCORE, cls.getSimpleName()).replace('$', '_');
- }
-
-
- return result;
- }
-
- void processClass(Class> cls, StringBuilder builder) {
-
- final String schemaConstantName;
- final String schemaName;
- final String typeName;
- final String convertMethodName;
-
- if (null == cls.getDeclaringClass()) {
- schemaConstantName = "SCHEMA_" + CaseFormat.UPPER_CAMEL.to(CaseFormat.UPPER_UNDERSCORE, cls.getSimpleName());
- schemaName = String.format("com.github.jcustenborder.kafka.connect.twitter.%s", cls.getSimpleName());
- typeName = cls.getSimpleName();
- convertMethodName = String.format("convert%s", cls.getSimpleName());
- } else {
- schemaConstantName = "SCHEMA_" + CaseFormat.UPPER_CAMEL.to(CaseFormat.UPPER_UNDERSCORE, cls.getDeclaringClass().getSimpleName() + cls.getSimpleName());
- typeName = String.format("%s.%s", cls.getDeclaringClass().getSimpleName(), cls.getSimpleName());
- schemaName = String.format("com.github.jcustenborder.kafka.connect.twitter.%s.%s", cls.getSimpleName(), cls.getDeclaringClass().getSimpleName());
- convertMethodName = String.format("convert%s%s", cls.getDeclaringClass().getSimpleName(), cls.getSimpleName());
- }
-
-
- builder.append(String.format("public static final Schema %s =SchemaBuilder.struct()\n", schemaConstantName));
- builder.append(String.format(" .name(\"%s\")\n", schemaName));
- builder.append(" .doc(\"\")\n");
-
- Set methods = new HashSet<>();
- for (Method method : cls.getMethods()) {
- String methodName = method.getName().replace("get", "");
- if (!methods.add(methodName)) {
- continue;
- }
- String expectedSchema = schema(method.getReturnType());
- builder.append(String.format(" .field(\"%s\", %s)\n", methodName, expectedSchema));
- }
- builder.append(" .build();\n\n");
-
- methods.clear();
- String variableName = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_CAMEL, cls.getSimpleName());
- builder.append(String.format("static Struct %s(%s %s) {\n", convertMethodName, typeName, variableName));
- builder.append(String.format(" return new Struct(%s)", schemaConstantName));
- for (Method method : cls.getMethods()) {
- String methodName = method.getName().replace("get", "");
- if (!methods.add(methodName)) {
- continue;
- }
- builder.append(String.format("\n .put(\"%s\", %s.%s())", methodName, variableName, method.getName()));
- }
- builder.append(";\n }\n");
-
- builder.append("\n");
- builder.append(String.format("public static List convert(%s[] items) {\n", typeName));
- builder.append(" List result = new ArrayList<>();\n");
- builder.append(" if(null==items) {\n");
- builder.append(" return result;\n");
- builder.append(" }\n");
- builder.append(String.format(" for(%s item: items) {\n", typeName));
- builder.append(String.format(" Struct struct = %s(item);\n", convertMethodName));
- builder.append(" result.add(struct);\n");
- builder.append(" }\n");
- builder.append(" return result;\n");
- builder.append("}\n");
-
-// }
-// public static List convert(UserMentionEntity[] userMentionEntities) {
-// List result = new ArrayList<>();
-// if(null==userMentionEntities) {
-// return result;
-// }
-// for(UserMentionEntity item: userMentionEntities) {
-// Struct struct = convertUserMentionEntity(item);
-// result.add(struct);
-// }
-// return result;
-// }
-
-
- }
-
- @Test
- public void tweetEntities() {
- Reflections reflections = new Reflections(new ConfigurationBuilder()
- .setUrls(ClasspathHelper.forJavaClassPath())
- .forPackages(TweetEntity.class.getPackage().getName())
- );
-
- List> allClasses = new ArrayList<>();
- List> classes = list(reflections, TweetEntity.class);
- allClasses.add(MediaEntity.Variant.class);
- allClasses.add(MediaEntity.Size.class);
- allClasses.addAll(classes);
-
-
- for (Class> cls : allClasses) {
- StringBuilder builder = new StringBuilder();
- processClass(cls, builder);
-
- System.out.println(builder);
- }
-
-
- }
-
-
-}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/twitter/StatusConverterTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/twitter/StatusConverterTest.java
deleted file mode 100644
index 6ca4b5c..0000000
--- a/src/test/java/com/github/jcustenborder/kafka/connect/twitter/StatusConverterTest.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.github.jcustenborder.kafka.connect.twitter;
-
-import org.apache.kafka.connect.data.Struct;
-import org.junit.jupiter.api.Test;
-import twitter4j.GeoLocation;
-import twitter4j.Place;
-import twitter4j.Status;
-import twitter4j.StatusDeletionNotice;
-import twitter4j.User;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class StatusConverterTest {
-
- public static GeoLocation mockGeoLocation() {
- return new GeoLocation(30.2672D, 97.7431D);
- }
-
- public static Place mockPlace() {
- Place place = mock(Place.class);
- when(place.getName()).thenReturn("Example place");
- when(place.getStreetAddress()).thenReturn("123 Example St");
- when(place.getCountryCode()).thenReturn("US");
- when(place.getId()).thenReturn("asdfaisdfasd");
- when(place.getCountry()).thenReturn("United States");
- when(place.getPlaceType()).thenReturn("ADF");
- when(place.getURL()).thenReturn("http://www.example.com/");
- when(place.getFullName()).thenReturn("Example place");
- return place;
- }
-
- public static Status mockStatus() {
- Status status = mock(Status.class);
- User user = mockUser();
- GeoLocation geoLocation = mockGeoLocation();
- Place place = mockPlace();
-
- when(status.getCreatedAt()).thenReturn(new Date(1471667709998L));
- when(status.getId()).thenReturn(9823452L);
- when(status.getText()).thenReturn("This is a twit");
- when(status.getSource()).thenReturn("foo");
- when(status.isTruncated()).thenReturn(false);
- when(status.getInReplyToStatusId()).thenReturn(2345234L);
- when(status.getInReplyToUserId()).thenReturn(8756786L);
- when(status.getInReplyToScreenName()).thenReturn("foo");
- when(status.getGeoLocation()).thenReturn(geoLocation);
- when(status.getPlace()).thenReturn(place);
- when(status.isFavorited()).thenReturn(true);
- when(status.isRetweeted()).thenReturn(false);
- when(status.getFavoriteCount()).thenReturn(1234);
- when(status.getUser()).thenReturn(user);
- when(status.isRetweet()).thenReturn(false);
- when(status.getContributors()).thenReturn(new long[]{431234L, 986789678L});
- when(status.getRetweetCount()).thenReturn(1234);
- when(status.isRetweetedByMe()).thenReturn(false);
- when(status.getCurrentUserRetweetId()).thenReturn(653456345L);
- when(status.isPossiblySensitive()).thenReturn(false);
- when(status.getLang()).thenReturn("en-US");
- when(status.getWithheldInCountries()).thenReturn(new String[]{"CN"});
-
- return status;
- }
-
- public static User mockUser() {
- User user = mock(User.class);
-
- when(user.getId()).thenReturn(1234L);
- when(user.getName()).thenReturn("Example User");
- when(user.getScreenName()).thenReturn("example");
- when(user.getLocation()).thenReturn("Austin, TX");
- when(user.getDescription()).thenReturn("This is a description");
- when(user.isContributorsEnabled()).thenReturn(true);
- when(user.getProfileImageURL()).thenReturn("http://i.twittercdn.com/profile.jpg");
- when(user.getBiggerProfileImageURL()).thenReturn("http://i.twittercdn.com/biggerprofile.jpg");
- when(user.getMiniProfileImageURL()).thenReturn("http://i.twittercdn.com/mini.profile.jpg");
- when(user.getOriginalProfileImageURL()).thenReturn("http://i.twittercdn.com/original.profile.jpg");
- when(user.getProfileImageURLHttps()).thenReturn("https://i.twittercdn.com/profile.jpg");
- when(user.getBiggerProfileImageURLHttps()).thenReturn("https://i.twittercdn.com/bigger.profile.jpg");
- when(user.getMiniProfileImageURLHttps()).thenReturn("https://i.twittercdn.com/mini.profile.jpg");
- when(user.getOriginalProfileImageURLHttps()).thenReturn("https://i.twittercdn.com/original.profile.jpg");
- when(user.isDefaultProfileImage()).thenReturn(true);
- when(user.getURL()).thenReturn("https://www.twitter.com/example");
- when(user.isProtected()).thenReturn(false);
- when(user.getFollowersCount()).thenReturn(54245);
- when(user.getProfileBackgroundColor()).thenReturn("#ffffff");
- when(user.getProfileTextColor()).thenReturn("#000000");
- when(user.getProfileLinkColor()).thenReturn("#aaaaaa");
- when(user.getProfileSidebarFillColor()).thenReturn("#333333");
- when(user.getProfileSidebarBorderColor()).thenReturn("#555555");
- when(user.isProfileUseBackgroundImage()).thenReturn(true);
- when(user.isDefaultProfile()).thenReturn(true);
- when(user.isShowAllInlineMedia()).thenReturn(true);
- when(user.getFriendsCount()).thenReturn(452345234);
- when(user.getCreatedAt()).thenReturn(new Date(1471665653209L));
- when(user.getFavouritesCount()).thenReturn(12341);
- when(user.getUtcOffset()).thenReturn(8);
- when(user.getTimeZone()).thenReturn("UTC");
- when(user.getProfileBackgroundImageURL()).thenReturn("https://i.twittercdn.com/original.background.jpg");
- when(user.getProfileBackgroundImageUrlHttps()).thenReturn("https://i.twittercdn.com/original.background.jpg");
- when(user.getProfileBannerURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg");
- when(user.getProfileBannerRetinaURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg");
- when(user.getProfileBannerIPadURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg");
- when(user.getProfileBannerIPadRetinaURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg");
- when(user.getProfileBannerMobileURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg");
- when(user.getProfileBannerMobileRetinaURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg");
- when(user.isProfileBackgroundTiled()).thenReturn(false);
- when(user.getLang()).thenReturn("en-us");
- when(user.getStatusesCount()).thenReturn(543);
- when(user.isGeoEnabled()).thenReturn(true);
- when(user.isVerified()).thenReturn(true);
- when(user.isTranslator()).thenReturn(false);
- when(user.getListedCount()).thenReturn(4);
- when(user.isFollowRequestSent()).thenReturn(false);
- when(user.getWithheldInCountries()).thenReturn(new String[]{"CN"});
-
-
- return user;
- }
-
- public static StatusDeletionNotice mockStatusDeletionNotice() {
- StatusDeletionNotice statusDeletionNotice = mock(StatusDeletionNotice.class);
- when(statusDeletionNotice.getStatusId()).thenReturn(1234565345L);
- when(statusDeletionNotice.getUserId()).thenReturn(6543456354L);
- return statusDeletionNotice;
- }
-
- List convert(long[] values) {
- List list = new ArrayList<>();
- for (Long l : values) {
- list.add(l);
- }
- return list;
- }
-
- List convert(String[] values) {
- List list = new ArrayList<>();
- for (String l : values) {
- list.add(l);
- }
- return list;
- }
-
- void assertStatus(Status status, Struct struct) {
- assertEquals(status.getCreatedAt(), struct.get("CreatedAt"), "CreatedAt does not match.");
- assertEquals(status.getId(), struct.get("Id"), "Id does not match.");
- assertEquals(status.getText(), struct.get("Text"), "Text does not match.");
- assertEquals(status.getSource(), struct.get("Source"), "Source does not match.");
- assertEquals(status.isTruncated(), struct.get("Truncated"), "Truncated does not match.");
- assertEquals(status.getInReplyToStatusId(), struct.get("InReplyToStatusId"), "InReplyToStatusId does not match.");
- assertEquals(status.getInReplyToUserId(), struct.get("InReplyToUserId"), "InReplyToUserId does not match.");
- assertEquals(status.getInReplyToScreenName(), struct.get("InReplyToScreenName"), "InReplyToScreenName does not match.");
- assertEquals(status.isFavorited(), struct.get("Favorited"), "Favorited does not match.");
- assertEquals(status.isRetweeted(), struct.get("Retweeted"), "Retweeted does not match.");
- assertEquals(status.getFavoriteCount(), struct.get("FavoriteCount"), "FavoriteCount does not match.");
- assertEquals(status.isRetweet(), struct.get("Retweet"), "Retweet does not match.");
- assertEquals(status.getRetweetCount(), struct.get("RetweetCount"), "RetweetCount does not match.");
- assertEquals(status.isRetweetedByMe(), struct.get("RetweetedByMe"), "RetweetedByMe does not match.");
- assertEquals(status.getCurrentUserRetweetId(), struct.get("CurrentUserRetweetId"), "CurrentUserRetweetId does not match.");
- assertEquals(status.isPossiblySensitive(), struct.get("PossiblySensitive"), "PossiblySensitive does not match.");
- assertEquals(status.getLang(), struct.get("Lang"), "Lang does not match.");
-
- assertUser(status.getUser(), struct.getStruct("User"));
- assertPlace(status.getPlace(), struct.getStruct("Place"));
- assertGeoLocation(status.getGeoLocation(), struct.getStruct("GeoLocation"));
-
- assertEquals(convert(status.getContributors()), struct.getArray("Contributors"), "Contributors does not match.");
- assertEquals(convert(status.getWithheldInCountries()), struct.get("WithheldInCountries"), "WithheldInCountries does not match.");
- }
-
- void assertGeoLocation(GeoLocation geoLocation, Struct struct) {
- assertEquals(struct.getFloat64("Latitude"), 1, geoLocation.getLatitude());
- assertEquals(struct.getFloat64("Longitude"), 1, geoLocation.getLongitude());
- }
-
- void assertPlace(Place place, Struct struct) {
- assertEquals(place.getName(), struct.get("Name"), "Name does not match.");
- assertEquals(place.getStreetAddress(), struct.get("StreetAddress"), "StreetAddress does not match.");
- assertEquals(place.getCountryCode(), struct.get("CountryCode"), "CountryCode does not match.");
- assertEquals(place.getId(), struct.get("Id"), "Id does not match.");
- assertEquals(place.getCountry(), struct.get("Country"), "Country does not match.");
- assertEquals(place.getPlaceType(), struct.get("PlaceType"), "PlaceType does not match.");
- assertEquals(place.getURL(), struct.get("URL"), "URL does not match.");
- assertEquals(place.getFullName(), struct.get("FullName"), "FullName does not match.");
- }
-
- void assertUser(User user, Struct struct) {
- assertNotNull(struct, "struct should not be null.");
- assertEquals(user.getId(), struct.get("Id"), "Id does not match.");
- assertEquals(user.getName(), struct.get("Name"), "Name does not match.");
- assertEquals(user.getScreenName(), struct.get("ScreenName"), "ScreenName does not match.");
- assertEquals(user.getLocation(), struct.get("Location"), "Location does not match.");
- assertEquals(user.getDescription(), struct.get("Description"), "Description does not match.");
- assertEquals(user.isContributorsEnabled(), struct.get("ContributorsEnabled"), "ContributorsEnabled does not match.");
- assertEquals(user.getProfileImageURL(), struct.get("ProfileImageURL"), "ProfileImageURL does not match.");
- assertEquals(user.getBiggerProfileImageURL(), struct.get("BiggerProfileImageURL"), "BiggerProfileImageURL does not match.");
- assertEquals(user.getMiniProfileImageURL(), struct.get("MiniProfileImageURL"), "MiniProfileImageURL does not match.");
- assertEquals(user.getOriginalProfileImageURL(), struct.get("OriginalProfileImageURL"), "OriginalProfileImageURL does not match.");
- assertEquals(user.getProfileImageURLHttps(), struct.get("ProfileImageURLHttps"), "ProfileImageURLHttps does not match.");
- assertEquals(user.getBiggerProfileImageURLHttps(), struct.get("BiggerProfileImageURLHttps"), "BiggerProfileImageURLHttps does not match.");
- assertEquals(user.getMiniProfileImageURLHttps(), struct.get("MiniProfileImageURLHttps"), "MiniProfileImageURLHttps does not match.");
- assertEquals(user.getOriginalProfileImageURLHttps(), struct.get("OriginalProfileImageURLHttps"), "OriginalProfileImageURLHttps does not match.");
- assertEquals(user.isDefaultProfileImage(), struct.get("DefaultProfileImage"), "DefaultProfileImage does not match.");
- assertEquals(user.getURL(), struct.get("URL"), "URL does not match.");
- assertEquals(user.isProtected(), struct.get("Protected"), "Protected does not match.");
- assertEquals(user.getFollowersCount(), struct.get("FollowersCount"), "FollowersCount does not match.");
- assertEquals(user.getProfileBackgroundColor(), struct.get("ProfileBackgroundColor"), "ProfileBackgroundColor does not match.");
- assertEquals(user.getProfileTextColor(), struct.get("ProfileTextColor"), "ProfileTextColor does not match.");
- assertEquals(user.getProfileLinkColor(), struct.get("ProfileLinkColor"), "ProfileLinkColor does not match.");
- assertEquals(user.getProfileSidebarFillColor(), struct.get("ProfileSidebarFillColor"), "ProfileSidebarFillColor does not match.");
- assertEquals(user.getProfileSidebarBorderColor(), struct.get("ProfileSidebarBorderColor"), "ProfileSidebarBorderColor does not match.");
- assertEquals(user.isProfileUseBackgroundImage(), struct.get("ProfileUseBackgroundImage"), "ProfileUseBackgroundImage does not match.");
- assertEquals(user.isDefaultProfile(), struct.get("DefaultProfile"), "DefaultProfile does not match.");
- assertEquals(user.isShowAllInlineMedia(), struct.get("ShowAllInlineMedia"), "ShowAllInlineMedia does not match.");
- assertEquals(user.getFriendsCount(), struct.get("FriendsCount"), "FriendsCount does not match.");
- assertEquals(user.getCreatedAt(), struct.get("CreatedAt"), "CreatedAt does not match.");
- assertEquals(user.getFavouritesCount(), struct.get("FavouritesCount"), "FavouritesCount does not match.");
- assertEquals(user.getUtcOffset(), struct.get("UtcOffset"), "UtcOffset does not match.");
- assertEquals(user.getTimeZone(), struct.get("TimeZone"), "TimeZone does not match.");
- assertEquals(user.getProfileBackgroundImageURL(), struct.get("ProfileBackgroundImageURL"), "ProfileBackgroundImageURL does not match.");
- assertEquals(user.getProfileBackgroundImageUrlHttps(), struct.get("ProfileBackgroundImageUrlHttps"), "ProfileBackgroundImageUrlHttps does not match.");
- assertEquals(user.getProfileBannerURL(), struct.get("ProfileBannerURL"), "ProfileBannerURL does not match.");
- assertEquals(user.getProfileBannerRetinaURL(), struct.get("ProfileBannerRetinaURL"), "ProfileBannerRetinaURL does not match.");
- assertEquals(user.getProfileBannerIPadURL(), struct.get("ProfileBannerIPadURL"), "ProfileBannerIPadURL does not match.");
- assertEquals(user.getProfileBannerIPadRetinaURL(), struct.get("ProfileBannerIPadRetinaURL"), "ProfileBannerIPadRetinaURL does not match.");
- assertEquals(user.getProfileBannerMobileURL(), struct.get("ProfileBannerMobileURL"), "ProfileBannerMobileURL does not match.");
- assertEquals(user.getProfileBannerMobileRetinaURL(), struct.get("ProfileBannerMobileRetinaURL"), "ProfileBannerMobileRetinaURL does not match.");
- assertEquals(user.isProfileBackgroundTiled(), struct.get("ProfileBackgroundTiled"), "ProfileBackgroundTiled does not match.");
- assertEquals(user.getLang(), struct.get("Lang"), "Lang does not match.");
- assertEquals(user.getStatusesCount(), struct.get("StatusesCount"), "StatusesCount does not match.");
- assertEquals(user.isGeoEnabled(), struct.get("GeoEnabled"), "GeoEnabled does not match.");
- assertEquals(user.isVerified(), struct.get("Verified"), "Verified does not match.");
- assertEquals(user.isTranslator(), struct.get("Translator"), "Translator does not match.");
- assertEquals(user.getListedCount(), struct.get("ListedCount"), "ListedCount does not match.");
- assertEquals(user.isFollowRequestSent(), struct.get("FollowRequestSent"), "FollowRequestSent does not match.");
- }
-
- void assertKey(Status status, Struct struct) {
- assertEquals(status.getId(), struct.get("Id"), "Id does not match.");
- }
-
- @Test
- public void convertStatus() {
- Status status = mockStatus();
- Struct struct = new Struct(StatusConverter.STATUS_SCHEMA);
- StatusConverter.convert(status, struct);
- assertStatus(status, struct);
- }
-
- @Test
- public void convertUser() {
- User user = mockUser();
- Struct struct = new Struct(StatusConverter.USER_SCHEMA);
- StatusConverter.convert(user, struct);
- assertUser(user, struct);
- }
-
- @Test
- public void convertPlace() {
- Place place = mockPlace();
- Struct struct = new Struct(StatusConverter.PLACE_SCHEMA);
- StatusConverter.convert(place, struct);
- assertPlace(place, struct);
- }
-
- @Test
- public void convertGeoLocation() {
- GeoLocation geoLocation = mockGeoLocation();
- Struct struct = new Struct(StatusConverter.GEO_LOCATION_SCHEMA);
- StatusConverter.convert(geoLocation, struct);
- assertGeoLocation(geoLocation, struct);
- }
-
- @Test
- public void convertStatusKey() {
- Status status = mockStatus();
- Struct struct = new Struct(StatusConverter.STATUS_SCHEMA_KEY);
- StatusConverter.convertKey(status, struct);
- assertKey(status, struct);
- }
-
- void assertStatusDeletionNotice(StatusDeletionNotice statusDeletionNotice, Struct struct) {
- assertEquals(statusDeletionNotice.getStatusId(), struct.get("StatusId"), "StatusId does not match.");
- assertEquals(statusDeletionNotice.getUserId(), struct.get("UserId"), "UserId does not match.");
- }
-
- void assertStatusDeletionNoticeKey(StatusDeletionNotice statusDeletionNotice, Struct struct) {
- assertEquals(statusDeletionNotice.getStatusId(), struct.get("StatusId"), "StatusId does not match.");
- }
-
- @Test
- public void convertStatusDeletionNotice() {
- StatusDeletionNotice statusDeletionNotice = mockStatusDeletionNotice();
- Struct struct = new Struct(StatusConverter.SCHEMA_STATUS_DELETION_NOTICE);
- StatusConverter.convert(statusDeletionNotice, struct);
- assertStatusDeletionNotice(statusDeletionNotice, struct);
- }
-
- @Test
- public void convertKeyStatusDeletionNotice() {
- StatusDeletionNotice statusDeletionNotice = mockStatusDeletionNotice();
- Struct struct = new Struct(StatusConverter.SCHEMA_STATUS_DELETION_NOTICE_KEY);
- StatusConverter.convertKey(statusDeletionNotice, struct);
- assertStatusDeletionNoticeKey(statusDeletionNotice, struct);
- }
-}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TweetConverterTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TweetConverterTest.java
new file mode 100644
index 0000000..3b987ad
--- /dev/null
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TweetConverterTest.java
@@ -0,0 +1,103 @@
+/**
+ * Copyright © 2022 Arek Burdach (arek.burdach@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.twitter;
+
+import com.twitter.clientlib.JSON;
+import com.twitter.clientlib.model.FullTextEntities;
+import com.twitter.clientlib.model.HashtagEntity;
+import com.twitter.clientlib.model.Point;
+import com.twitter.clientlib.model.Tweet;
+import com.twitter.clientlib.model.TweetEditControls;
+import com.twitter.clientlib.model.TweetGeo;
+import com.twitter.clientlib.model.UrlEntity;
+import io.confluent.connect.avro.AvroConverter;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import org.apache.commons.io.IOUtils;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TweetConverterTest {
+
+ {
+ // init twitter json deserializers
+ new JSON();
+ }
+
+ @Test
+ public void shouldConvertTweetWithPhoto() throws IOException {
+ Tweet tweet = Tweet.fromJson(IOUtils.resourceToString("/sample_tweets/with-photo.json", StandardCharsets.UTF_8));
+ Struct result = TweetConverter.convert(tweet);
+ assertEquals("How times have changed https://t.co/gCxUkZ4kZC", result.getString(Tweet.SERIALIZED_NAME_TEXT));
+ assertEquals(5, result
+ .getStruct(Tweet.SERIALIZED_NAME_EDIT_CONTROLS)
+ .getInt32(TweetEditControls.SERIALIZED_NAME_EDITS_REMAINING));
+ assertEquals("https://t.co/gCxUkZ4kZC", result
+ .getStruct(Tweet.SERIALIZED_NAME_ENTITIES)
+ .getArray(FullTextEntities.SERIALIZED_NAME_URLS)
+ .get(0)
+ .getString(UrlEntity.SERIALIZED_NAME_URL));
+ assertEquals("everyone", result.getString(Tweet.SERIALIZED_NAME_REPLY_SETTINGS));
+ }
+
+ @Test
+ public void shouldConvertTweetWithReference() throws IOException {
+ Tweet tweet = Tweet.fromJson(IOUtils.resourceToString("/sample_tweets/with-reference.json", StandardCharsets.UTF_8));
+ TweetConverter.convert(tweet);
+ }
+
+ @Test
+ public void shouldConvertReplyTweet() throws IOException {
+ Tweet tweet = Tweet.fromJson(IOUtils.resourceToString("/sample_tweets/reply.json", StandardCharsets.UTF_8));
+ TweetConverter.convert(tweet);
+ }
+
+ @Test
+ public void shouldConvertTweetWithHashtag() throws IOException {
+ Tweet tweet = Tweet.fromJson(IOUtils.resourceToString("/sample_tweets/with-hashtag.json", StandardCharsets.UTF_8));
+ Struct result = TweetConverter.convert(tweet);
+ assertEquals("Bitcoin", result
+ .getStruct(Tweet.SERIALIZED_NAME_ENTITIES)
+ .getArray(FullTextEntities.SERIALIZED_NAME_HASHTAGS)
+ .get(0)
+ .getString(HashtagEntity.SERIALIZED_NAME_TAG));
+ }
+
+ @Test
+ public void shouldConvertDecimalToDesiredScale() {
+ Tweet tweet = new Tweet();
+ tweet.setId("foo");
+ tweet.setText("foo");
+ TweetGeo geo = new TweetGeo();
+ Point point = new Point();
+ point.setType(Point.TypeEnum.POINT);
+ point.setCoordinates(Arrays.asList(new BigDecimal("12.12345678"), new BigDecimal("12.1234567")));
+ geo.setCoordinates(point);
+ tweet.setGeo(geo);
+ Struct result = TweetConverter.convert(tweet);
+ AvroConverter converter = new AvroConverter(new MockSchemaRegistryClient());
+ converter.configure(Collections.singletonMap("schema.registry.url", "http://localhost:8080/not_used"), false);
+ converter.fromConnectData("foo", TweetConverter.TWEET_SCHEMA, result);
+ }
+
+}
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnectorTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnectorTest.java
deleted file mode 100644
index aec60af..0000000
--- a/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnectorTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.github.jcustenborder.kafka.connect.twitter;
-
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DynamicTest;
-import org.junit.jupiter.api.TestFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Stream;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.DynamicTest.dynamicTest;
-
-public class TwitterSourceConnectorTest {
-
- TwitterSourceConnector connector;
- Map defaultSettings;
-
- @BeforeEach
- public void setup() {
- this.connector = new TwitterSourceConnector();
- this.defaultSettings = new LinkedHashMap<>();
- this.defaultSettings.put(TwitterSourceConnectorConfig.TWITTER_OAUTH_ACCESS_TOKEN_CONF, "xxxxxx");
- this.defaultSettings.put(TwitterSourceConnectorConfig.TWITTER_OAUTH_SECRET_KEY_CONF, "xxxxxx");
- this.defaultSettings.put(TwitterSourceConnectorConfig.TWITTER_OAUTH_CONSUMER_KEY_CONF, "xxxxxx");
- this.defaultSettings.put(TwitterSourceConnectorConfig.TWITTER_OAUTH_ACCESS_TOKEN_SECRET_CONF, "xxxxxx");
- this.defaultSettings.put(TwitterSourceConnectorConfig.KAFKA_STATUS_TOPIC_CONF, "xxxxxx");
- this.defaultSettings.put(TwitterSourceConnectorConfig.PROCESS_DELETES_CONF, "false");
-
- }
-
- List