diff --git a/osbenchmark/telemetry.py b/osbenchmark/telemetry.py index 234c2b316..6a6f74165 100644 --- a/osbenchmark/telemetry.py +++ b/osbenchmark/telemetry.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -36,15 +36,29 @@ from osbenchmark.utils import io, sysstats, console, opts, process from osbenchmark.utils.versions import components + def list_telemetry(): console.println("Available telemetry devices:\n") - devices = [[device.command, device.human_name, device.help] for device in [JitCompiler, Gc, FlightRecorder, - Heapdump, NodeStats, RecoveryStats, - CcrStats, SegmentStats, TransformStats, - SearchableSnapshotsStats, - SegmentReplicationStats]] + devices = [ + [device.command, device.human_name, device.help] + for device in [ + JitCompiler, + Gc, + FlightRecorder, + Heapdump, + NodeStats, + RecoveryStats, + CcrStats, + SegmentStats, + TransformStats, + SearchableSnapshotsStats, + SegmentReplicationStats, + ] + ] console.println(tabulate.tabulate(devices, ["Command", "Name", "Description"])) - console.println("\nKeep in mind that each telemetry device may incur a runtime overhead which can skew results.") + console.println( + "\nKeep in mind that each telemetry device may incur a runtime overhead which can skew results." + ) class Telemetry: @@ -105,6 +119,7 @@ def _enabled(self, device): # ######################################################################################## + class TelemetryDevice: def __init__(self): self.logger = logging.getLogger(__name__) @@ -161,7 +176,9 @@ def run(self): self.recorder.record() time.sleep(self.recorder.sample_interval) except BaseException: - logging.getLogger(__name__).exception("Could not determine %s", self.recorder) + logging.getLogger(__name__).exception( + "Could not determine %s", self.recorder + ) class FlightRecorder(TelemetryDevice): @@ -182,17 +199,36 @@ def instrument_java_opts(self): # JFR was integrated into OpenJDK 11 and is not a commercial feature anymore. 
if self.java_major_version < 11: - console.println("\n***************************************************************************\n") - console.println("[WARNING] Java flight recorder is a commercial feature of the Oracle JDK.\n") - console.println("You are using Java flight recorder which requires that you comply with\nthe licensing terms stated in:\n") - console.println(console.format.link("http://www.oracle.com/technetwork/java/javase/terms/license/index.html")) - console.println("\nBy using this feature you confirm that you comply with these license terms.\n") - console.println("Otherwise, please abort and rerun Benchmark without the \"jfr\" telemetry device.") - console.println("\n***************************************************************************\n") + console.println( + "\n***************************************************************************\n" + ) + console.println( + "[WARNING] Java flight recorder is a commercial feature of the Oracle JDK.\n" + ) + console.println( + "You are using Java flight recorder which requires that you comply with\nthe licensing terms stated in:\n" + ) + console.println( + console.format.link( + "http://www.oracle.com/technetwork/java/javase/terms/license/index.html" + ) + ) + console.println( + "\nBy using this feature you confirm that you comply with these license terms.\n" + ) + console.println( + 'Otherwise, please abort and rerun Benchmark without the "jfr" telemetry device.' + ) + console.println( + "\n***************************************************************************\n" + ) time.sleep(3) - console.info("%s: Writing flight recording to [%s]" % (self.human_name, log_file), logger=self.logger) + console.info( + "%s: Writing flight recording to [%s]" % (self.human_name, log_file), + logger=self.logger, + ) java_opts = self.java_opts(log_file) @@ -208,17 +244,27 @@ def java_opts(self, log_file): if self.java_major_version < 9: java_opts.append("-XX:+FlightRecorder") - java_opts.append("-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true,dumponexitpath={}".format(log_file)) + java_opts.append( + "-XX:FlightRecorderOptions=disk=true,maxage=0s,maxsize=0,dumponexit=true,dumponexitpath={}".format( + log_file + ) + ) jfr_cmd = "-XX:StartFlightRecording=defaultrecording=true" if recording_template: - self.logger.info("jfr: Using recording template [%s].", recording_template) + self.logger.info( + "jfr: Using recording template [%s].", recording_template + ) jfr_cmd += ",settings={}".format(recording_template) else: self.logger.info("jfr: Using default recording template.") else: - jfr_cmd += "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true,filename={}".format(log_file) + jfr_cmd += "-XX:StartFlightRecording=maxsize=0,maxage=0s,disk=true,dumponexit=true,filename={}".format( + log_file + ) if recording_template: - self.logger.info("jfr: Using recording template [%s].", recording_template) + self.logger.info( + "jfr: Using recording template [%s].", recording_template + ) jfr_cmd += ",settings={}".format(recording_template) else: self.logger.info("jfr: Using default recording template.") @@ -239,9 +285,17 @@ def __init__(self, log_root): def instrument_java_opts(self): io.ensure_dir(self.log_root) log_file = os.path.join(self.log_root, "jit.log") - console.info("%s: Writing JIT compiler log to [%s]" % (self.human_name, log_file), logger=self.logger) - return ["-XX:+UnlockDiagnosticVMOptions", "-XX:+TraceClassLoading", "-XX:+LogCompilation", - "-XX:LogFile={}".format(log_file), "-XX:+PrintAssembly"] 
+ console.info( + "%s: Writing JIT compiler log to [%s]" % (self.human_name, log_file), + logger=self.logger, + ) + return [ + "-XX:+UnlockDiagnosticVMOptions", + "-XX:+TraceClassLoading", + "-XX:+LogCompilation", + "-XX:LogFile={}".format(log_file), + "-XX:+PrintAssembly", + ] class Gc(TelemetryDevice): @@ -259,18 +313,31 @@ def __init__(self, telemetry_params, log_root, java_major_version): def instrument_java_opts(self): io.ensure_dir(self.log_root) log_file = os.path.join(self.log_root, "gc.log") - console.info("%s: Writing GC log to [%s]" % (self.human_name, log_file), logger=self.logger) + console.info( + "%s: Writing GC log to [%s]" % (self.human_name, log_file), + logger=self.logger, + ) return self.java_opts(log_file) def java_opts(self, log_file): if self.java_major_version < 9: - return ["-Xloggc:{}".format(log_file), "-XX:+PrintGCDetails", "-XX:+PrintGCDateStamps", "-XX:+PrintGCTimeStamps", - "-XX:+PrintGCApplicationStoppedTime", "-XX:+PrintGCApplicationConcurrentTime", - "-XX:+PrintTenuringDistribution"] + return [ + "-Xloggc:{}".format(log_file), + "-XX:+PrintGCDetails", + "-XX:+PrintGCDateStamps", + "-XX:+PrintGCTimeStamps", + "-XX:+PrintGCApplicationStoppedTime", + "-XX:+PrintGCApplicationConcurrentTime", + "-XX:+PrintTenuringDistribution", + ] else: - log_config = self.telemetry_params.get("gc-log-config", "gc*=info,safepoint=info,age*=trace") + log_config = self.telemetry_params.get( + "gc-log-config", "gc*=info,safepoint=info,age*=trace" + ) # see https://docs.oracle.com/javase/9/tools/java.htm#JSWOR-GUID-BE93ABDC-999C-4CB5-A88B-1994AAAC74D5 - return [f"-Xlog:{log_config}:file={log_file}:utctime,uptimemillis,level,tags:filecount=0"] + return [ + f"-Xlog:{log_config}:file={log_file}:utctime,uptimemillis,level,tags:filecount=0" + ] class Heapdump(TelemetryDevice): @@ -285,8 +352,13 @@ def __init__(self, log_root): def detach_from_node(self, node, running): if running: - heap_dump_file = os.path.join(self.log_root, "heap_at_exit_{}.hprof".format(node.pid)) - console.info("{}: Writing heap dump to [{}]".format(self.human_name, heap_dump_file), logger=self.logger) + heap_dump_file = os.path.join( + self.log_root, "heap_at_exit_{}.hprof".format(node.pid) + ) + console.info( + "{}: Writing heap dump to [{}]".format(self.human_name, heap_dump_file), + logger=self.logger, + ) cmd = "jmap -dump:format=b,file={} {}".format(heap_dump_file, node.pid) if process.run_subprocess_with_logging(cmd): self.logger.warning("Could not write heap dump to [%s]", heap_dump_file) @@ -308,7 +380,10 @@ def on_benchmark_stop(self): try: segment_stats = self.client.cat.segments(index="_all", v=True) stats_file = os.path.join(self.log_root, "segment_stats.log") - console.info(f"{self.human_name}: Writing segment stats to [{stats_file}]", logger=self.logger) + console.info( + f"{self.human_name}: Writing segment stats to [{stats_file}]", + logger=self.logger, + ) with open(stats_file, "wt") as f: f.write(segment_stats) except BaseException: @@ -319,12 +394,15 @@ class CcrStats(TelemetryDevice): internal = False command = "ccr-stats" human_name = "CCR Stats" - help = ("Regularly samples Cross Cluster Replication (CCR) leader and follower(s) checkpoint at index level" - "and calculates replication lag") + help = ( + "Regularly samples Cross Cluster Replication (CCR) leader and follower(s) checkpoint at index level" + "and calculates replication lag" + ) """ Gathers CCR stats on a cluster level """ + def __init__(self, telemetry_params, clients, metrics_store): """ :param telemetry_params: The 
configuration object for telemetry_params. @@ -343,17 +421,25 @@ def __init__(self, telemetry_params, clients, metrics_store): self.sample_interval = telemetry_params.get("ccr-stats-sample-interval", 1) if self.sample_interval <= 0: raise exceptions.SystemSetupError( - "The telemetry parameter 'ccr-stats-sample-interval' must be greater than zero but was {}.".format(self.sample_interval)) + "The telemetry parameter 'ccr-stats-sample-interval' must be greater than zero but was {}.".format( + self.sample_interval + ) + ) self.specified_cluster_names = self.clients.keys() self.indices_per_cluster = self.telemetry_params.get("ccr-stats-indices", False) - self.max_replication_lag_seconds = self.telemetry_params.get("ccr-max-replication-lag-seconds", 60*60*60) + self.max_replication_lag_seconds = self.telemetry_params.get( + "ccr-max-replication-lag-seconds", 60 * 60 * 60 + ) if self.indices_per_cluster: for cluster_name in self.indices_per_cluster.keys(): if cluster_name not in clients: raise exceptions.SystemSetupError( "The telemetry parameter 'ccr-stats-indices' must be a JSON Object with keys matching " "the cluster names [{}] specified in --target-hosts " - "but it had [{}].".format(",".join(sorted(clients.keys())), cluster_name)) + "but it had [{}].".format( + ",".join(sorted(clients.keys())), cluster_name + ) + ) self.specified_cluster_names = self.indices_per_cluster.keys() self.metrics_store = metrics_store @@ -362,9 +448,16 @@ def __init__(self, telemetry_params, clients, metrics_store): def on_benchmark_start(self): recorder = [] for cluster_name in self.specified_cluster_names: - recorder = CcrStatsRecorder(cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval, - self.max_replication_lag_seconds, - self.indices_per_cluster[cluster_name] if self.indices_per_cluster else None) + recorder = CcrStatsRecorder( + cluster_name, + self.clients[cluster_name], + self.metrics_store, + self.sample_interval, + self.max_replication_lag_seconds, + self.indices_per_cluster[cluster_name] + if self.indices_per_cluster + else None, + ) sampler = SamplerThread(recorder) self.samplers.append(sampler) sampler.setDaemon(True) @@ -382,7 +475,15 @@ class CcrStatsRecorder: Collects and pushes CCR stats for the specified cluster to the metric store. """ - def __init__(self, cluster_name, client, metrics_store, sample_interval, max_replication_lag_seconds, indices=None): + def __init__( + self, + cluster_name, + client, + metrics_store, + sample_interval, + max_replication_lag_seconds, + indices=None, + ): """ :param cluster_name: The cluster_name that the client connects to, as specified in target.hosts. :param client: The OpenSearch client for this cluster. 
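The hunks around this point reshape `CcrStatsRecorder`, whose replication-lag estimate works by keeping the leader checkpoints observed once per sample interval in a bounded deque, discarding the ones the follower has already caught up to, and reporting the queue length times the sample interval as the lag in seconds. A minimal standalone sketch of that idea follows; the class name `ReplicationLagEstimator`, the example data, and the final enqueue step (appending the leader checkpoint when the follower is behind) are illustrative assumptions, not part of this patch.

```python
from collections import deque


class ReplicationLagEstimator:
    """Standalone sketch of the deque-based lag estimate used by CcrStatsRecorder."""

    def __init__(self, sample_interval, max_replication_lag_seconds):
        self.sample_interval = sample_interval
        # Bound the queue so the estimate can never exceed the configured maximum lag.
        self.leader_checkpoints = deque(maxlen=int(max_replication_lag_seconds / sample_interval))

    def calculate_lag(self, leader_checkpoint, follower_checkpoint):
        # Drop leader checkpoints the follower has already replicated.
        while self.leader_checkpoints and self.leader_checkpoints[0] <= follower_checkpoint:
            self.leader_checkpoints.popleft()
        # Assumed enqueue step: remember the current leader position while the follower trails it.
        if leader_checkpoint > follower_checkpoint:
            self.leader_checkpoints.append(leader_checkpoint)
        # Each queued checkpoint represents one sampling interval of lag, i.e. seconds when sampling once per second.
        return len(self.leader_checkpoints) * self.sample_interval


# Example: sampled once per second, the follower stays roughly three samples behind.
estimator = ReplicationLagEstimator(sample_interval=1, max_replication_lag_seconds=60 * 60 * 60)
lag = 0
for leader, follower in [(10, 7), (11, 7), (12, 8)]:
    lag = estimator.calculate_lag(leader, follower)
print(lag)  # 3 -> about three seconds of replication lag
```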
@@ -394,12 +495,14 @@ def __init__(self, cluster_name, client, metrics_store, sample_interval, max_rep self.cluster_name = cluster_name self.client = client self.metrics_store = metrics_store - self.sample_interval= sample_interval + self.sample_interval = sample_interval self.indices = indices self.logger = logging.getLogger(__name__) self.leader_checkpoints = dict() for index in self.indices: - self.leader_checkpoints[index] = deque(maxlen = int(max_replication_lag_seconds/sample_interval)) + self.leader_checkpoints[index] = deque( + maxlen=int(max_replication_lag_seconds / sample_interval) + ) def __str__(self): return "ccr stats" @@ -416,7 +519,9 @@ def record(self): def log_leader_stats(self): try: - stats = self.client.transport.perform_request("GET", "/_plugins/_replication/leader_stats") + stats = self.client.transport.perform_request( + "GET", "/_plugins/_replication/leader_stats" + ) self.record_cluster_level_stats(stats) except opensearchpy.TransportError: msg = "A transport error occurred while collecting CCR leader stats" @@ -425,24 +530,31 @@ def log_leader_stats(self): def log_follower_stats(self): try: - stats = self.client.transport.perform_request("GET", "/_plugins/_replication/follower_stats") + stats = self.client.transport.perform_request( + "GET", "/_plugins/_replication/follower_stats" + ) self.record_cluster_level_stats(stats) except opensearchpy.TransportError: - msg = "A transport error occurred while collecting follower stats for remote cluster: {}".format(self.cluster_name) + msg = "A transport error occurred while collecting follower stats for remote cluster: {}".format( + self.cluster_name + ) self.logger.exception(msg) raise exceptions.BenchmarkError(msg) - def log_ccr_lag_per_index(self): for index in self.indices: try: - stats = self.client.transport.perform_request("GET", "/_plugins/_replication/" + index + "/_status") + stats = self.client.transport.perform_request( + "GET", "/_plugins/_replication/" + index + "/_status" + ) if stats["status"] == "SYNCING": self.record_stats_per_index(index, stats) else: self.logger.info("CCR Status is not syncing. 
Ignoring for now!") except opensearchpy.TransportError: - msg = "A transport error occurred while collecting CCR stats for remote cluster: {}".format(self.cluster_name) + msg = "A transport error occurred while collecting CCR stats for remote cluster: {}".format( + self.cluster_name + ) self.logger.exception(msg) raise exceptions.BenchmarkError(msg) @@ -456,25 +568,32 @@ def record_stats_per_index(self, name, stats): "index": name, "leader_checkpoint": stats["syncing_details"]["leader_checkpoint"], "follower_checkpoint": stats["syncing_details"]["follower_checkpoint"], - "replication_lag": self.calculate_lag(name, stats["syncing_details"]["leader_checkpoint"], - stats["syncing_details"]["follower_checkpoint"]) - } - index_metadata = { - "cluster": self.cluster_name, - "index": name + "replication_lag": self.calculate_lag( + name, + stats["syncing_details"]["leader_checkpoint"], + stats["syncing_details"]["follower_checkpoint"], + ), } - self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=index_metadata) + index_metadata = {"cluster": self.cluster_name, "index": name} + self.metrics_store.put_doc( + doc, level=MetaInfoScope.cluster, meta_data=index_metadata + ) def record_cluster_level_stats(self, data): metadata = { "cluster": self.cluster_name, } - self.metrics_store.put_doc(data, level=MetaInfoScope.cluster, meta_data=metadata) + self.metrics_store.put_doc( + data, level=MetaInfoScope.cluster, meta_data=metadata + ) def calculate_lag(self, index, leader_checkpoint, follower_checkpoint): leader_checkpoint_queue = self.leader_checkpoints[index] - while(leader_checkpoint_queue and leader_checkpoint_queue[0] <= follower_checkpoint): + while ( + leader_checkpoint_queue + and leader_checkpoint_queue[0] <= follower_checkpoint + ): leader_checkpoint_queue.popleft() if leader_checkpoint > follower_checkpoint: @@ -482,8 +601,6 @@ def calculate_lag(self, index, leader_checkpoint, follower_checkpoint): return len(leader_checkpoint_queue) * self.sample_interval - - class RecoveryStats(TelemetryDevice): internal = False command = "recovery-stats" @@ -515,8 +632,10 @@ def __init__(self, telemetry_params, clients, metrics_store): self.sample_interval = telemetry_params.get("recovery-stats-sample-interval", 1) if self.sample_interval <= 0: raise exceptions.SystemSetupError( - "The telemetry parameter 'recovery-stats-sample-interval' must be greater than zero but was {}." 
- .format(self.sample_interval)) + "The telemetry parameter 'recovery-stats-sample-interval' must be greater than zero but was {}.".format( + self.sample_interval + ) + ) self.specified_cluster_names = self.clients.keys() indices_per_cluster = self.telemetry_params.get("recovery-stats-indices", False) # allow the user to specify either an index pattern as string or as a JSON object @@ -531,7 +650,10 @@ def __init__(self, telemetry_params, clients, metrics_store): raise exceptions.SystemSetupError( "The telemetry parameter 'recovery-stats-indices' must be a JSON Object with keys matching " "the cluster names [{}] specified in --target-hosts " - "but it had [{}].".format(",".join(sorted(clients.keys())), cluster_name)) + "but it had [{}].".format( + ",".join(sorted(clients.keys())), cluster_name + ) + ) self.specified_cluster_names = self.indices_per_cluster.keys() self.metrics_store = metrics_store @@ -539,9 +661,15 @@ def __init__(self, telemetry_params, clients, metrics_store): def on_benchmark_start(self): for cluster_name in self.specified_cluster_names: - recorder = RecoveryStatsRecorder(cluster_name, self.clients[cluster_name], self.metrics_store, - self.sample_interval, - self.indices_per_cluster[cluster_name] if self.indices_per_cluster else "") + recorder = RecoveryStatsRecorder( + cluster_name, + self.clients[cluster_name], + self.metrics_store, + self.sample_interval, + self.indices_per_cluster[cluster_name] + if self.indices_per_cluster + else "", + ) sampler = SamplerThread(recorder) self.samplers.append(sampler) sampler.setDaemon(True) @@ -559,7 +687,9 @@ class RecoveryStatsRecorder: Collects and pushes recovery stats for the specified cluster to the metric store. """ - def __init__(self, cluster_name, client, metrics_store, sample_interval, indices=None): + def __init__( + self, cluster_name, client, metrics_store, sample_interval, indices=None + ): """ :param cluster_name: The cluster_name that the client connects to, as specified in target.hosts. :param client: The OpenSearch client for this cluster. @@ -583,24 +713,27 @@ def record(self): Collect recovery stats for indexes (optionally) specified in telemetry parameters and push to metrics store. 
""" try: - stats = self.client.indices.recovery(index=self.indices, active_only=True, detailed=False) + stats = self.client.indices.recovery( + index=self.indices, active_only=True, detailed=False + ) except opensearchpy.TransportError: - msg = "A transport error occurred while collecting recovery stats on cluster [{}]".format(self.cluster_name) + msg = "A transport error occurred while collecting recovery stats on cluster [{}]".format( + self.cluster_name + ) self.logger.exception(msg) raise exceptions.BenchmarkError(msg) for idx, idx_stats in stats.items(): for shard in idx_stats["shards"]: - doc = { - "name": "recovery-stats", - "shard": shard - } + doc = {"name": "recovery-stats", "shard": shard} shard_metadata = { "cluster": self.cluster_name, "index": idx, - "shard": shard["id"] + "shard": shard["id"], } - self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=shard_metadata) + self.metrics_store.put_doc( + doc, level=MetaInfoScope.cluster, meta_data=shard_metadata + ) class NodeStats(TelemetryDevice): @@ -632,11 +765,18 @@ def on_benchmark_start(self): distribution_version = default_client.info()["version"]["number"] major, minor = components(distribution_version)[:2] - if distribution_name != NodeStats.opensearch_distribution_name and (major < 7 or (major == 7 and minor < 2)): + if distribution_name != NodeStats.opensearch_distribution_name and ( + major < 7 or (major == 7 and minor < 2) + ): console.warn(NodeStats.warning, logger=self.logger) for cluster_name in self.specified_cluster_names: - recorder = NodeStatsRecorder(self.telemetry_params, cluster_name, self.clients[cluster_name], self.metrics_store) + recorder = NodeStatsRecorder( + self.telemetry_params, + cluster_name, + self.clients[cluster_name], + self.metrics_store, + ) sampler = SamplerThread(recorder) self.samplers.append(sampler) sampler.setDaemon(True) @@ -654,32 +794,58 @@ def __init__(self, telemetry_params, cluster_name, client, metrics_store): self.sample_interval = telemetry_params.get("node-stats-sample-interval", 1) if self.sample_interval <= 0: raise exceptions.SystemSetupError( - "The telemetry parameter 'node-stats-sample-interval' must be greater than zero but was {}.".format(self.sample_interval)) + "The telemetry parameter 'node-stats-sample-interval' must be greater than zero but was {}.".format( + self.sample_interval + ) + ) self.include_indices = telemetry_params.get("node-stats-include-indices", False) - self.include_indices_metrics = telemetry_params.get("node-stats-include-indices-metrics", False) + self.include_indices_metrics = telemetry_params.get( + "node-stats-include-indices-metrics", False + ) if self.include_indices_metrics: if isinstance(self.include_indices_metrics, str): - self.include_indices_metrics_list = opts.csv_to_list(self.include_indices_metrics) + self.include_indices_metrics_list = opts.csv_to_list( + self.include_indices_metrics + ) else: # we don't validate the allowable metrics as they may change across ES versions raise exceptions.SystemSetupError( "The telemetry parameter 'node-stats-include-indices-metrics' must be a comma-separated string but was {}".format( - type(self.include_indices_metrics)) + type(self.include_indices_metrics) ) + ) else: - self.include_indices_metrics_list = ["docs", "store", "indexing", "search", "merges", "query_cache", - "fielddata", "segments", "translog", "request_cache"] - - self.include_thread_pools = telemetry_params.get("node-stats-include-thread-pools", True) - self.include_buffer_pools = 
telemetry_params.get("node-stats-include-buffer-pools", True) - self.include_breakers = telemetry_params.get("node-stats-include-breakers", True) + self.include_indices_metrics_list = [ + "docs", + "store", + "indexing", + "search", + "merges", + "query_cache", + "fielddata", + "segments", + "translog", + "request_cache", + ] + + self.include_thread_pools = telemetry_params.get( + "node-stats-include-thread-pools", True + ) + self.include_buffer_pools = telemetry_params.get( + "node-stats-include-buffer-pools", True + ) + self.include_breakers = telemetry_params.get( + "node-stats-include-breakers", True + ) self.include_network = telemetry_params.get("node-stats-include-network", True) self.include_process = telemetry_params.get("node-stats-include-process", True) self.include_mem_stats = telemetry_params.get("node-stats-include-mem", True) self.include_gc_stats = telemetry_params.get("node-stats-include-gc", True) - self.include_indexing_pressure = telemetry_params.get("node-stats-include-indexing-pressure", True) + self.include_indexing_pressure = telemetry_params.get( + "node-stats-include-indexing-pressure", True + ) self.client = client self.metrics_store = metrics_store self.cluster_name = cluster_name @@ -693,20 +859,29 @@ def record(self): node_name = node_stats["name"] metrics_store_meta_data = { "cluster": self.cluster_name, - "node_name": node_name + "node_name": node_name, } collected_node_stats = collections.OrderedDict() collected_node_stats["name"] = "node-stats" if self.include_indices or self.include_indices_metrics: collected_node_stats.update( - self.indices_stats(node_name, node_stats, include=self.include_indices_metrics_list)) + self.indices_stats( + node_name, node_stats, include=self.include_indices_metrics_list + ) + ) if self.include_thread_pools: - collected_node_stats.update(self.thread_pool_stats(node_name, node_stats)) + collected_node_stats.update( + self.thread_pool_stats(node_name, node_stats) + ) if self.include_breakers: - collected_node_stats.update(self.circuit_breaker_stats(node_name, node_stats)) + collected_node_stats.update( + self.circuit_breaker_stats(node_name, node_stats) + ) if self.include_buffer_pools: - collected_node_stats.update(self.jvm_buffer_pool_stats(node_name, node_stats)) + collected_node_stats.update( + self.jvm_buffer_pool_stats(node_name, node_stats) + ) if self.include_mem_stats: collected_node_stats.update(self.jvm_mem_stats(node_name, node_stats)) if self.include_gc_stats: @@ -716,12 +891,16 @@ def record(self): if self.include_process: collected_node_stats.update(self.process_stats(node_name, node_stats)) if self.include_indexing_pressure: - collected_node_stats.update(self.indexing_pressure(node_name, node_stats)) + collected_node_stats.update( + self.indexing_pressure(node_name, node_stats) + ) - self.metrics_store.put_doc(dict(collected_node_stats), - level=MetaInfoScope.node, - node_name=node_name, - meta_data=metrics_store_meta_data) + self.metrics_store.put_doc( + dict(collected_node_stats), + level=MetaInfoScope.node, + node_name=node_name, + meta_data=metrics_store_meta_data, + ) def flatten_stats_fields(self, prefix=None, stats=None): """ @@ -737,11 +916,17 @@ def iterate(): if isinstance(section_value, dict): new_prefix = "{}_{}".format(prefix, section_name) # https://www.python.org/dev/peps/pep-0380/ - yield from self.flatten_stats_fields(prefix=new_prefix, stats=section_value).items() + yield from self.flatten_stats_fields( + prefix=new_prefix, stats=section_value + ).items() # Avoid duplication for metric fields 
that have unit embedded in value as they are also recorded elsewhere # example: `breakers_parent_limit_size_in_bytes` vs `breakers_parent_limit_size` - elif isinstance(section_value, (int, float)) and not isinstance(section_value, bool): - yield "{}{}".format(prefix + "_" if prefix else "", section_name), section_value + elif isinstance(section_value, (int, float)) and not isinstance( + section_value, bool + ): + yield "{}{}".format( + prefix + "_" if prefix else "", section_name + ), section_value if stats: return dict(iterate()) @@ -753,33 +938,51 @@ def indices_stats(self, node_name, node_stats, include): ordered_results = collections.OrderedDict() for section in include: if section in idx_stats: - ordered_results.update(self.flatten_stats_fields(prefix="indices_" + section, stats=idx_stats[section])) + ordered_results.update( + self.flatten_stats_fields( + prefix="indices_" + section, stats=idx_stats[section] + ) + ) return ordered_results def thread_pool_stats(self, node_name, node_stats): - return self.flatten_stats_fields(prefix="thread_pool", stats=node_stats["thread_pool"]) + return self.flatten_stats_fields( + prefix="thread_pool", stats=node_stats["thread_pool"] + ) def circuit_breaker_stats(self, node_name, node_stats): - return self.flatten_stats_fields(prefix="breakers", stats=node_stats["breakers"]) + return self.flatten_stats_fields( + prefix="breakers", stats=node_stats["breakers"] + ) def jvm_buffer_pool_stats(self, node_name, node_stats): - return self.flatten_stats_fields(prefix="jvm_buffer_pools", stats=node_stats["jvm"]["buffer_pools"]) + return self.flatten_stats_fields( + prefix="jvm_buffer_pools", stats=node_stats["jvm"]["buffer_pools"] + ) def jvm_mem_stats(self, node_name, node_stats): - return self.flatten_stats_fields(prefix="jvm_mem", stats=node_stats["jvm"]["mem"]) + return self.flatten_stats_fields( + prefix="jvm_mem", stats=node_stats["jvm"]["mem"] + ) def jvm_gc_stats(self, node_name, node_stats): return self.flatten_stats_fields(prefix="jvm_gc", stats=node_stats["jvm"]["gc"]) def network_stats(self, node_name, node_stats): - return self.flatten_stats_fields(prefix="transport", stats=node_stats.get("transport")) + return self.flatten_stats_fields( + prefix="transport", stats=node_stats.get("transport") + ) def process_stats(self, node_name, node_stats): - return self.flatten_stats_fields(prefix="process_cpu", stats=node_stats["process"]["cpu"]) + return self.flatten_stats_fields( + prefix="process_cpu", stats=node_stats["process"]["cpu"] + ) def indexing_pressure(self, node_name, node_stats): - return self.flatten_stats_fields(prefix="indexing_pressure", stats=node_stats["indexing_pressure"]) + return self.flatten_stats_fields( + prefix="indexing_pressure", stats=node_stats["indexing_pressure"] + ) def sample(self): try: @@ -810,20 +1013,26 @@ def __init__(self, telemetry_params, clients, metrics_store): self.telemetry_params = telemetry_params self.clients = clients - self.sample_interval = telemetry_params.get("transform-stats-sample-interval", 1) + self.sample_interval = telemetry_params.get( + "transform-stats-sample-interval", 1 + ) if self.sample_interval <= 0: raise exceptions.SystemSetupError( f"The telemetry parameter 'transform-stats-sample-interval' must be greater than zero " - f"but was [{self.sample_interval}].") + f"but was [{self.sample_interval}]." 
+ ) self.specified_cluster_names = self.clients.keys() - self.transforms_per_cluster = self.telemetry_params.get("transform-stats-transforms", False) + self.transforms_per_cluster = self.telemetry_params.get( + "transform-stats-transforms", False + ) if self.transforms_per_cluster: for cluster_name in self.transforms_per_cluster.keys(): if cluster_name not in clients: raise exceptions.SystemSetupError( f"The telemetry parameter 'transform-stats-transforms' must be a JSON Object with keys " f"matching the cluster names [{','.join(sorted(clients.keys()))}] specified in --target-hosts " - f"but it had [{cluster_name}].") + f"but it had [{cluster_name}]." + ) self.specified_cluster_names = self.transforms_per_cluster.keys() self.metrics_store = metrics_store @@ -831,10 +1040,15 @@ def __init__(self, telemetry_params, clients, metrics_store): def on_benchmark_start(self): for cluster_name in self.specified_cluster_names: - recorder = TransformStatsRecorder(cluster_name, self.clients[cluster_name], self.metrics_store, - self.sample_interval, - self.transforms_per_cluster[ - cluster_name] if self.transforms_per_cluster else None) + recorder = TransformStatsRecorder( + cluster_name, + self.clients[cluster_name], + self.metrics_store, + self.sample_interval, + self.transforms_per_cluster[cluster_name] + if self.transforms_per_cluster + else None, + ) sampler = SamplerThread(recorder) self.samplers.append(sampler) sampler.setDaemon(True) @@ -854,7 +1068,9 @@ class TransformStatsRecorder: Collects and pushes Transform stats for the specified cluster to the metric store. """ - def __init__(self, cluster_name, client, metrics_store, sample_interval, transforms=None): + def __init__( + self, cluster_name, client, metrics_store, sample_interval, transforms=None + ): """ :param cluster_name: The cluster_name that the client connects to, as specified in target.hosts. :param client: The OpenSearch client for this cluster. @@ -895,8 +1111,10 @@ def _record(self, prefix=""): stats = self.client.transform.get_transform_stats("_all") except opensearchpy.TransportError: - msg = f"A transport error occurred while collecting transform stats on " \ - f"cluster [{self.cluster_name}]" + msg = ( + f"A transport error occurred while collecting transform stats on " + f"cluster [{self.cluster_name}]" + ) self.logger.exception(msg) raise exceptions.BenchmarkError(msg) @@ -906,7 +1124,9 @@ def _record(self, prefix=""): # Skip metrics for transform not part of user supplied whitelist (transform-stats-transforms) # in telemetry params. continue - self.record_stats_per_transform(transform["id"], transform["stats"], prefix) + self.record_stats_per_transform( + transform["id"], transform["stats"], prefix + ) except KeyError: self.logger.warning( @@ -921,41 +1141,67 @@ def record_stats_per_transform(self, transform_id, stats, prefix=""): :param prefix: A prefix for the counters/values, e.g. 
for total runtimes """ - meta_data = { - "transform_id": transform_id - } - - self.metrics_store.put_value_cluster_level(prefix + "transform_pages_processed", - stats.get("pages_processed", 0), - meta_data=meta_data) - self.metrics_store.put_value_cluster_level(prefix + "transform_documents_processed", - stats.get("documents_processed", 0), - meta_data=meta_data) - self.metrics_store.put_value_cluster_level(prefix + "transform_documents_indexed", - stats.get("documents_indexed", 0), - meta_data=meta_data) - self.metrics_store.put_value_cluster_level(prefix + "transform_index_total", stats.get("index_total", 0), - meta_data=meta_data) - self.metrics_store.put_value_cluster_level(prefix + "transform_index_failures", stats.get("index_failures", 0), - meta_data=meta_data) - self.metrics_store.put_value_cluster_level(prefix + "transform_search_total", stats.get("search_total", 0), - meta_data=meta_data) - self.metrics_store.put_value_cluster_level(prefix + "transform_search_failures", - stats.get("search_failures", 0), - meta_data=meta_data) - self.metrics_store.put_value_cluster_level(prefix + "transform_processing_total", - stats.get("processing_total", 0), - meta_data=meta_data) - - self.metrics_store.put_value_cluster_level(prefix + "transform_search_time", - stats.get("search_time_in_ms", 0), - "ms", meta_data=meta_data) - self.metrics_store.put_value_cluster_level(prefix + "transform_index_time", - stats.get("index_time_in_ms", 0), "ms", - meta_data=meta_data) - self.metrics_store.put_value_cluster_level(prefix + "transform_processing_time", - stats.get("processing_time_in_ms", 0), "ms", - meta_data=meta_data) + meta_data = {"transform_id": transform_id} + + self.metrics_store.put_value_cluster_level( + prefix + "transform_pages_processed", + stats.get("pages_processed", 0), + meta_data=meta_data, + ) + self.metrics_store.put_value_cluster_level( + prefix + "transform_documents_processed", + stats.get("documents_processed", 0), + meta_data=meta_data, + ) + self.metrics_store.put_value_cluster_level( + prefix + "transform_documents_indexed", + stats.get("documents_indexed", 0), + meta_data=meta_data, + ) + self.metrics_store.put_value_cluster_level( + prefix + "transform_index_total", + stats.get("index_total", 0), + meta_data=meta_data, + ) + self.metrics_store.put_value_cluster_level( + prefix + "transform_index_failures", + stats.get("index_failures", 0), + meta_data=meta_data, + ) + self.metrics_store.put_value_cluster_level( + prefix + "transform_search_total", + stats.get("search_total", 0), + meta_data=meta_data, + ) + self.metrics_store.put_value_cluster_level( + prefix + "transform_search_failures", + stats.get("search_failures", 0), + meta_data=meta_data, + ) + self.metrics_store.put_value_cluster_level( + prefix + "transform_processing_total", + stats.get("processing_total", 0), + meta_data=meta_data, + ) + + self.metrics_store.put_value_cluster_level( + prefix + "transform_search_time", + stats.get("search_time_in_ms", 0), + "ms", + meta_data=meta_data, + ) + self.metrics_store.put_value_cluster_level( + prefix + "transform_index_time", + stats.get("index_time_in_ms", 0), + "ms", + meta_data=meta_data, + ) + self.metrics_store.put_value_cluster_level( + prefix + "transform_processing_time", + stats.get("processing_time_in_ms", 0), + "ms", + meta_data=meta_data, + ) documents_processed = stats.get("documents_processed", 0) processing_time = stats.get("search_time_in_ms", 0) @@ -964,8 +1210,12 @@ def record_stats_per_transform(self, transform_id, stats, prefix=""): if 
processing_time > 0: throughput = documents_processed / processing_time * 1000 - self.metrics_store.put_value_cluster_level(prefix + "transform_throughput", throughput, - "docs/s", meta_data=meta_data) + self.metrics_store.put_value_cluster_level( + prefix + "transform_throughput", + throughput, + "docs/s", + meta_data=meta_data, + ) class SearchableSnapshotsStats(TelemetryDevice): @@ -977,6 +1227,7 @@ class SearchableSnapshotsStats(TelemetryDevice): """ Gathers searchable snapshots stats on a cluster level """ + def __init__(self, telemetry_params, clients, metrics_store): """ :param telemetry_params: The configuration object for telemetry_params. @@ -1016,13 +1267,18 @@ def __init__(self, telemetry_params, clients, metrics_store): self.telemetry_params = telemetry_params self.clients = clients - self.sample_interval = telemetry_params.get("searchable-snapshots-stats-sample-interval", 1) + self.sample_interval = telemetry_params.get( + "searchable-snapshots-stats-sample-interval", 1 + ) if self.sample_interval <= 0: raise exceptions.SystemSetupError( f"The telemetry parameter 'searchable-snapshots-stats-sample-interval' must be greater than zero " - f"but was {self.sample_interval}.") + f"but was {self.sample_interval}." + ) self.specified_cluster_names = self.clients.keys() - indices_per_cluster = self.telemetry_params.get("searchable-snapshots-stats-indices", None) + indices_per_cluster = self.telemetry_params.get( + "searchable-snapshots-stats-indices", None + ) # allow the user to specify either an index pattern as string or as a JSON object if isinstance(indices_per_cluster, str): self.indices_per_cluster = {opts.TargetHosts.DEFAULT: [indices_per_cluster]} @@ -1035,7 +1291,8 @@ def __init__(self, telemetry_params, clients, metrics_store): raise exceptions.SystemSetupError( f"The telemetry parameter 'searchable-snapshots-stats-indices' must be a JSON Object " f"with keys matching the cluster names [{','.join(sorted(clients.keys()))}] specified in " - f"--target-hosts but it had [{cluster_name}].") + f"--target-hosts but it had [{cluster_name}]." + ) self.specified_cluster_names = self.indices_per_cluster.keys() self.metrics_store = metrics_store @@ -1044,8 +1301,14 @@ def __init__(self, telemetry_params, clients, metrics_store): def on_benchmark_start(self): for cluster_name in self.specified_cluster_names: recorder = SearchableSnapshotsStatsRecorder( - cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval, - self.indices_per_cluster[cluster_name] if self.indices_per_cluster else None) + cluster_name, + self.clients[cluster_name], + self.metrics_store, + self.sample_interval, + self.indices_per_cluster[cluster_name] + if self.indices_per_cluster + else None, + ) sampler = SamplerThread(recorder) self.samplers.append(sampler) sampler.setDaemon(True) @@ -1063,7 +1326,9 @@ class SearchableSnapshotsStatsRecorder: Collects and pushes searchable snapshots stats for the specified cluster to the metric store. """ - def __init__(self, cluster_name, client, metrics_store, sample_interval, indices=None): + def __init__( + self, cluster_name, client, metrics_store, sample_interval, indices=None + ): """ :param cluster_name: The cluster_name that the client connects to, as specified in target.hosts. :param client: The OpenSearch client for this cluster. 
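Every stats device touched by this patch follows the same sampling shape: `on_benchmark_start` builds one recorder per target cluster, wraps it in a `SamplerThread`, marks it as a daemon via `sampler.setDaemon(True)`, and starts it; the thread then calls `recorder.record()` every `sample_interval` seconds until the benchmark stops. A runnable sketch of that pattern, with hypothetical names (`PeriodicSampler`, `CountingRecorder`) and a simplified stop flag rather than the real `SamplerThread` internals:

```python
import logging
import threading
import time


class PeriodicSampler(threading.Thread):
    """Illustrative daemon thread that polls a recorder until asked to stop."""

    def __init__(self, recorder):
        super().__init__(daemon=True)  # the patch calls sampler.setDaemon(True) before start()
        self.recorder = recorder
        self.stop_requested = False

    def finish(self):
        self.stop_requested = True
        self.join()

    def run(self):
        try:
            while not self.stop_requested:
                self.recorder.record()
                time.sleep(self.recorder.sample_interval)
        except BaseException:
            # Swallow everything so a failing recorder cannot crash the benchmark driver.
            logging.getLogger(__name__).exception("Could not determine %s", self.recorder)


class CountingRecorder:
    """Hypothetical recorder: a real one would call a stats API and push docs to the metrics store."""

    sample_interval = 1

    def __init__(self):
        self.samples = 0

    def record(self):
        self.samples += 1


recorder = CountingRecorder()
sampler = PeriodicSampler(recorder)
sampler.start()    # what on_benchmark_start does per cluster
time.sleep(3)
sampler.finish()   # what on_benchmark_stop does
print(recorder.samples)
```

Running the recorders as daemon threads matters here: a recorder stuck on a slow stats API cannot keep the benchmark process alive after the workload finishes.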
@@ -1092,18 +1357,25 @@ def record(self): level = "indices" if self.indices else "cluster" # we don't use the existing client support (searchable_snapshots.stats()) # as the API is deliberately undocumented and might change: - stats = self.client.transport.perform_request("GET", stats_api_endpoint, params={"level": level}) + stats = self.client.transport.perform_request( + "GET", stats_api_endpoint, params={"level": level} + ) except opensearchpy.NotFoundError as e: - if "No searchable snapshots indices found" in e.info.get("error").get("reason"): + if "No searchable snapshots indices found" in e.info.get("error").get( + "reason" + ): self.logger.info( "Unable to find valid indices while collecting searchable snapshots stats " - "on cluster [%s]", self.cluster_name) + "on cluster [%s]", + self.cluster_name, + ) # allow collection, indices might be mounted later on return except opensearchpy.TransportError: raise exceptions.BenchmarkError( f"A transport error occurred while collecting searchable snapshots stats on cluster " - f"[{self.cluster_name}]") from None + f"[{self.cluster_name}]" + ) from None total_stats = stats.get("total", []) for lucene_file_stats in total_stats: @@ -1128,12 +1400,11 @@ def _push_stats(self, level, stats, index=None): if index: doc["index"] = index - meta_data = { - "cluster": self.cluster_name, - "level": level - } + meta_data = {"cluster": self.cluster_name, "level": level} - self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=meta_data) + self.metrics_store.put_doc( + doc, level=MetaInfoScope.cluster, meta_data=meta_data + ) # TODO Consider moving under the utils package for broader/future use? # at the moment it's only useful here as this stats API is undocumented and we don't wan't to use @@ -1150,6 +1421,7 @@ def _match_list_or_pattern(self, idx): return True return False + class StartupTime(InternalTelemetryDevice): def __init__(self, stopwatch=time.StopWatch): super().__init__() @@ -1162,13 +1434,16 @@ def attach_to_node(self, node): self.timer.stop() def store_system_metrics(self, node, metrics_store): - metrics_store.put_value_node_level(node.node_name, "node_startup_time", self.timer.total_time(), "s") + metrics_store.put_value_node_level( + node.node_name, "node_startup_time", self.timer.total_time(), "s" + ) class DiskIo(InternalTelemetryDevice): """ Gathers disk I/O stats. """ + def __init__(self, node_count_on_host): super().__init__() self.node_count_on_host = node_count_on_host @@ -1188,10 +1463,14 @@ def attach_to_node(self, node): disk_start = sysstats.disk_io_counters() self.read_bytes = disk_start.read_bytes self.write_bytes = disk_start.write_bytes - self.logger.warning("Process I/O counters are not supported on this platform. Falling back to less " - "accurate disk I/O counters.") + self.logger.warning( + "Process I/O counters are not supported on this platform. Falling back to less " + "accurate disk I/O counters." + ) except BaseException: - self.logger.exception("Could not determine I/O stats at benchmark start.") + self.logger.exception( + "Could not determine I/O stats at benchmark start." + ) def detach_from_node(self, node, running): if running: @@ -1208,12 +1487,20 @@ def detach_from_node(self, node, running): else: disk_end = sysstats.disk_io_counters() if self.node_count_on_host > 1: - self.logger.info("There are [%d] nodes on this host and Benchmark fell back to disk I/O counters. 
" - "Attributing [1/%d] of total I/O to [%s].", - self.node_count_on_host, self.node_count_on_host, node.node_name) - - self.read_bytes = (disk_end.read_bytes - self.read_bytes) // self.node_count_on_host - self.write_bytes = (disk_end.write_bytes - self.write_bytes) // self.node_count_on_host + self.logger.info( + "There are [%d] nodes on this host and Benchmark fell back to disk I/O counters. " + "Attributing [1/%d] of total I/O to [%s].", + self.node_count_on_host, + self.node_count_on_host, + node.node_name, + ) + + self.read_bytes = ( + disk_end.read_bytes - self.read_bytes + ) // self.node_count_on_host + self.write_bytes = ( + disk_end.write_bytes - self.write_bytes + ) // self.node_count_on_host # Catching RuntimeException is not sufficient: psutil might raise AccessDenied (derived from Exception) except BaseException: self.logger.exception("Could not determine I/O stats at benchmark end.") @@ -1223,9 +1510,13 @@ def detach_from_node(self, node, running): def store_system_metrics(self, node, metrics_store): if self.write_bytes is not None: - metrics_store.put_value_node_level(node.node_name, "disk_io_write_bytes", self.write_bytes, "byte") + metrics_store.put_value_node_level( + node.node_name, "disk_io_write_bytes", self.write_bytes, "byte" + ) if self.read_bytes is not None: - metrics_store.put_value_node_level(node.node_name, "disk_io_read_bytes", self.read_bytes, "byte") + metrics_store.put_value_node_level( + node.node_name, "disk_io_read_bytes", self.read_bytes, "byte" + ) def store_node_attribute_metadata(metrics_store, nodes_info): @@ -1235,14 +1526,18 @@ def store_node_attribute_metadata(metrics_store, nodes_info): if "attributes" in node: for k, v in node["attributes"].items(): attribute_key = "attribute_%s" % str(k) - metrics_store.add_meta_info(metrics.MetaInfoScope.node, node["name"], attribute_key, v) + metrics_store.add_meta_info( + metrics.MetaInfoScope.node, node["name"], attribute_key, v + ) if attribute_key not in pseudo_cluster_attributes: pseudo_cluster_attributes[attribute_key] = set() pseudo_cluster_attributes[attribute_key].add(v) for k, v in pseudo_cluster_attributes.items(): if len(v) == 1: - metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, k, next(iter(v))) + metrics_store.add_meta_info( + metrics.MetaInfoScope.cluster, None, k, next(iter(v)) + ) def store_plugin_metadata(metrics_store, nodes_info): @@ -1251,7 +1546,11 @@ def store_plugin_metadata(metrics_store, nodes_info): all_same = False for node in nodes_info: - plugins = [p["name"] for p in extract_value(node, ["plugins"], fallback=[]) if "name" in p] + plugins = [ + p["name"] + for p in extract_value(node, ["plugins"], fallback=[]) + if "name" in p + ] if not all_nodes_plugins: all_nodes_plugins = plugins.copy() all_same = True @@ -1260,10 +1559,14 @@ def store_plugin_metadata(metrics_store, nodes_info): all_same = all_same and set(all_nodes_plugins) == set(plugins) if plugins: - metrics_store.add_meta_info(metrics.MetaInfoScope.node, node["name"], "plugins", plugins) + metrics_store.add_meta_info( + metrics.MetaInfoScope.node, node["name"], "plugins", plugins + ) if all_same and all_nodes_plugins: - metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "plugins", all_nodes_plugins) + metrics_store.add_meta_info( + metrics.MetaInfoScope.cluster, None, "plugins", all_nodes_plugins + ) def extract_value(node, path, fallback="unknown"): @@ -1280,6 +1583,7 @@ class ClusterEnvironmentInfo(InternalTelemetryDevice): """ Gathers static environment information on a cluster level 
(e.g. version numbers). """ + def __init__(self, client, metrics_store): super().__init__() self.metrics_store = metrics_store @@ -1296,9 +1600,21 @@ def on_benchmark_start(self): distribution_version = client_info["version"]["number"] # older versions (pre 6.3.0) don't expose a build_flavor property because the only (implicit) flavor was "oss". distribution_flavor = client_info["version"].get("build_flavor", "oss") - self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "source_revision", revision) - self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "distribution_version", distribution_version) - self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "distribution_flavor", distribution_flavor) + self.metrics_store.add_meta_info( + metrics.MetaInfoScope.cluster, None, "source_revision", revision + ) + self.metrics_store.add_meta_info( + metrics.MetaInfoScope.cluster, + None, + "distribution_version", + distribution_version, + ) + self.metrics_store.add_meta_info( + metrics.MetaInfoScope.cluster, + None, + "distribution_flavor", + distribution_flavor, + ) info = self.client.nodes.info(node_id="_all") nodes_info = info["nodes"].values() @@ -1307,8 +1623,18 @@ def on_benchmark_start(self): # while we could determine this for bare-metal nodes that are # provisioned by Benchmark, there are other cases (Docker, externally # provisioned clusters) where it's not that easy. - self.metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "jvm_vendor", extract_value(node, ["jvm", "vm_vendor"])) - self.metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "jvm_version", extract_value(node, ["jvm", "version"])) + self.metrics_store.add_meta_info( + metrics.MetaInfoScope.node, + node_name, + "jvm_vendor", + extract_value(node, ["jvm", "vm_vendor"]), + ) + self.metrics_store.add_meta_info( + metrics.MetaInfoScope.node, + node_name, + "jvm_version", + extract_value(node, ["jvm", "version"]), + ) store_plugin_metadata(self.metrics_store, nodes_info) store_node_attribute_metadata(self.metrics_store, nodes_info) @@ -1318,19 +1644,40 @@ def add_metadata_for_node(metrics_store, node_name, host_name): """ Gathers static environment information like OS or CPU details for benchmark-provisioned nodes. 
""" - metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "os_name", sysstats.os_name()) - metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "os_version", sysstats.os_version()) - metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "cpu_logical_cores", sysstats.logical_cpu_cores()) - metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "cpu_physical_cores", sysstats.physical_cpu_cores()) - metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "cpu_model", sysstats.cpu_model()) - metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "node_name", node_name) - metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "host_name", host_name) + metrics_store.add_meta_info( + metrics.MetaInfoScope.node, node_name, "os_name", sysstats.os_name() + ) + metrics_store.add_meta_info( + metrics.MetaInfoScope.node, node_name, "os_version", sysstats.os_version() + ) + metrics_store.add_meta_info( + metrics.MetaInfoScope.node, + node_name, + "cpu_logical_cores", + sysstats.logical_cpu_cores(), + ) + metrics_store.add_meta_info( + metrics.MetaInfoScope.node, + node_name, + "cpu_physical_cores", + sysstats.physical_cpu_cores(), + ) + metrics_store.add_meta_info( + metrics.MetaInfoScope.node, node_name, "cpu_model", sysstats.cpu_model() + ) + metrics_store.add_meta_info( + metrics.MetaInfoScope.node, node_name, "node_name", node_name + ) + metrics_store.add_meta_info( + metrics.MetaInfoScope.node, node_name, "host_name", host_name + ) class ExternalEnvironmentInfo(InternalTelemetryDevice): """ Gathers static environment information for externally provisioned clusters. """ + def __init__(self, client, metrics_store): super().__init__() self.metrics_store = metrics_store @@ -1352,14 +1699,20 @@ def on_benchmark_start(self): for node in nodes_stats: node_name = node["name"] host = node.get("host", "unknown") - self.metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "node_name", node_name) - self.metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, "host_name", host) + self.metrics_store.add_meta_info( + metrics.MetaInfoScope.node, node_name, "node_name", node_name + ) + self.metrics_store.add_meta_info( + metrics.MetaInfoScope.node, node_name, "host_name", host + ) for node in nodes_info: node_name = node["name"] self.store_node_info(node_name, "os_name", node, ["os", "name"]) self.store_node_info(node_name, "os_version", node, ["os", "version"]) - self.store_node_info(node_name, "cpu_logical_cores", node, ["os", "available_processors"]) + self.store_node_info( + node_name, "cpu_logical_cores", node, ["os", "available_processors"] + ) self.store_node_info(node_name, "jvm_vendor", node, ["jvm", "vm_vendor"]) self.store_node_info(node_name, "jvm_version", node, ["jvm", "version"]) @@ -1367,13 +1720,16 @@ def on_benchmark_start(self): store_node_attribute_metadata(self.metrics_store, nodes_info) def store_node_info(self, node_name, metric_key, node, path): - self.metrics_store.add_meta_info(metrics.MetaInfoScope.node, node_name, metric_key, extract_value(node, path)) + self.metrics_store.add_meta_info( + metrics.MetaInfoScope.node, node_name, metric_key, extract_value(node, path) + ) class JvmStatsSummary(InternalTelemetryDevice): """ Gathers a summary of various JVM statistics during the whole test execution. 
""" + def __init__(self, client, metrics_store): super().__init__() self.metrics_store = metrics_store @@ -1394,38 +1750,66 @@ def on_benchmark_stop(self): for node_name, jvm_stats_end in jvm_stats_at_end.items(): if node_name in self.jvm_stats_per_node: jvm_stats_start = self.jvm_stats_per_node[node_name] - young_gc_time = max(jvm_stats_end["young_gc_time"] - jvm_stats_start["young_gc_time"], 0) - young_gc_count = max(jvm_stats_end["young_gc_count"] - jvm_stats_start["young_gc_count"], 0) - old_gc_time = max(jvm_stats_end["old_gc_time"] - jvm_stats_start["old_gc_time"], 0) - old_gc_count = max(jvm_stats_end["old_gc_count"] - jvm_stats_start["old_gc_count"], 0) + young_gc_time = max( + jvm_stats_end["young_gc_time"] - jvm_stats_start["young_gc_time"], 0 + ) + young_gc_count = max( + jvm_stats_end["young_gc_count"] - jvm_stats_start["young_gc_count"], + 0, + ) + old_gc_time = max( + jvm_stats_end["old_gc_time"] - jvm_stats_start["old_gc_time"], 0 + ) + old_gc_count = max( + jvm_stats_end["old_gc_count"] - jvm_stats_start["old_gc_count"], 0 + ) total_young_gen_collection_time += young_gc_time total_young_gen_collection_count += young_gc_count total_old_gen_collection_time += old_gc_time total_old_gen_collection_count += old_gc_count - self.metrics_store.put_value_node_level(node_name, "node_young_gen_gc_time", young_gc_time, "ms") - self.metrics_store.put_value_node_level(node_name, "node_young_gen_gc_count", young_gc_count) - self.metrics_store.put_value_node_level(node_name, "node_old_gen_gc_time", old_gc_time, "ms") - self.metrics_store.put_value_node_level(node_name, "node_old_gen_gc_count", old_gc_count) + self.metrics_store.put_value_node_level( + node_name, "node_young_gen_gc_time", young_gc_time, "ms" + ) + self.metrics_store.put_value_node_level( + node_name, "node_young_gen_gc_count", young_gc_count + ) + self.metrics_store.put_value_node_level( + node_name, "node_old_gen_gc_time", old_gc_time, "ms" + ) + self.metrics_store.put_value_node_level( + node_name, "node_old_gen_gc_count", old_gc_count + ) - all_pool_stats = { - "name": "jvm_memory_pool_stats" - } + all_pool_stats = {"name": "jvm_memory_pool_stats"} for pool_name, pool_stats in jvm_stats_end["pools"].items(): all_pool_stats[pool_name] = { "peak_usage": pool_stats["peak"], - "unit": "byte" + "unit": "byte", } - self.metrics_store.put_doc(all_pool_stats, level=MetaInfoScope.node, node_name=node_name) + self.metrics_store.put_doc( + all_pool_stats, level=MetaInfoScope.node, node_name=node_name + ) else: - self.logger.warning("Cannot determine JVM stats for [%s] (not in the cluster at the start of the benchmark).", node_name) + self.logger.warning( + "Cannot determine JVM stats for [%s] (not in the cluster at the start of the benchmark).", + node_name, + ) - self.metrics_store.put_value_cluster_level("node_total_young_gen_gc_time", total_young_gen_collection_time, "ms") - self.metrics_store.put_value_cluster_level("node_total_young_gen_gc_count", total_young_gen_collection_count) - self.metrics_store.put_value_cluster_level("node_total_old_gen_gc_time", total_old_gen_collection_time, "ms") - self.metrics_store.put_value_cluster_level("node_total_old_gen_gc_count", total_old_gen_collection_count) + self.metrics_store.put_value_cluster_level( + "node_total_young_gen_gc_time", total_young_gen_collection_time, "ms" + ) + self.metrics_store.put_value_cluster_level( + "node_total_young_gen_gc_count", total_young_gen_collection_count + ) + self.metrics_store.put_value_cluster_level( + "node_total_old_gen_gc_time", 
total_old_gen_collection_time, "ms" + ) + self.metrics_store.put_value_cluster_level( + "node_total_old_gen_gc_count", total_old_gen_collection_count + ) self.jvm_stats_per_node = None @@ -1450,7 +1834,7 @@ def jvm_stats(self): "young_gc_count": young_gen_collection_count, "old_gc_time": old_gen_collection_time, "old_gc_count": old_gen_collection_count, - "pools": {} + "pools": {}, } pool_usage = node["jvm"]["mem"]["pools"] for pool_name, pool_stats in pool_usage.items(): @@ -1464,6 +1848,7 @@ class IndexStats(InternalTelemetryDevice): """ Gathers statistics via the OpenSearch index stats API """ + def __init__(self, client, metrics_store): super().__init__() self.client = client @@ -1479,9 +1864,14 @@ def on_benchmark_start(self): n = t["name"] v = t["value"] if t["value"] > 0: - console.warn("%s is %d ms indicating that the cluster is not in a defined clean state. Recorded index time " - "metrics may be misleading." % (n, v), logger=self.logger) + console.warn( + "%s is %d ms indicating that the cluster is not in a defined clean state. Recorded index time " + "metrics may be misleading." % (n, v), + logger=self.logger, + ) self.first_time = False + console.println("") + def on_benchmark_stop(self): self.logger.info("Gathering indices stats for all primaries on benchmark stop.") @@ -1493,7 +1883,11 @@ def on_benchmark_stop(self): p = index_stats["_all"]["primaries"] # actually this is add_count self.add_metrics(self.extract_value(p, ["segments", "count"]), "segments_count") - self.add_metrics(self.extract_value(p, ["segments", "memory_in_bytes"]), "segments_memory_in_bytes", "byte") + self.add_metrics( + self.extract_value(p, ["segments", "memory_in_bytes"]), + "segments_memory_in_bytes", + "byte", + ) for t in self.index_times(index_stats): self.metrics_store.put_doc(doc=t, level=metrics.MetaInfoScope.cluster) @@ -1501,14 +1895,45 @@ def on_benchmark_stop(self): for ct in self.index_counts(index_stats): self.metrics_store.put_doc(doc=ct, level=metrics.MetaInfoScope.cluster) - self.add_metrics(self.extract_value(p, ["segments", "doc_values_memory_in_bytes"]), "segments_doc_values_memory_in_bytes", "byte") - self.add_metrics(self.extract_value(p, ["segments", "stored_fields_memory_in_bytes"]), "segments_stored_fields_memory_in_bytes", - "byte") - self.add_metrics(self.extract_value(p, ["segments", "terms_memory_in_bytes"]), "segments_terms_memory_in_bytes", "byte") - self.add_metrics(self.extract_value(p, ["segments", "norms_memory_in_bytes"]), "segments_norms_memory_in_bytes", "byte") - self.add_metrics(self.extract_value(p, ["segments", "points_memory_in_bytes"]), "segments_points_memory_in_bytes", "byte") - self.add_metrics(self.extract_value(index_stats, ["_all", "total", "store", "size_in_bytes"]), "store_size_in_bytes", "byte") - self.add_metrics(self.extract_value(index_stats, ["_all", "total", "translog", "size_in_bytes"]), "translog_size_in_bytes", "byte") + self.add_metrics( + self.extract_value(p, ["segments", "doc_values_memory_in_bytes"]), + "segments_doc_values_memory_in_bytes", + "byte", + ) + self.add_metrics( + self.extract_value(p, ["segments", "stored_fields_memory_in_bytes"]), + "segments_stored_fields_memory_in_bytes", + "byte", + ) + self.add_metrics( + self.extract_value(p, ["segments", "terms_memory_in_bytes"]), + "segments_terms_memory_in_bytes", + "byte", + ) + self.add_metrics( + self.extract_value(p, ["segments", "norms_memory_in_bytes"]), + "segments_norms_memory_in_bytes", + "byte", + ) + self.add_metrics( + self.extract_value(p, ["segments", 
"points_memory_in_bytes"]), + "segments_points_memory_in_bytes", + "byte", + ) + self.add_metrics( + self.extract_value( + index_stats, ["_all", "total", "store", "size_in_bytes"] + ), + "store_size_in_bytes", + "byte", + ) + self.add_metrics( + self.extract_value( + index_stats, ["_all", "total", "translog", "size_in_bytes"] + ), + "translog_size_in_bytes", + "byte", + ) def index_stats(self): # noinspection PyBroadException @@ -1520,16 +1945,54 @@ def index_stats(self): def index_times(self, stats, per_shard_stats=True): times = [] - self.index_time(times, stats, "merges_total_time", ["merges", "total_time_in_millis"], per_shard_stats) - self.index_time(times, stats, "merges_total_throttled_time", ["merges", "total_throttled_time_in_millis"], per_shard_stats) - self.index_time(times, stats, "indexing_total_time", ["indexing", "index_time_in_millis"], per_shard_stats) - self.index_time(times, stats, "indexing_throttle_time", ["indexing", "throttle_time_in_millis"], per_shard_stats) - self.index_time(times, stats, "refresh_total_time", ["refresh", "total_time_in_millis"], per_shard_stats) - self.index_time(times, stats, "flush_total_time", ["flush", "total_time_in_millis"], per_shard_stats) + self.index_time( + times, + stats, + "merges_total_time", + ["merges", "total_time_in_millis"], + per_shard_stats, + ) + self.index_time( + times, + stats, + "merges_total_throttled_time", + ["merges", "total_throttled_time_in_millis"], + per_shard_stats, + ) + self.index_time( + times, + stats, + "indexing_total_time", + ["indexing", "index_time_in_millis"], + per_shard_stats, + ) + self.index_time( + times, + stats, + "indexing_throttle_time", + ["indexing", "throttle_time_in_millis"], + per_shard_stats, + ) + self.index_time( + times, + stats, + "refresh_total_time", + ["refresh", "total_time_in_millis"], + per_shard_stats, + ) + self.index_time( + times, + stats, + "flush_total_time", + ["flush", "total_time_in_millis"], + per_shard_stats, + ) return times def index_time(self, values, stats, name, path, per_shard_stats): - primary_total_stats = self.extract_value(stats, ["_all", "primaries"], default_value={}) + primary_total_stats = self.extract_value( + stats, ["_all", "primaries"], default_value={} + ) value = self.extract_value(primary_total_stats, path) if value is not None: doc = { @@ -1549,13 +2012,12 @@ def index_counts(self, stats): return counts def index_count(self, values, stats, name, path): - primary_total_stats = self.extract_value(stats, ["_all", "primaries"], default_value={}) + primary_total_stats = self.extract_value( + stats, ["_all", "primaries"], default_value={} + ) value = self.extract_value(primary_total_stats, path) if value is not None: - doc = { - "name": name, - "value": value - } + doc = {"name": name, "value": value} values.append(doc) def primary_shard_stats(self, stats, path): @@ -1565,9 +2027,13 @@ def primary_shard_stats(self, stats, path): for shard in shards["shards"].values(): for shard_metrics in shard: if shard_metrics["routing"]["primary"]: - shard_stats.append(self.extract_value(shard_metrics, path, default_value=0)) + shard_stats.append( + self.extract_value(shard_metrics, path, default_value=0) + ) except KeyError: - self.logger.warning("Could not determine primary shard stats at path [%s].", ",".join(path)) + self.logger.warning( + "Could not determine primary shard stats at path [%s].", ",".join(path) + ) return shard_stats def add_metrics(self, value, metric_key, unit=None): @@ -1584,7 +2050,11 @@ def extract_value(self, primaries, path, 
default_value=None): value = value[k] return value except KeyError: - self.logger.warning("Could not determine value at path [%s]. Returning default value [%s]", ",".join(path), str(default_value)) + self.logger.warning( + "Could not determine value at path [%s]. Returning default value [%s]", + ",".join(path), + str(default_value), + ) return default_value @@ -1596,37 +2066,29 @@ def __init__(self, client, metrics_store): def on_benchmark_stop(self): try: - results = self.client.search(index=".ml-anomalies-*", body={ - "size": 0, - "query": { - "bool": { - "must": [ - {"term": {"result_type": "bucket"}} - ] - } - }, - "aggs": { - "jobs": { - "terms": { - "field": "job_id" - }, - "aggs": { - "min_pt": { - "min": {"field": "processing_time_ms"} + results = self.client.search( + index=".ml-anomalies-*", + body={ + "size": 0, + "query": {"bool": {"must": [{"term": {"result_type": "bucket"}}]}}, + "aggs": { + "jobs": { + "terms": {"field": "job_id"}, + "aggs": { + "min_pt": {"min": {"field": "processing_time_ms"}}, + "max_pt": {"max": {"field": "processing_time_ms"}}, + "mean_pt": {"avg": {"field": "processing_time_ms"}}, + "median_pt": { + "percentiles": { + "field": "processing_time_ms", + "percents": [50], + } + }, }, - "max_pt": { - "max": {"field": "processing_time_ms"} - }, - "mean_pt": { - "avg": {"field": "processing_time_ms"} - }, - "median_pt": { - "percentiles": {"field": "processing_time_ms", "percents": [50]} - } } - } - } - }) + }, + }, + ) except opensearchpy.TransportError: self.logger.exception("Could not retrieve ML bucket processing time.") return @@ -1640,7 +2102,9 @@ def on_benchmark_stop(self): ml_job_stats["median"] = job["median_pt"]["values"]["50.0"] ml_job_stats["max"] = job["max_pt"]["value"] ml_job_stats["unit"] = "ms" - self.metrics_store.put_doc(doc=dict(ml_job_stats), level=MetaInfoScope.cluster) + self.metrics_store.put_doc( + doc=dict(ml_job_stats), level=MetaInfoScope.cluster + ) except KeyError: # no ML running pass @@ -1650,6 +2114,7 @@ class IndexSize(InternalTelemetryDevice): """ Measures the final size of the index """ + def __init__(self, data_paths): super().__init__() self.data_paths = data_paths @@ -1670,7 +2135,10 @@ def detach_from_node(self, node, running): def store_system_metrics(self, node, metrics_store): if self.index_size_bytes: - metrics_store.put_value_node_level(node.node_name, "final_index_size_bytes", self.index_size_bytes, "byte") + metrics_store.put_value_node_level( + node.node_name, "final_index_size_bytes", self.index_size_bytes, "byte" + ) + class SegmentReplicationStats(TelemetryDevice): internal = False @@ -1712,13 +2180,18 @@ def __init__(self, telemetry_params, clients, metrics_store): self.telemetry_params = telemetry_params self.clients = clients - self.sample_interval = telemetry_params.get("segment-replication-stats-sample-interval", 1) + self.sample_interval = telemetry_params.get( + "segment-replication-stats-sample-interval", 1 + ) if self.sample_interval <= 0: raise exceptions.SystemSetupError( f"The telemetry parameter 'segment-replication-stats-sample-interval' must be greater than zero " - f"but was [{self.sample_interval}].") + f"but was [{self.sample_interval}]." 
+ ) self.specified_cluster_names = self.clients.keys() - indices_per_cluster = self.telemetry_params.get("segment-replication-stats-indices", None) + indices_per_cluster = self.telemetry_params.get( + "segment-replication-stats-indices", None + ) # allow the user to specify either an index pattern as string or as a JSON object if isinstance(indices_per_cluster, str): self.indices_per_cluster = {opts.TargetHosts.DEFAULT: [indices_per_cluster]} @@ -1731,7 +2204,8 @@ def __init__(self, telemetry_params, clients, metrics_store): raise exceptions.SystemSetupError( f"The telemetry parameter 'segment-replication-stats-indices' must be a JSON Object with keys " f"matching the cluster names [{','.join(sorted(clients.keys()))}] specified in --target-hosts " - f"but it had [{cluster_name}].") + f"but it had [{cluster_name}]." + ) self.specified_cluster_names = self.indices_per_cluster.keys() self.metrics_store = metrics_store @@ -1740,8 +2214,14 @@ def __init__(self, telemetry_params, clients, metrics_store): def on_benchmark_start(self): for cluster_name in self.specified_cluster_names: recorder = SegmentReplicationStatsRecorder( - cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval, - self.indices_per_cluster[cluster_name] if self.indices_per_cluster else None) + cluster_name, + self.clients[cluster_name], + self.metrics_store, + self.sample_interval, + self.indices_per_cluster[cluster_name] + if self.indices_per_cluster + else None, + ) sampler = SamplerThread(recorder) self.samplers.append(sampler) sampler.setDaemon(True) @@ -1753,12 +2233,15 @@ def on_benchmark_stop(self): for sampler in self.samplers: sampler.finish() + class SegmentReplicationStatsRecorder: """ Collects and pushes segment replication stats for the specified cluster to the metric store. """ - def __init__(self, cluster_name, client, metrics_store, sample_interval, indices=None): + def __init__( + self, cluster_name, client, metrics_store, sample_interval, indices=None + ): """ :param cluster_name: The cluster_name that the client connects to, as specified in target.hosts. :param client: The OpenSearch client for this cluster. 
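Two telemetry parameters drive the device above: segment-replication-stats-sample-interval, which must be strictly positive, and segment-replication-stats-indices, which may be passed either as a single index pattern string (applied to the default cluster) or as a JSON object keyed by the cluster names given via --target-hosts. A minimal standalone sketch of that normalization step; the helper name normalize_indices and the "default" key are illustrative, and a plain ValueError stands in for exceptions.SystemSetupError:

def normalize_indices(indices_param, cluster_names, default_cluster="default"):
    # None is passed through unchanged; the recorder then decides what to collect.
    if indices_param is None:
        return None
    # A plain string is one index pattern for the default cluster.
    if isinstance(indices_param, str):
        return {default_cluster: [indices_param]}
    # Otherwise every key must match a cluster name from --target-hosts.
    unknown = set(indices_param) - set(cluster_names)
    if unknown:
        raise ValueError(
            "segment-replication-stats-indices refers to unknown cluster(s) %s, "
            "expected one of %s" % (sorted(unknown), sorted(cluster_names))
        )
    return dict(indices_param)

# normalize_indices("logs-*", {"default"}) == {"default": ["logs-*"]}
# normalize_indices({"default": ["logs-*"]}, {"default"}) == {"default": ["logs-*"]}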
@@ -1789,18 +2272,21 @@ def record(self): try: stats_api_endpoint = "/_cat/segment_replication/" stats = self.client.transport.perform_request( - "GET", stats_api_endpoint + index, params={"time": "ms", "bytes": "b", "format": "JSON"}) + "GET", + stats_api_endpoint + index, + params={"time": "ms", "bytes": "b", "format": "JSON"}, + ) except opensearchpy.TransportError: raise exceptions.BenchmarkError( f"A transport error occurred while collecting segment replication stats on cluster " - f"[{self.cluster_name}]") from None + f"[{self.cluster_name}]" + ) from None # parse the REST API response, each element in the list is a shard for shard_stats in stats: self._push_stats(stats=shard_stats, index=index) def _push_stats(self, stats, index=None): - doc = { "name": "segment-replication-stats", "shard_id": stats["shardId"], @@ -1810,12 +2296,11 @@ def _push_stats(self, stats, index=None): "bytes_behind": int(stats["bytes_behind"]), "current_lag_in_millis": int(stats["current_lag"]), "last_completed_lag_in_millis": int(stats["last_completed_lag"]), - "rejected_requests": int(stats["rejected_requests"]) + "rejected_requests": int(stats["rejected_requests"]), } - meta_data = { - "cluster": self.cluster_name, - "index": index - } + meta_data = {"cluster": self.cluster_name, "index": index} - self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=meta_data) + self.metrics_store.put_doc( + doc, level=MetaInfoScope.cluster, meta_data=meta_data + ) diff --git a/osbenchmark/utils/io.py b/osbenchmark/utils/io.py index 4ef02a62e..d3075ae95 100644 --- a/osbenchmark/utils/io.py +++ b/osbenchmark/utils/io.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -41,6 +41,7 @@ class FileSource: """ FileSource is a wrapper around a plain file which simplifies testing of file I/O calls. """ + def __init__(self, file_name, mode, encoding="utf-8"): self.file_name = file_name self.mode = mode @@ -91,6 +92,7 @@ class MmapSource: """ MmapSource is a wrapper around a memory-mapped file which simplifies testing of file I/O calls. """ + def __init__(self, file_name, mode, encoding="utf-8"): self.file_name = file_name self.mode = mode @@ -151,6 +153,7 @@ class DictStringFileSourceFactory: It is intended for scenarios where multiple files may be read by client code. """ + def __init__(self, name_to_contents): self.name_to_contents = name_to_contents @@ -163,6 +166,7 @@ class StringAsFileSource: Implementation of ``FileSource`` intended for tests. It's kept close to ``FileSource`` to simplify maintenance but it is not meant to be used in production code. """ + def __init__(self, contents, mode, encoding="utf-8"): """ :param contents: The file contents as an array of strings. Each item in the array should correspond to one line. 
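For context on the record()/_push_stats pair shown just before the osbenchmark/utils/io.py hunks: every row returned by the _cat/segment_replication API (requested with format=JSON, time=ms, bytes=b) is flattened into one metrics-store document per shard. A standalone sketch of that mapping, limited to the fields visible in the hunk above (the patch copies a few more columns that the hunk elides); to_metric_doc is an illustrative name, not part of the patch:

def to_metric_doc(row, cluster_name, index=None):
    # One _cat/segment_replication row describes one shard; numeric columns are
    # converted explicitly, mirroring the int() calls in _push_stats.
    doc = {
        "name": "segment-replication-stats",
        "shard_id": row["shardId"],
        "bytes_behind": int(row["bytes_behind"]),
        "current_lag_in_millis": int(row["current_lag"]),
        "last_completed_lag_in_millis": int(row["last_completed_lag"]),
        "rejected_requests": int(row["rejected_requests"]),
    }
    # Stored at cluster level and tagged with the originating cluster and index.
    meta_data = {"cluster": cluster_name, "index": index}
    return doc, meta_data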
@@ -240,7 +244,10 @@ def _zipdir(source_directory, archive): for file in files: archive.write( filename=os.path.join(root, file), - arcname=os.path.relpath(os.path.join(root, file), os.path.join(source_directory, ".."))) + arcname=os.path.relpath( + os.path.join(root, file), os.path.join(source_directory, "..") + ), + ) def is_archive(name): @@ -296,38 +303,66 @@ def decompress(zip_name, target_directory): elif extension == ".bz2": decompressor_args = ["pbzip2", "-d", "-k", "-m10000", "-c"] decompressor_lib = bz2.open - _do_decompress_manually(target_directory, zip_name, decompressor_args, decompressor_lib) + _do_decompress_manually( + target_directory, zip_name, decompressor_args, decompressor_lib + ) elif extension == ".gz": decompressor_args = ["pigz", "-d", "-k", "-c"] decompressor_lib = gzip.open - _do_decompress_manually(target_directory, zip_name, decompressor_args, decompressor_lib) + _do_decompress_manually( + target_directory, zip_name, decompressor_args, decompressor_lib + ) elif extension in [".tar", ".tar.gz", ".tgz", ".tar.bz2"]: _do_decompress(target_directory, tarfile.open(zip_name)) else: - raise RuntimeError("Unsupported file extension [%s]. Cannot decompress [%s]" % (extension, zip_name)) + raise RuntimeError( + "Unsupported file extension [%s]. Cannot decompress [%s]" + % (extension, zip_name) + ) -def _do_decompress_manually(target_directory, filename, decompressor_args, decompressor_lib): +def _do_decompress_manually( + target_directory, filename, decompressor_args, decompressor_lib +): decompressor_bin = decompressor_args[0] base_path_without_extension = basename(splitext(filename)[0]) if is_executable(decompressor_bin): - if _do_decompress_manually_external(target_directory, filename, base_path_without_extension, decompressor_args): + if _do_decompress_manually_external( + target_directory, filename, base_path_without_extension, decompressor_args + ): return else: - logging.getLogger(__name__).warning("%s not found in PATH. Using standard library, decompression will take longer.", - decompressor_bin) - - _do_decompress_manually_with_lib(target_directory, filename, decompressor_lib(filename)) - - -def _do_decompress_manually_external(target_directory, filename, base_path_without_extension, decompressor_args): - with open(os.path.join(target_directory, base_path_without_extension), "wb") as new_file: + logging.getLogger(__name__).warning( + "%s not found in PATH. Using standard library, decompression will take longer.", + decompressor_bin, + ) + + _do_decompress_manually_with_lib( + target_directory, filename, decompressor_lib(filename) + ) + + +def _do_decompress_manually_external( + target_directory, filename, base_path_without_extension, decompressor_args +): + with open( + os.path.join(target_directory, base_path_without_extension), "wb" + ) as new_file: try: - subprocess.run(decompressor_args + [filename], stdout=new_file, stderr=subprocess.PIPE, check=True) + subprocess.run( + decompressor_args + [filename], + stdout=new_file, + stderr=subprocess.PIPE, + check=True, + ) except subprocess.CalledProcessError as err: - logging.getLogger(__name__).warning("Failed to decompress [%s] with [%s]. Error [%s]. Falling back to standard library.", - filename, err.cmd, err.stderr) + logging.getLogger(__name__).warning( + "Failed to decompress [%s] with [%s]. Error [%s]. 
Falling back to standard library.", + filename, + err.cmd, + err.stderr, + ) return False return True @@ -337,7 +372,9 @@ def _do_decompress_manually_with_lib(target_directory, filename, compressed_file ensure_dir(target_directory) try: - with open(os.path.join(target_directory, path_without_extension), "wb") as new_file: + with open( + os.path.join(target_directory, path_without_extension), "wb" + ) as new_file: for data in iter(lambda: compressed_file.read(100 * 1024), b""): new_file.write(data) finally: @@ -348,7 +385,9 @@ def _do_decompress(target_directory, compressed_file): try: compressed_file.extractall(path=target_directory) except BaseException: - raise RuntimeError("Could not decompress provided archive [%s]" % compressed_file.filename) + raise RuntimeError( + "Could not decompress provided archive [%s]" % compressed_file.filename + ) finally: compressed_file.close() @@ -422,6 +461,7 @@ class FileOffsetTable: The FileOffsetTable represents a persistent mapping from lines in a data file to their offset in bytes in the data file. This helps bulk-indexing clients to advance quickly to a certain position in a large data file. """ + def __init__(self, data_file_path, offset_table_path, mode): """ Creates a new FileOffsetTable instance. The constructor should not be called directly but instead the @@ -447,7 +487,9 @@ def is_valid(self): """ :return: True iff the file offset table exists and it is up-to-date. """ - return self.exists() and os.path.getmtime(self.offset_table_path) >= os.path.getmtime(self.data_file_path) + return self.exists() and os.path.getmtime( + self.offset_table_path + ) >= os.path.getmtime(self.data_file_path) def __enter__(self): self.offset_file = open(self.offset_table_path, self.mode) @@ -530,7 +572,11 @@ def prepare_file_offset_table(data_file_path): """ file_offset_table = FileOffsetTable.create_for_data_file(data_file_path) if not file_offset_table.is_valid(): - console.info("Preparing file offset table for [%s] ... " % data_file_path, end="", flush=True) + console.info( + "Preparing file offset table for [%s] ... " % data_file_path, + end="\n", + flush=True, + ) line_number = 0 with file_offset_table: with open(data_file_path, mode="rt", encoding="utf-8") as data_file: @@ -541,7 +587,7 @@ def prepare_file_offset_table(data_file_path): line_number += 1 if line_number % 50000 == 0: file_offset_table.add_offset(line_number, data_file.tell()) - console.println("[OK]") + console.print("[OK]") return line_number else: return None @@ -572,7 +618,9 @@ def skip_lines(data_file_path, data_file, number_of_lines_to_skip): # can we fast forward? if file_offset_table.exists(): with file_offset_table: - offset, remaining_lines = file_offset_table.find_closest_offset(number_of_lines_to_skip) + offset, remaining_lines = file_offset_table.find_closest_offset( + number_of_lines_to_skip + ) else: offset = 0 remaining_lines = number_of_lines_to_skip diff --git a/osbenchmark/worker_coordinator/worker_coordinator.py b/osbenchmark/worker_coordinator/worker_coordinator.py index 05ac03535..d0eff9ca6 100644 --- a/osbenchmark/worker_coordinator/worker_coordinator.py +++ b/osbenchmark/worker_coordinator/worker_coordinator.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. 
# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -32,6 +32,7 @@ import multiprocessing import queue import threading +from tqdm import tqdm from dataclasses import dataclass from typing import Callable @@ -40,9 +41,23 @@ import thespian.actors -from osbenchmark import actor, config, exceptions, metrics, workload, client, paths, PROGRAM_NAME, telemetry +from osbenchmark import ( + actor, + config, + exceptions, + metrics, + workload, + client, + paths, + PROGRAM_NAME, + telemetry, +) from osbenchmark.worker_coordinator import runner, scheduler -from osbenchmark.workload import WorkloadProcessorRegistry, load_workload, load_workload_plugins +from osbenchmark.workload import ( + WorkloadProcessorRegistry, + load_workload, + load_workload_plugins, +) from osbenchmark.utils import convert, console, net @@ -74,6 +89,7 @@ class PrepareWorkload: Initiates preparation of a workload. """ + def __init__(self, cfg, workload): """ :param cfg: Benchmark internal configuration object. @@ -104,6 +120,7 @@ class WorkerTask: """ Unit of work that should be completed by the low-level TaskExecutionActor """ + func: Callable params: dict @@ -220,22 +237,36 @@ def __init__(self): self.cluster_details = None def receiveMsg_PoisonMessage(self, poisonmsg, sender): - self.logger.error("Main worker_coordinator received a fatal indication from load generator (%s). Shutting down.", poisonmsg.details) + self.logger.error( + "Main worker_coordinator received a fatal indication from load generator (%s). Shutting down.", + poisonmsg.details, + ) self.coordinator.close() - self.send(self.start_sender, actor.BenchmarkFailure("Fatal workload or load generator indication", poisonmsg.details)) + self.send( + self.start_sender, + actor.BenchmarkFailure( + "Fatal workload or load generator indication", poisonmsg.details + ), + ) def receiveMsg_BenchmarkFailure(self, msg, sender): - self.logger.error("Main worker_coordinator received a fatal exception from load generator. Shutting down.") + self.logger.error( + "Main worker_coordinator received a fatal exception from load generator. Shutting down." + ) self.coordinator.close() self.send(self.start_sender, msg) def receiveMsg_BenchmarkCancelled(self, msg, sender): - self.logger.info("Main worker_coordinator received a notification that the benchmark has been cancelled.") + self.logger.info( + "Main worker_coordinator received a notification that the benchmark has been cancelled." + ) self.coordinator.close() self.send(self.start_sender, msg) def receiveMsg_ActorExitRequest(self, msg, sender): - self.logger.info("Main worker_coordinator received ActorExitRequest and will terminate all load generators.") + self.logger.info( + "Main worker_coordinator received ActorExitRequest and will terminate all load generators." + ) self.status = "exiting" def receiveMsg_ChildActorExited(self, msg, sender): @@ -245,13 +276,24 @@ def receiveMsg_ChildActorExited(self, msg, sender): if self.status == "exiting": self.logger.info("Worker [%d] has exited.", worker_index) else: - self.logger.error("Worker [%d] has exited prematurely. Aborting benchmark.", worker_index) - self.send(self.start_sender, actor.BenchmarkFailure("Worker [{}] has exited prematurely.".format(worker_index))) + self.logger.error( + "Worker [%d] has exited prematurely. 
Aborting benchmark.", + worker_index, + ) + self.send( + self.start_sender, + actor.BenchmarkFailure( + "Worker [{}] has exited prematurely.".format(worker_index) + ), + ) else: self.logger.info("A workload preparator has exited.") def receiveUnrecognizedMessage(self, msg, sender): - self.logger.info("Main worker_coordinator received unknown message [%s] (ignoring).", str(msg)) + self.logger.info( + "Main worker_coordinator received unknown message [%s] (ignoring).", + str(msg), + ) @actor.no_retry("worker_coordinator") # pylint: disable=no-value-for-parameter def receiveMsg_PrepareBenchmark(self, msg, sender): @@ -263,16 +305,25 @@ def receiveMsg_PrepareBenchmark(self, msg, sender): def receiveMsg_StartBenchmark(self, msg, sender): self.start_sender = sender self.coordinator.start_benchmark() - self.wakeupAfter(datetime.timedelta(seconds=WorkerCoordinatorActor.WAKEUP_INTERVAL_SECONDS)) + self.wakeupAfter( + datetime.timedelta(seconds=WorkerCoordinatorActor.WAKEUP_INTERVAL_SECONDS) + ) @actor.no_retry("worker_coordinator") # pylint: disable=no-value-for-parameter def receiveMsg_WorkloadPrepared(self, msg, sender): - self.transition_when_all_children_responded(sender, msg, - expected_status=None, new_status=None, transition=self._after_workload_prepared) + self.transition_when_all_children_responded( + sender, + msg, + expected_status=None, + new_status=None, + transition=self._after_workload_prepared, + ) @actor.no_retry("worker_coordinator") # pylint: disable=no-value-for-parameter def receiveMsg_JoinPointReached(self, msg, sender): - self.coordinator.joinpoint_reached(msg.worker_id, msg.worker_timestamp, msg.task) + self.coordinator.joinpoint_reached( + msg.worker_id, msg.worker_timestamp, msg.task + ) @actor.no_retry("worker_coordinator") # pylint: disable=no-value-for-parameter def receiveMsg_UpdateSamples(self, msg, sender): @@ -284,17 +335,28 @@ def receiveMsg_WakeupMessage(self, msg, sender): self.coordinator.reset_relative_time() elif not self.coordinator.finished(): self.post_process_timer += WorkerCoordinatorActor.WAKEUP_INTERVAL_SECONDS - if self.post_process_timer >= WorkerCoordinatorActor.POST_PROCESS_INTERVAL_SECONDS: + if ( + self.post_process_timer + >= WorkerCoordinatorActor.POST_PROCESS_INTERVAL_SECONDS + ): self.post_process_timer = 0 self.coordinator.post_process_samples() self.coordinator.update_progress_message() - self.wakeupAfter(datetime.timedelta(seconds=WorkerCoordinatorActor.WAKEUP_INTERVAL_SECONDS)) + self.wakeupAfter( + datetime.timedelta( + seconds=WorkerCoordinatorActor.WAKEUP_INTERVAL_SECONDS + ) + ) def create_client(self, host): - return self.createActor(Worker, targetActorRequirements=self._requirements(host)) + return self.createActor( + Worker, targetActorRequirements=self._requirements(host) + ) def start_worker(self, worker_coordinator, worker_id, cfg, workload, allocations): - self.send(worker_coordinator, StartWorker(worker_id, cfg, workload, allocations)) + self.send( + worker_coordinator, StartWorker(worker_id, cfg, workload, allocations) + ) def drive_at(self, worker_coordinator, client_start_timestamp): self.send(worker_coordinator, Drive(client_start_timestamp)) @@ -304,7 +366,10 @@ def complete_current_task(self, worker_coordinator): def on_task_finished(self, metrics, next_task_scheduled_in): if next_task_scheduled_in > 0: - self.wakeupAfter(datetime.timedelta(seconds=next_task_scheduled_in), payload=WorkerCoordinatorActor.RESET_RELATIVE_TIME_MARKER) + self.wakeupAfter( + datetime.timedelta(seconds=next_task_scheduled_in), + 
payload=WorkerCoordinatorActor.RESET_RELATIVE_TIME_MARKER, + ) else: self.coordinator.reset_relative_time() self.send(self.start_sender, TaskFinished(metrics, next_task_scheduled_in)) @@ -326,32 +391,44 @@ def prepare_workload(self, hosts, cfg, workload): self.send(child, msg) def _create_workload_preparator(self, host): - return self.createActor(WorkloadPreparationActor, targetActorRequirements=self._requirements(host)) + return self.createActor( + WorkloadPreparationActor, targetActorRequirements=self._requirements(host) + ) def _after_workload_prepared(self): - cluster_version = self.cluster_details["version"] if self.cluster_details else {} + cluster_version = ( + self.cluster_details["version"] if self.cluster_details else {} + ) for child in self.children: self.send(child, thespian.actors.ActorExitRequest()) self.children = [] - self.send(self.start_sender, PreparationComplete( - # older versions (pre 6.3.0) don't expose build_flavor because the only (implicit) flavor was "oss" - cluster_version.get("build_flavor", "oss"), - cluster_version.get("number"), - cluster_version.get("build_hash") - )) + self.send( + self.start_sender, + PreparationComplete( + # older versions (pre 6.3.0) don't expose build_flavor because the only (implicit) flavor was "oss" + cluster_version.get("build_flavor", "oss"), + cluster_version.get("number"), + cluster_version.get("build_hash"), + ), + ) def on_benchmark_complete(self, metrics): self.send(self.start_sender, BenchmarkComplete(metrics)) def load_local_config(coordinator_config): - cfg = config.auto_load_local_config(coordinator_config, additional_sections=[ - # only copy the relevant bits - "workload", "worker_coordinator", "client", - # due to distribution version... - "builder", - "telemetry" - ]) + cfg = config.auto_load_local_config( + coordinator_config, + additional_sections=[ + # only copy the relevant bits + "workload", + "worker_coordinator", + "client", + # due to distribution version... + "builder", + "telemetry", + ], + ) # set root path (normally done by the main entry point) cfg.add(config.Scope.application, "node", "benchmark.root", paths.benchmark_root()) return cfg @@ -361,6 +438,7 @@ class TaskExecutionActor(actor.BenchmarkActor): """ This class should be used for long-running tasks, as it ensures they do not block the actor's messaging system """ + def __init__(self): super().__init__() self.pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) @@ -385,8 +463,10 @@ def receiveMsg_StartTaskLoop(self, msg, sender): def receiveMsg_DoTask(self, msg, sender): # actor can arbitrarily execute code based on these messages. if anyone besides our parent sends a task, ignore if sender != self.parent: - msg = f"TaskExecutionActor expected message from [{self.parent}] but the received the following from " \ - f"[{sender}]: {vars(msg)}" + msg = ( + f"TaskExecutionActor expected message from [{self.parent}] but the received the following from " + f"[{sender}]: {vars(msg)}" + ) raise exceptions.BenchmarkError(msg) task = msg.task if self.executor_future is not None: @@ -408,7 +488,10 @@ def receiveMsg_WakeupMessage(self, msg, sender): self.logger.exception("Worker failed. Notifying parent...", exc_info=e) # the exception might be user-defined and not be on the load path of the original sender. Hence, it # cannot be deserialized on the receiver so we convert it here to a plain string. 
- self.send(self.parent, actor.BenchmarkFailure("Error in task executor", str(e))) + self.send( + self.parent, + actor.BenchmarkFailure("Error in task executor", str(e)), + ) else: self.executor_future = None self.send(self.parent, ReadyForWork()) @@ -419,6 +502,7 @@ def receiveMsg_BenchmarkFailure(self, msg, sender): # sent by our no_retry infrastructure; forward to master self.send(self.parent, msg) + class WorkloadPreparationActor(actor.BenchmarkActor): class Status(Enum): INITIALIZING = "initializing" @@ -438,8 +522,16 @@ def __init__(self): self.workload = None def receiveMsg_PoisonMessage(self, poisonmsg, sender): - self.logger.error("Workload Preparator received a fatal indication from a load generator (%s). Shutting down.", poisonmsg.details) - self.send(self.original_sender, actor.BenchmarkFailure("Fatal workload preparation indication", poisonmsg.details)) + self.logger.error( + "Workload Preparator received a fatal indication from a load generator (%s). Shutting down.", + poisonmsg.details, + ) + self.send( + self.original_sender, + actor.BenchmarkFailure( + "Fatal workload preparation indication", poisonmsg.details + ), + ) @actor.no_retry("workload preparator") # pylint: disable=no-value-for-parameter def receiveMsg_ActorExitRequest(self, msg, sender): @@ -461,31 +553,52 @@ def receiveMsg_PrepareWorkload(self, msg, sender): tpr = WorkloadProcessorRegistry(self.cfg) self.workload = msg.workload self.logger.info("Preparing workload [%s]", self.workload.name) - self.logger.info("Reloading workload [%s] to ensure plugins are up-to-date.", self.workload.name) + self.logger.info( + "Reloading workload [%s] to ensure plugins are up-to-date.", + self.workload.name, + ) # the workload might have been loaded on a different machine (the coordinator machine) so we force a workload # update to ensure we use the latest version of plugins. load_workload(self.cfg) - load_workload_plugins(self.cfg, self.workload.name, register_workload_processor=tpr.register_workload_processor, - force_update=True) + load_workload_plugins( + self.cfg, + self.workload.name, + register_workload_processor=tpr.register_workload_processor, + force_update=True, + ) # we expect on_prepare_workload can take a long time. 
seed a queue of tasks and delegate to child workers - self.children = [self._create_task_executor() for _ in range(num_cores(self.cfg))] + self.children = [ + self._create_task_executor() for _ in range(num_cores(self.cfg)) + ] for processor in tpr.processors: self.processors.put(processor) self._seed_tasks(self.processors.get()) - self.send_to_children_and_transition(self, StartTaskLoop(self.workload.name, self.cfg), self.Status.INITIALIZING, - self.Status.PROCESSOR_RUNNING) + self.send_to_children_and_transition( + self, + StartTaskLoop(self.workload.name, self.cfg), + self.Status.INITIALIZING, + self.Status.PROCESSOR_RUNNING, + ) def resume(self): if not self.processors.empty(): self._seed_tasks(self.processors.get()) - self.send_to_children_and_transition(self, StartTaskLoop(self.workload.name, self.cfg), self.Status.PROCESSOR_COMPLETE, - self.Status.PROCESSOR_RUNNING) + self.send_to_children_and_transition( + self, + StartTaskLoop(self.workload.name, self.cfg), + self.Status.PROCESSOR_COMPLETE, + self.Status.PROCESSOR_RUNNING, + ) else: self.send(self.original_sender, WorkloadPrepared()) def _seed_tasks(self, processor): - self.tasks = list(WorkerTask(func, params) for func, params in - processor.on_prepare_workload(self.workload, self.data_root_dir)) + self.tasks = list( + WorkerTask(func, params) + for func, params in processor.on_prepare_workload( + self.workload, self.data_root_dir + ) + ) def _create_task_executor(self): return self.createActor(TaskExecutionActor) @@ -502,13 +615,24 @@ def receiveMsg_ReadyForWork(self, msg, sender): @actor.no_retry("workload preparator") # pylint: disable=no-value-for-parameter def receiveMsg_WorkerIdle(self, msg, sender): - self.transition_when_all_children_responded(sender, msg, self.Status.PROCESSOR_RUNNING, - self.Status.PROCESSOR_COMPLETE, self.resume) + self.transition_when_all_children_responded( + sender, + msg, + self.Status.PROCESSOR_RUNNING, + self.Status.PROCESSOR_COMPLETE, + self.resume, + ) def num_cores(cfg): - return int(cfg.opts("system", "available.cores", mandatory=False, - default_value=multiprocessing.cpu_count())) + return int( + cfg.opts( + "system", + "available.cores", + mandatory=False, + default_value=multiprocessing.cpu_count(), + ) + ) class WorkerCoordinator: @@ -533,7 +657,8 @@ def __init__(self, target, config, os_client_factory_class=client.OsClientFactor # which client ids are assigned to which workers? 
self.clients_per_worker = {} - self.progress_results_publisher = console.progress() + self.progress_bar = None + self.progress_bar_initalized = None self.progress_counter = 0 self.quiet = False self.allocations = None @@ -554,11 +679,15 @@ def create_os_clients(self): all_hosts = self.config.opts("client", "hosts").all_hosts opensearch = {} for cluster_name, cluster_hosts in all_hosts.items(): - all_client_options = self.config.opts("client", "options").all_client_options + all_client_options = self.config.opts( + "client", "options" + ).all_client_options cluster_client_options = dict(all_client_options[cluster_name]) # Use retries to avoid aborts on long living connections for telemetry devices cluster_client_options["retry-on-timeout"] = True - opensearch[cluster_name] = self.os_client_factory(cluster_hosts, cluster_client_options).create() + opensearch[cluster_name] = self.os_client_factory( + cluster_hosts, cluster_client_options + ).create() return opensearch def prepare_telemetry(self, opensearch, enable): @@ -578,10 +707,18 @@ def prepare_telemetry(self, opensearch, enable): telemetry.MlBucketProcessingTime(os_default, self.metrics_store), telemetry.SegmentStats(log_root, os_default), telemetry.CcrStats(telemetry_params, opensearch, self.metrics_store), - telemetry.RecoveryStats(telemetry_params, opensearch, self.metrics_store), - telemetry.TransformStats(telemetry_params, opensearch, self.metrics_store), - telemetry.SearchableSnapshotsStats(telemetry_params, opensearch, self.metrics_store), - telemetry.SegmentReplicationStats(telemetry_params, opensearch, self.metrics_store) + telemetry.RecoveryStats( + telemetry_params, opensearch, self.metrics_store + ), + telemetry.TransformStats( + telemetry_params, opensearch, self.metrics_store + ), + telemetry.SearchableSnapshotsStats( + telemetry_params, opensearch, self.metrics_store + ), + telemetry.SegmentReplicationStats( + telemetry_params, opensearch, self.metrics_store + ), ] else: devices = [] @@ -593,8 +730,12 @@ def wait_for_rest_api(self, opensearch): if client.wait_for_rest_layer(os_default, max_attempts=40): self.logger.info("REST API is available.") else: - self.logger.error("REST API layer is not yet available. Stopping benchmark.") - raise exceptions.SystemSetupError("OpenSearch REST API layer is not available.") + self.logger.error( + "REST API layer is not yet available. Stopping benchmark." + ) + raise exceptions.SystemSetupError( + "OpenSearch REST API layer is not available." 
+ ) def retrieve_cluster_info(self, opensearch): try: @@ -606,31 +747,46 @@ def retrieve_cluster_info(self, opensearch): def prepare_benchmark(self, t): self.workload = t self.test_procedure = select_test_procedure(self.config, self.workload) - self.quiet = self.config.opts("system", "quiet.mode", mandatory=False, default_value=False) - downsample_factor = int(self.config.opts( - "results_publishing", "metrics.request.downsample.factor", - mandatory=False, default_value=1)) - self.metrics_store = metrics.metrics_store(cfg=self.config, - workload=self.workload.name, - test_procedure=self.test_procedure.name, - read_only=False) - - self.sample_post_processor = SamplePostprocessor(self.metrics_store, - downsample_factor, - self.workload.meta_data, - self.test_procedure.meta_data) + self.quiet = self.config.opts( + "system", "quiet.mode", mandatory=False, default_value=False + ) + downsample_factor = int( + self.config.opts( + "results_publishing", + "metrics.request.downsample.factor", + mandatory=False, + default_value=1, + ) + ) + self.metrics_store = metrics.metrics_store( + cfg=self.config, + workload=self.workload.name, + test_procedure=self.test_procedure.name, + read_only=False, + ) + + self.sample_post_processor = SamplePostprocessor( + self.metrics_store, + downsample_factor, + self.workload.meta_data, + self.test_procedure.meta_data, + ) os_clients = self.create_os_clients() skip_rest_api_check = self.config.opts("builder", "skip.rest.api.check") - uses_static_responses = self.config.opts("client", "options").uses_static_responses + uses_static_responses = self.config.opts( + "client", "options" + ).uses_static_responses if skip_rest_api_check: self.logger.info("Skipping REST API check as requested explicitly.") elif uses_static_responses: self.logger.info("Skipping REST API check as static responses are used.") else: self.wait_for_rest_api(os_clients) - self.target.on_cluster_details_retrieved(self.retrieve_cluster_info(os_clients)) + self.target.on_cluster_details_retrieved( + self.retrieve_cluster_info(os_clients) + ) # Avoid issuing any requests to the target cluster when static responses are enabled. The results # are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs. 
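The metrics.request.downsample.factor setting read above is meant to thin out per-request samples before they are written to the metrics store; the default factor of 1 keeps everything. A minimal illustrative sketch of keeping every n-th sample, not the SamplePostprocessor implementation itself:

def downsample(samples, factor):
    # A factor of 1 (or less) disables downsampling and keeps every sample.
    if factor <= 1:
        return list(samples)
    # Keep every factor-th sample, starting with the first one.
    return [sample for i, sample in enumerate(samples) if i % factor == 0]

# downsample(range(10), 3) -> [0, 3, 6, 9]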
@@ -648,7 +804,9 @@ def prepare_benchmark(self, t): self.worker_ips.append(host_config) - self.target.prepare_workload([h["host"] for h in self.worker_ips], self.config, self.workload) + self.target.prepare_workload( + [h["host"] for h in self.worker_ips], self.config, self.workload + ) def start_benchmark(self): self.logger.info("Benchmark is about to start.") @@ -663,27 +821,45 @@ def start_benchmark(self): self.number_of_steps = len(allocator.join_points) - 1 self.tasks_per_join_point = allocator.tasks_per_joinpoint - self.logger.info("Benchmark consists of [%d] steps executed by [%d] clients.", - self.number_of_steps, len(self.allocations)) + self.logger.info( + "Benchmark consists of [%d] steps executed by [%d] clients.", + self.number_of_steps, + len(self.allocations), + ) # avoid flooding the log if there are too many clients if allocator.clients < 128: - self.logger.info("Allocation matrix:\n%s", "\n".join([str(a) for a in self.allocations])) + self.logger.info( + "Allocation matrix:\n%s", "\n".join([str(a) for a in self.allocations]) + ) - worker_assignments = calculate_worker_assignments(self.worker_ips, allocator.clients) + worker_assignments = calculate_worker_assignments( + self.worker_ips, allocator.clients + ) worker_id = 0 for assignment in worker_assignments: host = assignment["host"] for clients in assignment["workers"]: # don't assign workers without any clients if len(clients) > 0: - self.logger.info("Allocating worker [%d] on [%s] with [%d] clients.", worker_id, host, len(clients)) + self.logger.info( + "Allocating worker [%d] on [%s] with [%d] clients.", + worker_id, + host, + len(clients), + ) worker = self.target.create_client(host) client_allocations = ClientAllocations() for client_id in clients: client_allocations.add(client_id, self.allocations[client_id]) self.clients_per_worker[client_id] = worker_id - self.target.start_worker(worker, worker_id, self.config, self.workload, client_allocations) + self.target.start_worker( + worker, + worker_id, + self.config, + self.workload, + client_allocations, + ) self.workers.append(worker) worker_id += 1 @@ -691,11 +867,23 @@ def start_benchmark(self): def joinpoint_reached(self, worker_id, worker_local_timestamp, task_allocations): self.currently_completed += 1 - self.workers_completed_current_step[worker_id] = (worker_local_timestamp, time.perf_counter()) - self.logger.info("[%d/%d] workers reached join point [%d/%d].", - self.currently_completed, len(self.workers), self.current_step + 1, self.number_of_steps) + self.workers_completed_current_step[worker_id] = ( + worker_local_timestamp, + time.perf_counter(), + ) + self.logger.info( + "[%d/%d] workers reached join point [%d/%d].", + self.currently_completed, + len(self.workers), + self.current_step + 1, + self.number_of_steps, + ) if self.currently_completed == len(self.workers): - self.logger.info("All workers completed their tasks until join point [%d/%d].", self.current_step + 1, self.number_of_steps) + self.logger.info( + "All workers completed their tasks until join point [%d/%d].", + self.current_step + 1, + self.number_of_steps, + ) # we can go on to the next step self.currently_completed = 0 self.complete_current_task_sent = False @@ -747,21 +935,35 @@ def move_to_next_task(self, workers_curr_step): start_next_task = time.perf_counter() + waiting_period for worker_id, worker in enumerate(self.workers): worker_ended_task_at, master_received_msg_at = workers_curr_step[worker_id] - worker_start_timestamp = worker_ended_task_at + (start_next_task - 
master_received_msg_at) - self.logger.info("Scheduling next task for worker id [%d] at their timestamp [%f] (master timestamp [%f])", - worker_id, worker_start_timestamp, start_next_task) + worker_start_timestamp = worker_ended_task_at + ( + start_next_task - master_received_msg_at + ) + self.logger.info( + "Scheduling next task for worker id [%d] at their timestamp [%f] (master timestamp [%f])", + worker_id, + worker_start_timestamp, + start_next_task, + ) self.target.drive_at(worker, worker_start_timestamp) def may_complete_current_task(self, task_allocations): - joinpoints_completing_parent = [a for a in task_allocations if a.task.preceding_task_completes_parent] + joinpoints_completing_parent = [ + a for a in task_allocations if a.task.preceding_task_completes_parent + ] # we need to actively send CompleteCurrentTask messages to all remaining workers. - if len(joinpoints_completing_parent) > 0 and not self.complete_current_task_sent: + if ( + len(joinpoints_completing_parent) > 0 + and not self.complete_current_task_sent + ): # while this list could contain multiple items, it should always be the same task (but multiple # different clients) so any item is sufficient. current_join_point = joinpoints_completing_parent[0].task - self.logger.info("Tasks before join point [%s] are able to complete the parent structure. Checking " - "if all [%d] clients have finished yet.", - current_join_point, len(current_join_point.clients_executing_completing_task)) + self.logger.info( + "Tasks before join point [%s] are able to complete the parent structure. Checking " + "if all [%d] clients have finished yet.", + current_join_point, + len(current_join_point.clients_executing_completing_task), + ) pending_client_ids = [] for client_id in current_join_point.clients_executing_completing_task: @@ -775,14 +977,21 @@ def may_complete_current_task(self, task_allocations): # As we are waiting for other clients to finish, we would send this message over and over again. # Hence we need to memorize whether we have already sent it for the current step. self.complete_current_task_sent = True - self.logger.info("All affected clients have finished. Notifying all clients to complete their current tasks.") + self.logger.info( + "All affected clients have finished. Notifying all clients to complete their current tasks." 
+ ) for worker in self.workers: self.target.complete_current_task(worker) else: if len(pending_client_ids) > 32: - self.logger.info("[%d] clients did not yet finish.", len(pending_client_ids)) + self.logger.info( + "[%d] clients did not yet finish.", len(pending_client_ids) + ) else: - self.logger.info("Client id(s) [%s] did not yet finish.", ",".join(map(str, pending_client_ids))) + self.logger.info( + "Client id(s) [%s] did not yet finish.", + ",".join(map(str, pending_client_ids)), + ) def reset_relative_time(self): self.logger.debug("Resetting relative time of request metrics store.") @@ -792,7 +1001,6 @@ def finished(self): return self.current_step == self.number_of_steps def close(self): - self.progress_results_publisher.finish() if self.metrics_store and self.metrics_store.opened: self.metrics_store.close() @@ -804,23 +1012,40 @@ def update_samples(self, samples): self.most_recent_sample_per_client[s.client_id] = s def update_progress_message(self, task_finished=False): - if not self.quiet and self.current_step >= 0: - tasks = ",".join([t.name for t in self.tasks_per_join_point[self.current_step]]) + tasks = ",".join([t.name for t in self.tasks_per_join_point[self.current_step]]) + + if not self.progress_bar_initalized: + self.progress_bar = tqdm(total=100, desc=f"Running {tasks}", unit="%") + self.progress_bar_initalized = True + return + if not self.quiet and self.current_step >= 0: if task_finished: total_progress = 1.0 else: # we only count clients which actually contribute to progress. If clients are executing tasks eternally in a parallel # structure, we should not count them. The reason is that progress depends entirely on the client(s) that execute the # task that is completing the parallel structure. - progress_per_client = [s.percent_completed - for s in self.most_recent_sample_per_client.values() if s.percent_completed is not None] + progress_per_client = [ + s.percent_completed + for s in self.most_recent_sample_per_client.values() + if s.percent_completed is not None + ] num_clients = max(len(progress_per_client), 1) total_progress = sum(progress_per_client) / num_clients - self.progress_results_publisher.print("Running %s" % tasks, "[%3d%% done]" % (round(total_progress * 100))) + + increment = float(round(total_progress * 100)) - float( + self.progress_counter + ) + self.progress_bar.update(increment) + + self.progress_counter = float(round(total_progress * 100)) + if task_finished: - self.progress_results_publisher.finish() + self.progress_bar.close() + self.progress_counter = 0 + self.progress_bar_initalized = False def post_process_samples(self): # we do *not* do this here to avoid concurrent updates (actors are single-threaded) but rather to make it clear that we use @@ -831,7 +1056,13 @@ def post_process_samples(self): class SamplePostprocessor: - def __init__(self, metrics_store, downsample_factor, workload_meta_data, test_procedure_meta_data): + def __init__( + self, + metrics_store, + downsample_factor, + workload_meta_data, + test_procedure_meta_data, + ): self.logger = logging.getLogger(__name__) self.metrics_store = metrics_store self.workload_meta_data = workload_meta_data @@ -853,35 +1084,66 @@ def __call__(self, raw_samples): self.test_procedure_meta_data, sample.operation_meta_data, sample.task.meta_data, - sample.request_meta_data) - - self.metrics_store.put_value_cluster_level(name="latency", value=convert.seconds_to_ms(sample.latency), - unit="ms", task=sample.task.name, - operation=sample.operation_name, operation_type=sample.operation_type, - 
sample_type=sample.sample_type, absolute_time=sample.absolute_time, - relative_time=sample.relative_time, meta_data=meta_data) - - self.metrics_store.put_value_cluster_level(name="service_time", value=convert.seconds_to_ms(sample.service_time), - unit="ms", task=sample.task.name, - operation=sample.operation_name, operation_type=sample.operation_type, - sample_type=sample.sample_type, absolute_time=sample.absolute_time, - relative_time=sample.relative_time, meta_data=meta_data) - - self.metrics_store.put_value_cluster_level(name="processing_time", value=convert.seconds_to_ms(sample.processing_time), - unit="ms", task=sample.task.name, - operation=sample.operation_name, operation_type=sample.operation_type, - sample_type=sample.sample_type, absolute_time=sample.absolute_time, - relative_time=sample.relative_time, meta_data=meta_data) + sample.request_meta_data, + ) + + self.metrics_store.put_value_cluster_level( + name="latency", + value=convert.seconds_to_ms(sample.latency), + unit="ms", + task=sample.task.name, + operation=sample.operation_name, + operation_type=sample.operation_type, + sample_type=sample.sample_type, + absolute_time=sample.absolute_time, + relative_time=sample.relative_time, + meta_data=meta_data, + ) + + self.metrics_store.put_value_cluster_level( + name="service_time", + value=convert.seconds_to_ms(sample.service_time), + unit="ms", + task=sample.task.name, + operation=sample.operation_name, + operation_type=sample.operation_type, + sample_type=sample.sample_type, + absolute_time=sample.absolute_time, + relative_time=sample.relative_time, + meta_data=meta_data, + ) + + self.metrics_store.put_value_cluster_level( + name="processing_time", + value=convert.seconds_to_ms(sample.processing_time), + unit="ms", + task=sample.task.name, + operation=sample.operation_name, + operation_type=sample.operation_type, + sample_type=sample.sample_type, + absolute_time=sample.absolute_time, + relative_time=sample.relative_time, + meta_data=meta_data, + ) for timing in sample.dependent_timings: - self.metrics_store.put_value_cluster_level(name="service_time", value=convert.seconds_to_ms(timing.service_time), - unit="ms", task=timing.task.name, - operation=timing.operation_name, operation_type=timing.operation_type, - sample_type=timing.sample_type, absolute_time=timing.absolute_time, - relative_time=timing.relative_time, meta_data=meta_data) + self.metrics_store.put_value_cluster_level( + name="service_time", + value=convert.seconds_to_ms(timing.service_time), + unit="ms", + task=timing.task.name, + operation=timing.operation_name, + operation_type=timing.operation_type, + sample_type=timing.sample_type, + absolute_time=timing.absolute_time, + relative_time=timing.relative_time, + meta_data=meta_data, + ) end = time.perf_counter() - self.logger.debug("Storing latency and service time took [%f] seconds.", (end - start)) + self.logger.debug( + "Storing latency and service time took [%f] seconds.", (end - start) + ) start = end aggregates = self.throughput_calculator.calculate(raw_samples) end = time.perf_counter() @@ -892,13 +1154,27 @@ def __call__(self, raw_samples): self.workload_meta_data, self.test_procedure_meta_data, task.operation.meta_data, - task.meta_data + task.meta_data, ) - for absolute_time, relative_time, sample_type, throughput, throughput_unit in samples: - self.metrics_store.put_value_cluster_level(name="throughput", value=throughput, unit=throughput_unit, task=task.name, - operation=task.operation.name, operation_type=task.operation.type, - sample_type=sample_type, 
absolute_time=absolute_time, - relative_time=relative_time, meta_data=meta_data) + for ( + absolute_time, + relative_time, + sample_type, + throughput, + throughput_unit, + ) in samples: + self.metrics_store.put_value_cluster_level( + name="throughput", + value=throughput, + unit=throughput_unit, + task=task.name, + operation=task.operation.name, + operation_type=task.operation.type, + sample_type=sample_type, + absolute_time=absolute_time, + relative_time=relative_time, + meta_data=meta_data, + ) end = time.perf_counter() self.logger.debug("Storing throughput took [%f] seconds.", (end - start)) start = end @@ -910,9 +1186,15 @@ def __call__(self, raw_samples): # no need for frequent refreshes. self.metrics_store.flush(refresh=False) end = time.perf_counter() - self.logger.debug("Flushing the metrics store took [%f] seconds.", (end - start)) - self.logger.debug("Postprocessing [%d] raw samples (downsampled to [%d] samples) took [%f] seconds in total.", - len(raw_samples), final_sample_count, (end - total_start)) + self.logger.debug( + "Flushing the metrics store took [%f] seconds.", (end - start) + ) + self.logger.debug( + "Postprocessing [%d] raw samples (downsampled to [%d] samples) took [%f] seconds in total.", + len(raw_samples), + final_sample_count, + (end - total_start), + ) def merge(self, *args): result = {} @@ -977,10 +1259,7 @@ def __init__(self): self.allocations = [] def add(self, client_id, tasks): - self.allocations.append({ - "client_id": client_id, - "tasks": tasks - }) + self.allocations.append({"client_id": client_id, "tasks": tasks}) def is_joinpoint(self, task_index): return all(isinstance(t.task, JoinPoint) for t in self.tasks(task_index)) @@ -990,7 +1269,9 @@ def tasks(self, task_index, remove_empty=True): for allocation in self.allocations: tasks_at_index = allocation["tasks"][task_index] if remove_empty and tasks_at_index is not None: - current_tasks.append(ClientAllocation(allocation["client_id"], tasks_at_index)) + current_tasks.append( + ClientAllocation(allocation["client_id"], tasks_at_index) + ) return current_tasks @@ -1032,7 +1313,14 @@ def receiveMsg_StartWorker(self, msg, sender): self.worker_id = msg.worker_id self.config = load_local_config(msg.config) self.on_error = self.config.opts("worker_coordinator", "on.error") - self.sample_queue_size = int(self.config.opts("results_publishing", "sample.queue.size", mandatory=False, default_value=1 << 20)) + self.sample_queue_size = int( + self.config.opts( + "results_publishing", + "sample.queue.size", + mandatory=False, + default_value=1 << 20, + ) + ) self.workload = msg.workload workload.set_absolute_data_path(self.config, self.workload) self.client_allocations = msg.client_allocations @@ -1043,14 +1331,26 @@ def receiveMsg_StartWorker(self, msg, sender): self.wakeup_interval = 0.5 runner.register_default_runners() if self.workload.has_plugins: - workload.load_workload_plugins(self.config, self.workload.name, runner.register_runner, scheduler.register_scheduler) + workload.load_workload_plugins( + self.config, + self.workload.name, + runner.register_runner, + scheduler.register_scheduler, + ) self.drive() @actor.no_retry("worker") # pylint: disable=no-value-for-parameter def receiveMsg_Drive(self, msg, sender): - sleep_time = datetime.timedelta(seconds=msg.client_start_timestamp - time.perf_counter()) - self.logger.info("Worker[%d] is continuing its work at task index [%d] on [%f], that is in [%s].", - self.worker_id, self.current_task_index, msg.client_start_timestamp, sleep_time) + sleep_time = 
datetime.timedelta( + seconds=msg.client_start_timestamp - time.perf_counter() + ) + self.logger.info( + "Worker[%d] is continuing its work at task index [%d] on [%f], that is in [%s].", + self.worker_id, + self.current_task_index, + msg.client_start_timestamp, + sleep_time, + ) self.start_driving = True self.wakeupAfter(sleep_time) @@ -1059,11 +1359,17 @@ def receiveMsg_CompleteCurrentTask(self, msg, sender): # finish now ASAP. Remaining samples will be sent with the next WakeupMessage. We will also need to skip to the next # JoinPoint. But if we are already at a JoinPoint at the moment, there is nothing to do. if self.at_joinpoint(): - self.logger.info("Worker[%s] has received CompleteCurrentTask but is currently at join point at index [%d]. Ignoring.", - str(self.worker_id), self.current_task_index) + self.logger.info( + "Worker[%s] has received CompleteCurrentTask but is currently at join point at index [%d]. Ignoring.", + str(self.worker_id), + self.current_task_index, + ) else: - self.logger.info("Worker[%s] has received CompleteCurrentTask. Completing tasks at index [%d].", - str(self.worker_id), self.current_task_index) + self.logger.info( + "Worker[%s] has received CompleteCurrentTask. Completing tasks at index [%d].", + str(self.worker_id), + self.current_task_index, + ) self.complete.set() @actor.no_retry("worker") # pylint: disable=no-value-for-parameter @@ -1075,48 +1381,78 @@ def receiveMsg_WakeupMessage(self, msg, sender): else: current_samples = self.send_samples() if self.cancel.is_set(): - self.logger.info("Worker[%s] has detected that benchmark has been cancelled. Notifying master...", - str(self.worker_id)) + self.logger.info( + "Worker[%s] has detected that benchmark has been cancelled. Notifying master...", + str(self.worker_id), + ) self.send(self.master, actor.BenchmarkCancelled()) elif self.executor_future is not None and self.executor_future.done(): e = self.executor_future.exception(timeout=0) if e: - self.logger.exception("Worker[%s] has detected a benchmark failure. Notifying master...", - str(self.worker_id), exc_info=e) + self.logger.exception( + "Worker[%s] has detected a benchmark failure. Notifying master...", + str(self.worker_id), + exc_info=e, + ) # the exception might be user-defined and not be on the load path of the master worker_coordinator. Hence, it cannot be # deserialized on the receiver so we convert it here to a plain string. - self.send(self.master, actor.BenchmarkFailure("Error in load generator [{}]".format(self.worker_id), str(e))) + self.send( + self.master, + actor.BenchmarkFailure( + "Error in load generator [{}]".format(self.worker_id), + str(e), + ), + ) else: - self.logger.info("Worker[%s] is ready for the next task.", str(self.worker_id)) + self.logger.info( + "Worker[%s] is ready for the next task.", str(self.worker_id) + ) self.executor_future = None self.drive() else: if current_samples and len(current_samples) > 0: most_recent_sample = current_samples[-1] if most_recent_sample.percent_completed is not None: - self.logger.debug("Worker[%s] is executing [%s] (%.2f%% complete).", - str(self.worker_id), most_recent_sample.task, most_recent_sample.percent_completed * 100.0) + self.logger.debug( + "Worker[%s] is executing [%s] (%.2f%% complete).", + str(self.worker_id), + most_recent_sample.task, + most_recent_sample.percent_completed * 100.0, + ) else: # TODO: This could be misleading given that one worker could execute more than one task... 
- self.logger.debug("Worker[%s] is executing [%s] (dependent eternal task).", - str(self.worker_id), most_recent_sample.task) + self.logger.debug( + "Worker[%s] is executing [%s] (dependent eternal task).", + str(self.worker_id), + most_recent_sample.task, + ) else: - self.logger.debug("Worker[%s] is executing (no samples).", str(self.worker_id)) + self.logger.debug( + "Worker[%s] is executing (no samples).", str(self.worker_id) + ) self.wakeupAfter(datetime.timedelta(seconds=self.wakeup_interval)) def receiveMsg_ActorExitRequest(self, msg, sender): - self.logger.info("Worker[%s] has received ActorExitRequest.", str(self.worker_id)) + self.logger.info( + "Worker[%s] has received ActorExitRequest.", str(self.worker_id) + ) if self.executor_future is not None and self.executor_future.running(): self.cancel.set() self.pool.shutdown() - self.logger.info("Worker[%s] is exiting due to ActorExitRequest.", str(self.worker_id)) + self.logger.info( + "Worker[%s] is exiting due to ActorExitRequest.", str(self.worker_id) + ) def receiveMsg_BenchmarkFailure(self, msg, sender): # sent by our no_retry infrastructure; forward to master self.send(self.master, msg) def receiveUnrecognizedMessage(self, msg, sender): - self.logger.info("Worker[%d] received unknown message [%s] (ignoring).", self.worker_id, str(msg)) + self.logger.info( + "Worker[%d] received unknown message [%s] (ignoring).", + self.worker_id, + str(msg), + ) def drive(self): task_allocations = self.current_tasks_and_advance() @@ -1125,7 +1461,11 @@ def drive(self): task_allocations = self.current_tasks_and_advance() if self.at_joinpoint(): - self.logger.info("Worker[%d] reached join point at index [%d].", self.worker_id, self.current_task_index) + self.logger.info( + "Worker[%d] reached join point at index [%d].", + self.worker_id, + self.current_task_index, + ) # clients that don't execute tasks don't need to care about waiting if self.executor_future is not None: self.executor_future.result() @@ -1139,13 +1479,31 @@ def drive(self): # There may be a situation where there are more (parallel) tasks than workers. If we were asked to complete all tasks, we not # only need to complete actively running tasks but actually all scheduled tasks until we reach the next join point. 
if self.complete.is_set(): - self.logger.info("Worker[%d] skips tasks at index [%d] because it has been asked to complete all " - "tasks until next join point.", self.worker_id, self.current_task_index) + self.logger.info( + "Worker[%d] skips tasks at index [%d] because it has been asked to complete all " + "tasks until next join point.", + self.worker_id, + self.current_task_index, + ) else: - self.logger.info("Worker[%d] is executing tasks at index [%d].", self.worker_id, self.current_task_index) - self.sampler = Sampler(start_timestamp=time.perf_counter(), buffer_size=self.sample_queue_size) - executor = AsyncIoAdapter(self.config, self.workload, task_allocations, self.sampler, - self.cancel, self.complete, self.on_error) + self.logger.info( + "Worker[%d] is executing tasks at index [%d].", + self.worker_id, + self.current_task_index, + ) + self.sampler = Sampler( + start_timestamp=time.perf_counter(), + buffer_size=self.sample_queue_size, + ) + executor = AsyncIoAdapter( + self.config, + self.workload, + task_allocations, + self.sampler, + self.cancel, + self.complete, + self.on_error, + ) self.executor_future = self.pool.submit(executor) self.wakeupAfter(datetime.timedelta(seconds=self.wakeup_interval)) @@ -1157,7 +1515,9 @@ def current_tasks_and_advance(self): self.current_task_index = self.next_task_index current = self.client_allocations.tasks(self.current_task_index) self.next_task_index += 1 - self.logger.debug("Worker[%d] is at task index [%d].", self.worker_id, self.current_task_index) + self.logger.debug( + "Worker[%d] is at task index [%d].", self.worker_id, self.current_task_index + ) return current def send_samples(self): @@ -1179,15 +1539,50 @@ def __init__(self, start_timestamp, buffer_size=16384): self.q = queue.Queue(maxsize=buffer_size) self.logger = logging.getLogger(__name__) - def add(self, task, client_id, sample_type, meta_data, absolute_time, request_start, latency, service_time, - processing_time, throughput, ops, ops_unit, time_period, percent_completed, dependent_timing=None): + def add( + self, + task, + client_id, + sample_type, + meta_data, + absolute_time, + request_start, + latency, + service_time, + processing_time, + throughput, + ops, + ops_unit, + time_period, + percent_completed, + dependent_timing=None, + ): try: self.q.put_nowait( - Sample(client_id, absolute_time, request_start, self.start_timestamp, task, sample_type, meta_data, - latency, service_time, processing_time, throughput, ops, ops_unit, time_period, - percent_completed, dependent_timing)) + Sample( + client_id, + absolute_time, + request_start, + self.start_timestamp, + task, + sample_type, + meta_data, + latency, + service_time, + processing_time, + throughput, + ops, + ops_unit, + time_period, + percent_completed, + dependent_timing, + ) + ) except queue.Full: - self.logger.warning("Dropping sample for [%s] due to a full sampling queue.", task.operation.name) + self.logger.warning( + "Dropping sample for [%s] due to a full sampling queue.", + task.operation.name, + ) @property def samples(self): @@ -1201,9 +1596,27 @@ def samples(self): class Sample: - def __init__(self, client_id, absolute_time, request_start, task_start, task, sample_type, request_meta_data, latency, - service_time, processing_time, throughput, total_ops, total_ops_unit, time_period, - percent_completed, dependent_timing=None, operation_name=None, operation_type=None): + def __init__( + self, + client_id, + absolute_time, + request_start, + task_start, + task, + sample_type, + request_meta_data, + latency, + 
service_time, + processing_time, + throughput, + total_ops, + total_ops_unit, + time_period, + percent_completed, + dependent_timing=None, + operation_name=None, + operation_type=None, + ): self.client_id = client_id self.absolute_time = absolute_time self.request_start = request_start @@ -1226,11 +1639,15 @@ def __init__(self, client_id, absolute_time, request_start, task_start, task, sa @property def operation_name(self): - return self._operation_name if self._operation_name else self.task.operation.name + return ( + self._operation_name if self._operation_name else self.task.operation.name + ) @property def operation_type(self): - return self._operation_type if self._operation_type else self.task.operation.type + return ( + self._operation_type if self._operation_type else self.task.operation.type + ) @property def operation_meta_data(self): @@ -1244,15 +1661,33 @@ def relative_time(self): def dependent_timings(self): if self._dependent_timing: for t in self._dependent_timing: - yield Sample(self.client_id, t["absolute_time"], t["request_start"], self.task_start, self.task, - self.sample_type, self.request_meta_data, 0, t["service_time"], 0, 0, self.total_ops, - self.total_ops_unit, self.time_period, self.percent_completed, None, - t["operation"], t["operation-type"]) + yield Sample( + self.client_id, + t["absolute_time"], + t["request_start"], + self.task_start, + self.task, + self.sample_type, + self.request_meta_data, + 0, + t["service_time"], + 0, + 0, + self.total_ops, + self.total_ops_unit, + self.time_period, + self.percent_completed, + None, + t["operation"], + t["operation-type"], + ) def __repr__(self, *args, **kwargs): - return f"[{self.absolute_time}; {self.relative_time}] [client [{self.client_id}]] [{self.task}] " \ - f"[{self.sample_type}]: [{self.latency}s] request latency, [{self.service_time}s] service time, " \ - f"[{self.total_ops} {self.total_ops_unit}]" + return ( + f"[{self.absolute_time}; {self.relative_time}] [client [{self.client_id}]] [{self.task}] " + f"[{self.sample_type}]: [{self.latency}s] request latency, [{self.service_time}s] service time, " + f"[{self.total_ops} {self.total_ops_unit}]" + ) def select_test_procedure(config, t): @@ -1260,8 +1695,11 @@ def select_test_procedure(config, t): selected_test_procedure = t.find_test_procedure_or_default(test_procedure_name) if not selected_test_procedure: - raise exceptions.SystemSetupError("Unknown test_procedure [%s] for workload [%s]. You can list the available workloads and their " - "test_procedures with %s list workloads." % (test_procedure_name, t.name, PROGRAM_NAME)) + raise exceptions.SystemSetupError( + "Unknown test_procedure [%s] for workload [%s]. You can list the available workloads and their " + "test_procedures with %s list workloads." + % (test_procedure_name, t.name, PROGRAM_NAME) + ) return selected_test_procedure @@ -1270,6 +1708,7 @@ class TaskStats: """ Stores per task numbers needed for throughput calculation in between multiple calculations. """ + def __init__(self, bucket_interval, sample_type, start_time): self.unprocessed = [] self.total_count = 0 @@ -1344,7 +1783,9 @@ def calculate(self, samples, bucket_interval_secs=1): # only transform the values into the expected structure. 
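# Illustrative sketch, not taken from this diff: when samples do not already carry a
# throughput value, the calculator below buckets them into fixed time intervals and
# reports work per interval; otherwise it only reshapes the reported values. A toy,
# heavily simplified version of the bucketing idea (no warmup/sample-type handling),
# with samples given as (relative_time_seconds, ops) tuples.
def bucketed_throughput(samples, bucket_interval_secs=1.0):
    """Return (bucket_end_time, ops_per_second) tuples, one per elapsed bucket."""
    results = []
    bucket_start, ops_in_bucket = 0.0, 0
    for relative_time, ops in sorted(samples):
        while relative_time >= bucket_start + bucket_interval_secs:
            results.append((bucket_start + bucket_interval_secs,
                            ops_in_bucket / bucket_interval_secs))
            bucket_start += bucket_interval_secs
            ops_in_bucket = 0
        ops_in_bucket += ops
    results.append((bucket_start + bucket_interval_secs,
                    ops_in_bucket / bucket_interval_secs))
    return results


if __name__ == "__main__":
    samples = [(0.2, 100), (0.7, 120), (1.1, 90), (1.9, 110), (2.3, 95)]
    for bucket_end, throughput in bucketed_throughput(samples):
        print("t<=%.1fs: %.0f ops/s" % (bucket_end, throughput))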
first_sample = current_samples[0] if first_sample.throughput is None: - task_throughput = self.calculate_task_throughput(task, current_samples, bucket_interval_secs) + task_throughput = self.calculate_task_throughput( + task, current_samples, bucket_interval_secs + ) else: task_throughput = self.map_task_throughput(current_samples) global_throughput[task].extend(task_throughput) @@ -1356,9 +1797,11 @@ def calculate_task_throughput(self, task, current_samples, bucket_interval_secs) if task not in self.task_stats: first_sample = current_samples[0] - self.task_stats[task] = ThroughputCalculator.TaskStats(bucket_interval=bucket_interval_secs, - sample_type=first_sample.sample_type, - start_time=first_sample.absolute_time - first_sample.time_period) + self.task_stats[task] = ThroughputCalculator.TaskStats( + bucket_interval=bucket_interval_secs, + sample_type=first_sample.sample_type, + start_time=first_sample.absolute_time - first_sample.time_period, + ) current = self.task_stats[task] count = current.total_count last_sample = None @@ -1379,12 +1822,16 @@ def calculate_task_throughput(self, task, current_samples, bucket_interval_secs) if current.can_calculate_throughput(): current.finish_bucket(count) - task_throughput.append((sample.absolute_time, - sample.relative_time, - current.sample_type, - current.throughput, - # we calculate throughput per second - f"{sample.total_ops_unit}/s")) + task_throughput.append( + ( + sample.absolute_time, + sample.relative_time, + current.sample_type, + current.throughput, + # we calculate throughput per second + f"{sample.total_ops_unit}/s", + ) + ) else: current.unprocessed.append(sample) @@ -1392,27 +1839,37 @@ def calculate_task_throughput(self, task, current_samples, bucket_interval_secs) # interval (mainly needed to ensure we show throughput data in test mode) if last_sample is not None and current.can_add_final_throughput_sample(): current.finish_bucket(count) - task_throughput.append((last_sample.absolute_time, - last_sample.relative_time, - current.sample_type, - current.throughput, - f"{last_sample.total_ops_unit}/s")) + task_throughput.append( + ( + last_sample.absolute_time, + last_sample.relative_time, + current.sample_type, + current.throughput, + f"{last_sample.total_ops_unit}/s", + ) + ) return task_throughput def map_task_throughput(self, current_samples): throughput = [] for sample in current_samples: - throughput.append((sample.absolute_time, - sample.relative_time, - sample.sample_type, - sample.throughput, - f"{sample.total_ops_unit}/s")) + throughput.append( + ( + sample.absolute_time, + sample.relative_time, + sample.sample_type, + sample.throughput, + f"{sample.total_ops_unit}/s", + ) + ) return throughput class AsyncIoAdapter: - def __init__(self, cfg, workload, task_allocations, sampler, cancel, complete, abort_on_error): + def __init__( + self, cfg, workload, task_allocations, sampler, cancel, complete, abort_on_error + ): self.cfg = cfg self.workload = workload self.task_allocations = task_allocations @@ -1422,7 +1879,9 @@ def __init__(self, cfg, workload, task_allocations, sampler, cancel, complete, a self.abort_on_error = abort_on_error self.profiling_enabled = self.cfg.opts("worker_coordinator", "profiling") self.assertions_enabled = self.cfg.opts("worker_coordinator", "assertions") - self.debug_event_loop = self.cfg.opts("system", "async.debug", mandatory=False, default_value=False) + self.debug_event_loop = self.cfg.opts( + "system", "async.debug", mandatory=False, default_value=False + ) self.logger = 
logging.getLogger(__name__) def __call__(self, *args, **kwargs): @@ -1448,14 +1907,18 @@ async def run(self): def os_clients(all_hosts, all_client_options): opensearch = {} for cluster_name, cluster_hosts in all_hosts.items(): - opensearch[cluster_name] = client.OsClientFactory(cluster_hosts, all_client_options[cluster_name]).create_async() + opensearch[cluster_name] = client.OsClientFactory( + cluster_hosts, all_client_options[cluster_name] + ).create_async() return opensearch # Properly size the internal connection pool to match the number of expected clients but allow the user # to override it if needed. client_count = len(self.task_allocations) - opensearch = os_clients(self.cfg.opts("client", "hosts").all_hosts, - self.cfg.opts("client", "options").with_max_connections(client_count)) + opensearch = os_clients( + self.cfg.opts("client", "hosts").all_hosts, + self.cfg.opts("client", "options").with_max_connections(client_count), + ) self.logger.info("Task assertions enabled: %s", str(self.assertions_enabled)) runner.enable_assertions(self.assertions_enabled) @@ -1476,11 +1939,24 @@ def os_clients(all_hosts, all_client_options): # # Now we need to ensure that we start partitioning parameters correctly in both cases. And that means we # need to start from (client) index 0 in both cases instead of 0 for indexA and 4 for indexB. - schedule = schedule_for(task, task_allocation.client_index_in_task, params_per_task[task]) + schedule = schedule_for( + task, task_allocation.client_index_in_task, params_per_task[task] + ) async_executor = AsyncExecutor( - client_id, task, schedule, opensearch, self.sampler, self.cancel, self.complete, - task.error_behavior(self.abort_on_error)) - final_executor = AsyncProfiler(async_executor) if self.profiling_enabled else async_executor + client_id, + task, + schedule, + opensearch, + self.sampler, + self.cancel, + self.complete, + task.error_behavior(self.abort_on_error), + ) + final_executor = ( + AsyncProfiler(async_executor) + if self.profiling_enabled + else async_executor + ) aws.append(final_executor()) run_start = time.perf_counter() try: @@ -1490,11 +1966,17 @@ def os_clients(all_hosts, all_client_options): self.logger.info("Total run duration: %f seconds.", (run_end - run_start)) await asyncio.get_event_loop().shutdown_asyncgens() shutdown_asyncgens_end = time.perf_counter() - self.logger.info("Total time to shutdown asyncgens: %f seconds.", (shutdown_asyncgens_end - run_end)) + self.logger.info( + "Total time to shutdown asyncgens: %f seconds.", + (shutdown_asyncgens_end - run_end), + ) for s in opensearch.values(): await s.transport.close() transport_close_end = time.perf_counter() - self.logger.info("Total time to close transports: %f seconds.", (shutdown_asyncgens_end - transport_close_end)) + self.logger.info( + "Total time to close transports: %f seconds.", + (shutdown_asyncgens_end - transport_close_end), + ) class AsyncProfiler: @@ -1510,19 +1992,23 @@ async def __call__(self, *args, **kwargs): # pylint: disable=import-outside-toplevel import yappi import io as python_io + yappi.start() try: return await self.target(*args, **kwargs) finally: yappi.stop() s = python_io.StringIO() - yappi.get_func_stats().print_all(out=s, columns={ - 0: ("name", 140), - 1: ("ncall", 8), - 2: ("tsub", 8), - 3: ("ttot", 8), - 4: ("tavg", 8) - }) + yappi.get_func_stats().print_all( + out=s, + columns={ + 0: ("name", 140), + 1: ("ncall", 8), + 2: ("tsub", 8), + 3: ("ttot", 8), + 4: ("tavg", 8), + }, + ) profile = "\n=== Profile START ===\n" profile += s.getvalue() 
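# Illustrative sketch, not taken from this diff: AsyncProfiler above wraps an awaitable
# executor, starts yappi around the awaited call, and appends the formatted statistics
# to the profile output in a finally block. The same wrap-and-measure shape is shown
# here with only time.perf_counter() instead of yappi, so it runs with no extra
# dependency; an elapsed duration is always computed as "end minus start".
import asyncio
import time


class AsyncTimer:
    def __init__(self, target):
        self.target = target  # any coroutine function

    async def __call__(self, *args, **kwargs):
        start = time.perf_counter()
        try:
            return await self.target(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            print("=== Profile: %.4f seconds in %s ===" % (elapsed, self.target.__name__))


async def sample_workload():
    await asyncio.sleep(0.05)  # stand-in for issuing requests against a cluster
    return 42


if __name__ == "__main__":
    print(asyncio.run(AsyncTimer(sample_workload)()))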
@@ -1531,7 +2017,9 @@ async def __call__(self, *args, **kwargs): class AsyncExecutor: - def __init__(self, client_id, task, schedule, opensearch, sampler, cancel, complete, on_error): + def __init__( + self, client_id, task, schedule, opensearch, sampler, cancel, complete, on_error + ): """ Executes tasks according to the schedule for a given operation. @@ -1577,8 +2065,12 @@ async def __call__(self, *args, **kwargs): absolute_processing_start = time.time() processing_start = time.perf_counter() self.schedule_handle.before_request(processing_start) - async with self.opensearch["default"].new_request_context() as request_context: - total_ops, total_ops_unit, request_meta_data = await execute_single(runner, self.opensearch, params, self.on_error) + async with self.opensearch[ + "default" + ].new_request_context() as request_context: + total_ops, total_ops_unit, request_meta_data = await execute_single( + runner, self.opensearch, params, self.on_error + ) request_start = request_context.request_start request_end = request_context.request_end @@ -1586,7 +2078,9 @@ async def __call__(self, *args, **kwargs): service_time = request_end - request_start processing_time = processing_end - processing_start time_period = request_end - total_start - self.schedule_handle.after_request(processing_end, total_ops, total_ops_unit, request_meta_data) + self.schedule_handle.after_request( + processing_end, total_ops, total_ops_unit, request_meta_data + ) # Allow runners to override the throughput calculation in very specific circumstances. Usually, Benchmark # assumes that throughput is the "amount of work" (determined by the "weight") per unit of time # (determined by the elapsed time period). However, in certain cases (e.g. shard recovery or other @@ -1599,7 +2093,11 @@ async def __call__(self, *args, **kwargs): # throughput = request_meta_data.pop("throughput", None) # Do not calculate latency separately when we run unthrottled. This metric is just confusing then. - latency = request_end - absolute_expected_schedule_time if throughput_throttled else service_time + latency = ( + request_end - absolute_expected_schedule_time + if throughput_throttled + else service_time + ) # If this task completes the parent task we should *not* check for completion by another client but # instead continue until our own runner has completed. We need to do this because the current # worker (process) could run multiple clients that execute the same task. 
We do not want all clients to @@ -1617,22 +2115,43 @@ async def __call__(self, *args, **kwargs): else: progress = percent_completed - self.sampler.add(self.task, self.client_id, sample_type, request_meta_data, - absolute_processing_start, request_start, - latency, service_time, processing_time, throughput, total_ops, total_ops_unit, - time_period, progress, request_meta_data.pop("dependent_timing", None)) + self.sampler.add( + self.task, + self.client_id, + sample_type, + request_meta_data, + absolute_processing_start, + request_start, + latency, + service_time, + processing_time, + throughput, + total_ops, + total_ops_unit, + time_period, + progress, + request_meta_data.pop("dependent_timing", None), + ) if completed: - self.logger.info("Task [%s] is considered completed due to external event.", self.task) + self.logger.info( + "Task [%s] is considered completed due to external event.", + self.task, + ) break except BaseException as e: self.logger.exception("Could not execute schedule") - raise exceptions.BenchmarkError(f"Cannot run task [{self.task}]: {e}") from None + raise exceptions.BenchmarkError( + f"Cannot run task [{self.task}]: {e}" + ) from None finally: # Actively set it if this task completes its parent if task_completes_parent: - self.logger.info("Task [%s] completes parent. Client id [%s] is finished executing it and signals completion.", - self.task, self.client_id) + self.logger.info( + "Task [%s] completes parent. Client id [%s] is finished executing it and signals completion.", + self.task, + self.client_id, + ) self.complete.set() @@ -1644,6 +2163,7 @@ async def execute_single(runner, opensearch, params, on_error): """ # pylint: disable=import-outside-toplevel import opensearchpy + fatal_error = False try: async with runner: @@ -1669,10 +2189,7 @@ async def execute_single(runner, opensearch, params, on_error): total_ops = 0 total_ops_unit = "ops" - request_meta_data = { - "success": False, - "error-type": "transport" - } + request_meta_data = {"success": False, "error-type": "transport"} # The ES client will sometimes return string like "N/A" or "TIMEOUT" for connection errors. if isinstance(e.status_code, int): request_meta_data["http-status"] = e.status_code @@ -1688,13 +2205,22 @@ async def execute_single(runner, opensearch, params, on_error): error_description = str(e.error) request_meta_data["error-description"] = error_description except KeyError as e: - logging.getLogger(__name__).exception("Cannot execute runner [%s]; most likely due to missing parameters.", str(runner)) - msg = "Cannot execute [%s]. Provided parameters are: %s. Error: [%s]." % (str(runner), list(params.keys()), str(e)) + logging.getLogger(__name__).exception( + "Cannot execute runner [%s]; most likely due to missing parameters.", + str(runner), + ) + msg = "Cannot execute [%s]. Provided parameters are: %s. Error: [%s]." % ( + str(runner), + list(params.keys()), + str(e), + ) raise exceptions.SystemSetupError(msg) if not request_meta_data["success"]: if on_error == "abort" or fatal_error: - msg = "Request returned an error. Error type: %s" % request_meta_data.get("error-type", "Unknown") + msg = "Request returned an error. 
Error type: %s" % request_meta_data.get( + "error-type", "Unknown" + ) description = request_meta_data.get("error-description") if description: msg += ", Description: %s" % description @@ -1714,8 +2240,12 @@ def __init__(self, id, clients_executing_completing_task=None): clients_executing_completing_task = [] self.id = id self.clients_executing_completing_task = clients_executing_completing_task - self.num_clients_executing_completing_task = len(clients_executing_completing_task) - self.preceding_task_completes_parent = self.num_clients_executing_completing_task > 0 + self.num_clients_executing_completing_task = len( + clients_executing_completing_task + ) + self.preceding_task_completes_parent = ( + self.num_clients_executing_completing_task > 0 + ) def __hash__(self): return hash(self.id) @@ -1736,10 +2266,18 @@ def __hash__(self): return hash(self.task) ^ hash(self.client_index_in_task) def __eq__(self, other): - return isinstance(other, type(self)) and self.task == other.task and self.client_index_in_task == other.client_index_in_task + return ( + isinstance(other, type(self)) + and self.task == other.task + and self.client_index_in_task == other.client_index_in_task + ) def __repr__(self, *args, **kwargs): - return "TaskAllocation [%d/%d] for %s" % (self.client_index_in_task, self.task.clients, self.task) + return "TaskAllocation [%d/%d] for %s" % ( + self.client_index_in_task, + self.task.clients, + self.task, + ) class Allocator: @@ -1779,13 +2317,17 @@ def allocations(self): start_client_index = 0 clients_executing_completing_task = [] for sub_task in task: - for client_index in range(start_client_index, start_client_index + sub_task.clients): + for client_index in range( + start_client_index, start_client_index + sub_task.clients + ): # this is the actual client that will execute the task. It may differ from the logical one in case we over-commit (i.e. # more tasks than actually available clients) physical_client_index = client_index % max_clients if sub_task.completes_parent: clients_executing_completing_task.append(physical_client_index) - allocations[physical_client_index].append(TaskAllocation(sub_task, client_index - start_client_index)) + allocations[physical_client_index].append( + TaskAllocation(sub_task, client_index - start_client_index) + ) start_client_index += sub_task.clients # uneven distribution between tasks and clients, e.g. there are 5 (parallel) tasks but only 2 clients. Then, one of them @@ -1797,7 +2339,9 @@ def allocations(self): allocations[client_index].append(None) # let all clients join after each task, then we go on - next_join_point = JoinPoint(join_point_id, clients_executing_completing_task) + next_join_point = JoinPoint( + join_point_id, clients_executing_completing_task + ) for client_index in range(max_clients): allocations[client_index].append(next_join_point) join_point_id += 1 @@ -1808,7 +2352,11 @@ def join_points(self): """ :return: A list of all join points for this allocations. 
""" - return [allocation for allocation in self.allocations[0] if isinstance(allocation, JoinPoint)] + return [ + allocation + for allocation in self.allocations[0] + if isinstance(allocation, JoinPoint) + ] @property def tasks_per_joinpoint(self): @@ -1884,15 +2432,22 @@ def schedule_for(task, client_index, parameter_source): params_for_op = parameter_source.partition(client_index, num_clients) if hasattr(sched, "parameter_source"): if client_index == 0: - logger.debug("Setting parameter source [%s] for scheduler [%s]", params_for_op, sched) + logger.debug( + "Setting parameter source [%s] for scheduler [%s]", params_for_op, sched + ) sched.parameter_source = params_for_op if requires_time_period_schedule(task, runner_for_op, params_for_op): warmup_time_period = task.warmup_time_period if task.warmup_time_period else 0 if client_index == 0: - logger.info("Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] " - "seconds and a time period of [%s] seconds.", task.schedule, task.name, - str(warmup_time_period), str(task.time_period)) + logger.info( + "Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] " + "seconds and a time period of [%s] seconds.", + task.schedule, + task.name, + str(warmup_time_period), + str(task.time_period), + ) loop_control = TimePeriodBased(warmup_time_period, task.time_period) else: warmup_iterations = task.warmup_iterations if task.warmup_iterations else 0 @@ -1904,15 +2459,28 @@ def schedule_for(task, client_index, parameter_source): else: iterations = None if client_index == 0: - logger.info("Creating iteration-count based schedule with [%s] distribution for [%s] with [%s] warmup " - "iterations and [%s] iterations.", task.schedule, task.name, str(warmup_iterations), str(iterations)) + logger.info( + "Creating iteration-count based schedule with [%s] distribution for [%s] with [%s] warmup " + "iterations and [%s] iterations.", + task.schedule, + task.name, + str(warmup_iterations), + str(iterations), + ) loop_control = IterationBased(warmup_iterations, iterations) if client_index == 0: if loop_control.infinite: - logger.info("Parameter source will determine when the schedule for [%s] terminates.", task.name) + logger.info( + "Parameter source will determine when the schedule for [%s] terminates.", + task.name, + ) else: - logger.info("%s schedule will determine when the schedule for [%s] terminates.", str(loop_control), task.name) + logger.info( + "%s schedule will determine when the schedule for [%s] terminates.", + str(loop_control), + task.name, + ) return ScheduleHandle(task.name, sched, loop_control, runner_for_op, params_for_op) @@ -1948,10 +2516,10 @@ def __init__(self, task_name, sched, task_progress_control, runner, params): self.runner = runner self.params = params # TODO: Can we offload the parameter source execution to a different thread / process? Is this too heavy-weight? - #from concurrent.futures import ThreadPoolExecutor - #import asyncio - #self.io_pool_exc = ThreadPoolExecutor(max_workers=1) - #self.loop = asyncio.get_event_loop() + # from concurrent.futures import ThreadPoolExecutor + # import asyncio + # self.io_pool_exc = ThreadPoolExecutor(max_workers=1) + # self.loop = asyncio.get_event_loop() def before_request(self, now): self.sched.before_request(now) @@ -1968,10 +2536,19 @@ async def __call__(self): try: next_scheduled = self.sched.next(next_scheduled) # does not contribute at all to completion. Hence, we cannot define completion. 
- percent_completed = self.params.percent_completed if param_source_knows_progress else None - #current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params) - yield (next_scheduled, self.task_progress_control.sample_type, percent_completed, self.runner, - self.params.params()) + percent_completed = ( + self.params.percent_completed + if param_source_knows_progress + else None + ) + # current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params) + yield ( + next_scheduled, + self.task_progress_control.sample_type, + percent_completed, + self.runner, + self.params.params(), + ) self.task_progress_control.next() except StopIteration: return @@ -1980,12 +2557,14 @@ async def __call__(self): while not self.task_progress_control.completed: try: next_scheduled = self.sched.next(next_scheduled) - #current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params) - yield (next_scheduled, - self.task_progress_control.sample_type, - self.task_progress_control.percent_completed, - self.runner, - self.params.params()) + # current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params) + yield ( + next_scheduled, + self.task_progress_control.sample_type, + self.task_progress_control.percent_completed, + self.runner, + self.params.params(), + ) self.task_progress_control.next() except StopIteration: return @@ -2012,7 +2591,11 @@ def _elapsed(self): @property def sample_type(self): - return metrics.SampleType.Warmup if self._elapsed < self._warmup_time_period else metrics.SampleType.Normal + return ( + metrics.SampleType.Warmup + if self._elapsed < self._warmup_time_period + else metrics.SampleType.Normal + ) @property def infinite(self): @@ -2040,7 +2623,9 @@ def __init__(self, warmup_iterations, iterations): if warmup_iterations is not None and iterations is not None: self._total_iterations = self._warmup_iterations + self._iterations if self._total_iterations == 0: - raise exceptions.BenchmarkAssertionError("Operation must run at least for one iteration.") + raise exceptions.BenchmarkAssertionError( + "Operation must run at least for one iteration." + ) else: self._total_iterations = None self._it = None @@ -2050,7 +2635,11 @@ def start(self): @property def sample_type(self): - return metrics.SampleType.Warmup if self._it < self._warmup_iterations else metrics.SampleType.Normal + return ( + metrics.SampleType.Warmup + if self._it < self._warmup_iterations + else metrics.SampleType.Normal + ) @property def infinite(self): diff --git a/setup.py b/setup.py index f93e9d000..e4cbd4204 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. 
# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -92,6 +92,8 @@ def str_from_file(name): "google-auth==1.22.1", # License: MIT "wheel==0.38.4", + # License: MIT + "tqdm==4.66.1", # License: Apache 2.0 # transitive dependencies: # botocore: Apache 2.0 @@ -104,7 +106,7 @@ def str_from_file(name): "ujson", "pytest==7.2.2", "pytest-benchmark==3.2.2", - "pytest-asyncio==0.14.0" + "pytest-asyncio==0.14.0", ] # These packages are only required when developing Benchmark @@ -115,65 +117,71 @@ def str_from_file(name): "wheel==0.38.4", "github3.py==1.3.0", "pylint==2.6.0", - "pylint-quotes==0.2.1" + "pylint-quotes==0.2.1", ] -python_version_classifiers = ["Programming Language :: Python :: {}.{}".format(major, minor) - for major, minor in supported_python_versions] +python_version_classifiers = [ + "Programming Language :: Python :: {}.{}".format(major, minor) + for major, minor in supported_python_versions +] -first_supported_version = "{}.{}".format(supported_python_versions[0][0], supported_python_versions[0][1]) +first_supported_version = "{}.{}".format( + supported_python_versions[0][0], supported_python_versions[0][1] +) # next minor after the latest supported version -first_unsupported_version = "{}.{}".format(supported_python_versions[-1][0], supported_python_versions[-1][1] + 1) +first_unsupported_version = "{}.{}".format( + supported_python_versions[-1][0], supported_python_versions[-1][1] + 1 +) -setup(name="opensearch-benchmark", - maintainer="Ian Hoang, Govind Kamat", - maintainer_email="hoangia@amazon.com, govkamat@amazon.com", - version=__versionstr__, - description="Macrobenchmarking framework for OpenSearch", - long_description=long_description, - long_description_content_type='text/markdown', - url="https://github.com/opensearch-project/OpenSearch-Benchmark", - license="Apache License, Version 2.0", - packages=find_packages( - where=".", - exclude=("tests*", "benchmarks*", "it*") - ), - include_package_data=True, - # supported Python versions. This will prohibit pip (> 9.0.0) from even installing Benchmark on an unsupported - # Python version. - # See also https://packaging.python.org/guides/distributing-packages-using-setuptools/#python-requires - # - # According to https://www.python.org/dev/peps/pep-0440/#version-matching, a trailing ".*" should - # ignore patch versions: - # - # "additional trailing segments will be ignored when determining whether or not a version identifier matches - # the clause" - # - # However, with the pattern ">=3.5.*,<=3.8.*", the version "3.8.0" is not accepted. Therefore, we match - # the minor version after the last supported one (i.e. 
if 3.8 is the last supported, we'll emit "<3.9") - python_requires=">={},<{}".format(first_supported_version, first_unsupported_version), - package_data={"": ["*.json", "*.yml"]}, - install_requires=install_requires, - test_suite="tests", - tests_require=tests_require, - extras_require={ - "develop": tests_require + develop_require - }, - entry_points={ - "console_scripts": [ - "opensearch-benchmark=osbenchmark.benchmark:main", - "opensearch-benchmarkd=osbenchmark.benchmarkd:main" - ], - }, - scripts=['scripts/expand-data-corpus.py'], - classifiers=[ - "Topic :: System :: Benchmark", - "Development Status :: 5 - Production/Stable", - "License :: OSI Approved :: Apache Software License", - "Intended Audience :: Developers", - "Operating System :: MacOS :: MacOS X", - "Operating System :: POSIX", - "Programming Language :: Python", - "Programming Language :: Python :: 3", - ] + python_version_classifiers, - zip_safe=False) +setup( + name="opensearch-benchmark", + maintainer="Ian Hoang, Govind Kamat", + maintainer_email="hoangia@amazon.com, govkamat@amazon.com", + version=__versionstr__, + description="Macrobenchmarking framework for OpenSearch", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/opensearch-project/OpenSearch-Benchmark", + license="Apache License, Version 2.0", + packages=find_packages(where=".", exclude=("tests*", "benchmarks*", "it*")), + include_package_data=True, + # supported Python versions. This will prohibit pip (> 9.0.0) from even installing Benchmark on an unsupported + # Python version. + # See also https://packaging.python.org/guides/distributing-packages-using-setuptools/#python-requires + # + # According to https://www.python.org/dev/peps/pep-0440/#version-matching, a trailing ".*" should + # ignore patch versions: + # + # "additional trailing segments will be ignored when determining whether or not a version identifier matches + # the clause" + # + # However, with the pattern ">=3.5.*,<=3.8.*", the version "3.8.0" is not accepted. Therefore, we match + # the minor version after the last supported one (i.e. if 3.8 is the last supported, we'll emit "<3.9") + python_requires=">={},<{}".format( + first_supported_version, first_unsupported_version + ), + package_data={"": ["*.json", "*.yml"]}, + install_requires=install_requires, + test_suite="tests", + tests_require=tests_require, + extras_require={"develop": tests_require + develop_require}, + entry_points={ + "console_scripts": [ + "opensearch-benchmark=osbenchmark.benchmark:main", + "opensearch-benchmarkd=osbenchmark.benchmarkd:main", + ], + }, + scripts=["scripts/expand-data-corpus.py"], + classifiers=[ + "Topic :: System :: Benchmark", + "Development Status :: 5 - Production/Stable", + "License :: OSI Approved :: Apache Software License", + "Intended Audience :: Developers", + "Operating System :: MacOS :: MacOS X", + "Operating System :: POSIX", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + ] + + python_version_classifiers, + zip_safe=False, +)
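# Illustrative sketch, not taken from this diff: setup.py derives both the classifier
# list and the python_requires bound from one supported_python_versions list, emitting a
# ">=first,<next-after-last" range so that, for the example list below, every 3.11.x
# release is accepted while 3.12 is not. A standalone version of that derivation; the
# version list here is a hypothetical example, not the project's actual support matrix.
supported_python_versions = [(3, 8), (3, 9), (3, 10), (3, 11)]  # hypothetical

classifiers = [
    "Programming Language :: Python :: {}.{}".format(major, minor)
    for major, minor in supported_python_versions
]
first_supported = "{}.{}".format(*supported_python_versions[0])
# the minor version after the latest supported one becomes the exclusive upper bound
first_unsupported = "{}.{}".format(
    supported_python_versions[-1][0], supported_python_versions[-1][1] + 1
)
python_requires = ">={},<{}".format(first_supported, first_unsupported)

if __name__ == "__main__":
    print(python_requires)  # ">=3.8,<3.12" for the example list above
    print(classifiers[0])   # "Programming Language :: Python :: 3.8"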