Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
BarkovBG committed Jan 25, 2025
1 parent 3419a77 commit 00d006a
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 30 deletions.
54 changes: 54 additions & 0 deletions cloud/tasks/persistence/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package persistence

import (
	"sync/atomic"
	"time"

	"github.com/ydb-platform/nbs/cloud/tasks/metrics"
)

////////////////////////////////////////////////////////////////////////////////

// healthCheck tracks how many queries a component has issued and how many of
// them succeeded, and publishes the resulting ratio as the "successRate"
// gauge on its registry.
type healthCheck struct {
	// Total number of queries passed to accountQuery.
	queriesCount uint64
	// Number of queries passed to accountQuery with a nil error.
	successQueriesCount uint64

	// Registry the "successRate" gauge is obtained from.
	registry metrics.Registry
	// Period between two consecutive reports of the success rate.
	metricsCollectionInterval time.Duration
}

// accountQuery records the outcome of one query: it always increments the
// total query counter and additionally increments the success counter when
// err is nil.
//
// The counters are updated atomically because they are read by the reporting
// goroutine started in newHealthCheck while callers keep accounting queries,
// so plain increments would be a data race and could lose updates.
func (h *healthCheck) accountQuery(err error) {
	// The total is bumped before the success counter, so the number of
	// successes never runs ahead of the number of queries.
	atomic.AddUint64(&h.queriesCount, 1)
	if err == nil {
		atomic.AddUint64(&h.successQueriesCount, 1)
	}
}

// reportSuccessRate publishes successQueriesCount / queriesCount to the
// "successRate" gauge. When no queries have been accounted yet it publishes 0
// instead of dividing by zero.
//
// The counters are never reset here, so the reported value is the ratio over
// the whole lifetime of the healthCheck, not over the last interval.
func (h *healthCheck) reportSuccessRate() {
	// The counters are written by accountQuery from other goroutines, so they
	// are read atomically. Successes are loaded before the total: accountQuery
	// bumps the total first, so reading in the opposite order keeps the
	// snapshot from showing more successes than queries.
	succeeded := atomic.LoadUint64(&h.successQueriesCount)
	total := atomic.LoadUint64(&h.queriesCount)

	rate := 0.0
	if total != 0 {
		rate = float64(succeeded) / float64(total)
	}

	h.registry.Gauge("successRate").Set(rate)
}

// newHealthCheck builds a healthCheck whose gauge is registered under the
// given registry with a "component" tag set to componentName, and starts a
// background goroutine that calls reportSuccessRate on every tick of the
// collection interval.
//
// NOTE(review): the goroutine has no exit path in this function, so it keeps
// running (and keeps the healthCheck alive) for the rest of the process.
func newHealthCheck(componentName string, registry metrics.Registry) *healthCheck {
	h := &healthCheck{
		registry: registry.WithTags(map[string]string{
			"component": componentName,
		}),
		// TODO (carried over from the original): the interval is hard-coded;
		// presumably it should become configurable.
		metricsCollectionInterval: 15 * time.Second,
	}

	go func() {
		ticker := time.NewTicker(h.metricsCollectionInterval)
		defer ticker.Stop()

		for {
			<-ticker.C
			h.reportSuccessRate()
		}
	}()

	return h
}
48 changes: 48 additions & 0 deletions cloud/tasks/persistence/health_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package persistence

import (
"sync"
"testing"

"github.com/stretchr/testify/mock"
"github.com/ydb-platform/nbs/cloud/tasks/errors"
"github.com/ydb-platform/nbs/cloud/tasks/metrics/mocks"
)

////////////////////////////////////////////////////////////////////////////////

