实时将Cassandra数据引流到Kafka(上)
2020-05-08 09:47:27 Author: www.4hou.com(查看原文) 阅读量:247 收藏

在Yelp,我们使用Cassandra来支持各种用例。截至发布日期,已有25个Cassandra集群在生产环境中运行,且每个集群的部署大小不同。存储在这些集群中的数据通常是按原样需要的,或在其他用例中处于转换状态,比如分析、索引等等,不过对于这些用例,Cassandra不是最合适的数据存储。

Cassandra是一个混合型的非关系的数据库,类似于Google的BigTable。其主要功能比Dynamo (分布式的Key-Value存储系统)更丰富,但支持度却不如文档存储MongoDB(介于关系数据库和非关系数据库之间的开源产品,是非关系数据库当中功能最丰富,最像关系数据库的。支持的数据结构非常松散,是类似json的bjson格式,因此可以存储比较复杂的数据类型)。Cassandra最初由Facebook开发,后转变成了开源项目。

以前我们曾介绍过,Yelp围绕其数据存储开发了一个完整的connector生态系统,可以将数据流导入和导出数据管道。本文将深入探讨Cassandra Source Connector,该应用程序用于将Cassandra数据引流到数据管道(Data Pipeline)。

什么是数据管道?

对于做数据分析或者需要建模的人来说,面对庞杂的大数据,最棘手的难题往往就是多线程工作了。面对这种情况,你其实可以尝试搭建一套Data Pipeline系统。本文将Data Pipeline翻译为数据管道,就是一套让你的工作数据化、流程化、自动化的系统方法。

Yelp的数据管道是Apache Kafka之上的一种抽象,由一个叫做Schematizer的模式注册表支持。它目前是Yelp数百个用例的主干,范围从分析和实验到通知、排名和搜索索引。

data-pipeline.jpg

Yelp的数据管道生态系统

下面,我们简要回顾一下数据管道。

1. 发布到数据管道中的数据必须被模式化,但如果数据没有预定义的架构,则无法发布。

2. 对于由数据存储支持的数据,数据管道中的相应流必须符合“流-表二元性”(Stream-Table Duality)。

3. 数据管道中的每条消息都必须包含数据存储中等效行的全部内容,此外,UPDATE和DELETE消息还必须包含更改之前等效行的先前快照。

来自Cassandra的数据流带来的挑战

由于Cassandra的工作方式,满足上述数据管道需求可能会带来一些挑战。

整理写入顺序

虽然Cassandra使用多个数据副本以提高可用性,但是,没有真正的全局复制流概念。每次写入均被独立复制,所有节点都有资格进行协调。这意味着,可以在不同副本上以不同顺序处理并发写入。 Cassandra使用多种机制(提示切换,修复,最后一次写入获胜)来确保数据最终保持一致,尽管这些副本最终会就数据的最终值达成共识,但这并不能解决写入顺序上的差异。因此,Cassandra Source Connector需要遵守类似于Cassandra的写入顺序保证。

获取完整的行内容

不需要Cassandra写入包含所有表列,即使是这种情况,该行的当前状态也将取决于写入中的数据以及所有先前写入的影子数据。因此,仅使用写入数据不足以确定新的行状态。

获取前一行内容

与确定新行值时的情况一样,需要知道给定Mutation之前的行状态,这个优先行状态表示之前所有写入操作的叠加。

分布式数据所有权

Cassandra中数据的所有权分布在每个数据中心的节点之间,所有节点都能够协调写入操作。因此,处理这些写入集群的操作涉及合并来自多个节点的信息。

具体方案

在设计Cassandra Source Connector时考虑了几种方法,这篇文章对可用的主要数据流传输选项以及每个项的利弊进行了详尽的描述,包括:

1. 同时写入Cassandra 和 Kafka;

2. 直接写入Kafka并使用Cassandra Sink将数据加载到Cassandra中(“Kafka作为事件源”);

