@recalde
Created September 26, 2025 17:52
s3LogProvider
// S3LogFileProvider.cs
using System;
using System.Buffers.Text;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Amazon;
using Amazon.S3;
using Amazon.S3.Model;
namespace YourApp.Logging
{
    /// <summary>
    /// Provides listing and locally cached access to S3-stored log files organized under:
    /// s3://{S3_LOG_BUCKET}/{S3_LOG_PREFIX}/{YYYYMMDD}/...
    /// Uses ETag sidecar files to avoid re-downloading unchanged objects.
    /// </summary>
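    /// <remarks>
    /// Illustrative on-disk cache layout (actual paths depend on TMP_DIR, the bucket name, and each key;
    /// "app.log" is a placeholder):
    ///   {TMP_DIR}/s3cache/{bucket}/{prefix}/{YYYYMMDD}/app.log        -- cached object bytes
    ///   {TMP_DIR}/s3cache/{bucket}/{prefix}/{YYYYMMDD}/app.log.etag   -- sidecar holding the last-known ETag
    /// </remarks>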
    public sealed class S3LogFileProvider
    {
        public const string EnvBucket    = "S3_LOG_BUCKET";
        public const string EnvPrefix    = "S3_LOG_PREFIX";
        public const string EnvTmpDir    = "TMP_DIR";     // optional; defaults to Path.GetTempPath()
        public const string EnvAwsRegion = "AWS_REGION";  // optional; the AWS SDK can also resolve the region from its standard config chain
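
        // Example environment configuration (illustrative values):
        //   S3_LOG_BUCKET=my-log-bucket
        //   S3_LOG_PREFIX=env/logs
        //   TMP_DIR=/var/tmp/logcache   (optional)
        //   AWS_REGION=us-east-1        (optional)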

        private static readonly Regex YmdRegex = new(@"^\d{8}$", RegexOptions.Compiled);

        private readonly IAmazonS3 _s3;
        private readonly string _bucket;
        private readonly string _basePrefix; // trailing slash trimmed
        private readonly string _tmpRoot;

        // Prevent multiple concurrent downloads of the same key
        private readonly ConcurrentDictionary<string, SemaphoreSlim> _keyLocks = new();

        public S3LogFileProvider(IAmazonS3? s3Client = null)
        {
            _bucket = Environment.GetEnvironmentVariable(EnvBucket) ??
                      throw new InvalidOperationException($"Missing env {EnvBucket}");
            var pfx = Environment.GetEnvironmentVariable(EnvPrefix) ??
                      throw new InvalidOperationException($"Missing env {EnvPrefix}");
            _basePrefix = TrimSlashes(pfx);

            var tmpRoot = Environment.GetEnvironmentVariable(EnvTmpDir);
            _tmpRoot = string.IsNullOrWhiteSpace(tmpRoot) ? Path.GetTempPath() : tmpRoot;
            Directory.CreateDirectory(_tmpRoot);

            if (s3Client is not null)
            {
                _s3 = s3Client;
            }
            else
            {
                var region = Environment.GetEnvironmentVariable(EnvAwsRegion);
                _s3 = string.IsNullOrWhiteSpace(region)
                    ? new AmazonS3Client() // region resolved from the default credential/config chain
                    : new AmazonS3Client(RegionEndpoint.GetBySystemName(region));
            }
        }

        /// <summary>
        /// Returns distinct calc dates (YYYYMMDD) discovered as immediate child prefixes under the base prefix.
        /// Sorted descending (most recent first).
        /// </summary>
        public async Task<IReadOnlyList<string>> ListCalcDatesAsync(CancellationToken ct = default)
        {
            var dates = new HashSet<string>();
            string? continuation = null;
            do
            {
                var req = new ListObjectsV2Request
                {
                    BucketName = _bucket,
                    Prefix = _basePrefix.Length == 0 ? null : _basePrefix + "/",
                    Delimiter = "/",
                    ContinuationToken = continuation
                };
                var resp = await _s3.ListObjectsV2Async(req, ct).ConfigureAwait(false);
                foreach (var cp in resp.CommonPrefixes)
                {
                    var tail = LastSegment(cp);
                    if (YmdRegex.IsMatch(tail))
                        dates.Add(tail);
                }
                continuation = resp.IsTruncated ? resp.NextContinuationToken : null;
            }
            while (continuation != null && !ct.IsCancellationRequested);

            // Sort newest first
            var sorted = dates
                .OrderByDescending(d => d, StringComparer.Ordinal)
                .ToList()
                .AsReadOnly();
            return sorted;
        }

        /// <summary>
        /// Enumerates local references to all log files for a calc date.
        /// Each file is downloaded (if needed) into the tmp cache and reused by matching ETag.
        /// </summary>
        public async IAsyncEnumerable<LocalLogFile> EnumerateLogsAsync(
            string calcDate,
            [EnumeratorCancellation] CancellationToken ct = default)
        {
            if (!YmdRegex.IsMatch(calcDate))
                throw new ArgumentException("calcDate must be in YYYYMMDD format.", nameof(calcDate));

            // Handle an empty base prefix the same way ListCalcDatesAsync does (no leading slash)
            var prefix = _basePrefix.Length == 0 ? $"{calcDate}/" : $"{_basePrefix}/{calcDate}/";
            string? continuation = null;
            do
            {
                var req = new ListObjectsV2Request
                {
                    BucketName = _bucket,
                    Prefix = prefix,
                    ContinuationToken = continuation
                };
                var resp = await _s3.ListObjectsV2Async(req, ct).ConfigureAwait(false);
                foreach (var obj in resp.S3Objects.Where(o => !o.Key.EndsWith("/", StringComparison.Ordinal)))
                {
                    var local = await GetOrDownloadAsync(obj, ct).ConfigureAwait(false);
                    yield return local;
                }
                continuation = resp.IsTruncated ? resp.NextContinuationToken : null;
            }
            while (continuation != null && !ct.IsCancellationRequested);
        }

        /// <summary>
        /// Resolves a single object to a local cached file. Uses a .etag sidecar for validation.
        /// </summary>
        public async Task<LocalLogFile> GetOrDownloadAsync(S3Object obj, CancellationToken ct = default)
        {
            // Consistent local path based on the S3 key: placed under {tmpRoot}/s3cache/{bucket}/{key}
            var localPath = BuildLocalPath(obj.Key);
            var etagPath = localPath + ".etag";

            // Serialize downloads per key
            var keyLock = _keyLocks.GetOrAdd(obj.Key, _ => new SemaphoreSlim(1, 1));
            await keyLock.WaitAsync(ct).ConfigureAwait(false);
            try
            {
                // If the file exists and the ETag matches, reuse it
                if (File.Exists(localPath) && File.Exists(etagPath))
                {
                    var cachedEtag = await File.ReadAllTextAsync(etagPath, ct).ConfigureAwait(false);
                    if (NormalizeEtag(cachedEtag) == NormalizeEtag(obj.ETag))
                    {
                        return new LocalLogFile(obj.Key, localPath, obj.Size, obj.LastModified, obj.ETag);
                    }
                }

                // Otherwise download to a temp file, then move it into place
                Directory.CreateDirectory(Path.GetDirectoryName(localPath)!);

                // Optional: HEAD to confirm the ETag before downloading (can save bandwidth on rare races)
                var head = await _s3.GetObjectMetadataAsync(_bucket, obj.Key, ct).ConfigureAwait(false);
                var currentEtag = head.ETag;
                if (File.Exists(localPath) && File.Exists(etagPath))
                {
                    var cachedEtag = await File.ReadAllTextAsync(etagPath, ct).ConfigureAwait(false);
                    if (NormalizeEtag(cachedEtag) == NormalizeEtag(currentEtag))
                    {
                        // Another caller refreshed the cache just now
                        return new LocalLogFile(obj.Key, localPath, head.ContentLength, head.LastModified, currentEtag);
                    }
                }

                var tmpFile = localPath + ".downloading";
                if (File.Exists(tmpFile)) TryDelete(tmpFile);

                using (var get = await _s3.GetObjectAsync(_bucket, obj.Key, ct).ConfigureAwait(false))
                using (var fs = File.Create(tmpFile))
                {
                    await get.ResponseStream.CopyToAsync(fs, ct).ConfigureAwait(false);
                }

                // Replace the cached file in one step; File.Move with overwrite avoids the
                // delete-then-move window in which no file exists
                File.Move(tmpFile, localPath, overwrite: true);
                await File.WriteAllTextAsync(etagPath, currentEtag, ct).ConfigureAwait(false);

                return new LocalLogFile(obj.Key, localPath, head.ContentLength, head.LastModified, currentEtag);
            }
            finally
            {
                keyLock.Release();
            }
        }

        /// <summary>
        /// Optional helper to clean old cached files (e.g., on startup or periodically).
        /// </summary>
        public int PurgeCacheOlderThan(TimeSpan age)
        {
            var root = Path.Combine(_tmpRoot, "s3cache", SafePath(_bucket));
            if (!Directory.Exists(root)) return 0;

            var cutoff = DateTimeOffset.UtcNow - age;
            int deleted = 0;
            foreach (var path in Directory.EnumerateFiles(root, "*", SearchOption.AllDirectories))
            {
                try
                {
                    var info = new FileInfo(path);
                    if (info.LastWriteTimeUtc < cutoff.UtcDateTime)
                    {
                        info.Delete();
                        deleted++;
                    }
                }
                catch
                {
                    // best effort
                }
            }
            return deleted;
        }

        // ----------------- helpers -----------------

        private string BuildLocalPath(string s3Key)
        {
            // Stable, nested path to avoid a huge single-directory fanout
            var full = Path.Combine(_tmpRoot, "s3cache", SafePath(_bucket), SafePath(s3Key));
            var dir = Path.GetDirectoryName(full)!;
            Directory.CreateDirectory(dir);
            return full;
        }

        private static string SafePath(string part)
        {
            // Split on path separators first so the directory structure inside the S3 key is preserved,
            // then strip characters that are illegal in file names on most filesystems from each segment.
            // (Replacing invalid chars before splitting would also replace '/' and flatten the key.)
            var invalid = Path.GetInvalidFileNameChars();
            var segments = part.Split(new[] { '/', '\\' }, StringSplitOptions.RemoveEmptyEntries)
                .Select(seg => string.Join("_", seg.Split(invalid, StringSplitOptions.RemoveEmptyEntries)));
            return Path.Combine(segments.ToArray());
        }

        private static string TrimSlashes(string s)
        {
            return s.Trim().Trim('/').Trim();
        }

        private static string LastSegment(string prefixWithSlash)
        {
            // e.g. "env/logs/20250115/" -> "20250115"
            var trimmed = prefixWithSlash.TrimEnd('/');
            var idx = trimmed.LastIndexOf('/');
            return idx >= 0 ? trimmed[(idx + 1)..] : trimmed;
        }

        private static void TryDelete(string path)
        {
            try { File.Delete(path); } catch { /* best effort */ }
        }

        private static string NormalizeEtag(string? etag)
        {
            if (string.IsNullOrWhiteSpace(etag)) return string.Empty;
            etag = etag.Trim();
            if (etag.StartsWith("\"") && etag.EndsWith("\""))
                etag = etag[1..^1];
            return etag;
        }
    }

    /// <summary>
    /// Represents a locally cached S3 log object.
    /// </summary>
    public sealed record LocalLogFile(
        string S3Key,
        string LocalPath,
        long Size,
        DateTime LastModified,
        string ETag);
}

// Example.cs
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.S3;
using YourApp.Logging;

class Example
{
    public static async Task Main()
    {
        // Optionally inject your own IAmazonS3 (for custom creds/retries)
        var provider = new S3LogFileProvider();

        var dates = await provider.ListCalcDatesAsync();
        Console.WriteLine("Calc Dates:");
        foreach (var d in dates) Console.WriteLine($" - {d}");

        if (dates.Count > 0)
        {
            Console.WriteLine($"\nEnumerating logs for {dates[0]}");
            await foreach (var log in provider.EnumerateLogsAsync(dates[0]))
            {
                Console.WriteLine($"{log.S3Key} -> {log.LocalPath} ({log.Size} bytes, etag={log.ETag})");
                // read/process the local file here
            }
        }

        // Optional cache cleanup
        var deleted = provider.PurgeCacheOlderThan(TimeSpan.FromHours(6));
        Console.WriteLine($"Purged {deleted} old cached files.");
    }
}
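
The example above relies on the provider's default client construction. When custom credentials, region, or retry behavior are needed, an IAmazonS3 instance can be built explicitly and injected through the constructor. The sketch below is illustrative only: the region, retry count, and file name are placeholder values, and credentials are still resolved from the SDK's default provider chain.

// ExampleCustomClient.cs (illustrative sketch; values are placeholders)
using System;
using System.Threading.Tasks;
using Amazon;
using Amazon.S3;
using YourApp.Logging;

class ExampleCustomClient
{
    public static async Task Main()
    {
        // Build an S3 client with explicit region and retry settings instead of the defaults.
        var config = new AmazonS3Config
        {
            RegionEndpoint = RegionEndpoint.USEast1, // placeholder region
            MaxErrorRetry = 5                        // placeholder retry count
        };
        using var s3 = new AmazonS3Client(config);   // credentials from the default provider chain

        // Inject the custom client; S3_LOG_BUCKET and S3_LOG_PREFIX are still required.
        var provider = new S3LogFileProvider(s3);
        var dates = await provider.ListCalcDatesAsync();
        Console.WriteLine($"Found {dates.Count} calc dates.");
    }
}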