Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt Periodic Puma Metrics #3521

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/controllers/internal/metrics_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class MetricsController < RestController::BaseController
get '/internal/v4/metrics', :index

def index
CloudController::DependencyLocator.instance.periodic_updater.update!
CloudController::DependencyLocator.instance.periodic_updater.update! unless VCAP::CloudController::Config.config.get(:webserver) == 'puma'
[200, Prometheus::Client::Formats::Text.marshal(Prometheus::Client.registry)]
end
end
Expand Down
2 changes: 1 addition & 1 deletion app/controllers/internal/staging_completion_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def report_metrics(bbs_staging_response)
end

def statsd_updater
@statsd_updater ||= VCAP::CloudController::Metrics::StatsdUpdater.new
CloudController::DependencyLocator.instance.statsd_updater
end

def prometheus_updater
Expand Down
2 changes: 1 addition & 1 deletion app/jobs/diego/sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module VCAP::CloudController
module Jobs
module Diego
class Sync < VCAP::CloudController::Jobs::CCJob
def initialize(statsd=Statsd.new)
def initialize(statsd=CloudController::DependencyLocator.instance.statsd_client)
@statsd = statsd
end

Expand Down
3 changes: 3 additions & 0 deletions lib/cloud_controller/config_schemas/base/clock_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ class ClockSchema < VCAP::Config
optional(:priorities) => Hash
},

statsd_host: String,
statsd_port: Integer,

max_labels_per_resource: Integer,
max_annotations_per_resource: Integer,
custom_metric_tag_prefix_list: Array
Expand Down
2 changes: 1 addition & 1 deletion lib/cloud_controller/diego/messenger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
module VCAP::CloudController
module Diego
class Messenger
def initialize(statsd_updater=VCAP::CloudController::Metrics::StatsdUpdater.new, prometheus_updater=CloudController::DependencyLocator.instance.prometheus_updater)
def initialize(statsd_updater=CloudController::DependencyLocator.instance.statsd_updater, prometheus_updater=CloudController::DependencyLocator.instance.prometheus_updater)
@statsd_updater = statsd_updater
@prometheus_updater = prometheus_updater
end
Expand Down
2 changes: 1 addition & 1 deletion lib/cloud_controller/diego/processes_sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Error < StandardError
class BBSFetchError < Error
end

def initialize(config:, statsd_updater: VCAP::CloudController::Metrics::StatsdUpdater.new)
def initialize(config:, statsd_updater: CloudController::DependencyLocator.instance.statsd_updater)
@config = config
@workpool = WorkPool.new(50, store_exceptions: true)
@statsd_updater = statsd_updater
Expand Down
7 changes: 4 additions & 3 deletions lib/cloud_controller/metrics/periodic_updater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ def update_job_queue_length
end

def update_thread_info
local_thread_info = thread_info
return unless VCAP::CloudController::Config.config.get(:webserver) == 'thin'

[@statsd_updater, @prometheus_updater].each { |u| u.update_thread_info(local_thread_info) }
local_thread_info = thread_info_thin
[@statsd_updater, @prometheus_updater].each { |u| u.update_thread_info_thin(local_thread_info) }
end

def update_failed_job_count
Expand Down Expand Up @@ -132,7 +133,7 @@ def update_vitals
@prometheus_updater.update_vitals(prom_vitals)
end

