From 9fb1e118eaad0692afbb04a0ab43ccf0c876c0f8 Mon Sep 17 00:00:00 2001 From: Katie Peters Date: Wed, 18 Dec 2024 14:10:58 -0500 Subject: [PATCH] DATA-3441 Update data export command (#4596) --- app/data_client_test.go | 1 + cli/app.go | 119 ++++-- cli/client_test.go | 167 ++++++--- cli/data.go | 339 +++++++++++------- .../replay/replay_utils_test.go | 2 + 5 files changed, 404 insertions(+), 224 deletions(-) diff --git a/app/data_client_test.go b/app/data_client_test.go index d2f2ec2b7b9..b5fa0e4b42f 100644 --- a/app/data_client_test.go +++ b/app/data_client_test.go @@ -287,6 +287,7 @@ func TestDataClient(t *testing.T) { //nolint:deprecated,staticcheck grpcClient.TabularDataByFilterFunc = func(ctx context.Context, in *pb.TabularDataByFilterRequest, opts ...grpc.CallOption, + //nolint:deprecated,staticcheck ) (*pb.TabularDataByFilterResponse, error) { test.That(t, in.DataRequest, test.ShouldResemble, dataRequestToProto(dataRequest)) test.That(t, in.CountOnly, test.ShouldBeTrue) diff --git a/cli/app.go b/cli/app.go index ba15a22ca00..2458b0c1c6d 100644 --- a/cli/app.go +++ b/cli/app.go @@ -101,12 +101,13 @@ const ( dataFlagAliasRobotName = "robot-name" dataFlagPartName = "part-name" dataFlagComponentType = "component-type" + dataFlagResourceSubtype = "resource-subtype" dataFlagComponentName = "component-name" + dataFlagResourceName = "resource-name" dataFlagMethod = "method" dataFlagMimeTypes = "mime-types" dataFlagStart = "start" dataFlagEnd = "end" - dataFlagChunkLimit = "chunk-limit" dataFlagParallelDownloads = "parallel" dataFlagTags = "tags" dataFlagBboxLabels = "bbox-labels" @@ -192,11 +193,11 @@ var commonFilterFlags = []cli.Flag{ }, &cli.StringFlag{ Name: dataFlagStart, - Usage: "ISO-8601 timestamp indicating the start of the interval filter", + Usage: "ISO-8601 timestamp in RFC3339 format indicating the start of the interval filter", }, &cli.StringFlag{ Name: dataFlagEnd, - Usage: "ISO-8601 timestamp indicating the end of the interval filter", + Usage: "ISO-8601 timestamp in RFC3339 format indicating the end of the interval filter", }, &cli.StringSliceFlag{ Name: dataFlagBboxLabels, @@ -622,42 +623,84 @@ var app = &cli.App{ HideHelpCommand: true, Subcommands: []*cli.Command{ { - Name: "export", - Usage: "download data from Viam cloud", - UsageText: createUsageText("data export", []string{dataFlagDestination, dataFlagDataType}, true), - Flags: append([]cli.Flag{ - &cli.PathFlag{ - Name: dataFlagDestination, - Required: true, - Usage: "output directory for downloaded data", - }, - &cli.UintFlag{ - Name: dataFlagChunkLimit, - Usage: "maximum number of results per download request (tabular data only)", - Value: 100000, - }, - &cli.UintFlag{ - Name: dataFlagParallelDownloads, - Usage: "number of download requests to make in parallel (binary data only)", - Value: 100, - }, - &cli.StringSliceFlag{ - Name: dataFlagTags, - Usage: "tags filter. " + - "accepts tagged for all tagged data, untagged for all untagged data, or a list of tags for all data matching any of the tags", - }, - &cli.StringFlag{ - Name: dataFlagDataType, - Usage: "type of data to download. can be binary or tabular", + Name: "export", + Usage: "download data from Viam cloud", + Subcommands: []*cli.Command{ + { + Name: "binary", + Usage: "download binary data", + UsageText: createUsageText("data export binary", []string{dataFlagDestination}, true), + Flags: append([]cli.Flag{ + &cli.PathFlag{ + Name: dataFlagDestination, + Required: true, + Usage: "output directory for downloaded data", + }, + &cli.UintFlag{ + Name: dataFlagParallelDownloads, + Usage: "number of download requests to make in parallel", + Value: 100, + }, + &cli.UintFlag{ + Name: dataFlagTimeout, + Usage: "number of seconds to wait for large file downloads", + Value: 30, + }, + &cli.StringSliceFlag{ + Name: dataFlagTags, + Usage: "tags filter. accepts tagged for all tagged data, untagged for all untagged data, or a list of tags", + }, + }, commonFilterFlags...), + Action: createCommandWithT[dataExportBinaryArgs](DataExportBinaryAction), }, - &cli.UintFlag{ - Name: dataFlagTimeout, - Usage: "number of seconds to wait for large file downloads", - Value: 30, + { + Name: "tabular", + Usage: "download tabular data", + UsageText: createUsageText("data export tabular", []string{ + dataFlagDestination, + dataFlagPartID, + dataFlagResourceName, + dataFlagResourceSubtype, + dataFlagMethod, + }, true), + Flags: []cli.Flag{ + &cli.PathFlag{ + Name: dataFlagDestination, + Required: true, + Usage: "output directory for downloaded data", + }, + &cli.StringFlag{ + Name: dataFlagPartID, + Required: true, + Usage: "part id", + }, + &cli.StringFlag{ + Name: dataFlagResourceName, + Required: true, + Usage: "resource name (sometimes called 'component name')", + }, + &cli.StringFlag{ + Name: dataFlagResourceSubtype, + Required: true, + Usage: "resource subtype (sometimes called 'component type')", + }, + &cli.StringFlag{ + Name: dataFlagMethod, + Required: true, + Usage: "method name", + }, + &cli.StringFlag{ + Name: "start", + Usage: "ISO-8601 timestamp in RFC3339 format indicating the start of the interval", + }, + &cli.StringFlag{ + Name: "end", + Usage: "ISO-8601 timestamp in RFC3339 format indicating the end of the interval", + }, + }, + Action: createCommandWithT[dataExportTabularArgs](DataExportTabularAction), }, }, - commonFilterFlags...), - Action: createCommandWithT[dataExportArgs](DataExportAction), }, { Name: "delete", @@ -677,12 +720,12 @@ var app = &cli.App{ &cli.StringFlag{ Name: dataFlagStart, Required: true, - Usage: "ISO-8601 timestamp indicating the start of the interval filter", + Usage: "ISO-8601 timestamp in RFC3339 format indicating the start of the interval filter", }, &cli.StringFlag{ Name: dataFlagEnd, Required: true, - Usage: "ISO-8601 timestamp indicating the end of the interval filter", + Usage: "ISO-8601 timestamp in RFC3339 format indicating the end of the interval filter", }, &cli.StringSliceFlag{ Name: dataFlagLocationIDs, diff --git a/cli/client_test.go b/cli/client_test.go index bda3d9e26ca..1402ef6d2cc 100644 --- a/cli/client_test.go +++ b/cli/client_test.go @@ -2,9 +2,11 @@ package cli import ( "context" + "encoding/json" "errors" "flag" "fmt" + "io" "io/fs" "maps" "os" @@ -103,7 +105,6 @@ func setup( if dataClient != nil { // these flags are only relevant when testing a dataClient - flags.String(dataFlagDataType, dataTypeTabular, "") flags.String(dataFlagDestination, utils.ResolveFile(""), "") } @@ -383,76 +384,124 @@ func TestUpdateBillingServiceAction(t *testing.T) { test.That(t, out.messages[7], test.ShouldContainSubstring, "USA") } -func TestTabularDataByFilterAction(t *testing.T) { - pbStruct, err := protoutils.StructToStructPb(map[string]interface{}{"bool": true, "string": "true", "float": float64(1)}) - test.That(t, err, test.ShouldBeNil) +type mockDataServiceClient struct { + grpc.ClientStream + responses []*datapb.ExportTabularDataResponse + index int + err error +} - // calls to `TabularDataByFilter` will repeat so long as data continue to be returned, - // so we need a way of telling our injected method when data has already been sent so we - // can send an empty response - var dataRequested bool - //nolint:deprecated,staticcheck - tabularDataByFilterFunc := func(ctx context.Context, in *datapb.TabularDataByFilterRequest, opts ...grpc.CallOption, - //nolint:deprecated - ) (*datapb.TabularDataByFilterResponse, error) { - if dataRequested { - //nolint:deprecated,staticcheck - return &datapb.TabularDataByFilterResponse{}, nil - } - dataRequested = true - //nolint:deprecated,staticcheck - return &datapb.TabularDataByFilterResponse{ - //nolint:deprecated,staticcheck - Data: []*datapb.TabularData{{Data: pbStruct}}, - Metadata: []*datapb.CaptureMetadata{{LocationId: "loc-id"}}, - }, nil +func (m *mockDataServiceClient) Recv() (*datapb.ExportTabularDataResponse, error) { + if m.err != nil { + return nil, m.err } - dsc := &inject.DataServiceClient{ - TabularDataByFilterFunc: tabularDataByFilterFunc, + if m.index >= len(m.responses) { + return nil, io.EOF } - cCtx, ac, out, errOut := setup(&inject.AppServiceClient{}, dsc, nil, nil, nil, "token") + resp := m.responses[m.index] + m.index++ - test.That(t, ac.dataExportAction(cCtx, parseStructFromCtx[dataExportArgs](cCtx)), test.ShouldBeNil) - test.That(t, len(errOut.messages), test.ShouldEqual, 0) - test.That(t, len(out.messages), test.ShouldEqual, 4) - test.That(t, out.messages[0], test.ShouldEqual, "Downloading..") - test.That(t, out.messages[1], test.ShouldEqual, ".") - test.That(t, out.messages[2], test.ShouldEqual, ".") - test.That(t, out.messages[3], test.ShouldEqual, "\n") - - // expectedDataSize is the expected string length of the data returned by the injected call - expectedDataSize := 98 - b := make([]byte, expectedDataSize) - - // `data.ndjson` is the standardized name of the file data is written to in the `tabularData` call - filePath := utils.ResolveFile("data/data.ndjson") - file, err := os.Open(filePath) - test.That(t, err, test.ShouldBeNil) + return resp, nil +} - dataSize, err := file.Read(b) - test.That(t, err, test.ShouldBeNil) - test.That(t, dataSize, test.ShouldEqual, expectedDataSize) +func newMockExportStream(responses []*datapb.ExportTabularDataResponse, err error) *mockDataServiceClient { + return &mockDataServiceClient{ + responses: responses, + err: err, + } +} - savedData := string(b) - expectedData := "{\"MetadataIndex\":0,\"TimeReceived\":null,\"TimeRequested\":null,\"bool\":true,\"float\":1,\"string\":\"true\"}" - test.That(t, savedData, test.ShouldEqual, expectedData) +func TestDataExportTabularAction(t *testing.T) { + t.Run("successful case", func(t *testing.T) { + pbStructPayload1, err := protoutils.StructToStructPb(map[string]interface{}{"bool": true, "string": "true", "float": float64(1)}) + test.That(t, err, test.ShouldBeNil) - expectedMetadataSize := 23 - b = make([]byte, expectedMetadataSize) + pbStructPayload2, err := protoutils.StructToStructPb(map[string]interface{}{"booly": false, "string": "true", "float": float64(1)}) + test.That(t, err, test.ShouldBeNil) - // metadata is named `0.json` based on its index in the metadata array - filePath = utils.ResolveFile("metadata/0.json") - file, err = os.Open(filePath) - test.That(t, err, test.ShouldBeNil) + exportTabularDataFunc := func(ctx context.Context, in *datapb.ExportTabularDataRequest, opts ...grpc.CallOption, + ) (datapb.DataService_ExportTabularDataClient, error) { + return newMockExportStream([]*datapb.ExportTabularDataResponse{ + {LocationId: "loc-id", Payload: pbStructPayload1}, + {LocationId: "loc-id", Payload: pbStructPayload2}, + }, nil), nil + } - metadataSize, err := file.Read(b) - test.That(t, err, test.ShouldBeNil) - test.That(t, metadataSize, test.ShouldEqual, expectedMetadataSize) + dsc := &inject.DataServiceClient{ + ExportTabularDataFunc: exportTabularDataFunc, + } + + cCtx, ac, out, errOut := setup(&inject.AppServiceClient{}, dsc, nil, nil, nil, "token") + + test.That(t, ac.dataExportTabularAction(cCtx, parseStructFromCtx[dataExportTabularArgs](cCtx)), test.ShouldBeNil) + test.That(t, len(errOut.messages), test.ShouldEqual, 0) + test.That(t, len(out.messages), test.ShouldEqual, 3) + test.That(t, strings.Join(out.messages, ""), test.ShouldEqual, "Downloading...\n") + + filePath := utils.ResolveFile(dataFileName) + + data, err := os.ReadFile(filePath) + test.That(t, err, test.ShouldBeNil) + + // Output is unstable, so parse back into maps before comparing to expected. + var actual []map[string]interface{} + decoder := json.NewDecoder(strings.NewReader(string(data))) + for decoder.More() { + var item map[string]interface{} + err = decoder.Decode(&item) + test.That(t, err, test.ShouldBeNil) + actual = append(actual, item) + } - savedMetadata := string(b) - test.That(t, savedMetadata, test.ShouldEqual, "{\"locationId\":\"loc-id\"}") + expectedData := []map[string]interface{}{ + { + "locationId": "loc-id", + "payload": map[string]interface{}{ + "bool": true, + "float": float64(1), + "string": "true", + }, + }, + { + "locationId": "loc-id", + "payload": map[string]interface{}{ + "booly": false, + "float": float64(1), + "string": "true", + }, + }, + } + + test.That(t, actual, test.ShouldResemble, expectedData) + }) + + t.Run("error case", func(t *testing.T) { + exportTabularDataFunc := func(ctx context.Context, in *datapb.ExportTabularDataRequest, opts ...grpc.CallOption, + ) (datapb.DataService_ExportTabularDataClient, error) { + return newMockExportStream([]*datapb.ExportTabularDataResponse{}, errors.New("whoops")), nil + } + + dsc := &inject.DataServiceClient{ + ExportTabularDataFunc: exportTabularDataFunc, + } + + cCtx, ac, out, errOut := setup(&inject.AppServiceClient{}, dsc, nil, nil, nil, "token") + + err := ac.dataExportTabularAction(cCtx, parseStructFromCtx[dataExportTabularArgs](cCtx)) + test.That(t, err, test.ShouldBeError, errors.New("error receiving tabular data: whoops")) + test.That(t, len(errOut.messages), test.ShouldEqual, 0) + + // Test that export was retried (total of 5 tries). + test.That(t, len(out.messages), test.ShouldEqual, 7) + test.That(t, strings.Join(out.messages, ""), test.ShouldEqual, "Downloading.......\n") + + // Test that the data.ndjson file was removed. + filePath := utils.ResolveFile(dataFileName) + _, err = os.ReadFile(filePath) + test.That(t, err, test.ShouldBeError, fmt.Errorf("open %s: no such file or directory", filePath)) + }) } func TestBaseURLParsing(t *testing.T) { diff --git a/cli/data.go b/cli/data.go index e4906b81cc2..b37e677be83 100644 --- a/cli/data.go +++ b/cli/data.go @@ -5,13 +5,11 @@ import ( "bytes" "compress/gzip" "context" - "encoding/json" "fmt" "io" "net/http" "os" "path/filepath" - "strconv" "strings" "sync" "sync/atomic" @@ -31,15 +29,13 @@ import ( ) const ( + dataFileName = "data.ndjson" dataDir = "data" metadataDir = "metadata" maxRetryCount = 5 logEveryN = 100 maxLimit = 100 - dataTypeBinary = "binary" - dataTypeTabular = "tabular" - dataCommandAdd = "add" dataCommandRemove = "remove" @@ -70,7 +66,7 @@ type commonFilterArgs struct { Tags []string } -type dataExportArgs struct { +type dataExportBinaryArgs struct { Destination string ChunkLimit uint Parallel uint @@ -78,35 +74,34 @@ type dataExportArgs struct { Timeout uint } -// DataExportAction is the corresponding action for 'data export'. -func DataExportAction(c *cli.Context, args dataExportArgs) error { - client, err := newViamClient(c) +type dataExportTabularArgs struct { + Destination string + PartID string + ResourceName string + ResourceSubtype string + Method string + Start string + End string +} + +// DataExportBinaryAction is the corresponding action for 'data export binary'. +func DataExportBinaryAction(cCtx *cli.Context, args dataExportBinaryArgs) error { + client, err := newViamClient(cCtx) if err != nil { return err } - return client.dataExportAction(c, args) + return client.dataExportBinaryAction(cCtx, args) } -func (c *viamClient) dataExportAction(cCtx *cli.Context, args dataExportArgs) error { - filter, err := createDataFilter(cCtx) +// DataExportTabularAction is the corresponding action for 'data export tabular'. +func DataExportTabularAction(cCtx *cli.Context, args dataExportTabularArgs) error { + client, err := newViamClient(cCtx) if err != nil { return err } - switch args.DataType { - case dataTypeBinary: - if err := c.binaryData(args.Destination, filter, args.Parallel, args.Timeout); err != nil { - return err - } - case dataTypeTabular: - if err := c.tabularData(args.Destination, filter, args.ChunkLimit); err != nil { - return err - } - default: - return errors.Errorf("%s must be binary or tabular, got %q", dataFlagDataType, args.DataType) - } - return nil + return client.dataExportTabularAction(cCtx, args) } type dataTagByFilterArgs struct { @@ -274,30 +269,94 @@ func createDataFilter(c *cli.Context) (*datapb.Filter, error) { if len(args.BBoxLabels) != 0 { filter.BboxLabels = args.BBoxLabels } + if args.Start != "" || args.End != "" { + interval, err := createCaptureInterval(args.Start, args.End) + if err != nil { + return nil, err + } + + filter.Interval = interval + } + return filter, nil +} + +func createExportTabularRequest(c *cli.Context) (*datapb.ExportTabularDataRequest, error) { + args := parseStructFromCtx[dataExportTabularArgs](c) + request := &datapb.ExportTabularDataRequest{} + + if args.PartID != "" { + request.PartId = args.PartID + } + if args.ResourceName != "" { + request.ResourceName = args.ResourceName + } + if args.ResourceSubtype != "" { + request.ResourceSubtype = args.ResourceSubtype + } + if args.Method != "" { + request.MethodName = args.Method + } + + interval, err := createCaptureInterval(args.Start, args.End) + if err != nil { + return nil, err + } + + request.Interval = interval + + return request, nil +} + +func createCaptureInterval(startStr, endStr string) (*datapb.CaptureInterval, error) { var start *timestamppb.Timestamp var end *timestamppb.Timestamp timeLayout := time.RFC3339 - if args.Start != "" { - t, err := time.Parse(timeLayout, args.Start) + + if startStr != "" { + t, err := time.Parse(timeLayout, startStr) if err != nil { return nil, errors.Wrap(err, "could not parse start flag") } start = timestamppb.New(t) } - if args.End != "" { - t, err := time.Parse(timeLayout, args.End) + if endStr != "" { + t, err := time.Parse(timeLayout, endStr) if err != nil { return nil, errors.Wrap(err, "could not parse end flag") } end = timestamppb.New(t) } - if start != nil || end != nil { - filter.Interval = &datapb.CaptureInterval{ - Start: start, - End: end, - } + + return &datapb.CaptureInterval{ + Start: start, + End: end, + }, nil +} + +func (c *viamClient) dataExportBinaryAction(cCtx *cli.Context, args dataExportBinaryArgs) error { + filter, err := createDataFilter(cCtx) + if err != nil { + return err } - return filter, nil + + if err := c.binaryData(args.Destination, filter, args.Parallel, args.Timeout); err != nil { + return err + } + + return nil +} + +func (c *viamClient) dataExportTabularAction(cCtx *cli.Context, args dataExportTabularArgs) error { + request, err := createExportTabularRequest(cCtx) + if err != nil { + return err + } + + if err := c.tabularData(args.Destination, request); err != nil { + return err + } + + return nil } // BinaryData downloads binary data matching filter to dst. @@ -636,120 +695,146 @@ func filenameForDownload(meta *datapb.BinaryMetadata) string { return fileName } -// tabularData downloads binary data matching filter to dst. -func (c *viamClient) tabularData(dst string, filter *datapb.Filter, limit uint) error { +// tabularData downloads unified tabular data and metadata for the requested data source and interval to the specified destination. +func (c *viamClient) tabularData(dest string, request *datapb.ExportTabularDataRequest) error { if err := c.ensureLoggedIn(); err != nil { return err } - if err := makeDestinationDirs(dst); err != nil { + if err := makeDestinationDirs(dest); err != nil { return errors.Wrapf(err, "could not create destination directories") } - var err error - //nolint:deprecated,staticcheck - var resp *datapb.TabularDataByFilterResponse - // TODO(DATA-640): Support export in additional formats. - //nolint:gosec - dataFile, err := os.Create(filepath.Join(dst, dataDir, "data.ndjson")) - if err != nil { - return errors.Wrapf(err, "could not create data file") - } - w := bufio.NewWriter(dataFile) + fmt.Fprintf(c.c.App.Writer, "Downloading..") //nolint:errcheck - fmt.Fprintf(c.c.App.Writer, "Downloading..") //nolint:errcheck // no newline - var last string - mdIndexes := make(map[string]int) - mdIndex := 0 - for { - for count := 0; count < maxRetryCount; count++ { - //nolint:deprecated,staticcheck - resp, err = c.dataClient.TabularDataByFilter(context.Background(), &datapb.TabularDataByFilterRequest{ - DataRequest: &datapb.DataRequest{ - Filter: filter, - Limit: uint64(limit), - Last: last, - }, - CountOnly: false, - }) - fmt.Fprintf(c.c.App.Writer, ".") //nolint:errcheck // no newline - if err == nil { - break + for count := 0; count < maxRetryCount; count++ { + err := func() error { + dataFilePath := filepath.Join(dest, dataFileName) + dataFile, err := os.Create(dataFilePath) //nolint:gosec + if err != nil { + return errors.Wrapf(err, "could not create data file") } - } - if err != nil { - return err - } - last = resp.GetLast() - mds := resp.GetMetadata() - if len(mds) == 0 { - break - } - // Map the current response's metadata indexes to those combined across all responses. - localToGlobalMDIndex := make(map[int]int) - for i, md := range mds { - currMDIndex, ok := mdIndexes[md.String()] - if ok { - localToGlobalMDIndex[i] = currMDIndex - continue // Already have this metadata file, so skip creating it again. - } - mdIndexes[md.String()] = mdIndex - localToGlobalMDIndex[i] = mdIndex + writer := bufio.NewWriter(dataFile) - mdJSONBytes, err := protojson.Marshal(md) - if err != nil { - return errors.Wrap(err, "could not marshal metadata") - } - //nolint:gosec - mdFile, err := os.Create(filepath.Join(dst, metadataDir, strconv.Itoa(mdIndex)+".json")) - if err != nil { - return errors.Wrapf(err, fmt.Sprintf("could not create metadata file for metadata index %d", mdIndex)) //nolint:govet - } - if _, err := mdFile.Write(mdJSONBytes); err != nil { - return errors.Wrapf(err, "could not write to metadata file %s", mdFile.Name()) - } - if err := mdFile.Close(); err != nil { - return errors.Wrapf(err, "could not close metadata file %s", mdFile.Name()) - } - mdIndex++ - } + dataRowChan := make(chan []byte) + errChan := make(chan error, 1) - data := resp.GetData() - for _, datum := range data { - // Write everything as json for now. - d := datum.GetData() - if d == nil { - continue - } - m := d.AsMap() - m["TimeRequested"] = datum.GetTimeRequested() - m["TimeReceived"] = datum.GetTimeReceived() - m["MetadataIndex"] = localToGlobalMDIndex[int(datum.GetMetadataIndex())] - j, err := json.Marshal(m) - if err != nil { - return errors.Wrap(err, "could not marshal JSON response") - } - _, err = w.Write(append(j, []byte("\n")...)) - if err != nil { - return errors.Wrapf(err, "could not write to file %s", dataFile.Name()) + var exportErr error + + ctx, cancel := context.WithCancel(context.Background()) + + defer func() { + writer.Flush() //nolint:errcheck,gosec + dataFile.Close() //nolint:errcheck,gosec + cancel() + + if exportErr != nil { + os.Remove(dataFile.Name()) //nolint:errcheck,gosec + } + }() + + go func() { + defer close(dataRowChan) + fmt.Fprintf(c.c.App.Writer, ".") //nolint:errcheck // Adds '.' to 'Downloading..' output. + + stream, err := c.dataClient.ExportTabularData(ctx, request) + if err != nil { + errChan <- errors.Wrap(err, "failed to export tabular data") + return + } + + for { + select { + case <-ctx.Done(): + return + default: + resp, err := stream.Recv() + if errors.Is(err, io.EOF) { + return + } + if err != nil { + errChan <- errors.Wrap(err, "error receiving tabular data") + return + } + + dataRow, err := protojson.Marshal(resp) + if err != nil { + errChan <- errors.Wrap(err, "error formatting tabular data") + return + } + + select { + case dataRowChan <- dataRow: + // Successfully sent. + case <-ctx.Done(): + return + } + } + } + }() + + for { + select { + case dataRow, ok := <-dataRowChan: + // No more data to write. + if !ok { + if err = writer.Flush(); err != nil { + exportErr = errors.Wrap(err, "error writing data to file") + return exportErr + } + + return nil + } + + if err = writeData(writer, dataRow); err != nil { + exportErr = errors.Wrap(err, "error writing data") + return exportErr + } + case err := <-errChan: + exportErr = err + return err + case <-ctx.Done(): + exportErr = ctx.Err() + return ctx.Err() + } } + }() + + if err != nil && count < maxRetryCount-1 { + continue } - } - printf(c.c.App.Writer, "") // newline - if err := w.Flush(); err != nil { - return errors.Wrapf(err, "could not flush writer for %s", dataFile.Name()) + printf(c.c.App.Writer, "") // newline + return err } return nil } -func makeDestinationDirs(dst string) error { - if err := os.MkdirAll(filepath.Join(dst, dataDir), 0o700); err != nil { +func writeData(writer *bufio.Writer, dataRow []byte) error { + _, err := writer.Write(dataRow) + if err != nil { + return err + } + + err = writer.WriteByte('\n') + if err != nil { return err } - if err := os.MkdirAll(filepath.Join(dst, metadataDir), 0o700); err != nil { + + // Periodically flush to keep buffer size down. + if writer.Size() > 10_000_000 { + if err = writer.Flush(); err != nil { + return err + } + } + + return nil +} + +func makeDestinationDirs(dst string) error { + if err := os.MkdirAll(dst, 0o700); err != nil { return err } return nil diff --git a/components/movementsensor/replay/replay_utils_test.go b/components/movementsensor/replay/replay_utils_test.go index ee35c74dd0d..73ccee66397 100644 --- a/components/movementsensor/replay/replay_utils_test.go +++ b/components/movementsensor/replay/replay_utils_test.go @@ -53,6 +53,7 @@ func (mDServer *mockDataServiceServer) TabularDataByFilter(ctx context.Context, last := req.DataRequest.GetLast() limit := req.DataRequest.GetLimit() + //nolint:deprecated,staticcheck var dataset []*datapb.TabularData var dataIndex int var err error @@ -79,6 +80,7 @@ func (mDServer *mockDataServiceServer) TabularDataByFilter(ctx context.Context, last = fmt.Sprint(dataIndex) + //nolint:deprecated,staticcheck tabularData := &datapb.TabularData{ Data: data, TimeRequested: timeReq,