Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't stream certain in-memory io.Readers #44

Merged
merged 3 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build-examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
run: |
for i in _examples/*/; do
echo ${GITHUB_WORKSPACE}/$i
cd ${GITHUB_WORKSPACE}/$i && tinygo build -target=wasi
cd ${GITHUB_WORKSPACE}/$i && tinygo build -target=wasi -tags fastlyinternaldebug
done
build-examples-go:
strategy:
Expand All @@ -45,5 +45,5 @@ jobs:
run: |
for i in _examples/*/; do
echo ${GITHUB_WORKSPACE}/$i
cd ${GITHUB_WORKSPACE}/$i && env GOARCH=wasm GOOS=wasip1 go build
cd ${GITHUB_WORKSPACE}/$i && env GOARCH=wasm GOOS=wasip1 go build -tags fastlyinternaldebug
done
4 changes: 2 additions & 2 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
fastly version
viceroy --version
- name: Run Integration Tests
run: RUST_LOG="viceroy=info,viceroy-lib=info" tinygo test -v -target=fastly-compute.json ./integration_tests/...
run: RUST_LOG="viceroy=info,viceroy-lib=info" tinygo test -v -target=fastly-compute.json -tags fastlyinternaldebug ./integration_tests/...
integration-tests-go:
runs-on: ubuntu-latest
steps:
Expand All @@ -68,4 +68,4 @@ jobs:
fastly version
viceroy --version
- name: Run Integration Tests
run: RUST_LOG="viceroy=info,viceroy-lib=info" GOARCH=wasm GOOS=wasip1 go test -exec "viceroy run -C fastly.toml" -v ./integration_tests/...
run: RUST_LOG="viceroy=info,viceroy-lib=info" GOARCH=wasm GOOS=wasip1 go test -tags fastlyinternaldebug -exec "viceroy run -C fastly.toml" -v ./integration_tests/...
148 changes: 89 additions & 59 deletions fsthttp/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
package fsthttp

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/url"
"strings"
"time"

