diff --git a/api/schema.graphqls b/api/schema.graphqls index 568c15d8..ed94d422 100644 --- a/api/schema.graphqls +++ b/api/schema.graphqls @@ -113,6 +113,7 @@ type Tag { id: ID! type: String! name: String! + scope: String! } type Resource { @@ -223,7 +224,7 @@ type Query { allocatedNodes(cluster: String!): [Count!]! job(id: ID!): Job - jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]! + jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!], resolution: Int): [JobMetricWithName!]! jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList! @@ -235,7 +236,7 @@ type Query { } type Mutation { - createTag(type: String!, name: String!): Tag! + createTag(type: String!, name: String!, scope: String!): Tag! deleteTag(id: ID!): ID! addTagsToJob(job: ID!, tagIds: [ID!]!): [Tag!]! removeTagsFromJob(job: ID!, tagIds: [ID!]!): [Tag!]! diff --git a/internal/api/api_test.go b/internal/api/api_test.go index aa0a1f4b..0312e43c 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -175,7 +175,6 @@ func cleanup() { func TestRestApi(t *testing.T) { restapi := setup(t) t.Cleanup(cleanup) - testData := schema.JobData{ "load_one": map[schema.MetricScope]*schema.JobMetric{ schema.MetricScopeNode: { @@ -192,7 +191,7 @@ func TestRestApi(t *testing.T) { }, } - metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) { + metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) { return testData, nil } @@ -215,7 +214,7 @@ func TestRestApi(t *testing.T) { "exclusive": 1, "monitoringStatus": 1, "smt": 1, - "tags": [{ "type": "testTagType", "name": "testTagName" }], + "tags": [{ "type": "testTagType", "name": "testTagName", "scope": "testuser" }], "resources": [ { "hostname": "host123", @@ -283,7 +282,7 @@ func TestRestApi(t *testing.T) { t.Fatalf("unexpected job properties: %#v", job) } - if len(job.Tags) != 1 || job.Tags[0].Type != "testTagType" || job.Tags[0].Name != "testTagName" { + if len(job.Tags) != 1 || job.Tags[0].Type != "testTagType" || job.Tags[0].Name != "testTagName" || job.Tags[0].Scope != "testuser" { t.Fatalf("unexpected tags: %#v", job.Tags) } @@ -344,7 +343,7 @@ func TestRestApi(t *testing.T) { } t.Run("CheckArchive", func(t *testing.T) { - data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background()) + data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60) if err != nil { t.Fatal(err) } diff --git a/internal/api/rest.go b/internal/api/rest.go index b7370906..62b2f192 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -177,8 +177,9 @@ type ErrorResponse struct { // ApiTag model type ApiTag struct { // Tag Type - Type string `json:"type" example:"Debug"` - Name string `json:"name" example:"Testjob"` // Tag Name + Type string `json:"type" example:"Debug"` + Name string `json:"name" example:"Testjob"` // Tag Name + Scope string `json:"scope" example:"global"` // Tag Scope for Frontend Display } // ApiMeta model @@ -420,7 +421,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { StartTime: job.StartTime.Unix(), } - res.Tags, err = 
api.JobRepository.GetTags(&job.ID) + res.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID) if err != nil { handleError(err, http.StatusInternalServerError, rw) return @@ -493,7 +494,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) return } - job.Tags, err = api.JobRepository.GetTags(&job.ID) + job.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID) if err != nil { handleError(err, http.StatusInternalServerError, rw) return @@ -515,8 +516,15 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request) var data schema.JobData + metricConfigs := archive.GetCluster(job.Cluster).MetricConfig + resolution := 0 + + for _, mc := range metricConfigs { + resolution = max(resolution, mc.Timestep) + } + if r.URL.Query().Get("all-metrics") == "true" { - data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context()) + data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context(), resolution) if err != nil { log.Warn("Error while loading job data") return @@ -579,7 +587,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { return } - job.Tags, err = api.JobRepository.GetTags(&job.ID) + job.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID) if err != nil { handleError(err, http.StatusInternalServerError, rw) return @@ -605,7 +613,14 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) { scopes = []schema.MetricScope{"node"} } - data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context()) + metricConfigs := archive.GetCluster(job.Cluster).MetricConfig + resolution := 0 + + for _, mc := range metricConfigs { + resolution = max(resolution, mc.Timestep) + } + + data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context(), resolution) if err != nil { log.Warn("Error while loading job data") return @@ -687,6 +702,7 @@ func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) { // @summary Adds one or more tags to a job // @tags Job add and modify // @description Adds tag(s) to a job specified by DB ID. Name and Type of Tag(s) can be chosen freely. +// @description Tag Scope for frontend visibility will default to "global" if none entered, other options: "admin" or specific username. // @description If tagged job is already finished: Tag will be written directly to respective archive files. 
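+// @description Example request body (illustrative, using the example values from the ApiTag model): [{"type": "Debug", "name": "Testjob", "scope": "global"}]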
// @accept json
// @produce json
@@ -712,7 +728,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	job.Tags, err = api.JobRepository.GetTags(&job.ID)
+	job.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID)
 	if err != nil {
 		http.Error(rw, err.Error(), http.StatusInternalServerError)
 		return
@@ -725,16 +741,17 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
 	}
 
 	for _, tag := range req {
-		tagId, err := api.JobRepository.AddTagOrCreate(job.ID, tag.Type, tag.Name)
+		tagId, err := api.JobRepository.AddTagOrCreate(r.Context(), job.ID, tag.Type, tag.Name, tag.Scope)
 		if err != nil {
 			http.Error(rw, err.Error(), http.StatusInternalServerError)
 			return
 		}
 
 		job.Tags = append(job.Tags, &schema.Tag{
-			ID:   tagId,
-			Type: tag.Type,
-			Name: tag.Name,
+			ID:    tagId,
+			Type:  tag.Type,
+			Name:  tag.Name,
+			Scope: tag.Scope,
 		})
 	}
 
@@ -802,7 +819,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
 	unlockOnce.Do(api.RepositoryMutex.Unlock)
 
 	for _, tag := range req.Tags {
-		if _, err := api.JobRepository.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
+		if _, err := api.JobRepository.AddTagOrCreate(r.Context(), id, tag.Type, tag.Name, tag.Scope); err != nil {
 			http.Error(rw, err.Error(), http.StatusInternalServerError)
 			handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw)
 			return
@@ -1111,7 +1128,7 @@ func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
 	}
 
 	resolver := graph.GetResolverInstance()
-	data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes)
+	data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes, nil)
 	if err != nil {
 		json.NewEncoder(rw).Encode(Respone{
 			Error: &struct {
diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index de84cf0c..1c4a3ec1 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -34,7 +34,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
 		scopes = append(scopes, schema.MetricScopeAccelerator)
 	}
 
-	jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx)
+	jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 0) // 0 resolution value retrieves highest res (60s)
 	if err != nil {
 		log.Error("Error while loading job data for archiving")
 		return nil, err
diff --git a/internal/auth/auth.go b/internal/auth/auth.go
index e45fa9df..b6e4cbe6 100644
--- a/internal/auth/auth.go
+++ b/internal/auth/auth.go
@@ -145,14 +145,18 @@ func GetAuthInstance() *Authentication {
 
 func persistUser(user *schema.User) {
 	r := repository.GetUserRepository()
-	_, err := r.GetUser(user.Username)
+	dbUser, err := r.GetUser(user.Username)
 
 	if err != nil && err != sql.ErrNoRows {
 		log.Errorf("Error while loading user '%s': %v", user.Username, err)
-	} else if err == sql.ErrNoRows {
+	} else if err == sql.ErrNoRows { // Adds New User
 		if err := r.AddUser(user); err != nil {
 			log.Errorf("Error while adding user '%s' to DB: %v", user.Username, err)
 		}
+	} else { // Update Existing
+		if err := r.UpdateUser(dbUser, user); err != nil {
+			log.Errorf("Error while updating user '%s' in DB: %v", user.Username, err)
+		}
 	}
 }
diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go
index 9ca0a608..3fd3649e 100644
--- a/internal/graph/generated/generated.go
+++ b/internal/graph/generated/generated.go
@@ -229,7 +229,7 @@ type ComplexityRoot struct {
 	Mutation struct {
 		AddTagsToJob 
func(childComplexity int, job string, tagIds []string) int - CreateTag func(childComplexity int, typeArg string, name string) int + CreateTag func(childComplexity int, typeArg string, name string, scope string) int DeleteTag func(childComplexity int, id string) int RemoveTagsFromJob func(childComplexity int, job string, tagIds []string) int UpdateConfiguration func(childComplexity int, name string, value string) int @@ -246,7 +246,7 @@ type ComplexityRoot struct { Clusters func(childComplexity int) int GlobalMetrics func(childComplexity int) int Job func(childComplexity int, id string) int - JobMetrics func(childComplexity int, id string, metrics []string, scopes []schema.MetricScope) int + JobMetrics func(childComplexity int, id string, metrics []string, scopes []schema.MetricScope, resolution *int) int Jobs func(childComplexity int, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) int JobsFootprints func(childComplexity int, filter []*model.JobFilter, metrics []string) int JobsStatistics func(childComplexity int, filter []*model.JobFilter, metrics []string, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate) int @@ -303,9 +303,10 @@ type ComplexityRoot struct { } Tag struct { - ID func(childComplexity int) int - Name func(childComplexity int) int - Type func(childComplexity int) int + ID func(childComplexity int) int + Name func(childComplexity int) int + Scope func(childComplexity int) int + Type func(childComplexity int) int } TimeRangeOutput struct { @@ -355,7 +356,7 @@ type MetricValueResolver interface { Name(ctx context.Context, obj *schema.MetricValue) (*string, error) } type MutationResolver interface { - CreateTag(ctx context.Context, typeArg string, name string) (*schema.Tag, error) + CreateTag(ctx context.Context, typeArg string, name string, scope string) (*schema.Tag, error) DeleteTag(ctx context.Context, id string) (string, error) AddTagsToJob(ctx context.Context, job string, tagIds []string) ([]*schema.Tag, error) RemoveTagsFromJob(ctx context.Context, job string, tagIds []string) ([]*schema.Tag, error) @@ -368,7 +369,7 @@ type QueryResolver interface { User(ctx context.Context, username string) (*model.User, error) AllocatedNodes(ctx context.Context, cluster string) ([]*model.Count, error) Job(ctx context.Context, id string) (*schema.Job, error) - JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error) + JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope, resolution *int) ([]*model.JobMetricWithName, error) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error) JobsStatistics(ctx context.Context, filter []*model.JobFilter, metrics []string, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) @@ -1183,7 +1184,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return 0, false } - return e.complexity.Mutation.CreateTag(childComplexity, args["type"].(string), args["name"].(string)), true + return e.complexity.Mutation.CreateTag(childComplexity, args["type"].(string), args["name"].(string), args["scope"].(string)), true case "Mutation.deleteTag": if e.complexity.Mutation.DeleteTag == nil { @@ -1290,7 +1291,7 @@ func (e 
*executableSchema) Complexity(typeName, field string, childComplexity in return 0, false } - return e.complexity.Query.JobMetrics(childComplexity, args["id"].(string), args["metrics"].([]string), args["scopes"].([]schema.MetricScope)), true + return e.complexity.Query.JobMetrics(childComplexity, args["id"].(string), args["metrics"].([]string), args["scopes"].([]schema.MetricScope), args["resolution"].(*int)), true case "Query.jobs": if e.complexity.Query.Jobs == nil { @@ -1602,6 +1603,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Tag.Name(childComplexity), true + case "Tag.scope": + if e.complexity.Tag.Scope == nil { + break + } + + return e.complexity.Tag.Scope(childComplexity), true + case "Tag.type": if e.complexity.Tag.Type == nil { break @@ -1949,6 +1957,7 @@ type Tag { id: ID! type: String! name: String! + scope: String! } type Resource { @@ -2059,7 +2068,7 @@ type Query { allocatedNodes(cluster: String!): [Count!]! job(id: ID!): Job - jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]! + jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!], resolution: Int): [JobMetricWithName!]! jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList! @@ -2071,7 +2080,7 @@ type Query { } type Mutation { - createTag(type: String!, name: String!): Tag! + createTag(type: String!, name: String!, scope: String!): Tag! deleteTag(id: ID!): ID! addTagsToJob(job: ID!, tagIds: [ID!]!): [Tag!]! removeTagsFromJob(job: ID!, tagIds: [ID!]!): [Tag!]! @@ -2244,6 +2253,15 @@ func (ec *executionContext) field_Mutation_createTag_args(ctx context.Context, r } } args["name"] = arg1 + var arg2 string + if tmp, ok := rawArgs["scope"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("scope")) + arg2, err = ec.unmarshalNString2string(ctx, tmp) + if err != nil { + return nil, err + } + } + args["scope"] = arg2 return args, nil } @@ -2370,6 +2388,15 @@ func (ec *executionContext) field_Query_jobMetrics_args(ctx context.Context, raw } } args["scopes"] = arg2 + var arg3 *int + if tmp, ok := rawArgs["resolution"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("resolution")) + arg3, err = ec.unmarshalOInt2áš–int(ctx, tmp) + if err != nil { + return nil, err + } + } + args["resolution"] = arg3 return args, nil } @@ -4622,6 +4649,8 @@ func (ec *executionContext) fieldContext_Job_tags(_ context.Context, field graph return ec.fieldContext_Tag_type(ctx, field) case "name": return ec.fieldContext_Tag_name(ctx, field) + case "scope": + return ec.fieldContext_Tag_scope(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type Tag", field.Name) }, @@ -7680,7 +7709,7 @@ func (ec *executionContext) _Mutation_createTag(ctx context.Context, field graph }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.Mutation().CreateTag(rctx, fc.Args["type"].(string), fc.Args["name"].(string)) + return ec.resolvers.Mutation().CreateTag(rctx, fc.Args["type"].(string), fc.Args["name"].(string), fc.Args["scope"].(string)) }) if err != nil { ec.Error(ctx, err) @@ -7711,6 +7740,8 @@ func (ec *executionContext) fieldContext_Mutation_createTag(ctx context.Context, return ec.fieldContext_Tag_type(ctx, field) case "name": return ec.fieldContext_Tag_name(ctx, field) + case "scope": 
+ return ec.fieldContext_Tag_scope(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type Tag", field.Name) }, @@ -7829,6 +7860,8 @@ func (ec *executionContext) fieldContext_Mutation_addTagsToJob(ctx context.Conte return ec.fieldContext_Tag_type(ctx, field) case "name": return ec.fieldContext_Tag_name(ctx, field) + case "scope": + return ec.fieldContext_Tag_scope(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type Tag", field.Name) }, @@ -7892,6 +7925,8 @@ func (ec *executionContext) fieldContext_Mutation_removeTagsFromJob(ctx context. return ec.fieldContext_Tag_type(ctx, field) case "name": return ec.fieldContext_Tag_name(ctx, field) + case "scope": + return ec.fieldContext_Tag_scope(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type Tag", field.Name) }, @@ -8199,6 +8234,8 @@ func (ec *executionContext) fieldContext_Query_tags(_ context.Context, field gra return ec.fieldContext_Tag_type(ctx, field) case "name": return ec.fieldContext_Tag_name(ctx, field) + case "scope": + return ec.fieldContext_Tag_scope(ctx, field) } return nil, fmt.Errorf("no field named %q was found under type Tag", field.Name) }, @@ -8499,7 +8536,7 @@ func (ec *executionContext) _Query_jobMetrics(ctx context.Context, field graphql }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.Query().JobMetrics(rctx, fc.Args["id"].(string), fc.Args["metrics"].([]string), fc.Args["scopes"].([]schema.MetricScope)) + return ec.resolvers.Query().JobMetrics(rctx, fc.Args["id"].(string), fc.Args["metrics"].([]string), fc.Args["scopes"].([]schema.MetricScope), fc.Args["resolution"].(*int)) }) if err != nil { ec.Error(ctx, err) @@ -10547,6 +10584,50 @@ func (ec *executionContext) fieldContext_Tag_name(_ context.Context, field graph return fc, nil } +func (ec *executionContext) _Tag_scope(ctx context.Context, field graphql.CollectedField, obj *schema.Tag) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Tag_scope(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Scope, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Tag_scope(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Tag", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _TimeRangeOutput_from(ctx context.Context, field graphql.CollectedField, obj *model.TimeRangeOutput) (ret graphql.Marshaler) { fc, err := ec.fieldContext_TimeRangeOutput_from(ctx, field) if err != nil { @@ -15666,6 +15747,11 @@ func (ec *executionContext) _Tag(ctx 
context.Context, sel ast.SelectionSet, obj if out.Values[i] == graphql.Null { out.Invalids++ } + case "scope": + out.Values[i] = ec._Tag_scope(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } default: panic("unknown field " + strconv.Quote(field.Name)) } diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 6177bce9..ef17e1d9 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "slices" "strconv" "strings" "time" @@ -29,7 +30,7 @@ func (r *clusterResolver) Partitions(ctx context.Context, obj *schema.Cluster) ( // Tags is the resolver for the tags field. func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) { - return r.Repo.GetTags(&obj.ID) + return r.Repo.GetTags(ctx, &obj.ID) } // ConcurrentJobs is the resolver for the concurrentJobs field. @@ -86,14 +87,14 @@ func (r *metricValueResolver) Name(ctx context.Context, obj *schema.MetricValue) } // CreateTag is the resolver for the createTag field. -func (r *mutationResolver) CreateTag(ctx context.Context, typeArg string, name string) (*schema.Tag, error) { - id, err := r.Repo.CreateTag(typeArg, name) +func (r *mutationResolver) CreateTag(ctx context.Context, typeArg string, name string, scope string) (*schema.Tag, error) { + id, err := r.Repo.CreateTag(typeArg, name, scope) if err != nil { log.Warn("Error while creating tag") return nil, err } - return &schema.Tag{ID: id, Type: typeArg, Name: name}, nil + return &schema.Tag{ID: id, Type: typeArg, Name: name, Scope: scope}, nil } // DeleteTag is the resolver for the deleteTag field. @@ -103,6 +104,7 @@ func (r *mutationResolver) DeleteTag(ctx context.Context, id string) (string, er // AddTagsToJob is the resolver for the addTagsToJob field. func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds []string) ([]*schema.Tag, error) { + // Selectable Tags Pre-Filtered by Scope in Frontend: No backend check required jid, err := strconv.ParseInt(job, 10, 64) if err != nil { log.Warn("Error while adding tag to job") @@ -117,7 +119,7 @@ func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds return nil, err } - if tags, err = r.Repo.AddTag(jid, tid); err != nil { + if tags, err = r.Repo.AddTag(ctx, jid, tid); err != nil { log.Warn("Error while adding tag") return nil, err } @@ -128,6 +130,7 @@ func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds // RemoveTagsFromJob is the resolver for the removeTagsFromJob field. func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, tagIds []string) ([]*schema.Tag, error) { + // Removable Tags Pre-Filtered by Scope in Frontend: No backend check required jid, err := strconv.ParseInt(job, 10, 64) if err != nil { log.Warn("Error while parsing job id") @@ -142,7 +145,7 @@ func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, ta return nil, err } - if tags, err = r.Repo.RemoveTag(jid, tid); err != nil { + if tags, err = r.Repo.RemoveTag(ctx, jid, tid); err != nil { log.Warn("Error while removing tag") return nil, err } @@ -168,7 +171,7 @@ func (r *queryResolver) Clusters(ctx context.Context) ([]*schema.Cluster, error) // Tags is the resolver for the tags field. func (r *queryResolver) Tags(ctx context.Context) ([]*schema.Tag, error) { - return r.Repo.GetTags(nil) + return r.Repo.GetTags(ctx, nil) } // GlobalMetrics is the resolver for the globalMetrics field. 
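Review note: when no resolution is supplied, the two entry points default differently — the REST handlers (rest.go above) fall back to the coarsest native timestep among the cluster's metric configs, while the GraphQL resolver (next hunk) falls back to the coarsest resolution configured for resampling. A minimal sketch of both folds; the helper names are assumptions, not part of this diff:

    // Sketch only; helper names assumed, not part of this change.
    package graph

    import (
    	"slices"

    	"github.com/ClusterCockpit/cc-backend/internal/config"
    	"github.com/ClusterCockpit/cc-backend/pkg/schema"
    )

    // defaultRestResolution mirrors the rest.go hunks: the largest native
    // timestep (seconds per data point) among the cluster's metric configs.
    func defaultRestResolution(mcs []*schema.MetricConfig) int {
    	resolution := 0
    	for _, mc := range mcs {
    		resolution = max(resolution, mc.Timestep) // max builtin, Go >= 1.21
    	}
    	return resolution
    }

    // defaultGraphQLResolution mirrors the resolver below: the largest
    // resolution configured for resampling, or nil if resampling is disabled.
    func defaultGraphQLResolution() *int {
    	if config.Keys.EnableResampling == nil {
    		return nil // callers must still guard against nil
    	}
    	res := slices.Max(config.Keys.EnableResampling.Resolutions)
    	return &res
    }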
@@ -224,14 +227,21 @@ func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error)
 }
 
 // JobMetrics is the resolver for the jobMetrics field.
-func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope) ([]*model.JobMetricWithName, error) {
+func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []string, scopes []schema.MetricScope, resolution *int) ([]*model.JobMetricWithName, error) {
+	if resolution == nil { // Load default from config when no resolution is requested
+		defaultRes := 0 // 0 falls back to the native metric timestep if resampling is not configured
+		if config.Keys.EnableResampling != nil {
+			defaultRes = slices.Max(config.Keys.EnableResampling.Resolutions)
+		}
+		resolution = &defaultRes
+	}
+
 	job, err := r.Query().Job(ctx, id)
 	if err != nil {
 		log.Warn("Error while querying job for metrics")
 		return nil, err
 	}
 
-	data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx)
+	data, err := metricDataDispatcher.LoadData(job, metrics, scopes, ctx, *resolution)
 	if err != nil {
 		log.Warn("Error while loading job data")
 		return nil, err
@@ -440,11 +448,9 @@ func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
 // SubCluster returns generated.SubClusterResolver implementation.
 func (r *Resolver) SubCluster() generated.SubClusterResolver { return &subClusterResolver{r} }
 
-type (
-	clusterResolver     struct{ *Resolver }
-	jobResolver         struct{ *Resolver }
-	metricValueResolver struct{ *Resolver }
-	mutationResolver    struct{ *Resolver }
-	queryResolver       struct{ *Resolver }
-	subClusterResolver  struct{ *Resolver }
-)
+type clusterResolver struct{ *Resolver }
+type jobResolver struct{ *Resolver }
+type metricValueResolver struct{ *Resolver }
+type mutationResolver struct{ *Resolver }
+type queryResolver struct{ *Resolver }
+type subClusterResolver struct{ *Resolver }
diff --git a/internal/graph/util.go b/internal/graph/util.go
index 8296a020..c2bd73df 100644
--- a/internal/graph/util.go
+++ b/internal/graph/util.go
@@ -47,7 +47,14 @@ func (r *queryResolver) rooflineHeatmap(
 			continue
 		}
 
-		jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx)
+		// metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
+		// resolution := 0
+
+		// for _, mc := range metricConfigs {
+		// 	resolution = max(resolution, mc.Timestep)
+		// }
+
+		jobdata, err := metricDataDispatcher.LoadData(job, []string{"flops_any", "mem_bw"}, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0)
 		if err != nil {
 			log.Errorf("Error while loading roofline metrics for job %d", job.ID)
 			return nil, err
diff --git a/internal/importer/handleImport.go b/internal/importer/handleImport.go
index 35403f67..153402ad 100644
--- a/internal/importer/handleImport.go
+++ b/internal/importer/handleImport.go
@@ -120,8 +120,8 @@ func HandleImportFlag(flag string) error {
 	}
 
 	for _, tag := range job.Tags {
-		if _, err := r.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
-			log.Error("Error while adding or creating tag")
+		if err := r.ImportTag(id, tag.Type, tag.Name, tag.Scope); err != nil {
+			log.Error("Error while adding or creating tag on import")
 			return err
 		}
 	}
diff --git a/internal/metricDataDispatcher/dataLoader.go b/internal/metricDataDispatcher/dataLoader.go
index 2c7cfa65..121fbf4e 100644
--- a/internal/metricDataDispatcher/dataLoader.go
+++ b/internal/metricDataDispatcher/dataLoader.go
@@ -14,6 +14,7 @@ import (
 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
 	"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
+	"github.com/ClusterCockpit/cc-backend/pkg/resampler"
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 )
 
@@ -23,11 +24,12 @@ func cacheKey(
 	job *schema.Job,
 	metrics []string,
 	scopes []schema.MetricScope,
+	resolution int,
 ) string {
 	// Duration and StartTime do not need to be in the cache key as StartTime is less unique than
 	// job.ID and the TTL of the cache entry makes sure it does not stay there forever.
-	return fmt.Sprintf("%d(%s):[%v],[%v]",
-		job.ID, job.State, metrics, scopes)
+	return fmt.Sprintf("%d(%s):[%v],[%v]-%d",
+		job.ID, job.State, metrics, scopes, resolution)
 }
 
 // Fetches the metric data for a job.
@@ -35,8 +37,9 @@ func LoadData(job *schema.Job,
 	metrics []string,
 	scopes []schema.MetricScope,
 	ctx context.Context,
+	resolution int,
 ) (schema.JobData, error) {
-	data := cache.Get(cacheKey(job, metrics, scopes), func() (_ interface{}, ttl time.Duration, size int) {
+	data := cache.Get(cacheKey(job, metrics, scopes, resolution), func() (_ interface{}, ttl time.Duration, size int) {
 		var jd schema.JobData
 		var err error
 
@@ -60,7 +63,7 @@ func LoadData(job *schema.Job,
 			}
 		}
 
-		jd, err = repo.LoadData(job, metrics, scopes, ctx)
+		jd, err = repo.LoadData(job, metrics, scopes, ctx, resolution)
 		if err != nil {
 			if len(jd) != 0 {
 				log.Warnf("partial error: %s", err.Error())
@@ -72,12 +75,31 @@ func LoadData(job *schema.Job,
 			}
 			size = jd.Size()
 		} else {
-			jd, err = archive.GetHandle().LoadJobData(job)
+			var jd_temp schema.JobData
+			jd_temp, err = archive.GetHandle().LoadJobData(job)
 			if err != nil {
 				log.Error("Error while loading job data from archive")
 				return err, 0, 0
 			}
 
+			// Deep copy the cached archive hashmap
+			jd = metricdata.DeepCopy(jd_temp)
+
+			// Resampling for archived data.
+			// Pass the resolution from the frontend here.
+			for _, v := range jd {
+				for _, v_ := range v {
+					timestep := 0
+					for i := 0; i < len(v_.Series); i += 1 {
+						v_.Series[i].Data, timestep, err = resampler.LargestTriangleThreeBucket(v_.Series[i].Data, v_.Timestep, resolution)
+						if err != nil {
+							return err, 0, 0
+						}
+					}
+					v_.Timestep = timestep
+				}
+			}
+
 			// Avoid sending unrequested data to the client:
 			if metrics != nil || scopes != nil {
 				if metrics == nil {
@@ -117,6 +139,7 @@ func LoadData(job *schema.Job,
 	}
 
 	// FIXME: Review: Is this really necessary or correct.
+	// Note: Lines 142-170 formerly known as prepareJobData(jobData, scopes)
 	// For /monitoring/job/ and some other places, flops_any and mem_bw need
If a job has a lot of nodes, // statisticsSeries should be available so that a min/median/max Graph can be diff --git a/internal/metricdata/cc-metric-store.go b/internal/metricdata/cc-metric-store.go index e564db6c..f2853e3b 100644 --- a/internal/metricdata/cc-metric-store.go +++ b/internal/metricdata/cc-metric-store.go @@ -55,6 +55,7 @@ type ApiQuery struct { SubType *string `json:"subtype,omitempty"` Metric string `json:"metric"` Hostname string `json:"host"` + Resolution int `json:"resolution"` TypeIds []string `json:"type-ids,omitempty"` SubTypeIds []string `json:"subtype-ids,omitempty"` Aggregate bool `json:"aggreg"` @@ -66,13 +67,14 @@ type ApiQueryResponse struct { } type ApiMetricData struct { - Error *string `json:"error"` - Data []schema.Float `json:"data"` - From int64 `json:"from"` - To int64 `json:"to"` - Avg schema.Float `json:"avg"` - Min schema.Float `json:"min"` - Max schema.Float `json:"max"` + Error *string `json:"error"` + Data []schema.Float `json:"data"` + From int64 `json:"from"` + To int64 `json:"to"` + Resolution int `json:"resolution"` + Avg schema.Float `json:"avg"` + Min schema.Float `json:"min"` + Max schema.Float `json:"max"` } func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error { @@ -83,7 +85,7 @@ func (ccms *CCMetricStore) Init(rawConfig json.RawMessage) error { } ccms.url = config.Url - ccms.queryEndpoint = fmt.Sprintf("%s/api/query", config.Url) + ccms.queryEndpoint = fmt.Sprintf("%s/api/query/", config.Url) ccms.jwt = config.Token ccms.client = http.Client{ Timeout: 10 * time.Second, @@ -129,7 +131,7 @@ func (ccms *CCMetricStore) doRequest( return nil, err } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, ccms.queryEndpoint, buf) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, ccms.queryEndpoint, buf) if err != nil { log.Warn("Error while building request body") return nil, err @@ -138,6 +140,13 @@ func (ccms *CCMetricStore) doRequest( req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt)) } + // versioning the cc-metric-store query API. 
+ // v2 = data with resampling + // v1 = data without resampling + q := req.URL.Query() + q.Add("version", "v2") + req.URL.RawQuery = q.Encode() + res, err := ccms.client.Do(req) if err != nil { log.Error("Error while performing request") @@ -162,8 +171,9 @@ func (ccms *CCMetricStore) LoadData( metrics []string, scopes []schema.MetricScope, ctx context.Context, + resolution int, ) (schema.JobData, error) { - queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes) + queries, assignedScope, err := ccms.buildQueries(job, metrics, scopes, resolution) if err != nil { log.Warn("Error while building queries") return nil, err @@ -195,11 +205,17 @@ func (ccms *CCMetricStore) LoadData( jobData[metric] = make(map[schema.MetricScope]*schema.JobMetric) } + res := row[0].Resolution + if res == 0 { + res = mc.Timestep + } + jobMetric, ok := jobData[metric][scope] + if !ok { jobMetric = &schema.JobMetric{ Unit: mc.Unit, - Timestep: mc.Timestep, + Timestep: res, Series: make([]schema.Series, 0), } jobData[metric][scope] = jobMetric @@ -251,7 +267,6 @@ func (ccms *CCMetricStore) LoadData( /* Returns list for "partial errors" */ return jobData, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) } - return jobData, nil } @@ -267,6 +282,7 @@ func (ccms *CCMetricStore) buildQueries( job *schema.Job, metrics []string, scopes []schema.MetricScope, + resolution int, ) ([]ApiQuery, []schema.MetricScope, error) { queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) assignedScope := []schema.MetricScope{} @@ -318,11 +334,12 @@ func (ccms *CCMetricStore) buildQueries( } queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &acceleratorString, - TypeIds: host.Accelerators, + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: false, + Type: &acceleratorString, + TypeIds: host.Accelerators, + Resolution: resolution, }) assignedScope = append(assignedScope, schema.MetricScopeAccelerator) continue @@ -335,11 +352,12 @@ func (ccms *CCMetricStore) buildQueries( } queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &acceleratorString, - TypeIds: host.Accelerators, + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &acceleratorString, + TypeIds: host.Accelerators, + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -348,11 +366,12 @@ func (ccms *CCMetricStore) buildQueries( // HWThread -> HWThead if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeHWThread { queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &hwthreadString, - TypeIds: intToStringSlice(hwthreads), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: false, + Type: &hwthreadString, + TypeIds: intToStringSlice(hwthreads), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -363,11 +382,12 @@ func (ccms *CCMetricStore) buildQueries( cores, _ := topology.GetCoresFromHWThreads(hwthreads) for _, core := range cores { queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Core[core]), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Core[core]), + Resolution: resolution, }) assignedScope = append(assignedScope, 
scope) } @@ -379,11 +399,12 @@ func (ccms *CCMetricStore) buildQueries( sockets, _ := topology.GetSocketsFromHWThreads(hwthreads) for _, socket := range sockets { queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(topology.Socket[socket]), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(topology.Socket[socket]), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) } @@ -393,11 +414,12 @@ func (ccms *CCMetricStore) buildQueries( // HWThread -> Node if nativeScope == schema.MetricScopeHWThread && scope == schema.MetricScopeNode { queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &hwthreadString, - TypeIds: intToStringSlice(hwthreads), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &hwthreadString, + TypeIds: intToStringSlice(hwthreads), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -407,11 +429,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeCore { cores, _ := topology.GetCoresFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &coreString, - TypeIds: intToStringSlice(cores), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: false, + Type: &coreString, + TypeIds: intToStringSlice(cores), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -421,11 +444,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeCore && scope == schema.MetricScopeNode { cores, _ := topology.GetCoresFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &coreString, - TypeIds: intToStringSlice(cores), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &coreString, + TypeIds: intToStringSlice(cores), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -435,11 +459,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeMemoryDomain { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: false, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: false, + Type: &memoryDomainString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -449,11 +474,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == schema.MetricScopeMemoryDomain && scope == schema.MetricScopeNode { sockets, _ := topology.GetMemoryDomainsFromHWThreads(hwthreads) queries = append(queries, ApiQuery{ - Metric: remoteName, - Hostname: host.Hostname, - Aggregate: true, - Type: &memoryDomainString, - TypeIds: intToStringSlice(sockets), + Metric: remoteName, + Hostname: host.Hostname, + Aggregate: true, + Type: &memoryDomainString, + TypeIds: intToStringSlice(sockets), + Resolution: resolution, }) assignedScope = append(assignedScope, scope) continue @@ -463,11 +489,12 @@ func (ccms *CCMetricStore) buildQueries( if nativeScope == 
schema.MetricScopeSocket && scope == schema.MetricScopeSocket {
 			sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
 			queries = append(queries, ApiQuery{
-				Metric:    remoteName,
-				Hostname:  host.Hostname,
-				Aggregate: false,
-				Type:      &socketString,
-				TypeIds:   intToStringSlice(sockets),
+				Metric:     remoteName,
+				Hostname:   host.Hostname,
+				Aggregate:  false,
+				Type:       &socketString,
+				TypeIds:    intToStringSlice(sockets),
+				Resolution: resolution,
 			})
 			assignedScope = append(assignedScope, scope)
 			continue
@@ -477,11 +504,12 @@ func (ccms *CCMetricStore) buildQueries(
 		if nativeScope == schema.MetricScopeSocket && scope == schema.MetricScopeNode {
 			sockets, _ := topology.GetSocketsFromHWThreads(hwthreads)
 			queries = append(queries, ApiQuery{
-				Metric:    remoteName,
-				Hostname:  host.Hostname,
-				Aggregate: true,
-				Type:      &socketString,
-				TypeIds:   intToStringSlice(sockets),
+				Metric:     remoteName,
+				Hostname:   host.Hostname,
+				Aggregate:  true,
+				Type:       &socketString,
+				TypeIds:    intToStringSlice(sockets),
+				Resolution: resolution,
 			})
 			assignedScope = append(assignedScope, scope)
 			continue
@@ -490,8 +518,9 @@ func (ccms *CCMetricStore) buildQueries(
 		// Node -> Node
 		if nativeScope == schema.MetricScopeNode && scope == schema.MetricScopeNode {
 			queries = append(queries, ApiQuery{
-				Metric:   remoteName,
-				Hostname: host.Hostname,
+				Metric:     remoteName,
+				Hostname:   host.Hostname,
+				Resolution: resolution,
 			})
 			assignedScope = append(assignedScope, scope)
 			continue
@@ -510,7 +539,15 @@ func (ccms *CCMetricStore) LoadStats(
 	metrics []string,
 	ctx context.Context,
 ) (map[string]map[string]schema.MetricStatistics, error) {
-	queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}) // #166 Add scope here for analysis view accelerator normalization?
+
+	// metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
+	// resolution := 9000
+
+	// for _, mc := range metricConfigs {
+	// 	resolution = min(resolution, mc.Timestep)
+	// }
+
+	queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, 0) // #166 Add scope here for analysis view accelerator normalization?
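+	// NOTE (review, illustrative): a resolution of 0 requests the native
+	// resolution; LoadData above falls back to mc.Timestep whenever a
+	// response row reports Resolution == 0.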
if err != nil { log.Warn("Error while building query") return nil, err @@ -588,8 +625,9 @@ func (ccms *CCMetricStore) LoadNodeData( for _, node := range nodes { for _, metric := range metrics { req.Queries = append(req.Queries, ApiQuery{ - Hostname: node, - Metric: ccms.toRemoteName(metric), + Hostname: node, + Metric: ccms.toRemoteName(metric), + Resolution: 60, // Default for Node Queries }) } } @@ -597,7 +635,7 @@ func (ccms *CCMetricStore) LoadNodeData( resBody, err := ccms.doRequest(ctx, &req) if err != nil { - log.Error("Error while performing request") + log.Error(fmt.Sprintf("Error while performing request %#v\n", err)) return nil, err } diff --git a/internal/metricdata/influxdb-v2.go b/internal/metricdata/influxdb-v2.go index b95f07e0..b416fa52 100644 --- a/internal/metricdata/influxdb-v2.go +++ b/internal/metricdata/influxdb-v2.go @@ -60,7 +60,8 @@ func (idb *InfluxDBv2DataRepository) LoadData( job *schema.Job, metrics []string, scopes []schema.MetricScope, - ctx context.Context) (schema.JobData, error) { + ctx context.Context, + resolution int) (schema.JobData, error) { measurementsConds := make([]string, 0, len(metrics)) for _, m := range metrics { diff --git a/internal/metricdata/metricdata.go b/internal/metricdata/metricdata.go index 68d8d329..354dd5f2 100644 --- a/internal/metricdata/metricdata.go +++ b/internal/metricdata/metricdata.go @@ -21,7 +21,7 @@ type MetricDataRepository interface { Init(rawConfig json.RawMessage) error // Return the JobData for the given job, only with the requested metrics. - LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) + LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) // Return a map of metrics to a map of nodes to the metric statistics of the job. node scope assumed for now. 
	LoadStats(job *schema.Job, metrics []string, ctx context.Context) (map[string]map[string]schema.MetricStatistics, error)
diff --git a/internal/metricdata/prometheus.go b/internal/metricdata/prometheus.go
index a8d9f395..06118244 100644
--- a/internal/metricdata/prometheus.go
+++ b/internal/metricdata/prometheus.go
@@ -265,6 +265,7 @@ func (pdb *PrometheusDataRepository) LoadData(
 	metrics []string,
 	scopes []schema.MetricScope,
 	ctx context.Context,
+	resolution int,
 ) (schema.JobData, error) {
 	// TODO respect requested scope
 	if len(scopes) == 0 || !contains(scopes, schema.MetricScopeNode) {
@@ -356,7 +357,7 @@ func (pdb *PrometheusDataRepository) LoadStats(
 	// map of metrics of nodes of stats
 	stats := map[string]map[string]schema.MetricStatistics{}
 
-	data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx)
+	data, err := pdb.LoadData(job, metrics, []schema.MetricScope{schema.MetricScopeNode}, ctx, 0 /*resolution here*/)
 	if err != nil {
 		log.Warn("Error while loading job for stats")
 		return nil, err
diff --git a/internal/metricdata/utils.go b/internal/metricdata/utils.go
index 6d490fe3..f480e402 100644
--- a/internal/metricdata/utils.go
+++ b/internal/metricdata/utils.go
@@ -12,7 +12,7 @@ import (
 	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 )
 
-var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
+var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
 	panic("TODO")
 }
 
@@ -27,9 +27,10 @@ func (tmdr *TestMetricDataRepository) LoadData(
 	job *schema.Job,
 	metrics []string,
 	scopes []schema.MetricScope,
-	ctx context.Context) (schema.JobData, error) {
+	ctx context.Context,
+	resolution int) (schema.JobData, error) {
 
-	return TestLoadDataCallback(job, metrics, scopes, ctx)
+	return TestLoadDataCallback(job, metrics, scopes, ctx, resolution)
 }
 
 func (tmdr *TestMetricDataRepository) LoadStats(
@@ -48,3 +49,48 @@ func (tmdr *TestMetricDataRepository) LoadNodeData(
 
 	panic("TODO")
 }
+
+func DeepCopy(jd_temp schema.JobData) schema.JobData {
+	var jd schema.JobData
+
+	jd = make(schema.JobData, len(jd_temp))
+	for k, v := range jd_temp {
+		jd[k] = make(map[schema.MetricScope]*schema.JobMetric, len(jd_temp[k]))
+		for k_, v_ := range v {
+			jd[k][k_] = new(schema.JobMetric)
+			jd[k][k_].Series = make([]schema.Series, len(v_.Series))
+			for i := 0; i < len(v_.Series); i += 1 {
+				jd[k][k_].Series[i].Data = make([]schema.Float, len(v_.Series[i].Data))
+				copy(jd[k][k_].Series[i].Data, v_.Series[i].Data)
+				jd[k][k_].Series[i].Hostname = v_.Series[i].Hostname
+				jd[k][k_].Series[i].Id = v_.Series[i].Id
+				jd[k][k_].Series[i].Statistics.Avg = v_.Series[i].Statistics.Avg
+				jd[k][k_].Series[i].Statistics.Min = v_.Series[i].Statistics.Min
+				jd[k][k_].Series[i].Statistics.Max = v_.Series[i].Statistics.Max
+			}
+			jd[k][k_].Timestep = v_.Timestep
+			jd[k][k_].Unit.Base = v_.Unit.Base
+			jd[k][k_].Unit.Prefix = v_.Unit.Prefix
+			if v_.StatisticsSeries != nil {
+				// Allocate the target slices and map before copying:
+				// copy() into a nil slice is a no-op and writing to a nil map panics.
+				jd[k][k_].StatisticsSeries = new(schema.StatsSeries)
+				jd[k][k_].StatisticsSeries.Max = make([]schema.Float, len(v_.StatisticsSeries.Max))
+				copy(jd[k][k_].StatisticsSeries.Max, v_.StatisticsSeries.Max)
+				jd[k][k_].StatisticsSeries.Min = make([]schema.Float, len(v_.StatisticsSeries.Min))
+				copy(jd[k][k_].StatisticsSeries.Min, v_.StatisticsSeries.Min)
+				jd[k][k_].StatisticsSeries.Median = make([]schema.Float, len(v_.StatisticsSeries.Median))
+				copy(jd[k][k_].StatisticsSeries.Median, v_.StatisticsSeries.Median)
+				jd[k][k_].StatisticsSeries.Mean = make([]schema.Float, len(v_.StatisticsSeries.Mean))
+				copy(jd[k][k_].StatisticsSeries.Mean, v_.StatisticsSeries.Mean)
+				jd[k][k_].StatisticsSeries.Percentiles = make(map[int][]schema.Float, len(v_.StatisticsSeries.Percentiles))
+				for k__, v__ := range v_.StatisticsSeries.Percentiles {
+					jd[k][k_].StatisticsSeries.Percentiles[k__] = v__
+				}
+			} else {
+				jd[k][k_].StatisticsSeries = v_.StatisticsSeries
+			}
+		}
+	}
+	return jd
+}
diff --git a/internal/repository/jobQuery.go b/internal/repository/jobQuery.go
index 7b575ef2..843d7975 100644
--- a/internal/repository/jobQuery.go
+++ b/internal/repository/jobQuery.go
@@ -89,7 +89,8 @@ func (r *JobRepository) CountJobs(
 	ctx context.Context,
 	filters []*model.JobFilter,
 ) (int, error) {
-	query, qerr := SecurityCheck(ctx, sq.Select("count(*)").From("job"))
+	// DISTINCT count for tags filters, does not affect other queries
+	query, qerr := SecurityCheck(ctx, sq.Select("count(DISTINCT job.id)").From("job"))
 	if qerr != nil {
 		return 0, qerr
 	}
@@ -136,7 +137,8 @@ func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilde
 // Build a sq.SelectBuilder out of a schema.JobFilter.
 func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.SelectBuilder {
 	if filter.Tags != nil {
		// This is an OR-Logic query: Returns all distinct jobs with at least one of the requested tags; TODO: AND-Logic query?
-		query = query.Join("jobtag ON jobtag.job_id = job.id").Where(sq.Eq{"jobtag.tag_id": filter.Tags})
+		query = query.Join("jobtag ON jobtag.job_id = job.id").Where(sq.Eq{"jobtag.tag_id": filter.Tags}).Distinct()
 	}
 	if filter.JobID != nil {
 		query = buildStringCondition("job.job_id", filter.JobID, query)
diff --git a/internal/repository/job_test.go b/internal/repository/job_test.go
index 2589fb92..f7b37836 100644
--- a/internal/repository/job_test.go
+++ b/internal/repository/job_test.go
@@ -5,9 +5,11 @@
 package repository
 
 import (
+	"context"
 	"fmt"
 	"testing"
 
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 	_ "github.com/mattn/go-sqlite3"
 )
 
@@ -45,7 +47,19 @@ func TestFindById(t *testing.T) {
 func TestGetTags(t *testing.T) {
 	r := setup(t)
 
-	tags, counts, err := r.CountTags(nil)
+	const contextUserKey ContextKey = "user"
+	contextUserValue := &schema.User{
+		Username:   "testuser",
+		Projects:   make([]string, 0),
+		Roles:      []string{"user"},
+		AuthType:   0,
+		AuthSource: 2,
+	}
+
+	ctx := context.WithValue(getContext(t), contextUserKey, contextUserValue)
+
+	// Test Tag has Scope "global"
+	tags, counts, err := r.CountTags(ctx)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/internal/repository/migrations/sqlite3/08_add-footprint.up.sql b/internal/repository/migrations/sqlite3/08_add-footprint.up.sql
index 7f0d578a..e555c57e 100644
--- a/internal/repository/migrations/sqlite3/08_add-footprint.up.sql
+++ b/internal/repository/migrations/sqlite3/08_add-footprint.up.sql
@@ -11,8 +11,9 @@ ALTER TABLE job ADD COLUMN energy REAL NOT NULL DEFAULT 0.0;
 ALTER TABLE job ADD COLUMN energy_footprint TEXT DEFAULT NULL;
 
 ALTER TABLE job ADD COLUMN footprint TEXT DEFAULT NULL;
-UPDATE job SET footprint = '{"flops_any_avg": 0.0}';
+ALTER TABLE tag ADD COLUMN tag_scope TEXT NOT NULL DEFAULT 'global';
 
+UPDATE job SET footprint = '{"flops_any_avg": 0.0}';
 UPDATE job SET footprint = json_replace(footprint, '$.flops_any_avg', job.flops_any_avg);
 UPDATE job SET footprint = json_insert(footprint, '$.mem_bw_avg', job.mem_bw_avg);
 UPDATE job SET footprint = json_insert(footprint, '$.mem_used_max', job.mem_used_max);
diff --git a/internal/repository/stats.go b/internal/repository/stats.go
index ca05ca37..ba7a8aab 100644
--- a/internal/repository/stats.go
+++ 
b/internal/repository/stats.go @@ -77,8 +77,8 @@ func (r *JobRepository) buildStatsQuery( // fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType) if col != "" { - // Scan columns: id, totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours - query = sq.Select(col, "COUNT(job.id) as totalJobs", + // Scan columns: id, totalJobs, name, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours + query = sq.Select(col, "COUNT(job.id) as totalJobs", "name", fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s) as totalWalltime`, time.Now().Unix(), castType), fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s) as totalNodes`, castType), fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s) as totalNodeHours`, time.Now().Unix(), castType), @@ -86,9 +86,9 @@ func (r *JobRepository) buildStatsQuery( fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as %s) as totalCoreHours`, time.Now().Unix(), castType), fmt.Sprintf(`CAST(SUM(job.num_acc) as %s) as totalAccs`, castType), fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s) as totalAccHours`, time.Now().Unix(), castType), - ).From("job").GroupBy(col) + ).From("job").Join("user ON user.username = job.user").GroupBy(col) } else { - // Scan columns: totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours + // Scan columns: totalJobs, name, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours query = sq.Select("COUNT(job.id)", fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s)`, time.Now().Unix(), castType), fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s)`, castType), @@ -107,15 +107,15 @@ func (r *JobRepository) buildStatsQuery( return query } -func (r *JobRepository) getUserName(ctx context.Context, id string) string { - user := GetUserFromContext(ctx) - name, _ := r.FindColumnValue(user, id, "user", "name", "username", false) - if name != "" { - return name - } else { - return "-" - } -} +// func (r *JobRepository) getUserName(ctx context.Context, id string) string { +// user := GetUserFromContext(ctx) +// name, _ := r.FindColumnValue(user, id, "user", "name", "username", false) +// if name != "" { +// return name +// } else { +// return "-" +// } +// } func (r *JobRepository) getCastType() string { var castType string @@ -167,14 +167,20 @@ func (r *JobRepository) JobsStatsGrouped( for rows.Next() { var id sql.NullString + var name sql.NullString var jobs, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64 - if err := rows.Scan(&id, &jobs, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil { + if err := rows.Scan(&id, &jobs, &name, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil { log.Warn("Error while scanning rows") return nil, err } if id.Valid { var totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours int + var 
personName string + + if name.Valid { + personName = name.String + } if jobs.Valid { totalJobs = int(jobs.Int64) @@ -205,11 +211,11 @@ func (r *JobRepository) JobsStatsGrouped( } if col == "job.user" { - name := r.getUserName(ctx, id.String) + // name := r.getUserName(ctx, id.String) stats = append(stats, &model.JobsStatistics{ ID: id.String, - Name: name, + Name: personName, TotalJobs: totalJobs, TotalWalltime: totalWalltime, TotalNodes: totalNodes, diff --git a/internal/repository/tags.go b/internal/repository/tags.go index 8dace036..dcdbd293 100644 --- a/internal/repository/tags.go +++ b/internal/repository/tags.go @@ -5,6 +5,8 @@ package repository import ( + "context" + "fmt" "strings" "github.com/ClusterCockpit/cc-backend/pkg/archive" @@ -14,7 +16,14 @@ import ( ) // Add the tag with id `tagId` to the job with the database id `jobId`. -func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) { +func (r *JobRepository) AddTag(ctx context.Context, job int64, tag int64) ([]*schema.Tag, error) { + + j, err := r.FindById(ctx, job) + if err != nil { + log.Warn("Error while finding job by id") + return nil, err + } + q := sq.Insert("jobtag").Columns("job_id", "tag_id").Values(job, tag) if _, err := q.RunWith(r.stmtCache).Exec(); err != nil { @@ -23,49 +32,62 @@ func (r *JobRepository) AddTag(job int64, tag int64) ([]*schema.Tag, error) { return nil, err } - j, err := r.FindByIdDirect(job) + tags, err := r.GetTags(ctx, &job) if err != nil { - log.Warn("Error while finding job by id") + log.Warn("Error while getting tags for job") return nil, err } - tags, err := r.GetTags(&job) + archiveTags, err := r.getArchiveTags(&job) if err != nil { log.Warn("Error while getting tags for job") return nil, err } - return tags, archive.UpdateTags(j, tags) + return tags, archive.UpdateTags(j, archiveTags) } // Removes a tag from a job -func (r *JobRepository) RemoveTag(job, tag int64) ([]*schema.Tag, error) { +func (r *JobRepository) RemoveTag(ctx context.Context, job, tag int64) ([]*schema.Tag, error) { + + j, err := r.FindById(ctx, job) + if err != nil { + log.Warn("Error while finding job by id") + return nil, err + } + q := sq.Delete("jobtag").Where("jobtag.job_id = ?", job).Where("jobtag.tag_id = ?", tag) if _, err := q.RunWith(r.stmtCache).Exec(); err != nil { s, _, _ := q.ToSql() - log.Errorf("Error adding tag with %s: %v", s, err) + log.Errorf("Error removing tag with %s: %v", s, err) return nil, err } - j, err := r.FindByIdDirect(job) + tags, err := r.GetTags(ctx, &job) if err != nil { - log.Warn("Error while finding job by id") + log.Warn("Error while getting tags for job") return nil, err } - tags, err := r.GetTags(&job) + archiveTags, err := r.getArchiveTags(&job) if err != nil { log.Warn("Error while getting tags for job") return nil, err } - return tags, archive.UpdateTags(j, tags) + return tags, archive.UpdateTags(j, archiveTags) } // CreateTag creates a new tag with the specified type and name and returns its database id. 
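+// Example (illustrative, values assumed; an empty scope defaults to "global"):
+//
+//	tagId, err := r.CreateTag("testTagType", "testTagName", "global")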
-func (r *JobRepository) CreateTag(tagType string, tagName string) (tagId int64, err error) {
-	q := sq.Insert("tag").Columns("tag_type", "tag_name").Values(tagType, tagName)
+func (r *JobRepository) CreateTag(tagType string, tagName string, tagScope string) (tagId int64, err error) {
+
+	// Default to "global" scope if none defined
+	if tagScope == "" {
+		tagScope = "global"
+	}
+
+	q := sq.Insert("tag").Columns("tag_type", "tag_name", "tag_scope").Values(tagType, tagName, tagScope)
 
 	res, err := q.RunWith(r.stmtCache).Exec()
 	if err != nil {
@@ -77,9 +99,10 @@ func (r *JobRepository) CreateTag(tagType string, tagName string) (tagId int64,
 	return res.LastInsertId()
 }
 
-func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts map[string]int, err error) {
+func (r *JobRepository) CountTags(ctx context.Context) (tags []schema.Tag, counts map[string]int, err error) {
+	// Fetch all Tags in DB for Display in Frontend Tag-View
 	tags = make([]schema.Tag, 0, 100)
-	xrows, err := r.DB.Queryx("SELECT id, tag_type, tag_name FROM tag")
+	xrows, err := r.DB.Queryx("SELECT id, tag_type, tag_name, tag_scope FROM tag")
 	if err != nil {
 		return nil, nil, err
 	}
@@ -89,16 +112,38 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
 		if err = xrows.StructScan(&t); err != nil {
 			return nil, nil, err
 		}
-		tags = append(tags, t)
+
+		// Handle Scope Filtering: Tag Scope is Global, Private (== Username) or User is auth'd to view Admin Tags
+		readable, err := r.checkScopeAuth(ctx, "read", t.Scope)
+		if err != nil {
+			return nil, nil, err
+		}
+		if readable {
+			tags = append(tags, t)
+		}
 	}
 
-	q := sq.Select("t.tag_name, count(jt.tag_id)").
+	user := GetUserFromContext(ctx)
+
+	// Query and Count Jobs with attached Tags
+	q := sq.Select("t.tag_name, t.id, count(jt.tag_id)").
 		From("tag t").
 		LeftJoin("jobtag jt ON t.id = jt.tag_id").
-		GroupBy("t.tag_name")
+		GroupBy("t.id") // Group by the unique id so tags with identical names are counted separately
+
+	// Handle Scope Filtering
+	scopeList := "\"global\""
+	if user != nil {
+		scopeList += ",\"" + user.Username + "\""
+	}
+	if user != nil && user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}) {
+		scopeList += ",\"admin\""
+	}
+	q = q.Where("t.tag_scope IN (" + scopeList + ")")
 
+	// Handle Job Ownership
 	if user != nil && user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}) { // ADMIN || SUPPORT: Count all jobs
-		log.Debug("CountTags: User Admin or Support -> Count all Jobs for Tags")
+		// log.Debug("CountTags: User Admin or Support -> Count all Jobs for Tags") // Unchanged: Needs to be own case still, due to UserRole/NoRole compatibility handling in else case
 	} else if user != nil && user.HasRole(schema.RoleManager) { // MANAGER: Count own jobs plus project's jobs
 		// Build ("project1", "project2", ...) list of variable length directly in SQL string
@@ -115,29 +160,45 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
 	counts = make(map[string]int)
 	for rows.Next() {
 		var tagName string
+		var tagId int
 		var count int
-		if err = rows.Scan(&tagName, &count); err != nil {
+		if err = rows.Scan(&tagName, &tagId, &count); err != nil {
 			return nil, nil, err
 		}
-		counts[tagName] = count
+		// Use tagId as second Map-Key component to differentiate tags with identical names
+		counts[fmt.Sprint(tagName, tagId)] = count
 	}
 	err = rows.Err()
 
-	return
+	return tags, counts, err
 }
 
 // AddTagOrCreate adds the tag with the specified type and name to the job with the database id `jobId`.
 // If such a tag does not yet exist, it is created.
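+// Example (illustrative): a user attaching a private tag to one of their own jobs:
+//
+//	tagId, err := r.AddTagOrCreate(ctx, job.ID, "testTagType", "testTagName", "testuser")
+//
+// The call fails when the user in ctx is not authorized to write the requested scope.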
 // AddTagOrCreate adds the tag with the specified type and name to the job with the database id `jobId`.
 // If such a tag does not yet exist, it is created.
-func (r *JobRepository) AddTagOrCreate(jobId int64, tagType string, tagName string) (tagId int64, err error) {
-	tagId, exists := r.TagId(tagType, tagName)
+func (r *JobRepository) AddTagOrCreate(ctx context.Context, jobId int64, tagType string, tagName string, tagScope string) (tagId int64, err error) {
+
+	// Default to "global" scope if none is given
+	if tagScope == "" {
+		tagScope = "global"
+	}
+
+	writable, err := r.checkScopeAuth(ctx, "write", tagScope)
+	if err != nil {
+		return 0, err
+	}
+	if !writable {
+		return 0, fmt.Errorf("cannot write tag scope with current authorization")
+	}
+
+	tagId, exists := r.TagId(tagType, tagName, tagScope)
 	if !exists {
-		tagId, err = r.CreateTag(tagType, tagName)
+		tagId, err = r.CreateTag(tagType, tagName, tagScope)
 		if err != nil {
 			return 0, err
 		}
 	}
 
-	if _, err := r.AddTag(jobId, tagId); err != nil {
+	if _, err := r.AddTag(ctx, jobId, tagId); err != nil {
 		return 0, err
 	}
 
@@ -145,19 +206,19 @@ func (r *JobRepository) AddTagOrCreate(jobId int64, tagType string, tagName stri
 }
 
 // TagId returns the database id of the tag with the specified type and name.
-func (r *JobRepository) TagId(tagType string, tagName string) (tagId int64, exists bool) {
+func (r *JobRepository) TagId(tagType string, tagName string, tagScope string) (tagId int64, exists bool) {
 	exists = true
 	if err := sq.Select("id").From("tag").
-		Where("tag.tag_type = ?", tagType).Where("tag.tag_name = ?", tagName).
+		Where("tag.tag_type = ?", tagType).Where("tag.tag_name = ?", tagName).Where("tag.tag_scope = ?", tagScope).
 		RunWith(r.stmtCache).QueryRow().Scan(&tagId); err != nil {
 		exists = false
 	}
 	return
 }
 
-// GetTags returns a list of all tags if job is nil or of the tags that the job with that database ID has.
-func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {
-	q := sq.Select("id", "tag_type", "tag_name").From("tag")
+// GetTags returns a list of all scoped tags if job is nil or of the tags that the job with that database ID has.
+func (r *JobRepository) GetTags(ctx context.Context, job *int64) ([]*schema.Tag, error) {
+	q := sq.Select("id", "tag_type", "tag_name", "tag_scope").From("tag")
 	if job != nil {
 		q = q.Join("jobtag ON jobtag.tag_id = tag.id").Where("jobtag.job_id = ?", *job)
 	}
@@ -172,7 +233,41 @@ func (r *JobRepository) GetTags(job *int64) ([]*schema.Tag, error) {
 	tags := make([]*schema.Tag, 0)
 	for rows.Next() {
 		tag := &schema.Tag{}
-		if err := rows.Scan(&tag.ID, &tag.Type, &tag.Name); err != nil {
+		if err := rows.Scan(&tag.ID, &tag.Type, &tag.Name, &tag.Scope); err != nil {
 			log.Warn("Error while scanning rows")
 			return nil, err
 		}
+		// Handle scope filtering: a tag is readable if its scope is "global", matches the
+		// username (private tag), or the user is authorized to view "admin" tags
+		readable, err := r.checkScopeAuth(ctx, "read", tag.Scope)
+		if err != nil {
+			return nil, err
+		}
+		if readable {
+			tags = append(tags, tag)
+		}
+	}
+
+	return tags, nil
+}
+
+// getArchiveTags returns a list of all tags *regardless of scope* for archiving if job is nil or of the tags that the job with that database ID has.
+func (r *JobRepository) getArchiveTags(job *int64) ([]*schema.Tag, error) {
+	q := sq.Select("id", "tag_type", "tag_name", "tag_scope").From("tag")
+	if job != nil {
+		q = q.Join("jobtag ON jobtag.tag_id = tag.id").Where("jobtag.job_id = ?", *job)
+	}
+
+	rows, err := q.RunWith(r.stmtCache).Query()
+	if err != nil {
+		s, _, _ := q.ToSql()
+		log.Errorf("Error getting tags with %s: %v", s, err)
+		return nil, err
+	}
+
+	tags := make([]*schema.Tag, 0)
+	for rows.Next() {
+		tag := &schema.Tag{}
+		if err := rows.Scan(&tag.ID, &tag.Type, &tag.Name, &tag.Scope); err != nil {
+			log.Warn("Error while scanning rows")
+			return nil, err
+		}
@@ -181,3 +276,60 @@
 	return tags, nil
 }
+
+func (r *JobRepository) ImportTag(jobId int64, tagType string, tagName string, tagScope string) (err error) {
+	// Imports have no scope context: tags are only copied from the job metafile into the
+	// DB (no recursive archive update required), so only an error is returned
+
+	tagId, exists := r.TagId(tagType, tagName, tagScope)
+	if !exists {
+		tagId, err = r.CreateTag(tagType, tagName, tagScope)
+		if err != nil {
+			return err
+		}
+	}
+
+	q := sq.Insert("jobtag").Columns("job_id", "tag_id").Values(jobId, tagId)
+
+	if _, err := q.RunWith(r.stmtCache).Exec(); err != nil {
+		s, _, _ := q.ToSql()
+		log.Errorf("Error adding tag on import with %s: %v", s, err)
+		return err
+	}
+
+	return nil
+}
+
+func (r *JobRepository) checkScopeAuth(ctx context.Context, operation string, scope string) (pass bool, err error) {
+	user := GetUserFromContext(ctx)
+	if user == nil {
+		return false, fmt.Errorf("error while checking tag operation auth: no user in context")
+	}
+
+	switch {
+	case operation == "write" && (scope == "admin" || scope == "global"):
+		if user.HasRole(schema.RoleAdmin) || (len(user.Roles) == 1 && user.HasRole(schema.RoleApi)) {
+			return true, nil
+		}
+		return false, nil
+	case operation == "write" && scope == user.Username:
+		return true, nil
+	case operation == "read" && scope == "admin":
+		return user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}), nil
+	case operation == "read" && (scope == "global" || scope == user.Username):
+		return true, nil
+	default:
+		if operation == "read" || operation == "write" {
+			// No acceptable scope: deny the tag
+			return false, nil
+		}
+		return false, fmt.Errorf("error while checking tag operation auth: unknown operation (%s)", operation)
+	}
+}
diff --git a/internal/repository/testdata/job.db b/internal/repository/testdata/job.db
index c345327e..23eba6fb 100644
Binary files a/internal/repository/testdata/job.db and b/internal/repository/testdata/job.db differ
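Editor's note: `checkScopeAuth` has been restructured with an early return for the missing-user case and merged duplicate branches; behavior is unchanged. The resulting rules, restated as a self-contained reference truth table:

```go
// Sketch (not in the diff): the scope rules of checkScopeAuth as a pure
// function over simple inputs. Parameter names are illustrative stand-ins.
package main

import "fmt"

func allowed(op, scope, username string, isAdmin, isSupport, isAPIOnly bool) bool {
	switch {
	case op == "write" && (scope == "admin" || scope == "global"):
		return isAdmin || isAPIOnly // only admins or pure API users create shared tags
	case op == "write" && scope == username:
		return true // everyone may create private tags
	case op == "read" && scope == "admin":
		return isAdmin || isSupport
	case op == "read" && (scope == "global" || scope == username):
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(allowed("read", "global", "alice", false, false, false))  // true
	fmt.Println(allowed("write", "global", "alice", false, false, false)) // false: needs admin or API-only role
	fmt.Println(allowed("write", "alice", "alice", false, false, false))  // true: private scope
	fmt.Println(allowed("read", "admin", "alice", false, true, false))    // true: support may read admin tags
}
```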
+ } + } + + // Toggled until greenlit + // if dbUser.HasRole(schema.RoleManager) && !reflect.DeepEqual(dbUser.Projects, user.Projects) { + // projects, _ := json.Marshal(user.Projects) + // if _, err := sq.Update("user").Set("projects", projects).Where("user.username = ?", dbUser.Username).RunWith(r.DB).Exec(); err != nil { + // return err + // } + // } + + return nil +} + func (r *UserRepository) DelUser(username string) error { _, err := r.DB.Exec(`DELETE FROM user WHERE user.username = ?`, username) if err != nil { diff --git a/internal/routerConfig/routes.go b/internal/routerConfig/routes.go index 1dd6dee1..1dfdfc2a 100644 --- a/internal/routerConfig/routes.go +++ b/internal/routerConfig/routes.go @@ -13,6 +13,7 @@ import ( "strings" "time" + "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/util" @@ -124,9 +125,8 @@ func setupAnalysisRoute(i InfoType, r *http.Request) InfoType { func setupTaglistRoute(i InfoType, r *http.Request) InfoType { jobRepo := repository.GetJobRepository() - user := repository.GetUserFromContext(r.Context()) - tags, counts, err := jobRepo.CountTags(user) + tags, counts, err := jobRepo.CountTags(r.Context()) tagMap := make(map[string][]map[string]interface{}) if err != nil { log.Warnf("GetTags failed: %s", err.Error()) @@ -134,11 +134,13 @@ func setupTaglistRoute(i InfoType, r *http.Request) InfoType { return i } + // Uses tag.ID as second Map-Key component to differentiate tags with identical names for _, tag := range tags { tagItem := map[string]interface{}{ "id": tag.ID, "name": tag.Name, - "count": counts[tag.Name], + "scope": tag.Scope, + "count": counts[fmt.Sprint(tag.Name, tag.ID)], } tagMap[tag.Type] = append(tagMap[tag.Type], tagItem) } @@ -272,12 +274,13 @@ func SetupRoutes(router *mux.Router, buildInfo web.Build) { availableRoles, _ := schema.GetValidRolesMap(user) page := web.Page{ - Title: title, - User: *user, - Roles: availableRoles, - Build: buildInfo, - Config: conf, - Infos: infos, + Title: title, + User: *user, + Roles: availableRoles, + Build: buildInfo, + Config: conf, + Resampling: config.Keys.EnableResampling, + Infos: infos, } if route.Filter { diff --git a/internal/taskManager/updateFootprintService.go b/internal/taskManager/updateFootprintService.go index 2434fd19..060e3329 100644 --- a/internal/taskManager/updateFootprintService.go +++ b/internal/taskManager/updateFootprintService.go @@ -47,9 +47,10 @@ func RegisterFootprintWorker() { scopes = append(scopes, schema.MetricScopeAccelerator) for _, job := range jobs { - jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, context.Background()) + // log.Debugf("Try job %d", job.JobID) + jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, context.Background(), 0) // 0 Resolution-Value retrieves highest res if err != nil { - log.Error("Error wile loading job data for footprint update") + log.Errorf("Error wile loading job data for footprint update: %v", err) continue } @@ -107,6 +108,7 @@ func RegisterFootprintWorker() { // log.Errorf("Update job (dbid: %d) failed at db execute: %s", job.ID, err.Error()) // continue // } + log.Debugf("Finish job %d", job.JobID) } jobRepo.TransactionCommit(t) diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 56c5d47d..52a760fc 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -171,8 +171,9 @@ func UpdateTags(job 
diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go
index 56c5d47d..52a760fc 100644
--- a/pkg/archive/archive.go
+++ b/pkg/archive/archive.go
@@ -171,8 +171,9 @@ func UpdateTags(job *schema.Job, tags []*schema.Tag) error {
 	jobMeta.Tags = make([]*schema.Tag, 0)
 	for _, tag := range tags {
 		jobMeta.Tags = append(jobMeta.Tags, &schema.Tag{
-			Name: tag.Name,
-			Type: tag.Type,
+			Name:  tag.Name,
+			Type:  tag.Type,
+			Scope: tag.Scope,
 		})
 	}
 
diff --git a/pkg/archive/json.go b/pkg/archive/json.go
index ff2c6d92..12196583 100644
--- a/pkg/archive/json.go
+++ b/pkg/archive/json.go
@@ -9,8 +9,8 @@ import (
 	"io"
 	"time"
 
-	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 )
 
 func DecodeJobData(r io.Reader, k string) (schema.JobData, error) {
diff --git a/pkg/resampler/resampler.go b/pkg/resampler/resampler.go
new file mode 100644
index 00000000..26cead01
--- /dev/null
+++ b/pkg/resampler/resampler.go
@@ -0,0 +1,123 @@
+package resampler
+
+import (
+	"errors"
+	"fmt"
+	"math"
+
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+)
+
+func SimpleResampler(data []schema.Float, old_frequency int64, new_frequency int64) ([]schema.Float, error) {
+	if old_frequency == 0 || new_frequency == 0 {
+		return nil, errors.New("either old or new frequency is set to 0")
+	}
+
+	if new_frequency%old_frequency != 0 {
+		return nil, errors.New("new sampling frequency should be a multiple of the old frequency")
+	}
+
+	var step int = int(new_frequency / old_frequency)
+	var new_data_length = len(data) / step
+
+	if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) {
+		return data, nil
+	}
+
+	new_data := make([]schema.Float, new_data_length)
+
+	for i := 0; i < new_data_length; i++ {
+		new_data[i] = data[i*step]
+	}
+
+	return new_data, nil
+}
+
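Editor's note: `SimpleResampler` is plain decimation — it keeps every `step`-th sample, and passes the series through untouched when it is shorter than 100 points or would not actually shrink. (The "frequency" parameters are really timesteps in seconds.) A self-contained illustration of that effect:

```go
// Sketch (not part of the diff): the effect of SimpleResampler's decimation
// when going from a 60s timestep to a 300s timestep.
package main

import "fmt"

func main() {
	const oldTimestep, newTimestep = 60, 300
	step := newTimestep / oldTimestep // 5: keep every 5th sample

	data := make([]float64, 600) // 10 hours of 60s samples
	for i := range data {
		data[i] = float64(i)
	}

	resampled := make([]float64, 0, len(data)/step)
	for i := 0; i < len(data)/step; i++ {
		resampled = append(resampled, data[i*step])
	}

	fmt.Println(len(resampled), resampled[:3]) // 120 [0 5 10]
}
```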
+// Inspired by one of the algorithms from https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf
+// Adapted from https://github.com/haoel/downsampling/blob/master/core/lttb.go
+func LargestTriangleThreeBucket(data []schema.Float, old_frequency int, new_frequency int) ([]schema.Float, int, error) {
+
+	if old_frequency == 0 || new_frequency == 0 {
+		return data, old_frequency, nil
+	}
+
+	if new_frequency%old_frequency != 0 {
+		return nil, 0, fmt.Errorf("new sampling frequency %d should be a multiple of the old frequency %d", new_frequency, old_frequency)
+	}
+
+	var step int = int(new_frequency / old_frequency)
+	var new_data_length = len(data) / step
+
+	if new_data_length == 0 || len(data) < 100 || new_data_length >= len(data) {
+		return data, old_frequency, nil
+	}
+
+	new_data := make([]schema.Float, 0, new_data_length)
+
+	// Bucket size. Leave room for the start and end data points
+	bucketSize := float64(len(data)-2) / float64(new_data_length-2)
+
+	new_data = append(new_data, data[0]) // Always add the first point
+
+	// Three pointers track the buckets:
+	// > bucketLow    - the current bucket's beginning location
+	// > bucketMiddle - the current bucket's ending location,
+	//                  also the beginning location of the next bucket
+	// > bucketHigh   - the next bucket's ending location
+	bucketLow := 1
+	bucketMiddle := int(math.Floor(bucketSize)) + 1
+
+	var prevMaxAreaPoint int
+
+	for i := 0; i < new_data_length-2; i++ {
+
+		bucketHigh := int(math.Floor(float64(i+2)*bucketSize)) + 1
+		if bucketHigh >= len(data)-1 {
+			bucketHigh = len(data) - 2
+		}
+
+		// Calculate the average point for the next bucket (containing point c)
+		avgPointX, avgPointY := calculateAverageDataPoint(data[bucketMiddle:bucketHigh+1], int64(bucketMiddle))
+
+		// Get the range for the current bucket
+		currBucketStart := bucketLow
+		currBucketEnd := bucketMiddle
+
+		// Point a
+		pointX := prevMaxAreaPoint
+		pointY := data[prevMaxAreaPoint]
+
+		maxArea := -1.0
+
+		var maxAreaPoint int
+		flag_ := 0
+		for ; currBucketStart < currBucketEnd; currBucketStart++ {
+
+			area := calculateTriangleArea(schema.Float(pointX), pointY, avgPointX, avgPointY, schema.Float(currBucketStart), data[currBucketStart])
+			if area > maxArea {
+				maxArea = area
+				maxAreaPoint = currBucketStart
+			}
+			if math.IsNaN(float64(avgPointY)) {
+				flag_ = 1
+			}
+		}
+
+		if flag_ == 1 {
+			new_data = append(new_data, schema.NaN) // The next bucket's average is NaN, keep the gap visible
+		} else {
+			new_data = append(new_data, data[maxAreaPoint]) // Pick this point from the bucket
+		}
+		prevMaxAreaPoint = maxAreaPoint // This max-area point is the next iteration's prevMaxAreaPoint
+
+		// Move to the next window
+		bucketLow = bucketMiddle
+		bucketMiddle = bucketHigh
+	}
+
+	new_data = append(new_data, data[len(data)-1]) // Always add the last point
+
+	return new_data, new_frequency, nil
+}
diff --git a/pkg/resampler/util.go b/pkg/resampler/util.go
new file mode 100644
index 00000000..36d8bed8
--- /dev/null
+++ b/pkg/resampler/util.go
@@ -0,0 +1,35 @@
+package resampler
+
+import (
+	"math"
+
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+)
+
+func calculateTriangleArea(paX, paY, pbX, pbY, pcX, pcY schema.Float) float64 {
+	area := ((paX-pcX)*(pbY-paY) - (paX-pbX)*(pcY-paY)) * 0.5
+	return math.Abs(float64(area))
+}
+
+func calculateAverageDataPoint(points []schema.Float, xStart int64) (avgX schema.Float, avgY schema.Float) {
+	flag := 0
+	for _, point := range points {
+		avgX += schema.Float(xStart)
+		avgY += point
+		xStart++
+		if math.IsNaN(float64(point)) {
+			flag = 1
+		}
+	}
+
+	l := schema.Float(len(points))
+
+	avgX /= l
+	avgY /= l
+
+	if flag == 1 {
+		return avgX, schema.NaN
+	}
+	return avgX, avgY
+}
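Editor's note: LTTB keeps, per bucket, the point that spans the largest triangle with the previous pick and the next bucket's average point. A self-contained demonstration of that criterion, using the same shoelace formula as `calculateTriangleArea` above:

```go
// Sketch (not part of the diff): the triangle-area criterion used by LTTB,
// with float64 instead of schema.Float for self-containment.
package main

import (
	"fmt"
	"math"
)

func area(paX, paY, pbX, pbY, pcX, pcY float64) float64 {
	return math.Abs(((paX-pcX)*(pbY-paY) - (paX-pbX)*(pcY-paY)) * 0.5)
}

func main() {
	// Of two candidate points in the current bucket, LTTB keeps the one
	// spanning the larger triangle with the previous pick (point a) and
	// the next bucket's average (point c) -- here (2,9) beats (2,1), so
	// the visually dominant spike survives downsampling.
	prevX, prevY := 0.0, 0.0 // point a: previous pick
	avgX, avgY := 4.0, 0.0   // point c: next bucket's average
	fmt.Println(area(prevX, prevY, avgX, avgY, 2, 1)) // 2
	fmt.Println(area(prevX, prevY, avgX, avgY, 2, 9)) // 18
}
```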
diff --git a/pkg/schema/config.go b/pkg/schema/config.go
index 9d2b76ac..b2b58726 100644
--- a/pkg/schema/config.go
+++ b/pkg/schema/config.go
@@ -79,6 +79,13 @@ type Retention struct {
 type NatsConfig struct {
 	// Port of the nats server
 	Port int `json:"port"`
 }
+
+type ResampleConfig struct {
+	// Trigger next zoom level at less than this many visible datapoints
+	Trigger int `json:"trigger"`
+	// Array of resampling target resolutions, in seconds; example: [600,300,60]
+	Resolutions []int `json:"resolutions"`
+}
 
 // Format of the configuration (file). See below for the defaults.
@@ -141,6 +147,9 @@ type ProgramConfig struct {
 	// be provided! Most options here can be overwritten by the user.
 	UiDefaults map[string]interface{} `json:"ui-defaults"`
 
+	// If present, enables dynamic zoom in frontend metric plots using the configured values
+	EnableResampling *ResampleConfig `json:"enable-resampling"`
+
 	// Where to store MachineState files
 	MachineStateDir string `json:"machine-state-dir"`
 
diff --git a/pkg/schema/job.go b/pkg/schema/job.go
index 2a2ea95b..f5bcc621 100644
--- a/pkg/schema/job.go
+++ b/pkg/schema/job.go
@@ -117,9 +117,10 @@ type JobStatistics struct {
 // Tag model
 // @Description Defines a tag using name and type.
 type Tag struct {
-	Type string `json:"type" db:"tag_type" example:"Debug"`
-	Name string `json:"name" db:"tag_name" example:"Testjob"`
-	ID   int64  `json:"id" db:"id"`
+	Type  string `json:"type" db:"tag_type" example:"Debug"`
+	Name  string `json:"name" db:"tag_name" example:"Testjob"`
+	Scope string `json:"scope" db:"tag_scope" example:"global"`
+	ID    int64  `json:"id" db:"id"`
 }
 
 // Resource model
diff --git a/pkg/schema/schemas/config.schema.json b/pkg/schema/schemas/config.schema.json
index ee64b5a0..cc6c553d 100644
--- a/pkg/schema/schemas/config.schema.json
+++ b/pkg/schema/schemas/config.schema.json
@@ -424,6 +424,27 @@
                 "plot_general_colorscheme",
                 "plot_list_selectedMetrics"
             ]
+        },
+        "enable-resampling": {
+            "description": "Enable dynamic zoom in frontend metric plots.",
+            "type": "object",
+            "properties": {
+                "trigger": {
+                    "description": "Trigger next zoom level at less than this many visible datapoints.",
+                    "type": "integer"
+                },
+                "resolutions": {
+                    "description": "Array of resampling target resolutions, in seconds.",
+                    "type": "array",
+                    "items": {
+                        "type": "integer"
+                    }
+                }
+            },
+            "required": [
+                "trigger",
+                "resolutions"
+            ]
         }
     },
     "required": [
diff --git a/web/frontend/src/Header.svelte b/web/frontend/src/Header.svelte
index 9150ed04..de5159a4 100644
--- a/web/frontend/src/Header.svelte
+++ b/web/frontend/src/Header.svelte
@@ -48,6 +48,7 @@
       href: `/monitoring/user/${username}`,
       icon: "bar-chart-line-fill",
       perCluster: false,
+      listOptions: false,
       menu: "none",
     },
     {
@@ -56,6 +57,7 @@
      href: `/monitoring/jobs/`,
      icon: "card-list",
      perCluster: false,
+      listOptions: false,
      menu: "none",
    },
    {
@@ -63,7 +65,8 @@
      requiredRole: roles.manager,
      href: "/monitoring/users/",
      icon: "people-fill",
-      perCluster: false,
+      perCluster: true,
+      listOptions: true,
      menu: "Groups",
    },
    {
@@ -71,7 +74,8 @@
      requiredRole: roles.support,
      href: "/monitoring/projects/",
      icon: "folder",
-      perCluster: false,
+      perCluster: true,
+      listOptions: true,
      menu: "Groups",
    },
    {
@@ -80,6 +84,7 @@
      href: "/monitoring/tags/",
      icon: "tags",
      perCluster: false,
+      listOptions: false,
      menu: "Groups",
    },
    {
@@ -88,6 +93,7 @@
      href: "/monitoring/analysis/",
      icon: "graph-up",
      perCluster: true,
+      listOptions: false,
      menu: "Stats",
    },
    {
@@ -96,6 +102,7 @@
      href: "/monitoring/systems/",
      icon: "cpu",
      perCluster: true,
+      listOptions: false,
      menu: "Groups",
    },
    {
@@ -104,6 +111,7 @@
      href: "/monitoring/status/",
      icon: "cpu",
      perCluster: true,
+      listOptions: false,
      menu: "Stats",
    },
  ];
diff --git a/web/frontend/src/Job.root.svelte b/web/frontend/src/Job.root.svelte
index 213898e0..73dd158d 100644
--- a/web/frontend/src/Job.root.svelte
+++ b/web/frontend/src/Job.root.svelte
@@ -2,9 +2,9 @@
     @component Main single job display component; displays plots for every metric as well as various information
 
     Properties:
+    - `dbid Number`: The job's DB ID
     - `username String`: Empty string if auth.
is disabled, otherwise the username as string - `authlevel Number`: The current users authentication level - - `clusters [String]`: List of cluster names - `roles [Number]`: Enum containing available roles --> @@ -25,7 +25,6 @@ CardHeader, CardTitle, Button, - Icon, } from "@sveltestrap/sveltestrap"; import { getContext } from "svelte"; import { @@ -35,16 +34,16 @@ transformDataForRoofline, } from "./generic/utils.js"; import Metric from "./job/Metric.svelte"; - import TagManagement from "./job/TagManagement.svelte"; import StatsTable from "./job/StatsTable.svelte"; - import JobFootprint from "./generic/helper/JobFootprint.svelte"; + import JobSummary from "./job/JobSummary.svelte"; + import ConcurrentJobs from "./generic/helper/ConcurrentJobs.svelte"; import PlotTable from "./generic/PlotTable.svelte"; - import Polar from "./generic/plots/Polar.svelte"; import Roofline from "./generic/plots/Roofline.svelte"; import JobInfo from "./generic/joblist/JobInfo.svelte"; import MetricSelection from "./generic/select/MetricSelection.svelte"; export let dbid; + export let username; export let authlevel; export let roles; @@ -53,12 +52,11 @@ const ccconfig = getContext("cc-config") let isMetricsSelectionOpen = false, - showFootprint = !!ccconfig[`job_view_showFootprint`], selectedMetrics = [], selectedScopes = []; let plots = {}, - jobTags, + roofWidth, statsTable let missingMetrics = [], @@ -75,7 +73,7 @@ duration, numNodes, numHWThreads, numAcc, SMT, exclusive, partition, subCluster, arrayJobId, monitoringStatus, state, walltime, - tags { id, type, name }, + tags { id, type, scope, name }, resources { hostname, hwthreads, accelerators }, metaData, userData { name, email }, @@ -122,15 +120,6 @@ variables: { dbid, selectedMetrics, selectedScopes }, }); - function loadAllScopes() { - selectedScopes = [...selectedScopes, "socket", "core"] - jobMetrics = queryStore({ - client: client, - query: query, - variables: { dbid, selectedMetrics, selectedScopes}, - }); - } - // Handle Job Query on Init -> is not executed anymore getContext("on-init")(() => { let job = $initq.data.job; @@ -231,221 +220,225 @@ })); - - + + + {#if $initq.error} {$initq.error.message} {:else if $initq.data} - + + + {#if $initq.data?.job?.metaData?.message} + + + +
Job {$initq.data?.job?.jobId} ({$initq.data?.job?.cluster})
+ The following note was added by administrators: +
+ + {@html $initq.data.job.metaData.message} + +
+
+ {/if} + + + + + + {#if $initq.data.job.concurrentJobs != null && $initq.data.job.concurrentJobs.items.length != 0} + + + {$initq.data.job.concurrentJobs.items.length} Concurrent Jobs + + + roles.manager)}/> + + + {/if} +
+
{:else} {/if} - {#if $initq.data && showFootprint} - - - - {/if} - {#if $initq?.data && $jobMetrics?.data?.jobMetrics} - {#if $initq.data.job.concurrentJobs != null && $initq.data.job.concurrentJobs.items.length != 0} - {#if authlevel > roles.manager} - -
- Concurrent Jobs -
-
    -
  • - See All -
  • - {#each $initq.data.job.concurrentJobs.items as pjob, index} -
  • - {pjob.jobId} -
  • - {/each} -
- - {:else} - -
- {$initq.data.job.concurrentJobs.items.length} Concurrent Jobs -
-

- Number of shared jobs on the same node with overlapping runtimes. -

- - {/if} - {/if} - - - - - c.name == $initq.data.job.cluster) - .subClusters.find((sc) => sc.name == $initq.data.job.subCluster)} - data={transformDataForRoofline( - $jobMetrics.data.jobMetrics.find( - (m) => m.name == "flops_any" && m.scope == "node", - )?.metric, - $jobMetrics.data.jobMetrics.find( - (m) => m.name == "mem_bw" && m.scope == "node", - )?.metric, - )} - /> - - {:else} - + + + + {#if $initq.error} + {$initq.error.message} + {:else if $initq?.data && $jobMetrics?.data} + + {:else} - - {/if} -
- - - {#if $initq.data} - {/if} - - {#if $initq.data} - + + + + {#if $initq.error || $jobMetrics.error} + +

Initq Error: {$initq.error?.message}

+

jobMetrics Error: {$jobMetrics.error?.message}

+
+ {:else if $initq?.data && $jobMetrics?.data} + +
+ c.name == $initq.data.job.cluster) + .subClusters.find((sc) => sc.name == $initq.data.job.subCluster)} + data={transformDataForRoofline( + $jobMetrics.data?.jobMetrics?.find( + (m) => m.name == "flops_any" && m.scope == "node", + )?.metric, + $jobMetrics.data?.jobMetrics?.find( + (m) => m.name == "mem_bw" && m.scope == "node", + )?.metric, + )} + /> +
+
+ {:else} + {/if}
- - - {#if $jobMetrics.error} - {#if $initq.data.job.monitoringStatus == 0 || $initq.data.job.monitoringStatus == 2} - Not monitored or archiving failed -
+ + + + + {#if $initq.data} + + + {/if} - {$jobMetrics.error.message} - {:else if $jobMetrics.fetching} - - {:else if $initq?.data && $jobMetrics?.data?.jobMetrics} - - {#if item.data} - gm.name == item.metric)?.unit} - nativeScope={$initq.data.globalMetrics.find((gm) => gm.name == item.metric)?.scope} - rawData={item.data.map((x) => x.metric)} - scopes={item.data.map((x) => x.scope)} - {width} - isShared={$initq.data.job.exclusive != 1} - /> - {:else} - No dataset returned for {item.metric} +
+ + + {#if $jobMetrics.error} + {#if $initq.data.job.monitoringStatus == 0 || $initq.data.job.monitoringStatus == 2} + Not monitored or archiving failed +
+ {/if} + {$jobMetrics.error.message} + {:else if $jobMetrics.fetching} + + {:else if $initq?.data && $jobMetrics?.data?.jobMetrics} + - {/if} - - {/if} - -
- - - {#if $initq.data} - - {#if somethingMissing} - -
- - - Missing Metrics/Resources - - - {#if missingMetrics.length > 0} -

- No data at all is available for the metrics: {missingMetrics.join( - ", ", - )} -

- {/if} - {#if missingHosts.length > 0} -

Some metrics are missing for the following hosts:

-
    - {#each missingHosts as missing} -
  • - {missing.hostname}: {missing.metrics.join(", ")} -
  • - {/each} -
- {/if} -
-
-
-
- {/if} - - {#if $jobMetrics?.data?.jobMetrics} - {#key $jobMetrics.data.jobMetrics} - statsTable.moreLoaded(detail)} job={$initq.data.job} - jobMetrics={$jobMetrics.data.jobMetrics} + metricName={item.metric} + metricUnit={$initq.data.globalMetrics.find((gm) => gm.name == item.metric)?.unit} + nativeScope={$initq.data.globalMetrics.find((gm) => gm.name == item.metric)?.scope} + rawData={item.data.map((x) => x.metric)} + scopes={item.data.map((x) => x.scope)} + {width} + isShared={$initq.data.job.exclusive != 1} /> - {/key} - {/if} - - -
- {#if $initq.data.job.metaData?.jobScript} -
{$initq.data.job.metaData?.jobScript}
- {:else} - No job script available - {/if} -
-
- -
- {#if $initq.data.job.metaData?.slurmInfo} -
{$initq.data.job.metaData?.slurmInfo}
{:else} No additional slurm information availableNo dataset returned for {item.metric} {/if} -
-
-
+
+ {/if} + +
+
+
+ + + + {#if $initq.data} + + + {#if somethingMissing} + +
+ + + Missing Metrics/Resources + + + {#if missingMetrics.length > 0} +

+ No data at all is available for the metrics: {missingMetrics.join( + ", ", + )} +

+ {/if} + {#if missingHosts.length > 0} +

Some metrics are missing for the following hosts:

+
    + {#each missingHosts as missing} +
  • + {missing.hostname}: {missing.metrics.join(", ")} +
  • + {/each} +
+ {/if} +
+
+
+
+ {/if} + + {#if $jobMetrics?.data?.jobMetrics} + {#key $jobMetrics.data.jobMetrics} + + {/key} + {/if} + + +
+ {#if $initq.data.job.metaData?.jobScript} +
{$initq.data.job.metaData?.jobScript}
+ {:else} + No job script available + {/if} +
+
+ +
+ {#if $initq.data.job.metaData?.slurmInfo} +
{$initq.data.job.metaData?.slurmInfo}
+ {:else} + No additional slurm information available + {/if} +
+
+
+
{/if}
diff --git a/web/frontend/src/Node.root.svelte b/web/frontend/src/Node.root.svelte index 2d58540c..c8b0d788 100644 --- a/web/frontend/src/Node.root.svelte +++ b/web/frontend/src/Node.root.svelte @@ -90,11 +90,10 @@ }, }); - let itemsPerPage = ccconfig.plot_list_jobsPerPage; - let page = 1; - let paging = { itemsPerPage, page }; - let sorting = { field: "startTime", type: "col", order: "DESC" }; - $: filter = [ + + const paging = { itemsPerPage: 50, page: 1 }; + const sorting = { field: "startTime", type: "col", order: "DESC" }; + const filter = [ { cluster: { eq: cluster } }, { node: { contains: hostname } }, { state: ["running"] }, @@ -207,7 +206,6 @@ cluster={clusters.find((c) => c.name == cluster)} subCluster={$nodeMetricsData.data.nodeMetrics[0].subCluster} series={item.metric.series} - resources={[{ hostname: hostname }]} forNode={true} /> {:else if item.disabled === true && item.metric} diff --git a/web/frontend/src/Systems.root.svelte b/web/frontend/src/Systems.root.svelte index c4834014..0d5e70ed 100644 --- a/web/frontend/src/Systems.root.svelte +++ b/web/frontend/src/Systems.root.svelte @@ -206,7 +206,6 @@ metric={item.data.name} cluster={clusters.find((c) => c.name == cluster)} subCluster={item.subCluster} - resources={[{ hostname: item.host }]} forNode={true} /> {:else if item.disabled === true && item.data} diff --git a/web/frontend/src/config.entrypoint.js b/web/frontend/src/config.entrypoint.js index 2978c8c9..345056b6 100644 --- a/web/frontend/src/config.entrypoint.js +++ b/web/frontend/src/config.entrypoint.js @@ -9,6 +9,7 @@ new Config({ username: username }, context: new Map([ - ['cc-config', clusterCockpitConfig] + ['cc-config', clusterCockpitConfig], + ['resampling', resampleConfig] ]) }) diff --git a/web/frontend/src/config/AdminSettings.svelte b/web/frontend/src/config/AdminSettings.svelte index d959c3b7..9d3abf21 100644 --- a/web/frontend/src/config/AdminSettings.svelte +++ b/web/frontend/src/config/AdminSettings.svelte @@ -51,7 +51,5 @@ - - - +
diff --git a/web/frontend/src/config/admin/Options.svelte b/web/frontend/src/config/admin/Options.svelte index 2a4e11c2..a1fe307a 100644 --- a/web/frontend/src/config/admin/Options.svelte +++ b/web/frontend/src/config/admin/Options.svelte @@ -3,11 +3,13 @@ --> - - - Scramble Names / Presentation Mode - - Active? - - + + + + Scramble Names / Presentation Mode + + Active? + + + + +{#if resampleConfig} + + + + Metric Plot Resampling +

Triggered at {resampleConfig.trigger} datapoints.

+

Configured resolutions: {resampleConfig.resolutions}

+
+
+ +{/if} diff --git a/web/frontend/src/generic/Filters.svelte b/web/frontend/src/generic/Filters.svelte index a1839c7d..d01c16b2 100644 --- a/web/frontend/src/generic/Filters.svelte +++ b/web/frontend/src/generic/Filters.svelte @@ -322,7 +322,9 @@ {#if filters.tags.length != 0} (isTagsOpen = true)}> {#each filters.tags as tagId} - + {#key tagId} + + {/key} {/each} {/if} diff --git a/web/frontend/src/generic/JobList.svelte b/web/frontend/src/generic/JobList.svelte index e3e3f408..1cbd4c64 100644 --- a/web/frontend/src/generic/JobList.svelte +++ b/web/frontend/src/generic/JobList.svelte @@ -81,6 +81,7 @@ id type name + scope } userData { name diff --git a/web/frontend/src/generic/helper/ConcurrentJobs.svelte b/web/frontend/src/generic/helper/ConcurrentJobs.svelte new file mode 100644 index 00000000..85bac83e --- /dev/null +++ b/web/frontend/src/generic/helper/ConcurrentJobs.svelte @@ -0,0 +1,101 @@ + + + + +{#if renderCard} + + + {cJobs.items.length} Concurrent Jobs + + + + {#if showLinks} + + {:else} +
    + {#each cJobs.items as cJob} +
  • + {cJob.jobId} +
  • + {/each} +
+ {/if} +
+
+{:else} +

+ {cJobs.items.length} Jobs running on the same node with overlapping runtimes using shared resources. + ( See All ) +

+
+ {#if showLinks} + + {:else} + + {/if} +{/if} + + diff --git a/web/frontend/src/generic/helper/JobFootprint.svelte b/web/frontend/src/generic/helper/JobFootprint.svelte index 4e1abb03..b6087a70 100644 --- a/web/frontend/src/generic/helper/JobFootprint.svelte +++ b/web/frontend/src/generic/helper/JobFootprint.svelte @@ -208,7 +208,7 @@ {fpd.message}{fpd.message} @@ -246,7 +246,7 @@ {fpd.message}{fpd.message} {/if} {/each} diff --git a/web/frontend/src/generic/helper/Tag.svelte b/web/frontend/src/generic/helper/Tag.svelte index 55dabcb2..7efaf63a 100644 --- a/web/frontend/src/generic/helper/Tag.svelte +++ b/web/frontend/src/generic/helper/Tag.svelte @@ -23,12 +23,22 @@ if ($initialized && tag == null) tag = allTags.find(tag => tag.id == id) } + + function getScopeColor(scope) { + switch (scope) { + case "admin": + return "#19e5e6"; + case "global": + return "#c85fc8"; + default: + return "#ffc107"; + } + } diff --git a/web/frontend/src/generic/joblist/JobInfo.svelte b/web/frontend/src/generic/joblist/JobInfo.svelte index 1e32afce..93b43c3b 100644 --- a/web/frontend/src/generic/joblist/JobInfo.svelte +++ b/web/frontend/src/generic/joblist/JobInfo.svelte @@ -10,9 +10,14 @@ import { Badge, Icon } from "@sveltestrap/sveltestrap"; import { scrambleNames, scramble } from "../utils.js"; import Tag from "../helper/Tag.svelte"; + import TagManagement from "../helper/TagManagement.svelte"; export let job; export let jobTags = job.tags; + export let showTagedit = false; + export let username = null; + export let authlevel= null; + export let roles = null; function formatDuration(duration) { const hours = Math.floor(duration / 3600); @@ -36,7 +41,7 @@
-

+

{job.jobId} ({job.cluster}) -

+

{scrambleNames ? scramble(job.user) : job.user} @@ -84,7 +89,7 @@ {/if}

-

+

{#if job.numNodes == 1} {job.resources[0].hostname} {:else} @@ -104,7 +109,7 @@ {job.subCluster}

-

+

Start: {new Date(job.startTime).toLocaleString()} @@ -117,11 +122,25 @@ {/if}

-

- {#each jobTags as tag} - - {/each} -

+ {#if showTagedit} +
+

+ : + {#if jobTags?.length > 0} + {#each jobTags as tag} + + {/each} + {:else} + No Tags + {/if} +

+ {:else} +

+ {#each jobTags as tag} + + {/each} +

+ {/if}
diff --git a/web/frontend/src/job/Metric.svelte b/web/frontend/src/job/Metric.svelte index 88c6da81..89b9d308 100644 --- a/web/frontend/src/job/Metric.svelte +++ b/web/frontend/src/job/Metric.svelte @@ -13,14 +13,24 @@ --> @@ -65,13 +175,13 @@ {metricName} ({unit}) @@ -85,13 +195,13 @@ {/if} {#key series} - {#if fetching == true} + {#if $metricData?.fetching == true} {:else if error != null} {error.message} {:else if series != null && !patternMatches} { handleZoom(detail) }} {width} height={300} cluster={job.cluster} @@ -101,11 +211,11 @@ metric={metricName} {series} {isShared} - resources={job.resources} + {zoomState} /> {:else if statsSeries[selectedScopeIndex] != null && patternMatches} { handleZoom(detail) }} {width} height={300} cluster={job.cluster} @@ -115,7 +225,7 @@ metric={metricName} {series} {isShared} - resources={job.resources} + {zoomState} statisticsSeries={statsSeries[selectedScopeIndex]} useStatsSeries={!!statsSeries[selectedScopeIndex]} /> diff --git a/web/frontend/src/job/StatsTable.svelte b/web/frontend/src/job/StatsTable.svelte index 88777f68..0d641127 100644 --- a/web/frontend/src/job/StatsTable.svelte +++ b/web/frontend/src/job/StatsTable.svelte @@ -4,6 +4,9 @@ Properties: - `job Object`: The job object - `jobMetrics [Object]`: The jobs metricdata + + Exported: + - `moreLoaded`: Adds additional scopes requested from Metric.svelte in Job-View --> - +
+ {#each selectedMetrics as metric} + {/each} + {#each selectedMetrics as metric} @@ -146,8 +161,6 @@
- {metric} - {#each scopesForMetric(metric, jobMetrics) as scope} {/each} - +
Node
-
- - - - (isOpen = !isOpen)}> - - Manage Tags - {#if pendingChange !== false} - - {:else} - - {/if} - - - - -
- - - Search using "type: name". If no tag matches your search, a - button for creating a new one will appear. - - -
    - {#each allTagsFiltered as tag} - - - - - {#if pendingChange === tag.id} - - {:else if job.tags.find((t) => t.id == tag.id)} - - {:else} - - {/if} - - - {:else} - - No tags matching - - {/each} -
-
- {#if newTagType && newTagName && isNewTag(newTagType, newTagName)} - - {:else if allTagsFiltered.length == 0} - Search Term is not a valid Tag (type: name) - {/if} -
- - - -
- - - - diff --git a/web/frontend/src/jobs.entrypoint.js b/web/frontend/src/jobs.entrypoint.js index f976b4ae..9e98d0d7 100644 --- a/web/frontend/src/jobs.entrypoint.js +++ b/web/frontend/src/jobs.entrypoint.js @@ -9,6 +9,7 @@ new Jobs({ roles: roles }, context: new Map([ - ['cc-config', clusterCockpitConfig] + ['cc-config', clusterCockpitConfig], + ['resampling', resampleConfig] ]) }) diff --git a/web/frontend/src/user.entrypoint.js b/web/frontend/src/user.entrypoint.js index 0bff82aa..16c616de 100644 --- a/web/frontend/src/user.entrypoint.js +++ b/web/frontend/src/user.entrypoint.js @@ -8,6 +8,7 @@ new User({ user: userInfos }, context: new Map([ - ['cc-config', clusterCockpitConfig] + ['cc-config', clusterCockpitConfig], + ['resampling', resampleConfig] ]) }) diff --git a/web/templates/config.tmpl b/web/templates/config.tmpl index 9f3f3be9..7993c3e3 100644 --- a/web/templates/config.tmpl +++ b/web/templates/config.tmpl @@ -12,6 +12,7 @@ const username = {{ .User.Username }}; const filterPresets = {{ .FilterPresets }}; const clusterCockpitConfig = {{ .Config }}; + const resampleConfig = {{ .Resampling }}; {{end}} \ No newline at end of file diff --git a/web/templates/monitoring/job.tmpl b/web/templates/monitoring/job.tmpl index 9b344f9a..92365b3c 100644 --- a/web/templates/monitoring/job.tmpl +++ b/web/templates/monitoring/job.tmpl @@ -11,8 +11,10 @@ id: "{{ .Infos.id }}", }; const clusterCockpitConfig = {{ .Config }}; + const username = {{ .User.Username }}; const authlevel = {{ .User.GetAuthLevel }}; const roles = {{ .Roles }}; + const resampleConfig = {{ .Resampling }}; {{end}} diff --git a/web/templates/monitoring/jobs.tmpl b/web/templates/monitoring/jobs.tmpl index 6ea05d56..42484712 100644 --- a/web/templates/monitoring/jobs.tmpl +++ b/web/templates/monitoring/jobs.tmpl @@ -12,6 +12,7 @@ const clusterCockpitConfig = {{ .Config }}; const authlevel = {{ .User.GetAuthLevel }}; const roles = {{ .Roles }}; + const resampleConfig = {{ .Resampling }}; {{end}} diff --git a/web/templates/monitoring/taglist.tmpl b/web/templates/monitoring/taglist.tmpl index 6e487dd3..7831a645 100644 --- a/web/templates/monitoring/taglist.tmpl +++ b/web/templates/monitoring/taglist.tmpl @@ -7,8 +7,16 @@ {{ $tagType }} {{ range $tagList }} -
- {{ .name }} {{ .count }} + {{if eq .scope "global"}} + + {{ .name }} {{ .count }} + {{else if eq .scope "admin"}} + + {{ .name }} {{ .count }} + {{else}} + + {{ .name }} {{ .count }} + {{end}} {{end}} {{end}} diff --git a/web/templates/monitoring/user.tmpl b/web/templates/monitoring/user.tmpl index 693ae618..8b4cf446 100644 --- a/web/templates/monitoring/user.tmpl +++ b/web/templates/monitoring/user.tmpl @@ -10,6 +10,7 @@ const userInfos = {{ .Infos }}; const filterPresets = {{ .FilterPresets }}; const clusterCockpitConfig = {{ .Config }}; + const resampleConfig = {{ .Resampling }}; {{end}} diff --git a/web/web.go b/web/web.go index 99008b51..45ca9e3d 100644 --- a/web/web.go +++ b/web/web.go @@ -98,6 +98,7 @@ type Page struct { FilterPresets map[string]interface{} // For pages with the Filter component, this can be used to set initial filters. Infos map[string]interface{} // For generic use (e.g. username for /monitoring/user/, job id for /monitoring/job/) Config map[string]interface{} // UI settings for the currently logged in user (e.g. line width, ...) + Resampling *schema.ResampleConfig // If not nil, defines resampling trigger and resolutions } func RenderTemplate(rw http.ResponseWriter, file string, page *Page) {
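Editor's note: the new `Resampling` field on `web.Page` is what the templates above render into the `resampleConfig` global; on the backend it is populated from the `enable-resampling` block of the configuration file. A hedged sketch of what a matching entry decodes to (values illustrative, not project defaults):

```go
// Sketch (not part of the diff): an "enable-resampling" config entry decoded
// into the new ResampleConfig type. The type is redeclared here so the
// example is self-contained.
package main

import (
	"encoding/json"
	"fmt"
)

type ResampleConfig struct {
	Trigger     int   `json:"trigger"`
	Resolutions []int `json:"resolutions"`
}

func main() {
	raw := []byte(`{"trigger": 30, "resolutions": [600, 300, 120, 60]}`)

	var cfg ResampleConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}

	// Each resolution should be a multiple of the metric's native timestep,
	// since SimpleResampler rejects non-integer decimation steps.
	fmt.Printf("trigger=%d resolutions=%v\n", cfg.Trigger, cfg.Resolutions)
}
```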