Skip to content

Commit

Permalink
Create a single connection pool per host (#359)
Browse files Browse the repository at this point in the history
When history service comes up it creates a gocql client for execution manager and then uses the same client for all shards for making persistence calls. Also created config knobs to configure number of connections for both events and execution manager clients.

Also include shard tag for shard manager calls.

fixes #307
  • Loading branch information
samarabbas authored Sep 29, 2017
1 parent affdf19 commit 81e51c6
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 116 deletions.
6 changes: 3 additions & 3 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,9 @@ const (
var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
// common scope Names
Common: {
PersistenceCreateShardScope: {operation: "CreateShard"},
PersistenceGetShardScope: {operation: "GetShard"},
PersistenceUpdateShardScope: {operation: "UpdateShard"},
PersistenceCreateShardScope: {operation: "CreateShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetShardScope: {operation: "GetShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceUpdateShardScope: {operation: "UpdateShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCreateWorkflowExecutionScope: {operation: "CreateWorkflowExecution"},
PersistenceGetWorkflowExecutionScope: {operation: "GetWorkflowExecution"},
PersistenceUpdateWorkflowExecutionScope: {operation: "UpdateWorkflowExecution"},
Expand Down
5 changes: 5 additions & 0 deletions common/mocks/ExecutionManagerFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ func (_m *ExecutionManagerFactory) CreateExecutionManager(shardID int) (persiste

return r0, r1
}

// Close is mock implementation for Close of ExecutionManagerFactory
func (_m *ExecutionManagerFactory) Close() {
_m.Called()
}
4 changes: 3 additions & 1 deletion common/persistence/cassandraHistoryPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@ type (
)

// NewCassandraHistoryPersistence is used to create an instance of HistoryManager implementation
func NewCassandraHistoryPersistence(hosts string, port int, user, password, dc string, keyspace string, logger bark.Logger) (HistoryManager,
func NewCassandraHistoryPersistence(hosts string, port int, user, password, dc string, keyspace string,
numConns int, logger bark.Logger) (HistoryManager,
error) {
cluster := common.NewCassandraCluster(hosts, port, user, password, dc)
cluster.Keyspace = keyspace
cluster.ProtoVersion = cassandraProtoVersion
cluster.Consistency = gocql.LocalQuorum
cluster.SerialConsistency = gocql.LocalSerial
cluster.Timeout = defaultSessionTimeout
cluster.NumConns = numConns

session, err := cluster.CreateSession()
if err != nil {
Expand Down
16 changes: 2 additions & 14 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,20 +531,8 @@ func NewCassandraShardPersistence(hosts string, port int, user, password, dc str
}

// NewCassandraWorkflowExecutionPersistence is used to create an instance of workflowExecutionManager implementation
func NewCassandraWorkflowExecutionPersistence(hosts string, port int, user, password, dc string, keyspace string,
shardID int, logger bark.Logger) (ExecutionManager, error) {
cluster := common.NewCassandraCluster(hosts, port, user, password, dc)
cluster.Keyspace = keyspace
cluster.ProtoVersion = cassandraProtoVersion
cluster.Consistency = gocql.LocalQuorum
cluster.SerialConsistency = gocql.LocalSerial
cluster.Timeout = defaultSessionTimeout

session, err := cluster.CreateSession()
if err != nil {
return nil, err
}

func NewCassandraWorkflowExecutionPersistence(shardID int, session *gocql.Session,
logger bark.Logger) (ExecutionManager, error) {
return &cassandraPersistence{shardID: shardID, session: session, lowConslevel: gocql.One, logger: logger}, nil
}

Expand Down
81 changes: 81 additions & 0 deletions common/persistence/cassandraPersistenceClientFactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package persistence

import (
"github.com/gocql/gocql"
"github.com/uber-common/bark"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/metrics"
)

type (
cassandraPersistenceClientFactory struct {
session *gocql.Session
metricsClient metrics.Client
logger bark.Logger
}
)

// NewCassandraPersistenceClientFactory is used to create an instance of ExecutionManagerFactory implementation
func NewCassandraPersistenceClientFactory(hosts string, port int, user, password, dc string, keyspace string,
numConns int, logger bark.Logger, mClient metrics.Client) (ExecutionManagerFactory, error) {
cluster := common.NewCassandraCluster(hosts, port, user, password, dc)
cluster.Keyspace = keyspace
cluster.ProtoVersion = cassandraProtoVersion
cluster.Consistency = gocql.LocalQuorum
cluster.SerialConsistency = gocql.LocalSerial
cluster.Timeout = defaultSessionTimeout
cluster.NumConns = numConns

session, err := cluster.CreateSession()
if err != nil {
return nil, err
}

return &cassandraPersistenceClientFactory{session: session, metricsClient: mClient, logger: logger}, nil
}

// CreateExecutionManager implements ExecutionManagerFactory interface
func (f *cassandraPersistenceClientFactory) CreateExecutionManager(shardID int) (ExecutionManager, error) {
mgr, err := NewCassandraWorkflowExecutionPersistence(shardID, f.session, f.logger)

if err != nil {
return nil, err
}

if f.metricsClient == nil {
return mgr, nil
}

tags := map[string]string{
metrics.ShardTagName: metrics.AllShardsTagValue,
}
return NewWorkflowExecutionPersistenceClient(
mgr, f.metricsClient.Tagged(tags)), nil
}

// Close releases the underlying resources held by this object
func (f *cassandraPersistenceClientFactory) Close() {
if f.session != nil {
f.session.Close()
}
}
1 change: 1 addition & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ type (

// ExecutionManagerFactory creates an instance of ExecutionManager for a given shard
ExecutionManagerFactory interface {
Closeable
CreateExecutionManager(shardID int) (ExecutionManager, error)
}

Expand Down
23 changes: 6 additions & 17 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,6 @@ type (
}
)

func newTestExecutionMgrFactory(options TestBaseOptions, cassandra CassandraTestCluster,
logger bark.Logger) ExecutionManagerFactory {
return &testExecutionMgrFactory{
options: options,
cassandra: cassandra,
logger: logger,
}
}

func (f *testExecutionMgrFactory) CreateExecutionManager(shardID int) (ExecutionManager, error) {
return NewCassandraWorkflowExecutionPersistence(f.options.ClusterHost, f.options.ClusterPort, f.options.ClusterUser,
f.options.ClusterPassword, f.options.Datacenter, f.cassandra.keyspace,
shardID, f.logger)
}

func (g *testTransferTaskIDGenerator) GetNextTransferTaskID() (int64, error) {
return atomic.AddInt64(&g.seqNum, 1), nil
}
Expand All @@ -128,7 +113,11 @@ func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions) {
if err != nil {
log.Fatal(err)
}
s.ExecutionMgrFactory = newTestExecutionMgrFactory(options, s.CassandraTestCluster, log)
s.ExecutionMgrFactory, err = NewCassandraPersistenceClientFactory(options.ClusterHost, options.ClusterPort,
options.ClusterUser, options.ClusterPassword, options.Datacenter, s.CassandraTestCluster.keyspace, 2, log, nil)
if err != nil {
log.Fatal(err)
}
// Create an ExecutionManager for the shard for use in unit tests
s.WorkflowMgr, err = s.ExecutionMgrFactory.CreateExecutionManager(shardID)
if err != nil {
Expand All @@ -142,7 +131,7 @@ func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions) {
}

s.HistoryMgr, err = NewCassandraHistoryPersistence(options.ClusterHost, options.ClusterPort, options.ClusterUser,
options.ClusterPassword, options.Datacenter, s.CassandraTestCluster.keyspace, log)
options.ClusterPassword, options.Datacenter, s.CassandraTestCluster.keyspace, 2, log)
if err != nil {
log.Fatal(err)
}
Expand Down
5 changes: 4 additions & 1 deletion host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ func (c *cadenceImpl) startHistory(logger bark.Logger, shardMgr persistence.Shar
params.RingpopFactory = newRingpopFactory(common.FrontendServiceName, rpHosts)
params.CassandraConfig.NumHistoryShards = c.numberOfHistoryShards
service := service.New(params)
handler := history.NewHandler(service, history.NewConfig(c.numberOfHistoryShards), shardMgr, metadataMgr,
historyConfig := history.NewConfig(c.numberOfHistoryShards)
historyConfig.HistoryMgrNumConns = c.numberOfHistoryShards
historyConfig.ExecutionMgrNumConns = c.numberOfHistoryShards
handler := history.NewHandler(service, historyConfig, shardMgr, metadataMgr,
visibilityMgr, historyMgr, executionMgrFactory)
handler.Start()
c.historyHandlers = append(c.historyHandlers, handler)
Expand Down
7 changes: 6 additions & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@ type Config struct {
DefaultVisibilityMaxPageSize int32
DefaultHistoryMaxPageSize int32
RPS int

// Persistence settings
HistoryMgrNumConns int
}

// NewConfig returns new service config with default values
func NewConfig() *Config {
return &Config{
DefaultVisibilityMaxPageSize: 1000,
DefaultHistoryMaxPageSize: 1000,
RPS: 1200, // This limit is based on experimental runs.
RPS: 1200, // This limit is based on experimental runs.
HistoryMgrNumConns: 10,
}
}

Expand Down Expand Up @@ -99,6 +103,7 @@ func (s *Service) Start() {
p.CassandraConfig.Password,
p.CassandraConfig.Datacenter,
p.CassandraConfig.Keyspace,
s.config.HistoryMgrNumConns,
p.Logger)

if err != nil {
Expand Down
70 changes: 0 additions & 70 deletions service/history/execMgrFactory.go

This file was deleted.

1 change: 1 addition & 0 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (h *Handler) Stop() {
h.controller.Stop()
h.shardManager.Close()
h.historyMgr.Close()
h.executionMgrFactory.Close()
h.metadataMgr.Close()
h.visibilityMgr.Close()
h.Service.Stop()
Expand Down
22 changes: 20 additions & 2 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ type Config struct {
TransferProcessorUpdateAckInterval time.Duration
TransferProcessorForceUpdateInterval time.Duration
TransferTaskWorkerCount int

// Persistence settings
ExecutionMgrNumConns int
HistoryMgrNumConns int
}

// NewConfig returns new service config with default values
Expand All @@ -88,6 +92,8 @@ func NewConfig(numberOfShards int) *Config {
TransferProcessorUpdateAckInterval: 10 * time.Second,
TransferProcessorForceUpdateInterval: 10 * time.Minute,
TransferTaskWorkerCount: 10,
ExecutionMgrNumConns: 100,
HistoryMgrNumConns: 100,
}
}

Expand Down Expand Up @@ -181,14 +187,26 @@ func (s *Service) Start() {
p.CassandraConfig.Password,
p.CassandraConfig.Datacenter,
p.CassandraConfig.Keyspace,
s.config.HistoryMgrNumConns,
p.Logger)

if err != nil {
log.Fatalf("Creating Cassandra history manager persistence failed: %v", err)
}

history = persistence.NewHistoryPersistenceClient(history, base.GetMetricsClient())
execMgrFactory := NewExecutionManagerFactory(&p.CassandraConfig, p.Logger, base.GetMetricsClient())

execMgrFactory, err := persistence.NewCassandraPersistenceClientFactory(p.CassandraConfig.Hosts,
p.CassandraConfig.Port,
p.CassandraConfig.User,
p.CassandraConfig.Password,
p.CassandraConfig.Datacenter,
p.CassandraConfig.Keyspace,
s.config.ExecutionMgrNumConns,
p.Logger,
base.GetMetricsClient())
if err != nil {
log.Fatalf("Creating Cassandra execution manager persistence factory failed: %v", err)
}

handler := NewHandler(base,
s.config,
Expand Down
7 changes: 0 additions & 7 deletions service/history/shardController.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,13 +391,6 @@ func (i *historyShardsItem) stopEngine() {
i.engine = nil
logging.LogShardEngineStoppedEvent(i.logger, i.host.Identity(), i.shardID)
}

// Shutting down executionMgr will close all connections
// to cassandra for this engine. So, make sure to
// close executionMgr only after stopping the engine
if i.executionMgr != nil {
i.executionMgr.Close()
}
}

func isShardOwnershiptLostError(err error) bool {
Expand Down

0 comments on commit 81e51c6

Please sign in to comment.