Consume messages from Kafka and apply them to Redis in the form of commands.
The following commands are supported at this time:
Support for additional write-based commands will be added in the future.
Records must adhere to a specific schema in order to be processed by the connector.
Each Redis command type has its own schema. This can be an issue if you are utilizing a schema registry because the default subject naming strategy assumes that topics will only have one schema. To overcome this, you must configure the subject naming strategy to be either RecordNameStrategy
or TopicRecordNameStrategy
. Check out the official documentation and this Confluent blog post for more information.
Keys are ignored.
{
"namespace": "io.github.jaredpetersen.kafkaconnectredis",
"name": "RedisSetCommand",
"type": "record",
"fields": [
{
"name": "key",
"type": "string"
},
{
"name": "value",
"type": "string"
},
{
"name": "expiration",
"type": [
"null",
{
"name": "RedisSetCommandExpiration",
"type": "record",
"fields": [
{
"name": "type",
"type": {
"name": "RedisSetCommandExpirationType",
"type": "enum",
"symbols": [
"EX",
"PX",
"KEEPTTL"
]
}
},
{
"name": "time",
"type": [
"null",
"long"
]
}
]
}
],
"default": null
},
{
"name": "condition",
"type": [
"null",
{
"name": "RedisSetCommandCondition",
"type": "enum",
"symbols": [
"NX",
"XX",
"KEEPTTL"
]
}
],
"default": null
}
]
}
{
"name": "io.github.jaredpetersen.kafkaconnectredis.RedisSetCommand",
"type": "struct",
"fields": [
{
"field": "key",
"type": "string",
"optional": false
},
{
"field": "value",
"type": "string",
"optional": false
},
{
"field": "expiration",
"type": "struct",
"fields": [
{
"field": "type",
"type": "string",
"optional": false
},
{
"field": "time",
"type": "int64",
"optional": true
}
],
"optional": true
},
{
"field": "condition",
"type": "string",
"optional": true
}
]
}
{
"namespace": "io.github.jaredpetersen.kafkaconnectredis",
"name": "RedisExpireCommand",
"type": "record",
"fields": [
{
"name": "key",
"type": "string"
},
{
"name": "seconds",
"type": "long"
}
]
}
{
"name": "io.github.jaredpetersen.kafkaconnectredis.RedisExpireCommand",
"type": "struct",
"fields": [
{
"field": "key",
"type": "string",
"optional": false
},
{
"field": "seconds",
"type": "int64",
"optional": false
}
]
}
{
"namespace": "io.github.jaredpetersen.kafkaconnectredis",
"name": "RedisExpireatCommand",
"type": "record",
"fields": [
{
"name": "key",
"type": "string"
},
{
"name": "timestamp",
"type": "long"
}
]
}
{
"name": "io.github.jaredpetersen.kafkaconnectredis.RedisExpireatCommand",
"type": "struct",
"fields": [
{
"field": "key",
"type": "string",
"optional": false
},
{
"field": "timestamp",
"type": "int64",
"optional": false
}
]
}
{
"namespace": "io.github.jaredpetersen.kafkaconnectredis",
"name": "RedisPexpireCommand",
"type": "record",
"fields": [
{
"name": "key",
"type": "string"
},
{
"name": "milliseconds",
"type": "long"
}
]
}
{
"name": "io.github.jaredpetersen.kafkaconnectredis.RedisPexpireCommand",
"type": "struct",
"fields": [
{
"field": "key",
"type": "string",
"optional": false
},
{
"field": "milliseconds",
"type": "int64",
"optional": false
}
]
}
{
"namespace": "io.github.jaredpetersen.kafkaconnectredis",
"name": "RedisSaddCommand",
"type": "record",
"fields": [
{
"name": "key",
"type": "string"
},
{
"name": "values",
"type": {
"type": "array",
"items": "string"
}
}
]
}
{
"name": "io.github.jaredpetersen.kafkaconnectredis.RedisSaddCommand",
"type": "struct",
"fields": [
{
"field": "key",
"type": "string",
"optional": false
},
{
"field": "values",
"type": "array",
"items": {
"type": "string"
},
"optional": false
}
]
}
{
"namespace": "io.github.jaredpetersen.kafkaconnectredis",
"name": "RedisGeoaddCommand",
"type": "record",
"fields": [
{
"name": "key",
"type": "string"
},
{
"name": "values",
"type": {
"type": "array",
"items": {
"name": "RedisGeoaddCommandGeolocation",
"type": "record",
"fields": [
{
"name": "longitude",
"type": "double"
},
{
"name": "latitude",
"type": "double"
},
{
"name": "member",
"type": "string"
}
]
}
}
}
]
}
{
"name": "io.github.jaredpetersen.kafkaconnectredis.RedisGeoaddCommand",
"type": "struct",
"fields": [
{
"field": "key",
"type": "string",
"optional": false
},
{
"field": "values",
"type": "array",
"items": {
"type": "struct",
"fields": [
{
"field": "longitude",
"type": "double",
"optional": false
},
{
"field": "latitude",
"type": "double",
"optional": false
},
{
"field": "member",
"type": "string",
"optional": false
}
]
},
"optional": false
}
]
}
{
"namespace": "io.github.jaredpetersen.kafkaconnectredis",
"name": "RedisArbitraryCommand",
"type": "record",
"fields": [
{
"name": "command",
"type": "string"
},
{
"name": "arguments",
"type": {
"type": "array",
"items": "string"
}
}
]
}
{
"name": "io.github.jaredpetersen.kafkaconnectredis.RedisArbitraryCommand",
"type": "struct",
"fields": [
{
"field": "command",
"type": "string",
"optional": false
},
{
"field": "arguments",
"type": "array",
"items": {
"type": "string"
},
"optional": false
}
]
}
Splitting the workload between multiple tasks is possible via the configuration property tasks.max
. The configured number will exactly determine the number of tasks that are created.
Name | Type | Default | Importance | Description |
---|---|---|---|---|
redis.uri |
string | High | Redis connection information provided via a URI string. | |
redis.cluster.enabled |
boolean | false | High | Target Redis is running as a cluster. |