《多颗糖的MIT笔记》

《多颗糖的MIT笔记》

 [多颗糖](https://mp.weixin.qq.com/s?__biz=MzIwODA2NjIxOA==&mid=2247484185&idx=1&sn=055263242bfb70e727b8fffb1d879301&chksm=970980dca07e09ca97fef2fda5ad34d5f56df4edc9ce487bc69291fb621fbdd6db4af6a57560&scene=21#wechat_redirect) : 详情请参考微信公众号

1: MapReduce

导读

MapReduce 的问世让集群分布式计算流行起来,同时,作为分布式系统学习的经典案例,MapReduce 很好地展示了分布式系统的概貌,以及我们在 6.824 要学习到的一些主题。

本期材料主要来自于 MapReduce 论文MIT 6.824 2021 视频

MapReduce

MapReduce 是一个在多台机器上并行计算大规模数据的软件架构。主要通过两个操作来实现:MapReduce

工作流

图片

MapReduce 的工作流:

  • 将输入文件分成 M 个小文件(每个文件的大小大概 16M-64M),在集群中启动 MapReduce 实例,其中一个 Master 和多个 Worker;
  • 由 Master 分配任务,将 Map 任务分配给可用的 Worker;
  • Map Worker 读取文件,执行用户自定义的 map 函数,输出 key/value 对,缓存在内存中;
  • 内存中的 (key, value) 对通过 partitioning function() 例如 hash(key) mod R 分为 R 个 regions,然后写入磁盘。完成之后,把这些文件的地址回传给 Master,然后 Master 把这些位置传给 Reduce Worker;
  • Reduce Worker 收到数据存储位置信息后,使用 RPC 从 Map Worker 所在的磁盘读取这些数据,根据 key 进行排序,并将同一 key 的所有数据分组聚合在一起(由于许多不同的 key 值会映射到相同的 Reduce 任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序);
  • Reduce Worker 将分组后的值传给用户自定义的 reduce 函数,输出追加到所属分区的输出文件中;
  • 当所有的 Map 任务和 Reduce 任务都完成后,Master 向用户程序返回结果;

实例

  • 词频统计:这里 Map 函数可以将每个单词统计输出 <word, count>,然后 Reduce 函数同一档次的所有计数相加,得到:<word, total count>
  • 分布式 Grep:Map 函数输出匹配某个模式的一行,Reduce 函数输出输出所有中间数据
  • 分布式排序:Map 函数从每个记录提取 key,输出 (key,record) 对。Reduce 函数不改变任何的值,直接输出。后面我们会介绍顺序保证

容错性

  • Worker 故障:Master 周期性的 ping 每个 Worker,如果指定时间内没回应就是挂了。将这个 Worker 标记为失效,分配给这个失效 Worker 的任务将被重新分配给其他 Worker;
  • Master 故障:中止整个 MapReduce 运算,重新执行。一般很少出现 Master 故障

性能

网络带宽匮乏

在撰写该 paper 时,网络带宽是一个相当匮乏的资源。Master 在调度 Map 任务时会考虑输入文件的位置信息,尽量将一个 Map 任务调度在包含相关输入数据拷贝的机器上执行;如果找不到,Master 将尝试在保存输入数据拷贝的附近的机器上执行 Map 任务。

需要注意的是,新的讲座视频提到,随着后来 Google 的基础设施的扩展和升级,他们对这种存储位置优化的依赖程度降低了

“落伍者(Stragglers)”

影响 MapReduce 执行时间的另一个因素是“落伍者”:一台机器花了很长的时间才完成最后几个 Map 或 Reduce 任务(例如:有台机器硬盘出了问题),导致总的 MapReduce 执行时间超过预期。

通过备用任务(backup tasks)来处理:当 MapReduce 操作快完成的时候,Master 调度备用任务进程来执行剩下的、处于处理中的任务。无论是最初的进程还是备用任务进程任务完成了任务,都将该任务标记为已完成。

其它有趣的特性

Combiner 函数

在某些情况下,Map 函数产生的中间 key 值的重复数据会占很大的比重(例如词频统计,将产生成千上万的 <the, 1> 记录)。用户可以自定义一个可选的 Combiner 函数,Combiner 函数首先在本地将这些记录进行一次合并,然后将合并的结果再通过网络发送出去。

图片

Combiner 函数的代码通常和 Reduce 函数的代码相同,启动这个功能的好处是可以减少通过网络发送到 Reduce 函数的数据量。

问题:为什么默认情况下不启用这个功能?

跳过损坏的记录

用户程序中的 bug 导致 Map 或者 Reduce 函数在处理某些记录的时候crash。通常会修复 bug 再执行 MapReduce,但是找出 bug 并修复它往往不是一件容易的事情(bug 有可能在第三方库)。

与其因为少数坏记录而导致整个执行失败,不如有一个机制可以让损坏的记录被跳过。这在某些情况下是可以接受的,例如在对一个大型数据集进行统计分析时。

Worker 可以记录处理的最后一条记录的序号发送给 Master,当 Master 看到在处理某条记录失败不止一次时,标记这条记录需要被跳过,下次执行时跳过这条记录。

顺序保证

确保在给定的 Reduce 分区中,中间 key/value 对是按照 key 值升序处理的。这样的顺序保证对输出的每个文件都是有序的,这样在 Reduce Worker 在读取时非常方便,例如可以对不同的文件使用归并排序。

但 paper 没说这个顺序保证在哪做的,看起来是在 Map Worker 中最后进行一次排序。

结语

尽管由于一些原因,Google 已经不在使用 MapReduce 了。但 MapReduce 从根本上改变了大规模数据处理架构,它通过一个简单的 API,抽象了处理并行、容错和负载均衡的复杂性,让没有相关经验的程序员也能够在计算机集群上分布式地处理大规模数据集。

Reference

2: RPC and Threads

线程

“进程与线程的区别”是面试者要背下的八股文,这里简单复习下线程。

线程是操作系统能够进行运算调度的最小单位。。大部分情况下,它被包含在进程之中,是进程中的实际运作单位。一个进程中可以并发多个线程,每条线程并行执行不同的任务。

图片

关于线程主要注意:

  • 同一进程中的多个线程共享该进程地址空间、文件描述符等,它们都可以访问全局变量;
  • 每个线程有自己的调用栈、程序计数器和寄存器。

为什么需要线程?

  • 线程实现并发,这是分布式系统所需要的。并发允许我们在一个处理器上调度多个任务,例如:线程可以让我们在等待 I/O 操作时执行其他任务,而不是等待 I/O 操作完成再继续执行;
  • 并行。我们可以在多个核心上并行执行多个任务。不同于单纯的并发,在同一时间只有一个任务在进行(取决于哪个任务在那一瞬间拥有它的 CPU 时间),并行允许多个任务在同一时间进行处理,因为它们是在不同的 CPU 核上执行的。
  • 方便。线程提供了一种在后台执行任务的便捷返回,例如:在后台每秒一次检查 worker 是否正常运行。

Go 有 Goroutines,它是轻量级线程。

线程带来的挑战

  • 死锁
  • 访问共享数据
  • 线程之间的协调。例如:一个线程在生产数据,另一个线程在消费数据,消费者如何等待数据的生产并释放 CPU?生产者如何唤醒消费者?

Go 通过 channelsync.CondWaitGroup 来处理这些问题。

另外,Go 还有一个内置的竞态数据检测器:https://golang.org/doc/articles/race_detector

Event-Driven

除了线程,还提到了事件驱动编程,一个进程只有一个线程,它监听事件,并在事件发生时执行用户指定的函数。Node.js 就使用了 Event-Driven,被称为 event loop。

在我看来,Event-Driven 实现比较困难(这很主观),在分布式系统用得较少,不展开。

RPC

远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议。该协议允许客户端通过 RPC 执行服务端的函数,就像调用本地函数一样。

图片

一个 RPC 流程一般如下:

  1. 客户端调用 client stub,并将调用参数 push 到栈(stack)中,这个调用是在本地的
  2. client stub 将这些参数包装,并通过系统调用发送到服务端机器。打包的过程叫 marshalling(常见方式:XML、JSON、二进制编码)。
  3. 客户端操作系统将消息传给传输层,传输层发送信息至服务端;
  4. 服务端的传输层将消息传递给 server stub
  5. server stub 解析信息。该过程叫 unmarshalling
  6. server stub 调用程序,并通过类似的方式返回给客户端。
  7. 客户端拿到数据解析后,将执行结果返回给调用者。

这样做的主要好处是它简化了编写分布式应用的过程,因为 RPC 将所有的网络相关的代码都隐藏到了 stub 函数中,程序员不必担心数据转换和解析、打开和关闭连接等细节。

处理失败

从客户端的角度来看,失败是指向服务端发送请求,在特定的时间内没有得到响应。这可能是由多种原因造成的,包括:数据包丢失、服务端处理速度慢、服务端宕机和网络故障。

处理这种情况很棘手,因为客户端不会知道服务端具体的情况,可能导致请求失败的原因有:

  • 服务端没有收到这个请求
  • 服务端执行了请求,但响应之前宕机了
  • 服务端执行了请求并发送了响应,但在响应之前网络故障了

最简单的办法就是重试,但是如果服务端之前已经执行了请求,重复发送请求可能导致服务端执行两次相同的请求,这也可能会导致问题。这种方法对幂等的请求很有效,但非幂等的请求需要别的方法来处理失败。

RPC 可以实现三种语义:

  • At-Most-Once:客户端不会自动重试一个请求。在这种情况下,重新发送请求是客户端的选择。
  • At-Least-Once:客户端会不断重试请求,直到收到请求被执行的肯定确认。这适用于幂等操作。
  • Exactly-Once:在这种模式下,请求既不能重复,也不能丢失。这一点比较难实现,也是容错率最低的,因为它要求必须从服务器上收到响应,不能有重复。如果我们有多台服务器,而处理初始请求的那台服务器故障了,其他服务器可能无法判断请求是否被执行了。

Go RPC 实现了 At-Most-Once 语义,如果没有得到响应,只会返回一个错误。客户端可以选择重试一个失败请求,但服务端要自己处理重复的请求。

我想到的是,可以给请求做个唯一 ID,这样重复的请求能够被检测到,就不再执行,直接返回对应的响应。但也要处理一些细节问题:

  • 如何保证多个客户端的 ID 是唯一的?可以带上客户端 ID,类似于:<client_id, seq>(和 Raft 客户端交互那部分内容对应上了!)
  • 但我们不可能无期限地保存所有的请求 ID,保存多长时间?可以在客户端的请求中包含一个额外的标识符 X,告诉服务端删除 X 之前的所有请求 ID 是安全的
  • 当原始请求还在执行时,如何处理重复的请求?可以等待它完成,也可以直接忽略新的请求。
  • 为了避免服务器宕机,ID 信息还需要写入到磁盘,也许还要跨机器多副本存储。

Reference

3: GFS

本篇主要学习 The Google File System 这篇大名鼎鼎的论文。

分布式存储(包括文件系统)是构建分布式系统的关键,许多其它分布式应用都建立在分布式存储之上。

为什么设计一个分布式存储系统会如此之难?

  • 出发点是提高性能,当单机数据量太大时,需要在多台服务器上分片(Sharding)数据;
  • 由于多台服务器,系统可能会出现更多的故障。如果你有数千台服务器,也许每天都有机器故障,所以我们需要系统能够自动容错;
  • 为了提高容错,需要复制(replication)数据到多台服务器上,一般 2-3 个数据副本;
  • 数据的复制会导致数据潜在的不一致;
  • 为了提高一致性往往会导致更低的性能,这与我们的初衷恰恰相反!

这个循环突出了分布式系统的挑战。GFS 讨论了上述的这些主题:并行性能、容错、复制、一致性,并给出了 Google 在生产环境进行的权衡。

GFS 论文的内容直观,容易理解,从硬件到软件都有讨论,是一篇非常优秀的系统论文!

GFS 的目标

主要设计目标:

  • 大型:大容量,需要存放大量的数据集;
  • 性能:自动分片(Auto-Sharding);
  • 全局:不只是为一个应用而定制,适用于各种不同的应用;
  • 容错:自动容错,不希望每次服务器出了故障,都要手动去修复;

还有一些其他的特性,比如:

  • GFS 只能在一个数据中心运行,理论上可以跨机房,但更复杂;
  • 面向内部的,不开放销售;
  • 面向顺序读写大文件的工作负载(例如前面提到的 MapReduce);

架构

图片

如图所示,GFS 集群包括:一个 master 和多个 chunkserver,并且若干 client 会与之交互。

主要架构特性:

  • chunk:存储在 GFS 中的文件分为多个 chunk,chunk 大小为 64M,每个 chunk 在创建时 master 会分配一个不可变、全局唯一的 64 位标识符(chunk handle);默认情况下,一个 chunk 有 3 个副本,分别在不同的 chunkserver 上;
  • master:维护文件系统的 metadata,它知道文件被分割为哪些 chunk、以及这些 chunk 的存储位置;它还负责 chunk 的迁移、重新平衡(rebalancing)和垃圾回收;此外,master 通过心跳与 chunkserver 通信,向其传递指令,并收集状态;
  • client:首先向 master 询问文件 metadata,然后根据 metadata 中的位置信息去对应的 chunkserver 获取数据;
  • chunkserver:存储 chunk,client 和 chunkserver 不会缓存 chunk 数据,防止数据出现不一致

master 节点

图片

为了简化设计,GFS 只有一个 master 进行全局管理。

master 在内存中存储 3 种 metadata,如下。标记 nv(non-volatile, 非易失) 的数据需要在写入的同时存到磁盘,标记 v 的数据 master 会在启动后查询 chunkserver 集群:

  • namespace(即:目录层次结构)和文件名;(nv)
  • 文件名 -> array of chunk handles 的映射;(nv)
  • chunk handles -> 版本号(nv)、list of chunkservers(v)、primary(v)、租约(v)

课程特别提到,master 会在本地磁盘存储 log,而不是存到数据库,原因是:数据库的本质是某种 B 树或者 hash table,相比之下,追加 log 会非常高效;而且,通过在 log 中创建一些 checkpoint 点,重建状态也会更快。

读文件

图片

  • client 将 文件名+offset 转为文件名+ chunk index,向 master 发起请求;
  • master 在内存中的 metadata 查询对应 chunk 所在的 chunk handle + chunk locations 并返回给 client;
  • client 将 master 返回给它的信息缓存起来,用文件名 + chunk index 作为 key;(注意:client 只缓存 metadata,不缓存 chunk 数据)
  • client 会选择网络上最近的 chunkserver 通信(Google 的数据中心中,IP 地址是连续的,所以可以从 IP 地址差异判断网络位置的远近),并通过 chunk handle + chunk locations 来读取数据;

学生提问:如果读取的数据超过了一个 chunk 怎么办?
Robert教授:我不知道详细的细节。我的印象是,如果应用程序想要读取超过 64MB 的数据,或者就是 2 个字节,但是却跨越了 chunk 的边界,应用程序会通过一个库来向 GFS 发送 RPC,而这个库会注意到这次读请求会跨越 chunk 边界,因此会将一个读请求拆分成两个读请求再发送到 master 节点。所以,这里可能是向 master 节点发送两次读请求,得到了两个结果,之后再向两个不同的 chunk 服务器读取数据。

租约

如果每次写文件都请求 master,那么 master 则会成为性能瓶颈,master 找到拥有该 chunk 的 chunkserver,并给其中一个 chunkserver 授予租约,拥有租约的 chunkserver 称为 Primary,其他叫做 Secondary,之后:

  • master 会增加版本号,并将版本号写入磁盘,然后 master 会向 PrimarySecondary 副本对应的服务器发送消息并告诉它们,谁是 Primary,谁是 Secondary,最新的版本号是什么;
  • 在租约有效期内,对该 chunk 的写操作都由 Primary 负责;
  • 租约的有效期一般为 60 秒,租约到期后 master 可以自由地授予租约;
  • master 可能会在租约到期前撤销租约(例如:重命名文件时);
  • 在写 chunk 时,Primary 也可以请求延长租约有效期,直至整个写完 chunk;

写文件

图片

如图,写文件可分为 7 步:

  1. client 向 master 询问 PrimarySecondary。如果没有 chunkserver 持有租约,master 选择一个授予租约;
  2. master 返回 PrimarySecondary 的信息,client 缓存这些信息,只有当 Primary 不可达或者租约过期才再次联系 master;
  3. client 将追加的记录发送到**每一个 chunkserver(不仅仅是 Primary)**,chunkserver 先将数据写到 LRU 缓存中(不是硬盘!);
  4. 一旦 client 确认每个 chunkserver 都收到数据,client 向 Primary 发送写请求,Primary 可能会收到多个连续的写请求,会先将这些操作的顺序写入本地;
  5. Primary 做完写请求后,将写请求和顺序转发给所有的 Secondary,让他们以同样的顺序写数据;
  6. Secondary 完成后应答 Primary
  7. Primary 应答 client 成功或失败。如果出现失败,client 会重试,但在重试整个写之前,会先重复步骤 3-7;

一致性模型

GFS 是宽松的一致性模型(relaxed consistency model),可以理解是弱一致性的,它并不保证一个 chunk 的所有副本是相同的。如果一个写失败,client 可能会重试:

  • 对于写:可能有部分副本成功,而另一部分失败,副本就会不一致。
  • 对于 record append:也会重试,但是不是在原来的 offset 上重试,而是在失败的记录后面重试,这样 record append 留下的不一致是永久的不一致,并且会让副本包含重复的数据。

图片

如图,先解释图上 definedconsistent 两个概念:

  • defined:一个文件区域在经过一系列操作之后,client 可以看到数据变更写入的所有数据;
  • consistent:所有 client 不论从哪个副本中读取同一份文件,得到的结果都是相同的;

对于 metadata:metadata 都是由 master 来处理的,读写操作通过锁保护,可以保证一致性。

对于文件数据

  • 在没有并发的情况下,写入不会互相干扰,那么则是 defined
  • 在并发的情况下,成功的写入是 consistent 但不是 defined
  • 顺序写和并发写 record append 能够保证是 defined,但是在 defined 的区域之间会夹杂着一些不一致的区域;
  • 如果出现写失败,副本之间会不一致

如何处理这种异常情况,取决于应用程序

GFS 并不是强一致性的,如果这里要转变成强一致性的设计,几乎要重新设计系统,需要考虑:

  • 可能需要让 Primary 重复探测请求;
  • 如果 Primary 要求 Secondary 执行一个操作,Secondary 必须执行而不是返回一个错误;
  • Primary 确认所有的 Secondary 都追加成功之前,Secondary 不能将数据返回给读请求;
  • 可能有一组操作由 Primary 发送给 SecondaryPrimary 在确认所有的 Secondary 收到了请求之前就崩溃了。当 Primary 崩溃了,一个 Secondary 会接任成为新的 Primary

越说越像 paxos 或 raft 算法了。还有其他情况,在此不一一举例了。

为什么 Google 最初选择弱一致性呢?教授在课堂上给出一种解释。

Robert教授:如果你通过搜索引擎做搜索,20000 个搜索结果中丢失了一条或者搜索结果排序是错误的,没有人会注意到这些。这类系统对于错误的接受能力好过类似于银行这样的系统。当然并不意味着所有的网站数据都可以是错误的。如果你通过广告向别人收费,你最好还是保证相应的数字是对的。

快照(snapshot)

GFS 通过 snapshot 来创建一个文件或者目录树的备份,它可以用于备份文件或者创建 checkpoint(用于恢复)。GFS 使用写时复制(copy-on-write)来写快照。

当 master 收到 snapshot 操作请求后:

  • 撤掉即将做快照的 chunk 的租约,准备 snapshot(相当于暂停了所有写操作);
  • master 将操作记录写入磁盘;
  • master 将源文件和目录树的 metadata 进行复制,新创建的快照文件指向与源文件相同的 chunk;

容错性

  • 快恢复:master 和 chunkserver 都设计成在几秒钟内恢复状态和重启;
  • chunk 副本:如前面提到的,chunk 复制到多台机器上;
  • master 副本:master 也会被复制来保证可用性,称为 shadow-master;

数据完整性 checksum

通常一个GFS 集群都有好几百台机器以及几千块硬盘,磁盘损坏是很经常的事情,在数据的读写中经常出现数据损坏。

每一个 chunkserver 都是用 checksum 来检查存储数据的完整性。

每个 chunk 以 64kb 的块进行划分 ,每一个块对应一个 32 位的 checksum,存到 chunkserver 的内存中,通过记录用户数据来持久化存储 checksum。对于读操作,在返回给 client 之前,chunkserver 会校验要读取块的 checksum

为什么是 64Kb 呢?我猜测应该是 64Mb/64Kb 好计算吧。

FAQ

原文:http://nil.csail.mit.edu/6.824/2021/papers/gfs-faq.txt

chunk 大小为什么是 64 MB?

  • 较大的 chunk 较少了 client 与 master 的通信次数;
  • client 能够对一个块进行多次操作,这样可以通过与 chunkserver 保持较长时间的 TCP 连接来减少网络负载;
  • 减少了 metadata 的大小;

带来的问题:

  • chunk 越大,可能部分文件只有 1 个 chunk,对该文件的频繁读写可能会造成热点问题。

值得一提的是,GFS 的继任者 Colossus 将 chunk 大小下调到了 4MB。

为什么是 3 个副本?

选择这个数字是为了最大限度地降低一个块坏的概率。

一项关于磁盘的研究:https://research.google.com/archive/disk_failures.pdf

论文中提到了引用计数——这是什么?

引用计数用来实现 copy-on-write 生成快照。当 GFS 创建一个快照时,它并不立即复制 chunk,而是增加 GFS 中 chunk 的引用计数,表示这个 chunk 被快照引用了,等到客户端修改这个 chunk 时,才需要在 chunkserver 中拷贝 chunk 的数据生成新的 chunk,后续的修改操作落到新生成的 chunk 上。

总结

图片

权侵删:**https://juicefs.com/blog/cn/posts/distributed-filesystem-comparison/

就写到这里吧,本来打算对比下 HDFS、GlusterFS、MooseFS 和最近开源的 JuiceFS 等分布式文件系统,但文章越来越长,精力有限,也超出学习笔记的范畴,只好就此打住。如果你想要继续看分布式文件系统相关分析,留言告诉我。

GFS 生涯的前 5-10 年在 Google 表现出色,取得了巨大成功。但 GFS 是在 2000 年出头刚开始构建分布式系统,所以很多东西是非标准的,时间久了便诞生很多问题。最终,Google 公布了 Colossus 项目,作为 Google 下一代分布式文件系统。

GFS 最严重的局限性就在于它只有一个 master 节点(这篇文章讨论了这个问题:https://queue.acm.org/detail.cfm?id=1594206), 单个 master 会带来以下问题:

  • 随着 GFS 的应用越来越多,文件也越来越多,最后 master 会耗尽内存来存储 metadata;你可以增加内存,但单台计算机的内存始终有上限;
  • master 节点要承载数千个 client 的请求,master 节点的 CPU 每秒只能处理数百个请求,尤其是还要将部分数据写入磁盘——client 的数量会超过单个 master 的能力;
  • 弱一致性会导致应用程序很难处理 GFS 奇怪的语义;
  • 最后一个问题,master 的故障切换不是自动的,需要人工干预来处理已经永久故障的 master 节点,并更换新的服务器,这需要几十分钟甚至更长的时间来处理。对于某些应用程序来说,这个时间太长了。

所以,我们需要一个多副本、多活、高可用、故障自修复的分布式系统!一个自动切到可用副本的复制算法。

学过的同学都知道,这将引出了我们的分布式共识算法!

Reference

4: 主从复制(Primary/Backup Replication)

这一课讨论关于容错(Fault-Tolerance)和复制(Replication)的问题,主要研究 VMware FT 的论文 —— The Design of a Practical System for Fault-Tolerant Virtual Machines

复制的方式

在集群中实现复制主要通过两种方式:

  • **状态转移(State Transfer)**:主机(Primary)将自己所有的状态,拷贝并发送给备机(Backup),一般是增量备份;
  • **复制状态机(Replicated State Machine)**:将备机视为一个确定的状态机——client 发送操作到主机,主机按顺序发送到备机,所有备机执行所有的操作,如果从同一起始状态,以相同的顺序输入相同的操作,它们的输出将是相同的。

VMware FT 使用了复制状态机的方法。

状态转移传输的可能是内存,而复制状态机传输来自客户端的操作或者其他外部事件,人们倾向于使用复制状态机的原因是,外部操作或事件通常比服务的内存状态要小。例如,如果是一个数据库,它的内存状态可能达到 GB 级别。

复制的挑战

要考虑的几个 Big Question:

  • 我们要复制哪些状态?
  • 主机必须等待备机备份完吗?
  • 什么时候切换到备机?
  • 切换时能否看到异常情况?
  • 如果有个副本故障了,我们需要重新添加一个新的副本,这可能是一个代价很高的行为,因为副本可能非常大,如何提升添加新副本的速度?

让我们看看虚拟化巨头 VMware 是怎么做的。

VMware FT 论文总结

总览

图片

如图 1,约定:主虚拟机(Primary VM)简称为主机,Backup VM 简称为备机

VMware FT 需要两台物理服务器,主机与备机保持同步,虚拟机的虚拟磁盘在共享存储上。

所有的输入(如网络、鼠标、键盘等)都会输入到主机,然后通过 Logging channel 转发到备机,对于非确定性的操作,还将发送额外的信息,确保备机以确定性的方式执行这些操作。

两台虚拟机都会执行输入操作,但只有主机的输出会返回客户端,备机的输出会被管理程序丢弃

确定性重放(Deterministic replay)

不确定性(Non-Deterministic)事件比如虚拟中断,不确定性操作比如从处理器读取时钟周期计数器,可能会让主机和备机的运行结果不一样。

这带来三个挑战:

  • 正确捕获所有输入和必要的不确定性输入来保证备机确定性执行;
  • 正确在备机执行不确定性输入;
  • 不降低系统的性能;

VMware **确定性重放(deterministic replay)**能够捕获所有输入和可能的不确定性输入,并写到日志文件记录下来。通过读取日志文件,可以准确重放虚拟机的执行。

对于不确定性输入,必须记录足够的信息来重放,但是论文中没有描述具体的日志格式,Robert 教授猜测可能有三种记录:

  • 事件发生时的指令序号;
  • 日志类型。可能是普通的网络数据输入,也可能是怪异的指令;
  • 数据。

FT 协议(FT Protocol)

VMware FT 通过确定性重放来产生相关的日志条目,但不将日志写入磁盘,而是通过 logging channel 发送给备机。备机实时重放日志项。

为了容错,必须在 loggin channel 上实现严格的容错协议,有以下要求:

输出要求:如果备机在主机故障后接管,备机将以和主机已经向外界发送的输出完全一致的方式继续运行。

最简单的方式是对每一个输出操作创建一个特殊的日志项。

但有一种情况,假设虚拟机运行的是数据库,主机备机的数据都是 10。现在客户端发送自增请求,主机做了 +1 并回复给客户端 11,之后马上宕机了,更糟糕的是主机发送给备机的 +1 操作也丢包了。这时候备机还是 10,并接管了主机的工作,客户端再次请求 +1,又会收到 11 的回复。客户端会得到一个怪异的结果(自增两次还是 11)。

所以要求:

输出规则:主机直到备机接收并确认了和输出相关的日志的时候,才发送输出给外界。

这样做的目的是,只要备机收到了所有的日志条目,即使主机宕机了,备机仍能够重放到客户端最后看到的状态。

图片

如图 2 所示,向外界的输出会被延迟,直到主机收到来自备机的确认。

几乎每一个复制系统都有这个问题:在某个时间点,主机必须停下来等待备机,这肯定会限制性能。

图片

注意:因为没有两阶段提交事务,不能保证所有的输出只被生成一次。备机无法判断主机是在宕机之前还是之后发送了最后的输出,备机可能会重新执行一次输出操作。不过,VMware 通过其网络基础设施来检测重复数据包,并防止输出重传到客户端。

发现与处理故障

主机和备机必须快速知道另一方故障,通过 udp 心跳包监控 logging channel 上的流量相结合来检测,如果心跳超时或 logging channel 流量停止则表明故障。

如果备机故障,主机就会停止向 logging channel 发送日志,继续正常运行。

在这之后备机怎么追上主机呢?VMware有一个工具叫 VMotion,它能够在最小程度上中断虚拟机的执行,克隆一个虚拟机。

如果主机故障,备机必须先重放,直到消耗完最后一个日志项。然后备机接替主机,开始向客户端生产输出。

为了确保一次只有一个虚拟机成为主机,避免出现脑裂,VMware 在共享存储上执行一个原子的 test-and-set 锁指令。该操作每次只能对其中一台机器返回成功,这在主机和备机因为网络分区都想接替工作时很有用。但如果共享存储因为网络问题不能访问,那么无论如何都不能正常工作。

当其中一台虚拟机发生故障时,VMware FT 会在另一台物理机上自动启动新的备份虚拟机来恢复冗余。

FT 的实际实现细节

上一节描述了容错的基础设计和协议,但为了创建一个可用的、健壮的自动化系统,还需要设计和实现许多其他组件。

启动和重启 FT VMs

一个挑战是,如何在主机运行时,以和主机相同的状态启动备机?为了解决这个问题,VMware 提供一个名为 VMware VMotion 的工具,允许在最小化中断的代价下,将运行中的虚拟机从一台服务器迁移到另一台服务器。为了容错,该工具被重新设计为 FT VMotion,以允许将虚拟机克隆到远程主机上,这种克隆操作打断主机的时间不超过 1 秒。

管理 Logging Channel

图片

上图 3 说明了日志从主机产生到在备机上消费的过程。

虚拟机管理程序维护了一个大的日志缓冲(log buffer),保存主机和备机的日志。主机会产生日志项到日志缓冲,备机从日志缓冲消费日志。

如果备机读到空的日志缓冲,则会暂停运行直到日志缓冲有日志;如果主机写日志的时候发现日志缓冲满了,也会暂停运行直到日志项被清除——这种暂停会影响虚拟机的客户端。因此,我们的实现必须最小化主机日志缓冲写满的可能性。

通常,主机日志缓冲满的原因:

  • 带宽太小,建议日志通道带宽 1Gbit/s
  • 备机执行速度太慢,从而消费日志太慢时,主机的日志缓冲也可能会被填满;

在 VMware FT 中已经实现了一种机制,当备机远远落后时(根据论文中的说法,落后超过1秒),可以减缓主机的执行速度。通过减少主机的 CPU 资源来减慢速度。

注意对于主机的减速是很罕见的,通常只在系统处于极端压力的情况下发生。

磁盘 IO 实现问题

有一些和磁盘 IO 相关的细微的实现问题。

问题 1:非阻塞的磁盘操作可以并行执行,因此对同一磁盘位置的同时访问可能导致不确定性。

解决方案:检测所有这类 IO 竞争,然后强制这些竞争的磁盘操作以相同的方式在主机和备机上顺序执行。

怎么检测?论文也没说。

问题 2:虚拟机上的应用程序(或操作系统)的磁盘操作也可能导致内存的竞争

解决方案:通过 Bounce buffer—— 一个和磁盘操作正在访问的内存大小一致的临时缓冲来解决。磁盘读操作被修改为在 bounce buffer 中读取特定数据,并且数据仅在IO操作完成并传递完成的时候拷贝到虚拟机内存。类似的,对于磁盘写操作,将要被发送的数据会先拷贝到 bounce buffer,磁盘写操作修改为写数据到 bounce buffer。

Bounce buffer 的使用会减慢磁盘操作,但是论文表示还没有看到任何明显的性能差异。

问题3:磁盘 IO 因主机故障在主机上没有完成,备份接管后怎么办?

解决方案:发送错误来表明 IO 失败,然后重试错误的 IO。

替代方案

本节讨论一些替代方案,以及他们所做的权衡。

图片

共享磁盘与非共享磁盘:VMware FT 使用了一个主备机都能访问的共享存储。一个替代方案是使用单独(非共享)的虚拟磁盘,主备机分别写入这些磁盘。这种设计可以用在共享存储不能同时被主、备机访问,或者共享存储太贵的情况下。缺点是需要做额外的工作,必须同步磁盘状态。

在备机上执行磁盘读取:在目前实现中,备机绝不会从磁盘进行读取,磁盘操作被认为是一种输入。一个替代的设计是,备机可以执行磁盘读取,当有大量磁盘读的工作负载时,这种方法可以帮助减少日志通道的流量。然而,这种方法有两个主要挑战:

  • 可能会减慢备机的执行速度,因为备机必须执行所有的磁盘读;
  • 如果读取在主机上成功,但在备机上失败(反之亦然)怎么办?必须做一些额外的工作来处理失败的磁盘读操作。

VMware 的性能评估显示,在备机上执行磁盘读取会降低 1-4% 的吞吐量,但同时也降低了日志带宽。

FAQ

来自:https://pdos.csail.mit.edu/6.824/papers/vm-ft-faq.txt

Q: GFS 和 VMware FT 都提供了容错性,哪一个更好?

FT 提供计算容错,你能用它为任何已有的网络服务器提供容错性。FT 提供了相当严格的一致性而且对客户端和服务器都是透明的。例如,你可以将 FT 应用于一个已有的邮件服务器并为其提供容错性。

GFS 只提供存储容错,因为 GFS 只针对特定的简单服务(存储)提供容错性,它的备份策略会比 FT 更高效。例如,GFS 不需要使中断都以完全相同的指令发生在所有的副本上。GFS 通常只会用于一个对外提供完整容错服务的系统的一部分。例如,VMware FT 本身也依赖了一个在主备机间共享的有容错性的存储服务,而你则可以用类似于 GFS 的东西来实现这个共享存储(虽然从细节上来讲 GFS 不太适用于 FT)。

Q: 共享存储上的原子 test-and-set 指令是什么?

在共享存储上的一个服务,最初状态为 false,主机或备机认为对方宕机了,自己应该接管的时候,首先要向共享存储发送一个 test-and-set 操作,伪代码是:

1
2
3
4
5
6
7
8
9
test-and-set() {
acquire_lock()
if flag == true:
release_lock()
return false
else:
flag = true
release_lock()
return true

只有当返回 true 时才能接管主机。主要为了避免当主、备机出现网络分区,都想接管时出现脑裂(即同时有两个主机)的情况。

这有点像一个分布式锁。问题是:伪代码没有展示什么时候 flag会被设为 false

老师解释:论文没有提及什么时候将 flag 重设为 false,也许是管理员人为的操作,也许是交给机器来清理。

小结

本节讨论了主从复制这个经典的分布式话题,主备模式几乎是数据库系统最常见的高可用方案,这篇论文扩充了我的主从复制的了解。

但论文也提到这个系统的一个局限性:仅支持单处理器,多核并行所涉及的不确定性将使系统的实现更具挑战性。现实中的 VMware 是支持多核,但 VMware FT 的论文完全没有讨论,这需要去查阅更多的资料。

Reference

5: 2021 Raft 实现细节

最近一直没更新,就是因为在搞 6.824 的 Raft 实验,2021 年的实验和以往有些不一样,上周终于把 Raft 所有的测试都通过了,记录一下实现过程中的心得。

关于 Raft 的细节不再赘述,还不清楚的读者欢迎点击查看:

之前我把代码放到 github 上,6.824 的老师发邮件给我让我转为私有仓库,说是会影响教学效果,所以这里不再公开源码,有任何实现上的困难可以加入学习群(群二维码在末尾),也请您不要公开您的实现,尊重自己和别人的劳动成果。

整个实验顺序大概是:

  1. 先实现选举
  2. 实现 log 复制,同时记得要加强选举
  3. 实现持久化,这是比较简单的,在需要持久化变量做更改的地方调用持久化函数即可
  4. 实现快照,同时要改掉所有日志索引的计算方式,包括选举那部分的

如果你想更有成就感,看到这里你就可以关了。

基本框架

2021 版的实验中,已经提前给出 ticker() 这个 goroutine,并且在实验手册中说明:不要使用 Go 的 time.Timertime.Ticker,这两个东西非常难以正确使用。

原文:Don’t use Go’s time.Timer or time.Ticker, which are difficult to use correctly.

所以,不能照着网上的很多答案,再去写 <-electionTimer.C 这样子的代码了。顺便说一下,我也很讨厌 <-Timer.C 这样的语法,这是个啥?

课程说用 time.Sleep() 来实现,我也喜欢这样的方式,如果后面想用 C++ 再实现一个 Raft(关注吧,我会在未来实现的),你不可能指望在 C++ 找到 <-Timer.C 这样的东西,但 time.Sleep() 是每一种语言都通用的。

所以,基本的框架大概这样(我删掉了一些判断,你千万不要直接拿去用,稍微改改,问题不大):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (rf *Raft) ticker() {
for rf.killed() == false {

time.Sleep(electionTimeout)

// check heartbeats time
duration := time.Since(rf.getHeartbeatTime())
if duration > electionTimeout {
rf.election()
} else {
continue
}
}
}

最好封装一个角色转换的函数,无论是 becomeLeader(), becomeFollower() 还是 changeState(state int),很多情况下需要变更角色。

接着就照着论文中的 Figure 2 实现,但有些容易让人迷惑的地方,我直接在下文中写出来。

每个 server 都要实现

  • Follower 收到任期不小于自己的心跳 RPC,都要更新自己的心跳计时器;
  • 如果 request.Term > currentTerm: currentTerm = request.Term,并且转为 Follower(没有这个条件,在两个节点选举的时候,会一直互相拿不到票)
  • 提交日志的时候,判断 commitIndex > lastApplied

RequestVote RPC 实现

  1. if args.Term < currentTerm: return false
  2. if votedFor is null or candidateId: 收到此投票请求的服务器 V 将比较谁的日志更完整,所谓更完整,就是:(lastTermV > lastTermC) ||(lastTermV == lastTermC) && (lastIndexV > lastIndexC) 将拒绝投票;(即:V 的任期比 C 的任期新,或任期相同但 V 的日志比 C 的日志更完整);

第 2 点在 lab 2A 中可以先不实现,在 lab 2B 必须实现。

AppendEntries RPC 实现

对于比较直观的内容这里就不说了,下面的内容其实都在 Figure 2 中,只不过需要细看,实现的时候容易忘掉或者忽略掉。

  • 记得更新心跳计时器,如果一个操作耗时比较久,多更新两次也没关系。
  • Figure 2 提到,log 的 first index is 1,所以在初始化 log 的时候先插入一条空的记录,从 index 1 开始,否则在后续的实现过程中,会被边界条件搞到直接放弃。
  • 日志不能直接 append,遇到同一个 indexterm 不一样的,以 Leader 为准,直接覆盖或丢弃 Follower 冲突的日志。怎么判断呢?从 args.PrevLogIndex 开始判断就行了。
  • 论文中提到,nextIndex[]matchIndex[] 除了 Make 的时候要初始化,还要 Reinitialized after election,不要忘了!
  • 对于 lab 2B,如果 AppendEntries RPC 返回失败,nextIndex-- 即可。

下面是比较关键的两个问题,一个日志复制完了,啥时候提交呢?

Leader 什么时候提交?

我们知道,多数派达成的时候提交,但论文写出的明显没有这么简单:

If there exists an N such that N > commitIndex, a majority of matchIndex[i] ≥ N, and log[N].term == currentTerm: set commitIndex = N

即,Leader 收到 AppendEntries RPC 的响应后,然后根据上面的算法找得到 N, 就可以提交了。在实验中,就是可以扔 msg 到 applyCh 了。

那为什么不是简单的日志复制到多数派就提交呢?请看《条分缕析 Raft 算法》中的“延迟提交,选出最佳 Leader”章节。

Follower 什么时候提交?

还是原文:

If leaderCommit > commitIndex, set commitIndex =min(leaderCommit, index of last new entry)

这里就可以提交了。道理很简单,Leader 都提交了,Follower 跟着提交就完事了。

到这里,lab 2B 应该也可以做完了。

快速处理日志冲突

前面说了,lab 2C 是比较简单的,在需要持久化的变量做更改的地方调用持久化函数即可。

但是,6.824 需要快速处理日志冲突,即,实验中如果每次都让 nextIndex-- 太慢了。

我们先看论文原文:

If desired, the protocol can be optimized to reduce the number of rejected AppendEntries RPCs. For example, when rejecting an AppendEntries request, the follower can include the term of the conflicting entry and the first index it stores for that term. With this information, the leader can decrement nextIndex to bypass all of the conflicting entries in that term; one AppendEntries RPC will be required for each term with conflicting entries, rather than one RPC per entry. In practice, we doubt this optimization is necessary, since failures happen infrequently and it is unlikely that there will be many inconsistent entries.

论文说,如果因为日志冲突拒绝了 AppendEntries RPC,可以在响应体中包含冲突的 index 和 term,根据这个信息,Leader 可以快速定位到冲突的位置,一次 RPC 就搞定。否则像之前那样递减 nextIndex,一次只能判断一个 Entry 有没有问题,可能会好多次 RPC。

但作者也提到,在实践中,他怀疑这个优化是否是必须的,因为这样的大规模不一致其实是比较少见的

不过既然实验要我们实现,我们就去实现。

简单来说,考虑三种情况(例子来自讲义:6.824 2020 Lecture 7: Raft (2)):

Case 1 Case 2 Case 3
S1 4 5 5 4 4 4 4
S2 4 6 6 6 4 6 6 6 4 6 6 6

S2 是任期 6 的 Leader,S1 刚重启加入集群,S2 发送给 S1 的 AppendEntries RPC 中 prevLogTerm=6

我们用三个变量来快速处理日志冲突:

  • XTerm:冲突 entry 的任期,如果存在的话
  • XIndex:XTerm 的第一条 entry 的 index
  • XLen:缺失的 log 长度,case 3 中 S1 的 XLen 为 1

上面三种情况对应:

  • Case 1:Leader 中没有 XTerm(即 5),nextIndex = XIndex
  • Case 2:Leader 有 XTerm(即 4),nextIndex = leader's last entry for XTerm
  • Case 3:Follower 的日志太久没追上,nextIndex = XLen(这里是按照讲义上写的,勘误:确实是 nextIndex = XLen)

快照

为啥需要快照?不能放任 log 一直膨胀,我们的存储空间有限,并且会影响重启时日志重放时间。

先确定哪些日志不能做快照?

  • 没应用的
  • 没提交的

宕机重启后执行顺序是怎样的?

  1. 从磁盘读取快照
  2. 从磁盘读取持久化的日志
  3. 将 Raft 的 lastApplied 设为 lastIncludedIndex避免重新执行已经执行的日志(记得,这条也很重要!)

问题:如果 Leader 要将已经压缩的日志传给滞后的 Follower 怎么办?

  • 此时可能 nextIndex < log start index,不能再用 AppendEntries RPC 来修复(这句话还已经暗示了,这个条件要在 sendAppendEntries 里做判断)
  • 所以我们需要 InstallSnapshot RPC 来传递快照

实验提供了两个函数:

  • Snapshot(index int, snapshot []byte):生成 index 之前包括 index 的 log 的快照,Raft 截断这部分日志,只保存尾部的 log;
  • CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool:判断是否要安装快照,如果这里一直返回 true你将获得满分。(这里我按照一直返回 true 实现的,如果你实现困难,可以先在这里改改降低难度。)

You are free to implement Raft in a way that CondInstallSnapShot can always return true; if your implementation passes the tests, you receive full credit.

一些 Tips:

  • 不用实现 offsetdone,一次性发完所有快照,简单很多;
  • Raft 不能再使用 log 的位置或长度来确定日志的 index,所以最好有个独立于日志位置的索引。(当然你非要每次都基于 includedIndex 计算也可以,不推荐,打日志的你怎么一眼看出 index 是什么?);
  • 即使 log 被截断,AppendEntries RPC 还是需要发送之前一条日志的 indexterm,所以我们是不是需要持久化 lastIncludedTerm/lastIncludedIndex 呢?(独立思考)
  • InstallSnapshot RPC 按照 Figure 13 实现就行了,记得做完快照以后,还是要在 log 索引 0 位置插一条空的日志,让 index 从 1 开始。

其它没什么好说的,最关键的是,要把之前直接获取日志索引的地方都改掉,所以你最好写几个获取日志的函数,例如 getLastIndex() 等等。

一个坑:Go 竞态条件下的 slice 处理

1
AppendEntries RPC` 需要传 `entries[]` 参数,这里一开始我直接写成:`entries: rf.log[nextIndex:]

但这样如果多跑几遍测试,就会发现有问题。简单来说,Go 中 slice 是引用类型,传递的是值的拷贝,修改看起来不同的 slice 但其实是指向同一个地址空间。需要用 copy() 做深拷贝。

详见:https://stackoverflow.com/questions/38923237/goroutines-sharing-slices-trying-to-understand-a-data-race

6: ZooKeeper 设计原理

本周学习 ZooKeeper,主要讨论以下两个问题:

  • 分布式协调系统可以有一个独立的通用服务来处理吗?API 应该是什么样的?其他分布式应用程序如何使用它?
  • 我们花了很多钱购买 N 倍数量的服务器,能否将系统的性能提高 N 倍?

由于网上讨论 ZooKeeper 的内容已经非常多了,本文尽量避免重复,但一些关键的内容还是需要反复讨论:ZooKeeper 的架构和两个保证,本文还讨论一些细节的问题,但不展开讨论如何使用 ZooKeeper 进行开发。

注:本文根据 6.824 教学视频和讲义整理,以论文 ZooKeeper: Wait-free coordination for Internet-scale systems 的内容为主,不讨论 Apache ZooKeeper 源码。

ZooKeeper 是什么?

ZooKeeper 是一个分布式协调服务,什么叫协调服务好像也比较抽象,具体来说,ZooKeeper 提供:

  • 统一命名服务
  • 配置管理
  • 成员管理
  • Leader 选举
  • 协调分布式事务
  • 分布式锁

就是分布式系统中经常用到的服务,ZooKeeper 并没有直接提供这些服务,而是提供 API 供开发者实现自己需要的服务。

ZooKeeper 是一个独立的、通用的服务,用来帮助开发者轻松构建分布式应用。相比于上一章学习的 Raft,Raft 可以用于一些多副本系统中,但一般来说 Raft 并不是一个可以直接交互的独立服务。ZooKeeper 实际上运行在 Zab 之上,站在一个更高的系统设计维度上看,Raft 和 Zab 是一样的,都是为了让多个服务器达成共识。至于 Zab 和 Raft 的细节异同,此文不表。

ZooKeeper 技术架构

和之前学习的 Raft 类似,ZooKeeper 集群分为 Leader 和 Follower;和 Raft 有所不同,Raft 只有 Leader 能处理读写请求,ZooKeeper 允许所有节点处理读请求,但写操作仍然只发送给 Leader,这样做的缺点是读操作可能会返回过时的数据,但提高了读的性能。考虑下 Raft,当我们加入更多服务器时,Leader 几乎可以确定是一个瓶颈,因为 Leader 需要处理每一个读写请求,需要将每个请求的拷贝发送给每个其他服务器,随着服务器数量的增加,Raft 系统的性能可能反而会降低。

图片

ZooKeeper 专门为大量的读负载而设计的系统,所以允许所有节点处理读请求,除了 Leader 以外的任何一个副本的数据都可能不是最新的。

但如果系统都不提供线性一致性,我们为什么还要相信这个系统是可用的?

ZooKeeper 有两个基本的一致性保证:线性写和先进先出(FIFO)的客户端请求

线性写

All requests that update the state of ZooKeeper are serializable and respect precedence.

Leader 保证写操作的顺序,并且该顺序在所有 Follower 上保持一致。注意这里用了串行化(serializable)而不是线性一致性(linearizability)。

前面提到 ZooKeeper 不保证线性读。例如,client A 更新了键 X 的值,而 client B 在另一台服务器上读取键 X 的值可能会读到更新之前的值——ZooKeeper 只保证线性写

先进先出的客户端请求

All requests from a given client are executed in the order that they were sent by the client.

另一个保证是,每个客户端可以为其操作(读和写)指定一个顺序,ZooKeeper 会按照客户端指定的顺序来执行。

该如何理解这里呢?分两种情况讨论。

对于写请求,所有的写请求会以客户端发送的相对顺序,加入到所有客户端的写请求中,保证满足线性写。所以如果一个客户端要求,先完成写操作 A,接着写操作 B,之后是写操作 C,那么,在最终整体的写请求中将看到是先写 A 再写 B 再写 C 的,但 A B C 可能不是相邻的

对于读请求,由于读请求不需要经过 Leader,可能会复杂一些,多个读请求可能先在先在某个副本,但这个副本宕机了,剩余读请求切换到了另外的副本上。

ZooKeeper 通过 zxid 来实现,zxid 是最后一个事务的标记,当客户端发出一个请求到一个相同或者不同的副本时,会在请求带上 zxid 标记,副本通过检查客户端的 zxid 和自己的 zxid,保证读到的是更新的 zxid 的数据(没有具体说怎么处理,是阻塞等待还是拒绝请求)。

更进一步,如果同一个客户端发送一个写请求<X, 17>,然后立即去某个副本服务器读 X,这里会暂缓一下读请求,直到这个副本发现写请求的 zxid 已经执行了,即客户端将会读到 <X, 17>,不会读到过期的数据。

这样子看,ZooKeeper 究竟是不是线性一致的?

按照教授的说法,认为 ZooKeeper 不是线性一致的,但也不是完全非线性一致的,只能说对于单个客户端请求来说是线性一致的。

那么 zxid 必须要等到写请求执行完成才返回吗?

实际上不知道具体是如何工作的,可能需要看看 Apache ZooKeeper 源码,但这是个合理的假设。

同步操作 sync

为什么 ZooKeeper 如此流行,其中一个原因是,有一个弥补线性一致性的方法。

ZooKeeper 提供一个 sync 操作,本质上是一个写请求,如果想读到最新写入的数据,可以发送一个 sync 请求,最终会在所有副本中读到最新的数据。

这其实与前面提到的 FIFO 客户端请求类似,sync 就是一个写请求,然后后面跟着一个读请求,保证读请求能读到自己写请求的内容。

同时也要认识到,**sync 是一个代价很高的操作,因为我们将一个读操作转换成了一个写+读操作**,如果不是必须,还是不要这么做。

数据模型

ZooKeeper 用 znode 表示内存数据节点,znode 以层次命名空间的方式组织起来称为数据树(data tree),与文件系统非常类似。例如,使用 /A/B/C 给出 znode C 的路径,C 的父节点是 B,B 的父节点是 A。

我猜,znode 就是借用文件系统 inode 来命名。

图片

客户端可以创建两种类型的 znode:

  • 普通(Regular),Apache ZooKeeper 也叫持久的(Persistent):持久化存储的普通节点。
  • 临时(Ephemeral):在会话结束(主动结束或者因为故障结束)时自动删除的节点,也可以显示删除。

所有 znodes 都存储数据。除了临时 znode,所有 znode 都可以有子节点。

创建新的 znode 节点时,还可以指定 sequential 标识创建顺序的 znode,当设置了这个标识后,znode 的名字末尾会添加上一个单调递增计数器,即 name+ seqno,由父节点维护,如果 n 是新节点,p 是父节点,n 的 seqno 将大于所有在 n 之前创建的 p 的子节点的 seqno

这样组合后其实有四种类型的 znode:

  • PERSISTENT
  • EPHEMERAL
  • PERSISTENT_SEQUENTIAL
  • EPHEMERAL_SEQUENTIAL

ZooKeeper 实现

图片

如图,ZooKeeper 主要由以下组件构成:

  • 请求处理器(Request Processor):将收到的请求转为幂等的事务,根据版本信息,生成包含新数据、版本号和更新的时间戳的 setDataTXN
  • 原子广播(Atomic Broadcast):使用 zab 协议达成共识,这里不展开
  • 多副本数据库(Replicated Database):将内存状态存储为模糊快照(fuzzy snapshot),用来恢复状态。做快照时不锁定当前状态。

模糊快照论文中有简单解释,例如:

1
2
3
⟨SetDataTXN, /foo, f2, 2⟩ 
⟨SetDataTXN, /goo, g2, 2⟩
⟨SetDataTXN, /foo, f3, 3⟩

在处理这些状态变更之后,/foo = f3/goo = g2。但模糊快照可能记录 /foo = f3/goo = g1,版本分别为 3 和 1,这不是 ZooKeeper 数据的最终状态。如果服务器宕机从快照恢复,需要 Zab 重新发送状态变更,和快照一起重放内存状态,最终保证与宕机前的服务状态一致。

客户端 API

create(path, data, flags):使用 path 创建一个新的 znode 节点存储 data,仅第一次创建会成功。flags 用于创建普通或者临时节点,也可以设置 sequential 标识。
delete(path, version):如果 znode.version = version,则删除 znode。
exists(path, watch):如果指定 path 的 znode 存在则返回真,如果不存在则返回假。watch 标识用于在 znode 上设置监视器。
getData(path, watch):返回数据和元数据,如版本信息。watch 标识与 exists() 的 watch 标识一样,但如果 znode 不存在则不会设置监视器。
setData(path, data, version):如果 znode.version = version,则更新 znode 上的 data。
getChildren(path, watch):返回 znode 所有子节点的名称。
sync(path):等待所有更新操作发送到客户端连接的服务器。

几个重要特性:

  • 排他地创建 znode,有且仅有一个 create 返回成功
  • 当客户端连接到 ZooKeeper 时,建立一个会话(session)
  • 所有的方法都有同步和异步的版本
  • 更新操作(deletesetData)会有预期的版本号,如果与 znode 的实际版本号不同,操作将失败
  • 用 watch 来避免轮询

ZooKeeper 应用

对应前面学习到的分布式系统,可以应用在:

  • VMware-FT 中的 test-and-set 服务;
  • GFS 中的 master 可以使用 ZooKeeper 来扮演,甚至可以提高性能,因为所有副本都能提供读服务;
  • 在 MapReduce 中用来管理成员信息,谁是当前的 master、谁是 worker、worker 列表、什么工作分配给什么 worker 等等;

论文中还提到一些配置管理、成员管理、锁、读写锁等应用,Apache ZooKeeper 有更详细的代码示例,网上资源也很多了,在此不表。

遗憾:未涵盖的主题

  • 如何持久化
  • batching 和 pipelining 性能的详细信息
  • fuzzy snapshot 实现细节
  • 幂等运算细节
  • 重复的客户端请求检测

也许看看 Apache ZooKeeper 源代码能有答案。

FAQ

来源:https://pdos.csail.mit.edu/6.824/papers/zookeeper-faq.txt

Q: 线性化(linearizability)和串行化(serializability)有什么不同?

通常来说串行化很像线性化,但不要求遵守实时顺序。详细解释可以查看:http://www.bailis.org/blog/linearizability-versus-serializability/

ZooKeeper 论文的 2.3 节里写到一致性是 serializable 的。

还可以看看:https://jepsen.io/consistency

Q: 什么是流水线(pipelining)

首先,ZooKeeper 的 Leader 会将多次请求合并成一次发送到磁盘和网络,这里是利用 batching 来解决请求很多的情况。

其次,pipelining 可以让客户端不用等待请求返回,就可以继续处理后面的请求,就像异步的操作。

这里和 Raft 提到的性能优化中的 Batching 和 Pipelining 一模一样。

Q: 什么是 wait-free?

精确的定义是:一个 wait-free 的并发数据对象保证任何进程都能在有限的步骤中完成任何操作,无论其他进程的执行速度如何。可以查阅论文:https://cs.brown.edu/~mph/Herlihy91/p124-herlihy.pdf

ZooKeeper 是 wait-free 的,因为它在处理一个客户端请求的时候,无需等待其他客户端的结果。

不过,根据 watch 机制,ZooKeeper 客户端有时候也会等待别的客户端的结果。

Q: Leader 如何知道客户端异步更新时的执行顺序?

论文没说,可能是客户端对异步请求进行编号,追踪每个会话的请求列表。

Q: 如果客户端没有收到请求的响应,它会做什么?

论文没说,Leader 可能会跟踪它收到的每个请求,过滤掉一些重复的请求。和 Lab 3 类型。

Q: 如果客户端发送异步写,然后立即执行读操作,读操作会看到写操作的结果吗?

论文没有明说,但是按照 FIFO 的客户端请求的含义,应该是可以看到的写操作的结果的。这似乎意味着,读操作可能会阻塞,直到服务器收到前面的写操作。

Q: 为什么要实现 fuzzy snapshots?

一个精确的快照将对应日志中的一个特定点,该快照包含这个点之前所有的写,不包含之后所有的写,所以需要防止在创建和写入快照时发生任何写操作,这将降低很多性能。

ZooKeeper 的快照是将内存状态导出写入持久存储:

  • 异步线程生成快照文件;
  • 快照文件名的后缀是最后提交的事务的 zxid
  • 快照文件生成过程中,仍然有新的事务提交;
  • 因此,快照文件不是精确到某一时刻的快照文件,可能与实际存在的任何数据树都不对应,因此是模糊的
  • 这就要求事务操作是幂等的,否则产生不一致;

参见:https://zookeeper.apache.org/doc/r3.7.0/zookeeperAdmin.html

Q: ZooKeeper 如何选主?

使用 Zab 原子广播算法。Zab 的 paper:http://dl.acm.org/citation.cfm?id=2056409

Q: ZooKeeper 的数据库有多大?看起来服务器要有很多内存。

这取决于应用,但论文没有提及,鉴于 ZooKeeper 只是一个协调服务,不是一个常规的存储服务,内存数据库是合理的选择。

Q: 什么是 universal object?

一种并发理论,通常和前面的 wait-free 放在一起理解,同样参见论文:https://cs.brown.edu/~mph/Herlihy91/p124-herlihy.pdf

作者称使用这种并发理论,以表明 Zookeeper 的 API 是通用的:API 包括足够的功能来实现任何你想要的协调方案。

更多:https://en.wikipedia.org/wiki/Non-blocking_algorithm

Q: 能否在不停止 ZooKeeper 服务的情况下添加更多的服务器?

虽然论文里 ZooKeeper 集群成员是静态的,但开源的 ZooKeeper 支持动态扩容:https://zookeeper.apache.org/doc/r3.7.0/zookeeperReconfig.html

有论文阐述其机制:https://www.usenix.org/system/files/conference/atc12/atc12-final74.pdf

Q:客户端的库如何实现 watch 机制?

在大多数情况下,客户端库可能会注册一个回调函数,该函数将在监视触发时被调用。例如,ZooKeeper 的 Go 客户端通过 GetW() 返回一个 channel 来实现。

参见:https://godoc.org/github.com/samuel/go-zookeeper/zk#Conn.GetW

Q: 为什么第6页代码中的读锁跳到第3行,而不是像写锁那样跳到第2行?

这应该是一个 bug,正确的代码块参见:https://zookeeper.apache.org/doc/r3.7.0/recipes.html#Shared+Locks

小结

ZooKeeper 是一个为特定用途而设计的好的例子,放宽了一致性,以提高以读为主的工作负载,论文中的结果显示,ZooKeeper 的吞吐量可以线性拓展。

图片

如今许多分布式系统中应用了 ZooKeeper,可以在这里查看:https://zookeeper.apache.org/doc/r3.7.0/zookeeperUseCases.html。

Reference

运行 3000 次都不出错的 MIT 6.824 Raft 实验

*大家好,最近忙着“大动作”(就快和大家见面了),更新频率低了一些,在此感谢没有取关的每个读者。*

前几天在分布式系统交流群里,小伙伴们都在讨论 6.824 的 raft 实验批量测试 2000 次以上总会出错,错误出在 Figure8UnreliableUnreliableChurn2 这两个测试。我自己其实也遇到了这个问题,这里记录下我自己的解决思路。

能到这步,首先默认你的程序已经没有大的问题,只是在上千次的测试中会有选举不出来(活锁问题)或提交的日志冲突问题。如果连单次测试都还过不去,请先移步《【MIT 6.824】学习笔记 5: 2021 Raft 实验实现细节

上面两个测试说白了,就是把网络搞得很乱,写一堆日志,然后给你 10 秒时间,没选出新的 Leader 就会出错。主要有两种错误:failed to reach agreement 或者 apply error

这两个问题分别用下面两种思路解决。

选举超时不能随便重置

如果仔细阅读过 Students’ Guide to Raft,其实里面很清楚写着,选举超时时间只能在下面三种情况下重置:

  • 收到现任 Leader 的心跳请求,如果 AppendEntries 请求参数的任期是过期的(args.Term < currentTerm),不能重置;
  • 节点开始了一次选举;
  • 节点投票给了别的节点(没投的话也不能重置);

原文:

you should only restart your election timer if a) you get an AppendEntries RPC from the current leader (i.e., if the term in the AppendEntries arguments is outdated, you should not reset your timer); b) you are starting an election; or c) you grant a vote to another peer.

