迷途小书童

读过几年书,尘世间一枚不起眼的小书童


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

  • 搜索

消息存储原理与 ID 规则

发表于 2022-02-15 | 分类于 pulsar

前言

上一篇文章对Pulsar的topic和分区做了介绍,这一篇我们就来对消息存储原理和ID规则来进行一下介绍,在本篇介绍中可能会出现一些让你非常困惑的词汇。不过不要紧我都会一一来进行解释

消息 ID 生成规则

在 Pulsar 中,每条消息都有属于的自己的唯一 ID(即 MessageID),MessageID 由四部分组成:ledgerId:entryID:partition-index:batch-index。其中:

  • partition-index:指分区的编号,在非分区 topic 的时候为 -1。
  • batch-index:在非批量消息的时候为 -1。

消息 ID 的生成规则由 Pulsar 的消息存储机制决定,Pulsar 中消息存储原理图如下:

如上图所示,在 Pulsar中,一个 Topic 的每一个分区会对应一系列的 ledger,其中只有一个 ledger 处于 open 状态即可写状态,而每个 ledger 只会存储与之对应的分区下的消息。

Pulsar 在存储消息时,会先找到当前分区使用的 ledger ,然后生成当前消息对应的 entry ID,entry ID 在同一个 ledger 内是递增的。每个 ledger 存在的时长或保存的 entry 个数超过阈值后会进行切换,新的消息会存储到同一个 partition 中的下一个 ledger 中。

  • 批量生产消息情况下,一个 entry 中可能包含多条消息。
  • 非批量生产的情况下,一个 entry 中包含一条消息(producer 端可以配置这个参数,默认是批量的)。

Ledger 只是一个逻辑概念,是数据的一种逻辑组装维度,并没有对应的实体。而 bookie 只会按照 entry 维度进行写入、查找、获取。

分片机制详解:Legder 和 Entry

Pulsar 中的消息数据以 ledger 的形式存储在 BookKeeper 集群的 bookie 存储节点上。Ledger 是一个只追加的数据结构,并且只有一个写入器,这个写入器负责多个 bookie 的写入。Ledger 的条目会被复制到多个 bookie 中,同时会写入相关的数据来保证数据的一致性。

BookKeeper 需要保存的数据包括:

  • Journals
    • journals 文件里存储了 BookKeeper 的事务日志,在任何针对 ledger 的更新发生前,都会先将这个更新的描述信息持久化到这个 journal 文件中。
    • BookKeeper 提供有单独的 sync 线程根据当前 journal 文件的大小来作 journal 文件的 rolling。
  • EntryLogFile
    • 存储真正数据的文件,来自不同 ledger 的 entry 数据先缓存在内存buffer中,然后批量flush到EntryLogFile中。
    • 默认情况下,所有ledger的数据都是聚合然后顺序写入到同一个EntryLog文件中,避免磁盘随机写。
  • Index 文件
    • 所有 Ledger 的 entry 数据都写入相同的 EntryLog 文件中,为了加速数据读取,会作 ledgerId + entryId 到文件 offset 的映射,这个映射会缓存在内存中,称为 IndexCache。
    • IndexCache 容量达到上限时,会被 sync 线程 flush 到磁盘中。

刚开始看到这里的时候我是有点懵逼的,这都是些啥啊,但是后面仔细思考下,这个设计似乎和mysql的底层原理几乎一致,Journals相当于redolog,EntryLogFile+Index相当于bufferpool。还不明白那就接着看下面的介绍:

Entry 数据写入

  1. 数据首先会同时写入 Journal(写入 Journal 的数据会实时落到磁盘)和 Memtable(读写缓存)。
  2. 写入 Memtable 之后,对写入请求进行响应。
  3. Memtable 写满之后,会 flush 到 Entry Logger 和 Index cache,Entry Logger 中保存数据,Index cache 中保存数据的索引信息,
  4. 后台线程将 Entry Logger 和 Index cache 数据落到磁盘。

Entry 数据读取

  • Tailing read 请求:直接从 Memtable 中读取 Entry。
  • Catch-up read(滞后消费)请求:先读取 Index信息,然后索引从 Entry Logger 文件读取 Entry。

数据一致性保证:LastLogMark

  • 写入的 EntryLog 和 Index 都是先缓存在内存中,再根据一定的条件周期性的 flush 到磁盘,这就造成了从内存到持久化到磁盘的时间间隔,如果在这间隔内 BookKeeper 进程崩溃,在重启后,我们需要根据 journal 文件内容来恢复,这个 LastLogMark 就记录了从 journal 中什么位置开始恢复。
  • 它其实是存在内存中,当 IndexCache 被 flush 到磁盘后其值会被更新,LastLogMark 也会周期性持久化到磁盘文件,供 Bookkeeper 进程启动时读取来从 journal 中恢复。
  • LastLogMark 一旦被持久化到磁盘,即意味着在其之前的 Index 和 EntryLog 都已经被持久化到了磁盘,那么 journal 在这 LastLogMark 之前的数据都可以被清除了。

总结

Pulsar Topic 和分区

发表于 2022-02-14 | 分类于 pulsar

前言

本人最近正在重构一个vdc(virus detect center)系统,为了将旧系统解耦我们将原系统拆分成了很多子系统,系统之间通过mq进行交互,在这之前本人对kafka已经是有些许了解,但是这次小组决定使用pulsar来替代kafka,那么究竟pulsar有什么好,它较kafka到底有什么优势,本人接下来的一个系列篇章就会对pulsar来进行介绍,同时也会讲解实际使用中的一些用法与问题,本系列文章的前提是假定大家对kafka都是一定了解的。

Apache Pulsar 架构

Apache Pulsar 是一个发布-订阅模型的消息系统,由 Broker、Apache BookKeeper、Producer、Consumer 等组件组成。我们知道传统mq的组成是没有Apache BookKeeper这一组件的,那么这个Apache BookKeeper究竟是何方神圣,后面我们会详细说一说

先看一张整体的结构图:

  • Producer : 消息的生产者,负责发布消息到 Topic。
  • Consumer:消息的消费者,负责从 Topic 订阅消息。
  • Broker:无状态服务层,负责接收和传递消息,集群负载均衡等工作,Broker 不会持久化保存元数据,因此可以快速的上、下线。
  • Apache BookKeeper:有状态持久层,由一组 Bookie 存储节点组成,可以持久化地存储消息。

了解kafka的朋友相信一眼就能看出一些异样,那就是kafka的存储其实就是放在broker上面的,也就是说存储与计算实际上是一体的,而从上图可知 Apache Pulsar 在架构设计上采用了计算与存储分离的模式,消息发布和订阅相关的计算逻辑在 Broker 中完成,数据存储在 Apache BookKeeper 集群的 Bookie 节点上。

Topic 与分区

Topic(主题)是某一种分类的名字,消息在 Topic 中可以被存储和发布。生产者往 Topic 中写消息,消费者从 Topic 中读消息。

