forked from miquella/caddy-awses
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathes_client_factory.go
85 lines (68 loc) · 1.99 KB
/
es_client_factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package awses
import (
"sync"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/elasticsearchservice"
)
type awsregion string
type sessionCache struct {
sess *session.Session
conf *aws.Config
}
// An Elasticsearch client factory with a cache that allows concurrent cached client sharing
type ElasticsearchClientFactory struct {
Role string
mutex sync.RWMutex
clients map[awsregion]sessionCache
}
func NewElasticsearchClientFactory(role string) *ElasticsearchClientFactory {
return &ElasticsearchClientFactory{
Role: role,
}
}
// Returns a new client (does not lock or use the cache)
func (f *ElasticsearchClientFactory) New(region string) sessionCache {
config := &aws.Config{
Region: ®ion,
}
sess := session.Must(session.NewSession(config))
if f.Role != "" {
config.Credentials = stscreds.NewCredentials(sess, f.Role)
}
return sessionCache{
sess: sess,
conf: config,
}
}
// Returns a cached client or instantiates a new client and caches it
func (f *ElasticsearchClientFactory) Get(region string) *elasticsearchservice.ElasticsearchService {
// read lock to check client cache
client, found := f.cached(region)
if found {
return elasticsearchservice.New(client.sess, client.conf)
}
// write lock to construct a new client (if necessary)
f.mutex.Lock()
defer f.mutex.Unlock()
if f.clients == nil {
f.clients = map[awsregion]sessionCache{}
}
client = f.New(region)
f.clients[awsregion(region)] = client
return elasticsearchservice.New(client.sess, client.conf)
}
func (f *ElasticsearchClientFactory) cached(region string) (sc sessionCache, found bool) {
f.mutex.RLock()
defer f.mutex.RUnlock()
if f.clients == nil {
return sessionCache{}, false
}
sc = f.clients[awsregion(region)]
if sc.conf != nil && sc.conf.Credentials != nil && sc.conf.Credentials.IsExpired() {
delete(f.clients, awsregion(region))
return sessionCache{}, false
}
return sc, true
}