From 5cc5e4c43c469ce0e68e22870c84e896f5887191 Mon Sep 17 00:00:00 2001 From: Lee Harding Date: Sat, 16 Jan 2021 21:02:33 -0800 Subject: [PATCH] Initial commit. --- .github/workflows/publish.yml | 36 ++ .gitignore | 433 ++++++++++++++++++ .../DecompressGzAndUpload.csproj | 16 + Examples/DecompressGzAndUpload/Function.cs | 52 +++ Examples/DecompressGzAndUpload/Readme.md | 52 +++ .../aws-lambda-tools-defaults.json | 13 + Examples/DecompressGzAndUpload/payload.json | 1 + Examples/DecompressGzAndUpload/response.json | 1 + .../DecompressGzAndUpload/serverless.template | 41 ++ .../DecryptAesAndUpload.csproj | 16 + Examples/DecryptAesAndUpload/Function.cs | 57 +++ Examples/DecryptAesAndUpload/Readme.md | 52 +++ .../aws-lambda-tools-defaults.json | 13 + .../DecryptAesAndUpload/serverless.template | 41 ++ .../DecryptPgpAndUpload.csproj | 18 + Examples/DecryptPgpAndUpload/Function.cs | 56 +++ Examples/DecryptPgpAndUpload/Readme.md | 55 +++ .../aws-lambda-tools-defaults.json | 13 + .../DecryptPgpAndUpload/serverless.template | 41 ++ Examples/Readme.md | 39 ++ Examples/ZipS3ListToS3/Function.cs | 57 +++ Examples/ZipS3ListToS3/Readme.md | 56 +++ Examples/ZipS3ListToS3/ZipS3ListToS3.csproj | 16 + .../aws-lambda-tools-defaults.json | 13 + Examples/ZipS3ListToS3/serverless.template | 41 ++ Readme.md | 7 + S3UploadStream.sln | 111 +++++ S3UploadStream/S3UploadStream.cs | 185 ++++++++ S3UploadStream/S3UploadStream.csproj | 20 + 29 files changed, 1552 insertions(+) create mode 100644 .github/workflows/publish.yml create mode 100644 .gitignore create mode 100644 Examples/DecompressGzAndUpload/DecompressGzAndUpload.csproj create mode 100644 Examples/DecompressGzAndUpload/Function.cs create mode 100644 Examples/DecompressGzAndUpload/Readme.md create mode 100644 Examples/DecompressGzAndUpload/aws-lambda-tools-defaults.json create mode 100644 Examples/DecompressGzAndUpload/payload.json create mode 100644 Examples/DecompressGzAndUpload/response.json create mode 100644 Examples/DecompressGzAndUpload/serverless.template create mode 100644 Examples/DecryptAesAndUpload/DecryptAesAndUpload.csproj create mode 100644 Examples/DecryptAesAndUpload/Function.cs create mode 100644 Examples/DecryptAesAndUpload/Readme.md create mode 100644 Examples/DecryptAesAndUpload/aws-lambda-tools-defaults.json create mode 100644 Examples/DecryptAesAndUpload/serverless.template create mode 100644 Examples/DecryptPgpAndUpload/DecryptPgpAndUpload.csproj create mode 100644 Examples/DecryptPgpAndUpload/Function.cs create mode 100644 Examples/DecryptPgpAndUpload/Readme.md create mode 100644 Examples/DecryptPgpAndUpload/aws-lambda-tools-defaults.json create mode 100644 Examples/DecryptPgpAndUpload/serverless.template create mode 100644 Examples/Readme.md create mode 100644 Examples/ZipS3ListToS3/Function.cs create mode 100644 Examples/ZipS3ListToS3/Readme.md create mode 100644 Examples/ZipS3ListToS3/ZipS3ListToS3.csproj create mode 100644 Examples/ZipS3ListToS3/aws-lambda-tools-defaults.json create mode 100644 Examples/ZipS3ListToS3/serverless.template create mode 100644 Readme.md create mode 100644 S3UploadStream.sln create mode 100644 S3UploadStream/S3UploadStream.cs create mode 100644 S3UploadStream/S3UploadStream.csproj diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..f2ef6af --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,36 @@ +name: Publish NuGet + +on: + release: + types: + - created + +jobs: + publish: + runs-on: ubuntu-latest + name: Upload NuGet Package + steps: 
+ + - name: Checkout repository + uses: actions/checkout@v1 + + - name: Setup .NET Core @ Latest + uses: actions/setup-dotnet@v1 + with: + source-url: https://nuget.pkg.github.com/mlhpdx/index.json + env: + NUGET_AUTH_TOKEN: ${{secrets.GITHUB_TOKEN}} + + - name: Get the version + id: get_version + run: echo ::set-output name=VERSION::${GITHUB_REF#refs/tags/} + + - name: Build solution and generate NuGet package + run: | + cd S3UploadStream + dotnet pack -c Release -o out /p:Version=$VERSION + env: + VERSION: ${{ steps.get_version.outputs.VERSION }} + + - name: Push generated package to GitHub registry + run: dotnet nuget push ./S3UploadStream/out/*.nupkg --skip-duplicate --no-symbols true diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..21d9bca --- /dev/null +++ b/.gitignore @@ -0,0 +1,433 @@ +## Ignore Visual Studio temporary files, build results, and +## files generated by popular Visual Studio add-ons. +## +## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore + +# User-specific files +*.rsuser +*.suo +*.user +*.userosscache +*.sln.docstates + +# User-specific files (MonoDevelop/Xamarin Studio) +*.userprefs + +# Mono auto generated files +mono_crash.* + +# Build results +[Dd]ebug/ +[Dd]ebugPublic/ +[Rr]elease/ +[Rr]eleases/ +x64/ +x86/ +[Aa][Rr][Mm]/ +[Aa][Rr][Mm]64/ +bld/ +[Bb]in/ +[Oo]bj/ +[Ll]og/ + +# Visual Studio 2015/2017 cache/options directory +.vs/ +# Uncomment if you have tasks that create the project's static files in wwwroot +#wwwroot/ + +# Visual Studio 2017 auto generated files +Generated\ Files/ + +# MSTest test Results +[Tt]est[Rr]esult*/ +[Bb]uild[Ll]og.* + +# NUNIT +*.VisualState.xml +TestResult.xml + +# Build Results of an ATL Project +[Dd]ebugPS/ +[Rr]eleasePS/ +dlldata.c + +# Benchmark Results +BenchmarkDotNet.Artifacts/ + +# .NET Core +project.lock.json +project.fragment.lock.json +artifacts/ + +# StyleCop +StyleCopReport.xml + +# Files built by Visual Studio +*_i.c +*_p.c +*_h.h +*.ilk +*.meta +*.obj +*.iobj +*.pch +*.pdb +*.ipdb +*.pgc +*.pgd +*.rsp +*.sbr +*.tlb +*.tli +*.tlh +*.tmp +*.tmp_proj +*_wpftmp.csproj +*.log +*.vspscc +*.vssscc +.builds +*.pidb +*.svclog +*.scc + +# Chutzpah Test files +_Chutzpah* + +# Visual C++ cache files +ipch/ +*.aps +*.ncb +*.opendb +*.opensdf +*.sdf +*.cachefile +*.VC.db +*.VC.VC.opendb + +# Visual Studio profiler +*.psess +*.vsp +*.vspx +*.sap + +# Visual Studio Trace Files +*.e2e + +# TFS 2012 Local Workspace +$tf/ + +# Guidance Automation Toolkit +*.gpState + +# ReSharper is a .NET coding add-in +_ReSharper*/ +*.[Rr]e[Ss]harper +*.DotSettings.user + +# JustCode is a .NET coding add-in +.JustCode + +# TeamCity is a build add-in +_TeamCity* + +# DotCover is a Code Coverage Tool +*.dotCover + +# AxoCover is a Code Coverage Tool +.axoCover/* +!.axoCover/settings.json + +# Visual Studio code coverage results +*.coverage +*.coveragexml + +# NCrunch +_NCrunch_* +.*crunch*.local.xml +nCrunchTemp_* + +# MightyMoose +*.mm.* +AutoTest.Net/ + +# Web workbench (sass) +.sass-cache/ + +# Installshield output folder +[Ee]xpress/ + +# DocProject is a documentation generator add-in +DocProject/buildhelp/ +DocProject/Help/*.HxT +DocProject/Help/*.HxC +DocProject/Help/*.hhc +DocProject/Help/*.hhk +DocProject/Help/*.hhp +DocProject/Help/Html2 +DocProject/Help/html + +# Click-Once directory +publish/ + +# Publish Web Output +*.[Pp]ublish.xml +*.azurePubxml +# Note: Comment the next line if you want to checkin your web deploy settings, +# but database connection strings (with potential 
passwords) will be unencrypted +*.pubxml +*.publishproj + +# Microsoft Azure Web App publish settings. Comment the next line if you want to +# checkin your Azure Web App publish settings, but sensitive information contained +# in these scripts will be unencrypted +PublishScripts/ + +# NuGet Packages +*.nupkg +# The packages folder can be ignored because of Package Restore +**/[Pp]ackages/* +# except build/, which is used as an MSBuild target. +!**/[Pp]ackages/build/ +# Uncomment if necessary however generally it will be regenerated when needed +#!**/[Pp]ackages/repositories.config +# NuGet v3's project.json files produces more ignorable files +*.nuget.props +*.nuget.targets + +# Microsoft Azure Build Output +csx/ +*.build.csdef + +# Microsoft Azure Emulator +ecf/ +rcf/ + +# Windows Store app package directories and files +AppPackages/ +BundleArtifacts/ +Package.StoreAssociation.xml +_pkginfo.txt +*.appx +*.appxbundle +*.appxupload + +# Visual Studio cache files +# files ending in .cache can be ignored +*.[Cc]ache +# but keep track of directories ending in .cache +!?*.[Cc]ache/ + +# Others +ClientBin/ +~$* +*~ +*.dbmdl +*.dbproj.schemaview +*.jfm +*.pfx +*.publishsettings +orleans.codegen.cs + +# Including strong name files can present a security risk +# (https://github.com/github/gitignore/pull/2483#issue-259490424) +#*.snk + +# Since there are multiple workflows, uncomment next line to ignore bower_components +# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622) +#bower_components/ + +# RIA/Silverlight projects +Generated_Code/ + +# Backup & report files from converting an old project file +# to a newer Visual Studio version. Backup files are not needed, +# because we have git ;-) +_UpgradeReport_Files/ +Backup*/ +UpgradeLog*.XML +UpgradeLog*.htm +ServiceFabricBackup/ +*.rptproj.bak + +# SQL Server files +*.mdf +*.ldf +*.ndf + +# Business Intelligence projects +*.rdl.data +*.bim.layout +*.bim_*.settings +*.rptproj.rsuser +*- Backup*.rdl + +# Microsoft Fakes +FakesAssemblies/ + +# GhostDoc plugin setting file +*.GhostDoc.xml + +# Node.js Tools for Visual Studio +.ntvs_analysis.dat +node_modules/ + +# Visual Studio 6 build log +*.plg + +# Visual Studio 6 workspace options file +*.opt + +# Visual Studio 6 auto-generated workspace file (contains which files were open etc.) 
+*.vbw + +# Visual Studio LightSwitch build output +**/*.HTMLClient/GeneratedArtifacts +**/*.DesktopClient/GeneratedArtifacts +**/*.DesktopClient/ModelManifest.xml +**/*.Server/GeneratedArtifacts +**/*.Server/ModelManifest.xml +_Pvt_Extensions + +# Paket dependency manager +.paket/paket.exe +paket-files/ + +# FAKE - F# Make +.fake/ + +# CodeRush personal settings +.cr/personal + +# Python Tools for Visual Studio (PTVS) +__pycache__/ +*.pyc + +# Cake - Uncomment if you are using it +# tools/** +# !tools/packages.config + +# Tabs Studio +*.tss + +# Telerik's JustMock configuration file +*.jmconfig + +# BizTalk build output +*.btp.cs +*.btm.cs +*.odx.cs +*.xsd.cs + +# OpenCover UI analysis results +OpenCover/ + +# Azure Stream Analytics local run output +ASALocalRun/ + +# MSBuild Binary and Structured Log +*.binlog + +# NVidia Nsight GPU debugger configuration file +*.nvuser + +# MFractors (Xamarin productivity tool) working folder +.mfractor/ + +# Local History for Visual Studio +.localhistory/ + +# BeatPulse healthcheck temp database +healthchecksdb + +# Backup folder for Package Reference Convert tool in Visual Studio 2017 +MigrationBackup/ + +## +## Visual studio for Mac +## + + +# globs +Makefile.in +*.userprefs +*.usertasks +config.make +config.status +aclocal.m4 +install-sh +autom4te.cache/ +*.tar.gz +tarballs/ +test-results/ + +# Mac bundle stuff +*.dmg +*.app + +# content below from: https://github.com/github/gitignore/blob/master/Global/macOS.gitignore +# General +.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk + +# content below from: https://github.com/github/gitignore/blob/master/Global/Windows.gitignore +# Windows thumbnail cache files +Thumbs.db +ehthumbs.db +ehthumbs_vista.db + +# Dump file +*.stackdump + +# Folder config file +[Dd]esktop.ini + +# Recycle Bin used on file shares +$RECYCLE.BIN/ + +# Windows Installer files +*.cab +*.msi +*.msix +*.msm +*.msp + +# Windows shortcuts +*.lnk + +# JetBrains Rider +.idea/ +*.sln.iml + +## +## Visual Studio Code +## +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json diff --git a/Examples/DecompressGzAndUpload/DecompressGzAndUpload.csproj b/Examples/DecompressGzAndUpload/DecompressGzAndUpload.csproj new file mode 100644 index 0000000..24b12a9 --- /dev/null +++ b/Examples/DecompressGzAndUpload/DecompressGzAndUpload.csproj @@ -0,0 +1,16 @@ + + + netcoreapp3.1 + true + Lambda + + + + + + + + + + + \ No newline at end of file diff --git a/Examples/DecompressGzAndUpload/Function.cs b/Examples/DecompressGzAndUpload/Function.cs new file mode 100644 index 0000000..1183cb3 --- /dev/null +++ b/Examples/DecompressGzAndUpload/Function.cs @@ -0,0 +1,52 @@ +using System.Threading.Tasks; +using System.Text.Json; +using System.Diagnostics; +using Amazon.Lambda.Core; +using Amazon.S3; +using System.Security.Cryptography; +using System; +using Cppl.Utilities.AWS; +using System.IO.Compression; + +// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. 
+[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
+
+namespace DecompressGzAndUpload
+{
+    public class Functions
+    {
+        readonly static string BUCKET = System.Environment.GetEnvironmentVariable("BUCKET_NAME");
+        const string NAME = "csv_1GB";
+        const string GZ_DATA = NAME + ".gz";
+        const string OUTPUT_KEY = NAME + ".csv";
+
+        public Functions()
+        {
+        }
+
+        public async Task<object> Process(JsonDocument request, ILambdaContext context)
+        {
+            var s3 = new AmazonS3Client();
+
+            // easier than doing math on the timestamps in logs
+            var timer = new Stopwatch();
+            timer.Start();
+
+            context.Logger.LogLine($"{timer.Elapsed}: Getting started.");
+            using var stream = (await s3.GetObjectAsync(BUCKET, GZ_DATA)).ResponseStream;
+
+            using var decompress = new GZipStream(stream, CompressionMode.Decompress);
+            using (var output = new S3UploadStream(s3, BUCKET, OUTPUT_KEY)) {
+                await decompress.CopyToAsync(output);
+            }
+            context.Logger.LogLine($"{timer.Elapsed}: Done copying.");
+            timer.Stop();
+
+            return new {
+                AesFile = $"s3://{BUCKET}/{GZ_DATA}",
+                CsvFile = $"s3://{BUCKET}/{OUTPUT_KEY}",
+                Status = "ok"
+            };
+        }
+    }
+}
diff --git a/Examples/DecompressGzAndUpload/Readme.md b/Examples/DecompressGzAndUpload/Readme.md
new file mode 100644
index 0000000..6b149b2
--- /dev/null
+++ b/Examples/DecompressGzAndUpload/Readme.md
@@ -0,0 +1,52 @@
+# Streaming Gzip Decompression from and to S3
+
+This example demonstrates a Lambda function that decompresses a file in S3 that is much larger than the available RAM, and uploads the content back to S3 using a `S3UploadStream`. In this case the Lambda is intentionally under-provisioned with only 512 MB of RAM while the file to be processed is around 1GB. Processing such a file with a naive solution where it is read and decompressed into RAM (likely using a `MemoryStream`) would obviously not work here, and saving to an attached drive (EFS) would be slow at best. The example file is randomly-generated CSV content compressed with gzip.
+
+**NOTE**: Running this Lambda processes a very large amount of data, and will cost you money. Keep that in mind, please. Also note that this example reads and WRITES to the bucket you specify, hence following the instructions to create a new bucket for these examples is probably a good idea -- see the parent folder's [Readme.md](../Readme.md) for more information.
+
+## Deploying and Running from the Command Line
+
+Deploy the application (run from the project directory, see the NOTE above about the bucket)
+```
+ export STACK=s3-upload-stream-example-gzip
+ dotnet lambda deploy-serverless --stack-name $STACK --s3-bucket $BUCKET --template-parameters Bucket=$BUCKET
+```
+
+Invoking the example function (it will run for a couple of minutes)
+```
+ export LAMBDA=$(aws cloudformation describe-stack-resources --stack-name $STACK --query "StackResources[? LogicalResourceId == 'Process'].PhysicalResourceId" --output text)
+ aws lambda invoke --function-name $LAMBDA --cli-read-timeout 900 --log-type Tail --query LogResult --output text payload.json > log.txt
+```
+
+View the resulting payload, and logs
+```
+ more payload.json
+ cat log.txt | base64 -d
+```
+
+Cleanup the stack
+```
+ aws cloudformation delete-stack --stack-name $STACK
+```
+
+## Results
+
+Here are the log messages and output of a (second, not cold-start) run of the function:
+
+```
+START RequestId: 1f598d78-90c6-4b51-a441-6b5eee5b48b3 Version: $LATEST
+00:00:00.0000588: Getting started.
+00:01:35.7067260: Done copying.
+END RequestId: 1f598d78-90c6-4b51-a441-6b5eee5b48b3 +REPORT RequestId: 1f598d78-90c6-4b51-a441-6b5eee5b48b3 Duration: 96440.03 ms Billed Duration: 96441 ms Memory Size: 512 MB Max Memory Used: 512 MB Init Duration: 201.62 ms +``` + +For completeness, here is the function output payload: + +```json +{ + "AesFile": "s3://s3-upload-examples-5i2iwewz/csv_1GB.gz", + "CsvFile": "s3://s3-upload-examples-5i2iwewz/csv_1GB.csv", + "Status": "ok" +} +``` diff --git a/Examples/DecompressGzAndUpload/aws-lambda-tools-defaults.json b/Examples/DecompressGzAndUpload/aws-lambda-tools-defaults.json new file mode 100644 index 0000000..0381919 --- /dev/null +++ b/Examples/DecompressGzAndUpload/aws-lambda-tools-defaults.json @@ -0,0 +1,13 @@ +{ + "Information": [ + "This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.", + "To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.", + "dotnet lambda help", + "All the command line options for the Lambda command can be specified in this file." + ], + "configuration": "Release", + "framework": "netcoreapp3.1", + "s3-prefix": "DecompressGzAndUpload/", + "template": "serverless.template", + "template-parameters": "" +} \ No newline at end of file diff --git a/Examples/DecompressGzAndUpload/payload.json b/Examples/DecompressGzAndUpload/payload.json new file mode 100644 index 0000000..405e1b7 --- /dev/null +++ b/Examples/DecompressGzAndUpload/payload.json @@ -0,0 +1 @@ +{"AesFile":"s3://s3-upload-examples-5i2iwewz/csv_1GB.gz","CsvFile":"s3://s3-upload-examples-5i2iwewz/csv_1GB.csv","Status":"ok"} \ No newline at end of file diff --git a/Examples/DecompressGzAndUpload/response.json b/Examples/DecompressGzAndUpload/response.json new file mode 100644 index 0000000..bed6905 --- /dev/null +++ b/Examples/DecompressGzAndUpload/response.json @@ -0,0 +1 @@ 
+"U1RBUlQgUmVxdWVzdElkOiA3OGY0YmNmMy00NDMzLTRmMjMtODljOC02NDBiOTUwYTNlNDcgVmVyc2lvbjogJExBVEVTVAowMDowMDowMC4wMDAwMDc1OiBHZXR0aW5nIHN0YXJ0ZWQuClRoZSBzcGVjaWZpZWQga2V5IGRvZXMgbm90IGV4aXN0LjogQW1hem9uUzNFeGNlcHRpb24KICAgYXQgQW1hem9uLlJ1bnRpbWUuSW50ZXJuYWwuSHR0cEVycm9yUmVzcG9uc2VFeGNlcHRpb25IYW5kbGVyLkhhbmRsZUV4Y2VwdGlvbihJRXhlY3V0aW9uQ29udGV4dCBleGVjdXRpb25Db250ZXh0LCBIdHRwRXJyb3JSZXNwb25zZUV4Y2VwdGlvbiBleGNlcHRpb24pCiAgIGF0IEFtYXpvbi5SdW50aW1lLkludGVybmFsLkV4Y2VwdGlvbkhhbmRsZXJgMS5IYW5kbGUoSUV4ZWN1dGlvbkNvbnRleHQgZXhlY3V0aW9uQ29udGV4dCwgRXhjZXB0aW9uIGV4Y2VwdGlvbikKICAgYXQgQW1hem9uLlJ1bnRpbWUuSW50ZXJuYWwuRXJyb3JIYW5kbGVyLlByb2Nlc3NFeGNlcHRpb24oSUV4ZWN1dGlvbkNvbnRleHQgZXhlY3V0aW9uQ29udGV4dCwgRXhjZXB0aW9uIGV4Y2VwdGlvbikKICAgYXQgQW1hem9uLlJ1bnRpbWUuSW50ZXJuYWwuRXJyb3JIYW5kbGVyLkludm9rZUFzeW5jW1RdKElFeGVjdXRpb25Db250ZXh0IGV4ZWN1dGlvbkNvbnRleHQpCiAgIGF0IEFtYXpvbi5SdW50aW1lLkludGVybmFsLkNhbGxiYWNrSGFuZGxlci5JbnZva2VBc3luY1tUXShJRXhlY3V0aW9uQ29udGV4dCBleGVjdXRpb25Db250ZXh0KQogICBhdCBBbWF6b24uUnVudGltZS5JbnRlcm5hbC5FbmRwb2ludERpc2NvdmVyeUhhbmRsZXIuSW52b2tlQXN5bmNbVF0oSUV4ZWN1dGlvbkNvbnRleHQgZXhlY3V0aW9uQ29udGV4dCkKICAgYXQgQW1hem9uLlJ1bnRpbWUuSW50ZXJuYWwuRW5kcG9pbnREaXNjb3ZlcnlIYW5kbGVyLkludm9rZUFzeW5jW1RdKElFeGVjdXRpb25Db250ZXh0IGV4ZWN1dGlvbkNvbnRleHQpCiAgIGF0IEFtYXpvbi5SdW50aW1lLkludGVybmFsLkNyZWRlbnRpYWxzUmV0cmlldmVyLkludm9rZUFzeW5jW1RdKElFeGVjdXRpb25Db250ZXh0IGV4ZWN1dGlvbkNvbnRleHQpCiAgIGF0IEFtYXpvbi5SdW50aW1lLkludGVybmFsLlJldHJ5SGFuZGxlci5JbnZva2VBc3luY1tUXShJRXhlY3V0aW9uQ29udGV4dCBleGVjdXRpb25Db250ZXh0KQogICBhdCBBbWF6b24uUnVudGltZS5JbnRlcm5hbC5SZXRyeUhhbmRsZXIuSW52b2tlQXN5bmNbVF0oSUV4ZWN1dGlvbkNvbnRleHQgZXhlY3V0aW9uQ29udGV4dCkKICAgYXQgQW1hem9uLlJ1bnRpbWUuSW50ZXJuYWwuQ2FsbGJhY2tIYW5kbGVyLkludm9rZUFzeW5jW1RdKElFeGVjdXRpb25Db250ZXh0IGV4ZWN1dGlvbkNvbnRleHQpCiAgIGF0IEFtYXpvbi5SdW50aW1lLkludGVybmFsLkNhbGxiYWNrSGFuZGxlci5JbnZva2VBc3luY1tUXShJRXhlY3V0aW9uQ29udGV4dCBleGVjdXRpb25Db250ZXh0KQogICBhdCBBbWF6b24uUzMuSW50ZXJuYWwuQW1hem9uUzNFeGNlcHRpb25IYW5kbGVyLkludm9rZUFzeW5jW1RdKElFeGVjdXRpb25Db250ZXh0IGV4ZWN1dGlvbkNvbnRleHQpCiAgIGF0IEFtYXpvbi5SdW50aW1lLkludGVybmFsLkVycm9yQ2FsbGJhY2tIYW5kbGVyLkludm9rZUFzeW5jW1RdKElFeGVjdXRpb25Db250ZXh0IGV4ZWN1dGlvbkNvbnRleHQpCiAgIGF0IEFtYXpvbi5SdW50aW1lLkludGVybmFsLk1ldHJpY3NIYW5kbGVyLkludm9rZUFzeW5jW1RdKElFeGVjdXRpb25Db250ZXh0IGV4ZWN1dGlvbkNvbnRleHQpCiAgIGF0IERlY29tcHJlc3NHekFuZFVwbG9hZC5GdW5jdGlvbnMuUHJvY2VzcyhKc29uRG9jdW1lbnQgcmVxdWVzdCwgSUxhbWJkYUNvbnRleHQgY29udGV4dCkgaW4gL21udC9jL1VzZXJzL2xoYXJkaW5nLkRFU0tUT1AtR0gyQ0U0Ri9TMyBVcGxvYWQgU3RyZWFtL0V4YW1wbGVzL0RlY29tcHJlc3NHekFuZFVwbG9hZC9GdW5jdGlvbi5jczpsaW5lIDM2CiAgIGF0IGxhbWJkYV9tZXRob2QoQ2xvc3VyZSAsIFN0cmVhbSAsIFN0cmVhbSAsIExhbWJkYUNvbnRleHRJbnRlcm5hbCApCgogICBhdCBBbWF6b24uUnVudGltZS5IdHRwV2ViUmVxdWVzdE1lc3NhZ2UuR2V0UmVzcG9uc2VBc3luYyhDYW5jZWxsYXRpb25Ub2tlbiBjYW5jZWxsYXRpb25Ub2tlbikKICAgYXQgQW1hem9uLlJ1bnRpbWUuSW50ZXJuYWwuSHR0cEhhbmRsZXJgMS5JbnZva2VBc3luY1tUXShJRXhlY3V0aW9uQ29udGV4dCBleGVjdXRpb25Db250ZXh0KQogICBhdCBBbWF6b24uUnVudGltZS5JbnRlcm5hbC5SZWRpcmVjdEhhbmRsZXIuSW52b2tlQXN5bmNbVF0oSUV4ZWN1dGlvbkNvbnRleHQgZXhlY3V0aW9uQ29udGV4dCkKICAgYXQgQW1hem9uLlJ1bnRpbWUuSW50ZXJuYWwuVW5tYXJzaGFsbGVyLkludm9rZUFzeW5jW1RdKElFeGVjdXRpb25Db250ZXh0IGV4ZWN1dGlvbkNvbnRleHQpCiAgIGF0IEFtYXpvbi5TMy5JbnRlcm5hbC5BbWF6b25TM1Jlc3BvbnNlSGFuZGxlci5JbnZva2VBc3luY1tUXShJRXhlY3V0aW9uQ29udGV4dCBleGVjdXRpb25Db250ZXh0KQogICBhdCBBbWF6b24uUnVudGltZS5JbnRlcm5hbC5FcnJvckhhbmRsZXIuSW52b2tlQXN5bmNbVF0oSUV4ZWN1dGlvbkNvbnRleHQgZXhlY3V0aW9uQ29udGV4dCkKCkVORCBSZXF1ZXN0SWQ6IDc4ZjRiY2YzLTQ0MzMtNGYyMy04OWM4LTY0MGI5NTBhM2U0NwpSRVBPUlQgUmVxdWVzdElkOiA3OGY0YmNmMy00NDMzL
TRmMjMtODljOC02NDBiOTUwYTNlNDcJRHVyYXRpb246IDE3Ny43MCBtcwlCaWxsZWQgRHVyYXRpb246IDE3OCBtcwlNZW1vcnkgU2l6ZTogMjU2IE1CCU1heCBNZW1vcnkgVXNlZDogMTA2IE1CCQo=" diff --git a/Examples/DecompressGzAndUpload/serverless.template b/Examples/DecompressGzAndUpload/serverless.template new file mode 100644 index 0000000..bd38ce7 --- /dev/null +++ b/Examples/DecompressGzAndUpload/serverless.template @@ -0,0 +1,41 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Transform": "AWS::Serverless-2016-10-31", + "Description": "An demo of `S3UploadStream` that does decompression of a large file in S3, with the result being streamed back to S3.", + "Parameters" : { + "Bucket" : { + "Type" : "String", + "Description" : "Enter the name of the bucket to we used with this S3UploadStream example." + } + }, + "Globals": { + "Function": { + "MemorySize": 512, + "Runtime": "dotnetcore3.1", + "Timeout": 900, + "Environment": { + "Variables": { + "BUCKET_NAME": { "Ref" : "Bucket" } + } + } + } + }, + "Resources": { + "Process": { + "Type": "AWS::Serverless::Function", + "Properties": { + "Handler": "DecompressGzAndUpload::DecompressGzAndUpload.Functions::Process", + "CodeUri": "", + "Role": null, + "Policies": [ + "AWSLambdaBasicExecutionRole", + { + "S3CrudPolicy": { + "BucketName": { "Ref" : "Bucket" } + } + } + ] + } + } + } +} \ No newline at end of file diff --git a/Examples/DecryptAesAndUpload/DecryptAesAndUpload.csproj b/Examples/DecryptAesAndUpload/DecryptAesAndUpload.csproj new file mode 100644 index 0000000..24b12a9 --- /dev/null +++ b/Examples/DecryptAesAndUpload/DecryptAesAndUpload.csproj @@ -0,0 +1,16 @@ + + + netcoreapp3.1 + true + Lambda + + + + + + + + + + + \ No newline at end of file diff --git a/Examples/DecryptAesAndUpload/Function.cs b/Examples/DecryptAesAndUpload/Function.cs new file mode 100644 index 0000000..9fd6c13 --- /dev/null +++ b/Examples/DecryptAesAndUpload/Function.cs @@ -0,0 +1,57 @@ +using System.Threading.Tasks; +using System.Text.Json; +using System.Diagnostics; +using Amazon.Lambda.Core; +using Amazon.S3; +using System.Security.Cryptography; +using System; +using Cppl.Utilities.AWS; + +// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. 
+[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
+
+namespace DecryptAesAndUpload
+{
+    public class Functions
+    {
+        readonly static string BUCKET = System.Environment.GetEnvironmentVariable("BUCKET_NAME");
+        const string NAME = "csv_1GB";
+        const string AES_DATA = NAME + ".aes";
+        const string OUTPUT_KEY = NAME + ".csv";
+
+        public Functions()
+        {
+        }
+
+        public async Task<object> Process(JsonDocument request, ILambdaContext context)
+        {
+            var s3 = new AmazonS3Client();
+
+            // easier than doing math on the timestamps in logs
+            var timer = new Stopwatch();
+            timer.Start();
+
+            context.Logger.LogLine($"{timer.Elapsed}: Getting started.");
+            using var stream = (await s3.GetObjectAsync(BUCKET, AES_DATA)).ResponseStream;
+
+            // set up a decryptor
+            using var aes = AesManaged.Create();
+            aes.IV = Convert.FromBase64String("EqYoED0ag4vlPnFkWZMCog==");
+            aes.Key = Convert.FromBase64String("Sgf9NocncDHSBqMXrMthXbToAQmthMpC6eJ6Hw51Ghg=");
+
+            using var idecrypt = aes.CreateDecryptor();
+            using var cstream = new CryptoStream(stream, idecrypt, CryptoStreamMode.Read);
+            using (var output = new S3UploadStream(s3, BUCKET, OUTPUT_KEY)) {
+                await cstream.CopyToAsync(output);
+            }
+            context.Logger.LogLine($"{timer.Elapsed}: Done copying.");
+            timer.Stop();
+
+            return new {
+                AesFile = $"s3://{BUCKET}/{AES_DATA}",
+                CsvFile = $"s3://{BUCKET}/{OUTPUT_KEY}",
+                Status = "ok"
+            };
+        }
+    }
+}
diff --git a/Examples/DecryptAesAndUpload/Readme.md b/Examples/DecryptAesAndUpload/Readme.md
new file mode 100644
index 0000000..d2b36d2
--- /dev/null
+++ b/Examples/DecryptAesAndUpload/Readme.md
@@ -0,0 +1,52 @@
+# Streaming AES 256 Decryption from and to S3
+
+This example demonstrates a Lambda function that decrypts a file in S3 that is much larger than the available RAM, and uploads the content back to S3 using a `S3UploadStream`. In this case the Lambda is intentionally under-provisioned with only 1024 MB of RAM while the file to be processed is around 1GB. Processing such a file with a naive solution where it is read and decrypted into RAM (likely using a `MemoryStream`) would obviously not work here, and saving to an attached drive (EFS) would be slow at best. The example file is randomly-generated CSV content encrypted with AES 256.
+
+**NOTE**: Running this Lambda processes a very large amount of data, and will cost you money. Keep that in mind, please. Also note that this example reads and WRITES to the bucket you specify, hence following the instructions to create a new bucket for these examples is probably a good idea -- see the parent folder's [Readme.md](../Readme.md) for more information.
+
+## Deploying and Running from the Command Line
+
+Deploy the application (run from the project directory, see the NOTE above about the bucket)
+```
+ export STACK=s3-upload-stream-example-aes
+ dotnet lambda deploy-serverless --stack-name $STACK --s3-bucket $BUCKET --template-parameters Bucket=$BUCKET
+```
+
+Invoking the example function
+```
+ export LAMBDA=$(aws cloudformation describe-stack-resources --stack-name $STACK --query "StackResources[?
LogicalResourceId == 'Process'].PhysicalResourceId" --output text) + aws lambda invoke --function-name $LAMBDA --log-type Tail --cli-read-timeout 900 --query LogResult --output text payload.json > log.txt +``` + +View the resulting payload, and logs +``` + more payload.json + cat log.txt | base64 -d +``` + +Cleanup the stack +``` + aws cloudformation delete-stack --stack-name $STACK +``` + +## Results + +Here are the log messages and output of a (second, not cold-start) run of the function: + +``` +START RequestId: a60cf25a-f575-4d33-a573-ab7716befcaa Version: $LATEST +00:00:00.0000047: Getting started. +00:01:18.2201559: Done copying. +END RequestId: a60cf25a-f575-4d33-a573-ab7716befcaa +REPORT RequestId: a60cf25a-f575-4d33-a573-ab7716befcaa Duration: 78222.65 ms Billed Duration: 78223 ms Memory Size: 1024 MB Max Memory Used: 788 MB +``` + +For completeness, here is the function output payload: + +```json +{ + "AesFile": "s3://s3-upload-examples-5i2iwewz/csv_1GB.aes", + "CsvFile": "s3://s3-upload-examples-5i2iwewz/csv_1GB.csv", + "Status": "ok" +} +``` diff --git a/Examples/DecryptAesAndUpload/aws-lambda-tools-defaults.json b/Examples/DecryptAesAndUpload/aws-lambda-tools-defaults.json new file mode 100644 index 0000000..1253485 --- /dev/null +++ b/Examples/DecryptAesAndUpload/aws-lambda-tools-defaults.json @@ -0,0 +1,13 @@ +{ + "Information": [ + "This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.", + "To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.", + "dotnet lambda help", + "All the command line options for the Lambda command can be specified in this file." + ], + "configuration": "Release", + "framework": "netcoreapp3.1", + "s3-prefix": "DecryptAesAndUpload/", + "template": "serverless.template", + "template-parameters": "" +} \ No newline at end of file diff --git a/Examples/DecryptAesAndUpload/serverless.template b/Examples/DecryptAesAndUpload/serverless.template new file mode 100644 index 0000000..4219238 --- /dev/null +++ b/Examples/DecryptAesAndUpload/serverless.template @@ -0,0 +1,41 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Transform": "AWS::Serverless-2016-10-31", + "Description": "An demo of `S3UploadStream` that decrypts a large file in S3, with the result being streamed back to S3.", + "Parameters" : { + "Bucket" : { + "Type" : "String", + "Description" : "Enter the name of the bucket to we used with this S3UploadStream example." 
+ } + }, + "Globals": { + "Function": { + "MemorySize": 1024, + "Runtime": "dotnetcore3.1", + "Timeout": 900, + "Environment": { + "Variables": { + "BUCKET_NAME": { "Ref" : "Bucket" } + } + } + } + }, + "Resources": { + "Process": { + "Type": "AWS::Serverless::Function", + "Properties": { + "Handler": "DecryptAesAndUpload::DecryptAesAndUpload.Functions::Process", + "CodeUri": "", + "Role": null, + "Policies": [ + "AWSLambdaBasicExecutionRole", + { + "S3CrudPolicy": { + "BucketName": { "Ref" : "Bucket" } + } + } + ] + } + } + } +} \ No newline at end of file diff --git a/Examples/DecryptPgpAndUpload/DecryptPgpAndUpload.csproj b/Examples/DecryptPgpAndUpload/DecryptPgpAndUpload.csproj new file mode 100644 index 0000000..ef8f4bb --- /dev/null +++ b/Examples/DecryptPgpAndUpload/DecryptPgpAndUpload.csproj @@ -0,0 +1,18 @@ + + + netcoreapp3.1 + true + Lambda + + + + + + + + + + + + + \ No newline at end of file diff --git a/Examples/DecryptPgpAndUpload/Function.cs b/Examples/DecryptPgpAndUpload/Function.cs new file mode 100644 index 0000000..bab62b6 --- /dev/null +++ b/Examples/DecryptPgpAndUpload/Function.cs @@ -0,0 +1,56 @@ +using System.Threading.Tasks; +using System.Text.Json; +using System.Diagnostics; +using Amazon.Lambda.Core; +using Amazon.S3; +using System.Security.Cryptography; +using System; +using PgpCore; +using Cppl.Utilities.AWS; + +// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. +[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] + +namespace DecryptPgpAndUpload +{ + public class Functions + { + readonly static string BUCKET = System.Environment.GetEnvironmentVariable("BUCKET_NAME"); + const string NAME = "csv_1GB"; + const string PGP_DATA = NAME + ".pgp"; + const string PGP_PRIVATE_KEY = "private_key.asc"; + const string PGP_PASSWORD = "this is not a great place for a password"; + const string OUTPUT_KEY = NAME + ".csv"; + + public Functions() + { + } + + public async Task Process(JsonDocument request, ILambdaContext context) + { + var s3 = new AmazonS3Client(); + + // easier than doing math on the timestamps in logs + var timer = new Stopwatch(); + timer.Start(); + + context.Logger.LogLine($"{timer.Elapsed}: Getting started."); + using var data = new SeekableS3Stream(s3, BUCKET, PGP_DATA, 5 * 1024 * 1024, 10); + using var key = new SeekableS3Stream(s3, BUCKET, PGP_PRIVATE_KEY, 32 * 1024); + + using var pgp = new PGP(); + using (var output = new S3UploadStream(s3, BUCKET, OUTPUT_KEY)) { + await pgp.DecryptStreamAsync(data, output, key, PGP_PASSWORD); + } + + context.Logger.LogLine($"{timer.Elapsed}: Done copying."); + timer.Stop(); + + return new { + PgpFile = $"s3://{BUCKET}/{PGP_DATA}", + CsvFile = $"s3://{BUCKET}/{OUTPUT_KEY}", + Status = "ok" + }; + } + } +} diff --git a/Examples/DecryptPgpAndUpload/Readme.md b/Examples/DecryptPgpAndUpload/Readme.md new file mode 100644 index 0000000..d61e702 --- /dev/null +++ b/Examples/DecryptPgpAndUpload/Readme.md @@ -0,0 +1,55 @@ +# Streaming PGP Decryption from and to S3 + +This example demonstrates a Lambda function that decrypt a file in S3 that is much larger than the available RAM, and uploads the content back to S3 using a `S3UploadStream` and `SeekableS3Stream` (see [here](https://github.com/mlhpdx/seekable-s3-stream)). In this case the Lambda is intentionally under-provisioned with ony 1024 MB of RAM while the file to be processed is around 1GB. 
Processing such a file with a naive solution where it is read and decrypted into RAM (likely using a `MemoryStream`) would obviously not work here, and saving to an attached drive (EFS) would be slow at best. The example file is randomly-generated CSV content encrypted with PGP.
+
+PGP requires the input streams to be random-access (support seeking), so this example includes my seekable s3 stream package, `SeekableS3Stream`, which I [wrote about](https://medium.com/circuitpeople/random-access-seekable-streams-for-amazon-s3-in-c-bd2414255dcd). Since the package is hosted on GitHub you'll need to follow their instructions to set up a package source `https://nuget.pkg.github.com/mlhpdx/index.json`.
+
+**NOTE**: Running this Lambda processes a very large amount of data, and will cost you money. Keep that in mind, please. Also note that this example reads and WRITES to the bucket you specify, hence following the instructions to create a new bucket for these examples is probably a good idea -- see the parent folder's [Readme.md](../Readme.md) for more information.
+
+## Deploying and Running from the Command Line
+
+Deploy the application (run from the project directory, see the NOTE above about the bucket)
+```
+ export STACK=s3-upload-stream-example-pgp
+ dotnet lambda deploy-serverless --stack-name $STACK --s3-bucket $BUCKET --template-parameters Bucket=$BUCKET
+```
+
+Invoking the example function
+```
+ export LAMBDA=$(aws cloudformation describe-stack-resources --stack-name $STACK --query "StackResources[? LogicalResourceId == 'Process'].PhysicalResourceId" --output text)
+ aws lambda invoke --function-name $LAMBDA --log-type Tail --cli-read-timeout 900 --query LogResult --output text payload.json > log.txt
+```
+
+View the resulting payload, and logs
+```
+ more payload.json
+ cat log.txt | base64 -d
+```
+
+Cleanup the stack
+```
+ aws cloudformation delete-stack --stack-name $STACK
+```
+
+## Results
+
+Here are the log messages and output of a run of the function:
+
+```
+START RequestId: d97a222a-fb5e-4982-9b27-b54e9a93025f Version: $LATEST
+00:00:00.0000598: Getting started.
+00:03:13.5352870: Done copying.
+END RequestId: d97a222a-fb5e-4982-9b27-b54e9a93025f
+REPORT RequestId: d97a222a-fb5e-4982-9b27-b54e9a93025f Duration: 193934.01 ms Billed Duration: 193935 ms Memory Size: 1024 MB Max Memory Used: 900 MB Init Duration: 185.49 ms
+```
+
+For completeness, here is the function output payload:
+
+```json
+{
+  "PgpFile": "s3://s3-upload-examples-5i2iwewz/csv_1GB.pgp",
+  "CsvFile": "s3://s3-upload-examples-5i2iwewz/csv_1GB.csv",
+  "Status": "ok"
+}
+```
+
diff --git a/Examples/DecryptPgpAndUpload/aws-lambda-tools-defaults.json b/Examples/DecryptPgpAndUpload/aws-lambda-tools-defaults.json
new file mode 100644
index 0000000..bae7d99
--- /dev/null
+++ b/Examples/DecryptPgpAndUpload/aws-lambda-tools-defaults.json
@@ -0,0 +1,13 @@
+{
+  "Information": [
+    "This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.",
+    "To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.",
+    "dotnet lambda help",
+    "All the command line options for the Lambda command can be specified in this file."
+ ], + "configuration": "Release", + "framework": "netcoreapp3.1", + "s3-prefix": "DecryptPgpAndUpload/", + "template": "serverless.template", + "template-parameters": "" +} \ No newline at end of file diff --git a/Examples/DecryptPgpAndUpload/serverless.template b/Examples/DecryptPgpAndUpload/serverless.template new file mode 100644 index 0000000..8eda502 --- /dev/null +++ b/Examples/DecryptPgpAndUpload/serverless.template @@ -0,0 +1,41 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Transform": "AWS::Serverless-2016-10-31", + "Description": "An demo of `S3UploadStream` that decrypts a large PGP file in S3, with the result being streamed back to S3.", + "Parameters" : { + "Bucket" : { + "Type" : "String", + "Description" : "Enter the name of the bucket to we used with this S3UploadStream example." + } + }, + "Globals": { + "Function": { + "MemorySize": 1024, + "Runtime": "dotnetcore3.1", + "Timeout": 900, + "Environment": { + "Variables": { + "BUCKET_NAME": { "Ref" : "Bucket" } + } + } + } + }, + "Resources": { + "Process": { + "Type": "AWS::Serverless::Function", + "Properties": { + "Handler": "DecryptPgpAndUpload::DecryptPgpAndUpload.Functions::Process", + "CodeUri": "", + "Role": null, + "Policies": [ + "AWSLambdaBasicExecutionRole", + { + "S3CrudPolicy": { + "BucketName": { "Ref" : "Bucket" } + } + } + ] + } + } + } +} \ No newline at end of file diff --git a/Examples/Readme.md b/Examples/Readme.md new file mode 100644 index 0000000..98e7101 --- /dev/null +++ b/Examples/Readme.md @@ -0,0 +1,39 @@ +# S3 Upload Stream Examples + +The following example projects implement Lambda functions that use `S3UploadStream` to perform worloads that stream content from S3 and back to S3 without holding objects entirely in memory. + +- [Object Decompression](./DecompressGzAndUpload) +- [AES Decryption](./DecryptAesAndUpload) +- [PGP Decryption](./DecryptPgpAndUpload) +- [Zip S3 Objects into a Zip Archive in S3](./ZipS3ListToS3) + +## Deploying and Preparing to Run these Examples from the Command Line + +This application may be deployed using the [Amazon.Lambda.Tools Global Tool](https://github.com/aws/aws-extensions-for-dotnet-cli#aws-lambda-amazonlambdatools) from the command line. The command examples below should work on any Linux, but were composed on WSL 2 so YMMV. + +Install Amazon.Lambda.Tools Global Tools if not already installed: +``` + dotnet tool install -g Amazon.Lambda.Tools +``` + +If already installed check if new version is available: +``` + dotnet tool update -g Amazon.Lambda.Tools +``` + +Create a bucket, and copy the files needed for the examples into it (a few GB of objects): +``` + export BUCKET=s3-upload-examples-$(cat /dev/urandom | tr -dc 'a-z0-9' | fold -w 8 | head -n 1) + aws s3 mb s3://$BUCKET + aws s3 cp s3://us-east-1.whitepuffies.com s3://$BUCKET --recursive +``` + +Once the bucket is created (and the BUCKET environment variable set) you can proceed to the examples above. Each example includes instructions for creating a stack, using it, and removing it (each stack only uses resources in the bucket you create here, both at deployment time and while running the examples). Once you're done with the example, the bucket can be removed. + +To cleanup the bucket: + +**NOTE:** Only run the following commands if you created a *dedicated bucket for these samples*. Running them will result in ALL OBJECTS in the bucket being removed, and the bucket deleted. 
+
+```
+ aws s3 rb s3://$BUCKET --force
+```
\ No newline at end of file
diff --git a/Examples/ZipS3ListToS3/Function.cs b/Examples/ZipS3ListToS3/Function.cs
new file mode 100644
index 0000000..37a433a
--- /dev/null
+++ b/Examples/ZipS3ListToS3/Function.cs
@@ -0,0 +1,57 @@
+using System.Threading.Tasks;
+using System.Text.Json;
+using System.Diagnostics;
+using Amazon.Lambda.Core;
+using Amazon.S3;
+using System.Security.Cryptography;
+using System;
+using Cppl.Utilities.AWS;
+using System.IO.Compression;
+using Amazon.S3.Model;
+
+// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
+[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
+
+namespace ZipS3ListToS3
+{
+    public class Functions
+    {
+        readonly static string BUCKET = System.Environment.GetEnvironmentVariable("BUCKET_NAME");
+        const string NAME = "zip_me";
+        const string PREFIX = NAME + "/";
+        const string OUTPUT_KEY = NAME + ".zip";
+
+        public Functions()
+        {
+        }
+
+        public async Task<object> Process(JsonDocument request, ILambdaContext context)
+        {
+            var s3 = new AmazonS3Client();
+
+            // easier than doing math on the timestamps in logs
+            var timer = new Stopwatch();
+            timer.Start();
+
+            context.Logger.LogLine($"{timer.Elapsed}: Getting started.");
+            using (var output = new S3UploadStream(s3, BUCKET, OUTPUT_KEY)) {
+                using var zip = new ZipArchive(output, ZipArchiveMode.Create);
+                await foreach (var o in s3.Paginators.ListObjects(new ListObjectsRequest() { BucketName = BUCKET, Prefix = PREFIX }).S3Objects) {
+                    context.Logger.LogLine($"{timer.Elapsed}: Starting on {o.Key}.");
+                    using var stream = (await s3.GetObjectAsync(BUCKET, o.Key)).ResponseStream;
+                    using var entry = zip.CreateEntry(o.Key.Substring(PREFIX.Length)).Open();
+                    await stream.CopyToAsync(entry);
+                    context.Logger.LogLine($"{timer.Elapsed}: Done with {o.Key}.");
+                }
+            }
+            context.Logger.LogLine($"{timer.Elapsed}: Done.");
+            timer.Stop();
+
+            return new {
+                Prefix = PREFIX,
+                ZipFile = $"s3://{BUCKET}/{OUTPUT_KEY}",
+                Status = "ok"
+            };
+        }
+    }
+}
diff --git a/Examples/ZipS3ListToS3/Readme.md b/Examples/ZipS3ListToS3/Readme.md
new file mode 100644
index 0000000..ca40519
--- /dev/null
+++ b/Examples/ZipS3ListToS3/Readme.md
@@ -0,0 +1,56 @@
+# Streaming Zip Compression from and to S3
+
+This example demonstrates a Lambda function that compresses objects in S3 that are in combination much larger than the available RAM, and uploads the compressed archive back to S3 using a `S3UploadStream`. In this case the Lambda is intentionally under-provisioned with only 1024 MB of RAM while the files to be processed total around 2GB. Producing such a Zip with a naive solution where each included file is read into RAM and the Zip produced in RAM (likely using `MemoryStream`s) would obviously not work here. Likewise, saving each file to an attached drive (EFS) and producing the zip there would be slow at best. This example zips all the objects found at a given prefix (`zip_me/`).
+
+**NOTE**: Running this Lambda processes a very large amount of data, and will cost you money. Keep that in mind, please. Also note that this example reads and WRITES to the bucket you specify, hence following the instructions to create a new bucket for these examples is probably a good idea -- see the parent folder's [Readme.md](../Readme.md) for more information.
+ +## Deploying and Running from the Command Line + +Deploy application (run from the project directory, see NOTE about about the bucket) +``` + export STACK=s3-upload-stream-example-zip + dotnet lambda deploy-serverless --stack-name $STACK --s3-bucket $BUCKET --template-parameters Bucket=$BUCKET +``` + +Invoking the example function +``` + export LAMBDA=$(aws cloudformation describe-stack-resources --stack-name $STACK --query "StackResources[? LogicalResourceId == 'Process'].PhysicalResourceId" --output text) + aws lambda invoke --function-name $LAMBDA --log-type Tail --cli-read-timeout 900 --query LogResult --output text payload.json > log.txt +``` + +Viewing the resulting payload, and logs +``` + more payload.json + cat log.txt | base64 -d +``` + +Cleanup the stack +``` + aws cloudformation delete-stack --stack-name $STACK +``` + +## Results + +Here are the log messages and output of a (second, not cold-start) run of the function: + +``` +START RequestId: 7839b691-2c46-43a0-b38f-f5aaedf05b66 Version: $LATEST +00:00:00.0000016: Getting started. +00:00:00.0688976: Starting on zip_me/a.bin. +00:01:52.5103068: Done with zip_me/a.bin. +00:01:52.5105763: Starting on zip_me/b.csv. +00:03:55.3356461: Done with zip_me/b.csv. +00:03:56.9912055: Done. +END RequestId: 7839b691-2c46-43a0-b38f-f5aaedf05b66 +REPORT RequestId: 7839b691-2c46-43a0-b38f-f5aaedf05b66 Duration: 236993.76 ms Billed Duration: 236994 ms Memory Size: 1024 MB Max Memory Used: 858 MB +``` + +For completeness, here is the function output payload: + +```json +{ + "Prefix": "zip_me/", + "ZipFile": "s3://s3-upload-examples-5i2iwewz/zip_me.zip", + "Status": "ok" +} +``` diff --git a/Examples/ZipS3ListToS3/ZipS3ListToS3.csproj b/Examples/ZipS3ListToS3/ZipS3ListToS3.csproj new file mode 100644 index 0000000..24b12a9 --- /dev/null +++ b/Examples/ZipS3ListToS3/ZipS3ListToS3.csproj @@ -0,0 +1,16 @@ + + + netcoreapp3.1 + true + Lambda + + + + + + + + + + + \ No newline at end of file diff --git a/Examples/ZipS3ListToS3/aws-lambda-tools-defaults.json b/Examples/ZipS3ListToS3/aws-lambda-tools-defaults.json new file mode 100644 index 0000000..4e7476c --- /dev/null +++ b/Examples/ZipS3ListToS3/aws-lambda-tools-defaults.json @@ -0,0 +1,13 @@ +{ + "Information": [ + "This file provides default values for the deployment wizard inside Visual Studio and the AWS Lambda commands added to the .NET Core CLI.", + "To learn more about the Lambda commands with the .NET Core CLI execute the following command at the command line in the project root directory.", + "dotnet lambda help", + "All the command line options for the Lambda command can be specified in this file." + ], + "configuration": "Release", + "framework": "netcoreapp3.1", + "s3-prefix": "ZipS3ListToS3/", + "template": "serverless.template", + "template-parameters": "" +} \ No newline at end of file diff --git a/Examples/ZipS3ListToS3/serverless.template b/Examples/ZipS3ListToS3/serverless.template new file mode 100644 index 0000000..8f8c5a9 --- /dev/null +++ b/Examples/ZipS3ListToS3/serverless.template @@ -0,0 +1,41 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Transform": "AWS::Serverless-2016-10-31", + "Description": "An demo of `S3UploadStream` that creates a zip archive from of a set of files in S3, with the result being streamed back to S3.", + "Parameters" : { + "Bucket" : { + "Type" : "String", + "Description" : "Enter the name of the bucket to we used with this S3UploadStream example." 
+ } + }, + "Globals": { + "Function": { + "MemorySize": 1024, + "Runtime": "dotnetcore3.1", + "Timeout": 900, + "Environment": { + "Variables": { + "BUCKET_NAME": { "Ref" : "Bucket" } + } + } + } + }, + "Resources": { + "Process": { + "Type": "AWS::Serverless::Function", + "Properties": { + "Handler": "ZipS3ListToS3::ZipS3ListToS3.Functions::Process", + "CodeUri": "", + "Role": null, + "Policies": [ + "AWSLambdaBasicExecutionRole", + { + "S3CrudPolicy": { + "BucketName": { "Ref" : "Bucket" } + } + } + ] + } + } + } +} \ No newline at end of file diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..a2ad11a --- /dev/null +++ b/Readme.md @@ -0,0 +1,7 @@ +# S3 Upload Stream + +This code demonstrates how to perform uploads to S3 without holding the entire content in memory or storage beforehand. This approach is more memory efficient than using `MemoryStream` and offers compatibility with libraries and packages that work with a `Stream` interface. Examples for extracting GZIP, decrypting AES and PGP, and uploading dynamically-generated CSV are provided. A [NuGet package](https://github.com/mlhpdx/s3-upload-stream/packages) is also available for the library, hosted here on GitHub. + +This project is a follow-on to `SeekableS3Stream`, also on [Github](https://github.com/mlhpdx/seekable-s3-stream). Using both allows for very efficient and simple large file processing to and from S3 via AWS Lambda, as demonstrated in one of the [Examples](./Examples/Readme.md). + +For a more detailed explaination, check out the article on [Medium](https://medium.com/circuitpeople/streaming-uploads-for-amazon-s3-in-c-...). diff --git a/S3UploadStream.sln b/S3UploadStream.sln new file mode 100644 index 0000000..a5e77e0 --- /dev/null +++ b/S3UploadStream.sln @@ -0,0 +1,111 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.26124.0 +MinimumVisualStudioVersion = 15.0.26124.0 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "S3UploadStream", "S3UploadStream\S3UploadStream.csproj", "{16DFA0F3-42D2-4E71-BE7B-42B885FCD828}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Examples", "Examples", "{B9B3E61A-C8A3-47A5-88E9-47435BE40E2F}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DecompressGzAndUpload", "Examples\DecompressGzAndUpload\DecompressGzAndUpload.csproj", "{6A581B6D-4674-4295-9315-7D950E4C01D1}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DecryptAesAndUpload", "Examples\DecryptAesAndUpload\DecryptAesAndUpload.csproj", "{1E168D5F-036C-4C72-B754-A16D5D517B05}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ZipS3ListToS3", "Examples\ZipS3ListToS3\ZipS3ListToS3.csproj", "{F93B279D-F34B-40B1-88C0-4C3679FBB682}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DecryptPgpAndUpload", "Examples\DecryptPgpAndUpload\DecryptPgpAndUpload.csproj", "{3E5B6847-4031-440E-BB10-DA9F8CB53C11}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 + Release|Any CPU = Release|Any CPU + Release|x64 = Release|x64 + Release|x86 = Release|x86 + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {16DFA0F3-42D2-4E71-BE7B-42B885FCD828}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {16DFA0F3-42D2-4E71-BE7B-42B885FCD828}.Debug|Any CPU.Build.0 
= Debug|Any CPU + {16DFA0F3-42D2-4E71-BE7B-42B885FCD828}.Debug|x64.ActiveCfg = Debug|Any CPU + {16DFA0F3-42D2-4E71-BE7B-42B885FCD828}.Debug|x64.Build.0 = Debug|Any CPU + {16DFA0F3-42D2-4E71-BE7B-42B885FCD828}.Debug|x86.ActiveCfg = Debug|Any CPU + {16DFA0F3-42D2-4E71-BE7B-42B885FCD828}.Debug|x86.Build.0 = Debug|Any CPU + {16DFA0F3-42D2-4E71-BE7B-42B885FCD828}.Release|Any CPU.ActiveCfg = Release|Any CPU + {16DFA0F3-42D2-4E71-BE7B-42B885FCD828}.Release|Any CPU.Build.0 = Release|Any CPU + {16DFA0F3-42D2-4E71-BE7B-42B885FCD828}.Release|x64.ActiveCfg = Release|Any CPU + {16DFA0F3-42D2-4E71-BE7B-42B885FCD828}.Release|x64.Build.0 = Release|Any CPU + {16DFA0F3-42D2-4E71-BE7B-42B885FCD828}.Release|x86.ActiveCfg = Release|Any CPU + {16DFA0F3-42D2-4E71-BE7B-42B885FCD828}.Release|x86.Build.0 = Release|Any CPU + {D3AFBF9F-230F-4B4E-B36C-981A2CD4CA42}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D3AFBF9F-230F-4B4E-B36C-981A2CD4CA42}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D3AFBF9F-230F-4B4E-B36C-981A2CD4CA42}.Debug|x64.ActiveCfg = Debug|Any CPU + {D3AFBF9F-230F-4B4E-B36C-981A2CD4CA42}.Debug|x64.Build.0 = Debug|Any CPU + {D3AFBF9F-230F-4B4E-B36C-981A2CD4CA42}.Debug|x86.ActiveCfg = Debug|Any CPU + {D3AFBF9F-230F-4B4E-B36C-981A2CD4CA42}.Debug|x86.Build.0 = Debug|Any CPU + {D3AFBF9F-230F-4B4E-B36C-981A2CD4CA42}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D3AFBF9F-230F-4B4E-B36C-981A2CD4CA42}.Release|Any CPU.Build.0 = Release|Any CPU + {D3AFBF9F-230F-4B4E-B36C-981A2CD4CA42}.Release|x64.ActiveCfg = Release|Any CPU + {D3AFBF9F-230F-4B4E-B36C-981A2CD4CA42}.Release|x64.Build.0 = Release|Any CPU + {D3AFBF9F-230F-4B4E-B36C-981A2CD4CA42}.Release|x86.ActiveCfg = Release|Any CPU + {D3AFBF9F-230F-4B4E-B36C-981A2CD4CA42}.Release|x86.Build.0 = Release|Any CPU + {6A581B6D-4674-4295-9315-7D950E4C01D1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6A581B6D-4674-4295-9315-7D950E4C01D1}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6A581B6D-4674-4295-9315-7D950E4C01D1}.Debug|x64.ActiveCfg = Debug|Any CPU + {6A581B6D-4674-4295-9315-7D950E4C01D1}.Debug|x64.Build.0 = Debug|Any CPU + {6A581B6D-4674-4295-9315-7D950E4C01D1}.Debug|x86.ActiveCfg = Debug|Any CPU + {6A581B6D-4674-4295-9315-7D950E4C01D1}.Debug|x86.Build.0 = Debug|Any CPU + {6A581B6D-4674-4295-9315-7D950E4C01D1}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6A581B6D-4674-4295-9315-7D950E4C01D1}.Release|Any CPU.Build.0 = Release|Any CPU + {6A581B6D-4674-4295-9315-7D950E4C01D1}.Release|x64.ActiveCfg = Release|Any CPU + {6A581B6D-4674-4295-9315-7D950E4C01D1}.Release|x64.Build.0 = Release|Any CPU + {6A581B6D-4674-4295-9315-7D950E4C01D1}.Release|x86.ActiveCfg = Release|Any CPU + {6A581B6D-4674-4295-9315-7D950E4C01D1}.Release|x86.Build.0 = Release|Any CPU + {1E168D5F-036C-4C72-B754-A16D5D517B05}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1E168D5F-036C-4C72-B754-A16D5D517B05}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1E168D5F-036C-4C72-B754-A16D5D517B05}.Debug|x64.ActiveCfg = Debug|Any CPU + {1E168D5F-036C-4C72-B754-A16D5D517B05}.Debug|x64.Build.0 = Debug|Any CPU + {1E168D5F-036C-4C72-B754-A16D5D517B05}.Debug|x86.ActiveCfg = Debug|Any CPU + {1E168D5F-036C-4C72-B754-A16D5D517B05}.Debug|x86.Build.0 = Debug|Any CPU + {1E168D5F-036C-4C72-B754-A16D5D517B05}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1E168D5F-036C-4C72-B754-A16D5D517B05}.Release|Any CPU.Build.0 = Release|Any CPU + {1E168D5F-036C-4C72-B754-A16D5D517B05}.Release|x64.ActiveCfg = Release|Any CPU + {1E168D5F-036C-4C72-B754-A16D5D517B05}.Release|x64.Build.0 = Release|Any CPU + 
{1E168D5F-036C-4C72-B754-A16D5D517B05}.Release|x86.ActiveCfg = Release|Any CPU + {1E168D5F-036C-4C72-B754-A16D5D517B05}.Release|x86.Build.0 = Release|Any CPU + {F93B279D-F34B-40B1-88C0-4C3679FBB682}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F93B279D-F34B-40B1-88C0-4C3679FBB682}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F93B279D-F34B-40B1-88C0-4C3679FBB682}.Debug|x64.ActiveCfg = Debug|Any CPU + {F93B279D-F34B-40B1-88C0-4C3679FBB682}.Debug|x64.Build.0 = Debug|Any CPU + {F93B279D-F34B-40B1-88C0-4C3679FBB682}.Debug|x86.ActiveCfg = Debug|Any CPU + {F93B279D-F34B-40B1-88C0-4C3679FBB682}.Debug|x86.Build.0 = Debug|Any CPU + {F93B279D-F34B-40B1-88C0-4C3679FBB682}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F93B279D-F34B-40B1-88C0-4C3679FBB682}.Release|Any CPU.Build.0 = Release|Any CPU + {F93B279D-F34B-40B1-88C0-4C3679FBB682}.Release|x64.ActiveCfg = Release|Any CPU + {F93B279D-F34B-40B1-88C0-4C3679FBB682}.Release|x64.Build.0 = Release|Any CPU + {F93B279D-F34B-40B1-88C0-4C3679FBB682}.Release|x86.ActiveCfg = Release|Any CPU + {F93B279D-F34B-40B1-88C0-4C3679FBB682}.Release|x86.Build.0 = Release|Any CPU + {3E5B6847-4031-440E-BB10-DA9F8CB53C11}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3E5B6847-4031-440E-BB10-DA9F8CB53C11}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3E5B6847-4031-440E-BB10-DA9F8CB53C11}.Debug|x64.ActiveCfg = Debug|Any CPU + {3E5B6847-4031-440E-BB10-DA9F8CB53C11}.Debug|x64.Build.0 = Debug|Any CPU + {3E5B6847-4031-440E-BB10-DA9F8CB53C11}.Debug|x86.ActiveCfg = Debug|Any CPU + {3E5B6847-4031-440E-BB10-DA9F8CB53C11}.Debug|x86.Build.0 = Debug|Any CPU + {3E5B6847-4031-440E-BB10-DA9F8CB53C11}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3E5B6847-4031-440E-BB10-DA9F8CB53C11}.Release|Any CPU.Build.0 = Release|Any CPU + {3E5B6847-4031-440E-BB10-DA9F8CB53C11}.Release|x64.ActiveCfg = Release|Any CPU + {3E5B6847-4031-440E-BB10-DA9F8CB53C11}.Release|x64.Build.0 = Release|Any CPU + {3E5B6847-4031-440E-BB10-DA9F8CB53C11}.Release|x86.ActiveCfg = Release|Any CPU + {3E5B6847-4031-440E-BB10-DA9F8CB53C11}.Release|x86.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {D3AFBF9F-230F-4B4E-B36C-981A2CD4CA42} = {C8CD868B-38DD-4731-9D6D-7753B1142B53} + {6A581B6D-4674-4295-9315-7D950E4C01D1} = {B9B3E61A-C8A3-47A5-88E9-47435BE40E2F} + {1E168D5F-036C-4C72-B754-A16D5D517B05} = {B9B3E61A-C8A3-47A5-88E9-47435BE40E2F} + {F93B279D-F34B-40B1-88C0-4C3679FBB682} = {B9B3E61A-C8A3-47A5-88E9-47435BE40E2F} + {3E5B6847-4031-440E-BB10-DA9F8CB53C11} = {B9B3E61A-C8A3-47A5-88E9-47435BE40E2F} + EndGlobalSection +EndGlobal diff --git a/S3UploadStream/S3UploadStream.cs b/S3UploadStream/S3UploadStream.cs new file mode 100644 index 0000000..00daa5b --- /dev/null +++ b/S3UploadStream/S3UploadStream.cs @@ -0,0 +1,185 @@ +using Amazon.S3; +using Amazon.S3.Model; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; + +namespace Cppl.Utilities.AWS +{ + public class S3UploadStream : Stream + { + /* Note the that maximum size (as of now) of a file in S3 is 5TB so it isn't + * safe to assume all uploads will work here. MAX_PART_SIZE times MAX_PART_COUNT + * is ~50TB, which is too big for S3. 
+        */
+        const long MIN_PART_LENGTH = 5L * 1024 * 1024; // all parts but the last this size or greater
+        const long MAX_PART_LENGTH = 5L * 1024 * 1024 * 1024; // 5GB max per PUT
+        const long MAX_PART_COUNT = 10000; // no more than 10,000 parts total
+        const long DEFAULT_PART_LENGTH = MIN_PART_LENGTH;
+
+        internal class Metadata
+        {
+            public string BucketName;
+            public string Key;
+            public long PartLength = DEFAULT_PART_LENGTH;
+
+            public int PartCount = 0;
+            public string UploadId;
+            public MemoryStream CurrentStream;
+
+            public long Position = 0; // based on bytes written
+            public long Length = 0; // based on bytes written or SetLength, whichever is larger (no truncation)
+
+            public List<Task> Tasks = new List<Task>();
+            public ConcurrentDictionary<int, string> PartETags = new ConcurrentDictionary<int, string>();
+        }
+
+        Metadata _metadata = new Metadata();
+        IAmazonS3 _s3 = null;
+
+        public S3UploadStream(IAmazonS3 s3, string s3uri, long partLength = DEFAULT_PART_LENGTH)
+            : this(s3, new Uri(s3uri), partLength)
+        {
+        }
+
+        public S3UploadStream(IAmazonS3 s3, Uri s3uri, long partLength = DEFAULT_PART_LENGTH)
+            : this (s3, s3uri.Host, s3uri.LocalPath.Substring(1), partLength)
+        {
+        }
+
+        public S3UploadStream(IAmazonS3 s3, string bucket, string key, long partLength = DEFAULT_PART_LENGTH)
+        {
+            _s3 = s3;
+            _metadata.BucketName = bucket;
+            _metadata.Key = key;
+            _metadata.PartLength = partLength;
+        }
+
+        protected override void Dispose(bool disposing)
+        {
+            if (disposing)
+            {
+                if (_metadata != null)
+                {
+                    Flush(true);
+                    CompleteUpload();
+                }
+            }
+            _metadata = null;
+            base.Dispose(disposing);
+        }
+
+        public override bool CanRead => false;
+        public override bool CanSeek => false;
+        public override bool CanWrite => true;
+        public override long Length => _metadata.Length = Math.Max(_metadata.Length, _metadata.Position);
+
+        public override long Position
+        {
+            get => _metadata.Position;
+            set => throw new NotImplementedException();
+        }
+
+        public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException();
+        public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
+
+        public override void SetLength(long value)
+        {
+            _metadata.Length = Math.Max(_metadata.Length, value);
+            _metadata.PartLength = Math.Max(MIN_PART_LENGTH, Math.Min(MAX_PART_LENGTH, _metadata.Length / MAX_PART_COUNT));
+        }
+
+        private void StartNewPart()
+        {
+            if (_metadata.CurrentStream != null) {
+                Flush(false);
+            }
+            _metadata.CurrentStream = new MemoryStream();
+            _metadata.PartLength = Math.Min(MAX_PART_LENGTH, Math.Max(_metadata.PartLength, (_metadata.PartCount / 2 + 1) * MIN_PART_LENGTH));
+        }
+
+        public override void Flush()
+        {
+            Flush(false);
+        }
+
+        private void Flush(bool disposing)
+        {
+            if (_metadata.UploadId == null) {
+                _metadata.UploadId = _s3.InitiateMultipartUploadAsync(new InitiateMultipartUploadRequest()
+                {
+                    BucketName = _metadata.BucketName,
+                    Key = _metadata.Key
+                }).GetAwaiter().GetResult().UploadId;
+            }
+            // INFO: Don't complete the request here. A flush just means send the buffer. If it's
+            // less than MIN_PART_LENGTH bytes and not the last part an exception will be thrown by
+            // S3 but c'est la vie! Maybe that should generate a warning.
+ if (_metadata.CurrentStream != null) + { + var i = ++_metadata.PartCount; + + _metadata.CurrentStream.Seek(0, SeekOrigin.Begin); + var request = new UploadPartRequest() + { + BucketName = _metadata.BucketName, + Key = _metadata.Key, + UploadId = _metadata.UploadId, + PartNumber = i, + IsLastPart = disposing, + InputStream = _metadata.CurrentStream + }; + _metadata.CurrentStream = null; + + var upload = Task.Run(async () => + { + var response = await _s3.UploadPartAsync(request); + _metadata.PartETags.AddOrUpdate(i, response.ETag, + (n, s) => response.ETag); + request.InputStream.Dispose(); + }); + _metadata.Tasks.Add(upload); + } + } + + private void CompleteUpload() + { + Task.WaitAll(_metadata.Tasks.ToArray()); + + if (Length > 0) { + _s3.CompleteMultipartUploadAsync(new CompleteMultipartUploadRequest() + { + BucketName = _metadata.BucketName, + Key = _metadata.Key, + PartETags = _metadata.PartETags.Select(e => new PartETag(e.Key, e.Value)).ToList(), + UploadId = _metadata.UploadId + }).GetAwaiter().GetResult(); + } + } + + public override void Write(byte[] buffer, int offset, int count) + { + if (count == 0) return; + + // write as much of the buffer as will fit to the current part, and if needed + // allocate a new part and continue writing to it (and so on). + var o = offset; + var c = Math.Min(count, buffer.Length - offset); // don't over-read the buffer, even if asked to + do + { + if (_metadata.CurrentStream == null || _metadata.CurrentStream.Length >= _metadata.PartLength) + StartNewPart(); + + var remaining = _metadata.PartLength - _metadata.CurrentStream.Length; + var w = Math.Min(c, (int)remaining); + _metadata.CurrentStream.Write(buffer, o, w); + + _metadata.Position += w; + c -= w; + o += w; + } while (c > 0); + } + } +} diff --git a/S3UploadStream/S3UploadStream.csproj b/S3UploadStream/S3UploadStream.csproj new file mode 100644 index 0000000..ee03d6f --- /dev/null +++ b/S3UploadStream/S3UploadStream.csproj @@ -0,0 +1,20 @@ + + + + netcoreapp3.1 + Cppl.Utilities.AWS + S3UploadStream + Lee Harding + CircuitPeople + S3UploadStream Library + A `Stream` subclass that allows uploading dynamically generated (streaming, unknown size) content to AWS S3. + stream;s3;aws;http-range;content-length;multi-part + https://github.com/mlhpdx/s3-upload-stream + MIT + + + + + + +
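
For reference, a minimal usage sketch of the `S3UploadStream` class introduced in this commit, based on the constructors and the `Write`/`Dispose` behavior shown in `S3UploadStream/S3UploadStream.cs` above. The bucket name, object key, and generated rows below are placeholders, not part of the commit.

```csharp
using System;
using System.Text;
using Amazon.S3;
using Cppl.Utilities.AWS;

class Example
{
    static void Main()
    {
        var s3 = new AmazonS3Client();

        // "my-example-bucket" and "generated/rows.csv" are placeholder names for this sketch.
        using (var upload = new S3UploadStream(s3, "my-example-bucket", "generated/rows.csv"))
        {
            // Write content of unknown total size; parts are buffered in memory and sent
            // to S3 as a multipart upload, so the whole object is never held in RAM.
            for (var i = 0; i < 1_000_000; ++i)
            {
                var row = Encoding.UTF8.GetBytes($"{i},{Guid.NewGuid()}\n");
                upload.Write(row, 0, row.Length);
            }
        } // disposing flushes the final part and completes the multipart upload
    }
}
```

The `S3UploadStream(IAmazonS3, string s3uri)` overload accepts an `s3://bucket/key` URI in place of separate bucket and key arguments, and when the final size is known in advance a call to `SetLength` lets the stream pick a larger part size so the 10,000-part limit is not exceeded.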