From 8d03e80f21c2f21a792efbd49509f487da0d89cc Mon Sep 17 00:00:00 2001 From: Madhu Venugopal Date: Fri, 18 Sep 2015 12:54:08 -0700 Subject: [PATCH] Integration with Docker Discovery * integrated hostdiscovery package with the new Docker Discovery * Integrated hostdiscovery package with libnetwork core * removed libnetwork_discovery tag * Introduced driver apis for discovery events * moved overlay driver to make use of the discovery events * Using Docker Discovery service. * Changed integration-tests to make use of the new discovery Signed-off-by: Madhu Venugopal --- cmd/dnet/dnet.go | 63 +++++++++++++ cmd/dnet/libnetwork.toml | 3 +- config/config.go | 18 +++- controller.go | 71 ++++++++++++-- docs/remote.md | 62 ++++++++++++ driverapi/driverapi.go | 20 ++++ drivers/bridge/bridge.go | 10 ++ drivers/host/host.go | 10 ++ drivers/null/null.go | 10 ++ drivers/overlay/joinleave.go | 3 +- drivers/overlay/ov_network.go | 35 ++++--- drivers/overlay/ov_serf.go | 64 +++++-------- drivers/overlay/overlay.go | 69 ++++++++++---- drivers/overlay/overlay_test.go | 20 +++- drivers/remote/api/api.go | 17 +++- drivers/remote/driver.go | 24 +++++ drivers/remote/driver_test.go | 16 ++++ drivers/windows/windows.go | 10 ++ hostdiscovery/hostdiscovery.go | 94 ++++++------------- hostdiscovery/hostdiscovery_api.go | 12 +-- hostdiscovery/hostdiscovery_disabled.go | 28 ------ hostdiscovery/hostdiscovery_test.go | 67 +------------ hostdiscovery/libnetwork.toml | 4 +- test/integration/dnet/helpers.bash | 3 + .../integration/dnet/run-integration-tests.sh | 4 +- 25 files changed, 477 insertions(+), 260 deletions(-) delete mode 100644 hostdiscovery/hostdiscovery_disabled.go diff --git a/cmd/dnet/dnet.go b/cmd/dnet/dnet.go index dad5c82c15..738081bb98 100644 --- a/cmd/dnet/dnet.go +++ b/cmd/dnet/dnet.go @@ -3,17 +3,21 @@ package main import ( "bytes" "encoding/json" + "errors" "fmt" "io" "io/ioutil" + "net" "net/http" "net/http/httptest" "os" "os/signal" "strings" "syscall" + "time" "github.com/codegangsta/cli" + "github.com/docker/docker/pkg/discovery" "github.com/docker/docker/pkg/parsers" "github.com/docker/docker/pkg/reexec" @@ -37,6 +41,8 @@ const ( DefaultUnixSocket = "/var/run/dnet.sock" cfgFileEnv = "LIBNETWORK_CFG" defaultCfgFile = "/etc/default/libnetwork.toml" + defaultHeartbeat = time.Duration(10) * time.Second + ttlFactor = 2 ) var epConn *dnetConnection @@ -91,9 +97,66 @@ func processConfig(cfg *config.Config) []config.Option { if strings.TrimSpace(cfg.GlobalStore.Client.Address) != "" { options = append(options, config.OptionKVProviderURL(cfg.GlobalStore.Client.Address)) } + dOptions, err := startDiscovery(&cfg.Cluster) + if err != nil { + logrus.Infof("Skipping discovery : %s", err.Error()) + } else { + options = append(options, dOptions...) + } + return options } +func startDiscovery(cfg *config.ClusterCfg) ([]config.Option, error) { + if cfg == nil { + return nil, fmt.Errorf("discovery requires a valid configuration") + } + + hb := time.Duration(cfg.Heartbeat) * time.Second + if hb == 0 { + hb = defaultHeartbeat + } + logrus.Infof("discovery : %s $s", cfg.Discovery, hb.String()) + d, err := discovery.New(cfg.Discovery, hb, ttlFactor*hb) + if err != nil { + return nil, err + } + + if cfg.Address == "" { + iface, err := net.InterfaceByName("eth0") + if err != nil { + return nil, err + } + addrs, err := iface.Addrs() + if err != nil || len(addrs) == 0 { + return nil, err + } + ip, _, _ := net.ParseCIDR(addrs[0].String()) + cfg.Address = ip.String() + } + + if ip := net.ParseIP(cfg.Address); ip == nil { + return nil, errors.New("address config should be either ipv4 or ipv6 address") + } + + if err := d.Register(cfg.Address + ":0"); err != nil { + return nil, err + } + + options := []config.Option{config.OptionDiscoveryWatcher(d), config.OptionDiscoveryAddress(cfg.Address)} + go func() { + for { + select { + case <-time.After(hb): + if err := d.Register(cfg.Address + ":0"); err != nil { + logrus.Warn(err) + } + } + } + }() + return options, nil +} + func dnetApp(stdout, stderr io.Writer) error { app := cli.NewApp() diff --git a/cmd/dnet/libnetwork.toml b/cmd/dnet/libnetwork.toml index 4e22516d13..2033ae6158 100755 --- a/cmd/dnet/libnetwork.toml +++ b/cmd/dnet/libnetwork.toml @@ -3,8 +3,9 @@ title = "LibNetwork Configuration file" [daemon] debug = false [cluster] - discovery = "token://22aa23948f4f6b31230687689636959e" + discovery = "consul://localhost:8500" Address = "1.1.1.1" + Heartbeat = 20 [datastore] embedded = false [datastore.client] diff --git a/config/config.go b/config/config.go index 3163800174..96c8dab677 100644 --- a/config/config.go +++ b/config/config.go @@ -5,6 +5,7 @@ import ( "github.com/BurntSushi/toml" log "github.com/Sirupsen/logrus" + "github.com/docker/docker/pkg/discovery" "github.com/docker/libkv/store" "github.com/docker/libnetwork/netlabel" ) @@ -27,8 +28,9 @@ type DaemonCfg struct { // ClusterCfg represents cluster configuration type ClusterCfg struct { - Discovery string + Watcher discovery.Watcher Address string + Discovery string Heartbeat uint64 } @@ -108,6 +110,20 @@ func OptionKVProviderURL(url string) Option { } } +// OptionDiscoveryWatcher function returns an option setter for discovery watcher +func OptionDiscoveryWatcher(watcher discovery.Watcher) Option { + return func(c *Config) { + c.Cluster.Watcher = watcher + } +} + +// OptionDiscoveryAddress function returns an option setter for self discovery address +func OptionDiscoveryAddress(address string) Option { + return func(c *Config) { + c.Cluster.Address = address + } +} + // ProcessOptions processes options and stores it in config func (c *Config) ProcessOptions(options ...Option) { for _, opt := range options { diff --git a/controller.go b/controller.go index a646281e7c..9a364c5413 100644 --- a/controller.go +++ b/controller.go @@ -47,9 +47,11 @@ import ( "container/heap" "fmt" "net" + "strings" "sync" log "github.com/Sirupsen/logrus" + "github.com/docker/docker/pkg/discovery" "github.com/docker/docker/pkg/plugins" "github.com/docker/docker/pkg/stringid" "github.com/docker/libnetwork/config" @@ -126,6 +128,7 @@ type controller struct { sandboxes sandboxTable cfg *config.Config globalStore, localStore datastore.DataStore + discovery hostdiscovery.HostDiscovery extKeyListener net.Listener sync.Mutex } @@ -157,7 +160,7 @@ func New(cfgOptions ...config.Option) (NetworkController, error) { // But it cannot fail creating the Controller log.Debugf("Failed to Initialize Datastore due to %v. Operating in non-clustered mode", err) } - if err := c.initDiscovery(); err != nil { + if err := c.initDiscovery(cfg.Cluster.Watcher); err != nil { // Failing to initalize discovery is a bad situation to be in. // But it cannot fail creating the Controller log.Debugf("Failed to Initialize Discovery : %v", err) @@ -185,19 +188,57 @@ func (c *controller) validateHostDiscoveryConfig() bool { return true } -func (c *controller) initDiscovery() error { +func (c *controller) initDiscovery(watcher discovery.Watcher) error { if c.cfg == nil { return fmt.Errorf("discovery initialization requires a valid configuration") } - hostDiscovery := hostdiscovery.NewHostDiscovery() - return hostDiscovery.StartDiscovery(&c.cfg.Cluster, c.hostJoinCallback, c.hostLeaveCallback) + c.discovery = hostdiscovery.NewHostDiscovery(watcher) + return c.discovery.Watch(c.hostJoinCallback, c.hostLeaveCallback) } -func (c *controller) hostJoinCallback(hosts []net.IP) { +func (c *controller) hostJoinCallback(nodes []net.IP) { + c.processNodeDiscovery(nodes, true) } -func (c *controller) hostLeaveCallback(hosts []net.IP) { +func (c *controller) hostLeaveCallback(nodes []net.IP) { + c.processNodeDiscovery(nodes, false) +} + +func (c *controller) processNodeDiscovery(nodes []net.IP, add bool) { + c.Lock() + drivers := []*driverData{} + for _, d := range c.drivers { + drivers = append(drivers, d) + } + c.Unlock() + + for _, d := range drivers { + c.pushNodeDiscovery(d, nodes, add) + } +} + +func (c *controller) pushNodeDiscovery(d *driverData, nodes []net.IP, add bool) { + var self net.IP + if c.cfg != nil { + addr := strings.Split(c.cfg.Cluster.Address, ":") + self = net.ParseIP(addr[0]) + } + if d == nil || d.capability.DataScope != datastore.GlobalScope || nodes == nil { + return + } + for _, node := range nodes { + nodeData := driverapi.NodeDiscoveryData{Address: node.String(), Self: node.Equal(self)} + var err error + if add { + err = d.driver.DiscoverNew(driverapi.NodeDiscovery, nodeData) + } else { + err = d.driver.DiscoverDelete(driverapi.NodeDiscovery, nodeData) + } + if err != nil { + log.Debugf("discovery notification error : %v", err) + } + } } func (c *controller) Config() config.Config { @@ -219,9 +260,15 @@ func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver, c.Unlock() return driverapi.ErrActiveRegistration(networkType) } - c.drivers[networkType] = &driverData{driver, capability} + dData := &driverData{driver, capability} + c.drivers[networkType] = dData + hd := c.discovery c.Unlock() + if hd != nil { + c.pushNodeDiscovery(dData, hd.Fetch(), true) + } + return nil } @@ -487,6 +534,16 @@ func (c *controller) loadDriver(networkType string) (*driverData, error) { return dd, nil } +func (c *controller) getDriver(networkType string) (*driverData, error) { + c.Lock() + defer c.Unlock() + dd, ok := c.drivers[networkType] + if !ok { + return nil, types.NotFoundErrorf("driver %s not found", networkType) + } + return dd, nil +} + func (c *controller) Stop() { if c.localStore != nil { c.localStore.KVStore().Close() diff --git a/docs/remote.md b/docs/remote.md index 1987c862db..cfd2023ee8 100644 --- a/docs/remote.md +++ b/docs/remote.md @@ -204,3 +204,65 @@ If the proxy is asked to remove an endpoint from a sandbox, the remote process s where `NetworkID` and `EndpointID` have meanings as above. The success response is empty: {} + +### DiscoverNew Notification + +libnetwork listens to inbuilt docker discovery notifications and passes it along to the interested drivers. + +When the proxy receives a DiscoverNew notification, the remote process shall receive a POST to the URL `/NetworkDriver.DiscoverNew` of the form + + { + "DiscoveryType": int, + "DiscoveryData": { + ... + } + } + +`DiscoveryType` represents the discovery type. Each Discovery Type is represented by a number. +`DiscoveryData` carries discovery data the structure of which is determined by the DiscoveryType + +The response indicating success is empty: + + `{}` + +* Node Discovery + +Node Discovery is represented by a `DiscoveryType` value of `1` and the corresponding `DiscoveryData` will carry Node discovery data. + + { + "DiscoveryType": int, + "DiscoveryData": { + "Address" : string + "self" : bool + } + } + +### DiscoverDelete Notification + +When the proxy receives a DiscoverDelete notification, the remote process shall receive a POST to the URL `/NetworkDriver.DiscoverDelete` of the form + + { + "DiscoveryType": int, + "DiscoveryData": { + ... + } + } + +`DiscoveryType` represents the discovery type. Each Discovery Type is represented by a number. +`DiscoveryData` carries discovery data the structure of which is determined by the DiscoveryType + +The response indicating success is empty: + + `{}` + +* Node Discovery + +Similar to the DiscoverNew call, Node Discovery is represented by a `DiscoveryType` value of `1` and the corresponding `DiscoveryData` will carry Node discovery data to be delted. + + { + "DiscoveryType": int, + "DiscoveryData": { + "Address" : string + "self" : bool + } + } diff --git a/driverapi/driverapi.go b/driverapi/driverapi.go index 054442e753..1661c3672f 100644 --- a/driverapi/driverapi.go +++ b/driverapi/driverapi.go @@ -40,6 +40,12 @@ type Driver interface { // Leave method is invoked when a Sandbox detaches from an endpoint. Leave(nid, eid string) error + // DiscoverNew is a notification for a new discovery event, Example:a new node joining a cluster + DiscoverNew(dType DiscoveryType, data interface{}) error + + // DiscoverDelete is a notification for a discovery delete event, Example:a node leaving a cluster + DiscoverDelete(dType DiscoveryType, data interface{}) error + // Type returns the the type of this driver, the network type this driver manages Type() string } @@ -106,3 +112,17 @@ type DriverCallback interface { type Capability struct { DataScope datastore.DataScope } + +// DiscoveryType represents the type of discovery element the DiscoverNew function is invoked on +type DiscoveryType int + +const ( + // NodeDiscovery represents Node join/leave events provided by discovery + NodeDiscovery = iota + 1 +) + +// NodeDiscoveryData represents the structure backing the node discovery data json string +type NodeDiscoveryData struct { + Address string + Self bool +} diff --git a/drivers/bridge/bridge.go b/drivers/bridge/bridge.go index 87380101dd..71cc0e7fce 100644 --- a/drivers/bridge/bridge.go +++ b/drivers/bridge/bridge.go @@ -1375,6 +1375,16 @@ func (d *driver) Type() string { return networkType } +// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster +func (d *driver) DiscoverNew(dType driverapi.DiscoveryType, data interface{}) error { + return nil +} + +// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster +func (d *driver) DiscoverDelete(dType driverapi.DiscoveryType, data interface{}) error { + return nil +} + func parseEndpointOptions(epOptions map[string]interface{}) (*endpointConfiguration, error) { if epOptions == nil { return nil, nil diff --git a/drivers/host/host.go b/drivers/host/host.go index 747bdc62c9..2549ed866e 100644 --- a/drivers/host/host.go +++ b/drivers/host/host.go @@ -65,3 +65,13 @@ func (d *driver) Leave(nid, eid string) error { func (d *driver) Type() string { return networkType } + +// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster +func (d *driver) DiscoverNew(dType driverapi.DiscoveryType, data interface{}) error { + return nil +} + +// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster +func (d *driver) DiscoverDelete(dType driverapi.DiscoveryType, data interface{}) error { + return nil +} diff --git a/drivers/null/null.go b/drivers/null/null.go index 6f472e78a4..670fc68672 100644 --- a/drivers/null/null.go +++ b/drivers/null/null.go @@ -65,3 +65,13 @@ func (d *driver) Leave(nid, eid string) error { func (d *driver) Type() string { return networkType } + +// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster +func (d *driver) DiscoverNew(dType driverapi.DiscoveryType, data interface{}) error { + return nil +} + +// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster +func (d *driver) DiscoverDelete(dType driverapi.DiscoveryType, data interface{}) error { + return nil +} diff --git a/drivers/overlay/joinleave.go b/drivers/overlay/joinleave.go index c563b59a69..f690492550 100644 --- a/drivers/overlay/joinleave.go +++ b/drivers/overlay/joinleave.go @@ -2,6 +2,7 @@ package overlay import ( "fmt" + "net" "github.com/docker/libnetwork/driverapi" "github.com/vishvananda/netlink" @@ -73,7 +74,7 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo, } d.peerDbAdd(nid, eid, ep.addr.IP, ep.mac, - d.serfInstance.LocalMember().Addr, true) + net.ParseIP(d.bindAddress), true) d.notifyCh <- ovNotify{ action: "join", nid: nid, diff --git a/drivers/overlay/ov_network.go b/drivers/overlay/ov_network.go index 72a97670f4..9e5f9d8909 100644 --- a/drivers/overlay/ov_network.go +++ b/drivers/overlay/ov_network.go @@ -156,23 +156,8 @@ func (n *network) initSandbox() error { return fmt.Errorf("could not create bridge inside the network sandbox: %v", err) } - vxlanName, err := createVxlan(n.vxlanID()) - if err != nil { - return err - } - - if err := sbox.AddInterface(vxlanName, "vxlan", - sbox.InterfaceOptions().Master("bridge1")); err != nil { - return fmt.Errorf("could not add vxlan interface inside the network sandbox: %v", - err) - } - - n.vxlanName = vxlanName - n.setSandbox(sbox) - n.driver.peerDbUpdateSandbox(n.id) - var nlSock *nl.NetlinkSocket sbox.InvokeFunc(func() { nlSock, err = nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH) @@ -182,7 +167,27 @@ func (n *network) initSandbox() error { }) go n.watchMiss(nlSock) + return n.initVxlan() +} +func (n *network) initVxlan() error { + var vxlanName string + n.Lock() + sbox := n.sbox + n.Unlock() + + vxlanName, err := createVxlan(n.vxlanID()) + if err != nil { + return err + } + + if err = sbox.AddInterface(vxlanName, "vxlan", + sbox.InterfaceOptions().Master("bridge1")); err != nil { + return fmt.Errorf("could not add vxlan interface inside the network sandbox: %v", err) + } + + n.vxlanName = vxlanName + n.driver.peerDbUpdateSandbox(n.id) return nil } diff --git a/drivers/overlay/ov_serf.go b/drivers/overlay/ov_serf.go index 453ef2c912..cac97ae901 100644 --- a/drivers/overlay/ov_serf.go +++ b/drivers/overlay/ov_serf.go @@ -35,46 +35,12 @@ func (l *logWriter) Write(p []byte) (int, error) { return len(p), nil } -func getBindAddr(ifaceName string) (string, error) { - iface, err := net.InterfaceByName(ifaceName) - if err != nil { - return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err) - } - - addrs, err := iface.Addrs() - if err != nil { - return "", fmt.Errorf("failed to get interface addresses: %v", err) - } - - for _, a := range addrs { - addr, ok := a.(*net.IPNet) - if !ok { - continue - } - addrIP := addr.IP - - if addrIP.IsLinkLocalUnicast() { - continue - } - - return addrIP.String(), nil - } - - return "", fmt.Errorf("failed to get bind address") -} - func (d *driver) serfInit() error { var err error config := serf.DefaultConfig() config.Init() - if d.ifaceName != "" { - bindAddr, err := getBindAddr(d.ifaceName) - if err != nil { - return fmt.Errorf("getBindAddr error: %v", err) - } - config.MemberlistConfig.BindAddr = bindAddr - } + config.MemberlistConfig.BindAddr = d.bindAddress d.eventCh = make(chan serf.Event, 4) config.EventCh = d.eventCh @@ -93,13 +59,6 @@ func (d *driver) serfInit() error { } }() - if d.neighIP != "" { - if _, err = s.Join([]string{d.neighIP}, false); err != nil { - return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v", - d.neighIP, err) - } - } - d.serfInstance = s d.notifyCh = make(chan ovNotify) @@ -109,6 +68,17 @@ func (d *driver) serfInit() error { return nil } +func (d *driver) serfJoin() error { + if d.neighIP == "" { + return fmt.Errorf("no neighbor to join") + } + if _, err := d.serfInstance.Join([]string{d.neighIP}, false); err != nil { + return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v", + d.neighIP, err) + } + return nil +} + func (d *driver) notifyEvent(event ovNotify) { n := d.network(event.nid) ep := n.endpoint(event.eid) @@ -246,3 +216,13 @@ func (d *driver) startSerfLoop(eventCh chan serf.Event, notifyCh chan ovNotify, } } } + +func (d *driver) isSerfAlive() bool { + d.Lock() + serfInstance := d.serfInstance + d.Unlock() + if serfInstance == nil || serfInstance.State() != serf.SerfAlive { + return false + } + return true +} diff --git a/drivers/overlay/overlay.go b/drivers/overlay/overlay.go index 900e370487..ff29c8d399 100644 --- a/drivers/overlay/overlay.go +++ b/drivers/overlay/overlay.go @@ -6,6 +6,7 @@ import ( "net" "sync" + "github.com/Sirupsen/logrus" "github.com/docker/libkv/store" "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/datastore" @@ -29,7 +30,7 @@ type driver struct { eventCh chan serf.Event notifyCh chan ovNotify exitCh chan chan struct{} - ifaceName string + bindAddress string neighIP string config map[string]interface{} peerDb peerNetworkMap @@ -38,7 +39,8 @@ type driver struct { store datastore.DataStore ipAllocator *idm.Idm vxlanIdm *idm.Idm - sync.Once + once sync.Once + joinOnce sync.Once sync.Mutex } @@ -107,15 +109,7 @@ func (d *driver) configure() error { return nil } - d.Do(func() { - if ifaceName, ok := d.config[netlabel.OverlayBindInterface]; ok { - d.ifaceName = ifaceName.(string) - } - - if neighIP, ok := d.config[netlabel.OverlayNeighborIP]; ok { - d.neighIP = neighIP.(string) - } - + d.once.Do(func() { provider, provOk := d.config[netlabel.KVProvider] provURL, urlOk := d.config[netlabel.KVProviderURL] @@ -148,12 +142,6 @@ func (d *driver) configure() error { err = fmt.Errorf("failed to initalize ipam id manager: %v", err) return } - - err = d.serfInit() - if err != nil { - err = fmt.Errorf("initializing serf instance failed: %v", err) - } - }) return err @@ -162,3 +150,50 @@ func (d *driver) configure() error { func (d *driver) Type() string { return networkType } + +func (d *driver) nodeJoin(node string, self bool) { + if self && node != "" && !d.isSerfAlive() { + d.Lock() + d.bindAddress = node + d.Unlock() + err := d.serfInit() + if err != nil { + logrus.Errorf("initializing serf instance failed: %v", err) + return + } + } + + if d.serfInstance != nil && !self && node != "" { + var err error + d.joinOnce.Do(func() { + d.Lock() + d.neighIP = node + d.Unlock() + err = d.serfJoin() + }) + if err != nil { + logrus.Errorf("joining serf neighbor %s failed: %v", node, err) + d.Lock() + d.joinOnce = sync.Once{} + d.Unlock() + return + } + } +} + +// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster +func (d *driver) DiscoverNew(dType driverapi.DiscoveryType, data interface{}) error { + if dType == driverapi.NodeDiscovery { + nodeData, ok := data.(driverapi.NodeDiscoveryData) + if !ok { + return fmt.Errorf("invalid discovery data") + } + d.nodeJoin(nodeData.Address, nodeData.Self) + } + return nil +} + +// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster +func (d *driver) DiscoverDelete(dType driverapi.DiscoveryType, data interface{}) error { + return nil +} diff --git a/drivers/overlay/overlay_test.go b/drivers/overlay/overlay_test.go index 2cfc4d6fb4..36d1943736 100644 --- a/drivers/overlay/overlay_test.go +++ b/drivers/overlay/overlay_test.go @@ -1,11 +1,11 @@ package overlay import ( + "net" "testing" "time" "github.com/docker/libnetwork/driverapi" - "github.com/docker/libnetwork/netlabel" _ "github.com/docker/libnetwork/testutils" ) @@ -17,16 +17,28 @@ type driverTester struct { const testNetworkType = "overlay" func setupDriver(t *testing.T) *driverTester { - opt := make(map[string]interface{}) - opt[netlabel.OverlayBindInterface] = "eth0" dt := &driverTester{t: t} - if err := Init(dt, opt); err != nil { + if err := Init(dt, nil); err != nil { t.Fatal(err) } if err := dt.d.configure(); err != nil { t.Fatal(err) } + + iface, err := net.InterfaceByName("eth0") + if err != nil { + t.Fatal(err) + } + addrs, err := iface.Addrs() + if err != nil || len(addrs) == 0 { + t.Fatal(err) + } + data := driverapi.NodeDiscoveryData{ + Address: addrs[0].String(), + Self: true, + } + dt.d.DiscoverNew(driverapi.NodeDiscovery, data) return dt } diff --git a/drivers/remote/api/api.go b/drivers/remote/api/api.go index 2d441ab7d2..2345be0a29 100644 --- a/drivers/remote/api/api.go +++ b/drivers/remote/api/api.go @@ -4,7 +4,11 @@ with a remote driver. */ package api -import "net" +import ( + "net" + + "github.com/docker/libnetwork/driverapi" +) // Response is the basic response structure used in all responses. type Response struct { @@ -143,3 +147,14 @@ type LeaveRequest struct { type LeaveResponse struct { Response } + +// DiscoveryNotification represents a discovery notification +type DiscoveryNotification struct { + DiscoveryType driverapi.DiscoveryType + DiscoveryData interface{} +} + +// DiscoveryResponse is used by libnetwork to log any plugin error processing the discovery notifications +type DiscoveryResponse struct { + Response +} diff --git a/drivers/remote/driver.go b/drivers/remote/driver.go index 5f5a0f5b25..c4eb5e95c8 100644 --- a/drivers/remote/driver.go +++ b/drivers/remote/driver.go @@ -247,6 +247,30 @@ func (d *driver) Type() string { return d.networkType } +// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster +func (d *driver) DiscoverNew(dType driverapi.DiscoveryType, data interface{}) error { + if dType != driverapi.NodeDiscovery { + return fmt.Errorf("Unknown discovery type : %v", dType) + } + notif := &api.DiscoveryNotification{ + DiscoveryType: dType, + DiscoveryData: data, + } + return d.call("DiscoverNew", notif, &api.DiscoveryResponse{}) +} + +// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster +func (d *driver) DiscoverDelete(dType driverapi.DiscoveryType, data interface{}) error { + if dType != driverapi.NodeDiscovery { + return fmt.Errorf("Unknown discovery type : %v", dType) + } + notif := &api.DiscoveryNotification{ + DiscoveryType: dType, + DiscoveryData: data, + } + return d.call("DiscoverDelete", notif, &api.DiscoveryResponse{}) +} + func parseStaticRoutes(r api.JoinResponse) ([]*types.StaticRoute, error) { var routes = make([]*types.StaticRoute, len(r.StaticRoutes)) for i, inRoute := range r.StaticRoutes { diff --git a/drivers/remote/driver_test.go b/drivers/remote/driver_test.go index a0fd602d42..07f76fa9bb 100644 --- a/drivers/remote/driver_test.go +++ b/drivers/remote/driver_test.go @@ -335,6 +335,12 @@ func TestRemoteDriver(t *testing.T) { }, } }) + handle(t, mux, "DiscoverNew", func(msg map[string]interface{}) interface{} { + return map[string]string{} + }) + handle(t, mux, "DiscoverDelete", func(msg map[string]interface{}) interface{} { + return map[string]interface{}{} + }) p, err := plugins.Get(plugin, driverapi.NetworkPluginEndpointType) if err != nil { @@ -382,6 +388,16 @@ func TestRemoteDriver(t *testing.T) { if err = d.DeleteNetwork(netID); err != nil { t.Fatal(err) } + + data := driverapi.NodeDiscoveryData{ + Address: "192.168.1.1", + } + if err = d.DiscoverNew(driverapi.NodeDiscovery, data); err != nil { + t.Fatal(err) + } + if err = d.DiscoverDelete(driverapi.NodeDiscovery, data); err != nil { + t.Fatal(err) + } } type failEndpoint struct { diff --git a/drivers/windows/windows.go b/drivers/windows/windows.go index 82fc61b7c0..6872486bf4 100644 --- a/drivers/windows/windows.go +++ b/drivers/windows/windows.go @@ -52,3 +52,13 @@ func (d *driver) Leave(nid, eid string) error { func (d *driver) Type() string { return networkType } + +// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster +func (d *driver) DiscoverNew(dType driverapi.DiscoveryType, data interface{}) error { + return nil +} + +// DiscoverDelete is a notification for a discovery delete event, such as a node leaving a cluster +func (d *driver) DiscoverDelete(dType driverapi.DiscoveryType, data interface{}) error { + return nil +} diff --git a/hostdiscovery/hostdiscovery.go b/hostdiscovery/hostdiscovery.go index aa39baa834..cb29e45032 100644 --- a/hostdiscovery/hostdiscovery.go +++ b/hostdiscovery/hostdiscovery.go @@ -1,73 +1,48 @@ -// +build libnetwork_discovery - package hostdiscovery import ( - "errors" - "fmt" "net" "sync" - "time" log "github.com/Sirupsen/logrus" mapset "github.com/deckarep/golang-set" - "github.com/docker/libnetwork/config" - "github.com/docker/swarm/discovery" - // Anonymous import will be removed after we upgrade to latest swarm - _ "github.com/docker/swarm/discovery/file" - // Anonymous import will be removed after we upgrade to latest swarm - _ "github.com/docker/swarm/discovery/kv" - // Anonymous import will be removed after we upgrade to latest swarm - _ "github.com/docker/swarm/discovery/nodes" - // Anonymous import will be removed after we upgrade to latest swarm - _ "github.com/docker/swarm/discovery/token" + "github.com/docker/docker/pkg/discovery" + // Including KV + _ "github.com/docker/docker/pkg/discovery/kv" + "github.com/docker/libkv/store/consul" + "github.com/docker/libkv/store/etcd" + "github.com/docker/libkv/store/zookeeper" + "github.com/docker/libnetwork/types" ) -const defaultHeartbeat = time.Duration(10) * time.Second -const TTLFactor = 3 - type hostDiscovery struct { - discovery discovery.Discovery - nodes mapset.Set - stopChan chan struct{} + watcher discovery.Watcher + nodes mapset.Set + stopChan chan struct{} sync.Mutex } -// NewHostDiscovery function creates a host discovery object -func NewHostDiscovery() HostDiscovery { - return &hostDiscovery{nodes: mapset.NewSet(), stopChan: make(chan struct{})} +func init() { + consul.Register() + etcd.Register() + zookeeper.Register() } -func (h *hostDiscovery) StartDiscovery(cfg *config.ClusterCfg, joinCallback JoinCallback, leaveCallback LeaveCallback) error { - if cfg == nil { - return fmt.Errorf("discovery requires a valid configuration") - } - - hb := time.Duration(cfg.Heartbeat) * time.Second - if hb == 0 { - hb = defaultHeartbeat - } - d, err := discovery.New(cfg.Discovery, hb, TTLFactor*hb) - if err != nil { - return err - } - - if ip := net.ParseIP(cfg.Address); ip == nil { - return errors.New("address config should be either ipv4 or ipv6 address") - } - - if err := d.Register(cfg.Address + ":0"); err != nil { - return err - } +// NewHostDiscovery function creates a host discovery object +func NewHostDiscovery(watcher discovery.Watcher) HostDiscovery { + return &hostDiscovery{watcher: watcher, nodes: mapset.NewSet(), stopChan: make(chan struct{})} +} +func (h *hostDiscovery) Watch(joinCallback JoinCallback, leaveCallback LeaveCallback) error { h.Lock() - h.discovery = d + d := h.watcher h.Unlock() - + if d == nil { + return types.BadRequestErrorf("invalid discovery watcher") + } discoveryCh, errCh := d.Watch(h.stopChan) go h.monitorDiscovery(discoveryCh, errCh, joinCallback, leaveCallback) - go h.sustainHeartbeat(d, hb, cfg) return nil } @@ -77,7 +52,9 @@ func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-ch case entries := <-ch: h.processCallback(entries, joinCallback, leaveCallback) case err := <-errCh: - log.Errorf("discovery error: %v", err) + if err != nil { + log.Errorf("discovery error: %v", err) + } case <-h.stopChan: return } @@ -87,26 +64,13 @@ func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-ch func (h *hostDiscovery) StopDiscovery() error { h.Lock() stopChan := h.stopChan - h.discovery = nil + h.watcher = nil h.Unlock() close(stopChan) return nil } -func (h *hostDiscovery) sustainHeartbeat(d discovery.Discovery, hb time.Duration, config *config.ClusterCfg) { - for { - select { - case <-h.stopChan: - return - case <-time.After(hb): - if err := d.Register(config.Address + ":0"); err != nil { - log.Warn(err) - } - } - } -} - func (h *hostDiscovery) processCallback(entries discovery.Entries, joinCallback JoinCallback, leaveCallback LeaveCallback) { updated := hosts(entries) h.Lock() @@ -135,14 +99,14 @@ func diff(existing mapset.Set, updated mapset.Set) (added []net.IP, removed []ne return } -func (h *hostDiscovery) Fetch() ([]net.IP, error) { +func (h *hostDiscovery) Fetch() []net.IP { h.Lock() defer h.Unlock() ips := []net.IP{} for _, ipstr := range h.nodes.ToSlice() { ips = append(ips, net.ParseIP(ipstr.(string))) } - return ips, nil + return ips } func hosts(entries discovery.Entries) mapset.Set { diff --git a/hostdiscovery/hostdiscovery_api.go b/hostdiscovery/hostdiscovery_api.go index 09394e09bc..5be520fca8 100644 --- a/hostdiscovery/hostdiscovery_api.go +++ b/hostdiscovery/hostdiscovery_api.go @@ -1,10 +1,6 @@ package hostdiscovery -import ( - "net" - - "github.com/docker/libnetwork/config" -) +import "net" // JoinCallback provides a callback event for new node joining the cluster type JoinCallback func(entries []net.IP) @@ -14,10 +10,10 @@ type LeaveCallback func(entries []net.IP) // HostDiscovery primary interface type HostDiscovery interface { - // StartDiscovery initiates the discovery process and provides appropriate callbacks - StartDiscovery(*config.ClusterCfg, JoinCallback, LeaveCallback) error + //Watch Node join and leave cluster events + Watch(joinCallback JoinCallback, leaveCallback LeaveCallback) error // StopDiscovery stops the discovery perocess StopDiscovery() error // Fetch returns a list of host IPs that are currently discovered - Fetch() ([]net.IP, error) + Fetch() []net.IP } diff --git a/hostdiscovery/hostdiscovery_disabled.go b/hostdiscovery/hostdiscovery_disabled.go deleted file mode 100644 index 2dc67ccb0f..0000000000 --- a/hostdiscovery/hostdiscovery_disabled.go +++ /dev/null @@ -1,28 +0,0 @@ -// +build !libnetwork_discovery - -package hostdiscovery - -import ( - "net" - - "github.com/docker/libnetwork/config" -) - -type hostDiscovery struct{} - -// NewHostDiscovery function creates a host discovery object -func NewHostDiscovery() HostDiscovery { - return &hostDiscovery{} -} - -func (h *hostDiscovery) StartDiscovery(cfg *config.ClusterCfg, joinCallback JoinCallback, leaveCallback LeaveCallback) error { - return nil -} - -func (h *hostDiscovery) StopDiscovery() error { - return nil -} - -func (h *hostDiscovery) Fetch() ([]net.IP, error) { - return []net.IP{}, nil -} diff --git a/hostdiscovery/hostdiscovery_test.go b/hostdiscovery/hostdiscovery_test.go index 6da5bcc3e0..0b3a902306 100644 --- a/hostdiscovery/hostdiscovery_test.go +++ b/hostdiscovery/hostdiscovery_test.go @@ -1,80 +1,15 @@ -// +build libnetwork_discovery - package hostdiscovery import ( "net" "testing" - "time" mapset "github.com/deckarep/golang-set" _ "github.com/docker/libnetwork/testutils" - "github.com/docker/libnetwork/config" - "github.com/docker/swarm/discovery" + "github.com/docker/docker/pkg/discovery" ) -func TestDiscovery(t *testing.T) { - _, err := net.DialTimeout("tcp", "discovery-stage.hub.docker.com:80", 10*time.Second) - if err != nil { - t.Skip("Skipping Discovery test which need connectivity to discovery-stage.hub.docker.com") - } - - hd := NewHostDiscovery() - config, err := config.ParseConfig("libnetwork.toml") - if err != nil { - t.Fatal(err) - } - - err = hd.StartDiscovery(&config.Cluster, func(hosts []net.IP) {}, func(hosts []net.IP) {}) - if err != nil { - t.Fatal(err) - } - time.Sleep(time.Duration(config.Cluster.Heartbeat*2) * time.Second) - hosts, err := hd.Fetch() - if err != nil { - t.Fatal(err) - } - - found := false - for _, ip := range hosts { - if ip.Equal(net.ParseIP(config.Cluster.Address)) { - found = true - } - } - if !found { - t.Fatalf("Expecting hosts. But none discovered ") - } - err = hd.StopDiscovery() - if err != nil { - t.Fatal(err) - } -} - -func TestBadDiscovery(t *testing.T) { - _, err := net.DialTimeout("tcp", "discovery-stage.hub.docker.com:80", 10*time.Second) - if err != nil { - t.Skip("Skipping Discovery test which need connectivity to discovery-stage.hub.docker.com") - } - - hd := NewHostDiscovery() - cfg := &config.Config{} - cfg.Cluster.Discovery = "" - err = hd.StartDiscovery(&cfg.Cluster, func(hosts []net.IP) {}, func(hosts []net.IP) {}) - if err == nil { - t.Fatal("Invalid discovery configuration must fail") - } - cfg, err = config.ParseConfig("libnetwork.toml") - if err != nil { - t.Fatal(err) - } - cfg.Cluster.Address = "invalid" - err = hd.StartDiscovery(&cfg.Cluster, func(hosts []net.IP) {}, func(hosts []net.IP) {}) - if err == nil { - t.Fatal("Invalid discovery address configuration must fail") - } -} - func TestDiff(t *testing.T) { existing := mapset.NewSetFromSlice([]interface{}{"1.1.1.1", "2.2.2.2"}) addedIP := "3.3.3.3" diff --git a/hostdiscovery/libnetwork.toml b/hostdiscovery/libnetwork.toml index b8c6854103..7839d1e3a8 100644 --- a/hostdiscovery/libnetwork.toml +++ b/hostdiscovery/libnetwork.toml @@ -1,6 +1,6 @@ title = "LibNetwork Configuration file" [cluster] - discovery = "token://08469efb104bce980931ed24c8eb03a2" - Address = "1.1.1.1" + discovery = "consul://localhost:8500" + Address = "6.5.5.5" Heartbeat = 3 diff --git a/test/integration/dnet/helpers.bash b/test/integration/dnet/helpers.bash index f0adf6eb40..935ae28092 100644 --- a/test/integration/dnet/helpers.bash +++ b/test/integration/dnet/helpers.bash @@ -70,6 +70,9 @@ title = "LibNetwork Configuration file" [daemon] debug = false labels = [${labels}] +[cluster] + discovery = "consul://${bridge_ip}:8500" + Heartbeat = 10 [globalstore] embedded = false [globalstore.client] diff --git a/test/integration/dnet/run-integration-tests.sh b/test/integration/dnet/run-integration-tests.sh index 15578a9d23..030fde0ae8 100755 --- a/test/integration/dnet/run-integration-tests.sh +++ b/test/integration/dnet/run-integration-tests.sh @@ -81,9 +81,9 @@ unset cmap[dnet-3-multi] ## Setup start_dnet 1 overlay 1>>${INTEGRATION_ROOT}/test.log 2>&1 cmap[dnet-1-overlay]=dnet-1-overlay -start_dnet 2 overlay $(docker inspect --format '{{.NetworkSettings.IPAddress}}' dnet-1-overlay) 1>>${INTEGRATION_ROOT}/test.log 2>&1 +start_dnet 2 overlay 1>>${INTEGRATION_ROOT}/test.log 2>&1 cmap[dnet-2-overlay]=dnet-2-overlay -start_dnet 3 overlay $(docker inspect --format '{{.NetworkSettings.IPAddress}}' dnet-2-overlay) 1>>${INTEGRATION_ROOT}/test.log 2>&1 +start_dnet 3 overlay 1>>${INTEGRATION_ROOT}/test.log 2>&1 cmap[dnet-3-overlay]=dnet-3-overlay ## Run the test cases