文件管理 · 2022年8月15日

sparkhdfs文件|问题spark 如何写二进制文件到hdfs

『壹』 如何在hadoop和spark中定义多文件输出

菜鸟老三

hadoop多文件输出

现实环境中,常常遇到一个问题就是想使用多个Rece,但是迫于setup和cleanup在每一个Rece中会调用一次,只能设置一个Rece,无法是实现负载均衡。

问题,如果要在rece中输出两种文件,一种是标志,另一种是正常业务数据,实现方案有三种:

(1)设置一个rece,在rece中将数据封装到一个集合中,在cleanup中将数据写入到hdfs中,但是如果数据量巨大,一个rece无法充分利用资源,实现负载均衡,但是如果数据量较小,可以使用

(2)设置多文件输出,使用MultipleOutputs类

具体见代码:

[java]view plainprint?

privateMultipleOutputsmos;

@Override

protectedvoidsetup(Contextcontext)

throwsIOException,InterruptedException{

mos=newMultipleOutputs(context);

}

@Override

protectedvoidrece(Textkey,Iterable<Text>values,Contextcontext)

throwsIOException,InterruptedException{

Stringkey1=key.toString();

for(Textt:values){

if(key1.equals("a")){

mos.write("a",key,t);

}elseif(key1.equals("b")){

mos.write("b",key,t);

}elseif(key1.equals("c")){

mos.write("c",key,t);

}

}

}

@Override

protectedvoidcleanup(

Contextcontext)

throwsIOException,InterruptedException{

mos.close();

}

[java]view plainprint?

main方法中配置

[java]view plainprint?

<prename="code"class="java">MultipleOutputs.addNamedOutput(job,"a",TextOutputFormat.class,Text.class,Text.class);

MultipleOutputs.addNamedOutput(job,"b",TextOutputFormat.class,Text.class,Text.class);

MultipleOutputs.addNamedOutput(job,"c",TextOutputFormat.class,Text.class,Text.class);

[java]view plainprint?

结果文件为a-r-0000,b-r-0000,c-r-0000,part-r-0000

『贰』 如何在spark中删除hdfs的某个文件夹

