diff --git a/memcache/memcache.go b/memcache/memcache.go index 7b5442d7..b736e1af 100644 --- a/memcache/memcache.go +++ b/memcache/memcache.go @@ -24,8 +24,11 @@ import ( "fmt" "io" "io/ioutil" + "math" "net" + "gopkg.in/fatih/pool.v2" + "strconv" "strings" "sync" @@ -68,8 +71,9 @@ var ( const DefaultTimeout = 100 * time.Millisecond const ( - buffered = 8 // arbitrary buffered channel size, for readability - maxIdleConnsPerAddr = 2 // TODO(bradfitz): make this configurable? + buffered = 8 // arbitrary buffered channel size, for readability + maxIdleConnsPerAddr = 2 // TODO(bradfitz): make this configurable? + maxRetries = 10 // maximum number of times to attempt to reconnect ) // resumableError returns true if err is only a protocol-level cache error. @@ -123,7 +127,7 @@ func New(server ...string) *Client { // NewFromSelector returns a new Client using the provided ServerSelector. func NewFromSelector(ss ServerSelector) *Client { - return &Client{selector: ss} + return &Client{selector: ss, pools: make(map[string]pool.Pool)} } // Client is a memcache client. @@ -135,8 +139,8 @@ type Client struct { selector ServerSelector - lk sync.Mutex - freeconn map[string][]*conn + lk sync.Mutex + pools map[string]pool.Pool } // Item is an item to be got or stored in a memcached server. @@ -162,7 +166,7 @@ type Item struct { // conn is a connection to a server. type conn struct { - nc net.Conn + nc *pool.PoolConn rw *bufio.ReadWriter addr net.Addr c *Client @@ -189,33 +193,25 @@ func (cn *conn) condRelease(err *error) { } } -func (c *Client) putFreeConn(addr net.Addr, cn *conn) { - c.lk.Lock() - defer c.lk.Unlock() - if c.freeconn == nil { - c.freeconn = make(map[string][]*conn) - } - freelist := c.freeconn[addr.String()] - if len(freelist) >= maxIdleConnsPerAddr { - cn.nc.Close() - return +func (c *Client) withAddrPool(addr net.Addr, fn func(pool.Pool)) { + pl := c.pools[addr.String()] + if pl == nil { + c.lk.Lock() + // See if pool is still nil + if c.pools[addr.String()] == nil { + pl, _ = pool.NewChannelPool(1, 10, func() (net.Conn, error) { + return c.dial(addr) + }) + c.pools[addr.String()] = pl + } + c.lk.Unlock() + pl = c.pools[addr.String()] } - c.freeconn[addr.String()] = append(freelist, cn) + fn(pl) } -func (c *Client) getFreeConn(addr net.Addr) (cn *conn, ok bool) { - c.lk.Lock() - defer c.lk.Unlock() - if c.freeconn == nil { - return nil, false - } - freelist, ok := c.freeconn[addr.String()] - if !ok || len(freelist) == 0 { - return nil, false - } - cn = freelist[len(freelist)-1] - c.freeconn[addr.String()] = freelist[:len(freelist)-1] - return cn, true +func (c *Client) putFreeConn(addr net.Addr, cn *conn) { + cn.nc.Close() } func (c *Client) netTimeout() time.Duration { @@ -254,18 +250,16 @@ func (c *Client) dial(addr net.Addr) (net.Conn, error) { return nil, err } -func (c *Client) getConn(addr net.Addr) (*conn, error) { - cn, ok := c.getFreeConn(addr) - if ok { - cn.extendDeadline() - return cn, nil - } - nc, err := c.dial(addr) +func (c *Client) getConn(addr net.Addr) (cn *conn, err error) { + var nc net.Conn + c.withAddrPool(addr, func(pl pool.Pool) { + nc, err = pl.Get() + }) if err != nil { - return nil, err + return } cn = &conn{ - nc: nc, + nc: nc.(*pool.PoolConn), addr: addr, rw: bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)), c: c, @@ -333,7 +327,25 @@ func (c *Client) withAddrRw(addr net.Addr, fn func(*bufio.ReadWriter) error) (er return err } defer cn.condRelease(&err) - return fn(cn.rw) + // Exponential backoff (based on Ethernet) + retries := 0 + err = fn(cn.rw) + if err == io.EOF { // Bad connection + cn.nc.Close() + for err != nil && retries < maxRetries { + retries++ + backoffCoefficient := int(math.Pow(float64(2), float64(retries))) - 1 + sleepFor := time.Nanosecond * time.Duration(50000*backoffCoefficient) + time.Sleep(sleepFor) + cn, err = c.getConn(addr) + if err != nil { + continue + } + err = fn(cn.rw) + } + } + + return } func (c *Client) withKeyRw(key string, fn func(*bufio.ReadWriter) error) error {