Skip to content

Commit

Permalink
runtime cancelation
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm committed Dec 30, 2023
1 parent 6b2282f commit 43b9e3e
Show file tree
Hide file tree
Showing 15 changed files with 270 additions and 55 deletions.
30 changes: 24 additions & 6 deletions cmd/ffaas/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@ import (
"github.com/anthdm/ffaas/pkg/types"
"github.com/anthdm/ffaas/pkg/version"
"github.com/anthdm/ffaas/pkg/wasmhttp"
"github.com/anthdm/hollywood/actor"
"github.com/google/uuid"
"github.com/tetratelabs/wazero"
)

func main() {
var (
memstore = storage.NewMemoryStore()
modCache = storage.NewDefaultModCache()
configFile string
seed bool
memstore = storage.NewMemoryStore()
modCache = storage.NewDefaultModCache()
metricStore = storage.NewMemoryMetricStore()
configFile string
seed bool
)
flagSet := flag.NewFlagSet("ffaas", flag.ExitOnError)
flagSet.StringVar(&configFile, "config", "ffaas.toml", "")
Expand All @@ -43,13 +45,29 @@ func main() {
fmt.Println(banner())
fmt.Println("The opensource faas platform powered by WASM")
fmt.Println()
server := api.NewServer(memstore, modCache)
server := api.NewServer(memstore, metricStore, modCache)
go func() {
fmt.Printf("api server running\t%s\n", config.GetApiUrl())
log.Fatal(server.Listen(config.Get().APIServerAddr))
}()

wasmServer := wasmhttp.NewServer(config.Get().WASMServerAddr, memstore, modCache)
engine, err := actor.NewEngine(nil)
if err != nil {
log.Fatal(err)
}

// eventPID := engine.SpawnFunc(func(c *actor.Context) {
// switch msg := c.Message().(type) {
// case actor.ActorInitializedEvent:
// if strings.Contains(msg.PID.String(), "runtime") {
// fmt.Println("got this", msg.PID)
// engine.Stop(msg.PID)
// }
// }
// }, "event")
// engine.Subscribe(eventPID)

wasmServer := wasmhttp.NewServer(config.Get().WASMServerAddr, engine, memstore, metricStore, modCache)
fmt.Printf("wasm server running\t%s\n", config.GetWasmUrl())
log.Fatal(wasmServer.Listen())
}
Expand Down
Binary file modified examples/go/app.wasm
Binary file not shown.
5 changes: 4 additions & 1 deletion examples/go/main.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package main

import (
"fmt"
"math/rand"
"net/http"

ffaas "github.com/anthdm/ffaas/sdk"
)

func myHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(r.URL.String()))
num := rand.Intn(100)
w.Write([]byte(fmt.Sprintf("from my application: %d", num)))
}

func main() {
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
)

