diff --git a/CHANGES.txt b/CHANGES.txt index f38a692f5..5012b2f4e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Create Endpoint that Triggers an Immediate Schema Report (CASSSIDECAR-203) * Adapt to cluster topology change for restore jobs (CASSSIDECAR-185) * Fix PeriodicTaskExecutor double execution due to race from reschedule (CASSSIDECAR-210) * Upgrade Netty to 4.1.118.Final and Vert.x to 4.5.13 Version (CASSSIDECAR-207) @@ -8,7 +9,7 @@ * Sidecar schema initialization can be executed on multiple thread (CASSSIDECAR-200) * Make sidecar operations resilient to down Cassandra nodes (CASSSIDECAR-201) * Fix Cassandra instance not found error (CASSSIDECAR-192) - * Implemented Schema Reporter for Integration with DataHub (CASSSIDECAR-191) + * Implement Schema Reporter for Integration with DataHub (CASSSIDECAR-191) * Add sidecar endpoint to retrieve stream stats (CASSSIDECAR-180) * Add sidecar endpoint to retrieve cassandra gossip health (CASSSIDECAR-173) * Fix SidecarSchema stuck at initialization due to ClusterLeaseTask scheduling (CASSSIDECAR-189) diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java index 511a5daa4..84258a89f 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java @@ -121,6 +121,9 @@ public final class ApiEndpointsV1 public static final String LIST_CDC_SEGMENTS_ROUTE = API_V1 + CDC_PATH + "/segments"; public static final String STREAM_CDC_SEGMENTS_ROUTE = LIST_CDC_SEGMENTS_ROUTE + "/" + SEGMENT_PATH_PARAM; + // Schema Reporting + private static final String REPORT_SCHEMA = "/report-schema"; + public static final String REPORT_SCHEMA_ROUTE = API_V1 + REPORT_SCHEMA; public static final String CONNECTED_CLIENT_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/connected-clients"; diff --git a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java index 240cec81f..12e091d7f 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java @@ -63,6 +63,9 @@ public class BasicPermissions public static final Permission READ_OPERATIONAL_JOB = new DomainAwarePermission("OPERATIONAL_JOB:READ", OPERATION_SCOPE); public static final Permission DECOMMISSION_NODE = new DomainAwarePermission("NODE:DECOMMISSION", OPERATION_SCOPE); + // Permissions related to Schema Reporting + public static final Permission REPORT_SCHEMA = new DomainAwarePermission("SCHEMA:REPORT", CLUSTER_SCOPE); + // cassandra cluster related permissions public static final Permission READ_SCHEMA = new DomainAwarePermission("SCHEMA:READ", CLUSTER_SCOPE); public static final Permission READ_SCHEMA_KEYSPACE_SCOPED = new DomainAwarePermission("SCHEMA:READ", KEYSPACE_SCOPE); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/datahub/IdentifiersProvider.java b/server/src/main/java/org/apache/cassandra/sidecar/datahub/IdentifiersProvider.java index e90f4ca46..3e27dd676 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/datahub/IdentifiersProvider.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/datahub/IdentifiersProvider.java @@ -24,6 +24,7 @@ import 
org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.TableMetadata; @@ -32,7 +33,7 @@ /** * An abstract class that has to be extended and instantiated for every Cassandra - * cluster that needs its schema converted into a DataHub-compliant format. + * cluster that needs its schema converted into a DataHub-compliant format */ public abstract class IdentifiersProvider { @@ -42,7 +43,26 @@ public abstract class IdentifiersProvider protected static final String DATA_PLATFORM_INSTANCE = "dataPlatformInstance"; protected static final String CONTAINER = "container"; protected static final String DATASET = "dataset"; - protected static final String PROD = "PROD"; // DataHub requires this to be {@code PROD} regardless + protected static final String PROD = "PROD"; // DataHub requires this to be {@code PROD} + + protected static final ToStringStyle STYLE = new ToStringStyle() + {{ + setUseShortClassName(false); + setUseClassName(true); + setUseIdentityHashCode(false); + setUseFieldNames(false); + setContentStart("("); + setFieldSeparatorAtStart(false); + setFieldSeparator(","); + setFieldSeparatorAtEnd(false); + setContentEnd(")"); + setDefaultFullDetail(true); + setArrayContentDetail(true); + setArrayStart("("); + setArraySeparator(","); + setArrayEnd(")"); + setNullText("null"); + }}; /** * A public getter method that returns the name of Cassandra Organization @@ -231,13 +251,19 @@ public boolean equals(@Nullable Object other) @NotNull public String toString() { - return new ToStringBuilder(this) + return new ToStringBuilder(this, STYLE) .append(this.organization()) .append(this.platform()) .append(this.environment()) .append(this.application()) .append(this.cluster()) .append(this.identifier()) - .toString(); + .toString() + .replaceAll("\\s", ""); + + // Using a custom {@link ToStringStyle} keeps the hash code out of the + // {@link String} representation, which, together with the removal of all + // whitespace characters, simplifies extraction of {@link IdentifiersProvider} + // objects from the logs with Splunk without noticeably hurting readability } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java index ab2ddb544..91a87fa40 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReporter.java @@ -20,9 +20,13 @@ package org.apache.cassandra.sidecar.datahub; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.Streams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.KeyspaceMetadata; @@ -34,6 +38,9 @@ import datahub.client.Emitter; import datahub.event.MetadataChangeProposalWrapper; import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils; +import org.apache.cassandra.sidecar.metrics.DeltaGauge; +import org.apache.cassandra.sidecar.metrics.SidecarMetrics; +import
org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics; import org.jetbrains.annotations.NotNull; /** @@ -47,6 +54,8 @@ @Singleton public class SchemaReporter { + private static final Logger LOGGER = LoggerFactory.getLogger(SchemaReporter.class); + @NotNull protected final IdentifiersProvider identifiersProvider; @NotNull @@ -57,6 +66,8 @@ public class SchemaReporter protected final List> tableConverters; @NotNull protected final EmitterFactory emitterFactory; + @NotNull + protected final SchemaReportingMetrics reportingMetrics; /** * The public constructor that instantiates {@link SchemaReporter} with default configuration. @@ -66,10 +77,12 @@ public class SchemaReporter * * @param identifiersProvider an instance of {@link IdentifiersProvider} to use * @param emitterFactory an instance of {@link EmitterFactory} to use + * @param sidecarMetrics an instance of {@link SidecarMetrics} to obtain {@link SchemaReportingMetrics} from */ @Inject public SchemaReporter(@NotNull IdentifiersProvider identifiersProvider, - @NotNull EmitterFactory emitterFactory) + @NotNull EmitterFactory emitterFactory, + @NotNull SidecarMetrics sidecarMetrics) { this(identifiersProvider, ImmutableList.of(new ClusterToDataPlatformInfoConverter(identifiersProvider), @@ -85,7 +98,8 @@ public SchemaReporter(@NotNull IdentifiersProvider identifiersProvider, new TableToDataPlatformInstanceConverter(identifiersProvider), new TableToBrowsePathsV2Converter(identifiersProvider), new TableToBrowsePathsConverter(identifiersProvider)), - emitterFactory); + emitterFactory, + sidecarMetrics.server().schemaReporting()); } /** @@ -96,35 +110,73 @@ public SchemaReporter(@NotNull IdentifiersProvider identifiersProvider, * @param keyspaceConverters a {@link List} of {@link KeyspaceToAspectConverter} instances to use * @param tableConverters a {@link List} of {@link TableToAspectConverter} instances to use * @param emitterFactory an instance of {@link EmitterFactory} to use + * @param reportingMetrics an instance of {@link SchemaReportingMetrics} to use */ protected SchemaReporter(@NotNull IdentifiersProvider identifiersProvider, @NotNull List> clusterConverters, @NotNull List> keyspaceConverters, @NotNull List> tableConverters, - @NotNull EmitterFactory emitterFactory) + @NotNull EmitterFactory emitterFactory, + @NotNull SchemaReportingMetrics reportingMetrics) { this.identifiersProvider = identifiersProvider; this.clusterConverters = clusterConverters; this.keyspaceConverters = keyspaceConverters; this.tableConverters = tableConverters; this.emitterFactory = emitterFactory; + this.reportingMetrics = reportingMetrics; + } + + /** + * Public method for converting and reporting the Cassandra schema when triggered by a scheduled periodic task + * + * @param cluster the {@link Cluster} to extract Cassandra schema from + */ + public void processScheduled(@NotNull Cluster cluster) + { + process(cluster.getMetadata(), reportingMetrics.startedSchedule.metric); } /** - * Public method for converting and reporting the Cassandra schema + * Public method for converting and reporting the Cassandra schema when triggered by a received API request * - * @param cluster a {@link Cluster} to extract Cassandra schema from + * @param metadata the {@link Metadata} to extract Cassandra schema from */ - public void process(@NotNull Cluster cluster) + public void processRequested(@NotNull Metadata metadata) { + process(metadata, reportingMetrics.startedRequest.metric); + } + + /** + * Private method for converting and reporting the Cassandra schema + 
* + * @param metadata the {@link Metadata} to extract Cassandra schema from + * @param started the {@link DeltaGauge} for the metric counting invocations + */ + private void process(@NotNull Metadata metadata, + @NotNull DeltaGauge started) + { + String action = " reporting schema for cluster with identifiers " + identifiersProvider; + LOGGER.info("Started" + action); + started.increment(); + try (Emitter emitter = emitterFactory.emitter()) { - stream(cluster.getMetadata()) - .forEach(ThrowableUtils.consumer(emitter::emit)); + Stopwatch stopwatch = Stopwatch.createStarted(); + long counter = stream(metadata) + .map(ThrowableUtils.function(emitter::emit)) + .count(); + + reportingMetrics.durationMilliseconds.metric.update(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + reportingMetrics.sizeAspects.metric.update(counter); + reportingMetrics.finishedSuccess.metric.increment(); + LOGGER.info("Success" + action); } catch (Exception exception) { - throw new RuntimeException("Cannot extract schema for cluster " + identifiersProvider.cluster(), exception); + reportingMetrics.finishedFailure.metric.increment(); + LOGGER.error("Failure" + action); + throw new RuntimeException(action, exception); } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReportingTask.java b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReportingTask.java index e512f19a0..c46bb571a 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReportingTask.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/datahub/SchemaReportingTask.java @@ -89,7 +89,7 @@ public void execute(Promise promise) { try { - reporter.process(session.get().getCluster()); + reporter.processScheduled(session.get().getCluster()); promise.complete(); } catch (Throwable throwable) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/metrics/DeltaGauge.java b/server/src/main/java/org/apache/cassandra/sidecar/metrics/DeltaGauge.java index aae5bd663..b2ae14a6a 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/metrics/DeltaGauge.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/metrics/DeltaGauge.java @@ -33,7 +33,15 @@ public class DeltaGauge implements Gauge, Metric public DeltaGauge() { - this.count = new AtomicLong(); + this.count = new AtomicLong(0L); + } + + /** + * Increments the cumulative value tracked by this {@link DeltaGauge} + */ + public void increment() + { + count.incrementAndGet(); } /** @@ -55,6 +63,6 @@ public void update(long delta) @Override public Long getValue() { - return count.getAndSet(0); + return count.getAndSet(0L); } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java index 0fb32eb41..ea8d78577 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java @@ -19,6 +19,7 @@ package org.apache.cassandra.sidecar.metrics; import org.apache.cassandra.sidecar.db.schema.SidecarSchema; +import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics; import static org.apache.cassandra.sidecar.metrics.SidecarMetrics.APP_PREFIX; @@ -49,6 +50,11 @@ public interface ServerMetrics */ SchemaMetrics schema(); + /** + * @return metrics for the schema reporting + */ + SchemaReportingMetrics schemaReporting(); + /** * @return metrics related to internal caches that are tracked. 
*/ diff --git a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java index 75b0a2482..15e31e6ae 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java @@ -21,6 +21,7 @@ import java.util.Objects; import com.codahale.metrics.MetricRegistry; +import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics; /** * {@link ServerMetrics} tracks metrics related to Sidecar server. @@ -32,6 +33,7 @@ public class ServerMetricsImpl implements ServerMetrics protected final ResourceMetrics resourceMetrics; protected final RestoreMetrics restoreMetrics; protected final SchemaMetrics schemaMetrics; + protected final SchemaReportingMetrics schemaReportingMetrics; protected final CacheMetrics cacheMetrics; protected final CoordinationMetrics coordinationMetrics; @@ -43,6 +45,7 @@ public ServerMetricsImpl(MetricRegistry metricRegistry) this.resourceMetrics = new ResourceMetrics(metricRegistry); this.restoreMetrics = new RestoreMetrics(metricRegistry); this.schemaMetrics = new SchemaMetrics(metricRegistry); + this.schemaReportingMetrics = new SchemaReportingMetrics(metricRegistry); this.cacheMetrics = new CacheMetrics(metricRegistry); this.coordinationMetrics = new CoordinationMetrics(metricRegistry); } @@ -71,6 +74,12 @@ public SchemaMetrics schema() return schemaMetrics; } + @Override + public SchemaReportingMetrics schemaReporting() + { + return schemaReportingMetrics; + } + @Override public CacheMetrics cache() { diff --git a/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/SchemaReportingMetrics.java b/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/SchemaReportingMetrics.java new file mode 100644 index 000000000..931bcdeb3 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/SchemaReportingMetrics.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.metrics.server; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import org.apache.cassandra.sidecar.metrics.DeltaGauge; +import org.apache.cassandra.sidecar.metrics.NamedMetric; +import org.apache.cassandra.sidecar.metrics.ServerMetrics; +import org.jetbrains.annotations.NotNull; + +/** + * Tracks metrics for the schema reporting done by Sidecar + */ +public class SchemaReportingMetrics +{ + protected static final String DOMAIN = ServerMetrics.SERVER_PREFIX + ".SchemaReporting"; + protected static final String STARTED = "Started"; + protected static final String FINISHED = "Finished"; + protected static final String DURATION = "Duration"; + protected static final String SIZE = "Size"; + protected static final String TRIGGER = "Trigger"; + protected static final String RESULT = "Result"; + protected static final String UNIT = "Unit"; + protected static final String REQUEST = "Request"; + protected static final String SCHEDULE = "Schedule"; + protected static final String SUCCESS = "Success"; + protected static final String FAILURE = "Failure"; + protected static final String ASPECTS = "Aspects"; + protected static final String MILLISECONDS = "Milliseconds"; + + public final NamedMetric<DeltaGauge> startedRequest; + public final NamedMetric<DeltaGauge> startedSchedule; + public final NamedMetric<DeltaGauge> finishedSuccess; + public final NamedMetric<DeltaGauge> finishedFailure; + public final NamedMetric<Histogram> durationMilliseconds; + public final NamedMetric<Histogram> sizeAspects; + + public SchemaReportingMetrics(@NotNull MetricRegistry registry) + { + startedSchedule = NamedMetric.builder(name -> registry.gauge(name, DeltaGauge::new)) + .withDomain(DOMAIN) + .withName(STARTED) + .addTag(TRIGGER, SCHEDULE) + .build(); + startedRequest = NamedMetric.builder(name -> registry.gauge(name, DeltaGauge::new)) + .withDomain(DOMAIN) + .withName(STARTED) + .addTag(TRIGGER, REQUEST) + .build(); + + finishedSuccess = NamedMetric.builder(name -> registry.gauge(name, DeltaGauge::new)) + .withDomain(DOMAIN) + .withName(FINISHED) + .addTag(RESULT, SUCCESS) + .build(); + finishedFailure = NamedMetric.builder(name -> registry.gauge(name, DeltaGauge::new)) + .withDomain(DOMAIN) + .withName(FINISHED) + .addTag(RESULT, FAILURE) + .build(); + + sizeAspects = NamedMetric.builder(registry::histogram) + .withDomain(DOMAIN) + .withName(SIZE) + .addTag(UNIT, ASPECTS) + .build(); + + durationMilliseconds = NamedMetric.builder(registry::histogram) + .withDomain(DOMAIN) + .withName(DURATION) + .addTag(UNIT, MILLISECONDS) + .build(); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandler.java new file mode 100644 index 000000000..bc8a30179 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandler.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.Collections; +import java.util.Set; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.datahub.SchemaReporter; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * An implementation of {@link AbstractHandler} used to trigger an immediate, + * synchronous conversion and report of the current schema + */ +@Singleton +public class ReportSchemaHandler extends AbstractHandler<Void> implements AccessProtected +{ + @NotNull + private final SchemaReporter schemaReporter; + + /** + * Constructs a new instance of {@link ReportSchemaHandler} using the provided instances + * of {@link InstanceMetadataFetcher}, {@link ExecutorPools}, and {@link SchemaReporter} + * + * @param metadata the metadata fetcher + * @param executor executor pools for blocking executions + * @param reporter the schema reporter that converts and emits the schema + */ + @Inject + public ReportSchemaHandler(@NotNull InstanceMetadataFetcher metadata, + @NotNull ExecutorPools executor, + @NotNull SchemaReporter reporter) + { + super(metadata, executor, null); + + schemaReporter = reporter; + } + + /** + * {@inheritDoc} + */ + @Override + @NotNull + public Set<Authorization> requiredAuthorizations() + { + return Collections.singleton(BasicPermissions.REPORT_SCHEMA.toAuthorization()); + } + + /** + * {@inheritDoc} + */ + @Override + @Nullable + protected Void extractParamsOrThrow(@NotNull RoutingContext context) + { + return null; + } + + /** + * {@inheritDoc} + */ + @Override + protected void handleInternal(@NotNull RoutingContext context, + @NotNull HttpServerRequest http, + @NotNull String host, + @NotNull SocketAddress address, + @Nullable Void request) + { + executorPools.service() + .runBlocking(() -> metadataFetcher.runOnFirstAvailableInstance(instance -> + schemaReporter.processRequested(instance.delegate().metadata()))) + .onSuccess(context::json) + .onFailure(throwable -> processFailure(throwable, context, host, address, request)); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java index bd8d448be..8386fc517 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java @@ -131,6 +131,7 @@ import org.apache.cassandra.sidecar.routes.ListOperationalJobsHandler; import org.apache.cassandra.sidecar.routes.NodeDecommissionHandler; import org.apache.cassandra.sidecar.routes.OperationalJobHandler; +import org.apache.cassandra.sidecar.routes.ReportSchemaHandler; +import
org.apache.cassandra.sidecar.routes.RingHandler; import org.apache.cassandra.sidecar.routes.RoutingOrder; import org.apache.cassandra.sidecar.routes.SchemaHandler; @@ -341,6 +342,7 @@ public Router vertxRouter(Vertx vertx, SSTableCleanupHandler ssTableCleanupHandler, StreamCdcSegmentHandler streamCdcSegmentHandler, ListCdcDirHandler listCdcDirHandler, + ReportSchemaHandler reportSchemaHandler, RestoreRequestValidationHandler validateRestoreJobRequest, DiskSpaceProtectionHandler diskSpaceProtection, ValidateTableExistenceHandler validateTableExistence, @@ -598,6 +600,14 @@ public Router vertxRouter(Vertx vertx, .handler(streamCdcSegmentHandler) .build(); + // Schema Reporting + protectedRouteBuilderFactory.get() + .router(router) + .method(HttpMethod.GET) + .endpoint(ApiEndpointsV1.REPORT_SCHEMA_ROUTE) + .handler(reportSchemaHandler) + .build(); + return router; } @@ -902,7 +912,8 @@ public IdentifiersProvider identifiersProvider(@NotNull InstanceMetadataFetcher @NotNull protected String initialize() { - return fetcher.callOnFirstAvailableInstance(i -> i.delegate().storageOperations().clusterName()); + return fetcher.callOnFirstAvailableInstance(instance -> + instance.delegate().storageOperations().clusterName()); } }; diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java index ff85cd73c..9889adff2 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java @@ -19,8 +19,8 @@ package org.apache.cassandra.sidecar.utils; import java.util.List; +import java.util.function.Consumer; import java.util.function.Function; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,13 +97,29 @@ public CassandraAdapterDelegate delegate(@NotNull String host) throws NoSuchCass } /** - * Iterate through the local instances and call the function on the first available instance, i.e. no CassandraUnavailableException - * or OperationUnavailableException is thrown for the operations + * Iterate through the local instances and run the {@link Consumer} on the first available one, + * so no {@link CassandraUnavailableException} or {@link OperationUnavailableException} is thrown for the operations + * + * @param consumer a {@link Consumer} that processes {@link InstanceMetadata} and returns no result + * @throws CassandraUnavailableException if all local instances were exhausted + */ + public void runOnFirstAvailableInstance(Consumer consumer) throws CassandraUnavailableException + { + callOnFirstAvailableInstance(metadata -> + { + consumer.accept(metadata); + return null; + }); + } + + /** + * Iterate through the local instances and call the {@link Function} on the first available one, + * so no {@link CassandraUnavailableException} or {@link OperationUnavailableException} is thrown for the operations * - * @param function function applies to {@link InstanceMetadata} - * @return function eval result. Null can be returned when all local instances are exhausted * @param type of the result - * @throws CassandraUnavailableException when all local instances are exhausted. 
+ * @param function a {@link Function} that maps {@link InstanceMetadata} to {@link T} + * @return evaluation result of the {@code function}; can be {@code null} if all local instances were exhausted + * @throws CassandraUnavailableException if all local instances were exhausted */ @NotNull public T callOnFirstAvailableInstance(Function function) throws CassandraUnavailableException diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java b/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java index 08c31097e..3c29bb0bd 100644 --- a/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java +++ b/server/src/test/integration/org/apache/cassandra/sidecar/datahub/SchemaReporterIntegrationTest.java @@ -21,26 +21,53 @@ import java.io.IOException; import java.io.StringReader; +import java.util.Collections; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import com.codahale.metrics.SharedMetricRegistries; import com.datastax.driver.core.Session; import com.linkedin.data.DataList; import com.linkedin.data.codec.JacksonDataCodec; import org.apache.cassandra.sidecar.common.server.utils.IOUtils; +import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory; +import org.apache.cassandra.sidecar.metrics.SidecarMetrics; +import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl; +import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics; import org.apache.cassandra.sidecar.testing.IntegrationTestBase; import org.apache.cassandra.testing.CassandraIntegrationTest; import org.jetbrains.annotations.NotNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.assertj.core.api.Assertions.assertThat; /** * Integration test for {@link SchemaReporter} */ -@SuppressWarnings({"try", "unused"}) +@SuppressWarnings("resource") final class SchemaReporterIntegrationTest extends IntegrationTestBase { private static final IdentifiersProvider IDENTIFIERS = new TestIdentifiers(); private static final JacksonDataCodec CODEC = new JacksonDataCodec(); + private static final MetricRegistryFactory FACTORY = new MetricRegistryFactory(SchemaReporterTest.class.getSimpleName(), + Collections.emptyList(), + Collections.emptyList()); + + private SidecarMetrics metrics; + + @BeforeEach + void beforeEach() + { + metrics = new SidecarMetricsImpl(FACTORY, null); + } + + @AfterEach + void afterEach() + { + SharedMetricRegistries.clear(); + } /** * Private helper method that removes all numeric suffixes added non-deterministically @@ -97,21 +124,31 @@ void testSchemaConverter() throws IOException JsonEmitter emitter = new JsonEmitter(); try (Session session = maybeGetSession()) { - new SchemaReporter(IDENTIFIERS, () -> emitter) - .process(session.getCluster()); + new SchemaReporter(IDENTIFIERS, () -> emitter, metrics) + .processScheduled(session.getCluster()); } String actualJson = normalizeNames(emitter.content()); String expectedJson = IOUtils.readFully("/datahub/integration_test.json"); - assertThat(actualJson) .isEqualToNormalizingWhitespace(expectedJson); - // Finally, make sure the returned schema produces the same tree of + // Second, make sure the returned schema produces the same tree of // DataHub objects after having been normalized and deserialized DataList actualData = CODEC.readList(new 
StringReader(actualJson)); DataList expectedData = CODEC.readList(new StringReader(expectedJson)); - assertThat(actualData) .isEqualTo(expectedData); + + // Third, validate the captured metrics: one execution triggered by the schedule and + // completed successfully, with thirteen aspects produced in zero or more milliseconds + SchemaReportingMetrics metrics = this.metrics.server().schemaReporting(); + assertEquals(0L, metrics.startedRequest.metric.getValue()); + assertEquals(1L, metrics.startedSchedule.metric.getValue()); + assertEquals(1L, metrics.finishedSuccess.metric.getValue()); + assertEquals(0L, metrics.finishedFailure.metric.getValue()); + assertEquals(1L, metrics.sizeAspects.metric.getCount()); + assertEquals(13L, metrics.sizeAspects.metric.getSnapshot().getValues()[0]); + assertEquals(1L, metrics.durationMilliseconds.metric.getCount()); + assertTrue(0L <= metrics.durationMilliseconds.metric.getSnapshot().getValues()[0]); } } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java b/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java index 920aa9f6f..283fb8caa 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/datahub/SchemaReporterTest.java @@ -22,8 +22,11 @@ import java.io.IOException; import java.util.Collections; import com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.codahale.metrics.SharedMetricRegistries; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ColumnMetadata; import com.datastax.driver.core.DataType; @@ -33,70 +36,101 @@ import com.datastax.driver.core.TableOptionsMetadata; import com.datastax.driver.core.UserType; import org.apache.cassandra.sidecar.common.server.utils.IOUtils; +import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory; +import org.apache.cassandra.sidecar.metrics.SidecarMetrics; +import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl; +import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** * Unit tests for {@link SchemaReporter} */ -@SuppressWarnings("try") +@SuppressWarnings("resource") final class SchemaReporterTest { private static final IdentifiersProvider IDENTIFIERS = new TestIdentifiers(); + private static final MetricRegistryFactory FACTORY = new MetricRegistryFactory(SchemaReporterTest.class.getSimpleName(), + Collections.emptyList(), + Collections.emptyList()); + + private SidecarMetrics metrics; + + @BeforeEach + void beforeEach() + { + metrics = new SidecarMetricsImpl(FACTORY, null); + } + + @AfterEach + void afterEach() + { + SharedMetricRegistries.clear(); + } @Test void testEmptyCluster() throws IOException { - Cluster cluster = mock(Cluster.class); - Metadata metadata = mock (Metadata.class); - when(cluster.getClusterName()).thenReturn("sample_cluster"); - when(cluster.getMetadata()).thenReturn(metadata); + Metadata metadata = mock(Metadata.class); when(metadata.getKeyspaces()).thenReturn(Collections.emptyList()); JsonEmitter emitter = new JsonEmitter(); - new SchemaReporter(IDENTIFIERS, () -> emitter) - .process(cluster); + new SchemaReporter(IDENTIFIERS, () -> emitter, metrics) + 
.processRequested(metadata); String actual = emitter.content(); String expected = IOUtils.readFully("/datahub/empty_cluster.json"); - assertEquals(expected, actual); + + SchemaReportingMetrics metrics = this.metrics.server().schemaReporting(); // Validate captured metrics: + assertEquals(1L, metrics.startedRequest.metric.getValue()); // * one execution triggered by request + assertEquals(0L, metrics.startedSchedule.metric.getValue()); // * zero executions triggered by schedule + assertEquals(1L, metrics.finishedSuccess.metric.getValue()); // * one execution resulted in success + assertEquals(0L, metrics.finishedFailure.metric.getValue()); // * zero executions resulted in failure + assertEquals(1L, metrics.sizeAspects.metric.getCount()); // * single number of aspects, + assertEquals(2L, metrics.sizeAspects.metric.getSnapshot().getValues()[0]); // equal to two + assertEquals(1L, metrics.durationMilliseconds.metric.getCount()); // * single duration of execution, + assertTrue(0L <= metrics.durationMilliseconds.metric.getSnapshot().getValues()[0]); // that is non-negative } @Test void testEmptyKeyspace() throws IOException { - Cluster cluster = mock(Cluster.class); - Metadata metadata = mock (Metadata.class); + Metadata metadata = mock(Metadata.class); KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class); - when(cluster.getClusterName()).thenReturn("sample_cluster"); - when(cluster.getMetadata()).thenReturn(metadata); when(metadata.getKeyspaces()).thenReturn(Collections.singletonList(keyspace)); when(keyspace.getName()).thenReturn("sample_keyspace"); when(keyspace.getTables()).thenReturn(Collections.emptyList()); JsonEmitter emitter = new JsonEmitter(); - new SchemaReporter(IDENTIFIERS, () -> emitter) - .process(cluster); + new SchemaReporter(IDENTIFIERS, () -> emitter, metrics) + .processRequested(metadata); String actual = emitter.content(); String expected = IOUtils.readFully("/datahub/empty_keyspace.json"); - assertEquals(expected, actual); + + SchemaReportingMetrics metrics = this.metrics.server().schemaReporting(); // Validate captured metrics: + assertEquals(1L, metrics.startedRequest.metric.getValue()); // * one execution triggered by request + assertEquals(0L, metrics.startedSchedule.metric.getValue()); // * zero executions triggered by schedule + assertEquals(1L, metrics.finishedSuccess.metric.getValue()); // * one execution resulted in success + assertEquals(0L, metrics.finishedFailure.metric.getValue()); // * zero executions resulted in failure + assertEquals(1L, metrics.sizeAspects.metric.getCount()); // * single number of aspects, + assertEquals(6L, metrics.sizeAspects.metric.getSnapshot().getValues()[0]); // equal to six + assertEquals(1L, metrics.durationMilliseconds.metric.getCount()); // * single duration of execution, + assertTrue(0L <= metrics.durationMilliseconds.metric.getSnapshot().getValues()[0]); // that is non-negative } @Test void testEmptyTable() throws IOException { - Cluster cluster = mock(Cluster.class); - Metadata metadata = mock (Metadata.class); + Metadata metadata = mock(Metadata.class); KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class); TableMetadata table = mock(TableMetadata.class); TableOptionsMetadata options = mock(TableOptionsMetadata.class); - when(cluster.getClusterName()).thenReturn("sample_cluster"); - when(cluster.getMetadata()).thenReturn(metadata); when(metadata.getKeyspaces()).thenReturn(Collections.singletonList(keyspace)); when(keyspace.getName()).thenReturn("sample_keyspace"); 
when(keyspace.getTables()).thenReturn(Collections.singletonList(table)); @@ -107,20 +141,29 @@ void testEmptyTable() throws IOException when(options.getComment()).thenReturn("table comment"); JsonEmitter emitter = new JsonEmitter(); - new SchemaReporter(IDENTIFIERS, () -> emitter) - .process(cluster); + new SchemaReporter(IDENTIFIERS, () -> emitter, metrics) + .processRequested(metadata); String actual = emitter.content(); String expected = IOUtils.readFully("/datahub/empty_table.json"); - assertEquals(expected, actual); + + SchemaReportingMetrics metrics = this.metrics.server().schemaReporting(); // Validate captured metrics: + assertEquals(1L, metrics.startedRequest.metric.getValue()); // * one execution triggered by request + assertEquals(0L, metrics.startedSchedule.metric.getValue()); // * zero executions triggered by schedule + assertEquals(1L, metrics.finishedSuccess.metric.getValue()); // * one execution resulted in success + assertEquals(0L, metrics.finishedFailure.metric.getValue()); // * zero executions resulted in failure + assertEquals(1L, metrics.sizeAspects.metric.getCount()); // * single number of aspects, + assertEquals(13L, metrics.sizeAspects.metric.getSnapshot().getValues()[0]); // equal to thirteen + assertEquals(1L, metrics.durationMilliseconds.metric.getCount()); // * single duration of execution, + assertTrue(0L <= metrics.durationMilliseconds.metric.getSnapshot().getValues()[0]); // that is non-negative } @Test void testPrimitiveTypes() throws IOException { Cluster cluster = mock(Cluster.class); - Metadata metadata = mock (Metadata.class); + Metadata metadata = mock(Metadata.class); KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class); TableMetadata table = mock(TableMetadata.class); TableOptionsMetadata options = mock(TableOptionsMetadata.class); @@ -136,7 +179,6 @@ void testPrimitiveTypes() throws IOException ColumnMetadata c6 = mock(ColumnMetadata.class); ColumnMetadata c7 = mock(ColumnMetadata.class); ColumnMetadata c8 = mock(ColumnMetadata.class); - when(cluster.getClusterName()).thenReturn("sample_cluster"); when(cluster.getMetadata()).thenReturn(metadata); when(metadata.getKeyspaces()).thenReturn(Collections.singletonList(keyspace)); when(keyspace.getName()).thenReturn("sample_keyspace"); @@ -187,20 +229,29 @@ void testPrimitiveTypes() throws IOException when(c8.getType()).thenReturn(DataType.map(DataType.timestamp(), DataType.inet(), false)); JsonEmitter emitter = new JsonEmitter(); - new SchemaReporter(IDENTIFIERS, () -> emitter) - .process(cluster); + new SchemaReporter(IDENTIFIERS, () -> emitter, metrics) + .processScheduled(cluster); String actual = emitter.content(); String expected = IOUtils.readFully("/datahub/primitive_types.json"); - assertEquals(expected, actual); + + SchemaReportingMetrics metrics = this.metrics.server().schemaReporting(); // Validate captured metrics: + assertEquals(0L, metrics.startedRequest.metric.getValue()); // * zero executions triggered by request + assertEquals(1L, metrics.startedSchedule.metric.getValue()); // * one execution triggered by schedule + assertEquals(1L, metrics.finishedSuccess.metric.getValue()); // * one execution resulted in success + assertEquals(0L, metrics.finishedFailure.metric.getValue()); // * zero executions resulted in failure + assertEquals(1L, metrics.sizeAspects.metric.getCount()); // * single number of aspects, + assertEquals(13L, metrics.sizeAspects.metric.getSnapshot().getValues()[0]); // equal to thirteen + assertEquals(1L, metrics.durationMilliseconds.metric.getCount()); // * single 
duration of execution, + assertTrue(0L <= metrics.durationMilliseconds.metric.getSnapshot().getValues()[0]); // that is non-negative } @Test void testUserTypes() throws IOException { Cluster cluster = mock(Cluster.class); - Metadata metadata = mock (Metadata.class); + Metadata metadata = mock(Metadata.class); KeyspaceMetadata keyspace = mock(KeyspaceMetadata.class); TableMetadata table = mock(TableMetadata.class); TableOptionsMetadata options = mock(TableOptionsMetadata.class); @@ -215,7 +266,6 @@ void testUserTypes() throws IOException UserType.Field udt1c1 = mock(UserType.Field.class); UserType.Field udt1udt2 = mock(UserType.Field.class); UserType.Field udt2c2 = mock(UserType.Field.class); - when(cluster.getClusterName()).thenReturn("sample_cluster"); when(cluster.getMetadata()).thenReturn(metadata); when(metadata.getKeyspaces()).thenReturn(Collections.singletonList(keyspace)); when(keyspace.getName()).thenReturn("sample_keyspace"); @@ -261,12 +311,21 @@ void testUserTypes() throws IOException when(udt2c2.getType()).thenReturn(DataType.cboolean()); JsonEmitter emitter = new JsonEmitter(); - new SchemaReporter(IDENTIFIERS, () -> emitter) - .process(cluster); + new SchemaReporter(IDENTIFIERS, () -> emitter, metrics) + .processScheduled(cluster); String actual = emitter.content(); String expected = IOUtils.readFully("/datahub/user_types.json"); - assertEquals(expected, actual); + + SchemaReportingMetrics metrics = this.metrics.server().schemaReporting(); // Validate captured metrics: + assertEquals(0L, metrics.startedRequest.metric.getValue()); // * zero executions triggered by request + assertEquals(1L, metrics.startedSchedule.metric.getValue()); // * one execution triggered by schedule + assertEquals(1L, metrics.finishedSuccess.metric.getValue()); // * one execution resulted in success + assertEquals(0L, metrics.finishedFailure.metric.getValue()); // * zero executions resulted in failure + assertEquals(1L, metrics.sizeAspects.metric.getCount()); // * single number of aspects, + assertEquals(13L, metrics.sizeAspects.metric.getSnapshot().getValues()[0]); // equal to thirteen + assertEquals(1L, metrics.durationMilliseconds.metric.getCount()); // * single duration of execution, + assertTrue(0L <= metrics.durationMilliseconds.metric.getSnapshot().getValues()[0]); // that is non-negative } } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/metrics/DeltaGaugeTest.java b/server/src/test/java/org/apache/cassandra/sidecar/metrics/DeltaGaugeTest.java index ab132684a..398ebc009 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/metrics/DeltaGaugeTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/metrics/DeltaGaugeTest.java @@ -27,6 +27,23 @@ */ class DeltaGaugeTest { + @Test + void testCumulativeCountIncremented() + { + DeltaGauge deltaGauge = new DeltaGauge(); + + assertThat(deltaGauge.getValue()).isEqualTo(0L); + deltaGauge.increment(); + assertThat(deltaGauge.getValue()).isEqualTo(1L); + assertThat(deltaGauge.getValue()).isEqualTo(0L); + deltaGauge.increment(); + deltaGauge.increment(); + deltaGauge.increment(); + assertThat(deltaGauge.getValue()).isEqualTo(3L); + assertThat(deltaGauge.getValue()).isEqualTo(0L); + assertThat(deltaGauge.getValue()).isEqualTo(0L); + } + @Test void testCumulativeCountUpdated() { diff --git a/server/src/test/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandlerTest.java new file mode 100644 index 000000000..088b10c53 --- /dev/null +++ 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/ReportSchemaHandlerTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import com.datastax.driver.core.Metadata; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import com.linkedin.mxe.MetadataChangeProposal; +import datahub.client.Callback; +import datahub.client.MetadataWriteResponse; +import datahub.shaded.findbugs.annotations.SuppressFBWarnings; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.common.server.utils.IOUtils; +import org.apache.cassandra.sidecar.datahub.EmitterFactory; +import org.apache.cassandra.sidecar.datahub.JsonEmitter; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.server.Server; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link ReportSchemaHandler} + */ +@ExtendWith(VertxExtension.class) +final class ReportSchemaHandlerTest +{ + private static final String CLUSTER = "cluster"; + private static final String DIRECTORY = "/tmp"; + private static final int IDENTIFIER = 42; + private static final String LOCALHOST = "127.0.0.1"; + private static final int PORT = 9042; + private static final String ENDPOINT = "/api/v1/report-schema"; + private static final Duration TIMEOUT = Duration.ofSeconds(5); + + private static final class ThrowingEmitter extends JsonEmitter + { + @Override + @NotNull + public synchronized 
Future emit(@NotNull MetadataChangeProposal proposal, + @Nullable Callback callback) throws IOException + { + throw new IOException(); + } + } + + private final class ReportSchemaHandlerTestModule extends AbstractModule + { + @Provides + @Singleton + @NotNull + public InstancesMetadata instancesMetadata() + { + Metadata metadata = mock(Metadata.class); + when(metadata.getKeyspaces()).thenReturn(Collections.emptyList()); + + StorageOperations operations = mock(StorageOperations.class); + when(operations.clusterName()).thenReturn(CLUSTER); + + CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class); + when(delegate.storageOperations()).thenReturn(operations); + when(delegate.metadata()).thenReturn(metadata); + + InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + when(instanceMetadata.stagingDir()).thenReturn(DIRECTORY); + when(instanceMetadata.id()).thenReturn(IDENTIFIER); + when(instanceMetadata.host()).thenReturn(LOCALHOST); + when(instanceMetadata.port()).thenReturn(PORT); + when(instanceMetadata.delegate()).thenReturn(delegate); + + InstancesMetadata instances = mock(InstancesMetadata.class); + when(instances.instances()).thenReturn(Collections.singletonList(instanceMetadata)); + when(instances.instanceFromId(IDENTIFIER)).thenReturn(instanceMetadata); + when(instances.instanceFromHost(LOCALHOST)).thenReturn(instanceMetadata); + return instances; + } + + @Provides + @Singleton + @NotNull + public EmitterFactory emitterFactory() + { + return () -> emitter; + } + } + + private final Injector injector = Guice.createInjector(Modules.override(new MainModule()).with( + Modules.override(new TestModule()).with( + new ReportSchemaHandlerTestModule()))); + private WebClient client; + private Server server; + private JsonEmitter emitter; + + @BeforeEach + void before() throws InterruptedException + { + client = WebClient.create(injector.getInstance(Vertx.class)); + server = injector.getInstance(Server.class); + + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(result -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + } + + @AfterEach + @SuppressWarnings("ResultOfMethodCallIgnored") + @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED") + void after() throws InterruptedException + { + CountDownLatch latch = new CountDownLatch(1); + server.close() + .onSuccess(future -> latch.countDown()); + latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + } + + @Test + @SuppressWarnings("deprecation") + void testSuccess(@NotNull VertxTestContext context) throws IOException + { + String expected = IOUtils.readFully("/datahub/empty_cluster.json"); + emitter = new JsonEmitter(); + + client.get(server.actualPort(), LOCALHOST, ENDPOINT) + .expect(ResponsePredicate.SC_OK) + .send(context.succeeding(response -> + { + assertThat(response.statusCode()) + .isEqualTo(HttpResponseStatus.OK.code()); + assertThat(emitter.content()) + .isEqualTo(expected); + context.completeNow(); + })); + } + + @Test + @SuppressWarnings("deprecation") + void testFailure(@NotNull VertxTestContext context) + { + String expected = "[\n]"; + emitter = new ThrowingEmitter(); + + client.get(server.actualPort(), LOCALHOST, ENDPOINT) + .expect(ResponsePredicate.SC_INTERNAL_SERVER_ERROR) + .send(context.succeeding(response -> + { + assertThat(response.statusCode()) + .isEqualTo(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); + assertThat(emitter.content()) + .isEqualTo(expected); + context.completeNow(); + 
})); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcherTest.java b/server/src/test/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcherTest.java index fbc8704f3..44b4f08b8 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcherTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcherTest.java @@ -21,7 +21,7 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.List; - +import com.google.common.collect.ImmutableList; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -32,16 +32,37 @@ import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadataImpl; import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; +import org.jetbrains.annotations.NotNull; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; +/** + * Unit tests for {@link InstanceMetadataFetcher} + */ class InstanceMetadataFetcherTest { @TempDir Path tempDir; + @Test + void testRunOnFirstAvailableInstance() + { + List instances = ImmutableList.of(instance(1, "127.0.0.1", false), + instance(2, "127.0.0.2", true), + instance(3, "127.0.0.3", true)); + InstancesMetadataImpl instancesMetadata = new InstancesMetadataImpl(instances, DnsResolver.DEFAULT); + InstanceMetadataFetcher fetcher = new InstanceMetadataFetcher(instancesMetadata); + + fetcher.runOnFirstAvailableInstance(metadata -> + assertThat(metadata.delegate()) + .describedAs("The delegate of instance 2 should be used") + .isNotNull() + .isSameAs(instances.get(1).delegate()) + ); + } + @Test void testCallOnFirstAvailableInstance() { @@ -50,11 +71,13 @@ void testCallOnFirstAvailableInstance() instance(3, "127.0.0.3", true)); InstancesMetadataImpl instancesMetadata = new InstancesMetadataImpl(instances, DnsResolver.DEFAULT); InstanceMetadataFetcher fetcher = new InstanceMetadataFetcher(instancesMetadata); + CassandraAdapterDelegate delegate = fetcher.callOnFirstAvailableInstance(InstanceMetadata::delegate); + assertThat(delegate) - .describedAs("The delegate of instance 2 should be returned") - .isNotNull() - .isSameAs(instances.get(1).delegate()); + .describedAs("The delegate of instance 2 should be returned") + .isNotNull() + .isSameAs(instances.get(1).delegate()); } @Test @@ -64,12 +87,14 @@ void testCallOnFirstAvailableInstanceExhausts() instance(2, "127.0.0.2", false)); InstancesMetadataImpl instancesMetadata = new InstancesMetadataImpl(instances, DnsResolver.DEFAULT); InstanceMetadataFetcher fetcher = new InstanceMetadataFetcher(instancesMetadata); + assertThatThrownBy(() -> fetcher.callOnFirstAvailableInstance(InstanceMetadata::delegate)) - .isExactlyInstanceOf(CassandraUnavailableException.class) - .hasMessageContaining("All local Cassandra nodes are exhausted. But none is available"); + .isExactlyInstanceOf(CassandraUnavailableException.class) + .hasMessageContaining("All local Cassandra nodes are exhausted. 
But none is available"); } - private InstanceMetadata instance(int id, String host, boolean isAvailable) + @NotNull + private InstanceMetadata instance(int id, @NotNull String host, boolean isAvailable) { InstanceMetadataImpl.Builder builder = InstanceMetadataImpl.builder() .id(id) @@ -79,7 +104,7 @@ private InstanceMetadata instance(int id, String host, boolean isAvailable) .metricRegistry(new MetricRegistry()); if (isAvailable) { - builder.delegate(mock(CassandraAdapterDelegate.class)); + builder = builder.delegate(mock(CassandraAdapterDelegate.class)); } return builder.build(); }
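
Example (not part of the patch): a minimal sketch of how the new GET /api/v1/report-schema route added above could be exercised from a Vert.x WebClient, mirroring the style of ReportSchemaHandlerTest; the host and port values are placeholders, and the SCHEMA:REPORT permission is required when authorization is enabled.

import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;

public final class ReportSchemaExample
{
    public static void main(String[] args)
    {
        Vertx vertx = Vertx.vertx();
        WebClient client = WebClient.create(vertx);

        // Trigger an immediate, synchronous conversion and report of the current schema
        client.get(9043, "127.0.0.1", "/api/v1/report-schema")
              .send()
              .onSuccess(response -> System.out.println("Status: " + response.statusCode()))
              .onFailure(Throwable::printStackTrace)
              .onComplete(ignored -> vertx.close());
    }
}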
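
For reference, a small sketch of the DeltaGauge semantics that SchemaReportingMetrics relies on: increment() and update(long) accumulate a running total, while getValue() reports the delta since the previous read and resets the total to zero, which is why the test assertions above read each gauge only once.

import org.apache.cassandra.sidecar.metrics.DeltaGauge;

public final class DeltaGaugeExample
{
    public static void main(String[] args)
    {
        DeltaGauge gauge = new DeltaGauge();

        gauge.increment();     // running total becomes 1
        gauge.update(5L);      // running total becomes 6

        System.out.println(gauge.getValue());  // prints 6 and resets the total to 0
        System.out.println(gauge.getValue());  // prints 0 until the next increment/update
    }
}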