Spark 1.6 Technical Preview – with HDP 2.3
原文链接:http://zh.hortonworks.com/hadoop-tutorial/apache-spark-1-6-technical-preview-with-hdp-2-3/
本文由chinahadoop.com翻译
关于转载授授权
China Hadoop文章,欢迎个人转发朋友圈,自媒体、媒体、机构转载务必申请授权,后台留言“机构名称+文章标题+转载”,申请过授权的不必再次申请,只要按约定转载即可,但文末需放置China Hadoop二维码。
这个技术概要允许你在YARN上用HDP2.3来评估Apache Spark 1.6。
在YARN下,Hadoop能够支持各种类型的工作负荷。YARN下的Spark成为了在同一硬件资源下运行的另一个工作负荷。
这个技术概要描述了该如何进行:
在YARN上运行Spark以及运行权威的Spark的例子:SparkPi和WORDCOUNT。
在HDP2.3上运行Spark1.6
使用Spark的数据帧API
从Hive中读取或写入数据
为JDBC / ODBC数据库使用SparkSQL Thrift Server
在Spark实例中使用ORC文件
使用SparkR
使用数据组API
使用DataSet API。
当您准备好超完成这些任务时,尽量试一试在Apache Spark中的机器学习的例子。
HDP群集要求:
该技术概要可以安装在任意HDP 2.3.x集群上,无论是多节点集群或一个单节点HDP沙箱。
安装
Spark1.6技术预览版在RPM和DEB格式包中提供。以下说明假定是使用RPM安装包:
1.下载Spark 1.6 RPM库
2. wget -nv http://private-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.3.4.1-10/hdp.repo-O /etc/yum.repos.d/HDP-TP.repo
3.
4. For installing on Ubuntuuse the following
http://private-repo-1.hortonworks.com/HDP/ubuntu12/2.x/updates/2.3.4.1-10/hdp.list
5.安装Spark包
下载Spark 1.6 RPM(如果需要的话,还下载pySpark),并在HDP2.3群集上设置它。
yum install spark_2_3_4_1_10-master -y
如果你想使用pySpark,那就按照以下方法安装它,并确保Python安装在所有节点上。
yum install spark_2_3_4_1_10-python -y
这个RPM安装程序还会下载Hadoop相关性的代码。这会将Spark创建为一个操作系统用户,并且会在HDFS中创建一个用户/Spark目录。
6.设置JAVA_HOME和SPARK_HOME
确保在启动Spark shell或者thrift server之前设置好JAVA_HOME。
export JAVA_HOME=<path to JDK 1.8>
在Spark二进制文件解压缩的目录(/usr/hdp/2.3.4.1-10/spark)下安装Spark。并将SPARK_HOME变量到这个目录。
export SPARK_HOME=/usr/hdp/2.3.4.1-10/spark/
7.在Spark conf目录下创建hive-site:
作为root用户,创建SPARK_HOME/conf/hive-site.xml文件。编辑文件只包含以下配置设置:
-
<configuration>
9. <property>
10. <name>hive.metastore.uris</name>
11. <!–Make sure that <value> points to the Hive Metastore URI inyour cluster –>
12. <value>thrift://sandbox.hortonworks.com:9083</value>
13. <description>URI for client to contact metastore server</description>
14. </property>
</configuration>
运行SparkPI的实例
为了在Spark中测试计算密集型任务,通过在一个圆中“投掷飞镖”例子来计算圆周率来推算Pi-在一个方形区域((0,0)到(1,1))中产生点并计算多少点落入方形内的单位圆。结果近似于pi / 4,这是用于估计Pi。
1.改变到你的Spark目录并切换到Spark OS用户
-
cd $SPARK_HOME
su spark
3.在yarn客户端中运行Spark Pi实例
./bin/spark-submit –class org.apache.spark.examples.SparkPi–master yarn-client –num-executors 3–driver-memory 512m–executor-memory 512m–executor-cores 1 lib/spark-examples*.jar 10
注:Pi工作应该非常顺利的完成。它应该产生类似于以下的输出。圆周率接近输出端的价值。
15/12/1613:21:05 INFO DAGScheduler:Job0 finished: reduce at SparkPi.scala:36, took 4.313782 s
Piis roughly 3.139492
15/12/1613:21:05 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
在Spark下使用WordCount
首先,为Spark WordCount实例复制一个输入文件
把你想在WORDCOUNT中使用的输入文件上传到HDFS中。您可以使用任何文本文件作为输入。在下面的例子中,将以log4j.properties为例来介绍。
作为spark用户:
hadoop fs -copyFromLocal /etc/hadoop/conf/log4j.properties /tmp/data
运行wordcount
1.运行spark shell:
./bin/spark-shell –master yarn-client –driver-memory 512m–executor-memory 512m
输出类似于下面所显示的内容,然后是“scala>” REPLprompt:
Welcome to
____ __
/ __/__ ___ _____//__
_\\/ _ \/ _ `/ __/ '_/
/___/.__/\_,_/_/ /_/\_\ version 1.6.0
/_/
Using Scalaversion 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60)
Type inexpressions to have them evaluated.
Type :help formore information.
15/12/1613:21:57 INFO SparkContext: Running Spark version 1.6.0
…
scala>
2. At the Scala REPL prompt, enter:
3. val file = sc.textFile("/tmp/data")
4. val counts = file.flatMap(line => line.split(" ")).map(word =>(word,1)).reduceByKey(_ + _)
counts.saveAsTextFile("/tmp/wordcount")
5.输出回顾
在scala shell中观测wordcount输出
scala > counts.count()
打印wordcount工作中的全部输出
scala > counts.toArray().foreach(println)
用HDFS来监测wordcount输出
1.退出scala shell
scala >exit
2.观测wordcount结果
hadoop fs -ls /tmp/wordcount
它将展示如下类似的输出
/tmp/wordcount/_SUCCESS
/tmp/wordcount/part-00000
/tmp/wordcount/part-00001
3. 使用HDFS cat命令来看WORDCOUNT输出。例如:
hadoop fs -cat /tmp/wordcount/part-00000
使用Spark DataFrame API
DataFrame API为数据的访问提供了便利,因为它看起来像一个概念表。
数据框API提供了对数据的访问更加容易,因为它看起来像概念的表。熟悉Python Pandas库或R data frames的开发人员也将熟悉Spark DataFrames。
1. 作为spark用户,上传people.text和people.json文件到HDFS:
2. cd $SPARK_HOME
3. su spark
4. hdfs dfs -copyFromLocal examples/src/main/resources/people.txt people.txt
hdfs dfs -copyFromLocal examples/src/main/resources/people.json people.json
5.作为spark用户,启动Spark Shell
-
cd $SPARK_HOME
7. su spark
./bin/spark-shell –num-executors 2–executor-memory 512m–master yarn-client
8.在spark shell中输入以下代码:
scala>val df = sqlContext.read.format("json").load("people.json")
9.使用df.show来显示DataFrame目录
-
scala>df.show
11.15/12/1613:28:15 INFO YarnScheduler:RemovedTaskSet2.0, whose tasks have all completed,from pool
12.+—-+——-+
13.| age| name|
14.+—-+——-+
15.|null|Michael|
16.| 30| Andy|
17.| 19|Justin|
+—-+——-+
额外的DataFrameAPI实例
scala>import org.apache.spark.sql.functions._
// Select all,and increment age by 1
scala>df.select(df("name"), df("age")+1).show()
// Select peopleolder than 21
scala>df.filter(df("age")>21).show()
// Count peopleby age
df.groupBy("age").count().show()
指定模式编程
import org.apache.spark.sql._
val sqlContext =new org.apache.spark.sql.SQLContext(sc)
val people = sc.textFile("people.txt")
val schemaString ="nameage"
import org.apache.spark.sql.types.{StructType,StructField,StringType}
val schema =StructType(schemaString.split(" ").map(fieldName =>StructField(fieldName,StringType,true)))
val rowRDD = people.map(_.split(",")).map(p =>Row(p(0), p(1).trim))
valpeopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
peopleDataFrame.registerTempTable("people")
val results = sqlContext.sql("SELECT name FROM people")
results.map(t =>"Name: "+ t(0)).collect().foreach(println)
这将产生类似如下输出
15/12/1613:29:19 INFO DAGScheduler:Job9 finished: collect at :39, took 0.251161 s
15/12/1613:29:19 INFO YarnHistoryService:About to POST entity application_1450213405513_0012 with10 events to timeline service http://green3:8188/ws/v1/timeline/
Name:Michael
Name:Andy
Name:Justin
运行Hive实例
下面的例子就是在Hive目录下读取和写入HDFS
1.在yarn群中启动spark shell
2. su hdfs
./bin/spark-shell –num-executors 2–executor-memory 512m–master yarn-client
3.创建Hive环境
scala> val hiveContext =new org.apache.spark.sql.hive.HiveContext(sc)
你将看到类似如下的输出
…
hiveContext: org.apache.spark.sql.hive.HiveContext= org.apache.spark.sql.hive.HiveContext@7d9b2e8d
4.创建一个Hive表格
scala> hiveContext.sql("CREATE TABLE IF NOT EXISTS TestTable (key INT, value STRING)")
你将看到类似如下的输出
…
15/12/1613:36:12 INFO PerfLogger:</PERFLOG method=Driver.run
start=1450290971011end=1450290972561 duration=1550
from=org.apache.hadoop.hive.ql.Driver>
res8: org.apache.spark.sql.DataFrame=[result:string]
5.把KV值数据导入表格中
scala> hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTOTABLE TestTable")
6.调用Hive collect_list UDF:
scala> hiveContext.sql("from TestTable SELECT key, collect_list(value) group by key order bykey").collect.foreach(println)
读写ORC文件
Hortonworks在社区中的工作是为Spark带来全面的ORC支持。最近我们在博客中讲述在spark中使用ORC,可以在博客中看到所有ORC的例子。
Accessing the Spark SQL Thrift Server for JDBC/ODBC Access
在这个技术预览中,Spark SQL的Thrift Server为JDBC访问Spark SQL提供了权限。
1.作为root用户,建立一个日志目录,并让spark用户成为所有者
2. mkdir logs
chown spark:hadoop logs
3. 开始Thrift Server
从JAVA_HOME中开始Spark SQL Thrift Server。指定Thrift JDBC Server的端口值 (默认值是10015)
-
su spark
./sbin/start-thriftserver.sh –master yarn-client –executor-memory 512m–hiveconf hive.server2.thrift.port=10015
5.通过Beeline连接到Thrift Server
从SPARK_HOME中启动Beeline
-
su spark
7. cd $SPARK_HOME
./bin/beeline
8.连接到Thrift Server并发布SQL指令
在Beeline中指定如下指令,把<hostname>替换为你启动的Spark Thrift Server的主机名:
beeline>!connect jdbc:hive2://<hostname>:10015
提示:
这个例子没有启用安全功能,所以必须设定用户名和密码。
在Sandbox环境中或许需要等待几秒钟,在Sandbox环境中等待10-15秒后,尝试“show tables”命令。
15/12/1613:43:02 INFO HiveConnection:Willtry to open client
transport with JDBC Uri: jdbc:hive2://green4:10015
Connected to:Spark SQL (version 1.6.0)
Driver:SparkProjectCore(version 1.6.0.2.3.4.1-4)
Transaction isolation: TRANSACTION_REPEATABLE_READ
1: jdbc:hive2://green4:10015> show tables;
+————–+————–+–+
| tableName | isTemporary |
+————–+————–+–+
| testtable |false |
| testtable16 |false |
+————–+————–+–+
2 rows selected (0.893 seconds)
1: jdbc:hive2://green4:10015>
9.停止Thrift Server:
./sbin/stop-thriftserver.sh
运行SparkR
在你运行SparkR之前,确保R被安装在所有节点上。关于怎么在CentOS上安装R的信息可以看http://www.jason-french.com/blog/2013/03/11/installing-r-in-linux/。确保设置好JAVA_HOME。
1.运行SparkR
2. su spark
3. cd $SPARK_HOME
./bin/sparkR
这将会显示如下类似的输出
Welcome to
____ __
/ __/__ ___ _____//__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.0
/_/
Spark context isavailable as sc, SQL context is available as sqlContext
>
4.在你的R提示符中,创造一个DataFrame并把前几行列出来。
-
sqlContext <- sparkRSQL.init(sc)
6. df <- createDataFrame(sqlContext, faithful)
head(df)
这将显示如下的类似输出:
…
eruptions waiting
1 3.600 79
2 1.800 54
3 3.333 74
4 2.283 62
5 4.533 85
6 2.883 55
7.阅读“people”DataFrame:
-
people <- read.df(sqlContext,"people.json","json")
head(people)
这将显示如下的类似输出:
age name
1 NA Michael
2 30 Andy
3 19 Justin
想要了解更多SparkR的例子可以访问 https://spark.apache.org/docs/latest/sparkr.html.
退出R
> quit()
使用DataSet API
Spark DatasetAPI把现有最好的RDD和Data Frames组合在一起,并且能够在现有的JVM 类型上直接运行安全类型和用户功能。
1.作为spark用户,启动spark shell
2. cd $SPARK_HOME
3. su spark
./bin/spark-shell –num-executors 2–executor-memory 512m–master yarn-client
4.在spark shell中,输入如下代码;
-
scala>
6. val ds =Seq(1,2,3).toDS()
7. ds.map(_ +1).collect()// Returns:Array(2, 3, 4)
8.
9. // Encoders arealso created for case classes.
10.caseclassPerson(name:String, age:Long)
11.val ds =Seq(Person("Andy",32)).toDS()
12.
13.// DataFrames canbe converted to a Dataset by providing a class. Mapping will be done by name.
14.val path ="people.json"
val people = sqlContext.read.json(path).as[Person]
运行机器学习的spark应用
为了优化MLlib性能,就需要安装NETLIB-Java本机库。如果本机库在运行时不可用,您会看到一条警告消息并且一个纯粹的JVM安装启用将被替代使用。
如果想在Python中使用MLlib,你需要安装NumPy1.4版本或者更新版本。
想了解更多Spark ML的例子可以访问 http://spark.apache.org/docs/latest/mllib-guide.html
China Hadoop大数据研究网:
http://chinahadoop.com/
中国hadoop技术峰会2016北京站报名参会网站:
http://www.chinahadoop.com/signup.php(也可点击阅读原文来了解详情)
门票价格表
微信名:
HadoopSummit
微信ID:
hadoopinchina
中国Hadoop技术峰会是亚太地区举办最早、规模最大、影响力最广阔的大数据盛会。
Chinahadoop.com是China Hadoop Summit的内容网站。
HadoopSummit是Chinahadoop.com的微信发布平台。