Skip to content

Commit

Permalink
sync-sidecar: better error reporting. enabled pprof profiler
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jan 28, 2025
1 parent f450e43 commit 59d8abf
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
8 changes: 4 additions & 4 deletions sync-sidecar/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ ON CONFLICT ON CONSTRAINT source_catalog_pkey DO UPDATE SET catalog=$4, timestam
upsertStateSQL = `INSERT INTO source_state (sync_id, stream, state, timestamp) VALUES ($1, $2, $3, $4)
ON CONFLICT ON CONSTRAINT source_state_pkey DO UPDATE SET state=$3, timestamp = $4`

upsertTaskDescriptionSQL = `INSERT INTO source_task (sync_id, task_id, package, version, started_at, updated_at, status, description) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT ON CONSTRAINT source_task_pkey DO UPDATE SET updated_at=$6, status = $7, description=$8`
upsertTaskDescriptionAndErrorSQL = `INSERT INTO source_task (sync_id, task_id, package, version, started_at, updated_at, status, description, error) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT ON CONSTRAINT source_task_pkey DO UPDATE SET updated_at=$6, status = $7, description=$8, error=$9`

upsertTaskErrorSQL = `INSERT INTO source_task (sync_id, task_id, package, version, started_at, updated_at, status, error) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT ON CONSTRAINT source_task_pkey DO UPDATE SET updated_at=$6, status = $7, error=$8`
Expand Down Expand Up @@ -81,8 +81,8 @@ func UpsertState(dbpool *pgxpool.Pool, syncId, stream string, state any, timesta
return err
}

func UpsertTaskDescription(dbpool *pgxpool.Pool, syncId, taskId, packageName, packageVersion string, startedAt time.Time, status, description string) error {
_, err := dbpool.Exec(context.Background(), upsertTaskDescriptionSQL, syncId, taskId, packageName, packageVersion, startedAt, time.Now(), status, description)
func UpsertTaskDescriptionAndError(dbpool *pgxpool.Pool, syncId, taskId, packageName, packageVersion string, startedAt time.Time, status, description, error string) error {
_, err := dbpool.Exec(context.Background(), upsertTaskDescriptionAndErrorSQL, syncId, taskId, packageName, packageVersion, startedAt, time.Now(), status, description, error)
return err
}

Expand Down
6 changes: 6 additions & 0 deletions sync-sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/jitsucom/bulker/jitsubase/logging"
"github.com/jitsucom/bulker/jitsubase/utils"
"github.com/jitsucom/bulker/sync-sidecar/db"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strconv"
Expand Down Expand Up @@ -132,6 +134,10 @@ func main() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)

go func() {
_ = http.ListenAndServe("localhost:6060", nil)
}()

go func() {
sig := <-sigs
logging.Infof("Received signal: %s. Shutting down...", sig)
Expand Down
24 changes: 17 additions & 7 deletions sync-sidecar/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,26 @@ func (s *ReadSideCar) Run() {
}
})
status := "PARTIAL"
errorText := ""
if allSuccess {
status = "SUCCESS"
} else if allFailed {
status = "FAILED"
} else if timeExceeded {
status = "TIME_EXCEEDED"
errorText = fmt.Sprintf("Task timeout: task is running for more than %d hours.", s.taskTimeoutHours)
} else if cancelled {
status = "CANCELLED"
errorText = "The task was cancelled"
} else if s.firstErr != nil {
errorText = "ERROR: " + s.firstErr.Error()
}

if allFailed && s.firstErr != nil {
s.sendBadStatus("FAILED", "ERROR: "+s.firstErr.Error())
} else {
processedStreamsJson, _ := jsonorder.Marshal(statusMap)
s.sendGoodStatus(status, string(processedStreamsJson), true)
s.sendGoodStatus(status, string(processedStreamsJson), errorText, true)
}
} else if s.isErr() {
s.sendBadStatus("FAILED", "ERROR: "+s.firstErr.Error())
Expand All @@ -128,7 +133,7 @@ func (s *ReadSideCar) Run() {
} else if cancelled {
s.sendBadStatus("CANCELLED", "The task was cancelled")
} else {
s.sendGoodStatus("SUCCESS", "", true)
s.sendGoodStatus("SUCCESS", "", "", true)
}
}()
s.log("Sidecar. command: read. syncId: %s, taskId: %s, package: %s:%s startedAt: %s", s.syncId, s.taskId, s.packageName, s.packageVersion, s.startedAt.Format(time.RFC3339))
Expand Down Expand Up @@ -211,7 +216,10 @@ func (s *ReadSideCar) Run() {
stream.errorFromLogs = row.Log.Message
}
}
s.sourceLog(row.Log.Level, row.Log.Message)
level := strings.ToUpper(row.Log.Level)
if shouldLog(level, s.logLevel) || shouldLog(level, s.dbLogLevel) {
s.sourceLog(row.Log.Level, row.Log.Message)
}
case DebugType:
message := row.Message
if row.Data != nil {
Expand All @@ -221,7 +229,9 @@ func (s *ReadSideCar) Run() {
message = fmt.Sprintf("%s: %s", message, sData)
}
}
s.sourceLog("DEBUG", message)
if shouldLog("DEBUG", s.logLevel) || shouldLog("DEBUG", s.dbLogLevel) {
s.sourceLog("DEBUG", message)
}
case StateType:
if s.lastStateMessage != lineStr {
s.processState(row.State)
Expand Down Expand Up @@ -572,7 +582,7 @@ func (s *ReadSideCar) updateRunningStatus() {
}
})
processedStreamsJson, _ := jsonorder.Marshal(statusMap)
s.sendGoodStatus("RUNNING", string(processedStreamsJson), false)
s.sendGoodStatus("RUNNING", string(processedStreamsJson), "", false)
}

func (s *ReadSideCar) sendBadStatus(status string, error string) {
Expand All @@ -583,11 +593,11 @@ func (s *ReadSideCar) sendBadStatus(status string, error string) {
}
}

func (s *ReadSideCar) sendGoodStatus(status string, description string, log bool) {
func (s *ReadSideCar) sendGoodStatus(status string, description, error string, log bool) {
if log {
s.log("READ %s", joinStrings(status, description, ": "))
}
err := db.UpsertTaskDescription(s.dbpool, s.syncId, s.taskId, s.packageName, s.packageVersion, s.startedAt, status, description)
err := db.UpsertTaskDescriptionAndError(s.dbpool, s.syncId, s.taskId, s.packageName, s.packageVersion, s.startedAt, status, description, error)
if err != nil {
s.panic("error updating task: %v", err)
}
Expand Down

0 comments on commit 59d8abf

Please sign in to comment.