From 5278251be4a182f929a82965d811d929a06f4703 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Wed, 30 Nov 2022 14:12:53 +0100 Subject: [PATCH] Remove schema registry server code --- java-sdk/gradle.properties | 2 - .../build.gradle.kts | 10 +- .../registration/JsonSchemaBackupStorage.java | 121 ------ .../registration/SchemaBackupStorage.java | 36 -- .../registration/SchemaTopicBackup.java | 242 ----------- .../registration/SchemaTopicManager.java | 384 ------------------ .../JsonSchemaBackupStorageTest.java | 31 -- .../test/resources/key_measurement_test.avsc | 10 - .../src/test/resources/key_windowed_test.avsc | 11 - .../src/test/resources/log4j2.xml | 14 - .../src/test/resources/schema.yml | 14 - .../radarbase/schema/tools/CommandLineApp.kt | 1 - .../schema/tools/SchemaTopicManagerCommand.kt | 92 ----- 13 files changed, 2 insertions(+), 966 deletions(-) delete mode 100644 java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/JsonSchemaBackupStorage.java delete mode 100644 java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaBackupStorage.java delete mode 100644 java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaTopicBackup.java delete mode 100644 java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaTopicManager.java delete mode 100644 java-sdk/radar-schemas-registration/src/test/java/org/radarbase/schema/registration/JsonSchemaBackupStorageTest.java delete mode 100644 java-sdk/radar-schemas-registration/src/test/resources/key_measurement_test.avsc delete mode 100644 java-sdk/radar-schemas-registration/src/test/resources/key_windowed_test.avsc delete mode 100644 java-sdk/radar-schemas-registration/src/test/resources/log4j2.xml delete mode 100644 java-sdk/radar-schemas-registration/src/test/resources/schema.yml delete mode 100644 java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/SchemaTopicManagerCommand.kt diff --git a/java-sdk/gradle.properties b/java-sdk/gradle.properties index bed50658..c2f80d26 100644 --- a/java-sdk/gradle.properties +++ b/java-sdk/gradle.properties @@ -19,5 +19,3 @@ slf4jVersion=2.0.5 javaxValidationVersion=2.0.1.Final jsoupVersion=1.15.3 log4j2Version=2.19.0 -nettyVersion=4.1.85.Final -jettyVersion=9.4.49.v20220914 diff --git a/java-sdk/radar-schemas-registration/build.gradle.kts b/java-sdk/radar-schemas-registration/build.gradle.kts index 07984f0f..ceba6259 100644 --- a/java-sdk/radar-schemas-registration/build.gradle.kts +++ b/java-sdk/radar-schemas-registration/build.gradle.kts @@ -12,16 +12,10 @@ dependencies { val radarCommonsVersion: String by project api("org.radarbase:radar-commons-server:$radarCommonsVersion") - val nettyVersion: String by project - implementation(platform("io.netty:netty-bom:$nettyVersion")) - val jettyVersion: String by project - implementation(platform("org.eclipse.jetty:jetty-bom:$jettyVersion")) - val confluentVersion: String by project implementation("io.confluent:kafka-connect-avro-converter:$confluentVersion") - implementation("io.confluent:kafka-schema-registry:$confluentVersion") { - exclude(group = "org.slf4j", module = "slf4j-reload4j") - } + implementation("io.confluent:kafka-schema-registry-client:$confluentVersion") + val kafkaVersion: String by project implementation("org.apache.kafka:connect-json:$kafkaVersion") diff --git a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/JsonSchemaBackupStorage.java 
b/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/JsonSchemaBackupStorage.java deleted file mode 100644 index c6d7e560..00000000 --- a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/JsonSchemaBackupStorage.java +++ /dev/null @@ -1,121 +0,0 @@ -package org.radarbase.schema.registration; - -import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; - -import com.fasterxml.jackson.annotation.JsonInclude.Include; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.io.Reader; -import java.io.Writer; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.attribute.FileTime; -import java.util.Arrays; -import java.util.Locale; -import javax.validation.constraints.NotNull; - -/** - * Schema topic backup storage to JSON files. - */ -public class JsonSchemaBackupStorage implements SchemaBackupStorage { - private static final Logger logger = LoggerFactory.getLogger(JsonSchemaBackupStorage.class); - - private static final String EXT = ".json"; - private static final String INVALID_EXT = ".invalid" + EXT; - private static final ObjectMapper MAPPER = new ObjectMapper() - .setSerializationInclusion(Include.NON_NULL); - private final Path path; - - public JsonSchemaBackupStorage(@NotNull Path path) { - this.path = path.toAbsolutePath(); - } - - static boolean contentEquals(@NotNull Path path, @NotNull Path backupPath) throws IOException { - if (Files.size(path) != Files.size(backupPath)) { - return false; - } - - try (InputStream input1 = Files.newInputStream(path); - InputStream input2 = Files.newInputStream(backupPath)) { - - byte[] buf1 = new byte[4096]; - byte[] buf2 = new byte[4096]; - - int numRead1 = input1.read(buf1); - - while (numRead1 != -1) { - int numRead2 = input2.readNBytes(buf2, 0, numRead1); - if (numRead2 != numRead1 - || !Arrays.equals(buf1, 0, numRead1, buf2, 0, numRead1)) { - return false; - } - numRead1 = input1.read(buf1); - } - - return input2.read() == -1; - } - } - - @Override - public void store(SchemaTopicBackup topic) throws IOException { - Path tmpPath = Files.createTempFile(path.getParent(), ".schema-backup", EXT); - - try (Writer writer = Files.newBufferedWriter(tmpPath)) { - MAPPER.writeValue(writer, topic); - } - - replaceAndBackup(tmpPath, path, EXT); - } - - private void replaceAndBackup(Path tmpPath, Path mainPath, String suffix) throws IOException { - if (!Files.exists(mainPath)) { - logger.info("Creating new {}", mainPath); - Files.move(tmpPath, mainPath, ATOMIC_MOVE); - } else if (contentEquals(mainPath, tmpPath)) { - logger.info("Not replacing old identical value {}", mainPath); - Files.delete(tmpPath); - } else { - FileTime lastModified = Files.getLastModifiedTime(mainPath); - Path backupPath = changeJsonSuffix(mainPath, "." 
+ lastModified.toInstant() + suffix); - logger.info("Creating new {} and moving the existing value to {}", - mainPath, backupPath); - Files.copy(mainPath, backupPath); - Files.move(tmpPath, mainPath, ATOMIC_MOVE); - } - } - - @Override - public void storeInvalid(@NotNull SchemaTopicBackup topic) throws IOException { - Path tmpPath = Files.createTempFile(path.getParent(), ".schema-backup", INVALID_EXT); - - try (Writer writer = Files.newBufferedWriter(tmpPath)) { - MAPPER.writeValue(writer, topic); - } - - replaceAndBackup(tmpPath, changeJsonSuffix(path, INVALID_EXT), INVALID_EXT); - } - - private Path changeJsonSuffix(@NotNull Path path, @NotNull String suffix) { - String filename = path.getFileName().toString(); - if (filename.toLowerCase(Locale.US).endsWith(EXT)) { - filename = filename.substring(0, filename.length() - EXT.length()); - } - return path.getParent().resolve(filename + suffix); - } - - @Override - public SchemaTopicBackup load() throws IOException { - try (Reader reader = Files.newBufferedReader(path)) { - return MAPPER.readValue(reader, SchemaTopicBackup.class); - } - } - - @Override - public Path getPath() { - return path; - } -} diff --git a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaBackupStorage.java b/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaBackupStorage.java deleted file mode 100644 index 0127dcf9..00000000 --- a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaBackupStorage.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.radarbase.schema.registration; - -import java.io.IOException; -import java.nio.file.Path; - -/** - * Storage for _schemas topic backups. - */ -public interface SchemaBackupStorage { - - /** - * Store a valid _schemas topic backup. - * - * @param topic backup to store. - * @throws IOException if the data cannot be stored. - */ - void store(SchemaTopicBackup topic) throws IOException; - - /** - * Store an invalid _schemas topic backup. - * - * @param topic backup to store. - * @throws IOException if the data cannot be stored. - */ - void storeInvalid(SchemaTopicBackup topic) throws IOException; - - /** - * Load a valid _schemas topic backup from storage. - * - * @return backup or {@code null} if no backup was available. - * @throws IOException if the data cannot be stored. 
-     */
-    SchemaTopicBackup load() throws IOException;
-
-    Path getPath();
-}
diff --git a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaTopicBackup.java b/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaTopicBackup.java
deleted file mode 100644
index abf7bd26..00000000
--- a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaTopicBackup.java
+++ /dev/null
@@ -1,242 +0,0 @@
-package org.radarbase.schema.registration;
-
-import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT;
-import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonGetter;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import io.confluent.kafka.schemaregistry.storage.SchemaRegistryKey;
-import io.confluent.kafka.schemaregistry.storage.SchemaRegistryKeyType;
-import io.confluent.kafka.schemaregistry.storage.SchemaRegistryValue;
-import io.confluent.kafka.schemaregistry.storage.SchemaValue;
-import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException;
-import io.confluent.kafka.schemaregistry.storage.serialization.SchemaRegistrySerializer;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-import javax.validation.constraints.NotNull;
-import org.apache.kafka.clients.admin.Config;
-import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-/**
- * Data class for containing the data and metadata of the _schemas topic.
- */
-public class SchemaTopicBackup {
-
-    @JsonProperty
-    private final Map<String, String> settings;
-    @JsonProperty
-    private final Set<SchemaRecord> records;
-
-    /**
-     * Empty backup.
-     */
-    public SchemaTopicBackup() {
-        settings = new HashMap<>();
-        records = new LinkedHashSet<>();
-    }
-
-    /**
-     * Fully ready backup.
-     */
-    @JsonCreator
-    public SchemaTopicBackup(
-            @JsonProperty("settings") Map<String, String> settings,
-            @JsonProperty("records") List<SchemaRecord> records) {
-        this.settings = new HashMap<>(settings);
-        this.records = new LinkedHashSet<>(records);
-    }
-
-    /**
-     * Whether the backup contains schemas starting from ID 1. If so, the schema topic is valid.
-     */
-    public boolean startsAtFirstId() {
-        return records.stream()
-                .map(SchemaRecord::getSchemaId)
-                .filter(Objects::nonNull)
-                .anyMatch(r -> r == 1);
-    }
-
-    @JsonGetter
-    @NotNull
-    public Map<String, String> getSettings() {
-        return settings;
-    }
-
-    /**
-     * Put settings.
-     *
-     * @param settings Kafka topic config settings
-     */
-    public void putAll(@NotNull Map<String, String> settings) {
-        this.settings.putAll(settings);
-        if (!Objects.equals(settings.get(CLEANUP_POLICY_CONFIG), CLEANUP_POLICY_COMPACT)) {
-            this.settings.put(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT);
-        }
-    }
-
-    /**
-     * Add schema from topic. This will deduplicate using record key.
-     */
-    public void addSchemaRecord(
-            SchemaRegistrySerializer serializer,
-            ConsumerRecord<byte[], byte[]> record) throws IOException {
-
-        SchemaRecord schemaRecord = null;
-        SchemaRegistryKey messageKey;
-        try {
-            messageKey = serializer.deserializeKey(record.key());
-
-            if (messageKey.getKeyType() == SchemaRegistryKeyType.SCHEMA
-                    && record.value() != null) {
-                SchemaRegistryValue message = serializer.deserializeValue(
-                        messageKey, record.value());
-
-                if (message instanceof SchemaValue) {
-                    SchemaValue schemaValue = (SchemaValue) message;
-                    schemaRecord = new SchemaRecord(
-                            messageKey.getKeyType(),
-                            schemaValue.getSubject(),
-                            schemaValue.getId(),
-                            record.key(),
-                            record.value());
-                }
-            }
-        } catch (SerializationException ex) {
-            throw new IOException("Cannot deserialize message", ex);
-        }
-        if (schemaRecord == null) {
-            schemaRecord = new SchemaRecord(
-                    messageKey.getKeyType(),
-                    null,
-                    null,
-                    record.key(),
-                    record.value());
-        }
-
-        // preserve order
-        records.remove(schemaRecord);
-        records.add(schemaRecord);
-    }
-
-    @JsonGetter
-    @NotNull
-    public List<SchemaRecord> getRecords() {
-        return new ArrayList<>(records);
-    }
-
-    /**
-     * Get the Kafka config of a topic.
-     *
-     * @return configuration of the topic. It may be empty if not initialized.
-     */
-    @JsonIgnore
-    @NotNull
-    public Config getConfig() {
-        return new Config(settings.entrySet().stream()
-                .map(e -> new ConfigEntry(e.getKey(), e.getValue()))
-                .collect(Collectors.toList()));
-    }
-
-    /**
-     * Set the Kafka config of a topic.
-     *
-     * @param config configuration of the topic.
-     */
-    @JsonIgnore
-    public void setConfig(@NotNull Config config) {
-        putAll(config.entries().stream()
-                .collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value)));
-    }
-
-    /**
-     * Schema record.
-     */
-    public static class SchemaRecord {
-
-        private final SchemaRegistryKeyType type;
-        private final Integer schemaId;
-        private final String subject;
-        private final byte[] key;
-        private final byte[] value;
-
-        /**
-         * Full constructor of all properties.
-         *
-         * @param type record type
-         * @param subject schema subject that the record belongs to, or null if not a schema.
-         * @param schemaId schema ID or null if not a schema.
- * @param key raw key data - * @param value raw value data - */ - @JsonCreator - @SuppressWarnings("PMD.ArrayIsStoredDirectly") - public SchemaRecord( - @NotNull @JsonProperty("type") SchemaRegistryKeyType type, - @JsonProperty("subject") String subject, - @JsonProperty("schemaId") Integer schemaId, - @JsonProperty("key") byte[] key, - @JsonProperty("value") byte[] value) { - this.type = type; - this.schemaId = schemaId; - this.subject = subject; - this.key = key; - this.value = value; - } - - public SchemaRegistryKeyType getType() { - return type; - } - - public Integer getSchemaId() { - return schemaId; - } - - public String getSubject() { - return subject; - } - - public byte[] getKey() { - if (key != null) { - return Arrays.copyOf(key, key.length); - } else { - return null; - } - } - - public byte[] getValue() { - if (value != null) { - return Arrays.copyOf(value, value.length); - } else { - return null; - } - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - SchemaRecord that = (SchemaRecord) o; - return Arrays.equals(key, that.key); - } - - @Override - public int hashCode() { - return Arrays.hashCode(key); - } - } -} diff --git a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaTopicManager.java b/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaTopicManager.java deleted file mode 100644 index 92ce1dc9..00000000 --- a/java-sdk/radar-schemas-registration/src/main/java/org/radarbase/schema/registration/SchemaTopicManager.java +++ /dev/null @@ -1,384 +0,0 @@ -package org.radarbase.schema.registration; - -import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; - -import io.confluent.kafka.schemaregistry.storage.serialization.SchemaRegistrySerializer; -import java.io.IOException; -import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import javax.validation.constraints.NotNull; -import org.apache.kafka.clients.admin.AlterConfigOp; -import org.apache.kafka.clients.admin.AlterConfigOp.OpType; -import org.apache.kafka.clients.admin.AlterConfigsResult; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ConfigResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Manages the _schemas topic. Currently, this backs up and restores all data in the _schemas - * topic. 
- */ -@SuppressWarnings("WeakerAccess") -public class SchemaTopicManager { - private static final Logger logger = LoggerFactory.getLogger(SchemaTopicManager.class); - private static final String TOPIC_NAME = "_schemas"; - private static final Duration SECONDARY_TIMEOUT = Duration.ofSeconds(10L); - private final TopicRegistrar topics; - private final SchemaBackupStorage storage; - private final SchemaRegistrySerializer serializer; - private final ConfigResource topicResource; - private boolean isInitialized; - - /** - * Schema topic manager. - * - * @param topicRegistrar topic registrar - * @param storage storage medium to read and write backups from and to. - */ - public SchemaTopicManager( - @NotNull TopicRegistrar topicRegistrar, - @NotNull SchemaBackupStorage storage - ) { - topics = topicRegistrar; - this.storage = storage; - serializer = new SchemaRegistrySerializer(); - topicResource = new ConfigResource(TOPIC, TOPIC_NAME); - isInitialized = false; - } - - /** - * Wait for brokers and topics to become available. - * - * @param numBrokers number of brokers to wait for - * @throws InterruptedException if waiting for the brokers or topics was interrupted. - * @throws IllegalStateException if the brokers or topics are not available - */ - public void initialize(int numBrokers) throws InterruptedException { - topics.initialize(numBrokers); - isInitialized = true; - } - - private void ensureInitialized() { - if (!isInitialized) { - throw new IllegalStateException("Manager is not initialized yet"); - } - } - - /** - * Read a backup from the _schemas topic. This backup only includes the actual schemas, not - * configuration changes or NOOP messages. - * - * @param timeout time to wait for first schema records to become available. - * @return backup of the _schemas topic. - * @throws IOException if a message in the topic cannot be read - * @throws ExecutionException if the topic configuration cannot be read - * @throws InterruptedException if the process was interrupted before finishing - * @throws RuntimeException storage failure or any other error. - * @throws IllegalStateException if this manager was not initialized - */ - @NotNull - public SchemaTopicBackup readBackup(Duration timeout) - throws IOException, ExecutionException, InterruptedException { - ensureInitialized(); - - SchemaTopicBackup storeTopic = new SchemaTopicBackup(); - - try { - readSchemas(getConsumerProps(), storeTopic, timeout); - - storeTopic.setConfig(topics.getKafkaClient() - .describeConfigs(List.of(topicResource)) - .values() - .get(topicResource) - .get()); - } catch (IOException e) { - logger.error("Failed to deserialize the schema or config key", e); - throw e; - } catch (ExecutionException e) { - logger.error("Failed to get _schemas config", e); - throw e; - } catch (RuntimeException ex) { - logger.error("Failed to store schemas", ex); - throw ex; - } catch (InterruptedException ex) { - logger.error("Failed waiting for _schemas records", ex); - Thread.currentThread().interrupt(); - throw ex; - } - - return storeTopic; - } - - /** - * Read a backup from the _schemas topic and store it. This backup only includes the actual - * schemas, not configuration changes or NOOP messages. - * - * @param timeout time to wait for first schema records to become available. 
-     * @throws IOException if a message in the topic cannot be read
-     * @throws ExecutionException if the topic configuration cannot be read
-     * @throws InterruptedException if the process was interrupted before finishing
-     * @throws RuntimeException storage failure or any other error.
-     * @throws IllegalStateException if this manager was not initialized
-     */
-    public void makeBackup(Duration timeout)
-            throws IOException, InterruptedException, ExecutionException {
-        logger.info("Reading backup data");
-        SchemaTopicBackup storeTopic = readBackup(timeout);
-        try {
-            if (storeTopic.startsAtFirstId()) {
-                logger.info("Storing valid backup");
-                storage.store(storeTopic);
-            } else {
-                logger.info("Storing invalid backup");
-                storage.storeInvalid(storeTopic);
-            }
-        } catch (IOException e) {
-            logger.error("Failed to store _schemas data", e);
-            throw e;
-        }
-    }
-
-    @NotNull
-    private Map<String, Object> getConsumerProps() {
-        Map<String, Object> consumerProps = new HashMap<>();
-
-        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,
-                "schema-backup-" + UUID.randomUUID());
-        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "schema-backup");
-
-        consumerProps.putAll(topics.getKafkaProperties());
-        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                org.apache.kafka.common.serialization.ByteArrayDeserializer.class);
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                org.apache.kafka.common.serialization.ByteArrayDeserializer.class);
-        return consumerProps;
-    }
-
-    @NotNull
-    private Map<String, Object> getProducerProps() {
-        Map<String, Object> producerProps = new HashMap<>();
-        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "schema-backup");
-
-        producerProps.putAll(topics.getKafkaProperties());
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                org.apache.kafka.common.serialization.ByteArraySerializer.class);
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                org.apache.kafka.common.serialization.ByteArraySerializer.class);
-        return producerProps;
-    }
-
-    /**
-     * Checks if the current _schemas topic is valid. If not, the topic is replaced by the data in
-     * the backup. The _schemas topic is considered valid if it exists, is not empty and starts at
-     * id 1.
-     *
-     * @param timeout time to wait for first schema records to become available.
-     * @throws IOException if a message in the topic cannot be read or written
-     * @throws ExecutionException if the topic configuration cannot be read or written
-     * @throws InterruptedException if the process was interrupted before finishing
-     * @throws RuntimeException storage failure or any other error.
-     * @throws IllegalStateException if this manager was not initialized or schema registry was
-     *                               running
-     */
-    public void ensure(short replication, Duration timeout)
-            throws InterruptedException, ExecutionException, IOException {
-        ensureInitialized();
-
-        boolean topicExists = topics.getTopics().contains(TOPIC_NAME);
-        if (topicExists) {
-            SchemaTopicBackup backup = readBackup(timeout);
-            if (backup.startsAtFirstId()) {
-                logger.info("Existing topic is valid.");
-                return;
-            }
-            logger.info("Existing _schemas topic is invalid. Backing up current topic.");
-
-            try {
-                storage.storeInvalid(backup);
-            } catch (IOException e) {
-                logger.error("Backup storage failure.", e);
-                throw e;
-            }
-        }
-        SchemaTopicBackup newBackup;
-        try {
-            logger.info("Loading data from backup storage at {}", storage.getPath());
-            newBackup = storage.load();
-        } catch (IOException e) {
-            logger.error("Backup storage loading failure at {}", storage.getPath(), e);
-            throw e;
-        }
-        if (newBackup == null) {
-            logger.error("No valid backup in storage at {}", storage.getPath());
-            return;
-        }
-
-        // Backups successful. Remove old topic.
-        if (topicExists) {
-            logger.info("Removing existing {} topic", TOPIC_NAME);
-            topics.getKafkaClient().deleteTopics(List.of(TOPIC_NAME))
-                    .all().get();
-            try {
-                topics.refreshTopics();
-            } catch (InterruptedException e) {
-                logger.info("Failed to wait to refresh topics");
-                Thread.currentThread().interrupt();
-                throw e;
-            }
-        }
-
-        if (!topics.createTopics(Stream.of(TOPIC_NAME), 1, replication)) {
-            throw new IllegalStateException("Failed to create _schemas topic");
-        }
-
-        commitBackup(newBackup);
-    }
-
-    /**
-     * Read the schemas in the _schemas topic and put them in a backup.
-     */
-    private void readSchemas(Map<String, Object> consumerProps, SchemaTopicBackup storeTopic,
-            Duration timeout) throws IOException {
-        try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
-            ensurePartitions(consumer);
-
-            TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
-            consumer.assign(List.of(topicPartition));
-            consumer.seekToBeginning(List.of(topicPartition));
-
-            logger.info("Kafka store reader thread started");
-
-            int numRecords = -1;
-            Duration duration = timeout;
-
-            while (numRecords != 0) {
-                ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
-                duration = SECONDARY_TIMEOUT;
-                numRecords = records.count();
-                for (ConsumerRecord<byte[], byte[]> record : records) {
-                    storeTopic.addSchemaRecord(serializer, record);
-                }
-            }
-        }
-    }
-
-    /**
-     * Ensure that _schemas has exactly one partition.
-     *
-     * @param consumer consumer to subscribe partitions with.
-     * @throws IllegalArgumentException if the _schemas topic cannot be found.
-     * @throws IllegalStateException if the _schemas topic has more than one partition.
-     */
-    private void ensurePartitions(Consumer<byte[], byte[]> consumer) {
-        // Include a few retries since topic creation may take some time to propagate and schema
-        // registry is often started immediately after creating the schemas topic.
-        int retries = 0;
-        List<PartitionInfo> partitions;
-        do {
-            partitions = consumer.partitionsFor(TOPIC_NAME);
-            if (partitions != null && !partitions.isEmpty()) {
-                break;
-            }
-
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-            retries++;
-        }
-        while (retries < 10);
-
-        if (partitions == null || partitions.isEmpty()) {
-            throw new IllegalArgumentException("Unable to subscribe to the Kafka topic "
-                    + TOPIC_NAME
-                    + " backing this data store. Topic may not exist.");
-        } else if (partitions.size() > 1) {
-            throw new IllegalStateException("Unexpected number of partitions in the "
-                    + TOPIC_NAME
-                    + " topic. Expected 1 and instead got " + partitions.size());
-        }
-    }
-
-    /**
-     * Restores the _schemas from backup.
-     *
-     * @throws RuntimeException storage failure or any other error.
-     * @throws ExecutionException if the topic configuration cannot be written
-     * @throws IllegalStateException if this manager was not initialized or if the schema registry
-     *                               is running.
-     */
-    public void restoreBackup(short replication)
-            throws IOException, ExecutionException, InterruptedException {
-        ensureInitialized();
-        SchemaTopicBackup storeTopic;
-        logger.info("Loading backup from {}", storage.getPath());
-        try {
-            storeTopic = storage.load();
-        } catch (IOException e) {
-            logger.error("Failed to load _schemas data", e);
-            throw e;
-        }
-
-        if (storeTopic == null) {
-            logger.error("Backup not available");
-            return;
-        }
-
-        if (topics.getTopics().contains(TOPIC_NAME)) {
-            throw new IllegalStateException(
-                    "Topic _schemas already exists, cannot restore it from backup");
-        }
-
-        if (!topics.createTopics(Stream.of(TOPIC_NAME), 1, replication)) {
-            throw new IllegalStateException("Failed to create _schemas topic");
-        }
-
-        logger.info("Restoring backup");
-        commitBackup(storeTopic);
-    }
-
-    private void commitBackup(SchemaTopicBackup backup)
-            throws ExecutionException, InterruptedException {
-        AlterConfigsResult alterResult = topics.getKafkaClient()
-                .incrementalAlterConfigs(Map.of(topicResource, backup.getConfig().entries().stream()
-                        .map(e -> new AlterConfigOp(e, OpType.SET))
-                        .collect(Collectors.toList())));
-
-        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(getProducerProps())) {
-            List<Future<RecordMetadata>> futures = backup.getRecords().stream()
-                    .map(r -> new ProducerRecord<>(TOPIC_NAME, r.getKey(), r.getValue()))
-                    .map(producer::send)
-                    .collect(Collectors.toList());
-
-            logger.info("Waiting for records to be committed");
-            // collect so we can do a blocking operation after all records have been sent
-            for (Future<RecordMetadata> future : futures) {
-                future.get();
-            }
-            logger.info("Records have been committed");
-        }
-
-        alterResult.all().get();
-    }
-}
diff --git a/java-sdk/radar-schemas-registration/src/test/java/org/radarbase/schema/registration/JsonSchemaBackupStorageTest.java b/java-sdk/radar-schemas-registration/src/test/java/org/radarbase/schema/registration/JsonSchemaBackupStorageTest.java
deleted file mode 100644
index 6603f1f9..00000000
--- a/java-sdk/radar-schemas-registration/src/test/java/org/radarbase/schema/registration/JsonSchemaBackupStorageTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.radarbase.schema.registration;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class JsonSchemaBackupStorageTest {
-
-    @Test
-    public void contentEquals() throws IOException {
-        Path path1 = Files.createTempFile("some", "test");
-        Path path2 = Files.createTempFile("some", "test");
-        Path path3 = Files.createTempFile("some", "test");
-        Path path4 = Files.createTempFile("some", "test");
-        Files.writeString(path1, "some");
-        Files.writeString(path2, "some");
-        Files.writeString(path3, "soma");
-        Files.writeString(path4, "somee");
-
-        assertTrue(JsonSchemaBackupStorage.contentEquals(path1, path2));
-        assertTrue(JsonSchemaBackupStorage.contentEquals(path1, path1));
-        assertFalse(JsonSchemaBackupStorage.contentEquals(path1, path3));
-        assertFalse(JsonSchemaBackupStorage.contentEquals(path1, path4));
-        assertFalse(JsonSchemaBackupStorage.contentEquals(path4, path1));
-    }
-}
diff --git a/java-sdk/radar-schemas-registration/src/test/resources/key_measurement_test.avsc b/java-sdk/radar-schemas-registration/src/test/resources/key_measurement_test.avsc
deleted file mode 100644
index a8fc5f2b..00000000
--- a/java-sdk/radar-schemas-registration/src/test/resources/key_measurement_test.avsc
+++ /dev/null @@ -1,10 +0,0 @@ -{ - "namespace": "org.radarcns.kafka.key", - "type": "record", - "name": "ObservationKeyTest", - "doc": "Measurement key in the RADAR-CNS project.", - "fields": [ - {"name": "userTestId", "type": "string", "doc": "User Identifier created during the enrolment."}, - {"name": "sourceTestId", "type": "string", "doc": "Unique identifier associated with the source."} - ] -} diff --git a/java-sdk/radar-schemas-registration/src/test/resources/key_windowed_test.avsc b/java-sdk/radar-schemas-registration/src/test/resources/key_windowed_test.avsc deleted file mode 100644 index bf682e32..00000000 --- a/java-sdk/radar-schemas-registration/src/test/resources/key_windowed_test.avsc +++ /dev/null @@ -1,11 +0,0 @@ -{"namespace": "org.radarcns.kafka.key", - "type": "record", - "name": "AggregateKeyTest", - "doc": "Windowed key in the RADAR-CNS project.", - "fields": [ - {"name": "userTestId", "type": "string", "doc": "User Identifier created during the enrolment."}, - {"name": "sourceTestId", "type": "string", "doc": "Unique identifier associated with the source."}, - {"name": "start", "type": "long", "doc": "First timestamp in UNIX time contained in the time window."}, - {"name": "end", "type": "long", "doc": "Last timestamp in UNIX time contained in the time window."} - ] -} diff --git a/java-sdk/radar-schemas-registration/src/test/resources/log4j2.xml b/java-sdk/radar-schemas-registration/src/test/resources/log4j2.xml deleted file mode 100644 index cee4d6bf..00000000 --- a/java-sdk/radar-schemas-registration/src/test/resources/log4j2.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - - - - - - - - - - - - diff --git a/java-sdk/radar-schemas-registration/src/test/resources/schema.yml b/java-sdk/radar-schemas-registration/src/test/resources/schema.yml deleted file mode 100644 index c56f343d..00000000 --- a/java-sdk/radar-schemas-registration/src/test/resources/schema.yml +++ /dev/null @@ -1,14 +0,0 @@ -#============================= SKIP FILE =============================# -files: - - README.md - - .DS_Store - -#====================== SKIP TEST CONFIGURATION ======================# -validation: - org.radarcns.passive.biovotion.BiovotionVsm1OxygenSaturation: - fields: - - spO2 - - spO2Quality - org.radarcns.passive.empatica.EmpaticaE4InterBeatInterval: - fields: - - interBeatInterval diff --git a/java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/CommandLineApp.kt b/java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/CommandLineApp.kt index 1a6a9f04..10dc867a 100644 --- a/java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/CommandLineApp.kt +++ b/java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/CommandLineApp.kt @@ -111,7 +111,6 @@ class CommandLineApp( SchemaRegistryCommand(), ListCommand(), ValidatorCommand(), - SchemaTopicManagerCommand(), ).sortedBy { it.name } val parser = getArgumentParser(subCommands) diff --git a/java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/SchemaTopicManagerCommand.kt b/java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/SchemaTopicManagerCommand.kt deleted file mode 100644 index 5e604e1c..00000000 --- a/java-sdk/radar-schemas-tools/src/main/java/org/radarbase/schema/tools/SchemaTopicManagerCommand.kt +++ /dev/null @@ -1,92 +0,0 @@ -package org.radarbase.schema.tools - -import net.sourceforge.argparse4j.impl.action.StoreConstArgumentAction -import net.sourceforge.argparse4j.inf.ArgumentParser -import net.sourceforge.argparse4j.inf.Namespace 
-import org.radarbase.schema.registration.* -import org.radarbase.schema.registration.KafkaTopics.Companion.configureKafka -import org.radarbase.schema.tools.SubCommand.Companion.addRootArgument -import org.slf4j.LoggerFactory -import java.nio.file.Paths -import java.time.Duration - -class SchemaTopicManagerCommand : SubCommand { - override val name = "schema-topic" - - override fun execute(options: Namespace, app: CommandLineApp): Int { - val toolConfig = app.config - .configureKafka(bootstrapServers = options.getString("bootstrap_servers")) - try { - KafkaTopics(toolConfig).use { topics -> - val jsonStorage = JsonSchemaBackupStorage( - Paths.get(options.getString("file"))) - val manager = SchemaTopicManager(topics, jsonStorage) - manager.initialize(options.getInt("brokers")) - val timeout = Duration.ofSeconds(options.getInt("timeout").toLong()) - when (options.get(SUBACTION)) { - SubAction.BACKUP -> manager.makeBackup(timeout) - SubAction.RESTORE -> manager.restoreBackup(options.getShort("replication")) - SubAction.ENSURE -> manager.ensure(options.getShort("replication"), timeout) - else -> { - logger.error("Unknown action") - return 3 - } - } - return 0 - } - } catch (ex: Exception) { - logger.error("Action failed: {}", ex.toString()) - return 2 - } - } - - override fun addParser(parser: ArgumentParser) { - parser.apply { - description("Manage the _schemas topic") - addArgument("--backup") - .help("back up schema topic data") - .action(StoreConstArgumentAction()) - .setConst(SubAction.BACKUP) - .dest(SUBACTION) - addArgument("--restore") - .help("restore schema topic from backup") - .action(StoreConstArgumentAction()) - .setConst(SubAction.RESTORE) - .dest(SUBACTION) - addArgument("--ensure") - .help("ensure that the schema topic is restored if needed") - .action(StoreConstArgumentAction()) - .setConst(SubAction.ENSURE) - .dest(SUBACTION) - addArgument("-r", "--replication") - .help("number of replicas per data packet") - .type(Short::class.java).default = 3.toShort() - addArgument("-t", "--timeout") - .help("time (seconds) to wait for records in the _schemas topic to become" - + " available") - .setDefault(600) - .type(Int::class.java) - addArgument("-f", "--file") - .help("JSON file to load _schemas from") - .type(String::class.java) - .required(true) - addArgument("-b", "--brokers") - .help("number of brokers that are expected to be available.") - .type(Int::class.java).default = 3 - addArgument("-s", "--bootstrap-servers") - .help("Kafka hosts, ports and protocols, comma-separated") - .type(String::class.java) - addRootArgument() - } - } - - private enum class SubAction { - BACKUP, RESTORE, ENSURE - } - - companion object { - private const val SUBACTION = "subaction" - private val logger = LoggerFactory.getLogger( - SchemaTopicManagerCommand::class.java) - } -}