Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE WIP: Nw policy updates #105

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ofnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,11 @@ type OfnetEndpoint struct {
type OfnetPolicyRule struct {
RuleId string // Unique identifier for the rule
Priority int // Priority for the rule (1..100. 100 is highest)
SrcVrf string // For policy rules, reqiured to uniquely identify the SrcEndpointGroup
SrcEndpointGroup int // Source endpoint group
DstVrf string // For policy rules, required to uniquely identify the DstEndpointGroup
DstEndpointGroup int // Destination endpoint group
SrcIpAddr string // source IP addrss and mask
SrcIpAddr string // source IP address and mask
DstIpAddr string // Destination IP address and mask
IpProtocol uint8 // IP protocol number
SrcPort uint16 // Source port
Expand Down
19 changes: 17 additions & 2 deletions ofnetAgent.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ import (
cmap "github.com/streamrail/concurrent-map"
)

// these can be passed to NewOfnetAgent for endpointIPsAreUnique parameter
const OFNET_AGENT_ENDPOINT_IPS_ARE_NOT_UNIQUE_PARAM = false
const OFNET_AGENT_ENDPOINT_IPS_ARE_UNIQUE_PARAM = true

// OfnetAgent state
type OfnetAgent struct {
ctrler *ofctrl.Controller // Controller instance
Expand All @@ -55,6 +59,11 @@ type OfnetAgent struct {
datapath OfnetDatapath // Configured datapath
protopath OfnetProto // Configured protopath

// True if all requests to create endpoints no matter the VRF will have
// unique IPs, which would allow for inferring the VRF based on IP address
// True also allows endpoints in different VRFs to communicate directly
endpointIpsAreUnique bool

masterDb map[string]*OfnetNode // list of Masters
masterDbMutex sync.Mutex // Sync mutex for masterDb

Expand Down Expand Up @@ -147,8 +156,8 @@ const (

// Create a new Ofnet agent and initialize it
func NewOfnetAgent(bridgeName string, dpName string, localIp net.IP, rpcPort uint16,
ovsPort uint16, uplinkInfo []string) (*OfnetAgent, error) {
log.Infof("Creating new ofnet agent for %s,%s,%d,%d,%d\n", bridgeName, dpName, localIp, rpcPort, ovsPort)
ovsPort uint16, uplinkInfo []string, endpointIpsAreUnique bool) (*OfnetAgent, error) {
log.Infof("Creating new ofnet agent for %s,%s,%d,%d,%d,%v\n", bridgeName, dpName, localIp, rpcPort, ovsPort, endpointIpsAreUnique)
agent := new(OfnetAgent)

// Init params
Expand All @@ -168,6 +177,8 @@ func NewOfnetAgent(bridgeName string, dpName string, localIp net.IP, rpcPort uin
agent.vniVlanMap = make(map[uint32]*uint16)
agent.vlanVniMap = make(map[uint16]*uint32)

agent.endpointIpsAreUnique = endpointIpsAreUnique

// Initialize vtep database
agent.vtepTable = make(map[string]*uint32)

Expand Down Expand Up @@ -253,6 +264,10 @@ func (self *OfnetAgent) incrErrStats(errName string) {
self.stats[errName+"-ERROR"] = currStats
}

func (a *OfnetAgent) IsEndpointIpsAreUnique() bool {
return a.endpointIpsAreUnique
}

// getEndpointId Get a unique identifier for the endpoint.
func (self *OfnetAgent) getEndpointId(endpoint EndpointInfo) string {
self.vlanVrfMutex.RLock()
Expand Down
2 changes: 1 addition & 1 deletion ofnetMaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (self *OfnetMaster) UnRegisterNode(hostInfo *OfnetNode, ret *bool) error {
// Add an Endpoint
func (self *OfnetMaster) EndpointAdd(ep *OfnetEndpoint, ret *bool) error {

log.Infof("Received Endpoint CReate from Remote netplugin")
log.Infof("Received Endpoint Create from Remote netplugin")
// Check if we have the endpoint already and which is more recent
self.masterMutex.RLock()
oldEp := self.endpointDb[ep.EndpointID]
Expand Down
139 changes: 98 additions & 41 deletions ofnetPolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ofnet

import (
"errors"
"fmt"
"net"
"net/rpc"
"reflect"
Expand Down Expand Up @@ -79,31 +80,56 @@ func (self *PolicyAgent) SwitchDisconnected(sw *ofctrl.OFSwitch) {
}

// Metadata Format
// 6 3 3 1 1 0 0
// 3 1 0 6 5 1 0
// +-------------+-+---------------+---------------+-+
// | ....U |U| SrcGrp | DstGrp |V|
// +-------------+-+---------------+---------------+-+
// Source Tenant + Group
// 0x1fff ffff 8000 0000 Destination Tenant + Group
// | 0x7FFF FFFE
// +--------+----------+ |
// | v +--------+---------+
// v Source Group v v
// Source Tenant 0x7FFF 8000 0000 Destination Tenant Destination Group
// 0x1FFF 8000 0000 0000 | 0x7FFE 0000 0x0001 FFFE
// | | | |
// +-------+--------++---------+---------++--------+-----++-----------+------+
// | || || || |
// v vv vv vv v
// 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 000V
//
// U: Unused
// SrcGrp: Source endpoint group
// DstGrp: Destination endpoint group
// V: Received on VTEP Port. Dont flood back to VTEP ports.
//

// DstGroupMetadata returns metadata for dst group
func DstGroupMetadata(groupId int) (uint64, uint64) {
metadata := uint64(groupId) << 1
metadataMask := uint64(0xfffe)
// returns openflow metadata and mask values for dst group
func DstGroupMetadata(vrfid uint16, groupId int) (uint64, uint64) {
// vrf: shift 16 for src group, 1 for VTEP flag
// group: shift 1 for the VTEP flag
metadata := (uint64(vrfid) << 17) + (uint64(groupId) << 1)
// vrf:
// 14 bits shifted 1 for vtep flag and 16 for group
// format((((1<<14))-1)<<(1+16), 'x')
// 0x7ffe0000
// group:
// format((((1<<16)-1)<<1), 'x')
// 0x1fffe
metadataMask := uint64(0x7ffffffe)
metadata = metadata & metadataMask

return metadata, metadataMask
}

// SrcGroupMetadata returns metadata for src group
func SrcGroupMetadata(groupId int) (uint64, uint64) {
metadata := uint64(groupId) << 16
metadataMask := uint64(0x7fff0000)
// returns openflow metadata and mask for src group
func SrcGroupMetadata(vrfid uint16, groupId int) (uint64, uint64) {
// vrf:
// shift 30 for dest vrf+group, 16 for src group, 1 for VTEP flag = 47
// group:
// shift 30 for the dest vrf+group, 1 for the VTEP flag
metadata := (uint64(vrfid) << 47) + (uint64(groupId) << (30 + 1))
// vrf:
// 14 bits shifted by 1: vtep flag + 30: dest vrf+group + 16: src group
// format((((1<<14))-1)<<(1+30+16), 'x')
// 0x1FFF800000000000
// group:
// 16 bits shifted 30 for dest vrf+group plus 1 for vtep flag
// format((((1<<16))-1)<<(30+1), 'x')
// 0x7fff80000000
metadataMask := uint64(0x1FFFFFFF80000000)
metadata = metadata & metadataMask

return metadata, metadataMask
Expand Down Expand Up @@ -139,8 +165,9 @@ func (self *PolicyAgent) AddEndpoint(endpoint *OfnetEndpoint) error {
self.agent.vrfMutex.RLock()
vrfid := self.agent.vrfNameIdMap[*vrf]
self.agent.vrfMutex.RUnlock()
vrfMetadata, vrfMetadataMask := Vrfmetadata(*vrfid)
// Install the Dst group lookup flow

vrfMetadata, vrfMetadataMask := VrfDestMetadata(*vrfid)
// match destination tenant and IP
dstGrpFlow, err := self.dstGrpTable.NewFlow(ofctrl.FlowMatch{
Priority: FLOW_MATCH_PRIORITY,
Ethertype: 0x0800,
Expand All @@ -153,8 +180,8 @@ func (self *PolicyAgent) AddEndpoint(endpoint *OfnetEndpoint) error {
return err
}

// Format the metadata
metadata, metadataMask := DstGroupMetadata(endpoint.EndpointGroup)
// Format the metadata for the destination group
metadata, metadataMask := DstGroupMetadata(*vrfid, endpoint.EndpointGroup)

// Set dst GroupId
err = dstGrpFlow.SetMetadata(metadata, metadataMask)
Expand Down Expand Up @@ -230,7 +257,7 @@ func (self *PolicyAgent) AddIpv6Endpoint(endpoint *OfnetEndpoint) error {
vrfid := self.agent.vrfNameIdMap[*vrf]
self.agent.vrfMutex.RUnlock()

vrfMetadata, vrfMetadataMask := Vrfmetadata(*vrfid)
vrfMetadata, vrfMetadataMask := VrfDestMetadata(*vrfid)
// Install the Dst group lookup flow
dstGrpFlow, err := self.dstGrpTable.NewFlow(ofctrl.FlowMatch{
Priority: FLOW_MATCH_PRIORITY,
Expand All @@ -245,7 +272,7 @@ func (self *PolicyAgent) AddIpv6Endpoint(endpoint *OfnetEndpoint) error {
}

// Format the metadata
metadata, metadataMask := DstGroupMetadata(endpoint.EndpointGroup)
metadata, metadataMask := DstGroupMetadata(*vrfid, endpoint.EndpointGroup)

// Set dst GroupId
err = dstGrpFlow.SetMetadata(metadata, metadataMask)
Expand Down Expand Up @@ -299,8 +326,10 @@ func (self *PolicyAgent) AddRule(rule *OfnetPolicyRule, ret *bool) error {
var ipDaMask *net.IP = nil
var ipSa *net.IP = nil
var ipSaMask *net.IP = nil
var md *uint64 = nil
var mdm *uint64 = nil
var metadata uint64 = 0 // for calculations of md
var metadataMask uint64 = 0 // for calculations of mdm
var md *uint64 = nil // flow metadata
var mdm *uint64 = nil // flow metadata mask
var flag, flagMask uint16
var flagPtr, flagMaskPtr *uint16
var err error
Expand Down Expand Up @@ -346,24 +375,52 @@ func (self *PolicyAgent) AddRule(rule *OfnetPolicyRule, ret *bool) error {
}
}

// parse source/dst endpoint groups
if rule.SrcEndpointGroup != 0 && rule.DstEndpointGroup != 0 {
srcMetadata, srcMetadataMask := SrcGroupMetadata(rule.SrcEndpointGroup)
dstMetadata, dstMetadataMask := DstGroupMetadata(rule.DstEndpointGroup)
metadata := srcMetadata | dstMetadata
metadataMask := srcMetadataMask | dstMetadataMask
md = &metadata
mdm = &metadataMask
} else if rule.SrcEndpointGroup != 0 {
srcMetadata, srcMetadataMask := SrcGroupMetadata(rule.SrcEndpointGroup)
md = &srcMetadata
mdm = &srcMetadataMask
} else if rule.DstEndpointGroup != 0 {
dstMetadata, dstMetadataMask := DstGroupMetadata(rule.DstEndpointGroup)
md = &dstMetadata
mdm = &dstMetadataMask
updateMetadata := func(meta uint64, mask uint64) (*uint64, *uint64) {
metadata |= meta
metadataMask |= mask
return &metadata, &metadataMask
}
// parse source/dst endpoint tenants and groups
var srcVrfId *uint16
var dstVrfId *uint16
if rule.SrcVrf != "" {
srcVrfId = self.agent.getvrfId(rule.SrcVrf)
if srcVrfId == nil {
errMsg := fmt.Sprintf("VRF %s was not found", rule.SrcVrf)
log.Errorf(errMsg)
return errors.New(errMsg)
}
md, mdm = updateMetadata(VrfSrcMetadata(*srcVrfId))
}
if rule.SrcEndpointGroup != 0 {
if rule.SrcVrf == "" {
errMsg := fmt.Sprintf("Source group %v was provided without VRF",
rule.SrcEndpointGroup)
log.Errorf(errMsg)
return errors.New(errMsg)
}

md, mdm = updateMetadata(SrcGroupMetadata(*srcVrfId, rule.SrcEndpointGroup))
}
if rule.DstVrf != "" {
dstVrfId = self.agent.getvrfId(rule.DstVrf)
if dstVrfId == nil {
errMsg := fmt.Sprintf("VRF %s was not found", rule.DstVrf)
log.Errorf(errMsg)
return errors.New(errMsg)
}
md, mdm = updateMetadata(VrfDestMetadata(*dstVrfId))
}
if rule.DstEndpointGroup != 0 {
if rule.DstVrf == "" {
errMsg := fmt.Sprintf("Destination group %v was provided without VRF",
rule.DstEndpointGroup)
log.Errorf(errMsg)
return errors.New(errMsg)
}

md, mdm = updateMetadata(DstGroupMetadata(*dstVrfId, rule.DstEndpointGroup))
}
// Setup TCP flags
if rule.IpProtocol == 6 && rule.TcpFlags != "" {
switch rule.TcpFlags {
Expand Down
Loading