
Planalytix Airflow Provider

Apache Airflow provider package for the Planalytix data integration platform.

Orchestrate your data syncs directly from Airflow DAGs with full visibility into job status, progress, and results.

Installation

pip install planalytix-airflow

Or with Poetry:

poetry add planalytix-airflow

Requirements

  • Python 3.8+
  • Apache Airflow 2.5+
  • Planalytix Professional or Enterprise subscription

Quick Start

1. Create an API Key

  1. Log into Planalytix at https://app.planalytix.com
  2. Go to Settings → API Keys
  3. Create a new key with sync:trigger and sync:read permissions
  4. Copy the key (starts with flx_)

2. Configure Airflow Connection

Option A: Via Airflow UI

  1. Go to Admin → Connections
  2. Add a new connection:
    • Connection Id: planalytix_default
    • Connection Type: Planalytix
    • Host: https://api.planalytix.com
    • Password: Your API key (flx_xxxx...)
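
The same connection can also be created from the Airflow CLI (a sketch; it assumes the provider registers a planalytix connection type, and you substitute your real key):

airflow connections add planalytix_default \
    --conn-type planalytix \
    --conn-host https://api.planalytix.com \
    --conn-password 'flx_your_api_key'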

Option B: Via Environment Variable

export AIRFLOW_CONN_PLANALYTIX_DEFAULT='planalytix://unused:flx_your_api_key@api.planalytix.com'

3. Create Your DAG

from datetime import datetime
from airflow import DAG
from planalytix_provider.operators.sync import PlanalytixSyncOperator

with DAG(
    dag_id="sync_salesforce_daily",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    
    sync_salesforce = PlanalytixSyncOperator(
        task_id="sync_salesforce",
        connection_id="conn_abc123",  # Your Planalytix connection ID
        sync_type="incremental",
        wait_for_completion=True,
        poll_interval=30,
        timeout=3600,
    )

Components

PlanalytixHook

Low-level hook for direct API access:

from planalytix_provider.hooks.planalytix import PlanalytixHook

hook = PlanalytixHook()

# Trigger a sync
job = hook.trigger_sync(
    connection_id="conn_abc123",
    sync_type="incremental",
)

# Check status
status = hook.get_job(job["job_id"])

# Get results
results = hook.get_job_results(job["job_id"])
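
For longer-running jobs you can poll the hook until the job settles. A minimal sketch; the "status" response field name is an assumption, and the terminal values are taken from the job_status XCom values documented below:

import time

from planalytix_provider.hooks.planalytix import PlanalytixHook

def wait_for_sync(hook: PlanalytixHook, job_id: str, poll_interval: int = 30, timeout: int = 3600) -> str:
    """Poll until the job reaches a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = hook.get_job(job_id)["status"]  # assumed response field
        if status in ("completed", "failed", "cancelled"):
            return status
        time.sleep(poll_interval)
    raise TimeoutError(f"Job {job_id} did not finish within {timeout}s")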

PlanalytixSyncOperator

Trigger and optionally wait for sync completion:

from planalytix_provider.operators.sync import PlanalytixSyncOperator

# Simple sync with waiting
sync_task = PlanalytixSyncOperator(
    task_id="sync_data",
    connection_id="conn_abc123",
    sync_type="incremental",  # or "full"
    wait_for_completion=True,
    poll_interval=30,
    timeout=3600,
)

# Sync specific streams only
sync_partial = PlanalytixSyncOperator(
    task_id="sync_orders",
    connection_id="conn_abc123",
    streams=["orders", "order_items"],
    sync_type="incremental",
)

# High priority sync (Enterprise only)
sync_priority = PlanalytixSyncOperator(
    task_id="sync_urgent",
    connection_id="conn_abc123",
    priority="high",
    wait_for_completion=True,
)

# Fire and forget (no waiting)
trigger_only = PlanalytixSyncOperator(
    task_id="trigger_sync",
    connection_id="conn_abc123",
    wait_for_completion=False,
)

PlanalytixSyncSensor

Wait for a job triggered elsewhere:

from planalytix_provider.sensors.sync import PlanalytixSyncSensor

# Wait for job from upstream task
wait_for_job = PlanalytixSyncSensor(
    task_id="wait_for_sync",
    job_id="{{ ti.xcom_pull(task_ids='trigger_sync', key='job_id') }}",
    poke_interval=30,
    timeout=3600,
    mode="reschedule",  # Free worker while waiting
)
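
The sensor pairs naturally with the fire-and-forget operator shown earlier; its job_id template pulls the ID that the trigger_sync task pushed to XCom:

# Trigger without blocking a worker, then wait in reschedule mode
trigger_only >> wait_for_job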

XCom Values

The operator pushes these values to XCom:

Key            Description
job_id         The Planalytix job ID
connection_id  The connection that was synced
job_status     Final status (completed, failed, cancelled)
job_results    Full results object (if completed)

Access in downstream tasks:

def process_results(**context):
    job_id = context["ti"].xcom_pull(task_ids="sync_data", key="job_id")
    results = context["ti"].xcom_pull(task_ids="sync_data", key="job_results")
    
    if results:
        rows_synced = results.get("summary", {}).get("total_rows_synced", 0)
        print(f"Synced {rows_synced} rows")

Webhook Integration

For real-time notifications, configure webhooks in Planalytix:

  1. Go to Settings → Webhooks
  2. Add your endpoint URL
  3. Select events: job.completed, job.failed

Your webhook will receive:

{
  "id": "evt_abc123",
  "type": "job.completed",
  "timestamp": "2024-01-15T10:30:00Z",
  "job_id": "job_xyz789",
  "data": {
    "status": "completed",
    "summary": {
      "total_rows_synced": 48291,
      "duration_seconds": 245
    }
  }
}
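
A minimal receiver sketch using only the Python standard library (a hypothetical endpoint; the fields follow the payload above, and no signature verification is shown since none is documented here):

import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class PlanalytixWebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read and parse the JSON event body
        body = self.rfile.read(int(self.headers["Content-Length"]))
        event = json.loads(body)
        if event.get("type") == "job.completed":
            summary = event["data"]["summary"]
            print(f"Job {event['job_id']}: {summary['total_rows_synced']} rows in {summary['duration_seconds']}s")
        elif event.get("type") == "job.failed":
            print(f"Job {event['job_id']} failed")
        self.send_response(200)
        self.end_headers()

HTTPServer(("0.0.0.0", 8080), PlanalytixWebhookHandler).serve_forever()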

Idempotency

The operator automatically generates idempotency keys based on:

  • DAG ID
  • Task ID
  • Run ID
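
For illustration only, a key derived from those fields behaves like the sketch below (the provider's real format is internal):

# Hypothetical composition; retries of the same task instance keep the same run_id
idempotency_key = f"{dag_id}:{task_id}:{run_id}"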

This prevents duplicate syncs if a task is retried:

# Idempotency is automatic, but you can override:
sync_task = PlanalytixSyncOperator(
    task_id="sync_data",
    connection_id="conn_abc123",
    idempotency_key="my-custom-key-{{ ds }}",
)

Error Handling

# Fail task on sync failure (default)
sync_task = PlanalytixSyncOperator(
    task_id="sync_data",
    connection_id="conn_abc123",
    fail_on_error=True,  # Default
)

# Continue on failure (for non-critical syncs)
sync_optional = PlanalytixSyncOperator(
    task_id="sync_optional_data",
    connection_id="conn_xyz",
    fail_on_error=False,
)
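
With fail_on_error=False the task succeeds even when the sync fails, so downstream logic should inspect the pushed job_status itself (a sketch using the XCom values documented above):

def check_optional_sync(**context):
    status = context["ti"].xcom_pull(task_ids="sync_optional_data", key="job_status")
    if status != "completed":
        # Non-critical sync: log and continue instead of failing the run
        print(f"Optional sync finished with status: {status}")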

Tier Features

Feature                  Professional    Enterprise
Trigger syncs            ✅              ✅
Wait for completion      ✅              ✅
Webhook notifications    ✅              ✅
Priority queuing         ❌              ✅
AI event visibility      ❌              ✅
Deferrable operators     Coming soon     Coming soon

Troubleshooting

"No API key found"

Ensure your Airflow connection has the API key in the password field or in extras as api_key.

"Connection not found"

Verify the connection_id matches a connection in your Planalytix account. You can find connection IDs in the Planalytix UI under Connections.

"Orchestration API requires Professional tier"

The orchestration API is available on the Professional and Enterprise tiers. Upgrade at https://planalytix.com/pricing.

Sync timeout

Increase the timeout parameter or use wait_for_completion=False with a separate sensor.

Support

License

Apache License 2.0
