Skip to content

Commit

Permalink
Refactor etcd client code to use channels
Browse files Browse the repository at this point in the history
This allows us to use the etcd API in a more idiomatic way
and should handle better the cases where etcd sends empty events.
  • Loading branch information
thomasferrandiz committed Feb 23, 2023
1 parent fa40bfa commit bfbed94
Show file tree
Hide file tree
Showing 10 changed files with 462 additions and 265 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ test: license-check gofmt deps verify-modules
--cap-add=SYS_ADMIN --rm \
-v $(shell pwd):/go/src/github.com/flannel-io/flannel \
golang:$(GO_VERSION) \
/bin/bash -c 'cd /go/src/github.com/flannel-io/flannel && go test -v -cover $(TEST_PACKAGES_EXPANDED)'
/bin/bash -c 'cd /go/src/github.com/flannel-io/flannel && go test -v -cover -timeout 1m $(TEST_PACKAGES_EXPANDED)'

# Test the docker-opts script
cd dist; ./mk-docker-opts_tests.sh
Expand All @@ -91,6 +91,7 @@ e2e-test: bash_unit dist/flanneld-e2e-$(TAG)-$(ARCH).docker
FLANNEL_DOCKER_IMAGE=$(REGISTRY):$(TAG)-$(ARCH) ./bash_unit dist/functional-test-k8s.sh

k3s-e2e-test: bash_unit
$(MAKE) -C images/iperf3 ARCH=$(ARCH)
./bash_unit ./e2e/run-e2e-tests.sh

cover:
Expand Down
4 changes: 4 additions & 0 deletions dist/functional-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ setup() {
}

teardown() {
echo "dumping subnets in etcd"
docker run --rm -e ETCDCTL_API=3 -v "${PWD}/test:/certs" $ETCDCTL_IMG etcdctl --endpoints=$etcd_endpt --cacert=/certs/ca.pem --cert=/certs/client.pem --key=/certs/client-key.pem get --prefix /coreos.com/network/subnets 2>&1
echo "########## logs for flannel-e2e-test-flannel1 container ##########" 2>&1
docker logs flannel-e2e-test-flannel1
echo "########## logs for flannel-e2e-test-flannel2 container ##########" 2>&1
docker logs flannel-e2e-test-flannel2
docker rm -f flannel-e2e-test-flannel1 flannel-e2e-test-flannel2 flannel-e2e-test-flannel1-iperf flannel-host1 flannel-host2 > /dev/null 2>&1
docker run --rm -e ETCDCTL_API=3 -v "${PWD}/test:/certs" $ETCDCTL_IMG etcdctl --endpoints=$etcd_endpt --cacert=/certs/ca.pem --cert=/certs/client.pem --key=/certs/client-key.pem del /coreos.com/network/config > /dev/null 2>&1
}
Expand Down
95 changes: 34 additions & 61 deletions pkg/subnet/etcd/local_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ const (
)

var (
errInterrupted = errors.New("interrupted")
errCanceled = errors.New("canceled")
errInterrupted = errors.New("interrupted")
errCanceled = errors.New("canceled")
errUnimplemented = errors.New("unimplemented")
)

type LocalManager struct {
Expand Down Expand Up @@ -288,7 +289,7 @@ func getNextIndex(cursor interface{}) (int64, error) {
return 0, fmt.Errorf("internal error: watch cursor is of unknown type")
}

return nextIndex, nil
return nextIndex + 1, nil
}

func (m *LocalManager) leaseWatchReset(ctx context.Context, sn ip.IP4Net, sn6 ip.IP6Net) (subnet.LeaseWatchResult, error) {
Expand All @@ -303,64 +304,50 @@ func (m *LocalManager) leaseWatchReset(ctx context.Context, sn ip.IP4Net, sn6 ip
}, nil
}

