Skip to content

Commit

Permalink
feat: Add retry info to files statistics endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
michaljurecko committed May 22, 2024
1 parent 0fd3f17 commit 8cb9d17
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
19 changes: 18 additions & 1 deletion api/stream/design.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,16 +913,20 @@ var Levels = Type("Levels", func() {

var Level = Type("Level", func() {
Attribute("firstRecordAt", String, func() {
Description("Timestamp of the first received record.")
Format(FormatDateTime)
Example("2022-04-28T14:20:04.000Z")
})
Attribute("lastRecordAt", String, func() {
Description("Timestamp of the last received record.")
Format(FormatDateTime)
Example("2022-04-28T14:20:04.000Z")
})
Attribute("recordsCount", UInt64)
Required("recordsCount")
Attribute("uncompressedSize", UInt64)
Attribute("uncompressedSize", UInt64, func() {
Description("Uncompressed size of data in bytes.")
})
Required("uncompressedSize")
})

Expand Down Expand Up @@ -957,6 +961,19 @@ var SinkFile = Type("SinkFile", func() {
Format(FormatDateTime)
Example("2022-04-28T14:20:04.000Z")
})
Attribute("retryAttempt", Int, func() {
Description("Number of failed attempts.")
Example(3)
})
Attribute("retryReason", String, func() {
Description("Reason of the last failed attempt.")
Example("network problem")
})
Attribute("retryAfter", String, func() {
Description("Next attempt time.")
Format(FormatDateTime)
Example("2022-04-28T14:20:04.000Z")
})
Required("state", "openedAt", "statistics")
Attribute("statistics", SinkFileStatistics)
})
Expand Down
10 changes: 9 additions & 1 deletion internal/pkg/service/stream/api/service/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (s *service) SinkStatisticsFiles(ctx context.Context, d dependencies.SinkRe
filesMap := make(map[model.FileID]*stream.SinkFile)

err = d.StorageRepository().File().ListRecentIn(d.SinkKey()).Do(ctx).ForEachValue(func(value model.File, header *iterator.Header) error {
filesMap[value.FileID] = &stream.SinkFile{
out := &stream.SinkFile{
State: value.State,
OpenedAt: value.OpenedAt().String(),
ClosingAt: timeToString(value.ClosingAt),
Expand All @@ -250,6 +250,14 @@ func (s *service) SinkStatisticsFiles(ctx context.Context, d dependencies.SinkRe
Total: &stream.Level{},
},
}

if value.RetryAttempt > 0 {
out.RetryAttempt = ptr.Ptr(value.RetryAttempt)
out.RetryReason = ptr.Ptr(value.RetryReason)
out.RetryAfter = ptr.Ptr(value.RetryAfter.String())
}

filesMap[value.FileID] = out
return nil
})
if err != nil {
Expand Down

0 comments on commit 8cb9d17

Please sign in to comment.