diff --git a/README.md b/README.md index 8f032bdc7e..f8277608a7 100644 --- a/README.md +++ b/README.md @@ -89,12 +89,14 @@ agent: analyzers: 127.0.0.1:8082 topology: # Probes used to capture topology informations like interfaces, - # bridges, namespaces, etc. Available netlink, netns, ovsdb. + # bridges, namespaces, etc... + # Available: netlink, netns, ovsdb, neutron. # Default: netlink, netns probes: - netlink - netns # - ovsdb + # - neutron flow: # Probes used to capture traffic. probes: diff --git a/flow/mappings/neutron.go b/topology/probes/neutron.go similarity index 54% rename from flow/mappings/neutron.go rename to topology/probes/neutron.go index f4bc2004e5..9e1b1f6006 100644 --- a/flow/mappings/neutron.go +++ b/topology/probes/neutron.go @@ -20,7 +20,7 @@ * */ -package mappings +package probes import ( "errors" @@ -34,31 +34,29 @@ import ( "github.com/rackspace/gophercloud/openstack/networking/v2/ports" "github.com/rackspace/gophercloud/pagination" - //"github.com/golang/protobuf/proto" "github.com/pmylund/go-cache" "github.com/redhat-cip/skydive/config" - //"github.com/redhat-cip/skydive/flow" "github.com/redhat-cip/skydive/logging" + "github.com/redhat-cip/skydive/topology/graph" ) type NeutronMapper struct { + graph *graph.Graph client *gophercloud.ServiceClient cache *cache.Cache - cacheUpdaterChan chan string + nodeUpdaterChan chan graph.Identifier } type Attributes struct { TenantID string - VNI uint64 + VNI string } -func (mapper *NeutronMapper) retrievePort(mac string) (ports.Port, error) { - port := ports.Port{} - +func (mapper *NeutronMapper) retrievePort(mac string) (port ports.Port, err error) { opts := ports.ListOpts{MACAddress: mac} pager := ports.List(mapper.client, opts) - err := pager.EachPage(func(page pagination.Page) (bool, error) { + err = pager.EachPage(func(page pagination.Page) (bool, error) { portList, err := ports.ExtractPorts(page) if err != nil { return false, err @@ -70,80 +68,122 @@ func (mapper *NeutronMapper) retrievePort(mac string) (ports.Port, error) { return true, nil } } + return true, nil }) + if len(port.NetworkID) == 0 { - return port, errors.New("Unable to find port for mac address: " + mac) + return port, errors.New("Unable to find port for MAC address: " + mac) } return port, err } -func (mapper *NeutronMapper) retrieveAttributes(mac string) Attributes { - logging.GetLogger().Debugf("Retrieving attributes from Neutron for Mac: %s", mac) - - attrs := Attributes{} +func (mapper *NeutronMapper) retrieveAttributes(mac string) (*Attributes, error) { + logging.GetLogger().Debugf("Retrieving attributes from Neutron for MAC: %s", mac) port, err := mapper.retrievePort(mac) if err != nil { - return attrs + return nil, err } - attrs.TenantID = port.TenantID result := networks.Get(mapper.client, port.NetworkID) network, err := provider.ExtractGet(result) if err != nil { - return attrs - } - - if err != nil { - return attrs - } - - segID, err := strconv.Atoi(network.SegmentationID) - if err == nil { - attrs.VNI = uint64(segID) + return nil, err } - return attrs + return &Attributes{TenantID: port.TenantID, VNI: network.SegmentationID}, nil } -func (mapper *NeutronMapper) cacheUpdater() { - logging.GetLogger().Debug("Start Neutron cache updater") - var mac string +func (mapper *NeutronMapper) nodeUpdater() { + logging.GetLogger().Debugf("Starting Neutron updater") for { - mac = <-mapper.cacheUpdaterChan + nodeID := <-mapper.nodeUpdaterChan + node := mapper.graph.GetNode(nodeID) + if node == nil { + continue + } - logging.GetLogger().Debugf("Mac request received: %s", mac) + mac, ok := node.Metadata()["MAC"]; + if !ok { + continue + } - attrs := mapper.retrieveAttributes(mac) - mapper.cache.Set(mac, attrs, cache.DefaultExpiration) + attrs, err := mapper.retrieveAttributes(mac.(string)) + if err != nil { + continue + } + + mapper.updateNode(node, attrs) } + logging.GetLogger().Debugf("Stopping Neutron updater") } -/*func (mapper *NeutronMapper) EnhanceInterface(mac string, attrs *flow.Flow_InterfaceAttributes) { - a, f := mapper.cache.Get(mac) - if f { - ia := a.(Attributes) +func (mapper *NeutronMapper) updateNode(node *graph.Node, attrs *Attributes) { + mapper.graph.Lock() + defer mapper.graph.Unlock() + + if attrs.TenantID != "" && node.Metadata()["Neutron.TenantID"] != attrs.TenantID { + mapper.graph.AddMetadata(node, "Neutron.TenantID", attrs.TenantID) + } + + if segID, err := strconv.Atoi(attrs.VNI); err != nil { + if node.Metadata()["Neutron.VNI"] != uint64(segID) { + mapper.graph.AddMetadata(node, "Neutron.VNI", uint64(segID)) + } + } - attrs.TenantID = proto.String(ia.TenantID) - attrs.VNI = proto.Uint64(ia.VNI) + mapper.cache.Set(string(node.ID), attrs, cache.DefaultExpiration) +} +func (mapper *NeutronMapper) EnhanceNode(node *graph.Node) { + mac, ok := node.Metadata()["MAC"]; + if !ok { return } - mapper.cacheUpdaterChan <- mac -}*/ + a, f := mapper.cache.Get(mac.(string)) + if f { + attrs := a.(Attributes) + mapper.updateNode(node, &attrs) + return + } -func NewNeutronMapper() (*NeutronMapper, error) { - mapper := &NeutronMapper{} + mapper.nodeUpdaterChan <-node.ID +} - authURL := config.GetConfig().GetString("openstack.auth_url") - username := config.GetConfig().GetString("openstack.username") - password := config.GetConfig().GetString("openstack.password") - tenantName := config.GetConfig().GetString("openstack.tenant_name") - regionName := config.GetConfig().GetString("openstack.region_name") +func (mapper *NeutronMapper) OnNodeUpdated(n *graph.Node) { + go mapper.EnhanceNode(n) +} + +func (mapper *NeutronMapper) OnNodeAdded(n *graph.Node) { + go mapper.EnhanceNode(n) +} + +func (mapper *NeutronMapper) OnNodeDeleted(n *graph.Node) { +} + +func (mapper *NeutronMapper) OnEdgeUpdated(n *graph.Edge) { +} + +func (mapper *NeutronMapper) OnEdgeAdded(n *graph.Edge) { +} + +func (mapper *NeutronMapper) OnEdgeDeleted(n *graph.Edge) { +} + +func (mapper *NeutronMapper) Start() { + go mapper.nodeUpdater() +} + +func (mapper *NeutronMapper) Stop() { + close(mapper.nodeUpdaterChan) +} + +func NewNeutronMapper(g *graph.Graph, authURL string, username string, password string, tenantName string, regionName string) (*NeutronMapper, error) { + mapper := &NeutronMapper{graph: g} opts := gophercloud.AuthOptions{ IdentityEndpoint: authURL, @@ -173,8 +213,19 @@ func NewNeutronMapper() (*NeutronMapper, error) { expire := config.GetConfig().GetInt("cache.expire") cleanup := config.GetConfig().GetInt("cache.cleanup") mapper.cache = cache.New(time.Duration(expire)*time.Second, time.Duration(cleanup)*time.Second) - mapper.cacheUpdaterChan = make(chan string) - go mapper.cacheUpdater() + mapper.nodeUpdaterChan = make(chan graph.Identifier, 100) + + g.AddEventListener(mapper) return mapper, nil } + +func NewNeutronMapperFromConfig(g *graph.Graph) (*NeutronMapper, error) { + authURL := config.GetConfig().GetString("openstack.auth_url") + username := config.GetConfig().GetString("openstack.username") + password := config.GetConfig().GetString("openstack.password") + tenantName := config.GetConfig().GetString("openstack.tenant_name") + regionName := config.GetConfig().GetString("openstack.region_name") + + return NewNeutronMapper(g, authURL, username, password, tenantName, regionName) +} diff --git a/topology/probes/probes.go b/topology/probes/probes.go index 1213c31665..7148224efa 100644 --- a/topology/probes/probes.go +++ b/topology/probes/probes.go @@ -59,6 +59,13 @@ func NewTopologyProbeBundleFromConfig(g *graph.Graph, n *graph.Node) *TopologyPr probes[t] = NewOvsdbProbeFromConfig(g, n) case "docker": probes[t] = NewDockerProbeFromConfig(g, n) + case "neutron": + neutron, err := NewNeutronMapperFromConfig(g) + if err != nil { + logging.GetLogger().Errorf("Failed to initialize Neutron probe: %s", err.Error()) + continue + } + probes[t] = neutron default: logging.GetLogger().Error("unknown probe type %s", t)