Skip to content

Commit

Permalink
feat: add sentry integration to workers (#46)
Browse files Browse the repository at this point in the history
* feat: add sentry integration to worker

* docs: add sentry alerter docs for worker
  • Loading branch information
abelanger5 authored Jan 2, 2024
1 parent f60ccbb commit 373b9f4
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 0 deletions.
137 changes: 137 additions & 0 deletions examples/errors-test/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package main

import (
"context"
"fmt"
"os"
"time"

"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
"github.com/hatchet-dev/hatchet/pkg/errors/sentry"
"github.com/hatchet-dev/hatchet/pkg/worker"
"github.com/joho/godotenv"
)

type userCreateEvent struct {
Username string `json:"username"`
UserId string `json:"user_id"`
Data map[string]string `json:"data"`
}

type stepOneOutput struct {
Message string `json:"message"`
}

func StepOne(ctx context.Context, input *userCreateEvent) (result *stepOneOutput, err error) {
return nil, fmt.Errorf("this is an error")
}

func main() {
err := godotenv.Load()

if err != nil {
panic(err)
}

client, err := client.New()

if err != nil {
panic(err)
}

sentryAlerter, err := sentry.NewSentryAlerter(&sentry.SentryAlerterOpts{
DSN: os.Getenv("SENTRY_DSN"),
Environment: os.Getenv("SENTRY_ENVIRONMENT"),
})

if err != nil {
panic(err)
}

// Create a worker. This automatically reads in a TemporalClient from .env and workflow files from the .hatchet
// directory, but this can be customized with the `worker.WithTemporalClient` and `worker.WithWorkflowFiles` options.
w, err := worker.NewWorker(
worker.WithClient(
client,
),
worker.WithErrorAlerter(sentryAlerter),
)

if err != nil {
panic(err)
}

err = w.On(worker.Event("user:create"), &worker.WorkflowJob{
Name: "failing-workflow",
Description: "This is a failing workflow.",
Steps: []worker.WorkflowStep{
{
Function: StepOne,
},
},
})

if err != nil {
panic(err)
}

// err = worker.RegisterAction("echo:echo", func(ctx context.Context, input *actionInput) (result any, err error) {
// return map[string]interface{}{
// "message": input.Message,
// }, nil
// })

// if err != nil {
// panic(err)
// }

// err = worker.RegisterAction("echo:object", func(ctx context.Context, input *actionInput) (result any, err error) {
// return nil, nil
// })

// if err != nil {
// panic(err)
// }

interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan())
defer cancel()

go func() {
err = w.Start(interruptCtx)

if err != nil {
panic(err)
}

cancel()
}()

testEvent := userCreateEvent{
Username: "echo-test",
UserId: "1234",
Data: map[string]string{
"test": "test",
},
}

// push an event
err = client.Event().Push(
context.Background(),
"user:create",
testEvent,
)

if err != nil {
panic(err)
}

for {
select {
case <-interruptCtx.Done():
return
default:
time.Sleep(time.Second)
}
}
}
53 changes: 53 additions & 0 deletions frontend/docs/pages/go-sdk/creating-a-worker.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,56 @@ The client to use to communicate with the Hatchet instance. This is required.
### `worker.WithName`

The name of the worker. This is used to identify the worker in the Hatchet UI.

### `worker.WithErrorAlerter`