"github.com/fastly/compute-sdk-go/internal/abi/fastly"
Expand Down Expand Up @@ -290,49 +292,59 @@ func (req *Request) AddCookie(c *Cookie) {
// capabilities, and is recommended for most users who need to cache
// HTTP responses.
func (req *Request) Send(ctx context.Context, backend string) (*Response, error) {
if req.sent {
return nil, fmt.Errorf("request already sent")
}

if req.abi.req == nil && req.abi.body == nil {
// abi request not yet constructed
if err := req.constructABIRequest(); err != nil {
return nil, err
}
}

abiPending, abiReqBody, err := req.sendABIRequestAsync(backend)
if err != nil {
status, ok := fastly.IsFastlyError(err)
if !ok {
return nil, err
}

if status == fastly.FastlyStatusInval {
return nil, ErrBackendNotFound
}
// When the request's ManualFramingMode is false, SendAsyncStreaming
// streams the request body to the backend using "Transfer-Encoding:
// chunked". SendAsync buffers the entire body and sends it with a
// "Content-Length" header.
//
// For requests without a body, we want to avoid unnecessary chunked
// encoding, and have observed servers that error when seeing it in
// certain contexts.
//
// For requests where the body is an io.Reader implementer where the
// size is known in advance, we want to send that along with a
// Content-Length as well. Those types are *bytes.Buffer,
// *bytes.Reader, and *strings.Reader.
//
// For all other requests, we stream with chunked encoding.
var (
abiPending *fastly.PendingRequest
err error
streaming bool = true
errc = make(chan error, 3) // needs to be buffered to the max number of writes in copyBody()
)

return nil, err
switch underlyingReaderFrom(req.Body).(type) {
case nil, *bytes.Buffer, *bytes.Reader, *strings.Reader:
joeshaw marked this conversation as resolved.
Show resolved Hide resolved
streaming = false
}

var (
errc = make(chan error, 3)
bodyExists = req.Body != nil
_, bodyIsABI = req.Body.(*fastly.HTTPBody)
shouldCopy = bodyExists && !bodyIsABI
)
req.sent = true

if shouldCopy {
go func() {
_, copyErr := io.Copy(abiReqBody, req.Body)
errc <- maybeWrap(copyErr, "copy body")
errc <- maybeWrap(req.Body.Close(), "close user body")
if copyErr == nil {
// If there was an error copying the body, we *don't* want to Close() the abi req.
// This tells the wasm server that the body is incomplete so it knows not to
// terminate the sent chunked body with a valid final chunk.
// Thus, only Close if copyErr == nil.
errc <- maybeWrap(abiReqBody.Close(), "close request body")
}
}()
if streaming {
go req.copyBody(errc)
abiPending, err = req.abi.req.SendAsyncStreaming(req.abi.body, backend)
} else {
errc <- maybeWrap(abiReqBody.Close(), "close request body")
req.copyBody(errc)
abiPending, err = req.abi.req.SendAsync(req.abi.body, backend)
}
if err != nil {
if status, ok := fastly.IsFastlyError(err); ok && status == fastly.FastlyStatusInval {
return nil, ErrBackendNotFound
}

return nil, fmt.Errorf("begin send: %w", err)
}

pollInterval := safePollInterval(req.SendPollInterval)
Expand Down Expand Up @@ -371,6 +383,27 @@ func (req *Request) Send(ctx context.Context, backend string) (*Response, error)
return resp, nil
}

func (req *Request) copyBody(errc chan<- error) {
var (
bodyExists = req.Body != nil
_, bodyIsABI = req.Body.(*fastly.HTTPBody)
shouldCopy = bodyExists && !bodyIsABI
)

if shouldCopy {
_, copyErr := io.Copy(req.abi.body, req.Body)
errc <- maybeWrap(copyErr, "copy body")
errc <- maybeWrap(req.Body.Close(), "close user body")
if copyErr == nil {
errc <- maybeWrap(req.abi.body.Close(), "close request body")
} else {
errc <- maybeWrap(req.abi.body.Abandon(), "abandon request body")
}
} else {
errc <- maybeWrap(req.abi.body.Close(), "close request body")
}
}

func (req *Request) constructABIRequest() error {
if req.abi.req != nil || req.abi.body != nil {
return fmt.Errorf("request already constructed")
Expand Down Expand Up @@ -419,33 +452,6 @@ func (req *Request) constructABIRequest() error {
return nil
}

func (req *Request) sendABIRequestAsync(backend string) (*fastly.PendingRequest, *fastly.HTTPBody, error) {
if req.sent {
return nil, nil, fmt.Errorf("request already sent")
}

// When the request's ManualFramingMode is false, SendAsyncStreaming
// streams the request body to the backend using "Transfer-Encoding:
// chunked". This is the default behavior we want for requests with
// a body. For requests without a body, we want to avoid
// unnecessary chunked encoding, and have observed servers that
// error when seeing it in certain contexts. Calling SendAsync
// instead causes the entire body to be buffered (in this case, zero
// bytes) and "Content-Length: 0" to be sent instead.
sendFn := req.abi.req.SendAsyncStreaming
if req.Body == nil {
sendFn = req.abi.req.SendAsync
}

abiPending, err := sendFn(req.abi.body, backend)
if err != nil {
return nil, nil, fmt.Errorf("begin send: %w", err)
}

req.sent = true
return abiPending, req.abi.body, nil
}

// CacheOptions control caching behavior for outgoing requests.
type CacheOptions struct {
// Pass controls whether or not the request should be cached at all. By
Expand Down Expand Up @@ -511,6 +517,30 @@ type DecompressResponseOptions struct {
Gzip bool
}

// nopCloser is functionally the same as io.NopCloser, except that we
// can get to the underlying io.Reader.
type nopCloser struct {
io.Reader
}

func (nopCloser) Close() error { return nil }

func (n nopCloser) reader() io.Reader {
return n.Reader
}

func underlyingReaderFrom(rc io.ReadCloser) io.Reader {
if rc == nil {
return nil
}

if nc, ok := rc.(nopCloser); ok {
return nc.reader()
}

return rc.(io.Reader)
}

func abiBodyFrom(rc io.ReadCloser) (*fastly.HTTPBody, error) {
b, ok := rc.(*fastly.HTTPBody)
if ok {
Expand All @@ -534,7 +564,7 @@ func makeBodyFor(r io.Reader) io.ReadCloser {
return b
}

return io.NopCloser(r)
return nopCloser{r}
}

func safePollInterval(d time.Duration) time.Duration {
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/request_upstream/fastly.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ service_id = ""

[local_server.backends.example_backend]
url = "https://example.org/"

[local_server.backends.httpme]
url = "https://http-me.glitch.me"
84 changes: 84 additions & 0 deletions integration_tests/request_upstream/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@
package main

import (
"bytes"
"context"
"encoding/json"
"io"
"strconv"
"strings"
"testing"

"github.com/fastly/compute-sdk-go/fsthttp"
"github.com/fastly/compute-sdk-go/fsttest"
"github.com/fastly/compute-sdk-go/internal/abi/fastly"
)

func TestRequestUpstream(t *testing.T) {
Expand Down Expand Up @@ -76,3 +81,82 @@ func requestUpstream(useAppend bool, t *testing.T) {
t.Errorf("Body = %q, want %q", got, want)
}
}

const bodySize = 64 * 1024

func TestRequestUpstreamBody(t *testing.T) {
body := make([]byte, bodySize)
for i := range body {
body[i] = byte(i)
}

b, err := fastly.NewHTTPBody()
if err != nil {
t.Fatalf("NewHTTPBody: %v", err)
}
_, err = b.Write(body)
if err != nil {
t.Fatalf("Write: %v", err)
}
if err := b.Close(); err != nil {
t.Fatalf("Close: %v", err)
}

testcases := []struct {
name string
body io.Reader
size int
chunked bool
}{
{name: "nil", body: nil},
{name: "bytes.Reader", body: bytes.NewReader(body), size: bodySize},
{name: "bytes.Buffer", body: bytes.NewBuffer(body), size: bodySize},
{name: "strings.Reader", body: strings.NewReader(string(body)), size: bodySize},
{name: "io.NopCloser", body: io.NopCloser(bytes.NewReader(body)), chunked: true},
{name: "fastly.HTTPBody", body: b, chunked: true},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
requestUpstreamBody(t, tc.body, tc.size, tc.chunked)
})
}
}

func requestUpstreamBody(t *testing.T, body io.Reader, size int, chunked bool) {
req, err := fsthttp.NewRequest("POST", "https://http-me.glitch.me/?anything", body)
if err != nil {
t.Fatalf("NewRequest: %v", err)
}

req.Header.Set("Content-Type", "application/octet-stream")
req.CacheOptions.Pass = true

resp, err := req.Send(context.Background(), "httpme")
if err != nil {
t.Fatalf("Send: %v", err)
}
defer resp.Body.Close()

var respData struct {
Headers map[string]string `json:"headers"`
}

if err := json.NewDecoder(resp.Body).Decode(&respData); err != nil {
t.Fatalf("Decode: %v", err)
}

var teWant, clWant string
if chunked {
teWant = "chunked"
} else {
clWant = strconv.Itoa(size)
}

if got, want := respData.Headers["transfer-encoding"], teWant; got != want {
t.Errorf("Header[transfer-encoding] = %q, want %q", got, want)
}
if got, want := respData.Headers["content-length"], clWant; got != want {
t.Errorf("Header[content-length] = %q, want %q", got, want)
}
}