hadoop 添加删除datanode及tasktracker 首先: 建议datanode和tasktracker分开写独立的exclude文件,因为一个节点即可以同时是datanode和tasktracker,也可以单独是datanode或tasktracker。 1、删除datanode 修改namenode上的hdfs-site.xml <property><name>dfs.hosts</name><value>/usr/local/hadoop/conf/datanode-allow-list</value></property> <property><name>dfs.hosts.exclude</name><value>/usr/local/hadoop/conf/datanode-deny-list</value></property>其中dfs.host列出了连入namenode的节点,如果为空,则所有的datanode都可以连入namenode。如果不为空,则文件中存在的datanode可以连入。 dfs.hosts.exclude列出了禁止连入namenode的节点。 如果一个节点同时存在于dfs.hosts和dfs.hosts.exclude,则禁止连入。 具体步骤 (1)将要删除的datanode加入到dfs.hosts.exclude指定的文件中。(最好使用主机名,IP有时可能不生效) (2)动态刷新配置,不需要重启namenode hadoop dfsadmin -refreshNodes(3)通过hadoop dfsadmin -report或webui,可以看到,该datanode的状态为Decommissioning (4)等待一段时间,该节点为dead状态。 (5)删除dfs.hosts文件中该节点 即下架目标机器后,再次编辑dfs.hosts.exclude,把刚才下架的机器的ip或机器名移走(6) hadoop dfsadmin -refreshNodes注:按照上面的操作后,如果你想要重新启用该节点为datanode,从dfs.hosts.exclude中删除该节点,refreshNodes,然后,到该节点上,重启启动该datanode: /usr/local/hadoop/bin/hadoop-daemon.sh stop datanode/usr/local/hadoop/bin/hadoop-daemon.sh start datanode注:正确的删除datanode的方法应该是按照上面的方法,使用exclude文件,而不应该直接去datanode上去sotp datanode,这样会造出数据丢失,而且stop以后,webui和hadoop dfsadmin -report都仍然显示该datanode节点。除非重新启动namenode。 之所以重新启用exclude的节点时可以stop datanode,因为此时该datanode不包含在cluster中,所以,stop后不会造成数据丢失。 2、添加datanode 如果存在dfs.hosts文件,且不为空,则添加新的datanode到该文件中,refreshNodes。 到新节点上,启动即可 /usr/local/hadoop/bin/hadoop-daemon.sh start datanode如果不存在dfs.hosts文件,或文件为空,直接启动新节点即可。 3、删除tasktracker 原理和步骤与删除datanode一样。 <property><name>mapred.hosts</name><value>/usr/local/hadoop/conf/tasktracker-allow-list</value></property> <property><name>mapred.hosts.exclude</name><value>/usr/local/hadoop/conf/tasktracker-deny-list</value></property>动态刷新配置的命令为: hadoop mradmin -refreshNodes 立刻生效,可以在webui中看到,nodes节点数量的变化,以及Excluded Nodes节点的变化。 具体的步骤参考上面的删除datanode的步骤 注:按照上面的操作后,如果你想要重新启用该节点为tasktracker,从mapred.hosts.exclude中删除该节点,refreshNodes,然后,到该节点上,重启启动该tasktracker: /usr/local/hadoop/bin/hadoop-daemon.sh stop tasktracker/usr/local/hadoop/bin/hadoop-daemon.sh start tasktracker注:正确的删除tasktracker的方法应该是按照上面的方法,使用exclude文件,而不应该直接去tasktracker上去sotp tasktracker,这样会造成job失败,而且stop以后,webui上仍然显示该tasktracker节点。除非重新启动jobtracker。 我遇到的一个问题: 在exclude文件中,我使用了IP,发现tasktracker仍然参与计算。 在webui中发现,Excluded Nodes中存在该IP,Nodes中仍然存在该tasktracker的hostname。 解决的办法就是,在exclude中使用hostname而不使用IP。 判断是否真正生效:如果exclued文件中正确的填写了要失效的node,则总得nodes数量应该减小。 4、添加tasktracker 如果存在mapred.hosts文件,且不为空,则添加新的tasktracker到该文件中,refreshNodes。 到新节点上,启动即可 /usr/local/hadoop/bin/hadoop-daemon.sh start tasktracker如果不存在mapred.hosts文件,或文件为空,直接启动新节点即可。 6、添加或删除datanode后,平衡磁盘利用率 运行bin/start-balancer.sh,这个会很耗时间 备注: 如果不balance,那么cluster会把新的数据都存放在新的node上,这样会降低mr的工作效率; /usr/local/hadoop/bin/start-balancer.sh -threshold 0.17下架目标机器后,再次编辑mapred.hosts.exclude,把刚才下架的机器的ip或机器名移走threshold 是平衡阈值,默认是10%,值越低各节点越平衡,但消耗时间也更长。 balancer也可以在有mr job的cluster上运行,默认dfs.balance.bandwidthPerSec很低,为1M/s。在没有mr job时,可以提高该设置加快负载均衡时间。 在namenode的hdfs-site.xml中增加设置balance的带宽,默认只有1M: <property><name>dfs.balance.bandwidthPerSec</name><value>10485760</value><description>Specifies themaximum bandwidth that each datanode can utilize for the balancing purpose interm of the number of bytes per second.</description></property>

『叁』 刚学习spark,想上传文件给hdfs,是不是需要hadoop然后java编程这样是用eclip

spark会把hdfs当做一个数据源来处理, 所以数据存储都要做, 之后编程是从Hadoop改成spark就可以了. 是否用eclipse无所谓, 只要能编译运行就可以

『肆』 spark与hdfs怎么加载本地文件

