DataFrame:通过SparkSql将scala类转为DataFrame DataFrame:通过SparkSql将scala类转为DataFrame的方法
silentwolfyh 人气:0如下所示:
import java.text.DecimalFormat import com.alibaba.fastjson.JSON import com.donews.data.AppConfig import com.typesafe.config.ConfigFactory import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{Row, SaveMode, DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext} import org.slf4j.LoggerFactory /** * Created by silentwolf on 2016/6/3. */ case class UserTag(SUUID: String, MAN: Float, WOMAN: Float, AGE10_19: Float, AGE20_29: Float, AGE30_39: Float, AGE40_49: Float, AGE50_59: Float, GAME: Float, MOVIE: Float, MUSIC: Float, ART: Float, POLITICS_NEWS: Float, FINANCIAL: Float, EDUCATION_TRAINING: Float, HEALTH_CARE: Float, TRAVEL: Float, AUTOMOBILE: Float, HOUSE_PROPERTY: Float, CLOTHING_ACCESSORIES: Float, BEAUTY: Float, IT: Float, BABY_PRODUCT: Float, FOOD_SERVICE: Float, HOME_FURNISHING: Float, SPORTS: Float, OUTDOOR_ACTIVITIES: Float, MEDICINE: Float ) object UserTagTable { val LOG = LoggerFactory.getLogger(UserOverviewFirst.getClass) val REP_HOME = s"${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}" def main(args: Array[String]) { var startTime = System.currentTimeMillis() val conf: com.typesafe.config.Config = ConfigFactory.load() val sc = new SparkContext() val sqlContext = new SQLContext(sc) var df1: DataFrame = null if (args.length == 0) { println("请输入: appkey , StartTime : 2016-04-10 ,StartEnd :2016-04-11") } else { var appkey = args(0) var lastdate = args(1) df1 = loadDataFrame(sqlContext, appkey, "2016-04-10", lastdate) df1.registerTempTable("suuidTable") sqlContext.udf.register("taginfo", (a: String) => userTagInfo(a)) sqlContext.udf.register("intToString", (b: Long) => intToString(b)) import sqlContext.implicits._ //***重点***:将临时表中的suuid和自定函数中Json数据,放入UserTag中。 sqlContext.sql(" select distinct(suuid) AS suuid,taginfo(suuid) from suuidTable group by suuid").map { case Row(suuid: String, taginfo: String) => val taginfoObj = JSON.parseObject(taginfo) UserTag(suuid.toString, taginfoObj.getFloat("man"), taginfoObj.getFloat("woman"), taginfoObj.getFloat("age10_19"), taginfoObj.getFloat("age20_29"), taginfoObj.getFloat("age30_39"), taginfoObj.getFloat("age40_49"), taginfoObj.getFloat("age50_59"), taginfoObj.getFloat("game"), taginfoObj.getFloat("movie"), taginfoObj.getFloat("music"), taginfoObj.getFloat("art"), taginfoObj.getFloat("politics_news"), taginfoObj.getFloat("financial"), taginfoObj.getFloat("education_training"), taginfoObj.getFloat("health_care"), taginfoObj.getFloat("travel"), taginfoObj.getFloat("automobile"), taginfoObj.getFloat("house_property"), taginfoObj.getFloat("clothing_accessories"), taginfoObj.getFloat("beauty"), taginfoObj.getFloat("IT"), taginfoObj.getFloat("baby_Product"), taginfoObj.getFloat("food_service"), taginfoObj.getFloat("home_furnishing"), taginfoObj.getFloat("sports"), taginfoObj.getFloat("outdoor_activities"), taginfoObj.getFloat("medicine") )}.toDF().registerTempTable("resultTable") val resultDF = sqlContext.sql(s"select '$appkey' AS APPKEY, '$lastdate' AS DATE,SUUID ,MAN,WOMAN,AGE10_19,AGE20_29,AGE30_39 ," + "AGE40_49 ,AGE50_59,GAME,MOVIE,MUSIC,ART,POLITICS_NEWS,FINANCIAL,EDUCATION_TRAINING,HEALTH_CARE,TRAVEL,AUTOMOBILE," + "HOUSE_PROPERTY,CLOTHING_ACCESSORIES,BEAUTY,IT,BABY_PRODUCT ,FOOD_SERVICE ,HOME_FURNISHING ,SPORTS ,OUTDOOR_ACTIVITIES ," + "MEDICINE from resultTable WHERE SUUID IS NOT NULL") resultDF.write.mode(SaveMode.Overwrite).options( Map("table" -> "USER_TAGS", "zkUrl" -> conf.getString("Hbase.url")) ).format("org.apache.phoenix.spark").save() } } def intToString(suuid: Long): String = { suuid.toString() } def userTagInfo(num1: String): String = { var de = new DecimalFormat("0.00") var mannum = de.format(math.random).toFloat var man = mannum var woman = de.format(1 - mannum).toFloat var age10_19num = de.format(math.random * 0.2).toFloat var age20_29num = de.format(math.random * 0.2).toFloat var age30_39num = de.format(math.random * 0.2).toFloat var age40_49num = de.format(math.random * 0.2).toFloat var age10_19 = age10_19num var age20_29 = age20_29num var age30_39 = age30_39num var age40_49 = age40_49num var age50_59 = de.format(1 - age10_19num - age20_29num - age30_39num - age40_49num).toFloat var game = de.format(math.random * 1).toFloat var movie = de.format(math.random * 1).toFloat var music = de.format(math.random * 1).toFloat var art = de.format(math.random * 1).toFloat var politics_news = de.format(math.random * 1).toFloat var financial = de.format(math.random * 1).toFloat var education_training = de.format(math.random * 1).toFloat var health_care = de.format(math.random * 1).toFloat var travel = de.format(math.random * 1).toFloat var automobile = de.format(math.random * 1).toFloat var house_property = de.format(math.random * 1).toFloat var clothing_accessories = de.format(math.random * 1).toFloat var beauty = de.format(math.random * 1).toFloat var IT = de.format(math.random * 1).toFloat var baby_Product = de.format(math.random * 1).toFloat var food_service = de.format(math.random * 1).toFloat var home_furnishing = de.format(math.random * 1).toFloat var sports = de.format(math.random * 1).toFloat var outdoor_activities = de.format(math.random * 1).toFloat var medicine = de.format(math.random * 1).toFloat "{" + "\"man\"" + ":" + man + "," + "\"woman\"" + ":" + woman + "," + "\"age10_19\"" + ":" + age10_19 + "," + "\"age20_29\"" + ":" + age20_29 + "," + "\"age30_39\"" + ":" + age30_39 + "," + "\"age40_49\"" + ":" + age40_49 + "," + "\"age50_59\"" + ":" + age50_59 + "," + "\"game\"" + ":" + game + "," + "\"movie\"" + ":" + movie + "," + "\"music\"" + ":" + music + "," + "\"art\"" + ":" + art + "," + "\"politics_news\"" + ":" + politics_news + "," + "\"financial\"" + ":" + financial + "," + "\"education_training\"" + ":" + education_training + "," + "\"health_care\"" + ":" + health_care + "," + "\"travel\"" + ":" + travel + "," + "\"automobile\"" + ":" + automobile + "," + "\"house_property\"" + ":" + house_property + "," + "\"clothing_accessories\"" + ":" + clothing_accessories + "," + "\"beauty\"" + ":" + beauty + "," + "\"IT\"" + ":" + IT + "," + "\"baby_Product\"" + ":" + baby_Product + "," + "\"food_service\"" + ":" + food_service + "," + "\"home_furnishing\"" + ":" + home_furnishing + "," + "\"sports\"" + ":" + sports + "," + "\"outdoor_activities\"" + ":" + outdoor_activities + "," + "\"medicine\"" + ":" + medicine + "}"; } def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = { val path = s"$REP_HOME/appstatistic" ctx.read.parquet(path) .filter(s"timestamp is not null and appkey='$appkey' and day>='$startDay' and day<='$endDay'") } }
以上这篇DataFrame:通过SparkSql将scala类转为DataFrame的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
加载全部内容