Skip to content

Commit

Permalink
Merge branch 'sample_resolution_select' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
spacehamster87 committed Sep 24, 2024
2 parents 827f6da + 21e4870 commit 8e3327e
Show file tree
Hide file tree
Showing 42 changed files with 824 additions and 240 deletions.
2 changes: 1 addition & 1 deletion api/schema.graphqls
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ type Query {
allocatedNodes(cluster: String!): [Count!]!

job(id: ID!): Job
jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]!
jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!], resolution: Int): [JobMetricWithName!]!
jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints

jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList!
Expand Down
5 changes: 2 additions & 3 deletions internal/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ func cleanup() {
func TestRestApi(t *testing.T) {
restapi := setup(t)
t.Cleanup(cleanup)

testData := schema.JobData{
"load_one": map[schema.MetricScope]*schema.JobMetric{
schema.MetricScopeNode: {
Expand All @@ -192,7 +191,7 @@ func TestRestApi(t *testing.T) {
},
}

metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
return testData, nil
}

Expand Down Expand Up @@ -344,7 +343,7 @@ func TestRestApi(t *testing.T) {
}

t.Run("CheckArchive", func(t *testing.T) {
data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background())
data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60)
if err != nil {
t.Fatal(err)
}
Expand Down
20 changes: 17 additions & 3 deletions internal/api/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,15 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request)

var data schema.JobData

metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
resolution := 0

for _, mc := range metricConfigs {
resolution = max(resolution, mc.Timestep)
}

if r.URL.Query().Get("all-metrics") == "true" {
data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context())
data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context(), resolution)
if err != nil {
log.Warn("Error while loading job data")
return
Expand Down Expand Up @@ -606,7 +613,14 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
scopes = []schema.MetricScope{"node"}
}

data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context())
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
resolution := 0

for _, mc := range metricConfigs {
resolution = max(resolution, mc.Timestep)
}

data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context(), resolution)
if err != nil {
log.Warn("Error while loading job data")
return
Expand Down Expand Up @@ -1114,7 +1128,7 @@ func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
}

resolver := graph.GetResolverInstance()
data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes)
data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes, nil)
if err != nil {
json.NewEncoder(rw).Encode(Respone{
Error: &struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/archiver/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
scopes = append(scopes, schema.MetricScopeAccelerator)
}

jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx)
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 0) // 0 Resolution-Value retrieves highest res (60s)
if err != nil {
log.Error("Error wile loading job data for archiving")
return nil, err
Expand Down
19 changes: 14 additions & 5 deletions internal/graph/generated/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 14 additions & 10 deletions internal/graph/schema.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion internal/graph/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ func (r *queryResolver) rooflineHeatmap(
continue
}

jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx)
// metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
// resolution := 0

// for _, mc := range metricConfigs {
// resolution = max(resolution, mc.Timestep)
// }

jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0)
if err != nil {
log.Errorf("Error while loading roofline metrics for job %d", job.ID)
return nil, err
Expand Down
33 changes: 28 additions & 5 deletions internal/metricDataDispatcher/dataLoader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/resampler"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)

Expand All @@ -23,20 +24,22 @@ func cacheKey(
job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
resolution int,
) string {
// Duration and StartTime do not need to be in the cache key as StartTime is less unique than
// job.ID and the TTL of the cache entry makes sure it does not stay there forever.
return fmt.Sprintf("%d(%s):[%v],[%v]",
job.ID, job.State, metrics, scopes)
return fmt.Sprintf("%d(%s):[%v],[%v]-%d",
job.ID, job.State, metrics, scopes, resolution)
}

// Fetches the metric data for a job.
func LoadData(job *schema.Job,
metrics []string,
scopes []schema.MetricScope,
ctx context.Context,
resolution int,
) (schema.JobData, error) {
data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) {
data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ interface{}, ttl time.Duration, size int) {
var jd schema.JobData
var err error

Expand All @@ -60,7 +63,7 @@ func LoadData(job *schema.Job,
}
}

jd, err = repo.LoadData(job, metrics, scopes, ctx)
jd, err = repo.LoadData(job, metrics, scopes, ctx, resolution)
if err != nil {
if len(jd) != 0 {
log.Warnf("partial error: %s", err.Error())
Expand All @@ -72,12 +75,31 @@ func LoadData(job *schema.Job,
}
size = jd.Size()
} else {
jd, err = archive.GetHandle().LoadJobData(job)
var jd_temp schema.JobData
jd_temp, err = archive.GetHandle().LoadJobData(job)
if err != nil {
log.Error("Error while loading job data from archive")
return err, 0, 0
}

//Deep copy the cached archive hashmap
jd = metricdata.DeepCopy(jd_temp)

//Resampling for archived data.
//Pass the resolution from frontend here.
for _, v := range jd {
for _, v_ := range v {
timestep := 0
for i := 0; i < len(v_.Series); i += 1 {
v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, v_.Timestep, resolution)
if err != nil {
return err, 0, 0
}
}
v_.Timestep = timestep
}
}

// Avoid sending unrequested data to the client:
if metrics != nil || scopes != nil {
if metrics == nil {
Expand Down Expand Up @@ -117,6 +139,7 @@ func LoadData(job *schema.Job,
}

// FIXME: Review: Is this really necessary or correct.
// Note: Lines 142-170 formerly known as prepareJobData(jobData, scopes)
// For /monitoring/job/<job> and some other places, flops_any and mem_bw need
// to be available at the scope 'node'. If a job has a lot of nodes,
// statisticsSeries should be available so that a min/median/max Graph can be
Expand Down
Loading

0 comments on commit 8e3327e

Please sign in to comment.