Kafka: a Distributed Messaging System for Log Processing

What is Kafka

Kafka是LinkedIn开发的一个先进的,日志处理的消息系统,它整合了传统日志聚合器和消息系统的优势。Kafka是分布式,可扩展和高吞吐,提供与消息系统类似的API,让应用程序可以实时消费日志事件。

Why Kafka

在互联网公司,每天会产生大量的“log”数据,主要包含以下两类:

  • 用户活动,登陆,pageviews,点击,“likes”,分享,评论和搜索查询
  • 运营指标,服务调用栈,调用延迟,错误,系统指标(CPU,内存,网络,磁盘使用率)

长期以来,日志数据一直是用于跟踪用户参与度、系统使用率和其他指标的分析的组成部分。然而,最新趋势是互联网应用程序将活动数据作为直接用在在站点功能中的生产数据管道的一部分:

  • 搜索相关性
  • 推荐系统
  • 广告定位和报告
  • 安全应用
  • 信息流(newsfeed)

这种生产上,实时使用log数据给数据系统带来了新的挑战。许多处理此类数据的早期系统都依赖于从生产服务器上物理抓取日志文件进行分析。近年来,已经建立了几个专门的分布式日志聚合器,包括 Facebook的Scribe、Yahoo的Data Highway和Cloudera的Flume。这些系统主要设计用于收集日志数据并将其加载到数据仓库或Hadoop中以供离线使用。在LinkedIn,除了传统的离线分析之外,还需要支持上面提到的大多数实时应用程序,延迟不超过几秒钟。

传统的企业消息系统存在时间长,经常作为事件发挥关键作用用于处理异步数据流的总线。然而,它们不适合日志处理:

  • 提供的功能不匹配需求,这些系统通常专注于提供丰富的交付保证,比如事务支持(IBM Websphere),单个消息确认(JMS),支持乱序等,往往会增加这些系统的API和底层实现的复杂性
  • 不是关注高吞吐而设计的,比如JMS不支持批量发送,每条消息都需要完整的TCP/IP往返,这对吞吐量要求是不可行的
  • 分布式支持弱,不方便在多个机器上分区存储消息数据
  • 许多消息传递系统假设近实时消费消息,所以未消费的队列消息总是相当小

Kafka Architecture & Design Principles

基本概念

Producer(生产者)可以向Topic发布消息。然后将发布的消息存储在一组称为Brokers的服务器上。Consumer(消费者)可以从Brokers中订阅一个或多个Topic,并通过从brokers中pull数据来消费订阅的消息。Kafka的整体架构如下图所示,由于Kafka是分布式的,一个Kafka集群通常包括多个brokers。为了负载均衡,一个主题被划分为多个Partition(分区),每个broker存储一个或多个分区。每个Producer可以发送消息到随机选择的分区或者分区键和分区函数决定的分区。Consumer groups是由一个或多个Consumer组成的,一起消费一组订阅的topics,每条消息只会传递给其中一个Consuermr,同一Consumer Group中的consumer可以在不同的进程或者机器上。

kf

存储

一个Topic的每个分区对应一个逻辑log,物理上一个log被实现为一组分片文件(segment files),每个分片大小解决(比如,都是1G)。为了更好的性能,只有当可配置的数量的消息被发布或者超过了可配置的时间,分片文件才被刷入存储盘,然后对消费者可见。

每条消息都是由其在日志中的逻辑偏移量(offset)寻址,消息ID不是连续增加对,要计算下一条消息的ID,必须将当前消息的长度加上它的ID。消费者总是按顺序消费来自特定分区的消息,如果消费者确认了一个特定的消息偏移量,则意味着消费者已收到分区中该偏移量之前的所有消息。Kafka日志和内存索引的布局如下所示。

  • 每个拉取请求都包含消费开始的消息偏移量和可接受的要提取的字节数
  • 每个broker在内存中保存一个排序的offset列表,包括每个段文件中第一条消息的offset
  • Broker通过查找offset列表定位到请求消息所在的segment文件,并将数据发回给consumer
  • 消费者收到一条消息后,它会计算下一条要消费的消息的偏移量,并在下一个拉取请求中使用它

kfk-seg

传输

