SoFunction
Updated on 2025-03-03

Detailed explanation of Spring Boot's support for Apache Pulsar

/spring-boot/docs/3.2.0/reference/htmlsingle/#

Spring Boot supports Apache Pulsar by providing auto-configuration for the Spring for Apache Pulsar project.

When spring-pulsar is on the classpath, Spring Boot auto-configures and registers the classic (imperative) Spring for Apache Pulsar components. When spring-pulsar-reactive is on the classpath, it does the same for the reactive components.

The spring-boot-starter-pulsar and spring-boot-starter-pulsar-reactive "Starters" conveniently collect the dependencies for imperative and reactive use, respectively.

Connect to Pulsar

When you use the Pulsar starter, Spring Boot auto-configures and registers a PulsarClient bean.

By default, the application tries to connect to a local Pulsar instance at pulsar://localhost:6650. To change this, set the spring.pulsar.client.service-url property to a different value.
Note: The value must be a valid Pulsar protocol URL.
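
As a rough illustration of what a "valid Pulsar protocol URL" looks like, the sketch below checks the scheme and host of a candidate value using only the JDK. The class and method names are hypothetical, and the check is an assumption-laden approximation: it covers single-host URLs only and is not the validation that the Pulsar client itself performs.

```java
import java.net.URI;
import java.util.Set;

public class PulsarServiceUrlCheck {

    // Schemes of the Pulsar binary protocol: plaintext and TLS.
    private static final Set<String> SCHEMES = Set.of("pulsar", "pulsar+ssl");

    /** Returns true if the value looks like a single-host Pulsar protocol URL. */
    public static boolean looksValid(String serviceUrl) {
        try {
            URI uri = URI.create(serviceUrl);
            return uri.getScheme() != null
                    && SCHEMES.contains(uri.getScheme())
                    && uri.getHost() != null;
        } catch (IllegalArgumentException ex) {
            return false; // not parseable as a URI at all
        }
    }

    public static void main(String[] args) {
        System.out.println(looksValid("pulsar://localhost:6650"));
        System.out.println(looksValid("http://localhost:8080"));
    }
}
```

An HTTP URL such as the admin endpoint is rejected here because it does not use the pulsar or pulsar+ssl scheme.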

You can configure the client by specifying any of the spring.pulsar.client.* prefixed application properties.
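
A minimal application.properties sketch of such client settings might look like the following. The host name and timeout value are placeholders chosen for illustration, not values taken from the original article.

```properties
# Hypothetical broker address; replace with your own
spring.pulsar.client.service-url=pulsar://pulsar.example.com:6650
# Illustrative timeout value
spring.pulsar.client.connection-timeout=10s
```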

If you need more control over the PulsarClient configuration, consider registering one or more PulsarClientBuilderCustomizer beans.

Authentication

To connect to a Pulsar cluster that requires authentication, specify which authentication plugin to use by setting pluginClassName, along with any parameters the plugin requires. The parameters can be set as a map of parameter names to parameter values. The following example shows how to configure the AuthenticationOAuth2 plugin.

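spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param[issuerUrl]=/
spring.pulsar.client.authentication.param[privateKey]=file:///Users/
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance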

Note
Make sure that the names defined under spring.pulsar.client.authentication.param.* exactly match those expected by the authentication plugin (typically camel case). Spring Boot does not attempt any form of relaxed binding for these entries.

For example, to configure the issuer URL for the AuthenticationOAuth2 plugin, you must use spring.pulsar.client.authentication.param.issuerUrl. If you use another form, such as issuerurl or issuer-url, the setting is not applied to the plugin.

SSL

By default, Pulsar clients communicate with Pulsar services in plaintext. The following section describes how to configure Pulsar clients to use TLS encryption (SSL). A prerequisite is that the broker has also been configured to use TLS encryption.

Spring Boot auto-configuration does not currently support any TLS/SSL configuration properties. Instead, you can provide a PulsarClientBuilderCustomizer that sets the required properties on the Pulsar client builder. Pulsar supports two certificate formats: Privacy Enhanced Mail (PEM) and Java KeyStore (JKS).

Follow these steps to configure TLS:

  • Adjust the Pulsar client service URL to use the pulsar+ssl:// scheme and the TLS port (typically 6651).
  • Adjust the admin client service URL to use the https:// scheme and the TLS web port (typically 8443).
  • Provide a client builder customizer that sets the relevant properties on the builder.
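
The first two steps can be sketched in application.properties as follows. The host name is a placeholder, and the ports are the typical TLS ports mentioned above; the certificate settings from the third step are not shown because they go into the customizer rather than into properties.

```properties
# Hypothetical host; 6651 is the typical TLS port of the broker
spring.pulsar.client.service-url=pulsar+ssl://broker.example.com:6651
# Hypothetical host; 8443 is the typical TLS web port
spring.pulsar.admin.service-url=https://broker.example.com:8443
```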

Connect to Pulsar reactively

When the reactive auto-configuration is activated, Spring Boot auto-configures and registers a ReactivePulsarClient bean.

Connect to Pulsar administration

Spring for Apache Pulsar's PulsarAdministration client is also auto-configured.

By default, the application tries to connect to a local Pulsar instance at http://localhost:8080. To change this, set the spring.pulsar.admin.service-url property to a different value of the form (http|https)://<host>:<port>.

If you need more control over the PulsarAdmin configuration, consider registering one or more PulsarAdminBuilderCustomizer beans.

Authentication

When accessing a Pulsar cluster that requires authentication, the admin client requires the same security configuration as the regular Pulsar client. You can reuse the authentication configuration described above by replacing spring.pulsar.client.authentication with spring.pulsar.admin.authentication.
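
As a sketch, the OAuth2 settings for the admin client would then use the admin prefix. All values below are hypothetical placeholders, not settings from the original article.

```properties
spring.pulsar.admin.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
# Placeholder values; parameter names must match the plugin exactly (camel case)
spring.pulsar.admin.authentication.param[issuerUrl]=https://auth.example.com/
spring.pulsar.admin.authentication.param[privateKey]=file:///path/to/key.json
spring.pulsar.admin.authentication.param.audience=urn:example:my-instance
```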

Tip: To create a topic on startup, add a bean of type PulsarTopic. If the topic already exists, the bean is ignored.

Send a message

Spring's PulsarTemplate is auto-configured, and you can use it to send messages as follows:

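import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
    private final PulsarTemplate<String> pulsarTemplate;
    public MyBean(PulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }
    public void someMethod() throws PulsarClientException {
        // Send the payload "Hello" to the topic "someTopic"
        this.pulsarTemplate.send("someTopic", "Hello");
    }
}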

PulsarTemplate relies on a PulsarProducerFactory to create the underlying Pulsar producer. Spring Boot auto-configuration also provides this producer factory, which by default caches the producers it creates. You can configure the producer factory and cache settings by specifying any of the spring.pulsar.producer.* and spring.pulsar.producer.cache.* prefixed application properties.
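
For illustration, a producer and cache configuration might look like the sketch below. The values are arbitrary examples rather than recommendations.

```properties
# Illustrative producer settings
spring.pulsar.producer.send-timeout=10s
spring.pulsar.producer.batching-enabled=true
# Illustrative settings for the producer cache
spring.pulsar.producer.cache.expire-after-access=5m
spring.pulsar.producer.cache.maximum-size=500
```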

If you need more control over the producer factory configuration, consider registering one or more ProducerBuilderCustomizer beans. These customizers are applied to all created producers. You can also pass in a ProducerBuilderCustomizer when sending a message to affect only the current producer.

If you need more control over the message being sent, you can pass in a TypedMessageBuilderCustomizer when sending a message.

Send a message reactively

When the reactive auto-configuration is activated, Spring's ReactivePulsarTemplate is auto-configured, and you can use it to send messages as follows:

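import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
    private final ReactivePulsarTemplate<String> pulsarTemplate;
    public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }
    public void someMethod() {
        // Nothing is sent until the returned publisher is subscribed to
        this.pulsarTemplate.send("someTopic", "Hello").subscribe();
    }
}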

ReactivePulsarTemplate relies on a ReactivePulsarSenderFactory to create the underlying sender. Spring Boot auto-configuration also provides this sender factory, which by default caches the senders it creates. You can configure the sender factory and cache settings by specifying any of the spring.pulsar.producer.* and spring.pulsar.producer.cache.* prefixed application properties.

If you need more control over the sender factory configuration, consider registering one or more ReactiveMessageSenderBuilderCustomizer beans. These customizers are applied to all created senders. You can also pass in a ReactiveMessageSenderBuilderCustomizer when sending a message to affect only the current sender.

If you need more control over the message being sent, you can pass in a MessageSpecBuilderCustomizer when sending a message.

Receive a message

When the Apache Pulsar infrastructure is present, any bean can be annotated with @PulsarListener to create a listener endpoint. The following component creates a listener endpoint on the someTopic topic:

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
    @PulsarListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }
}

Spring Boot auto-configuration provides all the components necessary for PulsarListener, such as the PulsarListenerContainerFactory and the consumer factory it uses to construct the underlying Pulsar consumers. You can configure these components by specifying any of the spring.pulsar.listener.* and spring.pulsar.consumer.* prefixed application properties.
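
A sketch of such listener and consumer settings is shown below. The subscription name is a hypothetical placeholder and the other values are illustrative only.

```properties
# Hypothetical subscription name
spring.pulsar.consumer.subscription.name=my-subscription
spring.pulsar.consumer.subscription.type=shared
# Illustrative listener setting
spring.pulsar.listener.observation-enabled=true
```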

If you need more control over the consumer factory configuration, consider registering one or more ConsumerBuilderCustomizer beans. These customizers are applied to all consumers created by the factory, and therefore to all @PulsarListener instances. You can also customize a single listener by setting the consumerCustomizer attribute of the @PulsarListener annotation.

Receive a message reactively

When the Apache Pulsar infrastructure is present and the reactive auto-configuration is activated, any bean can be annotated with @ReactivePulsarListener to create a reactive listener endpoint. The following component creates a reactive listener endpoint on the someTopic topic:

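import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
    @ReactivePulsarListener(topics = "someTopic")
    public Mono<Void> processMessage(String content) {
        // ...
        return Mono.empty();
    }
}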

Spring Boot auto-configuration provides all the components necessary for ReactivePulsarListener, such as the ReactivePulsarListenerContainerFactory and the consumer factory it uses to construct the underlying reactive Pulsar consumers. You can configure these components by specifying any of the spring.pulsar.listener.* and spring.pulsar.consumer.* prefixed application properties.

If you need more control over the consumer factory configuration, consider registering one or more ReactiveMessageConsumerBuilderCustomizer beans. These customizers are applied to all consumers created by the factory, and therefore to all @ReactivePulsarListener instances. You can also customize a single listener by setting the consumerCustomizer attribute of the @ReactivePulsarListener annotation.

Read a message

The Pulsar reader interface enables applications to manage cursors manually. When you use a reader to connect to a topic, you must specify which message the reader begins reading from when it connects.

When the Apache Pulsar infrastructure is present, any bean can be annotated with @PulsarReader to consume messages using a reader. The following component creates a reader endpoint that starts reading messages from the beginning of the someTopic topic:

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
    @PulsarReader(topics = "someTopic", startMessageId = "earliest")
    public void processMessage(String content) {
        // ...
    }
}

@PulsarReader relies on a PulsarReaderFactory to create the underlying Pulsar reader. Spring Boot auto-configuration provides this reader factory, which can be customized by setting any of the spring.pulsar.reader.* prefixed application properties.

If you need more control over the reader factory configuration, consider registering one or more ReaderBuilderCustomizer beans. These customizers are applied to all readers created by the factory, and therefore to all @PulsarReader instances. You can also customize a single reader by setting the readerCustomizer attribute of the @PulsarReader annotation.

Read a message reactively

When the Apache Pulsar infrastructure is present and the reactive auto-configuration is activated, Spring's ReactivePulsarReaderFactory is provided, and you can use it to create a reader that reads messages reactively. The following component creates a reader using the provided factory and reads a single message from 5 seconds ago from the someTopic topic:

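import java.time.Instant;
import java.util.List;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
    private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;
    public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
        this.pulsarReaderFactory = pulsarReaderFactory;
    }
    public void someMethod() {
        // Start reading at the position that corresponds to 5 seconds ago
        ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
            .topic("someTopic")
            .startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
        Mono<Message<String>> message = this.pulsarReaderFactory
            .createReader(Schema.STRING, List.of(readerBuilderCustomizer))
            .readOne();
        // ...
    }
}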

Spring Boot auto-configuration provides this reader factory, which can be customized by setting any of the spring.pulsar.reader.* prefixed application properties.

If you need more control over the reader factory configuration, consider registering one or more ReactiveMessageReaderBuilderCustomizer beans. These customizers are applied to all created readers. You can also pass in one or more ReactiveMessageReaderBuilderCustomizer instances when using the factory to create a reader, in which case the customizations apply only to that reader.

Additional Pulsar properties

The properties supported by auto-configuration cover only a subset of the properties that Pulsar supports; they are available directly through the PulsarProperties class. If you want to tune the auto-configured components with additional properties that are not directly supported, you can use the customizers supported by each of the aforementioned components.
