From 03d9c684336749b580b6bdc0c8f425613c03a611 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Thu, 14 Nov 2024 13:54:24 -0800 Subject: [PATCH] Close TCP connection in case of error This is better than keeping the connection open, as it sends a signal to the exporter that something is wrong. This is also one of the four possible behaviors described by RFC7011 for handling malformed IPFIX messages for TCP and SCTP connections. See https://datatracker.ietf.org/doc/html/rfc7011#section-9.1. Note that implementing one of these four options is not mandatory. However, the RFC also states that: > the Collecting Process SHOULD stop processing IPFIX Messages > from clearly malfunctioning Exporting Processes (e.g., those from > which the last few IPFIX Messages have been malformed). Closing the TCP connection is the easiest thing for us to implement, and it makes sense to use IMO. Signed-off-by: Antonin Bas --- pkg/collector/process.go | 2 +- pkg/collector/process_test.go | 45 +++++++++++++++++++++++++++++++---- pkg/collector/tcp.go | 21 ++++++++++++---- 3 files changed, 57 insertions(+), 11 deletions(-) diff --git a/pkg/collector/process.go b/pkg/collector/process.go index 1f1406ed..9506e401 100644 --- a/pkg/collector/process.go +++ b/pkg/collector/process.go @@ -174,7 +174,7 @@ func (cp *CollectingProcess) Stop() { close(cp.stopChan) // wait for all connections to be safely deleted and returned cp.wg.Wait() - klog.Info("Stopping the collecting process") + klog.Info("Stopped the collecting process") } func (cp *CollectingProcess) GetAddress() net.Addr { diff --git a/pkg/collector/process_test.go b/pkg/collector/process_test.go index 5f07efce..294dc00d 100644 --- a/pkg/collector/process_test.go +++ b/pkg/collector/process_test.go @@ -21,6 +21,7 @@ import ( "crypto/x509" "encoding/binary" "fmt" + "io" "net" "runtime" "sync" @@ -38,10 +39,14 @@ import ( testcerts "github.com/vmware/go-ipfix/pkg/test/certs" ) -var validTemplatePacket = []byte{0, 10, 0, 40, 95, 154, 107, 127, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 24, 1, 0, 0, 3, 0, 8, 0, 4, 0, 12, 0, 4, 128, 101, 255, 255, 0, 0, 220, 186} -var validTemplatePacketIPv6 = []byte{0, 10, 0, 32, 96, 27, 70, 6, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 16, 1, 0, 0, 2, 0, 27, 0, 16, 0, 28, 0, 16} -var validDataPacket = []byte{0, 10, 0, 33, 95, 154, 108, 18, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 17, 1, 2, 3, 4, 5, 6, 7, 8, 4, 112, 111, 100, 49} -var validDataPacketIPv6 = []byte{0, 10, 0, 52, 96, 27, 75, 252, 0, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 36, 32, 1, 0, 0, 50, 56, 223, 225, 0, 99, 0, 0, 0, 0, 254, 251, 32, 1, 0, 0, 50, 56, 223, 225, 0, 99, 0, 0, 0, 0, 254, 251} +var ( + validTemplatePacket = []byte{0, 10, 0, 40, 95, 154, 107, 127, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 24, 1, 0, 0, 3, 0, 8, 0, 4, 0, 12, 0, 4, 128, 101, 255, 255, 0, 0, 220, 186} + validTemplatePacketIPv6 = []byte{0, 10, 0, 32, 96, 27, 70, 6, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 16, 1, 0, 0, 2, 0, 27, 0, 16, 0, 28, 0, 16} + validDataPacket = []byte{0, 10, 0, 33, 95, 154, 108, 18, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 17, 1, 2, 3, 4, 5, 6, 7, 8, 4, 112, 111, 100, 49} + validDataPacketIPv6 = []byte{0, 10, 0, 52, 96, 27, 75, 252, 0, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 36, 32, 1, 0, 0, 50, 56, 223, 225, 0, 99, 0, 0, 0, 0, 254, 251, 32, 1, 0, 0, 50, 56, 223, 225, 0, 99, 0, 0, 0, 0, 254, 251} + + invalidTemplatePacketWrongVersion = []byte{0, 9, 0, 40, 95, 40, 211, 236, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 24, 1, 0, 0, 3, 0, 8, 0, 4, 0, 12, 0, 4, 128, 105, 255, 255, 0, 0, 218, 21} +) const ( tcpTransport = "tcp" @@ -85,6 +90,36 @@ func TestTCPCollectingProcess_ReceiveTemplateRecord(t *testing.T) { assert.Equal(t, int64(1), cp.GetNumRecordsReceived()) } +func TestTCPCollectingProcess_ReceiveInvalidTemplateRecord(t *testing.T) { + input := getCollectorInput(tcpTransport, false, false) + cp, err := InitCollectingProcess(input) + if err != nil { + t.Fatalf("TCP Collecting Process does not start correctly: %v", err) + } + go cp.Start() + // wait until collector is ready + waitForCollectorReady(t, cp) + go func() { + // consume all messages to avoid blocking + ch := cp.GetMsgChan() + for range ch { + } + }() + collectorAddr := cp.GetAddress() + // client + conn, err := net.Dial(collectorAddr.Network(), collectorAddr.String()) + if err != nil { + t.Errorf("Cannot establish connection to %s", collectorAddr.String()) + } + defer conn.Close() + conn.SetReadDeadline(time.Now().Add(1 * time.Second)) + conn.Write(invalidTemplatePacketWrongVersion) + readBuffer := make([]byte, 100) + _, err = conn.Read(readBuffer) + assert.ErrorIs(t, err, io.EOF) + cp.Stop() +} + func TestUDPCollectingProcess_ReceiveTemplateRecord(t *testing.T) { input := getCollectorInput(udpTransport, false, false) cp, err := InitCollectingProcess(input) @@ -375,7 +410,7 @@ func TestCollectingProcess_DecodeTemplateRecord(t *testing.T) { templateID: &template{}, }, }, - templateRecord: []byte{0, 9, 0, 40, 95, 40, 211, 236, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 24, 1, 0, 0, 3, 0, 8, 0, 4, 0, 12, 0, 4, 128, 105, 255, 255, 0, 0, 218, 21}, + templateRecord: invalidTemplatePacketWrongVersion, expectedErr: "collector only supports IPFIX (v10)", // Invalid version means we stop decoding the packet right away, so we will not modify the existing template map isTemplateExpected: true, diff --git a/pkg/collector/tcp.go b/pkg/collector/tcp.go index e0178070..e55c9e84 100644 --- a/pkg/collector/tcp.go +++ b/pkg/collector/tcp.go @@ -81,9 +81,14 @@ func (cp *CollectingProcess) handleTCPClient(conn net.Conn) { }() defer conn.Close() reader := bufio.NewReader(conn) + doneCh := make(chan struct{}) cp.wg.Add(1) + // We read from the connection in a separate goroutine, so we can stop immediately when + // cp.StopChan is closed. An alternative would be to use a read deadline, and check + // cp.StopChan at every iteration. go func() { defer cp.wg.Done() + defer close(doneCh) for { length, err := getMessageLength(reader) if errors.Is(err, io.EOF) { @@ -102,16 +107,22 @@ func (cp *CollectingProcess) handleTCPClient(conn net.Conn) { } message, err := cp.decodePacket(bytes.NewBuffer(buff), address) if err != nil { - // TODO: should we close the connection instead and force the client to - // re-open it? - klog.ErrorS(err, "Error when decoding packet") - continue + // This can be an invalid template record, or invalid data record. + // We close the connection, which is the best way to let the client + // (exporter) know that something is wrong. + klog.ErrorS(err, "Error when decoding packet, closing connection") + return } klog.V(4).InfoS("Processed message from exporter", "observationDomainID", message.GetObsDomainID(), "setType", message.GetSet().GetSetType(), "numRecords", message.GetSet().GetNumberOfRecords()) } }() - <-cp.stopChan + select { + case <-cp.stopChan: + break + case <-doneCh: + break + } } func (cp *CollectingProcess) createServerConfig() (*tls.Config, error) {