Pulsar 的 Topic 分为 Partitioned Topic 和 Non-Partitioned Topic 两类,Non-Partitioned Topic 可以理解为一个分区数为1的 Topic。实际上在 Pulsar 中,Topic 是一个虚拟的概念,创建一个3分区的 Topic,实际上是创建了3个“分区Topic”,发给这个 Topic 的消息会被发往这个 Topic 对应的多个 “分区Topic”。
例如:生产者发送消息给一个分区数为3,名为my-topic的 Topic,在数据流向上是均匀或者按一定规则(如果指定了key)发送给了 my-topic-partition-0、my-topic-partition-1 和 my-topic-partition-2 三个“分区 Topic”。

分区 Topic 做数据持久化时,分区是逻辑上的概念,实际存储的单位是分片(Segment)的。

如下图所示,分区 Topic1-Part2 的数据由N个 Segment 组成, 每个 Segment 均匀分布并存储在 Apache BookKeeper 群集中的多个 Bookie 节点中, 每个 Segment 具有3个副本。

物理分区与逻辑分区

逻辑分区和物理分区对比如下:

物理分区:计算与存储耦合,容错需要拷贝物理分区,扩容需要迁移物理分区来达到负载均衡。

逻辑分区:物理“分片”,计算层与存储层隔离,这种结构使得 Apache Pulsar 具备以下优点。

  • Broker 和 Bookie 相互独立,方便实现独立的扩展以及独立的容错。
  • Broker 无状态,便于快速上、下线,更加适合于云原生场景。
  • 分区存储不受限于单个节点存储容量。
  • 分区数据分布均匀,单个分区数据量突出不会使整个集群出现木桶效应。
  • 存储不足扩容时,能迅速利用新增节点平摊存储负载。

总结

和弦的构成(分析)

发表于 2021-10-10 | 分类于 乐理

前言

弹了有段时间的吉他的,一直对吉他乐理方面没有做一个系统的分析,今天来对常用3和弦和7和弦做一个完整的分析以及公式总结

