实时将Cassandra数据引流到Kafka(下) - 嘶吼 RoarTalk – 回归最本质的信息安全,互联网安全新媒体,4hou.com

实时将Cassandra数据引流到Kafka(下)

lucywang 技术 2020-02-21 15:43:05
收藏

导语:自2018年第四季度以来,Cassandra Sourceconnector一直被用于Yelp的运行中。它支持多个用例,这些用例有助于解决一些关于其设计选择的问题。

实时将Cassandra数据引流到Kafka(上)

INTERMEDIATE阶段的Kafka流

生成的Kafka流包含对跟踪的Cassandra表的所有写入操作,由于对主键的所有更新都驻留在同一个主题分区中,因此要为每个键设置写入的顺序。

虽然不能保证事件会按写入时间顺序进行,但是也不能保证写入会按写入时间顺序提交给Cassandra副本。此外,每个数据副本将有一个重复的写入副本。即使是这种情况,INTERMEDIATE阶段的数据流也充当表的统一提交日志。它们为每个键提供事件的顺序,可以确定地将这些事件处理为发布到数据管道所需的行更新的有序流。

数据流的一致性

假设连接器使用Cassandra写路径,那么生成的Kafka流的一致性将不会比基础数据存储区更加一致。当从每个副本以其本地提交顺序发布写入内容时,处理后的流最初应与从单个副本中读取内容保持一致。随着来自其他副本的数据被处理,流最终变得一致。当所有副本都发布更新后,一致性将等同于覆盖所有CDC数据中心节点的读取操作。

这个最终一致性的时间限制将由Cassandra客户端使用的写入一致性级别确定,如果必须立即在流中显示更新,则必须使用高一致性级别(例如EACH_QUORUM)来确保提交到CDC数据中心中的节点。如果写入操作使用了较低或本地一致性,那么PartitionUpdate可能不会出现在输出流中(在最坏的情况下),直到下一次表修复。请注意,这与为直接阅读Cassandra的客户提供的保证是一致的。

此时,INTERMEDIATE阶段的Kafka流就包含了Cassandra PartitionUpdate对象,这些对象由键以松散的顺序分区。现在必须反序列化这些对象,将它们转换为有序的数据管道消息,并发布到管道中,整个过程都是通过DP Materializer完成的。

至此,我们就介绍完了Yelp的实时流数据的基础架构,并深入探讨了如何实时传输MySQL和Cassandra数据,如何自动跟踪和迁移Schema,如何处理和转换流,以及如何将所有这些数据连接到Redshift、Salesforce和Elasticsearch等数据存储中。

接下来,我们将要介绍Cassandra Source Connector的要求和设计选择,并深入介绍了CDCPublisher的详细信息。如上所述,CDC PUBLISHER处理Cassandra CDC数据,并将其作为松散排序的PartitionUpdate对象作为INTERMEDIATE阶段的密钥流发布到Kafka中,然后INTERMEDIATE阶段的数据流作为DP Materializer的输入。

Data Pipeline Materializer

DP Materializer会获取CDC发布的序列化PartitionUpdate对象,将它们转换为完整的数据管道消息,并将它们发布到数据管道中。

DP Materializer是构建在Apache Flink(一个流处理框架)之上的,目前,Flink已经在Yelp的产品中使用了几年,现在已经应用于各种流媒体应用中。它以RocksDB的形式提供了一个固有的状态后端,这对于保证有序 CDC发布非常重要。此外,Flink的检查点和保存点功能提供了非常强大的容错能力。

该应用程序具有两个主要阶段:

1.Schema推断( “引导阶段”),Schema,即XML Schema,XSD (XML Schema Definition)是W3C于2001年5月发布的推荐标准,指出如何形式描述XML文档的元素;

2.ETL(“转换阶段”);

Schema推断

在引导阶段,发布到数据管道所需的avroSchema源自Cassandra表Schema,该过程会首先构建Cassandra库使用的Cassandra表元数据对象(CFMetaData)。加载此元数据需要使用库功能来处理来自CDC Publisher流的序列化Cassandra数据。元数据对象包含表主键、列类型和表CREATE语句中指定的所有其他属性的信息。处理此架构表示以生成Avro架构,其中每个Cassandra列均由等效的Avro类型表示。

由于DP Materializer部署在Cassandra集群之外,所以它无法从本地节点上的文件(比如CDCPublisher)加载表元数据。相反,它使用Cassandra客户端连接到Cassandra,并从正在流的表的Schema中派生CFMetaData。这是通过以下步骤完成的:

1.连接到集群后,将检索create table和type(用于UDT)语句。

