A single data pipeline used to take three weeks to ship. Today, even analysts with zero Python experience can do it in a day. Here is how we got there.
I am Kirill Kazrou, a data engineer at Mindbox. Our team regularly recalculates our clients’ business metrics, which means we are constantly building data marts for billing and analysis, pulling from dozens of different sources.
For a long time, we relied on PySpark for all data processing. The problem? Without Python experience, you cannot really use PySpark. Every new pipeline needed a developer, and that often meant waiting weeks.
This post describes how to build an internal data platform that lets analysts and product managers launch regularly updated pipelines by writing just four YAML files.
Why PySpark was slowing us down
Let’s illustrate the problem with a textbook example: calculating MAU (Monthly Active Users).
On the surface, this looks like a simple SQL job: COUNT(DISTINCT customerId) across a handful of tables over a time window. But with all the infrastructure overhead of PySpark, Airflow DAG setup, Spark resource allocation, and testing, we had to hand it over to developers. The result? A full week just to ship the MAU counter.
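Stripped of all infrastructure, the query itself is trivial. A minimal sketch (table and column names here are illustrative, not our actual schema):
-- Simplified, hypothetical MAU query: unique customers seen in any
-- event table within the reporting month.
SELECT COUNT(DISTINCT customerId) AS mau
FROM (
    SELECT customerId FROM visits WHERE eventMonth = DATE '2024-01-01'
    UNION ALL
    SELECT customerId FROM orders WHERE eventMonth = DATE '2024-01-01'
) events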
Before the platform, new metrics took one to three weeks to deliver, and every time the process looked the same:
- Analysts defined business requirements, found available developers, and handed over context.
- The developers worked out the details, wrote the PySpark code, went through code reviews, and configured and deployed the DAG.
What we really wanted was for analysts and product managers (the people who understand the business logic best and are proficient with SQL and YAML) to be able to handle this themselves. No Python. No PySpark.
PySpark replacement: all you need is YAML and SQL
To take a declarative approach, we divided the data layer into three parts and chose the right tool for each.
- dlt (data load tool): ingests data into object storage from external APIs and databases. It is configured entirely through YAML files. No code required.
- dbt (data build tool): transforms data with pure SQL on Trino. Models are linked via ref(), which automatically builds the dependency graph and handles incremental updates.
- Airflow + Cosmos: orchestrates the pipelines. The Airflow DAG is generated automatically from dag.yaml and the dbt project.
We were already using Trino as a query engine for ad-hoc queries and had plugged it into Superset for BI. For typical workloads, Trino had already proven able to process large datasets faster and with fewer resources than Spark. On top of that, Trino natively supports federated access to multiple data stores from a single SQL query. For 90% of our pipelines, Trino was a perfect fit.
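Federation deserves a quick illustration: one Trino statement can join tables that live in different systems. A sketch with hypothetical catalog and table names:
-- Hypothetical federated query: a PostgreSQL table and a table in
-- object storage, joined in a single statement. The catalog names
-- (postgres, lakehouse) are illustrative.
SELECT c.region, COUNT(DISTINCT o.customerId) AS active_customers
FROM postgres.crm.customers c
JOIN lakehouse.final.orders o ON o.customerId = c.id
GROUP BY c.region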

How to load data: dlt.yaml
The first YAML file describes where the data comes from and how to load it for downstream processing. Below is a working example that loads billing data from an internal API.
product: sg-team
feature: billing
schema: billing_tarification
dag:
  dag_id: dlt_billing_tarification
  schedule: "0 4 * * *"
  description: "Daily refresh of tarification data"
  tags:
    - billing
  alerts:
    enabled: true
    severity: warning
source:
  type: rest_api
  client:
    base_url: "https://internal-api.example.com"
    auth:
      type: bearer
      token: dlt-billing.token
  resources:
    - name: tarification_data
      endpoint:
        path: /tarificationData
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
          pricingPlanLine: CurrentPlan
      write_disposition: replace
      processing_steps:
        - map: dlt_custom.billing_tarification_data.map
    - name: charges_raw
      columns:
        staffUserName:
          data_type: text
          nullable: true
      endpoint:
        path: /data-feed/charges
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace
    - name: discounts_raw
      endpoint:
        path: /data-feed/discounts
        method: POST
        json:
          firstPeriod: "{{ previous_month_date }}"
          lastPeriod: "{{ previous_month_date }}"
      write_disposition: replace
This configuration defines three resources from a single API. For each one, you specify the endpoint, request parameters, and write strategy; here, replace means "overwrite on every run". You can also add processing steps, define column types, and configure alerts.
The entire configuration is about 40 lines of YAML. Without dlt, each connector would be a Python script that handles requests, pagination, retries, serialization to Delta table format, and uploading to storage.
How to transform data with SQL: dbt_project.yaml and sources.yaml
The next step is to configure the dbt models. On Trino, that means plain SQL queries.
Here is how the MAU calculation is set up. Event preparation from a single source looks like this:
-- int_mau_events_visits.sql (simplified)
{{ config(materialized='table') }}
WITH period AS (
    -- Rolling window: last 5 months through the current month
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '5' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '5' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),
events AS (
    -- Pull visit events within the period window
    SELECT src._tenant, src.unmergedCustomerId,
           'visits' AS src_type, src.endpoint
    FROM {{ source('final', 'customerstracking_visits') }} src
    CROSS JOIN period p
    WHERE src.unmergedCustomerId IS NOT NULL
      AND /* ...timestamp filtering by year/month bounds... */
),
events_with_customer AS (
    -- Resolve merged customer IDs
    SELECT e._tenant,
           COALESCE(mc.mergedCustomerId, e.unmergedCustomerId) AS customerId,
           e.src_type, e.endpoint
    FROM events e
    LEFT JOIN {{ ref('int_merged_customers') }} mc
        ON e._tenant = mc._tenant
        AND e.unmergedCustomerId = mc.unmergedCustomerId
)
-- Keep only actual (non-deleted) customers
SELECT ewc._tenant, ewc.customerId, ewc.src_type, ewc.endpoint
FROM events_with_customer ewc
WHERE EXISTS (
    SELECT 1 FROM {{ ref('int_actual_customers') }} ac
    WHERE ewc._tenant = ac._tenant
      AND ewc.customerId = ac.customerId
)
All 10 event sources follow exactly the same pattern; the only differences are the source table and the filter. The models are then merged into a single stream.
-- int_mau_events.sql (union of all sources)
SELECT * FROM {{ ref('int_mau_events_inapps_targetings') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_inapps_clicks') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_visits') }}
UNION ALL
SELECT * FROM {{ ref('int_mau_events_orders') }}
-- ...plus 6 more sources
And finally, the data mart where everything is aggregated:
-- mau_period_datamart.sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key=['_tenant', 'start_year', 'start_month', 'end_year', 'end_month']
) }}
{%- set months_back = var('months_back', 5) | int -%}
WITH period AS (
    SELECT
        YEAR(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_year,
        MONTH(CURRENT_DATE - INTERVAL '{{ months_back }}' MONTH) AS start_month,
        YEAR(CURRENT_DATE) AS end_year,
        MONTH(CURRENT_DATE) AS end_month
),
events_resolved AS (
    SELECT * FROM {{ ref('int_mau_events') }}
),
metrics_by_tenant AS (
    SELECT
        er._tenant,
        COUNT(DISTINCT CASE WHEN src_type = 'visits'
              THEN customerId END) AS CustomersTracking_Visits,
        COUNT(DISTINCT CASE WHEN src_type = 'orders'
              THEN customerId END) AS ProcessingOrders_Orders,
        COUNT(DISTINCT CASE WHEN src_type = 'mailings'
              THEN customerId END) AS Mailings_MessageStatuses,
        -- ...other metrics
        COUNT(DISTINCT customerId) AS MAU
    FROM events_resolved er
    GROUP BY er._tenant
)
SELECT m.*, p.start_year, p.start_month, p.end_year, p.end_month
FROM metrics_by_tenant m
CROSS JOIN period p
The data mart uses incremental_strategy='merge': dbt automatically generates a MERGE query keyed on unique_key for upserts, so there is no need to implement incremental loading by hand.
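Under the hood, dbt compiles this into a statement along these lines (a simplified sketch, not the exact generated SQL):
-- Roughly what incremental_strategy='merge' produces: an upsert
-- keyed on unique_key. The real generated SQL differs in details.
MERGE INTO mau.mau_period_datamart AS tgt
USING (/* the compiled model SELECT */) AS src
ON  tgt._tenant = src._tenant
AND tgt.start_year = src.start_year AND tgt.start_month = src.start_month
AND tgt.end_year = src.end_year AND tgt.end_month = src.end_month
WHEN MATCHED THEN UPDATE SET MAU = src.MAU
WHEN NOT MATCHED THEN INSERT VALUES (src._tenant, /* ... */ src.MAU)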
To tie the models into one project, set up dbt_project.yaml:
name: mau_period
version: '1.0.0'
models:
  mau_period:
    +on_table_exists: replace
    +on_schema_change: append_new_columns
And sources.yaml describes the input tables:
sources:
  - name: final
    database: data_platform
    schema: final
    tables:
      - name: inapps_targetings_v2
      - name: inapps_clicks_v2
      - name: customerstracking_visits
      - name: processingorders_orders
      - name: cdp_mergedcustomers_v2
      # ...
The result is the same business logic as in PySpark, but written in pure SQL. sources.yaml replaces the typedspark schema, {{ ref() }} and {{ source() }} replace .get_table(), and automatic execution ordering via the dependency graph replaces manual Spark resource tuning.
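For analysts new to dbt, it helps to see that ref() and source() are ordinary Jinja functions that compile to fully qualified table names, and that compilation step is exactly where the dependency graph comes from. A sketch, with the compiled schema assumed from the configs above:
-- What you write in the model:
SELECT * FROM {{ ref('int_mau_events') }}
-- Roughly what dbt compiles it to (schema assumed for illustration):
SELECT * FROM data_platform.mau.int_mau_events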
How to configure Airflow: dag.yaml
The fourth configuration file defines when and how Airflow executes the pipeline:
product: sg-team
feature: billing
schema: mau
schedule: "15 21 * * *"  # daily at 00:15 MSK
params:
  - name: start_date
    description: "Start date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: end_date
    description: "End date (YYYY-MM-DD). Leave empty for auto"
    default: ""
  - name: months_back
    description: "Months to look back (default: 5)"
    default: 5
alerts:
  enabled: true
  severity: warning
A Python script then parses dag.yaml and dbt_project.yaml and uses the Cosmos library to generate a fully functional Airflow DAG. This is the only piece of Python in the whole setup: write it once, and it works for all your dbt projects. The essential parts:
from pathlib import Path

import yaml
from airflow.models.param import Param
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from cosmos import DbtDag, ProfileConfig, ProjectConfig
from cosmos.profiles import TrinoLDAPProfileMapping

def _build_dbt_project_dags(project_path: Path, environ: dict) -> list[DbtDag]:
    # DagConfig is our internal pydantic model for dag.yaml (definition
    # omitted); dag.yaml is assumed to sit next to the dbt project.
    dag_config_path = project_path / "dag.yaml"
    config_dict = yaml.safe_load(dag_config_path.read_text())
    config = DagConfig.model_validate(config_dict)
    # YAML params → Airflow Params
    params = {}
    operator_vars = {}
    for param in config.params:
        params[param.name] = Param(
            default=param.default if param.default is not None else "",
            description=param.description,
        )
        operator_vars[param.name] = f"{{{{ params.{param.name} }}}}"
    # Cosmos creates the DAG from the dbt project
    # (project_name, profile_database, profile_schema come from
    # dbt_project.yaml; their parsing is omitted in this excerpt)
    with DbtDag(
        dag_id=f"dbt_{project_path.name}",
        schedule=config.schedule,
        params=params,
        project_config=ProjectConfig(dbt_project_path=project_path),
        profile_config=ProfileConfig(
            profile_name="default",
            target_name=project_name,
            profile_mapping=TrinoLDAPProfileMapping(
                conn_id="trino_default",
                profile_args={
                    "database": profile_database,
                    "schema": profile_schema,
                },
            ),
        ),
        operator_args={"vars": operator_vars},
    ) as dag:
        # Create the target schema before running models
        create_schema = SQLExecuteQueryOperator(
            task_id="create_schema",
            conn_id="trino_default",
            sql=f"CREATE SCHEMA IF NOT EXISTS {profile_database}.{profile_schema} ...",
        )
        # Attach it upstream of the root dbt tasks
        for unique_id, _ in dag.dbt_graph.filtered_nodes.items():
            task = dag.tasks_map[unique_id]
            if not task.upstream_task_ids:
                create_schema >> task
Cosmos reads the dbt project’s manifest.json, parses the model dependency graph, and creates a separate Airflow task for each model. Task dependencies are built automatically from the ref() calls in the SQL.
How analysts build pipelines without developers
When an analyst needs a new recurring pipeline, they can put it together in a few steps.
Step 1. Create a folder in the repository: dbt-projects/my_new_pipeline/.
Step 2. If external data needs to be ingested, create a YAML configuration for dlt.
Step 3. Write the SQL models in the models/ folder and describe the inputs in sources.yaml.
Step 4. Create dbt_project.yaml and dag.yaml.
Step 5. Push to Git, get a review, and merge.
CI/CD builds the dbt project and ships the artifacts to S3. Airflow reads the DAG file from there, and Cosmos parses the dbt project and generates the task graph. On schedule, dbt runs the models on Trino in the correct order. The end result is an up-to-date data mart in the warehouse, accessible from Superset.
What changed after the migration

To build their own pipelines, analysts need to understand the ref() and source() concepts, the difference between table and incremental materializations, and the basics of Git. We have run several internal workshops and put together step-by-step guides for each type of task.
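The table vs. incremental distinction is the one that trips newcomers up most often, so our guides boil it down to a sketch like this (a hypothetical minimal model; column names are illustrative):
-- Hypothetical incremental model: the first run builds the whole
-- table; later runs insert only rows newer than what is stored.
{{ config(materialized='incremental', unique_key='eventId') }}
SELECT eventId, customerId, eventDate
FROM {{ source('final', 'customerstracking_visits') }}
{% if is_incremental() %}
WHERE eventDate > (SELECT MAX(eventDate) FROM {{ this }})
{% endif %}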
Why the new stack won’t completely replace PySpark
For about 10% of our pipelines, PySpark is still the only option: the transformation simply does not fit into SQL. dbt supports Jinja macros, but that is not a full-fledged Python replacement. It would also be disingenuous to ignore the limitations of the new tools.
dlt + Delta: experimental upsert support. The storage layer uses the Delta format, and dlt’s Delta connector is marked as experimental, so the merge strategy did not work out of the box. We had to find workarounds: in some cases replace instead of merge (at the expense of incrementality), and in other places custom processing_steps.
Trino has limited fault tolerance. Trino does have a fault-tolerant execution mode, but it works by writing intermediate results to S3. At terabyte scale this is impractical: the sheer number of S3 operations makes it prohibitively expensive. And with fault tolerance disabled, the whole query fails if a Trino worker goes down, whereas Spark only restarts the failed tasks. We addressed this with DAG-level retries and by decomposing heavy models into chains of intermediate models, as sketched below.
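The decomposition itself is ordinary dbt: each stage materializes its own table, so a retry re-runs one cheap stage instead of one huge query. A hypothetical sketch (model and column names are illustrative):
-- stg_visits_filtered.sql: the expensive scan, isolated in one model
{{ config(materialized='table') }}
SELECT _tenant, customerId, eventDate
FROM {{ source('final', 'customerstracking_visits') }}
WHERE eventDate >= CURRENT_DATE - INTERVAL '5' MONTH
-- int_visits_daily.sql: a cheap aggregation over the staged table
{{ config(materialized='table') }}
SELECT _tenant, eventDate, COUNT(DISTINCT customerId) AS daily_users
FROM {{ ref('stg_visits_filtered') }}
GROUP BY _tenant, eventDate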
UDFs and custom logic. Spark lets you write custom logic in Python right inside the pipeline, which is extremely useful. In the new architecture this becomes much harder, and adding dbt on top of Trino does not help: Jinja only generates SQL, and dbt’s Python models only work with Snowflake, Databricks, and BigQuery. UDFs can be written for Trino, but only in Java, with all the attendant overhead: a separate repository, a build pipeline, and deploying JARs to every worker. So if your transformation does not fit into SQL, you end up with either an unmaintainable SQL monster or a standalone script that breaks your lineage.
What’s next: testing, model templates, training
Better testing. Our PySpark pipelines had accumulated a decent test suite, and the new architecture hasn’t caught up yet. Recent dbt versions introduced unit tests: you can now validate a SQL model’s logic against mock data without launching the whole pipeline. We want to add dbt tests both at the model level and in a separate monitoring layer.
Reusable templates for common patterns. Many of our dbt models are alike: dozens of models follow the same pattern, differing only in the source table and the filter. We plan to extract the shared logic into a dbt macro, along the lines of the sketch below.
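A hypothetical shape for that macro (names are illustrative, not production code):
-- macros/mau_events.sql: the shared pattern from the
-- int_mau_events_* models, parameterized by table and event type.
{% macro mau_events(source_table, src_type) %}
SELECT src._tenant, src.unmergedCustomerId,
       '{{ src_type }}' AS src_type, src.endpoint
FROM {{ source('final', source_table) }} src
WHERE src.unmergedCustomerId IS NOT NULL
{% endmacro %}
-- int_mau_events_visits.sql then shrinks to a single line:
{{ mau_events('customerstracking_visits', 'visits') }}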
Growing the platform’s user base. We want more engineers and analysts to work with data independently, so we are planning regular in-house training sessions, documentation, and onboarding guides to help new users get up to speed and start building their own models.
If your team is stuck in the same “analysts waiting for developers” loop, I’d love to hear how you solve it. Connect with me on LinkedIn and let’s compare notes.
All images in this article are by the author unless otherwise noted.

