Skip to content

Commit

Permalink
implement sse (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
adelowo authored Jan 20, 2024
1 parent e285b5c commit 35d425e
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 3 deletions.
8 changes: 7 additions & 1 deletion cmd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/adelowo/sdump/config"
"github.com/adelowo/sdump/datastore/postgres"
"github.com/adelowo/sdump/server/httpd"
"github.com/r3labs/sse/v2"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -50,7 +51,10 @@ func createHTTPCommand(cmd *cobra.Command, cfg *config.Config) {
logger := logrus.WithField("host", hostName).
WithField("module", "http.server")

httpServer := httpd.New(*cfg, urlStore, ingestStore, logger)
sseServer := sse.New()

httpServer := httpd.New(*cfg, urlStore, ingestStore,
logger, sseServer)

go func() {
logger.Debug("starting HTTP server")
Expand All @@ -69,6 +73,8 @@ func createHTTPCommand(cmd *cobra.Command, cfg *config.Config) {
logger.WithError(err).Error("could not shut down http server")
}

sseServer.Close()

return nil
},
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/r3labs/sse/v2 v2.10.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
Expand Down Expand Up @@ -128,6 +129,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
mellium.im/sasl v0.3.1 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=
github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
Expand Down Expand Up @@ -354,6 +356,7 @@ golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
Expand Down Expand Up @@ -421,6 +424,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
Expand Down
7 changes: 6 additions & 1 deletion internal/util/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,10 @@ func GetIP(r *http.Request) net.IP {
return net.ParseIP(ip)
}

return net.ParseIP(r.RemoteAddr)
h, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return net.IP{}
}

return net.ParseIP(h)
}
7 changes: 6 additions & 1 deletion server/httpd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ import (
"github.com/adelowo/sdump/config"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/r3labs/sse/v2"
"github.com/sirupsen/logrus"
)

func New(cfg config.Config,
urlRepo sdump.URLRepository,
ingestRepo sdump.IngestRepository,
logger *logrus.Entry,
sseServer *sse.Server,
) *http.Server {
return &http.Server{
Handler: buildRoutes(cfg, logger, urlRepo, ingestRepo),
Handler: buildRoutes(cfg, logger, urlRepo, ingestRepo, sseServer),
Addr: fmt.Sprintf(":%d", cfg.HTTP.Port),
}
}
Expand All @@ -26,6 +28,7 @@ func buildRoutes(cfg config.Config,
logger *logrus.Entry,
urlRepo sdump.URLRepository,
ingestRepo sdump.IngestRepository,
sseServer *sse.Server,
) http.Handler {
router := chi.NewRouter()

Expand All @@ -38,10 +41,12 @@ func buildRoutes(cfg config.Config,
urlRepo: urlRepo,
logger: logger,
ingestRepo: ingestRepo,
sseServer: sseServer,
}

router.Post("/", urlHandler.create)
router.Post("/{reference}", urlHandler.ingest)
router.Get("/events", sseServer.ServeHTTP)

return router
}
Expand Down
32 changes: 32 additions & 0 deletions server/httpd/url.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package httpd

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -12,6 +14,7 @@ import (
"github.com/adelowo/sdump/internal/util"
"github.com/go-chi/chi/v5"
"github.com/go-chi/render"
"github.com/r3labs/sse/v2"
"github.com/sirupsen/logrus"
)

Expand All @@ -20,6 +23,7 @@ type urlHandler struct {
urlRepo sdump.URLRepository
ingestRepo sdump.IngestRepository
cfg config.Config
sseServer *sse.Server
}

func (u *urlHandler) create(w http.ResponseWriter, r *http.Request) {
Expand All @@ -40,6 +44,10 @@ func (u *urlHandler) create(w http.ResponseWriter, r *http.Request) {
return
}

go func() {
_ = u.sseServer.CreateStream(endpoint.PubChannel())
}()

_ = render.Render(w, r, &createdURLEndpointResponse{
APIStatus: newAPIStatus(http.StatusOK, "created url endpoint"),
URL: struct {
Expand Down Expand Up @@ -110,6 +118,30 @@ func (u *urlHandler) ingest(w http.ResponseWriter, r *http.Request) {
return
}

go func() {
if !u.sseServer.StreamExists(endpoint.PubChannel()) {
_ = u.sseServer.CreateStream(endpoint.PubChannel())
}

b := new(bytes.Buffer)

var sseEvent struct {
Request sdump.RequestDefinition `json:"request"`
}

sseEvent.Request = ingestedRequest.Request

if err := json.NewEncoder(b).Encode(&sseEvent); err != nil {
logger.WithError(err).Error("could not format SSE event")
return
}

u.sseServer.Publish(endpoint.PubChannel(), &sse.Event{
ID: []byte(ingestedRequest.ID.String()),
Data: b.Bytes(),
})
}()

_ = render.Render(w, r, newAPIStatus(http.StatusAccepted,
"Request ingested"))
}
3 changes: 3 additions & 0 deletions url.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sdump

import (
"context"
"fmt"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -33,6 +34,8 @@ type URLEndpoint struct {
bun.BaseModel `bun:"table:urls"`
}

func (u *URLEndpoint) PubChannel() string { return fmt.Sprintf("messages.%s", u.Reference) }

func NewURLEndpoint() *URLEndpoint {
return &URLEndpoint{
Reference: xid.New().String(),
Expand Down

0 comments on commit 35d425e

Please sign in to comment.