Updated on 2025-04-14

Conditional Routing and Filtering in Spring Integration Message Routing

Introduction

In an enterprise integration architecture, message routing distributes messages to different target channels according to predefined rules. Spring Integration, an implementation of the Enterprise Integration Patterns, provides Router components that cover a wide range of routing needs. A router decides where a message goes based on its payload, its headers, or other conditions, so each component of the system can focus on its own core function, which improves modularity and maintainability. This article examines how routers are implemented and used in Spring Integration, with a focus on conditional routing and message filtering, and works through concrete examples.

1. Basic router concepts

Router is one of the core components in Spring Integration. Its job is to send each incoming message to one or more output channels based on specific conditions, which makes it possible to build flexible message flows with dynamic branching of business logic. Spring Integration ships several router implementations, including PayloadTypeRouter, HeaderValueRouter, RecipientListRouter, and ExpressionEvaluatingRouter, and developers choose among them according to their needs. A router receives a message from its input channel, evaluates it against the configured routing rules, and then decides which output channel or channels receive it.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

@Configuration
public class BasicRouterConfig {

    // Define the channels
    @Bean
    public MessageChannel inputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel orderChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inventoryChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel customerChannel() {
        return new DirectChannel();
    }

    // Basic router implementation. @Router is placed on a plain method here;
    // @Bean is not used because the method handles messages rather than creating a bean.
    @Router(inputChannel = "inputChannel")
    public String route(Message<?> message) {
        // Route to a channel according to the payload type
        Object payload = message.getPayload();
        if (payload instanceof Order) {
            return "orderChannel";
        } else if (payload instanceof InventoryItem) {
            return "inventoryChannel";
        } else if (payload instanceof Customer) {
            return "customerChannel";
        } else {
            throw new IllegalArgumentException("Unknown message type: " + payload.getClass().getName());
        }
    }

    // Sample data classes
    public static class Order {
        private String orderId;
        // Other fields are omitted
    }

    public static class InventoryItem {
        private String itemId;
        // Other fields are omitted
    }

    public static class Customer {
        private String customerId;
        // Other fields are omitted
    }
}

2. Conditional routing implementation

Conditional routing sends messages to different target channels based on conditions in the message payload or headers. Spring Integration supports several ways to express it, including SpEL expressions, the Java DSL, and annotation-based configuration. ExpressionEvaluatingRouter takes a SpEL expression as the routing condition, so fairly complex logic can be written concisely. With conditional routing, the processing path of a message is determined by business rules at runtime, for example splitting orders into high-priority and normal-priority handling by order amount, or offering different service levels by customer type.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.router.ExpressionEvaluatingRouter;

@Configuration
public class ConditionalRouterConfig {

    // Conditional router using a SpEL expression.
    // Assumption: the order payload exposes a totalAmount property.
    // The expression yields a boolean, which the mappings below translate into a channel name.
    @Bean
    @Router(inputChannel = "orderInputChannel")
    public ExpressionEvaluatingRouter orderRouter() {
        ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.totalAmount > 1000");
        router.setChannelMapping("true", "vipOrderChannel");
        router.setChannelMapping("false", "regularOrderChannel");
        return router;
    }

    // Configure conditional routing using the Java DSL
    @Bean
    public IntegrationFlow conditionRoutingFlow() {
        return IntegrationFlows
                .from("paymentInputChannel")
                .<Payment, String>route(
                        payment -> {
                            if (payment.getAmount() < 100) {
                                return "smallPaymentChannel";
                            } else if (payment.getAmount() < 1000) {
                                return "mediumPaymentChannel";
                            } else {
                                return "largePaymentChannel";
                            }
                        },
                        // The strings above act as routing keys that select a sub-flow
                        mapping -> mapping
                                .subFlowMapping("smallPaymentChannel", sf -> sf
                                        .handle(message -> {
                                            System.out.println("Processing small payment: " + message.getPayload());
                                        }))
                                .subFlowMapping("mediumPaymentChannel", sf -> sf
                                        .handle(message -> {
                                            System.out.println("Processing medium payment: " + message.getPayload());
                                        }))
                                .subFlowMapping("largePaymentChannel", sf -> sf
                                        .handle(message -> {
                                            System.out.println("Processing large payment: " + message.getPayload());
                                        }))
                )
                .get();
    }

    // Multi-condition routing example. No @Bean here: the method handles messages.
    @Router(inputChannel = "customerInputChannel")
    public String routeCustomer(Customer customer) {
        // Route according to customer type and credit score
        boolean vip = "VIP".equals(customer.getType());
        boolean goodCredit = customer.getCreditScore() > 700;
        if (vip && goodCredit) {
            return "premiumServiceChannel";
        } else if (vip) {
            return "vipServiceChannel";
        } else if (goodCredit) {
            return "priorityServiceChannel";
        } else {
            return "regularServiceChannel";
        }
    }

    // Sample data classes
    public static class Payment {
        private double amount;

        public double getAmount() {
            return amount;
        }
    }

    public static class Customer {
        private String type;
        private int creditScore;

        public String getType() {
            return type;
        }

        public int getCreditScore() {
            return creditScore;
        }
    }
}
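
The payment thresholds in the Java DSL flow above are easy to get wrong at the boundaries. As a minimal sketch, assuming the same thresholds, the decision can be pulled into a pure function and checked without starting a Spring context. The class name PaymentTierRouting and the method channelFor are hypothetical and used only for illustration.

```java
final class PaymentTierRouting {

    // Hypothetical helper: the same thresholds as the DSL lambda, as a pure function.
    static String channelFor(double amount) {
        if (amount < 100) {
            return "smallPaymentChannel";
        } else if (amount < 1000) {
            return "mediumPaymentChannel";
        }
        return "largePaymentChannel";
    }

    public static void main(String[] args) {
        // Values on either side of each threshold
        for (double amount : new double[] {99.99, 100.0, 999.99, 1000.0}) {
            System.out.println(amount + " -> " + channelFor(amount));
        }
    }
}
```

Because both comparisons are strict, an amount of exactly 100 falls in the medium tier and an amount of exactly 1000 falls in the large tier.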

3. Message header-based routing

In enterprise integration scenarios, message headers usually carry important metadata such as message type, priority, and source system, all of which are useful for routing decisions. HeaderValueRouter routes on the value of a single header, which keeps header-based routing configuration simple. Because the decision does not require inspecting the payload, it is cheap to make, and it keeps routing logic separate from business logic. This style of routing suits messages arriving from different systems, or any case where messages must be classified by their metadata.

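import java.util.HashMap;
import java.util.Map;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.router.HeaderValueRouter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.transformer.HeaderEnricher;
import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
import org.springframework.integration.transformer.support.StaticHeaderValueMessageProcessor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

@Configuration
public class HeaderBasedRouterConfig {

    // Message header-based router
    @Bean
    @Router(inputChannel = "requestChannel")
    public HeaderValueRouter messageTypeRouter() {
        HeaderValueRouter router = new HeaderValueRouter("message-type");
        router.setChannelMapping("ORDER", "orderProcessingChannel");
        router.setChannelMapping("INVENTORY", "inventoryManagementChannel");
        router.setChannelMapping("SHIPPING", "shippingChannel");
        router.setChannelMapping("PAYMENT", "paymentProcessingChannel");
        // Default channel, used when the header is missing or its value has no mapping
        router.setDefaultOutputChannelName("unknownMessageChannel");
        // Without this, an unmapped header value raises an error instead of using the default
        router.setResolutionRequired(false);
        return router;
    }

    // Header enricher example: HeaderEnricher expects header value processors, not raw values
    @Bean
    public HeaderEnricher headerEnricher() {
        Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
        headersToAdd.put("message-type", new StaticHeaderValueMessageProcessor<>("ORDER"));
        headersToAdd.put("priority", new StaticHeaderValueMessageProcessor<>("HIGH"));
        return new HeaderEnricher(headersToAdd);
    }

    // Example method for sending messages
    public void sendMessage() {
        // Create messages that carry the routing header
        Message<String> orderMessage = MessageBuilder
                .withPayload("Order Data Content")
                .setHeader("message-type", "ORDER")
                .setHeader("priority", "HIGH")
                .build();
        Message<String> inventoryMessage = MessageBuilder
                .withPayload("Inventory Data Content")
                .setHeader("message-type", "INVENTORY")
                .setHeader("priority", "MEDIUM")
                .build();
        // Send to requestChannel; the router dispatches on the message-type header
        requestChannel().send(orderMessage);
        requestChannel().send(inventoryMessage);
    }

    @Bean
    public MessageChannel requestChannel() {
        return new DirectChannel();
    }
}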

4. Dynamic routing and routing tables

In more complex integration scenarios, routing rules may need to change with runtime conditions, or be defined in configuration files rather than hardcoded. Spring Integration supports this kind of dynamic routing: rules can be modified at runtime or loaded from external configuration. AbstractMappingMessageRouter is the base class for mapping-based routers. It maintains a channel mapping table that can be updated at runtime, for example through setChannelMapping and removeChannelMapping. The system can then adapt to changing business rules without code changes or redeployment, which improves flexibility and maintainability.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.router.AbstractMappingMessageRouter;
import org.springframework.messaging.Message;

@Configuration
public class DynamicRouterConfig {

    // Custom dynamic router. The rule service is injected as a method parameter,
    // which avoids a configuration class that autowires a bean it defines itself.
    @Bean
    @ServiceActivator(inputChannel = "dynamicRoutingChannel")
    public AbstractMappingMessageRouter dynamicRouter(RoutingRuleService routingRuleService) {
        return new AbstractMappingMessageRouter() {
            // getChannelKeys is the abstract hook of AbstractMappingMessageRouter;
            // the base class resolves each returned key to a channel by name.
            @Override
            protected List<Object> getChannelKeys(Message<?> message) {
                // Get the latest routing rules from the service
                Map<String, String> routingRules = routingRuleService.getRoutingRules();
                // Determine the routing key from the message content or headers
                String routingKey = extractRoutingKey(message);
                // Look up the target channel name, falling back to a default
                String channelName = routingKey == null
                        ? "defaultChannel"
                        : routingRules.getOrDefault(routingKey, "defaultChannel");
                return Collections.singletonList(channelName);
            }

            private String extractRoutingKey(Message<?> message) {
                // Simplified: read the key from a specific message header
                return message.getHeaders().get("routing-key", String.class);
            }
        };
    }

    // Routing rule service bean, used to manage and provide routing rules
    @Bean
    public RoutingRuleService routingRuleService() {
        return new RoutingRuleService();
    }

    // Routing rule management service
    public static class RoutingRuleService {
        // Concurrent map: rules are read by the router while being updated at runtime
        private final Map<String, String> routingRules = new ConcurrentHashMap<>();

        public RoutingRuleService() {
            // Initialize the default routing rules
            routingRules.put("ORDER", "orderChannel");
            routingRules.put("INVENTORY", "inventoryChannel");
            routingRules.put("CUSTOMER", "customerChannel");
        }

        public Map<String, String> getRoutingRules() {
            return routingRules;
        }

        public void updateRoutingRule(String key, String channelName) {
            routingRules.put(key, channelName);
        }

        public void loadRoutingRules(Properties properties) {
            properties.forEach((k, v) -> routingRules.put(k.toString(), v.toString()));
        }
    }

    // Routing rule update endpoint. No @Bean here: the method handles messages.
    @ServiceActivator(inputChannel = "routingRuleUpdateChannel")
    public void updateRoutingRule(Message<RoutingRuleUpdate> message) {
        RoutingRuleUpdate update = message.getPayload();
        routingRuleService().updateRoutingRule(update.getKey(), update.getChannelName());
    }

    // Routing rule update request
    public static class RoutingRuleUpdate {
        private String key;
        private String channelName;
        // Getters and setters omitted
    }
}
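
The idea of a routing table that is loaded from external configuration and then changed at runtime can be tried out without any Spring classes. The following is a minimal plain-Java sketch, not the Spring Integration implementation. The class RoutingTableSketch and its methods are hypothetical names, and the properties content is made-up sample data.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

final class RoutingTableSketch {
    private final Map<String, String> rules = new ConcurrentHashMap<>();
    private final String defaultChannel;

    RoutingTableSketch(String defaultChannel) {
        this.defaultChannel = defaultChannel;
    }

    // Load key=channel pairs, for example from a properties file
    void load(Properties properties) {
        for (String key : properties.stringPropertyNames()) {
            rules.put(key, properties.getProperty(key));
        }
    }

    // Change or add a single rule while the application is running
    void update(String key, String channelName) {
        rules.put(key, channelName);
    }

    // Resolve a routing key to a channel name, falling back to the default
    String resolve(String routingKey) {
        if (routingKey == null) {
            return defaultChannel; // ConcurrentHashMap does not accept null keys
        }
        return rules.getOrDefault(routingKey, defaultChannel);
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader("ORDER=orderChannel\nINVENTORY=inventoryChannel\n"));

        RoutingTableSketch table = new RoutingTableSketch("defaultChannel");
        table.load(props);
        System.out.println(table.resolve("ORDER"));
        System.out.println(table.resolve("SHIPPING"));

        table.update("SHIPPING", "shippingChannel");
        System.out.println(table.resolve("SHIPPING"));
    }
}
```

A concurrent map is used because lookups and updates may happen on different threads. The explicit null check matters for messages that arrive without a routing key.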

5. Message filtering and selective routing

Message filtering is a special form of routing that decides whether a message may continue through the flow. Spring Integration's Filter component implements it and can reject messages based on payload or header content. By default a rejected message is silently dropped; configuring a discard channel sends it somewhere else instead. Filters can be used on their own or combined with routers to build more complex logic. For example, when processing order messages, invalid orders can be filtered out first, and the remaining orders can then be routed to different processing channels by type. This selective routing lets the system handle each kind of message in a more targeted way, which improves both processing efficiency and maintainability.

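import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageSelector;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.router.RecipientListRouter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

@Configuration
public class FilterAndSelectiveRoutingConfig {

    // Message filter example. Rejected orders go to the discard channel,
    // where handleInvalidOrder below picks them up.
    @Bean
    @Filter(inputChannel = "unfilteredChannel", outputChannel = "validOrderChannel",
            discardChannel = "invalidOrderChannel")
    public MessageSelector orderValidator() {
        return message -> {
            Order order = (Order) message.getPayload();
            // Verify that the order is valid
            boolean isValid = order.getItems() != null && !order.getItems().isEmpty()
                    && order.getCustomerId() != null
                    && order.getTotalAmount() > 0;
            return isValid;
        };
    }

    // Example combining filtering and routing
    @Bean
    public IntegrationFlow filterAndRouteFlow() {
        return IntegrationFlows
                .from("inputOrderChannel")
                // First drop invalid orders; the Class argument makes the payload type explicit
                .filter(Order.class, Order::isValid)
                // Then route according to the order type
                .<Order, String>route(
                        order -> order.getType(),
                        mapping -> mapping
                                // Let unmapped types fall through to the default sub-flow
                                .resolutionRequired(false)
                                .subFlowMapping("RETAIL", sf -> sf.channel("retailOrderChannel"))
                                .subFlowMapping("WHOLESALE", sf -> sf.channel("wholesaleOrderChannel"))
                                .subFlowMapping("ONLINE", sf -> sf.channel("onlineOrderChannel"))
                                .defaultSubFlowMapping(sf -> sf.channel("unknownOrderChannel"))
                )
                .get();
    }

    // Use RecipientListRouter for conditional multi-channel routing
    @Bean
    @Router(inputChannel = "orderRoutingChannel")
    public RecipientListRouter orderRecipientListRouter() {
        RecipientListRouter router = new RecipientListRouter();
        // Recipients guarded by SpEL selector expressions.
        // The property names follow the fields of the Order class below.
        router.addRecipient("highValueOrderChannel", "payload.totalAmount > 1000");
        router.addRecipient("priorityCustomerOrderChannel", "payload.customerType == 'VIP'");
        router.addRecipient("internationalOrderChannel", "payload.shippingAddress.country != 'China'");
        // Always send the order to the audit channel as well
        router.addRecipient("orderAuditChannel");
        return router;
    }

    // Example of processing invalid orders. No @Bean here: the method handles messages.
    @ServiceActivator(inputChannel = "invalidOrderChannel")
    public void handleInvalidOrder(Message<Order> message) {
        Order order = message.getPayload();
        // Record the invalid order
        System.err.println("Invalid order: " + order.getOrderId());
        // Create a notification message
        Message<String> notification = MessageBuilder
                .withPayload("Order " + order.getOrderId() + " verification failed")
                .setHeader("notification-type", "ORDER_VALIDATION_FAILURE")
                .setHeader("order-id", order.getOrderId())
                .build();
        // Send the notification
        notificationChannel().send(notification);
    }

    @Bean
    public MessageChannel notificationChannel() {
        return new DirectChannel();
    }

    // Sample data classes
    public static class Order {
        private String orderId;
        private String customerId;
        private String customerType;
        private String type;
        private List<OrderItem> items;
        private double totalAmount;
        private Address shippingAddress;
        // Getters and setters omitted

        public boolean isValid() {
            return items != null && !items.isEmpty()
                    && customerId != null
                    && totalAmount > 0;
        }
    }

    public static class OrderItem {
        private String productId;
        private int quantity;
        private double price;
        // Getters and setters omitted
    }

    public static class Address {
        private String street;
        private String city;
        private String state;
        private String zipCode;
        private String country;
        // Getters and setters omitted
    }
}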
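
A recipient list differs from the other routers in that every recipient whose condition matches receives the message, so one order can reach several channels at once. The following plain-Java sketch imitates that selection logic with predicates. It is only an illustration under stated assumptions, not how RecipientListRouter is implemented. The class RecipientListSketch, its methods, and the cut-down Order class are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class RecipientListSketch {

    // Cut-down stand-in for the article's Order class (illustrative fields only)
    static final class Order {
        final double totalAmount;
        final String customerType;
        final String country;

        Order(double totalAmount, String customerType, String country) {
            this.totalAmount = totalAmount;
            this.customerType = customerType;
            this.country = country;
        }
    }

    // Insertion order is kept so results are listed in configuration order
    private final Map<String, Predicate<Order>> recipients = new LinkedHashMap<>();

    void addRecipient(String channelName, Predicate<Order> selector) {
        recipients.put(channelName, selector);
    }

    // A recipient without a selector always receives the message
    void addRecipient(String channelName) {
        recipients.put(channelName, order -> true);
    }

    // Every matching recipient is returned, not just the first one
    List<String> recipientsFor(Order order) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Predicate<Order>> entry : recipients.entrySet()) {
            if (entry.getValue().test(order)) {
                matched.add(entry.getKey());
            }
        }
        return matched;
    }

    // Same four recipients as the router configuration in this section
    static RecipientListSketch articleExample() {
        RecipientListSketch sketch = new RecipientListSketch();
        sketch.addRecipient("highValueOrderChannel", o -> o.totalAmount > 1000);
        sketch.addRecipient("priorityCustomerOrderChannel", o -> "VIP".equals(o.customerType));
        sketch.addRecipient("internationalOrderChannel", o -> !"China".equals(o.country));
        sketch.addRecipient("orderAuditChannel");
        return sketch;
    }

    public static void main(String[] args) {
        RecipientListSketch sketch = articleExample();
        System.out.println(sketch.recipientsFor(new Order(1500, "VIP", "China")));
        System.out.println(sketch.recipientsFor(new Order(50, "REGULAR", "Germany")));
    }
}
```

In this sketch a high-value domestic VIP order matches three recipients, while a small international order from a regular customer matches only the international and audit recipients.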

6. Error handling and routing

Error handling is an important consideration in enterprise integration. Spring Integration offers several error handling mechanisms, including error channels, global error handlers, and component-specific configuration. Routing can fail in various ways, for example when no matching channel is found or when message processing throws an exception. With direct channels the exception propagates back to the caller, so the error channel is typically configured at the entry point, such as a messaging gateway, or at an asynchronous boundary. Once configured, failed messages are routed to a dedicated error handling flow, which centralizes error processing and recovery and makes the system more reliable.

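import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
@IntegrationComponentScan
public class ErrorHandlingRouterConfig {

    // Define the error channel
    @Bean
    public MessageChannel errorChannel() {
        return new DirectChannel();
    }

    // Main routing flow. Failures reach errorChannel through the gateway at the bottom
    // of this class; the flow builder itself has no error channel setting.
    @Bean
    public IntegrationFlow routerWithErrorHandling() {
        return IntegrationFlows
                .from("inputChannel")
                // Passing Message.class gives the lambda the whole message, not just the payload
                .<Message, String>route(Message.class,
                        message -> {
                            try {
                                // Extract the routing key from the message headers
                                String type = (String) message.getHeaders().get("message-type");
                                if (type == null) {
                                    throw new IllegalArgumentException("Message type cannot be empty");
                                }
                                return type;
                            } catch (Exception e) {
                                // Wrap the failure so the failed message travels with the exception
                                throw new MessagingException(message, "Routing Error: " + e.getMessage(), e);
                            }
                        },
                        mapping -> mapping
                                // Let unmapped types fall through to the default sub-flow
                                .resolutionRequired(false)
                                .subFlowMapping("ORDER", sf -> sf.channel("orderChannel"))
                                .subFlowMapping("INVENTORY", sf -> sf.channel("inventoryChannel"))
                                .defaultSubFlowMapping(sf -> sf.channel("unknownTypeChannel"))
                )
                .get();
    }

    // Error handling service. No @Bean here: the method handles messages.
    @ServiceActivator(inputChannel = "errorChannel")
    public void handleError(Message<MessagingException> errorMessage) {
        MessagingException exception = errorMessage.getPayload();
        Message<?> failedMessage = exception.getFailedMessage();
        System.err.println("An error occurred while processing the message: " + exception.getMessage());
        System.err.println("Failed message: " + failedMessage);
        // Choose the error handling logic according to the exception type
        if (exception.getCause() instanceof IllegalArgumentException) {
            // Send to the invalid message channel
            invalidMessageChannel().send(MessageBuilder
                    .withPayload(failedMessage.getPayload())
                    .copyHeaders(failedMessage.getHeaders())
                    .setHeader("error-message", exception.getMessage())
                    .build());
        } else {
            // Send to the retry channel and try to reprocess it
            retryChannel().send(failedMessage);
        }
    }

    // Router containing retry logic.
    // Order and InventoryItem are the sample classes from BasicRouterConfig in section 1.
    @Bean
    public IntegrationFlow retryableRouterFlow() {
        return IntegrationFlows
                .from("retryChannel")
                .<Object, String>route(
                        payload -> {
                            if (payload instanceof BasicRouterConfig.Order) {
                                return "orderChannel";
                            } else if (payload instanceof BasicRouterConfig.InventoryItem) {
                                return "inventoryChannel";
                            } else {
                                return "unknownTypeChannel";
                            }
                        },
                        // Apply the retry advice
                        spec -> spec.advice(retryAdvice())
                )
                .get();
    }

    // Retry advice configuration
    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        // Configure the retry template
        RetryTemplate retryTemplate = new RetryTemplate();
        // Retry policy: at most 3 attempts
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
        // Back-off policy: exponential, starting at 1 second, capped at 30 seconds
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMaxInterval(30000);
        backOffPolicy.setMultiplier(2.0);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        advice.setRetryTemplate(retryTemplate);
        // Recovery: once retries are exhausted, send to the dead letter channel
        ErrorMessageSendingRecoverer recoverer = new ErrorMessageSendingRecoverer(deadLetterChannel());
        advice.setRecoveryCallback(recoverer);
        return advice;
    }

    // Define the dead letter channel
    @Bean
    public MessageChannel deadLetterChannel() {
        return new DirectChannel();
    }

    // Define the invalid message channel
    @Bean
    public MessageChannel invalidMessageChannel() {
        return new DirectChannel();
    }

    // Define the retry channel
    @Bean
    public MessageChannel retryChannel() {
        return new DirectChannel();
    }

    // Sample message gateway; failures in the flow are sent to errorChannel
    @MessagingGateway(defaultRequestChannel = "inputChannel", errorChannel = "errorChannel")
    public interface MessageRoutingGateway {
        void send(Message<?> message);
    }
}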
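
To see what the retry settings above mean in practice, it helps to compute the waiting times by hand. The sketch below is plain Java and does not call Spring Retry. It assumes that the policy waits only between attempts, that the attempt limit counts the first try, and that each wait is the previous one multiplied by the multiplier and capped at the maximum. The class BackOffScheduleSketch and its method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class BackOffScheduleSketch {

    // Waits between attempts: the first wait is initialMs, each later wait is
    // multiplied by `multiplier`, and no wait exceeds maxMs.
    static List<Long> waitsBetweenAttempts(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        double next = initialMs;
        for (int retry = 1; retry < maxAttempts; retry++) {
            waits.add(Math.min((long) next, maxMs));
            next *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Settings from the retry advice: 1 s initial, x2.0, 30 s cap, 3 attempts
        System.out.println(waitsBetweenAttempts(1000, 2.0, 30000, 3)); // prints [1000, 2000]
        // With more attempts the cap starts to apply
        System.out.println(waitsBetweenAttempts(1000, 2.0, 30000, 8));
    }
}
```

Under these assumptions, three attempts produce only two waits of 1 and 2 seconds, so the 30-second cap is never reached. It only starts to matter from the sixth wait onward, which requires at least seven attempts.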

Summary

Spring Integration's Router components give enterprise application integration flexible message routing, so that message flows can be directed according to different conditions. This article covered the basic router concepts, conditional routing, header-based routing, dynamic routing and routing tables, message filtering and selective routing, and error handling in routing. Together these techniques support complex enterprise integration solutions in which components collaborate in a loosely coupled way, which improves maintainability and scalability. In practice, developers can choose a routing strategy that fits their requirements and combine several routing mechanisms to build a flexible and robust message processing flow.
