From fd61fbfdfda94d1478d4de41ea50fdbe5e5c0243 Mon Sep 17 00:00:00 2001 From: Gabor Retvari Date: Tue, 12 Nov 2024 20:01:48 +0100 Subject: [PATCH] refactor: Rewrite metric exporting to Open Telemetry --- go.mod | 30 +-- go.sum | 51 +++-- internal/telemetry/metrics.go | 170 --------------- internal/telemetry/statsconn.go | 53 ++--- internal/telemetry/telemetry.go | 233 ++++++++++++++++++++ internal/telemetry/tester/tester.go | 119 +++++++++++ internal/util/conn.go | 93 -------- internal/util/file_conn.go | 55 +++++ internal/util/socketpool.go | 46 +++- internal/util/socketpool_nonunix.go | 20 ++ internal/util/socketpool_unix.go | 8 +- relay.go | 21 +- relay_test.go | 22 +- server.go | 10 +- stunner.go | 23 +- stunner_test.go | 316 +++++++++++++++------------- 16 files changed, 760 insertions(+), 510 deletions(-) delete mode 100644 internal/telemetry/metrics.go create mode 100644 internal/telemetry/telemetry.go create mode 100644 internal/telemetry/tester/tester.go delete mode 100644 internal/util/conn.go create mode 100644 internal/util/file_conn.go create mode 100644 internal/util/socketpool_nonunix.go diff --git a/go.mod b/go.mod index 22168e75..58c96984 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,8 @@ module github.com/l7mp/stunner -go 1.21 +go 1.22 -toolchain go1.21.5 +toolchain go1.22.4 require ( github.com/deepmap/oapi-codegen/v2 v2.1.0 @@ -18,12 +18,17 @@ require ( github.com/pion/logging v0.2.2 github.com/pion/transport/v3 v3.0.7 github.com/pion/turn/v4 v4.0.0 - github.com/prometheus/client_golang v1.20.2 + github.com/prometheus/client_golang v1.20.4 github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/otel v1.31.0 + go.opentelemetry.io/otel/exporters/prometheus v0.53.0 + go.opentelemetry.io/otel/metric v1.31.0 + go.opentelemetry.io/otel/sdk v1.31.0 + go.opentelemetry.io/otel/sdk/metric v1.31.0 go.uber.org/zap v1.26.0 - golang.org/x/sys v0.23.0 + golang.org/x/sys v0.26.0 golang.org/x/time v0.5.0 k8s.io/api v0.29.1 k8s.io/apimachinery v0.29.1 @@ -42,6 +47,7 @@ require ( github.com/emicklei/go-restful/v3 v3.11.2 // indirect github.com/evanphx/json-patch v5.9.0+incompatible // indirect github.com/go-errors/errors v1.5.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.20.2 // indirect github.com/go-openapi/jsonreference v0.20.4 // indirect github.com/go-openapi/swag v0.22.9 // indirect @@ -58,7 +64,6 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect - github.com/kylelemons/godebug v1.1.0 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/moby/spdystream v0.2.0 // indirect @@ -76,21 +81,22 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect - github.com/prometheus/common v0.58.0 // indirect + github.com/prometheus/common v0.60.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/wlynxg/anet v0.0.3 // indirect github.com/xlab/treeprint v1.2.0 // indirect + go.opentelemetry.io/otel/trace v1.31.0 // indirect go.starlark.net v0.0.0-20240123142251-f86470692795 // indirect go.uber.org/multierr v1.10.0 // indirect - golang.org/x/crypto v0.26.0 // indirect + golang.org/x/crypto v0.27.0 // indirect golang.org/x/mod v0.17.0 // indirect - golang.org/x/net v0.28.0 // indirect - golang.org/x/oauth2 v0.22.0 // indirect + golang.org/x/net v0.29.0 // indirect + golang.org/x/oauth2 v0.23.0 // indirect golang.org/x/sync v0.8.0 // indirect - golang.org/x/term v0.23.0 // indirect - golang.org/x/text v0.17.0 // indirect + golang.org/x/term v0.24.0 // indirect + golang.org/x/text v0.18.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect - google.golang.org/protobuf v1.34.2 // indirect + google.golang.org/protobuf v1.35.1 // indirect gopkg.in/evanphx/json-patch.v5 v5.9.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 647996fb..bc4e93a5 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,11 @@ github.com/getkin/kin-openapi v0.123.0 h1:zIik0mRwFNLyvtXK274Q6ut+dPh6nlxBp0x7mN github.com/getkin/kin-openapi v0.123.0/go.mod h1:wb1aSZA/iWmorQP9KTAS/phLj/t17B5jT7+fS8ed9NM= github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= github.com/go-logr/zapr v1.3.0/go.mod h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg= github.com/go-openapi/jsonpointer v0.20.2 h1:mQc3nmndL8ZBzStEo3JYF8wzmeWffDH4VbXz58sAx6Q= @@ -139,12 +142,12 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.20.2 h1:5ctymQzZlyOON1666svgwn3s6IKWgfbjsejTMiXIyjg= -github.com/prometheus/client_golang v1.20.2/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_golang v1.20.4 h1:Tgh3Yr67PaOv/uTqloMsCEdeuFTatm5zIq5+qNN23vI= +github.com/prometheus/client_golang v1.20.4/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= -github.com/prometheus/common v0.58.0 h1:N+N8vY4/23r6iYfD3UQZUoJPnUYAo7v6LG5XZxjZTXo= -github.com/prometheus/common v0.58.0/go.mod h1:GpWM7dewqmVYcd7SmRaiWVe9SSqjf0UrwnYnpEZNuT0= +github.com/prometheus/common v0.60.0 h1:+V9PAREWNvJMAuJ1x1BaWl9dewMW4YrHZQbx0sJNllA= +github.com/prometheus/common v0.60.0/go.mod h1:h0LYf1R1deLSKtD4Vdg8gy4RuOvENW2J/h19V5NADQw= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= @@ -172,6 +175,18 @@ github.com/xlab/treeprint v1.2.0 h1:HzHnuAF1plUN2zGlAFHbSQP2qJ0ZAD3XF5XD7OesXRQ= github.com/xlab/treeprint v1.2.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd/WEJu0= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= +go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= +go.opentelemetry.io/otel/exporters/prometheus v0.53.0 h1:QXobPHrwiGLM4ufrY3EOmDPJpo2P90UuFau4CDPJA/I= +go.opentelemetry.io/otel/exporters/prometheus v0.53.0/go.mod h1:WOAXGr3D00CfzmFxtTV1eR0GpoHuPEu+HJT8UWW2SIU= +go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= +go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= +go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= +go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= +go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= +go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= +go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= +go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= go.starlark.net v0.0.0-20240123142251-f86470692795 h1:LmbG8Pq7KDGkglKVn8VpZOZj6vb9b8nKEGcg9l03epM= go.starlark.net v0.0.0-20240123142251-f86470692795/go.mod h1:LcLNIzVOMp4oV+uusnpk+VU+SzXaJakUuBjoCSWH5dM= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -183,8 +198,8 @@ go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= -golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug= golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -195,10 +210,10 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= -golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= -golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA= -golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= +golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -208,14 +223,14 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= -golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= -golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM= +golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= -golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -230,8 +245,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go deleted file mode 100644 index 80fca0d4..00000000 --- a/internal/telemetry/metrics.go +++ /dev/null @@ -1,170 +0,0 @@ -package telemetry - -import ( - "github.com/pion/logging" - "github.com/prometheus/client_golang/prometheus" -) - -const ( - stunnerNamespace = "stunner" -) - -var ( - ConnLabels = []string{"name"} - CounterLabels = []string{"name", "direction"} - AllocActiveGauge prometheus.GaugeFunc - ListenerPacketsTotal *prometheus.CounterVec - ListenerBytesTotal *prometheus.CounterVec - ListenerConnsTotal *prometheus.CounterVec - ListenerConnsActive *prometheus.GaugeVec - ClusterPacketsTotal *prometheus.CounterVec - ClusterBytesTotal *prometheus.CounterVec - // promClusterConnsTotal *prometheus.CounterVec - // promClusterConnsActive *prometheus.GaugeVec -) - -func Init() { - // listener stats - ListenerConnsActive = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: stunnerNamespace, - Subsystem: "listener", - Name: "connections", - Help: "Number of active downstream connections at a listener.", - }, ConnLabels) - ListenerConnsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: stunnerNamespace, - Subsystem: "listener", - Name: "connections_total", - Help: "Number of downstream connections at a listener.", - }, ConnLabels) - ListenerPacketsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: stunnerNamespace, - Subsystem: "listener", - Name: "packets_total", - Help: "Number of datagrams sent or received at a listener.", - }, CounterLabels) - ListenerBytesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: stunnerNamespace, - Subsystem: "listener", - Name: "bytes_total", - Help: "Number of bytes sent or received at a listener.", - }, CounterLabels) - - prometheus.MustRegister(ListenerPacketsTotal) - prometheus.MustRegister(ListenerBytesTotal) - prometheus.MustRegister(ListenerConnsTotal) - prometheus.MustRegister(ListenerConnsActive) - - // cluster stats - // promClusterConnsActive = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - // Namespace: stunnerNamespace, - // Subsystem: "cluster", - // Name: "connections", - // Help: "Number of active upstream connections on behalf of a listener", - // }, promConnLabels) - // promClusterConnsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - // Namespace: stunnerNamespace, - // Subsystem: "cluster", - // Name: "connections_total", - // Help: "Number of upstream connections on behalf of a listener.", - // }, promConnLabels) - ClusterPacketsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: stunnerNamespace, - Subsystem: "cluster", - Name: "packets_total", - Help: "Number of datagrams sent to backends or received from backends on behalf of a listener", - }, CounterLabels) - ClusterBytesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: stunnerNamespace, - Subsystem: "cluster", - Name: "bytes_total", - Help: "Number of bytes sent to backends or received from backends on behalf of a listener.", - }, CounterLabels) - - prometheus.MustRegister(ClusterPacketsTotal) - prometheus.MustRegister(ClusterBytesTotal) - // prometheus.MustRegister(promClusterConnsTotal) - // prometheus.MustRegister(promClusterConnsActive) -} - -func Close() { - _ = prometheus.Unregister(ListenerPacketsTotal) - _ = prometheus.Unregister(ListenerBytesTotal) - _ = prometheus.Unregister(ListenerConnsTotal) - _ = prometheus.Unregister(ListenerConnsActive) - _ = prometheus.Unregister(ClusterPacketsTotal) - _ = prometheus.Unregister(ClusterBytesTotal) - // _ = prometheus.Unregister(promClusterConnsTotal) - // _ = prometheus.Unregister(promClusterConnsActive) -} - -func IncrementPackets(n string, c ConnType, d Direction, count uint64) { - switch c { - case ListenerType: - ListenerPacketsTotal.WithLabelValues(n, d.String()).Add(float64(count)) - case ClusterType: - ClusterPacketsTotal.WithLabelValues(n, d.String()).Add(float64(count)) - } -} - -func IncrementBytes(n string, c ConnType, d Direction, count uint64) { - switch c { - case ListenerType: - ListenerBytesTotal.WithLabelValues(n, d.String()).Add(float64(count)) - case ClusterType: - ClusterBytesTotal.WithLabelValues(n, d.String()).Add(float64(count)) - } -} - -func AddConnection(n string, c ConnType) { - switch c { - case ListenerType: - ListenerConnsActive.WithLabelValues(n).Add(1) - ListenerConnsTotal.WithLabelValues(n).Add(1) - case ClusterType: - // promClusterConnsActive.WithLabelValues(n).Add(1) - // promClusterConnsTotal.WithLabelValues(n).Add(1) - } -} - -func SubConnection(n string, c ConnType) { - switch c { - case ListenerType: - ListenerConnsActive.WithLabelValues(n).Sub(1) - case ClusterType: - // promClusterConnsActive.WithLabelValues(n).Sub(1) - } -} - -func RegisterAllocationMetric(log logging.LeveledLogger, GetAllocationCount func() float64) { - AllocActiveGauge = prometheus.NewGaugeFunc( - prometheus.GaugeOpts{ - Namespace: stunnerNamespace, - Name: "allocations_active", - Help: "Number of active allocations.", - }, - GetAllocationCount, - ) - if err := prometheus.Register(AllocActiveGauge); err == nil { - log.Debug("GaugeFunc 'stunner_allocations_active' registered.") - } else { - log.Warn("GaugeFunc 'stunner_allocations_active' cannot be registered.") - } -} - -func UnregisterAllocationMetric(log logging.LeveledLogger) { - if AllocActiveGauge != nil { - if success := prometheus.Unregister(AllocActiveGauge); success { - log.Debug("GaugeFunc 'stunner_allocations_active' unregistered.") - return - } - } - log.Warn("GaugeFunc 'stunner_allocations_active' cannot be unregistered.") -} - -// func GetListenerPacketsTotal(ch chan prometheus.Metric) { -// go func() { -// defer close(ch) -// promListenerPacketsTotal.Collect(ch) -// }() -// } diff --git a/internal/telemetry/statsconn.go b/internal/telemetry/statsconn.go index a83565ed..36ac9665 100644 --- a/internal/telemetry/statsconn.go +++ b/internal/telemetry/statsconn.go @@ -9,13 +9,14 @@ import ( // Listener is a net.Listener that knows how to report to Prometheus. type Listener struct { net.Listener - name string - connType ConnType + name string + connType ConnType + telemetry *Telemetry } // NewListener creates a net.Listener that knows its name and type. -func NewListener(l net.Listener, n string, t ConnType) *Listener { - return &Listener{Listener: l, name: n, connType: t} +func NewListener(l net.Listener, n string, t ConnType, tm *Telemetry) *Listener { + return &Listener{Listener: l, name: n, connType: t, telemetry: tm} } // Accept accepts a new connection on a Listener. @@ -25,28 +26,29 @@ func (l *Listener) Accept() (net.Conn, error) { return nil, err } - return NewConn(conn, l.name, l.connType), nil + return NewConn(conn, l.name, l.connType, l.telemetry), nil } // Conn is a net.Conn that knows how to report to Prometheus. type Conn struct { net.Conn - name string - connType ConnType + name string + connType ConnType + telemetry *Telemetry } // NewConn allocates a stats conn that knows its name and type. -func NewConn(c net.Conn, n string, t ConnType) *Conn { - AddConnection(n, t) - return &Conn{Conn: c, name: n, connType: t} +func NewConn(c net.Conn, n string, t ConnType, tm *Telemetry) *Conn { + tm.AddConnection(n, t) + return &Conn{Conn: c, name: n, connType: t, telemetry: tm} } // Read reads from the Conn. func (c *Conn) Read(b []byte) (n int, err error) { n, err = c.Conn.Read(b) if n > 0 { - IncrementBytes(c.name, c.connType, Incoming, uint64(n)) - IncrementPackets(c.name, c.connType, Incoming, 1) + c.telemetry.IncrementBytes(c.name, c.connType, Incoming, uint64(n)) + c.telemetry.IncrementPackets(c.name, c.connType, Incoming, 1) } return } @@ -55,37 +57,38 @@ func (c *Conn) Read(b []byte) (n int, err error) { func (c *Conn) Write(b []byte) (n int, err error) { n, err = c.Conn.Write(b) if n > 0 { - IncrementBytes(c.name, c.connType, Outgoing, uint64(n)) - IncrementPackets(c.name, c.connType, Outgoing, 1) + c.telemetry.IncrementBytes(c.name, c.connType, Outgoing, uint64(n)) + c.telemetry.IncrementPackets(c.name, c.connType, Outgoing, 1) } return } // Close closes the Conn. func (c *Conn) Close() error { - SubConnection(c.name, c.connType) + c.telemetry.SubConnection(c.name, c.connType) return c.Conn.Close() } // PacketConn is a net.PacketConn that knows how to report to Prometheus. type PacketConn struct { net.PacketConn - name string - connType ConnType + name string + connType ConnType + telemetry *Telemetry } // NewPacketConn decorates a PacketConnn with metric reporting. -func NewPacketConn(c net.PacketConn, n string, t ConnType) *PacketConn { - AddConnection(n, t) - return &PacketConn{PacketConn: c, name: n, connType: t} +func NewPacketConn(c net.PacketConn, n string, t ConnType, tm *Telemetry) *PacketConn { + tm.AddConnection(n, t) + return &PacketConn{PacketConn: c, name: n, connType: t, telemetry: tm} } // ReadFrom reads from the PacketConn. func (c *PacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { n, addr, err = c.PacketConn.ReadFrom(p) if n > 0 { - IncrementBytes(c.name, c.connType, Incoming, uint64(n)) - IncrementPackets(c.name, c.connType, Incoming, 1) + c.telemetry.IncrementBytes(c.name, c.connType, Incoming, uint64(n)) + c.telemetry.IncrementPackets(c.name, c.connType, Incoming, 1) } return } @@ -94,8 +97,8 @@ func (c *PacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { func (c *PacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { n, err = c.PacketConn.WriteTo(p, addr) if n > 0 { - IncrementBytes(c.name, c.connType, Outgoing, uint64(n)) - IncrementPackets(c.name, c.connType, Outgoing, 1) + c.telemetry.IncrementBytes(c.name, c.connType, Outgoing, uint64(n)) + c.telemetry.IncrementPackets(c.name, c.connType, Outgoing, 1) } return } @@ -104,6 +107,6 @@ func (c *PacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { // WriteTo writes to the PacketConn. // Close closes the PacketConn. func (c *PacketConn) Close() error { - SubConnection(c.name, c.connType) + c.telemetry.SubConnection(c.name, c.connType) return c.PacketConn.Close() } diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go new file mode 100644 index 00000000..4a186c92 --- /dev/null +++ b/internal/telemetry/telemetry.go @@ -0,0 +1,233 @@ +package telemetry + +import ( + "context" + "fmt" + "time" + + "github.com/pion/logging" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" +) + +const ( + stunnerInstrumentName = "stunner" + closeTimeout = 250 * time.Millisecond +) + +// Callbacks lets the caller to define various callbacks for reporting active metrics from an +// object that cannot be reached from this subpackage. This interface allows to easily add new +// metric reporters. +type Callbacks struct { + // GetAllocationCount should map to the total allocation counter of the server. + GetAllocationCount func() int64 +} + +type Telemetry struct { + sdkmetric.Reader + + meter metric.Meter + provider *sdkmetric.MeterProvider + ctx context.Context + cancel context.CancelFunc + + // Metrics instruments + ListenerPacketsCounter metric.Int64Counter + ListenerBytesCounter metric.Int64Counter + ListenerConnsCounter metric.Int64Counter + ListenerConnsGauge metric.Int64UpDownCounter + ClusterPacketsCounter metric.Int64Counter + ClusterBytesCounter metric.Int64Counter + AllocationsGauge metric.Int64ObservableGauge + + callbacks Callbacks + + log logging.LeveledLogger +} + +func New(callbacks Callbacks, dryRun bool, log logging.LeveledLogger) (*Telemetry, error) { + var reader sdkmetric.Reader + + resource, err := resource.Merge(resource.Default(), + resource.NewWithAttributes(semconv.SchemaURL, semconv.ServiceNameKey.String("stunner"))) + if err != nil { + return nil, fmt.Errorf("could not create OTEL resource: %w", err) + } + + if dryRun { + // Use manual collection mode + reader = sdkmetric.NewManualReader() + } else { + // Create a new Prometheus exporter (starts background collection) + exporter, err := prometheus.New() + if err != nil { + return nil, err + } + reader = exporter + } + + // Create a new MeterProvider with the Prometheus exporter + provider := sdkmetric.NewMeterProvider( + sdkmetric.WithResource(resource), + sdkmetric.WithReader(reader), + ) + + ctx, cancel := context.WithCancel(context.Background()) + t := &Telemetry{ + Reader: reader, + meter: provider.Meter(stunnerInstrumentName), + provider: provider, + callbacks: callbacks, + ctx: ctx, + cancel: cancel, + log: log, + } + + if err := t.init(); err != nil { + return nil, err + } + return t, nil +} + +// Close cleanly shuts down the meter provider and blocks until the shutdown cycle is finished or a +// timout expires. +func (t *Telemetry) Close() error { + ctx, cancel := context.WithTimeout(t.ctx, closeTimeout) + defer cancel() + defer t.cancel() + return t.provider.Shutdown(ctx) +} + +func (t *Telemetry) Ctx() context.Context { + return t.ctx +} + +func (t *Telemetry) init() error { + var err error + + // Initialize listener metrics + t.ListenerPacketsCounter, err = t.meter.Int64Counter( + stunnerInstrumentName+"_listener_packets_total", + metric.WithDescription("Number of datagrams sent or received at a listener"), + ) + if err != nil { + return err + } + + t.ListenerBytesCounter, err = t.meter.Int64Counter( + stunnerInstrumentName+"_listener_bytes_total", + metric.WithDescription("Number of bytes sent or received at a listener"), + ) + if err != nil { + return err + } + + t.ListenerConnsCounter, err = t.meter.Int64Counter( + stunnerInstrumentName+"_listener_connections_total", + metric.WithDescription("Number of all downstream connections observed at a listener"), + ) + if err != nil { + return err + } + + t.ListenerConnsGauge, err = t.meter.Int64UpDownCounter( + stunnerInstrumentName+"_listener_connections", + metric.WithDescription("Number of active downstream connections at a listener"), + ) + if err != nil { + return err + } + + // Initialize cluster metrics + t.ClusterPacketsCounter, err = t.meter.Int64Counter( + stunnerInstrumentName+"_cluster_packets_total", + metric.WithDescription("Number of datagrams sent to or received from backends"), + ) + if err != nil { + return err + } + + t.ClusterBytesCounter, err = t.meter.Int64Counter( + stunnerInstrumentName+"_cluster_bytes_total", + metric.WithDescription("Number of bytes sent to or received from backends"), + ) + if err != nil { + return err + } + + t.AllocationsGauge, err = t.meter.Int64ObservableGauge( + stunnerInstrumentName+"_allocations_active", + metric.WithDescription("Number of active allocations"), + ) + if err != nil { + return err + } + + _, err = t.meter.RegisterCallback( + func(_ context.Context, o metric.Observer) error { + o.ObserveInt64(t.AllocationsGauge, t.callbacks.GetAllocationCount()) + return nil + }, + t.AllocationsGauge, + ) + if err != nil { + return err + } + + return nil +} + +func (t *Telemetry) IncrementPackets(n string, c ConnType, d Direction, count uint64) { + attrs := metric.WithAttributes( + attribute.String("name", n), + attribute.String("direction", d.String()), + ) + + switch c { + case ListenerType: + t.ListenerPacketsCounter.Add(t.ctx, int64(count), attrs) + case ClusterType: + t.ClusterPacketsCounter.Add(t.ctx, int64(count), attrs) + } +} + +func (t *Telemetry) IncrementBytes(n string, c ConnType, d Direction, count uint64) { + attrs := metric.WithAttributes( + attribute.String("name", n), + attribute.String("direction", d.String()), + ) + + switch c { + case ListenerType: + t.ListenerBytesCounter.Add(t.ctx, int64(count), attrs) + case ClusterType: + t.ClusterBytesCounter.Add(t.ctx, int64(count), attrs) + } +} + +func (t *Telemetry) AddConnection(n string, c ConnType) { + attrs := metric.WithAttributes(attribute.String("name", n)) + + switch c { + case ListenerType: + t.ListenerConnsGauge.Add(t.ctx, 1, attrs) + t.ListenerConnsCounter.Add(t.ctx, 1, attrs) + case ClusterType: + // Cluster connection metrics are disabled + } +} + +func (t *Telemetry) SubConnection(n string, c ConnType) { + attrs := metric.WithAttributes(attribute.String("name", n)) + + switch c { + case ListenerType: + t.ListenerConnsGauge.Add(t.ctx, -1, attrs) + case ClusterType: + // Cluster connection metrics are disabled + } +} diff --git a/internal/telemetry/tester/tester.go b/internal/telemetry/tester/tester.go new file mode 100644 index 00000000..55336322 --- /dev/null +++ b/internal/telemetry/tester/tester.go @@ -0,0 +1,119 @@ +package tester + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/attribute" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// Tester provides testing utilities for OpenTelemetry metrics +type Tester struct { + sdkmetric.Reader + *testing.T +} + +// New creates a new test helper and returns both the helper and a MeterProvider +// that can be used to initialize the system under test +func New(reader sdkmetric.Reader, t *testing.T) *Tester { + return &Tester{Reader: reader, T: t} +} + +// CollectAndCount returns the number of metrics with the given name and attributes +func (h *Tester) CollectAndCount(name string) int { + metrics := &metricdata.ResourceMetrics{} + err := h.Collect(context.Background(), metrics) + assert.NoError(h, err, "failed to collect metrics: %v") + + for _, scope := range metrics.ScopeMetrics { + for _, m := range scope.Metrics { + if m.Name != name { + continue + } + + sum, ok := m.Data.(metricdata.Sum[int64]) + assert.True(h, ok, fmt.Sprintf("metric %s is not a Sum", name)) + + return len(sum.DataPoints) + } + } + + return 0 +} + +// CollectAndGetInt returns the value of the metric with given name and attributes. +func (h *Tester) CollectAndGetInt(name string, attrs ...string) int { + assert.True(h, len(attrs)%2 == 0, "odd number of attribute key-value pairs") + + metrics := &metricdata.ResourceMetrics{} + err := h.Collect(context.Background(), metrics) + assert.NoError(h, err, "failed to collect metrics: %v") + + for _, scope := range metrics.ScopeMetrics { + for _, m := range scope.Metrics { + if m.Name != name { + continue + } + + sum, ok := m.Data.(metricdata.Sum[int64]) + assert.True(h, ok, fmt.Sprintf("metric %s is not a Sum", name)) + + for _, dp := range sum.DataPoints { + matches := true + for i := 0; i < len(attrs); i += 2 { + if val, ok := dp.Attributes.Value(attribute.Key(attrs[i])); !ok || val.AsString() != attrs[i+1] { + matches = false + break + } + } + if matches { + return int(dp.Value) + } + } + } + } + + return 0 +} + +// CollectAndDump returns the metrics with the given name and attributes as a string. +func (h *Tester) CollectAndDump(name string, attrs ...string) string { + metrics := &metricdata.ResourceMetrics{} + if err := h.Collect(context.Background(), metrics); err != nil { + return "" + } + + ret := []string{} + for _, scope := range metrics.ScopeMetrics { + for _, m := range scope.Metrics { + if m.Name != name { + continue + } + + sum, ok := m.Data.(metricdata.Sum[int64]) + if !ok { + return "" + } + + for _, dp := range sum.DataPoints { + matches := true + for i := 0; i < len(attrs); i += 2 { + if val, ok := dp.Attributes.Value(attribute.Key(attrs[i])); !ok || val.AsString() != attrs[i+1] { + matches = false + break + } + } + if matches { + ret = append(ret, fmt.Sprintf("%v", dp)) + } + } + } + } + + return strings.Join(ret, ",") +} diff --git a/internal/util/conn.go b/internal/util/conn.go deleted file mode 100644 index 459d9bd8..00000000 --- a/internal/util/conn.go +++ /dev/null @@ -1,93 +0,0 @@ -package util - -import ( - "fmt" - "net" - "os" - "time" - - "github.com/l7mp/stunner/internal/telemetry" - "github.com/pion/transport/v3" -) - -type FileConnAddr struct { - File *os.File -} - -func (s *FileConnAddr) Network() string { return "file" } -func (s *FileConnAddr) String() string { return s.File.Name() } - -type FileConn struct { - file *os.File -} - -func (f *FileConn) Read(b []byte) (n int, err error) { - return f.file.Read(b) -} - -func (f *FileConn) Write(b []byte) (n int, err error) { - return f.file.Write(b) -} - -func (f *FileConn) Close() error { - return f.file.Close() -} - -func (f *FileConn) LocalAddr() net.Addr { - return &FileConnAddr{File: f.file} -} - -func (f *FileConn) RemoteAddr() net.Addr { - return &FileConnAddr{File: f.file} -} - -func (f *FileConn) SetDeadline(t time.Time) error { - return nil -} - -func (f *FileConn) SetReadDeadline(t time.Time) error { - return nil -} - -func (f *FileConn) SetWriteDeadline(t time.Time) error { - return nil -} - -// NewFileConn returns a wrapper that shows an os.File as a net.Conn. -func NewFileConn(file *os.File) net.Conn { - return &FileConn{file: file} -} - -// PacketConnPool is a factory to create pools of related PacketConns, which may either be a set of -// PacketConns bound to the same local IP using SO_REUSEPORT (on unix, under certain circumstances) -// that can do multithreaded readloops, or a single PacketConn as a fallback for non-unic -// architectures and for testing. -type PacketConnPool interface { - // Make creates a PacketConnPool, caller must make sure to close the sockets. - Make(network, address string) ([]net.PacketConn, error) - // Size returns the number of sockets in the pool. - Size() int -} - -// defaultPacketConPool implements a socketpool that consists of only a single socket, used as a fallback for architectures that do not support SO_REUSEPORT or when socket pooling is disabled. -type defaultPacketConnPool struct { - transport.Net - listenerName string -} - -// Make creates a PacketConnPool, caller must make sure to close the sockets. -func (p *defaultPacketConnPool) Make(network, address string) ([]net.PacketConn, error) { - conns := []net.PacketConn{} - - conn, err := p.ListenPacket(network, address) - if err != nil { - return []net.PacketConn{}, fmt.Errorf("failed to create PacketConn at %s "+ - "(REUSEPORT: false): %s", address, err) - } - - conn = telemetry.NewPacketConn(conn, p.listenerName, telemetry.ListenerType) - conns = append(conns, conn) - return conns, nil -} - -func (p *defaultPacketConnPool) Size() int { return 1 } diff --git a/internal/util/file_conn.go b/internal/util/file_conn.go new file mode 100644 index 00000000..d7b8c5c7 --- /dev/null +++ b/internal/util/file_conn.go @@ -0,0 +1,55 @@ +package util + +import ( + "net" + "os" + "time" +) + +type FileConnAddr struct { + File *os.File +} + +func (s *FileConnAddr) Network() string { return "file" } +func (s *FileConnAddr) String() string { return s.File.Name() } + +type FileConn struct { + file *os.File +} + +func (f *FileConn) Read(b []byte) (n int, err error) { + return f.file.Read(b) +} + +func (f *FileConn) Write(b []byte) (n int, err error) { + return f.file.Write(b) +} + +func (f *FileConn) Close() error { + return f.file.Close() +} + +func (f *FileConn) LocalAddr() net.Addr { + return &FileConnAddr{File: f.file} +} + +func (f *FileConn) RemoteAddr() net.Addr { + return &FileConnAddr{File: f.file} +} + +func (f *FileConn) SetDeadline(t time.Time) error { + return nil +} + +func (f *FileConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (f *FileConn) SetWriteDeadline(t time.Time) error { + return nil +} + +// NewFileConn returns a wrapper that shows an os.File as a net.Conn. +func NewFileConn(file *os.File) net.Conn { + return &FileConn{file: file} +} diff --git a/internal/util/socketpool.go b/internal/util/socketpool.go index c02b9281..2c63a6d3 100644 --- a/internal/util/socketpool.go +++ b/internal/util/socketpool.go @@ -1,15 +1,45 @@ -//go:build !linux - package util import ( + "fmt" + "net" + + "github.com/l7mp/stunner/internal/telemetry" "github.com/pion/transport/v3" ) -// NewPacketConnPool creates a new packet connection pool which is fixed to a single connection, -// used if threadNum is zero or if we are running on top of transport.VNet (which does not support -// reuseport), or if we are on non-unix, see the fallback in socketpool.go. -func NewPacketConnPool(listenerName string, vnet transport.Net, threadNum int) PacketConnPool { - // default to a single socket for vnet or if udp multithreading is disabled - return &defaultPacketConnPool{Net: vnet, listenerName: listenerName} +// PacketConnPool is a factory to create pools of related PacketConns, which may either be a set of +// PacketConns bound to the same local IP using SO_REUSEPORT (on unix, under certain circumstances) +// that can do multithreaded readloops, or a single PacketConn as a fallback for non-unic +// architectures and for testing. +type PacketConnPool interface { + // Make creates a PacketConnPool, caller must make sure to close the sockets. + Make(network, address string) ([]net.PacketConn, error) + // Size returns the number of sockets in the pool. + Size() int } + +// defaultPacketConPool implements a socketpool that consists of only a single socket, used as a +// fallback for architectures that do not support SO_REUSEPORT or when socket pooling is disabled. +type defaultPacketConnPool struct { + transport.Net + listenerName string + telemetry *telemetry.Telemetry +} + +// Make creates a PacketConnPool, caller must make sure to close the sockets. +func (p *defaultPacketConnPool) Make(network, address string) ([]net.PacketConn, error) { + conns := []net.PacketConn{} + + conn, err := p.ListenPacket(network, address) + if err != nil { + return []net.PacketConn{}, fmt.Errorf("failed to create PacketConn at %s "+ + "(REUSEPORT: false): %s", address, err) + } + + conn = telemetry.NewPacketConn(conn, p.listenerName, telemetry.ListenerType, p.telemetry) + conns = append(conns, conn) + return conns, nil +} + +func (p *defaultPacketConnPool) Size() int { return 1 } diff --git a/internal/util/socketpool_nonunix.go b/internal/util/socketpool_nonunix.go new file mode 100644 index 00000000..23df79fe --- /dev/null +++ b/internal/util/socketpool_nonunix.go @@ -0,0 +1,20 @@ +//go:build !linux + +package util + +import ( + "github.com/l7mp/stunner/internal/telemetry" + "github.com/pion/transport/v3" +) + +// NewPacketConnPool creates a new packet connection pool which is fixed to a single connection, +// used if threadNum is zero or if we are running on top of transport.VNet (which does not support +// reuseport), or if we are on non-unix, see the fallback in socketpool.go. +func NewPacketConnPool(listenerName string, vnet transport.Net, threadNum int, t *telemetry.Telemetry) PacketConnPool { + // default to a single socket for vnet or if udp multithreading is disabled + return &defaultPacketConnPool{ + Net: vnet, + listenerName: listenerName, + telemetry: t, + } +} diff --git a/internal/util/socketpool_unix.go b/internal/util/socketpool_unix.go index d6f09764..46476256 100644 --- a/internal/util/socketpool_unix.go +++ b/internal/util/socketpool_unix.go @@ -20,12 +20,13 @@ type unixPacketConnPool struct { net.ListenConfig listenerName string size int + telemetry *telemetry.Telemetry } // NewPacketConnPool creates a new packet connection pool. Pooling is disabled if threadNum is zero // or if we are running on top of transport.VNet (which does not support reuseport), or if we are // on non-unix, see the fallback in socketpool.go. -func NewPacketConnPool(listenerName string, vnet transport.Net, threadNum int) PacketConnPool { +func NewPacketConnPool(listenerName string, vnet transport.Net, threadNum int, t *telemetry.Telemetry) PacketConnPool { // default to a single socket for vnet or if udp multithreading is disabled _, ok := vnet.(*stdnet.Net) if ok && threadNum > 0 { @@ -45,9 +46,10 @@ func NewPacketConnPool(listenerName string, vnet transport.Net, threadNum int) P }, size: threadNum, listenerName: listenerName, + telemetry: t, } } else { - return &defaultPacketConnPool{listenerName: listenerName, Net: vnet} + return &defaultPacketConnPool{listenerName: listenerName, Net: vnet, telemetry: t} } } @@ -62,7 +64,7 @@ func (p *unixPacketConnPool) Make(network, address string) ([]net.PacketConn, er return []net.PacketConn{}, fmt.Errorf("failed to create PacketConn "+ "%d at %s (REUSEPORT: %t): %s", i, address, (p.size > 0), err) } - conn = telemetry.NewPacketConn(conn, p.listenerName, telemetry.ListenerType) + conn = telemetry.NewPacketConn(conn, p.listenerName, telemetry.ListenerType, p.telemetry) conns = append(conns, conn) } diff --git a/relay.go b/relay.go index 953cfe68..c93a1f1f 100644 --- a/relay.go +++ b/relay.go @@ -56,9 +56,11 @@ type RelayGen struct { // Logger is a logger factory we can use to generate per-listener relay loggers. Logger *logger.LeveledLoggerFactory + + telemetry *telemetry.Telemetry } -func NewRelayGen(l *object.Listener, logger *logger.LeveledLoggerFactory) *RelayGen { +func NewRelayGen(l *object.Listener, t *telemetry.Telemetry, logger *logger.LeveledLoggerFactory) *RelayGen { return &RelayGen{ Listener: l, RelayAddress: l.Addr, @@ -66,6 +68,7 @@ func NewRelayGen(l *object.Listener, logger *logger.LeveledLoggerFactory) *Relay ClusterCache: lru.New(ClusterCacheSize), Net: l.Net, Logger: logger, + telemetry: t, } } @@ -86,7 +89,7 @@ func (r *RelayGen) AllocatePacketConn(network string, requestedPort int) (net.Pa return nil, nil, err } - conn = NewPortRangePacketConn(conn, r.PortRangeChecker, + conn = NewPortRangePacketConn(conn, r.PortRangeChecker, r.telemetry, r.Logger.NewLogger(fmt.Sprintf("relay-%s", r.Listener.Name))) relayAddr, ok := conn.LocalAddr().(*net.UDPAddr) @@ -145,18 +148,20 @@ func (s *Stunner) GenPortRangeChecker(g *RelayGen) PortRangeChecker { type PortRangePacketConn struct { net.PacketConn checker PortRangeChecker - log logging.LeveledLogger readDeadline time.Time + telemetry *telemetry.Telemetry lock sync.Mutex + log logging.LeveledLogger } // NewPortRangePacketConn decorates a PacketConn with filtering on a target port range. Errors are reported per listener name. -func NewPortRangePacketConn(c net.PacketConn, checker PortRangeChecker, log logging.LeveledLogger) net.PacketConn { +func NewPortRangePacketConn(c net.PacketConn, checker PortRangeChecker, t *telemetry.Telemetry, log logging.LeveledLogger) net.PacketConn { // cluster add/sub connection is not tracked // AddConnection(n, t) r := PortRangePacketConn{ PacketConn: c, checker: checker, + telemetry: t, log: log, } @@ -172,8 +177,8 @@ func (c *PortRangePacketConn) WriteTo(p []byte, peerAddr net.Addr) (int, error) n, err := c.PacketConn.WriteTo(p, peerAddr) if n > 0 { - telemetry.IncrementBytes(cluster.Name, telemetry.ClusterType, telemetry.Outgoing, uint64(n)) - telemetry.IncrementPackets(cluster.Name, telemetry.ClusterType, telemetry.Outgoing, 1) + c.telemetry.IncrementBytes(cluster.Name, telemetry.ClusterType, telemetry.Outgoing, uint64(n)) + c.telemetry.IncrementPackets(cluster.Name, telemetry.ClusterType, telemetry.Outgoing, 1) } return n, err @@ -204,8 +209,8 @@ func (c *PortRangePacketConn) ReadFrom(p []byte) (int, net.Addr, error) { } if n > 0 { - telemetry.IncrementBytes(cluster.Name, telemetry.ClusterType, telemetry.Incoming, uint64(n)) - telemetry.IncrementPackets(cluster.Name, telemetry.ClusterType, telemetry.Incoming, 1) + c.telemetry.IncrementBytes(cluster.Name, telemetry.ClusterType, telemetry.Incoming, uint64(n)) + c.telemetry.IncrementPackets(cluster.Name, telemetry.ClusterType, telemetry.Incoming, 1) } return n, peerAddr, nil diff --git a/relay_test.go b/relay_test.go index b9845570..09e665cb 100644 --- a/relay_test.go +++ b/relay_test.go @@ -35,9 +35,6 @@ func getChecker(minPort, maxPort int) PortRangeChecker { } func TestPortRangePacketConn(t *testing.T) { - telemetry.Init() - defer telemetry.Close() - lim := test.TimeOut(time.Second * 30) defer lim.Stop() @@ -53,6 +50,10 @@ func TestPortRangePacketConn(t *testing.T) { return } + tm, err := telemetry.New(telemetry.Callbacks{}, false, loggerFactory.NewLogger("metric")) + assert.NoError(t, err, "should succeed") + defer tm.Close() + t.Run("LoopbackOnValidPort", func(t *testing.T) { log.Debug("Creating base socket") addr := "127.0.0.1:15000" @@ -61,7 +62,7 @@ func TestPortRangePacketConn(t *testing.T) { msg := "PING!" log.Debug("Creating filtered packet conn wrappeer socket") - conn := NewPortRangePacketConn(baseConn, getChecker(10000, 20000), log) + conn := NewPortRangePacketConn(baseConn, getChecker(10000, 20000), tm, log) assert.NoError(t, err, "should create port-range filtered packetconn") log.Debug("Sending packet") @@ -91,7 +92,7 @@ func TestPortRangePacketConn(t *testing.T) { msg := "PING!" log.Debug("Creating filtered packet conn wrappeer socket") - conn := NewPortRangePacketConn(baseConn, getChecker(10000, 20000), log) + conn := NewPortRangePacketConn(baseConn, getChecker(10000, 20000), tm, log) assert.NoError(t, err, "should create port-range filtered packetconn") log.Debug("Sending packet") @@ -120,7 +121,7 @@ func TestPortRangePacketConn(t *testing.T) { msg := "PING!" log.Debug("Creating filtered packet conn wrappeer socket") - conn := NewPortRangePacketConn(baseConn, getChecker(15000, 15000), log) + conn := NewPortRangePacketConn(baseConn, getChecker(15000, 15000), tm, log) assert.NoError(t, err, "should create port-range filtered packetconn") log.Debug("Sending packet") @@ -145,9 +146,6 @@ func TestPortRangePacketConn(t *testing.T) { // BenchmarkPortRangePacketConn sends lots of invalid packets: this is mostly for testing the logger func BenchmarkPortRangePacketConn(b *testing.B) { - telemetry.Init() - defer telemetry.Close() - loggerFactory := logger.NewLoggerFactory(connTestLoglevel) log := loggerFactory.NewLogger("test") // relayLog := loggerFactory.WithRateLimiter(.25, 1).NewLogger("relay") @@ -159,6 +157,10 @@ func BenchmarkPortRangePacketConn(b *testing.B) { b.Fatalf("Cannot allocate vnet: %s", err.Error()) } + tm, err := telemetry.New(telemetry.Callbacks{}, false, loggerFactory.NewLogger("metric")) + assert.NoError(b, err, "should succeed") + defer tm.Close() + log.Debug("Creating base socket") addr := "127.0.0.1:25000" baseConn, err := nw.ListenPacket("udp", addr) @@ -168,7 +170,7 @@ func BenchmarkPortRangePacketConn(b *testing.B) { msg := "PING!" log.Debug("Creating filtered packet conn wrappeer socket") - conn := WithCounter(NewPortRangePacketConn(baseConn, getChecker(15000, 15000), relayLog)) + conn := WithCounter(NewPortRangePacketConn(baseConn, getChecker(15000, 15000), tm, relayLog)) if err != nil { b.Fatalf("Cannot create port-range packetconn: %s", err.Error()) } diff --git a/server.go b/server.go index 6c072eb1..b3b5613a 100644 --- a/server.go +++ b/server.go @@ -31,7 +31,7 @@ func (s *Stunner) StartServer(l *object.Listener) error { var pConns []turn.PacketConnConfig var lConns []turn.ListenerConfig - relay := NewRelayGen(l, s.logger) + relay := NewRelayGen(l, s.telemetry, s.logger) relay.PortRangeChecker = s.GenPortRangeChecker(relay) permissionHandler := s.NewPermissionHandler(l) @@ -40,7 +40,7 @@ func (s *Stunner) StartServer(l *object.Listener) error { switch l.Proto { case stnrv1.ListenerProtocolTURNUDP: - socketPool := util.NewPacketConnPool(l.Name, l.Net, s.udpThreadNum) + socketPool := util.NewPacketConnPool(l.Name, l.Net, s.udpThreadNum, s.telemetry) s.log.Infof("setting up UDP listener socket pool at %s with %d readloop threads", addr, socketPool.Size()) @@ -68,7 +68,7 @@ func (s *Stunner) StartServer(l *object.Listener) error { return fmt.Errorf("failed to create TCP listener at %s: %s", addr, err) } - tcpListener = telemetry.NewListener(tcpListener, l.Name, telemetry.ListenerType) + tcpListener = telemetry.NewListener(tcpListener, l.Name, telemetry.ListenerType, s.telemetry) conn := turn.ListenerConfig{ Listener: tcpListener, @@ -96,7 +96,7 @@ func (s *Stunner) StartServer(l *object.Listener) error { return fmt.Errorf("failed to create TLS listener at %s: %s", addr, err) } - tlsListener = telemetry.NewListener(tlsListener, l.Name, telemetry.ListenerType) + tlsListener = telemetry.NewListener(tlsListener, l.Name, telemetry.ListenerType, s.telemetry) conn := turn.ListenerConfig{ Listener: tlsListener, @@ -129,7 +129,7 @@ func (s *Stunner) StartServer(l *object.Listener) error { return fmt.Errorf("failed to create DTLS listener at %s: %s", addr, err) } - dtlsListener = telemetry.NewListener(dtlsListener, l.Name, telemetry.ListenerType) + dtlsListener = telemetry.NewListener(dtlsListener, l.Name, telemetry.ListenerType, s.telemetry) conn := turn.ListenerConfig{ Listener: dtlsListener, diff --git a/stunner.go b/stunner.go index acc1a73a..8e4c3905 100644 --- a/stunner.go +++ b/stunner.go @@ -29,6 +29,7 @@ type Stunner struct { suppressRollback, dryRun bool resolver resolver.DnsResolver udpThreadNum int + telemetry *telemetry.Telemetry logger *logger.LeveledLoggerFactory log logging.LeveledLogger net transport.Net @@ -101,13 +102,20 @@ func NewStunner(options Options) *Stunner { s.clusterManager = manager.NewManager("cluster-manager", object.NewClusterFactory(r, logger), logger) + telemetryCallbacks := telemetry.Callbacks{ + GetAllocationCount: func() int64 { return s.GetActiveConnections() }, + } + t, err := telemetry.New(telemetryCallbacks, s.dryRun, logger.NewLogger("metrics")) + if err != nil { + log.Errorf("Could not initialize metric provider: %s", err.Error()) + return nil + } + s.telemetry = t + if !s.dryRun { s.resolver.Start() - telemetry.Init() - telemetry.RegisterAllocationMetric(s.log, s.GetActiveConnections) } - // TODO: remove this when STUNner gains self-managed dataplanes s.ready = true return s @@ -275,16 +283,15 @@ func (s *Stunner) Close() { } } - telemetry.UnregisterAllocationMetric(s.log) - if !s.dryRun { - telemetry.Close() + if err := s.telemetry.Close(); err != nil { // blocks until finished + s.log.Errorf("Could not shutdown metric provider cleanly: %s", err.Error()) } s.resolver.Close() } // GetActiveConnections returns the number of active downstream (listener-side) TURN allocations. -func (s *Stunner) GetActiveConnections() float64 { +func (s *Stunner) GetActiveConnections() int64 { count := s.AllocationCount() - return float64(count) + return int64(count) } diff --git a/stunner_test.go b/stunner_test.go index 5b4f3360..b10e6b54 100644 --- a/stunner_test.go +++ b/stunner_test.go @@ -19,11 +19,10 @@ import ( "github.com/pion/transport/v3/test" "github.com/pion/transport/v3/vnet" "github.com/pion/turn/v4" - "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/l7mp/stunner/internal/resolver" - "github.com/l7mp/stunner/internal/telemetry" + telemetrytester "github.com/l7mp/stunner/internal/telemetry/tester" "github.com/l7mp/stunner/pkg/logger" stnrv1 "github.com/l7mp/stunner/pkg/apis/v1" @@ -32,12 +31,17 @@ import ( cfgclient "github.com/l7mp/stunner/pkg/config/client" ) -var stunnerTestLoglevel string = "all:ERROR" +const ( + // timeout = 200 * time.Millisecond + // interval = 50 * time.Millisecond -// var stunnerTestLoglevel string = stnrv1.DefaultLogLevel -// var stunnerTestLoglevel string = "all:INFO" -// var stunnerTestLoglevel string = "all:TRACE" -// var stunnerTestLoglevel string = "all:TRACE,vnet:INFO,turn:ERROR,turnc:ERROR" + stunnerTestLoglevel string = "all:ERROR" + + // stunnerTestLoglevel string = stnrv1.DefaultLogLevel + // stunnerTestLoglevel string = "all:INFO" + // stunnerTestLoglevel string = "all:TRACE" + // stunnerTestLoglevel string = "all:TRACE,vnet:INFO,turn:ERROR,turnc:ERROR" +) var certPem, keyPem, _ = GenerateSelfSignedKey() var certPem64 = base64.StdEncoding.EncodeToString(certPem) @@ -176,7 +180,7 @@ func stunnerEchoTest(conf echoTestConfig) { assert.NoError(t, echoConn.Close(), "cannot close echo server connection") } } - time.Sleep(150 * time.Millisecond) + time.Sleep(50 * time.Millisecond) client.Close() } @@ -655,7 +659,7 @@ type StunnerTestClusterConfig struct { config stnrv1.StunnerConfig echoServerAddr string result bool - tester func(t *testing.T) + tester func(h *telemetrytester.Tester) } var testClusterConfigsWithVNet = []StunnerTestClusterConfig{ @@ -1198,34 +1202,36 @@ var testPortRangeConfigsWithVNet = []StunnerTestClusterConfig{ }, echoServerAddr: "1.2.3.5:5678", result: true, - tester: func(t *testing.T) { - c := telemetry.ListenerConnsTotal - assert.Equal(t, 1, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.Equal(t, float64(1), testutil.ToFloat64(c.WithLabelValues("udp"))) - - g := telemetry.ListenerConnsActive - assert.Equal(t, 1, testutil.CollectAndCount(g), "ListenerConnsTotal") - assert.Equal(t, float64(1), testutil.ToFloat64(g.WithLabelValues("udp"))) - - c = telemetry.ListenerPacketsTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "rx")), float64(500)) - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "tx")), float64(500)) - - c = telemetry.ListenerBytesTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "rx")), float64(2000)) - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "tx")), float64(2000)) - - c = telemetry.ClusterPacketsTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "rx")), float64(500)) - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "tx")), float64(500)) - - c = telemetry.ClusterBytesTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "rx")), float64(2000)) - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "tx")), float64(2000)) + tester: func(h *telemetrytester.Tester) { + // stunner_listener_connections_total + assert.Equal(h, 1, h.CollectAndCount("stunner_listener_connections_total")) // name: udp + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections_total", "name", "udp")) + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections_total", "name", "udp")) + + // stunner_listener_connections + assert.Equal(h, 1, h.CollectAndCount("stunner_listener_connections")) // name: udp + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections", "name", "udp")) + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections", "name", "udp")) + + // stunner_listener_packets_total + assert.Equal(h, 2, h.CollectAndCount("stunner_listener_packets_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_packets_total", "name", "udp", "direction", "rx"), 20) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_packets_total", "name", "udp", "direction", "tx"), 20) + + // stunner_listener_bytes_total + assert.Equal(h, 2, h.CollectAndCount("stunner_listener_bytes_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_bytes_total", "name", "udp", "direction", "rx"), 20) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_bytes_total", "name", "udp", "direction", "tx"), 20) + + // stunner_cluster_packets_total + assert.Equal(h, 2, h.CollectAndCount("stunner_cluster_packets_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_cluster_packets_total", "name", "echo-server-cluster", "direction", "rx"), 20) + assert.Greater(h, h.CollectAndGetInt("stunner_cluster_packets_total", "name", "echo-server-cluster", "direction", "tx"), 20) + + // stunner_cluster_bytes_total + assert.Equal(h, 2, h.CollectAndCount("stunner_cluster_bytes_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_cluster_bytes_total", "name", "echo-server-cluster", "direction", "rx"), 20) + assert.Greater(h, h.CollectAndGetInt("stunner_cluster_bytes_total", "name", "echo-server-cluster", "direction", "tx"), 20) }, }, { @@ -1265,34 +1271,36 @@ var testPortRangeConfigsWithVNet = []StunnerTestClusterConfig{ }, echoServerAddr: "1.2.3.5:5678", result: true, - tester: func(t *testing.T) { - c := telemetry.ListenerConnsTotal - assert.Equal(t, 1, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.Equal(t, float64(1), testutil.ToFloat64(c.WithLabelValues("udp"))) - - g := telemetry.ListenerConnsActive - assert.Equal(t, 1, testutil.CollectAndCount(g), "ListenerConnsTotal") - assert.Equal(t, float64(1), testutil.ToFloat64(g.WithLabelValues("udp"))) - - c = telemetry.ListenerPacketsTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "rx")), float64(500)) - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "tx")), float64(500)) - - c = telemetry.ListenerBytesTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "rx")), float64(2000)) - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "tx")), float64(2000)) - - c = telemetry.ClusterPacketsTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "rx")), float64(500)) - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "tx")), float64(500)) - - c = telemetry.ClusterBytesTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "rx")), float64(2000)) - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "tx")), float64(2000)) + tester: func(h *telemetrytester.Tester) { + // stunner_listener_connections_total + assert.Equal(h, 1, h.CollectAndCount("stunner_listener_connections_total")) // name: udp + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections_total", "name", "udp")) + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections_total", "name", "udp")) + + // stunner_listener_connections + assert.Equal(h, 1, h.CollectAndCount("stunner_listener_connections")) // name: udp + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections", "name", "udp")) + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections", "name", "udp")) + + // stunner_listener_packets_total + assert.Equal(h, 2, h.CollectAndCount("stunner_listener_packets_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_packets_total", "name", "udp", "direction", "rx"), 200) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_packets_total", "name", "udp", "direction", "tx"), 200) + + // stunner_listener_bytes_total + assert.Equal(h, 2, h.CollectAndCount("stunner_listener_bytes_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_bytes_total", "name", "udp", "direction", "rx"), 2000) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_bytes_total", "name", "udp", "direction", "tx"), 2000) + + // stunner_cluster_packets_total + assert.Equal(h, 2, h.CollectAndCount("stunner_cluster_packets_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_cluster_packets_total", "name", "echo-server-cluster", "direction", "rx"), 200) + assert.Greater(h, h.CollectAndGetInt("stunner_cluster_packets_total", "name", "echo-server-cluster", "direction", "tx"), 200) + + // stunner_cluster_bytes_total + assert.Equal(h, 2, h.CollectAndCount("stunner_cluster_bytes_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_cluster_bytes_total", "name", "echo-server-cluster", "direction", "rx"), 2000) + assert.Greater(h, h.CollectAndGetInt("stunner_cluster_bytes_total", "name", "echo-server-cluster", "direction", "tx"), 2000) }, }, { @@ -1328,34 +1336,36 @@ var testPortRangeConfigsWithVNet = []StunnerTestClusterConfig{ }, echoServerAddr: "1.2.3.5:5678", result: false, - tester: func(t *testing.T) { - c := telemetry.ListenerConnsTotal - assert.Equal(t, 1, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.Equal(t, float64(1), testutil.ToFloat64(c.WithLabelValues("udp"))) - - g := telemetry.ListenerConnsActive - assert.Equal(t, 1, testutil.CollectAndCount(g), "ListenerConnsTotal") - assert.Equal(t, float64(1), testutil.ToFloat64(g.WithLabelValues("udp"))) - - c = telemetry.ListenerPacketsTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "rx")), float64(500)) // signaling+data - assert.LessOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "tx")), float64(50)) // just signaling - - c = telemetry.ListenerBytesTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "rx")), float64(1000)) // signaling+data - assert.LessOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "tx")), float64(1000)) // just signaling - - c = telemetry.ClusterPacketsTotal - assert.Equal(t, 0, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.Equal(t, float64(0), testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "rx"))) - assert.Equal(t, float64(0), testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "tx"))) - - c = telemetry.ClusterBytesTotal - assert.Equal(t, 0, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.Equal(t, float64(0), testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "rx"))) - assert.Equal(t, float64(0), testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "tx"))) + tester: func(h *telemetrytester.Tester) { + // stunner_listener_connections_total + assert.Equal(h, 1, h.CollectAndCount("stunner_listener_connections_total")) // name: udp + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections_total", "name", "udp")) + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections_total", "name", "udp")) + + // stunner_listener_connections + assert.Equal(h, 1, h.CollectAndCount("stunner_listener_connections")) // name: udp + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections", "name", "udp")) + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections", "name", "udp")) + + // stunner_listener_packets_total + assert.Equal(h, 2, h.CollectAndCount("stunner_listener_packets_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_packets_total", "name", "udp", "direction", "rx"), 500) // signaling+data + assert.Less(h, h.CollectAndGetInt("stunner_listener_packets_total", "name", "udp", "direction", "tx"), 50) // just signaling + + // stunner_listener_bytes_total + assert.Equal(h, 2, h.CollectAndCount("stunner_listener_bytes_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_bytes_total", "name", "udp", "direction", "rx"), 1000) // signaling+data + assert.Less(h, h.CollectAndGetInt("stunner_listener_bytes_total", "name", "udp", "direction", "tx"), 1000) // just signaling + + // stunner_cluster_packets_total + assert.Equal(h, 0, h.CollectAndCount("stunner_cluster_packets_total")) + assert.Equal(h, 0, h.CollectAndGetInt("stunner_cluster_packets_total", "name", "echo-server-cluster", "direction", "rx")) // fail + assert.Equal(h, 0, h.CollectAndGetInt("stunner_cluster_packets_total", "name", "echo-server-cluster", "direction", "tx")) // fail + + // stunner_cluster_bytes_total + assert.Equal(h, 0, h.CollectAndCount("stunner_cluster_bytes_total")) + assert.Equal(h, 0, h.CollectAndGetInt("stunner_cluster_bytes_total", "name", "echo-server-cluster", "direction", "rx")) // fail + assert.Equal(h, 0, h.CollectAndGetInt("stunner_cluster_bytes_total", "name", "echo-server-cluster", "direction", "tx")) // fail }, }, { @@ -1391,34 +1401,36 @@ var testPortRangeConfigsWithVNet = []StunnerTestClusterConfig{ }, echoServerAddr: "1.2.3.5:5678", result: false, - tester: func(t *testing.T) { - c := telemetry.ListenerConnsTotal - assert.Equal(t, 1, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.Equal(t, float64(1), testutil.ToFloat64(c.WithLabelValues("udp"))) - - g := telemetry.ListenerConnsActive - assert.Equal(t, 1, testutil.CollectAndCount(g), "ListenerConnsTotal") - assert.Equal(t, float64(1), testutil.ToFloat64(g.WithLabelValues("udp"))) - - c = telemetry.ListenerPacketsTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "rx")), float64(500)) // signaling+data - assert.LessOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "tx")), float64(50)) // just signaling - - c = telemetry.ListenerBytesTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "rx")), float64(1000)) // signaling+data - assert.LessOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "tx")), float64(1000)) // just signaling - - c = telemetry.ClusterPacketsTotal - assert.Equal(t, 0, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.Equal(t, float64(0), testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "rx"))) - assert.Equal(t, float64(0), testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "tx"))) - - c = telemetry.ClusterBytesTotal - assert.Equal(t, 0, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.Equal(t, float64(0), testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "rx"))) - assert.Equal(t, float64(0), testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "tx"))) + tester: func(h *telemetrytester.Tester) { + // stunner_listener_connections_total + assert.Equal(h, 1, h.CollectAndCount("stunner_listener_connections_total")) // name: udp + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections_total", "name", "udp")) + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections_total", "name", "udp")) + + // stunner_listener_connections + assert.Equal(h, 1, h.CollectAndCount("stunner_listener_connections")) // name: udp + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections", "name", "udp")) + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections", "name", "udp")) + + // stunner_listener_packets_total + assert.Equal(h, 2, h.CollectAndCount("stunner_listener_packets_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_packets_total", "name", "udp", "direction", "rx"), 500) // signaling+data + assert.Less(h, h.CollectAndGetInt("stunner_listener_packets_total", "name", "udp", "direction", "tx"), 50) // just signaling + + // stunner_listener_bytes_total + assert.Equal(h, 2, h.CollectAndCount("stunner_listener_bytes_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_bytes_total", "name", "udp", "direction", "rx"), 1000) // signaling+data + assert.Less(h, h.CollectAndGetInt("stunner_listener_bytes_total", "name", "udp", "direction", "tx"), 1000) // just signaling + + // stunner_cluster_packets_total + assert.Equal(h, 0, h.CollectAndCount("stunner_cluster_packets_total")) + assert.Equal(h, 0, h.CollectAndGetInt("stunner_cluster_packets_total", "name", "echo-server-cluster", "direction", "rx")) // fail + assert.Equal(h, 0, h.CollectAndGetInt("stunner_cluster_packets_total", "name", "echo-server-cluster", "direction", "tx")) // fail + + // stunner_cluster_bytes_total + assert.Equal(h, 0, h.CollectAndCount("stunner_cluster_bytes_total")) + assert.Equal(h, 0, h.CollectAndGetInt("stunner_cluster_bytes_total", "name", "echo-server-cluster", "direction", "rx")) // fail + assert.Equal(h, 0, h.CollectAndGetInt("stunner_cluster_bytes_total", "name", "echo-server-cluster", "direction", "tx")) // fail }, }, { @@ -1458,34 +1470,36 @@ var testPortRangeConfigsWithVNet = []StunnerTestClusterConfig{ }, echoServerAddr: "1.2.3.5:5678", result: true, - tester: func(t *testing.T) { - c := telemetry.ListenerConnsTotal - assert.Equal(t, 1, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.Equal(t, float64(1), testutil.ToFloat64(c.WithLabelValues("udp"))) - - g := telemetry.ListenerConnsActive - assert.Equal(t, 1, testutil.CollectAndCount(g), "ListenerConnsTotal") - assert.Equal(t, float64(1), testutil.ToFloat64(g.WithLabelValues("udp"))) - - c = telemetry.ListenerPacketsTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "rx")), float64(500)) - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "tx")), float64(500)) - - c = telemetry.ListenerBytesTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "rx")), float64(2000)) - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("udp", "tx")), float64(2000)) - - c = telemetry.ClusterPacketsTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "rx")), float64(500)) - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "tx")), float64(500)) - - c = telemetry.ClusterBytesTotal - assert.Equal(t, 2, testutil.CollectAndCount(c), "ListenerConnsTotal") - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "rx")), float64(2000)) - assert.GreaterOrEqual(t, testutil.ToFloat64(c.WithLabelValues("echo-server-cluster", "tx")), float64(2000)) + tester: func(h *telemetrytester.Tester) { + // stunner_listener_connections_total + assert.Equal(h, 1, h.CollectAndCount("stunner_listener_connections_total")) // name: udp + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections_total", "name", "udp")) + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections_total", "name", "udp")) + + // stunner_listener_connections + assert.Equal(h, 1, h.CollectAndCount("stunner_listener_connections")) // name: udp + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections", "name", "udp")) + assert.Equal(h, 1, h.CollectAndGetInt("stunner_listener_connections", "name", "udp")) + + // stunner_listener_packets_total + assert.Equal(h, 2, h.CollectAndCount("stunner_listener_packets_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_packets_total", "name", "udp", "direction", "rx"), 200) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_packets_total", "name", "udp", "direction", "tx"), 200) + + // stunner_listener_bytes_total + assert.Equal(h, 2, h.CollectAndCount("stunner_listener_bytes_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_bytes_total", "name", "udp", "direction", "rx"), 2000) + assert.Greater(h, h.CollectAndGetInt("stunner_listener_bytes_total", "name", "udp", "direction", "tx"), 2000) + + // stunner_cluster_packets_total + assert.Equal(h, 2, h.CollectAndCount("stunner_cluster_packets_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_cluster_packets_total", "name", "echo-server-cluster", "direction", "rx"), 200) + assert.Greater(h, h.CollectAndGetInt("stunner_cluster_packets_total", "name", "echo-server-cluster", "direction", "tx"), 200) + + // stunner_cluster_bytes_total + assert.Equal(h, 2, h.CollectAndCount("stunner_cluster_bytes_total")) + assert.Greater(h, h.CollectAndGetInt("stunner_cluster_bytes_total", "name", "echo-server-cluster", "direction", "rx"), 2000) + assert.Greater(h, h.CollectAndGetInt("stunner_cluster_bytes_total", "name", "echo-server-cluster", "direction", "tx"), 2000) }, }, // TODO: implement port-range filtering for DNS clusters @@ -1590,6 +1604,7 @@ func TestStunnerPortRangeWithVNet(t *testing.T) { stunner := NewStunner(Options{ LogLevel: stunnerTestLoglevel, SuppressRollback: true, + DryRun: false, Resolver: mockDns, Net: v.podnet, }) @@ -1620,7 +1635,7 @@ func TestStunnerPortRangeWithVNet(t *testing.T) { stunnerEchoFloodTest(testConfig) if c.tester != nil { - c.tester(t) + c.tester(telemetrytester.New(stunner.telemetry, t)) } assert.NoError(t, lconn.Close(), "cannot close TURN client connection") @@ -1632,6 +1647,7 @@ func TestStunnerPortRangeWithVNet(t *testing.T) { func stunnerEchoFloodTest(conf echoTestConfig) { t := conf.t + t.Helper() log := conf.loggerFactory.NewLogger("test") client, err := turn.NewClient(&turn.ClientConfig{