Skip to content

Commit

Permalink
add traces to compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
louiseschmidtgen committed Jul 12, 2024
1 parent 213b745 commit 85af5c3
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 0 deletions.
20 changes: 20 additions & 0 deletions pkg/kine/logstructured/sqllog/otel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package sqllog

import (
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)

const name = "sqllog"

var (
tracer = otel.Tracer(name)
meter = otel.Meter(name)
compactCnt metric.Int64Counter
)

func init() {
compactCnt, _ = meter.Int64Counter(fmt.Sprintf("%s.compact", name), metric.WithDescription("Number of compact requests"))
}
27 changes: 27 additions & 0 deletions pkg/kine/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/canonical/k8s-dqlite/pkg/kine/broadcaster"
"github.com/canonical/k8s-dqlite/pkg/kine/server"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
)

const SupersededCount = 100
Expand Down Expand Up @@ -124,17 +125,26 @@ func (s *SQLLog) DoCompact(ctx context.Context) error {
}

func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) {
spanCtx, span := tracer.Start(s.ctx, "sqllog.compactor")
s.ctx = spanCtx
defer span.End()
span.AddEvent(fmt.Sprintf("nextEnd: %d", nextEnd))

currentRev, err := s.d.CurrentRevision(ctx)
span.AddEvent(fmt.Sprintf("get current revision: %d", currentRev))
if err != nil {
span.RecordError(err)
logrus.Errorf("failed to get current revision: %v", err)
return nextEnd, fmt.Errorf("failed to get current revision: %v", err)
}

cursor, _, err := s.d.GetCompactRevision(ctx)
if err != nil {
span.RecordError(err)
logrus.Errorf("failed to get compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to get compact revision: %v", err)
}
span.AddEvent(fmt.Sprintf("get compact revision: %d", cursor))

end := nextEnd
nextEnd = currentRev
Expand All @@ -148,15 +158,19 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) {
savedCursor := cursor
// Purposefully start at the current and redo the current as
// it could have failed before actually compacting
compactCnt.Add(ctx, 1)
span.AddEvent(fmt.Sprintf("start compaction from %d to %d", cursor, end))
for ; cursor <= end; cursor++ {
rows, err := s.d.GetRevision(ctx, cursor)
if err != nil {
span.RecordError(err)
logrus.Errorf("failed to get revision %d: %v", cursor, err)
return nextEnd, fmt.Errorf("failed to get revision %d: %v", cursor, err)
}

events, err := RowsToEvents(rows)
if err != nil {
span.RecordError(err)
logrus.Errorf("failed to convert to events: %v", err)
return nextEnd, fmt.Errorf("failed to convert to events: %v", err)
}
Expand All @@ -168,6 +182,7 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) {
event := events[0]

if event.KV.Key == "compact_rev_key" {
span.AddEvent("skip compact_rev_key")
// don't compact the compact key
continue
}
Expand All @@ -176,6 +191,8 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) {
if event.PrevKV != nil && event.PrevKV.ModRevision != 0 {
if savedCursor != cursor {
if err := s.d.SetCompactRevision(ctx, cursor); err != nil {
span.RecordError(err)
span.AddEvent(fmt.Sprintf("failed to record compact revision: %v", err))
logrus.Errorf("failed to record compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to record compact revision: %v", err)
}
Expand All @@ -184,6 +201,8 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) {
}

if err := s.d.DeleteRevision(ctx, event.PrevKV.ModRevision); err != nil {
span.RecordError(err)
span.AddEvent(fmt.Sprintf("failed to delete revision %d", event.PrevKV.ModRevision))
logrus.Errorf("failed to delete revision %d: %v", event.PrevKV.ModRevision, err)
return nextEnd, fmt.Errorf("failed to delete revision %d: %v", event.PrevKV.ModRevision, err)
}
Expand All @@ -192,13 +211,15 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) {
if event.Delete {
if !setRev && savedCursor != cursor {
if err := s.d.SetCompactRevision(ctx, cursor); err != nil {
span.RecordError(err)
logrus.Errorf("failed to record compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to record compact revision: %v", err)
}
savedCursor = cursor
}

if err := s.d.DeleteRevision(ctx, cursor); err != nil {
span.RecordError(err)
logrus.Errorf("failed to delete current revision %d: %v", cursor, err)
return nextEnd, fmt.Errorf("failed to delete current revision %d: %v", cursor, err)
}
Expand All @@ -207,6 +228,7 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) {

if savedCursor != cursor {
if err := s.d.SetCompactRevision(ctx, cursor); err != nil {
span.RecordError(err)
logrus.Errorf("failed to record compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to record compact revision: %v", err)
}
Expand All @@ -216,14 +238,19 @@ func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) {
}

func (s *SQLLog) compact() {
ctx, span := tracer.Start(s.ctx, "sqllog.compact")
s.ctx = ctx // yes? no?
defer span.End()
var nextEnd int64

t := time.NewTicker(s.d.GetCompactInterval())
nextEnd, _ = s.d.CurrentRevision(s.ctx)
span.SetAttributes(attribute.Int64("nextEnd", nextEnd))

for {
select {
case <-s.ctx.Done():
span.AddEvent("context done")
return
case <-t.C:
}
Expand Down

0 comments on commit 85af5c3

Please sign in to comment.