Skip to content

Commit

Permalink
Merge pull request prometheus#11840 from prometheus/beorn7/histogram-…
Browse files Browse the repository at this point in the history
…gauge

tsdb: Add integer gauge histogram support
  • Loading branch information
beorn7 authored Jan 11, 2023
2 parents 6948fb1 + 6dcd03d commit ac96da3
Show file tree
Hide file tree
Showing 8 changed files with 487 additions and 89 deletions.
4 changes: 2 additions & 2 deletions model/textparse/openmetricsparse.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ func (p *OpenMetricsParser) Series() ([]byte, *int64, float64) {
return p.series, nil, p.val
}

// Histogram always returns (nil, nil, nil, nil) because OpenMetrics does not support
// sparse histograms.
// Histogram returns (nil, nil, nil, nil) for now because OpenMetrics does not
// support sparse histograms yet.
func (p *OpenMetricsParser) Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) {
return nil, nil, nil, nil
}
Expand Down
4 changes: 2 additions & 2 deletions model/textparse/promparse.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ func (p *PromParser) Series() ([]byte, *int64, float64) {
return p.series, nil, p.val
}

// Histogram always returns (nil, nil, nil, nil) because the Prometheus text format
// does not support sparse histograms.
// Histogram returns (nil, nil, nil, nil) for now because the Prometheus text
// format does not support sparse histograms yet.
func (p *PromParser) Histogram() ([]byte, *int64, *histogram.Histogram, *histogram.FloatHistogram) {
return nil, nil, nil, nil
}
Expand Down
138 changes: 108 additions & 30 deletions tsdb/chunkenc/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func newHistogramIterator(b []byte) *histogramIterator {
// The first 3 bytes contain chunk headers.
// We skip that for actual samples.
_, _ = it.br.readBits(24)
it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000)
return it
}

Expand Down Expand Up @@ -222,6 +223,14 @@ type HistogramAppender struct {
trailing uint8
}

func (a *HistogramAppender) GetCounterResetHeader() CounterResetHeader {
return CounterResetHeader(a.b.bytes()[2] & 0b11000000)
}

func (a *HistogramAppender) NumSamples() int {
return int(binary.BigEndian.Uint16(a.b.bytes()))
}

