亲宝软件园·资讯

展开

Spark SQL的自定义函数UDF使用

CarveStone 人气:0

Spark_SQL的UDF使用

用户自定义函数,也叫UDF,可以让我们使用Python/Java/Scala注册自定义函数,并在SQL中调用。这种方法很常用,通常用来给机构内的SQL用户们提供高级功能支持,这样这些用户就可以直接调用注册的函数而无需自己去通过编程来实现了。

UDF简单使用

首先通过代码建立一个测试的DataFrame数据,通过RDD产生,再转换成DataFrame格式,通过写简单的UDF函数,对数据进行操作并输出,例如:

import org.apache.spark.sql.Row
import org.apache.spark.rdd._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
// 通过RDD创建测试数据
val rdd: RDD[Row] = sc.parallelize(List("Michael,male, 29",            
"Andy,female, 30",
"Justin,male, 19",
"Dela,female, 25",
"Magi,male, 20",
"Pule,male,21"))
.map(_.split(",")).map(p => Row(p(0),p(1),p(2).trim.toInt))
// 创建Schema
val schema = StructType( Array( StructField("name",StringType, true),StructField("sex",StringType, true),StructField("age",IntegerType,true)))
// 转换DataFrame  
val peopleDF = spark.sqlContext.createDataFrame(rdd,schema) 
// 注册UDF函数    
spark.udf.register("strlen",(x:String)=>x.length)
// 创建临时表       
peopleDF.registerTempTable("people")                  
// 选择输出语句,(选择输出列:名字,名字长度,性别从表people中)
spark.sql("select name, strlen(name) as strlen,sex from people").show()

创建 DataFrame

scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

注册 UDF

scala> spark.udf.register("addName",(x:String)=> "Name:"+x)
res9: org.apache.spark.sql.expressions.UserDefinedFunction = 
UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

创建临时表

scala> df.createOrReplaceTempView("people")

应用 UDF

scala> spark.sql("Select addName(name),age from people").show()

加载全部内容

相关教程
猜你喜欢
用户评论