当前位置:首页 > Web开发 > 正文

使用Amazon EMR和Apache Hudi在S3上插入,更新,删除数据

2024-03-31 Web开发

将数据存储在Amazon S3中可带来很多好处,包括规模、可靠性、成本效率等方面。最重要的是,你可以利用Amazon EMR中的Apache Spark,Hive和Presto之类的开源工具来处理和分析数据。 尽管这些工具功能强大,但是在处理需要进行增量数据处理以及记录级别插入,更新和删除场景时,仍然非常具有挑战。

与客户交谈时,我们发现有些场景需要处理对单条记录的增量更新,例如:

遵守数据隐私法规,在该法规中,用户选择忘记或更改应用程序对数据使用方式的协议。

使用流数据,当你必须要处理特定的数据插入和更新事件时。

实现变更数据捕获(CDC)架构来跟踪和提取企业数据仓库或运营数据存储中的数据库变更日志。

恢复迟到的数据,或分析特定时间点的数据。

从今天开始,EMR 5.28.0版包含Apache Hudi(孵化中),因此你不再需要构建自定义解决方案来执行记录级别的插入,更新和删除操作。Hudi是Uber于2016年开始开发,以解决摄取和ETL管道效率低下的问题。最近几个月,EMR团队与Apache Hudi社区紧密合作,提供了一些补丁,包括将Hudi更新为Spark 2.4.4,支持Spark Avro,增加了对AWS Glue Data Catalog的支持,以及多个缺陷修复。

使用Hudi,即可以在S3上执行记录级别的插入,更新和删除,从而使你能够遵守数据隐私法律、消费实时流、捕获更新的数据、恢复迟到的数据和以开放的、供应商无关的格式跟踪历史记录和回滚。 创建数据集和表,然后Hudi管理底层数据格式。Hudi使用Apache Parquet和Apache Avro进行数据存储,并内置集成Spark,Hive和Presto,使你能够使用与现在所使用的相同工具来查询Hudi数据集,并且几乎实时地访问新数据。

启动EMR群集时,只要选择以下组件之一(Hive,Spark,Presto),就可以自动安装和配置Hudi的库和工具。你可以使用Spark创建新的Hudi数据集,以及插入,更新和删除数据。每个Hudi数据集都会在集群的已配置元存储库(包括AWS Glue Data Catalog)中进行注册,并显示为可以通过Spark,Hive和Presto查询的表。

Hudi支持两种存储类型,这些存储类型定义了如何写入,索引和从S3读取数据:

写时复制(Copy On Write)– 数据以列格式(Parquet)存储,并且在写入时更新数据数据会创建新版本文件。此存储类型最适合用于读取繁重的工作负载,因为数据集的最新版本在高效的列式文件中始终可用。

读时合并(Merge On Read)– 将组合列(Parquet)格式和基于行(Avro)格式来存储数据; 更新记录至基于行的增量文件中,并在以后进行压缩,以创建列式文件的新版本。 此存储类型最适合于繁重的写工作负载,因为新提交(commit)会以增量文件格式快速写入,但是要读取数据集,,则需要将压缩的列文件与增量文件合并。

下面让我们快速预览下如何在EMR集群中设置和使用Hudi数据集。

结合Apache Hudi与Amazon EMR

从EMR控制台开始创建集群。在高级选项中,选择EMR版本5.28.0(第一个包括Hudi的版本)和以下应用程序:Spark,Hive和Tez。在硬件选项中,添加了3个任务节点,以确保有足够的能力运行Spark和Hive。

群集就绪后,使用在安全性选项中选择的密钥对,通过SSH进入主节点并访问Spark Shell。 使用以下命令来启动Spark Shell以将其与Hudi一起使用:

$ spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

使用以下Scala代码将一些示例ELB日志导入写时复制存储类型的Hudi数据集中:

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor //Set up various input values as variables val inputDataPath = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/" val hudiTableName = "elb_logs_hudi_cow" val hudiTablePath = "s3://MY-BUCKET/PATH/" + hudiTableName // Set up our Hudi Data Source Options val hudiOptions = Map[String,String]( DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "request_ip", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "request_verb", HoodieWriteConfig.TABLE_NAME -> hudiTableName, DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "request_timestamp", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "request_verb", DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName) // Read data from S3 and create a DataFrame with Partition and Record Key val inputDF = spark.read.format("parquet").load(inputDataPath) // Write data into the Hudi dataset inputDF.write .format("org.apache.hudi") .options(hudiOptions) .mode(SaveMode.Overwrite) .save(hudiTablePath)

在Spark Shell中,现在就可以计算Hudi数据集中的记录:

scala> inputDF2.count() res1: Long = 10491958

温馨提示: 本文由Jm博客推荐,转载请保留链接: https://www.jmwww.net/file/web/42745.html