Introduction
As the core component of a distributed stream processing platform, Apache Kafka is widely used in real-time data pipelines, log aggregation, and event-driven architectures. In practice, however, developers often run into problems such as messages that are hard to clean up and malformed data that breaks consumers. Drawing on real cases, this article systematically walks through best practices for Kafka message management and exception handling, covering:
- How to delete or modify Kafka messages
- How to fix consumer-side errors caused by data format mismatches
- Java/Python code samples and a command-line operation guide
Part 1: Kafka Message Management – Delete and Modify
1.1 The message immutability principle
At Kafka's core is the immutable log: once written, a message cannot be modified or deleted in place. The same effect can, however, be achieved indirectly:
Method | Principle | Applicable scenario | Code/Command example |
---|---|---|---|
Log Compaction | Keep only the latest message for each key | Logical deletion | `cleanup.policy=compact` + send a new message to overwrite |
Rebuild Topic | Filter the data and write it to a new Topic | Physical deletion required | `kafka-console-consumer` + `grep` + `kafka-console-producer` |
Adjust Retention | Shorten the retention time to trigger automatic cleanup | Quickly clean the entire Topic | `--alter --add-config retention.ms=1000` (see the sketch below) |
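The retention adjustment in the last row is normally issued via `kafka-configs.sh --alter`, but it can also be done programmatically. Below is a minimal AdminClient sketch, assuming the example topic from this article; the 7-day value used when restoring retention is an assumption, substitute your topic's real setting.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class AdjustRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-server:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "ysx_mob_log");

            // Step 1: shorten retention to 1s so the broker deletes old segments
            Map<ConfigResource, Collection<AlterConfigOp>> shorten = Map.of(topic, List.of(
                    new AlterConfigOp(new ConfigEntry("retention.ms", "1000"), AlterConfigOp.OpType.SET)));
            admin.incrementalAlterConfigs(shorten).all().get();

            // ... wait for the log cleaner to delete the old segments ...

            // Step 2: restore retention (7 days here; use your topic's real value)
            Map<ConfigResource, Collection<AlterConfigOp>> restore = Map.of(topic, List.of(
                    new AlterConfigOp(new ConfigEntry("retention.ms", "604800000"), AlterConfigOp.OpType.SET)));
            admin.incrementalAlterConfigs(restore).all().get();
        }
    }
}
```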
1.1.1 Log Compaction Example
```java
// Producer: send a message with a key; a later message with the same key
// supersedes the old value once compaction runs
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-server:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("ysx_mob_log", "key1", "new_value")); // overwrites the old message for key1
producer.close();
```
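Overwriting by key only works if the topic is actually compacted. As a minimal sketch, here is how such a topic could be created with AdminClient; the partition and replication counts are illustrative assumptions.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-server:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // cleanup.policy=compact tells the broker to keep only the latest value per key
            NewTopic topic = new NewTopic("ysx_mob_log", 3, (short) 1)
                    .configs(Map.of("cleanup.policy", "compact"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```

Under compaction, sending a record with a null value (a tombstone) for a key logically deletes that key once the log cleaner runs.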
1.2 Two ways to physically delete messages
Method 1: Rebuild Topic
```bash
# Consume the original Topic, filter out the bad records, and write them to a new Topic
kafka-console-consumer.sh \
  --bootstrap-server kafka-server:9092 \
  --topic ysx_mob_log \
  --from-beginning \
  | grep -v "BAD_DATA" \
  | kafka-console-producer.sh \
      --bootstrap-server kafka-server:9092 \
      --topic ysx_mob_log_clean
```
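For anything beyond a one-off fix, the same rebuild can be done programmatically, which makes the filtering rule testable. Below is a minimal Java sketch, assuming the topics above and the same substring filter as the `grep`; the group id is a made-up name for the one-off job.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class RebuildTopic {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-server:9092");
        props.put("group.id", "topic-rebuild");     // made-up group id for the one-off job
        props.put("auto.offset.reset", "earliest"); // start from the beginning of the topic
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            consumer.subscribe(List.of("ysx_mob_log"));
            while (true) { // stop manually once the consumer reaches the end of the topic
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    if (record.value() != null && !record.value().contains("BAD_DATA")) { // same filter as grep -v
                        producer.send(new ProducerRecord<>("ysx_mob_log_clean", record.key(), record.value()));
                    }
                }
            }
        }
    }
}
```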
Method 2: Manually delete records by offset (high risk)
```java
// Use AdminClient to delete records below a given offset (Java example)
try (AdminClient admin = AdminClient.create(props)) {
    Map<TopicPartition, RecordsToDelete> records = new HashMap<>();
    records.put(new TopicPartition("ysx_mob_log", 0), RecordsToDelete.beforeOffset(100L));
    admin.deleteRecords(records).all().get(); // deletes messages with offset < 100 in partition 0
}
```
Part 2: Consumer format exception handling
2.1 Common error scenarios
- Deserialization failure: the message format does not match the Deserializer configured on the consumer (see the recovery sketch after this list).
- Data pollution: producers write invalid data (such as non-JSON strings).
- Schema conflict: an Avro/Protobuf schema change is not backward compatible.
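For the first scenario, note that with a strict deserializer the failure surfaces when `poll()` is called. On Kafka clients 2.8+ it is thrown as a `RecordDeserializationException` carrying the offending partition and offset, so the consumer can seek past the bad record. A minimal recovery sketch (the consumer setup itself is omitted):

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;

import java.time.Duration;
import java.util.List;

public class SkipBadRecords {
    static void consume(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(List.of("ysx_mob_log"));
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.println(r.value()));
            } catch (RecordDeserializationException e) {
                // poll() stops at the record it cannot decode; seek one past it and continue
                consumer.seek(e.topicPartition(), e.offset() + 1);
            }
        }
    }
}
```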
2.2 Solutions
Solution 1: Skip bad messages
```bash
kafka-console-consumer.sh \
  --bootstrap-server kafka-server:9092 \
  --topic ysx_mob_log \
  --formatter "kafka.tools.DefaultMessageFormatter" \
  --property print.key=true \
  --property key.separator=, \
  --skip-message-on-error   # the key parameter: skip records the formatter cannot handle
```
Solution 2: Custom deserialization logic (Java)
```java
public class SafeDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            return new String(data, StandardCharsets.UTF_8);
        } catch (Exception e) {
            System.err.println("Bad message: " + Arrays.toString(data));
            return null; // the consuming code can then skip null values
        }
    }
}

// Consumer configuration
props.put("value.deserializer", SafeDeserializer.class.getName());
```
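A caveat: returning null from the deserializer does not make Kafka drop the record; the record is still delivered with a null value, so the consuming loop has to skip it. A minimal sketch, where `handle()` is a hypothetical processing method:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.List;

public class NullSkippingConsumer {
    static void consume(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(List.of("ysx_mob_log"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                if (record.value() == null) {
                    continue; // SafeDeserializer turned a bad payload into null, skip it
                }
                handle(record.value());
            }
        }
    }

    static void handle(String value) { /* application logic goes here */ }
}
```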
Solution 3: Fix the data format at the producer

```java
// The producer makes sure only valid JSON is written
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(new MyData(...)); // serialize with Jackson
producer.send(new ProducerRecord<>("ysx_mob_log", json));
```
Part 3: A complete practical case
Scenario
- Topic: ysx_mob_log
- Problem: some messages contain binary (non-JSON) data that causes errors during consumption.
- Goal: clean up the invalid messages and fix the consumer side.
Steps
1. Identify the offsets of the bad messages

```bash
kafka-console-consumer.sh \
  --bootstrap-server kafka-server:9092 \
  --topic ysx_mob_log \
  --property print.offset=true \
  --property print.timestamp=false \
  --offset 0 --partition 0
# Example output: offset=100, value=[B@1a2b3c4d
```
2. Rebuild the Topic and filter out the invalid data

```python
# Python consumer that filters out binary (non-JSON) data
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'ysx_mob_log',
    bootstrap_servers='kafka-server:9092',
    value_deserializer=lambda x: x.decode('utf-8') if x.startswith(b'{') else None
)

for msg in consumer:
    if msg.value:
        print(msg.value)  # only valid JSON reaches this point
```
3. Fix the producer code
```java
// The producer validates the data format before sending
public void sendToKafka(String data) {
    try {
        new ObjectMapper().readTree(data); // throws if the string is not valid JSON
        producer.send(new ProducerRecord<>("ysx_mob_log", data));
    } catch (Exception e) {
        log.error("Invalid JSON: {}", data);
    }
}
```
Summary

Problem type | Recommended plan | Key tools/code |
---|---|---|
Delete specific messages | Log Compaction or rebuild the Topic | `cleanup.policy=compact`, `deleteRecords()` |
Consumer format exception | Custom deserializer or skip bad messages | `SafeDeserializer`, `--skip-message-on-error` |
Data source governance | Add validation logic in the producer | Jackson serialization, Schema Registry |
Core principles:
- The immutable log is Kafka's cornerstone; prefer to solve problems by rebuilding the data stream or by logical filtering.
- Use delete-records with caution in production, as it may destroy data consistency.
- A Schema Registry (for example with Avro) is recommended to avoid format conflicts; a configuration sketch follows.
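As a closing illustration, here is a minimal sketch of pointing a producer at Confluent Schema Registry; it assumes the `kafka-avro-serializer` dependency is on the classpath, and the registry URL is a placeholder.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

import java.util.Properties;

public class AvroProducerSetup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-server:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The Avro serializer checks every record against the registered schema,
        // so incompatible producers fail fast instead of polluting the topic
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://schema-registry:8081"); // placeholder address

        Producer<String, Object> producer = new KafkaProducer<>(props);
        // producer.send(...) with an Avro GenericRecord or a generated class
        producer.close();
    }
}
```

With the registry enforcing compatibility rules, incompatible schema changes are rejected at produce time instead of breaking consumers later.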