Last active
February 20, 2025 20:31
-
-
Save lalitsingh24x7/e8c287fa55589d24a1a4c892d8784d4b to your computer and use it in GitHub Desktop.
DE pain
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| 1: Test and Quality | |
| Unit Test: Individual transformations - Pytest | |
| Schema Test: Ensures correct column names & types - Great Expectations | |
| Data Quality: Checks for missing values & duplicates - Pandas or PySpark | |
| Integration Test: Validates data flow from source to destination - Pytest, Temp Files | |
| 2: Failure, Visibility and Retry | |
| 1: ---------------- | |
| /my_project | |
| │── Dockerfile | |
| │── transformation.py | |
| │── tests/ | |
| │ ├── test_unit.py | |
| │ ├── test_schema.py | |
| │── requirements.txt | |
| │── run_transformation.sh | |
| │── run_tests.sh | |
| │── run_schema_tests.sh | |
| 2: -------------------------- | |
FROM python:3.9
WORKDIR /app
# Install dependencies first so this layer is only rebuilt when
# requirements.txt changes, not on every source edit.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the rest of the repo
COPY . .
# Make the scripts executable
RUN chmod +x run_transformation.sh run_tests.sh run_schema_tests.sh
ENTRYPOINT ["/bin/bash"]
| 3:---------------------- | |
| Transformation Script (transformation.py) | |
| import pandas as pd | |
def transform(output_path="output.csv"):
    """Build the sample frame, drop incomplete rows and write the result as CSV.

    Args:
        output_path: Destination of the CSV file. Defaults to ``"output.csv"``
            (relative to the current working directory), the file name the
            tests in these notes read.

    Returns:
        The transformed ``DataFrame`` that was written to ``output_path``.
    """
    df = pd.DataFrame({"col1": [1, 2, None], "col2": ["A", "B", "C"]})
    # Simple transformation: discard every row that has a missing value.
    df = df.dropna()
    df.to_csv(output_path, index=False)
    print("Transformation completed successfully!")
    return df


if __name__ == "__main__":
    transform()
| 4:------------------------- | |
| Unit Test (tests/test_unit.py) | |
| import pandas as pd | |
def test_no_null_values():
    """Fail when the CSV produced by the transformation holds any missing value."""
    frame = pd.read_csv("output.csv")
    # Count every missing cell across all columns in one pass.
    missing_cells = frame.isna().to_numpy().sum()
    assert missing_cells == 0, "Data contains null values!"
| 5: -------------- | |
| Schema Test (tests/test_schema.py) | |
| from great_expectations.dataset import PandasDataset | |
| import pandas as pd | |
def test_schema():
    """Verify that the transformation output contains the expected columns.

    Reads ``output.csv`` from the current working directory and checks that
    both ``col1`` and ``col2`` exist.

    Raises:
        AssertionError: If one of the expected columns is missing.
    """
    df = pd.read_csv("output.csv")
    dataset = PandasDataset(df)
    for column in ("col1", "col2"):
        result = dataset.expect_column_to_exist(column)
        # The expectation reports its outcome in the returned result rather
        # than raising, so it has to be asserted for this test to ever fail.
        assert result.success, f"Schema validation failed: column '{column}' is missing"
    print("Schema validation passed!")
| 6: Final DAG | |
| from airflow import DAG | |
| from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator | |
| from airflow.utils.dates import days_ago | |
# Define your Docker image (this must be stored in a container registry like ECR, GCR, or Docker Hub)
DOCKER_IMAGE = "myrepo/mytransformation:latest"  # Change this to your image URL

default_args = {
    "owner": "airflow",
    "retries": 2,
}

dag = DAG(
    "run_etl_steps_k8s",
    default_args=default_args,
    description="Run transformations and tests inside a Kubernetes Pod",
    schedule_interval=None,  # Run on demand
    start_date=days_ago(1),
    catchup=False,
)


def _pod_task(task_id, script, namespace="default"):
    """Create a pod task that runs one of the image's shell scripts.

    Args:
        task_id: Airflow task id. The pod name is derived from it by replacing
            underscores with hyphens.
        script: Path of the script inside the image, passed to ``bash -c``.
        namespace: Kubernetes namespace to launch the pod in. Change to your
            namespace.

    Returns:
        The configured ``KubernetesPodOperator`` attached to ``dag``.
    """
    return KubernetesPodOperator(
        namespace=namespace,
        image=DOCKER_IMAGE,
        cmds=["/bin/bash", "-c"],
        arguments=[script],
        name=task_id.replace("_", "-"),
        task_id=task_id,
        get_logs=True,
        dag=dag,
    )


# NOTE(review): each task below is a separate KubernetesPodOperator, so a file
# written by one step (e.g. output.csv) is presumably not visible to the next
# unless the scripts regenerate it or a shared volume is mounted - confirm.

# Step 1: Run the transformation
run_transformation = _pod_task("run_transformation", "/app/run_transformation.sh")

# Step 2: Run unit tests
run_unit_tests = _pod_task("run_unit_tests", "/app/run_tests.sh")

# Step 3: Run schema validation
run_schema_validation = _pod_task("run_schema_validation", "/app/run_schema_tests.sh")

# Define dependencies
run_transformation >> run_unit_tests >> run_schema_validation
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment