From 27f4a5ca307b0d5c6a01604d02b06b57b90dd2c3 Mon Sep 17 00:00:00 2001 From: yashaswaj Date: Fri, 17 Jul 2020 05:31:19 -0700 Subject: [PATCH] Introduce CustomMetricReporter --- .../release-0.3.14-(next-release).rst | 1 + .../common/metrics/BookKeeperMetrics.java | 17 ++-- .../metrics/CachingFileSystemMetrics.java | 45 ++++++++++ .../common/metrics/CustomMetricsReporter.java | 22 +++++ .../CustomMetricsReporterProvider.java | 86 +++++++++++++++++++ .../common/metrics/MetricsReporterType.java | 3 +- .../rubix/common/metrics/NoOpReporter.java | 32 +++++++ .../rubix/common/utils/ClusterUtil.java | 14 +++ .../rubix/common/utils/TestClusterUtil.java | 15 ++++ .../rubix/core/CachedReadRequestChain.java | 4 +- .../qubole/rubix/core/CachingFileSystem.java | 2 + .../qubole/rubix/core/CachingInputStream.java | 3 + .../rubix/core/NonLocalReadRequestChain.java | 3 + .../com/qubole/rubix/spi/CacheConfig.java | 12 +++ 14 files changed, 249 insertions(+), 10 deletions(-) create mode 100644 rubix-common/src/main/java/com/qubole/rubix/common/metrics/CachingFileSystemMetrics.java create mode 100644 rubix-common/src/main/java/com/qubole/rubix/common/metrics/CustomMetricsReporter.java create mode 100644 rubix-common/src/main/java/com/qubole/rubix/common/metrics/CustomMetricsReporterProvider.java create mode 100644 rubix-common/src/main/java/com/qubole/rubix/common/metrics/NoOpReporter.java diff --git a/docs/release/release_notes/release-0.3.14-(next-release).rst b/docs/release/release_notes/release-0.3.14-(next-release).rst index a1bc8653..9466448a 100644 --- a/docs/release/release_notes/release-0.3.14-(next-release).rst +++ b/docs/release/release_notes/release-0.3.14-(next-release).rst @@ -6,3 +6,4 @@ Fixes and Features ------------------ * Fixed a regression from 0.3.11 which slows down split generation. * Jmx stats refactoring to for better accounting of stats. +* Added support to plug in custom reporter for metrics that can send metrics to custom sinks. It can be set used by setting `rubix.metrics.reporters=CUSTOM` and providing implementation class using `rubix.metric-collector.impl`. \ No newline at end of file diff --git a/rubix-common/src/main/java/com/qubole/rubix/common/metrics/BookKeeperMetrics.java b/rubix-common/src/main/java/com/qubole/rubix/common/metrics/BookKeeperMetrics.java index 75b65180..251e214f 100644 --- a/rubix-common/src/main/java/com/qubole/rubix/common/metrics/BookKeeperMetrics.java +++ b/rubix-common/src/main/java/com/qubole/rubix/common/metrics/BookKeeperMetrics.java @@ -16,7 +16,6 @@ import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.ganglia.GangliaReporter; -import com.google.common.base.Splitter; import com.qubole.rubix.common.utils.ClusterUtil; import com.qubole.rubix.spi.CacheConfig; import com.readytalk.metrics.StatsDReporter; @@ -28,9 +27,12 @@ import java.io.Closeable; import java.io.IOException; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import static com.qubole.rubix.common.utils.ClusterUtil.getMetricsReporters; + public class BookKeeperMetrics implements AutoCloseable { private static Log log = LogFactory.getLog(BookKeeperMetrics.class); @@ -63,12 +65,7 @@ public MetricRegistry getMetricsRegistry() */ protected void initializeReporters() { - final Iterable metricsReporterNames = Splitter.on(",").trimResults().omitEmptyStrings().split(CacheConfig.getMetricsReporters(conf)); - - final Set metricsReporterTypes = new HashSet<>(); - for (String reporterName : metricsReporterNames) { - metricsReporterTypes.add(MetricsReporterType.valueOf(reporterName.toUpperCase())); - } + final Set metricsReporterTypes = getMetricsReporters(conf); for (MetricsReporterType reporter : metricsReporterTypes) { switch (reporter) { @@ -111,6 +108,10 @@ protected void initializeReporters() gangliaReporter.start(CacheConfig.getMetricsReportingInterval(conf), TimeUnit.MILLISECONDS); reporters.add(gangliaReporter); break; + case CUSTOM: + CustomMetricsReporterProvider.initialize(conf, Optional.of(metrics)); + reporters.add(CustomMetricsReporterProvider.getCustomMetricsReporter()); + break; } } } @@ -216,7 +217,7 @@ public enum CacheMetric ASYNC_QUEUE_SIZE_GAUGE("rubix.bookkeeper.gauge.async_queue_size"), ASYNC_DOWNLOADED_MB_COUNT("rubix.bookkeeper.count.async_downloaded_mb"), ASYNC_DOWNLOAD_TIME_COUNT("rubix.bookkeeper.count.async_download_time"), - LDTS_CACHING_EXCEPTION("caching_exception_while_transferring_data"); + LDTS_CACHING_EXCEPTION("rubix.ldts.exception.trasnsferdata"); private final String metricName; diff --git a/rubix-common/src/main/java/com/qubole/rubix/common/metrics/CachingFileSystemMetrics.java b/rubix-common/src/main/java/com/qubole/rubix/common/metrics/CachingFileSystemMetrics.java new file mode 100644 index 00000000..5b6eead3 --- /dev/null +++ b/rubix-common/src/main/java/com/qubole/rubix/common/metrics/CachingFileSystemMetrics.java @@ -0,0 +1,45 @@ +/** + * Copyright (c) 2019. Qubole Inc + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ +package com.qubole.rubix.common.metrics; + +import java.util.HashMap; +import java.util.Map; + +public enum CachingFileSystemMetrics { + LOCAL_FALLBACK_TO_DIRECT_READ("rubix_local_cache_fallback_direct_read"), + NON_LOCAL_FALLBACK_TO_DIRECT_READ("rubix_non_local_cache_fallback_direct_read"), + POSITIONAL_READ_FAILURE("rubix_positional_read_failure"); + + private final String metricName; + //reverse lookup map for metric. + private static final Map lookup = new HashMap<>(); + + CachingFileSystemMetrics(String metricName) + { + this.metricName = metricName; + } + + public String getMetricName() + { + return metricName; + } + + static { + for(CachingFileSystemMetrics s : CachingFileSystemMetrics.values()) + lookup.put(s.getMetricName(), s); + } + + public static CachingFileSystemMetrics get(String enumString) { + return lookup.get(enumString); + } +} diff --git a/rubix-common/src/main/java/com/qubole/rubix/common/metrics/CustomMetricsReporter.java b/rubix-common/src/main/java/com/qubole/rubix/common/metrics/CustomMetricsReporter.java new file mode 100644 index 00000000..e767f2aa --- /dev/null +++ b/rubix-common/src/main/java/com/qubole/rubix/common/metrics/CustomMetricsReporter.java @@ -0,0 +1,22 @@ +/** + * Copyright (c) 2019. Qubole Inc + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ +package com.qubole.rubix.common.metrics; + +import java.io.Closeable; + +public interface CustomMetricsReporter extends Closeable { + + public void start(); + + public void addMetric(CachingFileSystemMetrics cachingFileSystemMetrics); +} diff --git a/rubix-common/src/main/java/com/qubole/rubix/common/metrics/CustomMetricsReporterProvider.java b/rubix-common/src/main/java/com/qubole/rubix/common/metrics/CustomMetricsReporterProvider.java new file mode 100644 index 00000000..489c7ef7 --- /dev/null +++ b/rubix-common/src/main/java/com/qubole/rubix/common/metrics/CustomMetricsReporterProvider.java @@ -0,0 +1,86 @@ +/** + * Copyright (c) 2019. Qubole Inc + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ +package com.qubole.rubix.common.metrics; + +import com.codahale.metrics.MetricRegistry; +import com.qubole.rubix.spi.CacheConfig; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import static com.qubole.rubix.common.utils.ClusterUtil.getMetricsReporters; + +public class CustomMetricsReporterProvider { + private static final Log log = LogFactory.getLog(CustomMetricsReporterProvider.class); + + private static volatile AtomicReference reporterRunning = new AtomicReference<>(); + + private CustomMetricsReporter customMetricsReporter; + private static volatile CustomMetricsReporterProvider customMetricsReporterProvider; + + private CustomMetricsReporterProvider(CustomMetricsReporter customMetricsReporter) { + this.customMetricsReporter = customMetricsReporter; + } + + public static void initialize(Configuration configuration) + { + initialize(configuration, Optional.empty()); + } + + public static void initialize(Configuration configuration, Optional metricRegistry) { + if (customMetricsReporterProvider == null) { + synchronized (CustomMetricsReporterProvider.class) { + if (customMetricsReporterProvider == null) { + String className = CacheConfig.getRubixMetricCollectorImpl(configuration); + // check if custom reporter is enabled: Check here for CFS metrics Reporter. + boolean useCustomReporter = getMetricsReporters(configuration).contains(MetricsReporterType.CUSTOM); + CustomMetricsReporter customMetricsReporter; + if (useCustomReporter && !className.equals("com.qubole.rubix.common.metrics.NoOpReporter")) { + try { + Class collectorClass = Class.forName(className); + log.info(String.format("Using class for metric reporting: %s", className)); + customMetricsReporter = (CustomMetricsReporter) collectorClass.getDeclaredConstructor(Configuration.class, Optional.class) + .newInstance(configuration, metricRegistry); + } catch (Exception e) { + log.warn("External Metric Reporter class: %s can not be initialized: ", e); + customMetricsReporter = new NoOpReporter(); + } + } else { + customMetricsReporter = new NoOpReporter(); + } + customMetricsReporterProvider = new CustomMetricsReporterProvider(customMetricsReporter); + } + } + } + } + + public static CustomMetricsReporter getCustomMetricsReporter() { + if (reporterRunning.get() == null) { + synchronized (reporterRunning) { + if (reporterRunning.get() == null) { + try { + customMetricsReporterProvider.customMetricsReporter.start(); + } catch (Exception e) { + log.warn("Exception in starting Custom reporter: ", e); + customMetricsReporterProvider.customMetricsReporter = new NoOpReporter(); + } + reporterRunning.set(true); + } + } + } + return customMetricsReporterProvider.customMetricsReporter; + } +} diff --git a/rubix-common/src/main/java/com/qubole/rubix/common/metrics/MetricsReporterType.java b/rubix-common/src/main/java/com/qubole/rubix/common/metrics/MetricsReporterType.java index 28050125..a04e73b2 100644 --- a/rubix-common/src/main/java/com/qubole/rubix/common/metrics/MetricsReporterType.java +++ b/rubix-common/src/main/java/com/qubole/rubix/common/metrics/MetricsReporterType.java @@ -17,5 +17,6 @@ public enum MetricsReporterType { STATSD, JMX, - GANGLIA + GANGLIA, + CUSTOM } diff --git a/rubix-common/src/main/java/com/qubole/rubix/common/metrics/NoOpReporter.java b/rubix-common/src/main/java/com/qubole/rubix/common/metrics/NoOpReporter.java new file mode 100644 index 00000000..cd39d461 --- /dev/null +++ b/rubix-common/src/main/java/com/qubole/rubix/common/metrics/NoOpReporter.java @@ -0,0 +1,32 @@ +/** + * Copyright (c) 2019. Qubole Inc + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ +package com.qubole.rubix.common.metrics; + +import java.io.IOException; + +public class NoOpReporter implements CustomMetricsReporter { + public NoOpReporter() { + } + + @Override + public void start() { + } + + @Override + public void addMetric(CachingFileSystemMetrics cachingFileSystemMetrics) { + } + + @Override + public void close() throws IOException { + } +} diff --git a/rubix-common/src/main/java/com/qubole/rubix/common/utils/ClusterUtil.java b/rubix-common/src/main/java/com/qubole/rubix/common/utils/ClusterUtil.java index aa9c501f..11c5797e 100644 --- a/rubix-common/src/main/java/com/qubole/rubix/common/utils/ClusterUtil.java +++ b/rubix-common/src/main/java/com/qubole/rubix/common/utils/ClusterUtil.java @@ -13,6 +13,7 @@ package com.qubole.rubix.common.utils; +import com.qubole.rubix.common.metrics.MetricsReporterType; import com.qubole.rubix.spi.CacheConfig; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -20,7 +21,10 @@ import org.apache.hadoop.fs.Path; import java.io.File; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class ClusterUtil @@ -85,4 +89,14 @@ public static Configuration applyRubixSiteConfig(Configuration conf) return conf; } + + public static Set getMetricsReporters(Configuration configuration) + { + return Stream.of(CacheConfig.getMetricsReporters(configuration).split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .map(String::toUpperCase) + .map(MetricsReporterType::valueOf) + .collect(Collectors.toSet()); + } } diff --git a/rubix-common/src/test/java/com/qubole/rubix/common/utils/TestClusterUtil.java b/rubix-common/src/test/java/com/qubole/rubix/common/utils/TestClusterUtil.java index 602dfb58..b77e727c 100644 --- a/rubix-common/src/test/java/com/qubole/rubix/common/utils/TestClusterUtil.java +++ b/rubix-common/src/test/java/com/qubole/rubix/common/utils/TestClusterUtil.java @@ -13,17 +13,21 @@ package com.qubole.rubix.common.utils; +import com.qubole.rubix.common.metrics.MetricsReporterType; import com.qubole.rubix.spi.CacheConfig; import org.apache.hadoop.conf.Configuration; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import static com.qubole.rubix.common.utils.ClusterUtil.getMetricsReporters; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; public class TestClusterUtil { @@ -129,4 +133,15 @@ public void testEmptyRubixSite() CacheConfig.setRubixSiteLocation(configuration, rubixSiteXmlName); Assert.assertThrows(Exception.class, () -> ClusterUtil.applyRubixSiteConfig(configuration)); } + + @Test + public void testGetMetricReportors() + { + Configuration conf = new Configuration(); + CacheConfig.setMetricsReporters(conf, "JmX, , ganglia , , ,,"); + Set reporterSet = getMetricsReporters(conf); + assertEquals(reporterSet.size(), 2, "Number of reporter not correct"); + assertTrue(reporterSet.contains(MetricsReporterType.JMX), "Metrics reporters not resolved correctly"); + assertTrue(reporterSet.contains(MetricsReporterType.GANGLIA), "Metrics reporters not resolved correctly"); + } } diff --git a/rubix-core/src/main/java/com/qubole/rubix/core/CachedReadRequestChain.java b/rubix-core/src/main/java/com/qubole/rubix/core/CachedReadRequestChain.java index 86d8f647..5e89621a 100644 --- a/rubix-core/src/main/java/com/qubole/rubix/core/CachedReadRequestChain.java +++ b/rubix-core/src/main/java/com/qubole/rubix/core/CachedReadRequestChain.java @@ -13,6 +13,8 @@ package com.qubole.rubix.core; import com.google.common.annotations.VisibleForTesting; +import com.qubole.rubix.common.metrics.CachingFileSystemMetrics; +import com.qubole.rubix.common.metrics.CustomMetricsReporterProvider; import com.qubole.rubix.spi.BookKeeperFactory; import com.qubole.rubix.spi.CacheUtil; import com.qubole.rubix.spi.RetryingPooledBookkeeperClient; @@ -148,8 +150,8 @@ public Long call() throws IOException if (ex instanceof CancelledException) { throw ex; } - log.error(String.format("Fall back to read from object store for %s .Could not read data from cached file : ", localCachedFile), ex); + CustomMetricsReporterProvider.getCustomMetricsReporter().addMetric(CachingFileSystemMetrics.LOCAL_FALLBACK_TO_DIRECT_READ); needsInvalidation = true; directDataRead = readFromRemoteFileSystem(); return directDataRead; diff --git a/rubix-core/src/main/java/com/qubole/rubix/core/CachingFileSystem.java b/rubix-core/src/main/java/com/qubole/rubix/core/CachingFileSystem.java index 6067892f..33e9129f 100644 --- a/rubix-core/src/main/java/com/qubole/rubix/core/CachingFileSystem.java +++ b/rubix-core/src/main/java/com/qubole/rubix/core/CachingFileSystem.java @@ -14,6 +14,7 @@ import com.google.common.base.Strings; import com.google.common.base.Throwables; +import com.qubole.rubix.common.metrics.CustomMetricsReporterProvider; import com.qubole.rubix.spi.BookKeeperFactory; import com.qubole.rubix.spi.CacheConfig; import com.qubole.rubix.spi.ClusterManager; @@ -189,6 +190,7 @@ public void initialize(URI uri, Configuration conf) throws IOException conf = applyRubixSiteConfig(conf); initialize(conf, getClusterType()); super.initialize(getOriginalURI(uri), conf); + CustomMetricsReporterProvider.initialize(conf); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this); isRubixSchemeUsed = uri.getScheme().equals(CacheConfig.RUBIX_SCHEME); diff --git a/rubix-core/src/main/java/com/qubole/rubix/core/CachingInputStream.java b/rubix-core/src/main/java/com/qubole/rubix/core/CachingInputStream.java index a4d85986..602a42af 100644 --- a/rubix-core/src/main/java/com/qubole/rubix/core/CachingInputStream.java +++ b/rubix-core/src/main/java/com/qubole/rubix/core/CachingInputStream.java @@ -17,6 +17,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.qubole.rubix.common.metrics.CustomMetricsReporterProvider; import com.qubole.rubix.spi.BookKeeperFactory; import com.qubole.rubix.spi.CacheConfig; import com.qubole.rubix.spi.ClusterType; @@ -47,6 +48,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import static com.qubole.rubix.common.metrics.CachingFileSystemMetrics.POSITIONAL_READ_FAILURE; import static com.qubole.rubix.spi.CacheUtil.UNKONWN_GENERATION_NUMBER; import static org.apache.hadoop.fs.FSExceptionMessages.NEGATIVE_SEEK; @@ -185,6 +187,7 @@ public int read(byte[] buffer, int offset, int length) } catch (Exception e) { log.error(String.format("Failed to read from rubix for file %s position %d length %d. Falling back to remote", remotePath, nextReadPosition, length), e); + CustomMetricsReporterProvider.getCustomMetricsReporter().addMetric(POSITIONAL_READ_FAILURE); getParentDataInputStream().seek(nextReadPosition); int read = readFullyDirect(buffer, offset, length); if (read > 0) { diff --git a/rubix-core/src/main/java/com/qubole/rubix/core/NonLocalReadRequestChain.java b/rubix-core/src/main/java/com/qubole/rubix/core/NonLocalReadRequestChain.java index 7da5118b..9ddfa1cb 100644 --- a/rubix-core/src/main/java/com/qubole/rubix/core/NonLocalReadRequestChain.java +++ b/rubix-core/src/main/java/com/qubole/rubix/core/NonLocalReadRequestChain.java @@ -13,6 +13,7 @@ package com.qubole.rubix.core; import com.google.common.base.Throwables; +import com.qubole.rubix.common.metrics.CustomMetricsReporterProvider; import com.qubole.rubix.spi.BookKeeperFactory; import com.qubole.rubix.spi.CacheConfig; import com.qubole.rubix.spi.DataTransferClientHelper; @@ -35,6 +36,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.qubole.rubix.spi.CacheUtil.DUMMY_MODE_GENERATION_NUMBER; import static com.qubole.rubix.spi.CacheUtil.UNKONWN_GENERATION_NUMBER; +import static com.qubole.rubix.common.metrics.CachingFileSystemMetrics.NON_LOCAL_FALLBACK_TO_DIRECT_READ; import static com.qubole.rubix.spi.DataTransferClientFactory.DataTransferClient; import static com.qubole.rubix.spi.DataTransferClientFactory.getClient; @@ -149,6 +151,7 @@ public Long call() } else { log.warn("Error in reading from node: " + remoteNodeName + " Using direct reads", e); + CustomMetricsReporterProvider.getCustomMetricsReporter().addMetric(NON_LOCAL_FALLBACK_TO_DIRECT_READ); return directReadRequest(readRequests.indexOf(readRequest)); } } diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/CacheConfig.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/CacheConfig.java index 14137e13..0aa4700b 100644 --- a/rubix-spi/src/main/java/com/qubole/rubix/spi/CacheConfig.java +++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/CacheConfig.java @@ -43,6 +43,7 @@ public class CacheConfig private static final String KEY_RUBIX_SITE_CONFIG_APPLIED = "ruibx.site.config.applied"; private static final String KEY_BLOCK_SIZE = "rubix.cache.block.size"; private static final String KEY_CACHE_ENABLED = "rubix.cache.enabled"; + private static final String RUBIX_METRIC_COLLECTOR_IMPL = "rubix.metric-collector.impl"; private static final String KEY_DATA_CACHE_ENABLED_ON_MASTER = "rubix.cache.enabled-on-master"; private static final String KEY_CACHE_METADATA_FILE_SUFFIX = "rubix.cache.metadata.file.suffix"; private static final String KEY_SERVER_CONNECT_TIMEOUT = "rubix.network.server.connect.timeout"; @@ -170,6 +171,7 @@ public class CacheConfig private static final String DEFAULT_PRESTOSQL_CLUSTER_MANAGER = "com.qubole.rubix.prestosql.PrestoClusterManager"; private static final String DEFAULT_HADOOP_CLUSTER_MANAGER = "com.qubole.rubix.hadoop2.Hadoop2ClusterManager"; private static final String DEFAULT_DUMMY_CLUSTER_MANAGER = "com.qubole.rubix.core.utils.DummyClusterManager"; + private static final String DEFAULT_METRIC_REPORTER_IMPL = "com.qubole.rubix.common.metrics.NoOpReporter"; private static final boolean DEFAULT_ENABLE_FILE_STALESSNESS_CHECK = true; private static final int DEFAULT_STALE_FILEINFO_EXPIRY_PERIOD = 36000; // seconds private static final boolean DEFAULT_CLEANUP_FILES_DURING_START = true; @@ -557,6 +559,11 @@ public static String getCoordinatorHostName(Configuration conf) return conf.get(KEY_RUBIX_CLUSTER_MASTER_HOSTNAME, null); } + public static String getRubixMetricCollectorImpl(Configuration conf) + { + return conf.get(RUBIX_METRIC_COLLECTOR_IMPL, DEFAULT_METRIC_REPORTER_IMPL); + } + public static String getResourceManagerAddress(Configuration conf) { return conf.get(KEY_YARN_RESOURCEMANAGER_ADDRESS, null); @@ -622,6 +629,11 @@ public static void setCacheDataLocationWhitelist(Configuration conf, String whit conf.set(KEY_DATA_CACHE_LOCATION_WHITELIST, whitelist); } + public static void setMetricReporterImpl(Configuration conf, String className) + { + conf.set(RUBIX_METRIC_COLLECTOR_IMPL, className); + } + public static void setCacheDataLocationBlacklist(Configuration conf, String blacklist) { conf.set(KEY_DATA_CACHE_LOCATION_BLACKLIST, blacklist);