
Best Practices for Apache Kafka Message Management and Exception Handling

Introduction

As a core component of distributed stream processing platforms, Apache Kafka is widely used in real-time data pipelines, log aggregation, and event-driven architectures. In practice, however, developers often run into problems such as messages that are hard to clean up and consumer-side format exceptions. Drawing on real cases, this article systematically explains best practices for Kafka message management and exception handling, covering:

  • How to delete or modify Kafka messages
  • How to fix consumer-side errors caused by data format mismatches
  • Java/Python code samples and a command-line operation guide

Part 1: Kafka Message Management – Deleting and Modifying Messages

1.1 Kafka message immutability principle

Kafka's core design is the immutable log: messages, once written, cannot be modified or deleted directly. However, the same effect can be achieved indirectly through the following approaches:

Method | Principle | Applicable Scenario | Code/Command Example
Log Compaction | Keeps only the latest message for each key | Logical deletion | cleanup.policy=compact + send a new message with the same key to overwrite
Rebuild Topic | Filter the data and write it to a new Topic | Physical deletion required | kafka-console-consumer + grep + kafka-console-producer
Adjust Retention | A shorter retention time triggers automatic cleanup | Quickly purge an entire Topic | --alter --add-config retention.ms=1000
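
The retention-based approach in the last row can also be performed from code rather than the CLI. Below is a minimal Java sketch using AdminClient.incrementalAlterConfigs: it temporarily shortens retention.ms to 1 second so the broker purges old segments, then restores the value (the 7-day restore value of 604800000 ms is an assumption; substitute your Topic's actual setting).

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import java.util.*;

// Temporarily shorten retention.ms to trigger cleanup, then restore it
try (AdminClient admin = AdminClient.create(props)) {
    ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "ysx_mob_log");
    admin.incrementalAlterConfigs(Map.of(topic, List.of(
        new AlterConfigOp(new ConfigEntry("retention.ms", "1000"), AlterConfigOp.OpType.SET)
    ))).all().get();
    // ... wait for the retention thread to delete the old segments, then restore (assumed 7 days):
    admin.incrementalAlterConfigs(Map.of(topic, List.of(
        new AlterConfigOp(new ConfigEntry("retention.ms", "604800000"), AlterConfigOp.OpType.SET)
    ))).all().get();
}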

1.1.1 Log Compaction Example

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

// Producer: send a message with a key; a later message with the same key overwrites the old value
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-server:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("ysx_mob_log", "key1", "new_value")); // Overwrites the old value for key1
producer.close();
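
Compaction only applies to Topics configured with cleanup.policy=compact, and overwriting leaves the latest value in place. To logically delete a key altogether, send a tombstone, i.e. a record with a null value; the compactor then removes all earlier records for that key:

// Tombstone: a null value marks key1 for removal during compaction
producer.send(new ProducerRecord<>("ysx_mob_log", "key1", null));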

1.2 Two ways to physically delete messages

Method 1: Rebuild Topic

# Consume the original Topic, filter out the bad data, and write it to a new Topic
kafka-console-consumer.sh \
  --bootstrap-server kafka-server:9092 \
  --topic ysx_mob_log \
  --from-beginning \
  | grep -v "BAD_DATA" \
  | kafka-console-producer.sh \
    --bootstrap-server kafka-server:9092 \
    --topic ysx_mob_log_clean
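
The same rebuild can be done programmatically. The following Java sketch assumes props carries bootstrap.servers, a group.id, auto.offset.reset=earliest, and String serializers/deserializers; the empty-poll stop condition is deliberately naive and only suitable for a one-off migration:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import java.time.Duration;
import java.util.List;

// Copy every record that does not contain BAD_DATA into the clean Topic
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    consumer.subscribe(List.of("ysx_mob_log"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
        if (records.isEmpty()) break; // naive end-of-log detection
        for (ConsumerRecord<String, String> r : records) {
            if (r.value() != null && !r.value().contains("BAD_DATA")) {
                producer.send(new ProducerRecord<>("ysx_mob_log_clean", r.key(), r.value()));
            }
        }
    }
    producer.flush();
}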

Method 2: Manually delete records by Offset (high risk)

// Use AdminClient to delete messages below a given Offset (Java example)
try (AdminClient admin = AdminClient.create(props)) {
    Map<TopicPartition, RecordsToDelete> records = new HashMap<>();
    records.put(new TopicPartition("ysx_mob_log", 0), RecordsToDelete.beforeOffset(100L));
    admin.deleteRecords(records).all().get(); // Deletes messages with Offset < 100 in Partition 0
}
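
To confirm what was actually removed, the DeleteRecordsResult returned by deleteRecords also exposes the new per-partition low watermark; a short sketch continuing the example above:

// Optional verification: print each partition's new low watermark
DeleteRecordsResult result = admin.deleteRecords(records);
result.lowWatermarks().forEach((tp, future) -> {
    try {
        System.out.println(tp + " low watermark is now " + future.get().lowWatermark());
    } catch (Exception e) {
        e.printStackTrace();
    }
});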

Part 2: Handling Consumer-Side Format Exceptions

2.1 Common error scenarios

Deserialization failure: the message format does not match the Deserializer configured on the consumer (a recovery sketch follows this list).

Data pollution: producers write illegal data (such as non-JSON strings).

Schema conflict: an Avro/Protobuf schema change is not backward compatible.
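
For the first scenario, a consumer built on the Java client (2.8+) can catch RecordDeserializationException, which carries the failing partition and offset, and seek past the bad record. A minimal sketch, where process() stands in for a hypothetical business handler:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.RecordDeserializationException;
import java.time.Duration;

// Skip a record that fails deserialization instead of crashing the poll loop
while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> r : records) {
            process(r); // hypothetical handler
        }
    } catch (RecordDeserializationException e) {
        consumer.seek(e.topicPartition(), e.offset() + 1); // jump over the poisoned record
    }
}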

2.2 Solutions

Solution 1: Skip the error messages

kafka-console-consumer.sh \
  --bootstrap-server kafka-server:9092 \
  --topic ysx_mob_log \
  --formatter "kafka.tools.DefaultMessageFormatter" \
  --property print.key=true \
  --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --skip-message-on-error  # Key parameter

Solution 2: Custom deserialization logic (Java)

import org.apache.kafka.common.serialization.Deserializer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SafeDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            return new String(data, StandardCharsets.UTF_8);
        } catch (Exception e) {
            System.err.println("Bad message: " + Arrays.toString(data));
            return null; // Returning null lets the consumer skip the message
        }
    }
}

// Consumer configuration
props.put("value.deserializer", SafeDeserializer.class.getName());

Solution 3: Fix the producer's data format

// The producer ensures it writes legal JSON
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(new MyData(...)); // Serialize with Jackson
producer.send(new ProducerRecord<>("ysx_mob_log", json));

Part 3: A Complete Practical Case

Scenario

Topic: ysx_mob_log

Problem: Some messages contain binary (non-JSON) data, which causes errors during consumption.

Goal: Clean up the illegal messages and fix the consumer side.

Steps

1. Identify the Offset of the error message

kafka-console-consumer.sh \
  --bootstrap-server kafka-server:9092 \
  --topic ysx_mob_log \
  --property print.offset=true \
  --property print.key=false \
  --offset 0 --partition 0
# Example output: offset=100, value=[B@1a2b3c4d
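
If the console tooling is unavailable, the same scan can be done in code. The sketch below assumes a hypothetical scanProps configured with bootstrap.servers and a ByteArrayDeserializer for values; it prints the offsets of payloads that do not start with '{' (a single poll is shown for brevity; in practice, loop until caught up):

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.List;

// Find offsets of non-JSON payloads in partition 0
TopicPartition tp = new TopicPartition("ysx_mob_log", 0);
try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(scanProps)) {
    consumer.assign(List.of(tp));
    consumer.seekToBeginning(List.of(tp));
    for (ConsumerRecord<String, byte[]> r : consumer.poll(Duration.ofSeconds(5))) {
        byte[] v = r.value();
        if (v == null || v.length == 0 || v[0] != '{') {
            System.out.printf("suspect offset=%d%n", r.offset());
        }
    }
}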

2. Rebuild the Topic to filter out illegal data

# Python consumer filters out binary data
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'ysx_mob_log',
    bootstrap_servers='kafka-server:9092',
    value_deserializer=lambda x: x.decode('utf-8') if x.startswith(b'{') else None
)
for msg in consumer:
    if msg.value:
        print(msg.value)  # Only handle legal JSON

3. Fix the producer code

// The producer enforces data format validation
public void sendToKafka(String data) {
    try {
        new ObjectMapper().readTree(data); // Verify that the payload is legal JSON
        producer.send(new ProducerRecord<>("ysx_mob_log", data));
    } catch (Exception e) {
        log.error("Invalid JSON: {}", data);
    }
}

Summary

Question Type | Recommended Solution | Key Tools/Code
Delete specific messages | Log Compaction or Rebuild Topic | AdminClient.deleteRecords()
Consumer-side format exceptions | Custom deserializer or skip messages | SafeDeserializer, --skip-message-on-error
Data source governance | Add validation logic in the producer | Jackson serialization, Schema Registry

Core principles:

  • The immutable log is the cornerstone of Kafka; prefer solving problems by rebuilding the data stream or filtering logically.
  • Use delete-records with caution in production environments, as it can break data consistency.
  • A Schema Registry (e.g., with Avro) is recommended to avoid format conflicts.

This concludes the article on best practices for Apache Kafka message management and exception handling. For more on Kafka message management and exception handling, please search my previous articles or continue browsing the related articles below. I hope you will continue to support me!