
Merge branch 'dev' into add_nats_server
spacehamster87 committed Sep 5, 2024
2 parents 9b3f334 + 7ea4086 commit f61b5bb
Showing 31 changed files with 1,395 additions and 926 deletions.
20 changes: 15 additions & 5 deletions Makefile
@@ -22,11 +22,21 @@ SVELTE_COMPONENTS = status \
header

SVELTE_TARGETS = $(addprefix $(FRONTEND)/public/build/,$(addsuffix .js, $(SVELTE_COMPONENTS)))
- SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \
- $(wildcard $(FRONTEND)/src/*.js) \
- $(wildcard $(FRONTEND)/src/filters/*.svelte) \
- $(wildcard $(FRONTEND)/src/plots/*.svelte) \
- $(wildcard $(FRONTEND)/src/joblist/*.svelte)
+ SVELTE_SRC = $(wildcard $(FRONTEND)/src/*.svelte) \
+ $(wildcard $(FRONTEND)/src/*.js) \
+ $(wildcard $(FRONTEND)/src/analysis/*.svelte) \
+ $(wildcard $(FRONTEND)/src/config/*.svelte) \
+ $(wildcard $(FRONTEND)/src/config/admin/*.svelte) \
+ $(wildcard $(FRONTEND)/src/config/user/*.svelte) \
+ $(wildcard $(FRONTEND)/src/generic/*.js) \
+ $(wildcard $(FRONTEND)/src/generic/*.svelte) \
+ $(wildcard $(FRONTEND)/src/generic/filters/*.svelte) \
+ $(wildcard $(FRONTEND)/src/generic/plots/*.svelte) \
+ $(wildcard $(FRONTEND)/src/generic/joblist/*.svelte) \
+ $(wildcard $(FRONTEND)/src/generic/helper/*.svelte) \
+ $(wildcard $(FRONTEND)/src/generic/select/*.svelte) \
+ $(wildcard $(FRONTEND)/src/header/*.svelte) \
+ $(wildcard $(FRONTEND)/src/job/*.svelte)

.PHONY: clean distclean test tags frontend swagger graphql $(TARGET)

2 changes: 1 addition & 1 deletion cmd/cc-backend/main.go
@@ -182,7 +182,7 @@ func main() {
log.Fatalf("failed to initialize archive: %s", err.Error())
}

- if err := metricdata.Init(config.Keys.DisableArchive); err != nil {
+ if err := metricdata.Init(); err != nil {
log.Fatalf("failed to initialize metricdata repository: %s", err.Error())
}

56 changes: 17 additions & 39 deletions cmd/cc-backend/server.go
@@ -20,6 +20,7 @@ import (
"github.com/99designs/gqlgen/graphql/handler"
"github.com/99designs/gqlgen/graphql/playground"
"github.com/ClusterCockpit/cc-backend/internal/api"
"github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph"
@@ -38,6 +39,15 @@ var (
apiHandle *api.RestApi
)

+ func onFailureResponse(rw http.ResponseWriter, r *http.Request, err error) {
+ rw.Header().Add("Content-Type", "application/json")
+ rw.WriteHeader(http.StatusUnauthorized)
+ json.NewEncoder(rw).Encode(map[string]string{
+ "status": http.StatusText(http.StatusUnauthorized),
+ "error": err.Error(),
+ })
+ }

func serverInit() {
// Setup the http.Handler/Router used by the server
graph.Init()
@@ -166,64 +176,32 @@ func serverInit() {
return authHandle.AuthApi(
// On success;
next,

// On failure: JSON Response
- func(rw http.ResponseWriter, r *http.Request, err error) {
- rw.Header().Add("Content-Type", "application/json")
- rw.WriteHeader(http.StatusUnauthorized)
- json.NewEncoder(rw).Encode(map[string]string{
- "status": http.StatusText(http.StatusUnauthorized),
- "error": err.Error(),
- })
- })
+ onFailureResponse)
})

userapi.Use(func(next http.Handler) http.Handler {
return authHandle.AuthUserApi(
// On success;
next,

// On failure: JSON Response
- func(rw http.ResponseWriter, r *http.Request, err error) {
- rw.Header().Add("Content-Type", "application/json")
- rw.WriteHeader(http.StatusUnauthorized)
- json.NewEncoder(rw).Encode(map[string]string{
- "status": http.StatusText(http.StatusUnauthorized),
- "error": err.Error(),
- })
- })
+ onFailureResponse)
})

configapi.Use(func(next http.Handler) http.Handler {
return authHandle.AuthConfigApi(
// On success;
next,

// On failure: JSON Response
- func(rw http.ResponseWriter, r *http.Request, err error) {
- rw.Header().Add("Content-Type", "application/json")
- rw.WriteHeader(http.StatusUnauthorized)
- json.NewEncoder(rw).Encode(map[string]string{
- "status": http.StatusText(http.StatusUnauthorized),
- "error": err.Error(),
- })
- })
+ onFailureResponse)
})

frontendapi.Use(func(next http.Handler) http.Handler {
return authHandle.AuthFrontendApi(
// On success;
next,

// On failure: JSON Response
- func(rw http.ResponseWriter, r *http.Request, err error) {
- rw.Header().Add("Content-Type", "application/json")
- rw.WriteHeader(http.StatusUnauthorized)
- json.NewEncoder(rw).Encode(map[string]string{
- "status": http.StatusText(http.StatusUnauthorized),
- "error": err.Error(),
- })
- })
+ onFailureResponse)
})
}

@@ -283,8 +261,8 @@ func serverStart() {
})

server = &http.Server{
- ReadTimeout: 10 * time.Second,
- WriteTimeout: 10 * time.Second,
+ ReadTimeout: 20 * time.Second,
+ WriteTimeout: 20 * time.Second,
Handler: handler,
Addr: config.Keys.Addr,
}
@@ -331,5 +309,5 @@ func serverShutdown() {
server.Shutdown(context.Background())

// Then, wait for any async archivings still pending...
- apiHandle.JobRepository.WaitForArchiving()
+ archiver.WaitForArchiving()
}
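
The four router middlewares in serverInit above now share the single onFailureResponse handler instead of repeating the inline JSON closure. As a standalone sketch (not part of the commit) of what that failure path returns to an unauthenticated client, the snippet below copies the handler body shown in the diff; the request path and error message are invented for illustration:

package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Same shape as the shared failure handler added in cmd/cc-backend/server.go above.
func onFailureResponse(rw http.ResponseWriter, r *http.Request, err error) {
	rw.Header().Add("Content-Type", "application/json")
	rw.WriteHeader(http.StatusUnauthorized)
	json.NewEncoder(rw).Encode(map[string]string{
		"status": http.StatusText(http.StatusUnauthorized),
		"error":  err.Error(),
	})
}

func main() {
	// Hypothetical request and error, only to show the response shape.
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/api/jobs/", nil)
	onFailureResponse(rec, req, errors.New("no valid token"))

	fmt.Println(rec.Code)          // 401
	fmt.Println(rec.Body.String()) // {"error":"no valid token","status":"Unauthorized"}
}

The secured, user, config, and frontend API routers above would all produce this same 401 JSON body on an authentication failure.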
11 changes: 7 additions & 4 deletions internal/api/api_test.go
@@ -19,9 +19,11 @@ import (
"testing"

"github.com/ClusterCockpit/cc-backend/internal/api"
"github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
@@ -150,10 +152,11 @@ func setup(t *testing.T) *api.RestApi {
t.Fatal(err)
}

- if err := metricdata.Init(config.Keys.DisableArchive); err != nil {
+ if err := metricdata.Init(); err != nil {
t.Fatal(err)
}

+ archiver.Start(repository.GetJobRepository())
auth.Init()
graph.Init()

@@ -311,7 +314,7 @@ func TestRestApi(t *testing.T) {
t.Fatal(response.Status, recorder.Body.String())
}

- restapi.JobRepository.WaitForArchiving()
+ archiver.WaitForArchiving()
resolver := graph.GetResolverInstance()
job, err := resolver.Query().Job(ctx, strconv.Itoa(int(dbid)))
if err != nil {
@@ -341,7 +344,7 @@
}

t.Run("CheckArchive", func(t *testing.T) {
- data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background())
+ data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background())
if err != nil {
t.Fatal(err)
}
@@ -422,7 +425,7 @@ func TestRestApi(t *testing.T) {
t.Fatal(response.Status, recorder.Body.String())
}

- restapi.JobRepository.WaitForArchiving()
+ archiver.WaitForArchiving()
jobid, cluster := int64(12345), "testcluster"
job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
if err != nil {
9 changes: 5 additions & 4 deletions internal/api/rest.go
@@ -19,12 +19,13 @@ import (
"sync"
"time"

"github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/importer"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/internal/util"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
@@ -515,7 +516,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request)
var data schema.JobData

if r.URL.Query().Get("all-metrics") == "true" {
- data, err = metricdata.LoadData(job, nil, scopes, r.Context())
+ data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context())
if err != nil {
log.Warn("Error while loading job data")
return
@@ -604,7 +605,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
scopes = []schema.MetricScope{"node"}
}

- data, err := metricdata.LoadData(job, metrics, scopes, r.Context())
+ data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context())
if err != nil {
log.Warn("Error while loading job data")
return
@@ -1081,7 +1082,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
}

// Trigger async archiving
- api.JobRepository.TriggerArchiving(job)
+ archiver.TriggerArchiving(job)
}

func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
94 changes: 94 additions & 0 deletions internal/archiver/archiveWorker.go
@@ -0,0 +1,94 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archiver

import (
"context"
"sync"
"time"

"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
)

var (
archivePending sync.WaitGroup
archiveChannel chan *schema.Job
jobRepo *repository.JobRepository
)

func Start(r *repository.JobRepository) {
archiveChannel = make(chan *schema.Job, 128)
jobRepo = r

go archivingWorker()
}

// Archiving worker thread
func archivingWorker() {
for {
select {
case job, ok := <-archiveChannel:
if !ok {
break
}
start := time.Now()
// not using meta data, called to load JobMeta into Cache?
// will fail if job meta not in repository
if _, err := jobRepo.FetchMetadata(job); err != nil {
log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}

// ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
// TODO: Maybe use context with cancel/timeout here
jobMeta, err := ArchiveJob(job, context.Background())
if err != nil {
log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}

stmt := sq.Update("job").Where("job.id = ?", job.ID)

if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil {
log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
continue
}
if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil {
log.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
continue
}
// Update the jobs database entry one last time:
stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful)
if err := jobRepo.Execute(stmt); err != nil {
log.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error())
continue
}
log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
log.Printf("archiving job (dbid: %d) successful", job.ID)
archivePending.Done()
}
}
}

// Trigger async archiving
func TriggerArchiving(job *schema.Job) {
if archiveChannel == nil {
log.Fatal("Cannot archive without archiving channel. Did you Start the archiver?")
}

archivePending.Add(1)
archiveChannel <- job
}

// Wait for background thread to finish pending archiving operations
func WaitForArchiving() {
// close channel and wait for worker to process remaining jobs
archivePending.Wait()
}
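
Pieced together from the call sites changed elsewhere in this commit (archiver.Start in the api_test.go setup, archiver.TriggerArchiving in rest.go, archiver.WaitForArchiving in server.go), the intended lifecycle of the new package looks roughly like the sketch below. This is an illustration, not code from the repository, and it assumes the job repository and database have already been initialized as in cc-backend's real startup path:

// Hypothetical usage sketch of internal/archiver, not part of the commit.
package main

import (
	"github.com/ClusterCockpit/cc-backend/internal/archiver"
	"github.com/ClusterCockpit/cc-backend/internal/repository"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// onJobStop stands in for RestApi.checkAndHandleStopJob, which now delegates
// archiving to the background worker instead of doing it inline.
func onJobStop(job *schema.Job) {
	archiver.TriggerArchiving(job)
}

func main() {
	// Start the background worker once the job repository is available
	// (the test setup in api_test.go above does the same).
	archiver.Start(repository.GetJobRepository())

	// ... serve requests; onJobStop is invoked for every job that stops ...

	// Before shutting down, block until pending archivings have finished
	// (serverShutdown above now calls this).
	archiver.WaitForArchiving()
}

Because archivingWorker drains a buffered channel of 128 jobs, TriggerArchiving returns quickly, and WaitForArchiving only blocks on the sync.WaitGroup that counts jobs still in flight.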