3. 处理由Cassandra的Change Data Capture或CDC公开的提交日志(“解析提交日志”),Change Data Capture(缩写入为 CDC)——大概可以机翻为 “变动数据捕获”——你可以将它视为和数据库有关的架构设计模式的一种。它的核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入,更新,删除等),将这些变更按发生的顺序完整记录下来,写入到消息INTERMEDIATE阶段件中以供其他服务进行订阅及消费。

另外上述文章还研究了Kafka Connect的Cassandra Source的使用。该connector使用“批量”或“增量”更新模式将数据从Cassandra表引流到Kafka。这两种模式都通过定期轮询表中的数据来起作用,批量模式执行全表扫描,发布整个结果,而增量模式查询自上次采样以来写入的行。不过着两种模式都有其缺点:

1. 在大型表上扫描批量模式表非常昂贵,并且每次扫描都会发布大量重复数据。

2. 增量模式仅适用于某些类型的工作载荷,写入操作必须是仅追加操作,并且单调递增的列(例如时间戳)作为主键的一部分。此外,轮询此数据可能会导致额外的群集载荷。

最终,基于处理Cassandra CDC的解决方案对connector最有意义。

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写入。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

设计思路

Cassandra的分布式部署特性,加上实现写入的顺序和满足数据管道语义的需要,使得创建单个应用程序非常具有挑战性。因此,Cassandra Source Connector被构建为两个独立的组件,每个组件都负责解决具体的一部分问题。

csource-high-level.jpg

高级别的Cassandra源连接器

CDC Publisher:在Cassandra节点上本地运行的服务,它使用CDC将原始Cassandra写入发布到INTERMEDIATE阶段Kafka流。这些流充当统一的提交日志,删除了分布式数据所有权方面的内容,并定义了要处理的事件顺序。

数据管道物化器(DP Materializer) :在Apache Flink上运行的应用程序,它处理由CDC PUBLISHER生成的原始Cassandra写入操作,并将它们作为数据管道消息发布。

CDC Publisher

CDC PUBLISHER将所有在Cassandra表中进行的写入操作作为序列化分区更新生成到特定于表的Kafka流中。

用CDC处理Cassandra写入

在Cassandra的3.8版中引入的更改数据捕获(Change Data Capture, CDC)功能被CDC PUBLISHER用于处理写入操作。

通常(禁用CDC的环境下),Cassandra以下列方式存储写入操作:

1. 客户端写入操作被每个节点持久保存到内存表和提交日志中;

2. 内存表会定期在磁盘上的SSTable中进行刷新,SSTable文件是内存表 数据到一定阈值写入文件形成的,由于内存容量总是有限的,将一定量数据写入磁盘可以存放更多数据,所以leveldb相比redis能存放更多数据。

cassandra-write-path-with-cdc.jpg

Cassandra写入路径

提交日志由一系列大小固定的文件(默认为32MB)组成,称为“提交日志段(commit log segment)”。一旦内存表在SSTable被刷新, Cassandra就会丢弃这些片段。

如果启用了CDC,那么所有包含对被跟踪表的写入操作的Cassandra提交日志段文件都会被标记。当相应的内存表不再引用这些文件时,它们将被移动到一个单独的目录中,而不是被丢弃。

cassandra-write-path-with-cdc.jpg

Cassandra用CDC写入路径

目前,Cassandra用CDC写入路径会有几个挑战:

1. 每个节点的处理:由于每个节点只存储完整表数据的一部分,因此必须在多个节点上处理CDC。

2. 复制:相同的写入存储在每个数据副本上,从而导致重复处理。

3. 数据不完整:提交日志段只包含来自写入的信息,不包含相应行的完整视图。

4. CDC不包含关于表的架构信息;

5. CDC目录大小限制:如果CDC目录太大,节点将拒绝新表的写入操作。

6. 延迟性:提交的日志段必须已满并且不能再由内存表引用,然后才能进行处理。对于写入速率较低的集群,提交日志段可能需要一段时间才能填满,从而影响延迟。

