From b3f600066cf65a4be2d397fb8a383078c0c6e0fb Mon Sep 17 00:00:00 2001 From: kopaygorodsky Date: Sat, 29 May 2021 15:48:42 +0300 Subject: [PATCH] added lint job to the ci --- .circleci/config.yml | 4 + .golangci.yml | 8 +- log/default.go | 18 +++-- messagebus.go | 1 + pubsub/dispatcher/dispatcher.go | 9 ++- pubsub/dispatcher/dispatcher_test.go | 14 ++-- pubsub/endpoint/amqp.go | 11 ++- pubsub/endpoint/endpoint.go | 4 +- pubsub/endpoint/router.go | 7 +- pubsub/errors/errors.go | 1 + pubsub/message/decoder.go | 10 +-- pubsub/message/decoder_test.go | 22 +++--- pubsub/message/execution/context.go | 1 + pubsub/message/message.go | 24 +++--- pubsub/message/unstrucutred.go | 1 + pubsub/subscriber/pool.go | 4 - pubsub/transport/mock.go | 9 ++- pubsub/transport/pkg/pkg.go | 3 +- pubsub/transport/plugins/amqp/amqp.go | 2 +- pubsub/transport/plugins/amqp/connection.go | 5 +- pubsub/transport/transport.go | 1 + runtime/scheme/gv.go | 5 +- runtime/scheme/gv_test.go | 9 ++- runtime/scheme/meta.go | 2 +- runtime/scheme/meta_test.go | 9 ++- runtime/scheme/registry.go | 7 +- runtime/scheme/scheme_test.go | 42 +++++----- saga/api/handlers/status/status.go | 5 +- saga/component/component.go | 5 +- saga/context.go | 1 + saga/handlers/control.go | 5 +- saga/handlers/eventhandler.go | 6 +- saga/mutex/sql.go | 7 +- saga/saga.go | 5 +- saga/sagaid_test.go | 3 +- saga/sagainst.go | 27 +++---- saga/sqlstore.go | 76 ++++++++++--------- saga/store.go | 23 +++--- testing/integration/saga/mutex/common.go | 53 ++++++------- .../integration/saga/mutex/mysqlmutex_test.go | 7 +- .../integration/saga/mutex/pgmutex_test.go | 7 +- testing/integration/saga/mysqlstore_test.go | 29 +++---- testing/integration/saga/pgstore_test.go | 3 +- testing/integration/saga/suite/mysqlsuite.go | 13 ++-- testing/integration/saga/suite/pgsuite.go | 9 ++- 45 files changed, 279 insertions(+), 238 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 0d6eeb3..ff0155b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -46,6 +46,10 @@ jobs: sleep 1 done echo Failed waiting for MySQL && exit 1 + - run: + name: Lint + command: | + make lint - run: name: Run tests environment: diff --git a/.golangci.yml b/.golangci.yml index 4650e6a..87aaba9 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -41,7 +41,7 @@ linters: - varcheck - goimports - typecheck - - errorlint + #- errorlint - gofmt - misspell - nestif @@ -57,6 +57,12 @@ linters-settings: go: "1.16" unused: go: "1.16" + gosec: + # Available rules: https://github.com/securego/gosec#available-rules + excludes: + - G404 + nestif: + min-complexity: 8 issues: diff --git a/log/default.go b/log/default.go index 5aeb57e..ad955e3 100644 --- a/log/default.go +++ b/log/default.go @@ -15,20 +15,24 @@ type defaultLogger struct { internalLogger *log.Logger } -func (l defaultLogger) Log(level Level, v... interface{}) { +func (l defaultLogger) Log(level Level, v ...interface{}) { if level == FatalLevel { l.internalLogger.Fatal(v...) return } - l.internalLogger.Output(3, fmt.Sprint(v...)) -} -func (l defaultLogger) Logf(level Level, template string, args... interface{}) { - if level == FatalLevel { - l.internalLogger.Fatalf(template, args...) + if level == PanicLevel { + l.internalLogger.Panic(v...) return } - l.internalLogger.Output(3, fmt.Sprintf(template, args...)) + + if err := l.internalLogger.Output(3, fmt.Sprint(v...)); err != nil { + l.internalLogger.Println(fmt.Sprintf("err logging an entry: %s. %s", err, v)) + } +} + +func (l defaultLogger) Logf(level Level, template string, args ...interface{}) { + l.Log(level, fmt.Sprintf(template, args...)) } func (l defaultLogger) SetLevel(level Level) { diff --git a/messagebus.go b/messagebus.go index 1ebfd4b..4bc4675 100644 --- a/messagebus.go +++ b/messagebus.go @@ -2,6 +2,7 @@ package brigadier import ( "errors" + "github.com/go-foreman/foreman/log" "github.com/go-foreman/foreman/pubsub/dispatcher" "github.com/go-foreman/foreman/pubsub/endpoint" diff --git a/pubsub/dispatcher/dispatcher.go b/pubsub/dispatcher/dispatcher.go index 1adb7ec..86a1407 100644 --- a/pubsub/dispatcher/dispatcher.go +++ b/pubsub/dispatcher/dispatcher.go @@ -2,10 +2,11 @@ package dispatcher import ( "fmt" + "reflect" + "github.com/go-foreman/foreman/pubsub/message" "github.com/go-foreman/foreman/pubsub/message/execution" "github.com/go-foreman/foreman/runtime/scheme" - "reflect" ) type Dispatcher interface { @@ -17,14 +18,14 @@ type Dispatcher interface { func NewDispatcher() Dispatcher { return &dispatcher{ - handlers: make(map[reflect.Type][]execution.Executor), + handlers: make(map[reflect.Type][]execution.Executor), listeners: make(map[reflect.Type][]execution.Executor), } } type dispatcher struct { - handlers map[reflect.Type][]execution.Executor - listeners map[reflect.Type][]execution.Executor + handlers map[reflect.Type][]execution.Executor + listeners map[reflect.Type][]execution.Executor allEvsListeners []execution.Executor } diff --git a/pubsub/dispatcher/dispatcher_test.go b/pubsub/dispatcher/dispatcher_test.go index c464a12..1f06894 100644 --- a/pubsub/dispatcher/dispatcher_test.go +++ b/pubsub/dispatcher/dispatcher_test.go @@ -1,13 +1,14 @@ package dispatcher import ( + "reflect" + "testing" + "github.com/go-foreman/foreman/pubsub/message" "github.com/go-foreman/foreman/pubsub/message/execution" "github.com/go-foreman/foreman/runtime/scheme" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "reflect" - "testing" ) type registerAccountCmd struct { @@ -26,10 +27,6 @@ type confirmationSentEvent struct { message.ObjectMeta } -func allExecutor(execCtx execution.MessageExecutionCtx) error { - return nil -} - type service struct { } @@ -77,7 +74,7 @@ func TestDispatcher_SubscribeForCmd(t *testing.T) { dispatcher.SubscribeForCmd(wrongType, handler.handle) }) }) - + t.Run("cross subscription for an event", func(t *testing.T) { dispatcher := NewDispatcher() dispatcher.SubscribeForCmd(®isterAccountCmd{}, handler.handle) @@ -140,7 +137,7 @@ func TestDispatcher_SubscribeForAllEvents(t *testing.T) { require.Len(t, listeners, 1) assertThisValueExists(t, handler.handle, listeners) }) - + t.Run("subscribe for an event by duplicating handler for all events", func(t *testing.T) { dispatcher := NewDispatcher() dispatcher.SubscribeForAllEvents(handler.handle) @@ -182,4 +179,3 @@ func assertThisValueExists(t *testing.T, expected execution.Executor, executors } assert.True(t, exists, "expected executor is not found among executors") } - diff --git a/pubsub/endpoint/amqp.go b/pubsub/endpoint/amqp.go index c9d23bd..e211631 100644 --- a/pubsub/endpoint/amqp.go +++ b/pubsub/endpoint/amqp.go @@ -2,11 +2,12 @@ package endpoint import ( "context" + "time" + "github.com/go-foreman/foreman/pubsub/message" "github.com/go-foreman/foreman/pubsub/transport" "github.com/go-foreman/foreman/pubsub/transport/pkg" "github.com/pkg/errors" - "time" ) type AmqpEndpoint struct { @@ -27,11 +28,9 @@ func (a AmqpEndpoint) Name() string { func (a AmqpEndpoint) Send(ctx context.Context, msg *message.OutcomingMessage, opts ...DeliveryOption) error { deliveryOpts := &deliveryOptions{} - if opts != nil { - for _, opt := range opts { - if err := opt(deliveryOpts); err != nil { - return errors.Wrapf(err, "error compiling delivery options for message %s", msg.UID()) - } + for _, opt := range opts { + if err := opt(deliveryOpts); err != nil { + return errors.Wrapf(err, "error compiling delivery options for message %s", msg.UID()) } } diff --git a/pubsub/endpoint/endpoint.go b/pubsub/endpoint/endpoint.go index a6304b7..fadcf35 100644 --- a/pubsub/endpoint/endpoint.go +++ b/pubsub/endpoint/endpoint.go @@ -2,8 +2,9 @@ package endpoint import ( "context" - "github.com/go-foreman/foreman/pubsub/message" "time" + + "github.com/go-foreman/foreman/pubsub/message" ) type Endpoint interface { @@ -13,7 +14,6 @@ type Endpoint interface { type deliveryOptions struct { delay *time.Duration - headers message.Headers } func WithDelay(delay time.Duration) DeliveryOption { diff --git a/pubsub/endpoint/router.go b/pubsub/endpoint/router.go index 3221bc8..f1f7bfe 100644 --- a/pubsub/endpoint/router.go +++ b/pubsub/endpoint/router.go @@ -1,13 +1,14 @@ package endpoint import ( + "reflect" + "github.com/go-foreman/foreman/pubsub/message" "github.com/go-foreman/foreman/runtime/scheme" - "reflect" ) type Router interface { - RegisterEndpoint(endpoint Endpoint, objects... message.Object) + RegisterEndpoint(endpoint Endpoint, objects ...message.Object) Route(obj message.Object) []Endpoint } @@ -21,7 +22,7 @@ type router struct { routes map[reflect.Type][]Endpoint } -func (r *router) RegisterEndpoint(endpoint Endpoint, objects... message.Object) { +func (r *router) RegisterEndpoint(endpoint Endpoint, objects ...message.Object) { for _, obj := range objects { structType := scheme.GetStructType(obj) r.routes[structType] = append(r.routes[structType], endpoint) diff --git a/pubsub/errors/errors.go b/pubsub/errors/errors.go index c2a9b02..f160f2d 100644 --- a/pubsub/errors/errors.go +++ b/pubsub/errors/errors.go @@ -1,4 +1,5 @@ package errors + // //type Status int // diff --git a/pubsub/message/decoder.go b/pubsub/message/decoder.go index d0a4b14..be49a78 100644 --- a/pubsub/message/decoder.go +++ b/pubsub/message/decoder.go @@ -2,11 +2,12 @@ package message import ( "encoding/json" + "reflect" + "strings" + "github.com/go-foreman/foreman/runtime/scheme" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" - "reflect" - "strings" ) type Marshaller interface { @@ -28,7 +29,6 @@ func WithDecoderErr(err error) error { type jsonDecoder struct { knownTypes scheme.KnownTypesRegistry - decoder mapstructure.Decoder } func (j jsonDecoder) Unmarshal(b []byte) (Object, error) { @@ -162,7 +162,7 @@ func (j jsonDecoder) Marshal(obj Object) ([]byte, error) { var objectType = reflect.TypeOf((*Object)(nil)).Elem() -func(j jsonDecoder) setGroupKind(obj Object) error { +func (j jsonDecoder) setGroupKind(obj Object) error { if gk := obj.GroupKind(); gk.Empty() { gk, err := j.knownTypes.ObjectKind(obj) if err != nil { @@ -188,4 +188,4 @@ func(j jsonDecoder) setGroupKind(obj Object) error { } return nil -} \ No newline at end of file +} diff --git a/pubsub/message/decoder_test.go b/pubsub/message/decoder_test.go index 97110f4..83188d4 100644 --- a/pubsub/message/decoder_test.go +++ b/pubsub/message/decoder_test.go @@ -2,11 +2,12 @@ package message import ( "encoding/json" + "strings" + "testing" + "github.com/go-foreman/foreman/runtime/scheme" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "strings" - "testing" ) const ( @@ -16,13 +17,13 @@ const ( type WrapperType struct { ObjectMeta Nested Object - Value int + Value int } type SomeTypeWithNestedType struct { ObjectMeta Nested Object - Value int + Value int } type SomeTestType struct { @@ -35,8 +36,6 @@ type ChildType struct { Value int } - - func TestJsonDecoder(t *testing.T) { knownRegistry := scheme.NewKnownTypesRegistry() decoder := NewJsonMarshaller(knownRegistry) @@ -50,7 +49,7 @@ func TestJsonDecoder(t *testing.T) { Group: group.String(), }, }, - Value: 1, + Value: 1, } marshaled, err := decoder.Marshal(instance) @@ -66,7 +65,7 @@ func TestJsonDecoder(t *testing.T) { t.Run("verify that GK is set from schema before encoding", func(t *testing.T) { knownRegistry.AddKnownTypes(group, &SomeTestType{}) instance := &SomeTestType{ - Value: 1, + Value: 1, } marshaled, err := decoder.Marshal(instance) require.NoError(t, err) @@ -77,7 +76,7 @@ func TestJsonDecoder(t *testing.T) { assert.IsType(t, &SomeTestType{}, decodedObj) assert.Equal(t, instance.Value, instance.Value) }) - + t.Run("decode invalid payload with empty GK", func(t *testing.T) { instance := &SomeTestType{ ObjectMeta: ObjectMeta{ @@ -86,7 +85,7 @@ func TestJsonDecoder(t *testing.T) { Group: group.String(), }, }, - Value: 1, + Value: 1, } marshaled, err := json.Marshal(instance) @@ -111,7 +110,7 @@ func TestJsonDecoder(t *testing.T) { knownRegistry.AddKnownTypes(group, &WrapperType{}) instance := &WrapperType{ Nested: &SomeTypeWithNestedType{ - Nested: &SomeTestType{ + Nested: &SomeTestType{ Value: 1, Child: ChildType{ Value: -1, @@ -131,4 +130,3 @@ func TestJsonDecoder(t *testing.T) { assert.EqualValues(t, instance, decodedObj) }) } - diff --git a/pubsub/message/execution/context.go b/pubsub/message/execution/context.go index 92393ed..dade75a 100644 --- a/pubsub/message/execution/context.go +++ b/pubsub/message/execution/context.go @@ -2,6 +2,7 @@ package execution import ( "context" + "github.com/go-foreman/foreman/log" "github.com/go-foreman/foreman/pubsub/endpoint" "github.com/go-foreman/foreman/pubsub/message" diff --git a/pubsub/message/message.go b/pubsub/message/message.go index cecf04f..b035bc2 100644 --- a/pubsub/message/message.go +++ b/pubsub/message/message.go @@ -1,9 +1,10 @@ package message import ( + "time" + "github.com/go-foreman/foreman/runtime/scheme" "github.com/google/uuid" - "time" ) type Headers map[string]interface{} @@ -44,16 +45,16 @@ type ObjectMeta struct { } type ReceivedMessage struct { - uid string - headers Headers - payload Object + uid string + headers Headers + payload Object receivedAt time.Time - origin string + origin string } func NewReceivedMessage(uid string, payload Object, headers Headers, receivedAt time.Time, origin string) *ReceivedMessage { return &ReceivedMessage{ - uid: uid, + uid: uid, headers: headers, payload: payload, receivedAt: receivedAt, @@ -82,8 +83,8 @@ func (m ReceivedMessage) Origin() string { } type OutcomingMessage struct { - obj Object - uid string + obj Object + uid string headers Headers } @@ -129,16 +130,15 @@ func NewOutcomingMessage(payload Object, passedOptions ...MsgOption) *OutcomingM func FromReceivedMsg(received *ReceivedMessage) *OutcomingMessage { return &OutcomingMessage{ headers: received.Headers(), - uid: received.UID(), - obj: received.Payload(), + uid: received.UID(), + obj: received.Payload(), } } type MsgOption func(attr *opts) type opts struct { - description string - headers Headers + headers Headers } func WithHeaders(headers Headers) MsgOption { diff --git a/pubsub/message/unstrucutred.go b/pubsub/message/unstrucutred.go index 3426838..7c4822f 100644 --- a/pubsub/message/unstrucutred.go +++ b/pubsub/message/unstrucutred.go @@ -2,6 +2,7 @@ package message import ( "encoding/json" + "github.com/go-foreman/foreman/runtime/scheme" "github.com/pkg/errors" ) diff --git a/pubsub/subscriber/pool.go b/pubsub/subscriber/pool.go index 110d2f9..acc9a22 100644 --- a/pubsub/subscriber/pool.go +++ b/pubsub/subscriber/pool.go @@ -17,10 +17,6 @@ type worker struct { myTasks workerQueue } -func (w *worker) workerQueue() workerQueue { - return w.myTasks -} - func newWorker(ctx context.Context, dispatcherQueue dispatcherQueue) worker { return worker{ ctx: ctx, diff --git a/pubsub/transport/mock.go b/pubsub/transport/mock.go index de81a69..5109159 100644 --- a/pubsub/transport/mock.go +++ b/pubsub/transport/mock.go @@ -2,8 +2,9 @@ package transport import ( "context" - "github.com/go-foreman/foreman/pubsub/transport/pkg" "time" + + "github.com/go-foreman/foreman/pubsub/transport/pkg" ) func NewStubTransport() *stubTransport { @@ -41,8 +42,8 @@ func (m *stubTransport) CreateQueue(ctx context.Context, queue Queue, queueBind func (m *stubTransport) Consume(ctx context.Context, queues []Queue, options ...ConsumeOpts) (<-chan pkg.IncomingPkg, error) { income := make(chan pkg.IncomingPkg) - for _, q := range queues { - go func() { + for i := range queues { + go func(q Queue) { for _, topic := range m.queueBinding[q.Name()] { msgs := m.topics[topic] for { @@ -58,7 +59,7 @@ func (m *stubTransport) Consume(ctx context.Context, queues []Queue, options ... } } } - }() + }(queues[i]) } return income, nil diff --git a/pubsub/transport/pkg/pkg.go b/pubsub/transport/pkg/pkg.go index dbe9284..4dee070 100644 --- a/pubsub/transport/pkg/pkg.go +++ b/pubsub/transport/pkg/pkg.go @@ -1,8 +1,9 @@ package pkg import ( - "github.com/streadway/amqp" "time" + + "github.com/streadway/amqp" ) type IncomingPkg interface { diff --git a/pubsub/transport/plugins/amqp/amqp.go b/pubsub/transport/plugins/amqp/amqp.go index 3db7809..a004723 100644 --- a/pubsub/transport/plugins/amqp/amqp.go +++ b/pubsub/transport/plugins/amqp/amqp.go @@ -190,7 +190,7 @@ func (t *amqpTransport) Consume(ctx context.Context, queues []transport.Queue, o defer consumersWait.Done() defer func() { - t.logger.Logf(log.InfoLevel,"canceling consumer %s", queue.Name()) + t.logger.Logf(log.InfoLevel, "canceling consumer %s", queue.Name()) if err := t.consumingChannel.Cancel(queue.Name(), true); err != nil { t.logger.Logf(log.ErrorLevel, "error canceling consumer %s", err) } diff --git a/pubsub/transport/plugins/amqp/connection.go b/pubsub/transport/plugins/amqp/connection.go index e8e7d2d..9fc29ee 100644 --- a/pubsub/transport/plugins/amqp/connection.go +++ b/pubsub/transport/plugins/amqp/connection.go @@ -1,10 +1,11 @@ package amqp import ( - "github.com/go-foreman/foreman/log" - "github.com/streadway/amqp" "sync/atomic" "time" + + "github.com/go-foreman/foreman/log" + "github.com/streadway/amqp" ) const ( diff --git a/pubsub/transport/transport.go b/pubsub/transport/transport.go index ee0101d..861979b 100644 --- a/pubsub/transport/transport.go +++ b/pubsub/transport/transport.go @@ -2,6 +2,7 @@ package transport import ( "context" + "github.com/go-foreman/foreman/pubsub/transport/pkg" ) diff --git a/runtime/scheme/gv.go b/runtime/scheme/gv.go index 5ec6a89..8e2d62c 100644 --- a/runtime/scheme/gv.go +++ b/runtime/scheme/gv.go @@ -1,8 +1,9 @@ package scheme import ( - "github.com/pkg/errors" "strings" + + "github.com/pkg/errors" ) type Group string @@ -47,4 +48,4 @@ func FromString(str string) (GroupKind, error) { } return GroupKind{Group: Group(items[0]), Kind: items[1]}, nil -} \ No newline at end of file +} diff --git a/runtime/scheme/gv_test.go b/runtime/scheme/gv_test.go index 68ef216..23bb091 100644 --- a/runtime/scheme/gv_test.go +++ b/runtime/scheme/gv_test.go @@ -2,8 +2,9 @@ package scheme import ( "fmt" - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestGroup(t *testing.T) { @@ -22,9 +23,9 @@ func TestGroupKind(t *testing.T) { t.Run("GK is ok", func(t *testing.T) { gk := GroupKind{ Group: group, - Kind: "SomeTest", + Kind: "SomeTest", } - assert.Equal(t, fmt.Sprintf("%s.%s",group, "SomeTest"), gk.String()) + assert.Equal(t, fmt.Sprintf("%s.%s", group, "SomeTest"), gk.String()) assert.Equal(t, gk.String(), gk.Identifier()) }) @@ -35,7 +36,7 @@ func TestGroupKind(t *testing.T) { t.Run("GK has empty group", func(t *testing.T) { gk := GroupKind{ - Kind: "SomeTest", + Kind: "SomeTest", } assert.False(t, gk.Empty()) assert.Equal(t, "SomeTest", gk.String()) diff --git a/runtime/scheme/meta.go b/runtime/scheme/meta.go index 2ead51d..a9a8435 100644 --- a/runtime/scheme/meta.go +++ b/runtime/scheme/meta.go @@ -9,7 +9,7 @@ type Object interface { } type TypeMeta struct { - Kind string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"` + Kind string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"` Group string `json:"group,omitempty" protobuf:"bytes,2,opt,name=group"` } diff --git a/runtime/scheme/meta_test.go b/runtime/scheme/meta_test.go index 95ddf9a..5baea77 100644 --- a/runtime/scheme/meta_test.go +++ b/runtime/scheme/meta_test.go @@ -1,19 +1,20 @@ package scheme import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestTypeMeta(t *testing.T) { meta := &TypeMeta{ - Kind: "SomeKind", + Kind: "SomeKind", Group: "group", } gv := meta.GroupKind() //other usecases covered by FromAPIVersionAndKind and by ParseGroupVersion tests assert.EqualValues(t, GroupKind{ - Group: "group", - Kind: "SomeKind", + Group: "group", + Kind: "SomeKind", }, gv) } diff --git a/runtime/scheme/registry.go b/runtime/scheme/registry.go index 8c10959..7b82375 100644 --- a/runtime/scheme/registry.go +++ b/runtime/scheme/registry.go @@ -2,8 +2,9 @@ package scheme import ( "fmt" - "github.com/pkg/errors" "reflect" + + "github.com/pkg/errors" ) var KnownTypesRegistryInstance = NewKnownTypesRegistry() @@ -32,8 +33,8 @@ func (r *knownTypesRegistry) AddKnownTypes(g Group, types ...Object) { for _, obj := range types { structType := GetStructType(obj) r.addKnownTypeWithName(GroupKind{ - Group: g, - Kind: structType.Name(), + Group: g, + Kind: structType.Name(), }, obj, structType) } } diff --git a/runtime/scheme/scheme_test.go b/runtime/scheme/scheme_test.go index 0929f0b..f9af866 100644 --- a/runtime/scheme/scheme_test.go +++ b/runtime/scheme/scheme_test.go @@ -1,9 +1,10 @@ package scheme import ( + "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "testing" ) const ( @@ -30,13 +31,13 @@ func TestKnownTypesRegistry_AddKnownTypeWithName(t *testing.T) { t.Run("add known type by pointer", func(t *testing.T) { knownRegistry.AddKnownTypeWithName(GroupKind{ - Group: group, - Kind: "CustomKind", + Group: group, + Kind: "CustomKind", }, &SomeTestType{}) someTestTypeInstance, err := knownRegistry.NewObject(GroupKind{ - Group: group, - Kind: "CustomKind", + Group: group, + Kind: "CustomKind", }) require.NoError(t, err) assert.NotNil(t, someTestTypeInstance) @@ -46,12 +47,12 @@ func TestKnownTypesRegistry_AddKnownTypeWithName(t *testing.T) { t.Run("add known type by value", func(t *testing.T) { knownRegistry.AddKnownTypeWithName(GroupKind{ Group: group, - Kind: "SomeKind", + Kind: "SomeKind", }, &SomeTestType{}) someKindInstance, err := knownRegistry.NewObject(GroupKind{ - Group: group, - Kind: "CustomKind", + Group: group, + Kind: "CustomKind", }) require.NoError(t, err) assert.NotNil(t, someKindInstance) @@ -62,8 +63,8 @@ func TestKnownTypesRegistry_AddKnownTypeWithName(t *testing.T) { expected := "group is required on all types: CustomKind scheme.SomeTestType" require.PanicsWithValue(t, expected, func() { knownRegistry.AddKnownTypeWithName(GroupKind{ - Group: "", - Kind: "CustomKind", + Group: "", + Kind: "CustomKind", }, &SomeTestType{}) }) }) @@ -72,8 +73,8 @@ func TestKnownTypesRegistry_AddKnownTypeWithName(t *testing.T) { expected := "Double registration of different types for test.CustomKind: old=github.com/go-foreman/foreman/runtime/scheme.SomeTestType, new=github.com/go-foreman/foreman/runtime/scheme.SomeAnotherTestType" require.PanicsWithValue(t, expected, func() { knownRegistry.AddKnownTypeWithName(GroupKind{ - Group: group, - Kind: "CustomKind", + Group: group, + Kind: "CustomKind", }, &SomeAnotherTestType{}) }) }) @@ -82,8 +83,8 @@ func TestKnownTypesRegistry_AddKnownTypeWithName(t *testing.T) { wrongType := notStructType("xxx") assert.PanicsWithValue(t, "all types must be pointers to structs", func() { knownRegistry.AddKnownTypeWithName(GroupKind{ - Group: group, - Kind: "WrongKind", + Group: group, + Kind: "WrongKind", }, &wrongType) }) }) @@ -115,16 +116,16 @@ func TestKnownTypesRegistry_AddKnownTypes(t *testing.T) { t.Run("added two types", func(t *testing.T) { knownRegistry.AddKnownTypes(group, &SomeTestType{}, &SomeAnotherTestType{}) someTestType, err := knownRegistry.NewObject(GroupKind{ - Group: group, - Kind: "SomeTestType", + Group: group, + Kind: "SomeTestType", }) require.NoError(t, err) assert.NotNil(t, someTestType) assert.IsType(t, &SomeTestType{}, someTestType) someAnotherTestType, err := knownRegistry.NewObject(GroupKind{ - Group: group, - Kind: "SomeAnotherTestType", + Group: group, + Kind: "SomeAnotherTestType", }) require.NoError(t, err) assert.NotNil(t, someAnotherTestType) @@ -133,8 +134,8 @@ func TestKnownTypesRegistry_AddKnownTypes(t *testing.T) { t.Run("type is not registered", func(t *testing.T) { loadedType, err := knownRegistry.NewObject(GroupKind{ - Group: group, - Kind: "XXXSomeTestType", + Group: group, + Kind: "XXXSomeTestType", }) assert.Nil(t, loadedType) assert.Error(t, err) @@ -151,4 +152,3 @@ func (n notStructType) GroupKind() GroupKind { func (n notStructType) SetGroupKind(gk *GroupKind) { } - diff --git a/saga/api/handlers/status/status.go b/saga/api/handlers/status/status.go index ea139ff..9d944aa 100644 --- a/saga/api/handlers/status/status.go +++ b/saga/api/handlers/status/status.go @@ -3,12 +3,13 @@ package status import ( "context" "encoding/json" + "net/http" + "strings" + "github.com/go-foreman/foreman/log" "github.com/go-foreman/foreman/saga" sagaApiErrors "github.com/go-foreman/foreman/saga/api/errors" "github.com/pkg/errors" - "net/http" - "strings" ) type StatusResponse struct { diff --git a/saga/component/component.go b/saga/component/component.go index c72ab87..453f5d5 100644 --- a/saga/component/component.go +++ b/saga/component/component.go @@ -1,7 +1,9 @@ package component import ( - "github.com/go-foreman/foreman" + "net/http" + + brigadier "github.com/go-foreman/foreman" "github.com/go-foreman/foreman/log" "github.com/go-foreman/foreman/pubsub/endpoint" "github.com/go-foreman/foreman/pubsub/message" @@ -11,7 +13,6 @@ import ( "github.com/go-foreman/foreman/saga/handlers" "github.com/go-foreman/foreman/saga/mutex" "github.com/pkg/errors" - "net/http" ) type Component struct { diff --git a/saga/context.go b/saga/context.go index 561b11c..59b5729 100644 --- a/saga/context.go +++ b/saga/context.go @@ -3,6 +3,7 @@ package saga import ( "context" "fmt" + "github.com/go-foreman/foreman/log" "github.com/go-foreman/foreman/pubsub/endpoint" "github.com/go-foreman/foreman/pubsub/message" diff --git a/saga/handlers/control.go b/saga/handlers/control.go index 33ef121..96d4113 100644 --- a/saga/handlers/control.go +++ b/saga/handlers/control.go @@ -3,6 +3,7 @@ package handlers import ( "context" "fmt" + log "github.com/go-foreman/foreman/log" "github.com/go-foreman/foreman/pubsub/message" "github.com/go-foreman/foreman/pubsub/message/execution" @@ -13,7 +14,7 @@ import ( "github.com/pkg/errors" ) -func NewSagaControlHandler(sagaStore sagaPkg.Store, mutex mutex.Mutex, sagaRegistry scheme.KnownTypesRegistry, sagaUIDSvc sagaPkg.SagaUIDService,logger log.Logger) *SagaControlHandler { +func NewSagaControlHandler(sagaStore sagaPkg.Store, mutex mutex.Mutex, sagaRegistry scheme.KnownTypesRegistry, sagaUIDSvc sagaPkg.SagaUIDService, logger log.Logger) *SagaControlHandler { return &SagaControlHandler{typesRegistry: sagaRegistry, store: sagaStore, mutex: mutex, sagaUIDSvc: sagaUIDSvc, logger: logger} } @@ -70,7 +71,7 @@ func (h SagaControlHandler) Handle(execCtx execution.MessageExecutionCtx) error } if !sagaInstance.Status().Failed() || sagaInstance.Status().Completed() || sagaInstance.Status().Recovering() || sagaInstance.Status().Compensating() { - h.logger.Logf(log.InfoLevel, "Saga %s has status %s, you can't start recovering the process", sagaInstance.UID(), sagaInstance.Status(), ) + h.logger.Logf(log.InfoLevel, "Saga %s has status %s, you can't start recovering the process", sagaInstance.UID(), sagaInstance.Status()) return nil } diff --git a/saga/handlers/eventhandler.go b/saga/handlers/eventhandler.go index 4856aab..30182d5 100644 --- a/saga/handlers/eventhandler.go +++ b/saga/handlers/eventhandler.go @@ -2,12 +2,14 @@ package handlers import ( "context" + "time" + log "github.com/go-foreman/foreman/log" sagaPkg "github.com/go-foreman/foreman/saga" sagaMutex "github.com/go-foreman/foreman/saga/mutex" - "time" "fmt" + "github.com/go-foreman/foreman/pubsub/message" "github.com/go-foreman/foreman/pubsub/message/execution" "github.com/go-foreman/foreman/runtime/scheme" @@ -44,7 +46,7 @@ func (e SagaEventsHandler) Handle(execCtx execution.MessageExecutionCtx) error { } defer func() { - releaseCtx, cancel := context.WithTimeout(context.Background(), time.Second * 30) + releaseCtx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() if err := e.mutex.Release(releaseCtx, sagaId); err != nil { diff --git a/saga/mutex/sql.go b/saga/mutex/sql.go index 594b9b0..523e15b 100644 --- a/saga/mutex/sql.go +++ b/saga/mutex/sql.go @@ -4,9 +4,10 @@ import ( "context" "database/sql" "fmt" - "github.com/go-foreman/foreman/saga" "sync" + "github.com/go-foreman/foreman/saga" + "github.com/pkg/errors" ) @@ -86,7 +87,7 @@ func (m *mysqlMutex) Release(ctx context.Context, sagaId string) error { } type pgsqlMutex struct { - db *sql.DB + db *sql.DB mapLock sync.Mutex connections map[string]*sql.Conn } @@ -115,7 +116,7 @@ func (p *pgsqlMutex) Lock(ctx context.Context, sagaId string) error { } if err := conn.PingContext(ctx); err != nil { - if i < retries -1 { + if i < retries-1 { continue } } diff --git a/saga/saga.go b/saga/saga.go index b30b848..02037be 100644 --- a/saga/saga.go +++ b/saga/saga.go @@ -2,9 +2,10 @@ package saga import ( "fmt" + "reflect" + "github.com/go-foreman/foreman/pubsub/message" "github.com/go-foreman/foreman/runtime/scheme" - "reflect" ) type Saga interface { @@ -20,7 +21,7 @@ type Saga interface { type BaseSaga struct { message.ObjectMeta adjacencyMap map[scheme.GroupKind]Executor - scheme scheme.KnownTypesRegistry + scheme scheme.KnownTypesRegistry } type Executor func(execCtx SagaContext) error diff --git a/saga/sagaid_test.go b/saga/sagaid_test.go index 80d912e..d17bf89 100644 --- a/saga/sagaid_test.go +++ b/saga/sagaid_test.go @@ -2,10 +2,11 @@ package saga import ( "fmt" + "testing" + "github.com/go-foreman/foreman/pubsub/message" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "testing" ) func TestNewSagaUIDService(t *testing.T) { diff --git a/saga/sagainst.go b/saga/sagainst.go index ba753d4..3d935d3 100644 --- a/saga/sagainst.go +++ b/saga/sagainst.go @@ -1,9 +1,10 @@ package saga import ( + "time" + "github.com/go-foreman/foreman/pubsub/message" "github.com/google/uuid" - "time" ) const ( @@ -28,7 +29,7 @@ type Instance interface { Fail(ev message.Object) HistoryEvents() []HistoryEvent - AddHistoryEvent(ev message.Object, opts...AddEvOpt) + AddHistoryEvent(ev message.Object, opts ...AddEvOpt) StartedAt() *time.Time UpdatedAt() *time.Time @@ -51,7 +52,7 @@ func NewSagaInstance(id, parentId string, saga Saga) Instance { parentID: parentId, saga: saga, instanceStatus: instanceStatus{ - status: sagaStatusCreated, + status: sagaStatusCreated, }, historyEvents: make([]HistoryEvent, 0), } @@ -141,7 +142,7 @@ func (s *sagaInstance) update() { s.updatedAt = ¤tTime } -func (s *sagaInstance) AddHistoryEvent(ev message.Object, opts...AddEvOpt) { +func (s *sagaInstance) AddHistoryEvent(ev message.Object, opts ...AddEvOpt) { attachOpts := &addEvOpts{} if len(opts) > 0 { for _, o := range opts { @@ -149,32 +150,32 @@ func (s *sagaInstance) AddHistoryEvent(ev message.Object, opts...AddEvOpt) { } } - historyEv := HistoryEvent{ + historyEv := HistoryEvent{ UID: uuid.New().String(), CreatedAt: time.Now().Round(time.Second).UTC(), Payload: ev, OriginSource: attachOpts.origin, SagaStatus: s.instanceStatus.status.String(), - TraceUID: attachOpts.traceUID, + TraceUID: attachOpts.traceUID, } s.historyEvents = append(s.historyEvents, historyEv) } type HistoryEvent struct { - UID string `json:"uid"` - CreatedAt time.Time `json:"created_at"` + UID string `json:"uid"` + CreatedAt time.Time `json:"created_at"` Payload message.Object `json:"payload"` - OriginSource string `json:"origin"` - SagaStatus string `json:"saga_status"` //saga status at the moment - TraceUID string `json:"trace_uid"` //uid of received message, could be empty + OriginSource string `json:"origin"` + SagaStatus string `json:"saga_status"` //saga status at the moment + TraceUID string `json:"trace_uid"` //uid of received message, could be empty } type addEvOpts struct { traceUID string - origin string + origin string } -type AddEvOpt func (o *addEvOpts) +type AddEvOpt func(o *addEvOpts) func WithTraceUID(uid string) AddEvOpt { return func(o *addEvOpts) { diff --git a/saga/sqlstore.go b/saga/sqlstore.go index 0728b1e..c1179d8 100644 --- a/saga/sqlstore.go +++ b/saga/sqlstore.go @@ -4,15 +4,16 @@ import ( "context" "database/sql" "fmt" - "github.com/go-foreman/foreman/pubsub/message" - "github.com/pkg/errors" "strconv" "time" + + "github.com/go-foreman/foreman/pubsub/message" + "github.com/pkg/errors" ) const ( MYSQLDriver SQLDriver = "mysql" - PGDriver SQLDriver = "pg" + PGDriver SQLDriver = "pg" ) type SQLDriver string @@ -20,15 +21,15 @@ type SQLDriver string type sqlStore struct { msgMarshaller message.Marshaller db *sql.DB - driver SQLDriver + driver SQLDriver } // NewSQLSagaStore creates sql saga store, it supports mysql and postgres drivers. // driver param is required because of https://github.com/golang/go/issues/3602. Better this than +1 dependency or copy pasting code -func NewSQLSagaStore(db *sql.DB, driver SQLDriver, msgMarshaller message.Marshaller) (Store, error) { +func NewSQLSagaStore(db *sql.DB, driver SQLDriver, msgMarshaller message.Marshaller) (Store, error) { s := &sqlStore{db: db, driver: driver, msgMarshaller: msgMarshaller} if err := s.initTables(); err != nil { - return nil, errors.WithStack(err) + return nil, errors.Wrapf(err, "initializing tables for SQLSagaStore, driver %s", driver) } return s, nil @@ -45,7 +46,7 @@ func (s sqlStore) Create(ctx context.Context, sagaInstance Instance) error { tx, err := s.db.Begin() if err != nil { - return errors.WithStack(err) + return errors.Wrapf(err, "beginning a transaction for saga %s", sagaInstance.UID()) } _, err = tx.ExecContext(ctx, s.prepQuery(fmt.Sprintf("INSERT INTO %v (uid, parent_uid, name, payload, status, started_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?);", sagaTableName)), @@ -59,13 +60,13 @@ func (s sqlStore) Create(ctx context.Context, sagaInstance Instance) error { ) if err != nil { if rErr := tx.Rollback(); rErr != nil { - return errors.Wrapf(rErr, "error rollback when %s", err) + return errors.Wrapf(rErr, "rollback when %s", err) } - return errors.WithStack(err) + return errors.Wrapf(err, "inserting saga instance %s", sagaInstance.UID()) } if err := tx.Commit(); err != nil { - return errors.Wrapf(err, "commiting saga instance %s into the store", sagaInstance.UID()) + return errors.Wrapf(err, "committing saga instance %s into the store", sagaInstance.UID()) } return nil @@ -118,29 +119,30 @@ func (s sqlStore) Update(ctx context.Context, sagaInstance Instance) error { if err != nil { if rErr := tx.Rollback(); rErr != nil { - return errors.Wrapf(rErr, "error rollback when %s", err) + return errors.Wrapf(rErr, "rollback when %s", err) } return errors.Wrapf(err, "querying %s for saga_uid %s", sagaHistoryTableName, sagaInstance.UID()) } - var id string - ids := make(map[string]string) - for rows.Next() { - err := rows.Scan(&id) + defer rows.Close() - if err != nil { + var eventID string + eventsIDs := make(map[string]string) + + for rows.Next() { + if err := rows.Scan(&eventID); err != nil { if rErr := tx.Rollback(); rErr != nil { return errors.Wrapf(rErr, "error rollback when %s", err) } - return errors.WithStack(err) + return errors.Wrap(err, "scanning row") } - ids[id] = id + eventsIDs[eventID] = eventID } - if len(ids) < len(sagaInstance.HistoryEvents()) { + if len(eventsIDs) < len(sagaInstance.HistoryEvents()) { for _, ev := range sagaInstance.HistoryEvents() { - if _, exists := ids[ev.UID]; exists { + if _, exists := eventsIDs[ev.UID]; !exists { continue } @@ -148,7 +150,7 @@ func (s sqlStore) Update(ctx context.Context, sagaInstance Instance) error { if err != nil { if rErr := tx.Rollback(); rErr != nil { - return errors.Wrapf(rErr, "error rollback when %s", err) + return errors.Wrapf(rErr, "rollback when %s", err) } return errors.WithStack(err) @@ -167,7 +169,7 @@ func (s sqlStore) Update(ctx context.Context, sagaInstance Instance) error { if err != nil { if rErr := tx.Rollback(); rErr != nil { - return errors.Wrapf(rErr, "error rollback when %s", err) + return errors.Wrapf(rErr, "rollback when %s", err) } return errors.Wrapf(err, "inserting history event %v for saga %s", ev, sagaInstance.UID()) } @@ -175,7 +177,7 @@ func (s sqlStore) Update(ctx context.Context, sagaInstance Instance) error { } if err := tx.Commit(); err != nil { - return errors.WithStack(err) + return errors.Wrapf(err, "committing update of events for saga %s", sagaInstance.UID()) } return nil @@ -218,7 +220,7 @@ func (s sqlStore) GetById(ctx context.Context, sagaId string) (Instance, error) return sagaInstance, nil } -func (s sqlStore) GetByFilter(ctx context.Context, filters... FilterOption) ([]Instance, error) { +func (s sqlStore) GetByFilter(ctx context.Context, filters ...FilterOption) ([]Instance, error) { if len(filters) == 0 { return nil, errors.Errorf("No filters found, you have to specify at least one so result won't be whole store") } @@ -231,7 +233,7 @@ func (s sqlStore) GetByFilter(ctx context.Context, filters... FilterOption) ([]I //todo use https://github.com/Masterminds/squirrel ? +1 dependency, is it really needed? query := fmt.Sprintf( -`SELECT + `SELECT s.uid, s.parent_uid, s.name, @@ -249,7 +251,7 @@ func (s sqlStore) GetByFilter(ctx context.Context, filters... FilterOption) ([]I sh.trace_uid FROM %s s LEFT JOIN %s sh ON s.uid = sh.saga_uid WHERE`, - sagaTableName, sagaHistoryTableName) + sagaTableName, sagaHistoryTableName) var ( args []interface{} @@ -293,6 +295,8 @@ func (s sqlStore) GetByFilter(ctx context.Context, filters... FilterOption) ([]I return nil, errors.Wrap(err, "querying sagas with filter") } + defer rows.Close() + sagas := make(map[string]*sagaInstance) for rows.Next() { @@ -383,6 +387,8 @@ func (s sqlStore) queryEvents(sagaId string) ([]HistoryEvent, error) { return nil, errors.Wrapf(err, "querying events for saga %s", sagaId) } + defer rows.Close() + messages := make([]HistoryEvent, 0) for rows.Next() { @@ -424,12 +430,12 @@ func (s sqlStore) eventFromModel(ev historyEventSqlModel) (*HistoryEvent, error) } res := &HistoryEvent{ - UID: ev.ID.String, - SagaStatus: ev.SagaStatus.String, - Payload: eventPayload, - CreatedAt: ev.CreatedAt.Time, + UID: ev.ID.String, + SagaStatus: ev.SagaStatus.String, + Payload: eventPayload, + CreatedAt: ev.CreatedAt.Time, OriginSource: ev.OriginSource.String, - TraceUID: ev.TraceUID.String, + TraceUID: ev.TraceUID.String, } return res, nil @@ -443,10 +449,10 @@ func (s sqlStore) instanceFromModel(sagaData sagaSqlModel) (*sagaInstance, error sagaInstance := &sagaInstance{ uid: sagaData.ID.String, - instanceStatus: instanceStatus{ - status: status, + instanceStatus: instanceStatus{ + status: status, }, - parentID: sagaData.ParentID.String, + parentID: sagaData.ParentID.String, historyEvents: make([]HistoryEvent, 0), } @@ -483,7 +489,7 @@ func (s sqlStore) instanceFromModel(sagaData sagaSqlModel) (*sagaInstance, error } func (s sqlStore) initTables() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 30) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() tx, err := s.db.BeginTx(ctx, &sql.TxOptions{}) diff --git a/saga/store.go b/saga/store.go index a768412..01cb174 100644 --- a/saga/store.go +++ b/saga/store.go @@ -3,6 +3,7 @@ package saga import ( "context" "database/sql" + "github.com/pkg/errors" ) @@ -16,7 +17,7 @@ type FilterOption func(opts *filterOptions) type Store interface { Create(ctx context.Context, saga Instance) error GetById(ctx context.Context, sagaId string) (Instance, error) - GetByFilter(ctx context.Context, filters... FilterOption) ([]Instance, error) + GetByFilter(ctx context.Context, filters ...FilterOption) ([]Instance, error) Update(ctx context.Context, saga Instance) error Delete(ctx context.Context, sagaId string) error } @@ -57,22 +58,22 @@ func statusFromStr(str string) (status, error) { } type sagaSqlModel struct { - ID sql.NullString - ParentID sql.NullString - Name sql.NullString - Payload []byte - Status sql.NullString + ID sql.NullString + ParentID sql.NullString + Name sql.NullString + Payload []byte + Status sql.NullString LastFailedMsg []byte - StartedAt sql.NullTime - UpdatedAt sql.NullTime + StartedAt sql.NullTime + UpdatedAt sql.NullTime } type historyEventSqlModel struct { - ID sql.NullString - Name sql.NullString + ID sql.NullString + Name sql.NullString CreatedAt sql.NullTime Payload []byte OriginSource sql.NullString SagaStatus sql.NullString TraceUID sql.NullString -} \ No newline at end of file +} diff --git a/testing/integration/saga/mutex/common.go b/testing/integration/saga/mutex/common.go index b3bbd62..a1b7c33 100644 --- a/testing/integration/saga/mutex/common.go +++ b/testing/integration/saga/mutex/common.go @@ -4,19 +4,20 @@ import ( "context" "database/sql" "fmt" - "github.com/go-foreman/foreman/saga/mutex" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "math/rand" "testing" "time" + + "github.com/go-foreman/foreman/saga/mutex" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func testSQLMutexUseCases(t *testing.T, mutexFabric func() mutex.Mutex, dbConnection *sql.DB) { sqlMutex := mutexFabric() t.Run("acquire and release a mutex sequentially", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 10) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() id := "xxx" @@ -29,7 +30,7 @@ func testSQLMutexUseCases(t *testing.T, mutexFabric func() mutex.Mutex, dbConnec }) t.Run("wait to acquire locked mutex", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 1) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() releaseCh := make(chan struct{}) @@ -45,12 +46,12 @@ func testSQLMutexUseCases(t *testing.T, mutexFabric func() mutex.Mutex, dbConnec //this goroutine should wait until first lock is release go func() { time.Sleep(time.Millisecond * 150) - waitingCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond * 200) + waitingCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200) defer cancel() assert.NoError(t, sqlMutex.Lock(waitingCtx, id)) select { - case <- ctx.Done(): + case <-ctx.Done(): return case releaseCh <- struct{}{}: return @@ -58,14 +59,14 @@ func testSQLMutexUseCases(t *testing.T, mutexFabric func() mutex.Mutex, dbConnec }() select { - case <- ctx.Done(): + case <-ctx.Done(): t.FailNow() - case <- releaseCh: + case <-releaseCh: } }) t.Run("wait to acquire locked mutex from another service instance", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 100) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*100) defer cancel() releaseCh := make(chan struct{}) @@ -80,12 +81,12 @@ func testSQLMutexUseCases(t *testing.T, mutexFabric func() mutex.Mutex, dbConnec //this goroutine should wait until first lock is release go func() { - waitingCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond * 200) + waitingCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200) defer cancel() anotherInstanceMutex := mutexFabric() assert.NoError(t, anotherInstanceMutex.Lock(waitingCtx, id)) select { - case <- ctx.Done(): + case <-ctx.Done(): return case releaseCh <- struct{}{}: return @@ -93,14 +94,14 @@ func testSQLMutexUseCases(t *testing.T, mutexFabric func() mutex.Mutex, dbConnec }() select { - case <- ctx.Done(): + case <-ctx.Done(): t.FailNow() - case <- releaseCh: + case <-releaseCh: } }) t.Run("failed to acquire a lock", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 5) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() releaseCh := make(chan struct{}) @@ -110,14 +111,14 @@ func testSQLMutexUseCases(t *testing.T, mutexFabric func() mutex.Mutex, dbConnec //this goroutine should wait until first lock is release go func() { - waitingCtx, cancel := context.WithTimeout(ctx, time.Millisecond * 200) + waitingCtx, cancel := context.WithTimeout(ctx, time.Millisecond*200) defer cancel() err := sqlMutex.Lock(waitingCtx, id) assert.Error(t, err) assert.Contains(t, err.Error(), fmt.Sprintf("acquiring lock for saga %s", id)) select { - case <- ctx.Done(): + case <-ctx.Done(): return case releaseCh <- struct{}{}: return @@ -125,16 +126,16 @@ func testSQLMutexUseCases(t *testing.T, mutexFabric func() mutex.Mutex, dbConnec }() select { - case <- time.After(time.Millisecond * 500): + case <-time.After(time.Millisecond * 500): t.FailNow() - case <- releaseCh: + case <-releaseCh: } assert.NoError(t, sqlMutex.Release(ctx, id)) }) t.Run("release not existing lock", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 5) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() id := "bbb" @@ -151,7 +152,7 @@ func testSQLMutexUseCases(t *testing.T, mutexFabric func() mutex.Mutex, dbConnec rand.Seed(time.Now().UnixNano()) locksCount := 100 - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 60) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) defer cancel() locksIds := make(chan string) @@ -160,14 +161,14 @@ func testSQLMutexUseCases(t *testing.T, mutexFabric func() mutex.Mutex, dbConnec go func() { for id := range locksIds { go func(lockedId string) { - time.Sleep(time.Duration(rand.Intn(500-100) + 100) * time.Millisecond) - releaseCtx, cancel := context.WithTimeout(context.Background(), time.Second * 5) + time.Sleep(time.Duration(rand.Intn(500-100)+100) * time.Millisecond) + releaseCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() assert.NoError(t, sqlMutex.Release(releaseCtx, lockedId)) }(id) } - doneCh <-struct {}{} + doneCh <- struct{}{} }() for i := 0; i < locksCount; i++ { @@ -179,9 +180,9 @@ func testSQLMutexUseCases(t *testing.T, mutexFabric func() mutex.Mutex, dbConnec close(locksIds) select { - case <- ctx.Done(): + case <-ctx.Done(): t.FailNow() - case <- doneCh: + case <-doneCh: return } }) diff --git a/testing/integration/saga/mutex/mysqlmutex_test.go b/testing/integration/saga/mutex/mysqlmutex_test.go index bbe8ad6..aa1b933 100644 --- a/testing/integration/saga/mutex/mysqlmutex_test.go +++ b/testing/integration/saga/mutex/mysqlmutex_test.go @@ -3,13 +3,14 @@ package mutex import ( "context" "database/sql" + "testing" + "time" + "github.com/go-foreman/foreman/saga" "github.com/go-foreman/foreman/saga/mutex" intSuite "github.com/go-foreman/foreman/testing/integration/saga/suite" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" - "testing" - "time" ) type mysqlMutexTest struct { @@ -29,7 +30,7 @@ func (m *mysqlMutexTest) TestMysqlMutexStore() { testSQLMutexUseCases(t, m.createMutexService, m.Connection()) t.Run("manually fail to release already released lock", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 10) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() id := "555" diff --git a/testing/integration/saga/mutex/pgmutex_test.go b/testing/integration/saga/mutex/pgmutex_test.go index 0720a41..d5271a8 100644 --- a/testing/integration/saga/mutex/pgmutex_test.go +++ b/testing/integration/saga/mutex/pgmutex_test.go @@ -2,13 +2,14 @@ package mutex import ( "context" + "testing" + "time" + "github.com/go-foreman/foreman/saga" "github.com/go-foreman/foreman/saga/mutex" intSuite "github.com/go-foreman/foreman/testing/integration/saga/suite" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" - "testing" - "time" ) type pgMutexTest struct { @@ -28,7 +29,7 @@ func (m *pgMutexTest) TestMutexStore() { testSQLMutexUseCases(t, m.createMutexService, m.Connection()) t.Run("manually fail to release already released lock", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 10) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() id := "555" diff --git a/testing/integration/saga/mysqlstore_test.go b/testing/integration/saga/mysqlstore_test.go index 190b68b..cba2883 100644 --- a/testing/integration/saga/mysqlstore_test.go +++ b/testing/integration/saga/mysqlstore_test.go @@ -4,6 +4,10 @@ import ( "context" "database/sql" "fmt" + "strings" + "testing" + "time" + "github.com/go-foreman/foreman/pubsub/message" "github.com/go-foreman/foreman/runtime/scheme" "github.com/go-foreman/foreman/saga" @@ -12,12 +16,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "strings" - "testing" - "time" ) -const( +const ( testGroup scheme.Group = "testgroup" ) @@ -46,20 +47,20 @@ func (m *mysqlStoreTest) TestMysqlStore() { func testSQLStoreUseCases(t *testing.T, store saga.Store, schemeRegistry scheme.KnownTypesRegistry, dbConnection *sql.DB) { t.Run("initialized store tables", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 30) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() - res, err := dbConnection.QueryContext(ctx,"SELECT * from saga") + res, err := dbConnection.QueryContext(ctx, "SELECT * from saga") require.NoError(t, err) require.NotNil(t, res) - require.NoError(t, res.Close()) - res, err = dbConnection.QueryContext(ctx,"SELECT * from saga_history") + require.NoError(t, res.Close()) //nolint:sqlclosecheck + res, err = dbConnection.QueryContext(ctx, "SELECT * from saga_history") require.NoError(t, err) - require.NoError(t, res.Close()) + require.NoError(t, res.Close()) //nolint:sqlclosecheck }) t.Run("create saga instance", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 30) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() workflowSaga := &WorkflowSaga{Field: "field", Value: "value"} sagaInstance := saga.NewSagaInstance(uuid.New().String(), "", workflowSaga) @@ -73,7 +74,7 @@ func testSQLStoreUseCases(t *testing.T, store saga.Store, schemeRegistry scheme. }) t.Run("delete saga instance", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 30) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() workflowSaga := &WorkflowSaga{Field: "field", Value: "value"} sagaInstance := saga.NewSagaInstance(uuid.New().String(), "", workflowSaga) @@ -86,7 +87,7 @@ func testSQLStoreUseCases(t *testing.T, store saga.Store, schemeRegistry scheme. //this test copies "Create saga instance test" because we don't use fixtures for testing right now and need a way to put records into db t.Run("find saga instance by id", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 30) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() workflowSaga := &WorkflowSaga{Field: "field", Value: "value"} @@ -111,7 +112,7 @@ func testSQLStoreUseCases(t *testing.T, store saga.Store, schemeRegistry scheme. }) t.Run("update saga instance", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 30) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() workflowSaga := &WorkflowSaga{Field: "field", Value: "value"} @@ -164,7 +165,7 @@ func testSQLStoreUseCases(t *testing.T, store saga.Store, schemeRegistry scheme. }) t.Run("find saga instance by filter", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 30) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() anotherSaga := &FilterSaga{WorkFlow: WorkflowSaga{ diff --git a/testing/integration/saga/pgstore_test.go b/testing/integration/saga/pgstore_test.go index 4d0ea6c..0e908e1 100644 --- a/testing/integration/saga/pgstore_test.go +++ b/testing/integration/saga/pgstore_test.go @@ -1,13 +1,14 @@ package saga import ( + "testing" + "github.com/go-foreman/foreman/pubsub/message" "github.com/go-foreman/foreman/runtime/scheme" "github.com/go-foreman/foreman/saga" intSuite "github.com/go-foreman/foreman/testing/integration/saga/suite" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "testing" ) type pgStoreTest struct { diff --git a/testing/integration/saga/suite/mysqlsuite.go b/testing/integration/saga/suite/mysqlsuite.go index d81aba5..fb65075 100644 --- a/testing/integration/saga/suite/mysqlsuite.go +++ b/testing/integration/saga/suite/mysqlsuite.go @@ -3,21 +3,22 @@ package suite import ( "database/sql" "fmt" + "os" + driverSql "github.com/go-sql-driver/mysql" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "os" ) // MysqlSuite struct for MySQL Suite type MysqlSuite struct { suite.Suite - dbConn *sql.DB + dbConn *sql.DB } // SetupSuite setup at the beginning of test func (s *MysqlSuite) SetupSuite() { - disableLogging() + s.disableLogging() connectionStr := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=True", "foreman", "foreman", "127.0.0.1:3306", "foreman") @@ -44,12 +45,12 @@ func (s *MysqlSuite) TearDownSuite() { require.NoError(s.T(), s.dbConn.Close()) } -func disableLogging() { +func (s *MysqlSuite) disableLogging() { nopLogger := NopLogger{} - driverSql.SetLogger(nopLogger) + require.NoError(s.T(), driverSql.SetLogger(nopLogger)) } type NopLogger struct { } -func (l NopLogger) Print(v ...interface{}) {} \ No newline at end of file +func (l NopLogger) Print(v ...interface{}) {} diff --git a/testing/integration/saga/suite/pgsuite.go b/testing/integration/saga/suite/pgsuite.go index f096848..0df4fc9 100644 --- a/testing/integration/saga/suite/pgsuite.go +++ b/testing/integration/saga/suite/pgsuite.go @@ -4,11 +4,12 @@ import ( "context" "database/sql" "fmt" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" "os" "time" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + _ "github.com/jackc/pgx/v4/stdlib" ) @@ -26,7 +27,7 @@ func (s *PgSuite) SetupSuite() { connectionStr = v } - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 10) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() var err error @@ -42,7 +43,7 @@ func (s *PgSuite) Connection() *sql.DB { // TearDownSuite teardown at the end of test func (s *PgSuite) TearDownSuite() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second * 10) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() res, err := s.dbConn.ExecContext(ctx, "DROP TABLE IF EXISTS saga_history, saga;")