3和弦

  • 大3和弦(大三度+小三度)

    例如:
    C和弦 组成音: 1 <大三度> 3 <小三度> 5
    D和弦 组成音: 2 <大三度> 4(#) <小三度> 6

    同理可推导
    G和弦 组成音:5 <大三度> 7 <小三度> 2^

  • 小3和弦(小三度+大三度)

    例如:
    Cm和弦 组成音: 1 <小三度> 3(b) <大三度> 5
    Dm和弦 组成音: 2 <小三度> 4 <大三度> 6

    同理可以推导

    Gm和弦 组成音: 5 <小三度> 7(b) <大三度> 2^

  • 增3和弦(大三度+大三度)

    例如:C+或者 Caug

  • 减3和弦(小三度+小三度)

    例如:C- 或者 Cdim

7和弦

记得看中国好声音时,李荣浩说leehom的《爱错》中的bridge中那句在这少了你的世界那里的7减5和弦让他头皮发麻,这就来看看啥是7减5和弦,其实那时说7减5的时候王力宏是懵逼的,专业叫法应该叫半减7和弦。

  • 大小7和弦(属7和弦)(大三和弦+小7度)[小7度就是包含两个半音,大7度就是包含一个半音]

    例如:

    C7和弦组成音本质就是 C和弦+小7度(1 3 5 + 7(b))

    F7和弦组成音本质就是 F和弦+小7度(4 6 1^ + 3^(b))

  • 小7和弦(小三和弦+小7度)

    例如:

    Cm7和弦的组成音本质就是 Cm和弦+小7度 (1 3(b) 5 7(b))

  • 大7和弦(大三和弦+大7度)

    例如:

    Cmaj7和弦组成音本质就是C和弦+大7度(1 3 5 + 7)

    Fmaj7和弦组成音本质就是F和弦+大7度 (4 6 1^ 3^)

  • 小大7和弦(小三和弦+大7度)

    例如:

    Cmmaj7和弦组成音本质就是Cm和弦+大7度(1 3(b) 5 7)

  • 半减7和弦

    例如:

    Bm7-5 这个和弦就是上面说到的让李荣浩头皮发麻的7-5和弦了,没错这种和弦的收缩性非常强,用到合适的地方确实会头皮发麻。

    那么这个和弦的本质其实就是Bm和弦(7 2^ 4^(#) + 小7度 + 5音降半音 ( 7 2^ 4^ 6^)

    所以通过公式我们也可以推到出Ammaj7-5和弦的组成音了,哈哈哈 虽然这个和弦不一定存在。

和弦色彩

今天不打算对和弦色彩进行总结,但是要提一句的是,和弦色彩对于编配来说实在是太重要了,就好像一首歌你可以用4536251去完成,你也可以用分割和弦 5/4去代替其中的5,但是两者表达出来的色彩是完全不一样的。后面会对和弦色彩进行一次详细的记录。

总结

像我这种业余玩流行音乐的,和弦其实是一个非常重要的理论基础,就像我们写程序你要写好代码就一定要懂原理,老实说其实掌握一些和弦套路例如6451,4536251还有17654325(卡农和弦)低音下行这些后基本上流行乐坛百分之70的歌你都能弹唱了,无非就是一首换一首,就像你你做开发选用的是dubbo框架还是springcloud框架一样。但是如果想玩的更有趣一点就需要去了解这些框架的构成原理了。

如何搭建一个高性能cdk系统

发表于 2021-10-08 | 分类于 工作沉淀

前言

本人前不久稍稍改进了一个cdkey系统,来总结下是如何设计一个cdkey(兑换码)系统的,本文打算从以下三个方面来说。

  • cdkey兑换
  • 如何生成cdkey以及如何存储
  • 如何保证高可用

cdkey兑换

cdkey本身只是一种推广售卖方式,其实不用也是可以的,不过市面上的商品一般除了正常的收费购买之外一定会伴随有cdkey售卖这样的方式。一个cdkey就对应着一个具体的商品,这也意味着cdkey是需要与商品以及订进行绑定的,举个例子,类似这样的一串字符串XXX3XXXNLMX1YEXXX就是cdkey的一个样式,是一个36进制构成的16位表达式,一般先由系统提前生成好然后下发出去售卖或者活动赠送等,兑换的时候做一些校验工作然后是生成订单流程,订单流程走完之后最后将订单号与cdkey绑定并将其状态更改成已兑换。流程如下图:

流程图因涉及到公司的业务流转 不方便在此贴出

如何生成cdkey以及如何存储

上一节提到目前系统的cdkey是一个36进制的16位表达式,针对36进制我们可以预先定义一个数组:

1
2
3
4
5
6
7
8
9
10
myVal = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P","Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z","0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}
//生成一个cdk的代码,所以是有可能一批cdk里面产生重复的,必须做好本地去重
func genOneKey() string {
buf.Reset()
for j := 0; j < 16; j++ {
val := rand.Intn(len(myVal))
buf.WriteString(myVal[val])
}
return buf.String()
}

当然为了安全我们也可以将上述的数组顺序随意打乱,这种方式在上一篇短链系统设计的时候已经用过一次了,只不过短链用的是64进制而已。我们也可以使用64进制来降低cdkey生成时的碰撞率,不过就cdkey而言为了美观一般就不掺杂小写字母了。本人目前的需求是需要支持大批量cdk申请的,单批次申请需要支持10w,所以我这边cdkey的生成实际是按批次生成的,这样的好处就是可以减少与数据库的网络IO,每次生成一批cdkey然后去数据库校验是否存在,存在了就重试生成一批新cdkey再校验,直到校验不存在就插入数据库然后生成下一批,反复执行直到申请数量全部生成完毕。

总体流程图如下:

接下来是单批次生成cdk详细逻辑:

给出表设计方案 :

下面的表中很多字段都是与具体业务相关的,不必过于关心,但是核心的字段在一个cdk系统中基本上都是不变的。

1
2
3
4
5
6
7
8
//因为表设计也有相关敏感信息不方便在此处贴出具体字段
//cdk订单申请表
CREATE TABLE `cdk_order` (
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='cdk申请单表'

//cdk详情表
CREATE TABLE `cdk_detail` (
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='cdk明细表'

如何保证高可用

一般来说通过集群部署可解决系统高可用问题,但是集群部署就会导致新的问题,可能两个服务节点同时工作产生的cdkey就有可能会出现交集本地去重并不能解决问题,这里我列出了两种方案来解决这个问题,就当前情况而言本人采用了第一种方案:

  • 方案一:通过数据库自身来解决该问题

    上一节介绍的方案就是了,通过数据库校验 + 事务 + 重试来解决

  • 方案二:通过分布式锁+bloomfilter来解决这个问题

    见方案设计图:

总结

本文对cdk系统设计方案作了一点总结,整体的思路就是要减少碰撞,保证申请数量与实际生成数量一致,并且要防止一次性提交数据量过大,具体最后的高可用方案哪种更合适还是依据实际情况再做定夺为佳。

高性能短链系统的一些设计思考

发表于 2021-09-27 | 分类于 工作沉淀

前言

最近遇到一个需求,需要短时间内将大量的长链转换成短链然后再转成二维码,由于我们的短链服务是通过接口外部进行调用的,而且不能批量转换,所以这里就会出现一个情况就是短时间的大量请求接口会以失败告终,如果能自己设计一个短链系统,这个问题就能解决了。那么今天就来谈谈如何设计一个高性能短链系统。

本文将会从以下几个方面来进行探讨

  • 为啥要用短链 长链存在哪些问题
  • 短链是如何实现跳转的
  • 如何生成短链以及如何存储
  • 如何保证高性能高可用

为啥要用短链 长链存在哪些问题

下面是自如给我发的推送短信,点击下方蓝色的链接(短链)

img

浏览器接着会跳转到一个确认跳转页面。

img

那么为啥要用短链表示,直接用长链不行吗,用短链的话有如下好外

1、链接变短,在对内容长度有限制的平台发文,可编辑的文字就变多了

最典型的就是微博,限定了只能发 140 个字,如果一串长链直接怼上去,其他可编辑的内容就所剩无几了,用短链的话,链接长度大大减少,自然可编辑的文字多了不少。

再比如上面的短信如果一个长链直接怼上去满屏都是一个链接,非常不美观。

2、像我前言中的需求需要将链接转成二维码,如果是长链的话二维码非常密集而且很难识别,短链的话就会清爽很多,如下图所示

img

3、链接太长在有些平台上无法自动识别为超链接

短链是如何实现跳转的

从上文可知,短链好处多多,那么它是如何工作的呢。我们在浏览器抓下包看看

img

可以看到请求后,返回了状态码 302(重定向)与 location 值为长链的响应,然后浏览器会再请求这个长链以得到最终的响应,整个交互流程图如下

img

主要步骤就是访问短网址后重定向访问 B,那么问题来了,301 和 302 都是重定向,到底该用哪个,这里需要注意一下 301 和 302 的区别

  • 301,代表 永久重定向,也就是说第一次请求拿到长链接后,下次浏览器再去请求短链的话,不会向短网址服务器请求了,而是直接从浏览器的缓存里拿,这样在 server 层面就无法获取到短网址的点击数了,如果这个链接刚好是某个活动的链接,也就无法分析此活动的效果。所以我们一般不采用 301。
  • 302,代表 临时重定向,也就是说每次去请求短链都会去请求短网址服务器(除非响应中用 Cache-Control 或 Expired 暗示浏览器缓存),这样就便于 server 统计点击数,所以虽然用 302 会给 server 增加一点压力,但在数据异常重要的今天,这点代码是值得的,所以推荐使用 302!

如何生成短链以及如何存储

1、Hash

短链怎么生成,我的第一反应,这不就是以不定长输入(长链)转换成定长输出(短链)【哈希的定义】,观察上面的短链很明显可以看到短链是由固定短链域名 + 长链映射成的一串字母组成(不定长输入–>定长输出),那么这个哈希函数该怎么取呢,相信肯定有很多人说用 MD5,SHA 等算法,网上确实有很多是用md5先生成32位串,然后均分4段做hash处理,最后再随机取其中之一作为最后结果,只是我在想这个md5先生成32位串是否一定有必要,而且既然是加密就意味着性能上会有损失,其实我觉得这里的重点应该是hash并不是加解密,如何提升哈希的运算速度和减少冲突概率才是重点。以下属于借鉴内容了这里推荐 Google 出品的 MurmurHash 算法,MurmurHash 是一种非加密型哈希函数,适用于一般的哈希检索操作。与其它流行的哈希函数相比,对于规律性较强的 key,MurmurHash 的随机分布特征表现更良好。非加密意味着着相比 MD5,SHA 这些函数它的性能肯定更高(实际上性能是 MD5 等加密算法的十倍以上),也正是由于它的这些优点,所以虽然它出现于 2008,但目前已经广泛应用到 Redis、MemCache、Cassandra、HBase、Lucene 等众多著名的软件中。

MurmurHash 提供了两种长度的哈希值,32 bit,128 bit,为了让网址尽可通地短,我们选择 32 bit 的哈希值,32 bit 能表示的最大值近 43 亿,对于中小型公司的业务而言绰绰有余。对上文提到的极客长链做 MurmurHash 计算,得到的哈希值为 3002604296,于是我们现在得到的短链为 固定短链域名+哈希值 = http://xxx.com/a/3002604296

上述结果还是有点长?

觉得10位的短链还是有点长怎么办?首先3002604296 这个结果是10进制数字,有一种方案就是将它转为 62 进制就可以缩短它的长度,10 进制转 62 进制如下,也就是按62取模,对应的余数在下面的字符串中取对应值:

62进制思路 : 0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ

img

经过上面的取模运算然后取映射值,就可以将(3002604296)10进制数字转换成 (3hcCxy)62进制,立马又缩短了4位,因此最终的短链为 http://xxx.com/a/3hcCxy,6 位 62 进制数可表示 568 亿的数,应付长链转换绰绰有余,如果需要更短一点也是可以的,根据实际需求进行取舍吧,我们公司目前用的是8位https://xxx.com/r8CpSjCN

hash冲突了怎么办?

既然是哈希函数,那么很有可能两个不同的长链经过hash之后生成的短链是一样的,那么这个问题要怎么解决?这里给出的思路就是在长链的基础上添加随机字符串然后重试生成短链。

由上文知道访问短链能跳转到长链,那么长短链的关系一定是有一个地方存储的, Redis 或 Mysql ?,一般来说 Mysql 存储首选,redis缓存首选。表结构如下所示

1
2
3
4
5
6
7
CREATE TABLE `long_short_url_map` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`long_url` varchar(160) DEFAULT NULL COMMENT '长链',
`short_url` varchar(10) DEFAULT NULL COMMENT '短链',
`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间'
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

基于上面的组件可以做出如下的设计。

  1. 长链(longurl)经过 MurmurHash 后取模62得到最终短链。
  2. 再根据短链去 long_short_url_map 表中查找看是否存在相关记录,如果不存在,将长链与短链对应关系插入数据库中,存储。
  3. 如果存在,说明已经有相关记录了,此时在长串上拼接一个自定义好的字段,比如「duplicate+RandomNum」,「longurl + duplicate+RandomNum」去重试第一步操作,如果最后还是重复就继续重拾,如果没有重复了就把原longurl和短链的关系存储即可。

上面的步骤是没有加入缓存的,插入一条记录是需要经过两次(甚至三次(概率微乎其微)) sql 查询(1.根据短链查记录是否存在 2.将长短链对应关系插入数据库中),像我前言中的需求在高并发下,明显还是会有瓶颈出现的。一般数据库和应用服务(只做计算不做存储)会部署在两台不同的 server 上,执行两条 sql 就需要两次网络通信,这两次网络通信与两次 sql 执行是整个短链系统的性能瓶颈所在!

引入缓存减少第一次的sql查询?

很显然插入数据那一次的sql肯定没跑了,无论怎样那一次的sql都是要执行的

  1. 方案一 : 给短链字段 short_url 加上唯一索引,把唯一性校验直接交给数据库去做,可行但是数据库压力很大(唯一索引懂的都懂)
  2. 方案二 : 数据量很大的情况下,冲突的概率会增大,此时我们可以使用加Bloomfilter(缓存)来进行优化。

用所有生成的短网址构建布隆过滤器,当一个新的长链生成短链后,先将此短链在Bloomfilter中进行查找,如果不存在,说明 db 里不存在此短网址,可以插入,插入db之前先将短链放入Bloomfilter。Bloomfilter是一种非常省内存的数据结构,长度为 10 亿的布隆过滤器,只需要 125 M 的内存空间。

综上,如果用哈希函数来设计,总体的设计思路如下

img

用哈希算法生成的短链其实已经能满足我们的业务需求,本人目前工作中遇到的短链就是用hash生成的只不过保留的是8位。其实还有另外一种是通过自增序列

2、Sequence

待完善

高性能短链的架构设计

在电商公司,经常有很多活动,秒杀,抢红包等等,在某个时间点的 QPS 会很高,考虑到这种情况,可以引入了 openResty,它是一个基于 Nginx 与 Lua 的高性能 Web 平台,由于 Nginx 的非阻塞 IO 模型,使用 openResty 可以轻松支持 100 w + 的并发数,一般情况下你只要部署一台即可,不过为了避免单点故障,两台为宜,同时 openResty 也自带了缓存机制,集成了 redis 这些缓存模块,也可以直接连 mysql。不需要再通过业务层连这些中间件,性能自然会高不少

img

上图所示,使用 openResty 可以直接跳过了业务层这一步,直达缓存层与数据库层,对性能也有大量提升。

总结

本文对短链设计方案作了一点总结,文中涉及到像Bloomfilter,openResty 等技术,后续再去详细讨论。值得说明的是Bloomfilter的确是一个强大的缓存层有必要好好学习一番。

自定义简易协程池踩坑记录

发表于 2021-09-14 | 分类于 Golang

前言

上一篇文章中完善了一个自定义协程池,正所谓不改就没事,哈哈,果然上周测试环境发版后就出问题了,本人的服务的健康检查一发新版本之后就会立马unhealthy,然后先是观察容器cpu负载竟然是100%,好家伙,登陆容器通过top命令确定了就是自己的服务导致的,然后检查了自己修改过的代码,经过半天的排查果然找到了这个病因。

代码展示(问题代码部分)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//运行一个groutine 开始消费任务 核心的问题就出在了下面这个方法内 
//for循环中嵌套了select 然后通过关闭通道进行跳出
//然而我之前的跳出用的是break,这里就是关键了
//break只能是跳出当前select,外层的for循环还是继续走的,然后就进入了死循环了
func (self *Pool) runGroutine() { // runningWorkers + 1
self.incRunningWorkers() //worker运行ing数量原子自增1
go func() {
defer func() {
self.decRunningWorkers() //worker运行ing数量原子自减1
if r := recover(); r != nil {
// if self.PanicHandler != nil {
// self.PanicHandler(r)
// } else {
// log.Printf("Worker panic: %s\n", r)
// }
err := errs.New(Panic_Sub_Goroutine, "子协程panic") //子协程panic是会导致主协程挂掉的,这一步也是必须进行捕获处理
self.ResultQuene <- err
}
self.checkWorker() // 兜底机制,避免worker全部panic后没有worker消费队列中的数据,理论上这一步非常重要!否则有可能出现死锁状态
}()
for {
select {
case task, ok := <-self.TaskQueue:
if !ok {
//break
return //这里不要用break break只能跳出一层第二层的for循环还会继续 会直接把cpu打满
}
err := task()
self.ResultQuene <- err
}
}
}()
}

总结

好在是在测试环境发布时发现了这样的问题,如果是线上出现该问题还是比较麻烦的,总结就是对select的一些用法还是不够熟悉,下一篇文章就来对golang中的select用法进行一下总结(点此处了解select用法总结)。

go的select用法总结

发表于 2021-09-14 | 分类于 Golang

前言

沿着上一篇留下来的问题继续说,来总结下golang中的select的一些用法,避免后续踩坑。

首先golang中的select语句格式如下

1
2
3
4
5
6
7
8
select {
case <-ch1:
// 如果从 ch1 信道成功接收数据,则执行该分支代码
case ch2 <- 1:
// 如果成功向 ch2 信道成功发送数据,则执行该分支代码
default:
// 如果上面都没有成功,则进入 default 分支处理流程
}

  

直观上跟switch确实是有点相似的,但实际上两者有着本质上的区别。

select里的case后面并不带判断条件,而是一个信道的操作,不同于switch里的case,对于从其它语言转过来的开发者来说有些需要特别注意的地方。

golang 的 select 就是监听 IO 操作,当 IO 操作发生时,触发相应的动作每个case语句里必须是一个IO操作,确切的说,应该是一个面向channel的IO操作。

注:Go 语言的 select 语句借鉴自 Unix 的 select() 函数,在 Unix 中,可以通过调用 select() 函数来监控一系列的文件句柄,一旦其中一个文件句柄发生了 IO 动作,该 select() 调用就会被返回(C 语言中就是这么做的),后来该机制也被用于实现高并发的 Socket 服务器程序。Go 语言直接在语言级别支持 select关键字,用于处理并发编程中通道之间异步 IO 通信问题。

注意:如果 ch1 或者 ch2 信道都阻塞的话,就会立即进入 default 分支,并不会阻塞。但是如果没有 default 语句,则会阻塞直到某个信道操作成功为止。

重要知识点

  1. select语句只能用于信道的读写操作
  2. select中的case条件(非阻塞)是并发执行的,select会选择先操作成功的那个case条件去执行,如果多个同时返回,则随机选择一个执行,此时将无法保证执行顺序。对于阻塞的case语句会直到其中有信道可以操作,如果有多个信道可操作,会随机选择其中一个 case 执行
  3. 对于case条件语句中,如果存在信道值为nil的读写操作,则该分支将被忽略,可以理解为从select语句中删除了这个case语句
  4. 如果有超时条件语句,判断逻辑为如果在这个时间段内一直没有满足条件的case,则执行这个超时case。如果此段时间内出现了可操作的case,则直接执行这个case。一般用超时语句代替了default语句
  5. 对于空的select{},会引起死锁
  6. 对于for中的select{}, 也有可能会引起cpu占用过高的问题

下面列出每种情况的示例代码

1. select语句只能用于信道的读写操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import "fmt"

func main() {
size := 10
ch := make(chan int, size)
for i := 0; i < size; i++ {
ch <- 1
}

ch2 := make(chan int, size)
for i := 0; i < size; i++ {
ch2 <- 2
}

ch3 := make(chan int, 1)

select {
case 3 == 3:
fmt.Println("equal")
case v := <-ch:
fmt.Print(v)
case b := <-ch2:
fmt.Print(b)
case ch3 <- 10:
fmt.Print("write")
default:
fmt.Println("none")
}
}
语句会报错

prog.go:20:9: 3 == 3 evaluated but not used
prog.go:20:9: select case must be receive, send or assign recv<br>从错误信息里我们证实了第一点。

  

2. select中的case语句是随机执行的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import "fmt"

func main() {
size := 10
ch := make(chan int, size)
for i := 0; i < size; i++ {
ch <- 1
}

ch2 := make(chan int, size)
for i := 0; i < size; i++ {
ch2 <- 2
}

ch3 := make(chan int, 1)

select {
case v := <-ch:
fmt.Print(v)
case b := <-ch2:
fmt.Print(b)
case ch3 <- 10:
fmt.Print("write")
default:
fmt.Println("none")
}
}

  多次执行的话,会随机输出不同的值,分别为1,2,write。这是因为ch和ch2是并发执行会同时返回数据,所以会随机选择一个case执行,。但永远不会执行default语句,因为上面的三个case都是可以操作的信道。

3. 对于case条件语句中,如果存在通道值为nil的读写操作,则该分支将被忽略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import "fmt"

func main() {
var ch chan int
// ch = make(chan int)

go func(c chan int) {
c <- 100
}(ch)

select {
case <-ch:
fmt.Print("ok")

}
}
报错

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select (no cases)]:
main.main()
/tmp/sandbox488456896/main.go:14 +0x60

goroutine 5 [chan send (nil chan)]:
main.main.func1(0x0, 0x1043a070)
/tmp/sandbox488456896/main.go:10 +0x40
created by main.main
/tmp/sandbox488456896/main.go:9 +0x40
可以看到 “goroutine 1 [select (no cases)]” ,虽然写了case条件,但操作的是nil通道,被优化掉了。
要解决这个问题,只能使用make()进行初始化才可以。

  

4. 超时用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"time"
)

func main() {
ch := make(chan int)
go func(c chan int) {
// 修改时间后,再查看执行结果
time.Sleep(time.Second * 1)
ch <- 1
}(ch)

select {
case v := <-ch:
fmt.Print(v)
case <-time.After(2 * time.Second): // 等待 2s
fmt.Println("no case ok")
}

time.Sleep(time.Second * 10)
}

我们通过修改上面的时等待时间可以看到,如果等待时间超出<2秒,则输出1,否则打印“no case ok”

  

5. 空select{}

1
2
3
4
5
6
7
8
9
10
package main

func main() {
select {}
}
goroutine 1 [select (no cases)]:
main.main()
/root/project/practice/mytest/main.go:10 +0x20
exit status 2
直接死锁

  

6. for中的select 引起的CPU过高的问题(上一篇踩坑记录中就是这个问题)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"runtime"
"time"
)

func main() {
quit := make(chan bool)
for i := 0; i != runtime.NumCPU(); i++ {
go func() {
for {
select {
case <-quit:
break
default:
}
}
}()
}

time.Sleep(time.Second * 15)
for i := 0; i != runtime.NumCPU(); i++ {
quit <- true
}
}

  

上面这段代码会把所有CPU都跑满,原因就就在select的用法上。

一般来说,我们用select监听各个case的IO事件,每个case都是阻塞的。上面的例子中,我们希望select在获取到quit通道里面的数据时立即退出循环,但由于他在for{}里面,在第一次读取quit后,仅仅退出了select{},并未退出for,所以下次还会继续执行select{}逻辑,此时永远是执行default,直到quit通道里读到数据,否则会一直在一个死循环中运行,即使放到一个goroutine里运行,也是会占满所有的CPU。

解决方法我这里总结了有三种方案:

1.使用break + 标示位置 相当于 使用goto 跳出for循环 。

2.使用return代替break 直接结束子goroutine 。

3.使用flag,二次break跳出for循环。

总结

我们在使用每一句代码时都需要时刻保持敬畏之心,先将其原理用法都弄清楚然后再投入使用是我们开发工程师应该时刻保持的原则。谨记~

简易协程池

发表于 2021-09-03 | 分类于 Golang

前言

Go 的 goroutine 提供了一种较线程而言更廉价的方式处理并发场景, go 使用二级线程的模式, 将 goroutine 以 M:N 的形式复用到系统线程上, 节省了 cpu 调度的开销, 也避免了用户级线程(协程)进行系统调用时阻塞整个系统线程的问题。

但在实际的开发中如果待执行的tasks数量过多,可能带来的问题就是goroutine数量激增进而导致调度性能下降、GC 频繁、内存暴涨, 引发一系列问题。在面临这样的场景时, 限制 goroutine 的数量、重用 goroutine 显然很有价值。所以可以通过一个goroutine池来对所有的tasks进行排队执行,类似下面就是一个简单的goroutine池,通过定量的goroutine来进行任务的消费。设计下面的协程池有三个重点问题是需要重点考虑到的
1.如何确保任务全部都被消费完?
2.子goroutine发生panic如何处理?
3.子goroutine如果全部发生panic了怎么办?放任死锁吗?

这里并没考虑子goroutine执行时间太长的问题,主要原因我们可以自定义子goroutine的数量,我更希望的是任务能够执行完全而不是任务什么时候执行完,任务执行时间太长应该是业务侧的问题。

代码展示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import (
"sync"
"sync/atomic"

"git.code.oa.com/trpc-go/trpc-go/errs"
"git.code.oa.com/trpc-go/trpc-go/log"
)

//自定义简单协程池 统一管理并发协程
//整体思路:定量的协程从同一个任务队列循环消费任务
//缺点:目前功能对协程超时情况存在缺陷 初始化函数中没有对初始化数量进行校验
type Pool struct {
TaskQueue chan func() error //任务队列
WorkerNumber int //初始化worker数量
TaskNumber int //待消费任务数量
ResultQuene chan error //返回结果队列
FinishCallback func() //全部任务消费完成的回调函数
runningWorkers int64 //运行中的worker数
sync.Mutex
// PanicHandler func(interface{}) //协程panic的处理函数 防止协程panic后导致主协程崩溃
}

//初始化 根据自定义需求设定worker数量以及任务队列长度
func (self *Pool) Init(WorkerNumber int, TaskNumber int) {
self.WorkerNumber = WorkerNumber
self.TaskNumber = TaskNumber
self.TaskQueue = make(chan func() error, TaskNumber)
self.ResultQuene = make(chan error, TaskNumber)
}

//开启worker的消费动作
func (self *Pool) Start() {
//默认开启 WorkerNumber个goruntine
for i := 0; i < self.WorkerNumber; i++ {
self.runGroutine()
}

//获取每个任务的处理结果
for j := 0; j < self.TaskNumber; j++ {
err, ok := <-self.ResultQuene
//这里可以通过自定义决定是否上报日志
if !ok {
break
} else {
if err != nil {
log.Error(err.Error())
}
}
}

//结束时的回调函数 可以做一些通知操作等等
if self.FinishCallback != nil {
self.FinishCallback()
}
}

//运行一个groutine 开始消费任务
func (self *Pool) runGroutine() { // runningWorkers + 1
self.incRunningWorkers() //worker运行ing数量原子自增1
go func() {
defer func() {
self.decRunningWorkers() //worker运行ing数量原子自减1
if r := recover(); r != nil {
// if self.PanicHandler != nil {
// self.PanicHandler(r)
// } else {
// log.Printf("Worker panic: %s\n", r)
// }
err := errs.New(Panic_Sub_Goroutine, "子协程panic") //子协程panic是会导致主协程挂掉的,这一步也是必须进行捕获处理
self.ResultQuene <- err
}
self.checkWorker() // 兜底机制,避免worker全部panic后没有worker消费队列中的数据,理论上这一步非常重要!否则有可能出现死锁状态
}()
for {
select {
case task, ok := <-self.TaskQueue:
if !ok {
//break 这里用break导致线上遇到了一个cpu被打满的bug 下一篇文章中进行说明
return
}
err := task()
self.ResultQuene <- err
}
}
}()
}

func (self *Pool) incRunningWorkers() { // runningWorkers + 1
atomic.AddInt64(&self.runningWorkers, 1)
}

func (self *Pool) decRunningWorkers() { // runningWorkers - 1
atomic.AddInt64(&self.runningWorkers, -1)
}

func (self *Pool) GetRunningWorkers() int64 {
return atomic.LoadInt64(&self.runningWorkers)
}

func (self *Pool) checkWorker() {
self.Lock()
defer self.Unlock()

if self.GetRunningWorkers() == 0 && len(self.TaskQueue) > 0 {
self.runGroutine()
}
}

//关闭通道避免内存泄漏
func (self *Pool) Stop() {
close(self.TaskQueue)
close(self.ResultQuene)
}

//任务入队
func (self *Pool) AddTask(task func() error) {
self.TaskQueue <- task
}

//执行结束后的回调
func (self *Pool) SetFinishCallback(fun func()) {
self.FinishCallback = fun
}

使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
ctx := context.Background()
ids := make([]int32, 0)
var pool comm.Pool
pool.Init(50, len(ids))
count := int32(0)
for _, id := range ids {
idValue := id
pool.AddTask(func() error {
return func(innerId int32) error {
err := DoSomething(innerId)
if err != nil {
log.ErrorContextf(ctx, "DoSomething failed, id:%d err:%s", innerId, err.Error())
return nil
}
//以上执行无问题,完成数就+1
atomic.AddInt32(&count, 1)
return nil
}(int32(idValue))
})
}
//回调
pool.SetFinishCallback(func() {
//自定义通知操作
fmt.Println("all done")
})
//开始执行
pool.Start()
//关闭资源
pool.Stop()

总结

池化管理自己的goroutine是非常有必要的,可以帮助我们提升开发效率降低异常风险!所以赶紧把你的goroutine管理起来吧

Redis的scan命令解释

发表于 2021-08-25 | 分类于 Redis

scan的种类

1
2
3
4
5
原文地址:https://segmentfault.com/a/1190000018218584
SCAN cursor [MATCH pattern] [COUNT count]
SSCAN KEY cursor [MATCH pattern] [COUNT count]
HSCAN KEY cursor [MATCH pattern] [COUNT count
ZSCAN KEY cursor [MATCH pattern] [COUNT count]

scan:迭代当前库

sscan:迭代一个 set 类型

hscan:迭代一个hash类型,并返回相应的值

zscan:迭代一个sorted set,并且返回相应的分数

知道redis是单进程单线程模型,keys和smembers这种命令可能会阻塞服务器,导致redis有可能出现长时间无法响应其他命令的情况,所以出现了scan系列的命令,通过返回一个游标,可以增量式迭代。

scan类型命令的实现

scan,sscan,hscan,zsan分别有自己的命令入口,入口中会进行参数检测和游标赋值,然后进入统一的入口函数:scanGenericCommand,以hscan命令为例:

图片描述
scanGenericCommand主要分四步:

  • 解析count和match参数.如果没有指定count,默认返回10条数据
  • 开始迭代集合,如果key保存为ziplist或者intset,则一次性返回所有数据,没有游标(游标值直接返回0).由于redis设计只有数据量比较小的时候才会保存为ziplist或者intset,所以此处不会影响性能.

游标在保存为hash的时候发挥作用,具体入口函数为dictScan,下文详细描述。

  • 根据match参数过滤返回值,并且如果这个键已经过期也会直接过滤掉(redis中键过期之后并不会立即删除)
  • 返回结果到客户端,是一个数组,第一个值是游标,第二个值是具体的键值对

dictScan中游标的实现

上文中我们了解到了redis中的rehash操作叫渐进式rehash,所以当迭代一个哈希表时,存在三种情况:

  • 从迭代开始到结束,哈希表没有进行rehash
  • 从迭代开始到结束,哈希表进行了rehash,但是每次迭代时,哈希表要么没开始rehash,要么已经结束了rehash
  • 从迭代开始到结束,某次或某几次迭代时哈希表正在进行rehash

redis中的字典进行rehash时会存在两个哈希表,ht[0]与ht[1],并且是渐进式rehash(即不会一次性全部rehash);新的键值对会存放到ht[1]中并且会逐步将ht[0]的数据转移到ht[1].全部rehash完毕后,ht[1]赋值给ht[0]然后清空ht[1].

因此游标的实现需要兼顾以上三种情况,以上三种情况的游标实现要求如下:

  • 第一种情况比较简单,假设redis的哈希表大小为4,则第一次游标为0,读取第一个bucket的数据,然后游标返回1,下次读取第二个bucket的位置,依次遍历
  • 第二种情况比较复杂,假设redis的哈希表大小为4,如果rehash完后size变成了8.如果仍然按照上边的思路返回游标,则如下图:

图片描述

假设bucket0读完之后返回了游标1,当客户端再次带着游标1返回时哈希表已经进行完rehash,并且size扩大了一倍变成了8.redis按如下方法计算一个键的bucket:

1
hash(key)&(size-1)

即如果size是4时,hash(key)&11,如果size是8时,hash(key)&111.因此当从4扩容到8时,原先在0bucket的数据会分散到0(000)与4(100)两个bucket,bucket对应关系表如下:

图片描述
从二进制来看,当size为4时,hash(key)之后取低两位即 hash(key)&11即key的bucket位置,如果size为8时,bucket位置为 hash(key)&111,即取低三位,当低两位为00时,如果第三位为0,则为000,如果第三位为1,则为100,正好是4.其他槽位的类似.所以如果此时继续按第一种方法遍历,第四个bucket取到的值全部为重复值

  • 第三种情况,如果返回游标1时正在进行rehash,ht[0]中的bucket 1中的部分数据可能已经rehash到 ht[1]中的bucket[1]或者bucket[5],此时必须将ht[0]和ht[1]中的相应bucket全部遍历,否则可能会有遗漏数据

所以为了兼顾以上三种情况,做到不漏数据并且尽量不重复,redis使用了一种叫做reverse binary iteration的方法.具体的游标计算代码如下,具体为什么这么做我也不知道,反正redis就是选择这么做了吗,而且效果很明显:

图片描述
代码逻辑很简单,下面示例从4变为8和从4变为16以及从8变为4和从16变为4时,这种方法为何能够做到不重不漏

图片描述
遍历size为4时的游标状态转移为0-2-1-3.

同理,size为8时的游标状态转移为0-4-2-6-1-5-3-7.

size为16时的游标状态转义为0-8-4-12-2-10-6-14-1-9-5-13-3-11-7-15

图片描述

可以看出,当size由小变大时,所有原来的游标都能在大的hashTable中找到相应的位置,并且顺序一致,不会重复读取并且不会遗漏

例如size原来是4变为了8,且第二次遍历时rehash已经完成.此时游标为2,根据图2,我们知道size为4时的bucket2会rehash到size为8时的2和6.而size为4时的bucket0rehash到size为8时的0和4

由于bucket 0 已经遍历完,也即size为8时的0,4已经遍历,正好开始从2开始继续遍历,不重复也不会遗漏

继续考虑size由大变小的情况.假设size由16变为了4,分两种情况,一种是游标为0,2,1,3中的一种,此时继续读取,也不会遗漏和重复

但如果游标返回的不是这四种,例如返回了10,10&11之后变为了2,所以会从2开始继续遍历.但由于size为16时的bucket2已经读取过,并且2,10,6,14都会rehash到size为4的bucket2,所以会造成重复读取

size为16时的bucket2。即有重复但不会遗漏

总结一下:redis里边rehash从小到大时,scan系列命令不会重复也不会遗漏.而从大到小时,有可能会造成重复但不会遗漏.

截止目前,情况1和情况2已经比较完美的处理了。情况3看看如何处理

情况3需要从ht[0]和ht[1]中都取出数据,主要的难点在于如何在size大的哈希表中找到应该取哪些bucket.redis代码如下:

图片描述
判断条件为:

1
v&(m0^m1)

size 4的m0为00000011,size8的m1为00000111,二者异或之后取值为00000100,即取二者mask高位的值,然后&v,看游标是否在高位还有值

下一个游标的取值方法为

1
v = (  ((v | m0) +1)& ~m0) | ( v & m0)

右半部分 取v的低位,左半部分取v的高位。 (v&m0)取出v的低位 例如size = 4时为 v&00000011

左半部分 (v|m0) + 1即将v的低位都置为1,然后+1之后会进位到v的高位,再次 & ~m0之后即取出了v的高位

整体来看每次将游标v的高位加1.下边举例来看:

假设游标返回了2,并且正在进行rehash,此时size由4变成了8 .则m0 = 00000011 v = 00000010

根据公式计算出的下一个游标为 ( (( 00000010|00000011) +1 ) & (11111100) )| (00000010 & 00000011) = (00000100)&(11111100)|(00000010) = (00000110) 正好是6

判断条件为 (00000010) & (00000011 ^ 00000111) = (00000010) & (00000100) = (00000000) 为0,结束循环

Redis中的渐进式Rehash

发表于 2021-08-24 | 分类于 Redis

前言

如果你有一个hash表,然后不停的往里面写值或者删值,随着操作的不断执行, 哈希表保存的键值对会逐渐地增多或者减少, 为了让哈希表的负载因子(load factor)维持在一个合理的范围之内, 当哈希表保存的键值对数量太多或者太少时, 程序需要对哈希表的大小进行相应的扩展或者收缩。这个过程就叫做rehash,而在redis中这个rehash的过程又叫做渐进式rehash。

Rehash原理

扩展和收缩哈希表的工作可以通过执行 rehash (重新散列)操作来完成, Redis 对字典的哈希表执行 rehash 的步骤如下:

  1. 为字典的ht[1]哈希表分配空间, 这个哈希表的空间大小取决于要执行的操作, 以及ht[0]当前包含的键值对数量 (也即是ht[0].used属性的值):
    • 如果执行的是扩展操作, 那么 ht[1] 的大小为第一个大于等于 ht[0].used * 2 的 2^n (2 的 n 次方幂),也就是说每次扩容后的大小一定是2^n,因为只有2^n -1的二进制每一位都为1;
    • 如果执行的是收缩操作, 那么 ht[1] 的大小为第一个大于等于 ht[0].used 的 2^n 。
  2. 将保存在 ht[0] 中的所有键值对 rehash 到 ht[1] 上面: rehash 指的是重新计算键的哈希值和索引值, 然后将键值对放置到 ht[1]哈希表的指定位置上。
  3. 当 ht[0] 包含的所有键值对都迁移到了 ht[1] 之后 (ht[0] 变为空表), 释放 ht[0] , 将 ht[1] 设置为 ht[0] , 并在 ht[1] 新创建一个空白哈希表, 为下一次 rehash 做准备。

举个例子, 假设程序要对图 1 所示字典的 ht[0] 进行扩展操作, 那么程序将执行以下步骤:

  1. ht[0].used 当前的值为 4 , 4 * 2 = 8 , 而 8 (2^3)恰好是第一个大于等于 4 的 2 的 n 次方, 所以程序会将 ht[1] 哈希表的大小设置为 8 。 图 2 展示了 ht[1] 在分配空间之后, 字典的样子。
  2. 将 ht[0] 包含的四个键值对都 rehash 到 ht[1] , 如图 3 所示。
  3. 释放 ht[0] ,并将 ht[1] 设置为 ht[0] ,然后为 ht[1] 分配一个空白哈希表,如图 4 所示。

至此, 对哈希表的扩展操作执行完毕, 程序成功将哈希表的大小从原来的 4 改为了现在的 8 。

Rehash图示

图1

图1 执行rehash之前

图2

图2 为字典的ht[1]哈希表分配空间

图3

图3 ht[0]的所有值对都已经被迁移到ht[1]

图4

图4 完成rehash之后的字典

哈希表的扩展与收缩

当以下条件中的任意一个被满足时, 程序会自动开始对哈希表执行扩展操作:

  1. 服务器目前没有在执行 BGSAVE 命令或者 BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 1 ;
  2. 服务器目前正在执行 BGSAVE 命令或者 BGREWRITEAOF 命令, 并且哈希表的负载因子大于等于 5 ;

其中哈希表的负载因子可以通过公式:

1
2
# 负载因子 = 哈希表已保存节点数量 / 哈希表大小
load_factor = ht[0].used / ht[0].size

计算得出。

比如说, 对于一个大小为 4 , 包含 4 个键值对的哈希表来说, 这个哈希表的负载因子为:

1
load_factor = 4 / 4 = 1

又比如说, 对于一个大小为 512 , 包含 256 个键值对的哈希表来说, 这个哈希表的负载因子为:

1
load_factor = 256 / 512 = 0.5

根据 BGSAVE 命令或 BGREWRITEAOF 命令是否正在执行, 服务器执行扩展操作所需的负载因子并不相同, 这是因为在执行 BGSAVE命令或 BGREWRITEAOF 命令的过程中, Redis 需要创建当前服务器进程的子进程, 而大多数操作系统都采用写时复制copy-on-write技术来优化子进程的使用效率, 所以在子进程存在期间, 服务器会提高执行扩展操作所需的负载因子, 从而尽可能地避免在子进程存在期间进行哈希表扩展操作, 这可以避免不必要的内存写入操作, 最大限度地节约内存。

另一方面, 当哈希表的负载因子小于 0.1 时, 程序自动开始对哈希表执行收缩操作。

渐进式Rehash

上面说过, 扩展或收缩哈希表需要将 ht[0] 里面的所有键值对 rehash 到 ht[1] 里面, 但是, 这个 rehash 动作并不是一次性、集中式地完成的, 而是分多次、渐进式地完成的。

这样做的原因在于, 如果 ht[0] 里只保存着四个键值对, 那么服务器可以在瞬间就将这些键值对全部 rehash 到 ht[1] ; 但是, 如果哈希表里保存的键值对数量不是四个, 而是四百万、四千万甚至四亿个键值对, 那么要一次性将这些键值对全部 rehash 到 ht[1] 的话, 庞大的计算量可能会导致服务器在一段时间内停止服务。

因此, 为了避免 rehash 对服务器性能造成影响, 服务器不是一次性将 ht[0] 里面的所有键值对全部 rehash 到 ht[1] , 而是分多次、渐进式地将 ht[0] 里面的键值对慢慢地 rehash 到 ht[1] 。

以下是哈希表渐进式 rehash 的详细步骤:

  1. 为 ht[1] 分配空间, 让字典同时持有 ht[0] 和 ht[1] 两个哈希表。
  2. 在字典中维持一个索引计数器变量 rehashidx , 并将它的值设置为 0 , 表示 rehash 工作正式开始。
  3. 在 rehash 进行期间, 每次对字典执行添加、删除、查找或者更新操作时, 程序除了执行指定的操作以外, 还会顺带将 ht[0] 哈希表在 rehashidx 索引上的所有键值对 rehash 到 ht[1] , 当 rehash 工作完成之后, 程序将 rehashidx 属性的值增一。
  4. 随着字典操作的不断执行, 最终在某个时间点上, ht[0] 的所有键值对都会被 rehash 至 ht[1] , 这时程序将 rehashidx 属性的值设为 -1 , 表示 rehash 操作已完成。

渐进式 rehash 的好处在于它采取分而治之的方式, 将 rehash 键值对所需的计算工作均滩到对字典的每个添加、删除、查找和更新操作上, 从而避免了集中式 rehash 而带来的庞大计算量。

图 5 至图 10 展示了一次完整的渐进式 rehash 过程, 注意观察在整个 rehash 过程中, 字典的 rehashidx 属性是如何变化的。

渐进式Rehash图示

图1

图5 准备开始rehash

图1

图6 rehash索引0上的key value

图1

图7 rehash索引1上的key value

图1

图8 rehash索引2上的key value

图1

图9 rehash索引3上的key value

图1

图10 rehash完成

渐进式 rehash 执行期间的哈希表操作

因为在进行渐进式 rehash 的过程中, 字典会同时使用 ht[0] 和 ht[1] 两个哈希表, 所以在渐进式 rehash 进行期间, 字典的删除(delete)、查找(find)、更新(update)等操作会在两个哈希表上进行: 比如说, 要在字典里面查找一个键的话, 程序会先在 ht[0]里面进行查找, 如果没找到的话, 就会继续到 ht[1] 里面进行查找, 诸如此类。

另外, 在渐进式 rehash 执行期间, 新添加到字典的键值对一律会被保存到 ht[1] 里面, 而 ht[0] 则不再进行任何添加操作: 这一措施保证了 ht[0] 包含的键值对数量会只减不增, 并随着 rehash 操作的执行而最终变成空表。这里提前透漏一句,就是这个渐进式rehash的缩容操作导致了scan命令的重复值出现。下一篇会详细讨论一下。

123…5
史蒂芬猴

史蒂芬猴

dora is my precious

49 日志
27 分类
38 标签
GitHub
© 2023 史蒂芬猴
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
访客数 人 总访问量 次