Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Report aggregated results #6

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ linters-settings:
errcheck:
exclude: .errcheck_excludes
lll:
line-length: 140
funlen:
lines: 120
statements: 45
line-length: 120
gocognit:
min-complexity: 40
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/gogo/protobuf v1.2.2-0.20190730201129-28a6bbf47e48
github.com/golang/snappy v0.0.1
github.com/oklog/run v1.0.0
github.com/pkg/errors v0.8.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.6.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
83 changes: 55 additions & 28 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type metrics struct {
metricValueDifference prometheus.Histogram
}

//nolint: funlen
func main() {
l := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
l = log.WithPrefix(l, "ts", log.DefaultTimestampUTC)
Expand Down Expand Up @@ -207,7 +208,7 @@ func main() {
l := log.With(l, "component", "writer")
level.Info(l).Log("msg", "starting the writer")

return runPeriodically(ctx, opts, m.remoteWriteRequests, l, func(rCtx context.Context) {
return runPeriodically(ctx, opts, l, func(rCtx context.Context) {
if err := write(rCtx, opts.WriteEndpoint, opts.Token, generate(opts.Labels), l); err != nil {
m.remoteWriteRequests.WithLabelValues("error").Inc()
level.Error(l).Log("msg", "failed to make request", "err", err)
Expand Down Expand Up @@ -235,28 +236,41 @@ func main() {

level.Info(l).Log("msg", "start querying for metrics")

return runPeriodically(ctx, opts, m.queryResponses, l, func(rCtx context.Context) {
return runPeriodically(ctx, opts, l, func(rCtx context.Context) {
if err := read(rCtx, opts.ReadEndpoint, opts.Labels, -1*opts.InitialQueryDelay, opts.Latency, m); err != nil {
m.queryResponses.WithLabelValues("error").Inc()
level.Error(l).Log("msg", "failed to query", "err", err)
} else {
m.queryResponses.WithLabelValues("success").Inc()
}
})
}, func(_ error) {
}, func(err error) {
// TODO: Consider introducing custom errors.
// For known errors, wait for as long as the initial query delay so that reading can finish.
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
<-time.After(opts.InitialQueryDelay)
}
cancel()
})
}

if err := g.Run(); err != nil {
level.Error(l).Log("msg", "run group exited with error", "err", err)
// Run actors.
err = g.Run()

// Report results before exit.
if err := reportResults(l, m, opts); err != nil {
level.Error(l).Log("msg", "ratio is below threshold", "err", err)
os.Exit(1)
}

if err != nil {
level.Error(l).Log("msg", "run group exited with error", "err", err)
}

level.Info(l).Log("msg", "up completed its mission!")
}

func runPeriodically(ctx context.Context, opts options, c *prometheus.CounterVec, l log.Logger, f func(rCtx context.Context)) error {
func runPeriodically(ctx context.Context, opts options, l log.Logger, f func(rCtx context.Context)) error {
var (
t = time.NewTicker(opts.Period)
deadline time.Time
Expand Down Expand Up @@ -285,10 +299,11 @@ func runPeriodically(ctx context.Context, opts options, c *prometheus.CounterVec
// If it gets immediately cancelled, zero value of deadline won't cause a lock!
case <-time.After(time.Until(deadline)):
rCancel()
return errors.Wrap(context.DeadlineExceeded, "periodic request stopped")
case <-rCtx.Done():
return errors.Wrap(context.Canceled, "periodic request stopped")
}

return reportResults(l, c, opts.SuccessThreshold)
// TODO: Consider introducing custom errors.
}
}
}
Expand Down Expand Up @@ -377,35 +392,47 @@ func write(ctx context.Context, endpoint fmt.Stringer, token string, wreq proto.
return nil
}

func reportResults(l log.Logger, c *prometheus.CounterVec, threshold float64) error {
metrics := make(chan prometheus.Metric, 2)
c.Collect(metrics)
close(metrics)
func reportResults(l log.Logger, mtr metrics, opts options) error {
collect := func(l log.Logger, c *prometheus.CounterVec) (success, errors float64) {
mtrs := make(chan prometheus.Metric, 2)

var success, errors float64
c.Collect(mtrs)
close(mtrs)

for m := range metrics {
m1 := &dto.Metric{}
if err := m.Write(m1); err != nil {
level.Warn(l).Log("msg", "cannot read success and error count from prometheus counter", "err", err)
}
for m := range mtrs {
m1 := &dto.Metric{}
if err := m.Write(m1); err != nil {
level.Warn(l).Log("msg", "cannot read success and error count from prometheus counter", "err", err)
}

for _, l := range m1.Label {
switch *l.Value {
case "error":
errors = m1.GetCounter().GetValue()
case "success":
success = m1.GetCounter().GetValue()
for _, l := range m1.Label {
switch *l.Value {
case "error":
errors += m1.GetCounter().GetValue()
case "success":
success += m1.GetCounter().GetValue()
}
}
}

level.Info(l).Log("msg", "number of requests", "success", success, "errors", errors)

return
}

success, errors := collect(log.With(l, "component", "writer"), mtr.remoteWriteRequests)

if opts.ReadEndpoint != nil {
s, e := collect(log.With(l, "component", "reader"), mtr.queryResponses)
success += s
errors += e
}

level.Info(l).Log("msg", "number of requests", "success", success, "errors", errors)
level.Info(l).Log("msg", "total number of requests", "success", success, "errors", errors)

ratio := success / (success + errors)
if ratio < threshold {
level.Error(l).Log("msg", "ratio is below threshold")
return fmt.Errorf("failed with less than %2.f%% success ratio - actual %2.f%%", threshold*100, ratio*100)
if ratio < opts.SuccessThreshold {
return fmt.Errorf("failed with less than %2.f%% success ratio - actual %2.f%%", opts.SuccessThreshold*100, ratio*100)
}

return nil
Expand Down