From 35882104b10ac41c75c5e64ddeb6a1e0ccfa0e6e Mon Sep 17 00:00:00 2001 From: Petr Hosek Date: Thu, 20 Jun 2024 09:55:41 +0200 Subject: [PATCH 1/7] fix: removed whitespaces in nolint comment for gochecknoinits --- api/templates/design.go | 2 +- internal/pkg/service/cli/cmd/cmd.go | 2 +- internal/pkg/service/cli/prompt/interactive/interactive.go | 2 +- internal/pkg/service/common/goaextension/anytype/anytype.go | 2 +- internal/pkg/service/common/goaextension/errormsg/error.go | 2 +- internal/pkg/service/common/goaextension/example/example.go | 2 +- internal/pkg/service/common/goaextension/genericerror/error.go | 2 +- internal/pkg/service/common/goaextension/oneof/oneof.go | 2 +- .../pkg/service/common/goaextension/operationid/operationid.go | 2 +- internal/pkg/service/common/goaextension/token/token.go | 2 +- 10 files changed, 10 insertions(+), 10 deletions(-) diff --git a/api/templates/design.go b/api/templates/design.go index 220677f6bf..b0950d3d6e 100644 --- a/api/templates/design.go +++ b/api/templates/design.go @@ -33,7 +33,7 @@ const ( // API definition ------------------------------------------------------------------------------------------------------ -// nolint: gochecknoinits +//nolint:gochecknoinits func init() { dependenciesType := func(method *service.MethodData) string { if dependencies.HasSecurityScheme("APIKey", method) { diff --git a/internal/pkg/service/cli/cmd/cmd.go b/internal/pkg/service/cli/cmd/cmd.go index 6d5d2c9457..b20ee52129 100644 --- a/internal/pkg/service/cli/cmd/cmd.go +++ b/internal/pkg/service/cli/cmd/cmd.go @@ -49,7 +49,7 @@ func DefaultRootFlags() RootFlags { return RootFlags{} } -// nolint: gochecknoinits +//nolint:gochecknoinits func init() { // Disable commands auto-sorting cobra.EnableCommandSorting = false diff --git a/internal/pkg/service/cli/prompt/interactive/interactive.go b/internal/pkg/service/cli/prompt/interactive/interactive.go index dc49ad06b9..1c6051abca 100644 --- a/internal/pkg/service/cli/prompt/interactive/interactive.go +++ b/internal/pkg/service/cli/prompt/interactive/interactive.go @@ -21,7 +21,7 @@ type Prompt struct { editor string // the editor is started when Editor() is called, if empty, the default is system editor is used } -// nolint: gochecknoinits +//nolint:gochecknoinits func init() { // Workaround for bug in 3rd party lib // https://github.com/AlecAivazis/survey/issues/336 diff --git a/internal/pkg/service/common/goaextension/anytype/anytype.go b/internal/pkg/service/common/goaextension/anytype/anytype.go index ad5ecc2a06..e88d41f6ec 100644 --- a/internal/pkg/service/common/goaextension/anytype/anytype.go +++ b/internal/pkg/service/common/goaextension/anytype/anytype.go @@ -13,7 +13,7 @@ import ( openapiv3 "goa.design/goa/v3/http/codegen/openapi/v3" ) -// nolint: gochecknoinits +//nolint:gochecknoinits func init() { codegen.RegisterPluginLast("any-type", "gen", nil, generate) } diff --git a/internal/pkg/service/common/goaextension/errormsg/error.go b/internal/pkg/service/common/goaextension/errormsg/error.go index 8126e42959..7695c3b30e 100644 --- a/internal/pkg/service/common/goaextension/errormsg/error.go +++ b/internal/pkg/service/common/goaextension/errormsg/error.go @@ -15,7 +15,7 @@ import ( "github.com/keboola/keboola-as-code/internal/pkg/utils/strhelper" ) -// nolint: gochecknoinits +//nolint:gochecknoinits func init() { codegen.RegisterPluginFirst("errormsg", "gen", prepare, generate) } diff --git a/internal/pkg/service/common/goaextension/example/example.go b/internal/pkg/service/common/goaextension/example/example.go index 8b243b8c52..34f86faa4a 100644 --- a/internal/pkg/service/common/goaextension/example/example.go +++ b/internal/pkg/service/common/goaextension/example/example.go @@ -23,7 +23,7 @@ const ( metaVisitedAttrValue = "true" ) -// nolint: gochecknoinits +//nolint:gochecknoinits func init() { codegen.RegisterPluginFirst("001-example-object", "gen", prepare, generate) } diff --git a/internal/pkg/service/common/goaextension/genericerror/error.go b/internal/pkg/service/common/goaextension/genericerror/error.go index fdfbc27363..27491f7d15 100644 --- a/internal/pkg/service/common/goaextension/genericerror/error.go +++ b/internal/pkg/service/common/goaextension/genericerror/error.go @@ -9,7 +9,7 @@ import ( "goa.design/goa/v3/eval" ) -// nolint: gochecknoinits +//nolint:gochecknoinits func init() { codegen.RegisterPluginFirst("genericerror", "gen", nil, generate) } diff --git a/internal/pkg/service/common/goaextension/oneof/oneof.go b/internal/pkg/service/common/goaextension/oneof/oneof.go index 452e73f7de..1cc50a0694 100644 --- a/internal/pkg/service/common/goaextension/oneof/oneof.go +++ b/internal/pkg/service/common/goaextension/oneof/oneof.go @@ -30,7 +30,7 @@ const ( oneOfFieldName = "oneOf" ) -// nolint: gochecknoinits +//nolint:gochecknoinits func init() { codegen.RegisterPluginLast("one-of", "gen", nil, generate) } diff --git a/internal/pkg/service/common/goaextension/operationid/operationid.go b/internal/pkg/service/common/goaextension/operationid/operationid.go index 04ae182dae..44a255eaee 100644 --- a/internal/pkg/service/common/goaextension/operationid/operationid.go +++ b/internal/pkg/service/common/goaextension/operationid/operationid.go @@ -12,7 +12,7 @@ import ( openapiv3 "goa.design/goa/v3/http/codegen/openapi/v3" ) -// nolint: gochecknoinits +//nolint:gochecknoinits func init() { codegen.RegisterPluginLast("operation-id", "gen", nil, generate) } diff --git a/internal/pkg/service/common/goaextension/token/token.go b/internal/pkg/service/common/goaextension/token/token.go index 9ae180c83f..507b44075b 100644 --- a/internal/pkg/service/common/goaextension/token/token.go +++ b/internal/pkg/service/common/goaextension/token/token.go @@ -13,7 +13,7 @@ import ( openapiv3 "goa.design/goa/v3/http/codegen/openapi/v3" ) -// nolint: gochecknoinits +//nolint:gochecknoinits func init() { codegen.RegisterPluginFirst("storage-api-token", "gen", nil, generate) } From e7c9c3f574ad2a95de76a19274639eeb22d5161f Mon Sep 17 00:00:00 2001 From: Petr Hosek Date: Thu, 27 Jun 2024 12:00:27 +0200 Subject: [PATCH 2/7] feat: migrate receivers to sources --- .../pkg/service/stream/migrate/migrate.go | 59 ++++++ .../service/stream/migrate/source/source.go | 178 ++++++++++++++++++ 2 files changed, 237 insertions(+) create mode 100644 internal/pkg/service/stream/migrate/migrate.go create mode 100644 internal/pkg/service/stream/migrate/source/source.go diff --git a/internal/pkg/service/stream/migrate/migrate.go b/internal/pkg/service/stream/migrate/migrate.go new file mode 100644 index 0000000000..c63343860b --- /dev/null +++ b/internal/pkg/service/stream/migrate/migrate.go @@ -0,0 +1,59 @@ +package main + +import ( + "context" + "flag" + "os" + + "github.com/keboola/keboola-as-code/internal/pkg/log" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/migrate/source" + "github.com/keboola/keboola-as-code/internal/pkg/utils/errors" +) + +type Flags struct { + storageAPIToken string + host string +} + +func NewFlags() Flags { + storageAPIToken := flag.String("storage-api-token", "", "storage api token") + host := flag.String("host", "", "host") + + flag.Parse() + + return Flags{*storageAPIToken, *host} +} + +func main() { + if err := run(); err != nil { + os.Exit(1) + } +} + +func run() error { + ctx := context.Background() + logger := log.NewServiceLogger(os.Stdout, true) + + flags := NewFlags() + if flags.host == "" && flags.storageAPIToken == "" { + return errors.New("host/storageAPIToken are required") + } + + logger.Info(ctx, "Starting migration...") + // fetch receivers and exports from old API + bufferReceivers, err := source.FetchBufferReceivers(ctx, flags.host, flags.storageAPIToken) + if err != nil { + logger.Error(ctx, err.Error()) + return err + } + + for _, receiver := range bufferReceivers.Receivers { + err = receiver.CreateSource(ctx, flags.storageAPIToken, flags.host) + if err != nil { + logger.Error(ctx, err.Error()) + } + + } + logger.Info(ctx, "Migration done") + return nil +} diff --git a/internal/pkg/service/stream/migrate/source/source.go b/internal/pkg/service/stream/migrate/source/source.go new file mode 100644 index 0000000000..307e2fbc5e --- /dev/null +++ b/internal/pkg/service/stream/migrate/source/source.go @@ -0,0 +1,178 @@ +package source + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + + "github.com/keboola/keboola-as-code/internal/pkg/service/common/ptr" + api "github.com/keboola/keboola-as-code/internal/pkg/service/stream/api/gen/stream" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/migrate/httperror" +) + +const ( + createSourcePath = "/v1/branches/default/sources" + receiversBufferPath = "/v1/receivers" + bufferPrefix = "buffer" + streamPrefix = "stream" +) + +type Receivers struct { + Receivers []Receiver `json:"receivers"` +} +type Template struct { + Language string `json:"language"` + Content string `json:"content"` +} +type Columns struct { + PrimaryKey bool `json:"primaryKey"` + Type string `json:"type"` + Name string `json:"name"` + Template Template `json:"template,omitempty"` +} +type Mapping struct { + TableID string `json:"tableId"` + Incremental bool `json:"incremental"` + Columns []Columns `json:"columns"` +} +type Conditions struct { + Count int `json:"count"` + Size string `json:"size"` + Time string `json:"time"` +} +type Exports struct { + ID string `json:"id"` + ReceiverID string `json:"receiverId"` + Name string `json:"name"` + Mapping Mapping `json:"mapping"` + Conditions Conditions `json:"conditions"` +} +type Receiver struct { + ID string `json:"id"` + URL string `json:"url"` + Name string `json:"name"` + Description string `json:"description"` + Exports []Exports `json:"exports"` +} + +type RequestConfig struct { + method string + host string + token string + path string + body io.Reader +} + +func New(method string, host string, token string, path string, body io.Reader) RequestConfig { + return RequestConfig{ + method: method, + host: host, + token: token, + body: body, + path: path, + } +} + +func FetchBufferReceivers(ctx context.Context, host string, token string) (*Receivers, error) { + return fetchDataFromBuffer(ctx, New( + "GET", + host, + token, + receiversBufferPath, + nil, + )) +} + +func fetchDataFromBuffer(ctx context.Context, reqConfig RequestConfig) (*Receivers, error) { + resp, err := newHTTPRequest(ctx, reqConfig) + if err != nil { + return nil, err + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, httperror.Parser(resp.Body) + } + + var result *Receivers + + err = json.NewDecoder(resp.Body).Decode(&result) + if err != nil { + return nil, err + } + + return result, nil +} + +func (r *Receiver) CreateSource(ctx context.Context, token string, host string) error { + // Set a payload to create source + body, err := r.createSourcePayload() + if err != nil { + return err + } + + // Request to create source + resp, err := newHTTPRequest(ctx, RequestConfig{ + token: token, + method: "POST", + path: createSourcePath, + host: substituteHost(host, bufferPrefix, streamPrefix), + body: body, + }) + if err != nil { + return err + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusAccepted { + return httperror.Parser(resp.Body) + } + + return nil +} + +func newHTTPRequest(ctx context.Context, c RequestConfig) (*http.Response, error) { + url := fmt.Sprintf("https://%s%s", c.host, c.path) + + request, err := http.NewRequestWithContext(ctx, c.method, url, c.body) + if err != nil { + return nil, err + } + + request.Header.Add("X-StorageAPI-Token", c.token) + request.Header.Add("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(request) + if err != nil { + return nil, err + } + + return resp, nil +} + +func (r *Receiver) createSourcePayload() (*bytes.Buffer, error) { + s := api.CreateSourcePayload{ + SourceID: ptr.Ptr(key.SourceID(r.ID)), + Type: "http", + Name: r.Name, + Description: ptr.Ptr(r.Description), + } + + payloadBuf := new(bytes.Buffer) + err := json.NewEncoder(payloadBuf).Encode(s) + if err != nil { + return nil, err + } + return payloadBuf, nil +} + +func substituteHost(host, bufferPrefix, streamPrefix string) string { + return strings.Replace(host, bufferPrefix, streamPrefix, 1) +} From 1870a6075a4463854e42fe61e2355302018c06a0 Mon Sep 17 00:00:00 2001 From: Petr Hosek Date: Thu, 27 Jun 2024 12:00:53 +0200 Subject: [PATCH 3/7] feat: error parser --- .../service/stream/migrate/httperror/error.go | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 internal/pkg/service/stream/migrate/httperror/error.go diff --git a/internal/pkg/service/stream/migrate/httperror/error.go b/internal/pkg/service/stream/migrate/httperror/error.go new file mode 100644 index 0000000000..edb409feb0 --- /dev/null +++ b/internal/pkg/service/stream/migrate/httperror/error.go @@ -0,0 +1,40 @@ +package httperror + +import ( + "encoding/json" + "io" + + "github.com/keboola/keboola-as-code/internal/pkg/utils/errors" +) + +type HTTPError struct { + StatusCode int `json:"statusCode"` + HTTPError string `json:"error"` + Message string `json:"message"` +} + +func (e *HTTPError) Error() string { + return "" +} + +func Parser(body io.Reader) error { + // Read the response body into bytes + bodyBytes, err := io.ReadAll(body) + if err != nil { + return err + } + + // Parse the JSON response into HTTPError struct + var errStruct map[string]interface{} + if err := json.Unmarshal(bodyBytes, &errStruct); err != nil { + return errors.New(string(bodyBytes)) + } + + // Marshal the error struct into JSON + jsonErr, err := json.Marshal(&errStruct) + if err != nil { + return err + } + + return errors.New(string(jsonErr)) +} From b2fd539ba305edf3390e18cb97d2173fbef4c457 Mon Sep 17 00:00:00 2001 From: Petr Hosek Date: Fri, 10 May 2024 15:06:58 +0200 Subject: [PATCH 4/7] feat: added creating sink --- .../pkg/service/stream/migrate/core/sink.go | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 internal/pkg/service/stream/migrate/core/sink.go diff --git a/internal/pkg/service/stream/migrate/core/sink.go b/internal/pkg/service/stream/migrate/core/sink.go new file mode 100644 index 0000000000..cafba3c763 --- /dev/null +++ b/internal/pkg/service/stream/migrate/core/sink.go @@ -0,0 +1,96 @@ +package core + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/migrate/httperror" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/migrate/request" +) + +type SinkPayload struct { + SinkID string `json:"sinkId"` + Type string `json:"type"` + Name string `json:"name"` + Description string `json:"description"` + Table Table `json:"table"` +} + +type SinkMapping struct { + Columns []Column `json:"columns"` +} + +type Table struct { + Type string `json:"type"` + TableID string `json:"tableId"` + Mapping SinkMapping `json:"mapping"` +} + +func (e Export) CreateSink(ctx context.Context, token string, host string) error { + body, err := e.createSinkPayload() + if err != nil { + return err + } + + path := fmt.Sprintf(sourcesPath+"/%s/sinks", e.ReceiverID) + + // Request to create sink + resp, err := request.New( + http.MethodPost, + substituteHost(host, bufferPrefix, streamPrefix), + token, + path, + body). + NewHTTPRequest(ctx) + if err != nil { + return err + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusAccepted { + return httperror.Parser(resp.Body) + } + + return nil +} + +func (e Export) createSinkPayload() (*bytes.Buffer, error) { + sinkPayload := &SinkPayload{ + SinkID: key.SinkID(e.ID).String(), + Type: definition.SinkTypeTable.String(), + Name: e.Name, + Table: Table{ + Type: definition.TableTypeKeboola.String(), + TableID: e.Mapping.TableID, + }, + } + + for _, m := range e.Mapping.Columns { + var column Column + if m.Template != nil { + column.Template = &Template{ + Language: m.Template.Language, + Content: m.Template.Content, + } + } + column.Name = m.Name + column.Type = m.Type + column.PrimaryKey = m.PrimaryKey + + sinkPayload.Table.Mapping.Columns = append(sinkPayload.Table.Mapping.Columns, column) + } + + payloadBuf := new(bytes.Buffer) + err := json.NewEncoder(payloadBuf).Encode(sinkPayload) + if err != nil { + return nil, err + } + + return payloadBuf, nil +} From 87e014a86625f3fdd01f883a49faafbb8e2faf68 Mon Sep 17 00:00:00 2001 From: Petr Hosek Date: Fri, 10 May 2024 15:08:11 +0200 Subject: [PATCH 5/7] refactor: modification of the structure --- .../stream/migrate/{source => core}/source.go | 92 ++++++------------- .../pkg/service/stream/migrate/migrate.go | 18 +++- .../service/stream/migrate/request/request.go | 45 +++++++++ 3 files changed, 87 insertions(+), 68 deletions(-) rename internal/pkg/service/stream/migrate/{source => core}/source.go (57%) create mode 100644 internal/pkg/service/stream/migrate/request/request.go diff --git a/internal/pkg/service/stream/migrate/source/source.go b/internal/pkg/service/stream/migrate/core/source.go similarity index 57% rename from internal/pkg/service/stream/migrate/source/source.go rename to internal/pkg/service/stream/migrate/core/source.go index 307e2fbc5e..618798dbd2 100644 --- a/internal/pkg/service/stream/migrate/source/source.go +++ b/internal/pkg/service/stream/migrate/core/source.go @@ -1,11 +1,9 @@ -package source +package core import ( "bytes" "context" "encoding/json" - "fmt" - "io" "net/http" "strings" @@ -13,10 +11,11 @@ import ( api "github.com/keboola/keboola-as-code/internal/pkg/service/stream/api/gen/stream" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/migrate/httperror" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/migrate/request" ) const ( - createSourcePath = "/v1/branches/default/sources" + sourcesPath = "/v1/branches/default/sources" receiversBufferPath = "/v1/receivers" bufferPrefix = "buffer" streamPrefix = "stream" @@ -29,23 +28,23 @@ type Template struct { Language string `json:"language"` Content string `json:"content"` } -type Columns struct { - PrimaryKey bool `json:"primaryKey"` - Type string `json:"type"` - Name string `json:"name"` - Template Template `json:"template,omitempty"` +type Column struct { + Type string `json:"type"` + Name string `json:"name"` + PrimaryKey bool `json:"primaryKey,omitempty"` + Template *Template `json:"template,omitempty"` } type Mapping struct { - TableID string `json:"tableId"` - Incremental bool `json:"incremental"` - Columns []Columns `json:"columns"` + TableID string `json:"tableId"` + Incremental bool `json:"incremental"` + Columns []Column `json:"columns"` } type Conditions struct { Count int `json:"count"` Size string `json:"size"` Time string `json:"time"` } -type Exports struct { +type Export struct { ID string `json:"id"` ReceiverID string `json:"receiverId"` Name string `json:"name"` @@ -53,33 +52,15 @@ type Exports struct { Conditions Conditions `json:"conditions"` } type Receiver struct { - ID string `json:"id"` - URL string `json:"url"` - Name string `json:"name"` - Description string `json:"description"` - Exports []Exports `json:"exports"` -} - -type RequestConfig struct { - method string - host string - token string - path string - body io.Reader -} - -func New(method string, host string, token string, path string, body io.Reader) RequestConfig { - return RequestConfig{ - method: method, - host: host, - token: token, - body: body, - path: path, - } + ID string `json:"id"` + URL string `json:"url"` + Name string `json:"name"` + Description string `json:"description"` + Exports []Export `json:"exports"` } func FetchBufferReceivers(ctx context.Context, host string, token string) (*Receivers, error) { - return fetchDataFromBuffer(ctx, New( + return fetchDataFromBuffer(ctx, request.New( "GET", host, token, @@ -88,8 +69,8 @@ func FetchBufferReceivers(ctx context.Context, host string, token string) (*Rece )) } -func fetchDataFromBuffer(ctx context.Context, reqConfig RequestConfig) (*Receivers, error) { - resp, err := newHTTPRequest(ctx, reqConfig) +func fetchDataFromBuffer(ctx context.Context, reqConfig request.Config) (*Receivers, error) { + resp, err := reqConfig.NewHTTPRequest(ctx) if err != nil { return nil, err } @@ -118,13 +99,13 @@ func (r *Receiver) CreateSource(ctx context.Context, token string, host string) } // Request to create source - resp, err := newHTTPRequest(ctx, RequestConfig{ - token: token, - method: "POST", - path: createSourcePath, - host: substituteHost(host, bufferPrefix, streamPrefix), - body: body, - }) + resp, err := request.New( + http.MethodPost, + substituteHost(host, bufferPrefix, streamPrefix), + token, + sourcesPath, + body). + NewHTTPRequest(ctx) if err != nil { return err } @@ -138,25 +119,6 @@ func (r *Receiver) CreateSource(ctx context.Context, token string, host string) return nil } -func newHTTPRequest(ctx context.Context, c RequestConfig) (*http.Response, error) { - url := fmt.Sprintf("https://%s%s", c.host, c.path) - - request, err := http.NewRequestWithContext(ctx, c.method, url, c.body) - if err != nil { - return nil, err - } - - request.Header.Add("X-StorageAPI-Token", c.token) - request.Header.Add("Content-Type", "application/json") - - resp, err := http.DefaultClient.Do(request) - if err != nil { - return nil, err - } - - return resp, nil -} - func (r *Receiver) createSourcePayload() (*bytes.Buffer, error) { s := api.CreateSourcePayload{ SourceID: ptr.Ptr(key.SourceID(r.ID)), diff --git a/internal/pkg/service/stream/migrate/migrate.go b/internal/pkg/service/stream/migrate/migrate.go index c63343860b..28942ac7ad 100644 --- a/internal/pkg/service/stream/migrate/migrate.go +++ b/internal/pkg/service/stream/migrate/migrate.go @@ -6,7 +6,7 @@ import ( "os" "github.com/keboola/keboola-as-code/internal/pkg/log" - "github.com/keboola/keboola-as-code/internal/pkg/service/stream/migrate/source" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/migrate/core" "github.com/keboola/keboola-as-code/internal/pkg/utils/errors" ) @@ -36,12 +36,13 @@ func run() error { flags := NewFlags() if flags.host == "" && flags.storageAPIToken == "" { - return errors.New("host/storageAPIToken are required") + logger.Error(ctx, "host/storage-api-token token is required") + return errors.New("host/storage-api-token is required") } logger.Info(ctx, "Starting migration...") // fetch receivers and exports from old API - bufferReceivers, err := source.FetchBufferReceivers(ctx, flags.host, flags.storageAPIToken) + bufferReceivers, err := core.FetchBufferReceivers(ctx, flags.host, flags.storageAPIToken) if err != nil { logger.Error(ctx, err.Error()) return err @@ -51,9 +52,20 @@ func run() error { err = receiver.CreateSource(ctx, flags.storageAPIToken, flags.host) if err != nil { logger.Error(ctx, err.Error()) + } else { + logger.Infof(ctx, `Source "%s" with id "%s" was created`, receiver.Name, receiver.ID) } + for _, export := range receiver.Exports { + if err = export.CreateSink(ctx, flags.storageAPIToken, flags.host); err != nil { + logger.Error(ctx, err.Error()) + } else { + logger.Infof(ctx, `Sink "%s" with id "%s" was created`, export.Name, export.ID) + } + } } + logger.Info(ctx, "Migration done") + return nil } diff --git a/internal/pkg/service/stream/migrate/request/request.go b/internal/pkg/service/stream/migrate/request/request.go new file mode 100644 index 0000000000..47d290114b --- /dev/null +++ b/internal/pkg/service/stream/migrate/request/request.go @@ -0,0 +1,45 @@ +package request + +import ( + "context" + "fmt" + "io" + "net/http" +) + +type Config struct { + Method string + Host string + Token string + Path string + Body io.Reader +} + +func New(method string, host string, token string, path string, body io.Reader) Config { + return Config{ + Method: method, + Host: host, + Token: token, + Body: body, + Path: path, + } +} + +func (c Config) NewHTTPRequest(ctx context.Context) (*http.Response, error) { + url := fmt.Sprintf("https://%s%s", c.Host, c.Path) + + request, err := http.NewRequestWithContext(ctx, c.Method, url, c.Body) + if err != nil { + return nil, err + } + + request.Header.Add("X-StorageAPI-Token", c.Token) + request.Header.Add("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(request) + if err != nil { + return nil, err + } + + return resp, nil +} From 2dcf8184c70afe1c24d0463c25148a2b4c4f8d8c Mon Sep 17 00:00:00 2001 From: Petr Hosek Date: Tue, 18 Jun 2024 13:24:27 +0200 Subject: [PATCH 6/7] fix: id to uuid-v7 --- internal/pkg/service/stream/migrate/core/sink.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/pkg/service/stream/migrate/core/sink.go b/internal/pkg/service/stream/migrate/core/sink.go index cafba3c763..9b278ef993 100644 --- a/internal/pkg/service/stream/migrate/core/sink.go +++ b/internal/pkg/service/stream/migrate/core/sink.go @@ -80,9 +80,13 @@ func (e Export) createSinkPayload() (*bytes.Buffer, error) { } } column.Name = m.Name - column.Type = m.Type column.PrimaryKey = m.PrimaryKey + column.Type = m.Type + if m.Type == "id" { + column.Type = "uuid-v7" + } + sinkPayload.Table.Mapping.Columns = append(sinkPayload.Table.Mapping.Columns, column) } From eb2591e66699ad9f33e711a4a0a0fa93f0a5c8b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Va=C5=A1ko?= Date: Fri, 19 Jul 2024 08:49:28 +0200 Subject: [PATCH 7/7] fix: Change uuid-7 -> uuid --- internal/pkg/service/stream/migrate/core/sink.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/service/stream/migrate/core/sink.go b/internal/pkg/service/stream/migrate/core/sink.go index 9b278ef993..5882527ff1 100644 --- a/internal/pkg/service/stream/migrate/core/sink.go +++ b/internal/pkg/service/stream/migrate/core/sink.go @@ -84,7 +84,7 @@ func (e Export) createSinkPayload() (*bytes.Buffer, error) { column.Type = m.Type if m.Type == "id" { - column.Type = "uuid-v7" + column.Type = "uuid" } sinkPayload.Table.Mapping.Columns = append(sinkPayload.Table.Mapping.Columns, column)