这个问题其实很容易理解,主要容易错在,代码改着改着,就会不小心搞错了选举时间重置的位置,然后给后面的排查埋下了坑。

检查一下你是否胡乱重置了这个时间,我和群里另一位小伙伴的问题就出现在第三种情况。

正确处理 rpc 响应

处理 rpc 响应的时候也要小心,收到 rpc 响应的时候,如果 currentTerm != args.Term 了,这次 rpc 就要丢掉不能用了。

当然,如果节点角色已经变了,那也要忽略掉这次 rpc 响应。

总结

调试这个问题主要是要细心,关注细节,多读几遍:https://thesquareplanet.com/blog/students-guide-to-raft/

进行批量测试的脚本在这里:https://gist.github.com/jonhoo/f686cacb4b9fe716d5aa

使用方法是:

1
./go-test-many.sh 测试次数 并行数(默认是 CPU 个数) 哪个测试

例如要测试 2C 这个测试 2000 次,并行 8 个测试。

1
./go-test-many.sh 2000 8 2C

又比如单独测试 TestFigure8Unreliable2C 2000 次。

1
./go-test-many.sh 2000 8 TestFigure8Unreliable2C

Raft 的 Figure 8 讲了什么问题?为什么需要 no-op 日志?

发现之前写的 Raft 文章并没有分析过 Figure 8 的问题,而这张图比较容易让人产生歧义,群里讨论过不止一次。在这里谈谈我的理解。

