How to map filenames to the RDD using sc.textFile("s3n://bucket/*.csv")?

2023-09-11 09:58:05 Author: 寂寞如花


Please note, I must use sc.textFile, but I would accept any other answers.

What I want to do is simply add the filename that is being processed to the RDD... something like:

var rdd = sc.textFile("s3n://bucket/*.csv").map(line => filename + "," + line)

Much appreciated!

EDIT2: The solution to EDIT1 is to use Hadoop 2.4 or above. However, I have not tested it with the slaves... etc. Also, some of the mentioned solutions only work for small datasets. If you want to work with big data, you will have to use HadoopRDD (see the sketch below).
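For reference, a minimal sketch of the HadoopRDD route, assuming the same bucket path as in the question; mapPartitionsWithInputSplit is a developer API, so the exact casts may differ by Spark version:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD

// Read with the old-style Hadoop API so the underlying InputSplit is reachable.
val hadoopRdd = sc.hadoopFile[LongWritable, Text, TextInputFormat]("s3n://bucket/*.csv")
  .asInstanceOf[HadoopRDD[LongWritable, Text]]

// Prefix every line with the path of the file its split came from.
val linesWithFile = hadoopRdd.mapPartitionsWithInputSplit { (split, iter) =>
  val file = split.asInstanceOf[FileSplit].getPath.toString
  iter.map { case (_, line) => file + "," + line.toString }
}

This keeps the line-by-line streaming behaviour of textFile, which is why it scales to larger files than wholeTextFiles.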

EDIT: I have tried the following, and it did not work:

// add the AWS SDK jars to the spark-shell classpath
:cp symjar/aws-java-sdk-1.9.29.jar
:cp symjar/aws-java-sdk-flow-build-tools-1.9.29.jar

import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{S3ObjectSummary, ObjectListing, GetObjectRequest}
import com.amazonaws.auth._


def awsAccessKeyId = "AKEY"
def awsSecretAccessKey = "SKEY"

// configure the native S3 filesystem with the AWS credentials
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)
hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)

// read each file as (filename, content) and keep only the filenames
var rdd = sc.wholeTextFiles("s3n://bucket/dir/*.csv").map { case (filename, content) => filename }
rdd.count

NOTE: It is connecting to S3, and that is not an issue (as I have tested it many, many times).

The error I get is:

INFO input.FileInputFormat: Total input paths to process : 4
java.io.FileNotFoundException: File does not exist: /RTLM-918/simple/t1-100.csv
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:489)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240)
    at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:267)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
    at $iwC$$iwC$$iwC.<init>(<console>:46)
    at $iwC$$iwC.<init>(<console>:48)
    at $iwC.<init>(<console>:50)
    at <init>(<console>:52)
    at .<init>(<console>:56)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Solution

The only text method that includes the file name is wholeTextFiles.

sc.wholeTextFiles(path).map { case (filename, content) => ... }
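To get something closer to the textFile behaviour asked for in the question, the (filename, content) pairs can be split back into lines; a minimal sketch, assuming the same bucket path as above:

// Read each CSV as (filename, content), then explode the content back into
// individual lines, prefixing each line with the file it came from.
val linesWithFile = sc.wholeTextFiles("s3n://bucket/dir/*.csv").flatMap {
  case (filename, content) => content.split("\n").map(line => filename + "," + line)
}
linesWithFile.count

As noted in EDIT2, wholeTextFiles loads each file as a single record, so this suits small files; for larger data the HadoopRDD sketch above is the safer route.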