Skip to content

Commit

Permalink
ingest: tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Dec 19, 2023
1 parent 6d3429d commit e65c9fa
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
4 changes: 2 additions & 2 deletions ingest/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (a *Context) InitContext(settings *appbase.AppSettings) error {
a.server = &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", a.config.HTTPPort),
Handler: router.Engine(),
ReadTimeout: time.Second * 2,
ReadHeaderTimeout: time.Second * 2,
ReadTimeout: time.Second * 5,
ReadHeaderTimeout: time.Second * 5,
IdleTimeout: time.Second * 65,
}
a.metricsServer = NewMetricsServer(a.config)
Expand Down
10 changes: 5 additions & 5 deletions ingest/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/jitsucom/bulker/jitsubase/utils"
"github.com/jitsucom/bulker/jitsubase/uuid"
"github.com/jitsucom/bulker/kafkabase"
jsoniter "github.com/json-iterator/go"
"github.com/penglongli/gin-metrics/ginmetrics"
timeout "github.com/vearne/gin-timeout"
"io"
Expand Down Expand Up @@ -239,8 +238,9 @@ func (r *Router) BatchHandler(c *gin.Context) {
rError = r.ResponseError(c, http.StatusOK, "no destinations found for stream", false, fmt.Errorf(stream.Stream.Id), logPrefix, true)
return
}
err = jsoniter.NewDecoder(c.Request.Body).Decode(&payload)
err = json.NewDecoder(c.Request.Body).Decode(&payload)
if err != nil {
err = fmt.Errorf("Client Ip: %s: %v", utils.NvlString(c.GetHeader("X-Real-Ip"), c.GetHeader("X-Forwarded-For"), c.ClientIP()), err)
rError = r.ResponseError(c, http.StatusOK, "error parsing message", false, err, logPrefix, true)
return
}
Expand Down Expand Up @@ -353,11 +353,12 @@ func (r *Router) IngestHandler(c *gin.Context) {
tp := c.Param("tp")
body, err := io.ReadAll(c.Request.Body)
if err != nil {
err = fmt.Errorf("Client Ip: %s: %v", utils.NvlString(c.GetHeader("X-Real-Ip"), c.GetHeader("X-Forwarded-For"), c.ClientIP()), err)
rError = r.ResponseError(c, http.StatusOK, "error reading HTTP body", false, err, "", true)
return
}
message := AnalyticsServerEvent{}
err = jsoniter.Unmarshal(body, &message)
err = json.Unmarshal(body, &message)
if err != nil {
rError = r.ResponseError(c, http.StatusOK, "error parsing message", false, fmt.Errorf("%v: %s", err, string(body)), "", true)
return
Expand Down Expand Up @@ -417,7 +418,7 @@ func (r *Router) sendToBulker(c *gin.Context, message *AnalyticsServerEvent, ana
rError = r.ResponseError(c, http.StatusOK, "error building ingest message", false, err, logPrefix, sendResponse)
return
}
ingestMessageBytes, err = jsoniter.Marshal(ingestMessage)
ingestMessageBytes, err = json.Marshal(ingestMessage)
if err != nil {
rError = r.ResponseError(c, http.StatusOK, "error marshaling ingest message", false, err, logPrefix, sendResponse)
return
Expand Down Expand Up @@ -508,7 +509,6 @@ func (r *Router) getDataLocator(c *gin.Context, event *AnalyticsServerEvent, ing
cred.IngestType = ingestType
if c.GetHeader("Authorization") != "" {
wk := strings.Replace(c.GetHeader("Authorization"), "Basic ", "", 1)
fmt.Println(wk)
//decode base64
wkDecoded, err := base64.StdEncoding.DecodeString(wk)
if err != nil {
Expand Down

0 comments on commit e65c9fa

Please sign in to comment.