kafka是个啥?

前言

其实写这篇文章主要有两个目的,第一个还是学而时习之巩固总结一下之前学过的东西,第二个为后面的pulsar介绍做个铺垫,因为本人最近就是用pulsar对系统进行了重构,作为时下最流行的两款MQ,kafka具有广大的群众基础属于是老牌明星,而pulsar就属于是后起之秀,各有千秋,话不多说进入今天的正题~

消息队列能做什么

什么是消息队列

举个简单的例子吧,最近也看了新闻了听说某地的疫情十分严重,想着就提前在家屯点货已备不时之需吧,毕竟现在这个世界一切都很难说了,晚上kuakuakua的在网上买了很多应急食材,隔天快递小哥就打电话来了 : “喂,你好,是xxx吗,下午几点在家吗,我给你送东西过来”,我这边的回复是:”你好,我是xxx,我今天不在家哦,晚上下班我在家,你晚上再过来?”,快递员:”啊?不行哦,我晚上下班了,我只有下午是送货时间,要不周末再给您送?”,我:”不行不行,今晚必须送来!”,好家伙然后双方就像下面一样僵住了,来看下面的图 :

其实我们完全可以把上面的例子中我和山姆配送员当作是两个需要交互的系统来看,我有空的时候配送员没空,配送员有空的时候我没空,也就导致了上面僵住的局面,双方内心无数mmp,那怎么解决这个问题了?来看看下面的方案:

咋们每个小区不都有保安亭嘛l…山姆配送员在他有空的时候把东西放置在保安亭,然后他就可以去忙别的事情了,我咧等晚上下班了就去保安亭拿自己的东西,完事儿,两者皆大欢喜,如果当作系统来看,这个保安亭就是今天要介绍的消息队列MQ,那么有了这个消息队列有啥好处作用了?

消息队列的作用

  1. 异步

    山姆小哥打电话给我后需要一直在你楼下等着,直到我下楼面对面拿走快递再去配送其他人的。山姆小哥将快递放在保安亭后,立马可以去配送其他人的快递,不需要一直在楼下矗着等待我下楼。提高的效率不止一点半点。

  2. 解耦

    山姆小哥手上有很多人的快递都需要配送,他每次都需要先电话一一确认收货人是否有空、哪个时间段有空,然后再确定好送货的方案。这样完全依赖收货人了!

    如果快递一多,山姆小哥估计的忙疯了……如果有了保安亭,山姆小哥只需要将同一个片区的货物统一放置在对应保安亭然后一一通知收货人,然后通知收货人收到通知来取货就可以了,这时候山姆小哥和收货人就实现了完全解耦,谁都不需要依赖谁的时间是否有空。

  3. 削峰

    这点的体现在每年的电商节都是能看到的,每年双11都会有大量的订单产生,然后发货却如细水长流般,这便是削峰,先通过mq接收大流量的订单,然后后续再慢慢处理每一个订单。

所以我们能看到在系统需要交互的场景中,使用消息队列中间件真的是有很多好处的,但是任何一个东西有利便有弊,比如滥用mq也会增加系统复杂度,你的系统是否可以承受这样复杂度的维护,所以我们的原则应该是在合适的场景选择合适的组建来解决我们的问题才是正解。

通信模式

正所谓无规矩不成方圆,消息队列也有自己的行规,总体就有以下两种模式:

点对点模式

如下图所示,点对点模式通常是基于拉取或者轮询的消息传送模型,这个模型的特点是发送到队列的消息被一个且只有一个消费者进行处理。生产者将消息放入消息队列后,由消费者主动的去拉取消息进行消费。它的优点是消费者拉取消息的频率可以由自己控制。但是消息队列是否有消息需要消费,在消费者端无法感知,所以在消费者端需要额外的线程去监控。

发布订阅模式

如下图所示,发布订阅模式是一个基于消息送的消息传送模型,该模型可以有多种不同的订阅者。生产者将消息放入消息队列后,队列会将消息推送给订阅过该类消息的消费者(比如关注了公众号,关注了主播你会第一时间接收到他们的发布内容)由于是消费者被动接收推送,所以无需感知消息队列是否有待消费的消息!但是consumer1、consumer2、consumer3由于机器性能不一样,所以处理消息的能力也会不一样,但消息队列却无法感知消费者消费的速度!所以推送的速度成了发布订阅模模式的一个问题!假设三个消费者处理速度分别是10M/s、5M/s、2M/s,如果队列推送的速度为6M/s,则consumer2和consumer3无法正常处理该队列的消息内容的,如果队列推送的速度为1M/s,这时三者都能正常处理但是咧consumer1、consumer2又会出现资源的极大浪费!