Figure 8 用来说明为什么 Leader 不能提交之前任期的日志,只能通过提交自己任期的日志,从而间接提交之前任期的日志。

图片

先按错误的情况,也就是 Leader 可以提交之前任期的日志。那么上述的流程:

  • (a) S1 是任期 2 的 Leader(仔细看,有个黑框),日志已经复制到了 S2。
  • (b) S1 宕机,S5 获得 S3、S4 和 S5 的选票成为 Leader,然后写了一条日志 index=2 & term=3。
  • (c) S5 刚写完就宕机了,S1 重新当选 Leader,currentTerm = 4,此刻还没有新的请求进来,S1 将 index=2 & term = 2 的日志复制到了 S3,多数派达成,S1 提交了这个日志(注意,term=2 不是当前任期的日志,我们在讨论错误的情况)。然后请求进来,刚写了本地 index=3 & term=4 的日志,S1 就故障了。
  • (d) 这时候 S5 可以通过来自 S2、S3、S4 和自己的投票,重新成为 Leader(currentTerm>=5),并将 index=2 && term=3 的日志复制到其他所有节点并提交,此时 index=2 的日志提交了两次!一次 term=2,一次term=3,这是绝对不允许发生的,已经提交的日志不能够被覆盖!
  • (e) 这里的情况是,S1 在宕机之前将自己 term=4 的日志复制到了大多数机器上,这样 S5 就不可能选举成功。这是 S1 不发生故障,正确复制的情况。

