消息队列 RabbitMQ基础知识

消息队列

使用消息队列主要有两点好处:

  1. 通过异步处理提高系统性能(削峰、减少响应所需时间);
  2. 降低系统耦合性。

通过异步处理提高系统性能(削峰、减少响应所需时间)

在不使用消息队列服务器的时候,用户的请求数据直接写入数据库,在高并发的情况下数据库压力剧增,使得响应速度变慢。但是在使用消息队列之后,用户的请求数据发送给消息队列之后立即 返回,再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列服务器处理速度快于数据库(消息队列也比数据库有更好的伸缩性),因此响应速度得到大幅改善。

因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。

降低系统耦合性

对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。

另外为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。

备注: 不要认为消息队列只能利用发布-订阅模式工作,只不过在解耦这个特定业务环境下是使用发布-订阅模式的。除了发布-订阅模式,还有点对点订阅模式(一个消息只有一个消费者),我们比较常用的是发布-订阅模式。 另外,这两种消息模型是 JMS 提供的,AMQP 协议还提供了 5 种消息模型

使用消息队列带来的一些问题

  • 系统可用性降低: 系统可用性在某种程度上降低,为什么这样说呢?在加入MQ之前,你不用考虑消息丢失或者说MQ挂掉等等的情况,但是,引入MQ之后你就需要去考虑了!
  • 系统复杂性提高: 加入MQ之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消- 息传递的顺序性等等问题!
  • 一致性问题: 我上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了!

JMS

JMS(JAVA Message Service,java消息服务)是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。JMS(JAVA Message Service,Java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。 ActiveMQ 就是基于 JMS 规范实现的。

JMS两种消息模型

①点到点(P2P)模型

(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队
列中保留直到被消费或超时。比如:我们生产者发送100条消息的话,两个消费者来消费一般情况下两个消费者会按
照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)

② 发布/订阅(Pub/Sub)模型

(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。

JMS 五种不同的消息正文格式

JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现
有消息格式的一些级别的兼容性。 StreamMessage – Java原始值的数据流 MapMessage–一套名称-值对 TextMessage–一个字符串对象 ObjectMessage–一个序列化的 Java对象 BytesMessage–一个字节的数据流

RabbitMQ

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦和通讯。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用 Erlang 语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,具有很高的易用性和可用性。

ConnectionFactory、Connection、Channel 都是RabbitMQ对外提供的API中最基本的对象。

ConnectionFactory:ConnectionFactory为Connection的制造工厂。

Connection:Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。

Channel(信道):信道是建立在“真实的”TCP连接上的虚拟连接,在一条TCP链接上创建多少条信道是没有限制的,把他想象成光纤就是可以了。它是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

image
queue队列
image
生产者Send Message “A”被传送到Queue中,消费者发现消息队列Queue中有订阅的消息,就会将这条消息A读取出来进行一些列的业务操作。这里只是一个消费正对应一个队列Queue,也可以多个消费者订阅同一个队列Queue,当然这里就会将Queue里面的消息平分给其他的消费者,但是会存在一个一个问题就是如果每个消息的处理时间不同,就会导致某些消费者一直在忙碌中,而有的消费者处理完了消息后一直处于空闲状态,因为前面已经提及到了Queue会平分这些消息给相应的消费者。这里我们就可以使用prefetchCount来限制每次发送给消费者消息的个数。详情见下图所示:
image
生产者在将消息发送给 Exchange 的时候,一般会指定一个 routing key,来指定这个消息的路由规则。 Exchange 会根据 routing key 和 Exchange Type(交换器类型) 以及 Binding key 的匹配情况来决定把消息路由到哪个 Queue。RabbitMQ为routing key设定的长度限制为255 bytes。

RabbitMQ常用的Exchange Type有 Fanout、 Direct、 Topic、 Headers 这四种。

Fanout

这种类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,这时 Routing key 不起作用。
image

Direct

这种类型的Exchange路由规则也很简单,它会把消息路由到那些 binding key 与 routing key完全匹配的Queue中。
image
当生产者(P)发送消息时Rotuing key=booking时,这时候将消息传送给Exchange,Exchange获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的Queue,这时发现Queue1和Queue2都符合,就会将消息传送给这两个队列,如果我们以Rotuing key=create和Rotuing key=confirm发送消息时,这时消息只会被推送到Queue2队列中,其他Routing Key的消息将会被丢弃。

Topic

这种类型的Exchange的路由规则支持 binding key 和 routing key 的模糊匹配,会把消息路由到满足条件的Queue。 binding key 中可以存在两种特殊字符 *#,用于做模糊匹配,其中 *用于匹配一个单词,# 用于匹配0个或多个单词,单词以符号.为分隔符。
image

Headers

这种类型的Exchange不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配,对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

Message durability(消息的持久化)

如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。具体可以参考 RabbitMQ之消息确认机制(事务+Confirm)


接口优化===同步下单改为异步下单
收到请求后由redis先预检库存,不足的话直接返回,减少数据库访问
如果有库存,则请求放到消息队列。返回一个排队中
请求出队。客户端轮询是否秒杀成功

Redis与RabbitMQ作为消息队列的比较

可靠消费

Redis:没有相应的机制保证消息的消费,当消费者消费失败的时候,消息体丢失,需要手动处理
RabbitMQ:具有消息消费确认,即使消费者消费失败,也会自动使消息体返回原队列,同时可全程持久化,保证消息体被正确消费

可靠发布

Reids:不提供,需自行实现
RabbitMQ:具有发布确认功能,保证消息被发布到服务器

高可用

Redis:采用主从模式,读写分离,但是故障转移还没有非常完善的官方解决方案
RabbitMQ:集群采用磁盘、内存节点,任意单点故障都不会影响整个队列的操作

持久化

Redis:将整个Redis实例持久化到磁盘
RabbitMQ:队列,消息,都可以选择是否持久化

消费者负载均衡

Redis:不提供,需自行实现
RabbitMQ:根据消费者情况,进行消息的均衡分发

队列监控

Redis:不提供,需自行实现
RabbitMQ:后台可以监控某个队列的所有信息,(内存,磁盘,消费者,生产者,速率等)

流量控制

Redis:不提供,需自行实现
RabbitMQ:服务器过载的情况,对生产者速率会进行限制,保证服务可靠性

出入队性能

对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。
测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。

注:此数据来源于互联网,部分数据有误,已修正


应用场景分析

Redis:轻量级,高并发,延迟敏感
即时数据分析、秒杀计数器、缓存等

RabbitMQ:重量级,高并发,异步
批量数据异步处理、并行任务串行化,高负载任务的负载均衡等

重复消费问题

保证消息不被重复消费的关键是保证消息队列的幂等性,这个问题针对业务场景来答分以下几点:

  1. 比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

  2. 再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

  3. 如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

数据丢失问题

生产者丢数据

生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。

transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。

然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

消息队列丢数据

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步

①、将queue的持久化标识durable设置为true,则代表是一个持久的队列

②、发送消息的时候将deliveryMode=2

这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)

消费者丢数据

启用手动确认模式可以解决这个问题

① 自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。

②手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如果对异常进行了捕获,但是没有在finally里ack,也会一直重复发送消息(重试机制)。

③不确认模式,acknowledge=”none” 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。

如何保证消息的顺序性?

针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中。然后只用一个消费者去消费该队列。同一个queue里的消息一定是顺序消息的。我的观点是保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定套路。例如B消息的业务应该保证在A消息后业务后执行,那么我们保证A消息先进queueA,B消息后进queueB就可以了。

kafka和rabbitmq对比

吞吐量

kafka吞吐量更高:
1)Zero Copy机制,内核copy数据直接copy到网络设备,不必经过内核到用户再到内核的copy,减小了copy次数和上下文切换次数,大大提高了效率。
2)磁盘顺序读写,减少了寻道等等的时间。
3)批量处理机制,服务端批量存储,客户端主动批量pull数据,消息处理效率高。
4)存储具有O(1)的复杂度,读物因为分区和segment,是O(log(n))的复杂度。
5)分区机制,有助于提高吞吐量。

