Skip to content

Commit

Permalink
Integration with Docker Discovery
Browse files Browse the repository at this point in the history
* integrated hostdiscovery package with the new Docker Discovery
* Integrated hostdiscovery package with libnetwork core
* removed libnetwork_discovery tag
* Introduced driver apis for discovery events
* moved overlay driver to make use of the discovery events
* Using Docker Discovery service.
* Changed integration-tests to make use of the new discovery

Signed-off-by: Madhu Venugopal <[email protected]>
  • Loading branch information
mavenugo committed Oct 1, 2015
1 parent 768e582 commit 8d03e80
Show file tree
Hide file tree
Showing 25 changed files with 477 additions and 260 deletions.
63 changes: 63 additions & 0 deletions cmd/dnet/dnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ package main
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/codegangsta/cli"
"github.com/docker/docker/pkg/discovery"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/pkg/reexec"

Expand All @@ -37,6 +41,8 @@ const (
DefaultUnixSocket = "/var/run/dnet.sock"
cfgFileEnv = "LIBNETWORK_CFG"
defaultCfgFile = "/etc/default/libnetwork.toml"
defaultHeartbeat = time.Duration(10) * time.Second
ttlFactor = 2
)

var epConn *dnetConnection
Expand Down Expand Up @@ -91,9 +97,66 @@ func processConfig(cfg *config.Config) []config.Option {
if strings.TrimSpace(cfg.GlobalStore.Client.Address) != "" {
options = append(options, config.OptionKVProviderURL(cfg.GlobalStore.Client.Address))
}
dOptions, err := startDiscovery(&cfg.Cluster)
if err != nil {
logrus.Infof("Skipping discovery : %s", err.Error())
} else {
options = append(options, dOptions...)
}

return options
}

func startDiscovery(cfg *config.ClusterCfg) ([]config.Option, error) {
if cfg == nil {
return nil, fmt.Errorf("discovery requires a valid configuration")
}

hb := time.Duration(cfg.Heartbeat) * time.Second
if hb == 0 {
hb = defaultHeartbeat
}
logrus.Infof("discovery : %s $s", cfg.Discovery, hb.String())
d, err := discovery.New(cfg.Discovery, hb, ttlFactor*hb)
if err != nil {
return nil, err
}

if cfg.Address == "" {
iface, err := net.InterfaceByName("eth0")
if err != nil {
return nil, err
}
addrs, err := iface.Addrs()
if err != nil || len(addrs) == 0 {
return nil, err
}
ip, _, _ := net.ParseCIDR(addrs[0].String())
cfg.Address = ip.String()
}

if ip := net.ParseIP(cfg.Address); ip == nil {
return nil, errors.New("address config should be either ipv4 or ipv6 address")
}

if err := d.Register(cfg.Address + ":0"); err != nil {
return nil, err
}

options := []config.Option{config.OptionDiscoveryWatcher(d), config.OptionDiscoveryAddress(cfg.Address)}
go func() {
for {
select {
case <-time.After(hb):
if err := d.Register(cfg.Address + ":0"); err != nil {
logrus.Warn(err)
}
}
}
}()
return options, nil
}

func dnetApp(stdout, stderr io.Writer) error {
app := cli.NewApp()

Expand Down
3 changes: 2 additions & 1 deletion cmd/dnet/libnetwork.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ title = "LibNetwork Configuration file"
[daemon]
debug = false
[cluster]
discovery = "token://22aa23948f4f6b31230687689636959e"
discovery = "consul://localhost:8500"
Address = "1.1.1.1"
Heartbeat = 20
[datastore]
embedded = false
[datastore.client]
Expand Down
18 changes: 17 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/BurntSushi/toml"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/discovery"
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/netlabel"
)
Expand All @@ -27,8 +28,9 @@ type DaemonCfg struct {

// ClusterCfg represents cluster configuration
type ClusterCfg struct {
Discovery string
Watcher discovery.Watcher
Address string
Discovery string
Heartbeat uint64
}

Expand Down Expand Up @@ -108,6 +110,20 @@ func OptionKVProviderURL(url string) Option {
}
}

