// Copyright Epic Games, Inc. All Rights Reserved.

using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using Amazon;
using Amazon.Extensions.NETCore.Setup;
using Amazon.Runtime;
using Amazon.S3;
using Amazon.S3.Model;
using Amazon.S3.Transfer;
using Amazon.S3.Util;
using EpicGames.Horde.Storage;
using Jupiter.Common;
using Jupiter.Common.Implementation;
using Jupiter.Common.Utils;
using Jupiter.Implementation.Blob;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using OpenTelemetry.Trace;
using Exception = System.Exception;
using KeyNotFoundException = System.Collections.Generic.KeyNotFoundException;

namespace Jupiter.Implementation
{
    public class AmazonS3Store : IBlobStore, IMultipartBlobStore, IRegionalBlobStore
    {
        private readonly IAmazonS3 _amazonS3;
        private readonly IBlobIndex _blobIndex;
        private readonly INamespacePolicyResolver _namespacePolicyResolver;
        private readonly ILogger _logger;
        private readonly IServiceProvider _provider;
        private readonly S3Settings _settings;
        private readonly JupiterSettings _jupiterSettings;
        private readonly ConcurrentDictionary<string, AmazonStorageBackend> _backends = new ConcurrentDictionary<string, AmazonStorageBackend>();

        public AmazonS3Store(AmazonS3Factory amazonS3Factory, IOptionsMonitor<S3Settings> settings, IOptionsMonitor<JupiterSettings> jupiterSettings, IBlobIndex blobIndex, INamespacePolicyResolver namespacePolicyResolver, ILogger<AmazonS3Store> logger, IServiceProvider provider)
        {
            _amazonS3 = amazonS3Factory.Create();
            _blobIndex = blobIndex;
            _namespacePolicyResolver = namespacePolicyResolver;
            _logger = logger;
            _provider = provider;
            _settings = settings.CurrentValue;
            _jupiterSettings = jupiterSettings.CurrentValue;
        }

        private AmazonStorageBackend GetBackend(NamespaceId ns, string? site = null)
        {
            site ??= _jupiterSettings.CurrentSite;
            string bucketName = GetBucketName(ns, site);
            return _backends.GetOrAdd($"{ns}:{site}", x =>
            {
                string? awsRegion = null;
                // Determine the region for the bucket if we're not dealing with the current site
                if (site != _jupiterSettings.CurrentSite)
                {
                    GetBucketLocationRequest bucketLocationRequest = new GetBucketLocationRequest() { BucketName = bucketName };
                    GetBucketLocationResponse bucketLocationResponse = _amazonS3.GetBucketLocationAsync(bucketLocationRequest).Result;

                    // us-east-1 and eu-west-1 don't contain their actual region identifiers for some reason
                    if (bucketLocationResponse.Location == S3Region.USEast1)
                    {
                        awsRegion = "us-east-1";
                    }
                    else if (bucketLocationResponse.Location == S3Region.EUWest1)
                    {
                        awsRegion = "eu-west-1";
                    }
                    else
                    {
                        awsRegion = bucketLocationResponse.Location.Value;
                    }
                }

                return ActivatorUtilities.CreateInstance<AmazonStorageBackend>(_provider, new AmazonStorageBackendConfiguration() { BucketName = bucketName, Region = awsRegion });
            });
        }
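        // Backends are cached per "{namespace}:{site}" pair, so the (blocking) bucket-location lookup
        // above runs at most once per remote bucket. For example (hypothetical names), a namespace
        // "test.ddc" read via site "use1" is cached under the key "test.ddc:use1".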
        public async Task<Uri?> GetObjectByRedirectAsync(NamespaceId ns, BlobId identifier)
        {
            Uri? uri = await GetBackend(ns).GetReadRedirectAsync(identifier.AsS3Key());
            return uri;
        }

        public async Task<BlobMetadata> GetObjectMetadataAsync(NamespaceId ns, BlobId identifier)
        {
            BlobMetadata? result = await GetBackend(ns).GetMetadataAsync(identifier.AsS3Key());
            if (result == null)
            {
                throw new BlobNotFoundException(ns, identifier);
            }
            return result;
        }

        public async Task CopyBlobAsync(NamespaceId ns, NamespaceId targetNamespace, BlobId blobId)
        {
            await GetBackend(targetNamespace).CopyBlobAsync(GetBackend(ns), blobId.AsS3Key(), blobId.AsS3Key());
        }

        public async Task<Uri?> PutObjectWithRedirectAsync(NamespaceId ns, BlobId identifier)
        {
            Uri? uri = await GetBackend(ns).GetWriteRedirectAsync(identifier.AsS3Key());
            return uri;
        }

        public async Task<BlobId> PutObjectAsync(NamespaceId ns, ReadOnlyMemory<byte> content, BlobId objectName)
        {
            await using MemoryStream stream = new MemoryStream(content.ToArray());
            return await PutObjectAsync(ns, stream, objectName);
        }

        public async Task<BlobId> PutObjectAsync(NamespaceId ns, Stream stream, BlobId objectName)
        {
            await GetBackend(ns).WriteAsync(objectName.AsS3Key(), stream, CancellationToken.None);
            return objectName;
        }

        public async Task<BlobId> PutObjectAsync(NamespaceId ns, byte[] content, BlobId objectName)
        {
            await using MemoryStream stream = new MemoryStream(content);
            return await PutObjectAsync(ns, stream, objectName);
        }

        private string GetBucketName(NamespaceId ns, string site)
        {
            try
            {
                NamespacePolicy policy = _namespacePolicyResolver.GetPoliciesForNs(ns);
                string storagePool = policy.StoragePool;

                // Prefer explicitly defined bucket names for the current storage pool and region
                if (_settings.SiteBucketsPerStoragePool.TryGetValue(storagePool, out Dictionary<string, string>? siteBuckets))
                {
                    if (siteBuckets.TryGetValue(site, out string? bucketName))
                    {
                        return bucketName;
                    }
                }

                if (site != _jupiterSettings.CurrentSite)
                {
                    throw new Exception("Unable to resolve bucket name for specific site. Please define regional bucket names in S3.SiteBucketsPerStoragePool");
                }

                // if the bucket to use for the storage pool has been overridden we use the override
                if (_settings.StoragePoolBucketOverride.TryGetValue(storagePool, out string? containerOverride))
                {
                    return containerOverride;
                }

                // by default we use the storage pool as a suffix to determine the bucket for that pool
                string storagePoolSuffix = string.IsNullOrEmpty(storagePool) ? "" : $"-{storagePool}";
                return $"{_settings.BucketName}{storagePoolSuffix}";
            }
            catch (KeyNotFoundException)
            {
                throw new NamespaceNotFoundException(ns);
            }
        }
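        // Illustrative resolution order (hypothetical values, assuming BucketName = "jupiter" and a
        // namespace policy with StoragePool = "cold"):
        //   1. S3.SiteBucketsPerStoragePool["cold"][site]  - if a bucket is configured for the requested site
        //   2. S3.StoragePoolBucketOverride["cold"]        - if an override is configured (current site only)
        //   3. "jupiter-cold"                              - the default pool-suffixed bucket name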
        public async Task<BlobContents> GetObjectAsync(NamespaceId ns, BlobId blob, LastAccessTrackingFlags flags = LastAccessTrackingFlags.DoTracking, bool supportsRedirectUri = false)
        {
            NamespacePolicy policies = _namespacePolicyResolver.GetPoliciesForNs(ns);

            try
            {
                if (supportsRedirectUri && policies.AllowRedirectUris)
                {
                    Uri? redirectUri = await GetBackend(ns).GetReadRedirectAsync(blob.AsS3Key());
                    if (redirectUri != null)
                    {
                        return new BlobContents(redirectUri);
                    }
                }

                BlobContents? contents = await GetBackend(ns).TryReadAsync(blob.AsS3Key(), flags);
                if (contents == null)
                {
                    throw new BlobNotFoundException(ns, blob);
                }
                return contents;
            }
            catch (AmazonS3Exception e)
            {
                // log information about the failed request, except for 404s as it's valid to not find objects in S3
                if (e.StatusCode != HttpStatusCode.NotFound)
                {
                    _logger.LogWarning("Exception raised from S3 {Exception}. {RequestId} {Id}", e, e.RequestId, e.AmazonId2);
                }

                // rethrow the exception, we just wanted to log more information about the failed request for further debugging
                throw;
            }
        }

        public async Task<bool> ExistsAsync(NamespaceId ns, BlobId blobIdentifier, bool forceCheck)
        {
            NamespacePolicy policies = _namespacePolicyResolver.GetPoliciesForNs(ns);
            if (_settings.UseBlobIndexForExistsCheck && policies.UseBlobIndexForSlowExists && !forceCheck)
            {
                return await _blobIndex.BlobExistsInRegionAsync(ns, blobIdentifier);
            }
            else
            {
                return await GetBackend(ns).ExistsAsync(blobIdentifier.AsS3Key(), CancellationToken.None);
            }
        }

        public async Task DeleteNamespaceAsync(NamespaceId ns)
        {
            string bucketName = GetBucketName(ns, _jupiterSettings.CurrentSite);
            try
            {
                await _amazonS3.DeleteBucketAsync(bucketName);
            }
            catch (AmazonS3Exception e)
            {
                // if the bucket does not exist we get a not found status code
                if (e.StatusCode == HttpStatusCode.NotFound)
                {
                    // deleting a non-existent bucket is a success
                    return;
                }

                // something else happened, let's just process it as usual
            }
        }

        public async IAsyncEnumerable<(BlobId, DateTime)> ListObjectsAsync(NamespaceId ns)
        {
            IStorageBackend backend = GetBackend(ns);
            await foreach ((string path, DateTime time) in backend.ListAsync())
            {
                // ignore objects in the temp prefix
                if (path.StartsWith("Temp", StringComparison.OrdinalIgnoreCase))
                {
                    continue;
                }

                string identifierString = path.Substring(path.LastIndexOf("/", StringComparison.Ordinal) + 1);
                yield return (new BlobId(identifierString), time);
            }
        }

        public async Task DeleteObjectAsync(NamespaceId ns, BlobId blobIdentifier)
        {
            IStorageBackend backend = GetBackend(ns);
            await backend.DeleteAsync(blobIdentifier.AsS3Key());
        }

        public async Task DeleteObjectAsync(IEnumerable<NamespaceId> namespaces, BlobId blob)
        {
            List<NamespaceId> namespaceIds = namespaces.ToList();
            List<string> storagePools = namespaceIds.Select(ns => _namespacePolicyResolver.GetPoliciesForNs(ns).StoragePool).Distinct().ToList();
            // namespaces can share a storage pool, so we only need to delete the blob once per distinct pool
            Dictionary<string, NamespaceId> storagePoolsToClean = storagePools.ToDictionary(storagePool => storagePool, storagePool => namespaceIds.FirstOrDefault(id => _namespacePolicyResolver.GetPoliciesForNs(id).StoragePool == storagePool));

            foreach ((string _, NamespaceId ns) in storagePoolsToClean)
            {
                await GetBackend(ns).DeleteAsync(blob.AsS3Key(), CancellationToken.None);
            }
        }

        #region IMultipartBlobStore

        private const string TempMultipartPrefix = "Temp/Multipart";

        public Task<string> StartMultipartUploadAsync(NamespaceId ns, string blobName)
        {
            return GetBackend(ns).StartMultipartUploadAsync($"{TempMultipartPrefix}/{blobName}");
        }

        public Task CompleteMultipartUploadAsync(NamespaceId ns, string blobName, string uploadId, List<string> partIds)
        {
            return GetBackend(ns).CompleteMultipartUploadAsync($"{TempMultipartPrefix}/{blobName}", uploadId, partIds);
        }

        public Task<Uri?> GetWriteRedirectForPartAsync(NamespaceId ns, string blobName, string uploadId, string partIdentifier)
        {
            return Task.FromResult(GetBackend(ns).GetWriteRedirectForPart($"{TempMultipartPrefix}/{blobName}", uploadId, partIdentifier));
        }

        public async Task<BlobContents?> GetMultipartObjectByNameAsync(NamespaceId ns, string blobName)
        {
            return await GetBackend(ns).TryReadAsync($"{TempMultipartPrefix}/{blobName}", LastAccessTrackingFlags.SkipTracking);
        }

        public async Task RenameMultipartBlobAsync(NamespaceId ns, string blobName, BlobId blobId)
        {
            await GetBackend(ns).RenameMultipartBlobAsync($"{TempMultipartPrefix}/{blobName}", blobId.AsS3Key());
        }

        public Task PutMultipartPartAsync(NamespaceId ns, string blobName, string uploadId, string partIdentifier, byte[] blob)
        {
            return GetBackend(ns).PutMultipartPartAsync($"{TempMultipartPrefix}/{blobName}", uploadId, partIdentifier, blob);
        }

        public List<MultipartByteRange> GetMultipartRanges(NamespaceId ns, string uploadId, ulong blobLength)
        {
            ulong countOfChunks = (blobLength / IdealS3ChunkSize) + 1;
            if (Math.Min(countOfChunks, MaxS3Chunks) == MaxS3Chunks)
            {
                throw new Exception("Multipart blob would use more than the max count of chunks in S3, this is not supported");
            }

            List<MultipartByteRange> parts = new();
            ulong firstByte = 0;
            ulong lastByte = IdealS3ChunkSize; // last byte is inclusive as per http range requests
            for (int i = 1; i < (int)countOfChunks + 1; i++)
            {
                // s3 parts start at 1 and max is 10_000 (inclusive)
                // s3 part ids are simply incrementing numbers
                parts.Add(new MultipartByteRange() { FirstByte = firstByte, LastByte = lastByte, PartId = i.ToString() });
                firstByte += IdealS3ChunkSize;
                lastByte = Math.Min(lastByte + IdealS3ChunkSize, blobLength);
            }

            return parts;
        }
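        // Worked example (illustrative numbers): for blobLength = 100 MiB (104,857,600 bytes) and a
        // 32 MiB ideal chunk size, countOfChunks = 3 + 1 = 4, so the loop emits part ids "1".."4"
        // with FirstByte advancing 32 MiB per part and the final LastByte clamped to blobLength.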
        public MultipartLimits GetMultipartLimits(NamespaceId ns)
        {
            return new MultipartLimits { IdealChunkSize = IdealS3ChunkSize, MaxCountOfChunks = (int)MaxS3Chunks, MinChunkSize = 8 * 1024 * 1024 };
        }

        private const ulong MaxS3Chunks = 10_000;
        private const int IdealS3ChunkSize = 32 * 1024 * 1024; // 32 MB parts - this means the largest file we can upload to S3 is ~312 GB
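        // The ~312 GB figure follows from S3's 10,000-part limit per multipart upload:
        // 10,000 parts * 32 MiB = 320,000 MiB, i.e. roughly 312.5 GiB.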
        #endregion

        #region IRegionalBlobStore

        public async Task<BlobId> PutObjectAsync(NamespaceId ns, ReadOnlyMemory<byte> content, BlobId objectName, string region)
        {
            await using MemoryStream stream = new MemoryStream(content.ToArray());
            return await PutObjectAsync(ns, stream, objectName, region);
        }

        public async Task<BlobId> PutObjectAsync(NamespaceId ns, Stream stream, BlobId objectName, string region)
        {
            await GetBackend(ns, region).WriteAsync(objectName.AsS3Key(), stream, CancellationToken.None);
            return objectName;
        }

        public async Task<BlobId> PutObjectAsync(NamespaceId ns, byte[] content, BlobId objectName, string region)
        {
            await using MemoryStream stream = new MemoryStream(content);
            return await PutObjectAsync(ns, stream, objectName, region);
        }

        public async Task DeleteObjectAsync(NamespaceId ns, BlobId blobId, string region)
        {
            IStorageBackend backend = GetBackend(ns, region);
            await backend.DeleteAsync(blobId.AsS3Key());
        }

        public async Task ImportBlobAsync(NamespaceId ns, string sourceRegion, BlobId blobId)
        {
            AmazonStorageBackend source = GetBackend(ns, sourceRegion);
            AmazonStorageBackend destination = GetBackend(ns);

            await destination.EnsureBucketExistsAsync();
            await destination.CopyBlobAsync(source, blobId.AsS3Key(), blobId.AsS3Key());
        }

        #endregion
    }

    public readonly struct AmazonStorageBackendConfiguration
    {
        public string BucketName { get; init; }
        public string? Region { get; init; }
    }
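    /// <summary>
    /// Storage backend bound to a single S3 bucket (optionally in another region). Handles reads,
    /// writes, listing, presigned redirect uris and multipart uploads on behalf of AmazonS3Store.
    /// </summary>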
    public class AmazonStorageBackend : IStorageBackend
    {
        private readonly IAmazonS3 _amazonS3;
        private readonly string _bucketName;
        private readonly IOptionsMonitor<S3Settings> _settings;
        private readonly Tracer _tracer;
        private readonly ILogger _logger;
        private readonly BufferedPayloadFactory _payloadFactory;
        private bool _bucketExistenceChecked;
        private bool _bucketAccessPolicyApplied;

        public AmazonStorageBackend(AmazonStorageBackendConfiguration configuration, AmazonS3Factory amazonS3Factory, IOptionsMonitor<S3Settings> settings, Tracer tracer, ILogger<AmazonStorageBackend> logger, BufferedPayloadFactory payloadFactory)
        {
            _amazonS3 = amazonS3Factory.Create(configuration.Region);
            _bucketName = configuration.BucketName;
            _settings = settings;
            _tracer = tracer;
            _logger = logger;
            _payloadFactory = payloadFactory;
        }

        public async Task EnsureBucketExistsAsync(CancellationToken cancellationToken = default)
        {
            if (_settings.CurrentValue.CreateBucketIfMissing)
            {
                if (!_bucketExistenceChecked)
                {
                    bool bucketExist = await AmazonS3Util.DoesS3BucketExistV2Async(_amazonS3, _bucketName);
                    if (!bucketExist)
                    {
                        PutBucketRequest putBucketRequest = new PutBucketRequest { BucketName = _bucketName, UseClientRegion = true };

                        try
                        {
                            await _amazonS3.PutBucketAsync(putBucketRequest, cancellationToken);
                        }
                        catch (AmazonS3Exception e)
                        {
                            if (e.StatusCode == HttpStatusCode.Conflict)
                            {
                                // the bucket already exists so there is no need to try and create it; most likely a race
                                // with another write operation that created it at the same time in this or another instance.
                                // as this ends with the bucket existing we can just ignore the error
                            }
                            else
                            {
                                throw;
                            }
                        }
                    }
                    _bucketExistenceChecked = true;
                }
            }

            if (_settings.CurrentValue.SetBucketPolicies && !_bucketAccessPolicyApplied)
            {
                // block all public access to the bucket
                try
                {
                    await _amazonS3.PutPublicAccessBlockAsync(new PutPublicAccessBlockRequest
                    {
                        BucketName = _bucketName,
                        PublicAccessBlockConfiguration = new PublicAccessBlockConfiguration()
                        {
                            RestrictPublicBuckets = true,
                            BlockPublicAcls = true,
                            BlockPublicPolicy = true,
                            IgnorePublicAcls = true,
                        }
                    }, cancellationToken);

                    _bucketAccessPolicyApplied = true;
                }
                catch (AmazonS3Exception e)
                {
                    // if a conflicting operation is being applied to the public access block we just ignore it, as it will get reset the next time we run
                    if (e.StatusCode != HttpStatusCode.Conflict)
                    {
                        throw;
                    }
                }
            }
        }

        public async Task WriteAsync(string path, Stream stream, CancellationToken cancellationToken)
        {
            await EnsureBucketExistsAsync(cancellationToken);

            if (_settings.CurrentValue.UseMultiPartUpload)
            {
                await WriteMultipartAsync(path, stream, cancellationToken);
            }
            else
            {
                PutObjectRequest request = new PutObjectRequest
                {
                    BucketName = _bucketName,
                    Key = path,
                    InputStream = stream,
                    UseChunkEncoding = _settings.CurrentValue.UseChunkEncoding,
                    DisablePayloadSigning = _settings.CurrentValue.DisablePayloadChecksums,
                    DisableDefaultChecksumValidation = _settings.CurrentValue.DisablePayloadChecksums,
                };

                try
                {
                    await _amazonS3.PutObjectAsync(request, cancellationToken);
                }
                catch (AmazonS3Exception e)
                {
                    // if the same object is added twice S3 will raise an error; as we are content-addressed
                    // either copy of the object is acceptable, so we can ignore that error
                    if (e.StatusCode == HttpStatusCode.Conflict)
                    {
                        return;
                    }

                    if (e.StatusCode == HttpStatusCode.TooManyRequests)
                    {
                        throw new ResourceHasToManyRequestsException(e);
                    }

                    throw;
                }
            }
        }

        private async Task WriteMultipartAsync(string path, Stream stream, CancellationToken cancellationToken)
        {
            FilesystemBufferedPayload? payload = null;
            try
            {
                await using MemoryStream localStream = new MemoryStream();
                string? filePath = null;
                if (stream is FileStream fileStream)
                {
                    filePath = fileStream.Name;
                }
                // use multipart transfers when the buffer is larger than 16 MB, which is also the TransferUtility default threshold
                else if (stream.Length > 16 * (long)Math.Pow(2, 20))
                {
                    // will be chunked by TransferUtility
                    using FilesystemBufferedPayloadWriter writer = _payloadFactory.CreateFilesystemBufferedPayloadWriter("s3-upload");
                    {
                        await using Stream writableStream = writer.GetWritableStream();
                        await stream.CopyToAsync(writableStream, cancellationToken);
                    }

                    payload = writer.Done();
                    filePath = payload.TempFile.FullName;
                }
                else
                {
                    // copy the stream, as TransferUtility will dispose this stream if any issue happens, making us unable to retry
                    await stream.CopyToAsync(localStream, cancellationToken);
                }

                const int MaxRetryAttempts = 3;
                for (int i = 0; i < MaxRetryAttempts; i++)
                {
                    using TransferUtility utility = new TransferUtility(_amazonS3);
                    try
                    {
                        if (filePath != null)
                        {
                            TransferUtilityUploadRequest uploadRequest = new TransferUtilityUploadRequest()
                            {
                                BucketName = _bucketName,
                                Key = path,
                                FilePath = filePath,
                                DisablePayloadSigning = _settings.CurrentValue.DisablePayloadChecksums,
                                DisableDefaultChecksumValidation = _settings.CurrentValue.DisablePayloadChecksums,
                            };
                            await utility.UploadAsync(uploadRequest, cancellationToken);
                        }
                        else
                        {
                            // reset the stream position so that we upload the entire stream each attempt
                            localStream.Position = 0;
                            TransferUtilityUploadRequest uploadRequest = new TransferUtilityUploadRequest()
                            {
                                BucketName = _bucketName,
                                Key = path,
                                InputStream = localStream,
                                DisablePayloadSigning = _settings.CurrentValue.DisablePayloadChecksums,
                                DisableDefaultChecksumValidation = _settings.CurrentValue.DisablePayloadChecksums,
                            };
                            await utility.UploadAsync(uploadRequest, cancellationToken);
                        }
                        break;
                    }
                    catch (AmazonS3Exception e)
                    {
                        // if the same object is added twice S3 will raise an error; as we are content-addressed
                        // either copy of the object is acceptable, so we can ignore that error
                        if (e.StatusCode == HttpStatusCode.Conflict)
                        {
                            return;
                        }

                        if (e.StatusCode == HttpStatusCode.TooManyRequests)
                        {
                            throw new ResourceHasToManyRequestsException(e);
                        }

                        if (e.StatusCode == HttpStatusCode.ServiceUnavailable && (e.ErrorCode == "SlowDown" || e.ErrorCode == "503 SlowDown"))
                        {
                            throw new ResourceHasToManyRequestsException(e);
                        }

                        throw;
                    }
                    catch (HttpIOException e)
                    {
                        if (i < MaxRetryAttempts - 1)
                        {
                            _logger.LogWarning("Http IO exception when writing multipart blob to S3, retrying");
                            // retry
                            continue;
                        }

                        // too many retries, give up
                        _logger.LogError(e, "Http IO exception when writing multipart blob to S3");
                        throw;
                    }
                    catch (IOException e)
                    {
                        if (i < MaxRetryAttempts - 1)
                        {
                            _logger.LogWarning("IO exception when writing multipart blob to S3, retrying");
                            // retry
                            continue;
                        }

                        // too many retries, give up
                        _logger.LogError(e, "IO exception when writing multipart blob to S3");
                        throw;
                    }
                }
            }
            finally
            {
                payload?.Dispose();
            }
        }
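        // Upload strategy recap: FileStream sources are handed to TransferUtility by path, other
        // streams over 16 MB are buffered to a temp file first, and small streams are buffered in
        // memory; buffering is what makes the retry loop safe, since TransferUtility disposes any
        // stream it is given.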
        private async Task<BlobContents?> GetMultipartAsync(string path, CancellationToken cancellationToken)
        {
            try
            {
                GetObjectRequest request = new GetObjectRequest { BucketName = _bucketName, Key = path, PartNumber = 1 };
                GetObjectResponse response = await _amazonS3.GetObjectAsync(request, cancellationToken);

                // PartsCount is only set for blobs that were uploaded using multipart, otherwise part 1 is the whole file
                if (response.PartsCount > 1)
                {
                    // object had parts, we download them in parallel
                    int partsCount = response.PartsCount.Value;
                    byte[][] parts = new byte[partsCount][];
                    long[] lengths = new long[partsCount];
                    try
                    {
                        {
                            parts[0] = ArrayPool<byte>.Shared.Rent((int)response.ContentLength);
                            lengths[0] = response.ContentLength;
                            await using MemoryStream ms = new MemoryStream(parts[0]);
                            await response.ResponseStream.CopyToAsync(ms, cancellationToken);
                        }

                        // multipart object, download it in parts; we already have the first part
                        // indexes use part - 1 because s3 part numbers start at 1
                        await Parallel.ForEachAsync(Enumerable.Range(2, partsCount - 1), cancellationToken, async (part, token) =>
                        {
                            GetObjectRequest partRequest = new GetObjectRequest { BucketName = _bucketName, Key = path, PartNumber = part };
                            GetObjectResponse partResponse = await _amazonS3.GetObjectAsync(partRequest, cancellationToken);

                            // the first part was retrieved in the initial request
                            parts[part - 1] = ArrayPool<byte>.Shared.Rent((int)partResponse.ContentLength);
                            lengths[part - 1] = partResponse.ContentLength;
                            await using MemoryStream ms = new MemoryStream(parts[part - 1]);
                            await partResponse.ResponseStream.CopyToAsync(ms, cancellationToken);
                        });

                        long totalSize = lengths.Sum();
                        if (totalSize < _settings.CurrentValue.MultiPartMaxMemoryBufferSize)
                        {
                            Stream s = new MemoryStream(new byte[(int)totalSize]);
                            for (int i = 0; i < parts.Length; i++)
                            {
                                byte[] part = parts[i];
                                await s.WriteAsync(part, 0, (int)lengths[i], cancellationToken);
                            }
                            s.Position = 0;
                            return new BlobContents(s, totalSize);
                        }
                        else
                        {
                            // large payload - we have to buffer to file
                            using FilesystemBufferedPayloadWriter writer = _payloadFactory.CreateFilesystemBufferedPayloadWriter("s3-download");
                            {
                                await using Stream s = writer.GetWritableStream();
                                for (int i = 0; i < parts.Length; i++)
                                {
                                    byte[] part = parts[i];
                                    await s.WriteAsync(part, 0, (int)lengths[i], cancellationToken);
                                }
                            }

                            return new BlobContents(writer.Done());
                        }
                    }
                    finally
                    {
                        for (int i = 0; i < partsCount; i++)
                        {
                            // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
                            // This should never be null, but during exceptions we have seen it be null, so we null check
                            // here to avoid throwing new exceptions that would hide the original one
                            if (parts[i] != null)
                            {
                                ArrayPool<byte>.Shared.Return(parts[i]);
                            }
                        }
                    }
                }
                else
                {
                    return new BlobContents(response.ResponseStream, response.ContentLength);
                }
            }
            catch (AmazonS3Exception e)
            {
                if (e.ErrorCode == "NoSuchKey")
                {
                    return null;
                }
                if (e.ErrorCode == "NoSuchBucket")
                {
                    return null;
                }
                if (e.StatusCode == HttpStatusCode.TooManyRequests)
                {
                    throw new ResourceHasToManyRequestsException(e);
                }
                throw;
            }
        }
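        // Requesting PartNumber = 1 both fetches the first part and reveals, via PartsCount on the
        // response, whether the object was stored as a multipart upload; for objects uploaded in a
        // single PUT, part 1 simply is the whole file and no parallel fan-out happens.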
        public async Task<BlobContents?> TryReadAsync(string path, LastAccessTrackingFlags flags = LastAccessTrackingFlags.DoTracking, CancellationToken cancellationToken = default)
        {
            if (_settings.CurrentValue.UseMultiPartDownload)
            {
                return await GetMultipartAsync(path, cancellationToken);
            }
            else
            {
                GetObjectResponse response;
                try
                {
                    response = await _amazonS3.GetObjectAsync(_bucketName, path, cancellationToken);
                }
                catch (AmazonS3Exception e)
                {
                    if (e.ErrorCode == "NoSuchKey")
                    {
                        return null;
                    }
                    if (e.ErrorCode == "NoSuchBucket")
                    {
                        return null;
                    }
                    throw;
                }
                return new BlobContents(response.ResponseStream, response.ContentLength);
            }
        }

        public async Task<bool> ExistsAsync(string path, CancellationToken cancellationToken)
        {
            try
            {
                await _amazonS3.GetObjectMetadataAsync(_bucketName, path, cancellationToken);
            }
            catch (AmazonS3Exception e)
            {
                // if the object does not exist we get a not found status code
                if (e.StatusCode == HttpStatusCode.NotFound)
                {
                    return false;
                }

                // something else happened, let's just process it as usual
            }
            return true;
        }

        public async IAsyncEnumerable<(string, DateTime)> ListAsync([EnumeratorCancellation] CancellationToken cancellationToken)
        {
            if (_settings.CurrentValue.PerPrefixListing)
            {
                List<string> hashPrefixes = new List<string>(65536);
                for (int a = 0; a <= byte.MaxValue; a++)
                {
                    for (int b = 0; b <= byte.MaxValue; b++)
                    {
                        hashPrefixes.Add(StringUtils.FormatAsHexString(new byte[] { (byte)a, (byte)b }));
                    }
                }
                hashPrefixes.Shuffle();

                if (!await AmazonS3Util.DoesS3BucketExistV2Async(_amazonS3, _bucketName))
                {
                    yield break;
                }

                foreach (string hashPrefix in hashPrefixes)
                {
                    ListObjectsV2Request request = new ListObjectsV2Request { BucketName = _bucketName, Prefix = hashPrefix, MaxKeys = _settings.CurrentValue.PerPrefixMaxKeys };
                    ListObjectsV2Response response;
                    do
                    {
                        response = await _amazonS3.ListObjectsV2Async(request, cancellationToken);
                        foreach (S3Object obj in response.S3Objects)
                        {
                            yield return (obj.Key, obj.LastModified);
                        }
                        request.ContinuationToken = response.NextContinuationToken;
                    } while (response.IsTruncated);
                }
            }
            else
            {
                if (!await AmazonS3Util.DoesS3BucketExistV2Async(_amazonS3, _bucketName))
                {
                    yield break;
                }

                ListObjectsV2Request request = new ListObjectsV2Request { BucketName = _bucketName };
                ListObjectsV2Response response;
                do
                {
                    response = await _amazonS3.ListObjectsV2Async(request, cancellationToken);
                    foreach (S3Object obj in response.S3Objects)
                    {
                        yield return (obj.Key, obj.LastModified);
                    }
                    request.ContinuationToken = response.NextContinuationToken;
                } while (response.IsTruncated);
            }
        }
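        // Per-prefix listing walks all 65,536 four-character hex prefixes ("0000".."ffff"), matching
        // the "{first 4 hash chars}/{hash}" key layout produced by AsS3Key below. Shuffling the
        // prefixes spreads the listing load across the keyspace instead of scanning it in order.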
        public async Task DeleteAsync(string path, CancellationToken cancellationToken)
        {
            await _amazonS3.DeleteObjectAsync(_bucketName, path, cancellationToken);
        }

        public ValueTask<Uri?> GetReadRedirectAsync(string path)
        {
            return new ValueTask<Uri?>(GetPresignedUrl(path, HttpVerb.GET));
        }

        public ValueTask<Uri?> GetWriteRedirectAsync(string path)
        {
            return new ValueTask<Uri?>(GetPresignedUrl(path, HttpVerb.PUT));
        }

        /// <summary>
        /// Helper method to generate a presigned URL for a request
        /// </summary>
        Uri? GetPresignedUrl(string path, HttpVerb verb, int? partNumber = null, string? uploadId = null)
        {
            using TelemetrySpan span = _tracer.StartActiveSpan("s3.BuildPresignedUrl")
                .SetAttribute("Path", path);

            try
            {
                GetPreSignedUrlRequest signedUrlRequest = new GetPreSignedUrlRequest();
                signedUrlRequest.BucketName = _bucketName;
                signedUrlRequest.Key = path;
                signedUrlRequest.Verb = verb;
                signedUrlRequest.Protocol = _settings.CurrentValue.AssumeHttpForRedirectUri ? Protocol.HTTP : Protocol.HTTPS;
                signedUrlRequest.Expires = DateTime.UtcNow.AddHours(3.0);
                if (partNumber.HasValue)
                {
                    signedUrlRequest.PartNumber = partNumber.Value;
                }
                if (uploadId != null)
                {
                    signedUrlRequest.UploadId = uploadId;
                }

                string url = _amazonS3.GetPreSignedURL(signedUrlRequest);
                return new Uri(url);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Unable to get presigned url for {Path} from S3", path);
                return null;
            }
        }
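        // Illustrative client-side use of a presigned PUT url (hypothetical `presignedUri` and
        // `payload` variables; not part of this class):
        //
        //   using HttpClient client = new HttpClient();
        //   using HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Put, presignedUri)
        //   {
        //       Content = new ByteArrayContent(payload)
        //   };
        //   HttpResponseMessage response = await client.SendAsync(request);
        //   response.EnsureSuccessStatusCode();
        //
        // The caller needs no AWS credentials; the signature embedded in the url authorizes that
        // single operation until the three-hour expiry set above.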
        public async Task<BlobMetadata?> GetMetadataAsync(string path)
        {
            try
            {
                GetObjectAttributesResponse? metadata = await _amazonS3.GetObjectAttributesAsync(new GetObjectAttributesRequest
                {
                    BucketName = _bucketName,
                    Key = path,
                    ObjectAttributes = new List<ObjectAttributes>() { ObjectAttributes.ObjectSize }
                });

                if (metadata == null)
                {
                    return null;
                }
                return new BlobMetadata(metadata.ObjectSize, metadata.LastModified);
            }
            catch (AmazonS3Exception e)
            {
                // if the object does not exist we get a not found status code
                if (e.StatusCode == HttpStatusCode.NotFound)
                {
                    return null;
                }
                throw;
            }
            catch (XmlException)
            {
                // multipart-uploaded objects can cause invalid xml to be generated in S3
                return null;
            }
        }

        public async Task<string> StartMultipartUploadAsync(string path)
        {
            InitiateMultipartUploadResponse response = await _amazonS3.InitiateMultipartUploadAsync(new InitiateMultipartUploadRequest { BucketName = _bucketName, Key = path });
            return response.UploadId;
        }

        public async Task CompleteMultipartUploadAsync(string path, string uploadId, List<string> partIds)
        {
            Dictionary<int, PartETag> etags = new();
            int? partNumberMarker = 0;
            // s3 only allows listing 1000 parts per request but supports up to 10_000 parts, so we may need to list up to 10 times to get all parts
            for (int i = 0; i < 10; i++)
            {
                ListPartsRequest listPartsRequest = new() { BucketName = _bucketName, Key = path, UploadId = uploadId, PartNumberMarker = partNumberMarker.ToString() };
                ListPartsResponse partList = await _amazonS3.ListPartsAsync(listPartsRequest);
                foreach (PartDetail? part in partList.Parts)
                {
                    etags.Add(part.PartNumber, new PartETag(part.PartNumber, part.ETag));
                }

                if (!partList.IsTruncated)
                {
                    break;
                }
                partNumberMarker = partList.NextPartNumberMarker;
            }

            // check if any part is missing
            List<string> missingPartIds = new();
            foreach (string partId in partIds)
            {
                if (!etags.ContainsKey(int.Parse(partId)))
                {
                    missingPartIds.Add(partId);
                }
            }

            if (missingPartIds.Count > 0)
            {
                throw new MissingMultipartPartsException(missingPartIds);
            }

            await _amazonS3.CompleteMultipartUploadAsync(new CompleteMultipartUploadRequest { BucketName = _bucketName, Key = path, UploadId = uploadId, PartETags = etags.Values.ToList() });
        }

        public async Task PutMultipartPartAsync(string path, string uploadId, string partIdentifier, byte[] data)
        {
            await _amazonS3.UploadPartAsync(new UploadPartRequest { BucketName = _bucketName, Key = path, UploadId = uploadId, PartNumber = int.Parse(partIdentifier), InputStream = new MemoryStream(data) });
        }

        public Uri? GetWriteRedirectForPart(string path, string uploadId, string partIdentifier)
        {
            return GetPresignedUrl(path, HttpVerb.PUT, partNumber: int.Parse(partIdentifier), uploadId: uploadId);
        }

        public async Task RenameMultipartBlobAsync(string blobName, string targetBlobName)
        {
            await CopyBlobAsync(this, blobName, targetBlobName);

            // delete the old multipart object
            await _amazonS3.DeleteObjectAsync(_bucketName, blobName);
        }

        internal string GetBucketName()
        {
            return _bucketName;
        }

        public async Task CopyBlobAsync(AmazonStorageBackend source, string sourcePath, string destinationPath)
        {
            if (string.Equals(source._bucketName, _bucketName, StringComparison.OrdinalIgnoreCase))
            {
                // copy within the bucket
                if (string.Equals(sourcePath, destinationPath, StringComparison.OrdinalIgnoreCase))
                {
                    // copy to the same object, this already exists so no need to do any work
                    return;
                }
            }

            GetObjectMetadataResponse metadata = await source._amazonS3.GetObjectMetadataAsync(new GetObjectMetadataRequest { BucketName = source._bucketName, Key = sourcePath, PartNumber = 1 });
            if (!metadata.PartsCount.HasValue)
            {
                // not a multipart object
                await _amazonS3.CopyObjectAsync(new CopyObjectRequest()
                {
                    SourceBucket = source._bucketName,
                    DestinationBucket = _bucketName,
                    SourceKey = sourcePath,
                    DestinationKey = destinationPath,
                });
                return;
            }

            InitiateMultipartUploadResponse startMultipartUpload = await _amazonS3.InitiateMultipartUploadAsync(_bucketName, destinationPath);
            string uploadId = startMultipartUpload.UploadId;
            ConcurrentBag<PartETag> parts = new();
            long chunkSize = metadata.ContentLength;
            long objectLength = long.Parse(metadata.ContentRange.Split('/')[1]);
            await Parallel.ForAsync(1, metadata.PartsCount.Value + 1, async (int i, CancellationToken token) =>
            {
                long startOffset = (i - 1) * chunkSize;
                long endOffset = (i * chunkSize) - 1; // s3 byte ranges are inclusive, so step back one from the exclusive bound
                endOffset = Math.Min(endOffset, objectLength - 1);
                CopyPartResponse response = await _amazonS3.CopyPartAsync(new CopyPartRequest()
                {
                    SourceBucket = source._bucketName,
                    DestinationBucket = _bucketName,
                    PartNumber = i,
                    SourceKey = sourcePath,
                    DestinationKey = destinationPath,
                    UploadId = uploadId,
                    FirstByte = startOffset,
                    LastByte = endOffset,
                }, token);
                parts.Add(new PartETag(i, response.ETag));
            });

            await _amazonS3.CompleteMultipartUploadAsync(new CompleteMultipartUploadRequest { BucketName = _bucketName, UploadId = uploadId, Key = destinationPath, PartETags = parts.ToList() });
        }
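        // Worked example for the multipart copy above (illustrative numbers): a 100 MiB source object
        // with PartsCount = 4 and a first-part ContentLength of 32 MiB gives chunkSize = 32 MiB, so
        // part i copies the inclusive byte range [(i-1) * 32 MiB, i * 32 MiB - 1], with the last part
        // clamped to objectLength - 1. This assumes every part except the last matches the first
        // part's size, which holds for uniformly-sized part uploads.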
    }

    public class AmazonS3Factory
    {
        private readonly IServiceProvider _provider;

        public AmazonS3Factory(IServiceProvider provider)
        {
            _provider = provider;
        }

        public IAmazonS3 Create(string? region = null)
        {
            S3Settings s3Settings = _provider.GetService<IOptionsMonitor<S3Settings>>()!.CurrentValue;
            AWSCredentials awsCredentials = _provider.GetService<AWSCredentials>()!;
            AWSOptions awsOptions = _provider.GetService<AWSOptions>()!;

            if (s3Settings.ConnectionString.ToUpper() != "AWS")
            {
                awsOptions.DefaultClientConfig.ServiceURL = s3Settings.ConnectionString;
            }

            if (region != null)
            {
                awsOptions.Region = RegionEndpoint.GetBySystemName(region);
            }

            awsOptions.Credentials = awsCredentials;

            IAmazonS3 serviceClient = awsOptions.CreateServiceClient<IAmazonS3>();
            if (serviceClient.Config is AmazonS3Config c)
            {
                if (s3Settings.ForceAWSPathStyle)
                {
                    c.ForcePathStyle = true;
                    // this works around an issue with presigned urls when using minio to emulate S3, due to the AWS SDK not defaulting to signature v4 for presigned urls outside of us-east-1
                    // see https://github.com/minio/minio/issues/19887
                    AWSConfigsS3.UseSignatureVersion4 = true;
                }
                if (s3Settings.UseArnRegion.HasValue)
                {
                    c.UseArnRegion = s3Settings.UseArnRegion.Value;
                }
            }
            else
            {
                throw new NotImplementedException();
            }

            return serviceClient;
        }
    }

    public static class BlobIdentifierExtensions
    {
        public static string AsS3Key(this BlobId blobIdentifier)
        {
            // prefix each key with the first four characters of the hash to spread objects across the
            // keyspace, e.g. a blob with hash "deadbeef..." is stored under the key "dead/deadbeef..."
            string s = blobIdentifier.ToString();
            string prefix = s.Substring(0, 4);
            return $"{prefix}/{s}";
        }
    }
}