From 5539e489acdae58853c45604696ca587cbcdc624 Mon Sep 17 00:00:00 2001 From: Jeremy Custenborder Date: Tue, 24 Jan 2017 14:03:25 -0600 Subject: [PATCH] Parent refactor (#1) * Refactored to use parent pom. Moved namespace to common namespace. * Changed to common kafka-connect pipeline. * Checkstyle cleanup. * Moved to config. --- .gitignore | 4 +- Jenkinsfile | 14 +- README.md | 2 +- .../connect-avro-docker.properties | 4 +- docker-compose.yml | 4 +- pom.xml | 147 +++----- src/main/assembly/package.xml | 40 --- .../connect/twitter/StatusConverter.java | 204 +++++------ .../twitter/TwitterSourceConnector.java | 22 +- .../twitter/TwitterSourceConnectorConfig.java | 32 +- .../connect/twitter/TwitterSourceTask.java | 32 +- .../kafka/connect/twitter/VersionUtil.java | 29 ++ .../kafka/connect/twitter/VersionUtil.java | 14 - .../connect/twitter/StatusConverterTest.java | 327 ++++++++++++++++++ .../TwitterSourceConnectorConfigTest.java | 26 ++ .../twitter/TwitterSourceConnectorTest.java | 26 ++ .../twitter/TwitterSourceTaskTest.java | 26 ++ .../connect/twitter/StatusConverterTest.java | 312 ----------------- .../TwitterSourceConnectorConfigTest.java | 11 - .../twitter/TwitterSourceConnectorTest.java | 10 - .../twitter/TwitterSourceTaskTest.java | 10 - 21 files changed, 663 insertions(+), 633 deletions(-) rename {connect => config}/connect-avro-docker.properties (95%) delete mode 100644 src/main/assembly/package.xml rename src/main/java/{io/confluent => com/github/jcustenborder}/kafka/connect/twitter/StatusConverter.java (87%) rename src/main/java/{io/confluent => com/github/jcustenborder}/kafka/connect/twitter/TwitterSourceConnector.java (74%) rename src/main/java/{io/confluent => com/github/jcustenborder}/kafka/connect/twitter/TwitterSourceConnectorConfig.java (85%) rename src/main/java/{io/confluent => com/github/jcustenborder}/kafka/connect/twitter/TwitterSourceTask.java (75%) create mode 100644 src/main/java/com/github/jcustenborder/kafka/connect/twitter/VersionUtil.java delete mode 100644 src/main/java/io/confluent/kafka/connect/twitter/VersionUtil.java create mode 100644 src/test/java/com/github/jcustenborder/kafka/connect/twitter/StatusConverterTest.java create mode 100644 src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnectorConfigTest.java create mode 100644 src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnectorTest.java create mode 100644 src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceTaskTest.java delete mode 100644 src/test/java/io/confluent/kafka/connect/twitter/StatusConverterTest.java delete mode 100644 src/test/java/io/confluent/kafka/connect/twitter/TwitterSourceConnectorConfigTest.java delete mode 100644 src/test/java/io/confluent/kafka/connect/twitter/TwitterSourceConnectorTest.java delete mode 100644 src/test/java/io/confluent/kafka/connect/twitter/TwitterSourceTaskTest.java diff --git a/.gitignore b/.gitignore index 450f831..3307697 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ -TwitterSourceConnector.properties \ No newline at end of file +TwitterSourceConnector.properties +*.iml +/target/ diff --git a/Jenkinsfile b/Jenkinsfile index 5f34535..e0f7e4c 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -1,13 +1,5 @@ #!groovy -node { - def jdk8_docker_image = 'maven:3.3.3-jdk-8' +@Library('jenkins-pipeline') import com.github.jcustenborder.jenkins.pipeline.KafkaConnectPipeline - checkout scm - - docker.image(jdk8_docker_image).inside { - stage('build') { - sh "mvn --batch-mode clean package" - junit '**/target/surefire-reports/TEST-*.xml' - } - } -} +def pipe = new KafkaConnectPipeline() +pipe.execute() \ No newline at end of file diff --git a/README.md b/README.md index 2e6da79..bc215b1 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Twitter Status object as possible. ``` name=TwitterSourceConnector tasks.max=1 -connector.class=io.confluent.kafka.connect.twitter.TwitterSourceConnector +connector.class=com.github.jcustenborder.kafka.connect.TwitterSourceConnector twitter.oauth.consumerKey= twitter.oauth.consumerSecret= twitter.oauth.accessToken= diff --git a/connect/connect-avro-docker.properties b/config/connect-avro-docker.properties similarity index 95% rename from connect/connect-avro-docker.properties rename to config/connect-avro-docker.properties index a18c875..c29937d 100644 --- a/connect/connect-avro-docker.properties +++ b/config/connect-avro-docker.properties @@ -1,11 +1,11 @@ # -# Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) +# Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, diff --git a/docker-compose.yml b/docker-compose.yml index 14f4eda..6a5fb44 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,11 @@ # -# Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) +# Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pom.xml b/pom.xml index 55c0167..66cee52 100644 --- a/pom.xml +++ b/pom.xml @@ -1,31 +1,58 @@ - - 4.0.0 + + + + 4.0.0 + + com.github.jcustenborder.kafka.connect + kafka-connect-parent + 0.10.1.0-cp1 + + kafka-connect-twitter + 0.2-SNAPSHOT + kafka-connect-twitter + https://github.com/jcustenborder/kafka-connect-twitter + 2016 + + + Jeremy Custenborder + jeremy@confluent.io + https://api.github.com/users/jcustenborder + + maintainer + + + + + scm:git:https://github.com/jcustenborder/kafka-connect-twitter.git + scm:git:git@github.com:jcustenborder/kafka-connect-twitter.git + https://api.github.com/repos/jcustenborder/kafka-connect-twitter + + + github + https://api.github.com/repos/jcustenborder/kafka-connect-twitter/issues + - 0.10.0.0-cp1 [4.0,) - 4.12 - - - io.confluent.kafka - connect-utils - [0.1.0,0.1.100) - - - com.google.guava - guava - 18.0 - org.twitter4j twitter4j-core @@ -36,81 +63,5 @@ twitter4j-stream ${twitter4j.version} - - org.apache.kafka - connect-api - ${kafka.version} - provided - - - junit - junit - ${junit.version} - test - - - org.mockito - mockito-core - 2.0.106-beta - test - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - - true - true - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 2.5.1 - true - - 1.7 - 1.7 - - - - maven-assembly-plugin - 2.5.3 - - - src/main/assembly/package.xml - - - - - make-assembly - package - - single - - - - - - - - src/main/resources - true - - - - - - confluent - http://packages.confluent.io/maven/ - - - - \ No newline at end of file + diff --git a/src/main/assembly/package.xml b/src/main/assembly/package.xml deleted file mode 100644 index cef09f6..0000000 --- a/src/main/assembly/package.xml +++ /dev/null @@ -1,40 +0,0 @@ - - - package - - dir - - false - - - ${project.basedir} - share/doc/${project.name}/ - - README* - LICENSE* - NOTICE* - licenses/ - - - - ${project.basedir}/config - etc/${project.name} - - * - - - - - - share/java/${project.name} - true - true - - org.apache.kafka:connect-api - - - - diff --git a/src/main/java/io/confluent/kafka/connect/twitter/StatusConverter.java b/src/main/java/com/github/jcustenborder/kafka/connect/twitter/StatusConverter.java similarity index 87% rename from src/main/java/io/confluent/kafka/connect/twitter/StatusConverter.java rename to src/main/java/com/github/jcustenborder/kafka/connect/twitter/StatusConverter.java index 9b6c5f2..34af036 100644 --- a/src/main/java/io/confluent/kafka/connect/twitter/StatusConverter.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/twitter/StatusConverter.java @@ -1,4 +1,19 @@ -package io.confluent.kafka.connect.twitter; +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.twitter; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -15,11 +30,17 @@ public class StatusConverter { - public static final Schema userSchema; + public static final Schema USER_SCHEMA; + public final static Schema PLACE_SCHEMA; + public final static Schema GEO_LOCATION_SCHEMA; + public static final Schema SCHEMA_STATUS_DELETION_NOTICE; + public static final Schema SCHEMA_STATUS_DELETION_NOTICE_KEY; + static final Schema STATUS_SCHEMA_KEY; + static final Schema STATUS_SCHEMA; static { - userSchema = SchemaBuilder.struct() - .name("io.confluent.examples.kafka.connect.twitter.User") + USER_SCHEMA = SchemaBuilder.struct() + .name("com.github.jcustenborder.kafka.connect.twitter.User") .doc("Return the user associated with the status.\n" + "This can be null if the instance is from User.getStatus().") .field("Id", SchemaBuilder.int64().doc("Returns the id of the user").optional().build()) @@ -73,6 +94,83 @@ public class StatusConverter { .build(); } + static { + PLACE_SCHEMA = SchemaBuilder.struct() + .name("com.github.jcustenborder.kafka.connect.twitter.Place") + .optional() + .doc("Returns the place attached to this status") + .field("Name", SchemaBuilder.string().optional().build()) + .field("StreetAddress", SchemaBuilder.string().optional().build()) + .field("CountryCode", SchemaBuilder.string().optional().build()) + .field("Id", SchemaBuilder.string().optional().build()) + .field("Country", SchemaBuilder.string().optional().build()) + .field("PlaceType", SchemaBuilder.string().optional().build()) + .field("URL", SchemaBuilder.string().optional().build()) + .field("FullName", SchemaBuilder.string().optional().build()) + .build(); + } + + static { + GEO_LOCATION_SCHEMA = SchemaBuilder.struct() + .name("com.github.jcustenborder.kafka.connect.twitter.GeoLocation") + .optional() + .doc("Returns The location that this tweet refers to if available.") + .field("Latitude", Schema.FLOAT64_SCHEMA) + .field("Longitude", Schema.FLOAT64_SCHEMA) + .build(); + } + + static { + STATUS_SCHEMA_KEY = SchemaBuilder.struct() + .name("com.github.jcustenborder.kafka.connect.twitter.StatusKey") + .doc("Key for a twitter status.") + .field("Id", Schema.OPTIONAL_INT64_SCHEMA) + .build(); + } + + static { + STATUS_SCHEMA = SchemaBuilder.struct() + .name("com.github.jcustenborder.kafka.connect.twitter.Status") + .field("CreatedAt", Timestamp.builder().doc("Return the created_at").optional().build()) + .field("Id", SchemaBuilder.int64().doc("Returns the id of the status").optional().build()) + .field("Text", SchemaBuilder.string().doc("Returns the text of the status").optional().build()) + .field("Source", SchemaBuilder.string().doc("Returns the source").optional().build()) + .field("Truncated", SchemaBuilder.bool().doc("Test if the status is truncated").optional().build()) + .field("InReplyToStatusId", SchemaBuilder.int64().doc("Returns the in_reply_tostatus_id").optional().build()) + .field("InReplyToUserId", SchemaBuilder.int64().doc("Returns the in_reply_user_id").optional().build()) + .field("InReplyToScreenName", SchemaBuilder.string().doc("Returns the in_reply_to_screen_name").optional().build()) + .field("GeoLocation", GEO_LOCATION_SCHEMA) + .field("Place", PLACE_SCHEMA) + .field("Favorited", SchemaBuilder.bool().doc("Test if the status is favorited").optional().build()) + .field("Retweeted", SchemaBuilder.bool().doc("Test if the status is retweeted").optional().build()) + .field("FavoriteCount", SchemaBuilder.int32().doc("Indicates approximately how many times this Tweet has been \"favorited\" by Twitter users.").optional().build()) + .field("User", USER_SCHEMA) + .field("Retweet", SchemaBuilder.bool().optional().build()) + .field("Contributors", SchemaBuilder.array(Schema.INT64_SCHEMA).doc("Returns an array of contributors, or null if no contributor is associated with this status.").build()) + .field("RetweetCount", SchemaBuilder.int32().doc("Returns the number of times this tweet has been retweeted, or -1 when the tweet was created before this feature was enabled.").optional().build()) + .field("RetweetedByMe", SchemaBuilder.bool().optional().build()) + .field("CurrentUserRetweetId", SchemaBuilder.int64().doc("Returns the authenticating user's retweet's id of this tweet, or -1L when the tweet was created before this feature was enabled.").optional().build()) + .field("PossiblySensitive", SchemaBuilder.bool().optional().build()) + .field("Lang", SchemaBuilder.string().doc("Returns the lang of the status text if available.").optional().build()) + .field("WithheldInCountries", SchemaBuilder.array(Schema.STRING_SCHEMA).doc("Returns the list of country codes where the tweet is withheld").build()) + .build(); + } + + static { + SCHEMA_STATUS_DELETION_NOTICE = SchemaBuilder.struct() + .name("com.github.jcustenborder.kafka.connect.twitter.StatusDeletionNotice") + .field("StatusId", Schema.INT64_SCHEMA) + .field("UserId", Schema.INT64_SCHEMA) + .build(); + } + + static { + SCHEMA_STATUS_DELETION_NOTICE_KEY = SchemaBuilder.struct() + .name("com.github.jcustenborder.kafka.connect.twitter.StatusDeletionNoticeKey") + .field("StatusId", Schema.INT64_SCHEMA) + .build(); + } + public static void convert(User user, Struct struct) { struct .put("Id", user.getId()) @@ -133,26 +231,6 @@ public static void convert(User user, Struct struct) { } - - public final static Schema placeSchema; - - static { - placeSchema = SchemaBuilder.struct() - .name("io.confluent.examples.kafka.connect.twitter.Place") - .optional() - .doc("Returns the place attached to this status") - .field("Name", SchemaBuilder.string().optional().build()) - .field("StreetAddress", SchemaBuilder.string().optional().build()) - .field("CountryCode", SchemaBuilder.string().optional().build()) - .field("Id", SchemaBuilder.string().optional().build()) - .field("Country", SchemaBuilder.string().optional().build()) - .field("PlaceType", SchemaBuilder.string().optional().build()) - .field("URL", SchemaBuilder.string().optional().build()) - .field("FullName", SchemaBuilder.string().optional().build()) - .build(); - } - - public static void convert(Place place, Struct struct) { if (null == place) { return; @@ -167,18 +245,6 @@ public static void convert(Place place, Struct struct) { .put("FullName", place.getFullName()); } - public final static Schema geoLocationSchema; - - static { - geoLocationSchema = SchemaBuilder.struct() - .name("io.confluent.examples.kafka.connect.twitter.GeoLocation") - .optional() - .doc("Returns The location that this tweet refers to if available.") - .field("Latitude", Schema.FLOAT64_SCHEMA) - .field("Longitude", Schema.FLOAT64_SCHEMA) - .build(); - } - public static void convert(GeoLocation geoLocation, Struct struct) { if (null == geoLocation) { return; @@ -187,51 +253,10 @@ public static void convert(GeoLocation geoLocation, Struct struct) { .put("Longitude", geoLocation.getLongitude()); } - static final Schema statusSchemaKey; - - static { - statusSchemaKey = SchemaBuilder.struct() - .name("io.confluent.examples.kafka.connect.twitter.StatusKey") - .doc("Key for a twitter status.") - .field("Id", Schema.OPTIONAL_INT64_SCHEMA) - .build(); - } - public static void convertKey(Status status, Struct struct) { struct.put("Id", status.getId()); } - - static final Schema statusSchema; - - static { - statusSchema = SchemaBuilder.struct() - .name("io.confluent.examples.kafka.connect.twitter.Status") - .field("CreatedAt", Timestamp.builder().doc("Return the created_at").optional().build()) - .field("Id", SchemaBuilder.int64().doc("Returns the id of the status").optional().build()) - .field("Text", SchemaBuilder.string().doc("Returns the text of the status").optional().build()) - .field("Source", SchemaBuilder.string().doc("Returns the source").optional().build()) - .field("Truncated", SchemaBuilder.bool().doc("Test if the status is truncated").optional().build()) - .field("InReplyToStatusId", SchemaBuilder.int64().doc("Returns the in_reply_tostatus_id").optional().build()) - .field("InReplyToUserId", SchemaBuilder.int64().doc("Returns the in_reply_user_id").optional().build()) - .field("InReplyToScreenName", SchemaBuilder.string().doc("Returns the in_reply_to_screen_name").optional().build()) - .field("GeoLocation", geoLocationSchema) - .field("Place", placeSchema) - .field("Favorited", SchemaBuilder.bool().doc("Test if the status is favorited").optional().build()) - .field("Retweeted", SchemaBuilder.bool().doc("Test if the status is retweeted").optional().build()) - .field("FavoriteCount", SchemaBuilder.int32().doc("Indicates approximately how many times this Tweet has been \"favorited\" by Twitter users.").optional().build()) - .field("User", userSchema) - .field("Retweet", SchemaBuilder.bool().optional().build()) - .field("Contributors", SchemaBuilder.array(Schema.INT64_SCHEMA).doc("Returns an array of contributors, or null if no contributor is associated with this status.").build()) - .field("RetweetCount", SchemaBuilder.int32().doc("Returns the number of times this tweet has been retweeted, or -1 when the tweet was created before this feature was enabled.").optional().build()) - .field("RetweetedByMe", SchemaBuilder.bool().optional().build()) - .field("CurrentUserRetweetId", SchemaBuilder.int64().doc("Returns the authenticating user's retweet's id of this tweet, or -1L when the tweet was created before this feature was enabled.").optional().build()) - .field("PossiblySensitive", SchemaBuilder.bool().optional().build()) - .field("Lang", SchemaBuilder.string().doc("Returns the lang of the status text if available.").optional().build()) - .field("WithheldInCountries", SchemaBuilder.array(Schema.STRING_SCHEMA).doc("Returns the list of country codes where the tweet is withheld").build()) - .build(); - } - public static void convert(Status status, Struct struct) { struct .put("CreatedAt", status.getCreatedAt()) @@ -254,7 +279,7 @@ public static void convert(Status status, Struct struct) { Struct userStruct; if (null != status.getUser()) { - userStruct = new Struct(userSchema); + userStruct = new Struct(USER_SCHEMA); convert(status.getUser(), userStruct); } else { userStruct = null; @@ -263,7 +288,7 @@ public static void convert(Status status, Struct struct) { Struct placeStruct; if (null != status.getPlace()) { - placeStruct = new Struct(placeSchema); + placeStruct = new Struct(PLACE_SCHEMA); convert(status.getPlace(), placeStruct); } else { placeStruct = null; @@ -272,7 +297,7 @@ public static void convert(Status status, Struct struct) { Struct geoLocationStruct; if (null != status.getGeoLocation()) { - geoLocationStruct = new Struct(geoLocationSchema); + geoLocationStruct = new Struct(GEO_LOCATION_SCHEMA); convert(status.getGeoLocation(), geoLocationStruct); } else { geoLocationStruct = null; @@ -297,30 +322,11 @@ public static void convert(Status status, Struct struct) { struct.put("WithheldInCountries", withheldInCountries); } - public static final Schema schemaStatusDeletionNotice; - - static { - schemaStatusDeletionNotice = SchemaBuilder.struct() - .name("io.confluent.examples.kafka.connect.twitter.StatusDeletionNotice") - .field("StatusId", Schema.INT64_SCHEMA) - .field("UserId", Schema.INT64_SCHEMA) - .build(); - } - public static void convert(StatusDeletionNotice statusDeletionNotice, Struct struct) { struct.put("StatusId", statusDeletionNotice.getStatusId()); struct.put("UserId", statusDeletionNotice.getUserId()); } - public static final Schema schemaStatusDeletionNoticeKey; - - static { - schemaStatusDeletionNoticeKey = SchemaBuilder.struct() - .name("io.confluent.examples.kafka.connect.twitter.StatusDeletionNoticeKey") - .field("StatusId", Schema.INT64_SCHEMA) - .build(); - } - public static void convertKey(StatusDeletionNotice statusDeletionNotice, Struct struct) { struct.put("StatusId", statusDeletionNotice.getStatusId()); } diff --git a/src/main/java/io/confluent/kafka/connect/twitter/TwitterSourceConnector.java b/src/main/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnector.java similarity index 74% rename from src/main/java/io/confluent/kafka/connect/twitter/TwitterSourceConnector.java rename to src/main/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnector.java index f34acdc..93cc7a3 100644 --- a/src/main/java/io/confluent/kafka/connect/twitter/TwitterSourceConnector.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnector.java @@ -1,4 +1,19 @@ -package io.confluent.kafka.connect.twitter; +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.twitter; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -15,6 +30,7 @@ public class TwitterSourceConnector extends SourceConnector { private static Logger log = LoggerFactory.getLogger(TwitterSourceConnector.class); + Map settings; private TwitterSourceConnectorConfig config; @Override @@ -22,8 +38,6 @@ public String version() { return VersionUtil.getVersion(); } - Map settings; - @Override public void start(Map map) { this.config = new TwitterSourceConnectorConfig(map); @@ -64,7 +78,7 @@ public List> taskConfigs(int maxTasks) { Map taskSettings = new HashMap<>(); taskSettings.putAll(this.settings); - if(!k.isEmpty()){ + if (!k.isEmpty()) { taskSettings.put(TwitterSourceConnectorConfig.FILTER_KEYWORDS_CONF, Joiner.on(',').join(k)); taskConfigs.add(taskSettings); } diff --git a/src/main/java/io/confluent/kafka/connect/twitter/TwitterSourceConnectorConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnectorConfig.java similarity index 85% rename from src/main/java/io/confluent/kafka/connect/twitter/TwitterSourceConnectorConfig.java rename to src/main/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnectorConfig.java index 92ce9b1..0e5b692 100644 --- a/src/main/java/io/confluent/kafka/connect/twitter/TwitterSourceConnectorConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnectorConfig.java @@ -1,4 +1,19 @@ -package io.confluent.kafka.connect.twitter; +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.twitter; import com.google.common.collect.Iterables; import org.apache.kafka.common.config.AbstractConfig; @@ -16,23 +31,23 @@ public class TwitterSourceConnectorConfig extends AbstractConfig { public static final String TWITTER_DEBUG_CONF = "twitter.debug"; - private static final String TWITTER_DEBUG_DOC = "Flag to enable debug logging for the twitter api."; public static final String TWITTER_OAUTH_CONSUMER_KEY_CONF = "twitter.oauth.consumerKey"; - private static final String TWITTER_OAUTH_CONSUMER_KEY_DOC = "OAuth consumer key"; public static final String TWITTER_OAUTH_SECRET_KEY_CONF = "twitter.oauth.consumerSecret"; - private static final String TWITTER_OAUTH_SECRET_KEY_DOC = "OAuth consumer secret"; public static final String TWITTER_OAUTH_ACCESS_TOKEN_CONF = "twitter.oauth.accessToken"; - private static final String TWITTER_OAUTH_ACCESS_TOKEN_DOC = "OAuth access token"; public static final String TWITTER_OAUTH_ACCESS_TOKEN_SECRET_CONF = "twitter.oauth.accessTokenSecret"; - private static final String TWITTER_OAUTH_ACCESS_TOKEN_SECRET_DOC = "OAuth access token secret"; public static final String FILTER_KEYWORDS_CONF = "filter.keywords"; - private static final String FILTER_KEYWORDS_DOC = "Twitter keywords to filter for."; public static final String KAFKA_STATUS_TOPIC_CONF = "kafka.status.topic"; public static final String KAFKA_STATUS_TOPIC_DOC = "Kafka topic to write the statuses to."; public static final String KAFKA_DELETE_TOPIC_CONF = "kafka.delete.topic"; public static final String KAFKA_DELETE_TOPIC_DOC = "Kafka topic to write delete events to."; public static final String PROCESS_DELETES_CONF = "process.deletes"; public static final String PROCESS_DELETES_DOC = "Should this connector process deletes."; + private static final String TWITTER_DEBUG_DOC = "Flag to enable debug logging for the twitter api."; + private static final String TWITTER_OAUTH_CONSUMER_KEY_DOC = "OAuth consumer key"; + private static final String TWITTER_OAUTH_SECRET_KEY_DOC = "OAuth consumer secret"; + private static final String TWITTER_OAUTH_ACCESS_TOKEN_DOC = "OAuth access token"; + private static final String TWITTER_OAUTH_ACCESS_TOKEN_SECRET_DOC = "OAuth access token secret"; + private static final String FILTER_KEYWORDS_DOC = "Twitter keywords to filter for."; public TwitterSourceConnectorConfig(ConfigDef config, Map parsedConfig) { super(config, parsedConfig); @@ -52,8 +67,7 @@ public static ConfigDef conf() { .define(FILTER_KEYWORDS_CONF, Type.LIST, Importance.HIGH, FILTER_KEYWORDS_DOC) .define(KAFKA_STATUS_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_STATUS_TOPIC_DOC) .define(KAFKA_DELETE_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_DELETE_TOPIC_DOC) - .define(PROCESS_DELETES_CONF, Type.BOOLEAN, Importance.HIGH, PROCESS_DELETES_DOC) - ; + .define(PROCESS_DELETES_CONF, Type.BOOLEAN, Importance.HIGH, PROCESS_DELETES_DOC); } public boolean twitterDebug() { diff --git a/src/main/java/io/confluent/kafka/connect/twitter/TwitterSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceTask.java similarity index 75% rename from src/main/java/io/confluent/kafka/connect/twitter/TwitterSourceTask.java rename to src/main/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceTask.java index dd943b8..fc09be6 100644 --- a/src/main/java/io/confluent/kafka/connect/twitter/TwitterSourceTask.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceTask.java @@ -1,4 +1,19 @@ -package io.confluent.kafka.connect.twitter; +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.twitter; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; @@ -25,14 +40,13 @@ public class TwitterSourceTask extends SourceTask implements StatusListener { final ConcurrentLinkedDeque messageQueue = new ConcurrentLinkedDeque<>(); TwitterStream twitterStream; + TwitterSourceConnectorConfig config; @Override public String version() { return VersionUtil.getVersion(); } - TwitterSourceConnectorConfig config; - @Override public void start(Map map) { this.config = new TwitterSourceConnectorConfig(map); @@ -92,8 +106,8 @@ public void stop() { @Override public void onStatus(Status status) { try { - Struct keyStruct = new Struct(StatusConverter.statusSchemaKey); - Struct valueStruct = new Struct(StatusConverter.statusSchema); + Struct keyStruct = new Struct(StatusConverter.STATUS_SCHEMA_KEY); + Struct valueStruct = new Struct(StatusConverter.STATUS_SCHEMA); StatusConverter.convertKey(status, keyStruct); StatusConverter.convert(status, valueStruct); @@ -101,7 +115,7 @@ public void onStatus(Status status) { Map sourcePartition = ImmutableMap.of(); Map sourceOffset = ImmutableMap.of(); - SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, this.config.kafkaStatusTopic(), StatusConverter.statusSchemaKey, keyStruct, StatusConverter.statusSchema, valueStruct); + SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, this.config.kafkaStatusTopic(), StatusConverter.STATUS_SCHEMA_KEY, keyStruct, StatusConverter.STATUS_SCHEMA, valueStruct); this.messageQueue.add(record); } catch (Exception ex) { if (log.isErrorEnabled()) { @@ -117,8 +131,8 @@ public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) { } try { - Struct keyStruct = new Struct(StatusConverter.schemaStatusDeletionNoticeKey); - Struct valueStruct = new Struct(StatusConverter.schemaStatusDeletionNotice); + Struct keyStruct = new Struct(StatusConverter.SCHEMA_STATUS_DELETION_NOTICE_KEY); + Struct valueStruct = new Struct(StatusConverter.SCHEMA_STATUS_DELETION_NOTICE); StatusConverter.convertKey(statusDeletionNotice, keyStruct); StatusConverter.convert(statusDeletionNotice, valueStruct); @@ -126,7 +140,7 @@ public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) { Map sourcePartition = ImmutableMap.of(); Map sourceOffset = ImmutableMap.of(); - SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, this.config.kafkaDeleteTopic(), StatusConverter.schemaStatusDeletionNoticeKey, keyStruct, StatusConverter.schemaStatusDeletionNotice, valueStruct); + SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, this.config.kafkaDeleteTopic(), StatusConverter.SCHEMA_STATUS_DELETION_NOTICE_KEY, keyStruct, StatusConverter.SCHEMA_STATUS_DELETION_NOTICE, valueStruct); this.messageQueue.add(record); } catch (Exception ex) { if (log.isErrorEnabled()) { diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/twitter/VersionUtil.java b/src/main/java/com/github/jcustenborder/kafka/connect/twitter/VersionUtil.java new file mode 100644 index 0000000..6a68eef --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/twitter/VersionUtil.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.twitter; + +/** + * Created by jeremy on 5/3/16. + */ +class VersionUtil { + public static String getVersion() { + try { + return VersionUtil.class.getPackage().getImplementationVersion(); + } catch (Exception ex) { + return "0.0.0.0"; + } + } +} diff --git a/src/main/java/io/confluent/kafka/connect/twitter/VersionUtil.java b/src/main/java/io/confluent/kafka/connect/twitter/VersionUtil.java deleted file mode 100644 index f1d9cad..0000000 --- a/src/main/java/io/confluent/kafka/connect/twitter/VersionUtil.java +++ /dev/null @@ -1,14 +0,0 @@ -package io.confluent.kafka.connect.twitter; - -/** - * Created by jeremy on 5/3/16. - */ -class VersionUtil { - public static String getVersion() { - try { - return VersionUtil.class.getPackage().getImplementationVersion(); - } catch(Exception ex){ - return "0.0.0.0"; - } - } -} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/twitter/StatusConverterTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/twitter/StatusConverterTest.java new file mode 100644 index 0000000..6ca4b5c --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/twitter/StatusConverterTest.java @@ -0,0 +1,327 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.twitter; + +import org.apache.kafka.connect.data.Struct; +import org.junit.jupiter.api.Test; +import twitter4j.GeoLocation; +import twitter4j.Place; +import twitter4j.Status; +import twitter4j.StatusDeletionNotice; +import twitter4j.User; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class StatusConverterTest { + + public static GeoLocation mockGeoLocation() { + return new GeoLocation(30.2672D, 97.7431D); + } + + public static Place mockPlace() { + Place place = mock(Place.class); + when(place.getName()).thenReturn("Example place"); + when(place.getStreetAddress()).thenReturn("123 Example St"); + when(place.getCountryCode()).thenReturn("US"); + when(place.getId()).thenReturn("asdfaisdfasd"); + when(place.getCountry()).thenReturn("United States"); + when(place.getPlaceType()).thenReturn("ADF"); + when(place.getURL()).thenReturn("http://www.example.com/"); + when(place.getFullName()).thenReturn("Example place"); + return place; + } + + public static Status mockStatus() { + Status status = mock(Status.class); + User user = mockUser(); + GeoLocation geoLocation = mockGeoLocation(); + Place place = mockPlace(); + + when(status.getCreatedAt()).thenReturn(new Date(1471667709998L)); + when(status.getId()).thenReturn(9823452L); + when(status.getText()).thenReturn("This is a twit"); + when(status.getSource()).thenReturn("foo"); + when(status.isTruncated()).thenReturn(false); + when(status.getInReplyToStatusId()).thenReturn(2345234L); + when(status.getInReplyToUserId()).thenReturn(8756786L); + when(status.getInReplyToScreenName()).thenReturn("foo"); + when(status.getGeoLocation()).thenReturn(geoLocation); + when(status.getPlace()).thenReturn(place); + when(status.isFavorited()).thenReturn(true); + when(status.isRetweeted()).thenReturn(false); + when(status.getFavoriteCount()).thenReturn(1234); + when(status.getUser()).thenReturn(user); + when(status.isRetweet()).thenReturn(false); + when(status.getContributors()).thenReturn(new long[]{431234L, 986789678L}); + when(status.getRetweetCount()).thenReturn(1234); + when(status.isRetweetedByMe()).thenReturn(false); + when(status.getCurrentUserRetweetId()).thenReturn(653456345L); + when(status.isPossiblySensitive()).thenReturn(false); + when(status.getLang()).thenReturn("en-US"); + when(status.getWithheldInCountries()).thenReturn(new String[]{"CN"}); + + return status; + } + + public static User mockUser() { + User user = mock(User.class); + + when(user.getId()).thenReturn(1234L); + when(user.getName()).thenReturn("Example User"); + when(user.getScreenName()).thenReturn("example"); + when(user.getLocation()).thenReturn("Austin, TX"); + when(user.getDescription()).thenReturn("This is a description"); + when(user.isContributorsEnabled()).thenReturn(true); + when(user.getProfileImageURL()).thenReturn("http://i.twittercdn.com/profile.jpg"); + when(user.getBiggerProfileImageURL()).thenReturn("http://i.twittercdn.com/biggerprofile.jpg"); + when(user.getMiniProfileImageURL()).thenReturn("http://i.twittercdn.com/mini.profile.jpg"); + when(user.getOriginalProfileImageURL()).thenReturn("http://i.twittercdn.com/original.profile.jpg"); + when(user.getProfileImageURLHttps()).thenReturn("https://i.twittercdn.com/profile.jpg"); + when(user.getBiggerProfileImageURLHttps()).thenReturn("https://i.twittercdn.com/bigger.profile.jpg"); + when(user.getMiniProfileImageURLHttps()).thenReturn("https://i.twittercdn.com/mini.profile.jpg"); + when(user.getOriginalProfileImageURLHttps()).thenReturn("https://i.twittercdn.com/original.profile.jpg"); + when(user.isDefaultProfileImage()).thenReturn(true); + when(user.getURL()).thenReturn("https://www.twitter.com/example"); + when(user.isProtected()).thenReturn(false); + when(user.getFollowersCount()).thenReturn(54245); + when(user.getProfileBackgroundColor()).thenReturn("#ffffff"); + when(user.getProfileTextColor()).thenReturn("#000000"); + when(user.getProfileLinkColor()).thenReturn("#aaaaaa"); + when(user.getProfileSidebarFillColor()).thenReturn("#333333"); + when(user.getProfileSidebarBorderColor()).thenReturn("#555555"); + when(user.isProfileUseBackgroundImage()).thenReturn(true); + when(user.isDefaultProfile()).thenReturn(true); + when(user.isShowAllInlineMedia()).thenReturn(true); + when(user.getFriendsCount()).thenReturn(452345234); + when(user.getCreatedAt()).thenReturn(new Date(1471665653209L)); + when(user.getFavouritesCount()).thenReturn(12341); + when(user.getUtcOffset()).thenReturn(8); + when(user.getTimeZone()).thenReturn("UTC"); + when(user.getProfileBackgroundImageURL()).thenReturn("https://i.twittercdn.com/original.background.jpg"); + when(user.getProfileBackgroundImageUrlHttps()).thenReturn("https://i.twittercdn.com/original.background.jpg"); + when(user.getProfileBannerURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg"); + when(user.getProfileBannerRetinaURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg"); + when(user.getProfileBannerIPadURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg"); + when(user.getProfileBannerIPadRetinaURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg"); + when(user.getProfileBannerMobileURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg"); + when(user.getProfileBannerMobileRetinaURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg"); + when(user.isProfileBackgroundTiled()).thenReturn(false); + when(user.getLang()).thenReturn("en-us"); + when(user.getStatusesCount()).thenReturn(543); + when(user.isGeoEnabled()).thenReturn(true); + when(user.isVerified()).thenReturn(true); + when(user.isTranslator()).thenReturn(false); + when(user.getListedCount()).thenReturn(4); + when(user.isFollowRequestSent()).thenReturn(false); + when(user.getWithheldInCountries()).thenReturn(new String[]{"CN"}); + + + return user; + } + + public static StatusDeletionNotice mockStatusDeletionNotice() { + StatusDeletionNotice statusDeletionNotice = mock(StatusDeletionNotice.class); + when(statusDeletionNotice.getStatusId()).thenReturn(1234565345L); + when(statusDeletionNotice.getUserId()).thenReturn(6543456354L); + return statusDeletionNotice; + } + + List convert(long[] values) { + List list = new ArrayList<>(); + for (Long l : values) { + list.add(l); + } + return list; + } + + List convert(String[] values) { + List list = new ArrayList<>(); + for (String l : values) { + list.add(l); + } + return list; + } + + void assertStatus(Status status, Struct struct) { + assertEquals(status.getCreatedAt(), struct.get("CreatedAt"), "CreatedAt does not match."); + assertEquals(status.getId(), struct.get("Id"), "Id does not match."); + assertEquals(status.getText(), struct.get("Text"), "Text does not match."); + assertEquals(status.getSource(), struct.get("Source"), "Source does not match."); + assertEquals(status.isTruncated(), struct.get("Truncated"), "Truncated does not match."); + assertEquals(status.getInReplyToStatusId(), struct.get("InReplyToStatusId"), "InReplyToStatusId does not match."); + assertEquals(status.getInReplyToUserId(), struct.get("InReplyToUserId"), "InReplyToUserId does not match."); + assertEquals(status.getInReplyToScreenName(), struct.get("InReplyToScreenName"), "InReplyToScreenName does not match."); + assertEquals(status.isFavorited(), struct.get("Favorited"), "Favorited does not match."); + assertEquals(status.isRetweeted(), struct.get("Retweeted"), "Retweeted does not match."); + assertEquals(status.getFavoriteCount(), struct.get("FavoriteCount"), "FavoriteCount does not match."); + assertEquals(status.isRetweet(), struct.get("Retweet"), "Retweet does not match."); + assertEquals(status.getRetweetCount(), struct.get("RetweetCount"), "RetweetCount does not match."); + assertEquals(status.isRetweetedByMe(), struct.get("RetweetedByMe"), "RetweetedByMe does not match."); + assertEquals(status.getCurrentUserRetweetId(), struct.get("CurrentUserRetweetId"), "CurrentUserRetweetId does not match."); + assertEquals(status.isPossiblySensitive(), struct.get("PossiblySensitive"), "PossiblySensitive does not match."); + assertEquals(status.getLang(), struct.get("Lang"), "Lang does not match."); + + assertUser(status.getUser(), struct.getStruct("User")); + assertPlace(status.getPlace(), struct.getStruct("Place")); + assertGeoLocation(status.getGeoLocation(), struct.getStruct("GeoLocation")); + + assertEquals(convert(status.getContributors()), struct.getArray("Contributors"), "Contributors does not match."); + assertEquals(convert(status.getWithheldInCountries()), struct.get("WithheldInCountries"), "WithheldInCountries does not match."); + } + + void assertGeoLocation(GeoLocation geoLocation, Struct struct) { + assertEquals(struct.getFloat64("Latitude"), 1, geoLocation.getLatitude()); + assertEquals(struct.getFloat64("Longitude"), 1, geoLocation.getLongitude()); + } + + void assertPlace(Place place, Struct struct) { + assertEquals(place.getName(), struct.get("Name"), "Name does not match."); + assertEquals(place.getStreetAddress(), struct.get("StreetAddress"), "StreetAddress does not match."); + assertEquals(place.getCountryCode(), struct.get("CountryCode"), "CountryCode does not match."); + assertEquals(place.getId(), struct.get("Id"), "Id does not match."); + assertEquals(place.getCountry(), struct.get("Country"), "Country does not match."); + assertEquals(place.getPlaceType(), struct.get("PlaceType"), "PlaceType does not match."); + assertEquals(place.getURL(), struct.get("URL"), "URL does not match."); + assertEquals(place.getFullName(), struct.get("FullName"), "FullName does not match."); + } + + void assertUser(User user, Struct struct) { + assertNotNull(struct, "struct should not be null."); + assertEquals(user.getId(), struct.get("Id"), "Id does not match."); + assertEquals(user.getName(), struct.get("Name"), "Name does not match."); + assertEquals(user.getScreenName(), struct.get("ScreenName"), "ScreenName does not match."); + assertEquals(user.getLocation(), struct.get("Location"), "Location does not match."); + assertEquals(user.getDescription(), struct.get("Description"), "Description does not match."); + assertEquals(user.isContributorsEnabled(), struct.get("ContributorsEnabled"), "ContributorsEnabled does not match."); + assertEquals(user.getProfileImageURL(), struct.get("ProfileImageURL"), "ProfileImageURL does not match."); + assertEquals(user.getBiggerProfileImageURL(), struct.get("BiggerProfileImageURL"), "BiggerProfileImageURL does not match."); + assertEquals(user.getMiniProfileImageURL(), struct.get("MiniProfileImageURL"), "MiniProfileImageURL does not match."); + assertEquals(user.getOriginalProfileImageURL(), struct.get("OriginalProfileImageURL"), "OriginalProfileImageURL does not match."); + assertEquals(user.getProfileImageURLHttps(), struct.get("ProfileImageURLHttps"), "ProfileImageURLHttps does not match."); + assertEquals(user.getBiggerProfileImageURLHttps(), struct.get("BiggerProfileImageURLHttps"), "BiggerProfileImageURLHttps does not match."); + assertEquals(user.getMiniProfileImageURLHttps(), struct.get("MiniProfileImageURLHttps"), "MiniProfileImageURLHttps does not match."); + assertEquals(user.getOriginalProfileImageURLHttps(), struct.get("OriginalProfileImageURLHttps"), "OriginalProfileImageURLHttps does not match."); + assertEquals(user.isDefaultProfileImage(), struct.get("DefaultProfileImage"), "DefaultProfileImage does not match."); + assertEquals(user.getURL(), struct.get("URL"), "URL does not match."); + assertEquals(user.isProtected(), struct.get("Protected"), "Protected does not match."); + assertEquals(user.getFollowersCount(), struct.get("FollowersCount"), "FollowersCount does not match."); + assertEquals(user.getProfileBackgroundColor(), struct.get("ProfileBackgroundColor"), "ProfileBackgroundColor does not match."); + assertEquals(user.getProfileTextColor(), struct.get("ProfileTextColor"), "ProfileTextColor does not match."); + assertEquals(user.getProfileLinkColor(), struct.get("ProfileLinkColor"), "ProfileLinkColor does not match."); + assertEquals(user.getProfileSidebarFillColor(), struct.get("ProfileSidebarFillColor"), "ProfileSidebarFillColor does not match."); + assertEquals(user.getProfileSidebarBorderColor(), struct.get("ProfileSidebarBorderColor"), "ProfileSidebarBorderColor does not match."); + assertEquals(user.isProfileUseBackgroundImage(), struct.get("ProfileUseBackgroundImage"), "ProfileUseBackgroundImage does not match."); + assertEquals(user.isDefaultProfile(), struct.get("DefaultProfile"), "DefaultProfile does not match."); + assertEquals(user.isShowAllInlineMedia(), struct.get("ShowAllInlineMedia"), "ShowAllInlineMedia does not match."); + assertEquals(user.getFriendsCount(), struct.get("FriendsCount"), "FriendsCount does not match."); + assertEquals(user.getCreatedAt(), struct.get("CreatedAt"), "CreatedAt does not match."); + assertEquals(user.getFavouritesCount(), struct.get("FavouritesCount"), "FavouritesCount does not match."); + assertEquals(user.getUtcOffset(), struct.get("UtcOffset"), "UtcOffset does not match."); + assertEquals(user.getTimeZone(), struct.get("TimeZone"), "TimeZone does not match."); + assertEquals(user.getProfileBackgroundImageURL(), struct.get("ProfileBackgroundImageURL"), "ProfileBackgroundImageURL does not match."); + assertEquals(user.getProfileBackgroundImageUrlHttps(), struct.get("ProfileBackgroundImageUrlHttps"), "ProfileBackgroundImageUrlHttps does not match."); + assertEquals(user.getProfileBannerURL(), struct.get("ProfileBannerURL"), "ProfileBannerURL does not match."); + assertEquals(user.getProfileBannerRetinaURL(), struct.get("ProfileBannerRetinaURL"), "ProfileBannerRetinaURL does not match."); + assertEquals(user.getProfileBannerIPadURL(), struct.get("ProfileBannerIPadURL"), "ProfileBannerIPadURL does not match."); + assertEquals(user.getProfileBannerIPadRetinaURL(), struct.get("ProfileBannerIPadRetinaURL"), "ProfileBannerIPadRetinaURL does not match."); + assertEquals(user.getProfileBannerMobileURL(), struct.get("ProfileBannerMobileURL"), "ProfileBannerMobileURL does not match."); + assertEquals(user.getProfileBannerMobileRetinaURL(), struct.get("ProfileBannerMobileRetinaURL"), "ProfileBannerMobileRetinaURL does not match."); + assertEquals(user.isProfileBackgroundTiled(), struct.get("ProfileBackgroundTiled"), "ProfileBackgroundTiled does not match."); + assertEquals(user.getLang(), struct.get("Lang"), "Lang does not match."); + assertEquals(user.getStatusesCount(), struct.get("StatusesCount"), "StatusesCount does not match."); + assertEquals(user.isGeoEnabled(), struct.get("GeoEnabled"), "GeoEnabled does not match."); + assertEquals(user.isVerified(), struct.get("Verified"), "Verified does not match."); + assertEquals(user.isTranslator(), struct.get("Translator"), "Translator does not match."); + assertEquals(user.getListedCount(), struct.get("ListedCount"), "ListedCount does not match."); + assertEquals(user.isFollowRequestSent(), struct.get("FollowRequestSent"), "FollowRequestSent does not match."); + } + + void assertKey(Status status, Struct struct) { + assertEquals(status.getId(), struct.get("Id"), "Id does not match."); + } + + @Test + public void convertStatus() { + Status status = mockStatus(); + Struct struct = new Struct(StatusConverter.STATUS_SCHEMA); + StatusConverter.convert(status, struct); + assertStatus(status, struct); + } + + @Test + public void convertUser() { + User user = mockUser(); + Struct struct = new Struct(StatusConverter.USER_SCHEMA); + StatusConverter.convert(user, struct); + assertUser(user, struct); + } + + @Test + public void convertPlace() { + Place place = mockPlace(); + Struct struct = new Struct(StatusConverter.PLACE_SCHEMA); + StatusConverter.convert(place, struct); + assertPlace(place, struct); + } + + @Test + public void convertGeoLocation() { + GeoLocation geoLocation = mockGeoLocation(); + Struct struct = new Struct(StatusConverter.GEO_LOCATION_SCHEMA); + StatusConverter.convert(geoLocation, struct); + assertGeoLocation(geoLocation, struct); + } + + @Test + public void convertStatusKey() { + Status status = mockStatus(); + Struct struct = new Struct(StatusConverter.STATUS_SCHEMA_KEY); + StatusConverter.convertKey(status, struct); + assertKey(status, struct); + } + + void assertStatusDeletionNotice(StatusDeletionNotice statusDeletionNotice, Struct struct) { + assertEquals(statusDeletionNotice.getStatusId(), struct.get("StatusId"), "StatusId does not match."); + assertEquals(statusDeletionNotice.getUserId(), struct.get("UserId"), "UserId does not match."); + } + + void assertStatusDeletionNoticeKey(StatusDeletionNotice statusDeletionNotice, Struct struct) { + assertEquals(statusDeletionNotice.getStatusId(), struct.get("StatusId"), "StatusId does not match."); + } + + @Test + public void convertStatusDeletionNotice() { + StatusDeletionNotice statusDeletionNotice = mockStatusDeletionNotice(); + Struct struct = new Struct(StatusConverter.SCHEMA_STATUS_DELETION_NOTICE); + StatusConverter.convert(statusDeletionNotice, struct); + assertStatusDeletionNotice(statusDeletionNotice, struct); + } + + @Test + public void convertKeyStatusDeletionNotice() { + StatusDeletionNotice statusDeletionNotice = mockStatusDeletionNotice(); + Struct struct = new Struct(StatusConverter.SCHEMA_STATUS_DELETION_NOTICE_KEY); + StatusConverter.convertKey(statusDeletionNotice, struct); + assertStatusDeletionNoticeKey(statusDeletionNotice, struct); + } +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnectorConfigTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnectorConfigTest.java new file mode 100644 index 0000000..6faaa46 --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnectorConfigTest.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.twitter; + +import io.confluent.kafka.connect.utils.config.MarkdownFormatter; +import org.junit.jupiter.api.Test; + +public class TwitterSourceConnectorConfigTest { + @Test + public void doc() { + System.out.println(MarkdownFormatter.toMarkdown(TwitterSourceConnectorConfig.conf())); + } +} \ No newline at end of file diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnectorTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnectorTest.java new file mode 100644 index 0000000..7285610 --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceConnectorTest.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.twitter; + + +import org.junit.jupiter.api.Test; + +public class TwitterSourceConnectorTest { + @Test + public void test() { + // Congrats on a passing test! + } +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceTaskTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceTaskTest.java new file mode 100644 index 0000000..1c7e083 --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/twitter/TwitterSourceTaskTest.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.twitter; + + +import org.junit.jupiter.api.Test; + +public class TwitterSourceTaskTest { + @Test + public void test() { + // Congrats on a passing test! + } +} \ No newline at end of file diff --git a/src/test/java/io/confluent/kafka/connect/twitter/StatusConverterTest.java b/src/test/java/io/confluent/kafka/connect/twitter/StatusConverterTest.java deleted file mode 100644 index 90a9525..0000000 --- a/src/test/java/io/confluent/kafka/connect/twitter/StatusConverterTest.java +++ /dev/null @@ -1,312 +0,0 @@ -package io.confluent.kafka.connect.twitter; - -import org.apache.kafka.connect.data.Struct; -import org.junit.Test; -import twitter4j.GeoLocation; -import twitter4j.Place; -import twitter4j.Status; -import twitter4j.StatusDeletionNotice; -import twitter4j.User; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class StatusConverterTest { - - public static GeoLocation mockGeoLocation() { - return new GeoLocation(30.2672D, 97.7431D); - } - - public static Place mockPlace() { - Place place = mock(Place.class); - when(place.getName()).thenReturn("Example place"); - when(place.getStreetAddress()).thenReturn("123 Example St"); - when(place.getCountryCode()).thenReturn("US"); - when(place.getId()).thenReturn("asdfaisdfasd"); - when(place.getCountry()).thenReturn("United States"); - when(place.getPlaceType()).thenReturn("ADF"); - when(place.getURL()).thenReturn("http://www.example.com/"); - when(place.getFullName()).thenReturn("Example place"); - return place; - } - - public static Status mockStatus() { - Status status = mock(Status.class); - User user = mockUser(); - GeoLocation geoLocation = mockGeoLocation(); - Place place = mockPlace(); - - when(status.getCreatedAt()).thenReturn(new Date(1471667709998L)); - when(status.getId()).thenReturn(9823452L); - when(status.getText()).thenReturn("This is a twit"); - when(status.getSource()).thenReturn("foo"); - when(status.isTruncated()).thenReturn(false); - when(status.getInReplyToStatusId()).thenReturn(2345234L); - when(status.getInReplyToUserId()).thenReturn(8756786L); - when(status.getInReplyToScreenName()).thenReturn("foo"); - when(status.getGeoLocation()).thenReturn(geoLocation); - when(status.getPlace()).thenReturn(place); - when(status.isFavorited()).thenReturn(true); - when(status.isRetweeted()).thenReturn(false); - when(status.getFavoriteCount()).thenReturn(1234); - when(status.getUser()).thenReturn(user); - when(status.isRetweet()).thenReturn(false); - when(status.getContributors()).thenReturn(new long[]{431234L, 986789678L}); - when(status.getRetweetCount()).thenReturn(1234); - when(status.isRetweetedByMe()).thenReturn(false); - when(status.getCurrentUserRetweetId()).thenReturn(653456345L); - when(status.isPossiblySensitive()).thenReturn(false); - when(status.getLang()).thenReturn("en-US"); - when(status.getWithheldInCountries()).thenReturn(new String[]{"CN"}); - - return status; - } - - public static User mockUser() { - User user = mock(User.class); - - when(user.getId()).thenReturn(1234L); - when(user.getName()).thenReturn("Example User"); - when(user.getScreenName()).thenReturn("example"); - when(user.getLocation()).thenReturn("Austin, TX"); - when(user.getDescription()).thenReturn("This is a description"); - when(user.isContributorsEnabled()).thenReturn(true); - when(user.getProfileImageURL()).thenReturn("http://i.twittercdn.com/profile.jpg"); - when(user.getBiggerProfileImageURL()).thenReturn("http://i.twittercdn.com/biggerprofile.jpg"); - when(user.getMiniProfileImageURL()).thenReturn("http://i.twittercdn.com/mini.profile.jpg"); - when(user.getOriginalProfileImageURL()).thenReturn("http://i.twittercdn.com/original.profile.jpg"); - when(user.getProfileImageURLHttps()).thenReturn("https://i.twittercdn.com/profile.jpg"); - when(user.getBiggerProfileImageURLHttps()).thenReturn("https://i.twittercdn.com/bigger.profile.jpg"); - when(user.getMiniProfileImageURLHttps()).thenReturn("https://i.twittercdn.com/mini.profile.jpg"); - when(user.getOriginalProfileImageURLHttps()).thenReturn("https://i.twittercdn.com/original.profile.jpg"); - when(user.isDefaultProfileImage()).thenReturn(true); - when(user.getURL()).thenReturn("https://www.twitter.com/example"); - when(user.isProtected()).thenReturn(false); - when(user.getFollowersCount()).thenReturn(54245); - when(user.getProfileBackgroundColor()).thenReturn("#ffffff"); - when(user.getProfileTextColor()).thenReturn("#000000"); - when(user.getProfileLinkColor()).thenReturn("#aaaaaa"); - when(user.getProfileSidebarFillColor()).thenReturn("#333333"); - when(user.getProfileSidebarBorderColor()).thenReturn("#555555"); - when(user.isProfileUseBackgroundImage()).thenReturn(true); - when(user.isDefaultProfile()).thenReturn(true); - when(user.isShowAllInlineMedia()).thenReturn(true); - when(user.getFriendsCount()).thenReturn(452345234); - when(user.getCreatedAt()).thenReturn(new Date(1471665653209L)); - when(user.getFavouritesCount()).thenReturn(12341); - when(user.getUtcOffset()).thenReturn(8); - when(user.getTimeZone()).thenReturn("UTC"); - when(user.getProfileBackgroundImageURL()).thenReturn("https://i.twittercdn.com/original.background.jpg"); - when(user.getProfileBackgroundImageUrlHttps()).thenReturn("https://i.twittercdn.com/original.background.jpg"); - when(user.getProfileBannerURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg"); - when(user.getProfileBannerRetinaURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg"); - when(user.getProfileBannerIPadURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg"); - when(user.getProfileBannerIPadRetinaURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg"); - when(user.getProfileBannerMobileURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg"); - when(user.getProfileBannerMobileRetinaURL()).thenReturn("https://i.twittercdn.com/original.banner.jpg"); - when(user.isProfileBackgroundTiled()).thenReturn(false); - when(user.getLang()).thenReturn("en-us"); - when(user.getStatusesCount()).thenReturn(543); - when(user.isGeoEnabled()).thenReturn(true); - when(user.isVerified()).thenReturn(true); - when(user.isTranslator()).thenReturn(false); - when(user.getListedCount()).thenReturn(4); - when(user.isFollowRequestSent()).thenReturn(false); - when(user.getWithheldInCountries()).thenReturn(new String[]{"CN"}); - - - return user; - } - - List convert(long[] values) { - List list = new ArrayList<>(); - for (Long l : values) { - list.add(l); - } - return list; - } - - List convert(String[] values) { - List list = new ArrayList<>(); - for (String l : values) { - list.add(l); - } - return list; - } - - void assertStatus(Status status, Struct struct) { - assertEquals("CreatedAt does not match.", status.getCreatedAt(), struct.get("CreatedAt")); - assertEquals("Id does not match.", status.getId(), struct.get("Id")); - assertEquals("Text does not match.", status.getText(), struct.get("Text")); - assertEquals("Source does not match.", status.getSource(), struct.get("Source")); - assertEquals("Truncated does not match.", status.isTruncated(), struct.get("Truncated")); - assertEquals("InReplyToStatusId does not match.", status.getInReplyToStatusId(), struct.get("InReplyToStatusId")); - assertEquals("InReplyToUserId does not match.", status.getInReplyToUserId(), struct.get("InReplyToUserId")); - assertEquals("InReplyToScreenName does not match.", status.getInReplyToScreenName(), struct.get("InReplyToScreenName")); - assertEquals("Favorited does not match.", status.isFavorited(), struct.get("Favorited")); - assertEquals("Retweeted does not match.", status.isRetweeted(), struct.get("Retweeted")); - assertEquals("FavoriteCount does not match.", status.getFavoriteCount(), struct.get("FavoriteCount")); - assertEquals("Retweet does not match.", status.isRetweet(), struct.get("Retweet")); - assertEquals("RetweetCount does not match.", status.getRetweetCount(), struct.get("RetweetCount")); - assertEquals("RetweetedByMe does not match.", status.isRetweetedByMe(), struct.get("RetweetedByMe")); - assertEquals("CurrentUserRetweetId does not match.", status.getCurrentUserRetweetId(), struct.get("CurrentUserRetweetId")); - assertEquals("PossiblySensitive does not match.", status.isPossiblySensitive(), struct.get("PossiblySensitive")); - assertEquals("Lang does not match.", status.getLang(), struct.get("Lang")); - - assertUser(status.getUser(), struct.getStruct("User")); - assertPlace(status.getPlace(), struct.getStruct("Place")); - assertGeoLocation(status.getGeoLocation(), struct.getStruct("GeoLocation")); - - assertEquals("Contributors does not match.", convert(status.getContributors()), struct.getArray("Contributors")); - assertEquals("WithheldInCountries does not match.", convert(status.getWithheldInCountries()), struct.get("WithheldInCountries")); - } - - void assertGeoLocation(GeoLocation geoLocation, Struct struct) { - assertEquals(geoLocation.getLatitude(), struct.getFloat64("Latitude"), 1); - assertEquals(geoLocation.getLongitude(), struct.getFloat64("Longitude"), 1); - } - - void assertPlace(Place place, Struct struct) { - assertEquals("Name does not match.", place.getName(), struct.get("Name")); - assertEquals("StreetAddress does not match.", place.getStreetAddress(), struct.get("StreetAddress")); - assertEquals("CountryCode does not match.", place.getCountryCode(), struct.get("CountryCode")); - assertEquals("Id does not match.", place.getId(), struct.get("Id")); - assertEquals("Country does not match.", place.getCountry(), struct.get("Country")); - assertEquals("PlaceType does not match.", place.getPlaceType(), struct.get("PlaceType")); - assertEquals("URL does not match.", place.getURL(), struct.get("URL")); - assertEquals("FullName does not match.", place.getFullName(), struct.get("FullName")); - } - - void assertUser(User user, Struct struct) { - assertNotNull("struct should not be null.", struct); - assertEquals("Id does not match.", user.getId(), struct.get("Id")); - assertEquals("Name does not match.", user.getName(), struct.get("Name")); - assertEquals("ScreenName does not match.", user.getScreenName(), struct.get("ScreenName")); - assertEquals("Location does not match.", user.getLocation(), struct.get("Location")); - assertEquals("Description does not match.", user.getDescription(), struct.get("Description")); - assertEquals("ContributorsEnabled does not match.", user.isContributorsEnabled(), struct.get("ContributorsEnabled")); - assertEquals("ProfileImageURL does not match.", user.getProfileImageURL(), struct.get("ProfileImageURL")); - assertEquals("BiggerProfileImageURL does not match.", user.getBiggerProfileImageURL(), struct.get("BiggerProfileImageURL")); - assertEquals("MiniProfileImageURL does not match.", user.getMiniProfileImageURL(), struct.get("MiniProfileImageURL")); - assertEquals("OriginalProfileImageURL does not match.", user.getOriginalProfileImageURL(), struct.get("OriginalProfileImageURL")); - assertEquals("ProfileImageURLHttps does not match.", user.getProfileImageURLHttps(), struct.get("ProfileImageURLHttps")); - assertEquals("BiggerProfileImageURLHttps does not match.", user.getBiggerProfileImageURLHttps(), struct.get("BiggerProfileImageURLHttps")); - assertEquals("MiniProfileImageURLHttps does not match.", user.getMiniProfileImageURLHttps(), struct.get("MiniProfileImageURLHttps")); - assertEquals("OriginalProfileImageURLHttps does not match.", user.getOriginalProfileImageURLHttps(), struct.get("OriginalProfileImageURLHttps")); - assertEquals("DefaultProfileImage does not match.", user.isDefaultProfileImage(), struct.get("DefaultProfileImage")); - assertEquals("URL does not match.", user.getURL(), struct.get("URL")); - assertEquals("Protected does not match.", user.isProtected(), struct.get("Protected")); - assertEquals("FollowersCount does not match.", user.getFollowersCount(), struct.get("FollowersCount")); - assertEquals("ProfileBackgroundColor does not match.", user.getProfileBackgroundColor(), struct.get("ProfileBackgroundColor")); - assertEquals("ProfileTextColor does not match.", user.getProfileTextColor(), struct.get("ProfileTextColor")); - assertEquals("ProfileLinkColor does not match.", user.getProfileLinkColor(), struct.get("ProfileLinkColor")); - assertEquals("ProfileSidebarFillColor does not match.", user.getProfileSidebarFillColor(), struct.get("ProfileSidebarFillColor")); - assertEquals("ProfileSidebarBorderColor does not match.", user.getProfileSidebarBorderColor(), struct.get("ProfileSidebarBorderColor")); - assertEquals("ProfileUseBackgroundImage does not match.", user.isProfileUseBackgroundImage(), struct.get("ProfileUseBackgroundImage")); - assertEquals("DefaultProfile does not match.", user.isDefaultProfile(), struct.get("DefaultProfile")); - assertEquals("ShowAllInlineMedia does not match.", user.isShowAllInlineMedia(), struct.get("ShowAllInlineMedia")); - assertEquals("FriendsCount does not match.", user.getFriendsCount(), struct.get("FriendsCount")); - assertEquals("CreatedAt does not match.", user.getCreatedAt(), struct.get("CreatedAt")); - assertEquals("FavouritesCount does not match.", user.getFavouritesCount(), struct.get("FavouritesCount")); - assertEquals("UtcOffset does not match.", user.getUtcOffset(), struct.get("UtcOffset")); - assertEquals("TimeZone does not match.", user.getTimeZone(), struct.get("TimeZone")); - assertEquals("ProfileBackgroundImageURL does not match.", user.getProfileBackgroundImageURL(), struct.get("ProfileBackgroundImageURL")); - assertEquals("ProfileBackgroundImageUrlHttps does not match.", user.getProfileBackgroundImageUrlHttps(), struct.get("ProfileBackgroundImageUrlHttps")); - assertEquals("ProfileBannerURL does not match.", user.getProfileBannerURL(), struct.get("ProfileBannerURL")); - assertEquals("ProfileBannerRetinaURL does not match.", user.getProfileBannerRetinaURL(), struct.get("ProfileBannerRetinaURL")); - assertEquals("ProfileBannerIPadURL does not match.", user.getProfileBannerIPadURL(), struct.get("ProfileBannerIPadURL")); - assertEquals("ProfileBannerIPadRetinaURL does not match.", user.getProfileBannerIPadRetinaURL(), struct.get("ProfileBannerIPadRetinaURL")); - assertEquals("ProfileBannerMobileURL does not match.", user.getProfileBannerMobileURL(), struct.get("ProfileBannerMobileURL")); - assertEquals("ProfileBannerMobileRetinaURL does not match.", user.getProfileBannerMobileRetinaURL(), struct.get("ProfileBannerMobileRetinaURL")); - assertEquals("ProfileBackgroundTiled does not match.", user.isProfileBackgroundTiled(), struct.get("ProfileBackgroundTiled")); - assertEquals("Lang does not match.", user.getLang(), struct.get("Lang")); - assertEquals("StatusesCount does not match.", user.getStatusesCount(), struct.get("StatusesCount")); - assertEquals("GeoEnabled does not match.", user.isGeoEnabled(), struct.get("GeoEnabled")); - assertEquals("Verified does not match.", user.isVerified(), struct.get("Verified")); - assertEquals("Translator does not match.", user.isTranslator(), struct.get("Translator")); - assertEquals("ListedCount does not match.", user.getListedCount(), struct.get("ListedCount")); - assertEquals("FollowRequestSent does not match.", user.isFollowRequestSent(), struct.get("FollowRequestSent")); - } - - void assertKey(Status status, Struct struct) { - assertEquals("Id does not match.", status.getId(), struct.get("Id")); - } - - @Test - public void convertStatus() { - Status status = mockStatus(); - Struct struct = new Struct(StatusConverter.statusSchema); - StatusConverter.convert(status, struct); - assertStatus(status, struct); - } - - @Test - public void convertUser() { - User user = mockUser(); - Struct struct = new Struct(StatusConverter.userSchema); - StatusConverter.convert(user, struct); - assertUser(user, struct); - } - - @Test - public void convertPlace() { - Place place = mockPlace(); - Struct struct = new Struct(StatusConverter.placeSchema); - StatusConverter.convert(place, struct); - assertPlace(place, struct); - } - - @Test - public void convertGeoLocation() { - GeoLocation geoLocation = mockGeoLocation(); - Struct struct = new Struct(StatusConverter.geoLocationSchema); - StatusConverter.convert(geoLocation, struct); - assertGeoLocation(geoLocation, struct); - } - - @Test - public void convertStatusKey() { - Status status = mockStatus(); - Struct struct = new Struct(StatusConverter.statusSchemaKey); - StatusConverter.convertKey(status, struct); - assertKey(status, struct); - } - - public static StatusDeletionNotice mockStatusDeletionNotice() { - StatusDeletionNotice statusDeletionNotice = mock(StatusDeletionNotice.class); - when(statusDeletionNotice.getStatusId()).thenReturn(1234565345L); - when(statusDeletionNotice.getUserId()).thenReturn(6543456354L); - return statusDeletionNotice; - } - - void assertStatusDeletionNotice(StatusDeletionNotice statusDeletionNotice, Struct struct) { - assertEquals("StatusId does not match.", statusDeletionNotice.getStatusId(), struct.get("StatusId")); - assertEquals("UserId does not match.", statusDeletionNotice.getUserId(), struct.get("UserId")); - } - - void assertStatusDeletionNoticeKey(StatusDeletionNotice statusDeletionNotice, Struct struct) { - assertEquals("StatusId does not match.", statusDeletionNotice.getStatusId(), struct.get("StatusId")); - } - - @Test - public void convertStatusDeletionNotice() { - StatusDeletionNotice statusDeletionNotice = mockStatusDeletionNotice(); - Struct struct = new Struct(StatusConverter.schemaStatusDeletionNotice); - StatusConverter.convert(statusDeletionNotice, struct); - assertStatusDeletionNotice(statusDeletionNotice, struct); - } - - @Test - public void convertKeyStatusDeletionNotice() { - StatusDeletionNotice statusDeletionNotice = mockStatusDeletionNotice(); - Struct struct = new Struct(StatusConverter.schemaStatusDeletionNoticeKey); - StatusConverter.convertKey(statusDeletionNotice, struct); - assertStatusDeletionNoticeKey(statusDeletionNotice, struct); - } -} diff --git a/src/test/java/io/confluent/kafka/connect/twitter/TwitterSourceConnectorConfigTest.java b/src/test/java/io/confluent/kafka/connect/twitter/TwitterSourceConnectorConfigTest.java deleted file mode 100644 index 5c23e2e..0000000 --- a/src/test/java/io/confluent/kafka/connect/twitter/TwitterSourceConnectorConfigTest.java +++ /dev/null @@ -1,11 +0,0 @@ -package io.confluent.kafka.connect.twitter; - -import io.confluent.kafka.connect.utils.config.MarkdownFormatter; -import org.junit.Test; - -public class TwitterSourceConnectorConfigTest { - @Test - public void doc() { - System.out.println(MarkdownFormatter.toMarkdown(TwitterSourceConnectorConfig.conf())); - } -} \ No newline at end of file diff --git a/src/test/java/io/confluent/kafka/connect/twitter/TwitterSourceConnectorTest.java b/src/test/java/io/confluent/kafka/connect/twitter/TwitterSourceConnectorTest.java deleted file mode 100644 index 734bb50..0000000 --- a/src/test/java/io/confluent/kafka/connect/twitter/TwitterSourceConnectorTest.java +++ /dev/null @@ -1,10 +0,0 @@ -package io.confluent.kafka.connect.twitter; - -import org.junit.Test; - -public class TwitterSourceConnectorTest { - @Test - public void test() { - // Congrats on a passing test! - } -} diff --git a/src/test/java/io/confluent/kafka/connect/twitter/TwitterSourceTaskTest.java b/src/test/java/io/confluent/kafka/connect/twitter/TwitterSourceTaskTest.java deleted file mode 100644 index 7051337..0000000 --- a/src/test/java/io/confluent/kafka/connect/twitter/TwitterSourceTaskTest.java +++ /dev/null @@ -1,10 +0,0 @@ -package io.confluent.kafka.connect.twitter; - -import org.junit.Test; - -public class TwitterSourceTaskTest { - @Test - public void test() { - // Congrats on a passing test! - } -} \ No newline at end of file