Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: silent error during batch ingestion #794

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions cmd/batch_ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"context"
"log/slog"
"time"

"github.com/checkmarble/marble-backend/infra"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/getsentry/sentry-go"
)

// Deprecated
func RunBatchIngestion() error {
// This is where we read the environment variables and set up the configuration for the application.
gcpConfig := infra.GcpConfig{
Expand Down Expand Up @@ -103,10 +103,6 @@ func RunBatchIngestion() error {
usecases.WithConvoyServer(convoyConfiguration.APIUrl),
)

err = jobs.IngestDataFromCsv(ctx, uc)
if err != nil {
logger.ErrorContext(ctx, "failed to ingest data from csvs", slog.String("error", err.Error()))
}

return err
jobs.IngestDataFromCsv(ctx, uc)
return nil
}
10 changes: 3 additions & 7 deletions cmd/scheduled_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"context"
"log/slog"
"time"

"github.com/checkmarble/marble-backend/infra"
Expand All @@ -17,6 +16,7 @@ import (
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// Deprecated
func RunScheduledExecuter() error {
// This is where we read the environment variables and set up the configuration for the application.
gcpConfig := infra.GcpConfig{
Expand Down Expand Up @@ -116,10 +116,6 @@ func RunScheduledExecuter() error {
usecases.WithConvoyServer(convoyConfiguration.APIUrl),
)

err = jobs.ExecuteAllScheduledScenarios(ctx, uc)
if err != nil {
logger.ErrorContext(ctx, "failed to execute all scheduled scenarios", slog.String("error", err.Error()))
}

return err
jobs.ExecuteAllScheduledScenarios(ctx, uc)
return nil
}
10 changes: 3 additions & 7 deletions cmd/send_pending_webhook_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"context"
"log/slog"
"time"

"github.com/checkmarble/marble-backend/infra"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/getsentry/sentry-go"
)

// Deprecated
func RunSendPendingWebhookEvents() error {
// This is where we read the environment variables and set up the configuration for the application.
gcpConfig := infra.GcpConfig{
Expand Down Expand Up @@ -93,10 +93,6 @@ func RunSendPendingWebhookEvents() error {
usecases.WithConvoyServer(convoyConfiguration.APIUrl),
)

err = jobs.SendPendingWebhookEvents(ctx, uc)
if err != nil {
logger.ErrorContext(ctx, "failed to send pending webhook events", slog.String("error", err.Error()))
}

return err
jobs.SendPendingWebhookEvents(ctx, uc)
return nil
}
7 changes: 3 additions & 4 deletions jobs/execute_with_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/checkmarble/marble-backend/usecases"
"github.com/checkmarble/marble-backend/utils"

"github.com/cockroachdb/errors"
"github.com/getsentry/sentry-go"
)

Expand All @@ -16,7 +15,7 @@ func executeWithMonitoring(
uc usecases.Usecases,
jobName string,
fn func(context.Context, usecases.Usecases) error,
) error {
) {
logger := utils.LoggerFromContext(ctx)
logger.InfoContext(ctx, fmt.Sprintf("Start job %s", jobName))

Expand All @@ -43,7 +42,8 @@ func executeWithMonitoring(
} else {
sentry.CaptureException(err)
}
return errors.Wrap(err, fmt.Sprintf("error executing job %s", jobName))
logger.ErrorContext(ctx, fmt.Sprintf("Unexpected Error in batch job: %+v", err))
return
}

sentry.CaptureCheckIn(
Expand All @@ -56,5 +56,4 @@ func executeWithMonitoring(
)

logger.InfoContext(ctx, fmt.Sprintf("Done executing job %s", jobName))
return nil
}
4 changes: 2 additions & 2 deletions jobs/ingest_data_from_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

const csvIngestionTimeout = 1 * time.Hour

func IngestDataFromCsv(ctx context.Context, uc usecases.Usecases) error {
return executeWithMonitoring(
func IngestDataFromCsv(ctx context.Context, uc usecases.Usecases) {
executeWithMonitoring(
ctx,
uc,
"batch-ingestion",
Expand Down
5 changes: 2 additions & 3 deletions jobs/scheduled_scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import (

const batchScenarioExecutionTimeout = 3 * time.Hour

// Runs every minute
func ExecuteAllScheduledScenarios(ctx context.Context, uc usecases.Usecases) error {
return executeWithMonitoring(
func ExecuteAllScheduledScenarios(ctx context.Context, uc usecases.Usecases) {
executeWithMonitoring(
ctx,
uc,
"scheduled-execution",
Expand Down
19 changes: 6 additions & 13 deletions jobs/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ import (
"github.com/checkmarble/marble-backend/utils"
)

// errToReturnCode converts an error into an integer return code:
// 0 when err is nil, 1 for any non-nil error.
func errToReturnCode(err error) int {
	code := 0
	if err != nil {
		code = 1
	}
	return code
}

// Deprecated and to be moved into the river task scheduler
func RunScheduler(ctx context.Context, usecases usecases.Usecases) {
taskr := tasker.New(tasker.Option{
Expand All @@ -25,22 +18,22 @@ func RunScheduler(ctx context.Context, usecases usecases.Usecases) {
taskr.Task("* * * * *", func(ctx context.Context) (int, error) {
logger := utils.LoggerFromContext(ctx).With("job", "execute_all_scheduled_scenarios")
ctx = utils.StoreLoggerInContext(ctx, logger)
err := ExecuteAllScheduledScenarios(ctx, usecases)
return errToReturnCode(err), err
ExecuteAllScheduledScenarios(ctx, usecases)
return 0, nil
})

taskr.Task("* * * * *", func(ctx context.Context) (int, error) {
logger := utils.LoggerFromContext(ctx).With("job", "ingest_data_from_csv")
ctx = utils.StoreLoggerInContext(ctx, logger)
err := IngestDataFromCsv(ctx, usecases)
return errToReturnCode(err), err
IngestDataFromCsv(ctx, usecases)
return 0, nil
})

taskr.Task("*/10 * * * *", func(ctx context.Context) (int, error) {
logger := utils.LoggerFromContext(ctx).With("job", "send_webhook_events_to_convoy")
ctx = utils.StoreLoggerInContext(ctx, logger)
err := SendPendingWebhookEvents(ctx, usecases)
return errToReturnCode(err), err
SendPendingWebhookEvents(ctx, usecases)
return 0, nil
})

taskr.Run()
Expand Down
5 changes: 2 additions & 3 deletions jobs/send_pending_webhook_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ import (
"github.com/checkmarble/marble-backend/usecases"
)

// Runs every minute
func SendPendingWebhookEvents(ctx context.Context, uc usecases.Usecases) error {
return executeWithMonitoring(
func SendPendingWebhookEvents(ctx context.Context, uc usecases.Usecases) {
executeWithMonitoring(
ctx,
uc,
"send-webhook-events",
Expand Down
8 changes: 6 additions & 2 deletions usecases/ingestion_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (usecase *IngestionUseCase) processUploadLog(ctx context.Context, uploadLog
out := usecase.readFileIngestObjects(ctx, file.FileName, file.ReadCloser)
if out.err != nil {
setToFailed(out.numRowsIngested)
return err
return out.err
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main point of the PR is here. The rest is mainly a cleanup of potentially duplicated error handling.

}

currentTime := time.Now()
Expand All @@ -409,6 +409,8 @@ type ingestionResult struct {
err error
}

// This method uses a return value wrapping an error, because we still want to use the number of rows ingested even if
// an error occurred.
func (usecase *IngestionUseCase) readFileIngestObjects(ctx context.Context, fileName string, fileReader io.Reader) ingestionResult {
logger := utils.LoggerFromContext(ctx)
logger.InfoContext(ctx, fmt.Sprintf("Ingesting data from CSV %s", fileName))
Expand Down Expand Up @@ -455,7 +457,9 @@ func (usecase *IngestionUseCase) ingestObjectsFromCSV(ctx context.Context, organ
duration := end.Sub(start)
// divide by 1e6 convert to milliseconds (base is nanoseconds)
avgDuration := float64(duration) / float64(total*1e6)
logger.InfoContext(ctx, fmt.Sprintf("Successfully ingested %d objects in %s, average %vms", total, duration, avgDuration))
if total > 0 {
logger.InfoContext(ctx, fmt.Sprintf("Successfully ingested %d objects in %s, average %vms", total, duration, avgDuration))
}
}
defer printDuration()

Expand Down
9 changes: 0 additions & 9 deletions utils/sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"strings"

"github.com/getsentry/sentry-go"
)
Expand All @@ -13,14 +12,6 @@ func LogAndReportSentryError(ctx context.Context, err error) {
logger := LoggerFromContext(ctx)
logger.ErrorContext(ctx, fmt.Sprintf("%+v", err))

// Known issue where Cloud Run will sometimes fail to create the unix socket to connect to CloudSQL.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be much of an issue anymore: it was a pain point when we were starting a Cloud Run job 3 times per minute and were simply hitting the CloudSQL availability SLAs.

// This always happens at the launching of a job or server, when we set up the db pool.
// In this case, we don't log the error in Sentry
if strings.Contains(err.Error(), "failed to connect to `host=/cloudsql/") {
logger.WarnContext(ctx, "Failed to create unix socket to connect to CloudSQL. Wait for the next execution of the job or retry starting the server")
return
}

// Ignore errors that are due to context deadlines or canceled context, as presumably their root cause has been handled
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
logger.DebugContext(ctx, fmt.Sprintf("Deadline exceeded or context canceled: %v", err))
Expand Down
Loading