Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATA-3441 Update data export command #4596

Merged
merged 34 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9df72a5
Update API version and add deprecation nolints #DATA-3441
katiepeters Dec 2, 2024
c391e5e
Split binary and tabular into separate subcommands #DATA-3441
katiepeters Dec 3, 2024
b90725c
Fix duplicate flag; add new flag names #DATA-3441
katiepeters Dec 3, 2024
337a368
Create POC streaming download #DATA-3441
katiepeters Dec 3, 2024
94ee605
Remove 'data' folder; use temporary file until stream completes #DATA…
katiepeters Dec 3, 2024
b7cf33c
Add ExportTabularData to testutils #DATA-3441
katiepeters Dec 3, 2024
95d1e80
Re-create 'actions' for easier testing #DATA-3441
katiepeters Dec 3, 2024
359d405
Start updating tabular client tests #DATA-3441
katiepeters Dec 3, 2024
54d4ce6
Merge with main; resolve conflicts #DATA-3441
katiepeters Dec 5, 2024
12384c1
Update API version #DATA-3441
katiepeters Dec 5, 2024
78ecee1
Finish updating tests #DATA-3441
katiepeters Dec 5, 2024
e86d4bb
Update data service client to match go sdk PR #DATA-3441
katiepeters Dec 5, 2024
e95e161
Remove file if program exits early #DATA-3441
katiepeters Dec 5, 2024
98e5621
Split up tabularData func into smaller functions #DATA-3441
katiepeters Dec 5, 2024
404ab76
Remove stream close (not needed) #DATA-3441
katiepeters Dec 5, 2024
e751eeb
Adjust the logic #DATA-3441
katiepeters Dec 6, 2024
a5f6732
Merge with main and resolve conflicts #DATA-3441
katiepeters Dec 10, 2024
9db0b4d
Create separate success/error cases #DATA-3441
katiepeters Dec 10, 2024
f26d47b
Remove timeout flag #DATA-3441
katiepeters Dec 10, 2024
e6f4df6
Move newline creation #DATA-3441
katiepeters Dec 10, 2024
592df9b
Update tests to account for newline #DATA-3441
katiepeters Dec 10, 2024
0f79105
Bump api version #DATA-3441
katiepeters Dec 10, 2024
ca98423
Lint/small tweaks #DATA-3441
katiepeters Dec 10, 2024
1d40a29
Merge with main and resolve conflicts #DATA-3441
katiepeters Dec 13, 2024
31f27df
Add correct args #DATA-3441
katiepeters Dec 13, 2024
2ce78bd
Remove shouldNotBeNil tests #DATA-3441
katiepeters Dec 13, 2024
0967465
Fix usage text #DATA-3441
katiepeters Dec 13, 2024
bde1971
Lint #DATA-3441
katiepeters Dec 13, 2024
2989eec
Remove nil check #DATA-3441
katiepeters Dec 13, 2024
8e5a816
Correct flag typo #DATA-3441
katiepeters Dec 16, 2024
542886f
Pull out capture interval logic #DATA-3441
katiepeters Dec 16, 2024
283c86a
Use size instead of number of writes for flushing #DATA-3441
katiepeters Dec 16, 2024
d415a79
Improve messaging around timestamps #DATA-3441
katiepeters Dec 16, 2024
fa26414
Remove unnecessary uint conversion DATA-3441
katiepeters Dec 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/data_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func TestDataClient(t *testing.T) {
//nolint:deprecated,staticcheck
grpcClient.TabularDataByFilterFunc = func(ctx context.Context, in *pb.TabularDataByFilterRequest,
opts ...grpc.CallOption,
//nolint:deprecated,staticcheck
) (*pb.TabularDataByFilterResponse, error) {
test.That(t, in.DataRequest, test.ShouldResemble, dataRequestToProto(dataRequest))
test.That(t, in.CountOnly, test.ShouldBeTrue)
Expand Down
119 changes: 81 additions & 38 deletions cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ const (
dataFlagAliasRobotName = "robot-name"
dataFlagPartName = "part-name"
dataFlagComponentType = "component-type"
dataFlagResourceSubtype = "resource-subtype"
dataFlagComponentName = "component-name"
dataFlagResourceName = "resource-name"
dataFlagMethod = "method"
dataFlagMimeTypes = "mime-types"
dataFlagStart = "start"
dataFlagEnd = "end"
dataFlagChunkLimit = "chunk-limit"
dataFlagParallelDownloads = "parallel"
dataFlagTags = "tags"
dataFlagBboxLabels = "bbox-labels"
Expand Down Expand Up @@ -187,11 +188,11 @@ var commonFilterFlags = []cli.Flag{
},
&cli.StringFlag{
Name: dataFlagStart,
Usage: "ISO-8601 timestamp indicating the start of the interval filter",
Usage: "ISO-8601 timestamp in RFC3339 format indicating the start of the interval filter",
},
&cli.StringFlag{
Name: dataFlagEnd,
Usage: "ISO-8601 timestamp indicating the end of the interval filter",
Usage: "ISO-8601 timestamp in RFC3339 format indicating the end of the interval filter",
},
&cli.StringSliceFlag{
Name: dataFlagBboxLabels,
Expand Down Expand Up @@ -605,42 +606,84 @@ var app = &cli.App{
HideHelpCommand: true,
Subcommands: []*cli.Command{
{
Name: "export",
Usage: "download data from Viam cloud",
UsageText: createUsageText("data export", []string{dataFlagDestination, dataFlagDataType}, true),
Flags: append([]cli.Flag{
&cli.PathFlag{
Name: dataFlagDestination,
Required: true,
Usage: "output directory for downloaded data",
},
&cli.UintFlag{
Name: dataFlagChunkLimit,
Usage: "maximum number of results per download request (tabular data only)",
Value: 100000,
},
&cli.UintFlag{
Name: dataFlagParallelDownloads,
Usage: "number of download requests to make in parallel (binary data only)",
Value: 100,
},
&cli.StringSliceFlag{
Name: dataFlagTags,
Usage: "tags filter. " +
"accepts tagged for all tagged data, untagged for all untagged data, or a list of tags for all data matching any of the tags",
},
&cli.StringFlag{
Name: dataFlagDataType,
Usage: "type of data to download. can be binary or tabular",
Name: "export",
Usage: "download data from Viam cloud",
Subcommands: []*cli.Command{
{
Name: "binary",
Usage: "download binary data",
UsageText: createUsageText("data export binary", []string{dataFlagDestination}, true),
Flags: append([]cli.Flag{
&cli.PathFlag{
Name: dataFlagDestination,
Required: true,
Usage: "output directory for downloaded data",
},
&cli.UintFlag{
Name: dataFlagParallelDownloads,
Usage: "number of download requests to make in parallel",
Value: 100,
},
&cli.UintFlag{
Name: dataFlagTimeout,
Usage: "number of seconds to wait for large file downloads",
Value: 30,
},
&cli.StringSliceFlag{
Name: dataFlagTags,
Usage: "tags filter. accepts tagged for all tagged data, untagged for all untagged data, or a list of tags",
},
}, commonFilterFlags...),
Action: createCommandWithT[dataExportBinaryArgs](DataExportBinaryAction),
},
&cli.UintFlag{
Name: dataFlagTimeout,
Usage: "number of seconds to wait for large file downloads",
Value: 30,
{
Name: "tabular",
Usage: "download tabular data",
UsageText: createUsageText("data export tabular", []string{
dataFlagDestination,
dataFlagPartID,
dataFlagResourceName,
dataFlagResourceSubtype,
dataFlagMethod,
}, true),
Flags: []cli.Flag{
&cli.PathFlag{
Name: dataFlagDestination,
Required: true,
Usage: "output directory for downloaded data",
},
&cli.StringFlag{
Name: dataFlagPartID,
Required: true,
Usage: "part id",
},
&cli.StringFlag{
Name: dataFlagResourceName,
Required: true,
Usage: "resource name (sometimes called 'component name')",
},
&cli.StringFlag{
Name: dataFlagResourceSubtype,
Required: true,
Usage: "resource subtype (sometimes called 'component type')",
},
&cli.StringFlag{
Name: dataFlagMethod,
Required: true,
Usage: "method name",
},
&cli.StringFlag{
Name: "start",
Usage: "ISO-8601 timestamp in RFC3339 format indicating the start of the interval",
},
&cli.StringFlag{
Name: "end",
Usage: "ISO-8601 timestamp in RFC3339 format indicating the end of the interval",
},
},
Action: createCommandWithT[dataExportTabularArgs](DataExportTabularAction),
},
},
commonFilterFlags...),
Action: createCommandWithT[dataExportArgs](DataExportAction),
},
{
Name: "delete",
Expand All @@ -660,12 +703,12 @@ var app = &cli.App{
&cli.StringFlag{
Name: dataFlagStart,
Required: true,
Usage: "ISO-8601 timestamp indicating the start of the interval filter",
Usage: "ISO-8601 timestamp in RFC3339 format indicating the start of the interval filter",
},
&cli.StringFlag{
Name: dataFlagEnd,
Required: true,
Usage: "ISO-8601 timestamp indicating the end of the interval filter",
Usage: "ISO-8601 timestamp in RFC3339 format indicating the end of the interval filter",
},
&cli.StringSliceFlag{
Name: dataFlagLocationIDs,
Expand Down
167 changes: 108 additions & 59 deletions cli/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package cli

import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"io/fs"
"maps"
"os"
Expand Down Expand Up @@ -103,7 +105,6 @@ func setup(

if dataClient != nil {
// these flags are only relevant when testing a dataClient
flags.String(dataFlagDataType, dataTypeTabular, "")
flags.String(dataFlagDestination, utils.ResolveFile(""), "")
}

Expand Down Expand Up @@ -364,76 +365,124 @@ func TestUpdateBillingServiceAction(t *testing.T) {
test.That(t, out.messages[7], test.ShouldContainSubstring, "USA")
}

func TestTabularDataByFilterAction(t *testing.T) {
pbStruct, err := protoutils.StructToStructPb(map[string]interface{}{"bool": true, "string": "true", "float": float64(1)})
test.That(t, err, test.ShouldBeNil)
type mockDataServiceClient struct {
grpc.ClientStream
responses []*datapb.ExportTabularDataResponse
index int
err error
}

// calls to `TabularDataByFilter` will repeat so long as data continue to be returned,
// so we need a way of telling our injected method when data has already been sent so we
// can send an empty response
var dataRequested bool
//nolint:deprecated,staticcheck
tabularDataByFilterFunc := func(ctx context.Context, in *datapb.TabularDataByFilterRequest, opts ...grpc.CallOption,
//nolint:deprecated
) (*datapb.TabularDataByFilterResponse, error) {
if dataRequested {
//nolint:deprecated,staticcheck
return &datapb.TabularDataByFilterResponse{}, nil
}
dataRequested = true
//nolint:deprecated,staticcheck
return &datapb.TabularDataByFilterResponse{
//nolint:deprecated,staticcheck
Data: []*datapb.TabularData{{Data: pbStruct}},
Metadata: []*datapb.CaptureMetadata{{LocationId: "loc-id"}},
}, nil
func (m *mockDataServiceClient) Recv() (*datapb.ExportTabularDataResponse, error) {
if m.err != nil {
return nil, m.err
}

dsc := &inject.DataServiceClient{
TabularDataByFilterFunc: tabularDataByFilterFunc,
if m.index >= len(m.responses) {
return nil, io.EOF
}

cCtx, ac, out, errOut := setup(&inject.AppServiceClient{}, dsc, nil, nil, nil, "token")
resp := m.responses[m.index]
m.index++

test.That(t, ac.dataExportAction(cCtx, parseStructFromCtx[dataExportArgs](cCtx)), test.ShouldBeNil)
test.That(t, len(errOut.messages), test.ShouldEqual, 0)
test.That(t, len(out.messages), test.ShouldEqual, 4)
test.That(t, out.messages[0], test.ShouldEqual, "Downloading..")
test.That(t, out.messages[1], test.ShouldEqual, ".")
test.That(t, out.messages[2], test.ShouldEqual, ".")
test.That(t, out.messages[3], test.ShouldEqual, "\n")

// expectedDataSize is the expected string length of the data returned by the injected call
expectedDataSize := 98
b := make([]byte, expectedDataSize)

// `data.ndjson` is the standardized name of the file data is written to in the `tabularData` call
filePath := utils.ResolveFile("data/data.ndjson")
file, err := os.Open(filePath)
test.That(t, err, test.ShouldBeNil)
return resp, nil
}

dataSize, err := file.Read(b)
test.That(t, err, test.ShouldBeNil)
test.That(t, dataSize, test.ShouldEqual, expectedDataSize)
func newMockExportStream(responses []*datapb.ExportTabularDataResponse, err error) *mockDataServiceClient {
return &mockDataServiceClient{
responses: responses,
err: err,
}
}

savedData := string(b)
expectedData := "{\"MetadataIndex\":0,\"TimeReceived\":null,\"TimeRequested\":null,\"bool\":true,\"float\":1,\"string\":\"true\"}"
test.That(t, savedData, test.ShouldEqual, expectedData)
func TestDataExportTabularAction(t *testing.T) {
t.Run("successful case", func(t *testing.T) {
pbStructPayload1, err := protoutils.StructToStructPb(map[string]interface{}{"bool": true, "string": "true", "float": float64(1)})
test.That(t, err, test.ShouldBeNil)

expectedMetadataSize := 23
b = make([]byte, expectedMetadataSize)
pbStructPayload2, err := protoutils.StructToStructPb(map[string]interface{}{"booly": false, "string": "true", "float": float64(1)})
test.That(t, err, test.ShouldBeNil)

// metadata is named `0.json` based on its index in the metadata array
filePath = utils.ResolveFile("metadata/0.json")
file, err = os.Open(filePath)
test.That(t, err, test.ShouldBeNil)
exportTabularDataFunc := func(ctx context.Context, in *datapb.ExportTabularDataRequest, opts ...grpc.CallOption,
) (datapb.DataService_ExportTabularDataClient, error) {
return newMockExportStream([]*datapb.ExportTabularDataResponse{
{LocationId: "loc-id", Payload: pbStructPayload1},
{LocationId: "loc-id", Payload: pbStructPayload2},
}, nil), nil
}

metadataSize, err := file.Read(b)
test.That(t, err, test.ShouldBeNil)
test.That(t, metadataSize, test.ShouldEqual, expectedMetadataSize)
dsc := &inject.DataServiceClient{
ExportTabularDataFunc: exportTabularDataFunc,
}

cCtx, ac, out, errOut := setup(&inject.AppServiceClient{}, dsc, nil, nil, nil, "token")

test.That(t, ac.dataExportTabularAction(cCtx, parseStructFromCtx[dataExportTabularArgs](cCtx)), test.ShouldBeNil)
test.That(t, len(errOut.messages), test.ShouldEqual, 0)
test.That(t, len(out.messages), test.ShouldEqual, 3)
test.That(t, strings.Join(out.messages, ""), test.ShouldEqual, "Downloading...\n")

filePath := utils.ResolveFile(dataFileName)

data, err := os.ReadFile(filePath)
test.That(t, err, test.ShouldBeNil)

// Output is unstable, so parse back into maps before comparing to expected.
var actual []map[string]interface{}
decoder := json.NewDecoder(strings.NewReader(string(data)))
for decoder.More() {
var item map[string]interface{}
err = decoder.Decode(&item)
test.That(t, err, test.ShouldBeNil)
actual = append(actual, item)
}

savedMetadata := string(b)
test.That(t, savedMetadata, test.ShouldEqual, "{\"locationId\":\"loc-id\"}")
expectedData := []map[string]interface{}{
{
"locationId": "loc-id",
"payload": map[string]interface{}{
"bool": true,
"float": float64(1),
"string": "true",
},
},
{
"locationId": "loc-id",
"payload": map[string]interface{}{
"booly": false,
"float": float64(1),
"string": "true",
},
},
}

test.That(t, actual, test.ShouldResemble, expectedData)
})

t.Run("error case", func(t *testing.T) {
exportTabularDataFunc := func(ctx context.Context, in *datapb.ExportTabularDataRequest, opts ...grpc.CallOption,
) (datapb.DataService_ExportTabularDataClient, error) {
return newMockExportStream([]*datapb.ExportTabularDataResponse{}, errors.New("whoops")), nil
}

dsc := &inject.DataServiceClient{
ExportTabularDataFunc: exportTabularDataFunc,
}

cCtx, ac, out, errOut := setup(&inject.AppServiceClient{}, dsc, nil, nil, nil, "token")

err := ac.dataExportTabularAction(cCtx, parseStructFromCtx[dataExportTabularArgs](cCtx))
test.That(t, err, test.ShouldBeError, errors.New("error receiving tabular data: whoops"))
test.That(t, len(errOut.messages), test.ShouldEqual, 0)

// Test that export was retried (total of 5 tries).
test.That(t, len(out.messages), test.ShouldEqual, 7)
test.That(t, strings.Join(out.messages, ""), test.ShouldEqual, "Downloading.......\n")

// Test that the data.ndjson file was removed.
filePath := utils.ResolveFile(dataFileName)
_, err = os.ReadFile(filePath)
test.That(t, err, test.ShouldBeError, fmt.Errorf("open %s: no such file or directory", filePath))
vijayvuyyuru marked this conversation as resolved.
Show resolved Hide resolved
})
}

func TestBaseURLParsing(t *testing.T) {
Expand Down
Loading
Loading