datax TableStoreStreamReader 使用

  • 2022-07-14
  • 浏览 (1403)

TableStore增量数据导出通道:TableStoreStreamReader

快速介绍

TableStoreStreamReader插件主要用于TableStore的增量数据导出,增量数据可以看作操作日志,除了数据本身外还附有操作信息。

与全量导出插件不同,增量导出插件只有多版本模式,同时不支持指定列。这是与增量导出的原理有关的,导出的格式下面有详细介绍。

使用插件前必须确保表上已经开启Stream功能,可以在建表的时候指定开启,或者使用SDK的UpdateTable接口开启。

开启Stream的方法:
SyncClient client = new SyncClient("", "", "", "");
1. 建表的时候开启:
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 24代表增量数据保留24小时
client.createTable(createTableRequest);

2. 如果建表时未开启,可以通过UpdateTable开启:
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); 
client.updateTable(updateTableRequest);

实现原理

首先用户使用SDK的UpdateTable功能,指定开启Stream并设置过期时间,即开启了增量功能。

开启后,TableStore服务端就会将用户的操作日志额外保存起来, 每个分区有一个有序的操作日志队列,每条操作日志会在一定时间后被垃圾回收,这个时间即用户指定的过期时间。

TableStore的SDK提供了几个Stream相关的API用于将这部分操作日志读取出来,增量插件也是通过TableStore SDK的接口获取到增量数据的,并将 增量数据转化为多个6元组的形式(pk, colName, version, colValue, opType, sequenceInfo)导入到ODPS中。

Reader的配置模版:

"reader": {
    "name" : "otsstreamreader",
    "parameter" : {
        "endpoint" : "",
        "accessId" : "",
        "accessKey" : "",
        "instanceName" : "",
        //dataTable即需要导出数据的表。
        "dataTable" : "",  
        //statusTable是Reader用于保存状态的表,若该表不存在,Reader会自动创建该表。
        //一次离线导出任务完成后,用户不应删除该表,该表中记录的状态可用于下次导出任务中。
        "statusTable" : "TableStoreStreamReaderStatusTable",  
        //增量数据的时间范围(左闭右开)的左边界。
        "startTimestampMillis" : "",
        //增量数据的时间范围(左闭右开)的右边界。
        "endTimestampMillis" : "",
        //采云间调度只支持天级别,所以提供该配置,作用与startTimestampMillis和endTimestampMillis类似。
        "date": "",
        //是否导出时序信息。
        "isExportSequenceInfo": true,
        //从TableStore中读增量数据时,每次请求的最大重试次数,默认为30。
        "maxRetries" : 30
    }
}

参数说明

名称 说明 类型 必选
endpoint TableStoreServer的Endpoint地址。 String
accessId 用于访问TableStore服务的accessId。 String
accessKey 用于访问TableStore服务的accessKey。 String
instanceName TableStore的实例名称。 String
dataTable 需要导出增量数据的表的名称。该表需要开启Stream,可以在建表时开启,或者使用UpdateTable接口开启。 String
statusTable Reader插件用于记录状态的表的名称,这些状态可用于减少对非目标范围内的数据的扫描,从而加快导出速度。
1. 用户不需要创建该表,只需要给出一个表名。Reader插件会尝试在用户的instance下创建该表,若该表不存在即创建新表,若该表已存在,会判断该表的Meta是否与期望一致,若不一致会抛出异常。
2. 在一次导出完成之后,用户不应删除该表,该表的状态可用于下次导出任务。
3. 该表会开启TTL,数据自动过期,因此可认为其数据量很小。
4. 针对同一个instance下的多个不同的dataTable的Reader配置,可以使用同一个statusTable,记录的状态信息互不影响。
综上,用户配置一个类似TableStoreStreamReaderStatusTable之类的名称即可,注意不要与业务相关的表重名。
String
startTimestampMillis 增量数据的时间范围(左闭右开)的左边界,单位毫秒。
1. Reader插件会从statusTable中找对应startTimestampMillis的位点,从该点开始读取开始导出数据。
2. 若statusTable中找不到对应的位点,则从系统保留的增量数据的第一条开始读取,并跳过写入时间小于startTimestampMillis的数据。
Long
endTimestampMillis 增量数据的时间范围(左闭右开)的右边界,单位毫秒。
1. Reader插件从startTimestampMillis位置开始导出数据后,当遇到第一条时间戳大于等于endTimestampMillis的数据时,结束导出数据,导出完成。
2. 当读取完当前全部的增量数据时,结束读取,即使未达到endTimestampMillis。
Long
date 日期格式为yyyyMMdd,如20151111,表示导出该日的数据。
若没有指定date,则必须指定startTimestampMillis和endTimestampMillis,反之也成立。
String
isExportSequenceInfo 是否导出时序信息,时序信息包含了数据的写入时间等。默认该值为false,即不导出。 Boolean
maxRetries 从TableStore中读增量数据时,每次请求的最大重试次数,默认为30,重试之间有间隔,30次重试总时间约为5分钟,一般无需更改。 Int

