From 35d425e41a25b613fe2ea597d521d4dbd562e428 Mon Sep 17 00:00:00 2001 From: Lanre Adelowo Date: Sun, 21 Jan 2024 00:13:48 +0100 Subject: [PATCH] implement sse (#5) --- cmd/http.go | 8 +++++++- go.mod | 2 ++ go.sum | 5 +++++ internal/util/ip.go | 7 ++++++- server/httpd/http.go | 7 ++++++- server/httpd/url.go | 32 ++++++++++++++++++++++++++++++++ url.go | 3 +++ 7 files changed, 61 insertions(+), 3 deletions(-) diff --git a/cmd/http.go b/cmd/http.go index f72a29d..df5e96a 100644 --- a/cmd/http.go +++ b/cmd/http.go @@ -9,6 +9,7 @@ import ( "github.com/adelowo/sdump/config" "github.com/adelowo/sdump/datastore/postgres" "github.com/adelowo/sdump/server/httpd" + "github.com/r3labs/sse/v2" "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -50,7 +51,10 @@ func createHTTPCommand(cmd *cobra.Command, cfg *config.Config) { logger := logrus.WithField("host", hostName). WithField("module", "http.server") - httpServer := httpd.New(*cfg, urlStore, ingestStore, logger) + sseServer := sse.New() + + httpServer := httpd.New(*cfg, urlStore, ingestStore, + logger, sseServer) go func() { logger.Debug("starting HTTP server") @@ -69,6 +73,8 @@ func createHTTPCommand(cmd *cobra.Command, cfg *config.Config) { logger.WithError(err).Error("could not shut down http server") } + sseServer.Close() + return nil }, } diff --git a/go.mod b/go.mod index dd6025e..4a4a935 100644 --- a/go.mod +++ b/go.mod @@ -91,6 +91,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect + github.com/r3labs/sse/v2 v2.10.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect @@ -128,6 +129,7 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect google.golang.org/grpc v1.59.0 // indirect google.golang.org/protobuf v1.31.0 // indirect + gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect mellium.im/sasl v0.3.1 // indirect diff --git a/go.sum b/go.sum index d756891..5bb70a5 100644 --- a/go.sum +++ b/go.sum @@ -230,6 +230,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0= +github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= @@ -354,6 +356,7 @@ golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= @@ -421,6 +424,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/internal/util/ip.go b/internal/util/ip.go index 15cab71..8064ddd 100644 --- a/internal/util/ip.go +++ b/internal/util/ip.go @@ -31,5 +31,10 @@ func GetIP(r *http.Request) net.IP { return net.ParseIP(ip) } - return net.ParseIP(r.RemoteAddr) + h, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + return net.IP{} + } + + return net.ParseIP(h) } diff --git a/server/httpd/http.go b/server/httpd/http.go index 97653f4..b7b6081 100644 --- a/server/httpd/http.go +++ b/server/httpd/http.go @@ -8,6 +8,7 @@ import ( "github.com/adelowo/sdump/config" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" + "github.com/r3labs/sse/v2" "github.com/sirupsen/logrus" ) @@ -15,9 +16,10 @@ func New(cfg config.Config, urlRepo sdump.URLRepository, ingestRepo sdump.IngestRepository, logger *logrus.Entry, + sseServer *sse.Server, ) *http.Server { return &http.Server{ - Handler: buildRoutes(cfg, logger, urlRepo, ingestRepo), + Handler: buildRoutes(cfg, logger, urlRepo, ingestRepo, sseServer), Addr: fmt.Sprintf(":%d", cfg.HTTP.Port), } } @@ -26,6 +28,7 @@ func buildRoutes(cfg config.Config, logger *logrus.Entry, urlRepo sdump.URLRepository, ingestRepo sdump.IngestRepository, + sseServer *sse.Server, ) http.Handler { router := chi.NewRouter() @@ -38,10 +41,12 @@ func buildRoutes(cfg config.Config, urlRepo: urlRepo, logger: logger, ingestRepo: ingestRepo, + sseServer: sseServer, } router.Post("/", urlHandler.create) router.Post("/{reference}", urlHandler.ingest) + router.Get("/events", sseServer.ServeHTTP) return router } diff --git a/server/httpd/url.go b/server/httpd/url.go index 198d0fa..fb6c54a 100644 --- a/server/httpd/url.go +++ b/server/httpd/url.go @@ -1,6 +1,8 @@ package httpd import ( + "bytes" + "encoding/json" "errors" "fmt" "io" @@ -12,6 +14,7 @@ import ( "github.com/adelowo/sdump/internal/util" "github.com/go-chi/chi/v5" "github.com/go-chi/render" + "github.com/r3labs/sse/v2" "github.com/sirupsen/logrus" ) @@ -20,6 +23,7 @@ type urlHandler struct { urlRepo sdump.URLRepository ingestRepo sdump.IngestRepository cfg config.Config + sseServer *sse.Server } func (u *urlHandler) create(w http.ResponseWriter, r *http.Request) { @@ -40,6 +44,10 @@ func (u *urlHandler) create(w http.ResponseWriter, r *http.Request) { return } + go func() { + _ = u.sseServer.CreateStream(endpoint.PubChannel()) + }() + _ = render.Render(w, r, &createdURLEndpointResponse{ APIStatus: newAPIStatus(http.StatusOK, "created url endpoint"), URL: struct { @@ -110,6 +118,30 @@ func (u *urlHandler) ingest(w http.ResponseWriter, r *http.Request) { return } + go func() { + if !u.sseServer.StreamExists(endpoint.PubChannel()) { + _ = u.sseServer.CreateStream(endpoint.PubChannel()) + } + + b := new(bytes.Buffer) + + var sseEvent struct { + Request sdump.RequestDefinition `json:"request"` + } + + sseEvent.Request = ingestedRequest.Request + + if err := json.NewEncoder(b).Encode(&sseEvent); err != nil { + logger.WithError(err).Error("could not format SSE event") + return + } + + u.sseServer.Publish(endpoint.PubChannel(), &sse.Event{ + ID: []byte(ingestedRequest.ID.String()), + Data: b.Bytes(), + }) + }() + _ = render.Render(w, r, newAPIStatus(http.StatusAccepted, "Request ingested")) } diff --git a/url.go b/url.go index 8df9f23..203bb45 100644 --- a/url.go +++ b/url.go @@ -2,6 +2,7 @@ package sdump import ( "context" + "fmt" "time" "github.com/google/uuid" @@ -33,6 +34,8 @@ type URLEndpoint struct { bun.BaseModel `bun:"table:urls"` } +func (u *URLEndpoint) PubChannel() string { return fmt.Sprintf("messages.%s", u.Reference) } + func NewURLEndpoint() *URLEndpoint { return &URLEndpoint{ Reference: xid.New().String(),