From 85af5c36af73012ef359826d8a9b8e729c7b1fa9 Mon Sep 17 00:00:00 2001 From: louiseschmidtgen Date: Fri, 12 Jul 2024 10:33:36 +0200 Subject: [PATCH] add traces to compaction --- pkg/kine/logstructured/sqllog/otel.go | 20 ++++++++++++++++++++ pkg/kine/logstructured/sqllog/sql.go | 27 +++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 pkg/kine/logstructured/sqllog/otel.go diff --git a/pkg/kine/logstructured/sqllog/otel.go b/pkg/kine/logstructured/sqllog/otel.go new file mode 100644 index 00000000..e7c87999 --- /dev/null +++ b/pkg/kine/logstructured/sqllog/otel.go @@ -0,0 +1,20 @@ +package sqllog + +import ( + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" +) + +const name = "sqllog" + +var ( + tracer = otel.Tracer(name) + meter = otel.Meter(name) + compactCnt metric.Int64Counter +) + +func init() { + compactCnt, _ = meter.Int64Counter(fmt.Sprintf("%s.compact", name), metric.WithDescription("Number of compact requests")) +} diff --git a/pkg/kine/logstructured/sqllog/sql.go b/pkg/kine/logstructured/sqllog/sql.go index c688a594..9fcfcdf6 100644 --- a/pkg/kine/logstructured/sqllog/sql.go +++ b/pkg/kine/logstructured/sqllog/sql.go @@ -11,6 +11,7 @@ import ( "github.com/canonical/k8s-dqlite/pkg/kine/broadcaster" "github.com/canonical/k8s-dqlite/pkg/kine/server" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" ) const SupersededCount = 100 @@ -124,17 +125,26 @@ func (s *SQLLog) DoCompact(ctx context.Context) error { } func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) { + spanCtx, span := tracer.Start(s.ctx, "sqllog.compactor") + s.ctx = spanCtx + defer span.End() + span.AddEvent(fmt.Sprintf("nextEnd: %d", nextEnd)) + currentRev, err := s.d.CurrentRevision(ctx) + span.AddEvent(fmt.Sprintf("get current revision: %d", currentRev)) if err != nil { + span.RecordError(err) logrus.Errorf("failed to get current revision: %v", err) return nextEnd, fmt.Errorf("failed to get current revision: %v", err) } cursor, _, err := s.d.GetCompactRevision(ctx) if err != nil { + span.RecordError(err) logrus.Errorf("failed to get compact revision: %v", err) return nextEnd, fmt.Errorf("failed to get compact revision: %v", err) } + span.AddEvent(fmt.Sprintf("get compact revision: %d", cursor)) end := nextEnd nextEnd = currentRev @@ -148,15 +158,19 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) { savedCursor := cursor // Purposefully start at the current and redo the current as // it could have failed before actually compacting + compactCnt.Add(ctx, 1) + span.AddEvent(fmt.Sprintf("start compaction from %d to %d", cursor, end)) for ; cursor <= end; cursor++ { rows, err := s.d.GetRevision(ctx, cursor) if err != nil { + span.RecordError(err) logrus.Errorf("failed to get revision %d: %v", cursor, err) return nextEnd, fmt.Errorf("failed to get revision %d: %v", cursor, err) } events, err := RowsToEvents(rows) if err != nil { + span.RecordError(err) logrus.Errorf("failed to convert to events: %v", err) return nextEnd, fmt.Errorf("failed to convert to events: %v", err) } @@ -168,6 +182,7 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) { event := events[0] if event.KV.Key == "compact_rev_key" { + span.AddEvent("skip compact_rev_key") // don't compact the compact key continue } @@ -176,6 +191,8 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) { if event.PrevKV != nil && event.PrevKV.ModRevision != 0 { if savedCursor != cursor { if err := s.d.SetCompactRevision(ctx, cursor); err != nil { + span.RecordError(err) + span.AddEvent(fmt.Sprintf("failed to record compact revision: %v", err)) logrus.Errorf("failed to record compact revision: %v", err) return nextEnd, fmt.Errorf("failed to record compact revision: %v", err) } @@ -184,6 +201,8 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) { } if err := s.d.DeleteRevision(ctx, event.PrevKV.ModRevision); err != nil { + span.RecordError(err) + span.AddEvent(fmt.Sprintf("failed to delete revision %d", event.PrevKV.ModRevision)) logrus.Errorf("failed to delete revision %d: %v", event.PrevKV.ModRevision, err) return nextEnd, fmt.Errorf("failed to delete revision %d: %v", event.PrevKV.ModRevision, err) } @@ -192,6 +211,7 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) { if event.Delete { if !setRev && savedCursor != cursor { if err := s.d.SetCompactRevision(ctx, cursor); err != nil { + span.RecordError(err) logrus.Errorf("failed to record compact revision: %v", err) return nextEnd, fmt.Errorf("failed to record compact revision: %v", err) } @@ -199,6 +219,7 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) { } if err := s.d.DeleteRevision(ctx, cursor); err != nil { + span.RecordError(err) logrus.Errorf("failed to delete current revision %d: %v", cursor, err) return nextEnd, fmt.Errorf("failed to delete current revision %d: %v", cursor, err) } @@ -207,6 +228,7 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) { if savedCursor != cursor { if err := s.d.SetCompactRevision(ctx, cursor); err != nil { + span.RecordError(err) logrus.Errorf("failed to record compact revision: %v", err) return nextEnd, fmt.Errorf("failed to record compact revision: %v", err) } @@ -216,14 +238,19 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) { } func (s *SQLLog) compact() { + ctx, span := tracer.Start(s.ctx, "sqllog.compact") + s.ctx = ctx // yes? no? + defer span.End() var nextEnd int64 t := time.NewTicker(s.d.GetCompactInterval()) nextEnd, _ = s.d.CurrentRevision(s.ctx) + span.SetAttributes(attribute.Int64("nextEnd", nextEnd)) for { select { case <-s.ctx.Done(): + span.AddEvent("context done") return case <-t.C: }