Planalytix Airflow Provider
Apache Airflow provider package for the Planalytix data integration platform.
Orchestrate your data syncs directly from Airflow DAGs with full visibility into job status, progress, and results.
Installation

```bash
pip install planalytix-airflow
```

Or with Poetry:

```bash
poetry add planalytix-airflow
```
Requirements
- Python 3.8+
- Apache Airflow 2.5+
- Planalytix Professional or Enterprise subscription
Quick Start
1. Create an API Key
- Log into Planalytix at https://app.planalytix.com
- Go to Settings → API Keys
- Create a new key with `sync:trigger` and `sync:read` permissions
- Copy the key (starts with `flx_`)
2. Configure Airflow Connection
Option A: Via Airflow UI
- Go to Admin → Connections
- Add a new connection:
  - Connection Id: `planalytix_default`
  - Connection Type: `Planalytix`
  - Host: `https://api.planalytix.com`
  - Password: Your API key (`flx_xxxx...`)
Option B: Via Environment Variable
```bash
export AIRFLOW_CONN_PLANALYTIX_DEFAULT='planalytix://unused:flx_your_api_key@api.planalytix.com'
```
3. Create Your DAG
```python
from datetime import datetime

from airflow import DAG

from planalytix_provider.operators.sync import PlanalytixSyncOperator

with DAG(
    dag_id="sync_salesforce_daily",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    sync_salesforce = PlanalytixSyncOperator(
        task_id="sync_salesforce",
        connection_id="conn_abc123",  # Your Planalytix connection ID
        sync_type="incremental",
        wait_for_completion=True,
        poll_interval=30,
        timeout=3600,
    )
```
Components
PlanalytixHook
Low-level hook for direct API access:
```python
from planalytix_provider.hooks.planalytix import PlanalytixHook

hook = PlanalytixHook()

# Trigger a sync
job = hook.trigger_sync(
    connection_id="conn_abc123",
    sync_type="incremental",
)

# Check status
status = hook.get_job(job["job_id"])

# Get results
results = hook.get_job_results(job["job_id"])
```
PlanalytixSyncOperator
Trigger and optionally wait for sync completion:
```python
from planalytix_provider.operators.sync import PlanalytixSyncOperator

# Simple sync that waits for completion
sync_task = PlanalytixSyncOperator(
    task_id="sync_data",
    connection_id="conn_abc123",
    sync_type="incremental",  # or "full"
    wait_for_completion=True,
    poll_interval=30,
    timeout=3600,
)

# Sync specific streams only
sync_partial = PlanalytixSyncOperator(
    task_id="sync_orders",
    connection_id="conn_abc123",
    streams=["orders", "order_items"],
    sync_type="incremental",
)

# High-priority sync (Enterprise only)
sync_priority = PlanalytixSyncOperator(
    task_id="sync_urgent",
    connection_id="conn_abc123",
    priority="high",
    wait_for_completion=True,
)

# Fire and forget (no waiting)
trigger_only = PlanalytixSyncOperator(
    task_id="trigger_sync",
    connection_id="conn_abc123",
    wait_for_completion=False,
)
```
PlanalytixSyncSensor
Wait for a job triggered elsewhere:
```python
from planalytix_provider.sensors.sync import PlanalytixSyncSensor

# Wait for a job triggered by an upstream task
wait_for_job = PlanalytixSyncSensor(
    task_id="wait_for_sync",
    job_id="{{ ti.xcom_pull(task_ids='trigger_sync', key='job_id') }}",
    poke_interval=30,
    timeout=3600,
    mode="reschedule",  # Free the worker slot while waiting
)
```
XCom Values
The operator pushes these values to XCom:
| Key | Description |
|---|---|
| `job_id` | The Planalytix job ID |
| `connection_id` | The connection that was synced |
| `job_status` | Final status (`completed`, `failed`, `cancelled`) |
| `job_results` | Full results object (if completed) |
Access in downstream tasks:
```python
def process_results(**context):
    job_id = context["ti"].xcom_pull(task_ids="sync_task", key="job_id")
    results = context["ti"].xcom_pull(task_ids="sync_task", key="job_results")
    if results:
        rows_synced = results.get("summary", {}).get("total_rows_synced", 0)
        print(f"Synced {rows_synced} rows")
```
Webhook Integration
For real-time notifications, configure webhooks in Planalytix:
- Go to Settings → Webhooks
- Add your endpoint URL
- Select events: `job.completed`, `job.failed`
Your webhook will receive:
```json
{
  "id": "evt_abc123",
  "type": "job.completed",
  "timestamp": "2024-01-15T10:30:00Z",
  "job_id": "job_xyz789",
  "data": {
    "status": "completed",
    "summary": {
      "total_rows_synced": 48291,
      "duration_seconds": 245
    }
  }
}
```
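On the receiving side, a handler only needs to parse this payload and branch on the event type. The sketch below is hypothetical (the function name and return strings are illustrative, not part of the provider); wire it into whatever web framework already serves your endpoint.

```python
import json


def handle_planalytix_event(body: str) -> str:
    """Parse a Planalytix webhook payload and summarize the event."""
    event = json.loads(body)
    if event["type"] == "job.completed":
        summary = event["data"]["summary"]
        return (
            f"Job {event['job_id']} completed: "
            f"{summary['total_rows_synced']} rows in {summary['duration_seconds']}s"
        )
    if event["type"] == "job.failed":
        return f"Job {event['job_id']} failed"
    # Ignore any event types you did not subscribe to
    return "ignored"
```

In production you would also verify the request's signature (if Planalytix provides one) before trusting the body.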
Idempotency
The operator automatically generates idempotency keys based on:
- DAG ID
- Task ID
- Run ID
This prevents duplicate syncs if a task is retried:
```python
# Idempotency is automatic, but you can override the key:
sync_task = PlanalytixSyncOperator(
    task_id="sync_data",
    connection_id="conn_abc123",
    idempotency_key="my-custom-key-{{ ds }}",
)
```
Error Handling
```python
# Fail the task on sync failure (default)
sync_task = PlanalytixSyncOperator(
    task_id="sync_data",
    connection_id="conn_abc123",
    fail_on_error=True,  # Default
)

# Continue on failure (for non-critical syncs)
sync_optional = PlanalytixSyncOperator(
    task_id="sync_optional_data",
    connection_id="conn_xyz",
    fail_on_error=False,
)
```
Tier Features
| Feature | Professional | Enterprise |
|---|---|---|
| Trigger syncs | ✅ | ✅ |
| Wait for completion | ✅ | ✅ |
| Webhook notifications | ✅ | ✅ |
| Priority queuing | ❌ | ✅ |
| AI event visibility | ❌ | ✅ |
| Deferrable operators | ❌ | ✅ (coming soon) |
Troubleshooting
"No API key found"
Ensure your Airflow connection has the API key in the password field, or in the connection extras as `api_key`.
"Connection not found"
Verify that the `connection_id` matches a connection in your Planalytix account. You can find connection IDs in the Planalytix UI under Connections.
"Orchestration API requires Professional tier"
The orchestration API is available on Professional and Enterprise tiers. Upgrade at https://planalytix.com/pricing
Sync timeout
Increase the `timeout` parameter, or set `wait_for_completion=False` and wait with a separate `PlanalytixSyncSensor`.
Support
- Documentation: https://docs.planalytix.com/integrations/airflow
- Issues: https://github.com/planalytix/planalytix-airflow/issues
- Email: support@planalytix.com
License
Apache License 2.0
File details
Details for the file planalytix_airflow-1.0.3.tar.gz.
- Size: 21.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.9

| Algorithm | Hash digest |
|---|---|
| SHA256 | `4fca0cc41fb08228e4b414132dc7fedfb8a052f25f52bc812eed8cbe9b93f332` |
| MD5 | `7e4b4336830758c1fa4ff78c1872ba48` |
| BLAKE2b-256 | `3fbea3c5b9e1ce0ff6813faf2b75765c4fdeb2725c70e6c40c86a878d237f906` |
File details
Details for the file planalytix_airflow-1.0.3-py3-none-any.whl.
- Size: 19.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.9

| Algorithm | Hash digest |
|---|---|
| SHA256 | `f136c1484ab4e3250c939d35ecf3f832fa13f6cce97edf6999f4b603ca2f52be` |
| MD5 | `71a63798705dc7e4b08eea211c6691b3` |
| BLAKE2b-256 | `da4cc32731ea77283f0fc2990cc52f831872d0e54fdc5f5939520b4422882803` |