消息服务
- 消息服务中间件可以用来提升系统异步通信,扩展解耦能力
- 消息服务两个重要概念:
- 消息代理(message broker)
-
目的地(destination)
<font color=red>当消息发送者发送消息后,将由消息代理接管,消息代理保证消息传递到指定目的地</font>
- 消息队列主要有两种形式的目的地: 队列(queue)-主题(topic)
-
队列(queue): 点对点消息通信(point-to-point)
-
点对点式:
- 消息发送者发送消息,消息代理将消息放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移除队列
- 消息只有唯一的发送者和接受者,但不是说只有一个接收者
-
点对点式:
-
主题(topic): 发布(publish)-订阅(subscribe) 消息通信
-
发布订阅式:
- 发送者(发布者) 发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息
-
发布订阅式:
-
队列(queue): 点对点消息通信(point-to-point)
-
JMS(Java Message Service)
- Java消息服务.基于JVM消息代理的规范
- ActiveMQ,HornetMQ是JMS实现
-
AMQP(Advanced Message Queuing Protocol)
- 高级消息队列协议.也是一个消息代理规范,兼容JMS
- RabbitMQ是AMQP的shixian
- AMQP与JMS的比较:
AMQP | JMS | |
---|---|---|
定义 | 网络线级协议 | Java API |
跨语言 | 是 | 否 |
跨平台 | 是 | 否 |
模型 | 提供五种消息模型:<br />1.direct exchange<br />2.fanout exchange<br />3.topic exchange<br />4.headers exchange<br />5.system exchange<br />本质来讲,后面四种和JMS的pub-sub模型没有太大差别,仅是在路由机制上做了更详细的划分 | 提供两种消息模型<br />1.Peer-2-Peer 2.pub-sub |
支持消息类型 | byte[]<br />当实际应用时,有复杂的消息,可以将消息序列化后发送 | 多种消息类型<br />TextMessage<br />MapMessage<br />BytesMessage<br />StreamMessage<br />ObjectMessage<br />Message(只有消息头和属性) |
特点 | AMQP定义了wire-level层的协议标准.天然具有跨平台,跨语言的特性 | JMS定义了Java API层的标准.在Java体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是对跨平台的支持较差 |
Spring支持:
1.spring-jms提供了对JMS的支持
2.spring-rabbit提供了对AMQP的支持
3.需要ConnectionFactory的实现来连接消息代理
4.提供JMSTemplate,RabbitTemplate来发送消息
5.@JmsListener(JMS),@RabbitListener(AMQP)注解在方法上监听消息代理发布消息
6.@EnableJms,@EnableRabbit开启支持
SpringBoot自动配置
1.JmsAutoConfiguration
2.RabbitAutoConfiguration
RabbitMQ
RabbitMQ是erlang开发的AMQP的开源实现
RabbitMQ的核心概念
-
Message: 消息
- 消息是不具名的,由消息头和消息体组成
- 消息体(数据)是不透明的,消息头由一系列可选属性组成
- routing-key: 路由键
- priority: 优先级
- delivery-model: 是否持久性存储
- Publisher: 消息生产者,向交换器发布消息的客户端应用程序
-
Exchange: 交换器
- 用来接收生产者发送的消息并将这些消息根据路由键(routing-key)通过路由规则给服务器中的队列
- Exchange有四种类型,不同类型的Exchange转发消息的策略有所区别:
- direct(默认)
- fanout
- topic
- headers
-
Queue: 消息队列
- 用来保存消息直到发送给消费者
- 是消息的容器,也是消息的终点
- 一个消息可以投入一个或多个队列
- 消息一直在队列里面,等待消费者连接到这个队列取走
-
Binding: 绑定
- 用于消息队列和交换器之间的关联
- 一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,可以将交换器理解成一个由绑定构成的路由表
- Exchange和Queue的绑定可以是多对多关系
- Connection: 网络连接,如TCP连接
-
Channel: 信道
- 多路复用连接中的一条独立的双向数据流通道
- 信道是建立在真实的TCP连接内的虚拟连接
- AMQP命令都是通过信道发出去的,不管是发布消息,订阅队列还是接收消息,这些动作都是通过信道完成的
- 对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入信道的概念,以复用一条TCP连接
- Consumer: 消息消费者,从消息队列中取得消息的客户端应用程序
-
Virtual Host: 虚拟主机
- 表示一批交换器,消息队列和相关对象
- 虚拟主机是共享相同的身份认证和加密环境的独立服务器域
- 每个Virtual Host本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列,交换器,绑定和权限限制
- Virtual Host是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的Virtual Host是 "/" 在RabbitMQ中是根据区域划分的
- Broker: 消息队列服务器实体
RabbitMQ运行机制
-
AMQP中的消息路由:
- AMQP中增加了Exchange和Binding角色.生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收,Binding决定交换器的消息应该发送到哪个队列
-
Exchange类型:
Exchange分发消息时根据类型的不同分发策略有区别,共有四种类型:direct,fanout,topic, headers.(headers匹配AMQP消息的header而不是路由键,header交换器和direct交换器完全一致.但性能差很多,目前几乎不用了)-
Direct Exchange:
- 消息中的路由键(routing-key)如果和Binding中的Binding key一致,交换器就会将消息发送到对应的队列中
- 路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为"key",则只转发routing key标记为"key"的消息
- 完全匹配,单播的模式
-
Fanout Exchange:
- 每个发到fanout类型交换器的消息都会分到所有绑定的队列上去
- fanout交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上
- fanout类型转发消息是最快的,广播模式
-
Topic Exchange:
- topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,队列需要绑定到一个模式上
- 将路由键和绑定键的字符串切分成单词,这些单词之间用 "." 隔开
- 会识别两个通配符: "#" 和 "*" , "#" 匹配0个或多个单词, " * " 匹配一个单词
-
Direct Exchange:
整合RabbitMQ
- 引入spring-boot-starter-amqp依赖
- application.yml配置
- 测试RabbitMQ
- AmqpAdmin:管理组件
- RabbitTemplate:消息发送处理组件
-
RabbitMQ自动配置原理:
RabbitAutoConfiguration
@Configuration
@ConditionalOnClass({RabbitTemplate.class, Channel.class})
@EnableConfigurationProperties({RabbitProperties.class})
@Import({RabbitAnnotationDrivenConfiguration.class})
public class RabbitAutoConfiguration {
public RabbitAutoConfiguration() {
}
@Configuration
@ConditionalOnClass({RabbitMessagingTemplate.class})
@ConditionalOnMissingBean({RabbitMessagingTemplate.class})
@Import({RabbitAutoConfiguration.RabbitTemplateConfiguration.class})
protected static class MessagingTemplateConfiguration {
protected MessagingTemplateConfiguration() {
}
@Bean
@ConditionalOnSingleCandidate(RabbitTemplate.class)
public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
return new RabbitMessagingTemplate(rabbitTemplate);
}
}
@Configuration
@Import({RabbitAutoConfiguration.RabbitConnectionFactoryCreator.class})
protected static class RabbitTemplateConfiguration {
private final RabbitProperties properties;
private final ObjectProvider<MessageConverter> messageConverter;
private final ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers;
public RabbitTemplateConfiguration(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
this.properties = properties;
this.messageConverter = messageConverter;
this.retryTemplateCustomizers = retryTemplateCustomizers;
}
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
PropertyMapper map = PropertyMapper.get();
RabbitTemplate template = new RabbitTemplate(connectionFactory);
MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique();
if (messageConverter != null) {
template.setMessageConverter(messageConverter);
}
template.setMandatory(this.determineMandatoryFlag());
Template properties = this.properties.getTemplate();
if (properties.getRetry().isEnabled()) {
template.setRetryTemplate((new RetryTemplateFactory((List)this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList()))).createRetryTemplate(properties.getRetry(), Target.SENDER));
}
properties.getClass();
map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout);
properties.getClass();
map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
properties.getClass();
map.from(properties::getExchange).to(template::setExchange);
properties.getClass();
map.from(properties::getRoutingKey).to(template::setRoutingKey);
properties.getClass();
map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
return template;
}
private boolean determineMandatoryFlag() {
Boolean mandatory = this.properties.getTemplate().getMandatory();
return mandatory != null ? mandatory : this.properties.isPublisherReturns();
}
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(
prefix = "spring.rabbitmq",
name = {"dynamic"},
matchIfMissing = true
)
@ConditionalOnMissingBean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
}
@Configuration
@ConditionalOnMissingBean({ConnectionFactory.class})
protected static class RabbitConnectionFactoryCreator {
protected RabbitConnectionFactoryCreator() {
}
@Bean
public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {
PropertyMapper map = PropertyMapper.get();
CachingConnectionFactory factory = new CachingConnectionFactory((com.rabbitmq.client.ConnectionFactory)this.getRabbitConnectionFactoryBean(properties).getObject());
properties.getClass();
map.from(properties::determineAddresses).to(factory::setAddresses);
properties.getClass();
map.from(properties::isPublisherConfirms).to(factory::setPublisherConfirms);
properties.getClass();
map.from(properties::isPublisherReturns).to(factory::setPublisherReturns);
org.springframework.boot.autoconfigure.amqp.RabbitProperties.Cache.Channel channel = properties.getCache().getChannel();
channel.getClass();
map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize);
channel.getClass();
map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis).to(factory::setChannelCheckoutTimeout);
Connection connection = properties.getCache().getConnection();
connection.getClass();
map.from(connection::getMode).whenNonNull().to(factory::setCacheMode);
connection.getClass();
map.from(connection::getSize).whenNonNull().to(factory::setConnectionCacheSize);
connectionNameStrategy.getClass();
map.from(connectionNameStrategy::getIfUnique).whenNonNull().to(factory::setConnectionNameStrategy);
return factory;
}
private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(RabbitProperties properties) throws Exception {
PropertyMapper map = PropertyMapper.get();
RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
properties.getClass();
map.from(properties::determineHost).whenNonNull().to(factory::setHost);
properties.getClass();
map.from(properties::determinePort).to(factory::setPort);
properties.getClass();
map.from(properties::determineUsername).whenNonNull().to(factory::setUsername);
properties.getClass();
map.from(properties::determinePassword).whenNonNull().to(factory::setPassword);
properties.getClass();
map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost);
properties.getClass();
map.from(properties::getRequestedHeartbeat).whenNonNull().asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat);
Ssl ssl = properties.getSsl();
if (ssl.isEnabled()) {
factory.setUseSSL(true);
ssl.getClass();
map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm);
ssl.getClass();
map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType);
ssl.getClass();
map.from(ssl::getKeyStore).to(factory::setKeyStore);
ssl.getClass();
map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase);
ssl.getClass();
map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType);
ssl.getClass();
map.from(ssl::getTrustStore).to(factory::setTrustStore);
ssl.getClass();
map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase);
ssl.getClass();
map.from(ssl::isValidateServerCertificate).to((validate) -> {
factory.setSkipServerCertificateValidation(!validate);
});
ssl.getClass();
map.from(ssl::getVerifyHostname).to(factory::setEnableHostnameVerification);
}
properties.getClass();
map.from(properties::getConnectionTimeout).whenNonNull().asInt(Duration::toMillis).to(factory::setConnectionTimeout);
factory.afterPropertiesSet();
return factory;
}
}
}
- 自动配置了连接工厂:RabbitConnectionFactory
- RabbitProperties封装了RabbitMQ的配置属性
- RabbitTemplate: 给RabbitMQ发送和接收消息
- AmqpAdmin是RabbitMQ是系统管理功能组件
- @EnableRabbit+@RabbitListener监听消息队列的内容
RabbitTemplate
rabbitTemplate.send(exchange,routeKey,message)
rabbitTemplate.receive(queueName)
Message需要自己构造一个:定义消息体内容和消息头
rabbitTemplate.convertAndSend(exchange,routeKey,object)
rabbitTemplate.receiveAndConvert(queueName)
object默认当作消息体,只需要传入要发送的对象,自动Java序列化发送给rabbitmq
- 自定义序列化(json)方式:MessageConverter
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
AmqpAdmin
- 创建交换器
@Autowired
AmqpAdmin amqpAdmin;
public void createExchange(){
amqpAdmin.declareExchange(new DirectExchange("exchangeDirect",true));
amqpAdmin.declareBinding(new Binding("destination",Binding.DestinationType.QUEUE,"exchange","routingKey"));
}
RabbitMQ监听
- @EnableRabbit: 在类上开启基于注解的RabbitMQ
- @RabbitListener: 配置RabbitMQ的监听
@Service
public class BookService {
@RabbitListener(queues = "queueName")
public void receive(){
System.out.println("监听收到的消息");
}
@RabbitListener(queues="queueName")
public void receiveMessage(Message message){
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}
}