在K8s上执行Spark程序

准备

  • 使用Spark发行包(zip)内的bin/docker-image-tool.sh脚本构建Spark镜像,具体用法请参见该脚本的帮助信息。
  • 镜像内还需要额外包含以下第三方jar包:
    commons-codec-1.10.jar
    commons-logging-1.2.jar
    gson-2.2.4.jar
    httpclient-4.4.1.jar
    httpcore-4.4.1.jar
    jdom-1.1.jar
    okhttp-3.9.1.jar
    okio-1.13.0.jar
    

CLI运行案例

当镜像构建完成并推送到镜像仓库以后,就可以运行镜像内自带的示例应用了。

运行Java案例

# Submit the SparkPi example class from the examples jar inside the image,
# in cluster mode, to the Kubernetes API server given by --master.
# The trailing "1000" is passed to the application as its argument.
bin/spark-submit \
    --master k8s://https://172.16.0.51:6443 \
    --deploy-mode cluster \
    --class org.apache.spark.examples.SparkPi \
    --name spark-pi-test \
    --conf spark.kubernetes.container.image=registry-vpc.cn-hangzhou.aliyuncs.com/mc-docker/spark:v2.4.3 \
    --conf spark.kubernetes.container.image.pullSecrets=aliyun-key \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.driver.pod.name=spark-pi-test-driver \
    --conf spark.kubernetes.driver.label.app=spark-pi-test \
    --conf spark.kubernetes.executor.label.app=spark-pi-test \
    --conf spark.executor.instances=1 \
    local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar 1000

运行Python案例

# Submit the pi.py example script from the spark-py image, in cluster mode,
# to the Kubernetes API server given by --master.
# The trailing "1000" is passed to the script as its argument.
bin/spark-submit \
    --master k8s://https://172.16.0.51:6443 \
    --deploy-mode cluster \
    --name spark-pi-test \
    --conf spark.kubernetes.container.image=registry-vpc.cn-hangzhou.aliyuncs.com/mc-docker/spark-py:v2.4.3 \
    --conf spark.kubernetes.container.image.pullSecrets=aliyun-key \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.driver.pod.name=spark-pi-test-driver \
    --conf spark.kubernetes.driver.label.app=spark-pi-test \
    --conf spark.kubernetes.executor.label.app=spark-pi-test \
    local:///opt/spark/examples/src/main/python/pi.py 1000

代码案例

Java

OSS案例

