From 5ed1bc2ee0b2a5a2c0cfe991c653b090055a7b28 Mon Sep 17 00:00:00 2001 From: "Brian C. Lane" Date: Mon, 6 Nov 2023 13:21:18 -0800 Subject: [PATCH] Don't Panic in getComposeStatus and skip invalid jobs in fsjobqueue New This handles corrupt job JSON files by skipping them. They still exist, and errors are logged, but the system keeps working. If one or more of the JSON files in /var/lib/osbuild-composer/jobs/ become corrupt, they can stop the osbuild-composer service from starting, or stop commands like 'composer-cli compose status' from working because they quit on the first error and miss any jobs that aren't broken. --- internal/jobqueue/fsjobqueue/fsjobqueue.go | 3 +- .../jobqueue/fsjobqueue/fsjobqueue_test.go | 14 ++ internal/mocks/rpmmd/fixtures.go | 20 +++ internal/store/fixtures.go | 149 ++++++++++++++++++ internal/weldr/api.go | 120 ++++++++++++-- internal/weldr/api_test.go | 3 + 6 files changed, 291 insertions(+), 18 deletions(-) diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 18c81e4c75e..57252cb618c 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -106,7 +106,8 @@ func New(dir string) (*fsJobQueue, error) { } j, err := q.readJob(jobId) if err != nil { - return nil, err + // Skip invalid jobs, leaving them in place for later examination + continue } // If a job is running, and not cancelled, track the token diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go index aba51cc1e29..fe2ce2c3d76 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go @@ -1,6 +1,8 @@ package fsjobqueue_test import ( + "os" + "path" "testing" "github.com/osbuild/osbuild-composer/pkg/jobqueue" @@ -28,3 +30,15 @@ func TestNonExistant(t *testing.T) { require.Error(t, err) require.Nil(t, q) } + +func TestJobQueueBadJSON(t *testing.T) { + dir := 
t.TempDir() + + // Write a purposefully invalid JSON file into the queue + err := os.WriteFile(path.Join(dir, "/4f1cf5f8-525d-46b7-aef4-33c6a919c038.json"), []byte("{invalid json content"), 0600) + require.Nil(t, err) + + q, err := fsjobqueue.New(dir) + require.Nil(t, err) + require.NotNil(t, q) +} diff --git a/internal/mocks/rpmmd/fixtures.go b/internal/mocks/rpmmd/fixtures.go index 96444890446..f5c8896fb1b 100644 --- a/internal/mocks/rpmmd/fixtures.go +++ b/internal/mocks/rpmmd/fixtures.go @@ -1,6 +1,9 @@ package rpmmd_mock import ( + "os" + "path" + "github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue" dnfjson_mock "github.com/osbuild/osbuild-composer/internal/mocks/dnfjson" "github.com/osbuild/osbuild-composer/internal/store" @@ -64,3 +67,20 @@ func OldChangesFixture(tmpdir string) Fixture { dnfjson_mock.Base, } } + +func BadJobJSONFixture(tmpdir string) Fixture { + err := os.Mkdir(path.Join(tmpdir, "/jobs"), 0755) + if err != nil { + panic(err) + } + err = os.WriteFile(path.Join(tmpdir, "/jobs/30000000-0000-0000-0000-000000000005.json"), []byte("{invalid json content"), 0600) + if err != nil { + panic(err) + } + + return Fixture{ + store.FixtureJobs(), + createBaseWorkersFixture(path.Join(tmpdir, "/jobs")), + dnfjson_mock.Base, + } +} diff --git a/internal/store/fixtures.go b/internal/store/fixtures.go index 9e6e92b511e..59f76687cba 100644 --- a/internal/store/fixtures.go +++ b/internal/store/fixtures.go @@ -354,3 +354,152 @@ func FixtureOldChanges() *Store { return s } + +// Fixture to use for checking job queue files +func FixtureJobs() *Store { + var bName = "test" + var b = blueprint.Blueprint{ + Name: bName, + Version: "0.0.0", + Packages: []blueprint.Package{}, + Modules: []blueprint.Package{}, + Groups: []blueprint.Group{}, + Customizations: nil, + } + + var date = time.Date(2019, 11, 27, 13, 19, 0, 0, time.FixedZone("UTC+1", 60*60)) + + var awsTarget = &target.Target{ + Uuid: uuid.MustParse("10000000-0000-0000-0000-000000000000"), + Name: 
target.TargetNameAWS, + ImageName: "awsimage", + Created: date, + Status: common.IBWaiting, + Options: &target.AWSTargetOptions{ + Region: "frankfurt", + AccessKeyID: "accesskey", + SecretAccessKey: "secretkey", + Bucket: "clay", + Key: "imagekey", + }, + } + + dr := test_distro.NewRegistry() + d := dr.FromHost() + arch, err := d.GetArch(test_distro.TestArchName) + if err != nil { + panic(fmt.Sprintf("failed to get architecture %s for a test distro: %v", test_distro.TestArchName, err)) + } + imgType, err := arch.GetImageType(test_distro.TestImageTypeName) + if err != nil { + panic(fmt.Sprintf("failed to get image type %s for a test distro architecture: %v", test_distro.TestImageTypeName, err)) + } + manifest, _, err := imgType.Manifest(nil, distro.ImageOptions{}, nil, 0) + if err != nil { + panic(fmt.Sprintf("failed to create a manifest: %v", err)) + } + + mf, err := manifest.Serialize(nil, nil, nil) + if err != nil { + panic(fmt.Sprintf("failed to create a manifest: %v", err)) + } + + s := New(nil, dr, nil) + + pkgs := []rpmmd.PackageSpec{ + { + Name: "test1", + Epoch: 0, + Version: "2.11.2", + Release: "1.fc35", + Arch: test_distro.TestArchName, + }, { + Name: "test2", + Epoch: 3, + Version: "4.2.2", + Release: "1.fc35", + Arch: test_distro.TestArchName, + }} + + s.blueprints[bName] = b + s.composes = map[uuid.UUID]Compose{ + uuid.MustParse("30000000-0000-0000-0000-000000000000"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBWaiting, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + }, + Packages: []rpmmd.PackageSpec{}, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000001"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBRunning, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{}, + JobCreated: date, + JobStarted: date, + }, + Packages: []rpmmd.PackageSpec{}, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000002"): { + Blueprint: &b, + 
ImageBuild: ImageBuild{ + QueueStatus: common.IBFinished, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + JobStarted: date, + JobFinished: date, + }, + Packages: []rpmmd.PackageSpec{}, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000003"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBFailed, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + JobStarted: date, + JobFinished: date, + }, + Packages: []rpmmd.PackageSpec{}, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000004"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBFinished, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + JobStarted: date, + JobFinished: date, + }, + Packages: pkgs, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000005"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBFinished, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + JobStarted: date, + JobFinished: date, + JobID: uuid.MustParse("30000000-0000-0000-0000-000000000005"), + }, + Packages: pkgs, + }, + } + + return s +} diff --git a/internal/weldr/api.go b/internal/weldr/api.go index 70fadc53e31..379a7dbfd2c 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -364,7 +364,7 @@ func composeStateFromJobStatus(js *worker.JobStatus, result *worker.OSBuildJobRe // Returns the state of the image in `compose` and the times the job was // queued, started, and finished. Assumes that there's only one image in the // compose. 
-func (api *API) getComposeStatus(compose store.Compose) *composeStatus { +func (api *API) getComposeStatus(compose store.Compose) (*composeStatus, error) { jobId := compose.ImageBuild.JobID // backwards compatibility: composes that were around before splitting @@ -388,14 +388,14 @@ func (api *API) getComposeStatus(compose store.Compose) *composeStatus { Started: compose.ImageBuild.JobStarted, Finished: compose.ImageBuild.JobFinished, Result: &osbuild.Result{}, - } + }, nil } // All jobs are "osbuild" jobs. var result worker.OSBuildJobResult jobInfo, err := api.workers.OSBuildJobInfo(jobId, &result) if err != nil { - panic(err) + return nil, err } return &composeStatus{ @@ -404,7 +404,7 @@ func (api *API) getComposeStatus(compose store.Compose) *composeStatus { Started: jobInfo.JobStatus.Started, Finished: jobInfo.JobStatus.Finished, Result: result.OSBuildOutput, - } + }, nil } // Opens the image file for `compose`. This asks the worker server for the @@ -2659,7 +2659,14 @@ func (api *API) composeDeleteHandler(writer http.ResponseWriter, request *http.R continue } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors = append(errors, composeDeleteError{ + "ComposeStatusError", + fmt.Sprintf("Error getting status of compose %s: %s", id, err), + }) + continue + } if composeStatus.State != ComposeFinished && composeStatus.State != ComposeFailed { errors = append(errors, composeDeleteError{ "BuildInWrongState", @@ -2724,7 +2731,15 @@ func (api *API) composeCancelHandler(writer http.ResponseWriter, request *http.R return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", id, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeWaiting && 
composeStatus.State != ComposeRunning { errors := responseError{ ID: "BuildInWrongState", @@ -2828,7 +2843,11 @@ func (api *API) composeQueueHandler(writer http.ResponseWriter, request *http.Re composes := api.store.GetAllComposes() for id, compose := range composes { - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + continue + } switch composeStatus.State { case ComposeWaiting: reply.New = append(reply.New, composeToComposeEntry(id, compose, composeStatus, includeUploads)) @@ -2899,7 +2918,12 @@ func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.R if !exists { continue } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + continue + } + if filterBlueprint != "" && compose.Blueprint.Name != filterBlueprint { continue } else if filterStatus != "" && composeStatus.State.ToString() != filterStatus { @@ -2914,7 +2938,12 @@ func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.R includeUploads := isRequestVersionAtLeast(params, 1) for _, id := range filteredUUIDs { if compose, exists := composes[id]; exists { - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + continue + } + reply.UUIDs = append(reply.UUIDs, composeToComposeEntry(id, compose, composeStatus, includeUploads)) } } @@ -2969,7 +2998,16 @@ func (api *API) composeInfoHandler(writer http.ResponseWriter, request *http.Req reply.Blueprint = compose.Blueprint // Weldr API assumes only one image build per compose, that's why only the // 1st build is considered - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + 
if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", id, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } + reply.ComposeType = compose.ImageBuild.ImageType.Name() reply.QueueStatus = composeStatus.State.ToString() reply.ImageSize = compose.ImageBuild.Size @@ -3016,7 +3054,15 @@ func (api *API) composeImageHandler(writer http.ResponseWriter, request *http.Re return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", uuid, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeFinished { errors := responseError{ ID: "BuildInWrongState", @@ -3074,7 +3120,15 @@ func (api *API) composeMetadataHandler(writer http.ResponseWriter, request *http return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", uuid, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeFinished && composeStatus.State != ComposeFailed { errors := responseError{ ID: "BuildInWrongState", @@ -3135,7 +3189,15 @@ func (api *API) composeResultsHandler(writer http.ResponseWriter, request *http. 
return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", uuid, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeFinished && composeStatus.State != ComposeFailed { errors := responseError{ ID: "BuildInWrongState", @@ -3226,7 +3288,15 @@ func (api *API) composeLogsHandler(writer http.ResponseWriter, request *http.Req return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", id, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeFinished && composeStatus.State != ComposeFailed { errors := responseError{ ID: "BuildInWrongState", @@ -3290,7 +3360,15 @@ func (api *API) composeLogHandler(writer http.ResponseWriter, request *http.Requ return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", id, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State == ComposeWaiting { errors := responseError{ ID: "BuildInWrongState", @@ -3320,7 +3398,11 @@ func (api *API) composeFinishedHandler(writer http.ResponseWriter, request *http includeUploads := isRequestVersionAtLeast(params, 1) for id, compose := range api.store.GetAllComposes() { - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + 
continue + } if composeStatus.State != ComposeFinished { continue } @@ -3343,7 +3425,11 @@ func (api *API) composeFailedHandler(writer http.ResponseWriter, request *http.R includeUploads := isRequestVersionAtLeast(params, 1) for id, compose := range api.store.GetAllComposes() { - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + continue + } if composeStatus.State != ComposeFailed { continue } diff --git a/internal/weldr/api_test.go b/internal/weldr/api_test.go index 9b83eb6aeaf..aab08bbc2ce 100644 --- a/internal/weldr/api_test.go +++ b/internal/weldr/api_test.go @@ -1373,6 +1373,8 @@ func TestComposeStatus(t *testing.T) { {rpmmd_mock.BaseFixture, "GET", "/api/v0/compose/status/*?status=FINISHED", ``, http.StatusOK, fmt.Sprintf(`{"uuids":[{"id":"30000000-0000-0000-0000-000000000002","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000004","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140}]}`, test_distro.TestImageTypeName)}, {rpmmd_mock.BaseFixture, "GET", fmt.Sprintf("/api/v0/compose/status/*?type=%s", test_distro.TestImageTypeName), ``, http.StatusOK, 
fmt.Sprintf(`{"uuids":[{"id":"30000000-0000-0000-0000-000000000000","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING","job_created":1574857140},{"id":"30000000-0000-0000-0000-000000000001","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING","job_created":1574857140,"job_started":1574857140},{"id":"30000000-0000-0000-0000-000000000002","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000003","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FAILED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000004","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140}]}`, test_distro.TestImageTypeName)}, {rpmmd_mock.BaseFixture, "GET", "/api/v1/compose/status/30000000-0000-0000-0000-000000000000", ``, http.StatusOK, fmt.Sprintf(`{"uuids":[{"id":"30000000-0000-0000-0000-000000000000","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING","job_created":1574857140,"uploads":[{"uuid":"10000000-0000-0000-0000-000000000000","status":"WAITING","provider_name":"aws","image_name":"awsimage","creation_time":1574857140,"settings":{"region":"frankfurt","bucket":"clay","key":"imagekey"}}]}]}`, test_distro.TestImageTypeName)}, + {rpmmd_mock.BadJobJSONFixture, "GET", "/api/v0/compose/status/*", ``, http.StatusOK, 
fmt.Sprintf(`{"uuids":[{"id":"30000000-0000-0000-0000-000000000000","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING","job_created":1574857140},{"id":"30000000-0000-0000-0000-000000000001","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING","job_created":1574857140,"job_started":1574857140},{"id":"30000000-0000-0000-0000-000000000002","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000003","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FAILED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000004","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140}]}`, test_distro.TestImageTypeName)}, + {rpmmd_mock.BadJobJSONFixture, "GET", "/api/v0/compose/status/30000000-0000-0000-0000-000000000005", ``, http.StatusOK, `{"uuids":[]}`}, } if len(os.Getenv("OSBUILD_COMPOSER_TEST_EXTERNAL")) > 0 { @@ -1524,6 +1526,7 @@ func TestComposeQueue(t *testing.T) { {rpmmd_mock.BaseFixture, "GET", "/api/v0/compose/queue", ``, http.StatusOK, fmt.Sprintf(`{"new":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING"}],"run":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING"}]}`, test_distro.TestImageTypeName)}, {rpmmd_mock.BaseFixture, "GET", "/api/v1/compose/queue", ``, http.StatusOK, 
fmt.Sprintf(`{"new":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING","uploads":[{"uuid":"10000000-0000-0000-0000-000000000000","status":"WAITING","provider_name":"aws","image_name":"awsimage","creation_time":1574857140,"settings":{"region":"frankfurt","bucket":"clay","key":"imagekey"}}]}],"run":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING"}]}`, test_distro.TestImageTypeName)}, {rpmmd_mock.NoComposesFixture, "GET", "/api/v0/compose/queue", ``, http.StatusOK, `{"new":[],"run":[]}`}, + {rpmmd_mock.BadJobJSONFixture, "GET", "/api/v0/compose/queue", ``, http.StatusOK, fmt.Sprintf(`{"new":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING"}],"run":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING"}]}`, test_distro.TestImageTypeName)}, } if len(os.Getenv("OSBUILD_COMPOSER_TEST_EXTERNAL")) > 0 {