Skip to content

Commit

Permalink
working roundtrips
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm committed Dec 31, 2023
1 parent 857d486 commit bedac30
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 22 deletions.
9 changes: 8 additions & 1 deletion pkg/actrs/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"net/http"

"github.com/anthdm/ffaas/pkg/storage"
"github.com/anthdm/ffaas/proto"
Expand Down Expand Up @@ -46,7 +47,13 @@ func (r *Runtime) Receive(c *actor.Context) {
}
cache := wazero.NewCompilationCache()
r.exec(context.TODO(), deploy.Blob, cache, endpoint.Environment)
c.Respond(&proto.HTTPResponse{Response: []byte("hello")})
resp := &proto.HTTPResponse{
Response: []byte("hello"),
RequestID: msg.ID,
StatusCode: http.StatusOK,
}
c.Respond(resp)
c.Engine().Poison(c.PID())
}
}

Expand Down
46 changes: 25 additions & 21 deletions pkg/actrs/wasmserver.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package actrs

import (
"context"
"fmt"
"io"
"log"
"net/http"
Expand All @@ -17,9 +15,16 @@ import (

const KindWasmServer = "wasm_server"

type requestWithCancel struct {
request *proto.HTTPRequest
cancel context.CancelFunc
type requestWithResponse struct {
request *proto.HTTPRequest
response chan *proto.HTTPResponse
}

func newRequestWithResponse(request *proto.HTTPRequest) requestWithResponse {
return requestWithResponse{
request: request,
response: make(chan *proto.HTTPResponse, 1),
}
}

// WasmServer is an HTTP server that will proxy and route the request to the corresponding function.
Expand All @@ -30,8 +35,7 @@ type WasmServer struct {
metricStore storage.MetricStore
cache storage.ModCacher
cluster *cluster.Cluster

requests map[string]context.CancelFunc
responses map[string]chan *proto.HTTPResponse
}

// NewWasmServer return a new wasm server given a storage and a mod cache.
Expand All @@ -42,7 +46,7 @@ func NewWasmServer(addr string, cluster *cluster.Cluster, store storage.Store, m
metricStore: metricStore,
cache: cache,
cluster: cluster,
requests: make(map[string]context.CancelFunc),
responses: make(map[string]chan *proto.HTTPResponse),
}
server := &http.Server{
Handler: s,
Expand All @@ -58,15 +62,14 @@ func (s *WasmServer) Receive(c *actor.Context) {
case actor.Started:
s.initialize(c)
case actor.Stopped:
case requestWithCancel:
s.requests[msg.request.ID] = msg.cancel
fmt.Println("got from myself", msg)
case requestWithResponse:
s.responses[msg.request.ID] = msg.response
s.sendRequestToRuntime(msg.request)
case *proto.HTTPResponse:
fmt.Println("received resposne from runtime", msg)
// if cancel, ok := s.requests[msg.RequestID]; ok {
// cancel()
// }
if resp, ok := s.responses[msg.RequestID]; ok {
resp <- msg
delete(s.responses, msg.RequestID)
}
}
}

Expand Down Expand Up @@ -110,12 +113,13 @@ func (s *WasmServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
req.EndpointID = endpointID.String()
ctx, cancel := context.WithCancel(r.Context())
s.cluster.Engine().Send(s.self, requestWithCancel{
request: req,
cancel: cancel,
})
<-ctx.Done()
reqres := newRequestWithResponse(req)

s.cluster.Engine().Send(s.self, reqres)

resp := <-reqres.response
w.WriteHeader(int(resp.StatusCode))
w.Write(resp.Response)
}

func makeProtoRequest(r *http.Request) (*proto.HTTPRequest, error) {
Expand Down

0 comments on commit bedac30

Please sign in to comment.