Spark Data Lake
How to use Iceberg in Spark:
spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.13:1.2.1 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hadoop \
--conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
--conf spark.sql.defaultCatalog=local \
--conf spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=4747
SparkSession
SparkSession is the type of the spark variable in the spark-shell. It provides many methods for reading and writing data, and SQL statements can be executed through its sql() method.
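For example, in the spark-shell session started above (the local catalog comes from the configuration above; the db.events table name is hypothetical):

// Read through the SparkSession (the spark variable in spark-shell).
val df = spark.table("local.db.events")
df.show()

// The same data queried through the sql() method.
spark.sql("SELECT count(*) FROM local.db.events").show()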
SparkCatalog
SparkCatalog provides lookup, opening, and management of Table objects. The other catalog implementation is SparkSessionCatalog, which wraps Spark's built-in catalog, for example when Spark uses the Hive Metastore to manage table paths, partitions, and other metadata. Iceberg extends Spark's table functionality by registering a SparkCatalog implementation with Spark. SparkCatalog supports the following operations (a SQL sketch follows the list):
- Create an Iceberg-format Table
- Open an Iceberg-format Table, returning an Iceberg Table object
- Create a View
- Modify table properties
- Rename a Table
- Drop a Table
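A minimal sketch of these operations against the local Hadoop catalog configured above; the db.events name and its schema are hypothetical:

// DDL is routed through SparkCatalog because local is an Iceberg SparkCatalog.
spark.sql("CREATE TABLE local.db.events (id bigint, ts timestamp, msg string) USING iceberg PARTITIONED BY (days(ts))")
spark.sql("ALTER TABLE local.db.events SET TBLPROPERTIES ('write.format.default' = 'parquet')")
// Note: rename is not supported by Hadoop catalogs; it works with e.g. a Hive-backed catalog.
// spark.sql("ALTER TABLE local.db.events RENAME TO local.db.events_v2")
spark.sql("DROP TABLE local.db.events")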
SparkCatalog also relies on several other classes to implement its functionality:
- HadoopCatalog
- HadoopTableOperations
- BaseTable: depends on HadoopTableOperations through the TableOperations interface
- SparkTable: an implementation of org.apache.spark.sql.connector.catalog.Table that delegates to a BaseTable
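A small sketch of how these classes fit together, using Iceberg's catalog API directly from the spark-shell; the warehouse path and table identifier are assumptions for illustration:

import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog

// HadoopCatalog + HadoopTableOperations back the BaseTable that SparkTable wraps.
val catalog = new HadoopCatalog(spark.sparkContext.hadoopConfiguration, "/tmp/warehouse")
val table = catalog.loadTable(TableIdentifier.of("db", "events"))  // a BaseTable
println(table.schema())
println(table.currentSnapshot())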
QueryExecution
- QueryExecution
  - assertCommandExecuted()
    - commandExecuted (one of the following branches, depending on the CommandExecutionMode)
      - analyzed.mapChildren(eagerlyExecuteCommands)
      - eagerlyExecuteCommands(analyzed)
      - analyzed
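These phases can be inspected from any Dataset; a quick sketch (query and table name are hypothetical):

val df = spark.sql("SELECT * FROM local.db.events WHERE id > 10")
val qe = df.queryExecution
println(qe.analyzed)         // plan after analysis
println(qe.commandExecuted)  // plan after commands have been eagerly executed
println(qe.optimizedPlan)    // plan after the optimizer
println(qe.executedPlan)     // physical plan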
Spark write path
- AppendData –(ExtendedV2Writes strategy)–> AppendData(write = SparkWriteBuilder.build())
- Spark –> AppendDataExec.run() –> V2TableWriteExec.writeWithV2
  - SparkWrite.asBatchAppend()
  - BatchWrite.createBatchWriterFactory()
  - WriterFactory.createWriter()
  - DataWriter.write(row)
    - UnpartitionedDataWriter.write(row)
    - PartitionedDataWriter.write(row)
- V2TableWriteExec in WriteToDataSourceV2Exec.scala is a physical plan
  - writeWithV2(batchWrite: BatchWrite)
- SparkWrite
  - BatchAppend indirectly implements BatchWrite
    - createBatchWriterFactory returns a WriterFactory
    - commit
- WriterFactory in SparkWrite.java
  - createWriter returns an UnpartitionedDataWriter or a PartitionedDataWriter
- UnpartitionedDataWriter
  - RollingDataWriter
- PartitionedDataWriter
- OutputFileFactory - factory responsible for generating unique but recognizable data file names and file objects
- SparkFileWriterFactory - returns a data file writer for Parquet, ORC, or Avro
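The chain above is what a plain append goes through. A minimal way to trigger it from the spark-shell (the table name and schema are hypothetical and must match an existing table):

// Goes through AppendData -> AppendDataExec -> SparkWrite/BatchAppend -> WriterFactory -> DataWriter.write(row)
val data = spark.range(0, 1000).selectExpr("id", "current_timestamp() as ts", "cast(id as string) as msg")
data.writeTo("local.db.events").append()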
Creating a QueryExecution
- SessionState.executePlan(logicalPlan: LogicalPlan)
Analyze UnresolvedRelation(multipartIdentifier, …)
- Dataset.ofRows
  - SessionState.executePlan(LogicalPlan)
    - createQueryExecution(LogicalPlan, mode)
  - QueryExecution.assertAnalyzed()
    - lazy val analyzed
      - RuleExecutor.execute(LogicalPlan)
        - batches.foreach(…)
          - ResolveRelations.apply
            - ResolveRelations.lookupRelation
  - new Dataset[Row]
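The resolution step can be observed by comparing the parsed and analyzed plans; a quick sketch (table name hypothetical):

// spark.table() starts from an UnresolvedRelation; ResolveRelations/lookupRelation
// turns it into a DataSourceV2Relation over a SparkTable.
val df = spark.table("local.db.events")
println(df.queryExecution.logical)   // the plan before analysis (UnresolvedRelation)
println(df.queryExecution.analyzed)  // the resolved relation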
Table Scan Pipeline
- AstBuilder.visitTable
  - UnresolvedRelation(multipartIdentifier: Seq[String])
- Analyzer.ResolveRelations.apply(plan: LogicalPlan)
  - Analyzer.lookupRelation
    - DataSourceV2Relation(table: SparkTable, catalog: SparkCatalog, …)
- V2ScanRelationPushDown.apply(plan: LogicalPlan)
  - createScanBuilder(plan: LogicalPlan): LogicalPlan
  - pushDownFilters(plan: LogicalPlan): LogicalPlan
    - PushDownUtils.pushFilters(scanBuilder: ScanBuilder, filters: Seq[Expression])
      - SparkScanBuilder.pushFilters(filters: Filter[])
  - pruneColumns(plan: LogicalPlan): LogicalPlan
    - DataSourceV2ScanRelation(relation: DataSourceV2Relation, scan: Scan, output: Seq[AttributeReference])
      - SparkBatchQueryScan <– SparkScanBuilder.build()
- DataSourceV2Strategy.apply
  - BatchScanExec(output: Seq[AttributeReference], scan: Scan, runtimeFilters: Seq[Expression], keyGroupedPartitioning: Option[Seq[Expression]])
    - val inputRDD: RDD[InternalRow]
      - val filteredPartitions: Seq[Seq[InputPartition]]
        - SparkBatchQueryScan.filter(filters: Filter[])
          - SparkPartitioningAwareScan.tasks()
            - SnapshotScan.planFiles()
              - DataTableScan.doPlanFiles()
                - Snapshot.dataManifests(io)
                - Snapshot.deleteManifests(io)
                - ManifestGroup.planFiles()
                  - ManifestGroup.plan(ManifestGroup::createFileScanTasks)
        - SparkBatch.planInputPartitions()
          - input parameter taskGroups = SparkPartitioningAwareScan.taskGroups()
            - SparkPartitioningAwareScan.tasks()
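The effect of this pipeline is visible in the physical plan; a quick sketch (table name hypothetical):

// The explain output should show a BatchScan over the Iceberg table with the
// id predicate pushed down and only the selected columns read.
val q = spark.table("local.db.events").filter("id > 100").select("id", "msg")
q.explain(true)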
Iceberg Table Spec
- Table Metadata (JSON file)
  - Snapshots
    - Manifest List (Avro file)
      - Manifest Files (Avro files; data manifests or delete manifests)
        - Data Files (Parquet/ORC and other supported formats)
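Each level of this hierarchy can be inspected through Iceberg's metadata tables from Spark SQL (table name hypothetical):

spark.sql("SELECT snapshot_id, manifest_list FROM local.db.events.snapshots").show(false)
spark.sql("SELECT path, added_data_files_count FROM local.db.events.manifests").show(false)
spark.sql("SELECT file_path, file_format, record_count FROM local.db.events.files").show(false)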
Development Plan
Feature list
- Modify the write path
  - Find the data entry point; write incoming data to a log and to memtables first
  - Memtable flush and log reclamation
    - Each task produces one loglet (a segment of the log)
    - Maintain log IDs
  - Design a mechanism for evaluating the choice of partition columns
    - Is the data distribution balanced?
    - What is the cost of merging new and old data?
    - How many small files are produced? The write/read trade-off is managed by controlling the number of small files
    - Query cost: what fraction of the data read is irrelevant?
  - Design a conversion mechanism (changing partitions, adding partitions, etc.)
    - Ping-pong conversion: one layer holds the current data distribution, the other is the conversion target
    - Limit the size of individual files; for each batch of data written, convert a comparable amount so the overall conversion task stays spread out and can complete
    - Minimize rewriting of file contents; operate on data at the file level whenever possible
OBSOLETE
Feature list
- Automatic data partitioning
- Write data to a log first
- Dynamic partitioning management
- A two-level index structure for the data lake
  - A shared file-level index
  - A per-file index
Supporting a layered data lake format (HTAP)
- Organize files using an LSM-like structure
  - The first level is laid out with tiering
  - The second level and above are organized with leveling
  - Whether rows are unique
- Shared index structure
  - Support one writer and many readers via MVCC
  - Index the data in the first level
    - Which columns should provide the index content?
    - What does an index pointer point to?
  - Two-level index
    - Level 1: a range index
    - Level 2: an index inside each file
  - Build index nodes with Avro
- Reading data
  - Read data level by level, then merge the results
  - Multiple partitions are queried concurrently; each filters the data it cares about from the first level
- Writing data
  - Write to the first level first, then compact the data downward into lower levels
  - Write types
    - insert into
    - merge into
    - insert overwrite
  - Build an index over key columns in memory, producing index files or parts of them