为了提升传输效率,Kafka的生产者可以按batch发送数据,消费者可以每次请求多个特定大小的消息,通常为几百KB。另一个设计选择就是依赖底层文件系统的page cache,不在进程中缓存消息,因此垃圾回收方面的开销非常小,由于生产者和消费者都按顺序访问段文件,消费者通常会稍微落后于生产者,因此正常的操作系统缓存启发式算法非常有效(特别是write-through缓存和read-ahead),生产和消费都具有与数据大小呈线性关系的一致性能。

常规的发送本地数据到远端socket的流程是:

  1. read() 该系统调用导致从用户模式到内核模式的上下文切换。在内部发出sys_read()(或等效函数)以从文件中读取数据。第一个副本由直接内存访问 (DMA) 引擎执行,该引擎从磁盘读取文件内容并将它们存储到内核地址空间缓冲区中(Read buffer)。
  2. 请求的数据量从读取缓冲区复制到用户缓冲区(Application buffer),然后 read() 调用返回。调用的返回导致另一个上下文从内核切换回用户模式。现在数据存储在用户地址空间缓冲区中。
  3. send() 套接字调用导致上下文从用户模式切换到内核模式。执行第三次复制,将数据再次放入内核地址空间缓冲区。不过这一次,数据被放入另一个缓冲区(Socket buffer),该缓冲区与目标套接字相关联。
  4. send() 系统调用返回,创建第四次上下文切换。独立且异步地,当DMA引擎将数据从内核缓冲区传递到协议引擎时(NIC buffer),会发生第四次复制。

Kafka使用Zero-copy优化传输后到传输流程是:

  1. transferTo/sendFile方法让文件内容被DMA引擎复制到读取缓冲区中。然后数据被内核复制到与输出套接字关联的内核缓冲区中
  2. 没有数据被复制到套接字缓冲区中。相反,只有包含有关数据位置和长度信息的描述符才会附加到套接字缓冲区。DMA引擎将数据直接从内核缓冲区传递到协议引擎,从而消除了剩余的最终CPU拷贝

常规流程需要4次拷贝,2次系统调用(read,send),使用zero-copy后只需要2次拷贝(DMA),1次系统调用。

无状态Broker

Broker不维护每个消费者消费了多少的信息的状态信息,这样的设计大大降低了代理的复杂性和开销,然而,Broker不知道是否所有订阅者都使用了该消息,Kafka使用一个简单的基于时间的SLA保留策略来解决这个问题,性能不会随着数据量的增加而降低,这一事实使得这种长期保留(通常为7天)成为可能。这种设计有一个重要的附带好处,消费者可以故意回退到旧的偏移量并重新消费数据,消费着可以checkpoint最小未处理的offset,错误恢复时可以从这个offset重新消费数据。值得注意的是,pull模型的消费者更容易支持这种设计。

分布式协调

为了在不引入过多的协调负担的前提下,把消息均匀分布存储在brokers,Kafka做了两个设计上的选择,

  1. 让分区作为一个topic的最小并行单位(与大多数的分布式计算类似,比如spark,presto),这意味着在任何时候同一个分区的所有消息只会被每个消费者组的一个消费者消费,避免维护同一消费者组内的消费者之间的协调的锁与状态。只有在做rebalance的时候才需要协调工作,但这个是低频事件。
  2. 消费者们使用去中心化的方式协调,不引入一个中心主节点,其实就是借助Zookeeper(更新:现在Confluent已经移除ZK了)。

Zookeeper是一个高可用一致性服务,提供了易用的类文件系统API,支持创建,删除,list一个path的children,设置path的value,它可以做一些有意思的事情:

  • 对一个path注册一个watcher,在这个path的value或者children变更时收到通知
  • 一个path可以被创建为临时节点,在创建它的client连接断开时会被ZK移除
  • ZK把数据副本存储在多个server上,保证高可靠和高可用

Kafka对ZK的使用包括如下场景:

  1. 检测brokers和consumers的增加和衣橱
  2. 触发每个consumer的rebalance,当1中的事件发生时
  3. 维护消费关系和每个分区的消费的offset

Zookeeper中保存几种registry,

  • Broker registry,brokers的host和port,存储的topics和分区。
  • Consumer registry,所属的consumer group和订阅的所有topics
  • 每个consumer group与一个onwership registry,offset registry
  • Ownership registry,每个订阅的分区有一个path,id是拥有该分区的consumers的id
  • Offset registry,每个分区的最后消费的offset