Skip to main content Link Menu Expand (external link) Document Search Copy Copied

Spark数据湖

如果在spark中使用iceberg

spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.13:1.2.1 \
            --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
            --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
            --conf spark.sql.catalog.spark_catalog.type=hive \
            --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
            --conf spark.sql.catalog.local.type=hadoop \
            --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
            --conf spark.sql.defaultCatalog=local \
            --conf spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747

SparkSession

SparkSession就是spark shell中的spark变量的类型。SparkSession的提供了很多读写数据的方法,还有支持通过sql方法执行SQL语句。

SparkCatalog

SparkCatalog是提供table的查找,打开,管理Table对象。另外一个Catalog实 现是SparkSessionCatalog,用于支持Spark内建的Catalog,比如Spark使用Hive Metastore管理Table的路径、分区等信息时。Iceberg是通过向Spark注册了一个 SparkCatalog实现支持Iceberg在Table功能上的扩展。 SparkCatalog支持如下操作:

  • 创建Iceberg格式的Table
  • 打开一个Iceberg格式的Table,返回一个Iceberg的Table对象
  • 创建View
  • 对Table的一些配置进行修改
  • 对Table改名
  • 删除一个Table

SparkCatalog还依赖一些其他类实现其功能

  • HadoopCatalog
  • HadoopTableOperations
  • BaseTable BaseTable通过TableOperations依赖于HadoopTableOperations
  • SparkTable SparkTable是org.apache.spark.sql.connector.catalog.Table的实现,是BaseTable的代理对象。

QueryExecution

  • QueryExecution
    • assertCommandExecuted()
      • commandExecuted
        • analyzed.mapChildren(eagerlyExecuteCommands)
        • eagerlyExecuteCommands(analyzed)
        • analyzed

spark写流程

  • AppendData –(ExtendedV2Writes strategy)–> AppendData(write = SparkWriteBuilder.build())
  • Spark –> AppendDataExec.run() –> V2TableWriteExec.writeWithV2
    • SparkWrite.asBatchAppend()
    • BatchWrite.createBatchWriterFactory()
    • WriterFactory.createWriter()
    • DataWriter.write(row)
      • UnpartitionedDataWriter.write(row)
      • PartitionedDataWriter.write(row)
  • V2TableWriteExec in WriteToDataSourceV2Exec.scala is a physical plan
    • writeWithV2(batchWrite: BatchWrite)
  • SparkWrite
  • BatchAppend indirectly implements BatchWrite
    • createBatchWriteFactory returns WriterFactory
    • commit
  • WriterFactory in SparkWrite.java
    • createWriter returns UnpartitonedDataWriter or PartitionedDataWriter
  • UnpartitonedDataWriter
  • RollingDataWriter
  • PartitionedDataWriter
  • OutputFileFactory - Factory responsible for generating unique but recognizable data file names and file objects.
  • SparkFileWriteFactory - Return data file writer for Parquet, ORC or AVRO

创建QueryExecution

  • SessionState.executePlan(logicalPlan: LogicalPlan)

Analalyze UnresolvedRelation(multipartIdentifier, …)

  • Dataset.ofRows
    • SessionState.executePlan(LogicalPlan)
      • createQueryexecution(LogicalPlan, mode)
    • QueryExecution.assertAnalyzed()
      • lazy val analyzed
        • RuleExecutor.execute(LogicalPlan)
          • batches.foreach(…)
            • ResolveRelations.apply
              • ResolveRelations.lookupRelations
    • new DatasetRow

Table Scan Pipeline

  • AstBuilder.visitTable
    • UnresolvedRelation (multiIdentifiers: Seq[String])
  • Analyzer.ResolveRelations.apply(plan: LogicalPlan)
    • Analzyer.lookupRelation
      • DataSourceV2Relation (table: SparkTable, catalog: SparkCatalog, …)
  • V2ScanRelationPushDown.apply(plan: LogicalPlan) ???
    • createScanBuilder(plan: LogicalPlan): LogicalPlan
    • pushDownFilters(plan: LogicalPlan): LogicalPlan
      • PushDownUtils.pushFilters(scanBuilder: ScanBuilder, filters: Seq[Expression])
        • SparkScanBuilder.pushFilters(filters: Filter[])
    • pruneColumns(plan: LogicalPlan): LogicalPlan
      • DataSourceV2ScanRelation (relation: DataSourceV2Relation, scan: Scan, output: Seq[AttributeReference])
        • SparkBatchQueryScan <– SparkScanBuilder.build
  • DataSourceV2Strategy.apply
    • BatchScanExec(output: Seq[AttributeReference], scan: Scan, runtimeFilters: Seq[Expression], keyGroupedPartitioning: Option[Seq[Expression]])
      • val inputRDD: RDD[InternalRow]
        • val filteredPartitions: Seq[Seq[InputPartition]]
          • SparkBatchQueryScan.filter(filters: Filter[])
            • SparkPartitioningAwareScan.tasks()
              • SnapshotScan.planFiles()
                • DataTableScan.doPlanFiles()
                  • Snapshot.dataManifests(io)
                  • Snapshot.deleteManifests(io)
                  • ManifestGroup.planFiles()
                    • ManifestGroup.plan(ManifestGroup::createFileScanTasks)
          • SparkBatch.planInputPartitions()
            • input parameter taskGroups = SparkPartitioningAwareScan.taskGroups()
              • SparkPartitioningAwareScan.tasks()

Iceberg Table Spec

  • Table Metadata (json file)
    • Snapshots
      • Manifest List (avro file)
        • Manifest Files (avro file; data manifest or delete manifest)
          • Data Files (parquet/orc, other supported formats)

开发计划

功能列表

  • 写流程修改
    • 找到数据入口,写入日志 和memtables
    • Memtable flush和 日志回收
    • 每个task生成一个loglet(日志的一段)
    • 维护日志ID
    • 设计衡量数据partition column的选择机制
      • 数据分布是否均衡?
      • 新老数据合并的开销?
      • 小文件的数目?写与读的矛盾,通过控制小文件数目
      • 查询开销:读取的无关数据的比重?
    • 设计转换机制(修改partition,增加partition等)
      • 乒乓转换:一个当前数据分布层,一个转换目标层
      • 限制单个文件大小。写入一定量数据,同时完成同样量的转换,保证整个数据转换任务分散并可以完成
      • 减少文件内容的转化,尽量按文件级别操作数据

OBSOLETE

特性列表

  • 自动数据partitioning
    • 数据先写日志
    • 动态partitioning管理
  • 面向数据湖的两层Index结构
    • 共享的文件级Index
    • 单文件的Index

支持分层的数据湖格式(HTAP)

  • 基于lsm结构组织文件
    • 第一层采用tiering的方式排布
    • 第二层以上采用leveling的方式组织
    • 行是否唯一
  • 共享索引结构
    • 通过MVCC支持一写多读
    • 对第一层的数据进行索引
      • 使用什么列来做索引内容
      • 索引指针是什么
    • 二层索引
      • 第一层range索引
      • 第二层文件内索引
    • 使用AVRO构建索引节点
  • 读数据
    • 逐层读取数据,再做数据合并
    • 多个partition并发查询,从第一层过滤自己感兴趣的数据
  • 写数据
    • 先写到第一层,再聚合起来下降到下层
    • 写类型
      • insert into
      • merge into
      • insert overwrite
    • 在内存中生成关键列的索引,生成索引文件或部分