Skip to content

Commit

Permalink
feat: support maxRuns parameter on workers (#195)
Browse files Browse the repository at this point in the history
* feat: round robin queueing

* feat: configurable max runs per worker

* fix: address PR review

* docs for max runs and group round robin
  • Loading branch information
abelanger5 authored Feb 26, 2024
1 parent 2d625fe commit 6ea38a9
Show file tree
Hide file tree
Showing 46 changed files with 659 additions and 373 deletions.
3 changes: 3 additions & 0 deletions api-contracts/dispatcher/dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ message WorkerRegisterRequest {

// (optional) the services for this worker
repeated string services = 3;

// (optional) the max number of runs this worker can handle
optional int32 maxRuns = 4;
}

message WorkerRegisterResponse {
Expand Down
5 changes: 3 additions & 2 deletions api/v1/server/handlers/step-runs/rerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ func (t *StepRunService) StepRunUpdateRerun(ctx echo.Context, request gen.StepRu
// preflight check to make sure there's at least one worker to serve this request
action := stepRun.Step().ActionID

tenSecAgo := time.Now().Add(-10 * time.Second)
sixSecAgo := time.Now().Add(-6 * time.Second)

workers, err := t.config.Repository.Worker().ListWorkers(tenant.ID, &repository.ListWorkersOpts{
Action: &action,
LastHeartbeatAfter: &tenSecAgo,
LastHeartbeatAfter: &sixSecAgo,
Assignable: repository.BoolPtr(true),
})

if err != nil || len(workers) == 0 {
Expand Down
10 changes: 8 additions & 2 deletions api/v1/server/handlers/workers/list.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package workers

import (
"time"

"github.com/labstack/echo/v4"

"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
Expand All @@ -12,7 +14,11 @@ import (
func (t *WorkerService) WorkerList(ctx echo.Context, request gen.WorkerListRequestObject) (gen.WorkerListResponseObject, error) {
tenant := ctx.Get("tenant").(*db.TenantModel)

workers, err := t.config.Repository.Worker().ListWorkers(tenant.ID, &repository.ListWorkersOpts{})
sixSecAgo := time.Now().Add(-6 * time.Second)

workers, err := t.config.Repository.Worker().ListWorkers(tenant.ID, &repository.ListWorkersOpts{
LastHeartbeatAfter: &sixSecAgo,
})

if err != nil {
return nil, err
Expand All @@ -22,7 +28,7 @@ func (t *WorkerService) WorkerList(ctx echo.Context, request gen.WorkerListReque

for i, worker := range workers {
workerCp := worker
rows[i] = *transformers.ToWorker(workerCp.Worker)
rows[i] = *transformers.ToWorkerSqlc(&workerCp.Worker)
}

return gen.WorkerList200JSONResponse(
Expand Down
14 changes: 14 additions & 0 deletions api/v1/server/oas/transformers/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package transformers
import (
"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
"github.com/hatchet-dev/hatchet/internal/repository/prisma/db"
"github.com/hatchet-dev/hatchet/internal/repository/prisma/dbsqlc"
)

func ToWorker(worker *db.WorkerModel) *gen.Worker {
Expand All @@ -29,3 +30,16 @@ func ToWorker(worker *db.WorkerModel) *gen.Worker {

return res
}

// ToWorkerSqlc converts a sqlc-generated worker row into the API worker
// representation. Only the metadata (ID, created/updated timestamps), the
// name and the last heartbeat are populated; the heartbeat is left nil when
// the stored timestamp is the zero time.
func ToWorkerSqlc(worker *dbsqlc.Worker) *gen.Worker {
	metadata := toAPIMetadata(
		pgUUIDToStr(worker.ID),
		worker.CreatedAt.Time,
		worker.UpdatedAt.Time,
	)

	apiWorker := &gen.Worker{
		Metadata: *metadata,
		Name:     worker.Name,
	}

	// Expose the heartbeat only when one has actually been recorded.
	if lastHeartbeat := &worker.LastHeartbeatAt.Time; !lastHeartbeat.IsZero() {
		apiWorker.LastHeartbeatAt = lastHeartbeat
	}

	return apiWorker
}
1 change: 1 addition & 0 deletions examples/dag/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func run(ch <-chan interface{}, events chan<- string) error {
worker.WithClient(
c,
),
worker.WithMaxRuns(1),
)
if err != nil {
return fmt.Errorf("error creating worker: %w", err)
Expand Down
4 changes: 4 additions & 0 deletions frontend/docs/pages/home/go-sdk/creating-a-worker.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ The client to use to communicate with the Hatchet instance. This is required.

The name of the worker. This is used to identify the worker in the Hatchet UI.

### `worker.WithMaxRuns`

The maximum number of runs the worker can process simultaneously.

### `worker.WithErrorAlerter`

Use this option to set up an external error alerter, such as [Sentry](https://sentry.io/).
Expand Down
2 changes: 1 addition & 1 deletion frontend/docs/pages/home/python-sdk/creating-a-worker.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ worker.start()
The `hatchet.worker()` method takes a number of options:

- `name`: The name of the worker. This is used to identify the worker in the Hatchet UI.
- `max_threads`: The maximum number of threads to use for the worker. Defaults to 200.
- `max_runs`: The maximum number of concurrent runs that the worker can run. Defaults to `None` (unlimited runs).
- `debug`: Whether to enable debug logging. Defaults to `False`.
- `handle_kill`: Whether to call `sys.exit()` when the worker receives a `SIGTERM` signal after graceful termination. Defaults to `True`.
10 changes: 10 additions & 0 deletions frontend/docs/pages/home/typescript-sdk/creating-a-worker.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,14 @@ main();

## Options

### Name

The `hatchet.worker()` method takes a simple name parameter which can be used to identify the worker on the Hatchet dashboard.

### Max Runs

The `maxRuns` option can be used to limit the number of runs a worker will process concurrently. This is particularly useful for resource-intensive workers. For example, to limit the worker to only executing 1 run at a time, you can use the following code:

```ts
hatchet.worker('example-worker', 1)
```
35 changes: 35 additions & 0 deletions frontend/docs/pages/home/typescript-sdk/creating-a-workflow.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ const workflow: Workflow = {
};
```

The argument `limitStrategy` to the `concurrency` configuration can be set to either `CANCEL_IN_PROGRESS` (the default, documented above), or `GROUP_ROUND_ROBIN`. See documentation for the `GROUP_ROUND_ROBIN` strategy below.

### Cancellation Signalling

When a concurrent workflow is already running, Hatchet will send a cancellation signal to the step via its context. For now, you must handle this signal to exit the step at a logical point:
Expand Down Expand Up @@ -239,3 +241,36 @@ This same approach can be used for:
- Setting concurrency for a specific user session by `session_id` (i.e. multiple chat messages sent)
- Limiting data or document ingestion by setting an input hash or on-file key.
- Rudimentary fairness rules by limiting groups per tenant to a certain number of concurrent executions.

### Use-Case: Group Round Robin

You can distribute workflows fairly between tenants using the `GROUP_ROUND_ROBIN` option for `limitStrategy`. This will ensure that each distinct group gets a fair share of the concurrency limit. For example, let's say 5 workflows were queued in quick succession for each of the keys `A`, `B`, and `C`:

```txt
A, A, A, A, A, B, B, B, B, B, C, C, C, C, C
```

If there is a maximum of 2 concurrent executions, the execution order will be:

```txt
A, B, C, A, B, C, A, B, C, A, B, C, A, B, C
```

This can be set in the `concurrency` configuration as follows:

```ts
const workflow: Workflow = {
id: 'concurrency-example-rr',
description: 'test',
on: {
event: 'concurrency:create',
},
concurrency: {
name: 'multi-tenant-fairness',
key: (ctx) => ctx.workflowInput().group,
maxRuns: 2,
limitStrategy: ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
},
steps: [...],
};
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ require (
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/invopop/jsonschema v0.12.0
github.com/jackc/pgx-zerolog v0.0.0-20230315001418-f978528409eb
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx-zerolog v0.0.0-20230315001418-f978528409eb h1:pSv+zRVeAYjbXRFjyytFIMRBSKWVowCi7KbXSMR/+ug=
github.com/jackc/pgx-zerolog v0.0.0-20230315001418-f978528409eb/go.mod h1:CRUuPsmIajLt3dZIlJ5+O8IDSib6y8yrst8DkCthTa4=
github.com/jackc/pgx/v5 v5.5.0 h1:NxstgwndsTRy7eq9/kqYc/BZh5w2hHJV86wjvO+1xPw=
github.com/jackc/pgx/v5 v5.5.0/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
Expand Down
2 changes: 2 additions & 0 deletions internal/config/database/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type ConfigFile struct {
Seed SeedConfigFile `mapstructure:"seed" json:"seed,omitempty"`

Logger shared.LoggerConfigFile `mapstructure:"logger" json:"logger,omitempty"`

LogQueries bool `mapstructure:"logQueries" json:"logQueries,omitempty" default:"false"`
}

type SeedConfigFile struct {
Expand Down
10 changes: 10 additions & 0 deletions internal/config/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"strings"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/tracelog"

pgxzero "github.com/jackc/pgx-zerolog"

"github.com/hatchet-dev/hatchet/internal/auth/cookie"
"github.com/hatchet-dev/hatchet/internal/auth/oauth"
Expand Down Expand Up @@ -130,6 +133,13 @@ func GetDatabaseConfigFromConfigFile(cf *database.ConfigFile) (res *database.Con
return nil, err
}

if cf.LogQueries {
config.ConnConfig.Tracer = &tracelog.TraceLog{
Logger: pgxzero.NewLogger(l),
LogLevel: tracelog.LogLevelDebug,
}
}

config.MaxConns = 20

pool, err := pgxpool.NewWithConfig(context.Background(), config)
Expand Down
1 change: 1 addition & 0 deletions internal/repository/prisma/dbsqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/repository/prisma/dbsqlc/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ CREATE TABLE "Worker" (
"name" TEXT NOT NULL,
"status" "WorkerStatus" NOT NULL DEFAULT 'ACTIVE',
"dispatcherId" UUID NOT NULL,
"maxRuns" INTEGER,

CONSTRAINT "Worker_pkey" PRIMARY KEY ("id")
);
Expand Down
1 change: 1 addition & 0 deletions internal/repository/prisma/dbsqlc/sqlc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ sql:
- job_runs.sql
- tickers.sql
- dispatchers.sql
- workers.sql
schema:
- schema.sql
strict_order_by: false
Expand Down
34 changes: 34 additions & 0 deletions internal/repository/prisma/dbsqlc/workers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- name: ListWorkersWithStepCount :many
-- Lists the workers of a tenant together with the number of step runs that are
-- currently RUNNING on each worker. Optional filters:
--   actionId:           only workers registered for the given action
--   lastHeartbeatAfter: only workers whose last heartbeat is after the timestamp
--   assignable:         when true, only workers that have no maxRuns limit or
--                       that have fewer RUNNING step runs than maxRuns
SELECT
    sqlc.embed(workers),
    COUNT(runs."id") FILTER (WHERE runs."status" = 'RUNNING') AS "runningStepRuns"
FROM
    "Worker" workers
LEFT JOIN
    "StepRun" AS runs ON runs."workerId" = workers."id" AND runs."status" = 'RUNNING'
WHERE
    workers."tenantId" = @tenantId
    AND (
        sqlc.narg('actionId')::text IS NULL OR
        workers."id" IN (
            SELECT "_ActionToWorker"."B"
            FROM "_ActionToWorker"
            INNER JOIN "Action" ON "Action"."id" = "_ActionToWorker"."A"
            WHERE "Action"."tenantId" = @tenantId AND "Action"."actionId" = sqlc.narg('actionId')::text
        )
    )
    AND (
        sqlc.narg('lastHeartbeatAfter')::timestamp IS NULL OR
        workers."lastHeartbeatAt" > sqlc.narg('lastHeartbeatAfter')::timestamp
    )
    AND (
        sqlc.narg('assignable')::boolean IS NULL OR
        workers."maxRuns" IS NULL OR
        (sqlc.narg('assignable')::boolean AND workers."maxRuns" > (
            -- The subquery must filter its own "StepRun" rows. Filtering on the
            -- outer "runs" alias would count every row in "StepRun" whenever the
            -- worker has at least one running step.
            SELECT COUNT(*)
            FROM "StepRun" AS assigned
            WHERE assigned."workerId" = workers."id" AND assigned."status" = 'RUNNING'
        ))
    )
GROUP BY
    workers."id";
97 changes: 97 additions & 0 deletions internal/repository/prisma/dbsqlc/workers.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/repository/prisma/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewPrismaRepository(client *db.PrismaClient, pool *pgxpool.Pool, fs ...Pris
github: NewGithubRepository(client, opts.v),
step: NewStepRepository(client, opts.v),
dispatcher: NewDispatcherRepository(client, pool, opts.v, opts.l),
worker: NewWorkerRepository(client, opts.v),
worker: NewWorkerRepository(client, pool, opts.v, opts.l),
ticker: NewTickerRepository(client, pool, opts.v, opts.l),
userSession: NewUserSessionRepository(client, opts.v),
user: NewUserRepository(client, opts.v),
Expand Down
Loading

0 comments on commit 6ea38a9

Please sign in to comment.