Skip to main content

API Reference

This page documents the public Python API exposed by ryx. It is aligned with the current codebase: ryx-python/ryx/__init__.py, models.py, queryset.py, fields.py, transaction.py, cache.py, signals.py, router.py, and the PyO3 extension stub.

Setup and Connection Poolsโ€‹

import ryx

await ryx.setup(
"sqlite:///app.db",
max_connections=10,
min_connections=1,
connect_timeout=30,
idle_timeout=600,
max_lifetime=1800,
)

setup() accepts either a single URL or a mapping of aliases to URLs:

await ryx.setup({
"default": "postgres://user:pass@localhost/app",
"replica": "postgres://user:pass@localhost/app_replica",
"logs": "sqlite:///logs.db",
})

Connection helpers:

FunctionPurpose
await ryx.setup(urls, **pool_options)Initialize one or more connection pools
ryx.is_connected(alias="default")Return whether a pool alias is initialized
ryx.list_aliases()Return configured database aliases
ryx.pool_stats()Return pool statistics from the Rust backend

Auto-initialization runs on import unless RYX_AUTO_INITIALIZE is one of 0, false, n, or no. It reads RYX_DATABASE_URL, RYX_DB_<ALIAS>_URL, and the first config file found among ryx.yaml, ryx.yml, ryx.toml, and ryx.json.

Logging is controlled by RYX_LOG_LEVEL. Use NO_LOG, OFF, or SILENT to disable Ryx logging.

Modelsโ€‹

from ryx import Model, CharField, IntField, Index, Constraint

class Post(Model):
title = CharField(max_length=200, db_index=True)
views = IntField(default=0)

class Meta:
table_name = "blog_posts"
app_label = "blog"
database = "default"
ordering = ["-views"]
unique_together = [("title", "views")]
index_together = [("title", "views")]
indexes = [Index(fields=["title"], name="idx_post_title")]
constraints = [Constraint(check="views >= 0", name="chk_views_positive")]
abstract = False
managed = True

Model behavior:

APIPurpose
Model(**kwargs)Create an unsaved instance with field defaults applied
obj.pkReturn the primary-key value
await obj.full_clean()Run field validators and clean()
await obj.save(validate=True, update_fields=None, using=None)Insert or update the row
await obj.delete()Delete the row and clear its primary key
await obj.refresh_from_db(fields=None)Reload fields from the database
Model.DoesNotExistPer-model exception raised by get()
Model.MultipleObjectsReturnedPer-model exception raised by get()

Lifecycle hooks can be overridden:

class Post(Model):
title = CharField(max_length=200)

async def clean(self):
if not self.title.strip():
raise ValidationError({"title": ["Title cannot be empty"]})

async def before_save(self, created: bool): ...
async def after_save(self, created: bool): ...
async def before_delete(self): ...
async def after_delete(self): ...

Managersโ€‹

Every concrete model receives objects = Manager() unless explicitly overridden.

Post.objects.all()
Post.objects.filter(active=True)
Post.objects.exclude(status="draft")
Post.objects.order_by("-created_at")
Post.objects.using("replica")
Post.objects.cache(ttl=60)
Post.objects.stream(chunk_size=100)

await Post.objects.create(title="Hello")
await Post.objects.get(pk=1)
await Post.objects.first()
await Post.objects.last()
await Post.objects.exists()
await Post.objects.count()
await Post.objects.aggregate(total=Count("id"))
await Post.objects.get_or_create(slug="hello", defaults={"title": "Hello"})
await Post.objects.update_or_create(slug="hello", defaults={"title": "New"})
await Post.objects.bulk_create([Post(title="A"), Post(title="B")])
await Post.objects.bulk_update(posts, fields=["title"])
await Post.objects.bulk_delete(Post.objects.filter(active=False))

QuerySetโ€‹

QuerySet is lazy, async, chainable, and immutable. SQL is executed only when the queryset is awaited or when an async terminal method is called.

Building Queriesโ€‹

qs = (
Post.objects
.filter(active=True, views__gte=100)
.exclude(title__istartswith="draft")
.order_by("-views", "title")
.limit(20)
.offset(40)
)
posts = await qs
MethodPurpose
.filter(*q, **kwargs)Add AND-ed filters and Q expressions
.exclude(*q, **kwargs)Add negated filters
.all()Clone the queryset
.order_by(*fields)Add ordering; "-field" means descending
.limit(n)Add LIMIT
.offset(n)Add OFFSET
.distinct()Add DISTINCT
.values(*fields)Select fields and enable grouping for annotations
.annotate(**aggs)Add aggregate expressions per row
.join(table, on, alias=None, kind="INNER")Add an explicit SQL join
.using(alias)Force a database alias
.schema(schema)Target a PostgreSQL schema
.cache(ttl=None, key=None)Cache the first evaluated result
.stream(chunk_size=100, keyset=None, as_dict=False)Iterate in chunks

Slicing is translated to LIMIT/OFFSET:

