Skip to content

Commit

Permalink
Merge pull request #1756 from keboola/petr-hosek-PSGO-593_script
Browse files Browse the repository at this point in the history
Migration script
  • Loading branch information
jachym-tousek-keboola authored Jul 22, 2024
2 parents fa90ddc + eb2591e commit f6a4b6f
Show file tree
Hide file tree
Showing 15 changed files with 406 additions and 10 deletions.
2 changes: 1 addition & 1 deletion api/templates/design.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (

// API definition ------------------------------------------------------------------------------------------------------

// nolint: gochecknoinits
//nolint:gochecknoinits
func init() {
dependenciesType := func(method *service.MethodData) string {
if dependencies.HasSecurityScheme("APIKey", method) {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/service/cli/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func DefaultRootFlags() RootFlags {
return RootFlags{}
}

// nolint: gochecknoinits
//nolint:gochecknoinits
func init() {
// Disable commands auto-sorting
cobra.EnableCommandSorting = false
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/service/cli/prompt/interactive/interactive.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Prompt struct {
editor string // the editor is started when Editor() is called, if empty, the default is system editor is used
}

// nolint: gochecknoinits
//nolint:gochecknoinits
func init() {
// Workaround for bug in 3rd party lib
// https://github.com/AlecAivazis/survey/issues/336
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
openapiv3 "goa.design/goa/v3/http/codegen/openapi/v3"
)

// nolint: gochecknoinits
//nolint:gochecknoinits
func init() {
codegen.RegisterPluginLast("any-type", "gen", nil, generate)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/service/common/goaextension/errormsg/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/utils/strhelper"
)

// nolint: gochecknoinits
//nolint:gochecknoinits
func init() {
codegen.RegisterPluginFirst("errormsg", "gen", prepare, generate)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
metaVisitedAttrValue = "true"
)

// nolint: gochecknoinits
//nolint:gochecknoinits
func init() {
codegen.RegisterPluginFirst("001-example-object", "gen", prepare, generate)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"goa.design/goa/v3/eval"
)

// nolint: gochecknoinits
//nolint:gochecknoinits
func init() {
codegen.RegisterPluginFirst("genericerror", "gen", nil, generate)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/service/common/goaextension/oneof/oneof.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
oneOfFieldName = "oneOf"
)

// nolint: gochecknoinits
//nolint:gochecknoinits
func init() {
codegen.RegisterPluginLast("one-of", "gen", nil, generate)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
openapiv3 "goa.design/goa/v3/http/codegen/openapi/v3"
)

// nolint: gochecknoinits
//nolint:gochecknoinits
func init() {
codegen.RegisterPluginLast("operation-id", "gen", nil, generate)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/service/common/goaextension/token/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
openapiv3 "goa.design/goa/v3/http/codegen/openapi/v3"
)

// nolint: gochecknoinits
//nolint:gochecknoinits
func init() {
codegen.RegisterPluginFirst("storage-api-token", "gen", nil, generate)
}
Expand Down
100 changes: 100 additions & 0 deletions internal/pkg/service/stream/migrate/core/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package core

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"

"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/migrate/httperror"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/migrate/request"
)

type SinkPayload struct {
SinkID string `json:"sinkId"`
Type string `json:"type"`
Name string `json:"name"`
Description string `json:"description"`
Table Table `json:"table"`
}

type SinkMapping struct {
Columns []Column `json:"columns"`
}

type Table struct {
Type string `json:"type"`
TableID string `json:"tableId"`
Mapping SinkMapping `json:"mapping"`
}

func (e Export) CreateSink(ctx context.Context, token string, host string) error {
body, err := e.createSinkPayload()
if err != nil {
return err
}

path := fmt.Sprintf(sourcesPath+"/%s/sinks", e.ReceiverID)

// Request to create sink
resp, err := request.New(
http.MethodPost,
substituteHost(host, bufferPrefix, streamPrefix),
token,
path,
body).
NewHTTPRequest(ctx)
if err != nil {
return err
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusAccepted {
return httperror.Parser(resp.Body)
}

return nil
}

func (e Export) createSinkPayload() (*bytes.Buffer, error) {
sinkPayload := &SinkPayload{
SinkID: key.SinkID(e.ID).String(),
Type: definition.SinkTypeTable.String(),
Name: e.Name,
Table: Table{
Type: definition.TableTypeKeboola.String(),
TableID: e.Mapping.TableID,
},
}

for _, m := range e.Mapping.Columns {
var column Column
if m.Template != nil {
column.Template = &Template{
Language: m.Template.Language,
Content: m.Template.Content,
}
}
column.Name = m.Name
column.PrimaryKey = m.PrimaryKey

column.Type = m.Type
if m.Type == "id" {
column.Type = "uuid"
}

sinkPayload.Table.Mapping.Columns = append(sinkPayload.Table.Mapping.Columns, column)
}

payloadBuf := new(bytes.Buffer)
err := json.NewEncoder(payloadBuf).Encode(sinkPayload)
if err != nil {
return nil, err
}

return payloadBuf, nil
}
140 changes: 140 additions & 0 deletions internal/pkg/service/stream/migrate/core/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package core

import (
"bytes"
"context"
"encoding/json"
"net/http"
"strings"

"github.com/keboola/keboola-as-code/internal/pkg/service/common/ptr"
api "github.com/keboola/keboola-as-code/internal/pkg/service/stream/api/gen/stream"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/migrate/httperror"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/migrate/request"
)

const (
sourcesPath = "/v1/branches/default/sources"
receiversBufferPath = "/v1/receivers"
bufferPrefix = "buffer"
streamPrefix = "stream"
)

type Receivers struct {
Receivers []Receiver `json:"receivers"`
}
type Template struct {
Language string `json:"language"`
Content string `json:"content"`
}
type Column struct {
Type string `json:"type"`
Name string `json:"name"`
PrimaryKey bool `json:"primaryKey,omitempty"`
Template *Template `json:"template,omitempty"`
}
type Mapping struct {
TableID string `json:"tableId"`
Incremental bool `json:"incremental"`
Columns []Column `json:"columns"`
}
type Conditions struct {
Count int `json:"count"`
Size string `json:"size"`
Time string `json:"time"`
}
type Export struct {
ID string `json:"id"`
ReceiverID string `json:"receiverId"`
Name string `json:"name"`
Mapping Mapping `json:"mapping"`
Conditions Conditions `json:"conditions"`
}
type Receiver struct {
ID string `json:"id"`
URL string `json:"url"`
Name string `json:"name"`
Description string `json:"description"`
Exports []Export `json:"exports"`
}

func FetchBufferReceivers(ctx context.Context, host string, token string) (*Receivers, error) {
return fetchDataFromBuffer(ctx, request.New(
"GET",
host,
token,
receiversBufferPath,
nil,
))
}

func fetchDataFromBuffer(ctx context.Context, reqConfig request.Config) (*Receivers, error) {
resp, err := reqConfig.NewHTTPRequest(ctx)
if err != nil {
return nil, err
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, httperror.Parser(resp.Body)
}

var result *Receivers

err = json.NewDecoder(resp.Body).Decode(&result)
if err != nil {
return nil, err
}

return result, nil
}

func (r *Receiver) CreateSource(ctx context.Context, token string, host string) error {
// Set a payload to create source
body, err := r.createSourcePayload()
if err != nil {
return err
}

// Request to create source
resp, err := request.New(
http.MethodPost,
substituteHost(host, bufferPrefix, streamPrefix),
token,
sourcesPath,
body).
NewHTTPRequest(ctx)
if err != nil {
return err
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusAccepted {
return httperror.Parser(resp.Body)
}

return nil
}

func (r *Receiver) createSourcePayload() (*bytes.Buffer, error) {
s := api.CreateSourcePayload{
SourceID: ptr.Ptr(key.SourceID(r.ID)),
Type: "http",
Name: r.Name,
Description: ptr.Ptr(r.Description),
}

payloadBuf := new(bytes.Buffer)
err := json.NewEncoder(payloadBuf).Encode(s)
if err != nil {
return nil, err
}
return payloadBuf, nil
}

func substituteHost(host, bufferPrefix, streamPrefix string) string {
return strings.Replace(host, bufferPrefix, streamPrefix, 1)
}
40 changes: 40 additions & 0 deletions internal/pkg/service/stream/migrate/httperror/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package httperror

import (
"encoding/json"
"io"

"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

type HTTPError struct {
StatusCode int `json:"statusCode"`
HTTPError string `json:"error"`
Message string `json:"message"`
}

func (e *HTTPError) Error() string {
return ""
}

func Parser(body io.Reader) error {
// Read the response body into bytes
bodyBytes, err := io.ReadAll(body)
if err != nil {
return err
}

// Parse the JSON response into HTTPError struct
var errStruct map[string]interface{}
if err := json.Unmarshal(bodyBytes, &errStruct); err != nil {
return errors.New(string(bodyBytes))
}

// Marshal the error struct into JSON
jsonErr, err := json.Marshal(&errStruct)
if err != nil {
return err
}

return errors.New(string(jsonErr))
}
Loading

0 comments on commit f6a4b6f

Please sign in to comment.