
SpringBoot: Sending Logback Logs to Kafka

This article shows how to send Logback logs from a Spring Boot application to Kafka by creating a custom Appender.

Configure the Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version> <!-- any 2.2.x release should work -->
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId> <!-- adjust to your own groupId -->
    <artifactId>log2kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>log2kafka</name>
    <description>Demo project for sending logs to kafka</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- logback -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

It is not recommended to change the Kafka version in the POM, as version mismatches easily cause all kinds of errors.

Create a Kafka utility class that configures the producer

package com.example.log2kafka; // package name assumed; use your own

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class KafkaUtil {

    public static Producer<String, String> createProducer(
            String bootstrapServers, String batchSize, String lingerMs,
            String compressionType, String retries, String maxRequestSize) {
        // When a configuration item is missing, Logback leaves it as
        // "<name>_IS_UNDEFINED"; fall back to the default value in that case
        if (bootstrapServers == null) {
            bootstrapServers = "localhost:9092";
        }
        if (batchSize == null || batchSize.contains("IS_UNDEFINED")) {
            batchSize = "50000";
        }
        if (lingerMs == null || lingerMs.contains("IS_UNDEFINED")) {
            lingerMs = "60000";
        }
        if (retries == null || retries.contains("IS_UNDEFINED")) {
            retries = "3";
        }
        if (maxRequestSize == null || maxRequestSize.contains("IS_UNDEFINED")) {
            maxRequestSize = "5242880";
        }

        Properties properties = new Properties();
        // Kafka address; cluster nodes are separated by commas
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // acks values:
        // 0: kafka returns no acknowledgement and does not guarantee the record was
        //    received; since nothing is returned, the retry mechanism cannot take effect
        // 1: the partition leader confirms the record was written to its log, but does
        //    not guarantee the record was replicated (recommended value)
        // all: the leader waits until all replicas are in sync before acknowledging
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        properties.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retries));
        // Batch sending: a send is triggered once the batch reaches this size
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.parseInt(batchSize));
        // Even if the batch size is not reached, data is pushed after this interval
        properties.put(ProducerConfig.LINGER_MS_CONFIG, Integer.parseInt(lingerMs));
        // Configure the send buffer
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        if (compressionType != null && !compressionType.contains("IS_UNDEFINED")) {
            // Specify the compression algorithm
            properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
        }
        // Maximum message size per request
        properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, Integer.parseInt(maxRequestSize));
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }
}
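
With the utility in place, a quick smoke test confirms the producer configuration before wiring it into Logback. This is a minimal sketch; the broker address and topic name are assumptions, so substitute your own:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaUtilSmokeTest {
    public static void main(String[] args) {
        // "IS_UNDEFINED" arguments fall back to the defaults in KafkaUtil
        Producer<String, String> producer = KafkaUtil.createProducer(
                "localhost:9092", "IS_UNDEFINED", "IS_UNDEFINED",
                "IS_UNDEFINED", "IS_UNDEFINED", "IS_UNDEFINED");
        producer.send(new ProducerRecord<>("test-topic", "hello from KafkaUtil"));
        // lingerMs defaults to 60000 ms, so flush to push the record out immediately
        producer.flush();
        producer.close();
    }
}

Next, create the custom Appender that Logback will drive: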

package com.example.log2kafka; // package name assumed; use your own

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.ConsoleAppender;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaAppender extends ConsoleAppender<ILoggingEvent> {

    public static final Logger LOGGER = LoggerFactory.getLogger(KafkaAppender.class);

    private String bootstrapServers;
    private String topic;
    private String batchSize;
    private String lingerMs;
    private String compressionType;
    private String retries;
    private String maxRequestSize;
    private String isSend;

    private Producer<String, String> producer;

    @Override
    public String toString() {
        return "KafkaAppender{" +
                "bootstrapServers='" + bootstrapServers + '\'' +
                ", topic='" + topic + '\'' +
                ", batchSize='" + batchSize + '\'' +
                ", lingerMs='" + lingerMs + '\'' +
                ", compressionType='" + compressionType + '\'' +
                ", retries='" + retries + '\'' +
                ", maxRequestSize='" + maxRequestSize + '\'' +
                ", isSend='" + isSend + '\'' +
                ", producer=" + producer +
                '}';
    }

    @Override
    public void start() {
        super.start();
        if ("true".equals(isSend)) {
            if (producer == null) {
                producer = KafkaUtil.createProducer(bootstrapServers, batchSize,
                        lingerMs, compressionType, retries, maxRequestSize);
            }
        }
    }

    @Override
    public void stop() {
        super.stop();
        if ("true".equals(isSend) && producer != null) {
            producer.close();
        }

        LOGGER.info("Stopping kafkaAppender...");
    }

    @Override
    protected void append(ILoggingEvent eventObject) {
        byte[] byteArray;
        String log;
        // Encode the event with the configured pattern
        byteArray = this.encoder.encode(eventObject);
        log = new String(byteArray);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, log);
        // Only forward unmarked events when sending is enabled
        if (eventObject.getMarker() == null && "true".equals(isSend)) {
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        LOGGER.error("Send log to kafka failed: [{}]", log);
                    }
                }
            });
        }
    }

    public String getBootstrapServers() {
        return bootstrapServers;
    }

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getBatchSize() {
        return batchSize;
    }

    public void setBatchSize(String batchSize) {
        this.batchSize = batchSize;
    }

    public String getLingerMs() {
        return lingerMs;
    }

    public void setLingerMs(String lingerMs) {
        this.lingerMs = lingerMs;
    }

    public String getCompressionType() {
        return compressionType;
    }

    public void setCompressionType(String compressionType) {
        this.compressionType = compressionType;
    }

    public String getRetries() {
        return retries;
    }

    public void setRetries(String retries) {
        this.retries = retries;
    }

    public String getMaxRequestSize() {
        return maxRequestSize;
    }

    public void setMaxRequestSize(String maxRequestSize) {
        this.maxRequestSize = maxRequestSize;
    }

    public Producer<String, String> getProducer() {
        return producer;
    }

    public void setProducer(Producer<String, String> producer) {
        this.producer = producer;
    }

    public String getIsSend() {
        return isSend;
    }

    public void setIsSend(String isSend) {
        this.isSend = isSend;
    }
}

To send logs to Kafka in the format defined by the configured encoder, KafkaAppender directly inherits ConsoleAppender rather than implementing an appender from scratch.
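Logback instantiates the appender from the XML below and binds each child element to the matching setter (<topic> calls setTopic, and so on) before invoking start(). A minimal hand-wired sketch of that lifecycle, useful for testing; the class name and property values here are assumptions:

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import org.slf4j.LoggerFactory;

public class AppenderLifecycleSketch {
    public static void main(String[] args) {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();

        PatternLayoutEncoder encoder = new PatternLayoutEncoder();
        encoder.setContext(context);
        encoder.setPattern("%message%n");
        encoder.start();

        KafkaAppender appender = new KafkaAppender();
        appender.setContext(context);
        appender.setEncoder(encoder);                   // what Logback does for <encoder>
        appender.setBootstrapServers("localhost:9092"); // <bootstrapServers>
        appender.setTopic("test-topic");                // <topic>
        appender.setIsSend("true");                     // <isSend>
        appender.start();                               // creates the producer

        ch.qos.logback.classic.Logger logger = context.getLogger("demo");
        logger.addAppender(appender);
        logger.info("manual wiring works");

        appender.stop();                                // closes the producer
    }
}

The Logback configuration itself (logback-spring.xml):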

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="./logs"/>
    <springProperty scope="context" name="springAppName"
                    source="spring.application.name"/>
    <!-- Read the kafka settings from the configuration file -->
    <springProperty scope="context" name="isSend"
                    source="log.config.kafka.isSend" defaultValue="false"/>
    <springProperty scope="context" name="bootstrapServers"
                    source="log.config.kafka.bootstrapServers" defaultValue="localhost:9092"/>
    <springProperty scope="context" name="topic"
                    source="log.config.kafka.topic" defaultValue="test-topic"/>
    <springProperty scope="context" name="batchSize"
                    source="log.config.kafka.batchSize" defaultValue="1"/>
    <springProperty scope="context" name="lingerMs"
                    source="log.config.kafka.lingerMs" defaultValue="1000"/>
    <springProperty scope="context" name="compressionType"
                    source="log.config.kafka.compressionType" defaultValue="gzip"/>
    <springProperty scope="context" name="retries"
                    source="log.config.kafka.retries" defaultValue="3"/>
    <springProperty scope="context" name="maxRequestSize"
                    source="log.config.kafka.maxRequestSize" defaultValue="5242880"/>
    <!-- Configure as needed -->
    <property name="APP_NAME" value="${springAppName}"/>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>
                {
                "timestamp":"%date{yyyy-MM-dd HH:mm:ss.SSS}",
                "app": "${APP_NAME}",
                "logLevel": "%level",
                "message": "%message"
                }\n
            </pattern>
        </encoder>
    </appender>

    <appender name="KAFKA" class="com.example.log2kafka.KafkaAppender" >
        <!-- The encoder must be configured; it defines the log format -->
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>
{
"timestamp":"%date{yyyy-MM-dd HH:mm:ss.SSS}",
"app": "${APP_NAME}",
"logLevel": "%level",
"message": "%message"
}\n
            </pattern>
        </encoder>
        <bootstrapServers>${bootstrapServers}</bootstrapServers>
        <topic>${topic}</topic>
        <batchSize>${batchSize}</batchSize>
        <lingerMs>${lingerMs}</lingerMs>
        <compressionType>${compressionType}</compressionType>
        <retries>${retries}</retries>
        <maxRequestSize>${maxRequestSize}</maxRequestSize>
        <isSend>${isSend}</isSend>
    </appender>
    <!-- When the log level is DEBUG, use this logger configuration; do not attach the KAFKA appender to root.
         The logger name must match the package of the classes whose logs go to Kafka. -->
    <logger name="com.example.log2kafka" level="DEBUG">
        <appender-ref ref="KAFKA"/>
    </logger>
    <!-- Log output level -->
    <root level="INFO">
        <!-- Console output -->
        <appender-ref ref="STDOUT"/>
    </root>

</configuration>
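
With the KAFKA appender attached to the com.example.log2kafka logger (an assumed package; match it to your own), any class under that package sends its logs to Kafka. A minimal sketch of an application class that exercises it:

package com.example.log2kafka; // assumed package, matching the <logger name> above

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class Log2kafkaApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(Log2kafkaApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(Log2kafkaApplication.class, args);
    }

    // Each request writes one JSON log line; because this class lives under
    // com.example.log2kafka, the KAFKA appender picks it up
    @GetMapping("/hello")
    public String hello() {
        LOGGER.info("hello log2kafka");
        return "ok";
    }
}

Finally, the application configuration (application.yml):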

spring:
  application:
    name: log2kafka
# The log.config.kafka section can be left out when sending to Kafka is not used
log:
  config:
    kafka:
      # Whether to send logs to kafka (true or false); required when enabled
      isSend: true
      # Kafka addresses; required when enabled
      bootstrapServers: 192.168.254.152:9092,192.168.254.156:9092
      # The topic the logs are sent to; required when enabled
      topic: test-topic
      # Batch size: a send is triggered once the batch reaches this size
      batchSize: 5
      # Send after this interval (in milliseconds) even if batchSize is not reached
      lingerMs: 1000
      # Data compression type
#      compressionType: gzip
      # Retry count
#      retries: 3
      # Maximum message size, 5 MB here
#      maxRequestSize: 5242880
server:
  port: 9090
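
To verify that log lines actually arrive, a minimal consumer can read the topic back. This is a sketch: the broker addresses and topic mirror the YAML above, and the group id is an arbitrary choice:

package com.example.log2kafka; // assumed package

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LogConsumerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.254.152:9092,192.168.254.156:9092");
        props.put("group.id", "log-check"); // arbitrary consumer group
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));
        while (true) {
            // Print every JSON log line the appender pushed to the topic
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}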

Summary

The above is based on my personal experience. I hope it can serve as a useful reference for you.