From f85f99809c8bfc6229821bd383abb006d8ad0790 Mon Sep 17 00:00:00 2001 From: Joshua McNamara Date: Tue, 12 Dec 2023 16:43:11 +0000 Subject: [PATCH 1/6] SCTASK0123222 - Implement failed messages queue --- conf/config.xml.example | 1 + lib/GRNOC/TSDS/Aggregate.pm | 2 +- lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm | 31 ++++++++++++++++++- 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/conf/config.xml.example b/conf/config.xml.example index 26d4673..8363c88 100644 --- a/conf/config.xml.example +++ b/conf/config.xml.example @@ -63,6 +63,7 @@ 5672 timeseries_pending_aggregate timeseries_finished_aggregate + timeseries_failed_aggregate diff --git a/lib/GRNOC/TSDS/Aggregate.pm b/lib/GRNOC/TSDS/Aggregate.pm index c1ccb05..9237248 100644 --- a/lib/GRNOC/TSDS/Aggregate.pm +++ b/lib/GRNOC/TSDS/Aggregate.pm @@ -3,6 +3,6 @@ package GRNOC::TSDS::Aggregate; use strict; use warnings; -our $VERSION = "1.2.1"; +our $VERSION = "1.2.2"; 1; diff --git a/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm b/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm index d65c257..752c616 100644 --- a/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm +++ b/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm @@ -26,6 +26,7 @@ use constant QUEUE_FETCH_TIMEOUT => 10 * 1000; use constant RECONNECT_TIMEOUT => 10; use constant PENDING_QUEUE_CHANNEL => 1; use constant FINISHED_QUEUE_CHANNEL => 2; +use constant FAILED_QUEUE_CHANNEL => 3; use constant SERVICE_CACHE_FILE => '/etc/grnoc/name-service-cacher/name-service.xml'; use constant COOKIES_FILE => '/var/lib/grnoc/tsds/aggregate/cookies.dat'; @@ -322,7 +323,19 @@ sub _consume_messages { try { - $self->_aggregate_messages( $aggregates_to_process ) if ( @$aggregates_to_process > 0 ); + my $results = $self->_aggregate_messages( $aggregates_to_process ) if ( @$aggregates_to_process > 0 ); + + # push any failed messages to the failed queue + my $failed_messages = $results->{'failed_messages'}; + if (@$failed_messages > 0) { + $self->logger->error( "Failed to 
aggregate " . @$failed_messages . " messages."); + $self->rabbit->publish( + FAILED_QUEUE_CHANNEL, + $self->config->get( '/config/rabbit/failed-queue' );, + $self->json->encode( \@failed_messages ), + {'exchange' => ''} + ); + } } catch { @@ -339,9 +352,11 @@ sub _aggregate_messages { my ( $self, $messages ) = @_; my $finished_messages = []; + my $results = {'failed_messages' => []}; foreach my $message ( @$messages ) { + try { my $type = $message->type; my $from = $message->interval_from; my $to = $message->interval_to; @@ -474,6 +489,13 @@ sub _aggregate_messages { push( @$finished_messages, $aggregated ); } } + } + catch { + # any failed aggregates are not added to 'finished_messages' + # and are instead pushed to a failed queue + $self->logger->error( "Error aggregating message: $_" ); + push( @{$results->{'failed_messages'}}, $message ); + } } my $num = @$finished_messages; @@ -487,6 +509,8 @@ sub _aggregate_messages { $self->rabbit->publish( FINISHED_QUEUE_CHANNEL, $queue, $self->json->encode( \@finished_messages ), {'exchange' => ''} ); } + + return $results; } sub _aggregate { @@ -785,6 +809,7 @@ sub _rabbit_connect { my $rabbit_port = $self->config->get( '/config/rabbit/port' ); my $rabbit_pending_queue = $self->config->get( '/config/rabbit/pending-queue' ); my $rabbit_finished_queue = $self->config->get( '/config/rabbit/finished-queue' ); + my $rabbit_failed_queue = $self->config->get( '/config/rabbit/failed-queue' ); while ( 1 ) { @@ -808,6 +833,10 @@ sub _rabbit_connect { $rabbit->channel_open( FINISHED_QUEUE_CHANNEL ); $rabbit->queue_declare( FINISHED_QUEUE_CHANNEL, $rabbit_finished_queue, {'auto_delete' => 0} ); + # open channel to the failed aggregate queue we'll send to + $rabbit->channel_open( FAILED_QUEUE_CHANNEL ); + $rabbit->queue_declare( FAILED_QUEUE_CHANNEL, $rabbit_failed_queue, {'auto_delete' => 0} ); + $self->_set_rabbit( $rabbit ); $connected = 1; From 6f75e025b76035b965f6c4b817b5773efe0b4549 Mon Sep 17 00:00:00 2001 From: Joshua McNamara 
Date: Tue, 12 Dec 2023 17:18:22 +0000 Subject: [PATCH 2/6] SCTASK0123222 - Bump version --- grnoc-tsds-aggregate.spec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grnoc-tsds-aggregate.spec b/grnoc-tsds-aggregate.spec index 4ad2dcc..f06b285 100644 --- a/grnoc-tsds-aggregate.spec +++ b/grnoc-tsds-aggregate.spec @@ -1,6 +1,6 @@ Summary: GRNOC TSDS Aggregate Name: grnoc-tsds-aggregate -Version: 1.2.1 +Version: 1.2.2 Release: 1%{?dist} License: GRNOC Group: Measurement From 9b3076de4c7f4672419e1e458337df53b44cf8cb Mon Sep 17 00:00:00 2001 From: Joshua McNamara Date: Wed, 13 Dec 2023 18:21:49 +0000 Subject: [PATCH 3/6] SCTASK0123222 - Convert failed message to hash --- lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm b/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm index 752c616..e5f3077 100644 --- a/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm +++ b/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm @@ -331,8 +331,8 @@ sub _consume_messages { $self->logger->error( "Failed to aggregate " . @$failed_messages . 
" messages."); $self->rabbit->publish( FAILED_QUEUE_CHANNEL, - $self->config->get( '/config/rabbit/failed-queue' );, - $self->json->encode( \@failed_messages ), + $self->config->get( '/config/rabbit/failed-queue' ), + $self->json->encode( \@$failed_messages ), {'exchange' => ''} ); } @@ -494,7 +494,19 @@ sub _aggregate_messages { # any failed aggregates are not added to 'finished_messages' # and are instead pushed to a failed queue $self->logger->error( "Error aggregating message: $_" ); - push( @{$results->{'failed_messages'}}, $message ); + + # Convert Message object to hash (for encoding to JSON later) + my %failed_message = ( + type => $message->type, + interval_from => $message->interval_from; + interval_to => $message->interval_to; + start => $message->start; + end => $message->end; + meta => $message->meta; + values => $message->values; + required_meta => $message->required_meta; + ); + push( @{$results->{'failed_messages'}}, %failed_message ); } } From 0a02c0a78e184ea8891e1da32f88352c01aab05d Mon Sep 17 00:00:00 2001 From: Joshua McNamara Date: Wed, 13 Dec 2023 18:23:30 +0000 Subject: [PATCH 4/6] SCTASK0123222 - Fix indexing --- lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm | 212 +++++++++--------- 1 file changed, 106 insertions(+), 106 deletions(-) diff --git a/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm b/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm index e5f3077..3d7446c 100644 --- a/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm +++ b/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm @@ -357,138 +357,138 @@ sub _aggregate_messages { foreach my $message ( @$messages ) { try { - my $type = $message->type; - my $from = $message->interval_from; - my $to = $message->interval_to; - my $start = $message->start; - my $end = $message->end; - my $meta = $message->meta; - my $values = $message->values; - my $required_meta = $message->required_meta; - - # align to aggregation window we're getting data for - $start = nlowmult( $to, $start ); - $end = nhimult( $to, 
$end ); - - my $min_max_mappings = $self->_get_min_max_mappings( required_meta => $required_meta, - meta => $meta ); - - my $hist_mappings = $self->_get_histogram_mappings( $values ); - - # craft the query needed to fetch the data from the necessary interval - my $from_clause = "from $type"; - my $values_clause = $self->_get_values_clause( from => $from, values => $values, required_meta => $required_meta ); - my $between_clause = $self->_get_between_clause( start => $start, end => $end, to => $to ); - my $where_clause = $self->_get_where_clause( $meta ); - my $by_clause = $self->_get_by_clause( $required_meta ); - my $query = "$values_clause $between_clause $by_clause $from_clause $where_clause"; - - # issue the query to the webservice to retrieve the data we need to aggregate - $self->websvc->set_raw_output(1); - my $results = $self->websvc->query( query => $query, - output => 'bson'); - - # handle any errors attempting to query the webservice - if ( !$results || $self->websvc->get_error() ) { - - die( "Error querying TSDS web service: " . 
$self->websvc->get_error() ); - } + my $type = $message->type; + my $from = $message->interval_from; + my $to = $message->interval_to; + my $start = $message->start; + my $end = $message->end; + my $meta = $message->meta; + my $values = $message->values; + my $required_meta = $message->required_meta; + + # align to aggregation window we're getting data for + $start = nlowmult( $to, $start ); + $end = nhimult( $to, $end ); + + my $min_max_mappings = $self->_get_min_max_mappings( required_meta => $required_meta, + meta => $meta ); + + my $hist_mappings = $self->_get_histogram_mappings( $values ); + + # craft the query needed to fetch the data from the necessary interval + my $from_clause = "from $type"; + my $values_clause = $self->_get_values_clause( from => $from, values => $values, required_meta => $required_meta ); + my $between_clause = $self->_get_between_clause( start => $start, end => $end, to => $to ); + my $where_clause = $self->_get_where_clause( $meta ); + my $by_clause = $self->_get_by_clause( $required_meta ); + my $query = "$values_clause $between_clause $by_clause $from_clause $where_clause"; + + # issue the query to the webservice to retrieve the data we need to aggregate + $self->websvc->set_raw_output(1); + my $results = $self->websvc->query( query => $query, + output => 'bson'); + + # handle any errors attempting to query the webservice + if ( !$results || $self->websvc->get_error() ) { + + die( "Error querying TSDS web service: " . $self->websvc->get_error() ); + } - $results = MongoDB::BSON->new()->decode_one($results); + $results = MongoDB::BSON->new()->decode_one($results); - if ( $results->{'error'} ) { + if ( $results->{'error'} ) { - die( "Error retrieving data from TSDS: " . $results->{'error_text'} ); - } + die( "Error retrieving data from TSDS: " . 
$results->{'error_text'} ); + } - $results = $results->{'results'}; + $results = $results->{'results'}; - my $buckets = {}; - my $meta_info = {}; + my $buckets = {}; + my $meta_info = {}; - foreach my $result ( @$results ) { + foreach my $result ( @$results ) { - my @value_types = keys( %$result ); - my $meta_data = {}; - my @meta_keys; + my @value_types = keys( %$result ); + my $meta_data = {}; + my @meta_keys; - # the required fields are not one of the possible value types - # we're also going to omit anything that came back as a result of - # aggregation - foreach my $required ( @$required_meta ) { + # the required fields are not one of the possible value types + # we're also going to omit anything that came back as a result of + # aggregation + foreach my $required ( @$required_meta ) { - @value_types = grep { $_ ne $required && $_ !~ /__(min|max|hist)$/ } @value_types; - $meta_data->{$required} = $result->{$required}; - push( @meta_keys, $result->{$required} ); - } + @value_types = grep { $_ ne $required && $_ !~ /__(min|max|hist)$/ } @value_types; + $meta_data->{$required} = $result->{$required}; + push( @meta_keys, $result->{$required} ); + } - my $key = join( '__', @meta_keys ); - $meta_info->{$key} = $meta_data; - - # Put all of the data points into their respective floored - # buckets - foreach my $value_type ( @value_types ) { + my $key = join( '__', @meta_keys ); + $meta_info->{$key} = $meta_data; + + # Put all of the data points into their respective floored + # buckets + foreach my $value_type ( @value_types ) { - my $entries = $result->{$value_type}; + my $entries = $result->{$value_type}; - next if ( !defined( $entries ) ); + next if ( !defined( $entries ) ); - # Figure this out once, makes it easier later in the code to - # refer to a consistent flag - my $is_aggregate = exists($result->{$value_type . "__max"}) ? 
1 : 0; + # Figure this out once, makes it easier later in the code to + # refer to a consistent flag + my $is_aggregate = exists($result->{$value_type . "__max"}) ? 1 : 0; - my $entries_max = $result->{$value_type . "__max"} || []; - my $entries_min = $result->{$value_type . "__min"} || []; - my $entries_hist = $result->{$value_type . "__hist"} || []; + my $entries_max = $result->{$value_type . "__max"} || []; + my $entries_min = $result->{$value_type . "__min"} || []; + my $entries_hist = $result->{$value_type . "__hist"} || []; - for (my $i = 0; $i < @$entries; $i++){ - my $entry = $entries->[$i]; + for (my $i = 0; $i < @$entries; $i++){ + my $entry = $entries->[$i]; - my ( $timestamp, $value ) = @$entry; + my ( $timestamp, $value ) = @$entry; - my $bucket = $to * int($timestamp / $to); + my $bucket = $to * int($timestamp / $to); - push( @{$buckets->{$key}{$bucket}{$value_type}}, {is_aggregate => $is_aggregate, - avg => $value, - min => $entries_min->[$i][1], - max => $entries_max->[$i][1], - hist => $entries_hist->[$i][1], - timestamp => $timestamp} - ); - } - } - } + push( @{$buckets->{$key}{$bucket}{$value_type}}, {is_aggregate => $is_aggregate, + avg => $value, + min => $entries_min->[$i][1], + max => $entries_max->[$i][1], + hist => $entries_hist->[$i][1], + timestamp => $timestamp} + ); + } + } + } - # handle every measurement that was bucketed - my @keys = keys( %$buckets ); + # handle every measurement that was bucketed + my @keys = keys( %$buckets ); - foreach my $key ( @keys ) { + foreach my $key ( @keys ) { - # grab meta data hash to pass for this measurement - my $meta_data = $meta_info->{$key}; + # grab meta data hash to pass for this measurement + my $meta_data = $meta_info->{$key}; - # handle every bucketed timestamp for this measurement - my @timestamps = keys( %{$buckets->{$key}} ); + # handle every bucketed timestamp for this measurement + my @timestamps = keys( %{$buckets->{$key}} ); - foreach my $time ( @timestamps ) { + foreach my $time ( 
@timestamps ) { - # all the data during this bucket to aggregate for this measurement - my $data = $buckets->{$key}{$time}; + # all the data during this bucket to aggregate for this measurement + my $data = $buckets->{$key}{$time}; - my $aggregated = $self->_aggregate( data => $data, - required_meta => $required_meta, - hist_mappings => $hist_mappings, - hist_min_max_mappings => $min_max_mappings, - key => $key ); - - $aggregated->{'type'} = "$type.aggregate"; - $aggregated->{'time'} = $time; - $aggregated->{'interval'} = $to; - $aggregated->{'meta'} = $meta_data; + my $aggregated = $self->_aggregate( data => $data, + required_meta => $required_meta, + hist_mappings => $hist_mappings, + hist_min_max_mappings => $min_max_mappings, + key => $key ); + + $aggregated->{'type'} = "$type.aggregate"; + $aggregated->{'time'} = $time; + $aggregated->{'interval'} = $to; + $aggregated->{'meta'} = $meta_data; - push( @$finished_messages, $aggregated ); - } - } + push( @$finished_messages, $aggregated ); + } + } } catch { # any failed aggregates are not added to 'finished_messages' From ab6ca27825093e1fe75ec9f083cd96e6a169c799 Mon Sep 17 00:00:00 2001 From: Joshua McNamara Date: Wed, 13 Dec 2023 18:24:32 +0000 Subject: [PATCH 5/6] SCTASK0123222 - VSCode hates Perl indexing from GitHub --- lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm | 208 +++++++++--------- 1 file changed, 104 insertions(+), 104 deletions(-) diff --git a/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm b/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm index 3d7446c..e33e4d0 100644 --- a/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm +++ b/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm @@ -357,139 +357,139 @@ sub _aggregate_messages { foreach my $message ( @$messages ) { try { - my $type = $message->type; - my $from = $message->interval_from; - my $to = $message->interval_to; - my $start = $message->start; - my $end = $message->end; - my $meta = $message->meta; - my $values = $message->values; - my $required_meta = 
$message->required_meta; - - # align to aggregation window we're getting data for - $start = nlowmult( $to, $start ); - $end = nhimult( $to, $end ); - - my $min_max_mappings = $self->_get_min_max_mappings( required_meta => $required_meta, - meta => $meta ); - - my $hist_mappings = $self->_get_histogram_mappings( $values ); - - # craft the query needed to fetch the data from the necessary interval - my $from_clause = "from $type"; - my $values_clause = $self->_get_values_clause( from => $from, values => $values, required_meta => $required_meta ); - my $between_clause = $self->_get_between_clause( start => $start, end => $end, to => $to ); - my $where_clause = $self->_get_where_clause( $meta ); - my $by_clause = $self->_get_by_clause( $required_meta ); - my $query = "$values_clause $between_clause $by_clause $from_clause $where_clause"; - - # issue the query to the webservice to retrieve the data we need to aggregate - $self->websvc->set_raw_output(1); - my $results = $self->websvc->query( query => $query, - output => 'bson'); - - # handle any errors attempting to query the webservice - if ( !$results || $self->websvc->get_error() ) { - - die( "Error querying TSDS web service: " . 
$self->websvc->get_error() ); - } + my $type = $message->type; + my $from = $message->interval_from; + my $to = $message->interval_to; + my $start = $message->start; + my $end = $message->end; + my $meta = $message->meta; + my $values = $message->values; + my $required_meta = $message->required_meta; + + # align to aggregation window we're getting data for + $start = nlowmult( $to, $start ); + $end = nhimult( $to, $end ); + + my $min_max_mappings = $self->_get_min_max_mappings( required_meta => $required_meta, + meta => $meta ); + + my $hist_mappings = $self->_get_histogram_mappings( $values ); + + # craft the query needed to fetch the data from the necessary interval + my $from_clause = "from $type"; + my $values_clause = $self->_get_values_clause( from => $from, values => $values, required_meta => $required_meta ); + my $between_clause = $self->_get_between_clause( start => $start, end => $end, to => $to ); + my $where_clause = $self->_get_where_clause( $meta ); + my $by_clause = $self->_get_by_clause( $required_meta ); + my $query = "$values_clause $between_clause $by_clause $from_clause $where_clause"; + + # issue the query to the webservice to retrieve the data we need to aggregate + $self->websvc->set_raw_output(1); + my $results = $self->websvc->query( query => $query, + output => 'bson'); + + # handle any errors attempting to query the webservice + if ( !$results || $self->websvc->get_error() ) { + + die( "Error querying TSDS web service: " . $self->websvc->get_error() ); + } - $results = MongoDB::BSON->new()->decode_one($results); + $results = MongoDB::BSON->new()->decode_one($results); - if ( $results->{'error'} ) { + if ( $results->{'error'} ) { - die( "Error retrieving data from TSDS: " . $results->{'error_text'} ); - } + die( "Error retrieving data from TSDS: " . 
$results->{'error_text'} ); + } - $results = $results->{'results'}; + $results = $results->{'results'}; - my $buckets = {}; - my $meta_info = {}; + my $buckets = {}; + my $meta_info = {}; - foreach my $result ( @$results ) { + foreach my $result ( @$results ) { - my @value_types = keys( %$result ); - my $meta_data = {}; - my @meta_keys; + my @value_types = keys( %$result ); + my $meta_data = {}; + my @meta_keys; - # the required fields are not one of the possible value types - # we're also going to omit anything that came back as a result of - # aggregation - foreach my $required ( @$required_meta ) { + # the required fields are not one of the possible value types + # we're also going to omit anything that came back as a result of + # aggregation + foreach my $required ( @$required_meta ) { - @value_types = grep { $_ ne $required && $_ !~ /__(min|max|hist)$/ } @value_types; - $meta_data->{$required} = $result->{$required}; - push( @meta_keys, $result->{$required} ); - } + @value_types = grep { $_ ne $required && $_ !~ /__(min|max|hist)$/ } @value_types; + $meta_data->{$required} = $result->{$required}; + push( @meta_keys, $result->{$required} ); + } - my $key = join( '__', @meta_keys ); - $meta_info->{$key} = $meta_data; - - # Put all of the data points into their respective floored - # buckets - foreach my $value_type ( @value_types ) { + my $key = join( '__', @meta_keys ); + $meta_info->{$key} = $meta_data; + + # Put all of the data points into their respective floored + # buckets + foreach my $value_type ( @value_types ) { - my $entries = $result->{$value_type}; + my $entries = $result->{$value_type}; - next if ( !defined( $entries ) ); + next if ( !defined( $entries ) ); - # Figure this out once, makes it easier later in the code to - # refer to a consistent flag - my $is_aggregate = exists($result->{$value_type . "__max"}) ? 
1 : 0; + # Figure this out once, makes it easier later in the code to + # refer to a consistent flag + my $is_aggregate = exists($result->{$value_type . "__max"}) ? 1 : 0; - my $entries_max = $result->{$value_type . "__max"} || []; - my $entries_min = $result->{$value_type . "__min"} || []; - my $entries_hist = $result->{$value_type . "__hist"} || []; + my $entries_max = $result->{$value_type . "__max"} || []; + my $entries_min = $result->{$value_type . "__min"} || []; + my $entries_hist = $result->{$value_type . "__hist"} || []; - for (my $i = 0; $i < @$entries; $i++){ - my $entry = $entries->[$i]; + for (my $i = 0; $i < @$entries; $i++){ + my $entry = $entries->[$i]; - my ( $timestamp, $value ) = @$entry; + my ( $timestamp, $value ) = @$entry; - my $bucket = $to * int($timestamp / $to); + my $bucket = $to * int($timestamp / $to); - push( @{$buckets->{$key}{$bucket}{$value_type}}, {is_aggregate => $is_aggregate, - avg => $value, - min => $entries_min->[$i][1], - max => $entries_max->[$i][1], - hist => $entries_hist->[$i][1], - timestamp => $timestamp} - ); - } + push( @{$buckets->{$key}{$bucket}{$value_type}}, {is_aggregate => $is_aggregate, + avg => $value, + min => $entries_min->[$i][1], + max => $entries_max->[$i][1], + hist => $entries_hist->[$i][1], + timestamp => $timestamp} + ); + } + } } - } - # handle every measurement that was bucketed - my @keys = keys( %$buckets ); + # handle every measurement that was bucketed + my @keys = keys( %$buckets ); - foreach my $key ( @keys ) { + foreach my $key ( @keys ) { - # grab meta data hash to pass for this measurement - my $meta_data = $meta_info->{$key}; + # grab meta data hash to pass for this measurement + my $meta_data = $meta_info->{$key}; - # handle every bucketed timestamp for this measurement - my @timestamps = keys( %{$buckets->{$key}} ); + # handle every bucketed timestamp for this measurement + my @timestamps = keys( %{$buckets->{$key}} ); - foreach my $time ( @timestamps ) { + foreach my $time ( 
@timestamps ) { - # all the data during this bucket to aggregate for this measurement - my $data = $buckets->{$key}{$time}; + # all the data during this bucket to aggregate for this measurement + my $data = $buckets->{$key}{$time}; - my $aggregated = $self->_aggregate( data => $data, - required_meta => $required_meta, - hist_mappings => $hist_mappings, - hist_min_max_mappings => $min_max_mappings, - key => $key ); - - $aggregated->{'type'} = "$type.aggregate"; - $aggregated->{'time'} = $time; - $aggregated->{'interval'} = $to; - $aggregated->{'meta'} = $meta_data; + my $aggregated = $self->_aggregate( data => $data, + required_meta => $required_meta, + hist_mappings => $hist_mappings, + hist_min_max_mappings => $min_max_mappings, + key => $key ); + + $aggregated->{'type'} = "$type.aggregate"; + $aggregated->{'time'} = $time; + $aggregated->{'interval'} = $to; + $aggregated->{'meta'} = $meta_data; - push( @$finished_messages, $aggregated ); + push( @$finished_messages, $aggregated ); + } } } - } catch { # any failed aggregates are not added to 'finished_messages' # and are instead pushed to a failed queue From 2123c10eb3ea37b96fca790250557b9f745926ef Mon Sep 17 00:00:00 2001 From: Joshua McNamara Date: Wed, 13 Dec 2023 18:35:18 +0000 Subject: [PATCH 6/6] SCTASK0123222 - Forgot to remove --- lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm b/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm index e33e4d0..5fa071f 100644 --- a/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm +++ b/lib/GRNOC/TSDS/Aggregate/Aggregator/Worker.pm @@ -498,13 +498,13 @@ sub _aggregate_messages { # Convert Message object to hash (for encoding to JSON later) my %failed_message = ( type => $message->type, - interval_from => $message->interval_from; - interval_to => $message->interval_to; - start => $message->start; - end => $message->end; - meta => $message->meta; - values => 
$message->values; - required_meta => $message->required_meta; + interval_from => $message->interval_from, + interval_to => $message->interval_to, + start => $message->start, + end => $message->end, + meta => $message->meta, + values => $message->values, + required_meta => $message->required_meta ); - push( @{$results->{'failed_messages'}}, %failed_message ); + push( @{$results->{'failed_messages'}}, \%failed_message ); }