A declarative query language for data processing pipelines
FlowQuery (Python)
A pure Python implementation of FlowQuery, a declarative OpenCypher-based query language for virtual graphs and data processing pipelines. This package has full functional fidelity with the TypeScript version.
Installation
```bash
pip install flowquery
```
Quick Start
Command Line Interface
Start the interactive REPL:
```bash
flowquery
```
Programmatic Usage
```python
import asyncio
from flowquery import Runner

runner = Runner("WITH 1 as x RETURN x + 1 as result")
asyncio.run(runner.run())
print(runner.results)  # [{'result': 2}]
```
In Jupyter notebooks, you can use await directly:
```python
from flowquery import Runner

runner = Runner("WITH 1 as x RETURN x + 1 as result")
await runner.run()
print(runner.results)  # [{'result': 2}]
```
Statement Info: Labels, Properties, and Source Lineage
The Runner exposes a metadata property that mirrors the TypeScript
implementation. It reports counts of virtual nodes and relationships
created and deleted, plus an optional info: StatementInfo that describes
the structure the query touches, independent of execution.
StatementInfo captures:
- The node labels and relationship types referenced.
- The data sources backing the underlying virtual definitions.
- The node and relationship properties consumed by the query: alias.prop accesses anywhere in MATCH, WHERE, WITH, RETURN, ORDER BY, or function arguments, plus inline pattern properties such as (u:User {id: 'rick.o'}).
- The properties declared by each virtual's RETURN clause, via info.declared, so you can validate that a query references only declared properties.
- Literal values supplied for properties at the call site, via info.nodes[label].literal_values, collected from inline pattern properties and from equality / IN predicates such as WHERE u.id = 'rick.o' or WHERE u.id IN ['a', 'b'].
The per-entity nodes and relationships maps give end-to-end lineage
from a property to its data source:
```python
from flowquery import Runner

runner = Runner("""
CREATE VIRTUAL (:City) AS {
    LOAD JSON FROM "https://example.com/cities" AS c
    RETURN c.id AS id, c.name AS name, c.country AS country
};
CREATE VIRTUAL (:City)-[:FLIGHT]-(:City) AS {
    LOAD JSON FROM "https://example.com/flights" AS f
    RETURN f.left_id AS left_id, f.right_id AS right_id, f.airline AS airline
};
MATCH (a:City {name: 'NYC'})-[r:FLIGHT]->(b:City)
WHERE b.country IN ['US', 'CA']
RETURN a.name AS origin, b.name AS destination, r.airline AS airline
""")

info = runner.metadata.info

print(info.nodes)
# {'City': NodeInfo(
#     properties=['country', 'name'],
#     sources=['https://example.com/cities'],
#     literal_values={'country': ['US', 'CA'], 'name': ['NYC']},
# )}

print(info.relationships)
# {'FLIGHT': RelationshipInfo(
#     properties=['airline'],
#     sources=['https://example.com/flights'],
#     literal_values={},
# )}

print(info.declared.nodes['City'])
# DeclaredEntityInfo(
#     properties=['country', 'id', 'name'],
#     sources=['https://example.com/cities'],
# )

print(info.sources)
# ['https://example.com/cities', 'https://example.com/flights']
```
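One use of info.declared is checking that a query only reads properties that its virtuals declare. The sketch below performs that check on plain dictionaries. It assumes you would build them from info.nodes[label].properties and info.declared.nodes[label].properties; those attribute names are inferred from the printed output in the City example, not taken from an API reference. The population property is a made-up example of an undeclared property.

```python
from typing import Dict, List


def find_undeclared(
    used: Dict[str, List[str]], declared: Dict[str, List[str]]
) -> Dict[str, List[str]]:
    """Return, per label, the properties a query uses but the virtual does not declare."""
    problems: Dict[str, List[str]] = {}
    for label, props in used.items():
        allowed = set(declared.get(label, []))  # unknown label: nothing is declared
        missing = sorted(p for p in props if p not in allowed)
        if missing:
            problems[label] = missing
    return problems


# Hypothetical values shaped like the City example; "population" is not declared.
# With a real Runner they might be built like this (assumed attribute names):
#   used = {k: v.properties for k, v in info.nodes.items()}
#   declared = {k: v.properties for k, v in info.declared.nodes.items()}
used = {"City": ["country", "name", "population"]}
declared = {"City": ["country", "id", "name"]}
print(find_undeclared(used, declared))  # {'City': ['population']}
```

An empty result means every property the query touches is backed by a declared column.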
StatementInfo resolves sources and declared schemas for every virtual
the query touches, whether it is defined by an inline CREATE VIRTUAL
clause or was previously registered and is reached via MATCH or DELETE.
The flat node_labels, relationship_types, sources, node_properties, and
relationship_properties fields stay in sync with the per-entity nodes
and relationships maps. Only purely literal AST subtrees end up in
literal_values; values that depend on parameters, references,
f-strings, or subqueries are skipped.
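The literal-only rule can be illustrated with a toy expression tree. The Lit, ListExpr, Param, and Ref classes below are hypothetical stand-ins, not FlowQuery's AST types. The sketch keeps a value only when its whole subtree folds to a constant, so a parameter or a reference anywhere inside a list drops the entire predicate.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


# Hypothetical toy AST node types (not FlowQuery's real classes).
@dataclass
class Lit:
    value: Any


@dataclass
class ListExpr:
    items: List[Any]


@dataclass
class Param:
    name: str


@dataclass
class Ref:
    name: str


NOT_LITERAL = object()  # sentinel: the subtree is not purely literal


def fold(node: Any) -> Any:
    """Return the constant value of a purely literal subtree, else NOT_LITERAL."""
    if isinstance(node, Lit):
        return node.value
    if isinstance(node, ListExpr):
        values = [fold(item) for item in node.items]
        return NOT_LITERAL if any(v is NOT_LITERAL for v in values) else values
    return NOT_LITERAL  # parameters, references, and anything else


def collect_literal_values(preds: List[Tuple[str, str, Any]]) -> Dict[str, List[Any]]:
    """preds holds (property, operator, expression) triples with operator '=' or 'IN'."""
    out: Dict[str, List[Any]] = {}
    for prop, op, expr in preds:
        value = fold(expr)
        if value is NOT_LITERAL:
            continue  # depends on something only known at run time
        if op == "IN":
            if isinstance(value, list):
                out.setdefault(prop, []).extend(value)
        else:
            out.setdefault(prop, []).append(value)
    return out


preds = [
    ("id", "=", Lit("rick.o")),                           # kept
    ("country", "IN", ListExpr([Lit("US"), Lit("CA")])),  # kept
    ("name", "=", Param("name")),                         # skipped: parameter
    ("tag", "IN", ListExpr([Lit("a"), Ref("x")])),        # skipped: contains a reference
]
print(collect_literal_values(preds))  # {'id': ['rick.o'], 'country': ['US', 'CA']}
```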
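The StatementInfoCrawler that produces this information can also be used directly on a parsed AST,
without going through a Runner:
```python
from flowquery import StatementInfoCrawler

# parsed_ast is an AST produced by the FlowQuery parser (parsing is not shown here).
crawler = StatementInfoCrawler()
info = crawler.crawl(parsed_ast)
```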
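Conceptually, a crawler of this kind is a walk over the AST that records what it encounters. The toy below only illustrates that idea: NodePattern, RelPattern, and Clause are hypothetical classes, and it collects just labels and relationship types, whereas StatementInfo also covers properties, sources, declared schemas, and literal values.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set, Tuple


# Hypothetical toy AST types, for illustration only.
@dataclass
class NodePattern:
    label: Optional[str]


@dataclass
class RelPattern:
    rel_type: Optional[str]


@dataclass
class Clause:
    name: str
    children: List[object] = field(default_factory=list)


def crawl(root: object) -> Tuple[List[str], List[str]]:
    """Collect node labels and relationship types anywhere under root."""
    labels: Set[str] = set()
    rel_types: Set[str] = set()
    stack = [root]
    while stack:  # iterative depth-first walk, no recursion limit concerns
        node = stack.pop()
        if isinstance(node, NodePattern) and node.label:
            labels.add(node.label)
        elif isinstance(node, RelPattern) and node.rel_type:
            rel_types.add(node.rel_type)
        stack.extend(getattr(node, "children", []))
    return sorted(labels), sorted(rel_types)


# Roughly: MATCH (a:City)-[r:FLIGHT]->(b:City)
query = Clause("MATCH", [NodePattern("City"), RelPattern("FLIGHT"), NodePattern("City")])
print(crawl(query))  # (['City'], ['FLIGHT'])
```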
Documentation
- Language Reference (clauses, expressions, functions, graph operations, and more)
- Quick Cheat Sheet
- Full Documentation
- Contributing Guide
- Virtual Graph Demo Notebook — a demo of virtual graph capabilities and custom function extensibility
Extending FlowQuery with Custom Functions
The query language itself is identical between the TypeScript and Python versions. The only difference is that custom functions are written in Python here instead of TypeScript.
Creating a Custom Scalar Function
Scalar functions operate on individual values and return a result:
```python
from flowquery.extensibility import Function, FunctionDef


@FunctionDef({
    "description": "Doubles a number",
    "category": "scalar",
    "parameters": [{"name": "value", "description": "Number to double", "type": "number"}],
    "output": {"description": "Doubled value", "type": "number"},
})
class Double(Function):
    def __init__(self):
        super().__init__("double")
        self._expected_parameter_count = 1

    def value(self):
        # Evaluate the single argument expression and double it
        return self.get_children()[0].value() * 2
```
Once defined, use it in your queries:
```cypher
WITH 5 AS num RETURN double(num) AS result
// Returns: [{"result": 10}]
```
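How a decorator like FunctionDef and a Function subclass fit together can be modelled in a few lines of plain Python. This is a hypothetical toy, not FlowQuery's implementation: function_def stands in for FunctionDef by attaching the metadata and registering the class under the name passed to __init__, and call resolves a name, checks the parameter count, attaches argument expressions as children, and evaluates value() the same way Double does.

```python
from typing import Any, Callable, Dict, List

REGISTRY: Dict[str, type] = {}  # hypothetical: query-level name -> function class


class Const:
    """Stand-in for an argument expression that evaluates to a fixed value."""

    def __init__(self, v: Any) -> None:
        self._v = v

    def value(self) -> Any:
        return self._v


class ToyFunction:
    def __init__(self, name: str) -> None:
        self.name = name
        self._children: List[Any] = []
        self._expected_parameter_count = 0

    def get_children(self) -> List[Any]:
        return self._children


def function_def(meta: Dict[str, Any]) -> Callable[[type], type]:
    """Toy counterpart of a FunctionDef-style decorator."""
    def wrap(cls: type) -> type:
        cls.metadata = meta
        REGISTRY[cls().name] = cls  # register under the name given to __init__
        return cls
    return wrap


@function_def({"description": "Doubles a number", "category": "scalar"})
class Double(ToyFunction):
    def __init__(self) -> None:
        super().__init__("double")
        self._expected_parameter_count = 1

    def value(self) -> Any:
        return self.get_children()[0].value() * 2


def call(name: str, *args: Any) -> Any:
    """Resolve a function by name, check arity, attach arguments, evaluate."""
    fn = REGISTRY[name]()
    if len(args) != fn._expected_parameter_count:
        raise TypeError(f"{name} expects {fn._expected_parameter_count} argument(s)")
    fn._children = [Const(a) for a in args]
    return fn.value()


print(call("double", 5))  # 10
```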
Creating a Custom String Function
```python
from flowquery.extensibility import Function, FunctionDef


@FunctionDef({
    "description": "Reverses a string",
    "category": "scalar",
    "parameters": [{"name": "text", "description": "String to reverse", "type": "string"}],
    "output": {"description": "Reversed string", "type": "string"},
})
class StrReverse(Function):
    def __init__(self):
        super().__init__("strreverse")
        self._expected_parameter_count = 1

    def value(self) -> str:
        # Coerce the argument to a string, then reverse it with slicing
        return str(self.get_children()[0].value())[::-1]
```
Usage:
```cypher
WITH 'hello' AS s RETURN strreverse(s) AS reversed
// Returns: [{"reversed": "olleh"}]
```
Creating a Custom Aggregate Function
Aggregate functions process multiple values and return a single result. They require a ReducerElement to track state:
```python
from flowquery.extensibility import AggregateFunction, FunctionDef, ReducerElement


class MinReducerElement(ReducerElement):
    """Holds the running minimum."""

    def __init__(self):
        self._value = None

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, val):
        self._value = val


@FunctionDef({
    "description": "Collects the minimum value",
    "category": "aggregate",
    "parameters": [{"name": "value", "description": "Value to compare", "type": "number"}],
    "output": {"description": "Minimum value", "type": "number"},
})
class MinValue(AggregateFunction):
    def __init__(self):
        super().__init__("minvalue")
        self._expected_parameter_count = 1

    def reduce(self, element):
        current = self.first_child().value()
        if current is None:
            return  # skip nulls: comparing None with a number raises TypeError
        if element.value is None or current < element.value:
            element.value = current

    def element(self):
        # Create a fresh state holder for the aggregation
        return MinReducerElement()
```
Usage:
```cypher
UNWIND [5, 2, 8, 1, 9] AS num RETURN minvalue(num) AS min
// Returns: [{"min": 1}]
```
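Separating the aggregate function from its ReducerElement matters when results are grouped, because each group needs its own state. The driver below is a hypothetical plain-Python model of that lifecycle, not FlowQuery's executor: it assumes one element is created per distinct group key via element(), reduce is called once per row, and element.value is read at the end. ToyMin mirrors MinValue but receives the row value as an argument instead of evaluating a child expression.

```python
from typing import Any, Dict, Iterable, Optional, Tuple


class MinElement:
    """Per-group state, like MinReducerElement."""

    def __init__(self) -> None:
        self.value: Optional[Any] = None


class ToyMin:
    """Mirrors MinValue.element/reduce, taking the row value directly."""

    def element(self) -> MinElement:
        return MinElement()

    def reduce(self, element: MinElement, current: Any) -> None:
        if current is None:
            return  # ignore nulls
        if element.value is None or current < element.value:
            element.value = current


def aggregate(agg: ToyMin, rows: Iterable[Tuple[Any, Any]]) -> Dict[Any, Any]:
    """rows are (group_key, value) pairs; one reducer element per distinct key."""
    state: Dict[Any, MinElement] = {}
    for key, value in rows:
        if key not in state:
            state[key] = agg.element()  # fresh state for a newly seen group
        agg.reduce(state[key], value)
    return {key: el.value for key, el in state.items()}


print(aggregate(ToyMin(), [(None, n) for n in [5, 2, 8, 1, 9]]))  # {None: 1}
print(aggregate(ToyMin(), [("a", 3), ("b", 7), ("a", 1), ("b", 4)]))  # {'a': 1, 'b': 4}
```

Sharing a single element across groups would leak the minimum of one group into another, which is why state is created through a factory rather than stored on the function object.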
Creating a Custom Async Data Provider
Async providers allow you to create custom data sources that can be used with LOAD JSON FROM:
```python
from flowquery.extensibility import AsyncFunction, FunctionDef


@FunctionDef({
    "description": "Provides example data for testing",
    "category": "async",
    "parameters": [],
    "output": {"description": "Example data object", "type": "object"},
})
class GetExampleData(AsyncFunction):
    def __init__(self):
        super().__init__("getexampledata")
        self._expected_parameter_count = 0

    async def generate(self):
        # Each yielded dict becomes one row bound to the LOAD JSON FROM alias
        yield {"id": 1, "name": "Alice"}
        yield {"id": 2, "name": "Bob"}
```
Usage:
```cypher
LOAD JSON FROM getexampledata() AS data RETURN data.id AS id, data.name AS name
// Returns: [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
```
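Because generate is an async generator, a provider can await I/O between rows, which suits paginated APIs. The sketch below is plain asyncio, not FlowQuery: fetch_page and PAGES are hypothetical stand-ins for a remote paged endpoint, and consume simply collects every row, which is an assumption about how a consumer might pull from the provider rather than a description of the engine.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple

# Hypothetical paged source: page number -> (rows, next page or None).
PAGES: Dict[int, Tuple[List[Dict[str, Any]], Optional[int]]] = {
    0: ([{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}], 1),
    1: ([{"id": 3, "name": "Carol"}], None),
}


async def fetch_page(page: int) -> Tuple[List[Dict[str, Any]], Optional[int]]:
    await asyncio.sleep(0)  # yield to the event loop, as real network I/O would
    return PAGES[page]


async def generate() -> AsyncIterator[Dict[str, Any]]:
    """Yield records page by page, the same shape as a provider's generate()."""
    page: Optional[int] = 0
    while page is not None:
        rows, page = await fetch_page(page)
        for row in rows:
            yield row


async def consume() -> List[Dict[str, Any]]:
    # Collect every row from the async generator.
    return [row async for row in generate()]


print(asyncio.run(consume()))
# [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}, {'id': 3, 'name': 'Carol'}]
```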
Using Custom Functions with Expressions
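Custom functions can be used anywhere an expression is allowed and combined with operators and other functions. In these examples, addhundred, triple, and square are assumed to be custom scalar functions registered the same way as double:
```cypher
// Using a custom function inside an arithmetic expression
WITH 5 * 3 AS num RETURN addhundred(num) + 1 AS result
// Using multiple custom functions together
WITH 2 AS num RETURN triple(num) AS tripled, square(num) AS squared
```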
Introspecting Registered Functions
You can use the built-in functions() function to discover registered functions including your custom ones:
```cypher
WITH functions() AS funcs
UNWIND funcs AS f
WITH f WHERE f.name = 'double'
RETURN f.name AS name, f.description AS description, f.category AS category
```
License
MIT License - see LICENSE for details.
File details
Details for the file flowquery-1.0.53.tar.gz.
File metadata
- Download URL: flowquery-1.0.53.tar.gz
- Upload date:
- Size: 95.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | 2bd57d92a3c9730ad9e924cdabcb66ac665dae8ad01da975254bf4ae3891c15b |
| MD5 | d19629bbffe5fe0da61e7b17af86d796 |
| BLAKE2b-256 | c68d94e23a57d16c9e368388b6cb334e71d7bc423586930e57ba825d71e82234 |
File details
Details for the file flowquery-1.0.53-py3-none-any.whl.
File metadata
- Download URL: flowquery-1.0.53-py3-none-any.whl
- Upload date:
- Size: 155.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | b66f2df6bb3b8b84300497e2610cea0d5540c24c45cfa215967ae83e3cbf066a |
| MD5 | a16de09d20acfa372c36a8098b78a519 |
| BLAKE2b-256 | 7ff788a733870f061c04b93425ce4c331fca817dd80f5f681d6208415d0269eb |