上传文件直接到S3块逐块用使用Iteratees播放斯卡拉斯卡、上传文件、块逐块用、Iteratees

2023-09-11 09:17:16 作者:青莲白雾

我曾经妄图直接上传文件使用Iteratees到S3。我还是新的函数式编程,并发现很难拼凑一些工作code。

我已经写它上传文件的过程块,并把它们发送到S3的iteratee。上传失败,在最后一个错误。

请帮我解决这个问题。

下面是code,我想出了

控制器处理器

 高清uploadFile = Action.async(BodyParser(RH => S3UploadHelper(斗名)s3Iteratee())){隐含请求=>
    未来 {
      如果(uploadLogger.isInfoEnabled)uploadLogger.info(的上传文件的内容:: \ N+ request.body)
      好吧(views.html.index(文件上传))
    }
  }
 

助手类

 案例类S3UploadHelper(斗:字符串键值:字符串= UUID.generate()){

  私人VAL AWS_ACCESS_KEY =
  私人VAL AWS_SECRET_KEY =
  私人VAL yourAWSCredentials =新BasicAWSCredentials(AWS_ACCESS_KEY,AWS_SECRET_KEY)
  VAL amazonS3Client =新AmazonS3Client(yourAWSCredentials)


  私人VAL initRequest =新InitiateMultipartUploadRequest(桶,键)
  私人VAL initResponse = amazonS3Client.initiateMultipartUpload(ini​​tRequest)
  VAL uploadId = initResponse.getUploadId

  VAL uploadLogger =记录仪(上载)

  高清s3Iteratee(ETag的:序号[PartETag] = Seq.empty [PartETag]):Iteratee [数组(字节),要么[结果,CompleteMultipartUploadResult] = {续
    案例:萨尔瓦多[数组[字节] =>
      //检索尚未在previous块处理的一部分,并将其复制到当前块的前
      VAL uploadRequest =新UploadPartRequest()
        .withBucketName(桶)
        .withKey(钥匙)
        .withPartNumber(etags.length + 1)的
        .withUploadId(uploadId)
        .withInputStream(新ByteArrayInputStream的(in.e))
        .withPartSize(in.e.length)
      如果(uploadLogger.isDebugEnabled)uploadLogger.debug(>>中+将String.valueOf(in.e))
      VAL ETAG =未来{amazonS3Client.uploadPart(uploadRequest).getPartETag}
      etag.map(ETag的:+ _)
      Await.result(ETAG,1.seconds)
      s3Iteratee(ETag的)
    案例@空=> s3Iteratee(ETag的)
    案例@ EOF =>
      进口scala.collection.JavaConversions._
      VAL COM prequest =新CompleteMultipartUploadRequest(桶,钥匙,uploadId,etags.toList)
      VAL结果= amazonS3Client.completeMultipartUpload(COM prequest)
      完成(右(结果),中)
    案例=> s3Iteratee(ETag的)
  }

}
 
手机WPS2019 怎样把长文档逐页输出为图片

虽然Iteratee似乎工作,我能够通过块处理文件块,上传失败,一个奇怪的错误。以下是日志

  [调试]上传 - >> [B @ 1df9048d
[调试]上传 - >> [B @ 152dcf59
[调试]上传 - >> [B @ 7cfeb0d8
[调试]上传 - >> [B @ 136844c5
[调试]上传 - >> [B @ 16f41590
[调试]上传 - >> [B @ 6dd85710
[调试]上传 - >> [B @ 64294203
[调试]上传 - >> [B @ 35366c2f
[调试]上传 - >> [B @ 358a78c
[调试]上传 - >> [B @ 2c171020
[调试]上传 - >> [B @ 20076fb
[调试]上传 - >> [B @ 4d13580
[调试]上传 - >> [B @ 42738651
[调试]上传 - >> [B @ 5671082f
[调试]上传 - >> [B @ 57c70bb4
[调试]上传 - >> [B @ 4154394f
[调试]上传 - >> [B @ 4f93cf15
[调试]上传 - >> [B @ 4bac523f
[调试]上传 - >> [B @ eaec52e
[调试]上传 - >> [B @ 6ed00bf5
[调试]上传 - >> [B @ 3f6a8a5d
[调试]上传 - >> [B @ 16fe1a25
[调试]上传 - >> [B @ 6e813a61
[调试]上传 - >> [B @ e01be7
[调试]上传 - >> [B @ 6bb351c4
[调试]上传 - >> [B @ dfa51a5
[调试]上传 - >> [B @ 6acf2049
[调试]上传 - >> [B @ 6a7021d4
[调试]上传 - >> [B @ 1b3c602f
[调试]上传 - >> [B @ 44146d94
[调试]上传 - >> [B @ 574ac037
[调试]上传 - >> [B @ 3cdf258b
[调试]上传 - >> [B @ 441a0727
[调试]上传 - >> [B @ 2385aafd
[调试]上传 - >> [B @ 224f9dc2
[调试]上传 - >> [B @ 6779077d
[调试]上传 - >> [B @ 734e178a
[调试]上传 - >> [B @ 7d92895c
[调试]上传 - >> [B @ 23edaaa1
[调试]上传 - >> [B @ c00134e
[调试]上传 - >> [B @ ff1a703
[错误]玩 - 无法调用动作,最终得到了一个错误:状态code:400,AWS服务:亚马逊S3,AWS请求ID:98h72s0EBA7653AD,AWS错误code:MalformedXML,AWS错误消息:XML你只要不是良好的,或没有验证对我们发布的架构,S3扩展请求ID:R7e44g8oRy5b4xd7MU ++ atibwrBSRFezeMxNCXE38gyzcwci5Zf
[错误]应用 - 

! @ 6k2maob49  - 内部服务器错误,(POST)/ V1 / file_upload]  - >

play.api.Application $$不久$ 1:执行异常[AmazonS3Exception:您提供的XML并没有良好的或没有验证对我们发布的架构]
        在play.api.Application $ class.handleError(Application.scala:296)〜[play_2.10-2.3.2.jar:2.3.2]
        在play.api.DefaultApplication.handleError(Application.scala:402)[play_2.10-2.3.2.jar:2.3.2]
        在play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.10-2.3.2.jar:2.3.2]
        在play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.10-2.3.2.jar:2.3.2]
        在scala.Option.map(Option.scala:145)[斯卡拉-library.jar:NA]
致:com.amazonaws.services.s3.model.AmazonS3Exception:您提供的XML并没有良好的或没有验证对我们发布的架构
        在com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:556)〜[AWS-java的SDK-1.3.11.jar:NA]
        在com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:289)〜[AWS-java的SDK-1.3.11.jar:NA]
        在com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:170)〜[AWS-java的SDK-1.3.11.jar:NA]
        在com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:2723)〜[AWS-java的SDK-1.3.11.jar:NA]
        在com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:1964)〜[AWS-java的SDK-1.3.11.jar:NA]
 

解决方案

我已经做到了这一点,亚马逊S3需要5MB的数据块,我回来元组在最后,你可以改变按您的要求。

  VAL客户端=新AmazonS3Client(新BasicAWSCredentials(access_key,SECRET_KEY))

高清my_parser = BodyParser {

VAL consume_5MB = Traversable.takeUpTo [数组[字节]](1024 * 1024 * 5)及>> Iteratee.consume()
VAL rechunkAdapter:Enumeratee [数组[字节]数组[字节] = Enumeratee.grouped(consume_5MB)

multipartFormData(Multipart.handleFilePart({

  案例Multipart.FileInfo(零件名称,FILE_NAME,CONTENT_TYPE)=> {

    VAL的object_id = java.util.UUID.randomUUID()的toString()的replaceAll。( - ,)
    VAL object_id_key = IF(content_type.getOrElse()。包括(视频)|| content_type.getOrElse()。包括(声音))OBJECT_ID其他OBJECT_ID + file_name.substring(file_name.lastIndexOf('。 '))
    VAR位置= 0
    VAL的ETag =新的java.util.ArrayList [PartETag]()

    VAL initRequest =新InitiateMultipartUploadRequest(桶,object_id_key)
    VAL initResponse = client.initiateMultipartUpload(ini​​tRequest)
    的println(文件名=+ FILE_NAME)
    的println(的contentType =+ CONTENT_TYPE)

    (rechunkAdapter和放大器;>> Iteratee.foldM [数组(字节),INT](1){(C,字节)=>
      未来 {
        的println(得到了一大块!+ C +大小(KB):+(bytes.length / 1024));
        VAL是=新java.io.ByteArrayInputStream中(字节)

        VAL uploadRequest =新UploadPartRequest()
          .withBucketName(桶).w​​ithKey(object_id_key)
          .withUploadId(ini​​tResponse.getUploadId())
          .withPartNumber(三)
          .withFileOffset(位置)
          .withInputStream(是)
          .withPartSize(bytes.length)

        etags.add(client.uploadPart(uploadRequest).getPartETag)
        位置=位置+ bytes.length

        C + 1
      }
    })地图{V =>
      尝试 {
        VAL COM prequest =新CompleteMultipartUploadRequest(
          桶,
          object_id_key,
          initResponse.getUploadId(),
          ETag的)
        client.completeMultipartUpload(COM prequest)
        的println(上传完毕+ FILE_NAME)
        client.setObjectAcl(桶,object_id_key,com.amazonaws.services.s3.model.CannedAccessControlList.PublicRead)
        (object_id_key,FILE_NAME,content_type.getOrElse(应用程序/八位字节流))
      } 抓住 {
        方案E:异常=> {
          的println(S3上传前+ e.getMessage())
          VAL abortMPURequest =新AbortMultipartUploadRequest(XXXXXXX,OBJECT_ID,initResponse.getUploadId())
          client.abortMultipartUpload(abortMPURequest);
         (错误,FILE_NAME,content_type.getOrElse(应用程序/八位字节流))
        }
      }
    }
  }
}))
 

}

I have tried in vain to upload files directly to s3 using Iteratees. I am still new to functional programming, and finding it hard to piece together some working code.

I have written an iteratee which process chunks of the uploaded file and sends them to S3. The upload fails at the end with an error.

Please help me fix this.

Below is the code I came up with

Controller Handler

  def uploadFile = Action.async(BodyParser(rh => S3UploadHelper("bucket-name").s3Iteratee() ))  { implicit request =>
    Future {
      if(uploadLogger.isInfoEnabled) uploadLogger.info(s"Contents of Uploaded file :: \n " + request.body)
      Ok(views.html.index("File uploaded"))
    }
  }

Helper Class

case class S3UploadHelper(bucket: String, key: String = UUID.generate()) {

  private val AWS_ACCESS_KEY = ""
  private val AWS_SECRET_KEY = ""
  private val yourAWSCredentials = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY)
  val amazonS3Client = new AmazonS3Client(yourAWSCredentials)


  private val initRequest = new InitiateMultipartUploadRequest(bucket, key)
  private val initResponse = amazonS3Client.initiateMultipartUpload(initRequest)
  val uploadId = initResponse.getUploadId

  val uploadLogger = Logger("upload")

  def s3Iteratee(etags: Seq[PartETag] = Seq.empty[PartETag]): Iteratee[Array[Byte], Either[Result, CompleteMultipartUploadResult]] = Cont {
    case in: El[Array[Byte]] =>
      // Retrieve the part that has not been processed in the previous chunk and copy it in front of the current chunk
      val uploadRequest = new UploadPartRequest()
        .withBucketName(bucket)
        .withKey(key)
        .withPartNumber(etags.length + 1)
        .withUploadId(uploadId)
        .withInputStream(new ByteArrayInputStream(in.e))
        .withPartSize(in.e.length)
      if(uploadLogger.isDebugEnabled) uploadLogger.debug(">> " + String.valueOf(in.e))
      val etag = Future { amazonS3Client.uploadPart(uploadRequest).getPartETag }
      etag.map(etags :+ _)
      Await.result(etag, 1.seconds)
      s3Iteratee(etags)
    case in @ Empty => s3Iteratee(etags)
    case in @ EOF =>
      import scala.collection.JavaConversions._
      val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toList)
      val result = amazonS3Client.completeMultipartUpload(compRequest)
      Done(Right(result), in)
    case in => s3Iteratee(etags)
  }

}

Although the Iteratee seems to work and I am able to process the file chunk by chunk, the upload fails with a weird error. Here are the logs

[debug] upload - >> [B@1df9048d
[debug] upload - >> [B@152dcf59
[debug] upload - >> [B@7cfeb0d8
[debug] upload - >> [B@136844c5
[debug] upload - >> [B@16f41590
[debug] upload - >> [B@6dd85710
[debug] upload - >> [B@64294203
[debug] upload - >> [B@35366c2f
[debug] upload - >> [B@358a78c
[debug] upload - >> [B@2c171020
[debug] upload - >> [B@20076fb
[debug] upload - >> [B@4d13580
[debug] upload - >> [B@42738651
[debug] upload - >> [B@5671082f
[debug] upload - >> [B@57c70bb4
[debug] upload - >> [B@4154394f
[debug] upload - >> [B@4f93cf15
[debug] upload - >> [B@4bac523f
[debug] upload - >> [B@eaec52e
[debug] upload - >> [B@6ed00bf5
[debug] upload - >> [B@3f6a8a5d
[debug] upload - >> [B@16fe1a25
[debug] upload - >> [B@6e813a61
[debug] upload - >> [B@e01be7
[debug] upload - >> [B@6bb351c4
[debug] upload - >> [B@dfa51a5
[debug] upload - >> [B@6acf2049
[debug] upload - >> [B@6a7021d4
[debug] upload - >> [B@1b3c602f
[debug] upload - >> [B@44146d94
[debug] upload - >> [B@574ac037
[debug] upload - >> [B@3cdf258b
[debug] upload - >> [B@441a0727
[debug] upload - >> [B@2385aafd
[debug] upload - >> [B@224f9dc2
[debug] upload - >> [B@6779077d
[debug] upload - >> [B@734e178a
[debug] upload - >> [B@7d92895c
[debug] upload - >> [B@23edaaa1
[debug] upload - >> [B@c00134e
[debug] upload - >> [B@ff1a703
[error] play - Cannot invoke the action, eventually got an error: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 98h72s0EBA7653AD, AWS Error Code: MalformedXML, AWS Error Message: The XML you provided was not well-formed or did not validate against our published schema, S3 Extended Request ID: R7e44g8oRy5b4xd7MU++atibwrBSRFezeMxNCXE38gyzcwci5Zf
[error] application - 

! @6k2maob49 - Internal server error, for (POST) [/v1/file_upload] ->

play.api.Application$$anon$1: Execution exception[[AmazonS3Exception: The XML you provided was not well-formed or did not validate against our published schema]]
        at play.api.Application$class.handleError(Application.scala:296) ~[play_2.10-2.3.2.jar:2.3.2]
        at play.api.DefaultApplication.handleError(Application.scala:402) [play_2.10-2.3.2.jar:2.3.2]
        at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.10-2.3.2.jar:2.3.2]
        at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.10-2.3.2.jar:2.3.2]
        at scala.Option.map(Option.scala:145) [scala-library.jar:na]
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The XML you provided was not well-formed or did not validate against our published schema
        at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:556) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:289) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:170) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:2723) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:1964) ~[aws-java-sdk-1.3.11.jar:na]

解决方案

I have done this in past, Amazon s3 needs 5Mb chunks, I was returning tuple at last, you could change as per your requirement.

val client = new AmazonS3Client(new BasicAWSCredentials(access_key, secret_key))

def my_parser = BodyParser { 

val consume_5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5) &>> Iteratee.consume()
val rechunkAdapter: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped(consume_5MB)

multipartFormData(Multipart.handleFilePart({

  case Multipart.FileInfo(partName, file_name, content_type) => {

    val object_id = java.util.UUID.randomUUID().toString().replaceAll("-", "")
    val object_id_key = if (content_type.getOrElse("").contains("video") || content_type.getOrElse("").contains("audio")) object_id else object_id + file_name.substring(file_name.lastIndexOf('.'))
    var position = 0
    val etags = new java.util.ArrayList[PartETag]()

    val initRequest = new InitiateMultipartUploadRequest(bucket, object_id_key)
    val initResponse = client.initiateMultipartUpload(initRequest)
    println("fileName = " + file_name)
    println("contentType = " + content_type)

    (rechunkAdapter &>> Iteratee.foldM[Array[Byte], Int](1) { (c, bytes) =>
      Future {
        println("got a chunk!  :" + c + " size in KB: " + (bytes.length / 1024));
        val is = new java.io.ByteArrayInputStream(bytes)

        val uploadRequest = new UploadPartRequest()
          .withBucketName(bucket).withKey(object_id_key)
          .withUploadId(initResponse.getUploadId())
          .withPartNumber(c)
          .withFileOffset(position)
          .withInputStream(is)
          .withPartSize(bytes.length)

        etags.add(client.uploadPart(uploadRequest).getPartETag)
        position = position + bytes.length

        c + 1
      }
    }).map { v =>
      try {
        val compRequest = new CompleteMultipartUploadRequest(
          bucket,
          object_id_key,
          initResponse.getUploadId(),
          etags)
        client.completeMultipartUpload(compRequest)
        println("Finished uploading " + file_name)   
        client.setObjectAcl(bucket, object_id_key, com.amazonaws.services.s3.model.CannedAccessControlList.PublicRead)
        (object_id_key, file_name, content_type.getOrElse("application/octet-stream")) 
      } catch {
        case e: Exception => {
          println("S3 upload Ex " + e.getMessage())
          val abortMPURequest = new AbortMultipartUploadRequest("xxxxxxx", object_id, initResponse.getUploadId())
          client.abortMultipartUpload(abortMPURequest);
         ("error", file_name, content_type.getOrElse("application/octet-stream"))
        }
      }
    }
  }
}))

}

 
精彩推荐
图片推荐