From 81e51c6b3f9f9dccbe07564667c4ed2ca84b85d0 Mon Sep 17 00:00:00 2001 From: Samar Abbas - Uber Date: Fri, 29 Sep 2017 16:10:31 -0700 Subject: [PATCH] Create a single connection pool per host (#359) When history service comes up it creates a gocql client for execution manager and then uses the same client for all shards for making persistence calls. Also created config knobs to configure number of connections for both events and execution manager clients. Also include shard tag for shard manager calls. fixes #307 --- common/metrics/defs.go | 6 +- common/mocks/ExecutionManagerFactory.go | 5 ++ .../cassandraHistoryPersistence.go | 4 +- common/persistence/cassandraPersistence.go | 16 +--- .../cassandraPersistenceClientFactory.go | 81 +++++++++++++++++++ common/persistence/dataInterfaces.go | 1 + common/persistence/persistenceTestBase.go | 23 ++---- host/onebox.go | 5 +- service/frontend/service.go | 7 +- service/history/execMgrFactory.go | 70 ---------------- service/history/handler.go | 1 + service/history/service.go | 22 ++++- service/history/shardController.go | 7 -- 13 files changed, 132 insertions(+), 116 deletions(-) create mode 100644 common/persistence/cassandraPersistenceClientFactory.go delete mode 100644 service/history/execMgrFactory.go diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 84500c84bff..993309539a0 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -339,9 +339,9 @@ const ( var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ // common scope Names Common: { - PersistenceCreateShardScope: {operation: "CreateShard"}, - PersistenceGetShardScope: {operation: "GetShard"}, - PersistenceUpdateShardScope: {operation: "UpdateShard"}, + PersistenceCreateShardScope: {operation: "CreateShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}}, + PersistenceGetShardScope: {operation: "GetShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}}, + PersistenceUpdateShardScope: {operation: "UpdateShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}}, PersistenceCreateWorkflowExecutionScope: {operation: "CreateWorkflowExecution"}, PersistenceGetWorkflowExecutionScope: {operation: "GetWorkflowExecution"}, PersistenceUpdateWorkflowExecutionScope: {operation: "UpdateWorkflowExecution"}, diff --git a/common/mocks/ExecutionManagerFactory.go b/common/mocks/ExecutionManagerFactory.go index 11e5f38b292..4e4ae19059b 100644 --- a/common/mocks/ExecutionManagerFactory.go +++ b/common/mocks/ExecutionManagerFactory.go @@ -50,3 +50,8 @@ func (_m *ExecutionManagerFactory) CreateExecutionManager(shardID int) (persiste return r0, r1 } + +// Close is mock implementation for Close of ExecutionManagerFactory +func (_m *ExecutionManagerFactory) Close() { + _m.Called() +} diff --git a/common/persistence/cassandraHistoryPersistence.go b/common/persistence/cassandraHistoryPersistence.go index 9f250f27285..dafac4bc927 100644 --- a/common/persistence/cassandraHistoryPersistence.go +++ b/common/persistence/cassandraHistoryPersistence.go @@ -60,7 +60,8 @@ type ( ) // NewCassandraHistoryPersistence is used to create an instance of HistoryManager implementation -func NewCassandraHistoryPersistence(hosts string, port int, user, password, dc string, keyspace string, logger bark.Logger) (HistoryManager, +func NewCassandraHistoryPersistence(hosts string, port int, user, password, dc string, keyspace string, + numConns int, logger bark.Logger) (HistoryManager, error) { cluster := common.NewCassandraCluster(hosts, port, user, password, dc) cluster.Keyspace = keyspace @@ -68,6 +69,7 @@ func NewCassandraHistoryPersistence(hosts string, port int, user, password, dc s cluster.Consistency = gocql.LocalQuorum cluster.SerialConsistency = gocql.LocalSerial cluster.Timeout = defaultSessionTimeout + cluster.NumConns = numConns session, err := cluster.CreateSession() if err != nil { diff --git a/common/persistence/cassandraPersistence.go b/common/persistence/cassandraPersistence.go index 0d54ce17bd4..4c648e800f5 100644 --- a/common/persistence/cassandraPersistence.go +++ b/common/persistence/cassandraPersistence.go @@ -531,20 +531,8 @@ func NewCassandraShardPersistence(hosts string, port int, user, password, dc str } // NewCassandraWorkflowExecutionPersistence is used to create an instance of workflowExecutionManager implementation -func NewCassandraWorkflowExecutionPersistence(hosts string, port int, user, password, dc string, keyspace string, - shardID int, logger bark.Logger) (ExecutionManager, error) { - cluster := common.NewCassandraCluster(hosts, port, user, password, dc) - cluster.Keyspace = keyspace - cluster.ProtoVersion = cassandraProtoVersion - cluster.Consistency = gocql.LocalQuorum - cluster.SerialConsistency = gocql.LocalSerial - cluster.Timeout = defaultSessionTimeout - - session, err := cluster.CreateSession() - if err != nil { - return nil, err - } - +func NewCassandraWorkflowExecutionPersistence(shardID int, session *gocql.Session, + logger bark.Logger) (ExecutionManager, error) { return &cassandraPersistence{shardID: shardID, session: session, lowConslevel: gocql.One, logger: logger}, nil } diff --git a/common/persistence/cassandraPersistenceClientFactory.go b/common/persistence/cassandraPersistenceClientFactory.go new file mode 100644 index 00000000000..cc2012e1a99 --- /dev/null +++ b/common/persistence/cassandraPersistenceClientFactory.go @@ -0,0 +1,81 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package persistence + +import ( + "github.com/gocql/gocql" + "github.com/uber-common/bark" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/metrics" +) + +type ( + cassandraPersistenceClientFactory struct { + session *gocql.Session + metricsClient metrics.Client + logger bark.Logger + } +) + +// NewCassandraPersistenceClientFactory is used to create an instance of ExecutionManagerFactory implementation +func NewCassandraPersistenceClientFactory(hosts string, port int, user, password, dc string, keyspace string, + numConns int, logger bark.Logger, mClient metrics.Client) (ExecutionManagerFactory, error) { + cluster := common.NewCassandraCluster(hosts, port, user, password, dc) + cluster.Keyspace = keyspace + cluster.ProtoVersion = cassandraProtoVersion + cluster.Consistency = gocql.LocalQuorum + cluster.SerialConsistency = gocql.LocalSerial + cluster.Timeout = defaultSessionTimeout + cluster.NumConns = numConns + + session, err := cluster.CreateSession() + if err != nil { + return nil, err + } + + return &cassandraPersistenceClientFactory{session: session, metricsClient: mClient, logger: logger}, nil +} + +// CreateExecutionManager implements ExecutionManagerFactory interface +func (f *cassandraPersistenceClientFactory) CreateExecutionManager(shardID int) (ExecutionManager, error) { + mgr, err := NewCassandraWorkflowExecutionPersistence(shardID, f.session, f.logger) + + if err != nil { + return nil, err + } + + if f.metricsClient == nil { + return mgr, nil + } + + tags := map[string]string{ + metrics.ShardTagName: metrics.AllShardsTagValue, + } + return NewWorkflowExecutionPersistenceClient( + mgr, f.metricsClient.Tagged(tags)), nil +} + +// Close releases the underlying resources held by this object +func (f *cassandraPersistenceClientFactory) Close() { + if f.session != nil { + f.session.Close() + } +} diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index c584977dd62..4b5f6442d65 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -650,6 +650,7 @@ type ( // ExecutionManagerFactory creates an instance of ExecutionManager for a given shard ExecutionManagerFactory interface { + Closeable CreateExecutionManager(shardID int) (ExecutionManager, error) } diff --git a/common/persistence/persistenceTestBase.go b/common/persistence/persistenceTestBase.go index 0da09f26ea5..9f30c55f69a 100644 --- a/common/persistence/persistenceTestBase.go +++ b/common/persistence/persistenceTestBase.go @@ -97,21 +97,6 @@ type ( } ) -func newTestExecutionMgrFactory(options TestBaseOptions, cassandra CassandraTestCluster, - logger bark.Logger) ExecutionManagerFactory { - return &testExecutionMgrFactory{ - options: options, - cassandra: cassandra, - logger: logger, - } -} - -func (f *testExecutionMgrFactory) CreateExecutionManager(shardID int) (ExecutionManager, error) { - return NewCassandraWorkflowExecutionPersistence(f.options.ClusterHost, f.options.ClusterPort, f.options.ClusterUser, - f.options.ClusterPassword, f.options.Datacenter, f.cassandra.keyspace, - shardID, f.logger) -} - func (g *testTransferTaskIDGenerator) GetNextTransferTaskID() (int64, error) { return atomic.AddInt64(&g.seqNum, 1), nil } @@ -128,7 +113,11 @@ func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions) { if err != nil { log.Fatal(err) } - s.ExecutionMgrFactory = newTestExecutionMgrFactory(options, s.CassandraTestCluster, log) + s.ExecutionMgrFactory, err = NewCassandraPersistenceClientFactory(options.ClusterHost, options.ClusterPort, + options.ClusterUser, options.ClusterPassword, options.Datacenter, s.CassandraTestCluster.keyspace, 2, log, nil) + if err != nil { + log.Fatal(err) + } // Create an ExecutionManager for the shard for use in unit tests s.WorkflowMgr, err = s.ExecutionMgrFactory.CreateExecutionManager(shardID) if err != nil { @@ -142,7 +131,7 @@ func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions) { } s.HistoryMgr, err = NewCassandraHistoryPersistence(options.ClusterHost, options.ClusterPort, options.ClusterUser, - options.ClusterPassword, options.Datacenter, s.CassandraTestCluster.keyspace, log) + options.ClusterPassword, options.Datacenter, s.CassandraTestCluster.keyspace, 2, log) if err != nil { log.Fatal(err) } diff --git a/host/onebox.go b/host/onebox.go index 45f8cd2d37b..187939bf6a7 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -188,7 +188,10 @@ func (c *cadenceImpl) startHistory(logger bark.Logger, shardMgr persistence.Shar params.RingpopFactory = newRingpopFactory(common.FrontendServiceName, rpHosts) params.CassandraConfig.NumHistoryShards = c.numberOfHistoryShards service := service.New(params) - handler := history.NewHandler(service, history.NewConfig(c.numberOfHistoryShards), shardMgr, metadataMgr, + historyConfig := history.NewConfig(c.numberOfHistoryShards) + historyConfig.HistoryMgrNumConns = c.numberOfHistoryShards + historyConfig.ExecutionMgrNumConns = c.numberOfHistoryShards + handler := history.NewHandler(service, historyConfig, shardMgr, metadataMgr, visibilityMgr, historyMgr, executionMgrFactory) handler.Start() c.historyHandlers = append(c.historyHandlers, handler) diff --git a/service/frontend/service.go b/service/frontend/service.go index 261b08e09e7..724b7b9d7d3 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -31,6 +31,9 @@ type Config struct { DefaultVisibilityMaxPageSize int32 DefaultHistoryMaxPageSize int32 RPS int + + // Persistence settings + HistoryMgrNumConns int } // NewConfig returns new service config with default values @@ -38,7 +41,8 @@ func NewConfig() *Config { return &Config{ DefaultVisibilityMaxPageSize: 1000, DefaultHistoryMaxPageSize: 1000, - RPS: 1200, // This limit is based on experimental runs. + RPS: 1200, // This limit is based on experimental runs. + HistoryMgrNumConns: 10, } } @@ -99,6 +103,7 @@ func (s *Service) Start() { p.CassandraConfig.Password, p.CassandraConfig.Datacenter, p.CassandraConfig.Keyspace, + s.config.HistoryMgrNumConns, p.Logger) if err != nil { diff --git a/service/history/execMgrFactory.go b/service/history/execMgrFactory.go deleted file mode 100644 index 187c3c17fe7..00000000000 --- a/service/history/execMgrFactory.go +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package history - -import ( - "github.com/uber-common/bark" - "github.com/uber/cadence/common/metrics" - "github.com/uber/cadence/common/persistence" - "github.com/uber/cadence/common/service/config" -) - -// executionMgrFactory is an implementation of -// persistence.ExecutionManagerFactory interface -type executionMgrFactory struct { - config *config.Cassandra - logger bark.Logger - metricsClient metrics.Client -} - -// NewExecutionManagerFactory builds and returns a factory object -func NewExecutionManagerFactory(config *config.Cassandra, - logger bark.Logger, mClient metrics.Client) persistence.ExecutionManagerFactory { - - return &executionMgrFactory{ - config: config, - logger: logger, - metricsClient: mClient, - } -} - -// CreateExecutionManager implements ExecutionManagerFactory interface -func (factory *executionMgrFactory) CreateExecutionManager(shardID int) (persistence.ExecutionManager, error) { - - mgr, err := persistence.NewCassandraWorkflowExecutionPersistence( - factory.config.Hosts, - factory.config.Port, - factory.config.User, - factory.config.Password, - factory.config.Datacenter, - factory.config.Keyspace, - shardID, - factory.logger) - - if err != nil { - return nil, err - } - tags := map[string]string{ - metrics.ShardTagName: metrics.AllShardsTagValue, - } - return persistence.NewWorkflowExecutionPersistenceClient( - mgr, factory.metricsClient.Tagged(tags)), nil -} diff --git a/service/history/handler.go b/service/history/handler.go index 1699ec60f13..6399e1d3056 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -120,6 +120,7 @@ func (h *Handler) Stop() { h.controller.Stop() h.shardManager.Close() h.historyMgr.Close() + h.executionMgrFactory.Close() h.metadataMgr.Close() h.visibilityMgr.Close() h.Service.Stop() diff --git a/service/history/service.go b/service/history/service.go index 3efc37a99f2..de01d9b4a50 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -62,6 +62,10 @@ type Config struct { TransferProcessorUpdateAckInterval time.Duration TransferProcessorForceUpdateInterval time.Duration TransferTaskWorkerCount int + + // Persistence settings + ExecutionMgrNumConns int + HistoryMgrNumConns int } // NewConfig returns new service config with default values @@ -88,6 +92,8 @@ func NewConfig(numberOfShards int) *Config { TransferProcessorUpdateAckInterval: 10 * time.Second, TransferProcessorForceUpdateInterval: 10 * time.Minute, TransferTaskWorkerCount: 10, + ExecutionMgrNumConns: 100, + HistoryMgrNumConns: 100, } } @@ -181,14 +187,26 @@ func (s *Service) Start() { p.CassandraConfig.Password, p.CassandraConfig.Datacenter, p.CassandraConfig.Keyspace, + s.config.HistoryMgrNumConns, p.Logger) if err != nil { log.Fatalf("Creating Cassandra history manager persistence failed: %v", err) } - history = persistence.NewHistoryPersistenceClient(history, base.GetMetricsClient()) - execMgrFactory := NewExecutionManagerFactory(&p.CassandraConfig, p.Logger, base.GetMetricsClient()) + + execMgrFactory, err := persistence.NewCassandraPersistenceClientFactory(p.CassandraConfig.Hosts, + p.CassandraConfig.Port, + p.CassandraConfig.User, + p.CassandraConfig.Password, + p.CassandraConfig.Datacenter, + p.CassandraConfig.Keyspace, + s.config.ExecutionMgrNumConns, + p.Logger, + base.GetMetricsClient()) + if err != nil { + log.Fatalf("Creating Cassandra execution manager persistence factory failed: %v", err) + } handler := NewHandler(base, s.config, diff --git a/service/history/shardController.go b/service/history/shardController.go index b902551b892..a8b81589762 100644 --- a/service/history/shardController.go +++ b/service/history/shardController.go @@ -391,13 +391,6 @@ func (i *historyShardsItem) stopEngine() { i.engine = nil logging.LogShardEngineStoppedEvent(i.logger, i.host.Identity(), i.shardID) } - - // Shutting down executionMgr will close all connections - // to cassandra for this engine. So, make sure to - // close executionMgr only after stopping the engine - if i.executionMgr != nil { - i.executionMgr.Close() - } } func isShardOwnershiptLostError(err error) bool {