Skip to content

Commit

Permalink
DATA-3441 Update data export command (#4596)
Browse files Browse the repository at this point in the history
  • Loading branch information
katiepeters authored Dec 18, 2024
1 parent 15d8098 commit 9fb1e11
Show file tree
Hide file tree
Showing 5 changed files with 404 additions and 224 deletions.
1 change: 1 addition & 0 deletions app/data_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func TestDataClient(t *testing.T) {
//nolint:deprecated,staticcheck
grpcClient.TabularDataByFilterFunc = func(ctx context.Context, in *pb.TabularDataByFilterRequest,
opts ...grpc.CallOption,
//nolint:deprecated,staticcheck
) (*pb.TabularDataByFilterResponse, error) {
test.That(t, in.DataRequest, test.ShouldResemble, dataRequestToProto(dataRequest))
test.That(t, in.CountOnly, test.ShouldBeTrue)
Expand Down
119 changes: 81 additions & 38 deletions cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,13 @@ const (
dataFlagAliasRobotName = "robot-name"
dataFlagPartName = "part-name"
dataFlagComponentType = "component-type"
dataFlagResourceSubtype = "resource-subtype"
dataFlagComponentName = "component-name"
dataFlagResourceName = "resource-name"
dataFlagMethod = "method"
dataFlagMimeTypes = "mime-types"
dataFlagStart = "start"
dataFlagEnd = "end"
dataFlagChunkLimit = "chunk-limit"
dataFlagParallelDownloads = "parallel"
dataFlagTags = "tags"
dataFlagBboxLabels = "bbox-labels"
Expand Down Expand Up @@ -192,11 +193,11 @@ var commonFilterFlags = []cli.Flag{
},
&cli.StringFlag{
Name: dataFlagStart,
Usage: "ISO-8601 timestamp indicating the start of the interval filter",
Usage: "ISO-8601 timestamp in RFC3339 format indicating the start of the interval filter",
},
&cli.StringFlag{
Name: dataFlagEnd,
Usage: "ISO-8601 timestamp indicating the end of the interval filter",
Usage: "ISO-8601 timestamp in RFC3339 format indicating the end of the interval filter",
},
&cli.StringSliceFlag{
Name: dataFlagBboxLabels,
Expand Down Expand Up @@ -622,42 +623,84 @@ var app = &cli.App{
HideHelpCommand: true,
Subcommands: []*cli.Command{
{
Name: "export",
Usage: "download data from Viam cloud",
UsageText: createUsageText("data export", []string{dataFlagDestination, dataFlagDataType}, true),
Flags: append([]cli.Flag{
&cli.PathFlag{
Name: dataFlagDestination,
Required: true,
Usage: "output directory for downloaded data",
},
&cli.UintFlag{
Name: dataFlagChunkLimit,
Usage: "maximum number of results per download request (tabular data only)",
Value: 100000,
},
&cli.UintFlag{
Name: dataFlagParallelDownloads,
Usage: "number of download requests to make in parallel (binary data only)",
Value: 100,
},
&cli.StringSliceFlag{
Name: dataFlagTags,
Usage: "tags filter. " +
"accepts tagged for all tagged data, untagged for all untagged data, or a list of tags for all data matching any of the tags",
},
&cli.StringFlag{
Name: dataFlagDataType,
Usage: "type of data to download. can be binary or tabular",
Name: "export",
Usage: "download data from Viam cloud",
Subcommands: []*cli.Command{
{
Name: "binary",
Usage: "download binary data",
UsageText: createUsageText("data export binary", []string{dataFlagDestination}, true),
Flags: append([]cli.Flag{
&cli.PathFlag{
Name: dataFlagDestination,
Required: true,
Usage: "output directory for downloaded data",
},
&cli.UintFlag{
Name: dataFlagParallelDownloads,
Usage: "number of download requests to make in parallel",
Value: 100,
},
&cli.UintFlag{
Name: dataFlagTimeout,
Usage: "number of seconds to wait for large file downloads",
Value: 30,
},
&cli.StringSliceFlag{
Name: dataFlagTags,
Usage: "tags filter. accepts tagged for all tagged data, untagged for all untagged data, or a list of tags",
},
}, commonFilterFlags...),
Action: createCommandWithT[dataExportBinaryArgs](DataExportBinaryAction),
},
&cli.UintFlag{
Name: dataFlagTimeout,
Usage: "number of seconds to wait for large file downloads",
Value: 30,
{
Name: "tabular",
Usage: "download tabular data",
UsageText: createUsageText("data export tabular", []string{
dataFlagDestination,
dataFlagPartID,
dataFlagResourceName,
dataFlagResourceSubtype,
dataFlagMethod,
}, true),
Flags: []cli.Flag{
&cli.PathFlag{
Name: dataFlagDestination,
Required: true,
Usage: "output directory for downloaded data",
},
&cli.StringFlag{
Name: dataFlagPartID,
Required: true,
Usage: "part id",
},
&cli.StringFlag{
Name: dataFlagResourceName,
Required: true,
Usage: "resource name (sometimes called 'component name')",
},
&cli.StringFlag{
Name: dataFlagResourceSubtype,
Required: true,
Usage: "resource subtype (sometimes called 'component type')",
},
&cli.StringFlag{
Name: dataFlagMethod,
Required: true,
Usage: "method name",
},
&cli.StringFlag{
Name: "start",
Usage: "ISO-8601 timestamp in RFC3339 format indicating the start of the interval",
},
&cli.StringFlag{
Name: "end",
Usage: "ISO-8601 timestamp in RFC3339 format indicating the end of the interval",
},
},
Action: createCommandWithT[dataExportTabularArgs](DataExportTabularAction),
},
},
commonFilterFlags...),
Action: createCommandWithT[dataExportArgs](DataExportAction),
},
{
Name: "delete",
Expand All @@ -677,12 +720,12 @@ var app = &cli.App{
&cli.StringFlag{
Name: dataFlagStart,
Required: true,
Usage: "ISO-8601 timestamp indicating the start of the interval filter",
Usage: "ISO-8601 timestamp in RFC3339 format indicating the start of the interval filter",
},
&cli.StringFlag{
Name: dataFlagEnd,
Required: true,
Usage: "ISO-8601 timestamp indicating the end of the interval filter",
Usage: "ISO-8601 timestamp in RFC3339 format indicating the end of the interval filter",
},
&cli.StringSliceFlag{
Name: dataFlagLocationIDs,
Expand Down
167 changes: 108 additions & 59 deletions cli/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package cli

import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"io/fs"
"maps"
"os"
Expand Down Expand Up @@ -103,7 +105,6 @@ func setup(

if dataClient != nil {
// these flags are only relevant when testing a dataClient
flags.String(dataFlagDataType, dataTypeTabular, "")
flags.String(dataFlagDestination, utils.ResolveFile(""), "")
}

Expand Down Expand Up @@ -383,76 +384,124 @@ func TestUpdateBillingServiceAction(t *testing.T) {
test.That(t, out.messages[7], test.ShouldContainSubstring, "USA")
}

func TestTabularDataByFilterAction(t *testing.T) {
pbStruct, err := protoutils.StructToStructPb(map[string]interface{}{"bool": true, "string": "true", "float": float64(1)})
test.That(t, err, test.ShouldBeNil)
// mockDataServiceClient is a test double for the data-service export stream.
// Its Recv implementation replays the canned responses in order: if err is
// set it is returned on every call; once all responses have been consumed,
// Recv returns io.EOF to signal end of stream.
// NOTE(review): grpc.ClientStream is embedded only to satisfy the stream
// interface — its methods are never exercised by these tests.
type mockDataServiceClient struct {
	grpc.ClientStream
	responses []*datapb.ExportTabularDataResponse
	index     int
	err       error
}

// calls to `TabularDataByFilter` will repeat so long as data continue to be returned,
// so we need a way of telling our injected method when data has already been sent so we
// can send an empty response
var dataRequested bool
//nolint:deprecated,staticcheck
tabularDataByFilterFunc := func(ctx context.Context, in *datapb.TabularDataByFilterRequest, opts ...grpc.CallOption,
//nolint:deprecated
) (*datapb.TabularDataByFilterResponse, error) {
if dataRequested {
//nolint:deprecated,staticcheck
return &datapb.TabularDataByFilterResponse{}, nil
}
dataRequested = true
//nolint:deprecated,staticcheck
return &datapb.TabularDataByFilterResponse{
//nolint:deprecated,staticcheck
Data: []*datapb.TabularData{{Data: pbStruct}},
Metadata: []*datapb.CaptureMetadata{{LocationId: "loc-id"}},
}, nil
func (m *mockDataServiceClient) Recv() (*datapb.ExportTabularDataResponse, error) {
if m.err != nil {
return nil, m.err
}

dsc := &inject.DataServiceClient{
TabularDataByFilterFunc: tabularDataByFilterFunc,
if m.index >= len(m.responses) {
return nil, io.EOF
}

cCtx, ac, out, errOut := setup(&inject.AppServiceClient{}, dsc, nil, nil, nil, "token")
resp := m.responses[m.index]
m.index++

test.That(t, ac.dataExportAction(cCtx, parseStructFromCtx[dataExportArgs](cCtx)), test.ShouldBeNil)
test.That(t, len(errOut.messages), test.ShouldEqual, 0)
test.That(t, len(out.messages), test.ShouldEqual, 4)
test.That(t, out.messages[0], test.ShouldEqual, "Downloading..")
test.That(t, out.messages[1], test.ShouldEqual, ".")
test.That(t, out.messages[2], test.ShouldEqual, ".")
test.That(t, out.messages[3], test.ShouldEqual, "\n")

// expectedDataSize is the expected string length of the data returned by the injected call
expectedDataSize := 98
b := make([]byte, expectedDataSize)

// `data.ndjson` is the standardized name of the file data is written to in the `tabularData` call
filePath := utils.ResolveFile("data/data.ndjson")
file, err := os.Open(filePath)
test.That(t, err, test.ShouldBeNil)
return resp, nil
}

dataSize, err := file.Read(b)
test.That(t, err, test.ShouldBeNil)
test.That(t, dataSize, test.ShouldEqual, expectedDataSize)
// newMockExportStream builds a mockDataServiceClient that will replay the
// given responses in order, or fail every Recv call with err when err is
// non-nil.
func newMockExportStream(responses []*datapb.ExportTabularDataResponse, err error) *mockDataServiceClient {
	stream := &mockDataServiceClient{}
	stream.responses = responses
	stream.err = err
	return stream
}

savedData := string(b)
expectedData := "{\"MetadataIndex\":0,\"TimeReceived\":null,\"TimeRequested\":null,\"bool\":true,\"float\":1,\"string\":\"true\"}"
test.That(t, savedData, test.ShouldEqual, expectedData)
func TestDataExportTabularAction(t *testing.T) {
t.Run("successful case", func(t *testing.T) {
pbStructPayload1, err := protoutils.StructToStructPb(map[string]interface{}{"bool": true, "string": "true", "float": float64(1)})
test.That(t, err, test.ShouldBeNil)

expectedMetadataSize := 23
b = make([]byte, expectedMetadataSize)
pbStructPayload2, err := protoutils.StructToStructPb(map[string]interface{}{"booly": false, "string": "true", "float": float64(1)})
test.That(t, err, test.ShouldBeNil)

// metadata is named `0.json` based on its index in the metadata array
filePath = utils.ResolveFile("metadata/0.json")
file, err = os.Open(filePath)
test.That(t, err, test.ShouldBeNil)
exportTabularDataFunc := func(ctx context.Context, in *datapb.ExportTabularDataRequest, opts ...grpc.CallOption,
) (datapb.DataService_ExportTabularDataClient, error) {
return newMockExportStream([]*datapb.ExportTabularDataResponse{
{LocationId: "loc-id", Payload: pbStructPayload1},
{LocationId: "loc-id", Payload: pbStructPayload2},
}, nil), nil
}

metadataSize, err := file.Read(b)
test.That(t, err, test.ShouldBeNil)
test.That(t, metadataSize, test.ShouldEqual, expectedMetadataSize)
dsc := &inject.DataServiceClient{
ExportTabularDataFunc: exportTabularDataFunc,
}

cCtx, ac, out, errOut := setup(&inject.AppServiceClient{}, dsc, nil, nil, nil, "token")

test.That(t, ac.dataExportTabularAction(cCtx, parseStructFromCtx[dataExportTabularArgs](cCtx)), test.ShouldBeNil)
test.That(t, len(errOut.messages), test.ShouldEqual, 0)
test.That(t, len(out.messages), test.ShouldEqual, 3)
test.That(t, strings.Join(out.messages, ""), test.ShouldEqual, "Downloading...\n")

filePath := utils.ResolveFile(dataFileName)

data, err := os.ReadFile(filePath)
test.That(t, err, test.ShouldBeNil)

// Output is unstable, so parse back into maps before comparing to expected.
var actual []map[string]interface{}
decoder := json.NewDecoder(strings.NewReader(string(data)))
for decoder.More() {
var item map[string]interface{}
err = decoder.Decode(&item)
test.That(t, err, test.ShouldBeNil)
actual = append(actual, item)
}

savedMetadata := string(b)
test.That(t, savedMetadata, test.ShouldEqual, "{\"locationId\":\"loc-id\"}")
expectedData := []map[string]interface{}{
{
"locationId": "loc-id",
"payload": map[string]interface{}{
"bool": true,
"float": float64(1),
"string": "true",
},
},
{
"locationId": "loc-id",
"payload": map[string]interface{}{
"booly": false,
"float": float64(1),
"string": "true",
},
},
}

test.That(t, actual, test.ShouldResemble, expectedData)
})

t.Run("error case", func(t *testing.T) {
exportTabularDataFunc := func(ctx context.Context, in *datapb.ExportTabularDataRequest, opts ...grpc.CallOption,
) (datapb.DataService_ExportTabularDataClient, error) {
return newMockExportStream([]*datapb.ExportTabularDataResponse{}, errors.New("whoops")), nil
}

dsc := &inject.DataServiceClient{
ExportTabularDataFunc: exportTabularDataFunc,
}

cCtx, ac, out, errOut := setup(&inject.AppServiceClient{}, dsc, nil, nil, nil, "token")

err := ac.dataExportTabularAction(cCtx, parseStructFromCtx[dataExportTabularArgs](cCtx))
test.That(t, err, test.ShouldBeError, errors.New("error receiving tabular data: whoops"))
test.That(t, len(errOut.messages), test.ShouldEqual, 0)

// Test that export was retried (total of 5 tries).
test.That(t, len(out.messages), test.ShouldEqual, 7)
test.That(t, strings.Join(out.messages, ""), test.ShouldEqual, "Downloading.......\n")

// Test that the data.ndjson file was removed.
filePath := utils.ResolveFile(dataFileName)
_, err = os.ReadFile(filePath)
test.That(t, err, test.ShouldBeError, fmt.Errorf("open %s: no such file or directory", filePath))
})
}

func TestBaseURLParsing(t *testing.T) {
Expand Down
Loading

0 comments on commit 9fb1e11

Please sign in to comment.