diff --git a/.gitignore b/.gitignore index e3e04153..9a61f93d 100644 --- a/.gitignore +++ b/.gitignore @@ -9,5 +9,8 @@ kne_cli/kne_cli kne_cli/kne controller/server/server x/webhook/webhook -x/wire/client/client -x/wire/server/server +x/wire/file/client/client +x/wire/file/server/server +x/wire/intf/client/client +x/wire/intf/server/server +x/wire/forward/forward diff --git a/examples/forward/topology.textproto b/examples/forward/topology.textproto new file mode 100644 index 00000000..7e8b1b27 --- /dev/null +++ b/examples/forward/topology.textproto @@ -0,0 +1,109 @@ +name: "forward-basic" +nodes: { + name: "r1" + vendor: OPENCONFIG + model: "LEMMING" +} +nodes: { + name: "r2" + vendor: OPENCONFIG + model: "LEMMING" +} +nodes: { + name: "fwd1" + vendor: FORWARD + config: { + image: "forward:latest" + vendor_data { + [type.googleapis.com/forward.ForwardConfig] { + wires: { + a: { + interface: { + name: "eth1" + } + } + z: { + local_node: { + name: "fwd2" + interface: "eth1" + } + } + } + wires: { + a: { + local_node: { + name: "fw2" + interface: "eth2" + } + } + z: { + interface: { + name: "eth2" + } + } + } + } + } + } +} +nodes: { + name: "fwd2" + vendor: FORWARD + config: { + image: "forward:latest" + vendor_data { + [type.googleapis.com/forward.ForwardConfig] { + wires: { + a: { + interface: { + name: "eth2" + } + } + z: { + local_node: { + name: "fwd1" + interface: "eth2" + } + } + } + wires: { + a: { + local_node: { + name: "fw1" + interface: "eth1" + } + } + z: { + interface: { + name: "eth1" + } + } + } + } + } + } +} +links: { + a_node: "r1" + a_int: "eth1" + z_node: "fwd1" + z_int: "eth1" +} +links: { + a_node: "fwd2" + a_int: "eth1" + z_node: "r2" + z_int: "eth1" +} +links: { + a_node: "r2" + a_int: "eth2" + z_node: "fwd2" + z_int: "eth2" +} +links: { + a_node: "fwd1" + a_int: "eth2" + z_node: "r1" + z_int: "eth2" +} diff --git a/go.mod b/go.mod index b995101b..fc632214 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/golang/glog v1.2.0 github.com/golang/mock v1.6.0 github.com/google/go-cmp v0.6.0 + github.com/google/gopacket v1.1.19 github.com/kr/pretty v0.3.1 github.com/networkop/meshnet-cni v0.3.1-0.20230525201116-d7c306c635cf github.com/open-traffic-generator/keng-operator v0.3.28 diff --git a/go.sum b/go.sum index 3ea805da..e0c72e58 100644 --- a/go.sum +++ b/go.sum @@ -1028,6 +1028,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= +github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= diff --git a/proto/forward.proto b/proto/forward.proto new file mode 100644 index 00000000..c9f0a50a --- /dev/null +++ b/proto/forward.proto @@ -0,0 +1,59 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +syntax = "proto3"; + +package forward; + +option go_package = "github.com/openconfig/kne/proto/forward"; + +// Forward specific config data for KNE +message ForwardConfig { + repeated Wire wires = 1; +} + +// Wire is a connection between two endpoints intended to forward +// data bidirectionally between them. One of the endpoints must be +// of type Interface. +message Wire { + // The a endpoint serves as the client endpoint in the bidirectional stream. + Endpoint a = 1; + // The z endpoint serves as the server endpoint in the bidirectional stream. + Endpoint z = 2; +} + +message Endpoint { + oneof endpoint { + // Interface is a local interface (ex. eth0) on the node. + Interface interface = 1; + // LocalNode is a node in the same cluster. + LocalNode local_node = 2; + // RemoteNode is a node in a different cluster. + RemoteNode remote_node = 3; + } +} + +message Interface { + string name = 1; + // TODO: Add MTU and other configurable fields. +} + +message LocalNode { + string name = 1; + string interface = 2; +} + +message RemoteNode { + string addr = 1; + string interface = 2; +} diff --git a/proto/forward/forward.pb.go b/proto/forward/forward.pb.go new file mode 100644 index 00000000..c70f6e2c --- /dev/null +++ b/proto/forward/forward.pb.go @@ -0,0 +1,577 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: forward.proto + +package forward + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Forward specific config data for KNE +type ForwardConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Wires []*Wire `protobuf:"bytes,1,rep,name=wires,proto3" json:"wires,omitempty"` +} + +func (x *ForwardConfig) Reset() { + *x = ForwardConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_forward_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ForwardConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ForwardConfig) ProtoMessage() {} + +func (x *ForwardConfig) ProtoReflect() protoreflect.Message { + mi := &file_forward_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ForwardConfig.ProtoReflect.Descriptor instead. +func (*ForwardConfig) Descriptor() ([]byte, []int) { + return file_forward_proto_rawDescGZIP(), []int{0} +} + +func (x *ForwardConfig) GetWires() []*Wire { + if x != nil { + return x.Wires + } + return nil +} + +// Wire is a connection between two endpoints intended to forward +// data bidirectionally between them. One of the endpoints must be +// of type Interface. +type Wire struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The a endpoint serves as the client endpoint in the bidirectional stream. + A *Endpoint `protobuf:"bytes,1,opt,name=a,proto3" json:"a,omitempty"` + // The z endpoint serves as the server endpoint in the bidirectional stream. + Z *Endpoint `protobuf:"bytes,2,opt,name=z,proto3" json:"z,omitempty"` +} + +func (x *Wire) Reset() { + *x = Wire{} + if protoimpl.UnsafeEnabled { + mi := &file_forward_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Wire) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Wire) ProtoMessage() {} + +func (x *Wire) ProtoReflect() protoreflect.Message { + mi := &file_forward_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Wire.ProtoReflect.Descriptor instead. +func (*Wire) Descriptor() ([]byte, []int) { + return file_forward_proto_rawDescGZIP(), []int{1} +} + +func (x *Wire) GetA() *Endpoint { + if x != nil { + return x.A + } + return nil +} + +func (x *Wire) GetZ() *Endpoint { + if x != nil { + return x.Z + } + return nil +} + +type Endpoint struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Endpoint: + // + // *Endpoint_Interface + // *Endpoint_LocalNode + // *Endpoint_RemoteNode + Endpoint isEndpoint_Endpoint `protobuf_oneof:"endpoint"` +} + +func (x *Endpoint) Reset() { + *x = Endpoint{} + if protoimpl.UnsafeEnabled { + mi := &file_forward_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Endpoint) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Endpoint) ProtoMessage() {} + +func (x *Endpoint) ProtoReflect() protoreflect.Message { + mi := &file_forward_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Endpoint.ProtoReflect.Descriptor instead. +func (*Endpoint) Descriptor() ([]byte, []int) { + return file_forward_proto_rawDescGZIP(), []int{2} +} + +func (m *Endpoint) GetEndpoint() isEndpoint_Endpoint { + if m != nil { + return m.Endpoint + } + return nil +} + +func (x *Endpoint) GetInterface() *Interface { + if x, ok := x.GetEndpoint().(*Endpoint_Interface); ok { + return x.Interface + } + return nil +} + +func (x *Endpoint) GetLocalNode() *LocalNode { + if x, ok := x.GetEndpoint().(*Endpoint_LocalNode); ok { + return x.LocalNode + } + return nil +} + +func (x *Endpoint) GetRemoteNode() *RemoteNode { + if x, ok := x.GetEndpoint().(*Endpoint_RemoteNode); ok { + return x.RemoteNode + } + return nil +} + +type isEndpoint_Endpoint interface { + isEndpoint_Endpoint() +} + +type Endpoint_Interface struct { + // Interface is a local interface (ex. eth0) on the node. + Interface *Interface `protobuf:"bytes,1,opt,name=interface,proto3,oneof"` +} + +type Endpoint_LocalNode struct { + // LocalNode is a node in the same cluster. + LocalNode *LocalNode `protobuf:"bytes,2,opt,name=local_node,json=localNode,proto3,oneof"` +} + +type Endpoint_RemoteNode struct { + // RemoteNode is a node in a different cluster. + RemoteNode *RemoteNode `protobuf:"bytes,3,opt,name=remote_node,json=remoteNode,proto3,oneof"` +} + +func (*Endpoint_Interface) isEndpoint_Endpoint() {} + +func (*Endpoint_LocalNode) isEndpoint_Endpoint() {} + +func (*Endpoint_RemoteNode) isEndpoint_Endpoint() {} + +type Interface struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` // TODO: Add MTU and other configurable fields. +} + +func (x *Interface) Reset() { + *x = Interface{} + if protoimpl.UnsafeEnabled { + mi := &file_forward_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Interface) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Interface) ProtoMessage() {} + +func (x *Interface) ProtoReflect() protoreflect.Message { + mi := &file_forward_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Interface.ProtoReflect.Descriptor instead. +func (*Interface) Descriptor() ([]byte, []int) { + return file_forward_proto_rawDescGZIP(), []int{3} +} + +func (x *Interface) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +type LocalNode struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Interface string `protobuf:"bytes,2,opt,name=interface,proto3" json:"interface,omitempty"` +} + +func (x *LocalNode) Reset() { + *x = LocalNode{} + if protoimpl.UnsafeEnabled { + mi := &file_forward_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LocalNode) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LocalNode) ProtoMessage() {} + +func (x *LocalNode) ProtoReflect() protoreflect.Message { + mi := &file_forward_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LocalNode.ProtoReflect.Descriptor instead. +func (*LocalNode) Descriptor() ([]byte, []int) { + return file_forward_proto_rawDescGZIP(), []int{4} +} + +func (x *LocalNode) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *LocalNode) GetInterface() string { + if x != nil { + return x.Interface + } + return "" +} + +type RemoteNode struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Addr string `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"` + Interface string `protobuf:"bytes,2,opt,name=interface,proto3" json:"interface,omitempty"` +} + +func (x *RemoteNode) Reset() { + *x = RemoteNode{} + if protoimpl.UnsafeEnabled { + mi := &file_forward_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RemoteNode) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RemoteNode) ProtoMessage() {} + +func (x *RemoteNode) ProtoReflect() protoreflect.Message { + mi := &file_forward_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RemoteNode.ProtoReflect.Descriptor instead. +func (*RemoteNode) Descriptor() ([]byte, []int) { + return file_forward_proto_rawDescGZIP(), []int{5} +} + +func (x *RemoteNode) GetAddr() string { + if x != nil { + return x.Addr + } + return "" +} + +func (x *RemoteNode) GetInterface() string { + if x != nil { + return x.Interface + } + return "" +} + +var File_forward_proto protoreflect.FileDescriptor + +var file_forward_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x07, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x22, 0x34, 0x0a, 0x0d, 0x46, 0x6f, 0x72, 0x77, + 0x61, 0x72, 0x64, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x23, 0x0a, 0x05, 0x77, 0x69, 0x72, + 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x66, 0x6f, 0x72, 0x77, 0x61, + 0x72, 0x64, 0x2e, 0x57, 0x69, 0x72, 0x65, 0x52, 0x05, 0x77, 0x69, 0x72, 0x65, 0x73, 0x22, 0x48, + 0x0a, 0x04, 0x57, 0x69, 0x72, 0x65, 0x12, 0x1f, 0x0a, 0x01, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x11, 0x2e, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x2e, 0x45, 0x6e, 0x64, 0x70, + 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x01, 0x61, 0x12, 0x1f, 0x0a, 0x01, 0x7a, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x2e, 0x45, 0x6e, 0x64, + 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x01, 0x7a, 0x22, 0xb7, 0x01, 0x0a, 0x08, 0x45, 0x6e, 0x64, + 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x32, 0x0a, 0x09, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x66, 0x61, + 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x66, 0x6f, 0x72, 0x77, 0x61, + 0x72, 0x64, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x66, 0x61, 0x63, 0x65, 0x48, 0x00, 0x52, 0x09, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x66, 0x61, 0x63, 0x65, 0x12, 0x33, 0x0a, 0x0a, 0x6c, 0x6f, 0x63, + 0x61, 0x6c, 0x5f, 0x6e, 0x6f, 0x64, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, + 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x4e, 0x6f, 0x64, + 0x65, 0x48, 0x00, 0x52, 0x09, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x4e, 0x6f, 0x64, 0x65, 0x12, 0x36, + 0x0a, 0x0b, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x6e, 0x6f, 0x64, 0x65, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x2e, 0x52, 0x65, + 0x6d, 0x6f, 0x74, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x48, 0x00, 0x52, 0x0a, 0x72, 0x65, 0x6d, 0x6f, + 0x74, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x42, 0x0a, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, + 0x6e, 0x74, 0x22, 0x1f, 0x0a, 0x09, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x66, 0x61, 0x63, 0x65, 0x12, + 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, + 0x61, 0x6d, 0x65, 0x22, 0x3d, 0x0a, 0x09, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x4e, 0x6f, 0x64, 0x65, + 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x66, 0x61, 0x63, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x66, 0x61, + 0x63, 0x65, 0x22, 0x3e, 0x0a, 0x0a, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4e, 0x6f, 0x64, 0x65, + 0x12, 0x12, 0x0a, 0x04, 0x61, 0x64, 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x61, 0x64, 0x64, 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x66, 0x61, 0x63, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x66, 0x61, + 0x63, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2f, 0x6b, 0x6e, 0x65, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x66, 0x6f, 0x72, 0x77, 0x61, 0x72, 0x64, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_forward_proto_rawDescOnce sync.Once + file_forward_proto_rawDescData = file_forward_proto_rawDesc +) + +func file_forward_proto_rawDescGZIP() []byte { + file_forward_proto_rawDescOnce.Do(func() { + file_forward_proto_rawDescData = protoimpl.X.CompressGZIP(file_forward_proto_rawDescData) + }) + return file_forward_proto_rawDescData +} + +var file_forward_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_forward_proto_goTypes = []interface{}{ + (*ForwardConfig)(nil), // 0: forward.ForwardConfig + (*Wire)(nil), // 1: forward.Wire + (*Endpoint)(nil), // 2: forward.Endpoint + (*Interface)(nil), // 3: forward.Interface + (*LocalNode)(nil), // 4: forward.LocalNode + (*RemoteNode)(nil), // 5: forward.RemoteNode +} +var file_forward_proto_depIdxs = []int32{ + 1, // 0: forward.ForwardConfig.wires:type_name -> forward.Wire + 2, // 1: forward.Wire.a:type_name -> forward.Endpoint + 2, // 2: forward.Wire.z:type_name -> forward.Endpoint + 3, // 3: forward.Endpoint.interface:type_name -> forward.Interface + 4, // 4: forward.Endpoint.local_node:type_name -> forward.LocalNode + 5, // 5: forward.Endpoint.remote_node:type_name -> forward.RemoteNode + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name +} + +func init() { file_forward_proto_init() } +func file_forward_proto_init() { + if File_forward_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_forward_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ForwardConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_forward_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Wire); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_forward_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Endpoint); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_forward_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Interface); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_forward_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LocalNode); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_forward_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RemoteNode); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_forward_proto_msgTypes[2].OneofWrappers = []interface{}{ + (*Endpoint_Interface)(nil), + (*Endpoint_LocalNode)(nil), + (*Endpoint_RemoteNode)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_forward_proto_rawDesc, + NumEnums: 0, + NumMessages: 6, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_forward_proto_goTypes, + DependencyIndexes: file_forward_proto_depIdxs, + MessageInfos: file_forward_proto_msgTypes, + }.Build() + File_forward_proto = out.File + file_forward_proto_rawDesc = nil + file_forward_proto_goTypes = nil + file_forward_proto_depIdxs = nil +} diff --git a/proto/generate.go b/proto/generate.go index 8a8e2458..e47e9d0b 100644 --- a/proto/generate.go +++ b/proto/generate.go @@ -5,4 +5,5 @@ package proto //go:generate protoc --go_out=./ceos --go_opt=paths=source_relative ./ceos.proto //go:generate protoc --go_out=./controller --go-grpc_out=./controller --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative ./controller.proto //go:generate protoc --go_out=./wire --go-grpc_out=./wire --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative ./wire.proto +//go:generate protoc --go_out=./forward --go-grpc_out=./forward --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative ./forward.proto //go:generate protoc --go_out=./event --go-grpc_out=./event --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative ./event.proto diff --git a/proto/topo.proto b/proto/topo.proto index edcd1998..59c1757e 100644 --- a/proto/topo.proto +++ b/proto/topo.proto @@ -43,6 +43,7 @@ enum Vendor { OPENCONFIG = 10; ALPINE = 11; DRIVENETS = 12; + FORWARD = 13; } // Node is a single container inside the topology diff --git a/proto/topo/topo.pb.go b/proto/topo/topo.pb.go index 7e1aa954..c489ca40 100644 --- a/proto/topo/topo.pb.go +++ b/proto/topo/topo.pb.go @@ -53,6 +53,7 @@ const ( Vendor_OPENCONFIG Vendor = 10 Vendor_ALPINE Vendor = 11 Vendor_DRIVENETS Vendor = 12 + Vendor_FORWARD Vendor = 13 ) // Enum value maps for Vendor. @@ -71,6 +72,7 @@ var ( 10: "OPENCONFIG", 11: "ALPINE", 12: "DRIVENETS", + 13: "FORWARD", } Vendor_value = map[string]int32{ "UNKNOWN": 0, @@ -86,6 +88,7 @@ var ( "OPENCONFIG": 10, "ALPINE": 11, "DRIVENETS": 12, + "FORWARD": 13, } ) @@ -1471,7 +1474,7 @@ var file_topo_proto_rawDesc = []byte{ 0x08, 0x69, 0x6e, 0x73, 0x69, 0x64, 0x65, 0x49, 0x70, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, - 0x07, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x2a, 0xa7, 0x01, 0x0a, + 0x07, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x2a, 0xb4, 0x01, 0x0a, 0x06, 0x56, 0x65, 0x6e, 0x64, 0x6f, 0x72, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x48, 0x4f, 0x53, 0x54, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x52, 0x49, 0x53, 0x54, 0x41, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x43, 0x49, @@ -1482,10 +1485,11 @@ var file_topo_proto_rawDesc = []byte{ 0x12, 0x09, 0x0a, 0x05, 0x4e, 0x4f, 0x4b, 0x49, 0x41, 0x10, 0x09, 0x12, 0x0e, 0x0a, 0x0a, 0x4f, 0x50, 0x45, 0x4e, 0x43, 0x4f, 0x4e, 0x46, 0x49, 0x47, 0x10, 0x0a, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4c, 0x50, 0x49, 0x4e, 0x45, 0x10, 0x0b, 0x12, 0x0d, 0x0a, 0x09, 0x44, 0x52, 0x49, 0x56, 0x45, - 0x4e, 0x45, 0x54, 0x53, 0x10, 0x0c, 0x42, 0x26, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2f, - 0x6b, 0x6e, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x74, 0x6f, 0x70, 0x6f, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x4e, 0x45, 0x54, 0x53, 0x10, 0x0c, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x4f, 0x52, 0x57, 0x41, 0x52, + 0x44, 0x10, 0x0d, 0x42, 0x26, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2f, 0x6b, 0x6e, 0x65, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x74, 0x6f, 0x70, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( diff --git a/topo/node/forward/forward.go b/topo/node/forward/forward.go new file mode 100644 index 00000000..fd7cae17 --- /dev/null +++ b/topo/node/forward/forward.go @@ -0,0 +1,232 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package forward + +import ( + "context" + "fmt" + "net" + + fpb "github.com/openconfig/kne/proto/forward" + tpb "github.com/openconfig/kne/proto/topo" + "github.com/openconfig/kne/topo/node" + "google.golang.org/protobuf/encoding/prototext" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + log "k8s.io/klog/v2" + "k8s.io/utils/pointer" +) + +const ( + fwdPort = "50058" +) + +func New(nodeImpl *node.Impl) (node.Node, error) { + if nodeImpl == nil { + return nil, fmt.Errorf("nodeImpl cannot be nil") + } + if nodeImpl.Proto == nil { + return nil, fmt.Errorf("nodeImpl.Proto cannot be nil") + } + cfg := defaults(nodeImpl.Proto) + nodeImpl.Proto = cfg + n := &Node{ + Impl: nodeImpl, + } + return n, nil +} + +type Node struct { + *node.Impl +} + +func (n *Node) Create(ctx context.Context) error { + if err := n.ValidateConstraints(); err != nil { + return fmt.Errorf("node %s failed to validate node with errors: %s", n.Name(), err) + } + if err := n.CreatePod(ctx); err != nil { + return fmt.Errorf("node %s failed to create pod %w", n.Name(), err) + } + if err := n.CreateService(ctx); err != nil { + return fmt.Errorf("node %s failed to create service %w", n.Name(), err) + } + return nil +} + +func interfaceFlag(intf string) string { + return fmt.Sprintf("--interfaces=%s", intf) +} + +func endpointFlag(lintf, addr, rintf string) string { + return fmt.Sprintf("--endpoints=%s/%s/%s", lintf, addr, rintf) +} + +func wireToArg(wire *fpb.Wire) (string, error) { + switch at := wire.A.Endpoint.(type) { + case *fpb.Endpoint_Interface: + // If A is an interface, then this node should serve as the fwd client for this wire. + // Additionally Z should not be an interface. + switch zt := wire.Z.Endpoint.(type) { + case *fpb.Endpoint_Interface: + return "", fmt.Errorf("endpoints A and Z cannot both be interfaces") + case *fpb.Endpoint_LocalNode: + ln := wire.GetZ().GetLocalNode() + return endpointFlag(wire.GetA().GetInterface().GetName(), net.JoinHostPort(ln.GetName(), fwdPort), ln.GetInterface()), nil + default: + return "", fmt.Errorf("endpoint Z type not supported: %T", zt) + } + case *fpb.Endpoint_LocalNode: + // If A is not an interface, then this node should serve as the fwd server for this wire. + // Additionally Z should be an interface. + switch zt := wire.Z.Endpoint.(type) { + case *fpb.Endpoint_Interface: + return interfaceFlag(wire.GetZ().GetInterface().GetName()), nil + case *fpb.Endpoint_LocalNode: + return "", fmt.Errorf("one of endpoints A and Z must be an interface") + default: + return "", fmt.Errorf("endpoint Z type not supported: %T", zt) + } + default: + return "", fmt.Errorf("endpoint A type not supported: %T", at) + } +} + +// CreatePod creates a Pod for the Node based on the underlying proto. +func (n *Node) CreatePod(ctx context.Context) error { + pb := n.Proto + log.Infof("Creating Pod:\n %+v", pb) + initContainerImage := pb.Config.InitImage + if initContainerImage == "" { + initContainerImage = node.DefaultInitContainerImage + } + + fwdArgs := pb.Config.Args + if vendorData := pb.Config.GetVendorData(); vendorData != nil { + fwdCfg := &fpb.ForwardConfig{} + if err := vendorData.UnmarshalTo(fwdCfg); err != nil { + return err + } + log.Infof("Got fwdCfg: %v", prototext.Format(fwdCfg)) + for _, wire := range fwdCfg.GetWires() { + arg, err := wireToArg(wire) + if err != nil { + return err + } + fwdArgs = append(fwdArgs, arg) + } + } + log.Infof("Using container args: %v", fwdArgs) + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: pb.Name, + Labels: map[string]string{ + "app": pb.Name, + "topo": n.Namespace, + }, + }, + Spec: corev1.PodSpec{ + InitContainers: []corev1.Container{{ + Name: fmt.Sprintf("init-%s", pb.Name), + Image: initContainerImage, + Args: []string{ + fmt.Sprintf("%d", len(n.Proto.Interfaces)+1), + fmt.Sprintf("%d", pb.Config.Sleep), + }, + ImagePullPolicy: "IfNotPresent", + }}, + Containers: []corev1.Container{{ + Name: pb.Name, + Image: pb.Config.Image, + Command: pb.Config.Command, + Args: fwdArgs, + Env: node.ToEnvVar(pb.Config.Env), + Resources: node.ToResourceRequirements(pb.Constraints), + ImagePullPolicy: "IfNotPresent", + SecurityContext: &corev1.SecurityContext{ + Privileged: pointer.Bool(true), + }, + }}, + TerminationGracePeriodSeconds: pointer.Int64(0), + NodeSelector: map[string]string{}, + Affinity: &corev1.Affinity{ + PodAntiAffinity: &corev1.PodAntiAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{{ + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{{ + Key: "topo", + Operator: "In", + Values: []string{pb.Name}, + }}, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }}, + }, + }, + }, + } + for label, v := range n.GetProto().GetLabels() { + pod.ObjectMeta.Labels[label] = v + } + if pb.Config.ConfigData != nil { + vol, err := n.CreateConfig(ctx) + if err != nil { + return err + } + pod.Spec.Volumes = append(pod.Spec.Volumes, *vol) + vm := corev1.VolumeMount{ + Name: node.ConfigVolumeName, + MountPath: pb.Config.ConfigPath + "/" + pb.Config.ConfigFile, + ReadOnly: true, + } + if vol.VolumeSource.ConfigMap != nil { + vm.SubPath = pb.Config.ConfigFile + } + for i, c := range pod.Spec.Containers { + pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm) + } + } + sPod, err := n.KubeClient.CoreV1().Pods(n.Namespace).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + return err + } + log.Infof("Pod created:\n%+v\n", sPod) + return nil +} + +func defaults(pb *tpb.Node) *tpb.Node { + if pb.Config == nil { + pb.Config = &tpb.Config{} + } + if pb.Config.EntryCommand == "" { + pb.Config.EntryCommand = fmt.Sprintf("kubectl exec -it %s -- sh", pb.Name) + } + if pb.Config.Image == "" { + pb.Config.Image = "forward:latest" + } + if pb.Config.ConfigPath == "" { + pb.Config.ConfigPath = "/etc" + } + if pb.Config.ConfigFile == "" { + pb.Config.ConfigFile = "config" + } + return pb +} + +func init() { + node.Vendor(tpb.Vendor_FORWARD, New) +} diff --git a/topo/node/forward/forward_test.go b/topo/node/forward/forward_test.go new file mode 100644 index 00000000..0e837e07 --- /dev/null +++ b/topo/node/forward/forward_test.go @@ -0,0 +1,109 @@ +package forward + +import ( + "fmt" + "testing" + + "github.com/openconfig/gnmi/errdiff" + topopb "github.com/openconfig/kne/proto/topo" + "github.com/openconfig/kne/topo/node" + "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/proto" +) + +func TestNew(t *testing.T) { + tests := []struct { + desc string + nImpl *node.Impl + want *topopb.Node + wantErr string + }{{ + desc: "nil impl", + wantErr: "nodeImpl cannot be nil", + }, { + desc: "nil pb", + wantErr: "nodeImpl.Proto cannot be nil", + nImpl: &node.Impl{}, + }, { + desc: "empty pb", + nImpl: &node.Impl{ + Proto: &topopb.Node{}, + }, + want: &topopb.Node{ + Config: &topopb.Config{ + EntryCommand: fmt.Sprintf("kubectl exec -it %s -- sh", ""), + Image: "forward:latest", + ConfigPath: "/etc", + ConfigFile: "config", + }, + }, + }, { + desc: "provided service", + nImpl: &node.Impl{ + Proto: &topopb.Node{ + Config: &topopb.Config{ + Command: []string{"do", "run"}, + }, + Services: map[uint32]*topopb.Service{ + 2000: { + Name: "Service", + Inside: 2000, + Outside: 20001, + InsideIp: "1.1.1.1", + OutsideIp: "10.10.10.10", + }, + }, + }, + }, + want: &topopb.Node{ + Config: &topopb.Config{ + Command: []string{"do", "run"}, + EntryCommand: fmt.Sprintf("kubectl exec -it %s -- sh", ""), + Image: "forward:latest", + ConfigPath: "/etc", + ConfigFile: "config", + }, + Services: map[uint32]*topopb.Service{ + 2000: { + Name: "Service", + Inside: 2000, + Outside: 20001, + InsideIp: "1.1.1.1", + OutsideIp: "10.10.10.10", + }, + }, + }, + }, { + desc: "provided config command", + nImpl: &node.Impl{ + Proto: &topopb.Node{ + Config: &topopb.Config{ + Command: []string{"do", "run"}, + }, + }, + }, + want: &topopb.Node{ + Config: &topopb.Config{ + Command: []string{"do", "run"}, + EntryCommand: fmt.Sprintf("kubectl exec -it %s -- sh", ""), + Image: "forward:latest", + ConfigPath: "/etc", + ConfigFile: "config", + }, + }, + }} + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + n, err := New(tt.nImpl) + if s := errdiff.Substring(err, tt.wantErr); s != "" { + t.Fatalf("unexpected error: got %v, want %s", err, s) + } + if tt.wantErr != "" { + return + } + if !proto.Equal(n.GetProto(), tt.want) { + t.Fatalf("New() failed: got\n%swant\n%s", prototext.Format(n.GetProto()), prototext.Format(tt.want)) + } + }) + } +} diff --git a/topo/topo.go b/topo/topo.go index 5e5030a5..42199643 100644 --- a/topo/topo.go +++ b/topo/topo.go @@ -51,6 +51,7 @@ import ( _ "github.com/openconfig/kne/topo/node/arista" _ "github.com/openconfig/kne/topo/node/cisco" _ "github.com/openconfig/kne/topo/node/drivenets" + _ "github.com/openconfig/kne/topo/node/forward" _ "github.com/openconfig/kne/topo/node/gobgp" _ "github.com/openconfig/kne/topo/node/host" _ "github.com/openconfig/kne/topo/node/juniper" diff --git a/x/wire/client/main.go b/x/wire/file/client/main.go similarity index 97% rename from x/wire/client/main.go rename to x/wire/file/client/main.go index 77fb25f7..4f9d56f3 100644 --- a/x/wire/client/main.go +++ b/x/wire/file/client/main.go @@ -37,12 +37,10 @@ func main() { if err != nil { log.Fatalf("Failed to create file based read/writer: %v", err) } - defer frw1.Close() frw2, err := wire.NewFileReadWriter("testdata/fwdxx02sql17src.txt", "testdata/fwdxx02sql17dst.txt") if err != nil { log.Fatalf("Failed to create file based read/writer: %v", err) } - defer frw2.Close() endpoints := map[*wire.PhysicalEndpoint]*wire.Wire{ wire.NewPhysicalEndpoint("xx01.sql17", "Ethernet0/0/0/0"): wire.NewWire(frw1), wire.NewPhysicalEndpoint("xx02.sql17", "Ethernet0/0/0/1"): wire.NewWire(frw2), @@ -54,7 +52,6 @@ func main() { if err != nil { log.Fatalf("Failed to dial %q: %v", *addr, err) } - defer conn.Close() c := wpb.NewWireClient(conn) g := new(errgroup.Group) for e, w := range endpoints { diff --git a/x/wire/client/testdata/fwdxx01sql17src.txt b/x/wire/file/client/testdata/fwdxx01sql17src.txt similarity index 100% rename from x/wire/client/testdata/fwdxx01sql17src.txt rename to x/wire/file/client/testdata/fwdxx01sql17src.txt diff --git a/x/wire/client/testdata/fwdxx02sql17src.txt b/x/wire/file/client/testdata/fwdxx02sql17src.txt similarity index 100% rename from x/wire/client/testdata/fwdxx02sql17src.txt rename to x/wire/file/client/testdata/fwdxx02sql17src.txt diff --git a/x/wire/server/main.go b/x/wire/file/server/main.go similarity index 98% rename from x/wire/server/main.go rename to x/wire/file/server/main.go index 898e2f4d..4b20c6f9 100644 --- a/x/wire/server/main.go +++ b/x/wire/file/server/main.go @@ -67,12 +67,10 @@ func main() { if err != nil { log.Fatalf("Failed to create file based read/writer: %v", err) } - defer frw1.Close() frw2, err := wire.NewFileReadWriter("testdata/xx02sql17src.txt", "testdata/xx02sql17dst.txt") if err != nil { log.Fatalf("Failed to create file based read/writer: %v", err) } - defer frw2.Close() endpoints := map[wire.PhysicalEndpoint]*wire.Wire{ *wire.NewPhysicalEndpoint("xx01.sql17", "Ethernet0/0/0/0"): wire.NewWire(frw1), *wire.NewPhysicalEndpoint("xx02.sql17", "Ethernet0/0/0/1"): wire.NewWire(frw2), diff --git a/x/wire/server/testdata/xx01sql17src.txt b/x/wire/file/server/testdata/xx01sql17src.txt similarity index 100% rename from x/wire/server/testdata/xx01sql17src.txt rename to x/wire/file/server/testdata/xx01sql17src.txt diff --git a/x/wire/server/testdata/xx02sql17src.txt b/x/wire/file/server/testdata/xx02sql17src.txt similarity index 100% rename from x/wire/server/testdata/xx02sql17src.txt rename to x/wire/file/server/testdata/xx02sql17src.txt diff --git a/x/wire/forward/Dockerfile b/x/wire/forward/Dockerfile new file mode 100644 index 00000000..8baa70bb --- /dev/null +++ b/x/wire/forward/Dockerfile @@ -0,0 +1,13 @@ +FROM golang:1.21 + +WORKDIR /app + +COPY go.mod go.sum ./ +RUN go mod download + +COPY x/wire/ x/wire/ +COPY proto/ proto/ + +RUN go build -o forward x/wire/forward/main.go + +ENTRYPOINT ["./forward"] diff --git a/x/wire/forward/main.go b/x/wire/forward/main.go new file mode 100644 index 00000000..f29671ed --- /dev/null +++ b/x/wire/forward/main.go @@ -0,0 +1,151 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package main provides the binary that executes forwarding over grpc wires. +package main + +import ( + "context" + "fmt" + "net" + "strings" + + wpb "github.com/openconfig/kne/proto/wire" + "github.com/openconfig/kne/x/wire" + flag "github.com/spf13/pflag" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + log "k8s.io/klog/v2" +) + +var ( + port = flag.Int("port", 50058, "Wire server port") + interfaces = flag.StringSlice("interfaces", []string{}, "List of local interfaces to serve on the wire server") + endpoints = flag.StringSlice("endpoints", []string{}, "List of strings of the form local_interface/remote_address/remote_interface") +) + +type server struct { + wpb.UnimplementedWireServer + endpoints map[wire.InterfaceEndpoint]*wire.Wire +} + +func newServer(endpoints map[wire.InterfaceEndpoint]*wire.Wire) *server { + return &server{endpoints: endpoints} +} + +func (s *server) Transmit(stream wpb.Wire_TransmitServer) error { + e, err := wire.ParseInterfaceEndpoint(stream.Context()) + if err != nil { + return fmt.Errorf("unable to parse endpoint from incoming stream context: %v", err) + } + log.Infof("New Transmit stream started for endpoint %v", e) + w, ok := s.endpoints[*e] + if !ok { + return fmt.Errorf("no endpoint found on server for request: %v", e) + } + if err := w.Transmit(stream.Context(), stream); err != nil { + return fmt.Errorf("transmit failed: %v", err) + } + return nil +} + +func localEndpoints(intfs []string) (map[wire.InterfaceEndpoint]*wire.Wire, error) { + m := map[wire.InterfaceEndpoint]*wire.Wire{} + for _, intf := range intfs { + rw, err := wire.NewInterfaceReadWriter(intf) + if err != nil { + return nil, err + } + m[*wire.NewInterfaceEndpoint(intf)] = wire.NewWire(rw) + } + return m, nil +} + +func remoteEndpoints(eps []string) (map[string]map[wire.InterfaceEndpoint]*wire.Wire, error) { + m := map[string]map[wire.InterfaceEndpoint]*wire.Wire{} + for _, ep := range eps { + parts := strings.SplitN(ep, "/", 3) + if len(parts) != 3 { + return nil, fmt.Errorf("unable to parse %v into endpoint, got %v", ep, parts) + } + lintf := parts[0] + addr := parts[1] + rintf := parts[2] + rw, err := wire.NewInterfaceReadWriter(lintf) + if err != nil { + return nil, err + } + if _, ok := m[addr]; !ok { + m[addr] = map[wire.InterfaceEndpoint]*wire.Wire{} + } + m[addr][*wire.NewInterfaceEndpoint(rintf)] = wire.NewWire(rw) + } + return m, nil +} + +func main() { + flag.Parse() + ctx := context.Background() + addr := fmt.Sprintf(":%d", *port) + lis, err := net.Listen("tcp6", addr) + if err != nil { + log.Fatalf("Failed to listen: %v", err) + } + s := grpc.NewServer() + le, err := localEndpoints(*interfaces) + if err != nil { + log.Fatalf("Failed to create local endpoints from interfaces: %v") + } + wpb.RegisterWireServer(s, newServer(le)) + g := new(errgroup.Group) + g.Go(func() error { + log.Infof("Wire server listening at %v", lis.Addr()) + return s.Serve(lis) + }) + re, err := remoteEndpoints(*endpoints) + if err != nil { + log.Fatalf("Failed to create remote endpoints: %v", err) + } + for a, m := range re { + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + conn, err := grpc.DialContext(ctx, a, opts...) + if err != nil { + log.Fatalf("Failed to dial %q: %v", a, err) + } + c := wpb.NewWireClient(conn) + for e, w := range m { + c := c + e := e + w := w + g.Go(func() error { + octx := e.NewContext(ctx) + stream, err := c.Transmit(octx) + if err != nil { + return err + } + defer func() { + stream.CloseSend() + }() + log.Infof("Transmitting endpoint %v over wire...", e) + return w.Transmit(ctx, stream) + }) + } + } + if err := g.Wait(); err != nil { + log.Fatalf("Failed to wait for wire transmits: %v", err) + } +} diff --git a/x/wire/intf/client/main.go b/x/wire/intf/client/main.go new file mode 100644 index 00000000..1ce87755 --- /dev/null +++ b/x/wire/intf/client/main.go @@ -0,0 +1,76 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package main provides an example wire client. +package main + +import ( + "context" + "flag" + + wpb "github.com/openconfig/kne/proto/wire" + "github.com/openconfig/kne/x/wire" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + log "k8s.io/klog/v2" +) + +var ( + addr = flag.String("addr", "localhost:50058", "Wire server address") +) + +func main() { + ctx := context.Background() + i0, err := wire.NewInterfaceReadWriter("eth0") + if err != nil { + log.Fatalf("Failed to create interface read/writer: %v", err) + } + i1, err := wire.NewInterfaceReadWriter("eth1") + if err != nil { + log.Fatalf("Failed to create interface read/writer: %v", err) + } + endpoints := map[*wire.PhysicalEndpoint]*wire.Wire{ + wire.NewPhysicalEndpoint("r3", "eth0"): wire.NewWire(i0), + wire.NewPhysicalEndpoint("r3", "eth1"): wire.NewWire(i1), + } + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + conn, err := grpc.DialContext(ctx, *addr, opts...) + if err != nil { + log.Fatalf("Failed to dial %q: %v", *addr, err) + } + c := wpb.NewWireClient(conn) + g := new(errgroup.Group) + for e, w := range endpoints { + e := e + w := w + g.Go(func() error { + octx := e.NewContext(ctx) + stream, err := c.Transmit(octx) + if err != nil { + return err + } + defer func() { + stream.CloseSend() + }() + log.Infof("Transmitting endpoint %v over wire...", e) + return w.Transmit(ctx, stream) + }) + } + if err := g.Wait(); err != nil { + log.Fatalf("Failed to wait for wire transmits: %v", err) + } +} diff --git a/x/wire/intf/server/main.go b/x/wire/intf/server/main.go new file mode 100644 index 00000000..a5cd2a3a --- /dev/null +++ b/x/wire/intf/server/main.go @@ -0,0 +1,83 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package main provides an example wire server. +package main + +import ( + "flag" + "fmt" + "net" + + log "github.com/golang/glog" + wpb "github.com/openconfig/kne/proto/wire" + "github.com/openconfig/kne/x/wire" + "google.golang.org/grpc" +) + +var ( + port = flag.Int("port", 50058, "Wire server port") +) + +type server struct { + wpb.UnimplementedWireServer + endpoints map[wire.PhysicalEndpoint]*wire.Wire +} + +func newServer(endpoints map[wire.PhysicalEndpoint]*wire.Wire) *server { + return &server{endpoints: endpoints} +} + +func (s *server) Transmit(stream wpb.Wire_TransmitServer) error { + pe, err := wire.ParsePhysicalEndpoint(stream.Context()) + if err != nil { + return fmt.Errorf("unable to parse physical endpoint from incoming stream context: %v", err) + } + log.Infof("New Transmit stream started for endpoint %v", pe) + w, ok := s.endpoints[*pe] + if !ok { + return fmt.Errorf("no endpoint found on server for request: %v", pe) + } + if err := w.Transmit(stream.Context(), stream); err != nil { + return fmt.Errorf("transmit failed: %v", err) + } + return nil +} + +func main() { + flag.Parse() + addr := fmt.Sprintf(":%d", *port) + lis, err := net.Listen("tcp6", addr) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + s := grpc.NewServer() + i0, err := wire.NewInterfaceReadWriter("eth0") + if err != nil { + log.Fatalf("Failed to create packet handle read/writer: %v", err) + } + i1, err := wire.NewInterfaceReadWriter("eth1") + if err != nil { + log.Fatalf("Failed to create file based read/writer: %v", err) + } + endpoints := map[wire.PhysicalEndpoint]*wire.Wire{ + *wire.NewPhysicalEndpoint("r3", "eth0"): wire.NewWire(i0), + *wire.NewPhysicalEndpoint("r3", "eth1"): wire.NewWire(i1), + } + wpb.RegisterWireServer(s, newServer(endpoints)) + log.Infof("Wire server listening at %v", lis.Addr()) + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } +} diff --git a/x/wire/wire.go b/x/wire/wire.go index fafc1125..219e6265 100644 --- a/x/wire/wire.go +++ b/x/wire/wire.go @@ -22,6 +22,9 @@ import ( "io" "os" + "github.com/google/gopacket" + "github.com/google/gopacket/afpacket" + "github.com/google/gopacket/layers" wpb "github.com/openconfig/kne/proto/wire" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -34,6 +37,35 @@ type ReadWriter interface { Write([]byte) error } +type InterfaceReadWriter struct { + handle *afpacket.TPacket + ps *gopacket.PacketSource +} + +func NewInterfaceReadWriter(intf string) (*InterfaceReadWriter, error) { + h, err := afpacket.NewTPacket(afpacket.OptInterface(intf)) + if err != nil { + return nil, fmt.Errorf("unable to create packet handle for %q: %v", intf, err) + } + return &InterfaceReadWriter{handle: h, ps: gopacket.NewPacketSource(h, layers.LinkTypeEthernet)}, nil +} + +func (i *InterfaceReadWriter) Read() ([]byte, error) { + p, err := i.ps.NextPacket() + if err != nil { + return nil, err + } + return p.Data(), nil +} + +func (i *InterfaceReadWriter) Write(b []byte) error { + return i.handle.WritePacketData(b) +} + +func (i *InterfaceReadWriter) Close() { + i.handle.Close() +} + type FileReadWriter struct { src *os.File srcScanner *bufio.Scanner @@ -177,3 +209,32 @@ func (p *PhysicalEndpoint) NewContext(ctx context.Context) context.Context { }) return metadata.NewOutgoingContext(ctx, md) } + +type InterfaceEndpoint struct { + intf string +} + +func NewInterfaceEndpoint(intf string) *InterfaceEndpoint { + return &InterfaceEndpoint{intf: intf} +} + +func ParseInterfaceEndpoint(ctx context.Context) (*InterfaceEndpoint, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, fmt.Errorf("no metadata in incoming context") + } + i := &InterfaceEndpoint{} + vals := md.Get("interface") + if len(vals) != 1 || vals[0] == "" { + return nil, fmt.Errorf("interface key not found") + } + i.intf = vals[0] + return i, nil +} + +func (i *InterfaceEndpoint) NewContext(ctx context.Context) context.Context { + md := metadata.New(map[string]string{ + "interface": i.intf, + }) + return metadata.NewOutgoingContext(ctx, md) +} diff --git a/x/wire/wire_test.go b/x/wire/wire_test.go index ef5b3c47..af288a58 100644 --- a/x/wire/wire_test.go +++ b/x/wire/wire_test.go @@ -64,9 +64,9 @@ func TestTransmit(t *testing.T) { } rw := NewFakeReadWriter(rwSrc) sSrc := []*wpb.Packet{ - &wpb.Packet{Data: []byte("sSrc1")}, - &wpb.Packet{Data: []byte("sSrc2")}, - &wpb.Packet{Data: []byte("sSrc3")}, + {Data: []byte("sSrc1")}, + {Data: []byte("sSrc2")}, + {Data: []byte("sSrc3")}, } s := NewFakeStream(sSrc) w := NewWire(rw)