diff --git a/server/embed/config.go b/server/embed/config.go index 2f4a34167423..80487bafb978 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -211,12 +211,12 @@ type Config struct { // streams that each client can open at a time. MaxConcurrentStreams uint32 `json:"max-concurrent-streams"` - ListenPeerUrls, ListenClientUrls []url.URL - AdvertisePeerUrls, AdvertiseClientUrls []url.URL - ClientTLSInfo transport.TLSInfo - ClientAutoTLS bool - PeerTLSInfo transport.TLSInfo - PeerAutoTLS bool + ListenPeerUrls, ListenClientUrls, ListenClientHttpUrls []url.URL + AdvertisePeerUrls, AdvertiseClientUrls []url.URL + ClientTLSInfo transport.TLSInfo + ClientAutoTLS bool + PeerTLSInfo transport.TLSInfo + PeerAutoTLS bool // SelfSignedCertValidity specifies the validity period of the client and peer certificates // that are automatically generated by etcd when you specify ClientAutoTLS and PeerAutoTLS, // the unit is year, and the default is 1 @@ -439,10 +439,11 @@ type configYAML struct { // configJSON has file options that are translated into Config options type configJSON struct { - ListenPeerUrls string `json:"listen-peer-urls"` - ListenClientUrls string `json:"listen-client-urls"` - AdvertisePeerUrls string `json:"initial-advertise-peer-urls"` - AdvertiseClientUrls string `json:"advertise-client-urls"` + ListenPeerUrls string `json:"listen-peer-urls"` + ListenClientUrls string `json:"listen-client-urls"` + ListenClientHttpUrls string `json:"listen-client-http-urls"` + AdvertisePeerUrls string `json:"initial-advertise-peer-urls"` + AdvertiseClientUrls string `json:"advertise-client-urls"` CORSJSON string `json:"cors"` HostWhitelistJSON string `json:"host-whitelist"` @@ -589,6 +590,15 @@ func (cfg *configYAML) configFromFile(path string) error { cfg.Config.ListenClientUrls = u } + if cfg.configJSON.ListenClientHttpUrls != "" { + u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenClientHttpUrls, ",")) + if err != nil { + fmt.Fprintf(os.Stderr, "unexpected error setting up listen-client-http-urls: %v\n", err) + os.Exit(1) + } + cfg.Config.ListenClientHttpUrls = u + } + if cfg.configJSON.AdvertisePeerUrls != "" { u, err := types.NewURLs(strings.Split(cfg.configJSON.AdvertisePeerUrls, ",")) if err != nil { @@ -688,6 +698,12 @@ func (cfg *Config) Validate() error { if err := checkBindURLs(cfg.ListenClientUrls); err != nil { return err } + if err := checkBindURLs(cfg.ListenClientHttpUrls); err != nil { + return err + } + if len(cfg.ListenClientHttpUrls) == 0 { + cfg.logger.Warn("Running http and grpc server on single port. This is not recommended for production.") + } if err := checkBindURLs(cfg.ListenMetricsUrls); err != nil { return err } @@ -957,9 +973,12 @@ func (cfg *Config) ClientSelfCert() (err error) { cfg.logger.Warn("ignoring client auto TLS since certs given") return nil } - chosts := make([]string, len(cfg.ListenClientUrls)) - for i, u := range cfg.ListenClientUrls { - chosts[i] = u.Host + chosts := make([]string, 0, len(cfg.ListenClientUrls)+len(cfg.ListenClientHttpUrls)) + for _, u := range cfg.ListenClientUrls { + chosts = append(chosts, u.Host) + } + for _, u := range cfg.ListenClientHttpUrls { + chosts = append(chosts, u.Host) } cfg.ClientTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "client"), chosts, cfg.SelfSignedCertValidity) if err != nil { @@ -1094,6 +1113,14 @@ func (cfg *Config) getListenClientUrls() (ss []string) { return ss } +func (cfg *Config) getListenClientHttpUrls() (ss []string) { + ss = make([]string, len(cfg.ListenClientHttpUrls)) + for i := range cfg.ListenClientHttpUrls { + ss[i] = cfg.ListenClientHttpUrls[i].String() + } + return ss +} + func (cfg *Config) getMetricsURLs() (ss []string) { ss = make([]string, len(cfg.ListenMetricsUrls)) for i := range cfg.ListenMetricsUrls { diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 209903a7eb7b..08d6244f5ce2 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -459,11 +459,16 @@ func (e *Etcd) Close() { func stopServers(ctx context.Context, ss *servers) { // first, close the http.Server - ss.http.Shutdown(ctx) - // do not grpc.Server.GracefulStop with TLS enabled etcd server + if ss.http != nil { + ss.http.Shutdown(ctx) + } + if ss.grpc == nil { + return + } + // do not grpc.Server.GracefulStop when grpc runs under http server // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 // and https://github.com/etcd-io/etcd/issues/8916 - if ss.secure { + if ss.secure && ss.http != nil { ss.grpc.Stop() return } @@ -621,7 +626,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro } sctxs = make(map[string]*serveCtx) - for _, u := range cfg.ListenClientUrls { + for _, u := range append(cfg.ListenClientUrls, cfg.ListenClientHttpUrls...) { if u.Scheme == "http" || u.Scheme == "unix" { if !cfg.ClientTLSInfo.Empty() { cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String())) @@ -648,6 +653,24 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro sctx.addr = addr sctx.network = network } + for _, u := range cfg.ListenClientHttpUrls { + addr, secure, network := resolveUrl(u) + + sctx := sctxs[addr] + if sctx == nil { + sctx = newServeCtx(cfg.logger) + sctxs[addr] = sctx + } else if !sctx.httpOnly { + return nil, fmt.Errorf("cannot bind both --client-listen-urls and --client-listen-http-urls on the same url %s", u.String()) + } + sctx.secure = sctx.secure || secure + sctx.insecure = sctx.insecure || !secure + sctx.scheme = u.Scheme + sctx.addr = addr + sctx.network = network + sctx.httpOnly = true + } + for _, sctx := range sctxs { if sctx.l, err = transport.NewListenerWithOpts(sctx.addr, sctx.scheme, transport.WithSocketOpts(&cfg.SocketOpts), @@ -670,7 +693,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro } defer func(addr string) { - if err == nil { + if err == nil || sctx.l == nil { return } sctx.l.Close() @@ -738,20 +761,27 @@ func (e *Etcd) serveClients() (err error) { })) } + splitHttp := false + for _, sctx := range e.sctxs { + if sctx.httpOnly { + splitHttp = true + } + } + // start client servers in each goroutine for _, sctx := range e.sctxs { go func(s *serveCtx) { - e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(), gopts...)) + e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHttp), splitHttp, gopts...)) }(sctx) } return nil } -func (e *Etcd) grpcGatewayDial() (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) { +func (e *Etcd) grpcGatewayDial(splitHttp bool) (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) { if !e.cfg.EnableGRPCGateway { return nil } - sctx := e.pickGrpcGatewayServeContext() + sctx := e.pickGrpcGatewayServeContext(splitHttp) addr := sctx.addr if network := sctx.network; network == "unix" { // explicitly define unix network for gRPC socket support @@ -784,9 +814,11 @@ func (e *Etcd) grpcGatewayDial() (grpcDial func(ctx context.Context) (*grpc.Clie } } -func (e *Etcd) pickGrpcGatewayServeContext() *serveCtx { +func (e *Etcd) pickGrpcGatewayServeContext(splitHttp bool) *serveCtx { for _, sctx := range e.sctxs { - return sctx + if !splitHttp || !sctx.httpOnly { + return sctx + } } panic("Expect at least one context able to serve grpc") } diff --git a/server/embed/serve.go b/server/embed/serve.go index 044a88d776dd..af9f9fdacbb7 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -57,6 +57,7 @@ type serveCtx struct { network string secure bool insecure bool + httpOnly bool ctx context.Context cancel context.CancelFunc @@ -95,6 +96,7 @@ func (sctx *serveCtx) serve( handler http.Handler, errHandler func(error), grpcDialForRestGatewayBackends func(ctx context.Context) (*grpc.ClientConn, error), + splitHttp bool, gopts ...grpc.ServerOption) (err error) { logger := defaultLog.New(io.Discard, "etcdhttp", 0) @@ -110,6 +112,12 @@ func (sctx *serveCtx) serve( sctx.lg.Info("ready to serve client requests") m := cmux.New(sctx.l) + var server func() error + onlyGRPC := splitHttp && !sctx.httpOnly + onlyHttp := splitHttp && sctx.httpOnly + grpcEnabled := !onlyHttp + httpEnabled := !onlyGRPC + v3c := v3client.New(s) servElection := v3election.NewElectionServer(v3c) servLock := v3lock.NewLockServer(v3c) @@ -125,103 +133,137 @@ func (sctx *serveCtx) serve( return err } } + var traffic string + switch { + case onlyGRPC: + traffic = "grpc" + case onlyHttp: + traffic = "http" + default: + traffic = "grpc+http" + } if sctx.insecure { - gs := v3rpc.Server(s, nil, nil, gopts...) - v3electionpb.RegisterElectionServer(gs, servElection) - v3lockpb.RegisterLockServer(gs, servLock) - if sctx.serviceRegister != nil { - sctx.serviceRegister(gs) + var gs *grpc.Server + var srv *http.Server + if httpEnabled { + httpmux := sctx.createMux(gwmux, handler) + srv = &http.Server{ + Handler: createAccessController(sctx.lg, s, httpmux), + ErrorLog: logger, // do not log user error + } + if err := configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure http server failed", zap.Error(err)) + return err + } } - - defer func(gs *grpc.Server) { - if err != nil { - sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err)) - gs.Stop() - sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err)) + if grpcEnabled { + gs = v3rpc.Server(s, nil, nil, gopts...) + v3electionpb.RegisterElectionServer(gs, servElection) + v3lockpb.RegisterLockServer(gs, servLock) + if sctx.serviceRegister != nil { + sctx.serviceRegister(gs) } - }(gs) - - grpcl := m.Match(cmux.HTTP2()) - go func(gs *grpc.Server, grpcLis net.Listener) { - errHandler(gs.Serve(grpcLis)) - }(gs, grpcl) - - httpmux := sctx.createMux(gwmux, handler) - - srvhttp := &http.Server{ - Handler: createAccessController(sctx.lg, s, httpmux), - ErrorLog: logger, // do not log user error + defer func(gs *grpc.Server) { + if err != nil { + sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err)) + gs.Stop() + sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err)) + } + }(gs) } - if err := configureHttpServer(srvhttp, s.Cfg); err != nil { - sctx.lg.Error("Configure http server failed", zap.Error(err)) - return err + if onlyGRPC { + server = func() error { + return gs.Serve(sctx.l) + } + } else { + server = m.Serve + + httpl := m.Match(cmux.HTTP1()) + go func(srvhttp *http.Server, tlsLis net.Listener) { + errHandler(srvhttp.Serve(tlsLis)) + }(srv, httpl) + + if grpcEnabled { + grpcl := m.Match(cmux.HTTP2()) + go func(gs *grpc.Server, l net.Listener) { + errHandler(gs.Serve(l)) + }(gs, grpcl) + } } - httpl := m.Match(cmux.HTTP1()) - - go func(srvhttp *http.Server, httpLis net.Listener) { - errHandler(srvhttp.Serve(httpLis)) - }(srvhttp, httpl) - sctx.serversC <- &servers{grpc: gs, http: srvhttp} + sctx.serversC <- &servers{grpc: gs, http: srv} sctx.lg.Info( "serving client traffic insecurely; this is strongly discouraged!", + zap.String("traffic", traffic), zap.String("address", sctx.l.Addr().String()), ) } if sctx.secure { + var gs *grpc.Server + var srv *http.Server + tlscfg, tlsErr := tlsinfo.ServerConfig() if tlsErr != nil { return tlsErr } - gs := v3rpc.Server(s, tlscfg, nil, gopts...) - v3electionpb.RegisterElectionServer(gs, servElection) - v3lockpb.RegisterLockServer(gs, servLock) - if sctx.serviceRegister != nil { - sctx.serviceRegister(gs) + if grpcEnabled { + gs = v3rpc.Server(s, tlscfg, nil, gopts...) + v3electionpb.RegisterElectionServer(gs, servElection) + v3lockpb.RegisterLockServer(gs, servLock) + if sctx.serviceRegister != nil { + sctx.serviceRegister(gs) + } + defer func(gs *grpc.Server) { + if err != nil { + sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err)) + gs.Stop() + sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err)) + } + }(gs) } - - defer func(gs *grpc.Server) { - if err != nil { - sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err)) - gs.Stop() - sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err)) + if httpEnabled { + if grpcEnabled { + handler = grpcHandlerFunc(gs, handler) } - }(gs) + httpmux := sctx.createMux(gwmux, handler) - handler = grpcHandlerFunc(gs, handler) - var tlsl net.Listener - tlsl, err = transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) - if err != nil { - return err + srv = &http.Server{ + Handler: createAccessController(sctx.lg, s, httpmux), + TLSConfig: tlscfg, + ErrorLog: logger, // do not log user error + } + if err := configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure https server failed", zap.Error(err)) + return err + } } - // TODO: add debug flag; enable logging when debug flag is set - httpmux := sctx.createMux(gwmux, handler) - srv := &http.Server{ - Handler: createAccessController(sctx.lg, s, httpmux), - TLSConfig: tlscfg, - ErrorLog: logger, // do not log user error - } - if err := configureHttpServer(srv, s.Cfg); err != nil { - sctx.lg.Error("Configure https server failed", zap.Error(err)) - return err - } + if onlyGRPC { + server = func() error { return gs.Serve(sctx.l) } + } else { + server = m.Serve - go func(srvhttp *http.Server, tlsLis net.Listener) { - errHandler(srvhttp.Serve(tlsLis)) - }(srv, tlsl) + tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) + if err != nil { + return err + } + go func(srvhttp *http.Server, tlsl net.Listener) { + errHandler(srvhttp.Serve(tlsl)) + }(srv, tlsl) + } sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} sctx.lg.Info( "serving client traffic securely", + zap.String("traffic", traffic), zap.String("address", sctx.l.Addr().String()), ) } - return m.Serve() + return server() } func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error { diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 954a08727d1f..18adce5f765b 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -115,7 +115,11 @@ func newConfig() *config { ) fs.Var( flags.NewUniqueURLsWithExceptions(embed.DefaultListenClientURLs, ""), "listen-client-urls", - "List of URLs to listen on for client traffic.", + "List of URLs to listen on for client grpc traffic and http as long as --listen-client-http-urls is not specified.", + ) + fs.Var( + flags.NewUniqueURLsWithExceptions("", ""), "listen-client-http-urls", + "List of URLs to listen on for http only client traffic. Enabling this flag removes http services from --listen-client-urls.", ) fs.Var( flags.NewUniqueURLsWithExceptions("", ""), @@ -386,6 +390,7 @@ func (cfg *config) configFromCmdLine() error { cfg.ec.ListenPeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-peer-urls") cfg.ec.AdvertisePeerUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "initial-advertise-peer-urls") cfg.ec.ListenClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-urls") + cfg.ec.ListenClientHttpUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-client-http-urls") cfg.ec.AdvertiseClientUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "advertise-client-urls") cfg.ec.ListenMetricsUrls = flags.UniqueURLsFromFlag(cfg.cf.flagSet, "listen-metrics-urls") diff --git a/server/etcdmain/config_test.go b/server/etcdmain/config_test.go index 7a9c96b6a0bf..30239374edae 100644 --- a/server/etcdmain/config_test.go +++ b/server/etcdmain/config_test.go @@ -37,6 +37,7 @@ func TestConfigParsingMemberFlags(t *testing.T) { "-experimental-snapshot-catchup-entries=1000", "-listen-peer-urls=http://localhost:8000,https://localhost:8001", "-listen-client-urls=http://localhost:7000,https://localhost:7001", + "-listen-client-http-urls=http://localhost:7002,https://localhost:7003", // it should be set if -listen-client-urls is set "-advertise-client-urls=http://localhost:7000,https://localhost:7001", } @@ -60,6 +61,7 @@ func TestConfigFileMemberFields(t *testing.T) { SnapshotCatchUpEntries uint64 `json:"experimental-snapshot-catch-up-entries"` ListenPeerUrls string `json:"listen-peer-urls"` ListenClientUrls string `json:"listen-client-urls"` + ListenClientHttpUrls string `json:"listen-client-http-urls"` AdvertiseClientUrls string `json:"advertise-client-urls"` }{ "testdir", @@ -70,6 +72,7 @@ func TestConfigFileMemberFields(t *testing.T) { 1000, "http://localhost:8000,https://localhost:8001", "http://localhost:7000,https://localhost:7001", + "http://localhost:7002,https://localhost:7003", "http://localhost:7000,https://localhost:7001", } @@ -398,6 +401,7 @@ func validateMemberFlags(t *testing.T, cfg *config) { Dir: "testdir", ListenPeerUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, ListenClientUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, + ListenClientHttpUrls: []url.URL{{Scheme: "http", Host: "localhost:7002"}, {Scheme: "https", Host: "localhost:7003"}}, MaxSnapFiles: 10, MaxWalFiles: 10, Name: "testname", @@ -429,6 +433,9 @@ func validateMemberFlags(t *testing.T, cfg *config) { if !reflect.DeepEqual(cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) { t.Errorf("listen-client-urls = %v, want %v", cfg.ec.ListenClientUrls, wcfg.ListenClientUrls) } + if !reflect.DeepEqual(cfg.ec.ListenClientHttpUrls, wcfg.ListenClientHttpUrls) { + t.Errorf("listen-client-http-urls = %v, want %v", cfg.ec.ListenClientHttpUrls, wcfg.ListenClientHttpUrls) + } } func validateClusteringFlags(t *testing.T, cfg *config) { diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index 67b67add8162..a9affdee05c6 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -65,7 +65,9 @@ Member: --listen-peer-urls 'http://localhost:2380' List of URLs to listen on for peer traffic. --listen-client-urls 'http://localhost:2379' - List of URLs to listen on for client traffic. + List of URLs to listen on for client grpc traffic and http as long as --listen-client-http-urls is not specified. + --listen-client-http-urls '' + List of URLs to listen on for http only client traffic. Enabling this flag removes http services from --listen-client-urls. --max-snapshots '` + strconv.Itoa(embed.DefaultMaxSnapshots) + `' Maximum number of snapshot files to retain (0 is unlimited). --max-wals '` + strconv.Itoa(embed.DefaultMaxWALs) + `' diff --git a/tests/e2e/utils.go b/tests/e2e/utils.go index fcc6f78bb3b6..26a4c6086ead 100644 --- a/tests/e2e/utils.go +++ b/tests/e2e/utils.go @@ -77,15 +77,17 @@ func tlsInfo(t testing.TB, cfg e2e.ClientConfig) (*transport.TLSInfo, error) { } } -func fillEtcdWithData(ctx context.Context, c *clientv3.Client, keyCount int, valueSize uint) error { +func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error { g := errgroup.Group{} concurrency := 10 + keyCount := 100 keysPerRoutine := keyCount / concurrency + valueSize := dbSize / keyCount for i := 0; i < concurrency; i++ { i := i g.Go(func() error { for j := 0; j < keysPerRoutine; j++ { - _, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(valueSize)) + _, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(uint(valueSize))) if err != nil { return err } diff --git a/tests/e2e/watch_delay_test.go b/tests/e2e/watch_delay_test.go index 1e1bd911e38c..1eff89522951 100644 --- a/tests/e2e/watch_delay_test.go +++ b/tests/e2e/watch_delay_test.go @@ -35,29 +35,48 @@ import ( const ( watchResponsePeriod = 100 * time.Millisecond watchTestDuration = 5 * time.Second - // TODO: Reduce maxWatchDelay when https://github.com/etcd-io/etcd/issues/15402 is addressed. - maxWatchDelay = 2 * time.Second - // Configure enough read load to cause starvation from https://github.com/etcd-io/etcd/issues/15402. - // Tweaked to pass on GitHub runner. For local runs please increase parameters. - // TODO: Increase when https://github.com/etcd-io/etcd/issues/15402 is fully addressed. - numberOfPreexistingKeys = 100 - sizeOfPreexistingValues = 5000 - readLoadConcurrency = 10 + readLoadConcurrency = 10 ) type testCase struct { - name string - config e2e.EtcdProcessClusterConfig + name string + config e2e.EtcdProcessClusterConfig + maxWatchDelay time.Duration + dbSizeBytes int } +const ( + Kilo = 1000 + Mega = 1000 * Kilo +) + +// 10 MB is not a bottleneck of grpc server, but filling up etcd with data. +// Keeping it lower so tests don't take too long. +// If we implement reuse of db we could increase the dbSize. var tcs = []testCase{ { - name: "NoTLS", - config: e2e.EtcdProcessClusterConfig{ClusterSize: 1}, + name: "NoTLS", + config: e2e.EtcdProcessClusterConfig{ClusterSize: 1}, + maxWatchDelay: 100 * time.Millisecond, + dbSizeBytes: 5 * Mega, + }, + { + name: "TLS", + config: e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: e2e.ClientTLS}}, + maxWatchDelay: 2 * time.Second, + dbSizeBytes: 500 * Kilo, + }, + { + name: "SeparateHttpNoTLS", + config: e2e.EtcdProcessClusterConfig{ClusterSize: 1, ClientHttpSeparate: true}, + maxWatchDelay: 100 * time.Millisecond, + dbSizeBytes: 5 * Mega, }, { - name: "ClientTLS", - config: e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: e2e.ClientTLS}}, + name: "SeparateHttpTLS", + config: e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: e2e.ClientTLS}, ClientHttpSeparate: true}, + maxWatchDelay: 100 * time.Millisecond, + dbSizeBytes: 5 * Mega, }, } @@ -71,13 +90,13 @@ func TestWatchDelayForPeriodicProgressNotification(t *testing.T) { require.NoError(t, err) defer clus.Close() c := newClient(t, clus.EndpointsV3(), tc.config.Client) - require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes)) ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) defer cancel() g := errgroup.Group{} continuouslyExecuteGetAll(ctx, t, &g, c) - validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify())) + validateWatchDelay(t, c.Watch(ctx, "fake-key", clientv3.WithProgressNotify()), tc.maxWatchDelay) require.NoError(t, g.Wait()) }) } @@ -91,7 +110,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) { require.NoError(t, err) defer clus.Close() c := newClient(t, clus.EndpointsV3(), tc.config.Client) - require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes)) ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) defer cancel() @@ -110,7 +129,7 @@ func TestWatchDelayForManualProgressNotification(t *testing.T) { time.Sleep(watchResponsePeriod) } }) - validateWatchDelay(t, c.Watch(ctx, "fake-key")) + validateWatchDelay(t, c.Watch(ctx, "fake-key"), tc.maxWatchDelay) require.NoError(t, g.Wait()) }) } @@ -124,7 +143,7 @@ func TestWatchDelayForEvent(t *testing.T) { require.NoError(t, err) defer clus.Close() c := newClient(t, clus.EndpointsV3(), tc.config.Client) - require.NoError(t, fillEtcdWithData(context.Background(), c, numberOfPreexistingKeys, sizeOfPreexistingValues)) + require.NoError(t, fillEtcdWithData(context.Background(), c, tc.dbSizeBytes)) ctx, cancel := context.WithTimeout(context.Background(), watchTestDuration) defer cancel() @@ -144,13 +163,13 @@ func TestWatchDelayForEvent(t *testing.T) { } }) continuouslyExecuteGetAll(ctx, t, &g, c) - validateWatchDelay(t, c.Watch(ctx, "key")) + validateWatchDelay(t, c.Watch(ctx, "key"), tc.maxWatchDelay) require.NoError(t, g.Wait()) }) } } -func validateWatchDelay(t *testing.T, watch clientv3.WatchChan) { +func validateWatchDelay(t *testing.T, watch clientv3.WatchChan, maxWatchDelay time.Duration) { start := time.Now() var maxDelay time.Duration for range watch { @@ -181,7 +200,7 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr for i := 0; i < readLoadConcurrency; i++ { g.Go(func() error { for { - _, err := c.Get(ctx, "", clientv3.WithPrefix()) + resp, err := c.Get(ctx, "", clientv3.WithPrefix()) if err != nil { if strings.Contains(err.Error(), "context deadline exceeded") { return nil @@ -189,8 +208,12 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr return err } } + respSize := 0 + for _, kv := range resp.Kvs { + respSize += kv.Size() + } mux.Lock() - size += numberOfPreexistingKeys * sizeOfPreexistingValues + size += respSize mux.Unlock() } }) diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index ef08e00fa007..1db101cfd84f 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -151,10 +151,11 @@ type EtcdProcessClusterConfig struct { SnapshotCount int // default is 10000 SnapshotCatchUpEntries int // default is 5000 - Client ClientConfig - IsPeerTLS bool - IsPeerAutoTLS bool - CN bool + Client ClientConfig + ClientHttpSeparate bool + IsPeerTLS bool + IsPeerAutoTLS bool + CN bool CipherSuites []string @@ -463,6 +464,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in peerPort := port + 1 metricsPort := port + 2 peer2Port := port + 3 + clientHttpPort := port + 4 curlHost := fmt.Sprintf("localhost:%d", clientPort) switch cfg.Client.ConnectionType { @@ -511,6 +513,10 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in "--data-dir", dataDirPath, "--snapshot-count", fmt.Sprintf("%d", cfg.SnapshotCount), } + if cfg.ClientHttpSeparate { + clientHttpUrl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", clientHttpPort)} + args = append(args, "--listen-client-http-urls", clientHttpUrl.String()) + } if cfg.ForceNewCluster { args = append(args, "--force-new-cluster")