hdfs和本地文件系统是并列的, 都可以使用spark进行加载, 只不过 hdfs是分布式的文件系统, spark 可以使用 sc.textFile("hdfs://….")操作hdfs的文件也可以 sc.textFile("本地文件路径")来加载本地文件的路径。

『伍』 spark为什么addfile hdfs上的文件失败

你跟我一开始情况差不多,应该是tasktracker和jobtracker没有启动,你把这两个单独启动:./bin/start-mapped.sh。这样应该就行了

『陆』 问题,spark 如何写二进制文件到hdfs

存放二进制文件的数据要使用字节型数组,不能是字符型数组:Dim DAT() As ByteDim FileSize As Long '文件长度FileSize = FileLen(文件名) '获取文件长度ReDim DAT(FileSize – 1) As ByteOpen 文件名 For Binary As #1Get #1, , DATClose数据已经在DAT数组中了,你可以进行任意处理

『柒』 spark和hadoop的区别

直接比较Hadoop和Spark有难度,因为它们处理的许多任务都一样,但是在一些方面又并不相互重叠。

比如说,Spark没有文件管理功能,因而必须依赖Hadoop分布式文件系统(HDFS)或另外某种解决方案。

Hadoop框架的主要模块包括如下:

Hadoop Common

Hadoop分布式文件系统(HDFS)

Hadoop YARN

Hadoop MapRece

虽然上述四个模块构成了Hadoop的核心,不过还有其他几个模块。这些模块包括:Ambari、Avro、Cassandra、Hive、 Pig、Oozie、Flume和Sqoop,它们进一步增强和扩展了Hadoop的功能。

Spark确实速度很快(最多比Hadoop MapRece快100倍)。Spark还可以执行批量处理,然而它真正擅长的是处理流工作负载、交互式查询和机器学习。

相比MapRece基于磁盘的批量处理引擎,Spark赖以成名之处是其数据实时处理功能。Spark与Hadoop及其模块兼容。实际上,在Hadoop的项目页面上,Spark就被列为是一个模块。

Spark有自己的页面,因为虽然它可以通过YARN(另一种资源协调者)在Hadoop集群中运行,但是它也有一种独立模式。它可以作为 Hadoop模块来运行,也可以作为独立解决方案来运行。

MapRece和Spark的主要区别在于,MapRece使用持久存储,而Spark使用弹性分布式数据集(RDDS)。

性能

Spark之所以如此快速,原因在于它在内存中处理一切数据。没错,它还可以使用磁盘来处理未全部装入到内存中的数据。

Spark的内存处理为来自多个来源的数据提供了近乎实时分析的功能:营销活动、机器学习、物联网传感器、日志监控、安全分析和社交媒体网站。另 外,MapRece使用批量处理,其实从来就不是为惊人的速度设计的。它的初衷是不断收集来自网站的信息,不需要这些数据具有实时性或近乎实时性。

易用性

支持Scala(原生语言)、Java、Python和Spark SQL。Spark SQL非常类似于SQL 92,所以几乎不需要经历一番学习,马上可以上手。

Spark还有一种交互模式,那样开发人员和用户都可以获得查询和其他操作的即时反馈。MapRece没有交互模式,不过有了Hive和Pig等附加模块,采用者使用MapRece来得容易一点。

成本

“Spark已证明在数据多达PB的情况下也轻松自如。它被用于在数量只有十分之一的机器上,对100TB数据进行排序的速度比Hadoop MapRece快3倍。”这一成绩让Spark成为2014年Daytona GraySort基准。

兼容性

MapRece和Spark相互兼容;MapRece通过JDBC和ODC兼容诸多数据源、文件格式和商业智能工具,Spark具有与MapRece同样的兼容性。

数据处理

MapRece是一种批量处理引擎。MapRece以顺序步骤来操作,先从集群读取数据,然后对数据执行操作,将结果写回到集群,从集群读 取更新后的数据,执行下一个数据操作,将那些结果写回到结果,依次类推。Spark执行类似的操作,不过是在内存中一步执行。它从集群读取数据后,对数据 执行操作,然后写回到集群。