2.Cassandra的查询处理器用于将检索到的create语句解析为表元数据对象。

3. 检索有关先前从表中删除的列的信息,并将其添加到上一步中构建的元数据中。要读取在删除列之前创建的表数据,需要加载删除的列信息。

1.jpeg

在Cassandra中加载表元数据

加载元数据后,DP Materializer将从元数据构建avroSchema。在这个推导阶段会发生一些关键的事情:

1.该表的分区键和集群键被映射为avroSchema的主键;

2.表中的所有其他列(分区和集群键除外)都创建为空,当表中的Schema发生变更时,这就保证了相应的avroSchema总是与它们以前的版本兼容。除非使用不同的类型重新添加一个列,不过这本身就会导致问题。

Schema生成目前支持几乎所有有效的Cassandra列类型(除非Avro禁止),包括集合、元组、udt及其嵌套。

Schema变更检测

由于上述Schema推断是引导阶段的一部分,因此DP Materializer需要能够在线检测CassandraSchema更改并自动更新输出AvroSchema。为此,它实现了Cassandra的Schema更改监听器接口(由Cassandra客户端提供),以检测何时对被跟踪表的Schema进行了更改。一旦检测到,相应的Cassandra元数据将被更新,avroSchema将通过更新的元数据重建。

ETL(使用、转换和发布)

在DP Materializer的这个阶段,使用、处理来自CDCPublisher的序列化PartitionUpdate对象,并将其转换为数据管道消息,以便发布到管道中。使用者和发布者是由Flink提供的开箱即用的,因此本节主要关注DP Materializer的转换器部分。

2.jpeg

数据管道实现

状态结构

转换器是由Flink的RocksDB状态支持的,此状态被抽象为映射对象的集合,每个映射对应于Cassandra表中的一个分区键。在Cassandra中,每个map对象的键都来自该分区的集群键。分区更新(最多包含一行)存储为映射中对应的集群键的值。对于没有定义集群键的表,每个映射包含一个带有空键的单一条目。

3.jpeg

状态结构

状态加载和内存管理由Flink在内部处理,此外,Flink的数据流键控机制保证了一个分区键的所有更新都将被路由到同一个工作程序,并在应用程序重新启动时对同一个map对象进行持久的处理。

注意,来自CDCPublisher的PartitionUpdate对象可以重复多次,也可以被无序复制(按写入时间)。此外,分区更新通常可能不包含Cassandra行的全部内容。

转换器

该应用程序的核心部分是转换器,它的主要作用如下:

1.将Cassandra CDC数据处理为给定avro主键(Cassandra partition key + clustering key[s])的完整行(带有原象),以便发布到数据管道。

2.使用适当的数据管道消息类型生成最终输出消息;

转换器使用行(PartitionUpdate)保存在映像对象的状态,以及在CDCPublisher 传入PartitionUpdate对象生成完整的行内容,前一行的内容(如果是更新、删除消息),并输出消息的类型。

这是通过反序列化输入PartitionUpdate并将其与保存的PartitionUpdate合并来实现的,是通过使用与Cassandra用于在读取期间合并来自SSTable的数据相同的PartitionUpdate合并功能来完成的。合并后的API接受两个PartitionUpdate对象,一个来自Flink状态,另一个来自CDC PUBLISHER的输出流。这将产生一个合并的PartitionUpdate,用于使用在引导阶段派生的Schema构建avro记录。如果需要前面的行值,那么它是从Flink状态下保存的PartitionUpdate派生出来的。最后,使用合并的PartitionUpdate更新状态。

4.jpeg

确定行状态

这个过程处理重复的和无序的PartitionUpdate对象,Cassandra合并功能的使用与Cassandra读取的“最后写入胜利”冲突解决方法相同。为了避免发布重复的消息,需要验证输入PartitionUpdate更改了行状态。这是通过计算已保存和合并的PartitionUpdate对象的md5摘要实现的。如果摘要相同,则忽略PartitionUpdate。

合并、更新状态和发布逻辑可以总结如下:

1.传入的PartitionUpdate与保存的PartitionUpdate合并(如果存在),确定相应的数据管道消息:

2.如果合并的PartitionUpdate包含实时(非逻辑删除)数据,但保存的不包含实时数据,则会发布一条CREATE消息。

3.如果合并和保存的PartitionUpdate对象都包含实时数据,则如果对象的md5摘要不同,则发布一条UPDATE消息。

3.如果合并的PartitionUpdate包含逻辑删除的数据,而保存的分区包含活动数据,则会发布删除消息。

4.如果保存和合并的PartitionUpdate对象的md5摘要不同,则合并的PartitionUpdate将保存在该状态。