Kafka

上面我们对消息队列有了一个大体的认识,了解了它的一些作用和工作模式,那么接下来就轮到今天的主角登场了~

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力(虽然它的横向扩展能力不太得劲儿)……

基础架构及术语

先来看本人整理的一张结构图,来大致了解一下kafka的整体架构,看不懂没关系,我们先来听个响 :

有点懵逼是正常的,容我先来对里面的这些概念进行一下解释,应该就会有一个清晰的理解了:

Producer:生产者,消息的产生者也可以叫投递者,这个角色就是开局例子里的山姆小哥,是消息的起源。

Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们暂且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1、broker-2等

kafka cluster:kafka集群,由上面的broker组成,主要就是为了系统的高可用,消息不丢失。

Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。开局的例子中我们可以理解多个临近的小区构成一个topic。

Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!开局的例子中我们可以理解我所在的小区就是一个partition

Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。这个在开局的例子中并没有很好的对应的对象,这属于是kafka为了实现高可用的一种兜底机制。

Message:每一条发送的消息主体。可以理解就是我买的应急物资。

Consumer:消费者,即消息的消费方,可以理解就是开局例子里面的我,是消息的终点。

Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!这个也很好理解我所在的那个小区的其他Consumer,我们共同组建了一个Consumer Group!

Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。它内部通过zab协议来实现,有兴趣的朋友可以自行去了解下。

通过本人这样一类比介绍,相信各位一定对上图有了一定的理解了吧,其实说的简单一点,消息队列无非就是三个核心:写消息、存消息、消费消息。那么下面就来一一介绍一下:

怎么写消息

请看下图为本人整理的写入流程,producer其实只会与Leader进行直接交互,他并不关心Follower的状态,换句话说producer只关注Leader的状态和地址,至于kafka内部是如何选处leader的它并不关心,反正他发送之前会先去集群拉取对应leader的信息

整个的发送的流程就在上图中展示的很明显了,我就不再重述一遍了,值得注意的是第四点消息写入leader后,follower是主动的去leader进行同步的!producer使用的是push模式将数据发送给broker,每条消息追加到分区中,顺序写入磁盘,注意了这一点是kafka之所以那么快的一项保证,kafka在写入的过程中采用了mmap+顺序写(减少了磁盘旋转与寻道的时间,这并不是本文的终点所以不会展开说),所以保证同一分区内的数据是有序的!写入示意图如下:

细心的小伙伴其实应该也发现问题了,不是有了topic了嘛,为啥还要搞一个partition出来了?其实分区的主要目的是:

1、 方便扩展。因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。

2、 提高并发。以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。一台机器的性能是有限的,当一个机器处理不了当前消息量的时候,我们就可以水平扩展出多台机器同时进行处理。

诶,细心的朋友这时候又会提问了,既然可以扩展出多台机器,那我咋知道哪台机器应该去处理哪个partition里面的消息了?这样其实就是一个负载均衡的问题了,在kafka里面其实这里是有一些约定原则的,如下 :

kafka中有几个原则:

1、 partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。

2、 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。

3、 如果既没指定partition,又没有设置key,则会轮询选出一个partition。

kafka如何保证消息不丢失:

安全性是每个mq首当其冲应该保证的问题,如果连投递的消息安全送达到消费者手中都无法保证那这个消息队列存在也就毫无意义,就好像你买了应急物资却一直收不到货但是商场那边显示是已送达~,那kafka是怎么做到消息发送后不丢失的了

上图中的的写入流程其实有写出来哈,其实就是通过ACK应答机制~!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为01all

0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。

1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。

all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。总体而言,安全和效率并不能两全其美,就像鱼与熊掌不可兼得一般,应该有所取舍。

最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。

怎么存消息

