s3LogProvider
// S3LogFileProvider.cs
using System;
using System.Buffers.Text;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Amazon;
using Amazon.S3;
using Amazon.S3.Model;

namespace YourApp.Logging
{
    /// <summary>
    /// Provides listing and local-cached access to S3-stored log files organized under:
    /// s3://{S3_LOG_BUCKET}/{S3_LOG_PREFIX}/{YYYYMMDD}/...
    /// Uses ETag sidecars to avoid re-downloading unchanged files.
    /// </summary>
    public sealed class S3LogFileProvider
    {
        public const string EnvBucket = "S3_LOG_BUCKET";
        public const string EnvPrefix = "S3_LOG_PREFIX";
        public const string EnvTmpDir = "TMP_DIR";        // optional; defaults to Path.GetTempPath()
        public const string EnvAwsRegion = "AWS_REGION";  // optional; AWS SDK can also resolve from standard config

        private static readonly Regex YmdRegex = new(@"^\d{8}$", RegexOptions.Compiled);

        private readonly IAmazonS3 _s3;
        private readonly string _bucket;
        private readonly string _basePrefix; // trailing slash trimmed
        private readonly string _tmpRoot;

        // Prevent multiple concurrent downloads of the same key
        private readonly ConcurrentDictionary<string, SemaphoreSlim> _keyLocks = new();
        public S3LogFileProvider(IAmazonS3? s3Client = null)
        {
            _bucket = Environment.GetEnvironmentVariable(EnvBucket) ??
                      throw new InvalidOperationException($"Missing env {EnvBucket}");
            var pfx = Environment.GetEnvironmentVariable(EnvPrefix) ??
                      throw new InvalidOperationException($"Missing env {EnvPrefix}");
            _basePrefix = TrimSlashes(pfx);

            var tmpRoot = Environment.GetEnvironmentVariable(EnvTmpDir);
            _tmpRoot = string.IsNullOrWhiteSpace(tmpRoot) ? Path.GetTempPath() : tmpRoot;
            Directory.CreateDirectory(_tmpRoot);

            if (s3Client is not null)
            {
                _s3 = s3Client;
            }
            else
            {
                var region = Environment.GetEnvironmentVariable(EnvAwsRegion);
                _s3 = string.IsNullOrWhiteSpace(region)
                    ? new AmazonS3Client() // region from default chain
                    : new AmazonS3Client(RegionEndpoint.GetBySystemName(region));
            }
        }
        /// <summary>
        /// Returns distinct calc dates (YYYYMMDD) discovered as immediate child prefixes under the base prefix.
        /// Sorted descending (most recent first).
        /// </summary>
        public async Task<IReadOnlyList<string>> ListCalcDatesAsync(CancellationToken ct = default)
        {
            var dates = new HashSet<string>();
            string? continuation = null;
            do
            {
                var req = new ListObjectsV2Request
                {
                    BucketName = _bucket,
                    Prefix = _basePrefix.Length == 0 ? null : _basePrefix + "/",
                    Delimiter = "/",
                    ContinuationToken = continuation
                };
                var resp = await _s3.ListObjectsV2Async(req, ct).ConfigureAwait(false);
                foreach (var cp in resp.CommonPrefixes)
                {
                    var tail = LastSegment(cp);
                    if (YmdRegex.IsMatch(tail))
                        dates.Add(tail);
                }
                continuation = resp.IsTruncated ? resp.NextContinuationToken : null;
            }
            while (continuation != null && !ct.IsCancellationRequested);

            // Sort newest first
            var sorted = dates
                .OrderByDescending(d => d, StringComparer.Ordinal)
                .ToList()
                .AsReadOnly();
            return sorted;
        }
        /// <summary>
        /// Enumerates local references to all log files for a calc date.
        /// Each file is downloaded (if needed) into the tmp cache and reused by matching ETag.
        /// </summary>
        public async IAsyncEnumerable<LocalLogFile> EnumerateLogsAsync(
            string calcDate,
            [EnumeratorCancellation] CancellationToken ct = default)
        {
            if (!YmdRegex.IsMatch(calcDate))
                throw new ArgumentException("calcDate must be in YYYYMMDD format.", nameof(calcDate));

            // Handle an empty base prefix the same way ListCalcDatesAsync does
            var prefix = _basePrefix.Length == 0 ? $"{calcDate}/" : $"{_basePrefix}/{calcDate}/";
            string? continuation = null;
            do
            {
                var req = new ListObjectsV2Request
                {
                    BucketName = _bucket,
                    Prefix = prefix,
                    ContinuationToken = continuation
                };
                var resp = await _s3.ListObjectsV2Async(req, ct).ConfigureAwait(false);
                foreach (var obj in resp.S3Objects.Where(o => !o.Key.EndsWith("/", StringComparison.Ordinal)))
                {
                    var local = await GetOrDownloadAsync(obj, ct).ConfigureAwait(false);
                    yield return local;
                }
                continuation = resp.IsTruncated ? resp.NextContinuationToken : null;
            }
            while (continuation != null && !ct.IsCancellationRequested);
        }
        /// <summary>
        /// Resolves a single object to a local cached file. Uses .etag sidecar for validation.
        /// </summary>
        public async Task<LocalLogFile> GetOrDownloadAsync(S3Object obj, CancellationToken ct = default)
        {
            // Consistent local path based on S3 key: put under tmpRoot/s3cache/{bucket}/{key}
            var localPath = BuildLocalPath(obj.Key);
            var etagPath = localPath + ".etag";

            // Ensure per-key download serialization
            var keyLock = _keyLocks.GetOrAdd(obj.Key, _ => new SemaphoreSlim(1, 1));
            await keyLock.WaitAsync(ct).ConfigureAwait(false);
            try
            {
                // If exists and ETag matches, reuse
                if (File.Exists(localPath) && File.Exists(etagPath))
                {
                    var cachedEtag = await File.ReadAllTextAsync(etagPath, ct).ConfigureAwait(false);
                    if (NormalizeEtag(cachedEtag) == NormalizeEtag(obj.ETag))
                    {
                        return new LocalLogFile(obj.Key, localPath, obj.Size, obj.LastModified, obj.ETag);
                    }
                }

                // Otherwise download to temp, then move into place
                Directory.CreateDirectory(Path.GetDirectoryName(localPath)!);

                // Optional: HEAD to confirm ETag before downloading (can save bandwidth on rare races)
                var head = await _s3.GetObjectMetadataAsync(_bucket, obj.Key, ct).ConfigureAwait(false);
                var currentEtag = head.ETag;
                if (File.Exists(localPath) && File.Exists(etagPath))
                {
                    var cachedEtag = await File.ReadAllTextAsync(etagPath, ct).ConfigureAwait(false);
                    if (NormalizeEtag(cachedEtag) == NormalizeEtag(currentEtag))
                    {
                        // Someone else refreshed it just now
                        return new LocalLogFile(obj.Key, localPath, head.ContentLength, head.LastModified, currentEtag);
                    }
                }

                var tmpFile = localPath + ".downloading";
                if (File.Exists(tmpFile)) TryDelete(tmpFile);

                using (var get = await _s3.GetObjectAsync(_bucket, obj.Key, ct).ConfigureAwait(false))
                using (var fs = File.Create(tmpFile))
                {
                    await get.ResponseStream.CopyToAsync(fs, ct).ConfigureAwait(false);
                }
                // Replace the previous copy in a single call (overwrite avoids a delete/move window)
                File.Move(tmpFile, localPath, overwrite: true);
                await File.WriteAllTextAsync(etagPath, currentEtag, ct).ConfigureAwait(false);

                return new LocalLogFile(obj.Key, localPath, head.ContentLength, head.LastModified, currentEtag);
            }
            finally
            {
                keyLock.Release();
            }
        }
        /// <summary>
        /// Optional helper to clean old cached files (e.g., on startup or periodically).
        /// </summary>
        public int PurgeCacheOlderThan(TimeSpan age)
        {
            var root = Path.Combine(_tmpRoot, "s3cache", SafePath(_bucket));
            if (!Directory.Exists(root)) return 0;

            var cutoff = DateTimeOffset.UtcNow - age;
            int deleted = 0;
            foreach (var path in Directory.EnumerateFiles(root, "*", SearchOption.AllDirectories))
            {
                try
                {
                    var info = new FileInfo(path);
                    if (info.LastWriteTimeUtc < cutoff.UtcDateTime)
                    {
                        info.Delete();
                        deleted++;
                    }
                }
                catch
                {
                    // best effort
                }
            }
            return deleted;
        }

        // ----------------- helpers -----------------

        private string BuildLocalPath(string s3Key)
        {
            // Ensure stable, nested path to avoid huge single-dir fanout
            var full = Path.Combine(_tmpRoot, "s3cache", SafePath(_bucket), SafePath(s3Key));
            var dir = Path.GetDirectoryName(full)!;
            Directory.CreateDirectory(dir);
            return full;
        }
        private static string SafePath(string part)
        {
            // Preserve directory structure inside the S3 key: split on separators first,
            // then strip illegal filename chars within each segment.
            // (Replacing '/' up front would flatten the whole key into one segment.)
            var segments = part.Split(new[] { '/', '\\' }, StringSplitOptions.RemoveEmptyEntries)
                .Select(seg => string.Join("_", seg.Split(Path.GetInvalidFileNameChars(), StringSplitOptions.RemoveEmptyEntries)));
            return Path.Combine(segments.ToArray());
        }
        private static string TrimSlashes(string s)
        {
            return s.Trim().Trim('/').Trim();
        }

        private static string LastSegment(string prefixWithSlash)
        {
            // e.g. "env/logs/20250115/" -> "20250115"
            var trimmed = prefixWithSlash.TrimEnd('/');
            var idx = trimmed.LastIndexOf('/');
            return idx >= 0 ? trimmed[(idx + 1)..] : trimmed;
        }

        private static void TryDelete(string path)
        {
            try { File.Delete(path); } catch { /* best effort */ }
        }

        private static string NormalizeEtag(string? etag)
        {
            if (string.IsNullOrWhiteSpace(etag)) return string.Empty;
            etag = etag.Trim();
            if (etag.StartsWith("\"") && etag.EndsWith("\""))
                etag = etag[1..^1];
            return etag;
        }
    }

    /// <summary>
    /// Represents a locally-cached S3 log object.
    /// </summary>
    public sealed record LocalLogFile(
        string S3Key,
        string LocalPath,
        long Size,
        DateTime LastModified,
        string ETag);
}
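Example.cs below assumes the required environment variables are already set before the provider is constructed. The sketch that follows shows one way to do that for local testing; the bucket, prefix, directory, and region values are illustrative placeholders, not defaults defined by the class.

// EnvSetupExample.cs — illustrative sketch only; the values below are placeholders.
using System;

static class EnvSetupExample
{
    public static void Apply()
    {
        // Required by the S3LogFileProvider constructor
        Environment.SetEnvironmentVariable("S3_LOG_BUCKET", "my-log-bucket");   // placeholder bucket name
        Environment.SetEnvironmentVariable("S3_LOG_PREFIX", "env/logs");        // placeholder prefix
        // Optional overrides
        Environment.SetEnvironmentVariable("TMP_DIR", "/tmp/log-cache");        // falls back to Path.GetTempPath() when unset
        Environment.SetEnvironmentVariable("AWS_REGION", "us-east-1");          // else resolved by the AWS SDK default chain
    }
}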
// Example.cs
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.S3;
using YourApp.Logging;

class Example
{
    public static async Task Main()
    {
        // Optionally inject your own IAmazonS3 (for custom creds/retries)
        var provider = new S3LogFileProvider();

        var dates = await provider.ListCalcDatesAsync();
        Console.WriteLine("Calc Dates:");
        foreach (var d in dates) Console.WriteLine($"  - {d}");

        if (dates.Count > 0)
        {
            Console.WriteLine($"\nEnumerating logs for {dates[0]}");
            await foreach (var log in provider.EnumerateLogsAsync(dates[0]))
            {
                Console.WriteLine($"{log.S3Key} -> {log.LocalPath} ({log.Size} bytes, etag={log.ETag})");
                // read/process the local file here
            }
        }

        // Optional cache cleanup
        var deleted = provider.PurgeCacheOlderThan(TimeSpan.FromHours(6));
        Console.WriteLine($"Purged {deleted} old cached files.");
    }
}
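If the host application uses Microsoft.Extensions.DependencyInjection, a registration along these lines is possible. This is a minimal sketch, not part of the gist: it assumes the AWSSDK.Extensions.NETCore.Setup package (which supplies AddAWSService) is referenced. A single shared provider instance is reasonable here because the per-key semaphores inside it serialize concurrent downloads of the same object.

// DiRegistrationExample.cs — a minimal sketch, assuming Microsoft.Extensions.DependencyInjection
// and AWSSDK.Extensions.NETCore.Setup are referenced by the host application.
using Amazon.S3;
using Microsoft.Extensions.DependencyInjection;
using YourApp.Logging;

static class DiRegistrationExample
{
    public static IServiceCollection AddS3LogFiles(this IServiceCollection services)
    {
        // IAmazonS3 resolved from the standard AWS SDK configuration chain
        services.AddAWSService<IAmazonS3>();
        // One shared provider; its per-key locks guard concurrent downloads
        services.AddSingleton(sp => new S3LogFileProvider(sp.GetRequiredService<IAmazonS3>()));
        return services;
    }
}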