Skip to content

Commit

Permalink
Update kafka-connector chart for async invocations with backpressure
Browse files Browse the repository at this point in the history
Signed-off-by: Han Verstraete (OpenFaaS Ltd) <[email protected]>
  • Loading branch information
welteki committed Jan 10, 2025
1 parent a382352 commit dddbc55
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 37 deletions.
129 changes: 94 additions & 35 deletions chart/kafka-connector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ $ helm repo add openfaas https://openfaas.github.io/faas-netes/

Prepare a custom [values.yaml](values.yaml) with:

* brokerHosts - comma separted list of host:port
* topics - the topics to subscribe to
* replicas - this should match the partition size, so if the size is 3, set this to 3
- brokerHosts - comma separted list of host:port
- topics - the topics to subscribe to
- replicas - this should match the partition size, so if the size is 3, set this to 3

Then you will need to read up on the encryption and authentication options and update the settings accordingly.

Expand All @@ -65,46 +65,105 @@ $ helm repo update && \
## Encryption options

1) TLS off (default)
2) TLS on
1. TLS off (default)
2. TLS on

## Authentication options

1) TLS with SASL using CA from the default trust store
3) TLS with SASL using a custom CA
4) TLS with client certificates
1. TLS with SASL using CA from the default trust store
2. TLS with SASL using a custom CA
3. TLS with client certificates

## Async invocations

