Sagar Lakshmipathy
Apache Hudi on AWS Glue

Have you wondered how to write Apache Hudi tables in AWS Glue using Scala?
Look no further.

Prerequisites

  • Create a Glue database called hudi_db from Databases under the Data Catalog menu in the Glue console (or create it programmatically, as sketched below)
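
If you'd rather create the database programmatically than through the console, here's a minimal sketch using the AWS SDK for Java (v1) Glue client. It assumes the SDK is on your classpath and your credentials resolve via the default provider chain; the console route works just as well.

import com.amazonaws.services.glue.AWSGlueClientBuilder
import com.amazonaws.services.glue.model.{CreateDatabaseRequest, DatabaseInput}

object CreateHudiDb {
  def main(args: Array[String]): Unit = {
    // Build a Glue client using the default credential/region provider chain
    val glue = AWSGlueClientBuilder.defaultClient()

    // Register hudi_db in the Glue Data Catalog
    glue.createDatabase(
      new CreateDatabaseRequest()
        .withDatabaseInput(new DatabaseInput().withName("hudi_db"))
    )
  }
}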

Let's pick the Apache Hudi Spark QuickStart guide to drive this example.

Configuring the job

  • In the Glue console, choose ETL Jobs, then choose Script Editor
  • In the tabs above, choose Job details, and under Language choose Scala
  • Feel free to make any infrastructure changes as required.
  • Expand Advanced properties, navigate to Job parameters, and add the parameters below one by one, changing the values as you prefer:
    • --S3_OUTPUT_PATH as s3://hudi-spark-quickstart/write-path/
    • --class as GlueApp
    • --conf as spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
    • --datalake-formats as hudi

Note: In this example, I'm using the default Hudi version (0.12.0) that comes with Glue 4.0. If you want to use a different Hudi version, you may have to add the JAR to the classpath by adding one more property, --extra-jars, pointing to the S3 path of the Hudi JAR file.
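
If you want to confirm that the chained --conf values actually reached the Spark runtime, a quick sanity check inside the job is easy to add. A sketch, not something the example needs:

import org.apache.spark.sql.SparkSession

object ConfCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    // Expect: org.apache.spark.serializer.KryoSerializer
    println(spark.sparkContext.getConf.get("spark.serializer", "not set"))
    println(spark.sparkContext.getConf.get("spark.sql.hive.convertMetastoreParquet", "not set"))
  }
}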

On to the cool stuff now.

Scripting

Navigate to the Script tab and add the Scala code below.

Let's add the boilerplate imports

import com.amazonaws.services.glue.{GlueContext, DynamicFrame}
import com.amazonaws.services.glue.util.GlueArgParser
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.types._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import com.amazonaws.services.glue.log.GlueLogger

Add the Glue-specific code, i.e. to parse the job parameters and to create a GlueContext

object GlueApp {
  def main(sysArgs: Array[String]) {
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "S3_OUTPUT_PATH").toArray)
    val spark: SparkSession = SparkSession.builder().appName("AWS Glue Hudi Job").getOrCreate()
    val glueContext: GlueContext = new GlueContext(spark.sparkContext)
    val logger = new GlueLogger()
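Optionally, if you also want Glue job bookmarks and run tracking, this is where you'd typically call Job.init, with a matching Job.commit as the last statement of main. A sketch, not required for this example; it also needs import com.amazonaws.services.glue.util.Job and import scala.collection.JavaConverters.mapAsJavaMapConverter up top:

    // Optional: let Glue track this run and honor job bookmarks
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    // ... body of the job ...
    Job.commit()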

Prepping the data.

    import spark.implicits._

    val tableName = "trips"
    val recordKeyColumn = "uuid"
    val precombineKeyColumn = "ts"
    val partitionKeyColumn = "city"
    val s3OutputPath = args("S3_OUTPUT_PATH")
    val glueDbName = "hudi_db"
    // Strip any trailing slash so the table path doesn't end up with a double slash
    val writePath = s"${s3OutputPath.stripSuffix("/")}/$tableName"


    val columns = Seq("ts","uuid","rider","driver","fare","city")
    val data =
      Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
        (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
        (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
        (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"    ),
        (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"))

Add the options required by Hudi to write the table and sync it with Glue Database.

    val hudiOptions = Map[String, String](
      "hoodie.table.name" -> tableName,
      "hoodie.datasource.write.recordkey.field" -> recordKeyColumn,
      "hoodie.datasource.write.precombine.field" -> precombineKeyColumn,
      "hoodie.datasource.write.partitionpath.field" -> partitionKeyColumn,
      "hoodie.datasource.write.hive_style_partitioning" -> "true",
      "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
      "hoodie.datasource.write.operation" -> "upsert",
      "hoodie.datasource.hive_sync.enable" -> "true",
      "hoodie.datasource.hive_sync.database" -> glueDbName,
      "hoodie.datasource.hive_sync.table" -> tableName,
      "hoodie.datasource.hive_sync.partition_fields" -> partitionKeyColumn,
      "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
      "hoodie.datasource.hive_sync.use_jdbc" -> "false",
      "hoodie.datasource.hive_sync.mode" -> "hms",
      "path" -> writePath
    )
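Side note: we already import DataSourceWriteOptions._ and HoodieWriteConfig._, so the string keys above could also be written with Hudi's typed config constants, which protects against typos. A partial sketch, using the Hudi 0.12 constant names:

    // Equivalent options via Hudi's config constants (partial; Hudi 0.12 names)
    val typedOptions = Map[String, String](
      TBL_NAME.key() -> tableName,                    // from HoodieWriteConfig._
      RECORDKEY_FIELD.key() -> recordKeyColumn,       // from DataSourceWriteOptions._
      PRECOMBINE_FIELD.key() -> precombineKeyColumn,
      PARTITIONPATH_FIELD.key() -> partitionKeyColumn,
      OPERATION.key() -> UPSERT_OPERATION_OPT_VAL
    )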

Finally, create the DataFrame and write it to S3. Note that mode("overwrite") recreates the table if it already exists; subsequent incremental writes would typically use mode("append") so that the upsert operation takes effect.

    val inserts = spark.createDataFrame(data).toDF(columns:_*)

    inserts.write
      .format("hudi")
      .options(hudiOptions)
      .mode("overwrite")
      .save()

    logger.info("Data successfully written to S3 using Hudi")
  }
}
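Before heading to Athena, you can sanity-check the write from Spark itself by reading the table back from its base path. A quick sketch that would go inside main, right after the write:

    // Read the Hudi table back from S3 to verify the write
    val trips = spark.read.format("hudi").load(writePath)
    trips.select("_hoodie_commit_time", "uuid", "rider", "driver", "fare", "city").show(false)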

Querying

Now that we have written the table to S3 and synced it to the Glue Data Catalog, we can query it from Athena.

SELECT * FROM "hudi_db"."trips" LIMIT 10;
