CASSSIDECAR-216: Capture Metrics for Schema Reporting #204

Open · wants to merge 2 commits into trunk
3 changes: 2 additions & 1 deletion CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
* Create Endpoint that Triggers an Immediate Schema Report (CASSSIDECAR-203)
* Adapt to cluster topology change for restore jobs (CASSSIDECAR-185)
* Fix PeriodicTaskExecutor double execution due to race from reschedule (CASSSIDECAR-210)
* Upgrade Netty to 4.1.118.Final and Vert.x to 4.5.13 Version (CASSSIDECAR-207)
@@ -8,7 +9,7 @@
* Sidecar schema initialization can be executed on multiple threads (CASSSIDECAR-200)
* Make sidecar operations resilient to down Cassandra nodes (CASSSIDECAR-201)
* Fix Cassandra instance not found error (CASSSIDECAR-192)
* Implemented Schema Reporter for Integration with DataHub (CASSSIDECAR-191)
* Implement Schema Reporter for Integration with DataHub (CASSSIDECAR-191)
* Add sidecar endpoint to retrieve stream stats (CASSSIDECAR-180)
* Add sidecar endpoint to retrieve cassandra gossip health (CASSSIDECAR-173)
* Fix SidecarSchema stuck at initialization due to ClusterLeaseTask scheduling (CASSSIDECAR-189)
ApiEndpointsV1.java
@@ -121,6 +121,9 @@ public final class ApiEndpointsV1
public static final String LIST_CDC_SEGMENTS_ROUTE = API_V1 + CDC_PATH + "/segments";
public static final String STREAM_CDC_SEGMENTS_ROUTE = LIST_CDC_SEGMENTS_ROUTE + "/" + SEGMENT_PATH_PARAM;

// Schema Reporting
private static final String REPORT_SCHEMA = "/report-schema";
public static final String REPORT_SCHEMA_ROUTE = API_V1 + REPORT_SCHEMA;

public static final String CONNECTED_CLIENT_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/connected-clients";

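Note: with these constants, REPORT_SCHEMA_ROUTE resolves to /api/v1/report-schema, the endpoint introduced by CASSSIDECAR-203. A call that exercises it might look like the sketch below; the HTTP verb, port, and client wiring are illustrative assumptions rather than part of this change.

import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;

public class ReportSchemaTriggerSketch
{
    public static void main(String[] args)
    {
        // Hypothetical trigger of an immediate schema report against a local sidecar;
        // the verb (PUT) and port (9043) are guesses for illustration
        WebClient client = WebClient.create(Vertx.vertx());
        client.put(9043, "localhost", "/api/v1/report-schema")
              .send()
              .onSuccess(response -> System.out.println("HTTP " + response.statusCode()))
              .onFailure(Throwable::printStackTrace);
    }
}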
BasicPermissions.java
@@ -63,6 +63,9 @@ public class BasicPermissions
public static final Permission READ_OPERATIONAL_JOB = new DomainAwarePermission("OPERATIONAL_JOB:READ", OPERATION_SCOPE);
public static final Permission DECOMMISSION_NODE = new DomainAwarePermission("NODE:DECOMMISSION", OPERATION_SCOPE);

// Permissions related to Schema Reporting
public static final Permission REPORT_SCHEMA = new DomainAwarePermission("SCHEMA:REPORT", CLUSTER_SCOPE);

// cassandra cluster related permissions
public static final Permission READ_SCHEMA = new DomainAwarePermission("SCHEMA:READ", CLUSTER_SCOPE);
public static final Permission READ_SCHEMA_KEYSPACE_SCOPED = new DomainAwarePermission("SCHEMA:READ", KEYSPACE_SCOPE);
IdentifiersProvider.java
@@ -24,6 +24,7 @@
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.TableMetadata;
@@ -32,7 +33,7 @@

/**
* An abstract class that has to be extended and instantiated for every Cassandra
* cluster that needs its schema converted into a DataHub-compliant format.
* cluster that needs its schema converted into a DataHub-compliant format
*/
public abstract class IdentifiersProvider
{
@@ -42,7 +43,26 @@ public abstract class IdentifiersProvider
protected static final String DATA_PLATFORM_INSTANCE = "dataPlatformInstance";
protected static final String CONTAINER = "container";
protected static final String DATASET = "dataset";
protected static final String PROD = "PROD"; // DataHub requires this to be {@code PROD} regardless
protected static final String PROD = "PROD"; // DataHub requires this to be {@code PROD}

protected static final ToStringStyle STYLE = new ToStringStyle()
{{
setUseShortClassName(false);
setUseClassName(true);
setUseIdentityHashCode(false);
setUseFieldNames(false);
setContentStart("(");
setFieldSeparatorAtStart(false);
setFieldSeparator(",");
setFieldSeparatorAtEnd(false);
setContentEnd(")");
setDefaultFullDetail(true);
setArrayContentDetail(true);
setArrayStart("(");
setArraySeparator(",");
setArrayEnd(")");
setNullText("null");
}};

/**
* A public getter method that returns the name of Cassandra Organization
@@ -231,13 +251,19 @@ public boolean equals(@Nullable Object other)
@NotNull
public String toString()
{
return new ToStringBuilder(this)
return new ToStringBuilder(this, STYLE)
.append(this.organization())
.append(this.platform())
.append(this.environment())
.append(this.application())
.append(this.cluster())
.append(this.identifier())
.toString();
.toString()
.replaceAll("\\s", "");
Comment on lines +254 to +262 (Contributor):

The toString is relatively complex. Is there a test?


// Use of a custom {@link ToStringStyle} implementation prevents the hash code from being
// included in the {@link String} representation, which, in conjunction with the removal
// of all whitespace characters, simplifies extraction of {@link IdentifiersProvider}
// objects from the logs with Splunk without noticeably hurting readability
}
}
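Note: with this style, toString() renders a single fully-qualified tuple with no field names, no identity hash code, and (after the replaceAll above) no whitespace. For a hypothetical subclass the output would look roughly like the line below; the values are invented for illustration.

org.apache.cassandra.sidecar.datahub.TestIdentifiersProvider(acme,cassandra,PROD,sidecar,test_cluster,test_cluster)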
SchemaReporter.java
@@ -20,9 +20,13 @@
package org.apache.cassandra.sidecar.datahub;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.KeyspaceMetadata;
@@ -34,6 +38,9 @@
import datahub.client.Emitter;
import datahub.event.MetadataChangeProposalWrapper;
import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
import org.apache.cassandra.sidecar.metrics.DeltaGauge;
import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics;
import org.jetbrains.annotations.NotNull;

/**
@@ -47,6 +54,8 @@
@Singleton
public class SchemaReporter
{
private static final Logger LOGGER = LoggerFactory.getLogger(SchemaReporter.class);

@NotNull
protected final IdentifiersProvider identifiersProvider;
@NotNull
@@ -57,6 +66,8 @@ public class SchemaReporter
protected final List<TableToAspectConverter<? extends RecordTemplate>> tableConverters;
@NotNull
protected final EmitterFactory emitterFactory;
@NotNull
protected final SchemaReportingMetrics reportingMetrics;

/**
* The public constructor that instantiates {@link SchemaReporter} with default configuration.
@@ -66,10 +77,12 @@ public class SchemaReporter
*
* @param identifiersProvider an instance of {@link IdentifiersProvider} to use
* @param emitterFactory an instance of {@link EmitterFactory} to use
* @param sidecarMetrics an instance of {@link SidecarMetrics} to obtain {@link SchemaReportingMetrics} from
*/
@Inject
public SchemaReporter(@NotNull IdentifiersProvider identifiersProvider,
@NotNull EmitterFactory emitterFactory)
@NotNull EmitterFactory emitterFactory,
@NotNull SidecarMetrics sidecarMetrics)
{
this(identifiersProvider,
ImmutableList.of(new ClusterToDataPlatformInfoConverter(identifiersProvider),
@@ -85,7 +98,8 @@ public SchemaReporter(@NotNull IdentifiersProvider identifiersProvider,
new TableToDataPlatformInstanceConverter(identifiersProvider),
new TableToBrowsePathsV2Converter(identifiersProvider),
new TableToBrowsePathsConverter(identifiersProvider)),
emitterFactory);
emitterFactory,
sidecarMetrics.server().schemaReporting());
}

/**
@@ -96,35 +110,73 @@ public SchemaReporter(@NotNull IdentifiersProvider identifiersProvider,
* @param keyspaceConverters a {@link List} of {@link KeyspaceToAspectConverter} instances to use
* @param tableConverters a {@link List} of {@link TableToAspectConverter} instances to use
* @param emitterFactory an instance of {@link EmitterFactory} to use
* @param reportingMetrics an instance of {@link SchemaReportingMetrics} to use
*/
protected SchemaReporter(@NotNull IdentifiersProvider identifiersProvider,
@NotNull List<ClusterToAspectConverter<? extends RecordTemplate>> clusterConverters,
@NotNull List<KeyspaceToAspectConverter<? extends RecordTemplate>> keyspaceConverters,
@NotNull List<TableToAspectConverter<? extends RecordTemplate>> tableConverters,
@NotNull EmitterFactory emitterFactory)
@NotNull EmitterFactory emitterFactory,
@NotNull SchemaReportingMetrics reportingMetrics)
{
this.identifiersProvider = identifiersProvider;
this.clusterConverters = clusterConverters;
this.keyspaceConverters = keyspaceConverters;
this.tableConverters = tableConverters;
this.emitterFactory = emitterFactory;
this.reportingMetrics = reportingMetrics;
}

/**
* Public method for converting and reporting the Cassandra schema when triggered by a scheduled periodic task
*
* @param cluster the {@link Cluster} to extract Cassandra schema from
*/
public void processScheduled(@NotNull Cluster cluster)
{
process(cluster.getMetadata(), reportingMetrics.startedSchedule.metric);
}

/**
* Public method for converting and reporting the Cassandra schema
* Public method for converting and reporting the Cassandra schema when triggered by an incoming API request
*
* @param cluster a {@link Cluster} to extract Cassandra schema from
* @param metadata the {@link Metadata} to extract Cassandra schema from
*/
public void process(@NotNull Cluster cluster)
public void processRequested(@NotNull Metadata metadata)
{
process(metadata, reportingMetrics.startedRequest.metric);
}

/**
* Private method for converting and reporting the Cassandra schema
*
* @param metadata the {@link Metadata} to extract Cassandra schema from
* @param started the {@link DeltaGauge} for the metric counting invocations
*/
private void process(@NotNull Metadata metadata,
@NotNull DeltaGauge started)
{
String action = " reporting schema for cluster with identifiers " + identifiersProvider;
Comment (Contributor):

you can remove the heading space. It should make the error message in the RuntimeException look a bit nicer too.

LOGGER.info("Started" + action);
Comment (Contributor):

Use the value placeholder of logger to eval lazily. Update the other places too.

Suggested change
LOGGER.info("Started" + action);
LOGGER.info("Started {}", action);

started.increment();

try (Emitter emitter = emitterFactory.emitter())
{
stream(cluster.getMetadata())
.forEach(ThrowableUtils.consumer(emitter::emit));
Stopwatch stopwatch = Stopwatch.createStarted();
long counter = stream(metadata)
.map(ThrowableUtils.function(emitter::emit))
.count();

reportingMetrics.durationMilliseconds.metric.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
reportingMetrics.sizeAspects.metric.update(counter);
reportingMetrics.finishedSuccess.metric.increment();
LOGGER.info("Success" + action);
}
catch (Exception exception)
{
throw new RuntimeException("Cannot extract schema for cluster " + identifiersProvider.cluster(), exception);
reportingMetrics.finishedFailure.metric.increment();
LOGGER.error("Failure" + action);
throw new RuntimeException(action, exception);
}
}

@@ -89,7 +89,7 @@ public void execute(Promise<Void> promise)
{
try
{
reporter.process(session.get().getCluster());
reporter.processScheduled(session.get().getCluster());
promise.complete();
}
catch (Throwable throwable)
DeltaGauge.java
@@ -33,7 +33,15 @@ public class DeltaGauge

public DeltaGauge()
{
this.count = new AtomicLong();
this.count = new AtomicLong(0L);
}

/**
* Increments the cumulative value tracked by this {@link DeltaGauge}
*/
public void increment()
{
count.incrementAndGet();
}

/**
Expand All @@ -55,6 +63,6 @@ public void update(long delta)
@Override
public Long getValue()
{
return count.getAndSet(0);
return count.getAndSet(0L);
}
}
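Note: DeltaGauge returns the delta accumulated since the previous read and resets atomically on every getValue() call; the new increment() is effectively update(1). A minimal usage sketch of these semantics, using only the methods defined above:

DeltaGauge started = new DeltaGauge();
started.increment();              // counts one schema report kick-off
started.update(41L);              // adds an arbitrary delta on top
assert started.getValue() == 42L; // a read returns the accumulated delta...
assert started.getValue() == 0L;  // ...and atomically resets it to zero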
ServerMetrics.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.sidecar.metrics;

import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics;

import static org.apache.cassandra.sidecar.metrics.SidecarMetrics.APP_PREFIX;

@@ -49,6 +50,11 @@ public interface ServerMetrics
*/
SchemaMetrics schema();

/**
* @return metrics for schema reporting
*/
SchemaReportingMetrics schemaReporting();

/**
* @return metrics related to internal caches that are tracked.
*/
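Note: the SchemaReportingMetrics class referenced throughout this diff is added elsewhere in the PR and is not part of this excerpt. From its call sites in SchemaReporter (startedSchedule.metric.increment(), durationMilliseconds.metric.update(...), sizeAspects, finishedSuccess, finishedFailure) its rough shape can be sketched; everything below, including the metric names, the naming prefix, and the Named wrapper standing in for the codebase's named-metric holder, is a hypothetical reconstruction rather than the shipped code.

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import org.apache.cassandra.sidecar.metrics.DeltaGauge;

// Hypothetical reconstruction of the new metrics holder; field names follow
// the call sites in SchemaReporter, everything else is assumed
public class SchemaReportingMetrics
{
    // minimal stand-in for the codebase's named-metric wrapper, which call
    // sites dereference as "<field>.metric"
    public static final class Named<T>
    {
        public final T metric;

        Named(T metric)
        {
            this.metric = metric;
        }
    }

    public final Named<DeltaGauge> startedSchedule;     // reports started by the periodic task
    public final Named<DeltaGauge> startedRequest;      // reports started through the API
    public final Named<DeltaGauge> finishedSuccess;     // reports that completed normally
    public final Named<DeltaGauge> finishedFailure;     // reports that threw
    public final Named<Histogram> durationMilliseconds; // wall-clock time per report
    public final Named<Histogram> sizeAspects;          // aspects emitted per report

    public SchemaReportingMetrics(MetricRegistry registry)
    {
        String prefix = "Sidecar.SchemaReporting."; // assumed naming scheme
        startedSchedule = new Named<>(registry.gauge(prefix + "StartedSchedule", DeltaGauge::new));
        startedRequest = new Named<>(registry.gauge(prefix + "StartedRequest", DeltaGauge::new));
        finishedSuccess = new Named<>(registry.gauge(prefix + "FinishedSuccess", DeltaGauge::new));
        finishedFailure = new Named<>(registry.gauge(prefix + "FinishedFailure", DeltaGauge::new));
        durationMilliseconds = new Named<>(registry.histogram(prefix + "DurationMilliseconds"));
        sizeAspects = new Named<>(registry.histogram(prefix + "SizeAspects"));
    }
}

Consumers reach the real class through the new accessor, e.g. sidecarMetrics.server().schemaReporting(), exactly as the injected SchemaReporter constructor above does.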
ServerMetricsImpl.java
@@ -21,6 +21,7 @@
import java.util.Objects;

import com.codahale.metrics.MetricRegistry;
import org.apache.cassandra.sidecar.metrics.server.SchemaReportingMetrics;

/**
* {@link ServerMetrics} tracks metrics related to Sidecar server.
@@ -32,6 +33,7 @@ public class ServerMetricsImpl implements ServerMetrics
protected final ResourceMetrics resourceMetrics;
protected final RestoreMetrics restoreMetrics;
protected final SchemaMetrics schemaMetrics;
protected final SchemaReportingMetrics schemaReportingMetrics;
protected final CacheMetrics cacheMetrics;
protected final CoordinationMetrics coordinationMetrics;

@@ -43,6 +45,7 @@ public ServerMetricsImpl(MetricRegistry metricRegistry)
this.resourceMetrics = new ResourceMetrics(metricRegistry);
this.restoreMetrics = new RestoreMetrics(metricRegistry);
this.schemaMetrics = new SchemaMetrics(metricRegistry);
this.schemaReportingMetrics = new SchemaReportingMetrics(metricRegistry);
this.cacheMetrics = new CacheMetrics(metricRegistry);
this.coordinationMetrics = new CoordinationMetrics(metricRegistry);
}
@@ -71,6 +74,12 @@ public SchemaMetrics schema()
return schemaMetrics;
}

@Override
public SchemaReportingMetrics schemaReporting()
{
return schemaReportingMetrics;
}

@Override
public CacheMetrics cache()
{