Spark SQL supports querying data via SQL, and to persist table metadata we must enable Spark with Hive support, because Spark uses the Hive Metastore to store metadata. By default, Spark uses an embedded Derby database for the metadata, but it can also be configured to use an external Hive Metastore. The Spark Hive configuration is documented here: Hive Tables - Spark 2.4.1 Documentation
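As a minimal sketch, a SparkSession with Hive support can be built like this (the warehouse directory and metastore URI below are placeholder values, assuming an external metastore is available):

from pyspark.sql import SparkSession

# Enable Hive support so table metadata is persisted in the Hive Metastore.
# spark.sql.warehouse.dir and hive.metastore.uris are placeholder values.
spark = (
    SparkSession.builder
    .appName("catalog-demo")
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
    .config("hive.metastore.uris", "thrift://metastore-host:9083")
    .enableHiveSupport()
    .getOrCreate()
)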
Besides using SQL DDL to manipulate metadata stored in the Hive Metastore, Spark SQL also provides a minimalist API known as the Catalog API for manipulating metadata from Spark applications. The Spark Catalog API is documented here: Catalog (Spark 2.2.1 JavaDoc) and pyspark.sql.catalog — PySpark master documentation.
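A few typical Catalog API calls, as a quick illustration (the table and database names are made up):

# Browse and manipulate metadata through spark.catalog instead of DDL SQL.
spark.catalog.listDatabases()                    # databases registered in the metastore
spark.catalog.listTables("default")              # tables in the "default" database
spark.catalog.listColumns("events", "default")   # columns of a hypothetical table
spark.catalog.dropTempView("tmp_view")           # drop a temporary view if it exists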
After using the Spark Catalog API for a while, I ran into a few pitfalls.
1. Problem when creating an external table with a specified schema
When a user creates an external table with a specified schema, the schema is not actually used and the table fails to be recognized as a partitioned table.
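A minimal repro sketch of the symptom (the path and schema below are hypothetical; the data is assumed to be Parquet files partitioned by dt):

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical layout: /data/events/dt=2019-01-01/part-*.parquet
schema = StructType([
    StructField("user_id", StringType()),
    StructField("action", StringType()),
    StructField("dt", StringType()),   # partition column
])

spark.catalog.createTable(
    "events", source="parquet", schema=schema, path="/data/events")

# The table is created, but it does not come out as a partitioned table with the
# user-specified schema; the source walkthrough below shows why.
spark.table("events").printSchema()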
I dug into the Spark source code (version 2.3.2) and found the reason. The relevant code is listed below; I have omitted some parts and added notes starting with //NOTE:.
The API is implemented in CatalogImpl.scala:
override def createTable(
    tableName: String,
    source: String,
    schema: StructType,
    options: Map[String, String]): DataFrame = {
  val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
  val storage = DataSource.buildStorageFormatFromOptions(options)
  val tableType = if (storage.locationUri.isDefined) {
    CatalogTableType.EXTERNAL
  } else {
    CatalogTableType.MANAGED
  }
  //NOTE: the user-specified schema is set on tableDesc, but there is no way to
  //      specify partitionColumnNames here
  val tableDesc = CatalogTable(
    identifier = tableIdent,
    tableType = tableType,
    storage = storage,
    schema = schema,
    provider = Some(source)
  )
  val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
  sparkSession.sessionState.executePlan(plan).toRdd
  sparkSession.table(tableIdent)
}
The Catalog createTable API eventually runs CreateDataSourceTableCommand, defined in createDataSourceTables.scala:
case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    ...
    //NOTE: the resolved dataSource includes an inferred schema and partitionSchema
    val dataSource: BaseRelation =
      DataSource(
        sparkSession = sparkSession,
        userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
        partitionColumns = table.partitionColumnNames,
        className = table.provider.get,
        bucketSpec = table.bucketSpec,
        options = table.storage.properties ++ pathOption,
        // As discussed in SPARK-19583, we don't check if the location is existed
        catalogTable = Some(tableWithDefaultOptions)).resolveRelation(checkFilesExist = false)

    val partitionColumnNames = if (table.schema.nonEmpty) {
      //NOTE: if the user specified a schema, use the partitionColumnNames from the
      //      CatalogTable passed to the command
      table.partitionColumnNames
    } else {
      ...
    }

    val newTable = dataSource match {
      case r: HadoopFsRelation if r.overlappedPartCols.nonEmpty =>
        ...
      case _ =>
        table.copy(
          schema = dataSource.schema, //NOTE: whether or not the user specified a schema, the inferred schema is used here
          partitionColumnNames = partitionColumnNames,
          ...
        )
    }
    ...
    sessionState.catalog.createTable(newTable, ignoreIfExists = false)
    Seq.empty[Row]
  }
}
Solution
I don’t know why the source code is written this way, or whether it should be considered a bug. One solution we came up with for this problem is to implement a custom createTable API.
object CatalogUtil {
  lazy val sparkSession = SparkSession.getActiveSession.get
  ...
  def createTable(
      tableName: String,
      source: String,
      path: String,
      schema: StructType,
      options: Map[String, String],
      partitionColumnsInSchema: Boolean
  ): Unit = {
    ...
    val table = CatalogTable(
      identifier = tableIdent,
      tableType = tableType,
      storage = storage,
      schema = schema,
      provider = Some(newSource)
    )
    ...
    val dataSource: BaseRelation =
      DataSource(
        sparkSession = sparkSession,
        userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
        partitionColumns = table.partitionColumnNames,
        className = table.provider.get,
        ...
      ).resolveRelation(checkFilesExist = false)

    //NOTE: always use the inferred partitionColumnNames here, because we don't
    //      allow users to specify partitionColumnNames
    val partitionColumns: Array[StructField] = {
      assert(table.partitionColumnNames.isEmpty)
      dataSource match {
        case r: HadoopFsRelation => r.partitionSchema.fields
        case _ => Array.empty
      }
    }
    val partitionColumnNames = partitionColumns.map(_.name)

    //NOTE: merge partitionColumns into the table schema if necessary
    val newSchema: StructType = if (table.schema.nonEmpty) {
      if (partitionColumnsInSchema) {
        table.schema
      } else {
        table.schema.fields.map(_.name).intersect(partitionColumnNames) match {
          case Array() => StructType(table.schema.fields ++ partitionColumns)
          case arr => {
            val message = "Partition column names: " +
              s"[${arr.mkString(",")}] cannot exist in user specified schema.\n" +
              s" Inferred partition columns: [${partitionColumnNames.mkString(",")}].\n" +
              s" User specified schema:\n${table.schema.treeString}"
            throw new ConflictedSchemaException(message)
          }
        }
      }
    } else {
      dataSource.schema
    }

    val newTable =
      table.copy(
        schema = newSchema, //NOTE: use the user-specified schema (with partition columns merged in if necessary)
        partitionColumnNames = partitionColumnNames,
        ...
      )
    sessionState.catalog.createTable(newTable, ignoreIfExists = false)
  }
}
As you can see, our custom API is written in Scala; if we want to use it from a Python application, we must go through the Py4J Java gateway. You can find more details here: Using Scala code in PySpark applications
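For instance, a rough sketch of calling the Scala helper from PySpark through the gateway. The package name com.example.catalog is made up, the jar is assumed to be on the driver classpath (e.g. via --jars), and it assumes CatalogUtil exposes a Java-friendly overload that takes the schema as a JSON string and the options as a java.util.Map:

# Sketch only: the package name, overload, and schema encoding are assumptions.
jvm = spark.sparkContext._jvm

# Convert the Python options dict into a java.util.HashMap for the Scala side.
j_options = jvm.java.util.HashMap()
for k, v in {"mergeSchema": "true"}.items():
    j_options.put(k, v)

jvm.com.example.catalog.CatalogUtil.createTable(
    "events",          # tableName
    "parquet",         # source
    "/data/events",    # path
    schema.json(),     # schema serialized as JSON, parsed on the Scala side
    j_options,         # options
    True,              # partitionColumnsInSchema
)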
2. Partitioned table data only available after invoking recoverPartitions
If you create an external partitioned table and immediately read from it, you will find that the data is empty. The data only becomes available after invoking the recoverPartitions method. This differs from a non-partitioned table, and the Spark Catalog doesn’t offer a convenient method for testing whether a table is partitioned, so we added one to our own catalog wrapper:
class SparkCatalog(Catalog):
    ...
    def is_partitioned_table(self, table_name):
        return "PARTITIONED BY" in self.get_create_table_statement(table_name)

    def get_create_table_statement(self, table_name):
        return self._sparkSession.sql(
            f"show create table `{table_name}`"
        ).collect()[0]["createtab_stmt"]

    def create_or_refresh_table(...):
        ...
        if self.is_partitioned_table(table):
            self.recoverPartitions(table)
        ...
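A short sketch of the behavior this works around (the table and path names are hypothetical):

spark.catalog.createTable("events", source="parquet", path="/data/events")

spark.table("events").count()              # returns 0: partitions are not registered yet
spark.catalog.recoverPartitions("events")  # register the partition directories in the metastore
spark.table("events").count()              # now returns the actual row count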
3. refreshTable API only refreshes the cache in the Spark session
catalog.refreshTable only refreshes the cached data and metadata of the given table in the current Spark session. If we want to update the table’s metadata (schema, transient_lastDdlTime, etc.) in the Hive Metastore, we need to drop and recreate the table.
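A hedged sketch of the drop-and-recreate workaround (names and paths are placeholders; for a table that needs a user-specified schema, the custom createTable helper from section 1 would be used instead):

# Dropping an EXTERNAL table only removes its metadata; the underlying files stay in place.
spark.sql("DROP TABLE IF EXISTS events")
spark.catalog.createTable("events", source="parquet", path="/data/events")
spark.catalog.recoverPartitions("events")  # needed again if the table is partitioned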