Bypassing org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3n://[...] matches 0 files

2023-09-11 09:43:46 · Author: 唐僧不过就是一个耍猴的

This is a question I've already asked on the Spark user mailing list, and I hope to have more success here.

I'm not sure it's directly related to Spark, though Spark does have something to do with the fact that I can't easily resolve the problem.

I'm trying to get some files from S3 using various patterns. My problem is that some of those patterns may match nothing, and when they do, I get the following exception:

org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3n://bucket/mypattern matches 0 files
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:52)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
    at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:335)
    ... 2 more

I would like a way to ignore missing files and simply do nothing in that case. The problem here, IMO, is that I don't know whether a pattern will return anything until it is actually evaluated, and Spark only starts processing data when an action occurs (here, the reduceByKey part). So I can't just catch an error somewhere and let things continue.
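
To make that concrete, here is a minimal sketch of the problem (the bucket and pattern are made up, and sc is assumed to be a JavaSparkContext): textFile is lazy, so the S3 listing, and therefore the exception, only happens once an action runs, well after any try/catch around the RDD creation.

JavaRDD<String> lines;
try {
    // Lazy: no S3 listing happens here, so this never throws for an empty pattern.
    lines = sc.textFile("s3n://bucket/mypattern");
} catch (Exception e) {
    // Never reached for the "matches 0 files" case.
    lines = sc.parallelize(Collections.<String>emptyList());
}
// The InvalidInputException only surfaces here, when the action forces the listing.
long n = lines.count();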

One solution would be to force Spark to process each path individually, but that would probably cost a lot in terms of speed and/or memory, so I'm looking for another option that would be efficient.
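
For what it's worth, the per-path approach would look roughly like this (patterns is just a hypothetical list of the globs I want to read): probe each pattern with its own small job and drop the ones that match nothing, which is exactly where the extra cost comes from.

List<JavaRDD<String>> parts = new ArrayList<JavaRDD<String>>();
for (String pattern : patterns) {
    JavaRDD<String> rdd = sc.textFile(pattern);
    try {
        // Forces the listing (and a small job) for this pattern alone;
        // throws if the pattern matches no files.
        rdd.first();
        parts.add(rdd);
    } catch (Exception e) {
        // Pattern matched nothing: skip it.
    }
}
if (!parts.isEmpty()) {
    JavaRDD<String> all = sc.union(parts.get(0), parts.subList(1, parts.size()));
}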

I'm using Spark 0.9.1. Thanks.

Answer

OK, after digging a bit into Spark, and thanks to someone guiding me on the Spark user list, I think I've got it:

sc.newAPIHadoopFile("s3n://missingPattern/*", EmptiableTextInputFormat.class, LongWritable.class, Text.class, sc.hadoopConfiguration())
    .map(new Function<Tuple2<LongWritable, Text>, String>() {
        @Override
        public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
            return arg0._2.toString();
        }
    })
    // count() is just the action that forces evaluation; with the input format
    // below, a pattern matching no files now yields 0 instead of throwing.
    .count();

And the EmptiableTextInputFormat that does the magic:

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class EmptiableTextInputFormat extends TextInputFormat {
    @Override
    public List<InputSplit> getSplits(JobContext arg0) throws IOException {
        try {
            return super.getSplits(arg0);
        } catch (InvalidInputException e) {
            // A pattern that matches no files: behave as if there were simply nothing to read.
            return Collections.<InputSplit> emptyList();
        }
    }
}

One could eventually check the message of the InvalidInputException for a little more precision.
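
For instance, a slightly stricter variant of getSplits (just a sketch; the "matches 0 files" substring comes from the exception text above) could swallow only the empty-glob case and rethrow everything else:

    @Override
    public List<InputSplit> getSplits(JobContext arg0) throws IOException {
        try {
            return super.getSplits(arg0);
        } catch (InvalidInputException e) {
            // Only treat "Input Pattern ... matches 0 files" as empty input;
            // rethrow anything else (bad credentials, malformed paths, ...).
            if (e.getMessage() != null && e.getMessage().contains("matches 0 files")) {
                return Collections.<InputSplit> emptyList();
            }
            throw e;
        }
    }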