Spring Boot: Sending Logback Output to Kafka
This article shows how to send Logback log output to Kafka from a Spring Boot application by writing a custom Appender.
Configure the required Maven dependencies
<?xml version="1.0" encoding="UTF-8"?> <!-- NOTE(review): the namespace URIs, every empty groupId, the tail of the parent version and the name of the Java version property appear to have been stripped from this listing; restore them before using this POM. --> <project xmlns="/POM/4.0.0" xmlns:xsi="http:///2001/XMLSchema-instance" xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0."> <modelVersion>4.0.0</modelVersion> <parent> <groupId></groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.</version> <relativePath/> <!-- empty relativePath: resolve the parent from the repository, not from a local path --> </parent> <groupId></groupId> <artifactId>log2kafka</artifactId> <version>0.0.1-SNAPSHOT</version> <name>log2kafka</name> <description>Demo project for send log to kafka</description> <properties> <>1.8</> </properties> <dependencies> <dependency> <groupId></groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId></groupId> <artifactId>fastjson</artifactId> <version>1.2.60</version> </dependency> <dependency> <groupId></groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId></groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <!-- Logback logging implementation (pinned to 1.2.3) --> <dependency> <groupId></groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> <!-- Kafka library (Scala 2.11 build, 0.10.1.1); its slf4j-log4j12 binding is excluded --> <dependency> <groupId></groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.1.1</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <groupId></groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Changing the Kafka version declared in the pom file is not recommended, because a mismatched version can easily cause a variety of errors.
Create a Kafka utility class that configures and creates the producer
package .; import ; import ; import ; import ; public class KafkaUtil { public static Producer<String, String> createProducer( String bootstrapServers, String batchSize, String lingerMs, String compressionType, String retries, String maxRequestSize) { // When the configuration item is IS_UNDEFINED, use the default value if (bootstrapServers == null) { bootstrapServers = "localhost:9092"; } if (("IS_UNDEFINED")) { batchSize = "50000"; } if (("IS_UNDEFINED")) { lingerMs = "60000"; } if (("IS_UNDEFINED")) { retries = "3"; } if (("IS_UNDEFINED")) { maxRequestSize = "5242880"; } Properties properties = new Properties(); // kafka address, clusters are separated by commas (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // acks value: // 0: kafka does not return confirmation information, and does not guarantee whether record is received. Because it does not return, the retry mechanism will not take effect // 1: The partition leader confirms that the record is written to the log, but does not guarantee that the information is copied correctly (it is recommended to set to this value) // all: the leader will wait for all information to be synchronized and return to the confirmation information (ProducerConfig.ACKS_CONFIG, "1"); (ProducerConfig.RETRIES_CONFIG, (retries)); // Batch sending, triggering the sending mechanism when the batch size maximum value is reached (batch sending after 10.0) (ProducerConfig.BATCH_SIZE_CONFIG, (batchSize)); // This configuration means that when the number is not reached, data will also be pushed within the specified time. 
(ProducerConfig.LINGER_MS_CONFIG, (lingerMs)); // Configure cache (ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); if (!("IS_UNDEFINED")) { // Specify compression algorithm (ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType); } // Message size for each request (ProducerConfig.MAX_REQUEST_SIZE_CONFIG, (maxRequestSize)); (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ""); (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ""); return new KafkaProducer<String, String>(properties); } }
package .; import ; import ; import .; import ; import ; import ; import ; import org.; import org.; public class KafkaAppender extends ConsoleAppender<ILoggingEvent> { public static final Logger LOGGER = (); private String bootstrapServers; private String topic; private String batchSize; private String lingerMs; private String compressionType; private String retries; private String maxRequestSize; private String isSend; private Producer<String, String> producer; @Override public String toString() { return "KafkaAppender{" + "bootstrapServers='" + bootstrapServers + '\'' + ", topic='" + topic + '\'' + ", batchSize='" + batchSize + '\'' + ", lingerMs='" + lingerMs + '\'' + ", compressionType='" + compressionType + '\'' + ", retries='" + retries + '\'' + ", maxRequestSize='" + maxRequestSize + '\'' + ", isSend='" + isSend + '\'' + ", producer=" + producer + '}'; } @Override public void start() { (); if ("true".equals()) { if (producer == null) { producer = (, , , , , ); } } } @Override public void stop() { (); if ("true".equals()) { (); } (, "Stopping kafkaAppender..."); } @Override protected void append(ILoggingEvent eventObject) { byte[] byteArray; String log; // Decode the log format byteArray = (eventObject); log = new String(byteArray); ProducerRecord<String, String> record = new ProducerRecord<>(, log); if (() == null && "true".equals()) { (record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { (, "Send log to kafka failed: [{}]", log); } } }); } } public String getBootstrapServers() { return bootstrapServers; } public void setBootstrapServers(String bootstrapServers) { = bootstrapServers; } public String getTopic() { return topic; } public void setTopic(String topic) { = topic; } public String getBatchSize() { return batchSize; } public void setBatchSize(String batchSize) { = batchSize; } public String getLingerMs() { return lingerMs; } public void setLingerMs(String lingerMs) { = 
lingerMs; } public String getCompressionType() { return compressionType; } public void setCompressionType(String compressionType) { = compressionType; } public String getRetries() { return retries; } public void setRetries(String retries) { = retries; } public String getMaxRequestSize() { return maxRequestSize; } public void setMaxRequestSize(String maxRequestSize) { = maxRequestSize; } public Producer<String, String> getProducer() { return producer; } public void setProducer(Producer<String, String> producer) { = producer; } public String getIsSend() { return isSend; } public void setIsSend(String isSend) { = isSend; } }
So that the logs sent to Kafka follow the configured format, the custom appender directly extends `ConsoleAppender` and reuses its encoder.
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="./logs"/>
    <springProperty scope="context" name="springAppName" source="spring.application.name"/>

    <!-- Kafka settings read from the Spring configuration file.
         The attribute must be spelled "defaultValue"; a misspelled attribute means no
         default is applied and the variable resolves to "<name>_IS_UNDEFINED". -->
    <springProperty scope="context" name="isSend" source="log.config.kafka.isSend" defaultValue="false"/>
    <springProperty scope="context" name="bootstrapServers" source="log.config.kafka.bootstrapServers" defaultValue="localhost:9092"/>
    <springProperty scope="context" name="topic" source="log.config.kafka.topic" defaultValue="test-topic"/>
    <springProperty scope="context" name="batchSize" source="log.config.kafka.batchSize" defaultValue="1"/>
    <springProperty scope="context" name="lingerMs" source="log.config.kafka.lingerMs" defaultValue="1000"/>
    <springProperty scope="context" name="compressionType" source="log.config.kafka.compressionType" defaultValue="gzip"/>
    <springProperty scope="context" name="retries" source="log.config.kafka.retries" defaultValue="3"/>
    <springProperty scope="context" name="maxRequestSize" source="log.config.kafka.maxRequestSize" defaultValue="5242880"/>

    <!-- Configure as needed -->
    <property name="APP_NAME" value="${springAppName}"/>

    <!-- NOTE(review): the seconds part of the date pattern was lost in extraction;
         "ss.SSS" is assumed in both patterns below. Confirm the intended precision. -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern> { "timestamp":"%date{yyyy-MM-dd HH:mm:ss.SSS}", "app": "${APP_NAME}", "logLevel": "%level", "message": "%message" }\n </pattern>
        </encoder>
    </appender>

    <!-- NOTE(review): the package of the custom appender was lost in extraction;
         "com.example.log2kafka" is a placeholder here and in the logger below.
         Replace it with the real package of your KafkaAppender class. -->
    <appender name="KAFKA" class="com.example.log2kafka.KafkaAppender">
        <!-- The encoder must be configured; it defines the log format sent to Kafka -->
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern> { "timestamp":"%date{yyyy-MM-dd HH:mm:ss.SSS}", "app": "${APP_NAME}", "logLevel": "%level", "message": "%message" }\n </pattern>
        </encoder>
        <bootstrapServers>${bootstrapServers}</bootstrapServers>
        <topic>${topic}</topic>
        <batchSize>${batchSize}</batchSize>
        <lingerMs>${lingerMs}</lingerMs>
        <compressionType>${compressionType}</compressionType>
        <retries>${retries}</retries>
        <maxRequestSize>${maxRequestSize}</maxRequestSize>
        <isSend>${isSend}</isSend>
    </appender>

    <!-- When the Kafka appender is used at DEBUG level, attach it through this
         application logger; do not attach it to root. -->
    <logger name="com.example.log2kafka" level="DEBUG">
        <appender-ref ref="KAFKA"/>
    </logger>

    <!-- Log output level -->
    <root level="INFO">
        <!-- For console output -->
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
spring:
  application:
    name: log2kafka

# The whole "log" section may be omitted when Kafka logging is not used
log:
  config:
    kafka:
      # Whether to send logs to Kafka (true or false); required when Kafka logging is used
      isSend: true
      # Kafka broker addresses, comma-separated; required when Kafka logging is used
      bootstrapServers: 192.168.254.152:9092,192.168.254.156:9092
      # Topic the logs are sent to; required when Kafka logging is used
      topic: test-topic
      # Batch size passed to the producer's batch.size setting, which Kafka measures in
      # bytes (not in number of messages); a batch is sent once it reaches this size
      batchSize: 5
      # Maximum wait in milliseconds; a batch is sent after this interval even if it
      # has not reached the batch size
      lingerMs: 1000
      # Data compression type
      # compressionType: gzip
      # Retry times
      # retries: 3
      # Maximum message size, set to 5M here
      # maxRequestSize: 5242880

server:
  port: 9090
Summary
The above is based on my personal experience. I hope it serves as a useful reference, and I appreciate your continued support.