diff --git a/network/bandwidth/fake_shaper.go b/network/bandwidth/fake_shaper.go new file mode 100644 index 000000000..591f3e839 --- /dev/null +++ b/network/bandwidth/fake_shaper.go @@ -0,0 +1,56 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bandwidth + +import ( + "errors" + + "k8s.io/apimachinery/pkg/api/resource" +) + +// FakeShaper provides an implementation of the bandwidth.Shaper. +// Beware this is implementation has no features besides Reset and GetCIDRs. +type FakeShaper struct { + CIDRs []string + ResetCIDRs []string +} + +// Limit is not implemented +func (f *FakeShaper) Limit(cidr string, egress, ingress *resource.Quantity) error { + return errors.New("unimplemented") +} + +// Reset appends a particular CIDR to the set of ResetCIDRs being managed by this shaper +func (f *FakeShaper) Reset(cidr string) error { + f.ResetCIDRs = append(f.ResetCIDRs, cidr) + return nil +} + +// ReconcileInterface is not implemented +func (f *FakeShaper) ReconcileInterface() error { + return errors.New("unimplemented") +} + +// ReconcileCIDR is not implemented +func (f *FakeShaper) ReconcileCIDR(cidr string, egress, ingress *resource.Quantity) error { + return errors.New("unimplemented") +} + +// GetCIDRs returns the set of CIDRs that are being managed by this shaper +func (f *FakeShaper) GetCIDRs() ([]string, error) { + return f.CIDRs, nil +} diff --git a/network/bandwidth/interfaces.go b/network/bandwidth/interfaces.go new file mode 100644 index 000000000..ec29d5d10 --- /dev/null +++ b/network/bandwidth/interfaces.go @@ -0,0 +1,40 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bandwidth + +import "k8s.io/apimachinery/pkg/api/resource" + +// Shaper is designed so that the shaper structs created +// satisfy the Shaper interface. +type Shaper interface { + // Limit the bandwidth for a particular CIDR on a particular interface + // * ingress and egress are in bits/second + // * cidr is expected to be a valid network CIDR (e.g. '1.2.3.4/32' or '10.20.0.1/16') + // 'egress' bandwidth limit applies to all packets on the interface whose source matches 'cidr' + // 'ingress' bandwidth limit applies to all packets on the interface whose destination matches 'cidr' + // Limits are aggregate limits for the CIDR, not per IP address. CIDRs must be unique, but can be overlapping, traffic + // that matches multiple CIDRs counts against all limits. + Limit(cidr string, egress, ingress *resource.Quantity) error + // Remove a bandwidth limit for a particular CIDR on a particular network interface + Reset(cidr string) error + // Reconcile the interface managed by this shaper with the state on the ground. + ReconcileInterface() error + // Reconcile a CIDR managed by this shaper with the state on the ground + ReconcileCIDR(cidr string, egress, ingress *resource.Quantity) error + // GetCIDRs returns the set of CIDRs that are being managed by this shaper + GetCIDRs() ([]string, error) +} diff --git a/network/bandwidth/linux.go b/network/bandwidth/linux.go new file mode 100644 index 000000000..03e7354a4 --- /dev/null +++ b/network/bandwidth/linux.go @@ -0,0 +1,346 @@ +//go:build linux +// +build linux + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bandwidth + +import ( + "bufio" + "bytes" + "encoding/hex" + "fmt" + "net" + "regexp" + "strings" + + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/utils/exec" + netutils "k8s.io/utils/net" +) + +var ( + classShowMatcher = regexp.MustCompile(`class htb (1:\d+)`) + classAndHandleMatcher = regexp.MustCompile(`filter parent 1:.*fh (\d+::\d+).*flowid (\d+:\d+)`) +) + +// tcShaper provides an implementation of the Shaper interface on Linux using the 'tc' tool. +// In general, using this requires that the caller posses the NET_CAP_ADMIN capability, though if you +// do this within an container, it only requires the NS_CAPABLE capability for manipulations to that +// container's network namespace. +// Uses the hierarchical token bucket queuing discipline (htb), this requires Linux 2.4.20 or newer +// or a custom kernel with that queuing discipline backported. +type tcShaper struct { + e exec.Interface + iface string +} + +// NewTCShaper makes a new tcShaper for the given interface +func NewTCShaper(iface string) Shaper { + shaper := &tcShaper{ + e: exec.New(), + iface: iface, + } + return shaper +} + +func (t *tcShaper) execAndLog(cmdStr string, args ...string) error { + logrus.Infof("Running: %s %s", cmdStr, strings.Join(args, " ")) + cmd := t.e.Command(cmdStr, args...) + out, err := cmd.CombinedOutput() + logrus.Infof("Output from tc: %s", string(out)) + return err +} + +func (t *tcShaper) nextClassID() (int, error) { + data, err := t.e.Command("tc", "class", "show", "dev", t.iface).CombinedOutput() + if err != nil { + return -1, err + } + + scanner := bufio.NewScanner(bytes.NewBuffer(data)) + classes := sets.String{} + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + // skip empty lines + if len(line) == 0 { + continue + } + // expected tc line: + // class htb 1:1 root prio 0 rate 1000Kbit ceil 1000Kbit burst 1600b cburst 1600b + matches := classShowMatcher.FindStringSubmatch(line) + if len(matches) != 2 { + return -1, fmt.Errorf("unexpected output from tc: %s (%v)", scanner.Text(), matches) + } + classes.Insert(matches[1]) + } + + // Make sure it doesn't go forever + for nextClass := 1; nextClass < 10000; nextClass++ { + if !classes.Has(fmt.Sprintf("1:%d", nextClass)) { + return nextClass, nil + } + } + // This should really never happen + return -1, fmt.Errorf("exhausted class space, please try again") +} + +// Convert a CIDR from text to a hex representation +// Strips any masked parts of the IP, so 1.2.3.4/16 becomes hex(1.2.0.0)/ffffffff +func hexCIDR(cidr string) (string, error) { + ip, ipnet, err := netutils.ParseCIDRSloppy(cidr) + if err != nil { + return "", err + } + ip = ip.Mask(ipnet.Mask) + hexIP := hex.EncodeToString([]byte(ip)) + hexMask := ipnet.Mask.String() + return hexIP + "/" + hexMask, nil +} + +// Convert a CIDR from hex representation to text, opposite of the above. +func asciiCIDR(cidr string) (string, error) { + parts := strings.Split(cidr, "/") + if len(parts) != 2 { + return "", fmt.Errorf("unexpected CIDR format: %s", cidr) + } + ipData, err := hex.DecodeString(parts[0]) + if err != nil { + return "", err + } + ip := net.IP(ipData) + + maskData, err := hex.DecodeString(parts[1]) + if err != nil { + return "", err + } + mask := net.IPMask(maskData) + size, _ := mask.Size() + + return fmt.Sprintf("%s/%d", ip.String(), size), nil +} + +func (t *tcShaper) findCIDRClass(cidr string) (classAndHandleList [][]string, found bool, err error) { + data, err := t.e.Command("tc", "filter", "show", "dev", t.iface).CombinedOutput() + if err != nil { + return classAndHandleList, false, err + } + + hex, err := hexCIDR(cidr) + if err != nil { + return classAndHandleList, false, err + } + spec := fmt.Sprintf("match %s", hex) + + scanner := bufio.NewScanner(bytes.NewBuffer(data)) + filter := "" + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if len(line) == 0 { + continue + } + if strings.HasPrefix(line, "filter") { + filter = line + continue + } + if strings.Contains(line, spec) { + // expected tc line: + // `filter parent 1: protocol ip pref 1 u32 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:1` (old version) or + // `filter parent 1: protocol ip pref 1 u32 chain 0 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:1 not_in_hw` (new version) + matches := classAndHandleMatcher.FindStringSubmatch(filter) + if len(matches) != 3 { + return classAndHandleList, false, fmt.Errorf("unexpected output from tc: %s %d (%v)", filter, len(matches), matches) + } + resultTmp := []string{matches[2], matches[1]} + classAndHandleList = append(classAndHandleList, resultTmp) + } + } + if len(classAndHandleList) > 0 { + return classAndHandleList, true, nil + } + return classAndHandleList, false, nil +} + +func makeKBitString(rsrc *resource.Quantity) string { + return fmt.Sprintf("%dkbit", (rsrc.Value() / 1000)) +} + +func (t *tcShaper) makeNewClass(rate string) (int, error) { + class, err := t.nextClassID() + if err != nil { + return -1, err + } + if err := t.execAndLog("tc", "class", "add", + "dev", t.iface, + "parent", "1:", + "classid", fmt.Sprintf("1:%d", class), + "htb", "rate", rate); err != nil { + return -1, err + } + return class, nil +} + +func (t *tcShaper) Limit(cidr string, upload, download *resource.Quantity) (err error) { + var downloadClass, uploadClass int + if download != nil { + if downloadClass, err = t.makeNewClass(makeKBitString(download)); err != nil { + return err + } + if err := t.execAndLog("tc", "filter", "add", + "dev", t.iface, + "protocol", "ip", + "parent", "1:0", + "prio", "1", "u32", + "match", "ip", "dst", cidr, + "flowid", fmt.Sprintf("1:%d", downloadClass)); err != nil { + return err + } + } + if upload != nil { + if uploadClass, err = t.makeNewClass(makeKBitString(upload)); err != nil { + return err + } + if err := t.execAndLog("tc", "filter", "add", + "dev", t.iface, + "protocol", "ip", + "parent", "1:0", + "prio", "1", "u32", + "match", "ip", "src", cidr, + "flowid", fmt.Sprintf("1:%d", uploadClass)); err != nil { + return err + } + } + return nil +} + +// tests to see if an interface exists, if it does, return true and the status line for the interface +// returns false, "", if an error occurs. +func (t *tcShaper) interfaceExists() (bool, string, error) { + data, err := t.e.Command("tc", "qdisc", "show", "dev", t.iface).CombinedOutput() + if err != nil { + return false, "", err + } + value := strings.TrimSpace(string(data)) + if len(value) == 0 { + return false, "", nil + } + // Newer versions of tc and/or the kernel return the following instead of nothing: + // qdisc noqueue 0: root refcnt 2 + fields := strings.Fields(value) + if len(fields) > 1 && fields[1] == "noqueue" { + return false, "", nil + } + return true, value, nil +} + +func (t *tcShaper) ReconcileCIDR(cidr string, upload, download *resource.Quantity) error { + _, found, err := t.findCIDRClass(cidr) + if err != nil { + return err + } + if !found { + return t.Limit(cidr, upload, download) + } + // TODO: actually check bandwidth limits here + return nil +} + +func (t *tcShaper) ReconcileInterface() error { + exists, output, err := t.interfaceExists() + if err != nil { + return err + } + if !exists { + logrus.Info("Didn't find bandwidth interface, creating") + return t.initializeInterface() + } + fields := strings.Split(output, " ") + if len(fields) < 12 || fields[1] != "htb" || fields[2] != "1:" { + if err := t.deleteInterface(fields[2]); err != nil { + return err + } + return t.initializeInterface() + } + return nil +} + +func (t *tcShaper) initializeInterface() error { + return t.execAndLog("tc", "qdisc", "add", "dev", t.iface, "root", "handle", "1:", "htb", "default", "30") +} + +func (t *tcShaper) Reset(cidr string) error { + classAndHandle, found, err := t.findCIDRClass(cidr) + if err != nil { + return err + } + if !found { + return fmt.Errorf("Failed to find cidr: %s on interface: %s", cidr, t.iface) + } + for i := 0; i < len(classAndHandle); i++ { + if err := t.execAndLog("tc", "filter", "del", + "dev", t.iface, + "parent", "1:", + "proto", "ip", + "prio", "1", + "handle", classAndHandle[i][1], "u32"); err != nil { + return err + } + if err := t.execAndLog("tc", "class", "del", + "dev", t.iface, + "parent", "1:", + "classid", classAndHandle[i][0]); err != nil { + return err + } + } + return nil +} + +func (t *tcShaper) deleteInterface(class string) error { + return t.execAndLog("tc", "qdisc", "delete", "dev", t.iface, "root", "handle", class) +} + +func (t *tcShaper) GetCIDRs() ([]string, error) { + data, err := t.e.Command("tc", "filter", "show", "dev", t.iface).CombinedOutput() + if err != nil { + return nil, err + } + + result := []string{} + scanner := bufio.NewScanner(bytes.NewBuffer(data)) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if len(line) == 0 { + continue + } + if strings.Contains(line, "match") { + parts := strings.Split(line, " ") + // expected tc line: + // match at + if len(parts) != 4 { + return nil, fmt.Errorf("unexpected output: %v", parts) + } + cidr, err := asciiCIDR(parts[1]) + if err != nil { + return nil, err + } + result = append(result, cidr) + } + } + return result, nil +} diff --git a/network/bandwidth/linux_test.go b/network/bandwidth/linux_test.go new file mode 100644 index 000000000..91f6cb101 --- /dev/null +++ b/network/bandwidth/linux_test.go @@ -0,0 +1,729 @@ +//go:build linux +// +build linux + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bandwidth + +import ( + "errors" + "reflect" + "strings" + "testing" + + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/utils/exec" + fakeexec "k8s.io/utils/exec/testing" +) + +var tcClassOutput = `class htb 1:1 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +class htb 1:2 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +class htb 1:3 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +class htb 1:4 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +` + +var tcClassOutput2 = `class htb 1:1 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +class htb 1:2 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +class htb 1:3 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +class htb 1:4 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +class htb 1:5 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +` + +func TestNextClassID(t *testing.T) { + tests := []struct { + output string + expectErr bool + expected int + err error + }{ + { + output: tcClassOutput, + expected: 5, + }, + { + output: "\n", + expected: 1, + }, + { + expected: -1, + expectErr: true, + err: errors.New("test error"), + }, + } + for _, test := range tests { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeAction{ + func() ([]byte, []byte, error) { return []byte(test.output), nil, test.err }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { + return fakeexec.InitFakeCmd(&fcmd, cmd, args...) + }, + }, + } + shaper := &tcShaper{e: &fexec} + class, err := shaper.nextClassID() + if test.expectErr { + if err == nil { + t.Errorf("unexpected non-error") + } + } else { + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if class != test.expected { + t.Errorf("expected: %d, found %d", test.expected, class) + } + } + } +} + +func TestHexCIDR(t *testing.T) { + tests := []struct { + name string + input string + output string + expectErr bool + }{ + { + name: "IPv4 masked", + input: "1.2.3.4/16", + output: "01020000/ffff0000", + }, + { + name: "IPv4 host", + input: "172.17.0.2/32", + output: "ac110002/ffffffff", + }, + { + name: "IPv6 masked", + input: "2001:dead:beef::cafe/64", + output: "2001deadbeef00000000000000000000/ffffffffffffffff0000000000000000", + }, + { + name: "IPv6 host", + input: "2001::5/128", + output: "20010000000000000000000000000005/ffffffffffffffffffffffffffffffff", + }, + { + name: "invalid CIDR", + input: "foo", + expectErr: true, + }, + } + for _, test := range tests { + output, err := hexCIDR(test.input) + if test.expectErr { + if err == nil { + t.Errorf("case %s: unexpected non-error", test.name) + } + } else { + if err != nil { + t.Errorf("case %s: unexpected error: %v", test.name, err) + } + if output != test.output { + t.Errorf("case %s: expected: %s, saw: %s", + test.name, test.output, output) + } + } + } +} + +func TestAsciiCIDR(t *testing.T) { + tests := []struct { + name string + input string + output string + expectErr bool + }{ + { + name: "IPv4", + input: "01020000/ffff0000", + output: "1.2.0.0/16", + }, + { + name: "IPv4 host", + input: "ac110002/ffffffff", + output: "172.17.0.2/32", + }, + { + name: "IPv6", + input: "2001deadbeef00000000000000000000/ffffffffffffffff0000000000000000", + output: "2001:dead:beef::/64", + }, + { + name: "IPv6 host", + input: "20010000000000000000000000000005/ffffffffffffffffffffffffffffffff", + output: "2001::5/128", + }, + { + name: "invalid CIDR", + input: "malformed", + expectErr: true, + }, + { + name: "non-hex IP", + input: "nonhex/32", + expectErr: true, + }, + { + name: "non-hex mask", + input: "01020000/badmask", + expectErr: true, + }, + } + for _, test := range tests { + output, err := asciiCIDR(test.input) + if test.expectErr { + if err == nil { + t.Errorf("case %s: unexpected non-error", test.name) + } + } else { + if err != nil { + t.Errorf("case %s: unexpected error: %v", test.name, err) + } + if output != test.output { + t.Errorf("case %s: expected: %s, saw: %s", + test.name, test.output, output) + } + } + } +} + +var tcFilterOutput = `filter parent 1: protocol ip pref 1 u32 +filter parent 1: protocol ip pref 1 u32 fh 800: ht divisor 1 +filter parent 1: protocol ip pref 1 u32 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:1 + match ac110002/ffffffff at 16 +filter parent 1: protocol ip pref 1 u32 fh 800::801 order 2049 key ht 800 bkt 0 flowid 1:2 + match 01020000/ffff0000 at 16 +` +var tcFilterOutputNewVersion = `filter parent 1: protocol ip pref 1 u32 +filter parent 1: protocol ip pref 1 u32 chain 0 fh 800: ht divisor 1 +filter parent 1: protocol ip pref 1 u32 chain 0 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:1 not_in_hw + match ac110002/ffffffff at 16 +filter parent 1: protocol ip pref 1 u32 chain 0 fh 800::801 order 2049 key ht 800 bkt 0 flowid 1:2 not_in_hw + match 01020000/ffff0000 at 16 +` + +func TestFindCIDRClass(t *testing.T) { + tests := []struct { + cidr string + output string + expectErr bool + expectNotFound bool + expectedClass string + expectedHandle string + err error + }{ + { + cidr: "172.17.0.2/32", + output: tcFilterOutput, + expectedClass: "1:1", + expectedHandle: "800::800", + }, + { + cidr: "1.2.3.4/16", + output: tcFilterOutput, + expectedClass: "1:2", + expectedHandle: "800::801", + }, + { + cidr: "2.2.3.4/16", + output: tcFilterOutput, + expectNotFound: true, + }, + { + cidr: "172.17.0.2/32", + output: tcFilterOutputNewVersion, + expectedClass: "1:1", + expectedHandle: "800::800", + }, + { + cidr: "1.2.3.4/16", + output: tcFilterOutputNewVersion, + expectedClass: "1:2", + expectedHandle: "800::801", + }, + { + cidr: "2.2.3.4/16", + output: tcFilterOutputNewVersion, + expectNotFound: true, + }, + { + err: errors.New("test error"), + expectErr: true, + }, + } + for _, test := range tests { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeAction{ + func() ([]byte, []byte, error) { return []byte(test.output), nil, test.err }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { + return fakeexec.InitFakeCmd(&fcmd, cmd, args...) + }, + }, + } + shaper := &tcShaper{e: &fexec} + classAndHandle, found, err := shaper.findCIDRClass(test.cidr) + if test.expectErr { + if err == nil { + t.Errorf("unexpected non-error") + } + } else { + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if test.expectNotFound { + if found { + t.Errorf("unexpectedly found an interface: %s", classAndHandle) + } + } else { + if classAndHandle[0][0] != test.expectedClass { + t.Errorf("expected class: %s, found %s", test.expectedClass, classAndHandle) + } + if classAndHandle[0][1] != test.expectedHandle { + t.Errorf("expected handle: %s, found %s", test.expectedHandle, classAndHandle) + } + } + } + } +} + +func TestGetCIDRs(t *testing.T) { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeAction{ + func() ([]byte, []byte, error) { return []byte(tcFilterOutput), nil, nil }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { + return fakeexec.InitFakeCmd(&fcmd, cmd, args...) + }, + }, + } + shaper := &tcShaper{e: &fexec} + cidrs, err := shaper.GetCIDRs() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + expectedCidrs := []string{"172.17.0.2/32", "1.2.0.0/16"} + if !reflect.DeepEqual(cidrs, expectedCidrs) { + t.Errorf("expected: %v, saw: %v", expectedCidrs, cidrs) + } +} + +func TestLimit(t *testing.T) { + tests := []struct { + cidr string + ingress *resource.Quantity + egress *resource.Quantity + expectErr bool + expectedCalls int + err error + }{ + { + cidr: "1.2.3.4/32", + ingress: resource.NewQuantity(10, resource.DecimalSI), + egress: resource.NewQuantity(20, resource.DecimalSI), + expectedCalls: 6, + }, + { + cidr: "1.2.3.4/32", + ingress: resource.NewQuantity(10, resource.DecimalSI), + egress: nil, + expectedCalls: 3, + }, + { + cidr: "1.2.3.4/32", + ingress: nil, + egress: resource.NewQuantity(20, resource.DecimalSI), + expectedCalls: 3, + }, + { + cidr: "1.2.3.4/32", + ingress: nil, + egress: nil, + expectedCalls: 0, + }, + { + err: errors.New("test error"), + ingress: resource.NewQuantity(10, resource.DecimalSI), + egress: resource.NewQuantity(20, resource.DecimalSI), + expectErr: true, + }, + } + + for _, test := range tests { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeAction{ + func() ([]byte, []byte, error) { return []byte(tcClassOutput), nil, test.err }, + func() ([]byte, []byte, error) { return []byte{}, nil, test.err }, + func() ([]byte, []byte, error) { return []byte{}, nil, test.err }, + func() ([]byte, []byte, error) { return []byte(tcClassOutput2), nil, test.err }, + func() ([]byte, []byte, error) { return []byte{}, nil, test.err }, + func() ([]byte, []byte, error) { return []byte{}, nil, test.err }, + }, + } + + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + iface := "cbr0" + shaper := &tcShaper{e: &fexec, iface: iface} + if err := shaper.Limit(test.cidr, test.ingress, test.egress); err != nil && !test.expectErr { + t.Errorf("unexpected error: %v", err) + return + } else if err == nil && test.expectErr { + t.Error("unexpected non-error") + return + } + // No more testing in the error case + if test.expectErr { + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("unexpected number of calls: %d, expected: 1", fcmd.CombinedOutputCalls) + } + return + } + + if fcmd.CombinedOutputCalls != test.expectedCalls { + t.Errorf("unexpected number of calls: %d, expected: %d", fcmd.CombinedOutputCalls, test.expectedCalls) + } + + for ix := range fcmd.CombinedOutputLog { + output := fcmd.CombinedOutputLog[ix] + if output[0] != "tc" { + t.Errorf("unexpected command: %s, expected tc", output[0]) + } + if output[4] != iface { + t.Errorf("unexpected interface: %s, expected %s (%v)", output[4], iface, output) + } + if ix == 1 { + var expectedRate string + if test.ingress != nil { + expectedRate = makeKBitString(test.ingress) + } else { + expectedRate = makeKBitString(test.egress) + } + if output[11] != expectedRate { + t.Errorf("unexpected ingress: %s, expected: %s", output[11], expectedRate) + } + if output[8] != "1:5" { + t.Errorf("unexpected class: %s, expected: %s", output[8], "1:5") + } + } + if ix == 2 { + if output[15] != test.cidr { + t.Errorf("unexpected cidr: %s, expected: %s", output[15], test.cidr) + } + if output[17] != "1:5" { + t.Errorf("unexpected class: %s, expected: %s", output[17], "1:5") + } + } + if ix == 4 { + if output[11] != makeKBitString(test.egress) { + t.Errorf("unexpected egress: %s, expected: %s", output[11], makeKBitString(test.egress)) + } + if output[8] != "1:6" { + t.Errorf("unexpected class: %s, expected: %s", output[8], "1:6") + } + } + if ix == 5 { + if output[15] != test.cidr { + t.Errorf("unexpected cidr: %s, expected: %s", output[15], test.cidr) + } + if output[17] != "1:6" { + t.Errorf("unexpected class: %s, expected: %s", output[17], "1:5") + } + } + } + } +} + +func TestReset(t *testing.T) { + tests := []struct { + cidr string + err error + expectErr bool + expectedHandle string + expectedClass string + }{ + { + cidr: "1.2.3.4/16", + expectedHandle: "800::801", + expectedClass: "1:2", + }, + { + cidr: "172.17.0.2/32", + expectedHandle: "800::800", + expectedClass: "1:1", + }, + { + err: errors.New("test error"), + expectErr: true, + }, + } + for _, test := range tests { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeAction{ + func() ([]byte, []byte, error) { return []byte(tcFilterOutput), nil, test.err }, + func() ([]byte, []byte, error) { return []byte{}, nil, test.err }, + func() ([]byte, []byte, error) { return []byte{}, nil, test.err }, + }, + } + + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + iface := "cbr0" + shaper := &tcShaper{e: &fexec, iface: iface} + + if err := shaper.Reset(test.cidr); err != nil && !test.expectErr { + t.Errorf("unexpected error: %v", err) + return + } else if test.expectErr && err == nil { + t.Error("unexpected non-error") + return + } + + // No more testing in the error case + if test.expectErr { + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("unexpected number of calls: %d, expected: 1", fcmd.CombinedOutputCalls) + } + return + } + + if fcmd.CombinedOutputCalls != 3 { + t.Errorf("unexpected number of calls: %d, expected: 3", fcmd.CombinedOutputCalls) + } + + for ix := range fcmd.CombinedOutputLog { + output := fcmd.CombinedOutputLog[ix] + if output[0] != "tc" { + t.Errorf("unexpected command: %s, expected tc", output[0]) + } + if output[4] != iface { + t.Errorf("unexpected interface: %s, expected %s (%v)", output[4], iface, output) + } + if ix == 1 && output[12] != test.expectedHandle { + t.Errorf("unexpected handle: %s, expected: %s", output[12], test.expectedHandle) + } + if ix == 2 && output[8] != test.expectedClass { + t.Errorf("unexpected class: %s, expected: %s", output[8], test.expectedClass) + } + } + } +} + +var tcQdisc = "qdisc htb 1: root refcnt 2 r2q 10 default 30 direct_packets_stat 0\n" + +func TestReconcileInterfaceExists(t *testing.T) { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeAction{ + func() ([]byte, []byte, error) { return []byte(tcQdisc), nil, nil }, + }, + } + + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + iface := "cbr0" + shaper := &tcShaper{e: &fexec, iface: iface} + err := shaper.ReconcileInterface() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("unexpected number of calls: %d", fcmd.CombinedOutputCalls) + } + + output := fcmd.CombinedOutputLog[0] + if len(output) != 5 { + t.Errorf("unexpected command: %v", output) + } + if output[0] != "tc" { + t.Errorf("unexpected command: %s", output[0]) + } + if output[4] != iface { + t.Errorf("unexpected interface: %s, expected %s", output[4], iface) + } + if output[2] != "show" { + t.Errorf("unexpected action: %s", output[2]) + } +} + +func testReconcileInterfaceHasNoData(t *testing.T, output string) { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeAction{ + func() ([]byte, []byte, error) { return []byte(output), nil, nil }, + func() ([]byte, []byte, error) { return []byte(output), nil, nil }, + }, + } + + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + iface := "cbr0" + shaper := &tcShaper{e: &fexec, iface: iface} + err := shaper.ReconcileInterface() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if fcmd.CombinedOutputCalls != 2 { + t.Errorf("unexpected number of calls: %d", fcmd.CombinedOutputCalls) + } + + for ix, output := range fcmd.CombinedOutputLog { + if output[0] != "tc" { + t.Errorf("unexpected command: %s", output[0]) + } + if output[4] != iface { + t.Errorf("unexpected interface: %s, expected %s", output[4], iface) + } + if ix == 0 { + if len(output) != 5 { + t.Errorf("unexpected command: %v", output) + } + if output[2] != "show" { + t.Errorf("unexpected action: %s", output[2]) + } + } + if ix == 1 { + if len(output) != 11 { + t.Errorf("unexpected command: %v", output) + } + if output[2] != "add" { + t.Errorf("unexpected action: %s", output[2]) + } + if output[7] != "1:" { + t.Errorf("unexpected root class: %s", output[7]) + } + if output[8] != "htb" { + t.Errorf("unexpected qdisc algo: %s", output[8]) + } + } + } +} + +func TestReconcileInterfaceDoesntExist(t *testing.T) { + testReconcileInterfaceHasNoData(t, "\n") +} + +var tcQdiscNoqueue = "qdisc noqueue 0: root refcnt 2 \n" + +func TestReconcileInterfaceExistsWithNoqueue(t *testing.T) { + testReconcileInterfaceHasNoData(t, tcQdiscNoqueue) +} + +var tcQdiscWrong = []string{ + "qdisc htb 2: root refcnt 2 r2q 10 default 30 direct_packets_stat 0\n", + "qdisc foo 1: root refcnt 2 r2q 10 default 30 direct_packets_stat 0\n", +} + +func TestReconcileInterfaceIsWrong(t *testing.T) { + for _, test := range tcQdiscWrong { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeAction{ + func() ([]byte, []byte, error) { return []byte(test), nil, nil }, + func() ([]byte, []byte, error) { return []byte("\n"), nil, nil }, + func() ([]byte, []byte, error) { return []byte("\n"), nil, nil }, + }, + } + + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + iface := "cbr0" + shaper := &tcShaper{e: &fexec, iface: iface} + err := shaper.ReconcileInterface() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if fcmd.CombinedOutputCalls != 3 { + t.Errorf("unexpected number of calls: %d", fcmd.CombinedOutputCalls) + } + + for ix, output := range fcmd.CombinedOutputLog { + if output[0] != "tc" { + t.Errorf("unexpected command: %s", output[0]) + } + if output[4] != iface { + t.Errorf("unexpected interface: %s, expected %s", output[4], iface) + } + if ix == 0 { + if len(output) != 5 { + t.Errorf("unexpected command: %v", output) + } + if output[2] != "show" { + t.Errorf("unexpected action: %s", output[2]) + } + } + if ix == 1 { + if len(output) != 8 { + t.Errorf("unexpected command: %v", output) + } + if output[2] != "delete" { + t.Errorf("unexpected action: %s", output[2]) + } + if output[7] != strings.Split(test, " ")[2] { + t.Errorf("unexpected class: %s, expected: %s", output[7], strings.Split(test, " ")[2]) + } + } + if ix == 2 { + if len(output) != 11 { + t.Errorf("unexpected command: %v", output) + } + if output[7] != "1:" { + t.Errorf("unexpected root class: %s", output[7]) + } + if output[8] != "htb" { + t.Errorf("unexpected qdisc algo: %s", output[8]) + } + } + } + } +} diff --git a/network/bandwidth/unsupported.go b/network/bandwidth/unsupported.go new file mode 100644 index 000000000..e3c288015 --- /dev/null +++ b/network/bandwidth/unsupported.go @@ -0,0 +1,54 @@ +//go:build !linux +// +build !linux + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bandwidth + +import ( + "errors" + + "k8s.io/apimachinery/pkg/api/resource" +) + +type unsupportedShaper struct { +} + +// NewTCShaper makes a new unsupportedShaper for the given interface +func NewTCShaper(iface string) Shaper { + return &unsupportedShaper{} +} + +func (f *unsupportedShaper) Limit(cidr string, egress, ingress *resource.Quantity) error { + return errors.New("unimplemented") +} + +func (f *unsupportedShaper) Reset(cidr string) error { + return nil +} + +func (f *unsupportedShaper) ReconcileInterface() error { + return errors.New("unimplemented") +} + +func (f *unsupportedShaper) ReconcileCIDR(cidr string, egress, ingress *resource.Quantity) error { + return errors.New("unimplemented") +} + +func (f *unsupportedShaper) GetCIDRs() ([]string, error) { + return []string{}, nil +} diff --git a/network/bandwidth/utils.go b/network/bandwidth/utils.go new file mode 100644 index 000000000..05eeb7cda --- /dev/null +++ b/network/bandwidth/utils.go @@ -0,0 +1,66 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bandwidth + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/api/resource" +) + +var minRsrc = resource.MustParse("1k") +var maxRsrc = resource.MustParse("1P") + +func validateBandwidthIsReasonable(rsrc *resource.Quantity) error { + if rsrc.Value() < minRsrc.Value() { + return fmt.Errorf("resource is unreasonably small (< 1kbit)") + } + if rsrc.Value() > maxRsrc.Value() { + return fmt.Errorf("resource is unreasonably large (> 1Pbit)") + } + return nil +} + +// ExtractPodBandwidthResources extracts the ingress and egress from the given pod annotations +func ExtractPodBandwidthResources(podAnnotations map[string]string) (ingress, egress *resource.Quantity, err error) { + if podAnnotations == nil { + return nil, nil, nil + } + str, found := podAnnotations["kubernetes.io/ingress-bandwidth"] + if found { + ingressValue, err := resource.ParseQuantity(str) + if err != nil { + return nil, nil, err + } + ingress = &ingressValue + if err := validateBandwidthIsReasonable(ingress); err != nil { + return nil, nil, err + } + } + str, found = podAnnotations["kubernetes.io/egress-bandwidth"] + if found { + egressValue, err := resource.ParseQuantity(str) + if err != nil { + return nil, nil, err + } + egress = &egressValue + if err := validateBandwidthIsReasonable(egress); err != nil { + return nil, nil, err + } + } + return ingress, egress, nil +} diff --git a/network/bandwidth/utils_test.go b/network/bandwidth/utils_test.go new file mode 100644 index 000000000..5ce2287ac --- /dev/null +++ b/network/bandwidth/utils_test.go @@ -0,0 +1,90 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bandwidth + +import ( + "reflect" + "testing" + + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + api "k8s.io/kubernetes/pkg/apis/core" +) + +func TestExtractPodBandwidthResources(t *testing.T) { + four, _ := resource.ParseQuantity("4M") + ten, _ := resource.ParseQuantity("10M") + twenty, _ := resource.ParseQuantity("20M") + + testPod := func(ingress, egress string) *api.Pod { + pod := &api.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{}}} + if len(ingress) != 0 { + pod.Annotations["kubernetes.io/ingress-bandwidth"] = ingress + } + if len(egress) != 0 { + pod.Annotations["kubernetes.io/egress-bandwidth"] = egress + } + return pod + } + + tests := []struct { + pod *api.Pod + expectedIngress *resource.Quantity + expectedEgress *resource.Quantity + expectError bool + }{ + { + pod: &api.Pod{}, + }, + { + pod: testPod("10M", ""), + expectedIngress: &ten, + }, + { + pod: testPod("", "10M"), + expectedEgress: &ten, + }, + { + pod: testPod("4M", "20M"), + expectedIngress: &four, + expectedEgress: &twenty, + }, + { + pod: testPod("foo", ""), + expectError: true, + }, + } + for _, test := range tests { + ingress, egress, err := ExtractPodBandwidthResources(test.pod.Annotations) + if test.expectError { + if err == nil { + t.Errorf("unexpected non-error") + } + continue + } + if err != nil { + t.Errorf("unexpected error: %v", err) + continue + } + if !reflect.DeepEqual(ingress, test.expectedIngress) { + t.Errorf("expected: %v, saw: %v", ingress, test.expectedIngress) + } + if !reflect.DeepEqual(egress, test.expectedEgress) { + t.Errorf("expected: %v, saw: %v", egress, test.expectedEgress) + } + } +} diff --git a/network/cni/cni.go b/network/cni/cni.go index 38780640d..cfc69766a 100644 --- a/network/cni/cni.go +++ b/network/cni/cni.go @@ -28,12 +28,13 @@ import ( "github.com/Mirantis/cri-dockerd/config" + "github.com/Mirantis/cri-dockerd/network/bandwidth" "github.com/containernetworking/cni/libcni" cnitypes "github.com/containernetworking/cni/pkg/types" "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/wait" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" - "k8s.io/kubernetes/pkg/util/bandwidth" + utilslice "k8s.io/kubernetes/pkg/util/slice" utilexec "k8s.io/utils/exec" diff --git a/network/kubenet/kubenet_linux.go b/network/kubenet/kubenet_linux.go index 980fb0ab8..9f23ee395 100644 --- a/network/kubenet/kubenet_linux.go +++ b/network/kubenet/kubenet_linux.go @@ -40,12 +40,12 @@ import ( utilnet "k8s.io/apimachinery/pkg/util/net" utilsets "k8s.io/apimachinery/pkg/util/sets" utilsysctl "k8s.io/component-helpers/node/util/sysctl" - "k8s.io/kubernetes/pkg/util/bandwidth" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilexec "k8s.io/utils/exec" utilebtables "k8s.io/utils/net/ebtables" "github.com/Mirantis/cri-dockerd/network" + "github.com/Mirantis/cri-dockerd/network/bandwidth" "github.com/Mirantis/cri-dockerd/network/hostport" netutils "k8s.io/utils/net" diff --git a/network/kubenet/kubenet_linux_test.go b/network/kubenet/kubenet_linux_test.go index 670aae84a..a0b2f3b77 100644 --- a/network/kubenet/kubenet_linux_test.go +++ b/network/kubenet/kubenet_linux_test.go @@ -30,12 +30,12 @@ import ( utilsets "k8s.io/apimachinery/pkg/util/sets" sysctltest "k8s.io/component-helpers/node/util/sysctl/testing" - "k8s.io/kubernetes/pkg/util/bandwidth" ipttest "k8s.io/kubernetes/pkg/util/iptables/testing" "k8s.io/utils/exec" fakeexec "k8s.io/utils/exec/testing" "github.com/Mirantis/cri-dockerd/network" + "github.com/Mirantis/cri-dockerd/network/bandwidth" mockcni "github.com/Mirantis/cri-dockerd/network/cni/testing" nettest "github.com/Mirantis/cri-dockerd/network/testing" )