尽管有这些缺点,但我们还是使用CDC,因为它是Cassandra开放源码社区为处理提交的数据而开发的解决方案。这也意味着,CDC实现的任何改进都可以通过升级Cassandra版本来实现。

部署CDC

部署思路

region-deployment.jpg

CDC部

为了确保处理CDC不会在实际群集上造成任何性能问题,因此有必要创建了一个虚拟Cassandra数据中心,该数据中心在逻辑上独立于标准的特定于区域的数据中心。CDC Publisher服务器仅部署在此数据中心的节点上。由于所有的写入操作都是针对所有数据中心中的数据副本的,这就足以确保更改所有的表。此外,此数据中心中的节点可以以不同的方式提供,因为它们不实时提供客户端的读取请求。

延迟问题的解决

如上所述,使用CDC的一个问题是延迟,即从向Cassandra写入数据到可供处理的数据之间的时间。CDC只允许处理不再需要的提交日志文件,这意味着它们应该是完整的,不被现有的内存表引用。为了给connector引入可预测的延迟上限,我们采用了以下方法:

删除内存表引用

当内存表变得太大时,Cassandra会定期在SSTable上进行刷新。但是,具有低写入速率的表很少被刷新,从而延迟了整个集群的CDC处理。为了确保不会发生这种情况,CDC数据中心中的节点会定期(通常5-10分钟)触发所有内存表的显式刷新。这就确保一个完整的提交日志段最多只能等待一个刷新间隔,然后才能处理它。因为只有CDC数据中心节点被刷新,所以不会影响其他数据中心的客户端读取性能。

数据填充

提交日志段的大小是固定的,如果被跟踪的表的写入速率很慢,则可能需要一段时间才能完全填满一个段。通过创建一个与CDC Publisher分开的进程来约束此填充时间,该进程以可预测的速率写入“filler”表。该表仅在CDC数据中心中被复制,并且被完全复制到所有节点。为了减少对性能的影响,仅需要执行更少的大型写入操作(小于100K),仅写入单个密钥,并对数据进行TTL加密。

处理CDC

为了帮助处理CDC提交日志片段,Cassandra库为应用程序提供了一个要实现的处理程序接口。该接口允许处理提交日志段中出现的所有Mutation的写入流。Mutation类是Cassandra用来表示数据的Java对象,即:

1. 一个Mutation包含多个表的PartitionUpdate对象;

2. PartitionUpdate包含单个分区键值的行对象,键值(key-value)分布式存储系统查询速度快、存放数据量大、支持高并发,非常适合通过主键进行查询,但不能进行复杂的条件查询。

3. 行包含单个集群键值的数据。

mutations.jpg

一个Cassandra Mutation的结构

CDC PUBLISHER的主要功能是将这些Mutation分解为单独的PartitionUpdate对象,如果一个PartitionUpdate包含多个行,则这些行将进一步细分为一系列具有单行的更新。因此,每个更新只包含一个Cassandra主键的数据。

mutation-breakdown.jpg

将一个Mutation分解为单独的行对象

每个生成的PartitionUpdate对象都被序列化,并被引流到Kafka,由Cassandra库提供的序列化程序在发布之前用于序列化。

如何引流到Kafka

PartitionUpdate有效载荷用于构建要发布到INTERMEDIATE阶段Kafka流的消息,每条消息都包括:

1. 序列化PartitionUpdate;

2. 用于序列化的Cassandra消息版本;

3. 用于审核的元数据(主机、文件、位置等)。

然后将消息发布到表特定的Kafka流,一个流可以具有多个分区以进行可伸缩发布,在这种情况下,消息根据Cassandra分区键路由到Kafka分区。因此,对于单个分区键的所有写入操作都将以相同的主题分区结束。

cdc-publisher-pkey-partitioning (1).jpg

将CDC发布到一个多分区Kafka主题

本文翻译自:https://engineeringblog.yelp.com/2019/12/cassandra-source-connector-part-1.html如若转载,请注明原文地址:


文章来源: https://www.4hou.com/posts/8Bn2
如有侵权请联系:admin#unsafe.sh