Software Architecture

Integration with Spring

Event-driven Spring Applications

Spring has a number of options to choose from, from integration and streaming all the way to cloud functions and data flows.

  • Event-driven microservices
  • Streaming data
  • Integration/应用集成

应用集成

Enterprise Integration Patterns


https://www.enterpriseintegrationpatterns.com/

四大集成模式

Messaging Pattern

The pattern is centered around messages – discrete payloads of data that move from an originating system or process to one or multiple systems or processes via predefined channels.


Messaging Pattern

The pattern arose as the most flexible way to integrate multiple disparate systems in a way that:

  • Almost completely decouples the systems involved in the integration
  • Allows participant systems in the integration to be completely agnostic of each others underlying protocols, formatting, or other implementation details
  • Encourages the development and reuse of components involved in the integration

核心:Message Channel

Connect the applications using a Message Channel, where one application writes information to the channel and the other one reads that information from the channel.

信息封装:Message

Package the information into a Message, a data record that the messaging system can transmit through a message channel.


处理过程:Pipes and Filters

Use the Pipes and Filters architectural style to divide a larger processing task into a sequence of smaller, independent processing steps (Filters) that are connected by channels (Pipes).


转换逻辑:Message Translator

Use a special filter, a Message Translator, between other filters or applications to translate one data format into another.

处理条件:Message Router

Insert a special filter, a Message Router, which consumes a Message from one Message Channel and republishes it to a different Message Channel channel depending on a set of conditions.

Source/Sink:Message Endpoint

Connect an application to a messaging channel using a Message Endpoint, a client of the messaging system that the application can then use to send or receive messages.


https://online.visual-paradigm.com/cn/diagrams/features/enterprise-integration-patterns-diagram-tool/

Spring Integration

Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters. Those adapters provide a higher-level of abstraction over Spring’s support for remoting, messaging, and scheduling.

Spring Integration’s primary goal is to provide a simple model for building enterprise integration solutions while maintaining the separation of concerns that is essential for producing maintainable, testable code.

主要组件

Features

  • Implementation of most of the Enterprise Integration Patterns(EIP)

    • Endpoint
    • Channel (Point-to-point and Publish/Subscribe
    • Filter
    • Transformer
    • Aggregator

More important

  • Integration with External Systems
    • ReST/HTTP
    • FTP/SFTP
    • Twitter
    • WebServices (SOAP and ReST)
    • TCP/UDP
    • JMS
    • RabbitMQ
    • Email ...

Message Type

public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}

Message<String> message1 = MessageBuilder.withPayload("test").setHeader("foo", "bar").build();
Message<String> message2 = MessageBuilder.fromMessage(message1).build();

Message Channel Type

public interface MessageChannel {
    boolean send(Message message);
    boolean send(Message message, long timeout);
}
public interface PollableChannel extends MessageChannel {
    Message<?> receive();
    Message<?> receive(long timeout);
}

public interface SubscribableChannel extends MessageChannel {
    boolean subscribe(MessageHandler handler);
    boolean unsubscribe(MessageHandler handler);
}

Message Channel Implementations

  • PublishSubscribeChannel
  • QueueChannel
  • PriorityChannel
  • RendezvousChannel
  • DirectChannel
  • ExecutorChannel
  • FluxMessageChannel

Message Endpoints (Filters)

  • Message Transformer
  • Message Filter
  • Message Router
  • Splitter
  • Aggregator
  • Service Activator(Endpoint in EIP)
  • Channel Adapter(Endpoint in EIP)

Message Transformer

A message transformer is responsible for converting a message’s content or structure and returning the modified message. Probably the most common type of transformer is one that converts the payload of the message from one format to another (such as from XML to java.lang.String). Similarly, a transformer can add, remove, or modify the message’s header values.

    @Transformer
    Order generateOrder(String productId) {
        return new Order(productId);
    }

Message Transformer Implementations

  • Object-to-String Transformer
  • Object-to-Map and Map-to-Object Transformers
  • Stream Transformers
  • JSON Transformers

Message Filter

Message filters are used to decide whether a Message should be passed along or dropped based on some criteria, such as a message header value or message content itself. Therefore, a message filter is similar to a router, except that, for each message received from the filter’s input channel, that same message may or may not be sent to the filter’s output channel.

