- Apache Kafka architecture, design principles, and purposes
- Distributed Systems - Scalability, Fault Tolerance, High Availability
- Primary functions of: Producer, Consumer, Broker
- Meaning of “immutable” log
- Meaning of “committed”
- Topics, Partitions
- Essential services of Apache Zookeeper
- Replication, Leaders, Followers
- Kafka Messages, structure, make-up, metadata
- Kafka Controller
- Exactly Once Semantics
- Replication is fundamental to Kafka's architecture: it is what enables Kafka to ensure availability and durability in the event of individual node failures.
- Distributed: Kafka is designed to be distributed, allowing it to scale horizontally by adding more brokers to the cluster.
- Partitioning: Kafka topics are divided into partitions, allowing for parallel processing and increased throughput.
- Fault Tolerance: Kafka provides fault tolerance through partition replication, ensuring that data remains available even if a broker fails.
- Scalability: Kafka can scale horizontally by adding more brokers to the cluster, increasing the capacity to handle more partitions and consumers.
- Durability: Kafka stores messages on disk, providing durability and allowing consumers to replay messages if needed.
- High Throughput: Kafka is optimized for high throughput, making it suitable for use cases that require processing large volumes of data.
- Low Latency: Kafka offers low-latency message delivery, making it suitable for real-time data processing applications.
- Reliability: Kafka guarantees message delivery and ordering, ensuring that messages are not lost or duplicated.
- Kafka Guarantees:
  - Message ordering within a partition.
  - Produced messages are considered "committed" when they are written to the partition on all of its in-sync replicas (though they may not necessarily be flushed to disk).
  - Committed messages will not be lost as long as at least one in-sync replica remains alive, even if a broker fails.
  - Consumers can only read committed messages.
- Scalability in Kafka refers to the ability of the Kafka cluster to handle increased loads by adding more brokers, topics, or partitions.
- Topic Partitioning:
  - Kafka topics are divided into partitions, allowing for parallel processing and increased throughput.
  - Each partition can be replicated across multiple brokers for fault tolerance.
- Horizontal Scalability:
  - Adding more brokers to a Kafka cluster increases the capacity to handle more partitions and consumers.
  - Producers and consumers can be scaled independently based on their load.
- Partition Count: Choose an optimal number of partitions for a topic based on expected load and consumer parallelism.
- Consumer Groups: Utilize consumer groups to distribute the load among multiple consumers, allowing them to read from different partitions simultaneously.
- Fault Tolerance in Kafka ensures that the system continues to operate correctly even in the event of broker failures or other component failures.
- Replication:
  - Each partition can be configured with a replication factor, which defines how many copies of the partition exist across different brokers.
  - If a broker fails, other brokers with replicas can take over, ensuring data availability.
- In-Sync Replicas (ISR):
  - Replicas that are fully caught up with the leader are part of the ISR.
  - Kafka will only accept writes if the number of in-sync replicas meets the `min.insync.replicas` configuration.
- Set Appropriate Replication Factors: Use a replication factor of at least 3 for production environments to ensure high availability and fault tolerance.
- Monitor ISR: Regularly monitor the ISR to ensure that replicas are in sync with the leader.
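As a concrete illustration of these practices, here is a hedged sketch that creates a topic with replication factor 3 and `min.insync.replicas=2` via the Java `AdminClient`; the topic name, partition count, and broker address are illustrative assumptions.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateReplicatedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker

        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions, replication factor 3 (requires at least 3 brokers).
            NewTopic topic = new NewTopic("orders", 6, (short) 3)
                    // Writes with acks=all need 2 in-sync replicas to succeed.
                    .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(List.of(topic)).all().get(); // block until created
        }
    }
}
```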
- High Availability in Kafka ensures that the system remains accessible and operational with minimal downtime.
- Broker Redundancy:
  - Deploy multiple brokers in a Kafka cluster to prevent a single point of failure.
  - Use rack awareness to spread replicas across different racks or availability zones.
- Automatic Leader Election:
  - Kafka automatically elects a new leader for a partition if the current leader fails, ensuring continued availability.
- Cluster Monitoring: Implement monitoring tools (e.g., Confluent Control Center, Prometheus) to track the health and performance of the Kafka cluster.
- Configuration Tuning: Optimize configurations such as `unclean.leader.election.enable` to control how leader elections are handled during broker failures.
Apache Kafka provides built-in client APIs designed to help developers create applications that interact seamlessly with Kafka.
Kafka uses a binary wire protocol, allowing applications to read from or write to Kafka by transmitting the appropriate byte sequences to Kafka’s network port.
Steps of a Kafka `ProducerRecord` Workflow:

1. Producer Sends Data: The producer creates a `ProducerRecord` to write data to Kafka.
2. Specifying Details: The `ProducerRecord` includes the topic and the value to send. Optionally, the producer can also specify a key and/or a partition.
3. Serialization: Once the `ProducerRecord` is sent, the producer first serializes the key and value into ByteArrays so they can be transmitted over the network.
4. Partition Assignment: If a partition is specified in the `ProducerRecord`, the partitioner bypasses selection and uses the specified partition directly. If no partition is specified, the partitioner selects one, typically using the key from the `ProducerRecord` (if provided). At this point the producer knows the topic and partition for the record.
5. Batching: The producer adds the record to a batch of records destined for the same topic and partition.
6. Sending to Brokers: A separate thread handles the transmission of these batches to the appropriate Kafka brokers.
7. Broker Response: Once the broker receives the messages, it sends back a response. If successful, it returns a `RecordMetadata` object including the topic, partition, and offset of the record within the partition. If an error occurs, the broker sends an error response; the producer may retry sending the record several times before ultimately giving up and returning an error.
- A Kafka producer has three mandatory properties:
  - `bootstrap.servers`: List of host:port pairs of brokers that the producer will use to establish the initial connection to the Kafka cluster.
  - `key.serializer`: Name of the class that will be used to serialize the keys of the records we will produce to Kafka. The producer will use this class to serialize the key object to a byte array. Setting `key.serializer` is required even if you intend to send only values.
  - `value.serializer`: Name of the class that will be used to serialize the values of the records we will produce to Kafka, in the same way that `key.serializer` serializes the key.
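A minimal sketch of these three properties in code, assuming a placeholder broker address and the `StringSerializer` that ships with the Kafka client library:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerSetup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Generic types match the serializers configured above.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ready to send messages (see the sending approaches below)
        }
    }
}
```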
- After creating a producer instance, e.g., `KafkaProducer<String, String> producer = new KafkaProducer<>(properties);`, we can begin sending messages.
- There are three main approaches to sending messages (see the sketch after this list):
  - Fire-and-Forget: The producer sends a message to the server without waiting for confirmation, effectively ignoring whether the message was successfully delivered or not.
  - Synchronous Send: The `send()` method returns a `Future` object. We use the `get()` method to block execution until the response is received, allowing us to confirm whether the message was successfully delivered.
  - Asynchronous Send: The `send()` method is called with a callback function. This function is invoked when the Kafka broker sends a response, enabling non-blocking confirmation of the message's status.
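A minimal sketch of the three sending approaches, assuming the producer configuration above and a hypothetical `customers` topic:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendStyles {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("customers", "key-1", "value-1");

            // 1. Fire-and-forget: send and ignore the outcome.
            producer.send(record);

            // 2. Synchronous: block on the returned Future until the broker responds.
            RecordMetadata md = producer.send(record).get();
            System.out.printf("topic=%s partition=%d offset=%d%n",
                    md.topic(), md.partition(), md.offset());

            // 3. Asynchronous: the callback fires when the broker responds.
            producer.send(record, (metadata, exception) -> {
                if (exception != null) exception.printStackTrace();
            });
        }
    }
}
```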
- Some parameters have a significant impact on memory use, performance, and reliability of the producers (see the configuration sketch after this list):
- `acks`: Controls the number of partition replicas that must acknowledge receiving the record before the producer considers the write operation successful.
  - `acks=0`: The producer does not wait for a response from the broker before assuming the message was sent successfully. As a result, if an issue occurs and the broker does not receive the message, the producer remains unaware, and the message is lost.
  - `acks=1`: The producer receives a success response from the broker as soon as the leader replica acknowledges receipt of the message (but not necessarily synced to disk).
    - This setting provides a balance between reliability and performance, as the producer receives confirmation that the message was received by the leader replica.
    - However, if the leader replica fails before the message is replicated to the followers, the message will be lost.
  - `acks=all`: The producer receives a success response from the broker only after all in-sync replicas have acknowledged receipt of the message.
- `buffer.memory`: Determines the amount of memory the producer allocates for buffering messages that are waiting to be sent to brokers.
- `compression.type`: By default, messages are transmitted without compression. This parameter can be set to snappy, gzip, or lz4 to compress the data before it is sent to the brokers.
- `retries`: Determines the number of attempts the producer will make to resend a message before it ultimately gives up and informs the client of a problem. Retries apply only when the server returns a transient error.
- `batch.size`: Regulates the amount of memory, measured in bytes (not in messages!), allocated for each batch.
- `linger.ms`: Determines how long the producer waits for additional messages before sending the current batch. The `KafkaProducer` sends a batch either when the batch is full or when the `linger.ms` limit is reached. By default, the producer transmits messages as soon as a sender thread is available, even if only one message is in the batch. Setting `linger.ms` to a value greater than 0 instructs the producer to pause for a few milliseconds so additional messages can be added to the batch before it is sent to the brokers. Although this approach increases latency, it also enhances throughput, as sending more messages at once reduces the overhead per message.
- `client.id`: Any string that helps the brokers identify messages sent from the client. It is used for logging and metrics, as well as for managing quotas.
- `max.in.flight.requests.per.connection`: Regulates the number of messages the producer will transmit to the server without waiting for responses. A higher setting can enhance throughput while increasing memory usage, but excessively high values may reduce throughput by making batching less efficient. Setting this value to 1 ensures that messages are written to the broker in the exact order they were sent, even in the event of retries.
- `timeout.ms`, `request.timeout.ms`, `metadata.fetch.timeout.ms`: Dictate how long the producer will wait for a response from the server when sending data (`request.timeout.ms`) and when fetching metadata, such as the current leaders for the partitions being written to (`metadata.fetch.timeout.ms`).
- `max.block.ms`: Determines how long the producer will block when invoking `send()` and when explicitly requesting metadata using `partitionsFor()`.
- `max.request.size`: Controls the size of a produce request made by the producer. It limits both the maximum size of a single message and the total number of messages the producer can include in one request. For instance, with the default maximum request size of 1 MB, the largest message allowed is 1 MB, or the producer can batch up to 1,000 messages of 1 KB each into a single request. The broker also enforces its own maximum message size (`message.max.bytes`); it is generally advisable to align these configurations to avoid situations where the producer attempts to send messages that the broker will reject due to size constraints.
- `receive.buffer.bytes`, `send.buffer.bytes`: Specify the sizes of the TCP send and receive buffers used by the sockets during data transmission and reception. If these values are set to -1, the operating system defaults are applied. It is advisable to increase these buffer sizes when producers or consumers communicate with brokers located in a different data center, as those network connections usually have higher latency and lower bandwidth.
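To show how several of these parameters combine in practice, here is a hedged configuration fragment leaning toward reliability; the broker addresses and values are illustrative assumptions, not recommendations:

```java
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");      // wait for all in-sync replicas to acknowledge
props.put("retries", 3);       // retry transient server errors a few times
props.put("max.in.flight.requests.per.connection", 1); // preserve ordering across retries
props.put("compression.type", "snappy"); // compress batches before sending
props.put("linger.ms", 5);     // trade ~5 ms of latency for fuller batches
props.put("batch.size", 32 * 1024); // 32 KB per batch (bytes, not messages)
```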
- Custom Serializer: Instead of developing a custom serializer library from scratch, we recommend utilizing existing generic serialization libraries and their serializers and deserializers, such as JSON, Apache Avro, Thrift, or Protobuf.
- Avro: One of the most notable features of Avro, which makes it well-suited for a messaging system like Kafka, is that when the application producing messages transitions to a new schema, the applications consuming the data can continue to process messages without needing any changes or updates.
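A hedged configuration sketch for producing Avro records; it assumes Confluent's `KafkaAvroSerializer` and a Schema Registry at a placeholder URL, both of which live outside the core Kafka client library:

```java
import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
// Assumed third-party serializers from the Confluent platform:
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
// The registry stores schemas so producers and consumers can evolve independently.
props.put("schema.registry.url", "http://localhost:8081"); // placeholder URL
```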
- A consumer reads data from Kafka.
- Rebalance: Moving partition ownership from one consumer to another is referred to as a rebalance.
- Consumers heartbeats: Consumers maintain membership in a consumer group and their ownership of assigned partitions by sending heartbeats to a Kafka broker designated as the group coordinator.
- poll() method: The consumer uses the `poll()` method to fetch records from Kafka.
- wakeup(): This method is used to interrupt a blocking call, such as `poll()`, and throw a `WakeupException`. This is useful when you want to shut down a consumer or stop it from blocking indefinitely.
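A minimal sketch of a poll loop with a `wakeup()`-based shutdown, assuming a placeholder broker and a hypothetical `customers` topic:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class PollLoopSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "demo-group");              // hypothetical consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        final Thread mainThread = Thread.currentThread();
        // On Ctrl+C, interrupt the blocking poll() and wait for a clean close.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try { mainThread.join(); } catch (InterruptedException ignored) { }
        }));

        try {
            consumer.subscribe(List.of("customers")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        } catch (WakeupException e) {
            // Expected on shutdown: wakeup() makes poll() throw this exception.
        } finally {
            consumer.close();
        }
    }
}
```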
- `kafka-console-producer.sh`: This tool allows you to produce messages to a Kafka topic.
  - Example: `kafka-console-producer --producer.config client.properties --bootstrap-server localhost:9093 --topic kafka-security-topic` produces messages to the `kafka-security-topic` topic.
- Key consumer configuration parameters (see the tuning sketch after this list):
- `group.id`: A unique string that identifies the consumer group this consumer belongs to.
- `fetch.min.bytes`: The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request will wait until the minimum amount of data is available.
- `fetch.max.wait.ms`: By configuring `fetch.min.bytes`, you instruct Kafka to wait until it has sufficient data to send before responding to the consumer. The `fetch.max.wait.ms` parameter controls how long Kafka will wait. By default, Kafka waits up to 500 ms, which can introduce up to 500 ms of additional latency if there isn't enough data in the topic to meet the minimum data requirement. To limit potential latency (especially if there are SLAs governing the maximum latency of the application), you can set `fetch.max.wait.ms` to a lower value. For example, with `fetch.max.wait.ms` at 100 ms and `fetch.min.bytes` at 1 MB, Kafka will respond to a fetch request with data either when it has 1 MB to return or after 100 ms, whichever occurs first.
- `max.partition.fetch.bytes`: The maximum amount of data the server should return per partition for a fetch request, which bounds how much data the consumer receives in a single request. If the volume of data returned by a single `poll()` is very large, it may take the consumer longer to process, delaying the next iteration of the poll loop and potentially causing a session timeout. To address this, either lower `max.partition.fetch.bytes` or increase the session timeout.
- `session.timeout.ms`: The maximum amount of time the consumer can be out of contact with the broker before it is considered dead and its partitions are reassigned to another consumer in the group.
- `auto.offset.reset`: Determines what happens when there is no initial offset or the current offset no longer exists on the server. The default behavior is to reset to the latest offset; it can also be set to the earliest offset, or to none, in which case an error is thrown.
- `enable.auto.commit`: Controls whether the consumer's offset is automatically committed to the broker. If set to true, the consumer commits the offset periodically; if set to false, the consumer must commit the offset manually.
- `partition.assignment.strategy`: Specifies the strategy the consumer group coordinator uses to assign partitions to consumers. The default is `org.apache.kafka.clients.consumer.RangeAssignor`, which assigns partitions based on ranges of partition IDs. You can also use `org.apache.kafka.clients.consumer.RoundRobinAssignor`, which assigns partitions in a round-robin fashion.
- `client.id`: Any string that helps the brokers identify messages sent from the client. It is used for logging and metrics, as well as for managing quotas.
- `max.poll.records`: Controls the maximum number of records returned by a single call to `poll()`, letting you bound the amount of data the consumer processes in each iteration of the poll loop (the same poll-loop and session-timeout considerations as `max.partition.fetch.bytes` apply).
- `receive.buffer.bytes`, `send.buffer.bytes`: Specify the sizes of the TCP send and receive buffers used by the sockets during data transmission and reception. If these values are set to -1, the operating system defaults are applied. It is advisable to increase these buffer sizes when producers or consumers communicate with brokers located in a different data center, as those network connections usually have higher latency and lower bandwidth.
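A hedged tuning fragment combining the latency and throughput knobs above; the numbers are illustrative assumptions:

```java
import java.util.Properties;

Properties props = new Properties();
props.put("group.id", "demo-group");                 // hypothetical consumer group
props.put("fetch.min.bytes", 1024 * 1024);           // wait for ~1 MB of data...
props.put("fetch.max.wait.ms", 100);                 // ...but answer after 100 ms at most
props.put("max.partition.fetch.bytes", 1024 * 1024); // cap bytes per partition per fetch
props.put("max.poll.records", 500);                  // cap records returned by one poll()
props.put("enable.auto.commit", false);              // commit offsets manually
props.put("auto.offset.reset", "earliest");          // start from the beginning if no offset exists
```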
- `kafka-consumer-groups.sh`: This tool allows you to list, describe, and delete consumer groups.
  - Describe example: `kafka-consumer-groups.sh --bootstrap-server localhost:9093 --describe --group <GROUP-ID>`
  - Output columns of the previous command:
    - `GROUP-ID`: The ID of the consumer group.
    - `TOPIC`: The topic to which the consumer group is subscribed.
    - `PARTITION`: The partition number.
    - `CURRENT-OFFSET`: The current offset of the consumer group, i.e., the position of the consumer in the partition.
    - `LOG-END-OFFSET`: The log end offset of the partition, i.e., the offset of the last message committed to the broker log.
    - `LAG`: The difference between the log end offset and the current offset.
    - `OWNER`: The ID of the consumer.
- `kafka-console-consumer.sh`: This tool allows you to consume messages from a Kafka topic.
- Immutable Log: Kafka maintains an immutable log of messages for each partition. Once a message is written to a partition, it cannot be modified or deleted. This ensures that the order of messages is preserved and that messages are not lost or altered.
- Append-Only: Messages are appended to the end of the log, and once written, they cannot be changed. This allows consumers to read messages in the order they were written and ensures that messages are not lost or modified.
- Produced messages are deemed "committed" when they are written to the partition on all of its in-sync replicas (though they may not necessarily be flushed to disk). Producers can choose to receive acknowledgments for sent messages when the message is fully committed, when it is written to the leader, or when it is sent over the network.
- This requires a mechanism to track which records have already been read by each consumer in the group.
- The action of updating the current position in the partition is referred to as a commit.
- Committed Offset: The offset of the last record that was successfully read by a consumer and processed by the application.
- Consumer Offset: The offset of the next record that the consumer will read from the partition.
- Offset Commit: The action of updating the current position in the partition to the committed offset.
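A short sketch of a manual offset commit inside the poll loop, assuming a consumer configured with `enable.auto.commit=false` (as in the earlier examples) and a hypothetical `process()` method:

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    process(record); // hypothetical application logic
}
// commitSync() commits the offsets returned by the last poll(); the committed
// offset then points at the next record the group will read from each partition.
consumer.commitSync();
```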
- Topics are the primary mechanism for organizing and categorizing messages in Kafka.
- Retention policy:
  - Compaction/Compacted: A topic can be configured for compaction, which retains only the latest message for each key. This is useful for maintaining the latest state of a record, such as the current balance of an account.
    - Clean: The messages that have been compacted before.
    - Dirty: The messages that have been written after the last compaction.
    - The compact policy never compacts the current segment. Messages are eligible for compaction only in inactive segments.
    - Compaction can affect the read and write performance of a topic.
  - Delete: The default retention policy is to delete messages after a certain period of time.
- Apache Kafka preserves the order of messages within a partition.
- If maintaining order is essential, it is recommended to set `max.in.flight.requests.per.connection=1`. This ensures that while a batch of messages is being retried, no additional messages will be sent, as that could disrupt the correct order.
- When a topic is created, Kafka first determines how to allocate the partitions among the brokers.
- Partitioning is the process of dividing a topic into multiple partitions, each of which can be replicated across multiple brokers.
- Partition allocation:
  - Replicas are spread evenly among the brokers.
  - For each partition, each replica is placed on a different broker.
- If the brokers have rack information, Kafka will assign the replicas for each partition to different racks whenever possible. This approach ensures that an event leading to downtime for an entire rack does not result in complete unavailability of the partitions.
- The allocation of partitions to brokers does not consider available space or existing load; however, the allocation of partitions to disks does take the number of partitions into account.
- More partitions allow greater parallelism for consumption, but this will also result in more files across the brokers.
- `num.partitions` (broker config): The default number of log partitions per topic.
- Automatically migrating data to new machines
- Tool: `kafka-reassign-partitions.sh` (included in the Kafka tools).
  - It's a good idea to create a manual reassignment plan for large topics, since the tool doesn't take the size of the topics into account.
  - It also doesn't produce a plan that minimizes the number of partitions to migrate between brokers.
  - So if we trust the tool blindly, we may end up stressing our Kafka cluster for nothing.
- `kafka-leader-election.sh`: This tool allows you to trigger a leader election for a specific partition.
  - It tells the cluster controller to select the ideal leader for partitions.
  - This is useful when a broker fails and you want to ensure that the partition leaders are correctly reassigned.
- `kafka-reassign-partitions.sh`: This tool allows you to reassign partitions between brokers.
  - This is useful when you want to balance the load across brokers or move partitions to new brokers.
  - The command can also be used to verify the status of a reassignment.
  - It's a best practice to save the current partition assignment before running this command, so you can roll back if something goes wrong.
  - Partition reassignment has a significant impact on the performance of the cluster, so it's recommended to do it during off-peak hours. Breaking the reassignment into smaller batches can help reduce the impact on the cluster.
  - This command also allows you to change the replication factor of a topic.
- Leader replica: Each partition has one replica designated as the leader. All produce and consume requests are routed through the leader to ensure consistency.
- Another responsibility of the leader is to determine which follower replicas are up-to-date with it.
- `auto.leader.rebalance.enable=true`: This parameter allows Kafka to automatically rebalance leaders among the brokers in the cluster. If a broker fails, the leader for the partitions on that broker will be reassigned to another broker.
- Preferred Leaders: The first replica in the list is always designated as the preferred leader. This remains true regardless of who the current leader is, even if the replicas have been reassigned to different brokers using the replica reassignment tool.
- Produce requests and fetch requests must be sent to the leader replica of a partition.
- Follower replica: The remaining replicas are referred to as followers. They replicate the data from the leader to ensure fault tolerance and high availability.
- Their main responsibility is to replicate messages from the leader and remain up-to-date with the latest messages that the leader has.
- If a leader replica for a partition crashes, one of the follower replicas will be promoted to serve as the new leader for that partition.
- Out of Sync: If a replica has not requested a message for more than 10 seconds, or if it has requested messages but has not caught up to the most recent message in over 10 seconds, it is deemed out of sync. Should a replica fail to keep pace with the leader, it will not be eligible to become the new leader in the event of a failure, as it does not contain all the messages.
- In-Sync Replicas (ISR): Replicas that are fully caught up with the leader are part of the ISR. Kafka will only accept writes if the number of in-sync replicas meets the `min.insync.replicas` configuration.
  - Only in-sync replicas are eligible to be elected as partition leaders if the current leader fails.
- To stay in sync with the leader, the replicas send Fetch requests to the leader, which are the same type of requests that consumers use to consume messages. In response to these requests, the leader sends the messages to the replicas. These Fetch requests include the offset of the next message that the replica wishes to receive and are always processed in order.
- `replica.lag.time.max.ms`: This parameter specifies the maximum time that a replica can lag behind the leader before it is considered out of sync. If a replica does not request a message from the leader within this time frame, it is considered out of sync and will be removed from the ISR.
- Kafka messages consist of key-value pairs.
- Ordering:
- All messages with the same key will be sent to the same partition.
- This implies that if a process is reading only a subset of the partitions in a topic, all records associated with a single key will be processed by the same process.
  - Messages with key=null have no ordering guarantee.
- When the key is null and the default partitioner is applied, the record will be randomly assigned to one of the available partitions of the topic. A round-robin algorithm will be employed to evenly distribute the messages across the partitions.
- Tombstone message: A message with a null value that is used to delete a key from a compacted topic.
- cleaner thread: The cleaner thread is responsible for removing tombstone messages from the log.
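A minimal sketch of producing a tombstone, reusing a `String`-keyed producer like the earlier examples; the topic and key are hypothetical:

```java
import org.apache.kafka.clients.producer.ProducerRecord;

// A tombstone is a record whose value is null; after the cleaner thread
// compacts the segment, the key disappears from the topic entirely.
ProducerRecord<String, String> tombstone =
        new ProducerRecord<>("accounts", "account-42", null);
producer.send(tombstone);
```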
- Key: An optional field that can be used to determine the partition to which the message will be written.
- Value: The actual data of the message.
- Timestamp: The timestamp of the message.
- Headers: Additional metadata that can be attached to the message.
- Offset: The offset of the message within the partition.
- Magic Byte: The version of the message format.
- Compression Codec: The codec used to compress the message.
- Key Size: The size of the key in bytes.
- Kafka Index: The index is a file that maps the offset of a message to its physical location in the log, helping brokers quickly locate the message for a specific offset.
- The index maps offsets to segment files and their corresponding positions within the file.
- Indexes are also divided into segments, allowing us to delete old index entries when messages are purged.
- Indexes are regenerated automatically if they are deleted.
- The controller is one of the Kafka brokers that, in addition to performing the standard broker functions, is responsible for electing partition leaders.
- The first broker that starts in the cluster becomes the controller by creating an ephemeral node in ZooKeeper named `/controller`.
- In case of a controller failure, the first node to create a new `/controller` node in ZooKeeper becomes the new controller, while the other nodes will receive a "node already exists" error.
- The controller is responsible for electing leaders among the partitions and replicas whenever it detects that nodes have joined or left the cluster.
- The controller uses the epoch number to prevent a "split brain" scenario, where two nodes mistakenly believe that each is the current controller.
- 📺 Apache Kafka® Transactions: Message Delivery and Exactly-Once Semantics
- Transactions - Kafka Internals
- Exactly-Once Semantics Are Possible: Here’s How Kafka Does It
- Transactions in Apache Kafka
- Enabling Exactly-Once in Kafka Streams
- `enable.idempotence`: This parameter ensures that the producer sends messages to the broker with exactly-once semantics.
- `transactional.id`: This parameter is used to identify the producer in a transaction. It is essential to set this parameter when using transactions.
- Exactly-Once Semantics: This guarantees that each message is processed exactly once, even if there are failures during processing.
- At-Least-Once Semantics: This guarantees that each message is processed at least once, but it may be processed multiple times.
- At-Most-Once Semantics: This guarantees that each message is processed at most once, but it may not be processed at all.
- Transactional Guarantees: Kafka provides exactly-once semantics by using transactions. A transaction is a sequence of read and write operations that are executed as a single unit of work. Either all operations in the transaction are completed successfully, or none of them are.
- Producer Transactions: To achieve exactly-once semantics, the producer must use transactions. This allows the producer to send messages to Kafka and commit the transaction only if all messages are successfully written to the broker.
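A hedged sketch of a transactional producer tying these pieces together; the broker address, topic, and `transactional.id` are illustrative assumptions:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", true);           // exactly-once delivery per partition
        props.put("transactional.id", "payments-tx-1");  // hypothetical stable transactional id

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions(); // register with the transaction coordinator
            producer.beginTransaction();
            try {
                producer.send(new ProducerRecord<>("payments", "order-1", "debit"));
                producer.send(new ProducerRecord<>("payments", "order-1", "credit"));
                producer.commitTransaction(); // both records become visible atomically
            } catch (KafkaException e) {
                producer.abortTransaction(); // read_committed consumers never see the records
                throw e;
            }
        }
    }
}
```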