diff --git a/pkg/providers/ingress/translation/annotations/types.go b/pkg/providers/ingress/translation/annotations/types.go index aac0a0baf4..322e6aa6bc 100644 --- a/pkg/providers/ingress/translation/annotations/types.go +++ b/pkg/providers/ingress/translation/annotations/types.go @@ -29,10 +29,11 @@ const ( AnnotationsUpstreamScheme = AnnotationsPrefix + "upstream-scheme" //support retries and timeouts on upstream - AnnotationsUpstreamRetry = AnnotationsPrefix + "upstream-retries" - AnnotationsUpstreamTimeoutConnect = AnnotationsPrefix + "upstream-connect-timeout" - AnnotationsUpstreamTimeoutRead = AnnotationsPrefix + "upstream-read-timeout" - AnnotationsUpstreamTimeoutSend = AnnotationsPrefix + "upstream-send-timeout" + AnnotationsUpstreamRetry = AnnotationsPrefix + "upstream-retries" + AnnotationsUpstreamTimeoutConnect = AnnotationsPrefix + "upstream-connect-timeout" + AnnotationsUpstreamTimeoutRead = AnnotationsPrefix + "upstream-read-timeout" + AnnotationsUpstreamTimeoutSend = AnnotationsPrefix + "upstream-send-timeout" + AnnotationsUpstreamResolveGranularity = AnnotationsPrefix + "upstream-resolve-granularity" ) const ( diff --git a/pkg/providers/ingress/translation/annotations/upstream/upstream.go b/pkg/providers/ingress/translation/annotations/upstream/upstream.go index 02692d9fc9..f15c0020d1 100644 --- a/pkg/providers/ingress/translation/annotations/upstream/upstream.go +++ b/pkg/providers/ingress/translation/annotations/upstream/upstream.go @@ -28,11 +28,12 @@ func NewParser() annotations.IngressAnnotationsParser { } type Upstream struct { - Scheme string - Retry int - TimeoutRead int - TimeoutConnect int - TimeoutSend int + Scheme string + Retry int + TimeoutRead int + TimeoutConnect int + TimeoutSend int + ResolveGranularity string } func (u *Upstream) Parse(e annotations.Extractor) (interface{}, error) { @@ -85,5 +86,10 @@ func (u *Upstream) Parse(e annotations.Extractor) (interface{}, error) { u.TimeoutSend = t } + resolveGranularity := strings.TrimSuffix(e.GetStringAnnotation(annotations.AnnotationsUpstreamResolveGranularity), "s") + if resolveGranularity != "" { + u.ResolveGranularity = resolveGranularity + } + return *u, nil } diff --git a/pkg/providers/ingress/translation/translator.go b/pkg/providers/ingress/translation/translator.go index 30f30c3d38..707eb53405 100644 --- a/pkg/providers/ingress/translation/translator.go +++ b/pkg/providers/ingress/translation/translator.go @@ -22,6 +22,7 @@ import ( "crypto/sha1" "encoding/base64" "encoding/json" + "errors" "fmt" "strings" @@ -192,7 +193,7 @@ func (t *translator) translateIngressV1(ing *networkingv1.Ingress, skipVerify bo if skipVerify { ups = t.translateDefaultUpstreamFromIngressV1(ns, pathRule.Backend.Service) } else { - ups, err = t.translateUpstreamFromIngressV1(ns, pathRule.Backend.Service) + ups, err = t.translateUpstreamFromIngressV1(ns, pathRule.Backend.Service, ingress.Upstream.ResolveGranularity) if err != nil { log.Errorw("failed to translate ingress backend to upstream", zap.Error(err), @@ -317,7 +318,7 @@ func (t *translator) translateIngressV1beta1(ing *networkingv1beta1.Ingress, ski if skipVerify { ups = t.translateDefaultUpstreamFromIngressV1beta1(ns, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort) } else { - ups, err = t.translateUpstreamFromIngressV1beta1(ns, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort) + ups, err = t.translateUpstreamFromIngressV1beta1(ns, pathRule.Backend.ServiceName, pathRule.Backend.ServicePort, ingress.Upstream.ResolveGranularity) if err != nil { log.Errorw("failed to translate ingress backend to upstream", zap.Error(err), @@ -434,33 +435,29 @@ func (t *translator) translateDefaultUpstreamFromIngressV1(namespace string, bac ups.ID = id.GenID(ups.Name) return ups } -func (t *translator) translateUpstreamFromIngressV1(namespace string, backend *networkingv1.IngressServiceBackend) (*apisixv1.Upstream, error) { - var svcPort int32 - if backend.Port.Name != "" { - svc, err := t.ServiceLister.Services(namespace).Get(backend.Name) - if err != nil { - return nil, err - } - for _, port := range svc.Spec.Ports { - if port.Name == backend.Port.Name { - svcPort = port.Port - break - } - } - if svcPort == 0 { - return nil, &translation.TranslateError{ - Field: "service", - Reason: "port not found", - } - } - } else { - svcPort = backend.Port.Number +func (t *translator) translateUpstreamFromIngressV1(namespace string, backend *networkingv1.IngressServiceBackend, ResolveGranularity string) (*apisixv1.Upstream, error) { + if ResolveGranularity == "" { + ResolveGranularity = types.ResolveGranularity.Endpoint + } + svcClusterIP, svcPort, err := t.GetServiceClusterIPAndPort(backend.Name, backend.Port.Name, backend.Port.Number, namespace, ResolveGranularity) + if err != nil { + return nil, err } + ups, err := t.TranslateService(namespace, backend.Name, "", svcPort) if err != nil { return nil, err } - ups.Name = apisixv1.ComposeUpstreamName(namespace, backend.Name, "", svcPort, types.ResolveGranularity.Endpoint) + if types.ResolveGranularity.Service == ResolveGranularity { + ups.Nodes = apisixv1.UpstreamNodes{ + { + Host: svcClusterIP, + Port: int(svcPort), + Weight: translation.DefaultWeight, + }, + } + } + ups.Name = apisixv1.ComposeUpstreamName(namespace, backend.Name, "", svcPort, ResolveGranularity) ups.ID = id.GenID(ups.Name) return ups, nil } @@ -488,33 +485,29 @@ func (t *translator) translateDefaultUpstreamFromIngressV1beta1(namespace string return ups } -func (t *translator) translateUpstreamFromIngressV1beta1(namespace string, svcName string, svcPort intstr.IntOrString) (*apisixv1.Upstream, error) { - var portNumber int32 - if svcPort.Type == intstr.String { - svc, err := t.ServiceLister.Services(namespace).Get(svcName) - if err != nil { - return nil, err - } - for _, port := range svc.Spec.Ports { - if port.Name == svcPort.StrVal { - portNumber = port.Port - break - } - } - if portNumber == 0 { - return nil, &translation.TranslateError{ - Field: "service", - Reason: "port not found", - } - } - } else { - portNumber = svcPort.IntVal +func (t *translator) translateUpstreamFromIngressV1beta1(namespace string, svcName string, svcPort intstr.IntOrString, ResolveGranularity string) (*apisixv1.Upstream, error) { + if ResolveGranularity == "" { + ResolveGranularity = types.ResolveGranularity.Endpoint } - ups, err := t.TranslateService(namespace, svcName, "", portNumber) + svcClusterIP, svcClusterPort, err := t.GetServiceClusterIPAndPort(svcName, svcPort.StrVal, svcPort.IntVal, namespace, ResolveGranularity) if err != nil { return nil, err } - ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, "", portNumber, types.ResolveGranularity.Endpoint) + + ups, err := t.TranslateService(namespace, svcName, "", svcClusterPort) + if err != nil { + return nil, err + } + if types.ResolveGranularity.Service == ResolveGranularity { + ups.Nodes = apisixv1.UpstreamNodes{ + { + Host: svcClusterIP, + Port: int(svcClusterPort), + Weight: translation.DefaultWeight, + }, + } + } + ups.Name = apisixv1.ComposeUpstreamName(namespace, svcName, "", svcClusterPort, ResolveGranularity) ups.ID = id.GenID(ups.Name) return ups, nil } @@ -654,3 +647,46 @@ func SafeEncodeString(s string, limit int) string { } return string(r) } + +func (t *translator) GetServiceClusterIPAndPort(serviceName string, servicePortName string, servicePort int32, ns string, ResolveGranularity string) (string, int32, error) { + svc, err := t.ServiceLister.Services(ns).Get(serviceName) + if err != nil { + return "", 0, err + } + svcPort := int32(-1) + if ResolveGranularity == "service" && svc.Spec.ClusterIP == "" { + log.Errorw("Ingress refers to a headless service but want to use the service level resolve granularity", + zap.Any("namespace", ns), + zap.Any("service", svc), + ) + return "", 0, errors.New("Ingress conflict headless service and backend resolve granularity") + } + + Type := intstr.Int + if servicePortName != "" { + Type = intstr.String + } +loop: + for _, port := range svc.Spec.Ports { + switch Type { + case intstr.Int: + if servicePort == port.Port { + svcPort = port.Port + break loop + } + case intstr.String: + if servicePortName == port.Name { + svcPort = port.Port + break loop + } + } + } + if svcPort == -1 { + log.Errorw("Ingress refers to non-existent Service port", + zap.String("namespace", ns), + zap.Int32("port", servicePort), + ) + return "", 0, err + } + return svc.Spec.ClusterIP, svcPort, nil +}