diff --git a/scheduler-jobs/README.md b/scheduler-jobs/README.md index d3749fab..6056cb55 100644 --- a/scheduler-jobs/README.md +++ b/scheduler-jobs/README.md @@ -1,17 +1,28 @@ -# How-To Run Locally: +# Scheduler Jobs -To run locally as a process: `go run scheduler-jobs.go`, assuming you have a scheduler and dapr running accordingly (see below). +## Overview -Run Scheduler: -![scheduler_debugger_config.png](scheduler_debugger_config.png) +- schedule 100 oneshot jobs indefinitely (repeat = 1) +- schedule 100 indefinite jobs indefinitely (repeat not set, trigger every 30s) +- schedule repeat-job job indefinitely (repeat = 5, trigger every 1s due immediately) +- indefinitely schedule and delete a create-delete-job job (repeat = 1, trigger every 1s) -Run Dapr sidecar: -![sidecar_debugger_config.png](sidecar_debugger_config.png) +## How-To Run Locally: + +Run with dapr: +```shell +dapr run \ + --app-id scheduler-jobs \ + --app-port 8383 \ + --dapr-grpc-port 3501 --app-protocol grpc \ + --dapr-http-port 3500 --scheduler-host-address=127.0.0.1:50006 --app-channel-address=127.0.0.1 \ + -- go run scheduler-jobs.go +``` To run locally as a container: ```shell docker build -t scheduler-jobs . -docker run -p 3006:3006 --name scheduler-jobs scheduler-jobs # optionally add -d to run in background +docker run -p 8383:8383 --name scheduler-jobs scheduler-jobs # optionally add -d to run in background # check container is running docker ps ``` \ No newline at end of file diff --git a/scheduler-jobs/scheduler-jobs.go b/scheduler-jobs/scheduler-jobs.go index 448e9f77..be224c2e 100644 --- a/scheduler-jobs/scheduler-jobs.go +++ b/scheduler-jobs/scheduler-jobs.go @@ -19,8 +19,7 @@ import ( "github.com/dapr/go-sdk/client" ) -const appPort = ":3006" -const daprGRPCPort = "3501" +const appPort = ":8383" var oneshot atomic.Int64 var indefinite atomic.Int64 @@ -121,8 +120,15 @@ func (s *appServer) OnJobEventAlpha1(ctx context.Context, in *rtv1.JobEventReque return nil, nil } -func scheduleOneshotJobs(daprClient client.Client) { +func scheduleOneshotJobs(ctx context.Context, daprClient client.Client) { for i := 0; i < 100; i++ { + select { + case <-ctx.Done(): + log.Println("context canceled, stopping scheduleOneshotJobs.") + return + default: + } + jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second) name := "oneshot-job-" + strconv.Itoa(i) req := &client.Job{ Name: name, @@ -132,15 +138,23 @@ func scheduleOneshotJobs(daprClient client.Client) { TTL: "40s", Data: nil, } - err := daprClient.ScheduleJobAlpha1(context.Background(), req) + err := daprClient.ScheduleJobAlpha1(jobCtx, req) + cancel() if err != nil { log.Printf("Error scheduling oneshot job '%s': %s\n", name, err) } } } -func scheduleIndefiniteJobs(daprClient client.Client) { +func scheduleIndefiniteJobs(ctx context.Context, daprClient client.Client) { for i := 0; i < 100; i++ { + select { + case <-ctx.Done(): + log.Println("context canceled, stopping scheduleOneshotJobs.") + return + default: + } + jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second) name := "indefinite-job-" + strconv.Itoa(i) req := &client.Job{ Name: name, @@ -149,14 +163,15 @@ func scheduleIndefiniteJobs(daprClient client.Client) { TTL: "40s", Data: nil, } - err := daprClient.ScheduleJobAlpha1(context.Background(), req) + err := daprClient.ScheduleJobAlpha1(jobCtx, req) + cancel() if err != nil { log.Printf("Error scheduling indefinite job '%s': %s\n", name, err) } } } -func scheduleRepeatedJob(daprClient client.Client) { +func scheduleRepeatedJob(ctx context.Context, daprClient client.Client) { name := "repeat-job" req := &client.Job{ Name: name, @@ -166,13 +181,15 @@ func scheduleRepeatedJob(daprClient client.Client) { TTL: "10s", Data: nil, } - err := daprClient.ScheduleJobAlpha1(context.Background(), req) + jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + err := daprClient.ScheduleJobAlpha1(jobCtx, req) if err != nil { log.Printf("Error scheduling repeat job '%s': %s\n", name, err) } } -func scheduleSingleJob(daprClient client.Client) { +func scheduleSingleJob(ctx context.Context, daprClient client.Client) { name := "create-delete-job" req := &client.Job{ Name: name, @@ -182,15 +199,19 @@ func scheduleSingleJob(daprClient client.Client) { TTL: "3s", Data: nil, } - err := daprClient.ScheduleJobAlpha1(context.Background(), req) + jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + err := daprClient.ScheduleJobAlpha1(jobCtx, req) if err != nil { log.Printf("Error scheduling single job '%s': %s\n", name, err) } } -func deleteSingleJob(daprClient client.Client) { +func deleteSingleJob(ctx context.Context, daprClient client.Client) { name := "create-delete-job" - err := daprClient.DeleteJobAlpha1(context.Background(), name) + jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + err := daprClient.DeleteJobAlpha1(jobCtx, name) if err != nil { log.Printf("Error deleting single job '%s': %s\n", name, err) } @@ -209,8 +230,7 @@ func main() { } }(ctx) - // --dapr-grpc-port=3501 - daprClient, err := client.NewClientWithPort(daprGRPCPort) + daprClient, err := client.NewClient() if err != nil { log.Fatalf("Error getting dapr client: %v", err) } @@ -222,46 +242,54 @@ func main() { time.Sleep(5 * time.Second) // Schedule initial batch of jobs - go scheduleOneshotJobs(daprClient) - go scheduleIndefiniteJobs(daprClient) - go scheduleRepeatedJob(daprClient) + go scheduleOneshotJobs(ctx, daprClient) + go scheduleIndefiniteJobs(ctx, daprClient) + go scheduleRepeatedJob(ctx, daprClient) // Schedule additional oneshot jobs once 100 are triggered go func(ctx context.Context) { - select { - case <-ctx.Done(): - return - case <-oneshotDoneCh: - log.Println("Received input that maxOneshotTriggerCount was reached. Sleeping...") - time.Sleep(10 * time.Second) - log.Println("Scheduling next batch of oneshot jobs...") - go scheduleOneshotJobs(daprClient) + for { + select { + case <-ctx.Done(): + log.Println("context canceled, stopping oneshot scheduling goroutine.") + return + case <-oneshotDoneCh: + log.Println("Received input that maxOneshotTriggerCount was reached. Sleeping...") + time.Sleep(10 * time.Second) + log.Println("Scheduling next batch of oneshot jobs...") + go scheduleOneshotJobs(ctx, daprClient) + } } }(ctx) // Schedule additional indefinite jobs once 100 are triggered go func(ctx context.Context) { - select { - case <-ctx.Done(): - return - case <-indefiniteDoneCh: - log.Println("Received input that maxIndefiniteTriggerCount was reached. Sleeping...") - time.Sleep(10 * time.Second) - log.Println("Scheduling next batch of indefinite jobs...") - go scheduleIndefiniteJobs(daprClient) + for { + select { + case <-ctx.Done(): + log.Println("context canceled, stopping indefinite scheduling goroutine.") + return + case <-indefiniteDoneCh: + log.Println("Received input that maxIndefiniteTriggerCount was reached. Sleeping...") + time.Sleep(10 * time.Second) + log.Println("Scheduling next batch of indefinite jobs...") + go scheduleIndefiniteJobs(ctx, daprClient) + } } }(ctx) - // Schedule job to trigger immediately every second for 1s + // Schedule job to trigger immediately every second for 1s for 5 times max (repeats) go func(ctx context.Context) { - select { - case <-ctx.Done(): - return - case <-repeatDoneCh: - log.Println("Received input that maxRepeatTriggerCount was reached. Sleeping...") - time.Sleep(60 * time.Second) - log.Println("Scheduling next repeated job...") - go scheduleRepeatedJob(daprClient) + for { + select { + case <-ctx.Done(): + return + case <-repeatDoneCh: + log.Println("Received input that maxRepeatTriggerCount was reached. Sleeping...") + time.Sleep(60 * time.Second) + log.Println("Scheduling next repeated job...") + go scheduleRepeatedJob(ctx, daprClient) + } } }(ctx) @@ -274,13 +302,13 @@ func main() { case <-receivedSingleJobDoneCh: log.Println("Received input that the create-delete-job triggered, now deleting the job...") // received triggered job, now delete it & set atomic int to 0 - deleteSingleJob(daprClient) + deleteSingleJob(ctx, daprClient) log.Println("Successfully deleted create-delete-job.") } } }(ctx) - go scheduleSingleJob(daprClient) + go scheduleSingleJob(ctx, daprClient) // Reschedule the create-delete job after deletion, ensure triggers once go func(ctx context.Context) { @@ -292,7 +320,7 @@ func main() { log.Println("Received input that the create-delete-job was deleted. Sleeping for 5 seconds...") time.Sleep(5 * time.Second) log.Println("Scheduling create-delete-job...") - scheduleSingleJob(daprClient) + scheduleSingleJob(ctx, daprClient) log.Println("Successfully scheduled create-delete-job.") } } diff --git a/scheduler-jobs/scheduler_debugger_config.png b/scheduler-jobs/scheduler_debugger_config.png deleted file mode 100644 index 93489f51..00000000 Binary files a/scheduler-jobs/scheduler_debugger_config.png and /dev/null differ diff --git a/scheduler-jobs/sidecar_debugger_config.png b/scheduler-jobs/sidecar_debugger_config.png deleted file mode 100644 index 5476b16b..00000000 Binary files a/scheduler-jobs/sidecar_debugger_config.png and /dev/null differ