Use this option to set up an external error alerter, such as [Sentry](https://sentry.io/).

#### Sentry Alerter

You can use the built-in Sentry alerter via the `"github.com/hatchet-dev/hatchet/pkg/errors/sentry"` package.

```go

import (
"os"

"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/errors/sentry"
"github.com/hatchet-dev/hatchet/pkg/worker"
)

func main() {
// client initialization code...

// create the sentry alerter
sentryAlerter, err := sentry.NewSentryAlerter(&sentry.SentryAlerterOpts{
DSN: os.Getenv("SENTRY_DSN"),
Environment: os.Getenv("SENTRY_ENVIRONMENT"),
})

if err != nil {
panic(err)
}

// Create a worker. This automatically reads in a TemporalClient from .env and workflow files from the .hatchet
// directory, but this can be customized with the `worker.WithTemporalClient` and `worker.WithWorkflowFiles` options.
w, err := worker.NewWorker(
worker.WithClient(
client,
),
// call the `WithErrorAlerter` method to set up the sentry alerter
worker.WithErrorAlerter(sentryAlerter),
)
}
```

#### Custom Alerters

The `ErrorAlerter` needs to satisfy the following interface:

```go
type Alerter interface {
SendAlert(ctx context.Context, err error, data map[string]interface{})
}
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/getsentry/sentry-go v0.25.0
github.com/go-chi/chi v1.5.5
github.com/go-co-op/gocron v1.36.0
github.com/go-playground/validator/v10 v10.16.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q
github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA=
github.com/getkin/kin-openapi v0.122.0 h1:WB9Jbl0Hp/T79/JF9xlSW5Kl9uYdk/AWD0yAd9HOM10=
github.com/getkin/kin-openapi v0.122.0/go.mod h1:PCWw/lfBrJY4HcdqE3jj+QFkaFK8ABoqo7PvqVhXXqw=
github.com/getsentry/sentry-go v0.25.0 h1:q6Eo+hS+yoJlTO3uu/azhQadsD8V+jQn2D8VvX1eOyI=
github.com/getsentry/sentry-go v0.25.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
github.com/go-chi/chi v1.5.5 h1:vOB/HbEMt9QqBqErz07QehcOKHaWFtuj87tTDVz2qXE=
github.com/go-chi/chi v1.5.5/go.mod h1:C9JqLr3tIYjDOZpzn+BCuxY8z8vmca43EeMgyZt7irw=
github.com/go-co-op/gocron v1.36.0 h1:sEmAwg57l4JWQgzaVWYfKZ+w13uHOqeOtwjo72Ll5Wc=
Expand Down
53 changes: 53 additions & 0 deletions pkg/errors/sentry/sentry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package sentry

import (
"context"
"fmt"

"github.com/getsentry/sentry-go"
)

type SentryAlerter struct {
client *sentry.Client
}

func noIntegrations(ints []sentry.Integration) []sentry.Integration {
return []sentry.Integration{}
}

type SentryAlerterOpts struct {
DSN string
Environment string
}

func NewSentryAlerter(opts *SentryAlerterOpts) (*SentryAlerter, error) {
sentryClient, err := sentry.NewClient(sentry.ClientOptions{
Dsn: opts.DSN,
AttachStacktrace: true,
Integrations: noIntegrations,
Environment: opts.Environment,
})
if err != nil {
return nil, err
}

return &SentryAlerter{
client: sentryClient,
}, nil
}

func (s *SentryAlerter) SendAlert(ctx context.Context, err error, data map[string]interface{}) {
scope := sentry.NewScope()

for key, val := range data {
scope.SetTag(key, fmt.Sprintf("%v", val))
}

s.client.CaptureException(
err,
&sentry.EventHint{
Data: data,
},
scope,
)
}
20 changes: 20 additions & 0 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/errors"
"github.com/hatchet-dev/hatchet/pkg/integrations"
"github.com/rs/zerolog"
)
Expand Down Expand Up @@ -57,6 +58,8 @@ type Worker struct {
cancelMap sync.Map

services sync.Map

alerter errors.Alerter
}

type WorkerOpt func(*WorkerOpts)
Expand All @@ -67,6 +70,7 @@ type WorkerOpts struct {
l *zerolog.Logger

integrations []integrations.Integration
alerter errors.Alerter
}

func defaultWorkerOpts() *WorkerOpts {
Expand All @@ -76,6 +80,7 @@ func defaultWorkerOpts() *WorkerOpts {
name: getHostName(),
l: &logger,
integrations: []integrations.Integration{},
alerter: errors.NoOpAlerter{},
}
}

Expand All @@ -97,6 +102,12 @@ func WithIntegration(integration integrations.Integration) WorkerOpt {
}
}

func WithErrorAlerter(alerter errors.Alerter) WorkerOpt {
return func(opts *WorkerOpts) {
opts.alerter = alerter
}
}

// NewWorker creates a new worker instance
func NewWorker(fs ...WorkerOpt) (*Worker, error) {
opts := defaultWorkerOpts()
Expand All @@ -110,6 +121,7 @@ func NewWorker(fs ...WorkerOpt) (*Worker, error) {
name: opts.name,
l: opts.l,
actions: map[string]Action{},
alerter: opts.alerter,
}

// register all integrations
Expand Down Expand Up @@ -309,6 +321,14 @@ func (w *Worker) startStepRun(ctx context.Context, assignedAction *client.Action
if err != nil {
failureEvent := w.getActionEvent(assignedAction, client.ActionEventTypeFailed)

w.alerter.SendAlert(context.Background(), err, map[string]interface{}{
"actionId": assignedAction.ActionId,
"workerId": assignedAction.WorkerId,
"stepRunId": assignedAction.StepRunId,
"jobName": assignedAction.JobName,
"actionType": assignedAction.ActionType,
})

failureEvent.EventPayload = err.Error()

_, err := w.client.Dispatcher().SendActionEvent(
Expand Down

0 comments on commit 373b9f4

Please sign in to comment.