因此,在转换阶段的末尾,具有适当的数据管道消息类型和完整行内容的消息就可以发布到数据管道中了。

支持回填

引导一个数据流

可以在Cassandra节点上存储有限数量的CDC日志,因此,当connector将表设置为流时,只处理当时(以及以后)CDC目录中可用的数据。然而,为了保持流表的二元性,Cassandra表中的所有现有数据都需要在流中重新播放。

为此,回填引导过程将作为SSTable读取存储在磁盘上的数据。为了确保在回填期间不通过压缩修改SSTable文件集,将获取表的快照,并通过该快照处理SSTable文件。Cassandra SSTable阅读器将扫描的数据作为一系列PartitionUpdate对象返回。CDC PUBLISHER以与提交日志段相同的方式处理这些PartitionUpdate对象,并将它们发布到Kafka中,然后通过DP Materializer将它们转换为数据管道消息。

每当connector第一次设置要跟踪的Cassandra表时,都会遵循这个过程。如果需要在DP Materializer中重建状态,也可以这样做。

重建一个流

如果被跟踪的表的输出流被损坏或删除,则可以通过重新播放DP Materializer的存储状态来重新构建流。由于所有序列化的PartitionUpdate对象都存储在状态中,所以不需要重新发布来自SSTable的数据。

对未来工作的设想

分区进行操作

当前的系统设计会独立地处理每一行的变更,DP Materializer的单个输入消息最多会向数据管道发送一条消息。当前不支持在分区上对多行值进行更改,比如:

1.完全分区删除(仅在使用集群时);

2.对tombstone进行归类;

什么是tombstone?

当一个动态库(native 程序)开始执行时,系统会注册一些连接到 debuggerd 的 signal handlers,当系统 crash 的时候,会保存一个 tombstone 文件到/data/tombstones目录下(Logcat中也会有相应的信息),文件的确就像墓碑一样记录了死亡了的进程的基本信息(例如进程的进程号,线程号),死亡的地址(在哪个地址上发生了 Crash),死亡时的现场是什么样的(记录了一系列的堆栈调用信息)等等。

3.静态列;

但是,存在潜在的支持途径。在处理过程中,DP Materializer将所有行存储在单个Cassandra分区中,作为同一映像对象的条目。可以设想也分别存储分区级别状态,当此状态更改时,DP Materializer可以遍历整个映射(Cassandra分区)并为所有受影响的行生成数据管道消息。

生存时间(TTL)

连接器当前不支持TTL数据,忽略TTL值,并且根据其写入时间将数据视为运行数据。

删除tombstone

不支持从DP Materializer的Flink状态删除tombstone,除非被新数据覆盖,否则它们将无限期地保留在那里。在更新行状态时,可能会删除旧的tombstone,类似于表上的gc_grace_seconds参数。但是,这对从不更新的行没有帮助。此外,需要非常小心地确保回填或修复表不会在输出流中创建僵尸数据。

发布延迟

如上所述,提交日志段必须是满的,并且在被Cassandra处理之前不再被内存表引用。尽管使用了CDC日志填充器实现,但在向数据管道发布时还是会引入一些延迟。在Cassandra 4中这个限制应该被解决了,它引入了读取实时提交日志段的功能,因此将确保发布延迟尽可能接近实时发布。

总结

自2018年第四季度以来,Cassandra Sourceconnector一直被用于Yelp的运行中。它支持多个用例,这些用例有助于解决一些关于其设计选择的问题:

Avro作为序列化格式

Cassandra在单个分区中允许的最大单元格数(行*列)为20亿,这意味着一行可能有20亿列。然而,Avro序列化和反序列化成为了一个瓶颈,一旦列数开始增加到数百个,就无法满足可能的最大列数。根据流量的吞吐量需求和Cassandra表的大小(以列的数量为单位),使用者可能需要水平扩展。

此外,一些Cassandra数据类型(例如DECIMAL)没有直观的Avro数据类型等效项。在这种情况下,要么无法支持列,要么必须定义自定义avro数据类型。

Flink结构大小

由于表中的每一行都以序列化的PartitionUpdate的形式存储在状态中,对于大型表,状态大小可能会变得非常大。状态大小成为代码推送和维护的瓶颈,因为它必须在每次部署和重新启动应用程序时重新加载。需要进行额外的工作,以最小化保存和加载大型表的状态所需的时间。

本文翻译自:https://engineeringblog.yelp.com/2019/12/cassandra-source-connector-part-2.html如若转载,请注明原文地址
  • 分享至
取消

感谢您的支持,我会继续努力的!

扫码支持

打开微信扫一扫后点击右上角即可分享哟

发表评论