文件管理 · 2022年7月25日

spark文件读写|spark怎样读取本地文件

1. spark 读取多个文件支持通配符吗为什么用通配符提示找不到文件

val wc = sc.textFile("/user/boco/yy/_*").flatMap(_.split(' ')).map((_,1)).groupByKey直接用*代替,不用加“/”,刚我试过了。而且就算加,怎么会加到*后面啊,加到后面就是找"_*"文件夹了

2. spark怎样读取本地文件

默认是从hdfs读取文件,也可以指定sc.textFile("路径").在路径前面加上hdfs://表示从hdfs文件系统上读本地文件读取sc.textFile("路径").在路径前面加上file://表示从本地文件系统读,如file:///home/user/spark/README.md

3. 如何在spark中读写cassandra数据

之前似乎只有twitter解释过放弃Cassandra的原因,新浪架构师Tim Yang写博客分析过:主要原因还是Cassandra还属于新兴产品,其稳定性及最佳实践还比较一般。之前在淘宝实习时所在团队有使用Cassandra,其并发读写效率不高。分布式存储系统一般满足W+R>N,W为同时写成功数,R为同时读成功数,N为一份数据在集群中的份数。因此一般来说分布式存储很难读写性能俱佳。而一般SNS应用对于并发读写的要求均较高,所以这也是Cassandra无法作为核心数据存储的一大原因。一般来说,解决海量数据存储的方式是MySQL Sharding,利用MySQL成熟的运维经验可以实现良好的稳定性,唯一问题就是扩容比较麻烦。

4. 为啥spark读取的空行当做空字符串呀,而不是null

因为这一行是存在的, 是一个字符串, 只是没有东西而已,他输入的每一行都是字符串的,可以用filter过滤掉空的字符串rdd.filter(!_.trim.equals(""))

5. spark-shell读取.log文件获取日志信息后,怎么进行分析比如统计包含404的行数

用spark-shell去进行日志信息的统计的话,首先第一步就是创建rdd,将数据加载进去。第二步,就是进行map操作,然后调用filter进行过滤,包含404的页面,然后调用count进行统计行数。当然,如果你要进行更详细的统计,或者操作(如果你的log,每列数量都相同的话),可以使用java编写自定义的日志处理类,然后在map的时候,对log进行操作。

6. 为什么spark中saveastextfile保存的文件看不到,但是可以从相应目录下读取其中的数据/

重新说明来一下,之前我测试用源的是谷歌浏览器不是IE。在IE中file.FileName包含路径名的,而谷歌不包含。所以我之前的回答中才会说文件名中不包含路径名,我想你问题就是这个引起的。你可以在保存前加个判断

7. spark怎样读取本地文件

默认是从hdfs读取抄文件,也可以指定sc.textFile("路径").在路径前面加上hdfs://表示从hdfs文件系统上读本地文件读取 sc.textFile("路径").在路径前面加上file:// 表示从本地文件系统读,如file:///home/user/spark/README.md

8. spark怎么读取hadoop数据

如何使用Spark/Scala读取Hbase的数据 必须使用高亮参数启动Spark-shell,否则当你遍历RDD时会出现如下的Exception java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable spark-shell–conf spark.serializer=org.apache.spark.serializer.KryoSerializer 以下代码,经过MaprDB实测通过 import org.apache.spark._ import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.maprece.TableInputFormat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HColumnDescriptor import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.HTable; val tableName = "/app/SubscriptionBillingPlatform/TRANSAC_ID" val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, tableName) //create rdd val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputForma…

9. 如何使用scala+spark读写hbase

