Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3802 from weaveworks/reload-ip-tables
Browse files Browse the repository at this point in the history
Reload router iptables rules if they get cleared
  • Loading branch information
bboreham authored Jul 24, 2020
2 parents 1ff09fd + 341bac9 commit 7b927a7
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 87 deletions.
68 changes: 46 additions & 22 deletions net/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func EnsureBridge(procPath string, config *BridgeConfig, log *logrus.Logger, ips
break
}

if err := configureIPTables(config, ips); err != nil {
if err := ConfigureIPTables(config, ips); err != nil {
return bridgeType, errors.Wrap(err, "configuring iptables")
}

Expand Down Expand Up @@ -297,9 +297,7 @@ func EnsureBridge(procPath string, config *BridgeConfig, log *logrus.Logger, ips
return bridgeType, errors.Wrapf(err, "configuring ARP cache on bridge %q", config.WeaveBridgeName)
}

// NB: No concurrent call to Expose is possible, as EnsureBridge is called
// before any service has been started.
if err := reexpose(config, log); err != nil {
if err := Reexpose(config, log); err != nil {
return bridgeType, err
}

Expand Down Expand Up @@ -431,10 +429,40 @@ func (f fastdpImpl) attach(veth *netlink.Veth) error {
return odp.AddDatapathInterfaceIfNotExist(f.datapathName, veth.Attrs().Name)
}

func configureIPTables(config *BridgeConfig, ips ipset.Interface) error {
// ResetIPTables resets IPTables in case they're in a strange state from a previous run.
func ResetIPTables(config *BridgeConfig, ips ipset.Interface) error {
ipt, err := iptables.New()
if err != nil {
return errors.Wrap(err, "creating iptables object")
return errors.Wrap(err, "creating iptables object while resetting")
}

if !config.NPC {
// Create/Flush a chain for allowing ingress traffic when the bridge is exposed
if err := ipt.ClearChain("filter", "WEAVE-EXPOSE"); err != nil {
return errors.Wrap(err, "failed to clear/create filter/WEAVE-EXPOSE chain")
}
}

if err := ipt.ClearChain("nat", "WEAVE"); err != nil {
return errors.Wrap(err, "failed to clear/create nat/WEAVE chain")
}

if config.NoMasqLocal {
ips := ipset.New(common.LogLogger(), 0)
_ = ips.Destroy(NoMasqLocalIpset)
if err := ips.Create(NoMasqLocalIpset, ipset.HashNet); err != nil {
return err
}
}

return nil
}

// ConfigureIPTables idempotently configures all the iptables!
func ConfigureIPTables(config *BridgeConfig, ips ipset.Interface) error {
ipt, err := iptables.New()
if err != nil {
return errors.Wrap(err, "creating iptables object while configuring")
}

// The order among weave filter/FORWARD rules is important!
Expand Down Expand Up @@ -515,8 +543,8 @@ func configureIPTables(config *BridgeConfig, ips ipset.Interface) error {

if !config.NPC {
// Create/Flush a chain for allowing ingress traffic when the bridge is exposed
if err := ipt.ClearChain("filter", "WEAVE-EXPOSE"); err != nil {
return errors.Wrap(err, "failed to clear/create filter/WEAVE-EXPOSE chain")
if err := ensureChains(ipt, "filter", "WEAVE-EXPOSE"); err != nil {
return errors.Wrap(err, "failed to ensure existence of filter/WEAVE-EXPOSE chain")
}

fwdRules = append(fwdRules, []string{"-o", config.WeaveBridgeName, "-j", "WEAVE-EXPOSE"})
Expand All @@ -532,26 +560,22 @@ func configureIPTables(config *BridgeConfig, ips ipset.Interface) error {
}

// Create a chain for masquerading
if err := ipt.ClearChain("nat", "WEAVE"); err != nil {
return errors.Wrap(err, "failed to clear/create nat/WEAVE chain")
if err := ensureChains(ipt, "nat", "WEAVE"); err != nil {
return errors.Wrap(err, "failed to ensure existence of nat/WEAVE chain")
}
if err := ipt.AppendUnique("nat", "POSTROUTING", "-j", "WEAVE"); err != nil {
return err
}

// For the cases where the weave bridge is the default gateway for
// containers (e.g. Kubernetes): create the ipset to store CIDRs allocated
// by IPAM for local containers. In the case of Kubernetes, external traffic
// sent to these CIDRs avoids SNAT'ing so that NodePort with
// `"externalTrafficPolicy":"Local"` would receive packets with correct
// src IP addr.
// containers (e.g. Kubernetes): In `ResetIPTables` (which we assume
// to have been called at this point) we create an ipset to store CIDRs
// allocated by IPAM for local containers.
// In the case of Kubernetes, external traffic sent to these CIDRs
// avoids SNAT'ing so that NodePort with `"externalTrafficPolicy":"Local"`
// would receive packets with correct src IP addr.
if config.NoMasqLocal {
ips := ipset.New(common.LogLogger(), 0)
_ = ips.Destroy(NoMasqLocalIpset)
if err := ips.Create(NoMasqLocalIpset, ipset.HashNet); err != nil {
return err
}
if err := ipt.Insert("nat", "WEAVE", 1,
if err := ipt.AppendUnique("nat", "WEAVE",
"-m", "set", "--match-set", string(NoMasqLocalIpset), "dst",
"-m", "comment", "--comment", "Prevent SNAT to locally running containers",
"-j", "RETURN"); err != nil {
Expand Down Expand Up @@ -609,7 +633,7 @@ func linkSetUpByName(linkName string) error {
return netlink.LinkSetUp(link)
}

func reexpose(config *BridgeConfig, log *logrus.Logger) error {
func Reexpose(config *BridgeConfig, log *logrus.Logger) error {
// Get existing IP addrs of the weave bridge.
// If the bridge hasn't been exposed, then this functions does nothing.
//
Expand Down
9 changes: 2 additions & 7 deletions net/expose.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,8 @@ func addNatRule(ipt *iptables.IPTables, rulespec ...string) error {
// Loop until we get an exit code other than "temporarily unavailable"
for {
if err := ipt.AppendUnique("nat", "WEAVE", rulespec...); err != nil {
if ierr, ok := err.(*iptables.Error); ok {
if status, ok := ierr.ExitError.Sys().(syscall.WaitStatus); ok {
// (magic exit code 4 found in iptables source code; undocumented)
if status.ExitStatus() == 4 {
continue
}
}
if isResourceError(err) {
continue
}
return err
}
Expand Down
19 changes: 17 additions & 2 deletions net/ipset/ipset.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type Interface interface {
Create(ipsetName Name, ipsetType Type) error
AddEntry(user types.UID, ipsetName Name, entry string, comment string) error
DelEntry(user types.UID, ipsetName Name, entry string) error
Exist(user types.UID, ipsetName Name, entry string) bool
EntryExists(user types.UID, ipsetName Name, entry string) bool
Exists(ipsetName Name) (bool, error)
Flush(ipsetName Name) error
Destroy(ipsetName Name) error

Expand Down Expand Up @@ -121,10 +122,24 @@ func (i *ipset) DelEntry(user types.UID, ipsetName Name, entry string) error {
return doExec("del", string(ipsetName), entry)
}

func (i *ipset) Exist(user types.UID, ipsetName Name, entry string) bool {
func (i *ipset) EntryExists(user types.UID, ipsetName Name, entry string) bool {
return i.existUser(user, ipsetName, entry)
}

// Dummy way to check whether a given ipset exists.
func (i *ipset) Exists(name Name) (bool, error) {
sets, err := i.List(string(name))
if err != nil {
return false, err
}
for _, s := range sets {
if s == name {
return true, nil
}
}
return false, nil
}

func (i *ipset) Flush(ipsetName Name) error {
i.removeSetFromUsers(ipsetName)
return doExec("flush", string(ipsetName))
Expand Down
120 changes: 120 additions & 0 deletions net/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package net

import (
"strings"
"syscall"
"time"

"github.com/coreos/go-iptables/iptables"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
utilwait "k8s.io/apimachinery/pkg/util/wait"
)

// AddChainWithRules creates a chain and appends given rules to it.
Expand Down Expand Up @@ -104,3 +108,119 @@ func ensureRulesAtTop(table, chain string, rulespecs [][]string, ipt *iptables.I

return nil
}

func chainExists(ipt *iptables.IPTables, table string, chain string) (bool, error) {
existingChains, err := ipt.ListChains(table)
if err != nil {
return false, errors.Wrapf(err, "ipt.ListChains(%s)", table)
}
chainMap := make(map[string]struct{})
for _, c := range existingChains {
chainMap[c] = struct{}{}
}

_, found := chainMap[chain]
return found, nil
}

const (
// Max time we wait for an iptables flush to complete after we notice it has started
iptablesFlushTimeout = 5 * time.Second
// How often we poll while waiting for an iptables flush to complete
iptablesFlushPollTime = 100 * time.Millisecond
)

// MonitorForIptablesFlush periodically checks for a canary chain in iptables. If this canary chain goes missing it calls the reloadFunc.
// This is a more efficient way of detecting whether firewalld or another process has been removing rules that we rely on.
// The reloadFunc can then check whether other chains that should exist are still there, fix things and restore the canary.
func MonitorForIptablesFlush(log *logrus.Logger, canary string, tables []string, reloadFunc func(), interval time.Duration, stopCh <-chan struct{}) {
ipt, err := iptables.New()
if err != nil {
log.Errorf("creating iptables object while initializing iptable Monitoring %s", err)
return
}

for {
_ = PollImmediateUntil(interval, func() (bool, error) {
for _, table := range tables {
if err := ensureChains(ipt, table, canary); err != nil {
log.Warningf("Could not set up iptables canary %s/%s: %v", table, canary, err)
return false, nil
}
}
return true, nil
}, stopCh)

// Poll until stopCh is closed or iptables is flushed
err = utilwait.PollUntil(interval, func() (bool, error) {
if exists, err := chainExists(ipt, tables[0], canary); exists {
return false, nil
} else if isResourceError(err) {
log.Warningf("Could not check for iptables canary %s/%s: %v", tables[0], canary, err)
return false, nil
}
log.Infof("iptables canary %s/%s deleted", tables[0], canary)

// Wait for the other canaries to be deleted too before returning
// so we don't start reloading too soon.
err := utilwait.PollImmediate(iptablesFlushPollTime, iptablesFlushTimeout, func() (bool, error) {
for i := 1; i < len(tables); i++ {
if exists, err := chainExists(ipt, tables[i], canary); exists || isResourceError(err) {
return false, nil
}
}
return true, nil
})
if err != nil {
log.Warning("Inconsistent iptables state detected.")
}
return true, nil
}, stopCh)

if err != nil {
// stopCh was closed
for _, table := range tables {
_ = ipt.DeleteChain(table, canary)
}
return
}

log.Infof("Reloading after iptables flush")
reloadFunc()
}
}

const iptablesStatusResourceProblem = 4

// isResourceError returns true if the error indicates that iptables ran into a "resource
// problem" and was unable to attempt the request. In particular, this will be true if it
// times out trying to get the iptables lock.
func isResourceError(err error) bool {
if ierr, ok := err.(*iptables.Error); ok {
if status, ok := ierr.ExitError.Sys().(syscall.WaitStatus); ok {
return status.ExitStatus() == iptablesStatusResourceProblem
}
}

return false
}

// PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed.
//
// PollImmediateUntil runs the 'condition' before waiting for the interval.
// 'condition' will always be invoked at least once.
func PollImmediateUntil(interval time.Duration, condition utilwait.ConditionFunc, stopCh <-chan struct{}) error {
done, err := condition()
if err != nil {
return err
}
if done {
return nil
}
select {
case <-stopCh:
return utilwait.ErrWaitTimeout
default:
return utilwait.PollUntil(interval, condition, stopCh)
}
}
Loading

0 comments on commit 7b927a7

Please sign in to comment.