什么是消息队列
为什么需要消息队列?消息队列解决了什么问题
- 应用解耦
- 异步处理
- 流量削峰
常见消息队列
组件 | 推出时间 | 所属 | 开发语言 |
Kafka | 2012年 | Linkin开源,Apache | Scala和Java |
Pulsar | 2016年 | Yahoo开源,Apache | Java |
RocketMQ | 2012年 | 阿里开源,Apache | Java |
RabbitMQ | 2007年 | Pivotal开源,Mozilla | Erlang |
NSQ | 2013年 | itly开源,MIT | Go |
Kafka介绍
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
架构
概念
- Producer:生产者,负责将客户端生产的消息发送到 Kafka 中,可以支持消息的异步发送和批量发送。
- broker:服务代理节点,Kafka 集群中的一台服务器就是一个 broker,可以水平无限扩展,同一个 Topic 的消息可以分布在多个 broker 中。
- Consumer:消费者,通过连接到 Kafka 上来接收消息,用于相应的业务逻辑处理。
- Consumer Group:消费者组,指的是多个消费者共同组成一个组来消费一个 Topic 中的消息。
- ZooKeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
分区创建
指定分区数partitions、副本数replication-factor
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic input_topic
自定义副本分配方案:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replica-assignment 0:1:2,3:4:5,6:7:8 --topic input_topic
如:0:1:2,3:4:5,6:7:8 代表第一个分区的副本放在0,1,2这三台broker中,第二个分区的副本放在3,4,5这三台broker中,第三个分区的副本放在6,7,8这三台broker中。
分区副本分配
1、副本分配原则
- 将副本平均分配给Broker
- 对于分配给特定的Broker分区,该分区的其他副本分布在其他Broker上
- 如果所有Broker都有机架信息,则尽可能将每个分区的的副本分配给不同的机架
机架感知的意思是Kafka会把partition的各个replicas分散到不同的机架上(不同的网络区域或地域),以提高机架故障时的数据安全性。可以通过修改server.properties来指定broker属于哪个特定的机架(组):broker.rack=rack-id-n
2、 默认分区副本分配方案:
1、无机架 从Broker列表中随机选取一个Broker作为起始位置,通过轮询分配每个分区的第一个副本 以增加位移的方式为每个分区分配剩余的副本 如下图,3个分区,3个副本。第一个Leader分区被分配到broker1上,那么其他两个分区会分配到broker2和broker3上
2、有机架 如果配置了机架,就不是按照broker顺序来了,而是按照交替机架的方式来选择broker。例如broker0、broker1、broker2放置在同一个机架1上,broker3、broker4、broker5放置在同一个机架2上。分区的选择不是按照0、1、2、3、4、5这样的顺序进行选择,而是按照0、3、1、4、2、5这样的顺序进行选择,然后应用简单的循环分配方式。这样做的好处是:如果有机架下线,仍然能够保证可用性。
消息发送
过程
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。
- 指定了 patition,则直接使用。
- 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition。
- patition 和 key 都未指定,使用轮询选出一个 patition。
从0.8版和0.9版开始,客户端直接从Kafka brokers那里获取元数据信息,集群中的每个 broker 都会缓存所有主题的分区副本信息,元数据同步可以通过配置metadata.max.age.ms参数(默认五分钟)定时刷新元数据
详细流程图:
生产者相关参数
- batch.size(默认16KB)当多条消息被发送到同一个分区时,生产者会尝试把多条消息变成批量发送。这有助于提高客户端和服务器的性能。此配置以字节为单位设置默认批处理大小。如果消息大于此配置的大小,将直接发送。发送到broker的请求将包含多个批处理,每个分区一个批处理,其中包含可发送的数据。如果此参数值设置的太小,可能会降低吞吐量(批量大小为零将完全禁用批处理)。如果此参数设置的太大,可能会更浪费内存,并增加消息发送的延迟时间。
- linger.ms这个参数一般会配合batch.size一起使用,可以通过设置linger.ms的值来表示,如果消息的大小一直达不到batch.size设置的值,那么等待多久后任然允许发送消息,默认是不等待,即消息到来就发送。当我们发送的消息都比较小的时候,可以通过设置linger.ms来减少请求的次数,批次中累积更多的消息后再发送,提高了吞吐量,减少了IO请求。如果设置的太大,则消息会被延迟更长的时间发送。
- max.request.size(默认1M)设置请求消息的最大大小,避免发送大量的请求,限制了单条消息的size与批次消息的size,如果改变此值,需要注意服务器也需要进行相应设置,因为服务器也有接收消息的大小限制。
- retries生产者发送时如果遇到的是可重试的异常时,则可进行发送的重试,此参数规定了重试的次数,默认值为Integer.MAX_VALUE。
- retry.backoff.ms 重试失败执行的频率,默认100ms
- max.in.flight.requests.per.connection(有序需要配置为1)一个消息发送后在得到服务端响应之前,生产者还可以发送的消息条数,配合retries使用,可以保证消息的顺序性,假设有两条消息A、B,A先发送但失败了在执行重试时,B发送且成功了,之后A也重试成功了,此时A、B消息顺序就反了,如果将此参数设置为1,则可以保证A在重试时,B消息无法进行发送,必须等A收到broker响应后B才能发送,设置较高可以提升吞吐量,但会占用更多的内存,此参数值默认是5条。
- acks(默认值1)生产者在确认请求完成之前要求leader已收到的确认数。这控制了发送的消息的持久性。0:生产者只要把消息发送出去即可,不用等待broker的处理结果,消息将立即添加到socket buffer并被视为已发送。在这种情况下,无法保证服务器已收到消息,并且retries配置将不会生效(因为客户端通常不会知道任何故障)。为每条消息返回的偏移量将始终设置为-1。设置为0,吞吐量最高,同样消息的丢失率也最高。1:生成者需要等分区leader将消息写入成功后才认为此消息发送成功,兼顾了吞吐量和消息丢失的问题,但是同样有消息丢失的风险,比如当leader写入成功后突然挂了,其他分区跟随者并为能够将此消息同步,则此消息丢失。-1:生产者会等待所有的副本都写入成功后才认为此消息发送成功,只要至少有一个同步副本保持活跃状态,消息就不会丢失,这是最安全的保障,是吞吐量最低的。
- buffer.memory(默认32MB)生产者可用于缓冲等待发送到服务器的记录的内存总字节数,如果客户端send的速度大于发送到broker的速度,且积压的消息大于这个设置的值,就会造成send阻塞,阻塞时间为max.block.ms设置的值,如果超过时间就抛出异常。配合压测可以调试出一个合理的大小。
- max.block.ms (默认60s)当执行KafkaProducer.send() 或KafkaProducer.partitionsFor()时阻塞等待的时间,之所以会阻塞是因为可能buffer满了或者获取元数据异常,那么超过这个时间就会抛出异常。
消息存储Kafka
参数log.dirs配置了数据文件的存储目录,每个分区的每个patition为一个目录
存储形式
kafka采取分片和索引机制,每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件,Producer生产的数据会被不断追加到该log文件末端。
segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充索引文件采用稀疏索引存储offset到position(物理地址为position+文件起始的地址)的映射。
副本机制(保证HA)
每个分区都有各自的主副本和从副本。主副本叫做leader,从副本叫做 follower(消费者和生产者都是从leader读写数据,不与follower交互)
- ISR:表示和 Leader 保持同步的 Follower 集合。如果 Follower 在replica.lag.time.max.ms时间范围内未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。Leader 发生故障之后,就会从 ISR 中选举新的Leader。
- OSR:表示 Follower 与 Leader 副本同步时,延迟过多的副本。
- AR: 分区的所有副本 =ISR + OSR
故障处理:
- LEO(Log End Offset):每个副本的最后一个offset,LEO就是最新的offset + 1。
- HW(High Watermark):水位线,所有副本中最小的LEO ,消费者只能看到这个水位线左边的消息,从而保证数据的一致性。
Follower 发生故障时,会被踢出ISR,并且Leader和其他的Follower会继续接收请求。等Follower恢复后,会将HW之后的数据截取掉,从HW开始向Leader同步。当该Follwer的LEO追上Leader的HW,就可以重新加入ISR。
如果Leader发生故障,会从ISR中选举出新的Leader。之后Follower会将log文件高于HW的部分截取掉,之后从新的Leader同步数据。
Broker相关参数
Jvm Heap Size堆大小配置: KAFKA_HEAP_OPTS="-Xmx16G -Xms16G” 建议大小不超过主机内存的50%
- auto.create.topics.enable是否允许自动创建topic,默认为自动创建,一般建议设置为不允许自动创建。
- unclean.leader.election.enable(默认false)这个参数表明了什么样的副本才有资格竞争Leader,如果设置成false,则表示对于落后太多的副本是没有资格竞选Leader的;如果是true,那么则有可能选出一个差很多数据的副本成为Leader,从而造成了数据丢失。建议维持false。
- replica.lag.time.max.ms表示Follower副本能够落后Leader副本的最长时间间隔,默认值为10秒,只要不超过10秒,就不算是落后的副本。
- auto.leader.rebalance.enable如果把auto.leader.rebalance.enable设置为true,则Kafka会定期的对Topic的分区进行Leader选举,他很有可能会莫名其妙的把原本好好的Leader换掉,此参数值默认为true,一般建议改为false。
- message.max.bytes这个参数表示Broker能够处理的最大消息大小,默认为1M,建议适当调大一些
- replication.factor(默认1)这个参数用来表示分区的副本数,建议大于等于3(<=broker 数),确保Kafka的高可用性。
- min.insync.replicas这个参数定义了消息至少要被写入多少个副本才算是“已提交”,默认是1,建议设置成大于1。注意:min.insync.replicas只有在ack设置成-1时才会生效。
- num.io.threads表示每台Broker启动后自动创建的I/O线程数量,I/O线程是真正处理请求的线程,默认是8个。建议配置数量为cpu核数2倍,最大不超过3倍。
- num.network.threads表示每台Broker启动时专门用于从网络接收请求并向网络发送响应的线程数量,默认是3个。建议配置数量为cpu核数加1
- log.retention.hours日志保留时间,建议保留3~7天
- log.segment.bytes 段文件建议配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快。如果文件过小,则文件数量比较多。
消息消费
消费者组初始化过程
Coordinator:辅助实现消费者组的初始化和分区的分配。
Coordinator节点选择 = groupid的hashcode值 % 50( __consumer_offsets 主题的分区数量(offsets.topic.num.partitions配置),默认3副本(offsets.topic.replication.factor参数))
例如: groupid的hashcode值 = 1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,所在的leader在哪个broker上,就选择这个节点作为消费者组的coordinator。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。
Kafka Offset 默认情况下是保存在Kafka 的内部主题"__consumer_offsets" 中
分区分配策略
当分区数增加或有新的消费者加入消费者组时,会触发reblance。期间消费会暂停,生产环境应该尽量避免。消费者客户端参数 partition.assignment.strategy 可用来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为 org.apache.kafka.clients.consumer.RangeAssignor
RangeAssignor分配策略
对于每一个topic,假设 n = 分区数/消费者数量,m = 分区数%消费者数量,那么前 m 个消费者每个分配 n+1 个分区,后面的(消费者数量- m)个消费者每个分配 n 个分区。
假设消费组内有2个消费者 C0 和 C1,都订阅了主题 t0 和 t1,并且每个主题都有4个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t0p3、t1p0、t1p1、t1p2、t1p3。最终的分配结果为:
消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t0p3、t1p2、t1p3
假设上面例子中2个主题都只有3个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:
消费者C0:t0p0、t0p1、t1p0、t1p1
消费者C1:t0p2、t1p2
RoundRobinAssignor分配策略
RoundRobinAssignor 分配策略的原理是将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。 如果同一个消费组内所有的消费者的订阅信息都是相同的,那么 RoundRobinAssignor 分配策略的分区分配会是均匀的。
举个例子,假设消费组中有2个消费者 C0 和 C1,都订阅了主题 t0 和 t1,并且每个主题都有3个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:
消费者C0:t0p0、t0p2、t1p1
消费者C1:t0p1、t1p0、t1p2
StickyAssignor分配策略(粘性,最优)
Kafka 从 0.11.x 版本开始引入这种分配策略,它主要有两个目的:
- 分区的分配要尽可能均匀。
- 分区的分配尽可能与上次分配的保持相同。 假设消费组内有3个消费者(C0、C1 和 C2),它们都订阅了4个主题(t0、t1、t2、t3),并且每个主题有2个分区。也就是说,整个消费组订阅了 t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1 这8个分区。最终的分配结果如下:
消费者C0:t0p0、t1p1、t3p0
消费者C1:t0p1、t2p0、t3p1
消费者C2:t1p0、t2p1
看上去似乎与采用 RoundRobinAssignor 分配策略所分配的结果相同。再假设此时消费者 C1 脱离了消费组,那么消费组就会执行rebalance,进而消费分区会重新分配。如果采用 RoundRobinAssignor 分配策略,那么此时的分配结果如下:
消费者C0:t0p0、t1p0、t2p0、t3p0
消费者C2:t0p1、t1p1、t2p1、t3p1
如分配结果所示,RoundRobinAssignor 分配策略会按照消费者 C0 和 C2 进行重新轮询分配。如果此时使用的是 StickyAssignor 分配策略,那么分配结果为:
消费者C0:t0p0、t1p1、t3p0、t2p0
消费者C2:t1p0、t2p1、t0p1、t3p1
消费过程
消费者相关参数
- max.poll.records单次poll()的调用可返回的最大消息总数,默认是500条。这个取值的大小,将会影响一次poll()所需消耗的时间。
- max.poll.interval.ms当调用poll()之后,如果在max.poll.interval.ms指定的时间内未消费完消息,也就是未再次调用poll()方法,则Consumer会主动发起离开组的请求,从而产生Rebalance。因此max.poll.interval.ms参数是会对Rebalance产生影响的,默认是5分钟,可根据实际消费能力适当调整。
- session.timeout.ms这个参数是用于检测当多长时间内没有收到心跳,就认为其是有故障的,从而将其中Group中移除,产生Reblance,需要注意,该值必须在broker配置group.min.session.timeout.ms与group.max.session.timeout.ms的范围内。显然session.timeout.ms参数也会对Rebalance产生影响,其默认值是10秒。
- heartbeat.interval.msConsumer的心跳频率,设置的越小频率就越高,对带宽的消耗也就越大,相反则不能快速发现实例是否已经“挂”了,通常情况下建议heartbeat.interval.ms设置为session.timeout.ms的三分之一,默认是3秒,如果session.timeout.ms有调整,记得heartbeat.interval.ms也需要一起调整。
- enable.auto.commit这个参数表示Consumer会自动定期的提交位移,默认为自动提交,自动提交虽然方便,但如果控制不好,很容易造成消息丢失、或者消息重复,所以一般建议改为手动提交。
- auto.commit.interval.ms结合enable.auto.commit开启自动提交,auto.commit.interval.ms就用来控制自动提交的频率,默认是5秒
- fetch.min.bytes这是一个调优相关的参数,默认为1字节,表示如果请求的数据量小于1字节,broker就是攒一攒,等足够1字节了再一起返回给Consumer,这个值建议可以适当调大一点,以提高服务的吞吐量。
- fetch.max.wait.ms如果在fetch.max.wait.ms指定的时间内,数据量依然没有达到fetch.min.bytes所设置的值,那broker也不会再等了,将直接返回数据给Consumer,此值默认为500ms
高吞吐的原因
- 日志结构的持久性。Kafka 利用了一种分段式的、只追加 (Append-Only) 的日志,基本上把自身的读写操作限制为顺序 I/O,也就使得它在各种存储介质上能有很快的速度
- 日志记录批处理。Kafka 的 Clients 和 Brokers 会把多条读写的日志记录合并成一个批次,然后才通过网络发送出去。
- 分区并行
- 零拷贝 (java FileChannel.transferto)
数据从Producer到Broker,需要将来自网卡的消息持久化的磁盘中,Kafka中采用mmap的方式写,并且不会立即持久化到磁盘中,而是存入page cache内核缓冲区中就直接返回成功。后续有消费者来拉取消息的时候,也是先从缓冲区中查找消息,如果有就直接发送给消费者,不会再查找磁盘,又提升了拉消息的性能。
数据从Broker到Consumer,需要将磁盘中的消息通过网卡发送出去,Kafka中采用sendfile的方式,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送
参考资料
https://www.cnblogs.com/hongdada/p/16935601.html
https://blog.csdn.net/shipfei_csdn/article/details/103919110
https://blog.csdn.net/CSDN_WYL2016/article/details/128361676
https://www.cnblogs.com/alvinscript/p/17442726.html