Skip to content

Commit

Permalink
url method forward (#4)
Browse files Browse the repository at this point in the history
* feat: support URL method forwarding & fix socket connection recycling

* fix: the path includes the root directory

* fix: cannot assign requested address
  • Loading branch information
AlanViast authored Oct 29, 2024
1 parent 605aa9c commit e3f9091
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 6 deletions.
49 changes: 44 additions & 5 deletions pkg/filters/proxies/providerproxy/providerproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ import (
"errors"
"net/http"
"net/url"
"strings"

"github.com/megaease/easegress/v2/pkg/context"
"github.com/megaease/easegress/v2/pkg/filters"
proxy "github.com/megaease/easegress/v2/pkg/filters/proxies/httpproxy"
"github.com/megaease/easegress/v2/pkg/filters/proxies/providerproxy/selector"
"github.com/megaease/easegress/v2/pkg/logger"
"github.com/megaease/easegress/v2/pkg/protocols/httpprot"
"github.com/megaease/easegress/v2/pkg/supervisor"
"github.com/megaease/easegress/v2/pkg/util/fasttime"
"github.com/megaease/easegress/v2/pkg/util/readers"
)

const (
Expand All @@ -53,6 +56,10 @@ type (
Interval string `yaml:"interval,omitempty" jsonschema:"format=duration"`
Lag uint64 `yaml:"lag,omitempty" jsonschema:"default=100"`
Policy string `yaml:"policy,omitempty" jsonschema:"default=roundRobin"`

MaxIdleConns int `json:"maxIdleConns,omitempty"`
MaxIdleConnsPerHost int `json:"maxIdleConnsPerHost,omitempty"`
MaxRedirection int `json:"maxRedirection,omitempty"`
}
)

Expand Down Expand Up @@ -100,6 +107,20 @@ func (m *ProviderProxy) ParsePayloadMethod(payload []byte) []string {
return methods
}

func (m *ProviderProxy) HandleRequest(req *httpprot.Request, providerUrl *url.URL) (forwardReq *http.Request, method []string, err error) {
if len(req.URL().Path) != 0 && req.URL().Path != "/" {
providerUrl = providerUrl.JoinPath(req.URL().Path)
pathMethod := strings.Replace(req.URL().Path, "//", "/", -1)
method = []string{pathMethod}
} else {
bodyBytes := req.RawPayload()
method = m.ParsePayloadMethod(bodyBytes)
}

forwardReq, err = http.NewRequestWithContext(req.Context(), req.Method(), providerUrl.String(), req.GetPayload())
return
}

func (m *ProviderProxy) Handle(ctx *context.Context) (result string) {
requestMetrics := RequestMetrics{}

Expand All @@ -113,10 +134,8 @@ func (m *ProviderProxy) Handle(ctx *context.Context) (result string) {
logger.Infof("select rpc provider: %s", reqUrl.String())
requestMetrics.Provider = reqUrl.String()
req := ctx.GetInputRequest().(*httpprot.Request)
forwardReq, method, err := m.HandleRequest(req, reqUrl)

requestMetrics.RpcMethod = m.ParsePayloadMethod(req.RawPayload())

forwardReq, err := http.NewRequestWithContext(req.Context(), req.Method(), reqUrl.String(), req.GetPayload())
if err != nil {
logger.Errorf(err.Error())
return err.Error()
Expand All @@ -127,25 +146,36 @@ func (m *ProviderProxy) Handle(ctx *context.Context) (result string) {
}

response, err := m.client.Do(forwardReq)

if err != nil {
logger.Errorf(err.Error())
return err.Error()
}

requestMetrics.RpcMethod = method
requestMetrics.Duration = fasttime.Since(startTime)
requestMetrics.StatusCode = response.StatusCode
defer m.collectMetrics(requestMetrics)

body := readers.NewCallbackReader(response.Body)
response.Body = body
outputResponse, err := httpprot.NewResponse(response)
outputResponse.Body = response.Body

if err != nil {
logger.Errorf(err.Error())
response.Body.Close()
return err.Error()
}

if err = outputResponse.FetchPayload(-1); err != nil {
logger.Errorf("%s: failed to fetch response payload: %v, please consider to set serverMaxBodySize of SimpleHTTPProxy to -1.", m.Name(), err)
response.Body.Close()
return err.Error()
}

if !outputResponse.IsStream() {
response.Body.Close()
}
ctx.SetResponse(context.DefaultNamespace, outputResponse)
return ""
}
Expand All @@ -159,6 +189,9 @@ var kind = &filters.Kind{
Urls: make([]string, 0),
Interval: "1s",
Policy: "roundRobin",

MaxIdleConns: 10240,
MaxIdleConnsPerHost: 1024,
}
},
CreateInstance: func(spec filters.Spec) filters.Filter {
Expand Down Expand Up @@ -197,7 +230,13 @@ func (m *ProviderProxy) Inherit(previousGeneration filters.Filter) {
}

func (m *ProviderProxy) reload() {
client := http.DefaultClient
clientSpec := &proxy.HTTPClientSpec{
MaxIdleConns: m.spec.MaxIdleConns,
MaxIdleConnsPerHost: m.spec.MaxIdleConnsPerHost,
MaxRedirection: &m.spec.MaxRedirection,
}

client := proxy.HTTPClient(nil, clientSpec, 0)
m.client = client

providerSelectorSpec := selector.ProviderSelectorSpec{
Expand Down
22 changes: 21 additions & 1 deletion pkg/filters/proxies/providerproxy/providerproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ urls:
- https://eth.llamarpc.com
`
proxy := newTestProviderProxy(yamlConfig, assert)
defer proxy.Close()

postData := "{\"method\":\"eth_blockNumber\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}"

Expand All @@ -106,8 +107,27 @@ urls:
response := proxy.Handle(ctx)
assert.Equal("", response)
assert.NotNil(ctx.GetResponse(context.DefaultNamespace).GetPayload())
}

proxy.Close()
func TestURLProviderProxy(t *testing.T) {
assert := assert.New(t)

const yamlConfig = `
name: tron-providerProxy
kind: ProviderProxy
urls:
- https://docs-demo.tron-mainnet.quiknode.pro
`
proxy := newTestProviderProxy(yamlConfig, assert)
defer proxy.Close()

postData := "{\"id_or_num\":\"66484052\",\"detail\":false}"
stdr, _ := http.NewRequest(http.MethodPost, "https://rpc.tron.network/wallet/getblock", strings.NewReader(postData))
stdr.Header.Set("Content-Type", "application/json")
ctx := getCtx(stdr)
response := proxy.Handle(ctx)
assert.Equal("", response)
assert.NotNil(ctx.GetResponse(context.DefaultNamespace).GetPayload())
}

func TestProviderProxy_ParsePayloadMethod(t *testing.T) {
Expand Down

0 comments on commit e3f9091

Please sign in to comment.