Spark还包括自己的图形计算库GraphX。GraphX让用户可以查看与图形和集合同样的数据。用户还可以使用弹性分布式数据集(RDD),改变和联合图形,容错部分作了讨论。

容错

至于容错,MapRece和Spark从两个不同的方向来解决问题。MapRece使用TaskTracker节点,它为 JobTracker节点提供了心跳(heartbeat)。如果没有心跳,那么JobTracker节点重新调度所有将执行的操作和正在进行的操作,交 给另一个TaskTracker节点。这种方法在提供容错性方面很有效,可是会大大延长某些操作(即便只有一个故障)的完成时间。

Spark使用弹性分布式数据集(RDD),它们是容错集合,里面的数据元素可执行并行操作。RDD可以引用外部存储系统中的数据集,比如共享式文件系统、HDFS、HBase,或者提供Hadoop InputFormat的任何数据源。Spark可以用Hadoop支持的任何存储源创建RDD,包括本地文件系统,或前面所列的其中一种文件系统。

RDD拥有五个主要属性:

分区列表

计算每个分片的函数

依赖其他RDD的项目列表

面向键值RDD的分区程序(比如说RDD是散列分区),这是可选属性

计算每个分片的首选位置的列表(比如HDFS文件的数据块位置),这是可选属性

RDD可能具有持久性,以便将数据集缓存在内存中。这样一来,以后的操作大大加快,最多达10倍。Spark的缓存具有容错性,原因在于如果RDD的任何分区丢失,就会使用原始转换,自动重新计算。

可扩展性

按照定义,MapRece和Spark都可以使用HDFS来扩展。那么,Hadoop集群能变得多大呢?

据称雅虎有一套42000个节点组成的Hadoop集群,可以说扩展无极限。最大的已知Spark集群是8000个节点,不过随着大数据增多,预计集群规模也会随之变大,以便继续满足吞吐量方面的预期。

安全

Hadoop支持Kerberos身份验证,这管理起来有麻烦。然而,第三方厂商让企业组织能够充分利用活动目录Kerberos和LDAP用于身份验证。同样那些第三方厂商还为传输中数据和静态数据提供数据加密。

Hadoop分布式文件系统支持访问控制列表(ACL)和传统的文件权限模式。Hadoop为任务提交中的用户控制提供了服务级授权(Service Level Authorization),这确保客户拥有正确的权限。

Spark的安全性弱一点,目前只支持通过共享密钥(密码验证)的身份验证。Spark在安全方面带来的好处是,如果你在HDFS上运行Spark,它可以使用HDFS ACL和文件级权限。此外,Spark可以在YARN上运行,因而能够使用Kerberos身份验证。

总结

Spark与MapRece是一种相互共生的关系。Hadoop提供了Spark所没有的功能特性,比如分布式文件系统,而Spark 为需要它的那些数据集提供了实时内存处理。完美的大数据场景正是设计人员当初预想的那样:让Hadoop和Spark在同一个团队里面协同运行。

『捌』 sparksql怎么讲hdfs上的文件导入hbase

打开你的视频文件夹,直接把文件拖到会声会影的时间线上就好了,注意要放在视频轨,不然放不上去的。另个,还要使用会声会影支持的视频文件格式才行,有些格式是不支持的。

『玖』 spark与hdfs怎么加载本地文件

默认是从hdfs读取文件,也可以指定sc.textFile("路径").在路径前面加上hdfs://表示从hdfs文件系统上读本地文件读取 sc.textFile("路径").在路径前面加上file:// 表示从本地文件系统读,如file:///home/user/spark/README.md

『拾』 spark怎么从hadoop读取文件

Hadoop配置节点:sg202(namenode SecondaryNameNode) sg206(datanode) sg207(datanode) sg208(datanode)Spark配置节点:sg201(Master) sg211(Worker)