def thread_info
def thread_info_thin
threadqueue = EM.instance_variable_get(:@threadqueue) || []
resultqueue = EM.instance_variable_get(:@resultqueue) || []
{
Expand Down
73 changes: 42 additions & 31 deletions lib/cloud_controller/metrics/prometheus_updater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,40 @@ class PrometheusUpdater
DURATION_BUCKETS = [5, 10, 30, 60, 300, 600, 890].freeze

METRICS = [
{ type: :gauge, name: :cc_job_queues_length_total, docstring: 'Job queues length of worker processes', labels: [:queue] },
{ type: :gauge, name: :cc_failed_jobs_total, docstring: 'Number of failed jobs of worker processes', labels: [:queue] },
{ type: :gauge, name: :cc_job_queues_length_total, docstring: 'Job queues length of worker processes', labels: [:queue], aggregation: :most_recent },
{ type: :gauge, name: :cc_failed_jobs_total, docstring: 'Number of failed jobs of worker processes', labels: [:queue], aggregation: :most_recent },
{ type: :counter, name: :cc_staging_requests_total, docstring: 'Number of staging requests' },
{ type: :histogram, name: :cc_staging_succeeded_duration_seconds, docstring: 'Durations of successful staging events', buckets: DURATION_BUCKETS },
{ type: :histogram, name: :cc_staging_failed_duration_seconds, docstring: 'Durations of failed staging events', buckets: DURATION_BUCKETS },
{ type: :gauge, name: :cc_requests_outstanding_total, docstring: 'Requests outstanding' },
{ type: :gauge, name: :cc_requests_outstanding_total, docstring: 'Requests outstanding', aggregation: :sum },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be interesting to see what the metrics look like if we don't aggregate them. Would the different values emitted by each worker be stored in the same time series, distinguished by a "worker" label, or would they simply overwrite each other and lead to wrong metrics?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metrics would be labelled with the PID of each worker process. I'm in favor of aggregating them to stay compatible with the Thin metrics, i.e. to have one metric per VM. In the end we are not even interested in the outstanding requests per VM, but for all APIs, so I don't see a use case for having metrics per process.

{ type: :counter, name: :cc_requests_completed_total, docstring: 'Requests completed' },
{ type: :gauge, name: :cc_vitals_started_at, docstring: 'CloudController Vitals: started_at', aggregation: :most_recent },
{ type: :gauge, name: :cc_vitals_mem_bytes, docstring: 'CloudController Vitals: mem_bytes', aggregation: :most_recent },
{ type: :gauge, name: :cc_vitals_cpu_load_avg, docstring: 'CloudController Vitals: cpu_load_avg', aggregation: :most_recent },
{ type: :gauge, name: :cc_vitals_mem_used_bytes, docstring: 'CloudController Vitals: mem_used_bytes', aggregation: :most_recent },
{ type: :gauge, name: :cc_vitals_mem_free_bytes, docstring: 'CloudController Vitals: mem_free_bytes', aggregation: :most_recent },
{ type: :gauge, name: :cc_vitals_num_cores, docstring: 'CloudController Vitals: num_cores', aggregation: :most_recent },
{ type: :gauge, name: :cc_running_tasks_total, docstring: 'Total running tasks', aggregation: :most_recent },
{ type: :gauge, name: :cc_running_tasks_memory_bytes, docstring: 'Total memory consumed by running tasks', aggregation: :most_recent },
{ type: :gauge, name: :cc_users_total, docstring: 'Number of users', aggregation: :most_recent },
{ type: :gauge, name: :cc_deployments_in_progress_total, docstring: 'Number of in progress deployments', aggregation: :most_recent }
].freeze

THIN_METRICS = [
{ type: :gauge, name: :cc_thread_info_thread_count, docstring: 'Thread count' },
{ type: :gauge, name: :cc_thread_info_event_machine_connection_count, docstring: 'EventMachine connection count' },
{ type: :gauge, name: :cc_thread_info_event_machine_threadqueue_size, docstring: 'EventMachine thread queue size' },
{ type: :gauge, name: :cc_thread_info_event_machine_threadqueue_num_waiting, docstring: 'EventMachine num waiting in thread' },
{ type: :gauge, name: :cc_thread_info_event_machine_resultqueue_size, docstring: 'EventMachine queue size' },
{ type: :gauge, name: :cc_thread_info_event_machine_resultqueue_num_waiting, docstring: 'EventMachine requests waiting in queue' },
{ type: :gauge, name: :cc_vitals_started_at, docstring: 'CloudController Vitals: started_at' },
{ type: :gauge, name: :cc_vitals_mem_bytes, docstring: 'CloudController Vitals: mem_bytes' },
{ type: :gauge, name: :cc_vitals_cpu_load_avg, docstring: 'CloudController Vitals: cpu_load_avg' },
{ type: :gauge, name: :cc_vitals_mem_used_bytes, docstring: 'CloudController Vitals: mem_used_bytes' },
{ type: :gauge, name: :cc_vitals_mem_free_bytes, docstring: 'CloudController Vitals: mem_free_bytes' },
{ type: :gauge, name: :cc_vitals_num_cores, docstring: 'CloudController Vitals: num_cores' },
{ type: :gauge, name: :cc_running_tasks_total, docstring: 'Total running tasks' },
{ type: :gauge, name: :cc_running_tasks_memory_bytes, docstring: 'Total memory consumed by running tasks' },
{ type: :gauge, name: :cc_users_total, docstring: 'Number of users' },
{ type: :gauge, name: :cc_deployments_in_progress_total, docstring: 'Number of in progress deployments' }
{ type: :gauge, name: :cc_thread_info_event_machine_resultqueue_num_waiting, docstring: 'EventMachine requests waiting in queue' }
].freeze

def initialize(registry=Prometheus::Client.registry)
@registry = registry

# Register all metrics, to initialize them for discoverability
METRICS.map do |metric|
register_metric(metric[:type], metric[:name], metric[:docstring], labels: metric[:labels] || {}, buckets: metric[:buckets] || {}) unless @registry.exist?(metric[:name])
end
end

def register_metric(type, name, message, labels: {}, buckets: {})
case type
when :gauge
@registry.gauge(name, docstring: message, labels: labels)
when :counter
@registry.counter(name, docstring: message, labels: labels)
when :histogram
@registry.histogram(name, docstring: message, labels: labels, buckets: buckets)
else
throw ArgumentError("Metric type #{type} does not exist.")
end
METRICS.each { |metric| register(metric) }
THIN_METRICS.each { |metric| register(metric) } if VCAP::CloudController::Config.config.get(:webserver) == 'thin'
end

def update_gauge_metric(metric, value, labels: {})
Expand Down Expand Up @@ -90,7 +79,7 @@ def update_job_queue_length(pending_job_count_by_queue)
end
end

def update_thread_info(thread_info)
def update_thread_info_thin(thread_info)
update_gauge_metric(:cc_thread_info_thread_count, thread_info[:thread_count])
update_gauge_metric(:cc_thread_info_event_machine_connection_count, thread_info[:event_machine][:connection_count])
update_gauge_metric(:cc_thread_info_event_machine_threadqueue_size, thread_info[:event_machine][:threadqueue][:size])
Expand Down Expand Up @@ -131,6 +120,28 @@ def report_staging_failure_metrics(duration_ns)

private

# Registers one metric definition hash, unless a metric with the same name
# already exists in @registry. Missing :labels and :buckets default to empty
# arrays; :aggregation is passed through as-is (nil when absent).
def register(metric)
  name = metric[:name]
  return if @registry.exist?(name)

  register_metric(
    metric[:type],
    name,
    metric[:docstring],
    labels: metric[:labels] || [],
    buckets: metric[:buckets] || [],
    aggregation: metric[:aggregation]
  )
end

# Creates a metric of the given type in @registry.
#
# @param type [Symbol] :gauge, :counter or :histogram
# @param name [Symbol] metric name
# @param message [String] docstring handed to the registry
# @param labels [Array] label names for the metric
# @param buckets [Array] histogram buckets; only forwarded for :histogram
# @param aggregation [Symbol, nil] aggregation mode; only placed into
#   store_settings when the configured data store is a DirectFileStore
# @raise [ArgumentError] if type is not one of the supported metric types
def register_metric(type, name, message, labels:, buckets:, aggregation:)
  store_settings = {}
  # The aggregation setting is only forwarded when the DirectFileStore is in use.
  store_settings[:aggregation] = aggregation if aggregation.present? && Prometheus::Client.config.data_store.instance_of?(Prometheus::Client::DataStores::DirectFileStore)

  case type
  when :gauge
    @registry.gauge(name, docstring: message, labels: labels, store_settings: store_settings)
  when :counter
    @registry.counter(name, docstring: message, labels: labels, store_settings: store_settings)
  when :histogram
    @registry.histogram(name, docstring: message, labels: labels, buckets: buckets, store_settings: store_settings)
  else
    # `raise` (not `throw`) with class + message: the previous
    # `throw ArgumentError("...")` called a non-existent method and surfaced
    # as NoMethodError instead of the intended ArgumentError.
    raise ArgumentError, "Metric type #{type} does not exist."
  end
end

def nanoseconds_to_seconds(time_ns)
(time_ns / 1e9).to_f
end
Expand Down
2 changes: 1 addition & 1 deletion lib/cloud_controller/metrics/request_metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module VCAP::CloudController
module Metrics
class RequestMetrics
def initialize(statsd=Statsd.new, prometheus_updater=CloudController::DependencyLocator.instance.prometheus_updater)
def initialize(statsd=CloudController::DependencyLocator.instance.statsd_client, prometheus_updater=CloudController::DependencyLocator.instance.prometheus_updater)
@counter = 0
@statsd = statsd
@prometheus_updater = prometheus_updater
Expand Down
4 changes: 2 additions & 2 deletions lib/cloud_controller/metrics/statsd_updater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module VCAP::CloudController::Metrics
class StatsdUpdater
def initialize(statsd=Statsd.new)
def initialize(statsd=CloudController::DependencyLocator.instance.statsd_client)
@statsd = statsd
end

Expand All @@ -23,7 +23,7 @@ def update_job_queue_length(pending_job_count_by_queue, total)
end
end

def update_thread_info(thread_info)
def update_thread_info_thin(thread_info)
@statsd.batch do |batch|
batch.gauge('cc.thread_info.thread_count', thread_info[:thread_count])
batch.gauge('cc.thread_info.event_machine.connection_count', thread_info[:event_machine][:connection_count])
Expand Down
20 changes: 20 additions & 0 deletions lib/cloud_controller/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
require 'cloud_controller/secrets_fetcher'
require 'cloud_controller/runners/thin_runner'
require 'cloud_controller/runners/puma_runner'
require 'prometheus/client/data_stores/direct_file_store'

module VCAP::CloudController
class Runner
Expand Down Expand Up @@ -95,6 +96,7 @@ def run!
private

def setup_cloud_controller
setup_metrics
setup_logging
setup_telemetry_logging
setup_db
Expand All @@ -112,6 +114,24 @@ def create_pidfile
raise "ERROR: Can't create pid file #{@config.get(:pid_filename)}"
end

def setup_metrics
return if @setup_metrics

@setup_metrics = true

return unless @config.get(:webserver) == 'puma'

prometheus_dir = File.join(@config.get(:directories, :tmpdir), 'prometheus')
FileUtils.mkdir_p(prometheus_dir)

# Resetting metrics on startup
Dir["#{prometheus_dir}/*.bin"].each do |file_path|
File.unlink(file_path)
end

Prometheus::Client.config.data_store = Prometheus::Client::DataStores::DirectFileStore.new(dir: prometheus_dir)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for using a file data store instead of an in-memory one, if we want to reset the metrics on startup anyway?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The in-memory data store does not work for metrics collected by different processes (i.e. the outstanding requests), thus we need some kind of "central" storage; the easiest is the direct file store.

end

def setup_logging
return if @setup_logging

Expand Down
25 changes: 22 additions & 3 deletions lib/vcap/stats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ module VCAP
class Stats
class << self
def process_memory_bytes_and_cpu
rss, pcpu = `ps -o rss=,pcpu= -p #{Process.pid}`.split.map(&:to_i)
rss_bytes = rss * 1024
[rss_bytes, pcpu]
rss = []
pcpu = []

ps_out = ps_pid
ps_out += ps_ppid if VCAP::CloudController::Config.config.get(:webserver) == 'puma'
ps_out.split.each_with_index { |e, i| i.even? ? rss << e : pcpu << e }

[rss.map(&:to_i).sum * 1024, pcpu.map(&:to_f).sum.round]
end

def memory_used_bytes
Expand All @@ -23,6 +28,20 @@ def memory_free_bytes
def cpu_load_average
Vmstat.load_average.one_minute
end

private

# Runs `ps` for the current process and returns its raw output:
# the rss and pcpu columns, without headers.
def ps_pid
  %x(ps -o rss=,pcpu= -p #{Process.pid})
end

# Runs `ps` for all processes whose parent is the current process and returns
# the raw rss/pcpu output. On darwin the filtering by parent pid is done with
# awk; elsewhere `ps --ppid` is used.
def ps_ppid
  parent_pid = Process.pid
  return `ps -o rss=,pcpu= --ppid #{parent_pid}` unless RUBY_PLATFORM.match?(/darwin/)

  `ps ax -o ppid,rss,pcpu | awk '$1 == #{parent_pid} { print $2,$3 }'`
end
end
end
end
5 changes: 4 additions & 1 deletion spec/unit/jobs/diego/sync_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
module VCAP::CloudController
module Jobs::Diego
RSpec.describe Sync, job_context: :clock do
let(:statsd_client) { instance_double(Statsd) }
let(:processes_sync) { instance_double(Diego::ProcessesSync) }
let(:tasks_sync) { instance_double(Diego::ProcessesSync) }

subject(:job) { Sync.new }

describe '#perform' do
before do
allow_any_instance_of(CloudController::DependencyLocator).to receive(:statsd_client).and_return(statsd_client)
allow(Diego::ProcessesSync).to receive(:new).and_return(processes_sync)
allow(Diego::TasksSync).to receive(:new).and_return(tasks_sync)

allow(statsd_client).to receive(:timing)
allow(processes_sync).to receive(:sync)
allow(tasks_sync).to receive(:sync)
end
Expand All @@ -35,7 +38,7 @@ module Jobs::Diego
expect(processes_sync).to receive(:sync)
expect(tasks_sync).to receive(:sync)
expect(Time).to receive(:now).twice # Ensure that we get two time measurements. _Hopefully_ they get turned into an elapsed time and passed in where they need to be!
expect_any_instance_of(Statsd).to receive(:timing).with('cc.diego_sync.duration', kind_of(Numeric))
expect(statsd_client).to receive(:timing).with('cc.diego_sync.duration', kind_of(Numeric))

job.perform
end
Expand Down
1 change: 1 addition & 0 deletions spec/unit/lib/cloud_controller/clock/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ module VCAP::CloudController
end

it 'schedules the frequent inline jobs' do
allow_any_instance_of(CloudController::DependencyLocator).to receive(:statsd_client).and_return(instance_double(Statsd))
allow(clock).to receive(:schedule_daily_job)
allow(clock).to receive(:schedule_frequent_worker_job)
expect(clock).to receive(:schedule_frequent_inline_job) do |args, &block|
Expand Down
34 changes: 25 additions & 9 deletions spec/unit/lib/cloud_controller/metrics/periodic_updater_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ module VCAP::CloudController::Metrics
before do
allow(statsd_updater).to receive(:update_user_count)
allow(statsd_updater).to receive(:update_job_queue_length)
allow(statsd_updater).to receive(:update_thread_info)
allow(statsd_updater).to receive(:update_thread_info_thin)
allow(statsd_updater).to receive(:update_failed_job_count)
allow(statsd_updater).to receive(:update_vitals)
allow(statsd_updater).to receive(:update_log_counts)
Expand All @@ -81,7 +81,7 @@ module VCAP::CloudController::Metrics

allow(prometheus_updater).to receive(:update_user_count)
allow(prometheus_updater).to receive(:update_job_queue_length)
allow(prometheus_updater).to receive(:update_thread_info)
allow(prometheus_updater).to receive(:update_thread_info_thin)
allow(prometheus_updater).to receive(:update_failed_job_count)
allow(prometheus_updater).to receive(:update_vitals)
allow(prometheus_updater).to receive(:update_log_counts)
Expand Down Expand Up @@ -449,10 +449,8 @@ module VCAP::CloudController::Metrics

describe '#update_thread_info' do
before do
allow(statsd_updater).to receive(:update_thread_info)
allow(prometheus_updater).to receive(:update_thread_info)

periodic_updater.update_thread_info
allow(statsd_updater).to receive(:update_thread_info_thin)
allow(prometheus_updater).to receive(:update_thread_info_thin)
end

it 'contains EventMachine data and send it to all updaters' do
Expand All @@ -471,8 +469,10 @@ module VCAP::CloudController::Metrics
}
}

