A declarative query language for data processing pipelines

FlowQuery (Python)

A pure Python implementation of FlowQuery, a declarative OpenCypher-based query language for virtual graphs and data processing pipelines. This package has full functional fidelity with the TypeScript version.

Installation

pip install flowquery

Quick Start

Command Line Interface

Start the interactive REPL:

flowquery

Programmatic Usage

import asyncio
from flowquery import Runner

runner = Runner("WITH 1 as x RETURN x + 1 as result")
asyncio.run(runner.run())
print(runner.results)  # [{'result': 2}]

In Jupyter notebooks, you can use await directly:

from flowquery import Runner

runner = Runner("WITH 1 as x RETURN x + 1 as result")
await runner.run()
print(runner.results)  # [{'result': 2}]
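
The two snippets above differ only in who owns the event loop. asyncio.run() starts a new loop and raises RuntimeError if one is already running in the current thread, which is why a notebook cell awaits directly. A minimal sketch of that distinction follows; run_query() is a hypothetical stand-in for runner.run() so the example runs without FlowQuery installed:

```python
import asyncio


async def run_query():
    """Hypothetical stand-in for `await runner.run()`; returns fixed rows."""
    await asyncio.sleep(0)
    return [{"result": 2}]


def loop_is_running() -> bool:
    """Return True when called from inside a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def notebook_style():
    # Inside a running loop (as in a notebook cell) the coroutine is awaited directly.
    assert loop_is_running()
    return await run_query()


# Script style: no loop is running yet, so asyncio.run() creates one.
script_running = loop_is_running()
script_rows = asyncio.run(run_query())

# Simulate the notebook case by entering a loop first, then awaiting inside it.
notebook_rows = asyncio.run(notebook_style())
```

In a plain script, loop_is_running() returns False, so asyncio.run() is the right entry point; inside an async context the same check returns True and await should be used instead.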

Statement Info: Labels, Properties, and Source Lineage

The Runner exposes a metadata property that mirrors the TypeScript implementation. It reports how many virtual nodes and relationships were created or deleted, plus an optional info field of type StatementInfo that describes the structure the query touches. This description is independent of execution.

StatementInfo captures:

  • The node labels and relationship types referenced.
  • The data sources backing the underlying virtual definitions.
  • The node/relationship properties consumed by the query — alias.prop accesses anywhere in MATCH, WHERE, WITH, RETURN, ORDER BY, or function arguments, plus inline pattern properties like (u:User {id: 'rick.o'}).
  • The properties declared by each virtual's RETURN clause via info.declared, so you can validate that a query references only declared properties.
  • Literal values supplied for properties at the call site via info.nodes[label].literal_values — collected from inline pattern properties and from equality / IN predicates such as WHERE u.id = 'rick.o' or WHERE u.id IN ['a', 'b'].

The per-entity nodes and relationships maps give end-to-end lineage from a property to its data source:

from flowquery import Runner

runner = Runner("""
    CREATE VIRTUAL (:City) AS {
        LOAD JSON FROM "https://example.com/cities" AS c
        RETURN c.id AS id, c.name AS name, c.country AS country
    };
    CREATE VIRTUAL (:City)-[:FLIGHT]-(:City) AS {
        LOAD JSON FROM "https://example.com/flights" AS f
        RETURN f.left_id AS left_id, f.right_id AS right_id, f.airline AS airline
    };
    MATCH (a:City {name: 'NYC'})-[r:FLIGHT]->(b:City)
    WHERE b.country IN ['US', 'CA']
    RETURN a.name AS origin, b.name AS destination, r.airline AS airline
""")
info = runner.metadata.info

print(info.nodes)
# {'City': NodeInfo(
#     properties=['country', 'name'],
#     sources=['https://example.com/cities'],
#     literal_values={'country': ['US', 'CA'], 'name': ['NYC']},
# )}
print(info.relationships)
# {'FLIGHT': RelationshipInfo(
#     properties=['airline'],
#     sources=['https://example.com/flights'],
#     literal_values={},
# )}
print(info.declared.nodes['City'])
# DeclaredEntityInfo(
#     properties=['country', 'id', 'name'],
#     sources=['https://example.com/cities'],
# )
print(info.sources)
# ['https://example.com/cities', 'https://example.com/flights']

StatementInfo resolves sources and declared schemas for any virtual the query touches — both inline CREATE VIRTUAL clauses and previously-registered virtuals reached via MATCH or DELETE. The flat node_labels, relationship_types, sources, node_properties, and relationship_properties fields stay in sync with the per-entity nodes and relationships maps. Only purely literal AST subtrees end up in literal_values — values that depend on parameters, references, f-strings, or subqueries are skipped.

The same StatementInfoCrawler can be used directly on any parsed AST without going through a Runner:

from flowquery import StatementInfoCrawler

# parsed_ast is assumed to be an AST already produced by the FlowQuery parser (not shown here).
crawler = StatementInfoCrawler()
info = crawler.crawl(parsed_ast)
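
The check mentioned earlier, that a query references only declared properties, comes down to a set difference per label. The sketch below is one possible way to write it and is not part of the FlowQuery API. It uses plain dicts of label to property names so that it runs on its own; in practice the inputs would presumably be built from info.nodes[label].properties and info.declared.nodes[label].properties as printed in the City example. The helper name undeclared_properties and the population property are hypothetical.

```python
def undeclared_properties(used, declared):
    """Return {label: [properties the query uses but the virtual does not declare]}."""
    problems = {}
    for label, props in used.items():
        # A label with no declaration at all makes every used property undeclared.
        missing = sorted(set(props) - set(declared.get(label, [])))
        if missing:
            problems[label] = missing
    return problems


# Property lists copied from the City example output.
declared = {"City": ["country", "id", "name"]}
used = {"City": ["country", "name"]}
ok = undeclared_properties(used, declared)

# A hypothetical query that also reads a.population would be flagged.
used_bad = {"City": ["country", "name", "population"]}
bad = undeclared_properties(used_bad, declared)

print(ok)
print(bad)
```

An empty result means every consumed property is covered by the virtual's RETURN clause.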

Documentation

Extending FlowQuery with Custom Functions

The query language itself is identical between the TypeScript and Python versions. The only difference is that custom functions are written in Python here instead of TypeScript.

Creating a Custom Scalar Function

Scalar functions operate on individual values and return a result:

from flowquery.extensibility import Function, FunctionDef

@FunctionDef({
    "description": "Doubles a number",
    "category": "scalar",
    "parameters": [{"name": "value", "description": "Number to double", "type": "number"}],
    "output": {"description": "Doubled value", "type": "number"},
})
class Double(Function):
    def __init__(self):
        super().__init__("double")
        self._expected_parameter_count = 1

    def value(self):
        return self.get_children()[0].value() * 2

Once defined, use it in your queries:

WITH 5 AS num RETURN double(num) AS result
// Returns: [{"result": 10}]
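
The value() method above pulls its argument from a child node and transforms it, which is also what makes nested calls work. The following self-contained sketch models that pattern with hypothetical stand-in classes (Literal and DoubleSketch); it does not use the FlowQuery base classes, and the way FlowQuery actually builds its expression tree is an assumption here.

```python
class Literal:
    """Hypothetical leaf node that holds a constant."""

    def __init__(self, constant):
        self._constant = constant

    def value(self):
        return self._constant


class DoubleSketch:
    """Hypothetical stand-in for Double, without the FlowQuery base class."""

    def __init__(self, *children):
        self._children = list(children)

    def get_children(self):
        return self._children

    def value(self):
        # Same body as Double.value(): evaluate the first argument, then double it.
        return self.get_children()[0].value() * 2


# Models double(5) and the nested call double(double(5)).
single = DoubleSketch(Literal(5)).value()
nested = DoubleSketch(DoubleSketch(Literal(5))).value()
```

Because each node only asks its child for value(), the argument can be a literal, a variable reference, or another function call.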

Creating a Custom String Function

from flowquery.extensibility import Function, FunctionDef

@FunctionDef({
    "description": "Reverses a string",
    "category": "scalar",
    "parameters": [{"name": "text", "description": "String to reverse", "type": "string"}],
    "output": {"description": "Reversed string", "type": "string"},
})
class StrReverse(Function):
    def __init__(self):
        super().__init__("strreverse")
        self._expected_parameter_count = 1

    def value(self) -> str:
        return str(self.get_children()[0].value())[::-1]

Usage:

WITH 'hello' AS s RETURN strreverse(s) AS reversed
// Returns: [{"reversed": "olleh"}]

Creating a Custom Aggregate Function

Aggregate functions process multiple values and return a single result. They require a ReducerElement to track state:

from flowquery.extensibility import AggregateFunction, FunctionDef, ReducerElement

class MinReducerElement(ReducerElement):
    def __init__(self):
        self._value = None

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, val):
        self._value = val

@FunctionDef({
    "description": "Returns the minimum value",
    "category": "aggregate",
    "parameters": [{"name": "value", "description": "Value to compare", "type": "number"}],
    "output": {"description": "Minimum value", "type": "number"},
})
class MinValue(AggregateFunction):
    def __init__(self):
        super().__init__("minvalue")
        self._expected_parameter_count = 1

    def reduce(self, element):
        current = self.first_child().value()
        if element.value is None or current < element.value:
            element.value = current

    def element(self):
        return MinReducerElement()

Usage:

UNWIND [5, 2, 8, 1, 9] AS num RETURN minvalue(num) AS min
// Returns: [{"min": 1}]
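
To make the roles of element() and reduce() concrete, the sketch below imitates how an engine might drive an aggregate: create one state object, then call reduce() once per input row. The driver loop, the bind() helper, and the Slot and MinValueSketch classes are all hypothetical stand-ins written for this illustration; only the comparison inside reduce() is taken from MinValue above.

```python
class Slot:
    """Hypothetical stand-in for MinReducerElement: one mutable value."""

    def __init__(self):
        self.value = None


class MinValueSketch:
    """Hypothetical stand-in for MinValue, without the FlowQuery base class."""

    def __init__(self):
        self._current = None  # argument value for the row being processed

    def bind(self, row_value):
        # Plays the role of first_child().value() for the current row.
        self._current = row_value

    def element(self):
        return Slot()

    def reduce(self, element):
        # Same comparison as MinValue.reduce().
        if element.value is None or self._current < element.value:
            element.value = self._current


def aggregate(func, rows):
    """Assumed driver loop: one element per group, one reduce() call per row."""
    state = func.element()
    for row in rows:
        func.bind(row)
        func.reduce(state)
    return state.value


minimum = aggregate(MinValueSketch(), [5, 2, 8, 1, 9])
empty = aggregate(MinValueSketch(), [])
```

Keeping the running state in a separate element object, rather than on the function instance, lets the same function definition serve several groups at once.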

Creating a Custom Async Data Provider

Async providers allow you to create custom data sources that can be used with LOAD JSON FROM:

from flowquery.extensibility import AsyncFunction, FunctionDef

@FunctionDef({
    "description": "Provides example data for testing",
    "category": "async",
    "parameters": [],
    "output": {"description": "Example data object", "type": "object"},
})
class GetExampleData(AsyncFunction):
    def __init__(self):
        super().__init__("getexampledata")
        self._expected_parameter_count = 0

    async def generate(self):
        yield {"id": 1, "name": "Alice"}
        yield {"id": 2, "name": "Bob"}

Usage:

LOAD JSON FROM getexampledata() AS data RETURN data.id AS id, data.name AS name
// Returns: [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
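
The generate() method above is an async generator, so its records are produced one at a time and consumed with async for. The sketch below shows that consumption in plain Python; load_rows() is a hypothetical consumer that mirrors the RETURN clause of the query and is not FlowQuery's actual loader.

```python
import asyncio


async def generate():
    """Same records as GetExampleData.generate(), as a plain async generator."""
    yield {"id": 1, "name": "Alice"}
    yield {"id": 2, "name": "Bob"}


async def load_rows(source):
    """Assumed consumer: project id and name from each yielded record."""
    rows = []
    async for data in source:
        rows.append({"id": data["id"], "name": data["name"]})
    return rows


rows = asyncio.run(load_rows(generate()))
print(rows)
```

Since records are yielded individually, a provider can stream results from a paginated API or a large file without building the full list first.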

Using Custom Functions with Expressions

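Custom functions can be used anywhere a built-in function can, including inside larger expressions and alongside other functions. The examples below assume that custom functions named addhundred, triple, and square have already been registered; they are not defined in this document: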

// Using custom function with expressions
WITH 5 * 3 AS num RETURN addhundred(num) + 1 AS result

// Using multiple custom functions together
WITH 2 AS num RETURN triple(num) AS tripled, square(num) AS squared

Introspecting Registered Functions

You can use the built-in functions() function to discover registered functions, including your custom ones:

WITH functions() AS funcs
UNWIND funcs AS f
WITH f WHERE f.name = 'double'
RETURN f.name AS name, f.description AS description, f.category AS category

License

MIT License - see LICENSE for details.
