[Big Data] Spark Technology Overview

Scala

Scala Tutorial

  • Write code in a functional style
  • Every data type is an object; following any object with () invokes that object's apply method
  • List and right-associative operators: any operator ending in : (such as ::) is invoked on its right operand
  • Tuple
  • The functional mindset, in one simple rule: code that contains var variables is traditional imperative code, while code that uses only val variables is very likely functional. The key to learning functional programming is therefore writing code without vars (see the sketch below)
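A minimal sketch of these points in the Scala REPL:

val xs = List(1, 2, 3)
xs(0)                         // => 1, sugar for xs.apply(0)

val ys = 0 :: xs              // => List(0, 1, 2, 3); :: ends in ':' so this is xs.::(0)

val pair = ("spark", 2)       // a Tuple2[String, Int]
pair._1                       // => "spark"

val doubled = xs.map(_ * 2)   // functional style: vals only, no mutation => List(2, 4, 6)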

  • Object-oriented methods: a class method begins with def. Note that Scala method parameters are vals, not vars, so they cannot be reassigned inside the method body, as the example below shows
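For example, this sketch compiles as written, but uncommenting the marked line fails because the parameter n is a val:

class Counter(val start: Int) {
  def add(n: Int): Int = {
    // n = n + 1              // does not compile: "reassignment to val"
    start + n
  }
}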

  • object defines a singleton, which takes the place of Java's static members
  • Companion objects
  • Scala's shorthand operations on basic data types are really method calls on the object, i.e. operator syntax: infix, prefix, and postfix operators
  • The primary constructor is part of the class definition itself
  • Auxiliary constructors: in Scala, only the primary constructor can call the base-class constructor. This restriction has its advantages, keeping Scala constructors concise and consistent (see the sketch below)
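A minimal sketch combining a primary constructor, an auxiliary constructor, an infix operator, and a companion object (the Point class is made up for illustration):

class Point(val x: Int, val y: Int) {          // primary constructor
  def this(x: Int) = this(x, 0)                // auxiliary constructor: must delegate to the primary one
  def +(other: Point) = new Point(x + other.x, y + other.y)   // infix: p + q means p.+(q)
}

object Point {                                 // companion object: the home for 'static'-style members
  def apply(x: Int, y: Int) = new Point(x, y)  // lets callers write Point(1, 2)
}

val p = Point(1, 2) + new Point(3)             // a Point with x = 4, y = 2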

  • Implicit conversions
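A minimal sketch: an implicit class adds a times method to Int (RichTimes is a made-up name):

implicit class RichTimes(n: Int) {   // in compiled code this must be nested inside an object
  def times(body: => Unit): Unit = (1 to n).foreach(_ => body)
}

3 times { println("hello") }         // the compiler rewrites this as new RichTimes(3).times(...)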

  • Assignments have no return value: an assignment evaluates to Unit
  • for comprehensions as the iteration pattern
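Two quick illustrations:

var line = ""
// val x = (line = "text")   // x would have type Unit, not String: assignment produces no value

val evens = for (i <- 1 to 10 if i % 2 == 0) yield i   // => Vector(2, 4, 6, 8, 10)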

  • Exception handling: normally the finally block should only do cleanup and should not produce a result, but if a return statement is used inside finally, its value overrides whatever the try-catch produced
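A sketch of that override behavior:

def f(): Int = try { 1 } finally { return 2 }             // returns 2: the return in finally wins
def g(): Int = try { 1 } finally { println("cleanup") }   // returns 1: finally only cleans up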

Scala Installation

Append the following to /etc/profile:

export SCALA_HOME=/usr/local/scala-2.11.4
export PATH=$SCALA_HOME/bin:$PATH

Run the following commands to verify that the installation succeeded:

source /etc/profile
scala -version

Spark Internals

Spark Installation

Local Mode

Starting in local mode produced the following error:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/04/07 22:33:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/04/07 22:33:38 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
17/04/07 22:33:38 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
... (the same warning repeats for all 16 retry attempts) ...
17/04/07 22:33:38 ERROR SparkContext: Error initializing SparkContext.
java.net.BindException: Cannot assign requested address: Service 'sparkDriver' failed after 16 retries (starting from 0)! Consider explicitly setting the appropriate port for the service 'sparkDriver' (for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries.
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:433)
    at sun.nio.ch.Net.bind(Net.java:425)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
    at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:127)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:501)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1218)
    at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:506)
    at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:491)
    at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:965)
    at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:210)
    at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:353)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:455)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)
<console>:14: error: not found: value spark
       import spark.implicits._
              ^
<console>:14: error: not found: value spark
       import spark.sql
              ^
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
  • Solution

The IP address configured for the hostname in /etc/hosts did not match the machine's actual address; correcting the mapping resolves the bind failure.
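For example, if the machine's hostname is centos-hadoop1, /etc/hosts must map it to the interface IP that Spark actually binds to (the address below is made up):

192.168.1.100   centos-hadoop1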

Running a Test Case
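As a minimal smoke test once spark-shell starts cleanly, the built-in sc can be exercised directly:

val data = sc.parallelize(1 to 100)
data.map(_ * 2).reduce(_ + _)   // => 10100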

Spark Standalone Mode (Single Machine)

Hadoop Installation

Spark Installation

  • Configure spark-env.sh:
export JAVA_HOME=/opt/softwares/java/jdk1.8.0_131
export HADOOP_HOME=/opt/app/hadoop-2.6.0
export HADOOP_CONF_DIR=/opt/app/hadoop-2.6.0/etc/hadoop
export SCALA_HOME=/opt/softwares/scala/scala-2.12.4
export SPARK_HOME=/opt/app/spark-2.3.0-bin-hadoop2.6
export SPARK_MASTER_IP=centos-hadoop1
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8099
export SPARK_WORKER_CORES=3
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=10G
export SPARK_WORKER_WEBUI_PORT=8081
export SPARK_EXECUTOR_CORES=1
export SPARK_EXECUTOR_MEMORY=1G
#export SPARK_CLASSPATH=/opt/hadoop-lzo/current/hadoop-lzo.jar
#export SPARK_CLASSPATH=$SPARK_CLASSPATH:$CLASSPATH
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$HADOOP_HOME/lib/native

Starting the Cluster

Start the master node:

start-master.sh

Start a worker, pointing it at the master URL:

start-slave.sh spark://hostname:port

Submitting a Spark Application to the Cluster

The general template is:

./bin/spark-submit \
  --class <main-class> \
  --master spark://centos-hadoop1.comsince.com:7077 \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

A concrete example:

/opt/app/spark-2.3.0-bin-hadoop2.6/bin/spark-submit \
--class scala.example.SimpleApp \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
/opt/app/sbt/scala-app/simple-project_2.11-1.0.jar
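For reference, a minimal sketch of what the SimpleApp class in that jar might look like, following the standard Spark quick-start pattern (the README path is an assumption):

package scala.example

import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    // count lines containing "a" and "b" in a local file (path is an assumption)
    val logData = spark.read.textFile("/opt/app/spark-2.3.0-bin-hadoop2.6/README.md").cache()
    val numAs = logData.filter(_.contains("a")).count()
    val numBs = logData.filter(_.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}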

Troubleshooting

  • Caused by a version mismatch between Spark and the Scala used to compile the application:
Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
  at com.comsince.scala.spark.sql.DailyTop3Keyword$.main(DailyTop3Keyword.scala:25)
  at com.comsince.scala.spark.sql.DailyTop3Keyword.main(DailyTop3Keyword.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
18/03/19 18:15:02 INFO spark.SparkContext: Invoking stop() from shutdown hook
  • Summary: a stable combination at the time of writing was Scala 2.10.4 + Spark 1.5.1 + Kafka 0.8.2.2 + Hadoop 2.6.1. A Scala 2.10 project built with Maven must be compiled against Java 1.7, otherwise it fails with errors like the following (a build sketch follows the log):
/home/liaojinlong/Share/GitWorkPlace/spark/demo/src/main/java:-1: info: compiling
[INFO] Compiling 23 source files to /home/liaojinlong/Share/GitWorkPlace/spark/demo/target/classes at 1521515866533
[ERROR] error: error while loading CharSequence, class file '/home/liaojinlong/Share/jdk/jdk1.8.0_131/jre/lib/rt.jar(java/lang/CharSequence.class)' is broken
[INFO] (class java.lang.RuntimeException/bad constant pool tag 18 at byte 10)
[ERROR] error: error while loading AnnotatedElement, class file '/home/liaojinlong/Share/jdk/jdk1.8.0_131/jre/lib/rt.jar(java/lang/reflect/AnnotatedElement.class)' is broken
[INFO] (class java.lang.RuntimeException/bad constant pool tag 18 at byte 76)
[ERROR] error: error while loading Comparator, class file '/home/liaojinlong/Share/jdk/jdk1.8.0_131/jre/lib/rt.jar(java/util/Comparator.class)' is broken
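To avoid both problems, pin the build's Scala version to the one your Spark binaries were compiled against. A minimal build.sbt sketch for the simple-project jar used above (the versions are assumptions matching a Spark 2.3.0 distribution built for Scala 2.11):

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.8"   // must match the Scala version of the Spark distribution

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0" % "provided"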

Spark SQL

Spark Hive Support

  • Copy hive-site.xml into Spark's conf directory
  • Hive uses MySQL as its metastore backend, so copy the MySQL JDBC driver into Spark's lib directory
  • The complete spark-env.sh is as follows (a query sketch follows the listing):
export SPARK_CLASSPATH=/opt/app/spark-1.5.1-bin-hadoop2.4/lib/mysql-connector-java-5.1.27-bin.jar
export JAVA_HOME=/opt/softwares/java/jdk1.8.0_131
export HADOOP_HOME=/opt/app/hadoop-2.6.0
export HADOOP_CONF_DIR=/opt/app/hadoop-2.6.0/etc/hadoop
export SCALA_HOME=/opt/softwares/scala/scala-2.12.4
export SPARK_HOME=/opt/app/spark-1.5.1-bin-hadoop2.4
export HIVE_CONF_DIR=/opt/app/hive-0.13.1/conf
export SPARK_MASTER_IP=centos-hadoop1
export SPARK_WORKER_MEMORY=1G
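With this in place, Hive tables can be queried from spark-shell using the Spark 1.5.x API (the table name below is made up):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("SHOW TABLES").show()
hiveContext.sql("SELECT * FROM some_table LIMIT 10").show()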

Spark Streaming

Kafka

Kafka Streaming

Spark HBase Kerberos Issues