func (m *LocalManager) WatchLease(ctx context.Context, sn ip.IP4Net, sn6 ip.IP6Net, cursor interface{}) (subnet.LeaseWatchResult, error) {
if cursor == nil {
return m.leaseWatchReset(ctx, sn, sn6)
}

nextIndex, err := getNextIndex(cursor)
func (m *LocalManager) WatchLease(ctx context.Context, sn ip.IP4Net, sn6 ip.IP6Net, receiver chan []subnet.LeaseWatchResult) error {
wr, err := m.leaseWatchReset(ctx, sn, sn6)
if err != nil {
return subnet.LeaseWatchResult{}, err
return err
}

evt, index, err := m.registry.watchSubnet(ctx, nextIndex, sn, sn6)

switch {
case err == nil:
return subnet.LeaseWatchResult{
Events: []subnet.Event{evt},
Cursor: watchCursor{index},
}, nil
log.Info("manager.WatchLease: sending reset results...")
//send the result of leaseWatchResult to allow the listener
//to catch-up to the current state
receiver <- []subnet.LeaseWatchResult{wr}

case isIndexTooSmall(err):
log.Warning("Watch of subnet leases failed because etcd index outside history window")
return m.leaseWatchReset(ctx, sn, sn6)

default:
return subnet.LeaseWatchResult{}, err
nextIndex, err := getNextIndex(wr.Cursor)
if err != nil {
return err
}
}

func (m *LocalManager) WatchLeases(ctx context.Context, cursor interface{}) (subnet.LeaseWatchResult, error) {
if cursor == nil {
return m.leasesWatchReset(ctx)
err = m.registry.watchSubnet(ctx, nextIndex, sn, sn6, receiver)
if err != nil {
return err
}
return nil
}

nextIndex, err := getNextIndex(cursor)
func (m *LocalManager) WatchLeases(ctx context.Context, receiver chan []subnet.LeaseWatchResult) error {
wr, err := m.registry.leasesWatchReset(ctx)
if err != nil {
return subnet.LeaseWatchResult{}, err
return err
}

evt, index, err := m.registry.watchSubnets(ctx, nextIndex)
switch {
case err == nil:
//TODO only vxlan backend and kube subnet manager support dual stack now.
evt.Lease.EnableIPv4 = true
return subnet.LeaseWatchResult{
Events: []subnet.Event{evt},
Cursor: watchCursor{index},
}, nil

case isIndexTooSmall(err):
log.Warning("Watch of subnet leases failed because etcd index outside history window")
return m.leasesWatchReset(ctx)
// send the result of leasesWatchReset to the listener
// to catch-up on the state if the registry
// before starting to watch changes
receiver <- []subnet.LeaseWatchResult{wr}

case index != 0:
return subnet.LeaseWatchResult{Cursor: watchCursor{index}}, err
nextIndex, err := getNextIndex(wr.Cursor)
if err != nil {
return err
}

default:
return subnet.LeaseWatchResult{}, err
err = m.registry.watchSubnets(ctx, receiver, nextIndex)
if err != nil {
return err
}
return nil
}

// CompleteLease monitor lease
Expand Down Expand Up @@ -414,20 +401,6 @@ func isIndexTooSmall(err error) bool {
return err == rpctypes.ErrGRPCCompacted
}

// leasesWatchReset is called when incremental lease watch failed and we need to grab a snapshot
func (m *LocalManager) leasesWatchReset(ctx context.Context) (subnet.LeaseWatchResult, error) {
wr := subnet.LeaseWatchResult{}

leases, index, err := m.registry.getSubnets(ctx)
if err != nil {
return wr, fmt.Errorf("failed to retrieve subnet leases: %v", err)
}

wr.Cursor = watchCursor{index}
wr.Snapshot = leases
return wr, nil
}

func isSubnetConfigCompat(config *subnet.Config, sn ip.IP4Net) bool {
if sn.IP < config.SubnetMin || sn.IP > config.SubnetMax {
return false
Expand Down
74 changes: 22 additions & 52 deletions pkg/subnet/etcd/mock_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/jonboulle/clockwork"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"golang.org/x/net/context"
log "k8s.io/klog"
)

var clock clockwork.Clock = clockwork.NewRealClock()
Expand All @@ -38,6 +39,7 @@ type netwk struct {
}

func (n *netwk) sendSubnetEvent(sn ip.IP4Net, e event) {
log.Infof("sendSubnetEvent: sn=[ %s ], e=[ %v ]", sn, e)
n.subnetsEvents <- e

n.mux.Lock()
Expand All @@ -50,17 +52,6 @@ func (n *netwk) sendSubnetEvent(sn ip.IP4Net, e event) {
c <- e
}

func (n *netwk) subnetEventsChan(sn ip.IP4Net) chan event {
n.mux.Lock()
c, ok := n.subnetEvents[sn]
if !ok {
c = make(chan event, 10)
n.subnetEvents[sn] = c
}
n.mux.Unlock()
return c
}

type event struct {
evt Event
index int64
Expand Down Expand Up @@ -95,8 +86,8 @@ func (msr *MockSubnetRegistry) setConfig(config string) error {
}

func (msr *MockSubnetRegistry) getSubnets(ctx context.Context) ([]Lease, int64, error) {
//msr.mux.Lock()
//defer msr.mux.Unlock()
msr.mux.Lock()
defer msr.mux.Unlock()

subs := make([]Lease, len(msr.network.subnets))
copy(subs, msr.network.subnets)
Expand Down Expand Up @@ -206,68 +197,47 @@ func (msr *MockSubnetRegistry) deleteSubnet(ctx context.Context, sn ip.IP4Net, s
return nil
}

func (msr *MockSubnetRegistry) watchSubnets(ctx context.Context, since int64) (Event, int64, error) {
func (msr *MockSubnetRegistry) watchSubnets(ctx context.Context, leaseWatchChan chan []LeaseWatchResult, since int64) error {
log.Infof("watchSubnets started with since= [ %d]", since)
for {
msr.mux.Lock()
index := msr.index
msr.mux.Unlock()

if since < index {
return Event{}, 0, rpctypes.ErrGRPCCompacted
return rpctypes.ErrGRPCCompacted
}

select {
case <-ctx.Done():
return Event{}, 0, ctx.Err()
close(leaseWatchChan)
return ctx.Err()

case e := <-msr.network.subnetsEvents:
if e.index > since {
return e.evt, e.index, nil
leaseWatchChan <- []LeaseWatchResult{
{Events: []Event{e.evt},
Cursor: e.index}}
}
}
}
}

// TODO ignores ip6
func (msr *MockSubnetRegistry) watchSubnet(ctx context.Context, since int64, sn ip.IP4Net, sn6 ip.IP6Net) (Event, int64, error) {
for {
msr.mux.Lock()
index := msr.index
msr.mux.Unlock()

if since < index {
return Event{}, msr.index, rpctypes.ErrGRPCCompacted
}

select {
case <-ctx.Done():
return Event{}, index, ctx.Err()

case e := <-msr.network.subnetEventsChan(sn):
if e.index > since {
return e.evt, index, nil
}
}
}
func (msr *MockSubnetRegistry) watchSubnet(ctx context.Context, since int64, sn ip.IP4Net, sn6 ip.IP6Net, leaseWatchChan chan []LeaseWatchResult) error {
return errUnimplemented
}

func (msr *MockSubnetRegistry) expireSubnet(network string, sn ip.IP4Net) {
if sub, i, err := msr.network.findSubnet(sn); err == nil {
msr.index += 1
msr.network.subnets[i] = msr.network.subnets[len(msr.network.subnets)-1]
msr.network.subnets = msr.network.subnets[:len(msr.network.subnets)-1]
sub.Asof = msr.index
msr.network.sendSubnetEvent(sn, event{
Event{
Type: EventRemoved,
Lease: sub,
}, msr.index,
})
func (msr *MockSubnetRegistry) leasesWatchReset(ctx context.Context) (LeaseWatchResult, error) {
wr := LeaseWatchResult{}
leases, index, err := msr.getSubnets(ctx)
if err != nil {
return wr, fmt.Errorf("failed to retrieve subnet leases: %v", err)
}
}

func (msr *MockSubnetRegistry) getNetwork(ctx context.Context) (*netwk, error) {
return msr.network, nil
wr.Cursor = watchCursor{index}
wr.Snapshot = leases
return wr, nil
}

func (n *netwk) findSubnet(sn ip.IP4Net) (Lease, int, error) {
Expand Down
Loading

0 comments on commit bfbed94

Please sign in to comment.