diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 18c81e4c75..57252cb618 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -106,7 +106,8 @@ func New(dir string) (*fsJobQueue, error) { } j, err := q.readJob(jobId) if err != nil { - return nil, err + // Skip invalid jobs, leaving them in place for later examination + continue } // If a job is running, and not cancelled, track the token diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go index aba51cc1e2..fe2ce2c3d7 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go @@ -1,6 +1,8 @@ package fsjobqueue_test import ( + "os" + "path" "testing" "github.com/osbuild/osbuild-composer/pkg/jobqueue" @@ -28,3 +30,15 @@ func TestNonExistant(t *testing.T) { require.Error(t, err) require.Nil(t, q) } + +func TestJobQueueBadJSON(t *testing.T) { + dir := t.TempDir() + + // Write a purposfully invalid JSON file into the queue + err := os.WriteFile(path.Join(dir, "/4f1cf5f8-525d-46b7-aef4-33c6a919c038.json"), []byte("{invalid json content"), 0600) + require.Nil(t, err) + + q, err := fsjobqueue.New(dir) + require.Nil(t, err) + require.NotNil(t, q) +} diff --git a/internal/mocks/rpmmd/fixtures.go b/internal/mocks/rpmmd/fixtures.go index 9644489044..f5c8896fb1 100644 --- a/internal/mocks/rpmmd/fixtures.go +++ b/internal/mocks/rpmmd/fixtures.go @@ -1,6 +1,9 @@ package rpmmd_mock import ( + "os" + "path" + "github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue" dnfjson_mock "github.com/osbuild/osbuild-composer/internal/mocks/dnfjson" "github.com/osbuild/osbuild-composer/internal/store" @@ -64,3 +67,20 @@ func OldChangesFixture(tmpdir string) Fixture { dnfjson_mock.Base, } } + +func BadJobJSONFixture(tmpdir string) Fixture { + err := os.Mkdir(path.Join(tmpdir, "/jobs"), 0755) + if err != nil { + panic(err) + } + err = os.WriteFile(path.Join(tmpdir, "/jobs/30000000-0000-0000-0000-000000000005.json"), []byte("{invalid json content"), 0600) + if err != nil { + panic(err) + } + + return Fixture{ + store.FixtureJobs(), + createBaseWorkersFixture(path.Join(tmpdir, "/jobs")), + dnfjson_mock.Base, + } +} diff --git a/internal/store/fixtures.go b/internal/store/fixtures.go index 9e6e92b511..59f76687cb 100644 --- a/internal/store/fixtures.go +++ b/internal/store/fixtures.go @@ -354,3 +354,152 @@ func FixtureOldChanges() *Store { return s } + +// Fixture to use for checking job queue files +func FixtureJobs() *Store { + var bName = "test" + var b = blueprint.Blueprint{ + Name: bName, + Version: "0.0.0", + Packages: []blueprint.Package{}, + Modules: []blueprint.Package{}, + Groups: []blueprint.Group{}, + Customizations: nil, + } + + var date = time.Date(2019, 11, 27, 13, 19, 0, 0, time.FixedZone("UTC+1", 60*60)) + + var awsTarget = &target.Target{ + Uuid: uuid.MustParse("10000000-0000-0000-0000-000000000000"), + Name: target.TargetNameAWS, + ImageName: "awsimage", + Created: date, + Status: common.IBWaiting, + Options: &target.AWSTargetOptions{ + Region: "frankfurt", + AccessKeyID: "accesskey", + SecretAccessKey: "secretkey", + Bucket: "clay", + Key: "imagekey", + }, + } + + dr := test_distro.NewRegistry() + d := dr.FromHost() + arch, err := d.GetArch(test_distro.TestArchName) + if err != nil { + panic(fmt.Sprintf("failed to get architecture %s for a test distro: %v", test_distro.TestArchName, err)) + } + imgType, err := arch.GetImageType(test_distro.TestImageTypeName) + if err != nil { + panic(fmt.Sprintf("failed to get image type %s for a test distro architecture: %v", test_distro.TestImageTypeName, err)) + } + manifest, _, err := imgType.Manifest(nil, distro.ImageOptions{}, nil, 0) + if err != nil { + panic(fmt.Sprintf("failed to create a manifest: %v", err)) + } + + mf, err := manifest.Serialize(nil, nil, nil) + if err != nil { + panic(fmt.Sprintf("failed to create a manifest: %v", err)) + } + + s := New(nil, dr, nil) + + pkgs := []rpmmd.PackageSpec{ + { + Name: "test1", + Epoch: 0, + Version: "2.11.2", + Release: "1.fc35", + Arch: test_distro.TestArchName, + }, { + Name: "test2", + Epoch: 3, + Version: "4.2.2", + Release: "1.fc35", + Arch: test_distro.TestArchName, + }} + + s.blueprints[bName] = b + s.composes = map[uuid.UUID]Compose{ + uuid.MustParse("30000000-0000-0000-0000-000000000000"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBWaiting, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + }, + Packages: []rpmmd.PackageSpec{}, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000001"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBRunning, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{}, + JobCreated: date, + JobStarted: date, + }, + Packages: []rpmmd.PackageSpec{}, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000002"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBFinished, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + JobStarted: date, + JobFinished: date, + }, + Packages: []rpmmd.PackageSpec{}, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000003"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBFailed, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + JobStarted: date, + JobFinished: date, + }, + Packages: []rpmmd.PackageSpec{}, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000004"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBFinished, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + JobStarted: date, + JobFinished: date, + }, + Packages: pkgs, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000005"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBFinished, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + JobStarted: date, + JobFinished: date, + JobID: uuid.MustParse("30000000-0000-0000-0000-000000000005"), + }, + Packages: pkgs, + }, + } + + return s +} diff --git a/internal/weldr/api.go b/internal/weldr/api.go index 70fadc53e3..379a7dbfd2 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -364,7 +364,7 @@ func composeStateFromJobStatus(js *worker.JobStatus, result *worker.OSBuildJobRe // Returns the state of the image in `compose` and the times the job was // queued, started, and finished. Assumes that there's only one image in the // compose. -func (api *API) getComposeStatus(compose store.Compose) *composeStatus { +func (api *API) getComposeStatus(compose store.Compose) (*composeStatus, error) { jobId := compose.ImageBuild.JobID // backwards compatibility: composes that were around before splitting @@ -388,14 +388,14 @@ func (api *API) getComposeStatus(compose store.Compose) *composeStatus { Started: compose.ImageBuild.JobStarted, Finished: compose.ImageBuild.JobFinished, Result: &osbuild.Result{}, - } + }, nil } // All jobs are "osbuild" jobs. var result worker.OSBuildJobResult jobInfo, err := api.workers.OSBuildJobInfo(jobId, &result) if err != nil { - panic(err) + return nil, err } return &composeStatus{ @@ -404,7 +404,7 @@ func (api *API) getComposeStatus(compose store.Compose) *composeStatus { Started: jobInfo.JobStatus.Started, Finished: jobInfo.JobStatus.Finished, Result: result.OSBuildOutput, - } + }, nil } // Opens the image file for `compose`. This asks the worker server for the @@ -2659,7 +2659,14 @@ func (api *API) composeDeleteHandler(writer http.ResponseWriter, request *http.R continue } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors = append(errors, composeDeleteError{ + "ComposeStatusError", + fmt.Sprintf("Error getting status of compose %s: %s", id, err), + }) + continue + } if composeStatus.State != ComposeFinished && composeStatus.State != ComposeFailed { errors = append(errors, composeDeleteError{ "BuildInWrongState", @@ -2724,7 +2731,15 @@ func (api *API) composeCancelHandler(writer http.ResponseWriter, request *http.R return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", id, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeWaiting && composeStatus.State != ComposeRunning { errors := responseError{ ID: "BuildInWrongState", @@ -2828,7 +2843,11 @@ func (api *API) composeQueueHandler(writer http.ResponseWriter, request *http.Re composes := api.store.GetAllComposes() for id, compose := range composes { - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + continue + } switch composeStatus.State { case ComposeWaiting: reply.New = append(reply.New, composeToComposeEntry(id, compose, composeStatus, includeUploads)) @@ -2899,7 +2918,12 @@ func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.R if !exists { continue } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + continue + } + if filterBlueprint != "" && compose.Blueprint.Name != filterBlueprint { continue } else if filterStatus != "" && composeStatus.State.ToString() != filterStatus { @@ -2914,7 +2938,12 @@ func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.R includeUploads := isRequestVersionAtLeast(params, 1) for _, id := range filteredUUIDs { if compose, exists := composes[id]; exists { - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + continue + } + reply.UUIDs = append(reply.UUIDs, composeToComposeEntry(id, compose, composeStatus, includeUploads)) } } @@ -2969,7 +2998,16 @@ func (api *API) composeInfoHandler(writer http.ResponseWriter, request *http.Req reply.Blueprint = compose.Blueprint // Weldr API assumes only one image build per compose, that's why only the // 1st build is considered - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", id, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } + reply.ComposeType = compose.ImageBuild.ImageType.Name() reply.QueueStatus = composeStatus.State.ToString() reply.ImageSize = compose.ImageBuild.Size @@ -3016,7 +3054,15 @@ func (api *API) composeImageHandler(writer http.ResponseWriter, request *http.Re return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", uuid, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeFinished { errors := responseError{ ID: "BuildInWrongState", @@ -3074,7 +3120,15 @@ func (api *API) composeMetadataHandler(writer http.ResponseWriter, request *http return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", uuid, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeFinished && composeStatus.State != ComposeFailed { errors := responseError{ ID: "BuildInWrongState", @@ -3135,7 +3189,15 @@ func (api *API) composeResultsHandler(writer http.ResponseWriter, request *http. return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", uuid, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeFinished && composeStatus.State != ComposeFailed { errors := responseError{ ID: "BuildInWrongState", @@ -3226,7 +3288,15 @@ func (api *API) composeLogsHandler(writer http.ResponseWriter, request *http.Req return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", id, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeFinished && composeStatus.State != ComposeFailed { errors := responseError{ ID: "BuildInWrongState", @@ -3290,7 +3360,15 @@ func (api *API) composeLogHandler(writer http.ResponseWriter, request *http.Requ return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", id, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State == ComposeWaiting { errors := responseError{ ID: "BuildInWrongState", @@ -3320,7 +3398,11 @@ func (api *API) composeFinishedHandler(writer http.ResponseWriter, request *http includeUploads := isRequestVersionAtLeast(params, 1) for id, compose := range api.store.GetAllComposes() { - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + continue + } if composeStatus.State != ComposeFinished { continue } @@ -3343,7 +3425,11 @@ func (api *API) composeFailedHandler(writer http.ResponseWriter, request *http.R includeUploads := isRequestVersionAtLeast(params, 1) for id, compose := range api.store.GetAllComposes() { - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + continue + } if composeStatus.State != ComposeFailed { continue } diff --git a/internal/weldr/api_test.go b/internal/weldr/api_test.go index 9b83eb6aea..aab08bbc2c 100644 --- a/internal/weldr/api_test.go +++ b/internal/weldr/api_test.go @@ -1373,6 +1373,8 @@ func TestComposeStatus(t *testing.T) { {rpmmd_mock.BaseFixture, "GET", "/api/v0/compose/status/*?status=FINISHED", ``, http.StatusOK, fmt.Sprintf(`{"uuids":[{"id":"30000000-0000-0000-0000-000000000002","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000004","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140}]}`, test_distro.TestImageTypeName)}, {rpmmd_mock.BaseFixture, "GET", fmt.Sprintf("/api/v0/compose/status/*?type=%s", test_distro.TestImageTypeName), ``, http.StatusOK, fmt.Sprintf(`{"uuids":[{"id":"30000000-0000-0000-0000-000000000000","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING","job_created":1574857140},{"id":"30000000-0000-0000-0000-000000000001","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING","job_created":1574857140,"job_started":1574857140},{"id":"30000000-0000-0000-0000-000000000002","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000003","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FAILED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000004","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140}]}`, test_distro.TestImageTypeName)}, {rpmmd_mock.BaseFixture, "GET", "/api/v1/compose/status/30000000-0000-0000-0000-000000000000", ``, http.StatusOK, fmt.Sprintf(`{"uuids":[{"id":"30000000-0000-0000-0000-000000000000","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING","job_created":1574857140,"uploads":[{"uuid":"10000000-0000-0000-0000-000000000000","status":"WAITING","provider_name":"aws","image_name":"awsimage","creation_time":1574857140,"settings":{"region":"frankfurt","bucket":"clay","key":"imagekey"}}]}]}`, test_distro.TestImageTypeName)}, + {rpmmd_mock.BadJobJSONFixture, "GET", "/api/v0/compose/status/*", ``, http.StatusOK, fmt.Sprintf(`{"uuids":[{"id":"30000000-0000-0000-0000-000000000000","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING","job_created":1574857140},{"id":"30000000-0000-0000-0000-000000000001","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING","job_created":1574857140,"job_started":1574857140},{"id":"30000000-0000-0000-0000-000000000002","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000003","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FAILED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000004","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140}]}`, test_distro.TestImageTypeName)}, + {rpmmd_mock.BadJobJSONFixture, "GET", "/api/v0/compose/status/30000000-0000-0000-0000-000000000005", ``, http.StatusOK, `{"uuids":[]}`}, } if len(os.Getenv("OSBUILD_COMPOSER_TEST_EXTERNAL")) > 0 { @@ -1524,6 +1526,7 @@ func TestComposeQueue(t *testing.T) { {rpmmd_mock.BaseFixture, "GET", "/api/v0/compose/queue", ``, http.StatusOK, fmt.Sprintf(`{"new":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING"}],"run":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING"}]}`, test_distro.TestImageTypeName)}, {rpmmd_mock.BaseFixture, "GET", "/api/v1/compose/queue", ``, http.StatusOK, fmt.Sprintf(`{"new":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING","uploads":[{"uuid":"10000000-0000-0000-0000-000000000000","status":"WAITING","provider_name":"aws","image_name":"awsimage","creation_time":1574857140,"settings":{"region":"frankfurt","bucket":"clay","key":"imagekey"}}]}],"run":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING"}]}`, test_distro.TestImageTypeName)}, {rpmmd_mock.NoComposesFixture, "GET", "/api/v0/compose/queue", ``, http.StatusOK, `{"new":[],"run":[]}`}, + {rpmmd_mock.BadJobJSONFixture, "GET", "/api/v0/compose/queue", ``, http.StatusOK, fmt.Sprintf(`{"new":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING"}],"run":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING"}]}`, test_distro.TestImageTypeName)}, } if len(os.Getenv("OSBUILD_COMPOSER_TEST_EXTERNAL")) > 0 {