diff --git a/pkg/actrs/runtime.go b/pkg/actrs/runtime.go index 9f58cf8..890d9be 100644 --- a/pkg/actrs/runtime.go +++ b/pkg/actrs/runtime.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "net/http" "github.com/anthdm/ffaas/pkg/storage" "github.com/anthdm/ffaas/proto" @@ -46,7 +47,13 @@ func (r *Runtime) Receive(c *actor.Context) { } cache := wazero.NewCompilationCache() r.exec(context.TODO(), deploy.Blob, cache, endpoint.Environment) - c.Respond(&proto.HTTPResponse{Response: []byte("hello")}) + resp := &proto.HTTPResponse{ + Response: []byte("hello"), + RequestID: msg.ID, + StatusCode: http.StatusOK, + } + c.Respond(resp) + c.Engine().Poison(c.PID()) } } diff --git a/pkg/actrs/wasmserver.go b/pkg/actrs/wasmserver.go index 69f1e4f..c4f0b75 100644 --- a/pkg/actrs/wasmserver.go +++ b/pkg/actrs/wasmserver.go @@ -1,8 +1,6 @@ package actrs import ( - "context" - "fmt" "io" "log" "net/http" @@ -17,9 +15,16 @@ import ( const KindWasmServer = "wasm_server" -type requestWithCancel struct { - request *proto.HTTPRequest - cancel context.CancelFunc +type requestWithResponse struct { + request *proto.HTTPRequest + response chan *proto.HTTPResponse +} + +func newRequestWithResponse(request *proto.HTTPRequest) requestWithResponse { + return requestWithResponse{ + request: request, + response: make(chan *proto.HTTPResponse, 1), + } } // WasmServer is an HTTP server that will proxy and route the request to the corresponding function. @@ -30,8 +35,7 @@ type WasmServer struct { metricStore storage.MetricStore cache storage.ModCacher cluster *cluster.Cluster - - requests map[string]context.CancelFunc + responses map[string]chan *proto.HTTPResponse } // NewWasmServer return a new wasm server given a storage and a mod cache. @@ -42,7 +46,7 @@ func NewWasmServer(addr string, cluster *cluster.Cluster, store storage.Store, m metricStore: metricStore, cache: cache, cluster: cluster, - requests: make(map[string]context.CancelFunc), + responses: make(map[string]chan *proto.HTTPResponse), } server := &http.Server{ Handler: s, @@ -58,15 +62,14 @@ func (s *WasmServer) Receive(c *actor.Context) { case actor.Started: s.initialize(c) case actor.Stopped: - case requestWithCancel: - s.requests[msg.request.ID] = msg.cancel - fmt.Println("got from myself", msg) + case requestWithResponse: + s.responses[msg.request.ID] = msg.response s.sendRequestToRuntime(msg.request) case *proto.HTTPResponse: - fmt.Println("received resposne from runtime", msg) - // if cancel, ok := s.requests[msg.RequestID]; ok { - // cancel() - // } + if resp, ok := s.responses[msg.RequestID]; ok { + resp <- msg + delete(s.responses, msg.RequestID) + } } } @@ -110,12 +113,13 @@ func (s *WasmServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } req.EndpointID = endpointID.String() - ctx, cancel := context.WithCancel(r.Context()) - s.cluster.Engine().Send(s.self, requestWithCancel{ - request: req, - cancel: cancel, - }) - <-ctx.Done() + reqres := newRequestWithResponse(req) + + s.cluster.Engine().Send(s.self, reqres) + + resp := <-reqres.response + w.WriteHeader(int(resp.StatusCode)) + w.Write(resp.Response) } func makeProtoRequest(r *http.Request) (*proto.HTTPRequest, error) {