How to View the Abstract Syntax Tree Generated by Spark SQL?
Preface
Section 4.3 of *Spark SQL Internals* (《Spark SQL内核剖析》) points out that the abstract-syntax-tree nodes produced in the Catalyst stack all have names ending in Context: SQL is parsed by ANTLR4 through the generated SqlBaseParser, which is the syntax-analysis layer of the source, and every node of the resulting abstract syntax tree is a subclass of ParserRuleContext.
The Question
ANTLR4 parses SQL into an abstract syntax tree, but what does that tree finally look like, and how can we see it?
Source Code Analysis
Test Example
spark.sql("select id, count(name) from student group by id").show()
Source Entry Point
The sql method of SparkSession is as follows:
def sql(sqlText: String): DataFrame = {
  // 1. Generate the LogicalPlan; sqlParser is a SparkSqlParser
  val logicalPlan: LogicalPlan = sessionState.sqlParser.parsePlan(sqlText)
  // 2. Build a DataFrame from the LogicalPlan
  val frame: DataFrame = Dataset.ofRows(self, logicalPlan)
  frame
}
Locating SparkSqlParser
The entry point involves the key class SessionState, which is initialized as follows:
lazy val sessionState: SessionState = {
  parentSessionState
    .map(_.clone(this))
    .getOrElse {
      // Build org.apache.spark.sql.internal.SessionStateBuilder
      val state = SparkSession.instantiateSessionState(
        SparkSession.sessionStateClassName(sparkContext.conf),
        self)
      initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }
      state
    }
}
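A quick way to see that SessionState is per-session rather than per-application state: SparkSession.newSession() creates a sibling session that shares the SparkContext and SharedState but gets its own freshly built SessionState (the parentSessionState clone branch above is taken by Spark's internal cloneSession instead). A small sketch, reusing spark and the student view from the test example:

// spark2 shares the SparkContext but owns a separate SessionState
val spark2 = spark.newSession()

// SQL conf is per-session: this does not affect the original session
spark2.conf.set("spark.sql.shuffle.partitions", "8")

// Temp views live in the per-session SessionCatalog, so they are not shared
assert(spark.catalog.tableExists("student"))
assert(!spark2.catalog.tableExists("student"))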
The org.apache.spark.sql.SparkSession$#sessionStateClassName method:
private def sessionStateClassName(conf: SparkConf): String = {
  // spark.sql.catalogImplementation is either "hive" or "in-memory"; the default is "in-memory"
  conf.get(CATALOG_IMPLEMENTATION) match {
    // hive implementation: org.apache.spark.sql.hive.HiveSessionStateBuilder
    case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME
    // in-memory implementation: org.apache.spark.sql.internal.SessionStateBuilder
    case "in-memory" => classOf[SessionStateBuilder].getCanonicalName
  }
}
The builder pattern is used here: org.apache.spark.sql.internal.SessionStateBuilder is the class that builds the SessionState. SparkSession.instantiateSessionState spells this out:
/**
 * Helper method to create an instance of `SessionState` based on `className` from conf.
 * The result is either a `SessionState` or a Hive based `SessionState`.
 */
private def instantiateSessionState(
    className: String,
    sparkSession: SparkSession): SessionState = {
  try {
    // className is e.g. org.apache.spark.sql.internal.SessionStateBuilder;
    // invoke `new [Hive]SessionStateBuilder(SparkSession, Option[SessionState])`
    val clazz = Utils.classForName(className)
    val ctor = clazz.getConstructors.head
    ctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build()
  } catch {
    case NonFatal(e) =>
      throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
  }
}
BaseSessionStateBuilder has two main implementations: org.apache.spark.sql.hive.HiveSessionStateBuilder (hive mode) and org.apache.spark.sql.internal.SessionStateBuilder (in-memory mode, the default).
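As a quick illustration of how the two modes are selected in practice, here is a hedged sketch: enableHiveSupport() is the public way to switch spark.sql.catalogImplementation to "hive", while the in-memory catalog is what you get by default. The two builders are shown side by side; in a real application you would build one or the other, and the app names are illustrative.

import org.apache.spark.sql.SparkSession

// Default: in-memory catalog, built by org.apache.spark.sql.internal.SessionStateBuilder
val inMemorySpark = SparkSession.builder()
  .appName("in-memory-catalog")
  .master("local[*]")
  .getOrCreate()

// Hive catalog: enableHiveSupport() sets spark.sql.catalogImplementation to "hive",
// so sessionStateClassName resolves to HiveSessionStateBuilder
// (requires the spark-hive dependency on the classpath)
val hiveSpark = SparkSession.builder()
  .appName("hive-catalog")
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()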
The org.apache.spark.sql.internal.BaseSessionStateBuilder#build method:
/**
 * Build the [[SessionState]].
 */
def build(): SessionState = {
  new SessionState(
    session.sharedState,
    conf,
    experimentalMethods,
    functionRegistry,
    udfRegistration,
    () => catalog,
    sqlParser,
    () => analyzer,
    () => optimizer,
    planner,
    streamingQueryManager,
    listenerManager,
    () => resourceLoader,
    createQueryExecution,
    createClone)
}
SessionState takes many parameters; the key ones are:
conf: the SQLConf that holds this SparkSession's SQL configuration
functionRegistry: a FunctionRegistry responsible for function registration; internally it maintains a map of the registered functions
udfRegistration: a UDFRegistration used to register UDFs; it delegates to the FunctionRegistry (see the sketch after this list)
catalogBuilder: () => SessionCatalog: returns the SessionCatalog, which manages this SparkSession's catalog (metadata for databases, tables, and functions)
sqlParser: ParserInterface, in practice a SparkSqlParser instance; internally it uses AstBuilder to turn SQL into an abstract syntax tree
analyzerBuilder: () => Analyzer: org.apache.spark.sql.internal.BaseSessionStateBuilder.analyzer constructs a customized org.apache.spark.sql.catalyst.analysis.Analyzer
optimizerBuilder: () => Optimizer: org.apache.spark.sql.internal.BaseSessionStateBuilder.optimizer constructs a customized org.apache.spark.sql.execution.SparkOptimizer
planner: a SparkPlanner, constructed by org.apache.spark.sql.internal.BaseSessionStateBuilder.planner
resourceLoaderBuilder: () => SessionResourceLoader: returns the resource loader, mainly used to load function jars and other resources
createQueryExecution: LogicalPlan => QueryExecution: builds a QueryExecution from a LogicalPlan
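A small sketch of how the functionRegistry/UDFRegistration pair is exercised from user code, reusing the spark session and student view from the test example above (the function name plusOne is made up for illustration):

// spark.udf is the session's UDFRegistration; register records the function
// in the underlying FunctionRegistry map, after which SQL can resolve it
spark.udf.register("plusOne", (x: Int) => x + 1)
spark.sql("select plusOne(id) from student").show()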
The parsePlan Method
SparkSqlParser does not implement this method itself; the implementation lives in its parent class AbstractSqlParser:
/** Creates LogicalPlan for a given SQL string. */
// Generate a LogicalPlan from the SQL text
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
  val singleStatementContext: SqlBaseParser.SingleStatementContext = parser.singleStatement()
  astBuilder.visitSingleStatement(singleStatementContext) match {
    case plan: LogicalPlan => plan
    case _ =>
      val position = Origin(None, None)
      throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
  }
}
The block passed after the SQL string is a callback function that parse invokes. The source of org.apache.spark.sql.execution.SparkSqlParser#parse is as follows:
// Variable substitutor: replaces variables inside the SQL text
private val substitutor = new VariableSubstitution(conf)

protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
  super.parse(substitutor.substitute(command))(toResult)
}
Here substitutor is a variable substitutor used to substitute away any variables in the SQL. Continuing into the parent class AbstractSqlParser's parse method:
protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
  logDebug(s"Parsing command: $command")

  // Lexical analysis
  val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
  lexer.removeErrorListeners()
  lexer.addErrorListener(ParseErrorListener)
  // (sic: the field name is misspelled in Spark's grammar)
  lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced

  // Syntactic analysis
  val tokenStream = new CommonTokenStream(lexer)
  val parser = new SqlBaseParser(tokenStream)
  parser.addParseListener(PostProcessor)
  parser.removeErrorListeners()
  parser.addErrorListener(ParseErrorListener)
  parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced

  try {
    try {
      // First, try parsing with the potentially faster SLL mode
      parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
      // The callback uses AstBuilder to produce the unresolved LogicalPlan
      toResult(parser)
    } catch {
      case e: ParseCancellationException =>
        // If we fail, parse again with LL mode
        tokenStream.seek(0) // rewind input stream
        parser.reset()
        parser.getInterpreter.setPredictionMode(PredictionMode.LL)
        toResult(parser)
    }
  } catch {
    case e: ParseException if e.command.isDefined =>
      throw e
    case e: ParseException =>
      throw e.withCommand(command)
    case e: AnalysisException =>
      val position = Origin(e.line, e.startPosition)
      throw new ParseException(Option(command), e.message, position, position)
  }
}
This method calls the ANTLR4 API to convert the SQL into an AST and then invokes toResult(parser); this toResult is exactly the callback handed in by parsePlan.
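The parse(command)(toResult) shape is plain Scala currying: the second parameter list is a function value that the method body invokes. A stripped-down sketch of the same pattern, with made-up names rather than Spark's:

// A "loan"-style method: prepare a resource, hand it to the callback
def parseWith[T](command: String)(toResult: String => T): T = {
  val prepared = command.trim // stand-in for the lexer/parser setup
  toResult(prepared)          // the callback decides what to produce
}

// The call site mirrors parsePlan: the block after the first argument
// list is the toResult callback
val length: Int = parseWith("  select 1  ") { prepared => prepared.length }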
By the time astBuilder.visitSingleStatement is called, the AST has already been fully built.
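Before modifying any source, note that the catalyst module already ships a public parser object, CatalystSqlParser, so this stage can be reached from ordinary user code. The sketch below prints the unresolved LogicalPlan, which is a Catalyst tree rather than the ANTLR ParserRuleContext tree this article wants to see; that is why the source change in the next section is still needed:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// parsePlan runs the same lexer/parser/AstBuilder pipeline described above
val plan = CatalystSqlParser.parsePlan("select name from student where age > 18")
// treeString pretty-prints the unresolved logical plan
println(plan.treeString)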
Printing the Generated AST
Modifying the Source
Now look at the astBuilder.visitSingleStatement method:
override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
  val statement: StatementContext = ctx.statement
  // The print call added for this article
  printRuleContextInTreeStyle(statement, 1)
  // Call accept to build the logical operator tree from the AST
  visit(statement).asInstanceOf[LogicalPlan]
}
Before the visitor pattern walks the AST nodes to produce the unresolved LogicalPlan, I defined a method to print the freshly parsed abstract syntax tree. printRuleContextInTreeStyle is as follows:
import java.util

import scala.collection.JavaConverters._

import org.antlr.v4.runtime.ParserRuleContext
import org.antlr.v4.runtime.tree.ParseTree

/**
 * Print the abstract syntax tree in tree style.
 */
private def printRuleContextInTreeStyle(ctx: ParserRuleContext, level: Int): Unit = {
  val prefix: String = "|"
  val curLevelStr: String = "-" * level
  val childLevelStr: String = "-" * (level + 1)
  println(s"${prefix}${curLevelStr} ${ctx.getClass.getCanonicalName}")
  val children: util.List[ParseTree] = ctx.children
  if (children == null || children.size() == 0) {
    return
  }
  // Recurse into rule nodes; for terminal tokens, print the parent rule's class name
  children.iterator().asScala.foreach {
    case context: ParserRuleContext => printRuleContextInTreeStyle(context, level + 1)
    case _ => println(s"${prefix}${childLevelStr} ${ctx.getClass.getCanonicalName}")
  }
}
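If you prefer not to patch Spark at all, the ANTLR runtime can render the same tree as a LISP-style string via toStringTree. Below is a hedged sketch that drives the generated SqlBaseLexer/SqlBaseParser directly; upper-casing the input is a crude stand-in for Spark's private UpperCaseCharStream wrapper, which the grammar's upper-case keyword tokens rely on:

import java.util.Locale

import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}
import org.apache.spark.sql.catalyst.parser.{SqlBaseLexer, SqlBaseParser}

// Upper-case the SQL because the grammar defines its tokens in upper case;
// note this also upper-cases identifiers, which is fine for a demo
val sql = "select name from student where age > 18".toUpperCase(Locale.ROOT)

val lexer = new SqlBaseLexer(CharStreams.fromString(sql))
val parser = new SqlBaseParser(new CommonTokenStream(lexer))

// singleStatement is the same entry rule that parsePlan uses
val tree = parser.singleStatement()

// Prints the AST as a nested (rule child child ...) s-expression
println(tree.toStringTree(parser))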
Three Example SQL Printouts
SQL Example 1 (with WHERE)
select name from student where age > 18
The generated AST is shown below (terminal tokens are printed with their parent rule's class name, which is why some lines appear duplicated):
|- org.apache.spark.sql.catalyst.parser.SqlBaseParser.StatementDefaultContext
|-- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QueryContext
|--- org.apache.spark.sql.catalyst.parser.SqlBaseParser.SingleInsertQueryContext
|---- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QueryTermDefaultContext
|----- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QueryPrimaryDefaultContext
|------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.QuerySpecificationContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QuerySpecificationContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.NamedExpressionSeqContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.NamedExpressionContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ExpressionContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.PredicatedContext
|----------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ValueExpressionDefaultContext
|------------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.ColumnReferenceContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.IdentifierContext
|-------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|--------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.FromClauseContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.FromClauseContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.RelationContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.TableNameContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.TableIdentifierContext
|----------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.IdentifierContext
|------------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.TableAliasContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QuerySpecificationContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.PredicatedContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ComparisonContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ValueExpressionDefaultContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ColumnReferenceContext
|----------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.IdentifierContext
|------------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ComparisonOperatorContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ComparisonOperatorContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ValueExpressionDefaultContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ConstantDefaultContext
|----------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.NumericLiteralContext
|------------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.IntegerLiteralContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.IntegerLiteralContext
|---- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QueryOrganizationContext
SQL Example 2 (with ORDER BY)
select name from student where age > 18 order by id desc
The generated AST:
|- org.apache.spark.sql.catalyst.parser.SqlBaseParser.StatementDefaultContext
|-- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QueryContext
|--- org.apache.spark.sql.catalyst.parser.SqlBaseParser.SingleInsertQueryContext
|---- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QueryTermDefaultContext
|----- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QueryPrimaryDefaultContext
|------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.QuerySpecificationContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QuerySpecificationContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.NamedExpressionSeqContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.NamedExpressionContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ExpressionContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.PredicatedContext
|----------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ValueExpressionDefaultContext
|------------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.ColumnReferenceContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.IdentifierContext
|-------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|--------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.FromClauseContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.FromClauseContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.RelationContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.TableNameContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.TableIdentifierContext
|----------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.IdentifierContext
|------------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.TableAliasContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QuerySpecificationContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.PredicatedContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ComparisonContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ValueExpressionDefaultContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ColumnReferenceContext
|----------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.IdentifierContext
|------------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ComparisonOperatorContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ComparisonOperatorContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ValueExpressionDefaultContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ConstantDefaultContext
|----------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.NumericLiteralContext
|------------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.IntegerLiteralContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.IntegerLiteralContext
|---- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QueryOrganizationContext
|----- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QueryOrganizationContext
|----- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QueryOrganizationContext
|----- org.apache.spark.sql.catalyst.parser.SqlBaseParser.SortItemContext
|------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.ExpressionContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.PredicatedContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ValueExpressionDefaultContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ColumnReferenceContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.IdentifierContext
|----------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|------------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.SortItemContext
SQL Example 3 (with GROUP BY)
select id, count(name) from student group by id
The generated AST:
|- org.apache.spark.sql.catalyst.parser.SqlBaseParser.StatementDefaultContext
|-- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QueryContext
|--- org.apache.spark.sql.catalyst.parser.SqlBaseParser.SingleInsertQueryContext
|---- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QueryTermDefaultContext
|----- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QueryPrimaryDefaultContext
|------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.QuerySpecificationContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QuerySpecificationContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.NamedExpressionSeqContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.NamedExpressionContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ExpressionContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.PredicatedContext
|----------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ValueExpressionDefaultContext
|------------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.ColumnReferenceContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.IdentifierContext
|-------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|--------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.NamedExpressionSeqContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.NamedExpressionContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ExpressionContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.PredicatedContext
|----------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ValueExpressionDefaultContext
|------------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.FunctionCallContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QualifiedNameContext
|-------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.IdentifierContext
|--------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|---------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.FunctionCallContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ExpressionContext
|-------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.PredicatedContext
|--------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ValueExpressionDefaultContext
|---------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ColumnReferenceContext
|----------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.IdentifierContext
|------------------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|------------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.FunctionCallContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.FromClauseContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.FromClauseContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.RelationContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.TableNameContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.TableIdentifierContext
|----------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.IdentifierContext
|------------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.TableAliasContext
|------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.AggregationContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.AggregationContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.AggregationContext
|-------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ExpressionContext
|--------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.PredicatedContext
|---------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ValueExpressionDefaultContext
|----------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.ColumnReferenceContext
|------------ org.apache.spark.sql.catalyst.parser.SqlBaseParser.IdentifierContext
|------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|-------------- org.apache.spark.sql.catalyst.parser.SqlBaseParser.UnquotedIdentifierContext
|---- org.apache.spark.sql.catalyst.parser.SqlBaseParser.QueryOrganizationContext
Summary
This article started from a small test program, traced how ANTLR4 is invoked to parse SQL into an AST, and modified the source to print the resulting tree. Even if ANTLR's SQL-to-AST step still looks like a black box, Spark SQL now has the input that its subsequent phases consume.
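Finally, a non-invasive complement: once the DataFrame exists, its QueryExecution exposes every intermediate plan through public API, so the parsed (unresolved) logical plan can be inspected without modifying Spark. Note that these are Catalyst plans, not the raw ANTLR ParserRuleContext tree, which is why the source modification above is still needed to see the parse tree itself. A minimal sketch:

val df = spark.sql("select id, count(name) from student group by id")
val qe = df.queryExecution

println(qe.logical)       // unresolved LogicalPlan, the output of parsePlan
println(qe.analyzed)      // after the Analyzer
println(qe.optimizedPlan) // after the Optimizer
println(qe.executedPlan)  // the physical SparkPlan

// df.explain(true) prints all of the above in one call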