// TestHealthCheckMetric checks that the background reporter publishes 0 while
// no queries have been accounted, and 3/4 after three successful queries and
// one failed query.
func TestHealthCheckMetric(t *testing.T) {
	registry := mocks.NewRegistryMock()
	hc := newHealthCheck("test", registry)

	// awaitSuccessRate registers a one-shot expectation for the given gauge
	// value and blocks until the reporter goroutine delivers it.
	awaitSuccessRate := func(expected float64) {
		var reported sync.WaitGroup
		reported.Add(1)

		registry.GetGauge(
			"successRate",
			map[string]string{"component": "test"},
		).On("Set", expected).Once().Run(
			func(mock.Arguments) {
				reported.Done()
			},
		)

		reported.Wait()
	}

	// Nothing accounted yet: the reported rate must be zero.
	awaitSuccessRate(0)

	hc.accountQuery(nil)
	hc.accountQuery(nil)
	hc.accountQuery(nil)
	hc.accountQuery(errors.NewEmptyRetriableError())

	// Three successes out of four queries.
	awaitSuccessRate(3.0 / 4.0)

	registry.AssertAllExpectations(t)
}
11 changes: 7 additions & 4 deletions cloud/tasks/persistence/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ func NewS3Client(
region string,
credentials S3Credentials,
callTimeout time.Duration,
registry metrics.Registry,
s3MetricsRegistry metrics.Registry,
healthMetricsRegistry metrics.Registry,
maxRetriableErrorCount uint64,
) (*S3Client, error) {

s3Metrics := newS3Metrics(registry, callTimeout)
s3Metrics := newS3Metrics(callTimeout, s3MetricsRegistry, healthMetricsRegistry)

sessionConfig := &aws.Config{
Credentials: aws_credentials.NewStaticCredentials(
Expand Down Expand Up @@ -90,7 +91,8 @@ func NewS3Client(

func NewS3ClientFromConfig(
config *persistence_config.S3Config,
registry metrics.Registry,
s3MetricsRegistry metrics.Registry,
healthMetricsRegistry metrics.Registry,
) (*S3Client, error) {

credentials, err := NewS3CredentialsFromFile(config.GetCredentialsFilePath())
Expand All @@ -111,7 +113,8 @@ func NewS3ClientFromConfig(
config.GetRegion(),
credentials,
callTimeout,
registry,
s3MetricsRegistry,
healthMetricsRegistry,
config.GetMaxRetriableErrorCount(),
)
}
Expand Down
24 changes: 16 additions & 8 deletions cloud/tasks/persistence/s3_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ func s3CallDurationBuckets() metrics.DurationBuckets {
////////////////////////////////////////////////////////////////////////////////

type s3Metrics struct {
registry metrics.Registry
callTimeout time.Duration
callTimeout time.Duration
s3MetricsRegistry metrics.Registry
healthCheck *healthCheck
}

func (m *s3Metrics) StatCall(
Expand All @@ -38,18 +39,20 @@ func (m *s3Metrics) StatCall(
start := time.Now()

return func(err *error) {
subRegistry := m.registry.WithTags(map[string]string{
subRegistry := m.s3MetricsRegistry.WithTags(map[string]string{
"call": name,
})

// Should initialize all counters before using them, to avoid 'no data'.
errorCounter := subRegistry.Counter("errors")
successCounter := subRegistry.Counter("success")
hangingCounter := subRegistry.Counter("hanging")
errorCounter := subRegistry.Counter("errors")
timeoutCounter := subRegistry.Counter("errors/timeout")
canceledCounter := subRegistry.Counter("errors/canceled")
timeHistogram := subRegistry.DurationHistogram("time", s3CallDurationBuckets())

m.healthCheck.accountQuery(*err)

if time.Since(start) >= m.callTimeout {
logging.Error(
ctx,
Expand Down Expand Up @@ -110,7 +113,7 @@ func (m *s3Metrics) OnRetry(req *request.Request) {
req.RetryCount+1,
)

subRegistry := m.registry.WithTags(map[string]string{
subRegistry := m.s3MetricsRegistry.WithTags(map[string]string{
"call": req.Operation.Name,
})

Expand All @@ -119,9 +122,14 @@ func (m *s3Metrics) OnRetry(req *request.Request) {
retryCounter.Inc()
}

func newS3Metrics(registry metrics.Registry, callTimeout time.Duration) *s3Metrics {
func newS3Metrics(
callTimeout time.Duration,
s3MetricsRegistry metrics.Registry,
healthMetricsRegistry metrics.Registry,
) *s3Metrics {
return &s3Metrics{
registry: registry,
callTimeout: callTimeout,
callTimeout: callTimeout,
s3MetricsRegistry: s3MetricsRegistry,
healthCheck: newHealthCheck("s3", healthMetricsRegistry),
}
}
67 changes: 49 additions & 18 deletions cloud/tasks/persistence/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ const maxRetriableErrorCount = 3
////////////////////////////////////////////////////////////////////////////////

func newS3Client(
metricsRegistry *mocks.RegistryMock,
callTimeout time.Duration,
s3MetricsRegistry *mocks.RegistryMock,
healthMetricsRegistry *mocks.RegistryMock,
) (*S3Client, error) {

credentials := NewS3Credentials("test", "test")
Expand All @@ -27,7 +28,8 @@ func newS3Client(
"region",
credentials,
callTimeout,
metricsRegistry,
s3MetricsRegistry,
healthMetricsRegistry,
maxRetriableErrorCount,
)
}
Expand All @@ -37,80 +39,109 @@ func newS3Client(
func TestS3ShouldSendErrorCanceledMetric(t *testing.T) {
ctx, cancel := context.WithCancel(newContext())

metricsRegistry := mocks.NewRegistryMock()
s3MetricsRegistry := mocks.NewRegistryMock()
healthMetricsRegistry := mocks.NewRegistryMock()

s3, err := newS3Client(metricsRegistry, 10*time.Second /* callTimeout */)
s3, err := newS3Client(10*time.Second /* callTimeout */, s3MetricsRegistry, healthMetricsRegistry)
require.NoError(t, err)

cancel()

metricsRegistry.GetCounter(
s3MetricsRegistry.GetCounter(
"errors",
map[string]string{"call": "CreateBucket"},
).On("Inc").Once()

metricsRegistry.GetCounter(
s3MetricsRegistry.GetCounter(
"errors/canceled",
map[string]string{"call": "CreateBucket"},
).On("Inc").Once()

err = s3.CreateBucket(ctx, "test")
require.True(t, errors.Is(err, errors.NewEmptyRetriableError()))

metricsRegistry.AssertAllExpectations(t)
s3MetricsRegistry.AssertAllExpectations(t)
}

func TestS3ShouldSendErrorTimeoutMetric(t *testing.T) {
ctx, cancel := context.WithCancel(newContext())
defer cancel()

metricsRegistry := mocks.NewRegistryMock()
s3MetricsRegistry := mocks.NewRegistryMock()
healthMetricsRegistry := mocks.NewRegistryMock()

s3, err := newS3Client(metricsRegistry, 0 /* callTimeout */)
s3, err := newS3Client(0 /* callTimeout */, s3MetricsRegistry, healthMetricsRegistry)
require.NoError(t, err)

metricsRegistry.GetCounter(
s3MetricsRegistry.GetCounter(
"errors",
map[string]string{"call": "CreateBucket"},
).On("Inc").Once()

metricsRegistry.GetCounter(
s3MetricsRegistry.GetCounter(
"hanging",
map[string]string{"call": "CreateBucket"},
).On("Inc").Once()

metricsRegistry.GetCounter(
s3MetricsRegistry.GetCounter(
"errors/timeout",
map[string]string{"call": "CreateBucket"},
).On("Inc").Once()

err = s3.CreateBucket(ctx, "test")
require.True(t, errors.Is(err, errors.NewEmptyRetriableError()))

metricsRegistry.AssertAllExpectations(t)
s3MetricsRegistry.AssertAllExpectations(t)
}

func TestS3ShouldRetryRequests(t *testing.T) {
ctx, cancel := context.WithCancel(newContext())
defer cancel()

metricsRegistry := mocks.NewRegistryMock()
s3MetricsRegistry := mocks.NewRegistryMock()
healthMetricsRegistry := mocks.NewRegistryMock()

s3, err := newS3Client(metricsRegistry, 10*time.Second /* callTimeout */)
s3, err := newS3Client(10*time.Second /* callTimeout */, s3MetricsRegistry, healthMetricsRegistry)
require.NoError(t, err)

metricsRegistry.GetCounter(
s3MetricsRegistry.GetCounter(
"errors",
map[string]string{"call": "CreateBucket"},
).On("Inc").Once()

metricsRegistry.GetCounter(
s3MetricsRegistry.GetCounter(
"retry",
map[string]string{"call": "CreateBucket"},
).On("Inc").Times(maxRetriableErrorCount)

err = s3.CreateBucket(ctx, "test")
require.True(t, errors.Is(err, errors.NewEmptyRetriableError()))

metricsRegistry.AssertAllExpectations(t)
s3MetricsRegistry.AssertAllExpectations(t)
}

// TestS3ShouldSendHealthMetric issues a CreateBucket call that is expected to
// fail with a retriable error and checks the "errors" and "retry" counters on
// the S3 metrics registry.
//
// NOTE(review): no expectation is set on the health registry, so the health
// metric named in the test title is not actually asserted here.
func TestS3ShouldSendHealthMetric(t *testing.T) {
	ctx, cancel := context.WithCancel(newContext())
	defer cancel()

	s3Registry := mocks.NewRegistryMock()
	healthRegistry := mocks.NewRegistryMock()

	client, err := newS3Client(
		10*time.Second, // callTimeout
		s3Registry,
		healthRegistry,
	)
	require.NoError(t, err)

	// The call as a whole is counted as a single error...
	s3Registry.GetCounter(
		"errors",
		map[string]string{"call": "CreateBucket"},
	).On("Inc").Once()

	// ...after being retried the maximum allowed number of times.
	s3Registry.GetCounter(
		"retry",
		map[string]string{"call": "CreateBucket"},
	).On("Inc").Times(maxRetriableErrorCount)

	err = client.CreateBucket(ctx, "test")
	require.True(t, errors.Is(err, errors.NewEmptyRetriableError()))

	s3Registry.AssertAllExpectations(t)
}
2 changes: 2 additions & 0 deletions cloud/tasks/persistence/ya.make
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
GO_LIBRARY()

SRCS(
health.go
s3.go
s3_metrics.go
ydb.go
Expand All @@ -9,6 +10,7 @@ SRCS(
)

GO_TEST_SRCS(
health_test.go
s3_test.go
ydb_test.go
)
Expand Down

0 comments on commit 00d006a

Please sign in to comment.