Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KeepAliveParameters to agent client #4157

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions flyteplugins/go/tasks/plugins/webapi/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ var (
Endpoint: "dns:///flyteagent.flyte.svc.cluster.local:80",
Insecure: true,
DefaultTimeout: config.Duration{Duration: 10 * time.Second},
KeepAliveParameters: &KeepAliveParameters{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think we keep existing defaults defined by https://pkg.go.dev/google.golang.org/grpc/keepalive#ClientParameters unchanged, and only give a value for Time? Also 10s seems a bit aggressive. In a large setup, the overhead of DNS resolution is not negligible, plus 10s might be even smaller than DNS cache timeout so in many cases the refresh won't give any new IPs. In our backend production setup, we default this to 5 minutes, just for reference.

Time: config.Duration{Duration: 10 * time.Second},
Timeout: config.Duration{Duration: 5 * time.Second},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Timeout: config.Duration{Duration: 5 * time.Second},
Timeout: config.Duration{Duration: 20 * time.Second},

PermitWithoutStream: true,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason we set this to true? It defaults to false I think.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
PermitWithoutStream: true,
PermitWithoutStream: false,

},
},
}

Expand All @@ -67,6 +72,22 @@ type Config struct {
AgentForTaskTypes map[string]string `json:"agentForTaskTypes" pflag:"-,"`
}

// KeepAliveParameters defines keepalive parameters on the client-side. For more details, check https://pkg.go.dev/google.golang.org/grpc/keepalive#ClientParameters
type KeepAliveParameters struct {
// After a duration of this time if the client doesn't see any activity it
// pings the server to see if the transport is still alive.
// If set below 10s, a minimum value of 10s will be used instead.
Time config.Duration `json:"time"`
// After having pinged for keepalive check, the client waits for a duration
// of Timeout and if no activity is seen even after that the connection is
// closed.
Timeout config.Duration `json:"timeout"`
// If true, client sends keepalive pings even with no active RPCs. If false,
// when there are no active RPCs, Time and Timeout will be ignored and no
// keepalive pings will be sent.
PermitWithoutStream bool `json:"permitWithoutStream"`
}

type Agent struct {
// Endpoint points to an agent gRPC endpoint
Endpoint string `json:"endpoint"`
Expand All @@ -82,6 +103,9 @@ type Agent struct {

// DefaultTimeout gives the default RPC timeout if a more specific one is not defined in Timeouts; if neither DefaultTimeout nor Timeouts is defined for an operation, RPC timeout will not be enforced
DefaultTimeout config.Duration `json:"defaultTimeout"`

// KeepAliveParameters defines keepalive parameters for the gRPC client
KeepAliveParameters *KeepAliveParameters `json:"keepAliveParameters"`
}

func GetConfig() *Config {
Expand Down
5 changes: 5 additions & 0 deletions flyteplugins/go/tasks/plugins/webapi/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func TestGetAndSetConfig(t *testing.T) {
},
}
cfg.DefaultAgent.DefaultTimeout = config.Duration{Duration: 10 * time.Second}
cfg.DefaultAgent.KeepAliveParameters = &KeepAliveParameters{
Time: config.Duration{Duration: 10 * time.Second},
Timeout: config.Duration{Duration: 5 * time.Second},
PermitWithoutStream: true,
}
cfg.Agents = map[string]*Agent{
"agent_1": {
Insecure: cfg.DefaultAgent.Insecure,
Expand Down
11 changes: 10 additions & 1 deletion flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"

pluginErrors "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery"
Expand Down Expand Up @@ -247,6 +247,15 @@ func getClientFunc(ctx context.Context, agent *Agent, connectionCache map[*Agent
opts = append(opts, grpc.WithDefaultServiceConfig(agent.DefaultServiceConfig))
}

if agent.KeepAliveParameters != nil {

opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: agent.KeepAliveParameters.Time.Duration,
Timeout: agent.KeepAliveParameters.Timeout.Duration,
PermitWithoutStream: agent.KeepAliveParameters.PermitWithoutStream,
}))
}

var err error
conn, err = grpc.Dial(agent.Endpoint, opts...)
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion flyteplugins/go/tasks/plugins/webapi/agent/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ func TestPlugin(t *testing.T) {
})

t.Run("test getClientFunc more config", func(t *testing.T) {
client, err := getClientFunc(context.Background(), &Agent{Endpoint: "localhost:80", Insecure: true, DefaultServiceConfig: "{\"loadBalancingConfig\": [{\"round_robin\":{}}]}"}, map[*Agent]*grpc.ClientConn{})
client, err := getClientFunc(context.Background(), &Agent{
Endpoint: "localhost:80",
Insecure: true,
DefaultServiceConfig: "{\"loadBalancingConfig\": [{\"round_robin\":{}}]}",
KeepAliveParameters: &KeepAliveParameters{Time: config.Duration{Duration: 10 * time.Second}}},
map[*Agent]*grpc.ClientConn{})
assert.NoError(t, err)
assert.NotNil(t, client)
})
Expand Down