这里主要通过 (c) 和 (d) 来说明问题所在。其实这张图用 Raft 大论文的图会比较好理解。(d) 和 (e) 分别对应 term=4 有没有复制到多数派的情况。

图片

所以,我们要增加提交的约束,不让 (d) 这种情况发生。这个约束就是,Leader 只能提交自己任期的日志

我们再来看看,加了约束后会变成什么样?前面 (a) 和 (b) 没有任何改变,我们从 (c) 开始。

  • (c) 还是将 index=2 & term=2 复制到大多数,由于 currentTerm = 4,所以不能提交这条日志。如果 S1 将 term = 4 的日志复制到多数派,那么 Leader 就可以提交日志,index=2 & term=2 也会间接一起提交,其实这就是 (e) 的情况,1-2-4 都被提交。
  • (d) 的情况我觉得是理解问题的关键。如果 S1 只将 term=4 写入自己的日志,然后宕机了;S5 选举成功成为 Leader,然后将 index=2 & term=3 的日志复制到所有节点,现在 index=2 是没有提交过的,S5 能提交 index=2 & term=3 的日志吗?

答案是不能。因为 S5 在 S1(term=4) 选举出来后 currentTerm 至少是 5,也可能是 6、7、8……我们假设就是 5,但这条日志 term = 3,Leader 不能提交之前任期的日志,所以这条日志是不能提交的。只有等到新的请求进来,超过半数节点复制了 1-3-5 后,term=3 的日志才能跟着 term=5 的一起提交。

