diff --git a/cloud/tasks/persistence/health.go b/cloud/tasks/persistence/health.go
new file mode 100644
index 0000000000..4b7b08b57a
--- /dev/null
+++ b/cloud/tasks/persistence/health.go
@@ -0,0 +1,78 @@
+package persistence
+
+import (
+	"sync"
+	"time"
+
+	"github.com/ydb-platform/nbs/cloud/tasks/metrics"
+)
+
+////////////////////////////////////////////////////////////////////////////////
+
+// healthCheck tracks how many queries were issued and how many of them
+// succeeded, and periodically publishes the ratio as the "successRate" gauge.
+//
+// accountQuery is called from request paths while reportSuccessRate runs on a
+// background goroutine, so the counters are guarded by mutex.
+type healthCheck struct {
+	mutex               sync.Mutex
+	queriesCount        uint64
+	successQueriesCount uint64
+
+	registry                  metrics.Registry
+	metricsCollectionInterval time.Duration
+}
+
+// accountQuery records one finished query; a nil err counts as a success.
+func (h *healthCheck) accountQuery(err error) {
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+
+	h.queriesCount++
+	if err == nil {
+		h.successQueriesCount++
+	}
+}
+
+// reportSuccessRate sets the "successRate" gauge to the fraction of successful
+// queries accounted so far, or to 0 when no query has been accounted yet.
+func (h *healthCheck) reportSuccessRate() {
+	// Snapshot both counters under the lock so the ratio is consistent, and
+	// release it before touching the registry.
+	h.mutex.Lock()
+	total, succeeded := h.queriesCount, h.successQueriesCount
+	h.mutex.Unlock()
+
+	successRate := float64(0)
+	if total != 0 {
+		successRate = float64(succeeded) / float64(total)
+	}
+
+	h.registry.Gauge("successRate").Set(successRate)
+}
+
+// newHealthCheck creates a healthCheck whose gauge is tagged with
+// component=componentName and starts a goroutine that reports the success rate
+// every metricsCollectionInterval. The goroutine is never stopped.
+func newHealthCheck(componentName string, registry metrics.Registry) *healthCheck {
+	subRegistry := registry.WithTags(map[string]string{
+		"component": componentName,
+	})
+
+	// Allocate behind a pointer: the struct holds a mutex and must not be copied.
+	h := &healthCheck{
+		registry:                  subRegistry,
+		metricsCollectionInterval: 15 * time.Second, // todo
+	}
+
+	go func() {
+		ticker := time.NewTicker(h.metricsCollectionInterval)
+		defer ticker.Stop()
+
+		for range ticker.C {
+			h.reportSuccessRate()
+		}
+	}()
+
+	return h
+}
diff --git a/cloud/tasks/persistence/health_test.go b/cloud/tasks/persistence/health_test.go
new file mode 100644
index 0000000000..de59f9798b
--- /dev/null
+++ b/cloud/tasks/persistence/health_test.go
@@ -0,0 +1,48 @@
+package persistence
+
+import (
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/mock"
+	"github.com/ydb-platform/nbs/cloud/tasks/errors"
+	"github.com/ydb-platform/nbs/cloud/tasks/metrics/mocks"
+)
+
+////////////////////////////////////////////////////////////////////////////////
+
+func TestHealthCheckMetric(t *testing.T) {
+	registry := mocks.NewRegistryMock()
+	healthCheck := newHealthCheck("test", registry)
+
+	gaugeSetWg
:= sync.WaitGroup{} + + gaugeSetWg.Add(1) + registry.GetGauge( + "successRate", + map[string]string{"component": "test"}, + ).On("Set", float64(0)).Once().Run( + func(args mock.Arguments) { + gaugeSetWg.Done() + }, + ) + gaugeSetWg.Wait() + + healthCheck.accountQuery(nil) + healthCheck.accountQuery(nil) + healthCheck.accountQuery(nil) + healthCheck.accountQuery(errors.NewEmptyRetriableError()) + + gaugeSetWg.Add(1) + registry.GetGauge( + "successRate", + map[string]string{"component": "test"}, + ).On("Set", float64(3.0/4.0)).Once().Run( + func(args mock.Arguments) { + gaugeSetWg.Done() + }, + ) + gaugeSetWg.Wait() + + registry.AssertAllExpectations(t) +} diff --git a/cloud/tasks/persistence/s3.go b/cloud/tasks/persistence/s3.go index 1deeacded7..3fab65e9dc 100644 --- a/cloud/tasks/persistence/s3.go +++ b/cloud/tasks/persistence/s3.go @@ -53,11 +53,12 @@ func NewS3Client( region string, credentials S3Credentials, callTimeout time.Duration, - registry metrics.Registry, + s3MetricsRegistry metrics.Registry, + healthMetricsRegistry metrics.Registry, maxRetriableErrorCount uint64, ) (*S3Client, error) { - s3Metrics := newS3Metrics(registry, callTimeout) + s3Metrics := newS3Metrics(callTimeout, s3MetricsRegistry, healthMetricsRegistry) sessionConfig := &aws.Config{ Credentials: aws_credentials.NewStaticCredentials( @@ -90,7 +91,8 @@ func NewS3Client( func NewS3ClientFromConfig( config *persistence_config.S3Config, - registry metrics.Registry, + s3MetricsRegistry metrics.Registry, + healthMetricsRegistry metrics.Registry, ) (*S3Client, error) { credentials, err := NewS3CredentialsFromFile(config.GetCredentialsFilePath()) @@ -111,7 +113,8 @@ func NewS3ClientFromConfig( config.GetRegion(), credentials, callTimeout, - registry, + s3MetricsRegistry, + healthMetricsRegistry, config.GetMaxRetriableErrorCount(), ) } diff --git a/cloud/tasks/persistence/s3_metrics.go b/cloud/tasks/persistence/s3_metrics.go index 645cb4c227..197f622c98 100644 --- 
a/cloud/tasks/persistence/s3_metrics.go +++ b/cloud/tasks/persistence/s3_metrics.go @@ -24,8 +24,9 @@ func s3CallDurationBuckets() metrics.DurationBuckets { //////////////////////////////////////////////////////////////////////////////// type s3Metrics struct { - registry metrics.Registry - callTimeout time.Duration + callTimeout time.Duration + s3MetricsRegistry metrics.Registry + healthCheck *healthCheck } func (m *s3Metrics) StatCall( @@ -38,18 +39,20 @@ func (m *s3Metrics) StatCall( start := time.Now() return func(err *error) { - subRegistry := m.registry.WithTags(map[string]string{ + subRegistry := m.s3MetricsRegistry.WithTags(map[string]string{ "call": name, }) // Should initialize all counters before using them, to avoid 'no data'. - errorCounter := subRegistry.Counter("errors") successCounter := subRegistry.Counter("success") hangingCounter := subRegistry.Counter("hanging") + errorCounter := subRegistry.Counter("errors") timeoutCounter := subRegistry.Counter("errors/timeout") canceledCounter := subRegistry.Counter("errors/canceled") timeHistogram := subRegistry.DurationHistogram("time", s3CallDurationBuckets()) + m.healthCheck.accountQuery(*err) + if time.Since(start) >= m.callTimeout { logging.Error( ctx, @@ -110,7 +113,7 @@ func (m *s3Metrics) OnRetry(req *request.Request) { req.RetryCount+1, ) - subRegistry := m.registry.WithTags(map[string]string{ + subRegistry := m.s3MetricsRegistry.WithTags(map[string]string{ "call": req.Operation.Name, }) @@ -119,9 +122,14 @@ func (m *s3Metrics) OnRetry(req *request.Request) { retryCounter.Inc() } -func newS3Metrics(registry metrics.Registry, callTimeout time.Duration) *s3Metrics { +func newS3Metrics( + callTimeout time.Duration, + s3MetricsRegistry metrics.Registry, + healthMetricsRegistry metrics.Registry, +) *s3Metrics { return &s3Metrics{ - registry: registry, - callTimeout: callTimeout, + callTimeout: callTimeout, + s3MetricsRegistry: s3MetricsRegistry, + healthCheck: newHealthCheck("s3", healthMetricsRegistry), 
} } diff --git a/cloud/tasks/persistence/s3_test.go b/cloud/tasks/persistence/s3_test.go index fabb5fff32..4c4697f42d 100644 --- a/cloud/tasks/persistence/s3_test.go +++ b/cloud/tasks/persistence/s3_test.go @@ -17,8 +17,9 @@ const maxRetriableErrorCount = 3 //////////////////////////////////////////////////////////////////////////////// func newS3Client( - metricsRegistry *mocks.RegistryMock, callTimeout time.Duration, + s3MetricsRegistry *mocks.RegistryMock, + healthMetricsRegistry *mocks.RegistryMock, ) (*S3Client, error) { credentials := NewS3Credentials("test", "test") @@ -27,7 +28,8 @@ func newS3Client( "region", credentials, callTimeout, - metricsRegistry, + s3MetricsRegistry, + healthMetricsRegistry, maxRetriableErrorCount, ) } @@ -37,19 +39,20 @@ func newS3Client( func TestS3ShouldSendErrorCanceledMetric(t *testing.T) { ctx, cancel := context.WithCancel(newContext()) - metricsRegistry := mocks.NewRegistryMock() + s3MetricsRegistry := mocks.NewRegistryMock() + healthMetricsRegistry := mocks.NewRegistryMock() - s3, err := newS3Client(metricsRegistry, 10*time.Second /* callTimeout */) + s3, err := newS3Client(10*time.Second /* callTimeout */, s3MetricsRegistry, healthMetricsRegistry) require.NoError(t, err) cancel() - metricsRegistry.GetCounter( + s3MetricsRegistry.GetCounter( "errors", map[string]string{"call": "CreateBucket"}, ).On("Inc").Once() - metricsRegistry.GetCounter( + s3MetricsRegistry.GetCounter( "errors/canceled", map[string]string{"call": "CreateBucket"}, ).On("Inc").Once() @@ -57,29 +60,30 @@ func TestS3ShouldSendErrorCanceledMetric(t *testing.T) { err = s3.CreateBucket(ctx, "test") require.True(t, errors.Is(err, errors.NewEmptyRetriableError())) - metricsRegistry.AssertAllExpectations(t) + s3MetricsRegistry.AssertAllExpectations(t) } func TestS3ShouldSendErrorTimeoutMetric(t *testing.T) { ctx, cancel := context.WithCancel(newContext()) defer cancel() - metricsRegistry := mocks.NewRegistryMock() + s3MetricsRegistry := mocks.NewRegistryMock() + 
healthMetricsRegistry := mocks.NewRegistryMock() - s3, err := newS3Client(metricsRegistry, 0 /* callTimeout */) + s3, err := newS3Client(0 /* callTimeout */, s3MetricsRegistry, healthMetricsRegistry) require.NoError(t, err) - metricsRegistry.GetCounter( + s3MetricsRegistry.GetCounter( "errors", map[string]string{"call": "CreateBucket"}, ).On("Inc").Once() - metricsRegistry.GetCounter( + s3MetricsRegistry.GetCounter( "hanging", map[string]string{"call": "CreateBucket"}, ).On("Inc").Once() - metricsRegistry.GetCounter( + s3MetricsRegistry.GetCounter( "errors/timeout", map[string]string{"call": "CreateBucket"}, ).On("Inc").Once() @@ -87,24 +91,25 @@ func TestS3ShouldSendErrorTimeoutMetric(t *testing.T) { err = s3.CreateBucket(ctx, "test") require.True(t, errors.Is(err, errors.NewEmptyRetriableError())) - metricsRegistry.AssertAllExpectations(t) + s3MetricsRegistry.AssertAllExpectations(t) } func TestS3ShouldRetryRequests(t *testing.T) { ctx, cancel := context.WithCancel(newContext()) defer cancel() - metricsRegistry := mocks.NewRegistryMock() + s3MetricsRegistry := mocks.NewRegistryMock() + healthMetricsRegistry := mocks.NewRegistryMock() - s3, err := newS3Client(metricsRegistry, 10*time.Second /* callTimeout */) + s3, err := newS3Client(10*time.Second /* callTimeout */, s3MetricsRegistry, healthMetricsRegistry) require.NoError(t, err) - metricsRegistry.GetCounter( + s3MetricsRegistry.GetCounter( "errors", map[string]string{"call": "CreateBucket"}, ).On("Inc").Once() - metricsRegistry.GetCounter( + s3MetricsRegistry.GetCounter( "retry", map[string]string{"call": "CreateBucket"}, ).On("Inc").Times(maxRetriableErrorCount) @@ -112,5 +117,31 @@ func TestS3ShouldRetryRequests(t *testing.T) { err = s3.CreateBucket(ctx, "test") require.True(t, errors.Is(err, errors.NewEmptyRetriableError())) - metricsRegistry.AssertAllExpectations(t) + s3MetricsRegistry.AssertAllExpectations(t) +} + +func TestS3ShouldSendHealthMetric(t *testing.T) { + ctx, cancel := 
context.WithCancel(newContext()) + defer cancel() + + s3MetricsRegistry := mocks.NewRegistryMock() + healthMetricsRegistry := mocks.NewRegistryMock() + + s3, err := newS3Client(10*time.Second /* callTimeout */, s3MetricsRegistry, healthMetricsRegistry) + require.NoError(t, err) + + s3MetricsRegistry.GetCounter( + "errors", + map[string]string{"call": "CreateBucket"}, + ).On("Inc").Once() + + s3MetricsRegistry.GetCounter( + "retry", + map[string]string{"call": "CreateBucket"}, + ).On("Inc").Times(maxRetriableErrorCount) + + err = s3.CreateBucket(ctx, "test") + require.True(t, errors.Is(err, errors.NewEmptyRetriableError())) + + s3MetricsRegistry.AssertAllExpectations(t) } diff --git a/cloud/tasks/persistence/ya.make b/cloud/tasks/persistence/ya.make index bde3608b4c..eb6bb1e781 100644 --- a/cloud/tasks/persistence/ya.make +++ b/cloud/tasks/persistence/ya.make @@ -1,6 +1,7 @@ GO_LIBRARY() SRCS( + health.go s3.go s3_metrics.go ydb.go @@ -9,6 +10,7 @@ SRCS( ) GO_TEST_SRCS( + health_test.go s3_test.go ydb_test.go )