What is Durability in Kafka
Durability in Kafka is all about reducing the likelihood of losing a message.
Confluent Cloud enforces a replication factor of 3 to ensure data durability.
Producers can control the durability of messages written to Kafka brokers using the acks config parameter. Although the acks configuration can also be used to optimize throughput and latency, it is primarily a durability control.
acks=all offers the highest durability guarantee: the leader broker waits for the ISRs to acknowledge saving of a message (in other words, the leader waits for the message to get "committed" and become available for reads by consumers), and only then returns an acknowledgement to the Producer application. Therefore, the message won't be lost as long as at least one ISR remains alive.
The number of ISRs that must acknowledge the saving of a message is defined by
min.insync.replicas (discussed in detail below).
More details on acks config - https://krishnakrmahto.com/configure-kafka-for-high-throughput#heading-producer-acks-for-production-throughput
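To make the settings discussed above concrete, here is a minimal sketch of a durability-oriented producer configuration, expressed as a plain dict of Kafka config keys. The helper function is hypothetical (it is not part of any Kafka client API); the keys and defaults reflect the standard producer configuration.

```python
def durable_producer_config():
    """Hypothetical helper returning a durability-oriented set of
    Kafka producer configs as plain key-value pairs."""
    return {
        # Leader waits until all in-sync replicas have the message.
        "acks": "all",
        # Retry transient failures instead of dropping the message.
        "retries": 2147483647,          # MAX_INT, the default
        # Upper bound on send + retries + acknowledgement time.
        "delivery.timeout.ms": 120000,  # default; tune per business needs
        # Deduplicate retried messages (see the Idempotent Producer section).
        "enable.idempotence": True,
    }

config = durable_producer_config()
print(config["acks"])  # -> all
```

In a real application these pairs would be passed to the client's producer constructor (or a properties file); the dict form just makes the intent explicit.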
Minimum In-sync Replicas
The replica brokers of a partition that are "in-sync" with the leader are called in-sync replicas (ISRs). A produced message is replicated across all ISRs before it is made available for reads by the consumer applications.
When a Producer has set acks=all in its config, minimum in-sync replicas is the minimum number of ISRs that must acknowledge the saving of a message before the leader broker returns a success response to the producer's produce request. This is set through the min.insync.replicas config, which defaults to min.insync.replicas=1. With the default, the leader returns a response as soon as it saves the message itself, without waiting for acknowledgements from the followers. So there is a chance that we may lose messages if the leader broker goes down before the followers have finished replicating them.
Moreover, a broker acknowledges messages that have not necessarily been persisted to disk; Kafka relies on replication across the followers for durability. The idea behind this is that having three machines in separate racks or availability zones, each with a copy of the data, is safer than synchronously writing the messages to disk on the leader, because simultaneous failures in two different racks or zones are very unlikely.
Setting min.insync.replicas to a higher value in tandem with the Producer config acks=all ensures that a produce request is replicated to more than one ISR, which strengthens the durability guarantee.
min.insync.replicas should not be set equal to the replication factor of the topic, for the reason discussed in the Durability vs Write Availability trade-off below.
Kindly note that setting min.insync.replicas > 1 is effective only when the Producer acks config is set to acks=all (discussed in the points above).
For use cases that can tolerate occasional loss of messages, it is fine to leave this configuration at its default value of 1.
Durability vs Write Availability
A leader can remove a follower from the ISR list, or a broker itself can go down. If the number of available ISRs of a topic partition falls below the configured min.insync.replicas, the leader broker will not accept any writes to the partition, which manifests as the Producer throwing an exception (NotEnoughReplicasException). In other words, the topic partition becomes unavailable for writes.
So as the value of min.insync.replicas is increased, the cluster can tolerate less ISR unavailability for writes. For example, if the number of ISRs = 3 and min.insync.replicas=1, then we can tolerate unavailability of 2 ISRs; with min.insync.replicas=2, we can tolerate unavailability of only 1 ISR without the leader rejecting produce requests.
The maximum possible number of ISRs of a partition is equal to the replication factor of the topic. However, setting min.insync.replicas = replication factor is undesirable because the partition then becomes unavailable for writes as soon as a single replica dies (the loss of even one ISR makes the ISR count fall below the configured minimum).
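The arithmetic above can be captured in a small helper. This is a hypothetical function for reasoning about the trade-off, not part of any Kafka API:

```python
def tolerable_isr_failures(replication_factor: int, min_insync_replicas: int) -> int:
    """How many in-sync replicas can become unavailable before the leader
    starts rejecting produce requests (assuming acks=all on the producer)."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("min.insync.replicas must be between 1 and the replication factor")
    # Writes are accepted while the ISR count >= min.insync.replicas,
    # so the partition tolerates the difference between the two.
    return replication_factor - min_insync_replicas

print(tolerable_isr_failures(3, 1))  # -> 2
print(tolerable_isr_failures(3, 2))  # -> 1
print(tolerable_isr_failures(3, 3))  # -> 0: any single failure blocks writes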
In short, increasing min.insync.replicas increases the durability of messages of a topic, but weakens the write availability of its partitions.
Durability vs Read Availability
- Consumers can read from a topic as long as the leader broker is available, and the leader is always an ISR. Therefore, reads are not affected by the value of min.insync.replicas.
Durability vs Producer Throughput
- Setting a higher value for min.insync.replicas comes into effect only when acks=all is set on the Producer side. So the same trade-off as the Producer acks config holds for min.insync.replicas as well - https://krishnakrmahto.com/configure-kafka-for-high-throughput#heading-trade-offs-3.
Durability vs Producer Latency
- As already seen, setting a higher value for min.insync.replicas comes into effect only when acks=all is set on the Producer side. So the same trade-off as the Producer acks config holds for min.insync.replicas as well - https://krishnakrmahto.com/configure-kafka-for-high-throughput#heading-trade-offs-3
Producers can retry sending messages to ensure no data is lost when a send to the broker fails.
The number of retries is controlled by the retries Producer config. It is recommended to keep retries at its default value (MAX_INT) and tune delivery.timeout.ms instead to control the retry behaviour.
delivery.timeout.ms should be set as the upper bound for the total time between sending a message and receiving an acknowledgement from the broker; it should also reflect the business requirement of how long a message remains valid.
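Kafka also enforces that delivery.timeout.ms is at least linger.ms + request.timeout.ms (the producer refuses to start otherwise). A minimal sketch of that constraint, as a hypothetical standalone check rather than client code:

```python
def validate_delivery_timeout(delivery_timeout_ms: int,
                              linger_ms: int,
                              request_timeout_ms: int) -> bool:
    """Kafka requires delivery.timeout.ms >= linger.ms + request.timeout.ms,
    since one send attempt may spend up to linger.ms batching plus
    request.timeout.ms waiting for a broker response."""
    return delivery_timeout_ms >= linger_ms + request_timeout_ms

print(validate_delivery_timeout(120000, 0, 30000))    # defaults -> True
print(validate_delivery_timeout(20000, 5000, 30000))  # too small -> False
```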
Things to consider while Configuring Producer Retries
If a Producer application does not receive a response from the leader broker within a specified interval (controlled by the Producer configuration request.timeout.ms), the producer retries the send, even though the message, while unacknowledged, might already have been persisted in the broker. This results in duplicate messages.
Multiple send requests might be in flight at once (controlled by the Producer configuration max.in.flight.requests.per.connection). If an older request fails and is retried after newer requests have already been stored in the broker, the messages end up stored out of order.
Preventing Message Duplication and Message Ordering Issues - Idempotent Producer
Both of the issues are addressed by configuring the Producer as idempotent. This is controlled by the enable.idempotence config. When setting enable.idempotence to true, the Producer configuration must also satisfy the following:
max.in.flight.requests.per.connection (<= 5)
retries > 0
acks = all
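A minimal sketch of these prerequisites as a validation function. The helper is hypothetical, loosely mirroring the checks the Java client performs at startup; the keys and bounds come from the list above.

```python
def check_idempotence_prereqs(config: dict) -> list:
    """Return a list of violations of the constraints an idempotent
    producer requires (hypothetical helper; defaults assumed valid)."""
    problems = []
    if config.get("max.in.flight.requests.per.connection", 5) > 5:
        problems.append("max.in.flight.requests.per.connection must be <= 5")
    if config.get("retries", 2147483647) <= 0:
        problems.append("retries must be > 0")
    if config.get("acks", "all") != "all":
        problems.append("acks must be 'all'")
    return problems

print(check_idempotence_prereqs({"enable.idempotence": True, "acks": "all",
                                 "retries": 5,
                                 "max.in.flight.requests.per.connection": 5}))
# -> []
print(check_idempotence_prereqs({"acks": "1"}))
# -> ["acks must be 'all'"]
```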
An Idempotent Producer is assigned a Producer ID by the broker during producer initialization.
Every message sent by an Idempotent Producer is assigned a monotonically increasing sequence number, starting from 0 (zero), on a per topic-partition basis.
The leader broker stores the sequence numbers it has already received from every Producer ID. The broker uses these stored sequence numbers to determine whether a message is:
new → the sequence number of the message = the last sequence number the broker had seen + 1.
duplicate → the sequence number of the message is <= the last sequence number the broker had seen.
out-of-order → the sequence number of the message is > the last sequence number the broker had seen + 1.
If a Producer retries a message with a duplicate sequence number, it receives a DUPLICATE_SEQUENCE_NUMBER error from the broker, and the Producer simply considers the message as sent.
If a broker receives out-of-order sequence numbers, it returns an OUT_OF_ORDER_SEQUENCE_NUMBER error to the Producer for each such message. The producer then re-queues these messages in increasing sequence order in its buffer so that they can be sent again.
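The three-way classification above can be sketched as a pure function. This is a simplified model of the broker-side check (per producer ID and partition), not the actual broker code:

```python
def classify(seq: int, last_seen: int) -> str:
    """Classify an incoming sequence number against the last one the
    broker stored for this producer ID and topic-partition."""
    if seq == last_seen + 1:
        return "new"            # exactly the next expected number
    if seq <= last_seen:
        return "duplicate"      # broker answers DUPLICATE_SEQUENCE_NUMBER
    return "out-of-order"       # broker answers OUT_OF_ORDER_SEQUENCE_NUMBER

print(classify(5, 4))  # -> new
print(classify(3, 4))  # -> duplicate
print(classify(7, 4))  # -> out-of-order
```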
Committing Consumer Offsets
Consumer offsets track which messages have been consumed by a consumer.
Handling unexpected consumer failures is important to make sure that messages don't get lost when they are consumed/processed. This is not an absolute threat to the durability of messages stored in the broker, but messages effectively cease to exist for a consumer if its offsets are mis-committed during consumer failures.
So it is advised to avoid situations where a consumer application commits an offset, starts processing a message, and then fails unexpectedly before finishing. In such cases, the consumer that replaces the failed one will start processing from offsets greater than the committed offsets, skipping the partially processed message.
By default, a consumer commits offsets automatically (controlled by the Consumer config enable.auto.commit) during poll() at fixed intervals of 5 seconds (controlled by the Consumer config auto.commit.interval.ms). This commit is performed during the consumer metadata update (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#poll(org.apache.kafka.common.utils.Timer, boolean)) before fetching new consumer records from the broker. Therefore, the default auto-commit strategy commits only offsets that were returned by the previous poll. This causes no loss of durability, although the consumer may process a message more than once if it fails with processed-but-uncommitted offsets.
We can disable auto-commit by setting enable.auto.commit to false, and manually commit the offsets by calling either commitSync() or commitAsync(). As already discussed, when committing manually we should avoid committing offsets before the corresponding messages are processed.
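A toy model of the process-then-commit pattern described above. This is a simulation, not Kafka client code: `consume` stands in for the poll loop, and advancing `committed_offset` stands in for a commitSync() call. Committing only after processing means a crash mid-processing leads to reprocessing (at-least-once) rather than message loss.

```python
def consume(messages, committed_offset, process):
    """Simulated consume loop: process each record, then 'commit' its offset.
    `messages` is a list of (offset, payload) pairs."""
    for offset, msg in messages:
        if offset <= committed_offset:
            continue                  # already processed before a restart
        process(msg)                  # 1. process first
        committed_offset = offset     # 2. then commit (commitSync() analogue)
    return committed_offset

seen = []
last = consume([(0, "a"), (1, "b"), (2, "c")], committed_offset=-1,
               process=seen.append)
print(seen, last)  # -> ['a', 'b', 'c'] 2
```

Reversing steps 1 and 2 would reproduce the failure mode described earlier: an offset committed for a message that was never finished.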
References
Kafka - The Definitive Guide
If references to declarative claims made in this doc are not found in the references above, then they may have been taken directly from the Kafka code base - https://github.com/apache/kafka