Skip to content

Commit

Permalink
Use latest RADAR-Schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
blootsvoets committed Sep 26, 2017
1 parent 182ac1c commit f711a4e
Show file tree
Hide file tree
Showing 21 changed files with 216 additions and 168 deletions.
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ allprojects {
ext.mathVersion = '3.0'
ext.hamcrestVersion = '1.3'
ext.codacyVersion = '1.0.10'
ext.radarSchemasVersion = '0.1'
ext.radarSchemasVersion = '0.2-alpha.3-SNAPSHOT'

ext.githubUrl = 'https://github.com/' + githubRepoName + '.git'
ext.issueUrl = 'https://github.com/' + githubRepoName + '/issues'
Expand All @@ -67,6 +67,9 @@ allprojects {
jcenter()
maven { url 'http://packages.confluent.io/maven/' }
maven { url 'http://dl.bintray.com/typesafe/maven-releases' }
flatDir {
dirs "${project.rootDir}/libs"
}
}

ext.pomConfig = {
Expand Down
1 change: 1 addition & 0 deletions libs/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.jar
3 changes: 3 additions & 0 deletions src/main/java/org/radarcns/topic/SensorTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public SensorTopic(String name, Schema keySchema, Schema valueSchema,
throw new IllegalArgumentException("Sensors must send records as values");
}

if (keySchema.getField("projectId") == null) {
throw new IllegalArgumentException("Key schema must have a project ID");
}
if (keySchema.getField("userId") == null) {
throw new IllegalArgumentException("Key schema must have a user ID");
}
Expand Down
28 changes: 17 additions & 11 deletions src/test/java/org/radarcns/data/SpecificRecordDecorderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import java.io.IOException;
import org.junit.Test;
import org.radarcns.empatica.EmpaticaE4BloodVolumePulse;
import org.radarcns.key.MeasurementKey;
import org.radarcns.passive.empatica.EmpaticaE4BloodVolumePulse;
import org.radarcns.kafka.ObservationKey;
import org.radarcns.topic.AvroTopic;

/**
Expand All @@ -30,13 +30,14 @@
public class SpecificRecordDecorderTest {

@Test
public void decordJson() throws IOException {
public void decodeJson() throws IOException {
SpecificRecordDecoder decoder = new SpecificRecordDecoder(false);
AvroTopic<MeasurementKey, EmpaticaE4BloodVolumePulse> topic = new AvroTopic<>("keeeeys", MeasurementKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), MeasurementKey.class, EmpaticaE4BloodVolumePulse.class);
AvroDecoder.AvroReader<MeasurementKey> keyDecoder = decoder.reader(topic.getKeySchema(), topic.getKeyClass());
AvroTopic<ObservationKey, EmpaticaE4BloodVolumePulse> topic = new AvroTopic<>("keeeeys", ObservationKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), ObservationKey.class, EmpaticaE4BloodVolumePulse.class);
AvroDecoder.AvroReader<ObservationKey> keyDecoder = decoder.reader(topic.getKeySchema(), topic.getKeyClass());
AvroDecoder.AvroReader<EmpaticaE4BloodVolumePulse> valueDecoder = decoder.reader(topic.getValueSchema(), topic.getValueClass());

MeasurementKey key = keyDecoder.decode("{\"userId\":\"a\",\"sourceId\":\"b\"}".getBytes());
ObservationKey key = keyDecoder.decode("{\"projectId\":{\"string\":\"test\"},\"userId\":\"a\",\"sourceId\":\"b\"}".getBytes());
assertEquals(key.get("projectId"), "test");
assertEquals(key.get("userId"), "a");
assertEquals(key.get("sourceId"), "b");

Expand All @@ -47,15 +48,20 @@ public void decordJson() throws IOException {
}

@Test
public void decordBinary() throws IOException {
public void decodeBinary() throws IOException {

SpecificRecordDecoder decoder = new SpecificRecordDecoder(true);
AvroTopic<MeasurementKey, EmpaticaE4BloodVolumePulse> topic = new AvroTopic<>("keeeeys", MeasurementKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), MeasurementKey.class, EmpaticaE4BloodVolumePulse.class);
AvroDecoder.AvroReader<MeasurementKey> keyDecoder = decoder.reader(topic.getKeySchema(), topic.getKeyClass());
AvroTopic<ObservationKey, EmpaticaE4BloodVolumePulse> topic = new AvroTopic<>("keeeeys", ObservationKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), ObservationKey.class, EmpaticaE4BloodVolumePulse.class);
AvroDecoder.AvroReader<ObservationKey> keyDecoder = decoder.reader(topic.getKeySchema(), topic.getKeyClass());
AvroDecoder.AvroReader<EmpaticaE4BloodVolumePulse> valueDecoder = decoder.reader(topic.getValueSchema(), topic.getValueClass());

byte[] inputKey = {2, 97, 2, 98};
MeasurementKey key = keyDecoder.decode( inputKey);
// note that positive numbers are multiplied by two in avro binary encoding, due to the
// zig-zag encoding scheme used.
// See http://avro.apache.org/docs/1.8.1/spec.html#binary_encoding
// type index 1, length 4, char t, char e, char s, char t, length 1, char a, length 1, char b
byte[] inputKey = {2, 8, 116, 101, 115, 116, 2, 97, 2, 98};
ObservationKey key = keyDecoder.decode( inputKey);
assertEquals(key.get("projectId"), "test");
assertEquals(key.get("userId"), "a");
assertEquals(key.get("sourceId"), "b");

Expand Down
25 changes: 14 additions & 11 deletions src/test/java/org/radarcns/data/SpecificRecordEncoderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import junit.framework.TestCase;

import org.radarcns.empatica.EmpaticaE4BloodVolumePulse;
import org.radarcns.key.MeasurementKey;
import org.radarcns.passive.empatica.EmpaticaE4BloodVolumePulse;
import org.radarcns.kafka.ObservationKey;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -28,25 +28,28 @@
public class SpecificRecordEncoderTest extends TestCase {
public void testJson() throws IOException {
SpecificRecordEncoder encoder = new SpecificRecordEncoder(false);
AvroTopic<MeasurementKey, EmpaticaE4BloodVolumePulse> topic = new AvroTopic<>("keeeeys", MeasurementKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), MeasurementKey.class, EmpaticaE4BloodVolumePulse.class);
AvroEncoder.AvroWriter<MeasurementKey> keyEncoder = encoder.writer(topic.getKeySchema(), topic.getKeyClass());
AvroTopic<ObservationKey, EmpaticaE4BloodVolumePulse> topic = new AvroTopic<>("keeeeys", ObservationKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), ObservationKey.class, EmpaticaE4BloodVolumePulse.class);
AvroEncoder.AvroWriter<ObservationKey> keyEncoder = encoder.writer(topic.getKeySchema(), topic.getKeyClass());
AvroEncoder.AvroWriter<EmpaticaE4BloodVolumePulse> valueEncoder = encoder.writer(topic.getValueSchema(), topic.getValueClass());

byte[] key = keyEncoder.encode(new MeasurementKey("a", "b"));
byte[] key = keyEncoder.encode(new ObservationKey("test", "a", "b"));
byte[] value = valueEncoder.encode(new EmpaticaE4BloodVolumePulse(0d, 0d, 0f));
assertEquals("{\"userId\":\"a\",\"sourceId\":\"b\"}", new String(key));
assertEquals("{\"projectId\":{\"string\":\"test\"},\"userId\":\"a\",\"sourceId\":\"b\"}", new String(key));
assertEquals("{\"time\":0.0,\"timeReceived\":0.0,\"bloodVolumePulse\":0.0}", new String(value));
}

public void testBinary() throws IOException {
SpecificRecordEncoder encoder = new SpecificRecordEncoder(true);
AvroTopic<MeasurementKey, EmpaticaE4BloodVolumePulse> topic = new AvroTopic<>("keeeeys", MeasurementKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), MeasurementKey.class, EmpaticaE4BloodVolumePulse.class);
AvroEncoder.AvroWriter<MeasurementKey> keyEncoder = encoder.writer(topic.getKeySchema(), topic.getKeyClass());
AvroTopic<ObservationKey, EmpaticaE4BloodVolumePulse> topic = new AvroTopic<>("keeeeys", ObservationKey.getClassSchema(), EmpaticaE4BloodVolumePulse.getClassSchema(), ObservationKey.class, EmpaticaE4BloodVolumePulse.class);
AvroEncoder.AvroWriter<ObservationKey> keyEncoder = encoder.writer(topic.getKeySchema(), topic.getKeyClass());
AvroEncoder.AvroWriter<EmpaticaE4BloodVolumePulse> valueEncoder = encoder.writer(topic.getValueSchema(), topic.getValueClass());

byte[] key = keyEncoder.encode(new MeasurementKey("a", "b"));
// start string, char a, start string, char b
byte[] expectedKey = {2, 97, 2, 98};
byte[] key = keyEncoder.encode(new ObservationKey("test", "a", "b"));
// note that positive numbers are multiplied by two in avro binary encoding, due to the
// zig-zag encoding scheme used.
// See http://avro.apache.org/docs/1.8.1/spec.html#binary_encoding
// type index 1, length 4, char t, char e, char s, char t, length 1, char a, length 1, char b
byte[] expectedKey = {2, 8, 116, 101, 115, 116, 2, 97, 2, 98};
System.out.println("key: 0x" + byteArrayToHex(key));
System.out.println("expected: 0x" + byteArrayToHex(expectedKey));
assertTrue(Arrays.equals(expectedKey, key));
Expand Down
38 changes: 19 additions & 19 deletions src/test/java/org/radarcns/producer/rest/RestSenderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.radarcns.config.ServerConfig;
import org.radarcns.data.Record;
import org.radarcns.data.SpecificRecordEncoder;
import org.radarcns.key.MeasurementKey;
import org.radarcns.phone.PhoneLight;
import org.radarcns.kafka.ObservationKey;
import org.radarcns.passive.phone.PhoneLight;
import org.radarcns.producer.KafkaTopicSender;
import org.radarcns.topic.AvroTopic;

Expand All @@ -47,7 +47,7 @@

public class RestSenderTest {
private SchemaRetriever retriever;
private RestSender<MeasurementKey, SpecificRecord> sender;
private RestSender<ObservationKey, SpecificRecord> sender;

@Rule
public MockWebServer webServer = new MockWebServer();
Expand All @@ -58,7 +58,7 @@ public void setUp() {
SpecificRecordEncoder encoder = new SpecificRecordEncoder(false);

ServerConfig config = new ServerConfig(webServer.url("/").url());
this.sender = new RestSender.Builder<MeasurementKey, SpecificRecord>()
this.sender = new RestSender.Builder<ObservationKey, SpecificRecord>()
.server(config)
.schemaRetriever(retriever)
.encoders(encoder, encoder)
Expand All @@ -68,18 +68,18 @@ public void setUp() {

@Test
public void sender() throws Exception {
Schema keySchema = MeasurementKey.getClassSchema();
Schema keySchema = ObservationKey.getClassSchema();
Schema valueSchema = PhoneLight.getClassSchema();
AvroTopic<MeasurementKey, PhoneLight> topic = new AvroTopic<>("test",
keySchema, valueSchema, MeasurementKey.class, PhoneLight.class);
AvroTopic<ObservationKey, PhoneLight> topic = new AvroTopic<>("test",
keySchema, valueSchema, ObservationKey.class, PhoneLight.class);
Headers headers = new Headers.Builder()
.add("Cookie", "ab")
.add("Cookie", "bc")
.build();
sender.setHeaders(headers);
KafkaTopicSender<MeasurementKey, PhoneLight> topicSender = sender.sender(topic);
KafkaTopicSender<ObservationKey, PhoneLight> topicSender = sender.sender(topic);

MeasurementKey key = new MeasurementKey("a", "b");
ObservationKey key = new ObservationKey("test","a", "b");
PhoneLight value = new PhoneLight(0.1, 0.2, 0.3f);
ParsedSchemaMetadata keySchemaMetadata = new ParsedSchemaMetadata(10, 2, keySchema);
ParsedSchemaMetadata valueSchemaMetadata = new ParsedSchemaMetadata(10, 2, valueSchema);
Expand Down Expand Up @@ -118,13 +118,13 @@ public void sender() throws Exception {

@Test
public void sendTwo() throws Exception {
Schema keySchema = MeasurementKey.getClassSchema();
Schema keySchema = ObservationKey.getClassSchema();
Schema valueSchema = PhoneLight.getClassSchema();
AvroTopic<MeasurementKey, PhoneLight> topic = new AvroTopic<>("test",
keySchema, valueSchema, MeasurementKey.class, PhoneLight.class);
KafkaTopicSender<MeasurementKey, PhoneLight> topicSender = sender.sender(topic);
AvroTopic<ObservationKey, PhoneLight> topic = new AvroTopic<>("test",
keySchema, valueSchema, ObservationKey.class, PhoneLight.class);
KafkaTopicSender<ObservationKey, PhoneLight> topicSender = sender.sender(topic);

MeasurementKey key = new MeasurementKey("a", "b");
ObservationKey key = new ObservationKey("test", "a", "b");
PhoneLight value = new PhoneLight(0.1, 0.2, 0.3f);
ParsedSchemaMetadata keySchemaMetadata = new ParsedSchemaMetadata(10, 2, keySchema);
ParsedSchemaMetadata valueSchemaMetadata = new ParsedSchemaMetadata(10, 2, valueSchema);
Expand Down Expand Up @@ -193,13 +193,13 @@ public void withCompression() throws IOException, InterruptedException {
webServer.enqueue(new MockResponse()
.setHeader("Content-Type", "application/json; charset=utf-8")
.setBody("{\"offset\": 100}"));
Schema keySchema = MeasurementKey.getClassSchema();
Schema keySchema = ObservationKey.getClassSchema();
Schema valueSchema = PhoneLight.getClassSchema();
AvroTopic<MeasurementKey, PhoneLight> topic = new AvroTopic<>("test",
keySchema, valueSchema, MeasurementKey.class, PhoneLight.class);
KafkaTopicSender<MeasurementKey, PhoneLight> topicSender = sender.sender(topic);
AvroTopic<ObservationKey, PhoneLight> topic = new AvroTopic<>("test",
keySchema, valueSchema, ObservationKey.class, PhoneLight.class);
KafkaTopicSender<ObservationKey, PhoneLight> topicSender = sender.sender(topic);

MeasurementKey key = new MeasurementKey("a", "b");
ObservationKey key = new ObservationKey("test", "a", "b");
PhoneLight value = new PhoneLight(0.1, 0.2, 0.3f);
ParsedSchemaMetadata keySchemaMetadata = new ParsedSchemaMetadata(10, 2, keySchema);
ParsedSchemaMetadata valueSchemaMetadata = new ParsedSchemaMetadata(10, 2, valueSchema);
Expand Down
20 changes: 11 additions & 9 deletions src/test/java/org/radarcns/topic/SensorTopicTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

package org.radarcns.topic;

import com.fasterxml.jackson.databind.annotation.JsonTypeResolver;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.junit.Test;
import org.radarcns.key.MeasurementKey;
import org.radarcns.phone.PhoneAcceleration;
import org.radarcns.kafka.ObservationKey;
import org.radarcns.passive.phone.PhoneAcceleration;

import static org.junit.Assert.assertEquals;

Expand All @@ -34,6 +35,7 @@ public class SensorTopicTest {
@Test
public void workingConstructor() {
Schema keySchema = SchemaBuilder.record("key").fields()
.name("projectId").type(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))).withDefault(null)
.name("userId").type(Schema.create(Type.STRING)).noDefault()
.name("sourceId").type(Schema.create(Type.STRING)).noDefault()
.endRecord();
Expand Down Expand Up @@ -123,28 +125,28 @@ public void notARecord() {

@Test
public void parseTopic() {
SensorTopic<MeasurementKey, PhoneAcceleration> topic = SensorTopic.parse("test",
MeasurementKey.class.getName(), PhoneAcceleration.class.getName());
SensorTopic<ObservationKey, PhoneAcceleration> topic = SensorTopic.parse("test",
ObservationKey.class.getName(), PhoneAcceleration.class.getName());

SensorTopic<MeasurementKey, PhoneAcceleration> expected = new SensorTopic<>("test",
MeasurementKey.getClassSchema(), PhoneAcceleration.getClassSchema(),
MeasurementKey.class, PhoneAcceleration.class);
SensorTopic<ObservationKey, PhoneAcceleration> expected = new SensorTopic<>("test",
ObservationKey.getClassSchema(), PhoneAcceleration.getClassSchema(),
ObservationKey.class, PhoneAcceleration.class);

assertEquals(expected, topic);
}

@Test(expected = IllegalArgumentException.class)
public void parseUnexistingKey() {
SensorTopic.parse("test",
"unexisting." + MeasurementKey.class.getName(),
"unexisting." + ObservationKey.class.getName(),
PhoneAcceleration.class.getName());
}


@Test(expected = IllegalArgumentException.class)
public void parseUnexistingValue() {
SensorTopic.parse("test",
MeasurementKey.class.getName(),
ObservationKey.class.getName(),
"unexisting." + PhoneAcceleration.class.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.junit.Test;
import org.radarcns.key.MeasurementKey;
import org.radarcns.kafka.ObservationKey;
import org.radarcns.producer.rest.SchemaRetriever;
import org.radarcns.producer.rest.ParsedSchemaMetadata;

Expand All @@ -44,7 +44,7 @@ public void serialize() throws Exception {
testSerialization("this", Schema.create(Type.STRING));
testSerialization(10, Schema.create(Type.INT));

MeasurementKey key = new MeasurementKey("a", "b");
ObservationKey key = new ObservationKey("test", "a", "b");
testSerialization(key, key.getSchema());

Schema genericSchema = Schema.createRecord(Arrays.asList(
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ number_of_devices: 1
data:
- file: integration_test.csv
topic: integration_test
key_schema: org.radarcns.key.MeasurementKey
key_schema: org.radarcns.kafka.ObservationKey
value_schema: org.radarcns.aggregator.DoubleArrayAggregator
frequency: 32
sensor: TEMPERATURE
4 changes: 2 additions & 2 deletions testing/src/main/java/org/radarcns/mock/MockFileSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.io.IOException;
import org.apache.avro.specific.SpecificRecord;
import org.radarcns.data.Record;
import org.radarcns.key.MeasurementKey;
import org.radarcns.kafka.ObservationKey;
import org.radarcns.mock.data.MockCsvParser;
import org.radarcns.producer.KafkaSender;
import org.radarcns.producer.KafkaTopicSender;
Expand All @@ -33,7 +33,7 @@ public class MockFileSender {
private final KafkaSender sender;
private final MockCsvParser parser;

public MockFileSender(KafkaSender<MeasurementKey, SpecificRecord> sender,
public MockFileSender(KafkaSender<ObservationKey, SpecificRecord> sender,
MockCsvParser parser) {
this.parser = parser;
this.sender = sender;
Expand Down
Loading

0 comments on commit f711a4e

Please sign in to comment.