require (
github.com/anthdm/hollywood v0.0.0-20231229180503-ac25f94bfd67 // indirect
github.com/anthdm/hollywood v0.0.0-20231230110106-87b55a8811e9 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
google.golang.org/protobuf v1.30.0 // indirect
Expand All @@ -21,7 +21,6 @@ require (
require (
github.com/pelletier/go-toml/v2 v2.1.1
github.com/stealthrocket/wazergo v0.19.1 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.14.0 // indirect
Expand Down
10 changes: 8 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
github.com/anthdm/hollywood v0.0.0-20231229180503-ac25f94bfd67 h1:QhQCDsP77OF/PA8C9GPwpsQa+gGDCdZQ3KJuLhMgtNs=
github.com/anthdm/hollywood v0.0.0-20231229180503-ac25f94bfd67/go.mod h1:JdJPske9LzDx/pkXYEiYUaabofUZ995dM5cS0iSUtP8=
github.com/anthdm/hollywood v0.0.0-20231230110106-87b55a8811e9 h1:mdjfpzD+eotubcfrmMNfhZtmNh4zs8BobWPjpm0yA70=
github.com/anthdm/hollywood v0.0.0-20231230110106-87b55a8811e9/go.mod h1:JdJPske9LzDx/pkXYEiYUaabofUZ995dM5cS0iSUtP8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.0.11 h1:BnpYbFZ3T3S1WMpD79r7R5ThWX40TaFB7L31Y8xqSwA=
github.com/go-chi/chi/v5 v5.0.11/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down Expand Up @@ -34,14 +37,17 @@ github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IU
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc=
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611 h1:qCEDpW1G+vcj3Y7Fy52pEM1AWm3abj8WimGYejI3SC4=
golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
Expand Down
32 changes: 32 additions & 0 deletions pkg/act/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package act

import (
"log/slog"

"github.com/anthdm/ffaas/pkg/storage"
"github.com/anthdm/ffaas/pkg/types"
"github.com/anthdm/hollywood/actor"
)

type Metric struct {
store storage.MetricStore
}

func NewMetric(store storage.MetricStore) actor.Producer {
return func() actor.Receiver {
return &Metric{
store: store,
}
}
}

func (m *Metric) Receive(c *actor.Context) {
switch msg := c.Message().(type) {
case actor.Started:
case actor.Stopped:
case *types.RuntimeMetric:
if err := m.store.CreateRuntimeMetric(msg); err != nil {
slog.Warn("failed to store RuntimeMetric", "err", err)
}
}
}
77 changes: 54 additions & 23 deletions pkg/act/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,84 @@ package act

import (
"context"
"fmt"
"net/http"
"time"

"github.com/anthdm/ffaas/pkg/runtime"
"github.com/anthdm/ffaas/pkg/storage"
"github.com/anthdm/ffaas/pkg/types"
"github.com/anthdm/hollywood/actor"
"github.com/google/uuid"
)

const KindRuntime = "runtime"

type Runtime struct {
args runtime.Args
w http.ResponseWriter
started time.Time
uptime time.Duration
terminateFn func()
args runtime.Args
w http.ResponseWriter
endpointID uuid.UUID
deployID uuid.UUID
store storage.MetricStore
path string
started time.Time
cancel context.CancelFunc
}

func NewRuntime(w http.ResponseWriter, args runtime.Args) actor.Producer {
func NewRuntime(w http.ResponseWriter,
args runtime.Args,
endpointID uuid.UUID,
deployID uuid.UUID,
store storage.MetricStore,
path string,
cancel context.CancelFunc) actor.Producer {
return func() actor.Receiver {
return &Runtime{
args: args,
w: w,
args: args,
w: w,
path: path,
store: store,
deployID: deployID,
endpointID: endpointID,
cancel: cancel,
}
}
}

func (r *Runtime) Receive(c *actor.Context) {
switch c.Message().(type) {
case actor.Started:
r.start(c)
c.Engine().Poison(c.PID())
r.terminateFn = func() {
c.Engine().Poison(c.PID())
}
r.exec()
case actor.Stopped:
r.uptime = time.Since(r.started)
fmt.Println("stopped uptime", r.uptime)
r.cancel()
metric := types.RuntimeMetric{
ID: uuid.New(),
StartTime: r.started,
Duration: time.Since(r.started),
EndpointID: r.endpointID,
DeployID: r.deployID,
RequestURL: r.path,
}
r.store.CreateRuntimeMetric(&metric)
}
}

func (r *Runtime) start(c *actor.Context) {
func (r *Runtime) exec() {
r.started = time.Now()
if err := runtime.Run(context.Background(), r.args); err != nil {
r.w.WriteHeader(http.StatusInternalServerError)
r.w.Write([]byte(err.Error()))
return
}
if _, err := r.args.RequestPlugin.WriteResponse(r.w); err != nil {
r.w.WriteHeader(http.StatusInternalServerError)
r.w.Write([]byte(err.Error()))
return
}
go func() {
defer r.terminateFn()
if err := runtime.Run(context.Background(), r.args); err != nil {
r.w.WriteHeader(http.StatusInternalServerError)
r.w.Write([]byte(err.Error()))
return
}
if _, err := r.args.RequestPlugin.WriteResponse(r.w); err != nil {
r.w.WriteHeader(http.StatusInternalServerError)
r.w.Write([]byte(err.Error()))
return
}
}()
}
39 changes: 39 additions & 0 deletions pkg/act/runtime_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package act

import (
"fmt"

"github.com/anthdm/hollywood/actor"
)

const (
KindRuntimeManager = "runtime_manager"
RuntimeManagerID = "rtm"
)

type RuntimeStart struct {
pid *actor.PID
}

type RuntimeManager struct {
runtimes map[string]*actor.PID
}

func NewRuntimeManager() actor.Producer {
return func() actor.Receiver {
return &RuntimeManager{
runtimes: make(map[string]*actor.PID),
}
}
}

func (rm *RuntimeManager) Receive(c *actor.Context) {
switch msg := c.Message().(type) {
case RuntimeStart:
fmt.Println("need to start", msg)
// pid := s.engine.Spawn(act.NewRuntime(w, args, endpointID, deploy.ID, s.metricStore, requestURL), act.KindRuntime)
case actor.Started:
case actor.Stopped:

}
}
27 changes: 21 additions & 6 deletions pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@ import (

// Server serves the public ffaas API.
type Server struct {
router *chi.Mux
store storage.Store
cache storage.ModCacher
router *chi.Mux
store storage.Store
metricStore storage.MetricStore
cache storage.ModCacher
}

// NewServer returns a new server given a Store interface.
func NewServer(store storage.Store, cache storage.ModCacher) *Server {
func NewServer(store storage.Store, metricStore storage.MetricStore, cache storage.ModCacher) *Server {
return &Server{
store: store,
cache: cache,
store: store,
cache: cache,
metricStore: metricStore,
}
}

Expand All @@ -38,6 +40,7 @@ func (s *Server) initRouter() {
s.router = chi.NewRouter()
s.router.Get("/status", handleStatus)
s.router.Get("/endpoint/{id}", makeAPIHandler(s.handleGetEndpoint))
s.router.Get("/endpoint/{id}/metrics", makeAPIHandler(s.handleGetEndpointMetrics))
s.router.Post("/endpoint", makeAPIHandler(s.handleCreateEndpoint))
s.router.Post("/endpoint/{id}/deploy", makeAPIHandler(s.handleCreateDeploy))
s.router.Post("/endpoint/{id}/rollback", makeAPIHandler(s.handleCreateRollback))
Expand Down Expand Up @@ -171,3 +174,15 @@ func (s *Server) handleCreateRollback(w http.ResponseWriter, r *http.Request) er

return writeJSON(w, http.StatusOK, map[string]any{"deploy": deploy.ID})
}

func (s *Server) handleGetEndpointMetrics(w http.ResponseWriter, r *http.Request) error {
endpointID, err := uuid.Parse(chi.URLParam(r, "id"))
if err != nil {
return writeJSON(w, http.StatusBadRequest, ErrorResponse(err))
}
metrics, err := s.metricStore.GetRuntimeMetrics(endpointID)
if err != nil {
return writeJSON(w, http.StatusNotFound, ErrorResponse(err))
}
return writeJSON(w, http.StatusOK, metrics)
}
1 change: 0 additions & 1 deletion pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func Run(ctx context.Context, args Args) error {
config := wazero.NewRuntimeConfig().WithCompilationCache(args.Cache)
runtime := wazero.NewRuntimeWithConfig(ctx, config)
defer runtime.Close(ctx)

if err := args.RequestPlugin.Instanciate(ctx, runtime); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 43b9e3e

Please sign in to comment.