Introduction
In an enterprise integration architecture, message routing is the step that distributes messages to different target channels according to predefined rules. As an implementation of the Enterprise Integration Patterns, Spring Integration provides a set of Router components that cover a wide range of routing needs. A Router decides where a message goes based on its payload, its headers, or other conditions, so that each component of the system can focus on its own core function. This improves the modularity and maintainability of the system. This article looks at how Routers are implemented and used in Spring Integration, with a focus on conditional routing and message filtering, and uses concrete examples to show how to apply these features in real projects.
1. Basic router concepts
The Router is one of the core components of Spring Integration. Its job is to route an incoming message to one or more output channels based on specific conditions. Routers make it possible to build flexible message flows in which the business logic branches dynamically. Spring Integration ships several Router implementations, including PayloadTypeRouter, HeaderValueRouter, RecipientListRouter, and ExpressionEvaluatingRouter, and developers can choose the one that fits their needs. A Router receives a message from its input channel, evaluates it against the configured routing rules, and then decides which output channel or channels the message is sent to.
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.Router;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;

    @Configuration
    public class BasicRouterConfig {

        // Define the channels
        @Bean
        public MessageChannel inputChannel() {
            return new DirectChannel();
        }

        @Bean
        public MessageChannel orderChannel() {
            return new DirectChannel();
        }

        @Bean
        public MessageChannel inventoryChannel() {
            return new DirectChannel();
        }

        @Bean
        public MessageChannel customerChannel() {
            return new DirectChannel();
        }

        // Basic router: the returned string is the name of the target channel.
        // No @Bean here, because this is an endpoint method, not a bean definition.
        @Router(inputChannel = "inputChannel")
        public String route(Message<?> message) {
            // Route to different channels according to the payload type
            Object payload = message.getPayload();
            if (payload instanceof Order) {
                return "orderChannel";
            } else if (payload instanceof InventoryItem) {
                return "inventoryChannel";
            } else if (payload instanceof Customer) {
                return "customerChannel";
            } else {
                throw new IllegalArgumentException(
                        "Unknown message type: " + payload.getClass().getName());
            }
        }

        // Sample data classes
        public static class Order {
            private String orderId;
            // Other fields are omitted
        }

        public static class InventoryItem {
            private String itemId;
            // Other fields are omitted
        }

        public static class Customer {
            private String customerId;
            // Other fields are omitted
        }
    }
2. Conditional routing implementation
Conditional routing sends messages to different target channels based on conditions evaluated against the message payload or headers. Spring Integration offers several ways to implement it: SpEL expressions, the Java DSL, and annotation-based configuration. ExpressionEvaluatingRouter takes a SpEL expression as the routing condition, so fairly complex routing logic can be written as a short expression. With conditional routing, the system can choose the processing path for a message according to business rules, for example splitting orders into high-priority and normal-priority handling based on the order amount, or offering different service levels depending on the customer type.
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.expression.spel.standard.SpelExpressionParser;
    import org.springframework.integration.annotation.Router;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.router.ExpressionEvaluatingRouter;

    @Configuration
    public class ConditionalRouterConfig {

        // Conditional router using a SpEL expression.
        // The expression returns the target channel name directly, so no extra
        // channel mappings are needed. It assumes the payload exposes an
        // "amount" property.
        @Bean
        @Router(inputChannel = "orderInputChannel")
        public ExpressionEvaluatingRouter orderRouter() {
            return new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression(
                    "payload.amount > 1000 ? 'vipOrderChannel' : 'regularOrderChannel'"));
        }

        // Configure conditional routing using the Java DSL
        @Bean
        public IntegrationFlow conditionRoutingFlow() {
            return IntegrationFlows.from("paymentInputChannel")
                    .<Payment, String>route(
                            payment -> {
                                if (payment.getAmount() < 100) {
                                    return "smallPaymentChannel";
                                } else if (payment.getAmount() < 1000) {
                                    return "mediumPaymentChannel";
                                } else {
                                    return "largePaymentChannel";
                                }
                            },
                            mapping -> mapping
                                    .subFlowMapping("smallPaymentChannel", sf -> sf
                                            .handle(message -> System.out.println(
                                                    "Processing small payment: " + message.getPayload())))
                                    .subFlowMapping("mediumPaymentChannel", sf -> sf
                                            .handle(message -> System.out.println(
                                                    "Processing medium payment: " + message.getPayload())))
                                    .subFlowMapping("largePaymentChannel", sf -> sf
                                            .handle(message -> System.out.println(
                                                    "Processing large payment: " + message.getPayload()))))
                    .get();
        }

        // Multi-condition routing example.
        // No @Bean here, because this is an endpoint method, not a bean definition.
        @Router(inputChannel = "customerInputChannel")
        public String routeCustomer(Customer customer) {
            // Route according to customer type and credit score
            boolean vip = "VIP".equals(customer.getType());
            if (vip && customer.getCreditScore() > 700) {
                return "premiumServiceChannel";
            } else if (vip) {
                return "vipServiceChannel";
            } else if (customer.getCreditScore() > 700) {
                return "priorityServiceChannel";
            } else {
                return "regularServiceChannel";
            }
        }

        // Sample data classes
        public static class Payment {
            private double amount;

            public double getAmount() {
                return amount;
            }
        }

        public static class Customer {
            private String type;
            private int creditScore;

            public String getType() {
                return type;
            }

            public int getCreditScore() {
                return creditScore;
            }
        }
    }
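The multi-condition router above depends on the order in which its conditions are tested: the most specific case (VIP with a high credit score) has to come first, otherwise the plain VIP branch would catch it. The following is a minimal plain-Java sketch of that decision logic with no Spring types. The class name `CustomerRoutingSketch` is hypothetical, and the thresholds are copied from the example.

```java
/** Plain-Java sketch of the multi-condition routing decision; not Spring Integration code. */
final class CustomerRoutingSketch {

    // Returns the channel name for a customer. The most specific condition
    // is tested first so that later, broader branches cannot shadow it.
    static String route(String type, int creditScore) {
        boolean vip = "VIP".equals(type);      // null-safe comparison
        boolean highScore = creditScore > 700; // strictly greater than 700
        if (vip && highScore) {
            return "premiumServiceChannel";
        }
        if (vip) {
            return "vipServiceChannel";
        }
        if (highScore) {
            return "priorityServiceChannel";
        }
        return "regularServiceChannel";
    }

    public static void main(String[] args) {
        System.out.println(route("VIP", 750));
        System.out.println(route("VIP", 700));
        System.out.println(route("REGULAR", 750));
        System.out.println(route(null, 500));
    }
}
```

Because the comparison is strict, a VIP customer with a score of exactly 700 lands on `vipServiceChannel`. Boundary cases like this are worth covering in tests for any router built this way.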
3. Message header-based routing
In enterprise integration scenarios, message headers usually carry important metadata such as the message type, priority, and source system, all of which are useful for routing decisions. HeaderValueRouter routes on the value of a single message header, which keeps header-based routing configuration simple. Because the decision is made from the header alone, the router does not need to parse the message payload, which can reduce processing cost. It also keeps routing logic separate from business logic, which helps modularity. This approach is a good fit for handling messages from different source systems, or for any scenario where messages are classified by their metadata.
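    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.Router;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.router.HeaderValueRouter;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.integration.transformer.HeaderEnricher;
    import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
    import org.springframework.integration.transformer.support.StaticHeaderValueMessageProcessor;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;

    @Configuration
    public class HeaderBasedRouterConfig {

        // Message header-based router
        @Bean
        @Router(inputChannel = "requestChannel")
        public HeaderValueRouter messageTypeRouter() {
            HeaderValueRouter router = new HeaderValueRouter("message-type");
            router.setChannelMapping("ORDER", "orderProcessingChannel");
            router.setChannelMapping("INVENTORY", "inventoryManagementChannel");
            router.setChannelMapping("SHIPPING", "shippingChannel");
            router.setChannelMapping("PAYMENT", "paymentProcessingChannel");
            // Default channel, used when no mapping matches the header value
            router.setDefaultOutputChannelName("unknownMessageChannel");
            // Without this, an unmapped header value is treated as a channel name
            // and fails to resolve instead of falling back to the default channel
            router.setResolutionRequired(false);
            return router;
        }

        // Message header injection example.
        // HeaderEnricher expects HeaderValueMessageProcessor values, not raw objects.
        @Bean
        public HeaderEnricher headerEnricher() {
            Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
            headersToAdd.put("message-type", new StaticHeaderValueMessageProcessor<>("ORDER"));
            headersToAdd.put("priority", new StaticHeaderValueMessageProcessor<>("HIGH"));
            return new HeaderEnricher(headersToAdd);
        }

        // Example method for sending messages
        public void sendMessage() {
            // Create messages that carry the routing header
            Message<String> orderMessage = MessageBuilder
                    .withPayload("Order Data Content")
                    .setHeader("message-type", "ORDER")
                    .setHeader("priority", "HIGH")
                    .build();

            Message<String> inventoryMessage = MessageBuilder
                    .withPayload("Inventory Data Content")
                    .setHeader("message-type", "INVENTORY")
                    .setHeader("priority", "MEDIUM")
                    .build();

            // Send the messages to requestChannel; the router routes them
            // according to the message-type header
            requestChannel().send(orderMessage);
            requestChannel().send(inventoryMessage);
        }

        @Bean
        public MessageChannel requestChannel() {
            return new DirectChannel();
        }
    }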
4. Dynamic routing and routing tables
In some complex integration scenarios, routing rules need to change with runtime conditions, or need to be defined in configuration rather than hardcoded. Spring Integration supports dynamic routing, which lets developers modify routing rules at runtime or load a routing table from external configuration. AbstractMappingMessageRouter is the base class for routers of this kind: it maintains a channel mapping table that can be updated at runtime. With this approach the system can adapt to changing business rules without code changes or redeployment, which improves flexibility and maintainability.
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ConcurrentHashMap;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.router.AbstractMappingMessageRouter;
    import org.springframework.messaging.Message;

    @Configuration
    public class DynamicRouterConfig {

        // Custom dynamic router. AbstractMappingMessageRouter requires
        // getChannelKeys(); each returned key is resolved to a channel.
        @Bean
        @ServiceActivator(inputChannel = "dynamicRoutingChannel")
        public AbstractMappingMessageRouter dynamicRouter(RoutingRuleService routingRuleService) {
            return new AbstractMappingMessageRouter() {

                @Override
                protected List<Object> getChannelKeys(Message<?> message) {
                    // Read the current rules for every message so updates apply immediately
                    Map<String, String> routingRules = routingRuleService.getRoutingRules();
                    // Determine the routing key from the message
                    String routingKey = extractRoutingKey(message);
                    // Look up the target channel name, falling back to a default
                    String channelName = routingKey == null
                            ? "defaultChannel"
                            : routingRules.getOrDefault(routingKey, "defaultChannel");
                    return Collections.singletonList(channelName);
                }

                private String extractRoutingKey(Message<?> message) {
                    // Simplified: the routing key is taken from a specific message header
                    return message.getHeaders().get("routing-key", String.class);
                }
            };
        }

        // Routing rule service, used to manage and provide routing rules
        @Bean
        public RoutingRuleService routingRuleService() {
            return new RoutingRuleService();
        }

        // Routing rule management service
        public static class RoutingRuleService {
            // Thread-safe, because rules are read and updated from different threads
            private final Map<String, String> routingRules = new ConcurrentHashMap<>();

            public RoutingRuleService() {
                // Initialize the default routing rules
                routingRules.put("ORDER", "orderChannel");
                routingRules.put("INVENTORY", "inventoryChannel");
                routingRules.put("CUSTOMER", "customerChannel");
            }

            public Map<String, String> getRoutingRules() {
                return routingRules;
            }

            public void updateRoutingRule(String key, String channelName) {
                routingRules.put(key, channelName);
            }

            public void loadRoutingRules(Properties properties) {
                properties.forEach((k, v) -> routingRules.put(k.toString(), v.toString()));
            }
        }

        // Routing rule update endpoint.
        // No @Bean here, because this is an endpoint method, not a bean definition.
        @ServiceActivator(inputChannel = "routingRuleUpdateChannel")
        public void updateRoutingRule(Message<RoutingRuleUpdate> message) {
            RoutingRuleUpdate update = message.getPayload();
            routingRuleService().updateRoutingRule(update.getKey(), update.getChannelName());
        }

        // Routing rule update request
        public static class RoutingRuleUpdate {
            private String key;
            private String channelName;

            public String getKey() {
                return key;
            }

            public String getChannelName() {
                return channelName;
            }
            // Setters are omitted
        }
    }
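The example above includes a `loadRoutingRules(Properties)` method but does not show it in use. Below is a minimal plain-Java sketch of the same idea: a routing table loaded from properties text, reloaded at runtime, and queried with a default fallback. It uses only the JDK. The class name `RoutingTableSketch`, its methods, and the `priorityOrderChannel` name are hypothetical and are not part of Spring Integration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/** Plain-Java sketch of a reloadable routing table; not Spring Integration code. */
final class RoutingTableSketch {

    private final Map<String, String> rules = new ConcurrentHashMap<>();
    private final String defaultChannel;

    RoutingTableSketch(String defaultChannel) {
        this.defaultChannel = defaultChannel;
    }

    // Adds or overwrites every entry found in the properties text.
    // Existing keys that are absent from the text are left unchanged.
    void load(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        for (String key : props.stringPropertyNames()) {
            rules.put(key, props.getProperty(key));
        }
    }

    // Returns the mapped channel name, or the default when the key is
    // missing or unmapped (ConcurrentHashMap does not accept null keys).
    String resolve(String routingKey) {
        if (routingKey == null) {
            return defaultChannel;
        }
        return rules.getOrDefault(routingKey, defaultChannel);
    }

    public static void main(String[] args) {
        RoutingTableSketch table = new RoutingTableSketch("defaultChannel");
        table.load("ORDER=orderChannel\nINVENTORY=inventoryChannel\n");
        System.out.println(table.resolve("ORDER"));
        table.load("ORDER=priorityOrderChannel\n"); // runtime update
        System.out.println(table.resolve("ORDER"));
        System.out.println(table.resolve("SHIPPING"));
    }
}
```

In a real application the properties text would typically come from a file or a configuration service, and the reload would be triggered by a message on an update channel, as in the configuration above.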
5. Message filtering and selective routing
Message filtering is a special form of routing: it decides, based on specific conditions, whether a message is allowed to continue through the flow. Spring Integration implements this with the Filter component, which drops messages that do not meet the criteria, based on the message payload or headers. A filter can be used on its own or combined with a router to build more complex routing logic. When processing order messages, for example, invalid orders can be filtered out first and the remaining orders routed to different processing channels by type. This kind of selective routing lets the system handle each type of message in a more targeted way, which improves both processing efficiency and maintainability.
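    import java.util.List;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.Filter;
    import org.springframework.integration.annotation.Router;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.core.MessageSelector;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.router.RecipientListRouter;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;

    @Configuration
    public class FilterAndSelectiveRoutingConfig {

        // Message filter example. Rejected messages go to the discard channel
        // so that the invalid-order handler below receives them.
        @Bean
        @Filter(inputChannel = "unfilteredChannel", outputChannel = "validOrderChannel",
                discardChannel = "invalidOrderChannel")
        public MessageSelector orderValidator() {
            return message -> {
                Order order = (Order) message.getPayload();
                // Verify that the order is valid
                return order.getItems() != null && !order.getItems().isEmpty()
                        && order.getCustomerId() != null
                        && order.getTotalAmount() > 0;
            };
        }

        // Example combining filtering and routing
        @Bean
        public IntegrationFlow filterAndRouteFlow() {
            return IntegrationFlows.from("inputOrderChannel")
                    // First filter out invalid orders
                    .filter(Order.class, Order::isValid)
                    // Then route according to the order type
                    .<Order, String>route(
                            Order::getType,
                            mapping -> mapping
                                    // Let unmapped types fall through to the default sub-flow
                                    .resolutionRequired(false)
                                    .subFlowMapping("RETAIL", sf -> sf.channel("retailOrderChannel"))
                                    .subFlowMapping("WHOLESALE", sf -> sf.channel("wholesaleOrderChannel"))
                                    .subFlowMapping("ONLINE", sf -> sf.channel("onlineOrderChannel"))
                                    .defaultSubFlowMapping(sf -> sf.channel("unknownOrderChannel")))
                    .get();
        }

        // Use RecipientListRouter to implement conditional multi-channel routing
        @Bean
        @Router(inputChannel = "orderRoutingChannel")
        public RecipientListRouter orderRouter() {
            RecipientListRouter router = new RecipientListRouter();
            // Add recipients guarded by SpEL selector expressions
            router.addRecipient("highValueOrderChannel", "payload.totalAmount > 1000");
            router.addRecipient("priorityCustomerOrderChannel", "payload.customerType == 'VIP'");
            router.addRecipient("internationalOrderChannel", "payload.shippingAddress.country != 'China'");
            // Every order is also sent to the audit channel
            router.addRecipient("orderAuditChannel");
            return router;
        }

        // Example of processing invalid orders.
        // No @Bean here, because this is an endpoint method, not a bean definition.
        @ServiceActivator(inputChannel = "invalidOrderChannel")
        public void handleInvalidOrder(Message<Order> message) {
            Order order = message.getPayload();
            // Record the invalid order
            System.err.println("Invalid order: " + order.getOrderId());
            // Create a notification message
            Message<String> notification = MessageBuilder
                    .withPayload("Order " + order.getOrderId() + " failed validation")
                    .setHeader("notification-type", "ORDER_VALIDATION_FAILURE")
                    .setHeader("order-id", order.getOrderId())
                    .build();
            // Send the notification
            notificationChannel().send(notification);
        }

        @Bean
        public MessageChannel notificationChannel() {
            return new DirectChannel();
        }

        // Sample data classes
        public static class Order {
            private String orderId;
            private String customerId;
            private String customerType;
            private String type;
            private List<OrderItem> items;
            private double totalAmount;
            private Address shippingAddress;
            // Getters and setters are omitted

            public boolean isValid() {
                return items != null && !items.isEmpty()
                        && customerId != null && totalAmount > 0;
            }
        }

        public static class OrderItem {
            private String productId;
            private int quantity;
            private double price;
            // Getters and setters are omitted
        }

        public static class Address {
            private String street;
            private String city;
            private String state;
            private String zipCode;
            private String country;
            // Getters and setters are omitted
        }
    }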
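Unlike the other routers, a recipient list sends a copy of the message to every channel whose condition matches, not just to the first one. The plain-Java sketch below models that behavior with predicates. It assumes the three conditions in the RecipientListRouter example test the order total, the customer type, and the shipping country. `RecipientListSketch` and `SimpleOrder` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Plain-Java model of recipient-list routing; not Spring Integration code. */
final class RecipientListSketch {

    /** Hypothetical, simplified order used only in this sketch. */
    static final class SimpleOrder {
        final double totalAmount;
        final String customerType;
        final String country;

        SimpleOrder(double totalAmount, String customerType, String country) {
            this.totalAmount = totalAmount;
            this.customerType = customerType;
            this.country = country;
        }
    }

    // Insertion order is kept so results list channels in registration order
    private final Map<String, Predicate<SimpleOrder>> recipients = new LinkedHashMap<>();

    RecipientListSketch add(String channel, Predicate<SimpleOrder> condition) {
        recipients.put(channel, condition);
        return this;
    }

    // A recipient without a condition receives every message
    RecipientListSketch add(String channel) {
        return add(channel, order -> true);
    }

    // Collects every channel whose condition accepts the order
    List<String> recipientsFor(SimpleOrder order) {
        List<String> matched = new ArrayList<>();
        recipients.forEach((channel, condition) -> {
            if (condition.test(order)) {
                matched.add(channel);
            }
        });
        return matched;
    }

    static RecipientListSketch orderRecipients() {
        return new RecipientListSketch()
                .add("highValueOrderChannel", o -> o.totalAmount > 1000)
                .add("priorityCustomerOrderChannel", o -> "VIP".equals(o.customerType))
                .add("internationalOrderChannel", o -> !"China".equals(o.country))
                .add("orderAuditChannel");
    }

    public static void main(String[] args) {
        RecipientListSketch router = orderRecipients();
        System.out.println(router.recipientsFor(new SimpleOrder(1500, "VIP", "China")));
        System.out.println(router.recipientsFor(new SimpleOrder(50, "REGULAR", "Germany")));
    }
}
```

A high-value VIP order matches two conditional recipients plus the audit channel, so downstream handlers on those channels should expect to see the same order more than once across the system.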
6. Error handling and routing
Error handling is an important consideration in enterprise integration. Spring Integration provides several error handling mechanisms, including error channels, global error handlers, and component-specific error handling configuration. Various errors can occur during routing, such as a target channel that cannot be resolved or an exception thrown while processing a message. By configuring an error channel and an error handler, failed messages can be routed into a dedicated error handling flow, so that errors are handled and recovered from in one place. This makes the system more robust in the face of abnormal situations and improves its reliability and availability.
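    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.IntegrationComponentScan;
    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
    import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessagingException;
    import org.springframework.retry.backoff.ExponentialBackOffPolicy;
    import org.springframework.retry.policy.SimpleRetryPolicy;
    import org.springframework.retry.support.RetryTemplate;

    @Configuration
    @IntegrationComponentScan
    public class ErrorHandlingRouterConfig {

        // Define the error channel
        @Bean
        public MessageChannel errorChannel() {
            return new DirectChannel();
        }

        // Main routing flow. The error channel is configured on the gateway at
        // the bottom of this class, because the flow definition itself has no
        // errorChannel option.
        @Bean
        public IntegrationFlow routerWithErrorHandling() {
            return IntegrationFlows.from("inputChannel")
                    // Message.class makes the DSL pass the whole message to the lambda
                    .route(Message.class,
                            message -> {
                                // Extract the routing key from the message
                                String type = (String) message.getHeaders().get("message-type");
                                if (type == null) {
                                    // Attach the failed message to the exception
                                    throw new MessagingException(message, "Routing error",
                                            new IllegalArgumentException("Message type cannot be empty"));
                                }
                                return type;
                            },
                            mapping -> mapping
                                    // Let unmapped types fall through to the default sub-flow
                                    .resolutionRequired(false)
                                    .subFlowMapping("ORDER", sf -> sf.channel("orderChannel"))
                                    .subFlowMapping("INVENTORY", sf -> sf.channel("inventoryChannel"))
                                    .defaultSubFlowMapping(sf -> sf.channel("unknownTypeChannel")))
                    .get();
        }

        // Error handling service.
        // No @Bean here, because this is an endpoint method, not a bean definition.
        @ServiceActivator(inputChannel = "errorChannel")
        public void handleError(Message<MessagingException> errorMessage) {
            MessagingException exception = errorMessage.getPayload();
            Message<?> failedMessage = exception.getFailedMessage();
            System.err.println("An error occurred while processing the message: " + exception.getMessage());
            System.err.println("Failed message: " + failedMessage);
            if (failedMessage == null) {
                return; // Nothing to forward
            }
            // Choose the error handling path according to the exception type
            if (exception.getCause() instanceof IllegalArgumentException) {
                // Send to the invalid message channel
                invalidMessageChannel().send(MessageBuilder
                        .withPayload(failedMessage.getPayload())
                        .copyHeaders(failedMessage.getHeaders())
                        .setHeader("error-message", exception.getMessage())
                        .build());
            } else {
                // Send to the retry channel and try to process it again
                retryChannel().send(failedMessage);
            }
        }

        // Router with retry logic. Order and InventoryItem are the sample
        // classes defined in BasicRouterConfig.
        @Bean
        public IntegrationFlow retryableRouterFlow() {
            return IntegrationFlows.from("retryChannel")
                    .<Object, String>route(
                            payload -> {
                                if (payload instanceof BasicRouterConfig.Order) {
                                    return "orderChannel";
                                } else if (payload instanceof BasicRouterConfig.InventoryItem) {
                                    return "inventoryChannel";
                                } else {
                                    return "unknownTypeChannel";
                                }
                            },
                            // Apply the retry advice
                            spec -> spec.advice(retryAdvice()))
                    .get();
        }

        // Retry advice configuration
        @Bean
        public RequestHandlerRetryAdvice retryAdvice() {
            RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
            RetryTemplate retryTemplate = new RetryTemplate();
            // Retry policy: at most 3 attempts
            retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
            // Back-off policy: exponential, starting at 1 second, capped at 30 seconds
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
            backOffPolicy.setInitialInterval(1000);
            backOffPolicy.setMaxInterval(30000);
            backOffPolicy.setMultiplier(2.0);
            retryTemplate.setBackOffPolicy(backOffPolicy);
            advice.setRetryTemplate(retryTemplate);
            // Recovery: once retries are exhausted, send to the dead letter channel
            advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(deadLetterChannel()));
            return advice;
        }

        // Define the dead letter channel
        @Bean
        public MessageChannel deadLetterChannel() {
            return new DirectChannel();
        }

        // Define the invalid message channel
        @Bean
        public MessageChannel invalidMessageChannel() {
            return new DirectChannel();
        }

        // Define the retry channel
        @Bean
        public MessageChannel retryChannel() {
            return new DirectChannel();
        }

        // Sample messaging gateway; failures in the flow are sent to errorChannel
        @MessagingGateway(defaultRequestChannel = "inputChannel", errorChannel = "errorChannel")
        public interface MessageRoutingGateway {
            void send(Message<?> message);
        }
    }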
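To make the retry settings above concrete (up to 3 attempts, initial interval 1 second, multiplier 2.0, maximum interval 30 seconds), the following plain-Java sketch computes the waits between attempts for an exponential back-off schedule. It is a simplified model under the assumption that a wait happens only between attempts and never after the last one. It is not Spring Retry's implementation, and `BackoffScheduleSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of an exponential back-off schedule; not Spring Retry code. */
final class BackoffScheduleSketch {

    // Returns the wait in milliseconds before each retry. With maxAttempts
    // attempts in total there are maxAttempts - 1 waits, each capped at maxMs.
    static List<Long> waits(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        List<Long> result = new ArrayList<>();
        double interval = initialMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add((long) Math.min(interval, (double) maxMs));
            interval *= multiplier; // grow the interval for the next retry
        }
        return result;
    }

    public static void main(String[] args) {
        // Settings from the configuration above: 3 attempts, 1 s, x2.0, 30 s cap
        System.out.println(waits(3, 1000, 2.0, 30000));
        // More attempts show where the cap takes effect
        System.out.println(waits(8, 1000, 2.0, 30000));
    }
}
```

With only three attempts, the 30-second cap is never reached. It becomes relevant only if the maximum number of attempts is raised to seven or more.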
Summary
Spring Integration's Router components provide flexible message routing for enterprise application integration, allowing the system to direct message flows according to different conditions. This article covered the basic concepts of Routers, conditional routing, header-based routing, dynamic routing and routing tables, message filtering and selective routing, and error handling in routing. Together these techniques support complex enterprise integration solutions in which components collaborate in a loosely coupled way, which improves maintainability and scalability. In practice, developers can choose the routing strategy that fits their needs and combine several routing mechanisms to build a flexible and robust message processing flow.