// Append implements Appender. This implementation panics because normal float
// samples must never be appended to a histogram chunk.
func (a *HistogramAppender) Append(int64, float64) {
Expand All @@ -237,19 +246,16 @@ func (a *HistogramAppender) AppendFloatHistogram(int64, *histogram.FloatHistogra
// Appendable returns whether the chunk can be appended to, and if so
// whether any recoding needs to happen using the provided interjections
// (in case of any new buckets, positive or negative range, respectively).
// If the sample is a gauge histogram, AppendableGauge must be used instead.
//
// The chunk is not appendable in the following cases:
//
// • The schema has changed.
//
// • The threshold for the zero bucket has changed.
//
// • Any buckets have disappeared.
//
// • There was a counter reset in the count of observations or in any bucket,
// including the zero bucket.
//
// • The last sample in the chunk was stale while the current sample is not stale.
// - The schema has changed.
// - The threshold for the zero bucket has changed.
// - Any buckets have disappeared.
// - There was a counter reset in the count of observations or in any bucket,
// including the zero bucket.
// - The last sample in the chunk was stale while the current sample is not stale.
//
// The method returns an additional boolean set to true if it is not appendable
// because of a counter reset. If the given sample is stale, it is always ok to
Expand All @@ -258,6 +264,9 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
positiveInterjections, negativeInterjections []Interjection,
okToAppend, counterReset bool,
) {
if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType {
return
}
if value.IsStaleNaN(h.Sum) {
// This is a stale sample whose buckets and spans don't matter.
okToAppend = true
Expand Down Expand Up @@ -307,8 +316,47 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) (
return
}

type bucketValue interface {
int64 | float64
// AppendableGauge returns whether the chunk can be appended to, and if so
// whether:
// 1. Any recoding needs to happen to the chunk using the provided interjections
// (in case of any new buckets, positive or negative range, respectively).
// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections
// (in case of any missing buckets, positive or negative range, respectively).
//
// This method must be only used for gauge histograms.
//
// The chunk is not appendable in the following cases:
// - The schema has changed.
// - The threshold for the zero bucket has changed.
// - The last sample in the chunk was stale while the current sample is not stale.
func (a *HistogramAppender) AppendableGauge(h *histogram.Histogram) (
positiveInterjections, negativeInterjections []Interjection,
backwardPositiveInterjections, backwardNegativeInterjections []Interjection,
positiveSpans, negativeSpans []histogram.Span,
okToAppend bool,
) {
if a.NumSamples() > 0 && a.GetCounterResetHeader() != GaugeType {
return
}
if value.IsStaleNaN(h.Sum) {
// This is a stale sample whose buckets and spans don't matter.
okToAppend = true
return
}
if value.IsStaleNaN(a.sum) {
// If the last sample was stale, then we can only accept stale
// samples in this chunk.
return
}

if h.Schema != a.schema || h.ZeroThreshold != a.zThreshold {
return
}

positiveInterjections, backwardPositiveInterjections, positiveSpans = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans)
negativeInterjections, backwardNegativeInterjections, negativeSpans = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans)
okToAppend = true
return
}

// counterResetInAnyBucket returns true if there was a counter reset for any
Expand Down Expand Up @@ -542,6 +590,22 @@ func (a *HistogramAppender) Recode(
return hc, app
}

// RecodeHistogramm converts the current histogram (in-place) to accommodate an expansion of the set of
// (positive and/or negative) buckets used.
func (a *HistogramAppender) RecodeHistogramm(
h *histogram.Histogram,
pBackwardInter, nBackwardInter []Interjection,
) {
if len(pBackwardInter) > 0 {
numPositiveBuckets := countSpans(h.PositiveSpans)
h.PositiveBuckets = interject(h.PositiveBuckets, make([]int64, numPositiveBuckets), pBackwardInter, true)
}
if len(nBackwardInter) > 0 {
numNegativeBuckets := countSpans(h.NegativeSpans)
h.NegativeBuckets = interject(h.NegativeBuckets, make([]int64, numNegativeBuckets), nBackwardInter, true)
}
}

func (a *HistogramAppender) writeSumDelta(v float64) {
xorWrite(a.b, v, a.sum, &a.leading, &a.trailing)
}
Expand All @@ -551,6 +615,8 @@ type histogramIterator struct {
numTotal uint16
numRead uint16

counterResetHeader CounterResetHeader

// Layout:
schema int32
zThreshold float64
Expand Down Expand Up @@ -599,16 +665,21 @@ func (it *histogramIterator) AtHistogram() (int64, *histogram.Histogram) {
return it.t, &histogram.Histogram{Sum: it.sum}
}
it.atHistogramCalled = true
crHint := histogram.UnknownCounterReset
if it.counterResetHeader == GaugeType {
crHint = histogram.GaugeType
}
return it.t, &histogram.Histogram{
Count: it.cnt,
ZeroCount: it.zCnt,
Sum: it.sum,
ZeroThreshold: it.zThreshold,
Schema: it.schema,
PositiveSpans: it.pSpans,
NegativeSpans: it.nSpans,
PositiveBuckets: it.pBuckets,
NegativeBuckets: it.nBuckets,
CounterResetHint: crHint,
Count: it.cnt,
ZeroCount: it.zCnt,
Sum: it.sum,
ZeroThreshold: it.zThreshold,
Schema: it.schema,
PositiveSpans: it.pSpans,
NegativeSpans: it.nSpans,
PositiveBuckets: it.pBuckets,
NegativeBuckets: it.nBuckets,
}
}

Expand All @@ -617,16 +688,21 @@ func (it *histogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogra
return it.t, &histogram.FloatHistogram{Sum: it.sum}
}
it.atFloatHistogramCalled = true
crHint := histogram.UnknownCounterReset
if it.counterResetHeader == GaugeType {
crHint = histogram.GaugeType
}
return it.t, &histogram.FloatHistogram{
Count: float64(it.cnt),
ZeroCount: float64(it.zCnt),
Sum: it.sum,
ZeroThreshold: it.zThreshold,
Schema: it.schema,
PositiveSpans: it.pSpans,
NegativeSpans: it.nSpans,
PositiveBuckets: it.pFloatBuckets,
NegativeBuckets: it.nFloatBuckets,
CounterResetHint: crHint,
Count: float64(it.cnt),
ZeroCount: float64(it.zCnt),
Sum: it.sum,
ZeroThreshold: it.zThreshold,
Schema: it.schema,
PositiveSpans: it.pSpans,
NegativeSpans: it.nSpans,
PositiveBuckets: it.pFloatBuckets,
NegativeBuckets: it.nFloatBuckets,
}
}

Expand All @@ -645,6 +721,8 @@ func (it *histogramIterator) Reset(b []byte) {
it.numTotal = binary.BigEndian.Uint16(b)
it.numRead = 0

it.counterResetHeader = CounterResetHeader(b[2] & 0b11000000)

it.t, it.cnt, it.zCnt = 0, 0, 0
it.tDelta, it.cntDelta, it.zCntDelta = 0, 0, 0

Expand Down
4 changes: 4 additions & 0 deletions tsdb/chunkenc/histogram_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ loop:
return interjections, bInterjections, mergedSpans
}

type bucketValue interface {
int64 | float64
}

// interject merges 'in' with the provided interjections and writes them into
// 'out', which must already have the appropriate length.
func interject[BV bucketValue](in, out []BV, interjections []Interjection, deltas bool) []BV {
Expand Down
Loading

0 comments on commit ac96da3

Please sign in to comment.