Skip to content
This repository has been archived by the owner on Jun 12, 2024. It is now read-only.

Commit

Permalink
Close heartbeats in the sql task handler (#55)
Browse files Browse the repository at this point in the history
**What**
- Ensure that the task actually ends by closing the heartbeats channel.
  This was not seen during previous testing because the test code closed
  the channel. However, when deployed, the task never finished.

Signed-off-by: Lucas Roesler <[email protected]>
  • Loading branch information
LucasRoesler authored Sep 1, 2020
1 parent ca6a1f6 commit 40308bb
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pkg/queue/workers/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

cdb "github.com/contiamo/go-base/pkg/db"
"go.uber.org/goleak"

"github.com/Masterminds/squirrel"
dbtest "github.com/contiamo/go-base/pkg/db/test"
Expand All @@ -25,6 +26,8 @@ import (
var emptyJSON = []byte("{}")

func TestRetentionHandler(t *testing.T) {
defer goleak.VerifyNone(t)

logrus.SetOutput(ioutil.Discard)
defer logrus.SetOutput(os.Stdout)

Expand Down Expand Up @@ -151,7 +154,6 @@ func TestRetentionHandler(t *testing.T) {
// wait a moment for the final heartbeat and then close the channel to avoid
// goroutine leak errors
time.Sleep(time.Second)
close(heartbeats)

require.Equal(t, SQLTaskProgress{}, seenBeats[0])

Expand Down
1 change: 1 addition & 0 deletions pkg/queue/workers/sql_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (h *sqlTaskHandler) Process(ctx context.Context, task queue.Task, heartbeat
span, ctx := h.StartSpan(ctx, "Process")
defer func() {
h.FinishSpan(span, err)
close(heartbeats)
}()
span.SetTag("task.id", task.ID)
span.SetTag("task.queue", task.Queue)
Expand Down

0 comments on commit 40308bb

Please sign in to comment.