Thursday, April 30, 2026

A single data pipeline used to take three weeks to ship. Today, even analysts with zero Python experience can build one in a day. Here is how we got there.

I'm Kirill Kazrou, a data engineer at Mindbox. Our team regularly recalculates our clients' business metrics, which means we are constantly building data marts for billing and analysis, pulling from dozens of different sources.

For a long time, we relied on PySpark for all data processing. The problem? Without Python skills, you cannot really use PySpark. Every new pipeline needed a developer, and that often meant waiting weeks.

This post describes how we built an internal data platform that lets analysts and product managers launch regularly updated pipelines by writing just four YAML files.

Why PySpark was slowing us down

Let's illustrate the problem with a textbook example: calculating MAU (Monthly Active Users).

On the surface, this looks like a simple SQL job: COUNT(DISTINCT customerId) across several tables over a time frame. But with all the infrastructure overhead of PySpark, Airflow DAG setup, Spark resource allocation, and testing, we had to hand it over to developers. The result? A full week just to ship the MAU counter.

New metrics took one to three weeks to deliver, and every time the process looked the same.

  1. Analysts defined business requirements, found available developers, and handed over context.
  2. The developers worked out the details, wrote the PySpark code, went through code reviews, and configured and deployed the DAG.

What we really wanted was for analysts and product managers (the people who understand business logic best and are fluent in SQL and YAML) to handle this themselves. No Python. No PySpark.

How to build pipelines without PySpark

Replacing PySpark: all you need is YAML and SQL

To take a declarative approach, we split the data layer into three parts and chose the right tool for each.

  • dlt (data load tool) — ingests data into object storage from external APIs and databases. It is configured entirely through YAML files; no code required.
  • dbt (data build tool) on Trino — transforms data using pure SQL. Models link via ref(), which automatically builds the dependency graph and handles incremental updates.
  • Airflow + Cosmos — orchestrates the pipeline. The Airflow DAG is generated automatically from dag.yaml and the dbt project.

We were already using Trino as a query engine for ad-hoc queries and had plugged it into Superset for BI. For typical workloads it had already proven able to process large datasets faster and with fewer resources than Spark. On top of that, Trino natively supports federated access to multiple data stores from a single SQL query. For 90% of our pipelines, Trino was a perfect fit.
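Federation is worth a concrete illustration: one Trino query can join tables living in different systems, with each catalog mapped to its own store. A sketch with illustrative catalog and table names (not our actual setup):

```sql
-- "delta" maps to Delta tables in object storage, "pg" to a live
-- PostgreSQL instance; Trino joins them in a single query.
SELECT c.customerId,
       c.plan_name,
       COUNT(*) AS visit_events
FROM delta.analytics.customerstracking_visits v
JOIN pg.public.customers c
  ON v.customerId = c.customerId
GROUP BY c.customerId, c.plan_name
```

With Spark, the same join would require configuring separate connectors and readers inside the job; in Trino it is just two catalog prefixes.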

Diagram of the new pipeline workflow: analysts create YAML configurations and SQL models directly; dbt and Trino handle execution automatically through Airflow. No developer involvement required, and the whole process takes one day.
After: analyst-owned pipeline with dbt + Trino

How to load data: dlt.yaml

The first YAML file describes where and how to load the data for downstream processing. A working example is shown below: loading billing data from an internal API.

product: sg-team
feature: billing
schema: billing_tarification

dag:
  dag_id: dlt_billing_tarification
  schedule: "0 4 * * *"
  description: "Daily refresh of tarification data"
  tags:
    - billing

alerts:
  enabled: true
  severity: warning

source:
  type: rest_api
  client:
    base_url: "https://internal-api.example.com"
    auth:
      type: bearer
      token: dlt-billing.token
  resources:
    - name: tarification_data
      endpoint:
        path: /tarificationData
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
          pricingPlanLine: CurrentPlan
      write_disposition: replace
      processing_steps:
        - map: dlt_custom.billing_tarification_data.map

    - name: charges_raw
      columns:
        staffUserName:
          data_type: text
          nullable: true
      endpoint:
        path: /data-feed/charges
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace

    - name: discounts_raw
      endpoint:
        path: /data-feed/discounts
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace

This configuration defines three resources from a single API. For each, you specify the endpoint, request parameters, and write strategy; here, replace means "overwrite every time". You can also add processing steps, define column types, and configure alerts.

The whole configuration is about 40 lines of YAML. Without dlt, each connector is a Python script that handles requests, pagination, retries, serialization to Delta table format, and uploading to storage.
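The `{{ previous_month_date }}` placeholder above is a template variable resolved at load time. A minimal sketch of how such a variable could be computed (the helper name is ours, for illustration; it is not part of the dlt API):

```python
from datetime import date

def previous_month_date(today: date) -> str:
    """Return the first day of the previous month as YYYY-MM-DD."""
    year, month = today.year, today.month
    if month == 1:
        year, month = year - 1, 12
    else:
        month -= 1
    return date(year, month, 1).isoformat()

print(previous_month_date(date(2026, 4, 30)))  # -> 2026-03-01
```

The runner substitutes the result into the request body before each run, so the same YAML always pulls the previous month's billing period.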

How to transform data with SQL: dbt_project.yaml and sources.yaml

The next step is to configure the dbt models. For Trino, these are plain SQL queries.

Here is how the MAU calculation is set up. Event preparation from a single source looks like this:

-- int_mau_events_visits.sql (simplified)
{{ config(materialized='table') }}

WITH period AS (
    -- Rolling window: last 5 months to the present
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '5' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '5' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),

events AS (
    -- Pull visit events within the period window
    SELECT src._tenant, src.unmergedCustomerId,
           'visits' AS src_type, src.endpoint
    FROM {{ source('final', 'customerstracking_visits') }} src
    CROSS JOIN period p
    WHERE src.unmergedCustomerId IS NOT NULL
      AND /* ...timestamp filtering by year/month bounds... */
),

events_with_customer AS (
    -- Resolve merged customer IDs
    SELECT e._tenant,
           COALESCE(mc.mergedCustomerId, e.unmergedCustomerId) AS customerId,
           e.src_type, e.endpoint
    FROM events e
    LEFT JOIN {{ ref('int_merged_customers') }} mc
      ON e._tenant = mc._tenant
      AND e.unmergedCustomerId = mc.unmergedCustomerId
)

-- Keep only actual (non-deleted) customers
SELECT ewc._tenant, ewc.customerId, ewc.src_type, ewc.endpoint
FROM events_with_customer ewc
WHERE EXISTS (
    SELECT 1 FROM {{ ref('int_actual_customers') }} ac
    WHERE ewc._tenant = ac._tenant
      AND ewc.customerId = ac.customerId
)

All 10 event sources follow exactly the same pattern; only the source table and filter differ. The models are then merged into a single stream.

-- int_mau_events.sql (union of all sources)
SELECT * FROM {{ ref('int_mau_events_inapps_targetings') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_inapps_clicks') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_visits') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_orders') }}
-- ...plus 6 more sources

And finally, the data mart where everything is aggregated.

-- mau_period_datamart.sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key=['_tenant', 'start_year', 'start_month', 'end_year', 'end_month']
) }}

{%- set months_back = var('months_back', 5) | int -%}

WITH period AS (
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),
events_resolved AS (
    SELECT * FROM {{ ref('int_mau_events') }}
),
metrics_by_tenant AS (
    SELECT
        er._tenant,
        COUNT(DISTINCT CASE WHEN src_type = 'visits'
              THEN customerId END) AS CustomersTracking_Visits,
        COUNT(DISTINCT CASE WHEN src_type = 'orders'
              THEN customerId END) AS ProcessingOrders_Orders,
        COUNT(DISTINCT CASE WHEN src_type = 'mailings'
              THEN customerId END) AS Mailings_MessageStatuses,
        -- ...other metrics
        COUNT(DISTINCT customerId) AS MAU
    FROM events_resolved er
    GROUP BY er._tenant
)
SELECT m.*, p.start_year, p.start_month, p.end_year, p.end_month
FROM metrics_by_tenant m
CROSS JOIN period p

For the data mart configuration, use incremental_strategy='merge'. dbt automatically generates the merge query, using unique_key for upserts; there is no need to implement incremental loading by hand.
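Under the hood, dbt compiles the incremental model into a MERGE keyed on unique_key. Roughly like this (a sketch of the shape of the generated statement, not its literal output; the schema name is illustrative):

```sql
MERGE INTO analytics.mau_period_datamart AS target
USING ( /* compiled SELECT from the model above */ ) AS source
  ON  target._tenant     = source._tenant
  AND target.start_year  = source.start_year
  AND target.start_month = source.start_month
  AND target.end_year    = source.end_year
  AND target.end_month   = source.end_month
WHEN MATCHED THEN
    UPDATE SET ...   -- all non-key columns
WHEN NOT MATCHED THEN
    INSERT ...       -- full row
```

Rows for an already-materialized period are updated in place; new periods are appended.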

To tie the models into one project, set up dbt_project.yaml:

name: mau_period
version: '1.0.0'

models:
  mau_period:
    +on_table_exists: replace
    +on_schema_change: append_new_columns

and sources.yaml describes the input tables:

sources:
  - name: final
    database: data_platform
    schema: final
    tables:
      - name: inapps_targetings_v2
      - name: inapps_clicks_v2
      - name: customerstracking_visits
      - name: processingorders_orders
      - name: cdp_mergedcustomers_v2
      # ...

The result is the same business logic as in PySpark, but written in pure SQL. sources.yaml replaces the typedspark schema, {{ ref() }} and {{ source() }} replace .get_table() calls, and automatic execution ordering via the dependency graph replaces manual Spark resource tuning.

How to configure Airflow: dag.yaml

The fourth configuration file defines when and how Airflow executes the pipeline.

product: sg-team
feature: billing
schema: mau
schedule: "15 21 * * *"  # daily at 00:15 MSK

params:
  - name: start_date
    description: "Start date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: end_date
    description: "End date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: months_back
    description: "Months to look back (default: 5)"
    default: 5

alerts:
  enabled: true
  severity: warning

A Python script then parses dag.yaml and dbt_project.yaml and uses the Cosmos library to generate a fully functional Airflow DAG. This is the only piece of Python in the whole setup: write it once and it works for every dbt project. The essential parts:

def _build_dbt_project_dags(project_path: Path, environ: dict) -> list[DbtDag]:
    dag_config_path = project_path / "dag.yaml"
    config_dict = yaml.safe_load(dag_config_path.read_text())
    config = DagConfig.model_validate(config_dict)

    # YAML params → Airflow Params
    params = {}
    operator_vars = {}
    for param in config.params:
        params[param.name] = Param(
            default=param.default if param.default is not None else "",
            description=param.description,
        )
        operator_vars[param.name] = f"{{{{ params.{param.name} }}}}"

    # Cosmos creates the DAG from the dbt project
    with DbtDag(
        dag_id=f"dbt_{project_path.name}",
        schedule=config.schedule,
        params=params,
        project_config=ProjectConfig(dbt_project_path=project_path),
        profile_config=ProfileConfig(
            profile_name="default",
            target_name=project_name,
            profile_mapping=TrinoLDAPProfileMapping(
                conn_id="trino_default",
                profile_args={
                    "database": profile_database,
                    "schema": profile_schema,
                },
            ),
        ),
        operator_args={"vars": operator_vars},
    ) as dag:
        # Create the schema before running the models
        create_schema = SQLExecuteQueryOperator(
            task_id="create_schema",
            conn_id="trino_default",
            sql=f"CREATE SCHEMA IF NOT EXISTS {profile_database}.{profile_schema} ...",
        )
        # Attach it upstream of the root tasks
        for unique_id, _ in dag.dbt_graph.filtered_nodes.items():
            task = dag.tasks_map[unique_id]
            if not task.upstream_task_ids:
                create_schema >> task

Cosmos reads the dbt project's manifest.json, parses the model dependency graph, and creates a separate Airflow task for each model. Task dependencies are built automatically from the ref() calls in the SQL.
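Conceptually, what Cosmos derives from manifest.json is a topological order over the models. A toy sketch (the dependency dict is illustrative, mirroring the MAU project above):

```python
from graphlib import TopologicalSorter

# model -> set of models it ref()s (illustrative, not the real manifest)
deps = {
    "int_merged_customers": set(),
    "int_mau_events_visits": {"int_merged_customers"},
    "int_mau_events": {"int_mau_events_visits"},
    "mau_period_datamart": {"int_mau_events"},
}

# static_order() yields each model only after all its dependencies,
# which is exactly the order the Airflow tasks must run in
order = list(TopologicalSorter(deps).static_order())
print(order)
```

Cosmos then maps each node to an Airflow task and wires `>>` edges along these dependencies, which is why no DAG wiring ever appears in the YAML.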

How analysts build pipelines without developers

If an analyst needs a new recurring pipeline, they can put it together in a few steps.

Step 1. Create a folder in the repository: dbt-projects/my_new_pipeline/.

Step 2. If external data needs to be ingested, create a YAML configuration for dlt.

Step 3. Write SQL models in the models/ folder and describe the sources in sources.yaml.

Step 4. Create dbt_project.yaml and dag.yaml.

Step 5. Push to Git, get a review, and merge.

CI/CD builds the dbt project and ships the artifacts to S3. Airflow reads the DAG file from there, and Cosmos parses the dbt project and generates the task graph. On schedule, dbt runs the models on Trino in the correct order. The end result is an updated data mart in the warehouse, accessible from Superset.

What changed after the migration

A before-and-after comparison shows that pipeline delivery time has decreased from 1-3 weeks with PySpark to 1 day with a YAML-based stack, shifting ownership of pipelines from developers to analysts.
What changed: from weeks to a day, from developers to analysts.

To build their own pipelines, analysts need to understand the ref() and source() concepts, the difference between table and incremental materializations, and Git basics. We ran several internal workshops and put together step-by-step guides for each type of task.

Why the new stack won't completely replace PySpark

For about 10% of our pipelines, PySpark is still the only option: the transformation simply does not fit into SQL. Although dbt supports Jinja macros, it is not a full-fledged Python replacement. It would also be disingenuous to ignore the limitations of the new tools.

dlt + Delta: experimental merge support. The storage layer uses the Delta format, and dlt's Delta connector is marked experimental, so the merge write disposition did not work out of the box. We had to find workarounds: in some cases replace instead of merge (at the expense of incrementality), and elsewhere custom processing_steps.

Trino has limited fault tolerance. Trino does have a fault-tolerance mechanism, but it works by writing intermediate results to S3. At terabyte-scale data volumes this is impractical: the sheer number of S3 operations makes it prohibitively expensive. With fault tolerance disabled, the entire query fails if a Trino worker goes down, whereas Spark only restarts the failed tasks. We addressed this with DAG-level retries and by decomposing heavy models into chains of intermediate models.
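Decomposition in dbt terms means persisting the expensive stage as its own model, so a retry re-runs one stage rather than the whole query. A sketch with illustrative model names:

```sql
-- int_events_heavy_stage1.sql: persist the expensive join once
{{ config(materialized='table') }}

SELECT e._tenant, e.customerId, e.src_type
FROM {{ ref('int_mau_events') }} e
WHERE e.customerId IS NOT NULL
```

A downstream model then selects from `ref('int_events_heavy_stage1')` instead of repeating the join; if its Trino query dies, the DAG-level retry restarts only that task, and the stage-1 table is already materialized.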

UDFs and custom logic. Spark lets you write custom Python logic inside the pipeline, which is very useful. With the new architecture this becomes much harder. Putting dbt on top of Trino does not help: Jinja only generates SQL, and dbt's Python models only work with Snowflake, Databricks, and BigQuery. UDFs can be written in Trino, but only in Java, which brings overhead such as separate repositories, build pipelines, and deploying JARs to all workers. So if a transformation does not fit into SQL, you end up with either an unmaintainable SQL monster or a standalone script that breaks your lineage.

What's next: testing, model templates, training

Better testing. Our PySpark pipelines had substantial test coverage, and the new architecture hasn't caught up yet. Recent dbt versions introduced unit tests: you can now validate SQL model logic against mock data without running the whole pipeline. We want to add dbt tests both at the model level and in a separate monitoring layer.
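For the MAU mart, a dbt unit test (dbt ≥ 1.8) might look like this sketch — the mock rows and test name are illustrative, and the real model would need its other inputs mocked too:

```yaml
unit_tests:
  - name: mau_counts_distinct_customers
    model: mau_period_datamart
    given:
      - input: ref('int_mau_events')
        rows:
          - {_tenant: "t1", customerId: "c1", src_type: "visits"}
          - {_tenant: "t1", customerId: "c1", src_type: "orders"}
          - {_tenant: "t1", customerId: "c2", src_type: "visits"}
    expect:
      rows:
        - {_tenant: "t1", MAU: 2, CustomersTracking_Visits: 2}
```

The test runs entirely against the mocked input, so it catches logic regressions (say, a broken COUNT(DISTINCT ...) case) without touching Trino data.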

Reusable templates for common patterns. Many of our dbt models are similar: dozens of models follow the same pattern, differing only in the source table and filter. We plan to extract the shared logic into dbt macros.
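Each of the 10 event models could then shrink to a one-line macro call. A sketch (the macro name and arguments are ours, not something that exists yet):

```sql
-- macros/mau_event_source.sql
{% macro mau_event_source(source_schema, source_table, src_type) %}
SELECT src._tenant,
       src.unmergedCustomerId,
       '{{ src_type }}' AS src_type,
       src.endpoint
FROM {{ source(source_schema, source_table) }} src
WHERE src.unmergedCustomerId IS NOT NULL
{% endmacro %}
```

A model file such as int_mau_events_visits.sql would then contain only the config block and `{{ mau_event_source(...) }}` with its table name and event type.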

Grow the platform's user base. We want more engineers and analysts to work with data independently. We plan regular in-house training sessions, documentation, and onboarding guides to help new users get up to speed quickly and start building their own models.

If your team is stuck in the same "analysts waiting for developers" loop, I'd love to hear how you solve it. Connect with me on LinkedIn and let's compare notes.


All images in this article are by the author unless otherwise noted.