可靠性

rabbitmq可靠性更好:
1)确认机制(生产者和exchange,消费者和队列);
2)支持事务,但会造成阻塞;
3)委托(添加回调来处理发送失败的消息)和备份交换器(将发送失败的消息存下来后面再处理)机制;

高可用

1)rabbitmq采用mirror queue,即主从模式,数据是异步同步的,当消息过来,主从全部写完后,回ack,这样保障了数据的一致性。
2)每个分区都可以有一个或多个副本,这些副本保存在不同的broker上,broker信息存储在zookeeper上,当broker不可用会重新选举leader。
kafka支持同步负责消息和异步同步消息(有丢消息的可能),生产者从zk获取leader信息,发消息给leader,follower从leader pull数据然后回ack给leader。

负载均衡

1)kafka通过zk和分区机制实现:zk记录broker信息,生产者可以获取到并通过策略完成负载均衡;通过分区,投递消息到不同分区,消费者通过服务组完成均衡消费。
2)需要外部支持。

模型

1)rabbitmq:
producer,broker遵循AMQP(exchange,bind,queue),consumer;
broker为中心,exchange分topic,direct,fanout和header,路由模式适合多种场景;
consumer消费位置由broker通过确认机制保存;
2)kafka:
producer,broker,consumer,未遵循AMQP;
consumer为中心,获取消息模式由consumer自己决定;
offset保存在消费者这边,broker无状态;
消息是名义上的永久存储,每个parttition按segment保存自己的消息为文件(可配置清理周期);
consumer可以通过重置offset消费历史消息;
需要绑定zk;

综上,kafka和rabbitmq适应场景不同,kafka适用于高吞吐量场景,rabbitmq适用于对可靠性要求高的场景,综合来讲kafka由于其超高的效率和offset、分区的灵活性,更多的得到了开发者的青睐。