The connector can be configured to invoke function asynchronously. This lets you use [OpenFaaS async](https://docs.openfaas.com/reference/async/) features like retries.
To prevent the connector from consuming all Kafka messages at once and submitting them to the OpenFaaS async queue a limit on the number of inflight async invocations can be configured.

Configure the connector for async invocations:

```yaml
# Invoke functions asynchronously.
asyncInvocation: true

async:
# Limit the number of inflight async invocations for the connector.
# A value of 0 indicates no concurrency limit.
maxInflight: 0

# Configure an externally-managed NATS server.
# NATS is used for async invocations and is required when
# setting the 'async.maxInflight' parameter to a value other than 0.
# By default the OpenFaaS embedded nats deployment is used.
# These values should be identical to the configuration in your OpenFaaS deployment values.yaml file
# when external nats is enabled.
nats:
external:
enabled: false
host: ""
port: ""
```
### Reset the inflight concurrency counter
If the inflight counter gets out if sync for some reason, e.g a misconfiguration, network issues, it can be forcefully reset.
The connecter checks if a Lease object exists on startup and resets the counter if the Lease does not exist.
Remove the lease and restart the connector to reset the counter.
1. Remove the lease
```sh
$ kubectl get lease -n openfaas

NAME HOLDER AGE
kafka-connector 18m
```

```sh
kubectl delete lease kafka-connector -n openfaas
```

2. Restart the connector

```sh
kubectl rollout restart deploy/kafka-connector -n openfaas
```

## Configuration

Additional kafka-connector options in `values.yaml`.

| Parameter | Description | Default |
|------------------------|------------------------------------------------------------------------------------------------------------------------------------|--------------------------------|
| `topics` | A single topic or list of comma separated topics to consume. | `faas-request` |
| `replicas` | The number of replicas of this connector, should be set to the size of the partition for the given topic, or a higher lower value. | `1` |
| `brokerHosts` | Host and port for the Kafka bootstrap server, multiple servers can be specified as a comma-separated list. | `kafka:9092` |
| `asyncInvocation` | For long running or slow functions, offload to asychronous function invocations and carry on processing the stream | `false` |
| `upstreamTimeout` | Maximum timeout for upstream function call, must be a Go formatted duration string. | `2m` |
| `rebuildInterval` | Interval for rebuilding function to topic map, must be a Go formatted duration string. | `30s` |
| `gatewayURL` | The URL for the API gateway. | `http://gateway.openfaas:8080` |
| `printResponse` | Output the response of calling a function in the logs. | `true` |
| `printResponseBody` | Output to the logs the response body when calling a function. | `false` |
| `printRequestBody` | Output to the logs the request body when calling a function. | `false` |
| `fullnameOverride` | Override the name value used for the Connector Deployment object. | `` |
| `tls` | Connect to the broker server(s) using TLS encryption | `true` |
| `sasl` | Enable auth with a SASL username/password | `false` |
| `brokerPasswordSecret` | Name of secret for SASL password | `kafka-broker-password` |
| `brokerUsernameSecret` | Name of secret for SASL username | `kafka-broker-username` |
| `caSecret` | Name secret for TLS CA - leave empty to disable | `kafka-broker-ca` |
| `certSecret` | Name secret for TLS client certificate cert - leave empty to disable | `kafka-broker-cert` |
| `keySecret` | Name secret for TLS client certificate private key - leave empty to disable | `kafka-broker-key` |
| `contentType` | Set a HTTP Content Type during function invocation. | `""` |
| `group` | Set the Kafka consumer group name. | `""` |
| `maxBytes` | Set the maximum size of messages from the Kafka broker. | `1024*1024` |
| `sessionLogging` | Enable detailed logging from the consumer group. | `"false"` |
| `initialOffset` | Either newest or oldest. | `"oldest"` |
| `logs.debug` | Print debug logs | `false` |
| `logs.format` | The log encoding format. Supported values: `json` or `console` | `console` |
| Parameter | Description | Default |
| ----------------------- | ---------------------------------------------------------------------------------------------------------------------------------- | ------------------------------ |
| `topics` | A single topic or list of comma separated topics to consume. | `faas-request` |
| `replicas` | The number of replicas of this connector, should be set to the size of the partition for the given topic, or a higher lower value. | `1` |
| `brokerHosts` | Host and port for the Kafka bootstrap server, multiple servers can be specified as a comma-separated list. | `kafka:9092` |
| `asyncInvocation` | Invoke function asychronously and carry on processing the stream | `false` |
| `async.maxInflight` | Limit the number of inflight async invocations for the connector. A value of 0 indicates no concurrency limit. | `0` |
| `nats.external.enabled` | Whether to use an externally-managed NATS server. | `false` |
| `nats.external.host` | The host at which the externally-managed NATS server can be reached | `""` |
| `nats.external.port` | The port at which the externally-managed NATS server can be reached | `""` |
| `upstreamTimeout` | Maximum timeout for upstream function call, must be a Go formatted duration string. | `2m` |
| `rebuildInterval` | Interval for rebuilding function to topic map, must be a Go formatted duration string. | `30s` |
| `gatewayURL` | The URL for the API gateway. | `http://gateway.openfaas:8080` |
| `printResponse` | Output the response of calling a function in the logs. | `true` |
| `printResponseBody` | Output to the logs the response body when calling a function. | `false` |
| `printRequestBody` | Output to the logs the request body when calling a function. | `false` |
| `fullnameOverride` | Override the name value used for the Connector Deployment object. | `""` |
| `tls` | Connect to the broker server(s) using TLS encryption | `true` |
| `sasl` | Enable auth with a SASL username/password | `false` |
| `brokerPasswordSecret` | Name of secret for SASL password | `kafka-broker-password` |
| `brokerUsernameSecret` | Name of secret for SASL username | `kafka-broker-username` |
| `caSecret` | Name secret for TLS CA - leave empty to disable | `kafka-broker-ca` |
| `certSecret` | Name secret for TLS client certificate cert - leave empty to disable | `kafka-broker-cert` |
| `keySecret` | Name secret for TLS client certificate private key - leave empty to disable | `kafka-broker-key` |
| `contentType` | Set a HTTP Content Type during function invocation. | `""` |
| `group` | Set the Kafka consumer group name. | `""` |
| `maxBytes` | Set the maximum size of messages from the Kafka broker. | `1024*1024` |
| `sessionLogging` | Enable detailed logging from the consumer group. | `"false"` |
| `initialOffset` | Either newest or oldest. | `"oldest"` |
| `logs.debug` | Print debug logs | `false` |
| `logs.format` | The log encoding format. Supported values: `json` or `console` | `console` |

Specify each parameter using the `--set key=value[,key=value]` argument to `helm install`. See `values.yaml` for the default configuration.

Expand Down
19 changes: 19 additions & 0 deletions chart/kafka-connector/templates/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ spec:
app: {{ template "connector.name" . }}
component: kafka-connector
spec:
{{- if and .Values.asyncInvocation (gt (int .Values.async.maxInflight) 0) }}
serviceAccountName: {{ template "connector.fullname" . }}
{{- end }}
volumes:
- name: openfaas-license
secret:
Expand Down Expand Up @@ -87,6 +90,12 @@ spec:
- "-key-file=/var/secrets/broker-key/broker-key"
{{- end }}
env:
- name: connector_id
value: "{{template "connector.fullname" . }}"
- name: namespace
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: gateway_url
value: {{ .Values.gatewayURL | quote }}
- name: topics
Expand All @@ -99,6 +108,16 @@ spec:
value: {{ .Values.printRequestBody | quote }}
- name: asynchronous_invocation
value: {{ .Values.asyncInvocation | quote }}
- name: async_max_inflight
value: {{ .Values.async.maxInflight | quote }}
- name: async_callback_url
value: "http://{{ template "connector.fullname" . }}.{{ .Release.Namespace }}:8080/api/v1/callback"
- name: nats_url
{{- if .Values.nats.external.enabled }}
value: "nats://{{ .Values.nats.external.host}}:{{ .Values.nats.external.port }}"
{{- else }}
value: "nats://nats.openfaas:4222"
{{- end }}
{{- if .Values.basic_auth }}
- name: basic_auth
value: "true"
Expand Down
40 changes: 40 additions & 0 deletions chart/kafka-connector/templates/rbac.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{{- if and .Values.asyncInvocation (gt (int .Values.async.maxInflight) 0) }}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
labels:
app: {{ template "connector.fullname" . }}
component: kafka-connector
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
labels:
app: {{ template "connector.name" . }}
component: kafka-connector
rules:
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["get", "create"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
labels:
app: {{ template "connector.fullname" . }}
component: kafka-connector
subjects:
- kind: ServiceAccount
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
roleRef:
kind: Role
name: {{ template "connector.fullname" . }}
apiGroup: rbac.authorization.k8s.io
{{- end }}
20 changes: 20 additions & 0 deletions chart/kafka-connector/templates/service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: v1
kind: Service
metadata:
labels:
app: {{ template "connector.name" . }}
component: kafka-connector
chart: {{ .Chart.Name }}-{{ .Chart.Version }}
heritage: {{ .Release.Service }}
release: {{ .Release.Name }}
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
spec:
type: ClusterIP
ports:
- name: http
port: 8080
protocol: TCP
targetPort: 8080
selector:
app: kafka-connector
21 changes: 19 additions & 2 deletions chart/kafka-connector/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ upstreamTimeout: 2m
# interval for rebuilding the map of functions and topics
rebuildInterval: 30s

# Use with slow consumers or long running functions
# Invoke functions asynchronously.
asyncInvocation: false

async:
# Limit the number of inflight async invocations for the connector.
# A value of 0 indicates no concurrency limit.
maxInflight: 0

# 1MB = 1024 bytes * 1024
maxBytes: "1048576"

Expand Down Expand Up @@ -76,6 +81,18 @@ gatewayURL: http://gateway.openfaas:8080
# Basic auth for the gateway
basic_auth: true

# NATS is used for async invocations and is required when
# setting the 'async.maxInflight' parameter to a value other than 0.
nats:
# Configure an externally-managed NATS server.
# When disabled the OpenFaaS embedded nats deployment is used.
# These values should be identical to the configuration in your OpenFaaS deployment values.yaml file
# when external nats is enabled.
external:
enabled: false
host: ""
port: ""

nodeSelector: {}

tolerations: []
Expand Down Expand Up @@ -138,4 +155,4 @@ keySecret: ""

# caSecret: kafka-broker-ca
# certSecret: kafka-broker-cert
# keySecret: kafka-broker-key
# keySecret: kafka-broker-key

0 comments on commit dddbc55

Please sign in to comment.