公司有一些实时数据处理的项目,存储用的是hbase,提供实时的检索,当然hbase里面存储的数据模型都是简单的,复杂的多维检索的结果是在es里面存储的,公司也正在引入Kylin作为OLAP的数据分析引擎,这块后续有空在研究下。接着上面说的,hbase存储着一些实时的数据,前两周新需求需要对hbase里面指定表的数据做一次全量的update以满足业务的发展,平时操作hbase都是单条的curd,或者插入一个批量的list,用的都是hbase的java api比较简单,但这次涉及全量update,所以如果再用原来那种单线程的操作api,势必速度回慢上许多。关于批量操作Hbase,一般我们都会用MapRece来操作,这样可以大大加快处理效率,原来也写过MR操作Hbase,过程比较繁琐,最近一直在用scala做spark的相关开发,所以就直接使用scala+spark来搞定这件事了,当然底层用的还是Hbase的TableOutputFormat和TableOutputFormat这个和MR是一样的,在spark里面把从hbase里面读取的数据集转成rdd了,然后做一些简单的过滤,转化,最终在把结果写入到hbase里面。整个流程如下:(1)全量读取hbase表的数据(2)做一系列的ETL(3)把全量数据再写回hbase核心代码如下://获取confval conf=HBaseConfiguration.create() //设置读取的表conf.set(TableInputFormat.INPUT_TABLE,tableName) //设置写入的表conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)//创建sparkConfval sparkConf=new SparkConf() //设置spark的任务名sparkConf.setAppName("read and write for hbase ") //创建spark上下文val sc=new SparkContext(sparkConf)//为job指定输出格式和输出表名val newAPIJobConfiguration1 = Job.getInstance(conf)newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName)newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])//全量读取hbase表val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])//过滤空数据,然后对每一个记录做更新,并转换成写入的格式val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas)//转换后的结果,再次做过滤val save_rdd=final_rdd.filter(checkNull)//最终在写回hbase表save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)sc.stop()从上面的代码可以看出来,使用spark+scala操作hbase是非常简单的。下面我们看一下,中间用到的几个自定义函数:第一个:checkNotEmptyKs作用:过滤掉空列簇的数据def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={ val r=f._2 val rowkey=Bytes.toString(r.getRow) val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes("ks")).asScala if(map.isEmpty) false else true}第二个:forDatas作用:读取每一条数据,做update后,在转化成写入操作def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={ val r=f._2 //获取Resultval put:Put=new Put(r.getRow) //声明putval ks=Bytes.toBytes("ks") //读取指定列簇val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScalamap.foreach(kv=>{//遍历每一个rowkey下面的指定列簇的每一列的数据做转化val kid= Bytes.toString(kv._1)//知识点idvar value=Bytes.toString(kv._2)//知识点的value值value="修改后的value"put.addColumn(ks,kv._1,Bytes.toBytes(value)) //放入put对象}) if(put.isEmpty) null else (new ImmutableBytesWritable(),put)}第三个:checkNull 作用:过滤最终结果里面的null数据def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={ if(f==null) false else true}上面就是整个处理的逻辑了,需要注意的是对hbase里面的无效数据作过滤,跳过无效数据即可,逻辑是比较简单的,代码量也比较少。

10. spark中怎么将读取的每一行的数据按某几行拼接成一行 新手,求指教,谢谢!

assert(args.length>1)val_from=args(0)val_to=args(1)vals=sc.textFile(_from).collect()valn=if(args.length>2)args(2).toIntelse2valnumSlices=s.length/nvalx=sc.parallelize(s,numSlices).zipWithIndex().aggregate(List.empty[List[String]])(seqOp=(result,lineWithIndex)=>{lineWithIndexmatch{case(line,index)=>if(index%n==0)List(line)::resultelse(line::result.head)::result.tail}},combOp=(x,y)=>x:::y).map(_.reversemkString"")sc.parallelize(x).saveAsTextFile(_to)

textFile 不能指定分区数目,所以只能parallelize, n是每几行一合并,RDD的aggregate方法与foldLeft类似,因为并行,合并之后的行间顺序不确定的

下面给出非RDD操作示例

vals=List("123","234","345","456","567","678")valn=3//sparkRDD没有foldLeftvalx=s.zipWithIndex.foldLeft(List.empty[List[String]])((result,lineWithIndex)=>{lineWithIndexmatch{case(line,index)=>if(index%n==0)List(line)::resultelse(line::result.head)::result.tail}}).reverse.map(_.reverse)println(x)