导出的数据格式

首先,在TableStore多版本模型下,表中的数据组织为“行-列-版本”三级的模式, 一行可以有任意列,列名也并非固定的,每一列可以含有多个版本,每个版本都有一个特定的时间戳(版本号)。

用户可以通过TableStore的API进行一系列读写操作, TableStore通过记录用户最近对表的一系列写操作(或称为数据更改操作)来实现记录增量数据的目的, 所以也可以把增量数据看作一批操作记录。

TableStore有三类数据更改操作:PutRow、UpdateRow、DeleteRow。

  • PutRow的语义是写入一行,若该行已存在即覆盖该行。

  • UpdateRow的语义是更新一行,对原行其他数据不做更改, 更新可能包括新增或覆盖(若对应列的对应版本已存在)一些列值、删除某一列的全部版本、删除某一列的某个版本。

  • DeleteRow的语义是删除一行。

TableStore会根据每种操作生成对应的增量数据记录,Reader插件会读出这些记录,并导出成Datax的数据格式。

同时,由于TableStore具有动态列、多版本的特性,所以Reader插件导出的一行不对应TableStore中的一行,而是对应TableStore中的一列的一个版本。 即TableStore中的一行可能会导出很多行,每行包含主键值、该列的列名、该列下该版本的时间戳(版本号)、该版本的值、操作类型。若设置isExportSequenceInfo为true,还会包括时序信息。

转换为Datax的数据格式后,我们定义了四种操作类型,分别为:

  • U(UPDATE): 写入一列的一个版本

  • DO(DELETE_ONE_VERSION): 删除某一列的某个版本

  • DA(DELETE_ALL_VERSION): 删除某一列的全部版本,此时需要根据主键和列名,将对应列的全部版本删除

  • DR(DELETE_ROW): 删除某一行,此时需要根据主键,将该行数据全部删除

举例如下,假设该表有两个主键列,主键列名分别为pkName1, pkName2:

pkName1 pkName2 columnName timestamp columnValue opType
pk1_V1 pk2_V1 col_a 1441803688001 col_val1 U
pk1_V1 pk2_V1 col_a 1441803688002 col_val2 U
pk1_V1 pk2_V1 col_b 1441803688003 col_val3 U
pk1_V2 pk2_V2 col_a 1441803688000 DO
pk1_V2 pk2_V2 col_b DA
pk1_V3 pk2_V3 DR
pk1_V3 pk2_V3 col_a 1441803688005 col_val1 U

假设导出的数据如上,共7行,对应TableStore表内的3行,主键分别是(pk1_V1,pk2_V1), (pk1_V2, pk2_V2), (pk1_V3, pk2_V3)。

对于主键为(pk1_V1, pk2_V1)的一行,包含三个操作,分别是写入col_a列的两个版本和col_b列的一个版本。

对于主键为(pk1_V2, pk2_V2)的一行,包含两个操作,分别是删除col_a列的一个版本、删除col_b列的全部版本。

对于主键为(pk1_V3, pk2_V3)的一行,包含两个操作,分别是删除整行、写入col_a列的一个版本。

相关文章

datax README 使用

datax adbpgwriter 使用

datax adswriter 使用

datax cassandrareader 使用

datax cassandrawriter 使用

datax dataxPluginDev 使用

datax drdsreader 使用

datax drdswriter 使用

datax elasticsearchwriter 使用

datax ftpreader 使用

0  赞