Create metrics in a queue which can be processed in advance of the write queue #960

Merged · 2 commits · Aug 24, 2024
5 changes: 4 additions & 1 deletion lib/carbon/cache.py
@@ -16,7 +16,7 @@
import threading
from operator import itemgetter
from random import choice
from collections import defaultdict
from collections import defaultdict, deque

from carbon.conf import settings
from carbon import events, log
@@ -189,6 +189,7 @@ class _MetricCache(defaultdict):
def __init__(self, strategy=None):
self.lock = threading.Lock()
self.size = 0
self.new_metrics = deque()
self.strategy = None
if strategy:
self.strategy = strategy(self)
@@ -253,6 +254,8 @@ def store(self, metric, datapoint):
log.msg("MetricCache is full: self.size=%d" % self.size)
events.cacheFull()
else:
if not self[metric]:
self.new_metrics.append(metric)
self.size += 1
self[metric][timestamp] = value
if self.strategy:
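To make the intent of the `new_metrics` deque clearer, here is a minimal, self-contained sketch of the pattern this change introduces: a metric is appended to the deque only the first time it is seen, and a consumer (the writer loop, in the real code) pops entries off to create database files ahead of the regular drain. The `TinyCache` class and its names are illustrative stand-ins, not carbon's actual `_MetricCache`, and the real `store()` also does size accounting, locking, and cache-full handling that is omitted here.

```python
from collections import defaultdict, deque

class TinyCache(defaultdict):
    """Illustrative stand-in for carbon's _MetricCache; not the real class."""
    def __init__(self):
        super(TinyCache, self).__init__(dict)
        self.new_metrics = deque()  # metrics seen for the first time, awaiting file creation

    def store(self, metric, datapoint):
        timestamp, value = datapoint
        if not self[metric]:
            # First datapoint for this metric: queue it for early file creation.
            self.new_metrics.append(metric)
        self[metric][timestamp] = value

cache = TinyCache()
cache.store('carbon.agents.host-a.cpuUsage', (1724457600, 0.42))
cache.store('carbon.agents.host-a.cpuUsage', (1724457660, 0.40))

# A consumer (the writer loop in carbon) can now create database files for
# brand-new metrics before it drains any datapoints.
while cache.new_metrics:
    print("would create file for %s" % cache.new_metrics.popleft())
```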
14 changes: 7 additions & 7 deletions lib/carbon/util.py
@@ -287,7 +287,7 @@ def drain(self, cost, blocking=False):
'''Given a number of tokens (or fractions) drain will return True and
drain the number of tokens from the bucket if the capacity allows,
otherwise we return false and leave the contents of the bucket.'''
if cost <= self.tokens:
if self.peek(cost):
self._tokens -= cost
return True

@@ -310,16 +310,16 @@ def setCapacityAndFillRate(self, new_capacity, new_fill_rate):
self.fill_rate = float(new_fill_rate)
self._tokens = delta + self._tokens

@property
def tokens(self):
'''The tokens property will return the current number of tokens in the
bucket.'''
if self._tokens < self.capacity:
def peek(self, cost):
'''Return true if the bucket can drain cost without blocking.'''
if self._tokens >= cost:
return True
else:
now = time()
delta = self.fill_rate * (now - self.timestamp)
self._tokens = min(self.capacity, self._tokens + delta)
self.timestamp = now
return self._tokens
return self._tokens >= cost


class PluginRegistrar(type):
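For readers unfamiliar with the token-bucket helper, here is a minimal sketch of the `peek`/`drain` split introduced above: `peek(cost)` lazily refreshes the token count from elapsed time and reports whether `cost` tokens are available without consuming them, and `drain(cost)` consumes tokens only when `peek` says it can. The class below uses assumed names and is not carbon's actual implementation.

```python
from time import time

class TinyTokenBucket(object):
    """Illustrative token bucket with peek()/drain() semantics (not carbon's class)."""
    def __init__(self, capacity, fill_rate):
        self.capacity = float(capacity)
        self.fill_rate = float(fill_rate)   # tokens added per second
        self._tokens = float(capacity)
        self.timestamp = time()

    def peek(self, cost):
        """Return True if `cost` tokens could be drained right now, without draining them."""
        if self._tokens >= cost:
            return True
        # Lazily add the tokens accumulated since the last refill.
        now = time()
        delta = self.fill_rate * (now - self.timestamp)
        self._tokens = min(self.capacity, self._tokens + delta)
        self.timestamp = now
        return self._tokens >= cost

    def drain(self, cost):
        """Consume `cost` tokens if available; return whether the drain happened."""
        if self.peek(cost):
            self._tokens -= cost
            return True
        return False

bucket = TinyTokenBucket(capacity=50, fill_rate=10)
if bucket.peek(1):   # check without consuming (how the writer gates metric creates)
    bucket.drain(1)  # actually consume a token
```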
41 changes: 27 additions & 14 deletions lib/carbon/writer.py
@@ -95,24 +95,25 @@ def writeCachedDataPoints():

cache = MetricCache()
while cache:
(metric, datapoints) = cache.drain_metric()
if metric is None:
# end the loop
break
# First, create new metrics files, which is helpful for graphite-web
while cache.new_metrics and (not CREATE_BUCKET or CREATE_BUCKET.peek(1)):
metric = cache.new_metrics.popleft()

dbFileExists = state.database.exists(metric)
if metric not in cache:
# This metric has already been drained. There's no sense in creating it.
continue

if not dbFileExists:
if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
# If our tokenbucket doesn't have enough tokens available to create a new metric
# file then we'll just drop the metric on the ground and move on to the next
# metric.
# XXX This behavior should probably be configurable to not drop metrics
# when rate limiting unless our cache is too big or some other legit
# reason.
instrumentation.increment('droppedCreates')
if state.database.exists(metric):
continue

if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
# This should never actually happen as no other thread should be
# draining our tokens, and we just checked for a token.
# Just put the new metric back in the create list and we'll try again
# after writing an update.
cache.new_metrics.appendleft(metric)
break

archiveConfig = None
xFilesFactor, aggregationMethod = None, None

@@ -150,6 +151,18 @@ def writeCachedDataPoints():
instrumentation.increment('errors')
continue

# now drain and persist some data
(metric, datapoints) = cache.drain_metric()
if metric is None:
# end the loop
break

if not state.database.exists(metric):
# If we get here, the metric must still be in new_metrics. We're
# creating too fast, and we'll drop this data.
instrumentation.increment('droppedCreates')
continue

# If we've got a rate limit configured, let's make sure we enforce it
waitTime = 0
if UPDATE_BUCKET:
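Putting the three changes together, each pass of the writer loop now runs in two phases: a create phase that pops newly seen metrics from `cache.new_metrics` and creates their database files (gated by `CREATE_BUCKET.peek`/`drain`), followed by the usual drain-and-write phase, which drops datapoints whose files still don't exist. The sketch below compresses that control flow with stand-in arguments; it is a reading aid, not the actual `writeCachedDataPoints` implementation.

```python
def write_cached_datapoints(cache, database, create_bucket, create_metric, write):
    """Condensed, illustrative version of the reordered writer loop (stand-in arguments)."""
    while cache:
        # Phase 1: create files for brand-new metrics while create tokens are available.
        while cache.new_metrics and (not create_bucket or create_bucket.peek(1)):
            metric = cache.new_metrics.popleft()
            if metric not in cache:
                continue                                 # already drained; nothing to create
            if database.exists(metric):
                continue                                 # file already on disk
            if create_bucket and not create_bucket.drain(1):
                cache.new_metrics.appendleft(metric)     # shouldn't happen; retry next pass
                break
            create_metric(metric)

        # Phase 2: drain one metric's datapoints and persist them.
        metric, datapoints = cache.drain_metric()
        if metric is None:
            break                                        # cache emptied out
        if not database.exists(metric):
            continue                                     # creating too fast; this data is dropped
        write(metric, datapoints)
```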