spark初探

这边的项目要开始做spark的优化,我得熟悉一下spark的使用先。
先在github上找中文文档做参考(原版读的太麻烦->https://github.com/apachecn/spark-doc-zh

spark是一个集群计算系统。很多时候和hadoop一起使用。它是一个数据处理工具,但没有提供分布式数据存储的功能。hadoop有存储功能,所以它们经常搭配使用。当然,我们学过的mapreduce也是Hadoop的处理能力,hadoop不必要非拉上spark。
但是spark的速度比mapreduce快很多。

它的主要编程接口是一个抽象的dataset,以前的版本是rdd,但文档并不推荐。

入门的时候先用shell学习,shell 可以用Scala python 和java来写。
先建立一个dataset,比如文本文件。
对dataset来说有两种操作,一种action是对它计算并返回值,一种是transform创建新数据集。
在我的理解里action就是很多的api,给出返回值:
http://spark.apachecn.org/#/api/scala/index.html?id=org.apache.spark.sql.dataset
同时,transform可以用filter操作完成。

这两种操作可以同时使用,类似于管道

textFile.filter(line => line.contains(“Spark”)).count() // How many lines contain “Spark”?
它也可以用mapreduce做更复杂的操作。mapreduce课上学过就不过多赘述了。我的理解大概是map对零散的数据提取特征,reduce对提取后的特征进行归纳。中间搭桥的特征就比如说字典的key value那样。

官方文档的举例:

Spark 可以很容易实现 MapReduce:
scala> val wordCounts = textFile.flatMap(line => line.split(“ “)).groupByKey(identity).count()
scala> wordCounts.collect()
在这里, 我们调用了 flatMap 以 transform 一个 lines 的 Dataset 为一个 words 的 Dataset, 然后结合 groupByKey 和 count 来计算文件中每个单词的 counts 作为一个 (String, Long) 的 Dataset pairs。

这个简单的wordcounts就可以拿来做分析了,但学习不能半途而废,我先把入门看完。

用api进行实验只是测试用,大部分独立的应用程序不能这么做。所以需要学习怎么做独立的应用。文档给了一个很好的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;

public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
Dataset<String> logData = spark.read.textFile(logFile).cache();

long numAs = logData.filter(s -> s.contains("a")).count();
long numBs = logData.filter(s -> s.contains("b")).count();

System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

spark.stop();
}
}

这个程序计算Spark README文档中包含字母’a’和字母’b’的行数。

这里需要用到maven。
只需要定义一个pom.xml,然后把源码放到默认的目录,Maven帮我们处理其他事
使用Maven可以进行项目高度自动化构建,依赖管理(这是使用Maven最大的好处),仓库管理。
简称为mvn,到官网安装即可。注意要配置环境变量。
参见:https://www.cnblogs.com/freeweb/p/5241013.html

使用Maven来编译成一个jar应用程序。
在这里我花了接近半天的时间,因为这里确实有很多坑。这里先保存一下一个能用的spark的依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
/* pom.xml*/
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>cn.com</groupId>
<artifactId>test</artifactId>
<version>1.0-SNAPSHOT</version>


<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11.8</scala.version>
<spark.version>2.1.0</spark.version>
<hadoop.version>2.6.5</hadoop.version>
<jackson.version>2.6.2</jackson.version>
<encoding>UTF-8</encoding>
</properties>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.2.2</version>
</dependency>


<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>

<!-- json -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20090211</version>
</dependency>
<!--jackson json -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.1.41</version>
</dependency>
</dependencies>

<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

接下来踩的很多坑都和版本不适配有关。尽量还是用ide,比如idea这样的工具进行操作,会回避掉一些坑。
idea有学生优惠,可以免费使用。在linux上下好之后,要进行和maven的联动,这方面有很多博客内容都写了,我就不赘述了,但是一定要记得改仓库地点,conf里的setting文件地址,最好把镜像改成阿里云。

踩过无数个坑(只会在第一次使用时踩的坑),就可以用maven得到一个jar文件。在该目录下运行spark的submit脚本,就可以执行该jar文件。
maven打包之前最好先clean,然后再package。

YOUR_SPARK_HOME/bin/spark-submit \ –class “SimpleApp” \ –master local[4] \ target/simple-project-1.0.jar

注意,这里还有一个坑
这样出来的jar文件是没有办法用java -jar的方式运行的,因为它没有指定主类,需要修改pom文件。
如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
...
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>**fully.qualified.MainClass**</mainClass>
</manifest>
</archive>
</configuration>
...
</plugin>
</plugins>
</build>

这里是在bulid里面的plugin中插入中的内容,注意mainclass中fully.qualified.MainClass
要注意修改为mainclass的class路径,minifest.mf中有。