Skip to content

Commit

Permalink
Turn the Neutron flow mapper into a topology probe
Browse files Browse the repository at this point in the history
Change-Id: Iab99917ffb336c9561531642e8e17e8108e52ccc
  • Loading branch information
lebauce committed Feb 23, 2016
1 parent 0414cb0 commit d35ed73
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 52 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,14 @@ agent:
analyzers: 127.0.0.1:8082
topology:
# Probes used to capture topology informations like interfaces,
# bridges, namespaces, etc. Available netlink, netns, ovsdb.
# bridges, namespaces, etc...
# Available: netlink, netns, ovsdb, neutron.
# Default: netlink, netns
probes:
- netlink
- netns
# - ovsdb
# - neutron
flow:
# Probes used to capture traffic.
probes:
Expand Down
153 changes: 102 additions & 51 deletions flow/mappings/neutron.go → topology/probes/neutron.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*
*/

package mappings
package probes

import (
"errors"
Expand All @@ -34,31 +34,29 @@ import (
"github.com/rackspace/gophercloud/openstack/networking/v2/ports"
"github.com/rackspace/gophercloud/pagination"

//"github.com/golang/protobuf/proto"
"github.com/pmylund/go-cache"

"github.com/redhat-cip/skydive/config"
//"github.com/redhat-cip/skydive/flow"
"github.com/redhat-cip/skydive/logging"
"github.com/redhat-cip/skydive/topology/graph"
)

type NeutronMapper struct {
graph *graph.Graph
client *gophercloud.ServiceClient
cache *cache.Cache
cacheUpdaterChan chan string
nodeUpdaterChan chan graph.Identifier
}

type Attributes struct {
TenantID string
VNI uint64
VNI string
}

func (mapper *NeutronMapper) retrievePort(mac string) (ports.Port, error) {
port := ports.Port{}

func (mapper *NeutronMapper) retrievePort(mac string) (port ports.Port, err error) {
opts := ports.ListOpts{MACAddress: mac}
pager := ports.List(mapper.client, opts)
err := pager.EachPage(func(page pagination.Page) (bool, error) {
err = pager.EachPage(func(page pagination.Page) (bool, error) {
portList, err := ports.ExtractPorts(page)
if err != nil {
return false, err
Expand All @@ -70,80 +68,122 @@ func (mapper *NeutronMapper) retrievePort(mac string) (ports.Port, error) {
return true, nil
}
}

return true, nil
})

if len(port.NetworkID) == 0 {
return port, errors.New("Unable to find port for mac address: " + mac)
return port, errors.New("Unable to find port for MAC address: " + mac)
}

return port, err
}

func (mapper *NeutronMapper) retrieveAttributes(mac string) Attributes {
logging.GetLogger().Debugf("Retrieving attributes from Neutron for Mac: %s", mac)

attrs := Attributes{}
func (mapper *NeutronMapper) retrieveAttributes(mac string) (*Attributes, error) {
logging.GetLogger().Debugf("Retrieving attributes from Neutron for MAC: %s", mac)

port, err := mapper.retrievePort(mac)
if err != nil {
return attrs
return nil, err
}
attrs.TenantID = port.TenantID

result := networks.Get(mapper.client, port.NetworkID)
network, err := provider.ExtractGet(result)
if err != nil {
return attrs
}

if err != nil {
return attrs
}

segID, err := strconv.Atoi(network.SegmentationID)
if err == nil {
attrs.VNI = uint64(segID)
return nil, err
}

return attrs
return &Attributes{TenantID: port.TenantID, VNI: network.SegmentationID}, nil
}

func (mapper *NeutronMapper) cacheUpdater() {
logging.GetLogger().Debug("Start Neutron cache updater")

var mac string
func (mapper *NeutronMapper) nodeUpdater() {
logging.GetLogger().Debugf("Starting Neutron updater")
for {
mac = <-mapper.cacheUpdaterChan
nodeID := <-mapper.nodeUpdaterChan
node := mapper.graph.GetNode(nodeID)
if node == nil {
continue
}

logging.GetLogger().Debugf("Mac request received: %s", mac)
mac, ok := node.Metadata()["MAC"];
if !ok {
continue
}

attrs := mapper.retrieveAttributes(mac)
mapper.cache.Set(mac, attrs, cache.DefaultExpiration)
attrs, err := mapper.retrieveAttributes(mac.(string))
if err != nil {
continue
}

mapper.updateNode(node, attrs)
}
logging.GetLogger().Debugf("Stopping Neutron updater")
}

/*func (mapper *NeutronMapper) EnhanceInterface(mac string, attrs *flow.Flow_InterfaceAttributes) {
a, f := mapper.cache.Get(mac)
if f {
ia := a.(Attributes)
func (mapper *NeutronMapper) updateNode(node *graph.Node, attrs *Attributes) {
mapper.graph.Lock()
defer mapper.graph.Unlock()

if attrs.TenantID != "" && node.Metadata()["Neutron.TenantID"] != attrs.TenantID {
mapper.graph.AddMetadata(node, "Neutron.TenantID", attrs.TenantID)
}

if segID, err := strconv.Atoi(attrs.VNI); err != nil {
if node.Metadata()["Neutron.VNI"] != uint64(segID) {
mapper.graph.AddMetadata(node, "Neutron.VNI", uint64(segID))
}
}

attrs.TenantID = proto.String(ia.TenantID)
attrs.VNI = proto.Uint64(ia.VNI)
mapper.cache.Set(string(node.ID), attrs, cache.DefaultExpiration)
}

func (mapper *NeutronMapper) EnhanceNode(node *graph.Node) {
mac, ok := node.Metadata()["MAC"];
if !ok {
return
}

mapper.cacheUpdaterChan <- mac
}*/
a, f := mapper.cache.Get(mac.(string))
if f {
attrs := a.(Attributes)
mapper.updateNode(node, &attrs)
return
}

func NewNeutronMapper() (*NeutronMapper, error) {
mapper := &NeutronMapper{}
mapper.nodeUpdaterChan <-node.ID
}

authURL := config.GetConfig().GetString("openstack.auth_url")
username := config.GetConfig().GetString("openstack.username")
password := config.GetConfig().GetString("openstack.password")
tenantName := config.GetConfig().GetString("openstack.tenant_name")
regionName := config.GetConfig().GetString("openstack.region_name")
func (mapper *NeutronMapper) OnNodeUpdated(n *graph.Node) {
go mapper.EnhanceNode(n)
}

func (mapper *NeutronMapper) OnNodeAdded(n *graph.Node) {
go mapper.EnhanceNode(n)
}

func (mapper *NeutronMapper) OnNodeDeleted(n *graph.Node) {
}

func (mapper *NeutronMapper) OnEdgeUpdated(n *graph.Edge) {
}

func (mapper *NeutronMapper) OnEdgeAdded(n *graph.Edge) {
}

func (mapper *NeutronMapper) OnEdgeDeleted(n *graph.Edge) {
}

func (mapper *NeutronMapper) Start() {
go mapper.nodeUpdater()
}

func (mapper *NeutronMapper) Stop() {
close(mapper.nodeUpdaterChan)
}

func NewNeutronMapper(g *graph.Graph, authURL string, username string, password string, tenantName string, regionName string) (*NeutronMapper, error) {
mapper := &NeutronMapper{graph: g}

opts := gophercloud.AuthOptions{
IdentityEndpoint: authURL,
Expand Down Expand Up @@ -173,8 +213,19 @@ func NewNeutronMapper() (*NeutronMapper, error) {
expire := config.GetConfig().GetInt("cache.expire")
cleanup := config.GetConfig().GetInt("cache.cleanup")
mapper.cache = cache.New(time.Duration(expire)*time.Second, time.Duration(cleanup)*time.Second)
mapper.cacheUpdaterChan = make(chan string)
go mapper.cacheUpdater()
mapper.nodeUpdaterChan = make(chan graph.Identifier, 100)

g.AddEventListener(mapper)

return mapper, nil
}

func NewNeutronMapperFromConfig(g *graph.Graph) (*NeutronMapper, error) {
authURL := config.GetConfig().GetString("openstack.auth_url")
username := config.GetConfig().GetString("openstack.username")
password := config.GetConfig().GetString("openstack.password")
tenantName := config.GetConfig().GetString("openstack.tenant_name")
regionName := config.GetConfig().GetString("openstack.region_name")

return NewNeutronMapper(g, authURL, username, password, tenantName, regionName)
}
7 changes: 7 additions & 0 deletions topology/probes/probes.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ func NewTopologyProbeBundleFromConfig(g *graph.Graph, n *graph.Node) *TopologyPr
probes[t] = NewOvsdbProbeFromConfig(g, n)
case "docker":
probes[t] = NewDockerProbeFromConfig(g, n)
case "neutron":
neutron, err := NewNeutronMapperFromConfig(g)
if err != nil {
logging.GetLogger().Errorf("Failed to initialize Neutron probe: %s", err.Error())
continue
}
probes[t] = neutron

default:
logging.GetLogger().Error("unknown probe type %s", t)
Expand Down

0 comments on commit d35ed73

Please sign in to comment.