简单高效地从 DynamoDB 同步数据到 Redshift
2024-12-3 10:37:5 Author: www.upx8.com(查看原文) 阅读量:0 收藏

需求很简单,将一张 DynamoDB 的表数据同步到 Redshift 表,需要两张表数据保持一致,数据量不大,最多 10 万级别,Dynamo 表的更新为不定时地人工发布,频率更新不高,这种场景下如何设计简单易实现的方案?

问题拆解分析

数据更新频率低

所以不必做实时同步,引入 DynamoDB Streams 使得系统变复杂,设置一个定时任务触发器就可以了。

数据量不大

10 万级别的数据,每次扫 DynamoDB 全表取数是可以接受得,不需要做 CDC(Change data capture),引入和维护实时数据流的成本相对离线处理是高昂的。

数据保持一致

DynamoDB 和 Redshift 表每次同步后保持一致,需要做数据比对,找出新增、修改和删除的数据,当然暴力一点也可以对 Redshift 表进行全删全插,但会影响 Redshift 表,致使一段时间的不可用状态,不建议这样操作。

方案设计实现

全量扫描 DynamoDB 数据导入到 Redshift 表,可以直接使用 Redshift COPY 命令,语法如下:

copy <redshift_tablename> from 'dynamodb://<dynamodb_table_name>'
authorization
readratio '<integer>';

我们不能直接 COPY 导入 Redshift 目标表,这是 Append 写入,无法删改,那么可以先 COPY 写入到一张临时表,现在问题就变成了如何对比两张 Redshift 表做更新,这是一个 Redshift ETL 处理的场景。

Redshift 支持 MERGE 语法,能够实现比对源表对目标表做新增、修改或者删除操作,参考下面语法:

MERGE INTO target_table 
USING source_table [ [ AS ] alias ] 
ON match_condition 
[ WHEN MATCHED THEN { UPDATE SET col_name = { expr } [,...] | DELETE }
WHEN NOT MATCHED THEN INSERT [ ( col_name [,...] ) ] VALUES ( { expr } [, ...] ) |
REMOVE DUPLICATES ]

Redshift 支持与另一张表匹配数据来对目标表进行批量删除操作,语法参考:

[ WITH [RECURSIVE] common_table_expression [, common_table_expression , ...] ]
DELETE [ FROM ] { table_name | materialized_view_name }
    [ { USING } table_name, ... ]
    [ WHERE condition ]`

有了以上这些支持,那么可以完全使用 Redshift SQL 来实现数据同步需求,步骤如下:

  1. COPY 命令从 DynamoDB 全量复制数据写入 Redshift 更新临时表

  2. 用 EXCEPT 找出目标表中需要删除的数据写入到删除临时表

  3. 用 MERGE 语句从更新临时表获取数据对目标表做批量新增和修改;

  4. 用 DELETE USING 语句从删除临时表匹配数据对目标表做批量删除删除;

  5. 将这些 SQL 封装进 Lambda 调用,设置 CloudWatch Event Rules 定时触发执行。

步骤 1、2 只对临时表数据写入,不影响目标表,步骤 3、4 为一个事务,因为事务的隔离性并不会在事务处理阶段影响其他用户对目标表的读取,如果更新或者删除操作任一失败会自动回滚,这样能够保证目标表不会在数据同步期间不可用。

完整实现 SQL 代码示例:

-- Transtraction 1
truncate table t_order_upsert_staging;
copy t_order_upsert_staging 
from 'dynamodb://t_order' 
iam_role 'xxxxxxx' 
readratio 100;

truncate table t_order_delete_staging;
insert into t_order_delete_staging
with t_deletes as (
    select * from t_order
    except 
    select * from t_order_upsert_staging
)
select * from t_deletes;

-- Transtraction 2
MERGE INTO t_order 
USING t_order_upsert_staging AS t
ON t_order.pk=t.pk 
REMOVE DUPLICATES;

DELETE FROM t_order 
USING t_order_delete_staging t 
WHERE t_order.pk=t.pk;

以上方案充分利用了 Redshift 的 ETL 能力,不会引入其他 AWS 组件产生额外费用,实现简单高效,很好地满足了需求。

引用:


文章来源: https://www.upx8.com/4508
如有侵权请联系:admin#unsafe.sh