Skip to content

Commit

Permalink
ingest: fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Dec 14, 2023
1 parent 5d67e13 commit 26a448b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
2 changes: 1 addition & 1 deletion ingest/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (r *Repository) refresh() {
streamsByIds := map[string]*StreamWithDestinations{}
streamsByDomains := map[string][]*StreamWithDestinations{}
defer func() {
r.Infof("Refreshed in %v", time.Now().Sub(start))
r.Debugf("Refreshed in %v", time.Now().Sub(start))
}()
rows, err := r.dbpool.Query(context.Background(), SQLQuery)
if err != nil {
Expand Down
16 changes: 12 additions & 4 deletions ingest/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,11 @@ func (r *Router) BatchHandler(c *gin.Context) {
IngestHandlerRequests(domain, "error", rError.ErrorType).Inc()
}
}()

defer func() {
if rerr := recover(); rerr != nil {
rError = r.ResponseError(c, http.StatusInternalServerError, "panic", true, fmt.Errorf("%v", rerr), "", true)
}
}()
if !strings.HasSuffix(c.ContentType(), "application/json") && !strings.HasSuffix(c.ContentType(), "text/plain") {
rError = r.ResponseError(c, http.StatusBadRequest, "invalid content type", false, fmt.Errorf("%s. Expected: application/json", c.ContentType()), "", true)
return
Expand Down Expand Up @@ -300,6 +304,11 @@ func (r *Router) IngestHandler(c *gin.Context) {
IngestHandlerRequests(domain, "success", "").Inc()
}
}()
defer func() {
if rerr := recover(); rerr != nil {
rError = r.ResponseError(c, http.StatusInternalServerError, "panic", true, fmt.Errorf("%v", rerr), "", true)
}
}()
if !strings.HasSuffix(c.ContentType(), "application/json") && !strings.HasSuffix(c.ContentType(), "text/plain") {
rError = r.ResponseError(c, http.StatusBadRequest, "invalid content type", false, fmt.Errorf("%s. Expected: application/json", c.ContentType()), "", true)
return
Expand All @@ -321,7 +330,7 @@ func (r *Router) IngestHandler(c *gin.Context) {
rError = r.ResponseError(c, http.StatusOK, "error parsing message", false, fmt.Errorf("%v: %s", err, string(body)), "", true)
return
}
messageId := message["messageId"].(string)
messageId, _ := message["messageId"].(string)
loc, err := r.getDataLocator(c, &message, ingestType)
if err != nil {
rError = r.ResponseError(c, http.StatusOK, "error processing message", false, fmt.Errorf("%v: %s", err, string(body)), "", true)
Expand Down Expand Up @@ -441,7 +450,7 @@ func patchEvent(c *gin.Context, event *AnalyticsServerEvent, tp string, ingestTy
ctx["locale"] = strings.TrimSpace(strings.Split(c.GetHeader("Accept-Language"), ",")[0])
}
ev["context"] = ctx
nowIsoDate := time.Now().Format(time.RFC3339)
nowIsoDate := time.Now().Format(time.RFC3339Nano)
ev["receivedAt"] = nowIsoDate
ev["type"] = typeFixed
if _, ok = ev["timestamp"]; !ok {
Expand Down Expand Up @@ -469,7 +478,6 @@ func (r *Router) getDataLocator(c *gin.Context, event *AnalyticsServerEvent, ing
cred.WriteKey = c.GetHeader("X-Write-Key")
}
host := strings.Split(c.Request.Host, ":")[0]
r.Infof("Host: %s Datahost: %s", host, dataHost)
if dataHost != "" && strings.HasSuffix(host, "."+dataHost) {
cred.Slug = strings.TrimSuffix(host, "."+dataHost)
} else {
Expand Down

0 comments on commit 26a448b

Please sign in to comment.