Spark On K8s
Running Spark applications on Kubernetes.
Preparation
- Build the container image with the bin/docker-image-tool.sh script shipped inside the Spark distribution archive (see the sketch after this list); run the script without arguments for its full help output.
- Bundle the required third-party jars into the image, including:
  commons-codec-1.10.jar, commons-logging-1.2.jar, gson-2.2.4.jar, httpclient-4.4.1.jar, httpcore-4.4.1.jar, jdom-1.1.jar, okhttp-3.9.1.jar, okio-1.13.0.jar
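A minimal sketch of building and pushing the images, assuming the registry and tag used throughout this page (in 2.4.x a single build pass produces the spark, spark-py and spark-r images):

# Run from the root of the unpacked Spark 2.4.3 distribution.
./bin/docker-image-tool.sh -r registry-vpc.cn-hangzhou.aliyuncs.com/mc-docker -t v2.4.3 build
./bin/docker-image-tool.sh -r registry-vpc.cn-hangzhou.aliyuncs.com/mc-docker -t v2.4.3 push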
Running examples from the CLI
Once the image with the required packages has been built and pushed to the image registry, you can run the example applications bundled inside it.
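Submitting also requires the image pull secret and service account referenced by the commands below. A sketch of creating them, assuming the default namespace and the aliyun-key / spark names used in the submit commands:

kubectl create secret docker-registry aliyun-key \
  --docker-server=registry-vpc.cn-hangzhou.aliyuncs.com \
  --docker-username=<username> --docker-password=<password>
kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role --clusterrole=edit \
  --serviceaccount=default:spark --namespace=default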
Running the Java example
bin/spark-submit \
  --master k8s://https://172.16.0.51:6443 \
  --deploy-mode cluster \
  --name spark-pi-test \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.container.image=registry-vpc.cn-hangzhou.aliyuncs.com/mc-docker/spark:v2.4.3 \
  --conf spark.kubernetes.container.image.pullSecrets=aliyun-key \
  --conf spark.kubernetes.driver.pod.name=spark-pi-test-driver \
  --conf spark.kubernetes.driver.label.app=spark-pi-test \
  --conf spark.kubernetes.executor.label.app=spark-pi-test \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar 1000
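To follow the run, watch the pods and tail the driver log; the pod name and app label come from the conf values above (SparkPi prints its "Pi is roughly ..." result to the driver log):

kubectl get pods -l app=spark-pi-test   # driver and executors
kubectl logs -f spark-pi-test-driver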
Running the Python example
bin/spark-submit \
  --master k8s://https://172.16.0.51:6443 \
  --deploy-mode cluster \
  --name spark-pi-test \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.container.image=registry-vpc.cn-hangzhou.aliyuncs.com/mc-docker/spark-py:v2.4.3 \
  --conf spark.kubernetes.container.image.pullSecrets=aliyun-key \
  --conf spark.kubernetes.driver.pod.name=spark-pi-test-driver \
  --conf spark.kubernetes.driver.label.app=spark-pi-test \
  --conf spark.kubernetes.executor.label.app=spark-pi-test \
  local:///opt/spark/examples/src/main/python/pi.py 1000
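After the application completes, the driver pod remains in Completed state so its logs stay available; delete it when it is no longer needed:

kubectl delete pod spark-pi-test-driver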
Code examples
Java
OSS example
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OssExample {

    private static final Logger LOG = LoggerFactory.getLogger(OssExample.class);

    public static void main(String[] args) {
        // Jars required by the OSS FileSystem implementation; they must exist
        // at this path inside the image (see the preparation section).
        List<String> jarList = new ArrayList<>();
        jarList.add("emr-core-1.6.0.jar");
        jarList.add("aliyun-sdk-oss-3.0.0.jar");
        jarList.add("commons-codec-1.10.jar");
        jarList.add("jdom-1.1.jar");
        jarList.add("commons-logging-1.2.jar");
        jarList.add("httpclient-4.4.1.jar");
        jarList.add("httpcore-4.4.1.jar");
        String ossDepPath = jarList.stream()
                .map(s -> "/opt/spark/extraJars/jars/2.4.3/" + s)
                .collect(Collectors.joining(","));
        // SparkSession implements Closeable, so try-with-resources stops it for us.
        try (SparkSession spark = SparkSession.builder()
                .config("spark.hadoop.fs.oss.core.dependency.path", ossDepPath)
                .config("spark.hadoop.fs.oss.accessKeyId", "xxx")
                .config("spark.hadoop.fs.oss.accessKeySecret", "xxx")
                .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-internal.aliyuncs.com")
                .config("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
                .getOrCreate()) {
            JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
            JavaRDD<String> rdd = jsc.textFile("oss://mc-spark-test/nginx/access.log", 10);
            LOG.info("Read {} lines.", rdd.count());
            LOG.info("Done.");
        } catch (Exception e) {
            LOG.error("Operation failed.", e);
        }
    }
}
Elasticsearch example
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.math.RandomUtils;  // commons-lang 2.x
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EsExample {

    private static final Logger LOG = LoggerFactory.getLogger(EsExample.class);

    public static void main(String[] args) {
        String esNode = "internal-es.mucang.me";
        String esPort = "80";
        SparkSession spark = SparkSession.builder()
                .config("es.resource.write", "spark-test-es2/doc")
                .config("es.index.auto.create", "true")
                .config("es.nodes", esNode)
                .config("es.port", esPort)
                .config("es.nodes.wan.only", true)
                .config("es.net.http.auth.user", "xxx")
                .config("es.net.http.auth.pass", "xxx")
                .getOrCreate();
        try {
            int size = 10000;
            JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
            // Generate some random documents to index.
            List<Map<String, Object>> dataList = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                Map<String, Object> data = new HashMap<>();
                data.put("id", i);
                data.put("name", "name" + i);
                data.put("age", RandomUtils.nextInt(40));
                dataList.add(data);
            }
            JavaRDD<Map<String, Object>> rdd = jsc.parallelize(dataList);
            JavaEsSpark.saveToEs(rdd, "spark-test-es2/doc");
            LOG.info("Successfully wrote {} documents to ES [{}:{}].", size, esNode, esPort);
        } catch (Exception e) {
            LOG.error("Execution failed:", e);
        } finally {
            spark.stop();
        }
    }
}
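To spot-check the write, a quick document count against the index (a sketch, assuming the cluster is reachable over plain HTTP with the same credentials):

curl -u xxx:xxx "http://internal-es.mucang.me:80/spark-test-es2/_count"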
Maven configuration
Use the maven-shade-plugin to bundle the Elasticsearch and Aliyun OSS client dependencies into the application jar:
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.3</version>
  <executions>
    <!-- Run shade goal on package phase -->
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <artifactSet>
          <includes>
            <include>org.elasticsearch:*</include>
            <include>com.aliyun.emr:emr-core</include>
            <include>com.aliyun.oss:aliyun-sdk-oss</include>
          </includes>
        </artifactSet>
        <transformers>
          <transformer
              implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>org.company.project.Main</mainClass>
          </transformer>
        </transformers>
        <minimizeJar>false</minimizeJar>
      </configuration>
    </execution>
  </executions>
</plugin>
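Build the shaded jar, bake it into the Spark image, and submit it with a local:// URI. The jar path below is hypothetical; the main class matches the mainClass declared in the shade configuration above:

mvn clean package
bin/spark-submit \
  --master k8s://https://172.16.0.51:6443 \
  --deploy-mode cluster \
  --name spark-oss-test \
  --class org.company.project.Main \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.container.image=registry-vpc.cn-hangzhou.aliyuncs.com/mc-docker/spark:v2.4.3 \
  --conf spark.kubernetes.container.image.pullSecrets=aliyun-key \
  local:///opt/spark/jars/your-app.jar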
Python
OSS example
from pyspark.sql import SparkSession


def main():
    # Jars required by the OSS FileSystem implementation; they must exist
    # at this path inside the image (see the preparation section).
    jar_list = ["emr-core-1.6.0.jar", "aliyun-sdk-oss-3.0.0.jar", "commons-codec-1.10.jar", "jdom-1.1.jar",
                "commons-logging-1.2.jar", "httpclient-4.4.1.jar", "httpcore-4.4.1.jar"]
    jar_list = list(map(lambda v: "/opt/spark/extraJars/jars/2.4.3/" + v, jar_list))
    oss_dep_path = ",".join(jar_list)
    spark_session = None
    try:
        spark_session = SparkSession.builder \
            .config("spark.hadoop.fs.oss.core.dependency.path", oss_dep_path) \
            .config("spark.hadoop.fs.oss.accessKeyId", "") \
            .config("spark.hadoop.fs.oss.accessKeySecret", "") \
            .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-internal.aliyuncs.com") \
            .config("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem") \
            .getOrCreate()
        sc = spark_session.sparkContext
        rdd = sc.textFile("oss://mc-spark-test/nginx/access.log", 10)
        print("Read %d lines" % rdd.count())
        print("Hello Spark")
    except Exception as e:
        print("Error: %s" % e)
    finally:
        if spark_session is not None:
            spark_session.stop()


if __name__ == '__main__':
    main()
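Submitting the script mirrors the Python CLI example above, assuming it has been added to the spark-py image (the script path below is hypothetical):

bin/spark-submit \
  --master k8s://https://172.16.0.51:6443 \
  --deploy-mode cluster \
  --name spark-oss-py-test \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.container.image=registry-vpc.cn-hangzhou.aliyuncs.com/mc-docker/spark-py:v2.4.3 \
  --conf spark.kubernetes.container.image.pullSecrets=aliyun-key \
  local:///opt/spark/work-dir/oss_example.py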