DE pain
1: Testing and Quality
Unit Test: Validates individual transformations - Pytest
Schema Test: Ensures correct column names & types - Great Expectations
Data Quality: Checks for missing values & duplicates - Pandas or PySpark (see the sketch after this list)
Integration Test: Validates data flow from source to destination - Pytest, temp files (sketched after the schema test below)
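The data-quality check above is not covered by the tests further down, so here is a minimal Pandas/pytest sketch. It assumes the same output.csv produced by transformation.py; the test name is illustrative, not part of the original project.

import pandas as pd

def test_data_quality():
    # Assumes output.csv was produced by transformation.py beforehand
    df = pd.read_csv("output.csv")
    assert df.isnull().sum().sum() == 0, "Data contains missing values!"
    assert not df.duplicated().any(), "Data contains duplicate rows!"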
2: Failure, Visibility and Retry
1: ----------------
Project Layout
/my_project
│── Dockerfile
│── transformation.py
│── tests/
│   ├── test_unit.py
│   └── test_schema.py
│── requirements.txt
│── run_transformation.sh
│── run_tests.sh
│── run_schema_tests.sh
2: --------------------------
Dockerfile
FROM python:3.9
WORKDIR /app
# Copy the entire repo
COPY . .
# Install dependencies
RUN pip install -r requirements.txt
# Make the scripts executable
RUN chmod +x run_transformation.sh run_tests.sh run_schema_tests.sh
ENTRYPOINT ["/bin/bash"]
3: ----------------------
Transformation Script (transformation.py)

import pandas as pd

def transform():
    df = pd.DataFrame({"col1": [1, 2, None], "col2": ["A", "B", "C"]})
    df = df.dropna()  # Simple transformation: drop rows with missing values
    df.to_csv("output.csv", index=False)
    print("Transformation completed successfully!")

if __name__ == "__main__":
    transform()
4: -------------------------
Unit Test (tests/test_unit.py)

import pandas as pd

def test_no_null_values():
    # Assumes output.csv is present, i.e. the transformation has already run in this environment
    df = pd.read_csv("output.csv")
    assert df.isnull().sum().sum() == 0, "Data contains null values!"
5: --------------
Schema Test (tests/test_schema.py)

from great_expectations.dataset import PandasDataset  # legacy GE dataset API; newer releases use validators/checkpoints
import pandas as pd

def test_schema():
    df = pd.read_csv("output.csv")
    dataset = PandasDataset(df)
    # expect_* calls return result objects; assert on .success so pytest actually fails on a mismatch
    assert dataset.expect_column_to_exist("col1").success
    assert dataset.expect_column_to_exist("col2").success
    print("Schema validation passed!")
6: Final DAG
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.utils.dates import days_ago
# Define your Docker image (this must be stored in a container registry like ECR, GCR, or Docker Hub)
DOCKER_IMAGE = "myrepo/mytransformation:latest" # Change this to your image URL
default_args = {
    "owner": "airflow",
    "retries": 2,
}

dag = DAG(
    "run_etl_steps_k8s",
    default_args=default_args,
    description="Run transformations and tests inside a Kubernetes Pod",
    schedule_interval=None,  # Run on demand
    start_date=days_ago(1),
    catchup=False,
)
# Step 1: Run the transformation
run_transformation = KubernetesPodOperator(
    namespace="default",  # Change to your namespace
    image=DOCKER_IMAGE,
    cmds=["/bin/bash", "-c"],
    arguments=["/app/run_transformation.sh"],
    name="run-transformation",
    task_id="run_transformation",
    get_logs=True,
    dag=dag,
)
# Step 2: Run unit tests
run_unit_tests = KubernetesPodOperator(
    namespace="default",
    image=DOCKER_IMAGE,
    cmds=["/bin/bash", "-c"],
    arguments=["/app/run_tests.sh"],
    name="run-unit-tests",
    task_id="run_unit_tests",
    get_logs=True,
    dag=dag,
)
# Step 3: Run schema validation
run_schema_validation = KubernetesPodOperator(
    namespace="default",
    image=DOCKER_IMAGE,
    cmds=["/bin/bash", "-c"],
    arguments=["/app/run_schema_tests.sh"],
    name="run-schema-validation",
    task_id="run_schema_validation",
    get_logs=True,
    dag=dag,
)
# Define dependencies
run_transformation >> run_unit_tests >> run_schema_validation
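To make the "Failure, Visibility and Retry" part concrete, the default_args above could be extended with a retry delay and a failure callback. This is a sketch, not part of the original DAG; notify_failure is a hypothetical alerting hook you would wire to your own chat or paging tool.

from datetime import timedelta

def notify_failure(context):
    # Hypothetical hook: surface the failed task instead of relying on log digging
    ti = context["task_instance"]
    print(f"Task {ti.task_id} in DAG {ti.dag_id} failed; logs: {ti.log_url}")

default_args = {
    "owner": "airflow",
    "retries": 2,                           # retry each pod twice before marking the task failed
    "retry_delay": timedelta(minutes=5),    # wait between retry attempts
    "on_failure_callback": notify_failure,  # called when the task ultimately fails
}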