好,上面我们讲述了消息写入的全过程,那写入后的消息是如何存放的了,山姆小哥将我购买的应急物资难道就是随手一扔?答案肯定是否哈,生产者将数据写入broker后,此时cluster就需要对数据进行存储了,毫无疑问数据是被存在硬盘里的,只有这样才能保证数据在broker发生异常时不丢失数据,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。但是上面写消息的时候已经讲过了 MMAP + 顺序写 是kafka高性能的保证之一,在消费消息的时候其实还有一个sendfile的零拷贝方案也是其高性能的保证之一,这个我们后面有机会再说。Partition 结构前面说过了每个topic都可以分为一个或多个partition,如果觉得topic比较抽象,那partition就是实实在在的东西!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。

来看上图,这个partition 0有三组segment文件,每个log文件的大小是一样的,但是存储的message数量是不一定相等的(每条的message大小不一致)。文件的命名是以该segment最小offset来命名的,如000.index存储offset为0~555555的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。

Message结构上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!

名词解释

1、 offset:又叫偏移量是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置!通过起始位置+偏移量可以迅速获取到对应消息的起始位置。bingo~

2、 消息大小:消息大小占用xxbyte,用于描述消息内容实际占用的空间大小。

3、 消息体:消息体存放的是实际的消息数据(被压缩过,后面有时间我也会出一篇关于压缩算法的文章十分有趣哈),占用的空间根据具体的消息而不一样。

存储策略

写到这里,醒目的小伙伴应该会有一个疑问了,生产者发了那么多消息到broker集群,先不管有没有consumer对其进行消费处理,那这些消息会一直存在吗?如果一直存在broker总有撑爆的一天吧?那kafka也是有对应的策略去处理过期消息的,kafka会在以下两种方案中择其一来对数据进行删除:

1、 基于时间,默认配置是168小时(7天)。也就是说kafka的内部线程会对存在超过168小时的数据进行删除。

2、 基于大小,默认配置是1073741824。需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!

如何消费消息

上面我们讲解了消息是如何发送和存储的,那么接下来就是最后的消费重头戏了,消息存储在log文件后,consumer们就可以对其进行消费了。kafka的consumer会主动的去kafka集群拉取消息,消费者在拉取消息的时候也是只会关注Leader的,这点与producer完全一致,consumer通常也不是孤军奋战的,它们通常都是结伴而行,多个consumer可以组成一个消费者组(consumer group),每个消费者组都有一个组id(唯一id)!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是永远不会出现一个组内多个消费者对同一分区的数据进行消费~,来看我整理的下图进行加深理解 :

图中一个的topic有4个partition,但是对应的消费者组中只有3个消费者,这时就会出现上图中出现的情况,最后的consumer2承担了两个partition的消费工作,那如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?答案是不会的,多出来的消费者不消费任何partition的数据,它就像一个废人一样存在的毫无意义,所以在实际的应用中,应该将消费者组的consumer的数量与partition的数量保持一致!在保存数据的小节里面,我们聊到了partition划分为多组segment,每个segment又包含.log、.index、.timeindex文件,存放的每条message包含offset、消息大小、消息体。

其实我们已经不止一次的提到segment和offset,那么我们最后就来看看kafka是如何通过这两个参数快速定位到对应消息的。

消息查找策略

1、 先找到offset为555560的message所在的segment文件(利用二分法查找),这里找到的就是segment 2文件夹。

2、 打开找到的segment中的.index文件(也就是555555.index文件,该文件起始偏移量为55555+1,我们要查找的offset为555560的message在该index内的偏移量为55555+5=555560,所以这里要查找的相对offset为5)。由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到,这里同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引。

3、 根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为555560的那条Message。

这整套机制是建立在offset为有序的基础上,如果offset无序那这套机制也就毫无卵用了,利用segment+有序offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!

Ok,通过以上三步consumer就能准确的拿到需要处理的message进行处理了。接下来就只需要记录下这个位置然后下次继续从此处往后进行消费就可以了,那最后的问题又来了到底该如何存放这个offset位置的值了?直接公布答案,其实在早期的版本中,消费者将消费到的offset维护zookeeper中,consumer每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的offset已经直接维护在kafka集群的_consumeroffsets这个topic中!

总结

噗,终于讲完了~,希望本文能让你对kafka有一个全面且深刻的了解,其实kafka里面还有很多知识这里篇幅有限不能一一讲解,譬如rebalance又譬如零拷贝等等,有兴趣的可以自行去学习了解一下哈,毕竟本人是为了分享pulsar才拿kafka做铺垫的哈,后面pulsar见,完结撒花~。