first_ten = await Post.objects.order_by("id")[:10]
page_two = await Post.objects.order_by("id")[10:20]
third = await Post.objects.order_by("id")[2]

Negative indexes and step slicing are not supported.

select_related() exists on the Python queryset but is currently a no-op in the implementation. Use explicit .join() when you need an eager SQL join.

Terminal Methodsโ€‹

await qs.count()
await qs.exists()
await qs.first()
await qs.get(pk=1)
await qs.update(views=100)
await qs.delete()
await qs.bulk_delete()
await qs.in_bulk([1, 2, 3])
MethodReturn
await qslist[Model]
await qs.count()int
await qs.exists()bool
await qs.first()`Model
await qs.get(**filters)Exactly one model or per-model exception
await qs.update(**fields)Number of updated rows
await qs.delete()Number of deleted rows
await qs.in_bulk(ids, field_name="pk")dict[value, Model]

Debug SQL:

print(Post.objects.filter(active=True).query)

Q Objectsโ€‹

from ryx import Q

Post.objects.filter(Q(active=True) | Q(views__gte=1000))
Post.objects.filter(Q(active=True) & ~Q(status="draft"))
Post.objects.filter((Q(active=True) & Q(views__gte=100)) | Q(featured=True))

Multiple kwargs inside one Q() are AND-ed. Q objects can be combined with regular filter kwargs.

Lookups and Transformsโ€‹

Common lookups:

exact, gt, gte, lt, lte,
contains, icontains,
startswith, istartswith,
endswith, iendswith,
isnull, in, range

Date/time transforms:

date, year, month, day, hour, minute, second, week,
dow, quarter, time, iso_week, iso_dow

JSON transforms/lookups:

key, key_text, json,
has_key, has_any, has_all, contains, contained_by

Examples:

await Post.objects.filter(title__icontains="ryx")
await Post.objects.filter(created_at__year__gte=2025)
await Event.objects.filter(payload__key_text__icontains="error")

Runtime helpers:

ryx.available_lookups()
ryx.list_lookups()
ryx.available_transforms()
ryx.register_lookup("ne", "{col} <> ?")

Decorator form:

@ryx.lookup("ne")
def not_equal():
"""{col} <> ?"""

Aggregatesโ€‹

from ryx import Count, Sum, Avg, Min, Max, RawAgg

await Post.objects.aggregate(
total=Count("id"),
total_views=Sum("views"),
avg_views=Avg("views"),
min_views=Min("views"),
max_views=Max("views"),
)

rows = await (
Post.objects
.values("author_id")
.annotate(post_count=Count("id"), views=Sum("views"))
)

Count("field", distinct=True) produces a distinct count. RawAgg(sql, alias) is available for backend-specific expressions.

Fieldsโ€‹

from ryx import (
AutoField, BigAutoField, SmallAutoField,
IntField, SmallIntField, BigIntField, PositiveIntField,
FloatField, DecimalField,
BooleanField, NullBooleanField,
CharField, TextField, SlugField, EmailField, URLField, IPAddressField,
DateField, DateTimeField, TimeField, DurationField,
UUIDField, JSONField, BinaryField, ArrayField,
ForeignKey, OneToOneField, ManyToManyField,
)

Common field options:

CharField(
max_length=200,
null=False,
blank=False,
default="",
primary_key=False,
unique=False,
db_index=False,
choices=["draft", "published"],
validators=[],
editable=True,
help_text="",
verbose_name="",
db_column="db_column_name",
unique_for_date=None,
unique_for_month=None,
unique_for_year=None,
)

See Field Reference for the full field matrix.

Relationshipsโ€‹

class Author(Model):
name = CharField(max_length=100)

class Post(Model):
author = ForeignKey(Author, on_delete="CASCADE", related_name="posts")

class Profile(Model):
author = OneToOneField(Author, on_delete="CASCADE")

class Tag(Model):
name = CharField(max_length=50)

class TaggedPost(Model):
post = ForeignKey(Post, on_delete="CASCADE")
tag = ForeignKey(Tag, on_delete="CASCADE")

class PostTag(Model):
tags = ManyToManyField(Tag, through=TaggedPost)

Relationship helpers include forward descriptors, reverse FK managers, and many-to-many managers. Reverse FK managers expose queryset-like helpers such as all(), filter(), exclude(), order_by(), count(), exists(), first(), get(), create(), add(), remove(), and delete().

Transactionsโ€‹

from ryx import transaction, get_active_transaction

async with transaction("default") as tx:
await Account.objects.get(pk=1)
tx.savepoint("before_transfer")
tx.rollback_to("before_transfer")
tx.release_savepoint("before_transfer")

current = get_active_transaction()

The transaction context commits on successful exit and rolls back on exception. Nested transactions are implemented with savepoints by the Rust backend.

Bulk Operations and Streamingโ€‹

from ryx import bulk_create, bulk_update, bulk_delete, stream

await bulk_create([Post(title="A"), Post(title="B")], batch_size=500)
await bulk_update(posts, fields=["title"], batch_size=500)
await bulk_delete(Post.objects.filter(active=False))

async for post in Post.objects.filter(active=True).stream(chunk_size=100):
...

async for row in Post.objects.stream(chunk_size=500, keyset="id", as_dict=True):
...

keyset streaming uses cursor-style pagination and is preferred for large tables when the keyset column is indexed.

Validationโ€‹

from ryx import (
ValidationError, Validator, FunctionValidator,
NotNullValidator, NotBlankValidator,
MaxLengthValidator, MinLengthValidator,
MinValueValidator, MaxValueValidator,
RangeValidator, RegexValidator,
EmailValidator, URLValidator, ChoicesValidator,
)

Validation runs in save() by default. Pass validate=False to skip it:

await post.save(validate=False)

Signalsโ€‹

from ryx import (
Signal, receiver,
pre_save, post_save,
pre_delete, post_delete,
pre_update, post_update,
pre_bulk_delete, post_bulk_delete,
)

@receiver(post_save, sender=Post)
async def on_post_saved(sender, instance, created, **kwargs):
...

Signal methods:

signal.connect(handler, sender=None, weak=True)
signal.disconnect(handler, sender=None)
await signal.send(sender, **kwargs)

Cachingโ€‹

from ryx import MemoryCache, configure_cache, get_cache
from ryx import invalidate, invalidate_model, invalidate_all

configure_cache(MemoryCache(max_size=1000, ttl=300))

posts = await Post.objects.filter(active=True).cache(ttl=60)
named = await Post.objects.all().cache(key="all_posts", ttl=300)

await invalidate("all_posts")
await invalidate_model(Post)
await invalidate_all()
cache = get_cache()

MemoryCache implements get, set, delete, delete_many, clear, and keys. It is process-local and TTL-aware.

Raw SQL and Parameterized Executionโ€‹

from ryx.executor_helpers import raw_fetch, raw_execute
from ryx.pool_ext import fetch_with_params, execute_with_params

rows = await raw_fetch("SELECT * FROM posts WHERE active = true", alias="default")
await raw_execute("CREATE INDEX IF NOT EXISTS idx_posts_active ON posts(active)")

rows = await fetch_with_params("SELECT * FROM posts WHERE id = ?", [1])
affected = await execute_with_params("UPDATE posts SET views = ? WHERE id = ?", [10, 1])

Use parameterized helpers for user input. Raw SQL helpers execute the SQL string directly.

Database Routingโ€‹

from ryx.router import BaseRouter, set_router

class Router(BaseRouter):
def db_for_read(self, model, **hints):
return "replica"

def db_for_write(self, model, **hints):
return getattr(model._meta, "database", None) or "default"

def allow_migrate(self, db, app_label, model_name):
return True

set_router(Router())

Read alias resolution order:

  1. QuerySet .using(alias)
  2. Router.db_for_read(model)
  3. Model.Meta.database
  4. "default"

Write alias resolution order:

  1. Explicit save(using=alias) or QuerySet .using(alias) where applicable
  2. Router.db_for_write(model)
  3. Model.Meta.database
  4. "default"

Migrationsโ€‹

from ryx.migrations import MigrationRunner
from ryx.migrations.autodetect import Autodetector
from ryx.migrations.ddl import DDLGenerator, generate_schema_ddl, detect_backend

runner = MigrationRunner([Author, Post], migrations_dir="migrations")
changes = await runner.migrate()

detector = Autodetector([Author, Post], migrations_dir="migrations")
operations = detector.detect()
path = detector.write_migration(operations)

Operation classes:

CreateTable(table, columns, schema="", model=None)
AddField(table, column, schema="", model=None)
AlterField(table, new_col, old_col=None, schema="", model=None)
CreateIndex(table, name, fields, unique=False, schema="", model=None)
RunSQL(sql, reverse_sql="")

DDL helpers support PostgreSQL, MySQL, and SQLite and include schema-aware SQL generation for PostgreSQL.

Sync and Async Bridgeโ€‹

from ryx import run_sync, run_async, sync_to_async, async_to_sync

posts = run_sync(Post.objects.filter(active=True))

async_fn = sync_to_async(blocking_function)
result = await async_fn()

wrapped = async_to_sync(async_function)
value = wrapped()

result = await run_async(blocking_function, arg1, key="value")

Prefer native await inside async applications. Use the bridge helpers for scripts, WSGI handlers, and synchronous integrations.

Exceptionsโ€‹

from ryx import (
RyxError,
DatabaseError,
DoesNotExist,
MultipleObjectsReturned,
PoolNotInitialized,
ValidationError,
)

DoesNotExist and MultipleObjectsReturned also exist as per-model subclasses:

try:
await Post.objects.get(slug="missing")
except Post.DoesNotExist:
...

CLI Entrypointsโ€‹

python -m ryx --url sqlite:///app.db migrate --models myapp.models
ryx --url sqlite:///app.db migrate --models myapp.models

See CLI for command options and config file formats.