Skip to content

Commit

Permalink
ingest: metrics endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Dec 14, 2023
1 parent 26a448b commit 3cce144
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 0 deletions.
3 changes: 3 additions & 0 deletions ingest/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Context struct {
producer *kafkabase.Producer
eventsLogService eventslog.EventsLogService
server *http.Server
metricsServer *MetricsServer
backupsLogger *BackupLogger
}

Expand Down Expand Up @@ -67,6 +68,7 @@ func (a *Context) InitContext(settings *appbase.AppSettings) error {
ReadHeaderTimeout: time.Second * 60,
IdleTimeout: time.Second * 65,
}
a.metricsServer = NewMetricsServer(a.config)
return nil
}

Expand All @@ -77,6 +79,7 @@ func (a *Context) Cleanup() error {
logging.Infof("Waiting %d seconds before http server shutdown...", a.config.ShutdownExtraDelay)
time.Sleep(time.Duration(a.config.ShutdownExtraDelay) * time.Second)
}
_ = a.metricsServer.Stop()
_ = a.eventsLogService.Close()
a.repository.Close()
a.dbpool.Close()
Expand Down
2 changes: 2 additions & 0 deletions ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Config struct {
RotorURL string `mapstructure:"ROTOR_URL"`
DeviceFunctionsTimeoutMs int `mapstructure:"DEVICE_FUNCTIONS_TIMEOUT_MS" default:"200"`

MetricsPort int `mapstructure:"METRICS_PORT" default:"9091"`

// # GRACEFUL SHUTDOWN
//Timeout that give running batch tasks time to finish during shutdown.
ShutdownTimeoutSec int `mapstructure:"SHUTDOWN_TIMEOUT_SEC" default:"10"`
Expand Down
48 changes: 48 additions & 0 deletions ingest/metrics_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package main

import (
"context"
"fmt"
"github.com/gin-gonic/gin"
"github.com/jitsucom/bulker/jitsubase/appbase"
"github.com/jitsucom/bulker/jitsubase/safego"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
"time"
)

type MetricsServer struct {
appbase.Service
server *http.Server
}

func NewMetricsServer(appconfig *Config) *MetricsServer {
base := appbase.NewServiceBase("metrics_server")
engine := gin.New()
engine.Use(gin.Recovery())
//expose prometheus metrics
engine.GET("/metrics", gin.WrapH(promhttp.Handler()))

server := &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", appconfig.MetricsPort),
Handler: engine,
ReadTimeout: time.Second * 60,
ReadHeaderTimeout: time.Second * 60,
IdleTimeout: time.Second * 65,
}
m := &MetricsServer{Service: base, server: server}
m.start()
return m
}

func (s *MetricsServer) start() {
safego.RunWithRestart(func() {
s.Infof("Starting metrics server on %s", s.server.Addr)
s.Infof("%v", s.server.ListenAndServe())
})
}

func (s *MetricsServer) Stop() error {
s.Infof("Stopping metrics server")
return s.server.Shutdown(context.Background())
}

0 comments on commit 3cce144

Please sign in to comment.