From 98dfabd178ccf8abe5b78927558a1bcdeb3d6f22 Mon Sep 17 00:00:00 2001 From: Pascal Delange Date: Mon, 6 Jan 2025 13:50:22 +0100 Subject: [PATCH] fix: silent error during batch ingestion --- cmd/batch_ingestion.go | 10 +++------- cmd/scheduled_executor.go | 10 +++------- cmd/send_pending_webhook_events.go | 10 +++------- jobs/execute_with_monitoring.go | 7 +++---- jobs/ingest_data_from_csv.go | 4 ++-- jobs/scheduled_scenarios.go | 5 ++--- jobs/scheduler.go | 19 ++++++------------- jobs/send_pending_webhook_events.go | 5 ++--- usecases/ingestion_usecase.go | 8 ++++++-- utils/sentry.go | 9 --------- 10 files changed, 30 insertions(+), 57 deletions(-) diff --git a/cmd/batch_ingestion.go b/cmd/batch_ingestion.go index 24f486d61..81209c286 100644 --- a/cmd/batch_ingestion.go +++ b/cmd/batch_ingestion.go @@ -2,7 +2,6 @@ package cmd import ( "context" - "log/slog" "time" "github.com/checkmarble/marble-backend/infra" @@ -14,6 +13,7 @@ import ( "github.com/getsentry/sentry-go" ) +// Deprecated func RunBatchIngestion() error { // This is where we read the environment variables and set up the configuration for the application. gcpConfig := infra.GcpConfig{ @@ -103,10 +103,6 @@ func RunBatchIngestion() error { usecases.WithConvoyServer(convoyConfiguration.APIUrl), ) - err = jobs.IngestDataFromCsv(ctx, uc) - if err != nil { - logger.ErrorContext(ctx, "failed to ingest data from csvs", slog.String("error", err.Error())) - } - - return err + jobs.IngestDataFromCsv(ctx, uc) + return nil } diff --git a/cmd/scheduled_executor.go b/cmd/scheduled_executor.go index 464ff70d9..b250a2fcc 100644 --- a/cmd/scheduled_executor.go +++ b/cmd/scheduled_executor.go @@ -2,7 +2,6 @@ package cmd import ( "context" - "log/slog" "time" "github.com/checkmarble/marble-backend/infra" @@ -17,6 +16,7 @@ import ( "github.com/riverqueue/river/riverdriver/riverpgxv5" ) +// Deprecated func RunScheduledExecuter() error { // This is where we read the environment variables and set up the configuration for the application. gcpConfig := infra.GcpConfig{ @@ -116,10 +116,6 @@ func RunScheduledExecuter() error { usecases.WithConvoyServer(convoyConfiguration.APIUrl), ) - err = jobs.ExecuteAllScheduledScenarios(ctx, uc) - if err != nil { - logger.ErrorContext(ctx, "failed to execute all scheduled scenarios", slog.String("error", err.Error())) - } - - return err + jobs.ExecuteAllScheduledScenarios(ctx, uc) + return nil } diff --git a/cmd/send_pending_webhook_events.go b/cmd/send_pending_webhook_events.go index de55aff35..fe2875f4a 100644 --- a/cmd/send_pending_webhook_events.go +++ b/cmd/send_pending_webhook_events.go @@ -2,7 +2,6 @@ package cmd import ( "context" - "log/slog" "time" "github.com/checkmarble/marble-backend/infra" @@ -14,6 +13,7 @@ import ( "github.com/getsentry/sentry-go" ) +// Deprecated func RunSendPendingWebhookEvents() error { // This is where we read the environment variables and set up the configuration for the application. gcpConfig := infra.GcpConfig{ @@ -93,10 +93,6 @@ func RunSendPendingWebhookEvents() error { usecases.WithConvoyServer(convoyConfiguration.APIUrl), ) - err = jobs.SendPendingWebhookEvents(ctx, uc) - if err != nil { - logger.ErrorContext(ctx, "failed to send pending webhook events", slog.String("error", err.Error())) - } - - return err + jobs.SendPendingWebhookEvents(ctx, uc) + return nil } diff --git a/jobs/execute_with_monitoring.go b/jobs/execute_with_monitoring.go index 00a31f662..f072893fa 100644 --- a/jobs/execute_with_monitoring.go +++ b/jobs/execute_with_monitoring.go @@ -7,7 +7,6 @@ import ( "github.com/checkmarble/marble-backend/usecases" "github.com/checkmarble/marble-backend/utils" - "github.com/cockroachdb/errors" "github.com/getsentry/sentry-go" ) @@ -16,7 +15,7 @@ func executeWithMonitoring( uc usecases.Usecases, jobName string, fn func(context.Context, usecases.Usecases) error, -) error { +) { logger := utils.LoggerFromContext(ctx) logger.InfoContext(ctx, fmt.Sprintf("Start job %s", jobName)) @@ -43,7 +42,8 @@ func executeWithMonitoring( } else { sentry.CaptureException(err) } - return errors.Wrap(err, fmt.Sprintf("error executing job %s", jobName)) + logger.ErrorContext(ctx, fmt.Sprintf("Unexpected Error in batch job: %+v", err)) + return } sentry.CaptureCheckIn( @@ -56,5 +56,4 @@ func executeWithMonitoring( ) logger.InfoContext(ctx, fmt.Sprintf("Done executing job %s", jobName)) - return nil } diff --git a/jobs/ingest_data_from_csv.go b/jobs/ingest_data_from_csv.go index 090e44a15..1cc7cec21 100644 --- a/jobs/ingest_data_from_csv.go +++ b/jobs/ingest_data_from_csv.go @@ -9,8 +9,8 @@ import ( const csvIngestionTimeout = 1 * time.Hour -func IngestDataFromCsv(ctx context.Context, uc usecases.Usecases) error { - return executeWithMonitoring( +func IngestDataFromCsv(ctx context.Context, uc usecases.Usecases) { + executeWithMonitoring( ctx, uc, "batch-ingestion", diff --git a/jobs/scheduled_scenarios.go b/jobs/scheduled_scenarios.go index 1ddba6f65..c4ecdcc10 100644 --- a/jobs/scheduled_scenarios.go +++ b/jobs/scheduled_scenarios.go @@ -13,9 +13,8 @@ import ( const batchScenarioExecutionTimeout = 3 * time.Hour -// Runs every minute -func ExecuteAllScheduledScenarios(ctx context.Context, uc usecases.Usecases) error { - return executeWithMonitoring( +func ExecuteAllScheduledScenarios(ctx context.Context, uc usecases.Usecases) { + executeWithMonitoring( ctx, uc, "scheduled-execution", diff --git a/jobs/scheduler.go b/jobs/scheduler.go index f0adb6aea..f23a46c13 100644 --- a/jobs/scheduler.go +++ b/jobs/scheduler.go @@ -8,13 +8,6 @@ import ( "github.com/checkmarble/marble-backend/utils" ) -func errToReturnCode(err error) int { - if err != nil { - return 1 - } - return 0 -} - // Deprecated and to be moved into the river task scheduler func RunScheduler(ctx context.Context, usecases usecases.Usecases) { taskr := tasker.New(tasker.Option{ @@ -25,22 +18,22 @@ func RunScheduler(ctx context.Context, usecases usecases.Usecases) { taskr.Task("* * * * *", func(ctx context.Context) (int, error) { logger := utils.LoggerFromContext(ctx).With("job", "execute_all_scheduled_scenarios") ctx = utils.StoreLoggerInContext(ctx, logger) - err := ExecuteAllScheduledScenarios(ctx, usecases) - return errToReturnCode(err), err + ExecuteAllScheduledScenarios(ctx, usecases) + return 0, nil }) taskr.Task("* * * * *", func(ctx context.Context) (int, error) { logger := utils.LoggerFromContext(ctx).With("job", "ingest_data_from_csv") ctx = utils.StoreLoggerInContext(ctx, logger) - err := IngestDataFromCsv(ctx, usecases) - return errToReturnCode(err), err + IngestDataFromCsv(ctx, usecases) + return 0, nil }) taskr.Task("*/10 * * * *", func(ctx context.Context) (int, error) { logger := utils.LoggerFromContext(ctx).With("job", "send_webhook_events_to_convoy") ctx = utils.StoreLoggerInContext(ctx, logger) - err := SendPendingWebhookEvents(ctx, usecases) - return errToReturnCode(err), err + SendPendingWebhookEvents(ctx, usecases) + return 0, nil }) taskr.Run() diff --git a/jobs/send_pending_webhook_events.go b/jobs/send_pending_webhook_events.go index 467119604..294ad614f 100644 --- a/jobs/send_pending_webhook_events.go +++ b/jobs/send_pending_webhook_events.go @@ -6,9 +6,8 @@ import ( "github.com/checkmarble/marble-backend/usecases" ) -// Runs every minute -func SendPendingWebhookEvents(ctx context.Context, uc usecases.Usecases) error { - return executeWithMonitoring( +func SendPendingWebhookEvents(ctx context.Context, uc usecases.Usecases) { + executeWithMonitoring( ctx, uc, "send-webhook-events", diff --git a/usecases/ingestion_usecase.go b/usecases/ingestion_usecase.go index 9069513f1..3baee2245 100644 --- a/usecases/ingestion_usecase.go +++ b/usecases/ingestion_usecase.go @@ -387,7 +387,7 @@ func (usecase *IngestionUseCase) processUploadLog(ctx context.Context, uploadLog out := usecase.readFileIngestObjects(ctx, file.FileName, file.ReadCloser) if out.err != nil { setToFailed(out.numRowsIngested) - return err + return out.err } currentTime := time.Now() @@ -409,6 +409,8 @@ type ingestionResult struct { err error } +// This method uses a return value wrapping an error, because we still want to use the number of rows ingested even if +// an error occurred. func (usecase *IngestionUseCase) readFileIngestObjects(ctx context.Context, fileName string, fileReader io.Reader) ingestionResult { logger := utils.LoggerFromContext(ctx) logger.InfoContext(ctx, fmt.Sprintf("Ingesting data from CSV %s", fileName)) @@ -455,7 +457,9 @@ func (usecase *IngestionUseCase) ingestObjectsFromCSV(ctx context.Context, organ duration := end.Sub(start) // divide by 1e6 convert to milliseconds (base is nanoseconds) avgDuration := float64(duration) / float64(total*1e6) - logger.InfoContext(ctx, fmt.Sprintf("Successfully ingested %d objects in %s, average %vms", total, duration, avgDuration)) + if total > 0 { + logger.InfoContext(ctx, fmt.Sprintf("Successfully ingested %d objects in %s, average %vms", total, duration, avgDuration)) + } } defer printDuration() diff --git a/utils/sentry.go b/utils/sentry.go index aaeaa1edf..294336e10 100644 --- a/utils/sentry.go +++ b/utils/sentry.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strings" "github.com/getsentry/sentry-go" ) @@ -13,14 +12,6 @@ func LogAndReportSentryError(ctx context.Context, err error) { logger := LoggerFromContext(ctx) logger.ErrorContext(ctx, fmt.Sprintf("%+v", err)) - // Known issue where Cloud Run will sometimes fail to create the unix socket to connect to CloudSQL. - // This always happens at the launching of a job or server, when we set up the db pool. - // In this case, we don't log the error in Sentry - if strings.Contains(err.Error(), "failed to connect to `host=/cloudsql/") { - logger.WarnContext(ctx, "Failed to create unix socket to connect to CloudSQL. Wait for the next execution of the job or retry starting the server") - return - } - // Ignore errors that are due to context deadlines or canceled context, as presumably their root case has been handled if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { logger.DebugContext(ctx, fmt.Sprintf("Deadline exceeded or context canceled: %v", err))