diff --git a/cloud/disk_manager/pkg/app/run.go b/cloud/disk_manager/pkg/app/run.go index 75d0272a956..3fc2ce63714 100644 --- a/cloud/disk_manager/pkg/app/run.go +++ b/cloud/disk_manager/pkg/app/run.go @@ -204,10 +204,15 @@ func run( s3Bucket = snapshotConfig.GetS3Bucket() // TODO: remove when s3 will always be initialized. if s3Config != nil { - registry := mon.NewRegistry("s3_client") + s3MetricsRegistry := mon.NewRegistry("s3_client") + healthMetricsRegistry := mon.NewRegistry("health") var err error - s3, err = persistence.NewS3ClientFromConfig(s3Config, registry) + s3, err = persistence.NewS3ClientFromConfig( + s3Config, + s3MetricsRegistry, + healthMetricsRegistry, + ) if err != nil { return err } diff --git a/cloud/disk_manager/pkg/schema/dataplane.go b/cloud/disk_manager/pkg/schema/dataplane.go index 6646ca1d8bf..5b58cbdf314 100644 --- a/cloud/disk_manager/pkg/schema/dataplane.go +++ b/cloud/disk_manager/pkg/schema/dataplane.go @@ -48,7 +48,11 @@ func initDataplane( var s3 *persistence.S3Client // TODO: remove when s3 will always be initialized. if s3Config != nil { - s3, err = persistence.NewS3ClientFromConfig(s3Config, metrics.NewEmptyRegistry()) + s3, err = persistence.NewS3ClientFromConfig( + s3Config, + metrics.NewEmptyRegistry(), + metrics.NewEmptyRegistry(), + ) if err != nil { return err } diff --git a/cloud/tasks/persistence/common.go b/cloud/tasks/persistence/common.go new file mode 100644 index 00000000000..2aa7b2ca408 --- /dev/null +++ b/cloud/tasks/persistence/common.go @@ -0,0 +1,30 @@ +package persistence + +import ( + "context" + + tasks_config "github.com/ydb-platform/nbs/cloud/tasks/config" +) + +//////////////////////////////////////////////////////////////////////////////// + +func CreateYDBTables( + ctx context.Context, + config *tasks_config.TasksConfig, + db *YDBClient, + dropUnusedColumns bool, +) error { + + err := db.CreateOrAlterTable( + ctx, + config.GetStorageFolder(), + "availability_monitoring", + availabilityMonitoringTableDescription(), + dropUnusedColumns, + ) + if err != nil { + return err + } + + return nil +} diff --git a/cloud/tasks/persistence/health.go b/cloud/tasks/persistence/health.go new file mode 100644 index 00000000000..4f693e43831 --- /dev/null +++ b/cloud/tasks/persistence/health.go @@ -0,0 +1,79 @@ +package persistence + +import ( + "context" + "time" + + "github.com/ydb-platform/nbs/cloud/tasks/metrics" +) + +//////////////////////////////////////////////////////////////////////////////// + +type AvailabilityMonitoring struct { + componentName string + host string + queriesCount uint64 + successQueriesCount uint64 + + storage *storageYDB + registry metrics.Registry + metricsCollectionInterval time.Duration +} + +func (h *AvailabilityMonitoring) reportSuccessRate(ctx context.Context) { + if h.queriesCount == 0 { + h.registry.Gauge("successRate").Set(0) + return + } + + successRate := float64(h.successQueriesCount) / float64(h.queriesCount) + h.registry.Gauge("successRate").Set(successRate) + h.storage.UpdateAvailabilityRate(ctx, h.componentName, h.host, successRate, time.Now()) +} + +//////////////////////////////////////////////////////////////////////////////// + +func (h *AvailabilityMonitoring) AccountQuery(err error) { + h.queriesCount++ + if err == nil { + h.successQueriesCount++ + } +} + +func NewAvailabilityMonitoring( + ctx context.Context, + componentName string, + host string, + storage *storageYDB, + registry metrics.Registry, +) *AvailabilityMonitoring { + + subRegistry := registry.WithTags(map[string]string{ + "component": componentName, + }) + + h := AvailabilityMonitoring{ + componentName: componentName, + host: host, + + storage: storage, + registry: subRegistry, + metricsCollectionInterval: 15 * time.Second, // todo + } + + go func() { + ticker := time.NewTicker(h.metricsCollectionInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + h.reportSuccessRate(ctx) + } + } + }() + + return &h +} diff --git a/cloud/tasks/persistence/health_storage.go b/cloud/tasks/persistence/health_storage.go new file mode 100644 index 00000000000..30193fd0441 --- /dev/null +++ b/cloud/tasks/persistence/health_storage.go @@ -0,0 +1,63 @@ +package persistence + +import ( + "context" + "fmt" + "time" +) + +//////////////////////////////////////////////////////////////////////////////// + +func availabilityMonitoringTableDescription() CreateTableDescription { + return NewCreateTableDescription( + WithColumn("component", Optional(TypeUTF8)), + WithColumn("host", Optional(TypeUTF8)), + WithColumn("update_at", Optional(TypeTimestamp)), + WithPrimaryKeyColumn("component", "host", "update_at"), + ) +} + +//////////////////////////////////////////////////////////////////////////////// + +func (s *storageYDB) updateAvailabilityRate( + ctx context.Context, + session *Session, + component string, + host string, + value float64, + ts time.Time, +) error { + + _, err := session.ExecuteRW(ctx, fmt.Sprintf(` + --!syntax_v1 + pragma TablePathPrefix = "%v"; + declare $component as Utf8; + declare $host as Utf8; + declare $value as Double; + declare $ts as Timestamp; + + upsert into nodes (component, host, value, ts) + values ($component, $host, $value, $ts); + `, s.tablesPath), + ValueParam("$component", UTF8Value(component)), + ValueParam("$host", UTF8Value(host)), + ValueParam("$value", DoubleValue(value)), + ValueParam("$update_at", TimestampValue(ts)), + ) + return err +} + +//////////////////////////////////////////////////////////////////////////////// + +func (s *storageYDB) UpdateAvailabilityRate( + ctx context.Context, + component string, + host string, + value float64, + ts time.Time, +) error { + + return s.db.Execute(ctx, func(ctx context.Context, session *Session) error { + return s.updateAvailabilityRate(ctx, session, component, host, value, ts) + }) +} diff --git a/cloud/tasks/persistence/health_test.go b/cloud/tasks/persistence/health_test.go new file mode 100644 index 00000000000..5491dafca26 --- /dev/null +++ b/cloud/tasks/persistence/health_test.go @@ -0,0 +1,92 @@ +package persistence + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + tasks_config "github.com/ydb-platform/nbs/cloud/tasks/config" + "github.com/ydb-platform/nbs/cloud/tasks/errors" + "github.com/ydb-platform/nbs/cloud/tasks/metrics/empty" + "github.com/ydb-platform/nbs/cloud/tasks/metrics/mocks" +) + +//////////////////////////////////////////////////////////////////////////////// + +func newStorage( + t *testing.T, + ctx context.Context, + db *YDBClient, +) *storageYDB { + + config := &tasks_config.TasksConfig{} + err := CreateYDBTables( + ctx, + config, + db, + false, // dropUnusedColums + ) + require.NoError(t, err) + + storage := NewStorage(config, db) + require.NoError(t, err) + + return storage +} + +//////////////////////////////////////////////////////////////////////////////// + +func TestHealthCheckMetric(t *testing.T) { + ctx, cancel := context.WithCancel(newContext()) + defer cancel() + + db, err := newYDB(ctx, empty.NewRegistry()) + require.NoError(t, err) + defer db.Close(ctx) + + storage := newStorage(t, ctx, db) + registry := mocks.NewRegistryMock() + + availabilityMonitoring := NewAvailabilityMonitoring( + ctx, + "component", + "host", + storage, + registry, + ) + + gaugeSetWg := sync.WaitGroup{} + + gaugeSetWg.Add(1) + registry.GetGauge( + "successRate", + map[string]string{"component": "test"}, + ).On("Set", float64(0)).Once().Run( + func(args mock.Arguments) { + gaugeSetWg.Done() + }, + ) + gaugeSetWg.Wait() + + availabilityMonitoring.AccountQuery(nil) + availabilityMonitoring.AccountQuery(nil) + availabilityMonitoring.AccountQuery(nil) + availabilityMonitoring.AccountQuery(errors.NewEmptyRetriableError()) + + gaugeSetWg.Add(1) + registry.GetGauge( + "successRate", + map[string]string{"component": "test"}, + ).On("Set", float64(3.0/4.0)).Once().Run( + func(args mock.Arguments) { + gaugeSetWg.Done() + }, + ) + gaugeSetWg.Wait() + + registry.AssertAllExpectations(t) + + require.NotNil(t, nil) +} diff --git a/cloud/tasks/persistence/s3.go b/cloud/tasks/persistence/s3.go index 1deeacded7e..c9c95808bc3 100644 --- a/cloud/tasks/persistence/s3.go +++ b/cloud/tasks/persistence/s3.go @@ -49,15 +49,18 @@ type S3Client struct { } func NewS3Client( + ctx context.Context, endpoint string, region string, credentials S3Credentials, callTimeout time.Duration, - registry metrics.Registry, + s3MetricsRegistry metrics.Registry, + healthCheckStorage HealthStorage, + healthMetricsRegistry metrics.Registry, maxRetriableErrorCount uint64, ) (*S3Client, error) { - s3Metrics := newS3Metrics(registry, callTimeout) + s3Metrics := newS3Metrics(ctx, callTimeout, s3MetricsRegistry, healthCheckStorage, healthMetricsRegistry) sessionConfig := &aws.Config{ Credentials: aws_credentials.NewStaticCredentials( @@ -89,8 +92,11 @@ func NewS3Client( } func NewS3ClientFromConfig( + ctx context.Context, config *persistence_config.S3Config, - registry metrics.Registry, + s3MetricsRegistry metrics.Registry, + healthCheckStorage *healthCheckStorage, + healthMetricsRegistry metrics.Registry, ) (*S3Client, error) { credentials, err := NewS3CredentialsFromFile(config.GetCredentialsFilePath()) @@ -107,11 +113,14 @@ func NewS3ClientFromConfig( } return NewS3Client( + ctx, config.GetEndpoint(), config.GetRegion(), credentials, callTimeout, - registry, + s3MetricsRegistry, + healthCheckStorage, + healthMetricsRegistry, config.GetMaxRetriableErrorCount(), ) } diff --git a/cloud/tasks/persistence/s3_metrics.go b/cloud/tasks/persistence/s3_metrics.go index 645cb4c2277..707b6ef8319 100644 --- a/cloud/tasks/persistence/s3_metrics.go +++ b/cloud/tasks/persistence/s3_metrics.go @@ -24,8 +24,9 @@ func s3CallDurationBuckets() metrics.DurationBuckets { //////////////////////////////////////////////////////////////////////////////// type s3Metrics struct { - registry metrics.Registry - callTimeout time.Duration + callTimeout time.Duration + s3MetricsRegistry metrics.Registry + healthCheck *HealthCheck } func (m *s3Metrics) StatCall( @@ -38,18 +39,20 @@ func (m *s3Metrics) StatCall( start := time.Now() return func(err *error) { - subRegistry := m.registry.WithTags(map[string]string{ + subRegistry := m.s3MetricsRegistry.WithTags(map[string]string{ "call": name, }) // Should initialize all counters before using them, to avoid 'no data'. - errorCounter := subRegistry.Counter("errors") successCounter := subRegistry.Counter("success") hangingCounter := subRegistry.Counter("hanging") + errorCounter := subRegistry.Counter("errors") timeoutCounter := subRegistry.Counter("errors/timeout") canceledCounter := subRegistry.Counter("errors/canceled") timeHistogram := subRegistry.DurationHistogram("time", s3CallDurationBuckets()) + m.healthCheck.AccountQuery(*err) + if time.Since(start) >= m.callTimeout { logging.Error( ctx, @@ -110,7 +113,7 @@ func (m *s3Metrics) OnRetry(req *request.Request) { req.RetryCount+1, ) - subRegistry := m.registry.WithTags(map[string]string{ + subRegistry := m.s3MetricsRegistry.WithTags(map[string]string{ "call": req.Operation.Name, }) @@ -119,9 +122,16 @@ func (m *s3Metrics) OnRetry(req *request.Request) { retryCounter.Inc() } -func newS3Metrics(registry metrics.Registry, callTimeout time.Duration) *s3Metrics { +func newS3Metrics( + ctx context.Context, + callTimeout time.Duration, + s3MetricsRegistry metrics.Registry, + healthCheckStorage HealthStorage, + healthMetricsRegistry metrics.Registry, +) *s3Metrics { return &s3Metrics{ - registry: registry, - callTimeout: callTimeout, + callTimeout: callTimeout, + s3MetricsRegistry: s3MetricsRegistry, + healthCheck: NewHealthCheck(ctx, "s3", healthCheckStorage, healthMetricsRegistry), } } diff --git a/cloud/tasks/persistence/s3_test.go b/cloud/tasks/persistence/s3_test.go index fabb5fff32d..bb705f05655 100644 --- a/cloud/tasks/persistence/s3_test.go +++ b/cloud/tasks/persistence/s3_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" "github.com/ydb-platform/nbs/cloud/tasks/errors" + "github.com/ydb-platform/nbs/cloud/tasks/metrics/empty" "github.com/ydb-platform/nbs/cloud/tasks/metrics/mocks" ) @@ -17,17 +18,23 @@ const maxRetriableErrorCount = 3 //////////////////////////////////////////////////////////////////////////////// func newS3Client( - metricsRegistry *mocks.RegistryMock, + ctx context.Context, callTimeout time.Duration, + s3MetricsRegistry *mocks.RegistryMock, + healthCheckStorage HealthStorage, + healthMetricsRegistry *mocks.RegistryMock, ) (*S3Client, error) { credentials := NewS3Credentials("test", "test") return NewS3Client( + ctx, "endpoint", "region", credentials, callTimeout, - metricsRegistry, + s3MetricsRegistry, + healthCheckStorage, + healthMetricsRegistry, maxRetriableErrorCount, ) } @@ -37,19 +44,26 @@ func newS3Client( func TestS3ShouldSendErrorCanceledMetric(t *testing.T) { ctx, cancel := context.WithCancel(newContext()) - metricsRegistry := mocks.NewRegistryMock() + s3MetricsRegistry := mocks.NewRegistryMock() - s3, err := newS3Client(metricsRegistry, 10*time.Second /* callTimeout */) + db, err := newYDB(ctx, empty.NewRegistry()) + require.NoError(t, err) + defer db.Close(ctx) + + healthCheckStorage := newStorage(t, ctx, db) + healthMetricsRegistry := mocks.NewRegistryMock() + + s3, err := newS3Client(ctx, 10*time.Second /* callTimeout */, s3MetricsRegistry, healthCheckStorage, healthMetricsRegistry) require.NoError(t, err) cancel() - metricsRegistry.GetCounter( + s3MetricsRegistry.GetCounter( "errors", map[string]string{"call": "CreateBucket"}, ).On("Inc").Once() - metricsRegistry.GetCounter( + s3MetricsRegistry.GetCounter( "errors/canceled", map[string]string{"call": "CreateBucket"}, ).On("Inc").Once() @@ -57,29 +71,36 @@ func TestS3ShouldSendErrorCanceledMetric(t *testing.T) { err = s3.CreateBucket(ctx, "test") require.True(t, errors.Is(err, errors.NewEmptyRetriableError())) - metricsRegistry.AssertAllExpectations(t) + s3MetricsRegistry.AssertAllExpectations(t) } func TestS3ShouldSendErrorTimeoutMetric(t *testing.T) { ctx, cancel := context.WithCancel(newContext()) defer cancel() - metricsRegistry := mocks.NewRegistryMock() + s3MetricsRegistry := mocks.NewRegistryMock() + + db, err := newYDB(ctx, empty.NewRegistry()) + require.NoError(t, err) + defer db.Close(ctx) + + healthCheckStorage := newStorage(t, ctx, db) + healthMetricsRegistry := mocks.NewRegistryMock() - s3, err := newS3Client(metricsRegistry, 0 /* callTimeout */) + s3, err := newS3Client(ctx, 0 /* callTimeout */, s3MetricsRegistry, healthCheckStorage, healthMetricsRegistry) require.NoError(t, err) - metricsRegistry.GetCounter( + s3MetricsRegistry.GetCounter( "errors", map[string]string{"call": "CreateBucket"}, ).On("Inc").Once() - metricsRegistry.GetCounter( + s3MetricsRegistry.GetCounter( "hanging", map[string]string{"call": "CreateBucket"}, ).On("Inc").Once() - metricsRegistry.GetCounter( + s3MetricsRegistry.GetCounter( "errors/timeout", map[string]string{"call": "CreateBucket"}, ).On("Inc").Once() @@ -87,24 +108,63 @@ func TestS3ShouldSendErrorTimeoutMetric(t *testing.T) { err = s3.CreateBucket(ctx, "test") require.True(t, errors.Is(err, errors.NewEmptyRetriableError())) - metricsRegistry.AssertAllExpectations(t) + s3MetricsRegistry.AssertAllExpectations(t) } func TestS3ShouldRetryRequests(t *testing.T) { ctx, cancel := context.WithCancel(newContext()) defer cancel() - metricsRegistry := mocks.NewRegistryMock() + s3MetricsRegistry := mocks.NewRegistryMock() + + db, err := newYDB(ctx, empty.NewRegistry()) + require.NoError(t, err) + defer db.Close(ctx) + + healthCheckStorage := newStorage(t, ctx, db) + healthMetricsRegistry := mocks.NewRegistryMock() + + s3, err := newS3Client(ctx, 10*time.Second /* callTimeout */, s3MetricsRegistry, healthCheckStorage, healthMetricsRegistry) + require.NoError(t, err) + + s3MetricsRegistry.GetCounter( + "errors", + map[string]string{"call": "CreateBucket"}, + ).On("Inc").Once() + + s3MetricsRegistry.GetCounter( + "retry", + map[string]string{"call": "CreateBucket"}, + ).On("Inc").Times(maxRetriableErrorCount) + + err = s3.CreateBucket(ctx, "test") + require.True(t, errors.Is(err, errors.NewEmptyRetriableError())) + + s3MetricsRegistry.AssertAllExpectations(t) +} + +func TestS3ShouldSendHealthMetric(t *testing.T) { + ctx, cancel := context.WithCancel(newContext()) + defer cancel() + + s3MetricsRegistry := mocks.NewRegistryMock() + + db, err := newYDB(ctx, empty.NewRegistry()) + require.NoError(t, err) + defer db.Close(ctx) + + healthCheckStorage := newStorage(t, ctx, db) + healthMetricsRegistry := mocks.NewRegistryMock() - s3, err := newS3Client(metricsRegistry, 10*time.Second /* callTimeout */) + s3, err := newS3Client(ctx, 10*time.Second /* callTimeout */, s3MetricsRegistry, healthCheckStorage, healthMetricsRegistry) require.NoError(t, err) - metricsRegistry.GetCounter( + s3MetricsRegistry.GetCounter( "errors", map[string]string{"call": "CreateBucket"}, ).On("Inc").Once() - metricsRegistry.GetCounter( + s3MetricsRegistry.GetCounter( "retry", map[string]string{"call": "CreateBucket"}, ).On("Inc").Times(maxRetriableErrorCount) @@ -112,5 +172,5 @@ func TestS3ShouldRetryRequests(t *testing.T) { err = s3.CreateBucket(ctx, "test") require.True(t, errors.Is(err, errors.NewEmptyRetriableError())) - metricsRegistry.AssertAllExpectations(t) + s3MetricsRegistry.AssertAllExpectations(t) } diff --git a/cloud/tasks/persistence/storage_ydb.go b/cloud/tasks/persistence/storage_ydb.go new file mode 100644 index 00000000000..ee04d863cbc --- /dev/null +++ b/cloud/tasks/persistence/storage_ydb.go @@ -0,0 +1,23 @@ +package persistence + +import ( + tasks_config "github.com/ydb-platform/nbs/cloud/tasks/config" +) + +//////////////////////////////////////////////////////////////////////////////// + +type storageYDB struct { + db *YDBClient + tablesPath string +} + +func NewStorage( + config *tasks_config.TasksConfig, + db *YDBClient, +) *storageYDB { + + return &storageYDB{ + db: db, + tablesPath: db.AbsolutePath(config.GetStorageFolder()), + } +} diff --git a/cloud/tasks/persistence/ya.make b/cloud/tasks/persistence/ya.make index bde3608b4c2..75663d18478 100644 --- a/cloud/tasks/persistence/ya.make +++ b/cloud/tasks/persistence/ya.make @@ -6,11 +6,17 @@ SRCS( ydb.go ydb_logger.go ydb_metrics.go + + common.go + storage_ydb.go + health.go + health_storage.go ) GO_TEST_SRCS( s3_test.go ydb_test.go + health_test.go ) END()