Skip to content

Commit

Permalink
Merge branch 'dev' into sample_resolution_select
Browse files Browse the repository at this point in the history
- Moved resample changes to metricDataDispatcher
- Added res argument to archiver, updateFootprintService
  • Loading branch information
spacehamster87 committed Sep 5, 2024
2 parents b04bf6a + 7ea4086 commit 0b7cdde
Show file tree
Hide file tree
Showing 25 changed files with 824 additions and 524 deletions.
2 changes: 1 addition & 1 deletion cmd/cc-backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func main() {
log.Fatalf("failed to initialize archive: %s", err.Error())
}

if err := metricdata.Init(config.Keys.DisableArchive); err != nil {
if err := metricdata.Init(); err != nil {
log.Fatalf("failed to initialize metricdata repository: %s", err.Error())
}

Expand Down
7 changes: 4 additions & 3 deletions cmd/cc-backend/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/99designs/gqlgen/graphql/handler"
"github.com/99designs/gqlgen/graphql/playground"
"github.com/ClusterCockpit/cc-backend/internal/api"
"github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph"
Expand Down Expand Up @@ -260,8 +261,8 @@ func serverStart() {
})

server = &http.Server{
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
ReadTimeout: 20 * time.Second,
WriteTimeout: 20 * time.Second,
Handler: handler,
Addr: config.Keys.Addr,
}
Expand Down Expand Up @@ -308,5 +309,5 @@ func serverShutdown() {
server.Shutdown(context.Background())

// Then, wait for any async archivings still pending...
apiHandle.JobRepository.WaitForArchiving()
archiver.WaitForArchiving()
}
11 changes: 7 additions & 4 deletions internal/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (
"testing"

"github.com/ClusterCockpit/cc-backend/internal/api"
"github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
Expand Down Expand Up @@ -150,10 +152,11 @@ func setup(t *testing.T) *api.RestApi {
t.Fatal(err)
}

if err := metricdata.Init(config.Keys.DisableArchive); err != nil {
if err := metricdata.Init(); err != nil {
t.Fatal(err)
}

archiver.Start(repository.GetJobRepository())
auth.Init()
graph.Init()

Expand Down Expand Up @@ -310,7 +313,7 @@ func TestRestApi(t *testing.T) {
t.Fatal(response.Status, recorder.Body.String())
}

restapi.JobRepository.WaitForArchiving()
archiver.WaitForArchiving()
resolver := graph.GetResolverInstance()
job, err := resolver.Query().Job(ctx, strconv.Itoa(int(dbid)))
if err != nil {
Expand Down Expand Up @@ -340,7 +343,7 @@ func TestRestApi(t *testing.T) {
}

t.Run("CheckArchive", func(t *testing.T) {
data, err := metricdata.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60)
data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -421,7 +424,7 @@ func TestRestApi(t *testing.T) {
t.Fatal(response.Status, recorder.Body.String())
}

restapi.JobRepository.WaitForArchiving()
archiver.WaitForArchiving()
jobid, cluster := int64(12345), "testcluster"
job, err := restapi.JobRepository.Find(&jobid, &cluster, nil)
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions internal/api/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (
"sync"
"time"

"github.com/ClusterCockpit/cc-backend/internal/archiver"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/importer"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/internal/util"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
Expand Down Expand Up @@ -522,7 +523,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request)
}

if r.URL.Query().Get("all-metrics") == "true" {
data, err = metricdata.LoadData(job, nil, scopes, r.Context(), resolution)
data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context(), resolution)
if err != nil {
log.Warn("Error while loading job data")
return
Expand Down Expand Up @@ -618,7 +619,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
resolution = max(resolution, mc.Timestep)
}

data, err := metricdata.LoadData(job, metrics, scopes, r.Context(), resolution)
data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context(), resolution)
if err != nil {
log.Warn("Error while loading job data")
return
Expand Down Expand Up @@ -1095,7 +1096,7 @@ func (api *RestApi) checkAndHandleStopJob(rw http.ResponseWriter, job *schema.Jo
}

// Trigger async archiving
api.JobRepository.TriggerArchiving(job)
archiver.TriggerArchiving(job)
}

func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
Expand Down
94 changes: 94 additions & 0 deletions internal/archiver/archiveWorker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archiver

import (
"context"
"sync"
"time"

"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
sq "github.com/Masterminds/squirrel"
)

var (
archivePending sync.WaitGroup
archiveChannel chan *schema.Job
jobRepo *repository.JobRepository
)

func Start(r *repository.JobRepository) {
archiveChannel = make(chan *schema.Job, 128)
jobRepo = r

go archivingWorker()
}

// Archiving worker thread
func archivingWorker() {
for {
select {
case job, ok := <-archiveChannel:
if !ok {
break
}
start := time.Now()
// not using meta data, called to load JobMeta into Cache?
// will fail if job meta not in repository
if _, err := jobRepo.FetchMetadata(job); err != nil {
log.Errorf("archiving job (dbid: %d) failed at check metadata step: %s", job.ID, err.Error())
jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}

// ArchiveJob will fetch all the data from a MetricDataRepository and push into configured archive backend
// TODO: Maybe use context with cancel/timeout here
jobMeta, err := ArchiveJob(job, context.Background())
if err != nil {
log.Errorf("archiving job (dbid: %d) failed at archiving job step: %s", job.ID, err.Error())
jobRepo.UpdateMonitoringStatus(job.ID, schema.MonitoringStatusArchivingFailed)
continue
}

stmt := sq.Update("job").Where("job.id = ?", job.ID)

if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil {
log.Errorf("archiving job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
continue
}
if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil {
log.Errorf("archiving job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
continue
}
// Update the jobs database entry one last time:
stmt = jobRepo.MarkArchived(stmt, schema.MonitoringStatusArchivingSuccessful)
if err := jobRepo.Execute(stmt); err != nil {
log.Errorf("archiving job (dbid: %d) failed at db execute: %s", job.ID, err.Error())
continue
}
log.Debugf("archiving job %d took %s", job.JobID, time.Since(start))
log.Printf("archiving job (dbid: %d) successful", job.ID)
archivePending.Done()
}
}
}

// Trigger async archiving
func TriggerArchiving(job *schema.Job) {
if archiveChannel == nil {
log.Fatal("Cannot archive without archiving channel. Did you Start the archiver?")
}

archivePending.Add(1)
archiveChannel <- job
}

// Wait for background thread to finish pending archiving operations
func WaitForArchiving() {
// close channel and wait for worker to process remaining jobs
archivePending.Wait()
}
82 changes: 82 additions & 0 deletions internal/archiver/archiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package archiver

import (
"context"
"math"

"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

// Writes a running job to the job-archive
func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
allMetrics := make([]string, 0)
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
for _, mc := range metricConfigs {
allMetrics = append(allMetrics, mc.Name)
}

scopes := []schema.MetricScope{schema.MetricScopeNode}
// FIXME: Add a config option for this
if job.NumNodes <= 8 {
// This will add the native scope if core scope is not available
scopes = append(scopes, schema.MetricScopeCore)
}

if job.NumAcc > 0 {
scopes = append(scopes, schema.MetricScopeAccelerator)
}

jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx)
if err != nil {
log.Error("Error wile loading job data for archiving")
return nil, err
}

jobMeta := &schema.JobMeta{
BaseJob: job.BaseJob,
StartTime: job.StartTime.Unix(),
Statistics: make(map[string]schema.JobStatistics),
}

for metric, data := range jobData {
avg, min, max := 0.0, math.MaxFloat32, -math.MaxFloat32
nodeData, ok := data["node"]
if !ok {
// This should never happen ?
continue
}

for _, series := range nodeData.Series {
avg += series.Statistics.Avg
min = math.Min(min, series.Statistics.Min)
max = math.Max(max, series.Statistics.Max)
}

jobMeta.Statistics[metric] = schema.JobStatistics{
Unit: schema.Unit{
Prefix: archive.GetMetricConfig(job.Cluster, metric).Unit.Prefix,
Base: archive.GetMetricConfig(job.Cluster, metric).Unit.Base,
},
Avg: avg / float64(job.NumNodes),
Min: min,
Max: max,
}
}

// If the file based archive is disabled,
// only return the JobMeta structure as the
// statistics in there are needed.
if config.Keys.DisableArchive {
return jobMeta, nil
}

return jobMeta, archive.GetHandle().ImportJob(jobMeta, &jobData)
}
20 changes: 11 additions & 9 deletions internal/graph/schema.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions internal/graph/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/99designs/gqlgen/graphql"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/metricDataDispatcher"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
// "github.com/ClusterCockpit/cc-backend/pkg/archive"
Expand All @@ -24,8 +24,8 @@ func (r *queryResolver) rooflineHeatmap(
ctx context.Context,
filter []*model.JobFilter,
rows int, cols int,
minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) {

minX float64, minY float64, maxX float64, maxY float64,
) ([][]float64, error) {
jobs, err := r.Repo.QueryJobs(ctx, filter, &model.PageRequest{Page: 1, ItemsPerPage: MAX_JOBS_FOR_ANALYSIS + 1}, nil)
if err != nil {
log.Error("Error while querying jobs for roofline")
Expand Down Expand Up @@ -54,7 +54,7 @@ func (r *queryResolver) rooflineHeatmap(
// resolution = max(resolution, mc.Timestep)
// }

jobdata, err := metricdata.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0)
jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0)
if err != nil {
log.Errorf("Error while loading roofline metrics for job %d", job.ID)
return nil, err
Expand Down Expand Up @@ -127,7 +127,7 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
continue
}

if err := metricdata.LoadAverages(job, metrics, avgs, ctx); err != nil {
if err := metricDataDispatcher.LoadAverages(job, metrics, avgs, ctx); err != nil {
log.Error("Error while loading averages for footprint")
return nil, err
}
Expand Down
Loading

0 comments on commit 0b7cdde

Please sign in to comment.