Skip to content
This repository has been archived by the owner on Mar 15, 2021. It is now read-only.

Commit

Permalink
[PubsubPluginSink] Make max input metrics into a config variable (#251)
Browse files Browse the repository at this point in the history
* Make max input metrics into a config variable

* Add note to docs

* Add default val to pubsub test
  • Loading branch information
lmuhlha authored Mar 1, 2021
1 parent a7b5286 commit 3fd963f
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class OutputManagerModule {
private static final Integer DEFAULT_MIN_FREQUENCY_MS_ALLOWED = 1000;
private static final Integer DEFAULT_MIN_NUMBER_OF_TRIGGERS = 5;
private static final Long DEFAULT_HIGH_FREQUENCY_DATA_RECYCLE_MS = 3_600_000L;
// Limit amount of input metrics to serialize
public static final Integer DEFAULT_MAX_INPUT_METRICS = 500_000;

private final List<OutputPlugin> plugins;
private final Filter filter;
Expand All @@ -69,6 +71,7 @@ public class OutputManagerModule {
private final int minNumberOfTriggers;
private final long highFrequencyDataRecycleMS;
@Nullable private final String dynamicTagsFile;
@Nullable private final int maxInputMetrics;

@JsonCreator
public OutputManagerModule(
Expand All @@ -81,7 +84,8 @@ public OutputManagerModule(
@JsonProperty("minFrequencyMillisAllowed") @Nullable Integer minFrequencyMillisAllowed,
@JsonProperty("minNumberOfTriggers") @Nullable Integer minNumberOfTriggers,
@JsonProperty("highFrequencyDataRecycleMS") @Nullable Long highFrequencyDataRecycleMS,
@JsonProperty("dynamicTagsFile") @Nullable String dynamicTagsFile) {
@JsonProperty("dynamicTagsFile") @Nullable String dynamicTagsFile,
@JsonProperty("maxInputMetrics") @Nullable Integer maxInputMetrics) {
this.plugins = Optional.ofNullable(plugins).orElse(DEFAULT_PLUGINS);
this.filter = Optional.ofNullable(filter).orElseGet(TrueFilter::new);
this.rateLimit = rateLimit;
Expand All @@ -97,6 +101,7 @@ public OutputManagerModule(
Optional.ofNullable(highFrequencyDataRecycleMS)
.orElse(DEFAULT_HIGH_FREQUENCY_DATA_RECYCLE_MS);
this.dynamicTagsFile = dynamicTagsFile;
this.maxInputMetrics = Optional.ofNullable(maxInputMetrics).orElse(DEFAULT_MAX_INPUT_METRICS);
}

//CHECKSTYLE:OFF:MethodLength
Expand Down Expand Up @@ -249,6 +254,11 @@ public String dynamicTagsFile() {
return dynamicTagsFile;
}

@Provides
@Singleton
@Named("maxInputMetrics")
public int maxInputMetrics() { return maxInputMetrics; }

@Override
protected void configure() {
bind(OutputManager.class).to(CoreOutputManager.class).in(Scopes.SINGLETON);
Expand Down Expand Up @@ -326,6 +336,6 @@ static Map<String, String> filterEnvironmentResourceTags(final Map<String, Strin

public static Supplier<OutputManagerModule> supplyDefault() {
return () -> new OutputManagerModule(null, null, null, null, null, null, null, null, null,
null);
null, null);
}
}
4 changes: 4 additions & 0 deletions docs/content/_docs/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ based on request size, message count and time since last publish. The client by
* `publishDelayThresholdMs` - amount of type to wait for.


Amount of metrics allowed on input can be configured using `maxInputMetrics`.


Here is an example config of setting up the pubsub plugin.

```
output:
maxInputMetrics: 50000 # Default: 500_000
plugins:
- type: pubsub
project: google-test-project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
Expand Down Expand Up @@ -60,9 +61,6 @@
*/
public class PubsubPluginSink implements BatchablePluginSink {

// Limit amount of input metrics to serialize
private final static int MAX_INPUT_METRICS = 10_000;

// Pubsub publish request limit is 10MB
private final static double MAX_BATCH_SIZE_BYTES = 10_000_000.0;

Expand All @@ -85,6 +83,10 @@ public class PubsubPluginSink implements BatchablePluginSink {
OutputPluginStatistics statistics;
@Inject
Optional<Cache<String, Boolean>> cache;
// Limit amount of input metrics to serialize
@Inject
@Named("maxInputMetrics")
int maxInputMetrics;

@Override
public boolean isReady() {
Expand Down Expand Up @@ -122,7 +124,7 @@ public void onSuccess(String messageId) {
public AsyncFuture<Void> sendMetrics(Collection<Metric> metrics) {
logger.debug("Sending {} metrics", metrics.size());

if (metrics.size() < MAX_INPUT_METRICS) {
if (metrics.size() < maxInputMetrics) {
try {
final ByteString m = ByteString.copyFrom(serializer.serialize(metrics, writeCache));
if (m.size() > MAX_BATCH_SIZE_BYTES) {
Expand All @@ -148,7 +150,7 @@ public AsyncFuture<Void> sendMetrics(Collection<Metric> metrics) {
} else {
logger.info("Above input metric limit {}, size was {}, sample key: {}, sample tags: {}; "
+ "dropping metrics",
MAX_INPUT_METRICS,
maxInputMetrics,
metrics.size(),
metrics.iterator().next().getKey(),
metrics.iterator().next().getTags());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.spotify.ffwd.model.v2.Metric;
import com.spotify.ffwd.serializer.Spotify100ProtoSerializer;
import eu.toolchain.async.AsyncFramework;
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -66,6 +65,7 @@ public void setUp() {
sink.serializer = new Spotify100ProtoSerializer();
sink.writeCache = new NoopCache();
sink.logger = LoggerFactory.getLogger(PubsubPluginSinkTest.class);
sink.maxInputMetrics = 500_000;

metric = makeMetric();

Expand Down

0 comments on commit 3fd963f

Please sign in to comment.