Skip to content

Instantly share code, notes, and snippets.

@foamdino
Last active January 31, 2020 21:57
Show Gist options
  • Select an option

  • Save foamdino/d2197fb74e8b6769e051cf090066cbe9 to your computer and use it in GitHub Desktop.

Select an option

Save foamdino/d2197fb74e8b6769e051cf090066cbe9 to your computer and use it in GitHub Desktop.
download s3 files scala/alpakka
package service
import java.time.LocalDate
import akka.NotUsed
import akka.stream.Attributes.LogLevels
import akka.stream.alpakka.csv.scaladsl.{CsvFormatting, CsvQuotingStyle}
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.alpakka.s3.{ListBucketResultContents, MultipartUploadResult}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{Attributes, Materializer}
import com.typesafe.scalalogging.Logger
import models.S3Models.S3File
import util.Util._
import scala.collection.immutable.Seq
import scala.concurrent.Future
/**
 * Alpakka-S3-backed implementation of [[S3Service]].
 *
 * All methods build an Akka Streams blueprint over the export bucket for the
 * given environment and materialize it with the caller-supplied [[Materializer]].
 */
class DefaultS3Service extends S3Service {

  private val logger = Logger(getClass)

  /**
   * Looks up the S3 object key of the location report for `env` on `date`.
   *
   * Lists the bucket under a date-scoped prefix and returns the first key whose
   * name contains the expected report file name, or `None` if no match exists.
   *
   * @param env          target environment (selects the export bucket and file name)
   * @param date         report date used to build both the prefix and the file-name filter
   * @param materializer materializer used to run the listing stream
   * @return the matching object key, if any
   */
  def findReportFileKey(env: String, date: LocalDate)(implicit materializer: Materializer): Future[Option[String]] = {
    val prefix = s"$locationReportPath/${date.format(reportPathFormat)}"
    val filter = s"location-report-$env-${date.format(reportFileFormat)}"

    // headOption never fails on an empty listing, unlike Sink.head.
    S3.listBucket(exportBucket(env), prefix = Some(prefix))
      .map(_.key)
      .filter(_.contains(filter))
      .runWith(Sink.headOption)
  }

  /**
   * Starts a download of the stock-movement CSV for each of the given dates.
   *
   * Each returned [[Future]] is independent: one missing file does not affect
   * the others. Downloads begin eagerly as the futures are created.
   *
   * @param env          target environment
   * @param dates        dates whose movement files should be fetched
   * @param materializer materializer used to run each download stream
   * @return one future per date, `None` where the object does not exist
   */
  def downloadMovementFiles(env: String, dates: Seq[LocalDate])(implicit materializer: Materializer): Seq[Future[Option[S3File]]] =
    // Single pass: build the key and kick off the download in one step.
    dates.map { date =>
      downloadFile(env, s"$movementsPath/stock-movements-$env-$date.csv")
    }

  /**
   * Downloads a single object from the environment's export bucket.
   *
   * @param env          target environment (selects the bucket)
   * @param key          full object key to fetch
   * @param materializer materializer used to run the download stream
   * @return `Some(S3File)` wrapping the content source if the object exists, `None` otherwise
   */
  def downloadFile(env: String, key: String)(implicit materializer: Materializer): Future[Option[S3File]] =
    // S3.download emits exactly one element (Some((data, meta)) or None), so Sink.head is safe.
    // S3File is a case class and never null, so a plain map suffices (no Option-wrapping needed).
    S3.download(exportBucket(env), key)
      .map(_.map { case (source, meta) =>
        logger.info(s"File [$key] found, size = [${fileSizeInMb(meta.contentLength)}Mb]")
        S3File(fileName(key), source)
      })
      .runWith(Sink.head)

  /**
   * Formats the reconciled rows as CSV and uploads them to the export bucket.
   *
   * Rows are quoted only where required ([[CsvQuotingStyle.Required]]). Stream
   * progress is logged at INFO on completion and ERROR on failure; individual
   * elements are not logged.
   *
   * @param env           target environment (selects the bucket)
   * @param key           destination object key
   * @param reconciledCsv rows to serialize, one inner list per CSV record
   * @param mat           materializer used to run the upload stream
   * @return the multipart-upload result from S3
   */
  def uploadResults(env: String, key: String, reconciledCsv: Iterable[List[String]])(implicit mat: Materializer): Future[MultipartUploadResult] =
    Source(reconciledCsv.toList)
      .via(CsvFormatting.format(quotingStyle = CsvQuotingStyle.Required))
      .log("[s3] - reconciled results stream")
      .withAttributes(Attributes.logLevels(onFailure = LogLevels.Error, onFinish = LogLevels.Info, onElement = LogLevels.Off))
      .runWith(S3.multipartUpload(exportBucket(env), key))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment