Download S3 files with Scala/Alpakka

A small service that lists, downloads, and uploads CSV report files on S3 using Akka Streams and the Alpakka S3 connector.
```scala
package service

import java.time.LocalDate

import akka.NotUsed
import akka.stream.Attributes.LogLevels
import akka.stream.alpakka.csv.scaladsl.{CsvFormatting, CsvQuotingStyle}
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.alpakka.s3.{ListBucketResultContents, MultipartUploadResult}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{Attributes, Materializer}
import com.typesafe.scalalogging.Logger
import models.S3Models.S3File
import util.Util._

import scala.collection.immutable.Seq
import scala.concurrent.Future

class DefaultS3Service extends S3Service {

  private val logger = Logger(getClass)

  // Lists the date-based report prefix and returns the first key matching
  // the environment-specific report file name, if any.
  def findReportFileKey(env: String, date: LocalDate)(implicit materializer: Materializer): Future[Option[String]] = {
    val prefix = s"$locationReportPath/${date.format(reportPathFormat)}"
    val filter = s"location-report-$env-${date.format(reportFileFormat)}"
    val objectSource: Source[ListBucketResultContents, NotUsed] =
      S3.listBucket(exportBucket(env), prefix = Some(prefix))

    objectSource
      .map(_.key)
      .filter(_.contains(filter))
      .runWith(Sink.headOption)
  }

  // Builds one stock-movements key per date (LocalDate.toString yields
  // ISO yyyy-MM-dd) and starts a download for each.
  def downloadMovementFiles(env: String, dates: Seq[LocalDate])(implicit materializer: Materializer): Seq[Future[Option[S3File]]] =
    dates
      .map(date => s"$movementsPath/stock-movements-$env-$date.csv")
      .map(key => downloadFile(env, key))

  // Downloads a single object. S3.download emits exactly one Option, so
  // Sink.head is safe here; None means the key does not exist.
  def downloadFile(env: String, key: String)(implicit materializer: Materializer): Future[Option[S3File]] =
    S3.download(exportBucket(env), key)
      .map(_.map {
        case (source, meta) =>
          logger.info(s"File [$key] found, size = [${fileSizeInMb(meta.contentLength)}MB]")
          S3File(fileName(key), source)
      })
      .runWith(Sink.head)

  // Formats the reconciled rows as CSV and streams them straight to S3
  // via multipart upload, logging only failures and stream completion.
  def uploadResults(env: String, key: String, reconciledCsv: Iterable[List[String]])(implicit mat: Materializer): Future[MultipartUploadResult] =
    Source(reconciledCsv.toList)
      .via(CsvFormatting.format(quotingStyle = CsvQuotingStyle.Required))
      .log("[s3] - reconciled results stream")
      .withAttributes(Attributes.logLevels(onFailure = LogLevels.Error, onFinish = LogLevels.Info, onElement = LogLevels.Off))
      .runWith(S3.multipartUpload(exportBucket(env), key))
}
```
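For completeness, here is a minimal sketch of wiring the service into an app. It assumes the Akka 2.5-era explicit `ActorMaterializer` (on Akka 2.6+ an implicit `ActorSystem` is itself a `Materializer`) and that credentials and region are configured under the connector's `alpakka.s3` config section; the `"prod"` environment name and the `Main` object are illustrative only.

```scala
import java.time.LocalDate

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import service.DefaultS3Service

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object Main extends App {
  implicit val system: ActorSystem = ActorSystem("s3-downloads")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val s3Service = new DefaultS3Service

  // Look up yesterday's report key and download the file if it exists.
  val date = LocalDate.now().minusDays(1)
  s3Service.findReportFileKey("prod", date).onComplete {
    case Success(Some(key)) => s3Service.downloadFile("prod", key)
    case Success(None)      => println(s"No report found for $date")
    case Failure(err)       => println(s"Listing failed: $err")
  }
}
```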