From 5a9a92d279e058b75b37e589e1eef2ec63f550a6 Mon Sep 17 00:00:00 2001 From: Jordan Brockopp Date: Tue, 23 May 2023 14:10:14 -0500 Subject: [PATCH] feat: add support for processing schedules (#846) * feat(api/types): add support for schedules * feat(database/types): add support for schedules * feat(database): add support for schedules * chore: update go dependencies * feat(database): add schedule engine * feat(api): add support for schedules * add routes * fix: parse entry for schedules * more wip code * add schedule allowlist * fix tests * add validation for entry * add mocks w/o updated payloads * fix issues with create * update mock responses * use schedule mocks * make linter happy * use proper func * couple more updates * fix mock pathing * enhance: switch to adhocore/gronx * chore: update go deps * goimports * yet another goimports * sigh * wildcard goimport * chore: address linter feedback * chore: save work * chore: remove new types * chore: updates for removed types * chore: update go dependencies * chore: address review feedback * chore: remove new types * feat: initial code for scheduler * chore: misc updates * chore: update go dependencies * chore: updates for local testing * chore: save work * fix: introduce jitter * chore: address review feedback * chore: address review feedback * chore: update go dependencies * chore: address review feedback * fix(scheduler): use WithCommit in compiler * chore: address review feedback --------- Co-authored-by: Jordan Sussman Co-authored-by: JordanSussman --- .gitignore | 2 +- api/build.go | 31 ++- api/build_test.go | 8 +- api/schedule/create.go | 2 +- api/schedule/delete.go | 2 +- api/schedule/get.go | 2 +- api/schedule/list.go | 2 +- api/schedule/update.go | 3 +- api/webhook.go | 10 +- cmd/vela-server/schedule.go | 381 ++++++++++++++++++++++++++++++++ cmd/vela-server/server.go | 120 +++++++--- docker-compose.yml | 2 +- go.mod | 5 +- go.sum | 9 +- scm/github/repo.go | 19 ++ scm/github/repo_test.go | 49 ++++ scm/github/testdata/branch.json | 101 +++++++++ scm/service.go | 3 + 18 files changed, 675 insertions(+), 76 deletions(-) create mode 100644 cmd/vela-server/schedule.go create mode 100644 scm/github/testdata/branch.json diff --git a/.gitignore b/.gitignore index f3be5762d..c557bc18b 100644 --- a/.gitignore +++ b/.gitignore @@ -67,4 +67,4 @@ __debug_bin .history .ionide -# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode +# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode \ No newline at end of file diff --git a/api/build.go b/api/build.go index fed9184da..445d4cc8a 100644 --- a/api/build.go +++ b/api/build.go @@ -14,26 +14,23 @@ import ( "strings" "time" - "github.com/go-vela/server/internal/token" - "github.com/go-vela/server/router/middleware/claims" - "github.com/go-vela/server/router/middleware/org" - + "github.com/gin-gonic/gin" "github.com/go-vela/server/compiler" "github.com/go-vela/server/database" + "github.com/go-vela/server/internal/token" "github.com/go-vela/server/queue" "github.com/go-vela/server/router/middleware/build" + "github.com/go-vela/server/router/middleware/claims" "github.com/go-vela/server/router/middleware/executors" + "github.com/go-vela/server/router/middleware/org" "github.com/go-vela/server/router/middleware/repo" "github.com/go-vela/server/router/middleware/user" "github.com/go-vela/server/scm" "github.com/go-vela/server/util" - "github.com/go-vela/types" "github.com/go-vela/types/constants" "github.com/go-vela/types/library" "github.com/go-vela/types/pipeline" - - "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" ) @@ -293,7 +290,7 @@ func CreateBuild(c *gin.Context) { r.SetPipelineType(pipelineType) // skip the build if only the init or clone steps are found - skip := skipEmptyBuild(p) + skip := SkipEmptyBuild(p) if skip != "" { // set build to successful status input.SetStatus(constants.StatusSuccess) @@ -343,7 +340,7 @@ func CreateBuild(c *gin.Context) { input.SetPipelineID(pipeline.GetID()) // create the objects from the pipeline in the database - err = planBuild(database.FromContext(c), p, input, r) + err = PlanBuild(database.FromContext(c), p, input, r) if err != nil { util.HandleError(c, http.StatusInternalServerError, err) @@ -372,7 +369,7 @@ func CreateBuild(c *gin.Context) { } // publish the build to the queue - go publishToQueue( + go PublishToQueue( queue.FromGinContext(c), database.FromContext(c), p, @@ -382,11 +379,11 @@ func CreateBuild(c *gin.Context) { ) } -// skipEmptyBuild checks if the build should be skipped due to it +// SkipEmptyBuild checks if the build should be skipped due to it // not containing any steps besides init or clone. // //nolint:goconst // ignore init and clone constants -func skipEmptyBuild(p *pipeline.Build) string { +func SkipEmptyBuild(p *pipeline.Build) string { if len(p.Stages) == 1 { if p.Stages[0].Name == "init" { return "skipping build since only init stage found" @@ -1223,7 +1220,7 @@ func RestartBuild(c *gin.Context) { r.SetPipelineType(pipelineType) // skip the build if only the init or clone steps are found - skip := skipEmptyBuild(p) + skip := SkipEmptyBuild(p) if skip != "" { // set build to successful status b.SetStatus(constants.StatusSkipped) @@ -1273,7 +1270,7 @@ func RestartBuild(c *gin.Context) { b.SetPipelineID(pipeline.GetID()) // create the objects from the pipeline in the database - err = planBuild(database.FromContext(c), p, b, r) + err = PlanBuild(database.FromContext(c), p, b, r) if err != nil { util.HandleError(c, http.StatusInternalServerError, err) @@ -1301,7 +1298,7 @@ func RestartBuild(c *gin.Context) { } // publish the build to the queue - go publishToQueue( + go PublishToQueue( queue.FromGinContext(c), database.FromContext(c), p, @@ -1568,12 +1565,12 @@ func getPRNumberFromBuild(b *library.Build) (int, error) { return strconv.Atoi(parts[2]) } -// planBuild is a helper function to plan the build for +// PlanBuild is a helper function to plan the build for // execution. This creates all resources, like steps // and services, for the build in the configured backend. // TODO: // - return build and error. -func planBuild(database database.Interface, p *pipeline.Build, b *library.Build, r *library.Repo) error { +func PlanBuild(database database.Interface, p *pipeline.Build, b *library.Build, r *library.Repo) error { // update fields in build object b.SetCreated(time.Now().UTC().Unix()) diff --git a/api/build_test.go b/api/build_test.go index b92802303..1fb395d7d 100644 --- a/api/build_test.go +++ b/api/build_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2022 Target Brands, Inc. All rights reserved. +// Copyright (c) 2023 Target Brands, Inc. All rights reserved. // // Use of this source code is governed by the LICENSE file in this repository. @@ -10,7 +10,7 @@ import ( "github.com/go-vela/types/pipeline" ) -func Test_skipEmptyBuild(t *testing.T) { +func Test_SkipEmptyBuild(t *testing.T) { type args struct { p *pipeline.Build } @@ -72,8 +72,8 @@ func Test_skipEmptyBuild(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := skipEmptyBuild(tt.args.p); got != tt.want { - t.Errorf("skipEmptyBuild() = %v, want %v", got, tt.want) + if got := SkipEmptyBuild(tt.args.p); got != tt.want { + t.Errorf("SkipEmptyBuild() = %v, want %v", got, tt.want) } }) } diff --git a/api/schedule/create.go b/api/schedule/create.go index 127c04909..c8eb92741 100644 --- a/api/schedule/create.go +++ b/api/schedule/create.go @@ -1,4 +1,4 @@ -// Copyright (c) 2022 Target Brands, Inc. All rights reserved. +// Copyright (c) 2023 Target Brands, Inc. All rights reserved. // // Use of this source code is governed by the LICENSE file in this repository. diff --git a/api/schedule/delete.go b/api/schedule/delete.go index fd7c0715f..a697954d8 100644 --- a/api/schedule/delete.go +++ b/api/schedule/delete.go @@ -1,4 +1,4 @@ -// Copyright (c) 2022 Target Brands, Inc. All rights reserved. +// Copyright (c) 2023 Target Brands, Inc. All rights reserved. // // Use of this source code is governed by the LICENSE file in this repository. diff --git a/api/schedule/get.go b/api/schedule/get.go index 51a436bbe..727de5b84 100644 --- a/api/schedule/get.go +++ b/api/schedule/get.go @@ -1,4 +1,4 @@ -// Copyright (c) 2022 Target Brands, Inc. All rights reserved. +// Copyright (c) 2023 Target Brands, Inc. All rights reserved. // // Use of this source code is governed by the LICENSE file in this repository. diff --git a/api/schedule/list.go b/api/schedule/list.go index c43188d86..cb64ada74 100644 --- a/api/schedule/list.go +++ b/api/schedule/list.go @@ -1,4 +1,4 @@ -// Copyright (c) 2022 Target Brands, Inc. All rights reserved. +// Copyright (c) 2023 Target Brands, Inc. All rights reserved. // // Use of this source code is governed by the LICENSE file in this repository. diff --git a/api/schedule/update.go b/api/schedule/update.go index bdadbdd2d..646e55fc2 100644 --- a/api/schedule/update.go +++ b/api/schedule/update.go @@ -9,11 +9,10 @@ import ( "net/http" "time" - "github.com/go-vela/server/router/middleware/schedule" - "github.com/gin-gonic/gin" "github.com/go-vela/server/database" "github.com/go-vela/server/router/middleware/repo" + "github.com/go-vela/server/router/middleware/schedule" "github.com/go-vela/server/util" "github.com/go-vela/types/library" "github.com/sirupsen/logrus" diff --git a/api/webhook.go b/api/webhook.go index 0c8c5bd73..5f4ddc5bf 100644 --- a/api/webhook.go +++ b/api/webhook.go @@ -542,7 +542,7 @@ func PostWebhook(c *gin.Context) { repo.SetPipelineType(pipelineType) // skip the build if only the init or clone steps are found - skip := skipEmptyBuild(p) + skip := SkipEmptyBuild(p) if skip != "" { // set build to successful status b.SetStatus(constants.StatusSkipped) @@ -609,7 +609,7 @@ func PostWebhook(c *gin.Context) { // using the same Number and thus create a constraint // conflict; consider deleting the partially created // build object in the database - err = planBuild(database.FromContext(c), p, b, repo) + err = PlanBuild(database.FromContext(c), p, b, repo) if err != nil { retErr := fmt.Errorf("%s: %w", baseErr, err) @@ -696,7 +696,7 @@ func PostWebhook(c *gin.Context) { } // publish the build to the queue - go publishToQueue( + go PublishToQueue( queue.FromGinContext(c), database.FromContext(c), p, @@ -706,9 +706,9 @@ func PostWebhook(c *gin.Context) { ) } -// publishToQueue is a helper function that creates +// PublishToQueue is a helper function that creates // a build item and publishes it to the queue. -func publishToQueue(queue queue.Service, db database.Interface, p *pipeline.Build, b *library.Build, r *library.Repo, u *library.User) { +func PublishToQueue(queue queue.Service, db database.Interface, p *pipeline.Build, b *library.Build, r *library.Repo, u *library.User) { item := types.ToItem(p, b, r, u) logrus.Infof("Converting queue item to json for build %d for %s", b.GetNumber(), r.GetFullName()) diff --git a/cmd/vela-server/schedule.go b/cmd/vela-server/schedule.go new file mode 100644 index 000000000..172b5c251 --- /dev/null +++ b/cmd/vela-server/schedule.go @@ -0,0 +1,381 @@ +// Copyright (c) 2023 Target Brands, Inc. All rights reserved. +// +// Use of this source code is governed by the LICENSE file in this repository. + +package main + +import ( + "fmt" + "strings" + "time" + + "github.com/adhocore/gronx" + "github.com/go-vela/server/api" + "github.com/go-vela/server/compiler" + "github.com/go-vela/server/database" + "github.com/go-vela/server/queue" + "github.com/go-vela/server/scm" + "github.com/go-vela/types" + "github.com/go-vela/types/constants" + "github.com/go-vela/types/library" + "github.com/go-vela/types/pipeline" + "github.com/sirupsen/logrus" + + "k8s.io/apimachinery/pkg/util/wait" +) + +const baseErr = "unable to schedule build" + +func processSchedules(compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error { + logrus.Infof("processing active schedules to create builds") + + // send API call to capture the list of active schedules + schedules, err := database.ListActiveSchedules() + if err != nil { + return err + } + + // iterate through the list of active schedules + for _, s := range schedules { + // send API call to capture the schedule + // + // This is needed to ensure we are not dealing with a stale schedule since we fetch + // all schedules once and iterate through that list which can take a significant + // amount of time to get to the end of the list. + schedule, err := database.GetSchedule(s.GetID()) + if err != nil { + logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName()) + + continue + } + + // create a variable to track if a build should be triggered based off the schedule + trigger := false + + // check if a build has already been triggered for the schedule + if schedule.GetScheduledAt() == 0 { + // trigger a build for the schedule since one has not already been scheduled + trigger = true + } else { + // parse the previous occurrence of the entry for the schedule + prevTime, err := gronx.PrevTick(schedule.GetEntry(), true) + if err != nil { + logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName()) + + continue + } + + // parse the next occurrence of the entry for the schedule + nextTime, err := gronx.NextTick(schedule.GetEntry(), true) + if err != nil { + logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName()) + + continue + } + + // parse the UNIX timestamp from when the last build was triggered for the schedule + t := time.Unix(schedule.GetScheduledAt(), 0).UTC() + + // check if the time since the last triggered build is greater than the entry duration for the schedule + if time.Since(t) > nextTime.Sub(prevTime) { + // trigger a build for the schedule since it has not previously ran + trigger = true + } + } + + if trigger && schedule.GetActive() { + err = processSchedule(schedule, compiler, database, metadata, queue, scm) + if err != nil { + logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName()) + + continue + } + } + } + + return nil +} + +//nolint:funlen // ignore function length and number of statements +func processSchedule(s *library.Schedule, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error { + // sleep for 1s - 3s before processing the schedule + // + // This should prevent multiple servers from processing a schedule at the same time by + // leveraging a base duration along with a standard deviation of randomness a.k.a. + // "jitter". To create the jitter, we use a base duration of 1s with a scale factor of 3.0. + time.Sleep(wait.Jitter(time.Second, 3.0)) + + // send API call to capture the repo for the schedule + r, err := database.GetRepo(s.GetRepoID()) + if err != nil { + return fmt.Errorf("unable to fetch repo: %w", err) + } + + logrus.Tracef("processing schedule %s/%s", r.GetFullName(), s.GetName()) + + // check if the repo is active + if !r.GetActive() { + return fmt.Errorf("repo %s is not active", r.GetFullName()) + } + + // check if the repo has a valid owner + if r.GetUserID() == 0 { + return fmt.Errorf("repo %s does not have a valid owner", r.GetFullName()) + } + + // send API call to capture the owner for the repo + u, err := database.GetUser(r.GetUserID()) + if err != nil { + return fmt.Errorf("unable to get owner for repo %s: %w", r.GetFullName(), err) + } + + // send API call to confirm repo owner has at least write access to repo + _, err = scm.RepoAccess(u, u.GetToken(), r.GetOrg(), r.GetName()) + if err != nil { + return fmt.Errorf("%s does not have at least write access for repo %s", u.GetName(), r.GetFullName()) + } + + // create SQL filters for querying pending and running builds for repo + filters := map[string]interface{}{ + "status": []string{constants.StatusPending, constants.StatusRunning}, + } + + // send API call to capture the number of pending or running builds for the repo + builds, err := database.GetRepoBuildCount(r, filters) + if err != nil { + return fmt.Errorf("unable to get count of builds for repo %s: %w", r.GetFullName(), err) + } + + // check if the number of pending and running builds exceeds the limit for the repo + if builds >= r.GetBuildLimit() { + return fmt.Errorf("repo %s has excceded the concurrent build limit of %d", r.GetFullName(), r.GetBuildLimit()) + } + + // send API call to capture the commit sha for the branch + _, commit, err := scm.GetBranch(u, r) + if err != nil { + return fmt.Errorf("failed to get commit for repo %s on %s branch: %w", r.GetFullName(), r.GetBranch(), err) + } + + url := strings.TrimSuffix(r.GetClone(), ".git") + + b := new(library.Build) + b.SetAuthor(s.GetCreatedBy()) + b.SetBranch(r.GetBranch()) + b.SetClone(r.GetClone()) + b.SetCommit(commit) + b.SetDeploy(s.GetName()) + b.SetEvent(constants.EventSchedule) + b.SetMessage(fmt.Sprintf("triggered for %s schedule with %s entry", s.GetName(), s.GetEntry())) + b.SetRef(fmt.Sprintf("refs/heads/%s", b.GetBranch())) + b.SetRepoID(r.GetID()) + b.SetSender(s.GetUpdatedBy()) + b.SetSource(fmt.Sprintf("%s/tree/%s", url, b.GetBranch())) + b.SetStatus(constants.StatusPending) + b.SetTitle(fmt.Sprintf("%s received from %s", constants.EventSchedule, url)) + + // populate the build link if a web address is provided + if len(metadata.Vela.WebAddress) > 0 { + b.SetLink(fmt.Sprintf("%s/%s/%d", metadata.Vela.WebAddress, r.GetFullName(), b.GetNumber())) + } + + var ( + // variable to store the raw pipeline configuration + config []byte + // variable to store executable pipeline + p *pipeline.Build + // variable to store pipeline configuration + pipeline *library.Pipeline + // variable to control number of times to retry processing pipeline + retryLimit = 5 + // variable to store the pipeline type for the repository + pipelineType = r.GetPipelineType() + ) + + // implement a loop to process asynchronous operations with a retry limit + // + // Some operations taken during this workflow can lead to race conditions failing to successfully process + // the request. This logic ensures we attempt our best efforts to handle these cases gracefully. + for i := 0; i < retryLimit; i++ { + logrus.Debugf("compilation loop - attempt %d", i+1) + // check if we're on the first iteration of the loop + if i > 0 { + // incrementally sleep in between retries + time.Sleep(time.Duration(i) * time.Second) + } + + // send API call to attempt to capture the pipeline + pipeline, err = database.GetPipelineForRepo(b.GetCommit(), r) + if err != nil { // assume the pipeline doesn't exist in the database yet + // send API call to capture the pipeline configuration file + config, err = scm.ConfigBackoff(u, r, b.GetCommit()) + if err != nil { + return fmt.Errorf("unable to get pipeline config for %s/%s: %w", r.GetFullName(), b.GetCommit(), err) + } + } else { + config = pipeline.GetData() + } + + // send API call to capture repo for the counter (grabbing repo again to ensure counter is correct) + r, err = database.GetRepoForOrg(r.GetOrg(), r.GetName()) + if err != nil { + err = fmt.Errorf("unable to get repo %s: %w", r.GetFullName(), err) + + // check if the retry limit has been exceeded + if i < retryLimit-1 { + logrus.WithError(err).Warningf("retrying #%d", i+1) + + // continue to the next iteration of the loop + continue + } + + return err + } + + // set the build numbers based off repo counter + r.SetCounter(r.GetCounter() + 1) + b.SetNumber(r.GetCounter() + 1) + // set the parent equal to the current repo counter + b.SetParent(r.GetCounter()) + // check if the parent is set to 0 + if b.GetParent() == 0 { + // parent should be "1" if it's the first build ran + b.SetParent(1) + } + + // set the build link if a web address is provided + if len(metadata.Vela.WebAddress) > 0 { + b.SetLink(fmt.Sprintf("%s/%s/%d", metadata.Vela.WebAddress, r.GetFullName(), b.GetNumber())) + } + + // ensure we use the expected pipeline type when compiling + // + // The pipeline type for a repo can change at any time which can break compiling + // existing pipelines in the system for that repo. To account for this, we update + // the repo pipeline type to match what was defined for the existing pipeline + // before compiling. After we're done compiling, we reset the pipeline type. + if len(pipeline.GetType()) > 0 { + r.SetPipelineType(pipeline.GetType()) + } + + var compiled *library.Pipeline + // parse and compile the pipeline configuration file + p, compiled, err = compiler. + Duplicate(). + WithBuild(b). + WithCommit(b.GetCommit()). + WithMetadata(metadata). + WithRepo(r). + WithUser(u). + Compile(config) + if err != nil { + return fmt.Errorf("unable to compile pipeline config for %s/%s: %w", r.GetFullName(), b.GetCommit(), err) + } + + // reset the pipeline type for the repo + // + // The pipeline type for a repo can change at any time which can break compiling + // existing pipelines in the system for that repo. To account for this, we update + // the repo pipeline type to match what was defined for the existing pipeline + // before compiling. After we're done compiling, we reset the pipeline type. + r.SetPipelineType(pipelineType) + + // skip the build if only the init or clone steps are found + skip := api.SkipEmptyBuild(p) + if skip != "" { + return nil + } + + // check if the pipeline did not already exist in the database + if pipeline == nil { + pipeline = compiled + pipeline.SetRepoID(r.GetID()) + pipeline.SetCommit(b.GetCommit()) + pipeline.SetRef(b.GetRef()) + + // send API call to create the pipeline + err = database.CreatePipeline(pipeline) + if err != nil { + err = fmt.Errorf("failed to create pipeline for %s: %w", r.GetFullName(), err) + + // check if the retry limit has been exceeded + if i < retryLimit-1 { + logrus.WithError(err).Warningf("retrying #%d", i+1) + + // continue to the next iteration of the loop + continue + } + + return err + } + + // send API call to capture the created pipeline + pipeline, err = database.GetPipelineForRepo(pipeline.GetCommit(), r) + if err != nil { + return fmt.Errorf("unable to get new pipeline %s/%s: %w", r.GetFullName(), pipeline.GetCommit(), err) + } + } + + b.SetPipelineID(pipeline.GetID()) + + // create the objects from the pipeline in the database + // TODO: + // - if a build gets created and something else fails midway, + // the next loop will attempt to create the same build, + // using the same Number and thus create a constraint + // conflict; consider deleting the partially created + // build object in the database + err = api.PlanBuild(database, p, b, r) + if err != nil { + // check if the retry limit has been exceeded + if i < retryLimit-1 { + logrus.WithError(err).Warningf("retrying #%d", i+1) + + // reset fields set by cleanBuild for retry + b.SetError("") + b.SetStatus(constants.StatusPending) + b.SetFinished(0) + + // continue to the next iteration of the loop + continue + } + + return err + } + + s.SetScheduledAt(time.Now().UTC().Unix()) + + // break the loop because everything was successful + break + } // end of retry loop + + // send API call to update repo for ensuring counter is incremented + err = database.UpdateRepo(r) + if err != nil { + return fmt.Errorf("unable to update repo %s: %w", r.GetFullName(), err) + } + + // send API call to update schedule for ensuring scheduled_at field is set + err = database.UpdateSchedule(s) + if err != nil { + return fmt.Errorf("unable to update schedule %s/%s: %w", r.GetFullName(), s.GetName(), err) + } + + // send API call to capture the triggered build + b, err = database.GetBuild(b.GetNumber(), r) + if err != nil { + return fmt.Errorf("unable to get new build %s/%d: %w", r.GetFullName(), b.GetNumber(), err) + } + + // publish the build to the queue + go api.PublishToQueue( + queue, + database, + p, + b, + r, + u, + ) + + return nil +} diff --git a/cmd/vela-server/server.go b/cmd/vela-server/server.go index 1f8bdbd9c..80508e4b0 100644 --- a/cmd/vela-server/server.go +++ b/cmd/vela-server/server.go @@ -9,16 +9,19 @@ import ( "fmt" "net/http" "net/url" + "os" + "os/signal" + "syscall" "time" + "github.com/gin-gonic/gin" "github.com/go-vela/server/router" "github.com/go-vela/server/router/middleware" - - "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" - "github.com/urfave/cli/v2" - "gopkg.in/tomb.v2" + "golang.org/x/sync/errgroup" + + "k8s.io/apimachinery/pkg/util/wait" ) func server(c *cli.Context) error { @@ -111,47 +114,92 @@ func server(c *cli.Context) error { return err } - var tomb tomb.Tomb - // start http server - tomb.Go(func() error { - port := addr.Port() + port := addr.Port() + // check if a port is part of the address + if len(port) == 0 { + port = c.String("server-port") + } + + // gin expects the address to be ":" ie ":8080" + srv := &http.Server{ + Addr: fmt.Sprintf(":%s", port), + Handler: router, + ReadHeaderTimeout: 60 * time.Second, + } - // check if a port is part of the address - if len(port) == 0 { - port = c.String("server-port") + // create the context for controlling the worker subprocesses + ctx, done := context.WithCancel(context.Background()) + // create the errgroup for managing worker subprocesses + // + // https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#Group + g, gctx := errgroup.WithContext(ctx) + + // spawn goroutine to check for signals to gracefully shutdown + g.Go(func() error { + signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM) + + select { + case sig := <-signalChannel: + logrus.Infof("received signal: %s", sig) + err := srv.Shutdown(ctx) + if err != nil { + logrus.Error(err) + } + done() + case <-gctx.Done(): + logrus.Info("closing signal goroutine") + err := srv.Shutdown(ctx) + if err != nil { + logrus.Error(err) + } + return gctx.Err() } - // gin expects the address to be ":" ie ":8080" - srv := &http.Server{ - Addr: fmt.Sprintf(":%s", port), - Handler: router, - ReadHeaderTimeout: 60 * time.Second, + return nil + }) + + // spawn goroutine for starting the server + g.Go(func() error { + logrus.Infof("starting server on %s", addr.Host) + err = srv.ListenAndServe() + if err != nil { + // log a message indicating the failure of the server + logrus.Errorf("failing server: %v", err) } - logrus.Infof("running server on %s", addr.Host) - go func() { - logrus.Info("Starting HTTP server...") - err := srv.ListenAndServe() - if err != nil { - tomb.Kill(err) - } - }() + return err + }) - //nolint:gosimple // ignore this for now + // spawn goroutine for starting the scheduler + g.Go(func() error { + logrus.Info("starting scheduler") for { - select { - case <-tomb.Dying(): - logrus.Info("Stopping HTTP server...") - return srv.Shutdown(context.Background()) + // cut the configured minimum frequency duration for schedules in half + // + // We need to sleep for some amount of time before we attempt to process schedules + // setup in the database. Since the minimum frequency is configurable, we cut it in + // half and use that as the base duration to determine how long to sleep for. + base := c.Duration("schedule-minimum-frequency") / 2 + logrus.Infof("sleeping for %v before scheduling builds", base) + + // sleep for a duration of time before processing schedules + // + // This should prevent multiple servers from processing schedules at the same time by + // leveraging a base duration along with a standard deviation of randomness a.k.a. + // "jitter". To create the jitter, we use the configured minimum frequency duration + // along with a scale factor of 0.1. + time.Sleep(wait.Jitter(base, 0.1)) + + err = processSchedules(compiler, database, metadata, queue, scm) + if err != nil { + logrus.WithError(err).Warn("unable to process schedules") + } else { + logrus.Trace("successfully processed schedules") } } }) - // Wait for stuff and watch for errors - err = tomb.Wait() - if err != nil { - return err - } - - return tomb.Err() + // wait for errors from server subprocesses + return g.Wait() } diff --git a/docker-compose.yml b/docker-compose.yml index 1394c317b..d030743e6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -157,4 +157,4 @@ services: - IPC_LOCK networks: - vela: + vela: \ No newline at end of file diff --git a/go.mod b/go.mod index b2ab8d4c0..5ff15db7b 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/drone/envsubst v1.0.3 github.com/gin-gonic/gin v1.9.0 github.com/go-playground/assert/v2 v2.2.0 - github.com/go-vela/types v0.19.3-0.20230516131722-f538de06bbf6 + github.com/go-vela/types v0.19.3-0.20230519215217-0da8c8b5e90f github.com/golang-jwt/jwt/v5 v5.0.0 github.com/google/go-cmp v0.5.9 github.com/google/go-github/v52 v52.0.0 @@ -33,8 +33,8 @@ require ( github.com/urfave/cli/v2 v2.25.1 go.starlark.net v0.0.0-20230302034142-4b1e35fe2254 golang.org/x/oauth2 v0.7.0 + golang.org/x/sync v0.1.0 gopkg.in/square/go-jose.v2 v2.6.0 - gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 gorm.io/driver/postgres v1.5.0 gorm.io/driver/sqlite v1.4.4 gorm.io/gorm v1.25.0 @@ -123,4 +123,5 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.90.1 // indirect + k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect ) diff --git a/go.sum b/go.sum index e0f559ca4..e988c342d 100644 --- a/go.sum +++ b/go.sum @@ -138,8 +138,8 @@ github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91 github.com/go-playground/validator/v10 v10.11.2 h1:q3SHpufmypg+erIExEKUmsgmhDTyhcJ38oeKGACXohU= github.com/go-playground/validator/v10 v10.11.2/go.mod h1:NieE624vt4SCTJtD87arVLvdmjPAeV8BQlHtMnw9D7s= github.com/go-test/deep v1.0.2 h1:onZX1rnHT3Wv6cqNgYyFOOlgVKJrksuCMCRvJStbMYw= -github.com/go-vela/types v0.19.3-0.20230516131722-f538de06bbf6 h1:WVmgeHuPN2WHTf/tJtseEMPxPoKdit2rD4nCZyPIias= -github.com/go-vela/types v0.19.3-0.20230516131722-f538de06bbf6/go.mod h1:0lsuPfGyVyTWJSi2h3NS6uaEW6DgnFvIzaZu1sXYKrs= +github.com/go-vela/types v0.19.3-0.20230519215217-0da8c8b5e90f h1:13H381Djx9iFC3BSj2f/ac57HlaI3mQL0el9vM7a3+k= +github.com/go-vela/types v0.19.3-0.20230519215217-0da8c8b5e90f/go.mod h1:0lsuPfGyVyTWJSi2h3NS6uaEW6DgnFvIzaZu1sXYKrs= github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -510,6 +510,7 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -736,8 +737,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI= gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= -gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 h1:yiW+nvdHb9LVqSHQBXfZCieqV4fzYhNBql77zY0ykqs= -gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -765,6 +764,8 @@ k8s.io/apimachinery v0.27.1 h1:EGuZiLI95UQQcClhanryclaQE6xjg1Bts6/L3cD7zyc= k8s.io/apimachinery v0.27.1/go.mod h1:5ikh59fK3AJ287GUvpUsryoMFtH9zj/ARfWCo3AyXTM= k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= +k8s.io/utils v0.0.0-20230209194617-a36077c30491 h1:r0BAOLElQnnFhE/ApUsg3iHdVYYPBjNSSOMowRZxxsY= +k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/scm/github/repo.go b/scm/github/repo.go index d82c2aeac..b8fa31c3f 100644 --- a/scm/github/repo.go +++ b/scm/github/repo.go @@ -537,3 +537,22 @@ func (c *client) GetHTMLURL(u *library.User, org, repo, name, ref string) (strin return "", fmt.Errorf("no valid repository contents found") } + +// GetBranch defines a function that retrieves a branch for a repo. +func (c *client) GetBranch(u *library.User, r *library.Repo) (string, string, error) { + c.Logger.WithFields(logrus.Fields{ + "org": r.GetOrg(), + "repo": r.GetName(), + "user": u.GetName(), + }).Tracef("retrieving branch %s for repo %s", r.GetBranch(), r.GetFullName()) + + // create GitHub OAuth client with user's token + client := c.newClientToken(u.GetToken()) + + data, _, err := client.Repositories.GetBranch(ctx, r.GetOrg(), r.GetName(), r.GetBranch(), true) + if err != nil { + return "", "", err + } + + return data.GetName(), data.GetCommit().GetSHA(), nil +} diff --git a/scm/github/repo_test.go b/scm/github/repo_test.go index 3dba1bac3..7482c71ce 100644 --- a/scm/github/repo_test.go +++ b/scm/github/repo_test.go @@ -1302,3 +1302,52 @@ func TestGithub_GetPullRequest(t *testing.T) { t.Errorf("HeadRef is %v, want %v", gotHeadRef, wantHeadRef) } } + +func TestGithub_GetBranch(t *testing.T) { + // setup context + gin.SetMode(gin.TestMode) + + resp := httptest.NewRecorder() + _, engine := gin.CreateTestContext(resp) + + // setup mock server + engine.GET("/api/v3/repos/:owner/:repo/branches/:branch", func(c *gin.Context) { + c.Header("Content-Type", "application/json") + c.Status(http.StatusOK) + c.File("testdata/branch.json") + }) + + s := httptest.NewServer(engine) + defer s.Close() + + // setup types + u := new(library.User) + u.SetName("foo") + u.SetToken("bar") + + r := new(library.Repo) + r.SetOrg("octocat") + r.SetName("Hello-World") + r.SetFullName("octocat/Hello-World") + r.SetBranch("main") + + wantBranch := "main" + wantCommit := "7fd1a60b01f91b314f59955a4e4d4e80d8edf11d" + + client, _ := NewTest(s.URL) + + // run test + gotBranch, gotCommit, err := client.GetBranch(u, r) + + if err != nil { + t.Errorf("Status returned err: %v", err) + } + + if !strings.EqualFold(gotBranch, wantBranch) { + t.Errorf("Branch is %v, want %v", gotBranch, wantBranch) + } + + if !strings.EqualFold(gotCommit, wantCommit) { + t.Errorf("Commit is %v, want %v", gotCommit, wantCommit) + } +} diff --git a/scm/github/testdata/branch.json b/scm/github/testdata/branch.json new file mode 100644 index 000000000..b133e7b38 --- /dev/null +++ b/scm/github/testdata/branch.json @@ -0,0 +1,101 @@ +{ + "name": "main", + "commit": { + "sha": "7fd1a60b01f91b314f59955a4e4d4e80d8edf11d", + "node_id": "MDY6Q29tbWl0MTI5NjI2OTo3ZmQxYTYwYjAxZjkxYjMxNGY1OTk1NWE0ZTRkNGU4MGQ4ZWRmMTFk", + "commit": { + "author": { + "name": "The Octocat", + "email": "octocat@nowhere.com", + "date": "2012-03-06T23:06:50Z" + }, + "committer": { + "name": "The Octocat", + "email": "octocat@nowhere.com", + "date": "2012-03-06T23:06:50Z" + }, + "message": "Merge pull request #6 from Spaceghost/patch-1\n\nNew line at end of file.", + "tree": { + "sha": "b4eecafa9be2f2006ce1b709d6857b07069b4608", + "url": "https://api.github.com/repos/octocat/Hello-World/git/trees/b4eecafa9be2f2006ce1b709d6857b07069b4608" + }, + "url": "https://api.github.com/repos/octocat/Hello-World/git/commits/7fd1a60b01f91b314f59955a4e4d4e80d8edf11d", + "comment_count": 77, + "verification": { + "verified": false, + "reason": "unsigned", + "signature": null, + "payload": null + } + }, + "url": "https://api.github.com/repos/octocat/Hello-World/commits/7fd1a60b01f91b314f59955a4e4d4e80d8edf11d", + "html_url": "https://github.com/octocat/Hello-World/commit/7fd1a60b01f91b314f59955a4e4d4e80d8edf11d", + "comments_url": "https://api.github.com/repos/octocat/Hello-World/commits/7fd1a60b01f91b314f59955a4e4d4e80d8edf11d/comments", + "author": { + "login": "octocat", + "id": 583231, + "node_id": "MDQ6VXNlcjU4MzIzMQ==", + "avatar_url": "https://avatars.githubusercontent.com/u/583231?v=4", + "gravatar_id": "", + "url": "https://api.github.com/users/octocat", + "html_url": "https://github.com/octocat", + "followers_url": "https://api.github.com/users/octocat/followers", + "following_url": "https://api.github.com/users/octocat/following{/other_user}", + "gists_url": "https://api.github.com/users/octocat/gists{/gist_id}", + "starred_url": "https://api.github.com/users/octocat/starred{/owner}{/repo}", + "subscriptions_url": "https://api.github.com/users/octocat/subscriptions", + "organizations_url": "https://api.github.com/users/octocat/orgs", + "repos_url": "https://api.github.com/users/octocat/repos", + "events_url": "https://api.github.com/users/octocat/events{/privacy}", + "received_events_url": "https://api.github.com/users/octocat/received_events", + "type": "User", + "site_admin": false + }, + "committer": { + "login": "octocat", + "id": 583231, + "node_id": "MDQ6VXNlcjU4MzIzMQ==", + "avatar_url": "https://avatars.githubusercontent.com/u/583231?v=4", + "gravatar_id": "", + "url": "https://api.github.com/users/octocat", + "html_url": "https://github.com/octocat", + "followers_url": "https://api.github.com/users/octocat/followers", + "following_url": "https://api.github.com/users/octocat/following{/other_user}", + "gists_url": "https://api.github.com/users/octocat/gists{/gist_id}", + "starred_url": "https://api.github.com/users/octocat/starred{/owner}{/repo}", + "subscriptions_url": "https://api.github.com/users/octocat/subscriptions", + "organizations_url": "https://api.github.com/users/octocat/orgs", + "repos_url": "https://api.github.com/users/octocat/repos", + "events_url": "https://api.github.com/users/octocat/events{/privacy}", + "received_events_url": "https://api.github.com/users/octocat/received_events", + "type": "User", + "site_admin": false + }, + "parents": [ + { + "sha": "553c2077f0edc3d5dc5d17262f6aa498e69d6f8e", + "url": "https://api.github.com/repos/octocat/Hello-World/commits/553c2077f0edc3d5dc5d17262f6aa498e69d6f8e", + "html_url": "https://github.com/octocat/Hello-World/commit/553c2077f0edc3d5dc5d17262f6aa498e69d6f8e" + }, + { + "sha": "762941318ee16e59dabbacb1b4049eec22f0d303", + "url": "https://api.github.com/repos/octocat/Hello-World/commits/762941318ee16e59dabbacb1b4049eec22f0d303", + "html_url": "https://github.com/octocat/Hello-World/commit/762941318ee16e59dabbacb1b4049eec22f0d303" + } + ] + }, + "_links": { + "self": "https://api.github.com/repos/octocat/Hello-World/branches/main", + "html": "https://github.com/octocat/Hello-World/tree/main" + }, + "protected": false, + "protection": { + "enabled": false, + "required_status_checks": { + "enforcement_level": "off", + "contexts": [], + "checks": [] + } + }, + "protection_url": "https://api.github.com/repos/octocat/Hello-World/branches/main/protection" +} \ No newline at end of file diff --git a/scm/service.go b/scm/service.go index bb0c0e275..c92cbedbf 100644 --- a/scm/service.go +++ b/scm/service.go @@ -108,6 +108,9 @@ type Service interface { // ListUserRepos defines a function that retrieves // all repos with admin rights for the user. ListUserRepos(*library.User) ([]*library.Repo, error) + // GetBranch defines a function that retrieves + // a branch for a repo. + GetBranch(*library.User, *library.Repo) (string, string, error) // GetPullRequest defines a function that retrieves // a pull request for a repo. GetPullRequest(*library.User, *library.Repo, int) (string, string, string, string, error)