Skip to content

Commit

Permalink
Merge branch 'dev' into add_nats_server
Browse files Browse the repository at this point in the history
  • Loading branch information
spacehamster87 committed Sep 25, 2024
2 parents f61b5bb + 8e3327e commit 140d1b7
Show file tree
Hide file tree
Showing 65 changed files with 2,482 additions and 806 deletions.
5 changes: 3 additions & 2 deletions api/schema.graphqls
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type Tag {
id: ID!
type: String!
name: String!
scope: String!
}

type Resource {
Expand Down Expand Up @@ -223,7 +224,7 @@ type Query {
allocatedNodes(cluster: String!): [Count!]!

job(id: ID!): Job
jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!]): [JobMetricWithName!]!
jobMetrics(id: ID!, metrics: [String!], scopes: [MetricScope!], resolution: Int): [JobMetricWithName!]!
jobsFootprints(filter: [JobFilter!], metrics: [String!]!): Footprints

jobs(filter: [JobFilter!], page: PageRequest, order: OrderByInput): JobResultList!
Expand All @@ -235,7 +236,7 @@ type Query {
}

type Mutation {
createTag(type: String!, name: String!): Tag!
createTag(type: String!, name: String!, scope: String!): Tag!
deleteTag(id: ID!): ID!
addTagsToJob(job: ID!, tagIds: [ID!]!): [Tag!]!
removeTagsFromJob(job: ID!, tagIds: [ID!]!): [Tag!]!
Expand Down
9 changes: 4 additions & 5 deletions internal/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ func cleanup() {
func TestRestApi(t *testing.T) {
restapi := setup(t)
t.Cleanup(cleanup)

testData := schema.JobData{
"load_one": map[schema.MetricScope]*schema.JobMetric{
schema.MetricScopeNode: {
Expand All @@ -192,7 +191,7 @@ func TestRestApi(t *testing.T) {
},
}

metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context) (schema.JobData, error) {
metricdata.TestLoadDataCallback = func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) {
return testData, nil
}

Expand All @@ -215,7 +214,7 @@ func TestRestApi(t *testing.T) {
"exclusive": 1,
"monitoringStatus": 1,
"smt": 1,
"tags": [{ "type": "testTagType", "name": "testTagName" }],
"tags": [{ "type": "testTagType", "name": "testTagName", "scope": "testuser" }],
"resources": [
{
"hostname": "host123",
Expand Down Expand Up @@ -283,7 +282,7 @@ func TestRestApi(t *testing.T) {
t.Fatalf("unexpected job properties: %#v", job)
}

if len(job.Tags) != 1 || job.Tags[0].Type != "testTagType" || job.Tags[0].Name != "testTagName" {
if len(job.Tags) != 1 || job.Tags[0].Type != "testTagType" || job.Tags[0].Name != "testTagName" || job.Tags[0].Scope != "testuser" {
t.Fatalf("unexpected tags: %#v", job.Tags)
}

Expand Down Expand Up @@ -344,7 +343,7 @@ func TestRestApi(t *testing.T) {
}

t.Run("CheckArchive", func(t *testing.T) {
data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background())
data, err := metricDataDispatcher.LoadData(stoppedJob, []string{"load_one"}, []schema.MetricScope{schema.MetricScopeNode}, context.Background(), 60)
if err != nil {
t.Fatal(err)
}
Expand Down
45 changes: 31 additions & 14 deletions internal/api/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,9 @@ type ErrorResponse struct {
// ApiTag model
type ApiTag struct {
// Tag Type
Type string `json:"type" example:"Debug"`
Name string `json:"name" example:"Testjob"` // Tag Name
Type string `json:"type" example:"Debug"`
Name string `json:"name" example:"Testjob"` // Tag Name
Scope string `json:"scope" example:"global"` // Tag Scope for Frontend Display
}

// ApiMeta model
Expand Down Expand Up @@ -420,7 +421,7 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) {
StartTime: job.StartTime.Unix(),
}

res.Tags, err = api.JobRepository.GetTags(&job.ID)
res.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID)
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
Expand Down Expand Up @@ -493,7 +494,7 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request)
return
}

job.Tags, err = api.JobRepository.GetTags(&job.ID)
job.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID)
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
Expand All @@ -515,8 +516,15 @@ func (api *RestApi) getCompleteJobById(rw http.ResponseWriter, r *http.Request)

var data schema.JobData

metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
resolution := 0

for _, mc := range metricConfigs {
resolution = max(resolution, mc.Timestep)
}

if r.URL.Query().Get("all-metrics") == "true" {
data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context())
data, err = metricDataDispatcher.LoadData(job, nil, scopes, r.Context(), resolution)
if err != nil {
log.Warn("Error while loading job data")
return
Expand Down Expand Up @@ -579,7 +587,7 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
return
}

job.Tags, err = api.JobRepository.GetTags(&job.ID)
job.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID)
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
Expand All @@ -605,7 +613,14 @@ func (api *RestApi) getJobById(rw http.ResponseWriter, r *http.Request) {
scopes = []schema.MetricScope{"node"}
}

data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context())
metricConfigs := archive.GetCluster(job.Cluster).MetricConfig
resolution := 0

for _, mc := range metricConfigs {
resolution = max(resolution, mc.Timestep)
}

data, err := metricDataDispatcher.LoadData(job, metrics, scopes, r.Context(), resolution)
if err != nil {
log.Warn("Error while loading job data")
return
Expand Down Expand Up @@ -687,6 +702,7 @@ func (api *RestApi) editMeta(rw http.ResponseWriter, r *http.Request) {
// @summary Adds one or more tags to a job
// @tags Job add and modify
// @description Adds tag(s) to a job specified by DB ID. Name and Type of Tag(s) can be chosen freely.
// @description Tag Scope for frontend visibility will default to "global" if none entered, other options: "admin" or specific username.
// @description If tagged job is already finished: Tag will be written directly to respective archive files.
// @accept json
// @produce json
Expand All @@ -712,7 +728,7 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
return
}

job.Tags, err = api.JobRepository.GetTags(&job.ID)
job.Tags, err = api.JobRepository.GetTags(r.Context(), &job.ID)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
Expand All @@ -725,16 +741,17 @@ func (api *RestApi) tagJob(rw http.ResponseWriter, r *http.Request) {
}

for _, tag := range req {
tagId, err := api.JobRepository.AddTagOrCreate(job.ID, tag.Type, tag.Name)
tagId, err := api.JobRepository.AddTagOrCreate(r.Context(), job.ID, tag.Type, tag.Name, tag.Scope)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}

job.Tags = append(job.Tags, &schema.Tag{
ID: tagId,
Type: tag.Type,
Name: tag.Name,
ID: tagId,
Type: tag.Type,
Name: tag.Name,
Scope: tag.Scope,
})
}

Expand Down Expand Up @@ -802,7 +819,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
unlockOnce.Do(api.RepositoryMutex.Unlock)

for _, tag := range req.Tags {
if _, err := api.JobRepository.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
if _, err := api.JobRepository.AddTagOrCreate(r.Context(), id, tag.Type, tag.Name, tag.Scope); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
handleError(fmt.Errorf("adding tag to new job %d failed: %w", id, err), http.StatusInternalServerError, rw)
return
Expand Down Expand Up @@ -1111,7 +1128,7 @@ func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) {
}

resolver := graph.GetResolverInstance()
data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes)
data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes, nil)
if err != nil {
json.NewEncoder(rw).Encode(Respone{
Error: &struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/archiver/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
scopes = append(scopes, schema.MetricScopeAccelerator)
}

jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx)
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, ctx, 0) // 0 Resulotion-Value retrieves highest res (60s)
if err != nil {
log.Error("Error wile loading job data for archiving")
return nil, err
Expand Down
8 changes: 6 additions & 2 deletions internal/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,18 @@ func GetAuthInstance() *Authentication {

func persistUser(user *schema.User) {
r := repository.GetUserRepository()
_, err := r.GetUser(user.Username)
dbUser, err := r.GetUser(user.Username)

if err != nil && err != sql.ErrNoRows {
log.Errorf("Error while loading user '%s': %v", user.Username, err)
} else if err == sql.ErrNoRows {
} else if err == sql.ErrNoRows { // Adds New User
if err := r.AddUser(user); err != nil {
log.Errorf("Error while adding user '%s' to DB: %v", user.Username, err)
}
} else { // Update Existing
if err := r.UpdateUser(dbUser, user); err != nil {
log.Errorf("Error while updating user '%s' to DB: %v", user.Username, err)
}
}
}

Expand Down
Loading

0 comments on commit 140d1b7

Please sign in to comment.