/**
 * Reads a text file from Aliyun OSS through Spark and logs its line count.
 *
 * <p>The OSS connector's dependency jars are looked up under
 * {@code /opt/spark/extraJars/jars/2.4.3/} and handed to the connector as a
 * comma-separated list via {@code spark.hadoop.fs.oss.core.dependency.path}.
 * The session is opened in try-with-resources so it is closed even when the
 * job fails; any failure is logged rather than rethrown.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
        List<String> jarList = new ArrayList<>();
        jarList.add("emr-core-1.6.0.jar");
        jarList.add("aliyun-sdk-oss-3.0.0.jar");
        jarList.add("commons-codec-1.10.jar");
        jarList.add("jdom-1.1.jar");
        jarList.add("commons-logging-1.2.jar");
        jarList.add("httpclient-4.4.1.jar");
        jarList.add("httpcore-4.4.1.jar");

        // Prefix every jar name with the directory used for extra jars and
        // join them into the comma-separated form the connector setting takes.
        String ossDepPath = jarList.stream()
                .map(s -> "/opt/spark/extraJars/jars/2.4.3/" + s)
                .collect(Collectors.joining(","));

        // "xxx" values are placeholders for the real OSS credentials.
        try (SparkSession spark = SparkSession.builder()
                .config("spark.hadoop.fs.oss.core.dependency.path", ossDepPath)
                .config("spark.hadoop.fs.oss.accessKeyId", "xxx")
                .config("spark.hadoop.fs.oss.accessKeySecret", "xxx")
                .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-internal.aliyuncs.com")
                .config("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
                .getOrCreate()) {

            // The original snippet called an undeclared `sc`; the context has
            // to be taken from the session created above.
            RDD<String> rdd = spark.sparkContext()
                    .textFile("oss://mc-spark-test/nginx/access.log", 10);
            LOG.info("读取数据{}行。", rdd.count());

            LOG.info("完成操作了。");
        } catch (Exception e) {
            LOG.error("操作出错了。", e);
        }
    }

Elasticsearch案例

/**
 * Generates 10000 sample documents and writes them to the Elasticsearch
 * resource {@code spark-test-es2/doc} through a Spark RDD.
 *
 * <p>Failures are logged, not rethrown, and the session is always stopped.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
        final String esNode = "internal-es.mucang.me";
        final String esPort = "80";
        final int size = 10000;

        // "xxx" values are placeholders for the real Elasticsearch credentials.
        final SparkSession spark = SparkSession.builder()
                .config("es.resource.write", "spark-test-es2/doc")
                .config("es.index.auto.create", "true")
                .config("es.nodes", esNode)
                .config("es.port", esPort)
                .config("es.nodes.wan.only", true)
                .config("es.net.http.auth.user", "xxx")
                .config("es.net.http.auth.pass", "xxx")
                .getOrCreate();

        try {
            JavaSparkContext javaContext = new JavaSparkContext(spark.sparkContext());

            // Build the documents on the driver: an id, a derived name and a
            // random age below 40.
            List<Map<String, Object>> docs = new ArrayList<>();
            for (int id = 0; id < size; id++) {
                Map<String, Object> doc = new HashMap<>();
                doc.put("id", id);
                doc.put("name", "name" + id);
                doc.put("age", RandomUtils.nextInt(40));
                docs.add(doc);
            }

            JavaRDD<Map<String, Object>> docRdd = javaContext.parallelize(docs);
            JavaEsSpark.saveToEs(docRdd, "spark-test-es2/doc");

            LOG.info("执行成功,写入{}条数据到ES:[{}:{}]。", size, esNode, esPort);
        } catch (Exception e) {
            LOG.error("执行错误:", e);
        } finally {
            spark.stop();
        }

    }

Maven配置

<!-- maven-shade-plugin: produces the shaded application jar during packaging. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.3</version>
    <executions>
        <!-- Run the shade goal in the package phase -->
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <!-- Only the artifacts matched below are bundled into the shaded jar:
                     the Elasticsearch artifacts, emr-core and the Aliyun OSS SDK.
                     NOTE(review): presumably everything else is expected to be
                     provided by the Spark image at runtime - confirm. -->
                <artifactSet>
                    <includes>
                        <include>org.elasticsearch:*</include>
                        <include>com.aliyun.emr:emr-core</include>
                        <include>com.aliyun.oss:aliyun-sdk-oss</include>
                    </includes>
                </artifactSet>
                <transformers>
                    <!-- Sets the main class in the jar manifest.
                         NOTE(review): org.company.project.Main looks like a placeholder;
                         replace it with the application's real entry class. -->
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>org.company.project.Main</mainClass>
                    </transformer>
                </transformers>
                <!-- Do not strip unused classes from the bundled artifacts. -->
                <minimizeJar>false</minimizeJar>
            </configuration>
        </execution>
    </executions>
</plugin>

Python

OSS案例

from pyspark.sql import SparkSession


def main():
    """Read a log file from Aliyun OSS with Spark and print its line count.

    The OSS connector's dependency jars are looked up under
    ``/opt/spark/extraJars/jars/2.4.3/`` and passed to the connector as a
    comma-separated list. Any failure is printed rather than raised, and the
    session is stopped afterwards if it was created.
    """
    # The session is kept in a module-level name, as in the original sample.
    # It is reset to None first so that the ``finally`` clause below never
    # touches an unbound name (or a stale session) when creation fails.
    global spark_session
    spark_session = None

    jar_dir = "/opt/spark/extraJars/jars/2.4.3/"
    jar_names = ["emr-core-1.6.0.jar", "aliyun-sdk-oss-3.0.0.jar", "commons-codec-1.10.jar", "jdom-1.1.jar",
                 "commons-logging-1.2.jar", "httpclient-4.4.1.jar", "httpcore-4.4.1.jar"]
    oss_dep_path = ",".join(jar_dir + name for name in jar_names)

    try:
        # The empty accessKeyId / accessKeySecret values are placeholders for
        # the real OSS credentials.
        spark_session = SparkSession.builder \
            .config("spark.hadoop.fs.oss.core.dependency.path", oss_dep_path) \
            .config("spark.hadoop.fs.oss.accessKeyId", "") \
            .config("spark.hadoop.fs.oss.accessKeySecret", "") \
            .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-internal.aliyuncs.com") \
            .config("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem") \
            .getOrCreate()
        sc = spark_session.sparkContext
        rdd = sc.textFile("oss://mc-spark-test/nginx/access.log", 10)
        print("数据%d行" % rdd.count())
        print("Hello Spark")
    except Exception as e:
        print("错误:%s" % e)
    finally:
        # Only stop a session that was actually created; otherwise the
        # original error reported above would be replaced by a NameError.
        if spark_session is not None:
            spark_session.stop()


# Script entry point: run the sample only when the file is executed directly.
if __name__ == "__main__":
    main()