expect(statsd_updater).to have_received(:update_thread_info).with(expected_thread_info)
expect(prometheus_updater).to have_received(:update_thread_info).with(expected_thread_info)
periodic_updater.update_thread_info

expect(statsd_updater).to have_received(:update_thread_info_thin).with(expected_thread_info)
expect(prometheus_updater).to have_received(:update_thread_info_thin).with(expected_thread_info)
end

context 'when resultqueue and/or threadqueue is not a queue' do
Expand All @@ -495,7 +495,23 @@ module VCAP::CloudController::Metrics
}
}

expect(statsd_updater).to have_received(:update_thread_info).with(expected_thread_info)
periodic_updater.update_thread_info

expect(statsd_updater).to have_received(:update_thread_info_thin).with(expected_thread_info)
expect(prometheus_updater).to have_received(:update_thread_info_thin).with(expected_thread_info)
end
end

# With the webserver configured as 'puma', update_thread_info must not hand
# any thread info to either updater.
context 'when Puma is configured as webserver' do
before do
# Override the test configuration so that the webserver is 'puma'.
TestConfig.override(webserver: 'puma')
end

it 'does not send EventMachine data to updaters' do
periodic_updater.update_thread_info

# Neither updater may have been called with thread info.
expect(statsd_updater).not_to have_received(:update_thread_info_thin)
expect(prometheus_updater).not_to have_received(:update_thread_info_thin)
end
end
end
Expand Down
Loading