on
记配置n遍spark多机分布式环境
背景
最近由于论文的关系,设计的算法需要在分布式环境下,测试算法的通信时间通信代价,于是尝试配置了多台机器的分布式环境。由于配置过程较为复杂,其中也遇到许许多多问题,由于各式各样的因素,不得不一直转换不同的环境,完成机器的配置。虽然由于水平不足,犯了许多不必要的配置错误,有的问题看起来比较愚蠢,但为了之后避免踩入相同的坑,也就将这一路以来,不断配置更新的过程写成文章,以方便查找。
配置20遍
最初使用的平台是人大校级计算平台,在这个平台上,可以申请一定数量的机器,然后以科研结果作为经费抵扣。使用此平台的原因是之前有前辈在上面配置过 Spark 环境,而我有一定机会可以直接利用他配置好的成果,然而事情并没有像我想象的那么简单。此时出现了两个主要的问题,其一是该环境并没有真正配置yarn,并不能做到真正的并行;其次实际上此平台的集群是在一个大机器上分割出的小虚拟机组成集群,这样的集群实际上的通信代价是非常低的,这无法体现出我们算法的优势,因此我不得不寻找其他平台。之后就在组里先找了6台服务器,直接利用这6台服务器搭建一个集群,虽然机器数目少一点,但平摊下来,每个机器都比原来的配置要更好。当然事情不会那么顺利,由于我实验操作的数据量极大,我不断试探服务器计算能力的上限,最终这些服务器也难堪重负,纷纷内存耗尽、磁盘耗尽,引发了一系列不好的连锁反应,究其原因是我没有做docker环境隔离(要学的东西还很多)。由于当时论文ddl在即,让我只能在夜间跑代码,完全是不可能完成目标的,因此我不得不使用阿里云下的服务器。之后就搞了16台阿里云服务器,并在上面配置真·分布式环境,此时我已经有了十次左右配置环境的经验,但哪怕如此,又经历了经费不足、神秘bug等等意想不到的问题,但我最终还是勉强完成了论文,初次投稿当然还是被拒了。之后改投论文的过程中,吸取了服务器可能很容易崩,随时可能换机器的现实,尽可能地将许多作业改为了批处理,终于又配置了很多次,最终完成了实验和论文。
分布式环境的成分
HDFS
虽然说使用 Spark without Hadooop 从一定程度上配置或许会简单一点,但为了比较清晰地感受分布式环境,并更好地存储数据,我还是采用了 Hadoop 与 Spark 分开配置的策略,这里使用的 Hadoop 版本为 3.3.1。
(HDFS 其实就是一个分布式的文件管理系统,将数据分布式的存储在不同的机器上,一方面可以存的更多,一方面也是可以使得处理数据更快,数据直接分布在不同机器上,也就省去了从主机向其他机器发送数据的通信过程。)
SPARK
这里使用Spark的版本是3.1.2 (Spark 分布式计算的环境,利用这样已有的环境就不需要自己去写通信、底层调度,也不必担心各种死锁的问题。)
由于我是不太会 Scala 的(但是任意一种语言,稍微看看基础代码我还能做到),为了方便上手,这里使用的是 PySpark, PySpark 是python环境下提供的spark接口,这样只需要掌握好启动命令,再学一些简单的语法,就可以将spark 调动起来。
其他部分
JDK 自然是必不可少的,我这里用的是1.8.0,scala也需要装一下,我用的是2.12.11。
多机
多机免密
要想真正把多台机器运行起来,首先需要的是保证多台机器之间不再需要手动输入密码。虽然两台机器配置免密登陆很简单,但多台机器要保证免密登陆,靠手动收发文件就变得十分磨人,16台机器就要操作 16^2 次。这里网上有各种教程,我只提一下核心思路,不想放太多代码。核心思想是先在文件中按照一定格式存放好服务器的信息(分为主节点和从节点,为了方便,一般可以让所有机器之间互相通信),先尝试能否批量访问每个服务器,再批量生成证书,并将证书分发。 利用以下命令,清除 known_hosts 文件
ssh-keygen -f '~/.ssh/known_hosts' -R ip地址
利用以下命令,将证书分发到服务器:
ssh-copy-id -i ~/.ssh/id_rsa.pub -p 端口号 用户名@ip地址
利用 fabric 包的 Connection 函数,创建 host ,连接服务器。
host = Connection(ip地址, port = 端口, user=用户名,connect_kwargs={'password':密码,'timeout':10})
在创建了 host 以后,即可利用 host 执行清除 known_hosts 的命令、分发证书的命令,以及其他各式各样的命令。 下面给出例子:
result = host.run(需要执行的命令,pty=True,watchers=[passwd,yes],hide=True,warn=True,timeout=60)
这里的pty是指伪终端,不设置的话,有的命令会失败;watchers解决的是,当遇到不同的反馈时,需要作出的反馈,例子如下:
passwd = Responder(
pattern=r'password',
response=node.passwd + '\n'
)
yes = Responder(
pattern = r'(yes/no)',
response = 'yes\n'
)
利用正则表达式,根据不同条件下返回的字符串,输入不同的应答,这样就可以做到自动化应答。
这里也是参考不少网络上的教程,遇到问题,只需要把握原理,基本就能解决。当实在解决不了时,可以切换到手动模式,重新进行调试。
基于一台机器的多机收发
这里的多机收发目的是希望批量将机器的文件收发到不同机器上。利用上一节的host,只能创建一个虚拟的终端,不太适合收发文件。从一个服务器往其他服务器传输 Java、Spark、hosts等文件,并不需要远程登录其他服务器,执行传输文件命令即可。 其次,可以从每个服务器中,收集它们的信息,并在同一个机器中整合,在后面提到的 bug,正是需要从每个机器中收集信息。
如果配置好了免密登陆,那么可以直接利用os命令,直接执行本机的命令,例如文件接收:
os.system('scp -P 端口号 用户名@ip地址:目标文件地址 本地目标地址')
如果没有配置好密码,可以使用 pexpect 包,发密码并执行命令,例如从本地往服务器中收发文件,例子如下:
scp_crt_command = 'scp -P 端口号 用户名@ip地址:目标文件地址 本地目标地址'
child = pexpect.spawn(scp_crt_command)
child.expect(r'password')
child.sendline(node.passwd)
child.read()
这一节的命令胜在简单,在不想写太多代码时,可以利用尽可能短的方式,完成代码的收发工作。当然,若是想在每台机器上执行一定的命令,还是需要采用上一小节的方式。
分布式环境的配置
HDFS + Spark 的配置文件,只需要完成一次,并保存好一份副本,即可在服务器失效的情况下,快速再完成配置。这里的配置也可以参考许多博客中说的,多机配置 HDFS+Spark 环境,最好需要配置yarn,以保证分布式的运行。其中有太多的细节,基本上也是根据官网提供的流程走。
首先是配置.bashrc文件,将scala、hdfs、spark、jre等的路径配置好,我这里给出配置的例子:
export JAVA_HOME=/root/spark/jdk1.8.0_301
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH
export PATH=$PATH:/root/spark/jdk1.8.0_301/bin
export PATH=$PATH:/root/spark/scala-2.12.11/bin
export HADOOP_HOME=/root/spark/hadoop-3.3.1
export PATH=$PATH:${HADOOP_HOME}/bin
export SPARK_HOME=/root/spark/spark-3.1.2
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
利用文件收发可以批量将这一段代码以文件的形式发送到每个机器上,再使用免密登陆部分的代码,使每个机器将这些代码写入到每个机器的 .bashrc 文件中。Scala和Java的安装只需要下好包,并设置.bashrc即可。HDFS和Spark则需要改一些配置文件。
HDFS
HDFS 的配置主要修改 etc/hadoop/core-site.xml、etc/hadoop/hdfs-site.xml,要配置YARN的时候需要配置etc/hadoop/mapred-site.xml、etc/hadoop/yarn-site.xml文件,配置的内容可以参考 hadoop 的官网。 这里给出我的配置方案: 首先配置 worker 文件,我有16台机器,则配置了16个worker:
#workers
Slave01
# 这里代表中间的13个节点,写作workerXX
Slave15
要想顺利使用 Master-Slave 或者是 Main-Worker,亦或者是其他主从名字,都需要在将 ip 地址于设置的名字一一对应,并写入到/etc/hosts 文件中。
# core-site.xml
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>/mnt/tmp</value>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://Master:9000</value>
</property>
</configuration>
# hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<!--value>Master:50090</value-->
<value>Master:50090</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/mnt/tmp/hadoop/dfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/mnt/tmp/hadoop/dfs/data</value>
</property>
<property>
<name>dfs.wenhdf.enabled</name>
<value>true</value>
</property>
</configuration>
#mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>Master:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>Master:19888</value>
</property>
</configuration>
这里可以利用其中一个节点分担主节点的管理压力,也可以都设置为 Master
#yarn-site.xml
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>Slave01</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>Slave01:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>Slave01:8030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>Slave01:8031</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>Slave01:8033</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>Slave01:8088</value>
</property>
</configuration>
只有真正配置好yarn,在执行任务的过程中,才能真正调度好每台机器的CPU,否则计算只是本地模式,无法做到真正测试分布式的目的。
在完全配置好Hadoop之后,将相同的内容分发到其他机器上,然后再执行 sbin/start-all.sh 将整个服务调动起来,也可以通过执行每个部分的start文件,开始部分服务。但要保证HDFS正确的执行,还需要重要的一步:
bin/hdfs namenode -format
这一步的作用是将namenode初始化,当服务不小心宕掉后,或是服务器关机多时,有可能导致HDFS无法重启服务时,也需要删除Hadoop中的缓存文件,再重新运行此命令,使服务重启。至于Hadoop临时文件的位置,则设置在了 core-site.xml 的属性 hadoop.tmp.dir 里。而我为了保证当缓存过多时,不影响服务器的正常执行,将临时目录挂在了其他的硬盘上——/mnt/tmp,这里也需要因人而异。
最后,可以利用jps命令查看各部分服务是否启动成功,并依据没启动成功的部分,查找相应位置的配置。总的来说,HDFS的配置的坑并不算多,依照网上的教程来,难度算是比较适中的。
Spark
Spark的配置,其实也不算难,但由于它在最上层,下层出错之后,往往难以定位,总是认为是Spark配置的问题,这会导致很难查找错误。Spark的配置主要关注于 conf/spark-env.sh 、conf/workers, conf/workers没太多好说的,跟Hadoop配置的一样即可。我给出我配置 spark-env.sh 的结果:
export HADOOP_HOME=/root/spark/hadoop-3.3.1
export HADOOP_CONF_DIR=/root/spark/hadoop-3.3.1/etc/hadoop
export JAVA_HOME=/root/spark/jdk1.8.0_301
export SCALA_HOME=/root/spark/scala-2.12.11
export SCALA_LIBRARY_PATH=${SCALA_HOME}/lib
export SPARK_WORKING_MEMORY=60g
export SPARK_MASTER_IP=Master
export SPARK_DIST_CLASSPATH=$(/root/spark/hadoop-3.3.1/bin/hadoop classpath)
export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native:$LD_LIBRARY_PATH
export SPARK_MASTER_HOST=Master
export SPARK_MASTER_PORT=17077
export SPARK_MASTER_WEBUI_PORT=7078
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=60g
export SPARK_EXECUTOR_MEMORY=50g
export SPARK_WORKER_INSTANCE=1
export SPARK_LOCAL_DIRS=/mnt/tmp
有的部分,我自己也不是很清楚其背后的含义,总之是参考了多个教程整合所得。容易出问题的地方在于,有时候会跑空内存,就需要在这里改内存。但根据我自己实战的经验,往往内存不足是因为没做到真正的分布式,代码只在主节点执行,直到内存被跑空,需要看yarn配置是否成功。
在所有的一切设置好后,再将spark目录分发到每个机器的相同目录下,即可使用 sbin/start-all.sh 开启整个spark 服务。
服务器配置
在完成单机的配置之后,只需要分别把同样的内容,分发到不同机器上,再开启服务即可。所以最前面远程调用每个机器、分发文件代码都是保证能高效配置更多机器的基础。一台一台的修改、发送,有可能一天都配置不好集群,但利用批量化的处理,一个小时内即可完成复杂的配置任务,关键是留好副本。
服务器的配置除了分发jdk、scala、hadoop、spark之外,还需要配置免密、修改 etc/hosts 、.bashrc,由于我使用了 Cython 代码,还涉及到远程调用每个机器进行python 包的编译安装,使用 python, 最好将anaconda一起传输配置。
踩坑
坑一: 机器和集群的问题
除了机器本身访问网络异常等问题外,机器最好在同一个局域网下,这样利用内网IP进行配置和访问,就可以避免防火墙带来的干扰。其次是机器最好预先计算好运行的费用,赶稿期间,机器没钱了,可以是非常致命的打击!此外,最好不要在别人都用的服务器上,做太大规模的计算,频繁地读写,很容易把磁盘写满,也提醒了这类型的分布式,其实更关键的部分在于磁盘大小,而不在于GPU之类的。
坑二: 设备名与IP
这是我在前两次集群中没有遇到的问题,当第三次在阿里云服务器上配置时,直接让我配置到怀疑人生。明明都是一套路子,但是等到调用Spark时,服务就是起不来。在我检查日志后,反复搜索,都没找到一个解决的方法。事实证明这种bug,可能并不会每次都出现。直到我读到这篇文章《spark远程调用的几个坑》 时,恍然大悟,从简短的语句中找到了解决之道。通常在每个终端中的开头都是 用户名@机器名
,但是如果hosts文件中,没有对应的ip地址,那么在其他服务器中,有可能就无法识别该机器是哪台机器。解决方法正是从这篇文章中找到,在每个服务器的hosts文件中,都加上:
机器名 IP地址
机器名的提取,我也是从每个hosts文件中得到的,再一次体现了,批量收发文件,和批量执行命令的重要性。
坑三: 漏步骤的执行
即使我配置了那么多遍服务,但在一次又一次的重复配置中发现,我还是可能遗漏步骤。比较常见的是遗漏执行 namenode 初始化,
bin/hdfs namenode -format
遗漏执行spark的启动命令,等等。
相信Spark的配置中,必定仍有坑是我没踩到的,更多的坑也可能出现在 Spark 的执行中,总之道路漫漫。
测试运行
这里给一些简单的测试例子查看配置是否成功。
HDFS
创建,查看文件夹;上传文件
hdfs dfs -mkdir test
hdfs dfs -ls test
hdfs dfs -put test_file test
Spark
指定 master 可以启动 Spark 的 不同模式,可以在 Spark 的终端测试 Spark 是否配置成功。
# 启动命令
bin/spark-shell --master spark://Master:17077
shell 内的测试脚本
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
val result: RDD[Int] = sc.makeRDD(Array[Int](1,2,3,4,5,6))
result.count()
Jupyter notebook
以下是经过我查找之后,试出来的利用 Jupyter notebook 运行 PySpark 的代码,有的地方可能不太完整,但可以参照类似的设置方式进行修改。
import os
memory = '200g'
pyspark_submit_args = ' --driver-memory ' + memory +' pyspark-shell'+' --num-executors 15 --executor-cores 8'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
import findspark
findspark.init()
from pyspark import SparkContext
# 真分布式模式
sc=SparkContext(master="spark://Master:17077",appName="test")
# 本地模式
# sc=SparkContext(appName="test")
sc._conf.set("spark.driver.maxResultSize", '50g')
小结
本文比较粗略记录了配置分布式 Spark 环境,但没有记录详细的 PySpark 代码。根据多次配置的经验,每次更换环境之后,都有可能出现新的问题,但总的来说,网络上都能找到解决方案。我将自己认为比较重要的部分进行了记录,将来如果还需要再基于 Spark 进行研究,只希望能利用此次记录快速再次进行配置。Spark 代码本身的书写并不算特别复杂,虽然我也并没完全掌握,但将来还需要对 Spark 进行更近一步研究时,会考虑再写一篇写 Spark 代码时,遇到的疑难杂症。