虽然加了这个约束不会重复提交了,但如果一直没新的请求进来,index=2 & term=3 岂不是就一直不能提交?那这里不就阻塞了吗?如果这里是 kv 数据库,问题就很明显了。假设 (c) 或 (d) 中 index=2 那条日志里的 Command 是 Set("k", "1"),S5 当选 Leader 后,客户端来查询 Get("k"),Leader 查到日志有记录但又不能回复 1 给客户端(因为按照约束这条日志未提交),线性一致性要求不能返回陈旧的数据,Leader 迫切地需要知道这条日志到底能不能提交。

所以 raft 论文提到了引入 no-op 日志来解决这个问题。这个在 etcd 中有实现。

引入 no-op 日志

no-op 日志即只有 index 和 term 信息,command 信息为空。也是要写到磁盘存储的。

具体流程是在 Leader 刚选举成功的时候,立即追加一条 no-op 日志,并立即复制到其它节点,no-op 日志一经提交,Leader 前面那些未提交的日志全部间接提交,问题就解决了。像上面的 kv 数据库,有了 no-op 日志之后,Leader 就能快速响应客户端查询了。

本质上,no-op 日志使 Leader 隐式地快速提交之前任期未提交的日志,确认当前 commitIndex,这样系统才会快速对外正常工作。

另外说一句,6.824 的实验不需要实现 no-op 日志。

这个问题之前阿里巴巴团队称之为“幽灵复现”,参见《如何解决分布式系统中的“幽灵复现”?》,里面讨论了 Paxos、Raft 和 Zab 的解决方案。


《多颗糖的MIT笔记》
http://example.com/2024/03/11/MIT-6.824-Note/《多颗糖的MIT笔记》/
作者
where
发布于
2024年3月11日
许可协议