Spark SQL的自定义函数UDF使用
CarveStone 人气:0Spark_SQL的UDF使用
用户自定义函数,也叫UDF,可以让我们使用Python/Java/Scala注册自定义函数,并在SQL中调用。这种方法很常用,通常用来给机构内的SQL用户们提供高级功能支持,这样这些用户就可以直接调用注册的函数而无需自己去通过编程来实现了。
- 在Spark SQL中,编写UDF 尤为简单。Spark SQL不仅有自己的UDF接口,也支持已有的Apache Hive UDF。我们可以使用Spark支持的编程语言编写好函数,然后通过Spark SQL内建的方法传递进来,非常便捷地注册我们自己的UDF。
- 在Scala和Python中,可以利用语言原生的函数和lambda语法的支持,而在Java中,则需要扩展对应的UDF类。UDF能够支持各种数据类型,返回类型也可以与调用时的参数类型完全不一样。
UDF简单使用
首先通过代码建立一个测试的DataFrame数据,通过RDD产生,再转换成DataFrame格式,通过写简单的UDF函数,对数据进行操作并输出,例如:
import org.apache.spark.sql.Row import org.apache.spark.rdd._ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType} // 通过RDD创建测试数据 val rdd: RDD[Row] = sc.parallelize(List("Michael,male, 29", "Andy,female, 30", "Justin,male, 19", "Dela,female, 25", "Magi,male, 20", "Pule,male,21")) .map(_.split(",")).map(p => Row(p(0),p(1),p(2).trim.toInt)) // 创建Schema val schema = StructType( Array( StructField("name",StringType, true),StructField("sex",StringType, true),StructField("age",IntegerType,true))) // 转换DataFrame val peopleDF = spark.sqlContext.createDataFrame(rdd,schema) // 注册UDF函数 spark.udf.register("strlen",(x:String)=>x.length) // 创建临时表 peopleDF.registerTempTable("people") // 选择输出语句,(选择输出列:名字,名字长度,性别从表people中) spark.sql("select name, strlen(name) as strlen,sex from people").show()
创建 DataFrame
scala> val df = spark.read.json("data/user.json") df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
注册 UDF
scala> spark.udf.register("addName",(x:String)=> "Name:"+x) res9: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
创建临时表
scala> df.createOrReplaceTempView("people")
应用 UDF
scala> spark.sql("Select addName(name),age from people").show()
加载全部内容