// OptionDiscoveryWatcher function returns an option setter for discovery watcher
func OptionDiscoveryWatcher(watcher discovery.Watcher) Option {
return func(c *Config) {
c.Cluster.Watcher = watcher
}
}

// OptionDiscoveryAddress function returns an option setter for self discovery address
func OptionDiscoveryAddress(address string) Option {
return func(c *Config) {
c.Cluster.Address = address
}
}

// ProcessOptions processes options and stores it in config
func (c *Config) ProcessOptions(options ...Option) {
for _, opt := range options {
Expand Down
71 changes: 64 additions & 7 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ import (
"container/heap"
"fmt"
"net"
"strings"
"sync"

log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/discovery"
"github.com/docker/docker/pkg/plugins"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/libnetwork/config"
Expand Down Expand Up @@ -126,6 +128,7 @@ type controller struct {
sandboxes sandboxTable
cfg *config.Config
globalStore, localStore datastore.DataStore
discovery hostdiscovery.HostDiscovery
extKeyListener net.Listener
sync.Mutex
}
Expand Down Expand Up @@ -157,7 +160,7 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
// But it cannot fail creating the Controller
log.Debugf("Failed to Initialize Datastore due to %v. Operating in non-clustered mode", err)
}
if err := c.initDiscovery(); err != nil {
if err := c.initDiscovery(cfg.Cluster.Watcher); err != nil {
// Failing to initalize discovery is a bad situation to be in.
// But it cannot fail creating the Controller
log.Debugf("Failed to Initialize Discovery : %v", err)
Expand Down Expand Up @@ -185,19 +188,57 @@ func (c *controller) validateHostDiscoveryConfig() bool {
return true
}

func (c *controller) initDiscovery() error {
func (c *controller) initDiscovery(watcher discovery.Watcher) error {
if c.cfg == nil {
return fmt.Errorf("discovery initialization requires a valid configuration")
}

hostDiscovery := hostdiscovery.NewHostDiscovery()
return hostDiscovery.StartDiscovery(&c.cfg.Cluster, c.hostJoinCallback, c.hostLeaveCallback)
c.discovery = hostdiscovery.NewHostDiscovery(watcher)
return c.discovery.Watch(c.hostJoinCallback, c.hostLeaveCallback)
}

func (c *controller) hostJoinCallback(hosts []net.IP) {
func (c *controller) hostJoinCallback(nodes []net.IP) {
c.processNodeDiscovery(nodes, true)
}

func (c *controller) hostLeaveCallback(hosts []net.IP) {
func (c *controller) hostLeaveCallback(nodes []net.IP) {
c.processNodeDiscovery(nodes, false)
}

func (c *controller) processNodeDiscovery(nodes []net.IP, add bool) {
c.Lock()
drivers := []*driverData{}
for _, d := range c.drivers {
drivers = append(drivers, d)
}
c.Unlock()

for _, d := range drivers {
c.pushNodeDiscovery(d, nodes, add)
}
}

func (c *controller) pushNodeDiscovery(d *driverData, nodes []net.IP, add bool) {
var self net.IP
if c.cfg != nil {
addr := strings.Split(c.cfg.Cluster.Address, ":")
self = net.ParseIP(addr[0])
}
if d == nil || d.capability.DataScope != datastore.GlobalScope || nodes == nil {
return
}
for _, node := range nodes {
nodeData := driverapi.NodeDiscoveryData{Address: node.String(), Self: node.Equal(self)}
var err error
if add {
err = d.driver.DiscoverNew(driverapi.NodeDiscovery, nodeData)
} else {
err = d.driver.DiscoverDelete(driverapi.NodeDiscovery, nodeData)
}
if err != nil {
log.Debugf("discovery notification error : %v", err)
}
}
}

func (c *controller) Config() config.Config {
Expand All @@ -219,9 +260,15 @@ func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver,
c.Unlock()
return driverapi.ErrActiveRegistration(networkType)
}
c.drivers[networkType] = &driverData{driver, capability}
dData := &driverData{driver, capability}
c.drivers[networkType] = dData
hd := c.discovery
c.Unlock()

if hd != nil {
c.pushNodeDiscovery(dData, hd.Fetch(), true)
}

return nil
}

Expand Down Expand Up @@ -487,6 +534,16 @@ func (c *controller) loadDriver(networkType string) (*driverData, error) {
return dd, nil
}

func (c *controller) getDriver(networkType string) (*driverData, error) {
c.Lock()
defer c.Unlock()
dd, ok := c.drivers[networkType]
if !ok {
return nil, types.NotFoundErrorf("driver %s not found", networkType)
}
return dd, nil
}

func (c *controller) Stop() {
if c.localStore != nil {
c.localStore.KVStore().Close()
Expand Down
62 changes: 62 additions & 0 deletions docs/remote.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,65 @@ If the proxy is asked to remove an endpoint from a sandbox, the remote process s
where `NetworkID` and `EndpointID` have meanings as above. The success response is empty:

{}

### DiscoverNew Notification

libnetwork listens to inbuilt docker discovery notifications and passes it along to the interested drivers.

When the proxy receives a DiscoverNew notification, the remote process shall receive a POST to the URL `/NetworkDriver.DiscoverNew` of the form

{
"DiscoveryType": int,
"DiscoveryData": {
...
}
}

`DiscoveryType` represents the discovery type. Each Discovery Type is represented by a number.
`DiscoveryData` carries discovery data the structure of which is determined by the DiscoveryType

The response indicating success is empty:

`{}`

* Node Discovery

Node Discovery is represented by a `DiscoveryType` value of `1` and the corresponding `DiscoveryData` will carry Node discovery data.

{
"DiscoveryType": int,
"DiscoveryData": {
"Address" : string
"self" : bool
}
}

### DiscoverDelete Notification

When the proxy receives a DiscoverDelete notification, the remote process shall receive a POST to the URL `/NetworkDriver.DiscoverDelete` of the form

{
"DiscoveryType": int,
"DiscoveryData": {
...
}
}

`DiscoveryType` represents the discovery type. Each Discovery Type is represented by a number.
`DiscoveryData` carries discovery data the structure of which is determined by the DiscoveryType

The response indicating success is empty:

`{}`

* Node Discovery

Similar to the DiscoverNew call, Node Discovery is represented by a `DiscoveryType` value of `1` and the corresponding `DiscoveryData` will carry Node discovery data to be delted.

{
"DiscoveryType": int,
"DiscoveryData": {
"Address" : string
"self" : bool
}
}
20 changes: 20 additions & 0 deletions driverapi/driverapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ type Driver interface {
// Leave method is invoked when a Sandbox detaches from an endpoint.
Leave(nid, eid string) error

// DiscoverNew is a notification for a new discovery event, Example:a new node joining a cluster
DiscoverNew(dType DiscoveryType, data interface{}) error

// DiscoverDelete is a notification for a discovery delete event, Example:a node leaving a cluster
DiscoverDelete(dType DiscoveryType, data interface{}) error

// Type returns the the type of this driver, the network type this driver manages
Type() string
}
Expand Down Expand Up @@ -106,3 +112,17 @@ type DriverCallback interface {
type Capability struct {
DataScope datastore.DataScope
}

// DiscoveryType represents the type of discovery element the DiscoverNew function is invoked on
type DiscoveryType int

const (
// NodeDiscovery represents Node join/leave events provided by discovery
NodeDiscovery = iota + 1
)

// NodeDiscoveryData represents the structure backing the node discovery data json string
type NodeDiscoveryData struct {
Address string
Self bool
}
10 changes: 10 additions & 0 deletions drivers/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -1375,6 +1375,16 @@ func (d *driver) Type() string {
return networkType
}

// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
func (d *driver) DiscoverNew(dType driverapi.DiscoveryType, data interface{}) error {
return nil
}

// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster
func (d *driver) DiscoverDelete(dType driverapi.DiscoveryType, data interface{}) error {
return nil
}

func parseEndpointOptions(epOptions map[string]interface{}) (*endpointConfiguration, error) {
if epOptions == nil {
return nil, nil
Expand Down
Loading

0 comments on commit 8d03e80

Please sign in to comment.