public interface MessageSelector {
    boolean accept(Message<?> message);
}
MessageFilter filter = new MessageFilter(someSelector);

Message Router

A message router is responsible for deciding what channel or channels (if any) should receive the message next. A message router is often used as a dynamic alternative to a statically configured output channel on a service activator or other endpoint capable of sending reply messages.

  • PayloadTypeRouter
  • HeaderValueRouter
  • RecipientListRouter

Splitter

A splitter is another type of message endpoint whose responsibility is to accept a message from its input channel, split that message into multiple messages, and send each of those to its output channel. This is typically used for dividing a “composite” payload object into a group of messages containing the subdivided payloads.

Aggregator

The aggregator is a type of message endpoint that receives multiple messages and combines them into a single message. Technically, the aggregator is more complex than a splitter, because it is required to maintain state (the messages to be aggregated), to decide when the complete group of messages is available, and to timeout if necessary. Furthermore, in case of a timeout, the aggregator needs to know whether to send the partial results, discard them, or send them to a separate channel. Spring Integration provides a CorrelationStrategy, a ReleaseStrategy, and configurable settings for timeout, whether to send partial results upon timeout, and a discard channel.

Service Activator

A Service Activator is a generic endpoint for connecting a service instance to the messaging system. The input message channel must be configured, and, if the service method to be invoked is capable of returning a value, an output message Channel may also be provided.

Channel Adapter

A channel adapter is an endpoint that connects a message channel to some other system or transport. Channel adapters may be either inbound or outbound. Typically, the channel adapter does some mapping between the message and whatever object or resource is received from or sent to the other system (file, HTTP Request, JMS message, and others). Depending on the transport, the channel adapter may also populate or extract message header values. Spring Integration provides a number of channel adapters, which are described in upcoming chapters.

Inbound & Outbound

  • An inbound channel adapter endpoint connects a source system to a MessageChannel

  • An outbound channel adapter endpoint connects a MessageChannel to a target system.

Hello World

  • inputChannel: MessageChannel
  • outputChannel: MessageChannel
  • service-activator: ServiceActivator
sa-spring/spring-integration/tree/master/src/main/java/com/example/spring/integration/helloworld

Hello World Components

MessageChannel inputChannel = context.getBean("inputChannel", MessageChannel.class);
PollableChannel outputChannel = context.getBean("outputChannel", PollableChannel.class);
inputChannel.send(new GenericMessage<String>("World"));
logger.info("==> HelloWorldDemo: " + outputChannel.receive(0).getPayload());
<channel id="inputChannel" />
<channel id="outputChannel">
    <queue capacity="10" />
</channel>
<service-activator input-channel="inputChannel" output-channel="outputChannel" ref="helloService" method="sayHello" />
<beans:bean id="helloService" class="com.example.spring.integration.helloworld.HelloService" />

File-Copying Integration

  • fileReadingMessageSource: InboundChannelAdapter
  • fileWritingMessageHandler: ServiceActivator
  • fileChannel: MessageChannel

https://www.bilibili.com/video/BV1Ak4y1k7B6?p=3 0:50

sa-spring/spring-integration/tree/master/src/main/java/com/example/spring/integration/filecopy
File-Copying Integration Components
@Bean
@InboundChannelAdapter(value = "fileChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource sourceReader = new FileReadingMessageSource();
    sourceReader.setDirectory(new File(INPUT_DIR));
    sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
    return sourceReader;
}
@Bean
public MessageChannel fileChannel() {
    return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "fileChannel")
public MessageHandler fileWritingMessageHandler() {
    FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
    handler.setFileExistsMode(FileExistsMode.REPLACE);
    handler.setExpectReply(false);
    return handler;
}

Integration Endpoints

AMQP/RabbitMQ Integration

https://www.bilibili.com/video/BV1Ak4y1k7B6?p=3 10:10


sa-spring/spring-integration/tree/master/src/main/java/com/example/spring/integration/amqp

小结:Messaging Pattern

The pattern arose as the most flexible way to integrate multiple disparate systems in a way that:

  • Almost completely decouples the systems involved in the integration
  • Allows participant systems in the integration to be completely agnostic of each others underlying protocols, formatting, or other implementation details
  • Encourages the development and reuse of components involved in the integration

![bg right:15% 80%](images/11-spring-integration.png)