Airflow XCom
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime


def explicit_push_callable(ti):
    # Manually pushing a custom key and value
    ti.xcom_push(key="custom_metric", value=94.7)
    # Returning a value automatically creates key "return_value"
    return "Task completed successfully"


def explicit_pull_callable(ti):
    # Pulling from a specific task by ID and explicit key
    metric = ti.xcom_pull(task_ids="pusher_task", key="custom_metric")
    # Pulling the implicit default return value
    status_msg = ti.xcom_pull(task_ids="pusher_task", key="return_value")
    # Braces are required for the f-string to interpolate the variables
    print(f"Pulled metric: {metric}, Status message: {status_msg}")


with DAG(dag_id="classic_xcom_pipeline", start_date=datetime(2026, 1, 1), schedule=None) as dag:
    pusher_task = PythonOperator(
        task_id="pusher_task",
        python_callable=explicit_push_callable,
    )
    puller_task = PythonOperator(
        task_id="puller_task",
        python_callable=explicit_pull_callable,
    )

    # The puller runs only after the pusher has written its XComs
    pusher_task >> puller_task

Jinja Template Pulling

In templated operator fields, the same value can be pulled inline with {{ ti.xcom_pull(task_ids="pusher_task", key="custom_metric") }}.
Master Apache Airflow XComs: The Complete Guide to Cross-Task Communication
Ensure your payloads are JSON-serializable before pushing them, or implement a custom serialization handler, so that a push does not fail on an unsupported type.
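The JSON requirement can be checked before a value is ever pushed. The sketch below is a minimal pre-flight check using only the standard library; `is_json_safe` and the sample payloads are hypothetical names introduced for illustration. Plain `json.dumps` is used as a conservative test, on the assumption that anything it accepts is safe to push; Airflow's own serializer may accept additional types depending on the version.

```python
import json
from datetime import datetime


def is_json_safe(value):
    """Return True if json.dumps can encode the value as-is (hypothetical helper)."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


# Floats and strings encode without any help.
safe_payload = {"custom_metric": 94.7, "status": "Task completed successfully"}

# A raw datetime is not a JSON type, so plain json.dumps rejects it.
unsafe_payload = {"finished_at": datetime(2026, 1, 1)}

# One possible fix: convert the value to an ISO 8601 string before pushing.
fixed_payload = {"finished_at": unsafe_payload["finished_at"].isoformat()}

for name, payload in [
    ("safe", safe_payload),
    ("unsafe", unsafe_payload),
    ("fixed", fixed_payload),
]:
    print(name, is_json_safe(payload))
```

Running the check inside the pushing task, just before `ti.xcom_push`, surfaces the problem in that task's log rather than in a downstream consumer.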
[Task A] --(push dataframe)--> [Custom XCom Backend] --(upload payload)--> [AWS S3 / GCS]
                                         |
                                 (write URI only)
                                         v
                            [Airflow Metadata Database]
                                         |
                                  (read URI only)
                                         v
[Task B] <--(pull dataframe)-- [Custom XCom Backend] <--(download payload)-- [AWS S3 / GCS]

Implementing a Custom S3 XCom Backend
If you need to pass large objects (e.g., pandas DataFrames), you can implement a custom XCom backend that automatically uploads the payload to S3 and stores only its URI in the Airflow metadata database.
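The offloading pattern can be sketched without Airflow or AWS installed. The example below is a simulation, not Airflow's API: a local temporary directory stands in for the S3 bucket, a list of dicts stands in for a DataFrame, and `OffloadingXComBackend`, `FAKE_BUCKET`, and the `fake-s3://` prefix are hypothetical names. In Airflow 2.x, a real backend would instead subclass `BaseXCom` from `airflow.models.xcom`, override its `serialize_value` and `deserialize_value` static methods (whose signatures differ from the simplified ones here), and be enabled through the `xcom_backend` option in the `[core]` section of the configuration.

```python
import json
import tempfile
import uuid
from pathlib import Path

# Hypothetical stand-in for an S3 bucket: a local temporary directory.
FAKE_BUCKET = Path(tempfile.mkdtemp(prefix="xcom_bucket_"))
# Hypothetical URI scheme that marks a value as offloaded.
URI_PREFIX = "fake-s3://"


class OffloadingXComBackend:
    """Simulates the two hooks a custom XCom backend overrides."""

    @staticmethod
    def serialize_value(value):
        # Upload the full payload to the object store.
        object_key = f"{uuid.uuid4()}.json"
        (FAKE_BUCKET / object_key).write_text(json.dumps(value))
        # Return only the URI; this is what the metadata database would hold.
        return URI_PREFIX + object_key

    @staticmethod
    def deserialize_value(stored):
        # Values without the prefix are treated as ordinary inline XComs.
        if not (isinstance(stored, str) and stored.startswith(URI_PREFIX)):
            return stored
        # Download the payload that the URI points to.
        object_key = stored[len(URI_PREFIX):]
        return json.loads((FAKE_BUCKET / object_key).read_text())


# A list of row dicts stands in for a large DataFrame.
rows = [{"id": i, "score": i * 1.5} for i in range(1000)]

reference = OffloadingXComBackend.serialize_value(rows)        # Task A side
restored = OffloadingXComBackend.deserialize_value(reference)  # Task B side

print(reference)
print(len(restored))
```

The tasks themselves do not change under this design: they still push and pull ordinary Python objects, and the backend decides what is written to the database.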