diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go index 083b9e52..0770e816 100644 --- a/cmd/cc-backend/server.go +++ b/cmd/cc-backend/server.go @@ -25,7 +25,6 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph" "github.com/ClusterCockpit/cc-backend/internal/graph/generated" - "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/routerConfig" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/runtimeEnv" @@ -314,9 +313,6 @@ func serverShutdown() { // First shut down the server gracefully (waiting for all ongoing requests) server.Shutdown(context.Background()) - // Then, wait for any async jobStarts still pending... - repository.WaitForJobStart() - // Then, wait for any async archivings still pending... archiver.WaitForArchiving() } diff --git a/internal/api/api_test.go b/internal/api/api_test.go index bcabd5f7..c47bd4d3 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -249,9 +249,6 @@ func TestRestApi(t *testing.T) { if response.StatusCode != http.StatusCreated { t.Fatal(response.Status, recorder.Body.String()) } - - time.Sleep(1 * time.Second) - resolver := graph.GetResolverInstance() job, err := restapi.JobRepository.Find(&TestJobId, &TestClusterName, &TestStartTime) if err != nil { diff --git a/internal/api/rest.go b/internal/api/rest.go index 41b8d5a3..4e527018 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -123,18 +123,8 @@ func (api *RestApi) MountFrontendApiRoutes(r *mux.Router) { } } -// StartJobApiResponse model -type StartJobApiResponse struct { - Message string `json:"msg"` -} - -// DeleteJobApiResponse model -type DeleteJobApiResponse struct { - Message string `json:"msg"` -} - -// UpdateUserApiResponse model -type UpdateUserApiResponse struct { +// DefaultApiResponse model +type DefaultJobApiResponse struct { Message string `json:"msg"` } @@ -790,6 +780,11 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { return } + // aquire lock to avoid race condition between API calls + var unlockOnce sync.Once + api.RepositoryMutex.Lock() + defer unlockOnce.Do(api.RepositoryMutex.Unlock) + // Check if combination of (job_id, cluster_id, start_time) already exists: jobs, err := api.JobRepository.FindAll(&req.JobID, &req.Cluster, nil) if err != nil && err != sql.ErrNoRows { @@ -804,12 +799,27 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) { } } - repository.TriggerJobStart(repository.JobWithUser{Job: &req, User: repository.GetUserFromContext(r.Context())}) + id, err := api.JobRepository.Start(&req) + if err != nil { + handleError(fmt.Errorf("insert into database failed: %w", err), http.StatusInternalServerError, rw) + return + } + // unlock here, adding Tags can be async + unlockOnce.Do(api.RepositoryMutex.Unlock) + + for _, tag := range req.Tags { + if _, err := api.JobRepository.AddTagOrCreate(repository.GetUserFromContext(r.Context()), id, tag.Type, tag.Name, tag.Scope); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw) + return + } + } + log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", id, req.Cluster, req.JobID, req.User, req.StartTime) rw.Header().Add("Content-Type", "application/json") rw.WriteHeader(http.StatusCreated) - json.NewEncoder(rw).Encode(StartJobApiResponse{ - Message: fmt.Sprintf("Successfully triggered job start"), + json.NewEncoder(rw).Encode(DefaultJobApiResponse{ + Message: "success", }) } @@ -892,7 +902,7 @@ func (api *RestApi) deleteJobById(rw http.ResponseWriter, r *http.Request) { } rw.Header().Add("Content-Type", "application/json") rw.WriteHeader(http.StatusOK) - json.NewEncoder(rw).Encode(DeleteJobApiResponse{ + json.NewEncoder(rw).Encode(DefaultJobApiResponse{ Message: fmt.Sprintf("Successfully deleted job %s", id), }) } @@ -943,7 +953,7 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) rw.Header().Add("Content-Type", "application/json") rw.WriteHeader(http.StatusOK) - json.NewEncoder(rw).Encode(DeleteJobApiResponse{ + json.NewEncoder(rw).Encode(DefaultJobApiResponse{ Message: fmt.Sprintf("Successfully deleted job %d", job.ID), }) } @@ -987,7 +997,7 @@ func (api *RestApi) deleteJobBefore(rw http.ResponseWriter, r *http.Request) { rw.Header().Add("Content-Type", "application/json") rw.WriteHeader(http.StatusOK) - json.NewEncoder(rw).Encode(DeleteJobApiResponse{ + json.NewEncoder(rw).Encode(DefaultJobApiResponse{ Message: fmt.Sprintf("Successfully deleted %d jobs", cnt), }) } diff --git a/internal/repository/dbConnection.go b/internal/repository/dbConnection.go index d062052f..418eef94 100644 --- a/internal/repository/dbConnection.go +++ b/internal/repository/dbConnection.go @@ -82,8 +82,6 @@ func Connect(driver string, db string) { if err != nil { log.Fatal(err) } - - startJobStartWorker() }) } diff --git a/internal/repository/jobStartWorker.go b/internal/repository/jobStartWorker.go deleted file mode 100644 index 18d2be7b..00000000 --- a/internal/repository/jobStartWorker.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. -package repository - -import ( - "sync" - "time" - - "github.com/ClusterCockpit/cc-backend/pkg/log" - "github.com/ClusterCockpit/cc-backend/pkg/schema" -) - -type JobWithUser struct { - Job *schema.JobMeta - User *schema.User -} - -var ( - jobStartPending sync.WaitGroup - jobStartChannel chan JobWithUser -) - -func startJobStartWorker() { - jobStartChannel = make(chan JobWithUser, 128) - - go jobStartWorker() -} - -// Archiving worker thread -func jobStartWorker() { - for { - select { - case req, ok := <-jobStartChannel: - if !ok { - break - } - jobRepo := GetJobRepository() - var id int64 - - for i := 0; i < 5; i++ { - var err error - - id, err = jobRepo.Start(req.Job) - if err != nil { - log.Errorf("Attempt %d: insert into database failed: %v", i, err) - } else { - break - } - time.Sleep(1 * time.Second) - } - - for _, tag := range req.Job.Tags { - if _, err := jobRepo.AddTagOrCreate(req.User, id, - tag.Type, tag.Name, tag.Scope); err != nil { - log.Errorf("adding tag to new job %d failed: %v", id, err) - } - } - - log.Printf("new job (id: %d): cluster=%s, jobId=%d, user=%s, startTime=%d", - id, req.Job.Cluster, req.Job.JobID, req.Job.User, req.Job.StartTime) - - jobStartPending.Done() - } - } -} - -// Trigger async archiving -func TriggerJobStart(req JobWithUser) { - if jobStartChannel == nil { - log.Fatal("Cannot start Job without jobStart channel. Did you Start the worker?") - } - - jobStartPending.Add(1) - jobStartChannel <- req -} - -// Wait for background thread to finish pending archiving operations -func WaitForJobStart() { - // close channel and wait for worker to process remaining jobs - jobStartPending.Wait() -}