Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

draft #2927

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft

draft #2927

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cloud/disk_manager/pkg/app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,15 @@ func run(
s3Bucket = snapshotConfig.GetS3Bucket()
// TODO: remove when s3 will always be initialized.
if s3Config != nil {
registry := mon.NewRegistry("s3_client")
s3MetricsRegistry := mon.NewRegistry("s3_client")
healthMetricsRegistry := mon.NewRegistry("health")

var err error
s3, err = persistence.NewS3ClientFromConfig(s3Config, registry)
s3, err = persistence.NewS3ClientFromConfig(
s3Config,
s3MetricsRegistry,
healthMetricsRegistry,
)
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion cloud/disk_manager/pkg/schema/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ func initDataplane(
var s3 *persistence.S3Client
// TODO: remove when s3 will always be initialized.
if s3Config != nil {
s3, err = persistence.NewS3ClientFromConfig(s3Config, metrics.NewEmptyRegistry())
s3, err = persistence.NewS3ClientFromConfig(
s3Config,
metrics.NewEmptyRegistry(),
metrics.NewEmptyRegistry(),
)
if err != nil {
return err
}
Expand Down
63 changes: 63 additions & 0 deletions cloud/tasks/persistence/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package persistence

import (
	"sync/atomic"
	"time"

	"github.com/ydb-platform/nbs/cloud/tasks/metrics"
)

////////////////////////////////////////////////////////////////////////////////

// HealthCheck counts the queries performed by a component and how many of them
// succeeded, and periodically publishes the ratio as the "successRate" gauge.
type HealthCheck struct {
	// The counters are written by AccountQuery and read by the reporting
	// goroutine started in NewHealthCheck, so they must only be accessed via
	// sync/atomic. They are kept first in the struct so that they stay 64-bit
	// aligned on 32-bit platforms, as sync/atomic requires.
	queriesCount        uint64
	successQueriesCount uint64

	storage                   *healthCheckStorage
	registry                  metrics.Registry
	metricsCollectionInterval time.Duration
}

// reportSuccessRate sets the "successRate" gauge to the fraction of accounted
// queries that succeeded, or to 0 when no query has been accounted yet.
func (h *HealthCheck) reportSuccessRate() {
	// Load the success counter first: AccountQuery bumps queriesCount before
	// successQueriesCount, so with this order succeeded never exceeds total
	// even when queries are being accounted concurrently.
	succeeded := atomic.LoadUint64(&h.successQueriesCount)
	total := atomic.LoadUint64(&h.queriesCount)

	gauge := h.registry.Gauge("successRate")
	if total == 0 {
		gauge.Set(0)
		return
	}

	gauge.Set(float64(succeeded) / float64(total))
}

////////////////////////////////////////////////////////////////////////////////

// AccountQuery records the outcome of a single query: a nil err counts as a
// success, any other value as a failure. It is safe for concurrent use.
func (h *HealthCheck) AccountQuery(err error) {
	atomic.AddUint64(&h.queriesCount, 1)
	if err == nil {
		atomic.AddUint64(&h.successQueriesCount, 1)
	}
}

// NewHealthCheck creates a HealthCheck whose metrics are reported to registry
// under the tag "component" = componentName, and starts a background goroutine
// that publishes the success rate every metricsCollectionInterval.
//
// NOTE(review): the reporting goroutine has no stop mechanism and runs until
// the process exits.
func NewHealthCheck(
	componentName string,
	storage *healthCheckStorage,
	registry metrics.Registry,
) *HealthCheck {

	subRegistry := registry.WithTags(map[string]string{
		"component": componentName,
	})

	h := &HealthCheck{
		storage:                   storage,
		registry:                  subRegistry,
		metricsCollectionInterval: 15 * time.Second, // TODO: make configurable.
	}

	go func() {
		ticker := time.NewTicker(h.metricsCollectionInterval)
		defer ticker.Stop()

		for range ticker.C {
			h.reportSuccessRate()
		}
	}()

	return h
}
112 changes: 112 additions & 0 deletions cloud/tasks/persistence/health_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package persistence

import (
"context"

"github.com/ydb-platform/nbs/cloud/tasks/logging"
)

// import (
// "time"
// )

////////////////////////////////////////////////////////////////////////////////

// healthCheckTableDescription builds the description of the health check
// table: an optional UTF8 "component" column and an optional timestamp
// "update_at" column, which together form the primary key.
func healthCheckTableDescription() CreateTableDescription {
	componentColumn := WithColumn("component", Optional(TypeUTF8))
	updateAtColumn := WithColumn("update_at", Optional(TypeTimestamp))
	primaryKey := WithPrimaryKeyColumn("component", "update_at")

	return NewCreateTableDescription(
		componentColumn,
		updateAtColumn,
		primaryKey,
	)
}

// CreateYDBTables sets up the "health" table described by
// healthCheckTableDescription by calling db.CreateOrAlterTable.
//
// dropUnusedColumns is forwarded to CreateOrAlterTable unchanged. Any error
// returned by CreateOrAlterTable is returned as is.
func CreateYDBTables(
	ctx context.Context,
	db *YDBClient,
	dropUnusedColumns bool,
) error {

	err := db.CreateOrAlterTable(
		ctx,
		// NOTE(review): "kek" looks like a placeholder value — confirm the
		// intended value before merging.
		"kek",
		"health",
		healthCheckTableDescription(),
		dropUnusedColumns,
	)
	if err != nil {
		return err
	}
	// The message used to say "nodes table", a leftover from the code this
	// function was copied from; the table created here is "health".
	logging.Info(ctx, "Created health table")

	return nil
}

// healthCheckStorage is the storage handle passed to HealthCheck. It currently
// holds no state.
type healthCheckStorage struct{}

// NewStorage returns a new, empty healthCheckStorage.
//
// NOTE(review): db is currently not used by the returned storage.
func NewStorage(db *YDBClient) *healthCheckStorage {
	return new(healthCheckStorage)
}

// type HealthCheck struct {
// componentName string
// Rate float64
// LastUpdate time.Time
// }

////////////////////////////////////////////////////////////////////////////////

// Returns ydb entity of the node object.
// func (h *HealthCheck) structValue() persistence.Value {
// return persistence.StructValue(
// persistence.StructFieldValue("rate", persistence.DoubleValue(h.Rate)),
// persistence.StructFieldValue("last_update", persistence.DatetimeValueFromTime(h.LastUpdate)),
// )
// }

// // Scans single node from the YDB result set.
// func scanHealthCheck(result persistence.Result) (healthCheck HealthCheck, err error) {
// err = result.ScanNamed(
// persistence.OptionalWithDefault("rate", &healthCheck.Rate),
// persistence.OptionalWithDefault("last_update", &healthCheck.LastUpdate),
// )
// return
// }

// // Scans all nodes from the YDB result set.
// func scanNodes(ctx context.Context, res persistence.Result) ([]Node, error) {
// var nodes []Node
// for res.NextResultSet(ctx) {
// for res.NextRow() {
// node, err := scanNode(res)
// if err != nil {
// return nil, err
// }
// nodes = append(nodes, node)
// }
// }

// return nodes, nil
// }

// Returns node struct definition in YQL.
// func nodeStructTypeString() string {
// return `Struct<
// host: Utf8,
// last_heartbeat: Timestamp,
// inflight_task_count: Uint32>`
// }

// Returns table description for the table that holds nodes.
// func healthCheckTableDescription() persistence.CreateTableDescription {
// return persistence.NewCreateTableDescription(
// persistence.WithColumn("host", persistence.Optional(persistence.TypeUTF8)),
// persistence.WithColumn("last_heartbeat", persistence.Optional(persistence.TypeTimestamp)),
// persistence.WithColumn("inflight_task_count", persistence.Optional(persistence.TypeUint32)),
// persistence.WithPrimaryKeyColumn("host"),
// )
// }

////////////////////////////////////////////////////////////////////////////////

// Updates heartbeat timestamp and the current number of inflight tasks.
81 changes: 81 additions & 0 deletions cloud/tasks/persistence/health_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package persistence

import (
"context"
"sync"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/ydb-platform/nbs/cloud/tasks/errors"
"github.com/ydb-platform/nbs/cloud/tasks/metrics/empty"
"github.com/ydb-platform/nbs/cloud/tasks/metrics/mocks"
)

////////////////////////////////////////////////////////////////////////////////

// newStorage creates the health check tables in db and returns a storage for
// them, failing the test immediately if table creation returns an error.
func newStorage(
	t *testing.T,
	ctx context.Context,
	db *YDBClient,
) *healthCheckStorage {

	err := CreateYDBTables(
		ctx,
		db,
		false, // dropUnusedColumns
	)
	require.NoError(t, err)

	// NewStorage does not return an error, so there is nothing further to
	// check here; the previous second require.NoError only re-checked the err
	// from CreateYDBTables.
	return NewStorage(db)
}

////////////////////////////////////////////////////////////////////////////////

// TestHealthCheckMetric checks that HealthCheck publishes the "successRate"
// gauge tagged with its component name: first 0 while no queries have been
// accounted, then 3/4 after three successful queries and one failed query.
func TestHealthCheckMetric(t *testing.T) {
	ctx, cancel := context.WithCancel(newContext())
	defer cancel()

	db, err := newYDB(ctx, empty.NewRegistry())
	require.NoError(t, err)
	defer db.Close(ctx)

	storage := newStorage(t, ctx, db)

	registry := mocks.NewRegistryMock()
	healthCheck := NewHealthCheck("test", storage, registry)

	var gaugeReported sync.WaitGroup

	// awaitSuccessRate registers a one-shot expectation for the gauge to be
	// set to expected and blocks until the reporting goroutine fulfils it.
	awaitSuccessRate := func(expected float64) {
		gaugeReported.Add(1)

		gauge := registry.GetGauge(
			"successRate",
			map[string]string{"component": "test"},
		)
		gauge.On("Set", expected).Once().Run(
			func(mock.Arguments) {
				gaugeReported.Done()
			},
		)

		gaugeReported.Wait()
	}

	// Nothing has been accounted yet, so the reported rate must be zero.
	awaitSuccessRate(float64(0))

	for i := 0; i < 3; i++ {
		healthCheck.AccountQuery(nil)
	}
	healthCheck.AccountQuery(errors.NewEmptyRetriableError())

	// Three successes out of four queries.
	awaitSuccessRate(float64(3.0 / 4.0))

	registry.AssertAllExpectations(t)
}
11 changes: 7 additions & 4 deletions cloud/tasks/persistence/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ func NewS3Client(
region string,
credentials S3Credentials,
callTimeout time.Duration,
registry metrics.Registry,
s3MetricsRegistry metrics.Registry,
healthMetricsRegistry metrics.Registry,
maxRetriableErrorCount uint64,
) (*S3Client, error) {

s3Metrics := newS3Metrics(registry, callTimeout)
s3Metrics := newS3Metrics(callTimeout, s3MetricsRegistry, healthMetricsRegistry)

sessionConfig := &aws.Config{
Credentials: aws_credentials.NewStaticCredentials(
Expand Down Expand Up @@ -90,7 +91,8 @@ func NewS3Client(

func NewS3ClientFromConfig(
config *persistence_config.S3Config,
registry metrics.Registry,
s3MetricsRegistry metrics.Registry,
healthMetricsRegistry metrics.Registry,
) (*S3Client, error) {

credentials, err := NewS3CredentialsFromFile(config.GetCredentialsFilePath())
Expand All @@ -111,7 +113,8 @@ func NewS3ClientFromConfig(
config.GetRegion(),
credentials,
callTimeout,
registry,
s3MetricsRegistry,
healthMetricsRegistry,
config.GetMaxRetriableErrorCount(),
)
}
Expand Down
24 changes: 16 additions & 8 deletions cloud/tasks/persistence/s3_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ func s3CallDurationBuckets() metrics.DurationBuckets {
////////////////////////////////////////////////////////////////////////////////

type s3Metrics struct {
registry metrics.Registry
callTimeout time.Duration
callTimeout time.Duration
s3MetricsRegistry metrics.Registry
healthCheck *HealthCheck
}

func (m *s3Metrics) StatCall(
Expand All @@ -38,18 +39,20 @@ func (m *s3Metrics) StatCall(
start := time.Now()

return func(err *error) {
subRegistry := m.registry.WithTags(map[string]string{
subRegistry := m.s3MetricsRegistry.WithTags(map[string]string{
"call": name,
})

// Should initialize all counters before using them, to avoid 'no data'.
errorCounter := subRegistry.Counter("errors")
successCounter := subRegistry.Counter("success")
hangingCounter := subRegistry.Counter("hanging")
errorCounter := subRegistry.Counter("errors")
timeoutCounter := subRegistry.Counter("errors/timeout")
canceledCounter := subRegistry.Counter("errors/canceled")
timeHistogram := subRegistry.DurationHistogram("time", s3CallDurationBuckets())

m.healthCheck.AccountQuery(*err)

if time.Since(start) >= m.callTimeout {
logging.Error(
ctx,
Expand Down Expand Up @@ -110,7 +113,7 @@ func (m *s3Metrics) OnRetry(req *request.Request) {
req.RetryCount+1,
)

subRegistry := m.registry.WithTags(map[string]string{
subRegistry := m.s3MetricsRegistry.WithTags(map[string]string{
"call": req.Operation.Name,
})

Expand All @@ -119,9 +122,14 @@ func (m *s3Metrics) OnRetry(req *request.Request) {
retryCounter.Inc()
}

func newS3Metrics(registry metrics.Registry, callTimeout time.Duration) *s3Metrics {
func newS3Metrics(
callTimeout time.Duration,
s3MetricsRegistry metrics.Registry,
healthMetricsRegistry metrics.Registry,
) *s3Metrics {
return &s3Metrics{
registry: registry,
callTimeout: callTimeout,
callTimeout: callTimeout,
s3MetricsRegistry: s3MetricsRegistry,
healthCheck: NewHealthCheck("s3", healthMetricsRegistry),
}
}
Loading
Loading