Skip to content

Commit

Permalink
ingest: better error reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Dec 20, 2023
1 parent e65c9fa commit ef0e7df
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 85 deletions.
51 changes: 26 additions & 25 deletions bulkerapp/app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,17 @@ func (r *Router) EventsHandler(c *gin.Context) {

destination := r.repository.GetDestination(destinationId)
if destination == nil {
rError = r.ResponseError(c, http.StatusNotFound, "destination not found", false, fmt.Errorf("destination not found: %s", destinationId), "", true)
rError = r.ResponseError(c, http.StatusNotFound, "destination not found", false, fmt.Errorf("destination not found: %s", destinationId), true)
return
}
mode = string(destination.Mode())
if tableName == "" {
rError = r.ResponseError(c, http.StatusBadRequest, "missing required parameter", false, fmt.Errorf("tableName query parameter is required"), "", true)
rError = r.ResponseError(c, http.StatusBadRequest, "missing required parameter", false, fmt.Errorf("tableName query parameter is required"), true)
return
}
topicId, err := destination.TopicId(tableName)
if err != nil {
rError = r.ResponseError(c, http.StatusInternalServerError, "couldn't generate topicId", false, err, "", true)
rError = r.ResponseError(c, http.StatusInternalServerError, "couldn't generate topicId", false, err, true)
return
}
err = r.topicManager.EnsureDestinationTopic(destination, topicId)
Expand All @@ -137,20 +137,20 @@ func (r *Router) EventsHandler(c *gin.Context) {
if ok && kafkaErr.Code() == kafka.ErrTopicAlreadyExists {
r.Warnf("Topic %s already exists", topicId)
} else {
rError = r.ResponseError(c, http.StatusInternalServerError, "couldn't create topic", false, fmt.Errorf("topicId %s: %v", topicId, err), "", true)
rError = r.ResponseError(c, http.StatusInternalServerError, "couldn't create topic", false, fmt.Errorf("topicId %s: %v", topicId, err), true)
return
}
}

body, err := io.ReadAll(c.Request.Body)
if err != nil {
rError = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, "", true)
rError = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, true)
return
}
bytesRead = len(body)
err = r.producer.ProduceAsync(topicId, uuid.New(), body, map[string]string{MetricsMetaHeader: metricsMeta})
if err != nil {
rError = r.ResponseError(c, http.StatusInternalServerError, "producer error", true, err, "", true)
rError = r.ResponseError(c, http.StatusInternalServerError, "producer error", true, err, true)
return
}
c.JSON(http.StatusOK, gin.H{"message": "ok"})
Expand Down Expand Up @@ -179,12 +179,12 @@ func (r *Router) BulkHandler(c *gin.Context) {

destination := r.repository.GetDestination(destinationId)
if destination == nil {
rError = r.ResponseError(c, http.StatusNotFound, "destination not found", false, fmt.Errorf("destination not found: %s", destinationId), "", true)
rError = r.ResponseError(c, http.StatusNotFound, "destination not found", false, fmt.Errorf("destination not found: %s", destinationId), true)
return
}
mode = string(destination.Mode())
if tableName == "" {
rError = r.ResponseError(c, http.StatusBadRequest, "missing required parameter", false, fmt.Errorf("tableName query parameter is required"), "", true)
rError = r.ResponseError(c, http.StatusBadRequest, "missing required parameter", false, fmt.Errorf("tableName query parameter is required"), true)
return
}
var streamOptions []bulker.StreamOption
Expand All @@ -195,7 +195,7 @@ func (r *Router) BulkHandler(c *gin.Context) {
destination.InitBulkerInstance()
bulkerStream, err := destination.bulker.CreateStream(jobId, tableName, bulkMode, streamOptions...)
if err != nil {
rError = r.ResponseError(c, http.StatusInternalServerError, "create stream error", true, err, "", true)
rError = r.ResponseError(c, http.StatusInternalServerError, "create stream error", true, err, true)
return
}
scanner := bufio.NewScanner(c.Request.Body)
Expand All @@ -205,7 +205,7 @@ func (r *Router) BulkHandler(c *gin.Context) {
eventBytes := scanner.Bytes()
if len(eventBytes) >= 5 && string(eventBytes[:5]) == "ABORT" {
_, _ = bulkerStream.Abort(c)
rError = r.ResponseError(c, http.StatusBadRequest, "aborted", false, fmt.Errorf(string(eventBytes)), "", true)
rError = r.ResponseError(c, http.StatusBadRequest, "aborted", false, fmt.Errorf(string(eventBytes)), true)
return
}
bytesRead += len(eventBytes)
Expand All @@ -214,25 +214,25 @@ func (r *Router) BulkHandler(c *gin.Context) {
dec.UseNumber()
if err = dec.Decode(&obj); err != nil {
_, _ = bulkerStream.Abort(c)
rError = r.ResponseError(c, http.StatusBadRequest, "unmarhsal error", false, err, "", true)
rError = r.ResponseError(c, http.StatusBadRequest, "unmarhsal error", false, err, true)
return
}
if _, _, err = bulkerStream.Consume(c, obj); err != nil {
_, _ = bulkerStream.Abort(c)
rError = r.ResponseError(c, http.StatusBadRequest, "stream consume error", false, err, "", true)
rError = r.ResponseError(c, http.StatusBadRequest, "stream consume error", false, err, true)
return
}
consumed++
}
if err = scanner.Err(); err != nil {
_, _ = bulkerStream.Abort(c)
rError = r.ResponseError(c, http.StatusBadRequest, "scanner error", false, err, "", true)
rError = r.ResponseError(c, http.StatusBadRequest, "scanner error", false, err, true)
return
}
if consumed > 0 {
state, err := bulkerStream.Complete(c)
if err != nil {
rError = r.ResponseError(c, http.StatusBadRequest, "stream complete error", false, err, "", true)
rError = r.ResponseError(c, http.StatusBadRequest, "stream complete error", false, err, true)
return
}
r.Infof("Bulk stream for %s mode: %s Completed. Processed: %d in %dms.", jobId, mode, state.SuccessfulRows, time.Since(start).Milliseconds())
Expand Down Expand Up @@ -283,25 +283,26 @@ func (r *Router) IngestHandler(c *gin.Context) {
metrics.IngestHandlerRequests(domain, "success", "").Inc()
}
}()
c.Set(appbase.ContextLoggerName, "ingest")
body, err := io.ReadAll(c.Request.Body)
if err != nil {
rError = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, "", true)
rError = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, true)
return
}
ingestMessage := IngestMessage{}
err = jsoniter.Unmarshal(body, &ingestMessage)
if err != nil {
rError = r.ResponseError(c, http.StatusBadRequest, "error parsing IngestMessage", false, fmt.Errorf("%v: %s", err, string(body)), "", true)
rError = r.ResponseError(c, http.StatusBadRequest, "error parsing IngestMessage", false, fmt.Errorf("%v: %s", err, string(body)), true)
return
}
messageId := ingestMessage.MessageId
c.Set(appbase.ContextMessageId, messageId)
domain = utils.DefaultString(ingestMessage.Origin.Slug, ingestMessage.Origin.Domain)
logPrefix := fmt.Sprintf("[ingest] Message ID: %s Domain: %s", messageId, domain)
r.Debugf(logPrefix)
c.Set(appbase.ContextDomain, domain)

stream := r.getStream(ingestMessage)
if stream == nil {
rError = r.ResponseError(c, http.StatusBadRequest, "stream not found", false, nil, logPrefix, true)
rError = r.ResponseError(c, http.StatusBadRequest, "stream not found", false, nil, true)
return
}
eventsLogId = stream.Stream.Id
Expand All @@ -324,13 +325,13 @@ func (r *Router) IngestHandler(c *gin.Context) {
r.Debugf("[ingest] Message ID: %s Producing for: %s topic: %s key: %s", messageId, destination.ConnectionId, topic, messageKey)
if err != nil {
metrics.IngestedMessages(destination.ConnectionId, "error", "message marshal error").Inc()
rError = r.ResponseError(c, http.StatusBadRequest, "message marshal error", false, err, logPrefix, true)
rError = r.ResponseError(c, http.StatusBadRequest, "message marshal error", false, err, true)
continue
}
err = r.producer.ProduceAsync(topic, messageKey, payload, nil)
if err != nil {
metrics.IngestedMessages(destination.ConnectionId, "error", "producer error").Inc()
rError = r.ResponseError(c, http.StatusInternalServerError, "producer error", true, err, logPrefix, true)
rError = r.ResponseError(c, http.StatusInternalServerError, "producer error", true, err, true)
continue
}
metrics.IngestedMessages(destination.ConnectionId, "success", "").Inc()
Expand Down Expand Up @@ -369,7 +370,7 @@ func (r *Router) FailedHandler(c *gin.Context) {
err = consumer.Assign([]kafka.TopicPartition{{Topic: &topicId, Partition: 0, Offset: kafka.OffsetBeginning}})
}
if err != nil {
r.ResponseError(c, http.StatusInternalServerError, "consumer error", true, err, "", true)
r.ResponseError(c, http.StatusInternalServerError, "consumer error", true, err, true)
return
}
start := time.Now()
Expand Down Expand Up @@ -406,14 +407,14 @@ func (r *Router) FailedHandler(c *gin.Context) {
func (r *Router) TestConnectionHandler(c *gin.Context) {
body, err := io.ReadAll(c.Request.Body)
if err != nil {
_ = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, "", true)
_ = r.ResponseError(c, http.StatusBadRequest, "error reading HTTP body", false, err, true)
return
}
bulkerCfg := bulker.Config{}
destinationConfig := map[string]any{}
err = utils.ParseObject(body, &destinationConfig)
if err != nil {
_ = r.ResponseError(c, http.StatusUnprocessableEntity, "parse failed", false, err, "", true)
_ = r.ResponseError(c, http.StatusUnprocessableEntity, "parse failed", false, err, true)
return
} else {
r.Debugf("[test] parsed config for destination %s: %+v", utils.MapNVL(destinationConfig, "id", ""), destinationConfig)
Expand All @@ -427,7 +428,7 @@ func (r *Router) TestConnectionHandler(c *gin.Context) {
if b != nil {
_ = b.Close()
}
_ = r.ResponseError(c, http.StatusUnprocessableEntity, "error creating bulker", false, err, "", true)
_ = r.ResponseError(c, http.StatusUnprocessableEntity, "error creating bulker", false, err, true)
return
}
_ = b.Close()
Expand Down
Loading

0 comments on commit ef0e7df

Please sign in to comment.