@gianpaj
Created October 1, 2025 22:16
LoCoBench - example data
# AquaForge Utility Data Lake (`data_lake`)

> **Hexagonally-architected telemetry platform for municipal & industrial utilities.**  
> Extract, cleanse, transform, and surface multi-modal data (water, power, gas, waste, steam) at utility scale—without coupling business rules to infrastructure.

![CI](https://github.com/aquaforge/data_lake/actions/workflows/ci.yml/badge.svg)
![NPM version](https://img.shields.io/npm/v/@aquaforge/data_lake)
![License](https://img.shields.io/github/license/aquaforge/data_lake)

---

## Table of Contents
1. [Why AquaForge?](#why-aquaforge)
2. [Features](#features)
3. [Architecture](#architecture)
4. [Folder Structure](#folder-structure)
5. [Getting Started](#getting-started)
6. [Usage Examples](#usage-examples)
7. [Extending the Platform](#extending-the-platform)
8. [Testing & Quality](#testing--quality)
9. [Production Deployment](#production-deployment)
10. [FAQ](#faq)
11. [Contributing](#contributing)
12. [License](#license)

---

## Why AquaForge?
Traditional data-lake stacks tend to either:
* Couple domain logic to external infrastructure (difficult to swap cloud providers or field gateways).
* Focus on a single telemetry modality (e.g., **just** water or **just** power) and break when regulatory rules change.
* Trade stream processing performance for nightly batch compliance or vice-versa.

AquaForge fixes these shortcomings by applying **Hexagonal Architecture** to the utility space (a minimal sketch follows this list):
* **Core domain** (metering, SCADA, regulatory data-quality) lives in *pure TypeScript* modules—no direct Kafka/S3/etc. imports.
* **Adapters** (Kafka, S3, OPC-UA, MQTT, TimescaleDB, Grafana, …) wrap infrastructure and are hot-swappable.
* **Pipelines** manage ingestion → validation → transformation → storage via composable strategy chains.
* **Observer** side-cars publish metrics, lineage, and quality outcomes to Prometheus and OpenTelemetry.
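
To make the port/adapter split concrete, here is a minimal, purely illustrative sketch; the names (`MeterReadingSink`, `MeterIngestionService`, `InMemorySink`) are hypothetical and not taken from the codebase:

```ts
// Illustrative names only (not taken from the repository).

// Core domain: a pure TypeScript port with no infrastructure imports.
export interface MeterReading {
  meterId: string;
  kwh: number;
  at: Date;
}

export interface MeterReadingSink {
  write(readings: ReadonlyArray<MeterReading>): Promise<void>;
}

// Domain service depends only on the port, never on Kafka/S3/etc.
export class MeterIngestionService {
  constructor(private readonly sink: MeterReadingSink) {}

  async ingest(readings: MeterReading[]): Promise<void> {
    const valid = readings.filter((r) => r.kwh >= 0); // trivial domain rule
    await this.sink.write(valid);
  }
}

// Adapter layer: wraps infrastructure and is hot-swappable.
export class InMemorySink implements MeterReadingSink {
  public readonly stored: MeterReading[] = [];

  async write(readings: ReadonlyArray<MeterReading>): Promise<void> {
    this.stored.push(...readings);
  }
}
```

Because the core imports nothing but its own interfaces, a Kafka- or S3-backed adapter can replace `InMemorySink` without touching `MeterIngestionService`.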

---

## Features
| Category              | Highlights                                                                        |
|-----------------------|------------------------------------------------------------------------------------|
| Ingestion             | High-throughput Kafka consumer, OPC-UA SCADA listener, batch CSV/SFTP importer     |
| Validation            | Declarative rule engine: schema, range, temporal consistency, regulatory thresholds|
| Transformation        | Strategy-pattern ETL steps, versioned and hot-deployable                          |
| Storage               | Iceberg/S3 lakehouse, TimescaleDB, Parquet partitioning, auto-compaction          |
| Visualization         | Grafana dashboards, Prometheus metrics, data-lineage explorer                     |
| Scheduling            | cron/Quartz powered—supports pipelined & event-driven schedules                   |
| Observability         | Distributed tracing with OpenTelemetry, per-record audit trail                    |
| Resilience            | Exactly-once semantics, message replay, quorum-based error recovery               |

---

## Architecture
```
                    ┌──────────────┐
                    │ External IoT │
                    │   Gateways   │
                    └──────┬───────┘
                           │
                 (Adapters: OPC-UA, MQTT)
                           │
┌────────────┐   ┌─────────▼──────────┐   ┌─────────────┐
│ Schedulers │   │  Ingestion Ports   │   │ Batch Files │
│  cron/evt  │──▶│     (hexagon)      │◀──│             │
└────────────┘   │  - KafkaTopicPort  │   └─────────────┘
                 │  - FileImportPort  │
                 └─────────┬──────────┘
                           │
              ┌────────────┴────────────┐
              ▼                         ▼
       ┌────────────┐           ┌─────────────┐
       │ Validator  │           │ Transformer │   (Pipeline Pattern)
       └─────┬──────┘           └──────┬──────┘
             │                         │
             ▼                         ▼
       ┌───────────────────────────────────────┐
       │     Lake Storage (S3 / Iceberg)       │   (Infrastructure Adapter)
       └───────────────────────────────────────┘
                 ▲                   ▲
                 │                   │
           ┌─────┴─────┐      ┌──────┴─────┐
           │  Grafana  │      │ OTEL/Prom  │
           └───────────┘      └────────────┘
```


Key Principles:
1. **Dependency-inversion**: core never imports external libs (Kafka, AWS SDK, etc.).
2. **Strategy + Pipeline**: each ETL phase chooses a concrete strategy at runtime.
3. **Event Sourcing**: every state mutation generates an immutable event for replay (see the sketch after this list).
4. **Fail-Fast & Recover**: poisoned messages routed to a dead-letter queue with retry policies.
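
As a rough illustration of principle 3, the sketch below shows state mutations captured as append-only events and derived state rebuilt by replay. The types are hypothetical and are not the project's actual event model:

```ts
// Illustrative-only sketch of the event-sourcing principle: every state
// mutation is recorded as an immutable event that can later be replayed.
// None of these names come from the repository.

type LakeEvent =
  | { type: 'RecordIngested'; at: string; payload: unknown }
  | { type: 'RecordRejected'; at: string; reason: string };

class EventLog {
  private readonly events: LakeEvent[] = [];

  append(event: LakeEvent): void {
    // Events are only ever appended, never updated in place.
    this.events.push(Object.freeze(event));
  }

  // Replay rebuilds derived state (here: a simple counter) from the log.
  replay(): { ingested: number; rejected: number } {
    return this.events.reduce(
      (acc, e) =>
        e.type === 'RecordIngested'
          ? { ...acc, ingested: acc.ingested + 1 }
          : { ...acc, rejected: acc.rejected + 1 },
      { ingested: 0, rejected: 0 },
    );
  }
}
```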

---

## Folder Structure

```
data_lake/
├── apps/        # runnable micro-services (ingest, transform, visualize, …)
│   └── ingest-kafka/
├── core/        # pure domain logic (no frameworks)
│   ├── metering/
│   ├── validation/
│   └── transformation/
├── adapters/    # infrastructure (Kafka, S3, Timescale, …)
│   └── s3/
├── packages/    # shared libraries
│   ├── config/
│   ├── logger/
│   └── utils/
├── scripts/     # one-off operational scripts
├── docs/        # ADRs, diagrams
├── tests/       # unit + e2e tests
├── docker/      # Dockerfiles & compose
└── README.md
```


---

## Getting Started

### 1. Prerequisites
* Node.js ≥ 18.x (LTS)
* Docker ≥ 24.x (for local Kafka, S3, Timescale)
* `pnpm` (preferred) or `npm`, `yarn`

### 2. Clone & Install

```bash
git clone https://github.com/aquaforge/data_lake.git
cd data_lake
pnpm install
```

### 3. Bootstrap Local Stack

```bash
# spins up Kafka, MinIO (S3-compatible), TimescaleDB, Grafana
docker compose -f docker/compose.local.yml up -d
```

### 4. Seed Sample Data

```bash
pnpm dlake:seed  # publish mock SCADA & metering messages to Kafka
```

### 5. Run Ingestion Service

```bash
pnpm --filter @aquaforge/ingest-kafka dev
```

Logs should show messages flowing into the lakehouse and the Grafana dashboards lighting up.


---

## Usage Examples

### Programmatic Pipeline Construction

```ts
import { IngestionPipeline } from '@aquaforge/core/pipeline';
import { SchemaValidator } from '@aquaforge/core/validation';
import { RegulatoryValidator } from '@aquaforge/validation-regulator';
import { IcebergSink } from '@aquaforge/adapters-iceberg';

const pipeline = new IngestionPipeline()
  .use(new SchemaValidator('meter_reading_v2.json'))
  .use(new RegulatoryValidator({ region: 'EU' }))
  .use(new IcebergSink({
    bucket: 'utility-lake',
    table: 'meter_readings',
    partitionBy: ['utility_id', 'reading_date'],
  }));

pipeline.on('error', (err) => {
  console.error('💥 Pipeline failed', err);
  /* potential hook: alerting, DLQ publish, etc. */
});

await pipeline.process(batchOfRecords);
```

### CLI—Run Ad-Hoc Validation

```bash
pnpm dlake:validate ./samples/eu_meters_2023Q1.csv --region EU --schema meter_reading_v2.json
```

---

## Extending the Platform

### Creating a New Validation Rule

```ts
import { ValidationContext, ValidationRule } from '@aquaforge/core/validation';

export class PressureRangeRule implements ValidationRule {
  public readonly name = 'pressure-range';
  public readonly description = 'Ensures pressure (kPa) is within safe operating limits';

  async validate(ctx: ValidationContext): Promise<void> {
    const pressure: number = ctx.record.get('pressure_kpa');
    if (pressure < 200 || pressure > 500) {
      ctx.reject(`Pressure out of bounds: ${pressure} kPa`);
    }
  }
}
```

Register your rule in `core/validation/rules/index.ts`:

```ts
export * from './pressure-range-rule';
```

Then add it to a pipeline:

```ts
pipeline.use(new PressureRangeRule());
```

### Implementing a Custom Sink Adapter

All sinks must implement the `SinkPort` interface:

```ts
export interface SinkPort {
  write(records: DomainRecord[]): Promise<void>;
  flush?(): Promise<void>;
  close?(): Promise<void>;
}
```

See the TimescaleDB sink in `adapters/timescaledb/timescale-sink.ts` for a full example; a rough sketch follows below.
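
As a rough illustration (not the repository's actual adapter), a sink backed by TimescaleDB via node-postgres might look like this. The import path, table name, columns, and the `record.get` accessor are assumptions made for the sketch:

```ts
// Hypothetical sketch of a SinkPort implementation backed by TimescaleDB via
// node-postgres. Table and column names are invented for illustration; the
// repository's real adapter lives in adapters/timescaledb/timescale-sink.ts.
import { Pool } from 'pg';
import type { DomainRecord, SinkPort } from '@aquaforge/core'; // assumed path

export class TimescaleSink implements SinkPort {
  private readonly pool = new Pool({ connectionString: process.env.TIMESCALE_URL });

  async write(records: DomainRecord[]): Promise<void> {
    // One INSERT per record keeps the sketch simple; a real sink would batch.
    for (const record of records) {
      await this.pool.query(
        'INSERT INTO meter_readings (utility_id, reading_ts, payload) VALUES ($1, $2, $3)',
        [record.get('utility_id'), record.get('reading_ts'), JSON.stringify(record)],
      );
    }
  }

  async close(): Promise<void> {
    await this.pool.end();
  }
}
```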


---

## Testing & Quality

* Unit tests: `pnpm test:unit` (Vitest + ts-mockito)
* Integration tests: `pnpm test:int` (spawns dockerized infra)
* Coverage: `pnpm coverage`
* Linting: `pnpm lint`
* Conventional commits enforced via Husky + Commitlint
* Code style: ESLint + Prettier (auto-fixed on commit)
* Automated CI/CD: GitHub Actions → SonarCloud → container registry

---

## Production Deployment

1. Build & push container images:

   ```bash
   pnpm release:build
   docker push ghcr.io/aquaforge/ingest-kafka:$(git rev-parse --short HEAD)
   ```

2. Apply the Helm chart:

   ```bash
   helm upgrade --install aqua-ingest ./helm/ingest \
     --set image.tag=$(git rev-parse --short HEAD) \
     -f ./helm/values/prod.yaml
   ```

3. Observe:
   * Grafana → Ingestion Overview
   * Prometheus → `job="aqua_ingest"`
   * Loki → aggregated logs

Zero-downtime deployments are achieved via rolling updates and consumer-group rebalancing.


---

## FAQ

**Q: Does AquaForge support Azure Blob or Google Cloud Storage?**
A: Yes. Bring your own adapter implementing `ObjectStoragePort`; see `adapters/gcs` for reference.

**Q: How do I replay failed messages?**
A: Use the built-in DLQ CLI: `pnpm dlake:replay --from 2023-08-01 --to 2023-08-02`.

**Q: Is the core domain library framework-agnostic?**
A: 100%. It compiles to pure ESM with zero runtime dependencies other than the TypeScript standard library.


---

## Contributing

1. Fork & clone the repo.
2. Create a feature branch: `feat/<ticket-id>-brief-description`.
3. Commit using conventional style (`feat:`, `fix:`, `docs:` …).
4. Run `pnpm ci` (ensures tests, lint, and types pass).
5. Open a PR; CI must pass before review.

All contributions must follow the Community Code of Conduct (see `CODE_OF_CONDUCT.md`).


---

## License

Apache 2.0 © AquaForge Maintainers

{
"id": "typescript_data_lake_hard_014_multi_session_development_expert_01",
"task_category": "multi_session_development",
"difficulty": "expert",
"title": "Implement Dead-Letter Queue (DLQ) for Pipeline Error Handling",
"description": "The UtilityLake Maestro platform currently processes batches of data. If a single record within a large file fails a validation or transformation step, the error is logged, but the record is dropped. Business stakeholders require a more robust error handling mechanism. They want to isolate and store these failed records in a separate, configurable location (a Dead-Letter Queue or DLQ) for later analysis and reprocessing, without halting the entire pipeline execution for valid records.",
"context_files": [
"UtilityLakeMaestro//tsconfig.base.json",
"UtilityLakeMaestro//apps//api-service//tsconfig.json",
"UtilityLakeMaestro//apps//api-service//src//main.ts",
"UtilityLakeMaestro//apps//api-service//src//app.module.ts"
//
],
"context_length": 576125,
"task_prompt": [
"Your goal is to implement a Dead-Letter Queue (DLQ) feature for individual record processing failures within a pipeline. This will be a multi-session task. Follow the instructions for each session, building upon your previous work.\n\n**Session 1: Core Domain and Logic Implementation**",
"Your first task is to update the core domain model to support a DLQ. ",
"1. Start by analyzing the existing pipeline models and execution flow. Pay close attention to `packages/core-domain/src/models/pipeline-definition.model.ts` and the pipeline execution logic (likely within `packages/core-domain/src/etl/`).",
"2. Modify the `PipelineDefinition` model to include an optional `dlqConfiguration` property. This configuration should specify, at a minimum, an `s3Path` for storing failed records.",
"3. Update the core pipeline execution logic to catch record-level processing errors. If a DLQ is configured for the pipeline, the logic should serialize the failed record (e.g., as JSON) along with its error context and write it to the specified S3 path. Use the existing `object-storage.port.ts` for this. The processing of other records in the batch should continue.",
"",
"**Session 2: API Layer Integration**",
"Now that the core logic supports DLQs, you need to expose this functionality through the `api-service`.",
"1. Update the relevant Data Transfer Objects (DTOs), primarily `apps/api-service/src/pipelines/dto/create-pipeline.dto.ts`, to allow clients to specify the `dlqConfiguration` when creating or updating a pipeline.",
"2. Ensure the `pipelines.service.ts` correctly handles this new property, validating it and persisting it via the metastore repository.",
"3. Verify that the GET endpoints for retrieving pipeline definitions now include the `dlqConfiguration`.",
"",
"**Session 3: User Interface Enhancement**",
"With the backend ready, let's provide a way for users to configure this feature in the `ui-portal`.",
"1. Modify the pipeline creation and editing page (likely `apps/ui-portal/src/pages/pipelines/[id].tsx` and its related components) to add a new form field for the DLQ S3 path.",
"2. This should be an optional field. When a user enters a path, it should be sent to the API as part of the pipeline definition on save.",
"3. Ensure the `api-client.ts` is updated to handle sending and receiving the new `dlqConfiguration` property.",
"",
"**Session 4: Testing and Documentation**",
"To complete the feature, you must ensure it's reliable and documented for other developers and users.",
"1. Add a new end-to-end test case in `apps/api-service/test/pipelines.e2e-spec.ts`. This test should:",
" a. Create a pipeline with a schema validation step and a configured DLQ.",
" b. Trigger the pipeline with a file containing both valid and invalid records.",
" c. Assert that the pipeline execution succeeds.",
" d. Assert that the invalid record has been written to the mock S3 DLQ location.\n2. Update the user guide `docs/guides/PIPELINE_CREATION.md` to explain the new DLQ feature, its purpose, and how to configure the S3 path in the UI."
],
"expected_approach": [
"An expert developer would approach this task systematically, moving from the core domain outwards to the application and UI layers.",
"1. **Analysis & Domain Modeling (Session 1):** The developer would first read `ARCHITECTURE.md` to understand the system's structure. They would then identify `packages/core-domain/src/models/pipeline-definition.model.ts` as the source of truth for a pipeline's structure and add the new optional `dlqConfiguration` interface/property there. Next, they would trace the execution from `pipeline-builder.ts` to `pipeline.ts` to find where individual records are processed within a step's strategy (e.g., `schema-validation.strategy.ts`). They would wrap the record-level processing in a `try...catch` block. Inside the `catch` block, they would check for the pipeline's `dlqConfiguration` and, if present, use the injected `ObjectStoragePort` to write the failed record and error message to the specified S3 path.",
"2. **API Development (Session 2):** The developer would then move to the `api-service`. They'd add the `dlqConfiguration` property to `create-pipeline.dto.ts`, likely using NestJS's `@IsOptional()` and validation decorators. They would then trace the create/update flow through `pipelines.controller.ts` into `pipelines.service.ts` to ensure the new property is passed down to the `metastore.repository.port.ts` for persistence.",
"3. **Frontend Implementation (Session 3):** In the `ui-portal`, the developer would locate the pipeline configuration form, probably within `apps/ui-portal/src/pages/pipelines/[id].tsx` or a shared component. They would add a new controlled input field for the DLQ S3 path, updating the component's state and the payload sent by the `api-client.ts` upon form submission.",
"4. **Verification (Session 4):** For testing, the developer would leverage the existing E2E test setup in `pipelines.e2e-spec.ts`. They would write a new `it` block that mocks the S3 dependency to verify writes. The test would create a pipeline via the API, trigger it, and then assert that the mock S3 `putObject` method was called with the correct parameters (bucket, key, and body containing the failed record).\n\n5. **Documentation:** Finally, they would open `docs/guides/PIPELINE_CREATION.md` and add a new section explaining the DLQ feature in clear, user-friendly language."
],
"ground_truth": [
"The solution requires modifications across the monorepo. Key changes include:",
"1. **`packages/core-domain/src/models/pipeline-definition.model.ts`:**",
" ```typescript",
" // ... existing interface",
" export interface PipelineDefinition {",
" // ... other properties",
" dlqConfiguration?: {",
" s3Path: string;",
" };",
" }",
" ```",
"2. **`packages/core-domain/src/etl/pipeline.ts` (or similar execution logic):**",
" ```typescript",
" // Inside a record processing loop",
" for (const record of records) {",
" try {",
" let processedRecord = record;",
" for (const step of this.definition.steps) {",
" processedRecord = await step.process(processedRecord);",
" }",
" successfulRecords.push(processedRecord);",
" } catch (error) {",
" failedRecords.push({ record, error });",
" if (this.definition.dlqConfiguration?.s3Path) {",
" const dlqPayload = JSON.stringify({ record, error: error.message, timestamp: new Date().toISOString() });",
" const dlqPath = `${this.definition.dlqConfiguration.s3Path}/${this.executionId}/${generateUUID()}.json`;",
" await this.storagePort.putObject(dlqPath, dlqPayload);",
" }",
" }",
" }",
" ```",
"3. **`apps/api-service/src/pipelines/dto/create-pipeline.dto.ts`:**",
" ```typescript",
" // ... imports",
" class DlqConfigurationDto {",
" @IsString()",
" @IsNotEmpty()",
" s3Path: string;",
" }",
" export class CreatePipelineDto {",
" // ... other properties",
" @IsOptional()",
" @ValidateNested()",
" @Type(() => DlqConfigurationDto)",
" dlqConfiguration?: DlqConfigurationDto;",
" }",
" ```",
"4. **`apps/ui-portal/src/pages/pipelines/[id].tsx`:**",
" ```jsx",
" // Inside the form component",
" <label>DLQ S3 Path (Optional)</label>",
" <input",
" type=\"text\"",
" placeholder=\"s3://my-dlq-bucket/failures/\"",
" value={pipeline.dlqConfiguration?.s3Path || ''}",
" onChange={(e) => setPipeline({ ...pipeline, dlqConfiguration: { s3Path: e.target.value } })}",
" />",
" ```",
"5. **`docs/guides/PIPELINE_CREATION.md`:**\n A new section titled \"### Configuring a Dead-Letter Queue (DLQ)\" would be added, explaining that this field allows failed records to be stored in the specified S3 path for later analysis."
],
"evaluation_criteria": [
"**Context Retention & Session Continuity:** Does the agent successfully build upon the work from previous sessions? (e.g., Does it use the correctly named `dlqConfiguration` property in the DTO that it defined in the domain model in the previous session?).",
"**Architectural Adherence:** Does the agent correctly separate concerns by placing logic in the appropriate layer (domain, API, UI)? Does it use the existing `ObjectStoragePort` adapter instead of implementing a new S3 client?",
"**Correctness of Implementation:** Does the feature function correctly end-to-end? Are failed records properly isolated and written to the DLQ without affecting valid records?",
"**Code Quality and Idiomatic Style:** Is the generated TypeScript code strongly-typed and clean? Does it follow the established coding patterns within the NestJS API and Next.js UI?",
"**File Navigation and Modification:** How efficiently does the agent locate and modify the correct files across the `packages/`, `apps/`, and `docs/` directories?",
"**Test-Driven Approach:** Is the E2E test comprehensive? Does it properly mock dependencies and make meaningful assertions about the DLQ's behavior?",
"**Documentation:** Is the update to the documentation clear, accurate, and helpful for a user trying to use the new feature?"
]
}
{
"id": "typescript_data_lake_hard_014_multi_session_development_expert_01",
"task_category": "multi_session_development",
"difficulty": "expert",
"title": "Implement Dead-Letter Queue (DLQ) for Pipeline Error Handling",
"description": "The UtilityLake Maestro platform currently processes batches of data. If a single record within a large file fails a validation or transformation step, the error is logged, but the record is dropped. Business stakeholders require a more robust error handling mechanism. They want to isolate and store these failed records in a separate, configurable location (a Dead-Letter Queue or DLQ) for later analysis and reprocessing, without halting the entire pipeline execution for valid records.",
"context_files": [
"UtilityLakeMaestro//tsconfig.base.json",
"UtilityLakeMaestro//apps//api-service//tsconfig.json",
"UtilityLakeMaestro//apps//api-service//src//main.ts",
"UtilityLakeMaestro//apps//api-service//src//app.module.ts",
"UtilityLakeMaestro//apps//api-service//src//app.controller.ts",
"UtilityLakeMaestro//apps//api-service//src//app.service.ts",
"UtilityLakeMaestro//apps//api-service//src//pipelines//pipelines.service.ts",
"UtilityLakeMaestro//apps//api-service//src//scheduler//batch-jobs.service.ts",
"UtilityLakeMaestro//apps//api-service//src//metastore//metastore.service.ts",
"UtilityLakeMaestro//apps//ui-portal//tsconfig.json",
"UtilityLakeMaestro//apps//ui-portal//next.config.js",
"UtilityLakeMaestro//apps//stream-processor//src//main.ts",
"UtilityLakeMaestro//packages//core-domain//tsconfig.json",
"UtilityLakeMaestro//packages//core-domain//src//models//pipeline-definition.model.ts",
"UtilityLakeMaestro//packages//core-domain//src//models//pipeline-execution.model.ts",
"UtilityLakeMaestro//packages//core-domain//src//models//dataset.model.ts",
"UtilityLakeMaestro//packages//core-domain//src//ports//object-storage.port.ts",
"UtilityLakeMaestro//packages//core-domain//src//monitoring//observer.interfaces.ts",
"UtilityLakeMaestro//packages//core-domain//src//etl//strategies//routing//s3-routing.strategy.ts",
"UtilityLakeMaestro//packages//core-domain//src//index.ts",
"UtilityLakeMaestro//packages//core-domain//src//etl//strategies//validation//data-quality.strategy.ts",
"UtilityLakeMaestro//ARCHITECTURE.md",
"UtilityLakeMaestro//packages//core-domain//src//etl//pipeline-builder.ts",
"UtilityLakeMaestro//apps//ui-portal//src//pages//pipelines//index.tsx",
"UtilityLakeMaestro//apps//ui-portal//src//lib//api-client.ts",
"UtilityLakeMaestro//apps//stream-processor//src//stream.module.ts",
"UtilityLakeMaestro//apps//ui-portal//src//components//layout.tsx",
"UtilityLakeMaestro//apps//api-service//src//pipelines//pipelines.module.ts",
"UtilityLakeMaestro//apps//ui-portal//src//pages//pipelines//[id].tsx",
"UtilityLakeMaestro//apps//ui-portal//src//pages//datasets//index.tsx",
"UtilityLakeMaestro//apps//ui-portal//src//components//MetricsDashboard.tsx",
"UtilityLakeMaestro//packages//core-domain//src//etl//pipeline.interfaces.ts",
"UtilityLakeMaestro//docs//guides//PIPELINE_CREATION.md",
"UtilityLakeMaestro//docs//guides//DEPLOYMENT_GUIDE.md",
"UtilityLakeMaestro//apps//ui-portal//src//pages//index.tsx",
"UtilityLakeMaestro//apps//api-service//src//pipelines//dto//create-pipeline.dto.ts",
"UtilityLakeMaestro//docs//api//postman_collection.json",
"UtilityLakeMaestro//apps//ui-portal//src//styles//globals.css",
"UtilityLakeMaestro//docs//api//openapi.json",
"UtilityLakeMaestro//packages//core-domain//src//monitoring//pipeline-monitor.ts",
"UtilityLakeMaestro//apps//ui-portal//src//components//PipelineVisualizer.tsx",
"UtilityLakeMaestro//apps//api-service//src//pipelines//dto//trigger-pipeline.dto.ts",
"UtilityLakeMaestro//packages//core-domain//src//etl//strategies//enrichment//geo-ip.strategy.ts",
"UtilityLakeMaestro//LICENSE",
"UtilityLakeMaestro//docker-compose.yml",
"UtilityLakeMaestro//packages//core-domain//src//etl//pipeline.ts",
"UtilityLakeMaestro//docs//api//README.md",
"UtilityLakeMaestro//packages//core-domain//src//ports//metastore.repository.port.ts",
"UtilityLakeMaestro//packages//core-domain//src//etl//strategies//transformation//field-masking.strategy.ts",
"UtilityLakeMaestro//apps//api-service//src//scheduler//scheduler.module.ts",
"UtilityLakeMaestro//README.md",
"UtilityLakeMaestro//packages//core-domain//src//etl//strategies//validation//schema-validation.strategy.ts",
"UtilityLakeMaestro//docs//diagrams//system_architecture.png",
"UtilityLakeMaestro//apps//api-service//test//pipelines.e2e-spec.ts",
"UtilityLakeMaestro//apps//api-service//src//pipelines//pipelines.controller.ts",
"UtilityLakeMaestro//apps//stream-processor//src//stream.consumer.ts",
"UtilityLakeMaestro//packages//core-domain//src//ports//event-publisher.port.ts",
"UtilityLakeMaestro//packages//core-domain//src//etl//strategies//transformation//csv-to-json.strategy.ts",
"UtilityLakeMaestro//packages//core-domain//test//etl/pipeline.spec.ts",
"UtilityLakeMaestro//docs//diagrams//data_flow.png",
"UtilityLakeMaestro//apps//api-service//src//config//environments//production.ts",
"UtilityLakeMaestro//CONTRIBUTING.md",
"UtilityLakeMaestro//apps//api-service//src//config//environments//development.ts",
"UtilityLakeMaestro//docs//guides//DEVELOPMENT_SETUP.md"
],
"context_length": 576125,
"task_prompt": "Your goal is to implement a Dead-Letter Queue (DLQ) feature for individual record processing failures within a pipeline. This will be a multi-session task. Follow the instructions for each session, building upon your previous work.\n\n**Session 1: Core Domain and Logic Implementation**\nYour first task is to update the core domain model to support a DLQ. \n1. Start by analyzing the existing pipeline models and execution flow. Pay close attention to `packages/core-domain/src/models/pipeline-definition.model.ts` and the pipeline execution logic (likely within `packages/core-domain/src/etl/`).\n2. Modify the `PipelineDefinition` model to include an optional `dlqConfiguration` property. This configuration should specify, at a minimum, an `s3Path` for storing failed records.\n3. Update the core pipeline execution logic to catch record-level processing errors. If a DLQ is configured for the pipeline, the logic should serialize the failed record (e.g., as JSON) along with its error context and write it to the specified S3 path. Use the existing `object-storage.port.ts` for this. The processing of other records in the batch should continue.\n\n**Session 2: API Layer Integration**\nNow that the core logic supports DLQs, you need to expose this functionality through the `api-service`.\n1. Update the relevant Data Transfer Objects (DTOs), primarily `apps/api-service/src/pipelines/dto/create-pipeline.dto.ts`, to allow clients to specify the `dlqConfiguration` when creating or updating a pipeline.\n2. Ensure the `pipelines.service.ts` correctly handles this new property, validating it and persisting it via the metastore repository.\n3. Verify that the GET endpoints for retrieving pipeline definitions now include the `dlqConfiguration`.\n\n**Session 3: User Interface Enhancement**\nWith the backend ready, let's provide a way for users to configure this feature in the `ui-portal`.\n1. Modify the pipeline creation and editing page (likely `apps/ui-portal/src/pages/pipelines/[id].tsx` and its related components) to add a new form field for the DLQ S3 path.\n2. This should be an optional field. When a user enters a path, it should be sent to the API as part of the pipeline definition on save.\n3. Ensure the `api-client.ts` is updated to handle sending and receiving the new `dlqConfiguration` property.\n\n**Session 4: Testing and Documentation**\nTo complete the feature, you must ensure it's reliable and documented for other developers and users.\n1. Add a new end-to-end test case in `apps/api-service/test/pipelines.e2e-spec.ts`. This test should:\n a. Create a pipeline with a schema validation step and a configured DLQ.\n b. Trigger the pipeline with a file containing both valid and invalid records.\n c. Assert that the pipeline execution succeeds.\n d. Assert that the invalid record has been written to the mock S3 DLQ location.\n2. Update the user guide `docs/guides/PIPELINE_CREATION.md` to explain the new DLQ feature, its purpose, and how to configure the S3 path in the UI.",
"expected_approach": "An expert developer would approach this task systematically, moving from the core domain outwards to the application and UI layers.\n\n1. **Analysis & Domain Modeling (Session 1):** The developer would first read `ARCHITECTURE.md` to understand the system's structure. They would then identify `packages/core-domain/src/models/pipeline-definition.model.ts` as the source of truth for a pipeline's structure and add the new optional `dlqConfiguration` interface/property there. Next, they would trace the execution from `pipeline-builder.ts` to `pipeline.ts` to find where individual records are processed within a step's strategy (e.g., `schema-validation.strategy.ts`). They would wrap the record-level processing in a `try...catch` block. Inside the `catch` block, they would check for the pipeline's `dlqConfiguration` and, if present, use the injected `ObjectStoragePort` to write the failed record and error message to the specified S3 path.\n\n2. **API Development (Session 2):** The developer would then move to the `api-service`. They'd add the `dlqConfiguration` property to `create-pipeline.dto.ts`, likely using NestJS's `@IsOptional()` and validation decorators. They would then trace the create/update flow through `pipelines.controller.ts` into `pipelines.service.ts` to ensure the new property is passed down to the `metastore.repository.port.ts` for persistence.\n\n3. **Frontend Implementation (Session 3):** In the `ui-portal`, the developer would locate the pipeline configuration form, probably within `apps/ui-portal/src/pages/pipelines/[id].tsx` or a shared component. They would add a new controlled input field for the DLQ S3 path, updating the component's state and the payload sent by the `api-client.ts` upon form submission.\n\n4. **Verification (Session 4):** For testing, the developer would leverage the existing E2E test setup in `pipelines.e2e-spec.ts`. They would write a new `it` block that mocks the S3 dependency to verify writes. The test would create a pipeline via the API, trigger it, and then assert that the mock S3 `putObject` method was called with the correct parameters (bucket, key, and body containing the failed record).\n\n5. **Documentation:** Finally, they would open `docs/guides/PIPELINE_CREATION.md` and add a new section explaining the DLQ feature in clear, user-friendly language.",
"ground_truth": "The solution requires modifications across the monorepo. Key changes include:\n\n1. **`packages/core-domain/src/models/pipeline-definition.model.ts`:**\n ```typescript\n // ... existing interface\n export interface PipelineDefinition {\n // ... other properties\n dlqConfiguration?: {\n s3Path: string;\n };\n }\n ```\n\n2. **`packages/core-domain/src/etl/pipeline.ts` (or similar execution logic):**\n ```typescript\n // Inside a record processing loop\n for (const record of records) {\n try {\n let processedRecord = record;\n for (const step of this.definition.steps) {\n processedRecord = await step.process(processedRecord);\n }\n successfulRecords.push(processedRecord);\n } catch (error) {\n failedRecords.push({ record, error });\n if (this.definition.dlqConfiguration?.s3Path) {\n const dlqPayload = JSON.stringify({ record, error: error.message, timestamp: new Date().toISOString() });\n const dlqPath = `${this.definition.dlqConfiguration.s3Path}/${this.executionId}/${generateUUID()}.json`;\n await this.storagePort.putObject(dlqPath, dlqPayload);\n }\n }\n }\n ```\n\n3. **`apps/api-service/src/pipelines/dto/create-pipeline.dto.ts`:**\n ```typescript\n // ... imports\n class DlqConfigurationDto {\n @IsString()\n @IsNotEmpty()\n s3Path: string;\n }\n\n export class CreatePipelineDto {\n // ... other properties\n @IsOptional()\n @ValidateNested()\n @Type(() => DlqConfigurationDto)\n dlqConfiguration?: DlqConfigurationDto;\n }\n ```\n\n4. **`apps/ui-portal/src/pages/pipelines/[id].tsx`:**\n ```jsx\n // Inside the form component\n <label>DLQ S3 Path (Optional)</label>\n <input\n type=\"text\"\n placeholder=\"s3://my-dlq-bucket/failures/\"\n value={pipeline.dlqConfiguration?.s3Path || ''}\n onChange={(e) => setPipeline({ ...pipeline, dlqConfiguration: { s3Path: e.target.value } })}\n />\n ```\n\n5. **`docs/guides/PIPELINE_CREATION.md`:**\n A new section titled \"### Configuring a Dead-Letter Queue (DLQ)\" would be added, explaining that this field allows failed records to be stored in the specified S3 path for later analysis.",
"evaluation_criteria": [
"**Context Retention & Session Continuity:** Does the agent successfully build upon the work from previous sessions? (e.g., Does it use the correctly named `dlqConfiguration` property in the DTO that it defined in the domain model in the previous session?).",
"**Architectural Adherence:** Does the agent correctly separate concerns by placing logic in the appropriate layer (domain, API, UI)? Does it use the existing `ObjectStoragePort` adapter instead of implementing a new S3 client?",
"**Correctness of Implementation:** Does the feature function correctly end-to-end? Are failed records properly isolated and written to the DLQ without affecting valid records?",
"**Code Quality and Idiomatic Style:** Is the generated TypeScript code strongly-typed and clean? Does it follow the established coding patterns within the NestJS API and Next.js UI?",
"**File Navigation and Modification:** How efficiently does the agent locate and modify the correct files across the `packages/`, `apps/`, and `docs/` directories?",
"**Test-Driven Approach:** Is the E2E test comprehensive? Does it properly mock dependencies and make meaningful assertions about the DLQ's behavior?",
"**Documentation:** Is the update to the documentation clear, accurate, and helpful for a user trying to use the new feature?"
],
"metadata": {
"context_length": 576125,
"files_count": 64,
"information_coverage": 0.9071563198228909,
"coverage_range": [
0.8,
1.0
],
"generation_timestamp": "2025-08-05T15:23:59.999904"
}
}