加入收藏 | 设为首页 | 会员中心 | 我要投稿 鹰潭站长网 (https://www.0701zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

基于Spark的数据分析实践

发布时间:2019-06-19 21:37:30 所属栏目:教程 来源:EAWorld
导读:引言: Spark是在借鉴了MapReduce之上发展而来的,继承了其分布式并行计算的优点并改进了MapReduce明显的缺陷。Spark主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等组件。 本文主要分析了 Spark RDD 以及 RDD 作为开发的不足之处,介

TextFile DataFrame

  1. import.org.apache.spark.sql._ 
  2. //定义数据的列名称和类型 
  3. valdt=StructType(List(id:String,name:String,gender:String,age:Int)) 
  4. ​ 
  5. //导入user_info.csv文件并指定分隔符 
  6. vallines = sc.textFile("/path/user_info.csv").map(_.split(",")) 
  7. ​ 
  8. //将表结构和数据关联起来,把读入的数据user.csv映射成行,构成数据集 
  9. valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt)) 
  10. ​ 
  11. //通过SparkSession.createDataFrame()创建表,并且数据表表头 
  12. val df= spark.createDataFrame(rowRDD, dt) 

读取规则数据文件作为DataFrame

  1. SparkSession.Builder builder = SparkSession.builder() 
  2. Builder.setMaster("local").setAppName("TestSparkSQLApp") 
  3. SparkSession spark = builder.getOrCreate(); 
  4. SQLContext sqlContext = spark.sqlContext(); 
  5. ​ 
  6. # 读取 JSON 数据,path 可为文件或者目录 
  7. valdf=sqlContext.read().json(path); 
  8. ​ 
  9. # 读取 HadoopParquet 文件 
  10. vardf=sqlContext.read().parquet(path); 
  11. ​ 
  12. # 读取 HadoopORC 文件 
  13. vardf=sqlContext.read().orc(path); 

JSON 文件为每行一个 JSON 对象的文件类型,行尾无须逗号。文件头也无须[]指定为数组;SparkSQL 读取是只是按照每行一条 JSON Record序列化;

Parquet文件

  1. Configurationconfig = new Configuration(); 
  2. ParquetFileReaderreader = ParquetFileReader.open( 
  3.  HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf)); 
  4. Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData(); 
  5. String allFields= schema.get("org.apache.spark.sql.parquet.row.metadata"); 

allFiedls 的值就是各字段的名称和具体的类型,整体是一个json格式进行展示。

读取 Hive 表作为 DataFrame

Spark2 API 推荐通过 SparkSession.Builder 的 Builder 模式创建 SparkContext。 Builder.getOrCreate() 用于创建 SparkSession,SparkSession 是 SparkContext 的封装。

在Spark1.6中有两个核心组件SQLcontext和HiveContext。SQLContext 用于处理在 SparkSQL 中动态注册的表,HiveContext 用于处理 Hive 中的表。

从Spark2.0以上的版本开始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可执行 Hive 中的表,也可执行内部注册的表;

在需要执行 Hive 表时,只需要在 SparkSession.Builder 中开启 Hive 支持即可(enableHiveSupport())。

  1. SparkSession.Builder builder = SparkSession.builder().enableHiveSupport(); 
  2. SparkSession spark = builder.getOrCreate(); 
  3. SQLContext sqlContext = spark.sqlContext(); 

// db 指 Hive 库中的数据库名,如果不写默认为 default

// tableName 指 hive 库的数据表名

  1. sqlContext.sql(“select * from db.tableName”) 

SparkSQL ThriftServer

//首先打开 Hive 的 Metastore服务

  1. hive$bin/hive –-service metastore –p 8093 

//把 Spark 的相关 jar 上传到hadoophdfs指定目录,用于指定sparkonyarn的依赖 jar

  1. spark$hadoop fs –put jars/*.jar /lib/spark2 

// 启动 spark thriftserver 服务

  1. spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf  
  2. spark.yarn.jars=hdfs:///lib/spark2/*.jar 

当hdfs 上传了spark 依赖 jar 时,通过spark.yarn.jars 可看到日志 spark 无须每个job 都上传jar,可节省启动时间

  1. 19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar 
  2. 19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar 

(编辑:鹰潭站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读