API Reference
This page documents the public Python API exposed by ryx. It is aligned with
the current codebase: ryx-python/ryx/__init__.py, models.py,
queryset.py, fields.py, transaction.py, cache.py, signals.py,
router.py, and the PyO3 extension stub.
Setup and Connection Pools
import ryx
await ryx.setup(
    "sqlite:///app.db",
    max_connections=10,
    min_connections=1,
    connect_timeout=30,
    idle_timeout=600,
    max_lifetime=1800,
)
setup() accepts either a single URL or a mapping of aliases to URLs:
await ryx.setup({
    "default": "postgres://user:pass@localhost/app",
    "replica": "postgres://user:pass@localhost/app_replica",
    "logs": "sqlite:///logs.db",
})
Connection helpers:
| Function | Purpose |
|---|---|
| `await ryx.setup(urls, **pool_options)` | Initialize one or more connection pools |
| `ryx.is_connected(alias="default")` | Return whether a pool alias is initialized |
| `ryx.list_aliases()` | Return configured database aliases |
| `ryx.pool_stats()` | Return pool statistics from the Rust backend |
Auto-initialization runs on import unless RYX_AUTO_INITIALIZE is one of
0, false, n, or no. It reads RYX_DATABASE_URL,
RYX_DB_<ALIAS>_URL, and the first config file found among ryx.yaml,
ryx.yml, ryx.toml, and ryx.json.
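The environment rules above can be sketched in plain Python. This is only an illustration, not Ryx's implementation: the helper names are hypothetical, and the case-insensitive comparison, the mapping of RYX_DATABASE_URL to the "default" alias, and the lower-casing of `<ALIAS>` are all assumptions.

```python
import os
from typing import Dict, Mapping

# Values the page lists as disabling auto-initialization.
_DISABLED = {"0", "false", "n", "no"}


def auto_init_enabled(env: Mapping[str, str]) -> bool:
    """Return False when RYX_AUTO_INITIALIZE holds an opt-out value."""
    # Assumption: the comparison ignores case and surrounding whitespace.
    value = env.get("RYX_AUTO_INITIALIZE", "").strip().lower()
    return value not in _DISABLED


def urls_from_env(env: Mapping[str, str]) -> Dict[str, str]:
    """Collect alias -> URL pairs from the documented variable names."""
    urls: Dict[str, str] = {}
    if "RYX_DATABASE_URL" in env:
        # Assumption: the single-URL variable feeds the "default" alias.
        urls["default"] = env["RYX_DATABASE_URL"]
    prefix, suffix = "RYX_DB_", "_URL"
    for name, value in env.items():
        has_alias = len(name) > len(prefix) + len(suffix)
        if name.startswith(prefix) and name.endswith(suffix) and has_alias:
            # Assumption: the alias is the lower-cased middle of the name.
            alias = name[len(prefix):-len(suffix)].lower()
            urls[alias] = value
    return urls


if __name__ == "__main__":
    print(auto_init_enabled(os.environ), urls_from_env(os.environ))
```

Passing a mapping instead of reading `os.environ` directly keeps the helpers easy to test.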
Logging is controlled by RYX_LOG_LEVEL. Use NO_LOG, OFF, or SILENT to
disable Ryx logging.
Models
from ryx import Model, CharField, IntField, Index, Constraint
class Post(Model):
    title = CharField(max_length=200, db_index=True)
    views = IntField(default=0)
    class Meta:
        table_name = "blog_posts"
        app_label = "blog"
        database = "default"
        ordering = ["-views"]
        unique_together = [("title", "views")]
        index_together = [("title", "views")]
        indexes = [Index(fields=["title"], name="idx_post_title")]
        constraints = [Constraint(check="views >= 0", name="chk_views_positive")]
        abstract = False
        managed = True
Model behavior:
| API | Purpose |
|---|---|
| `Model(**kwargs)` | Create an unsaved instance with field defaults applied |
| `obj.pk` | Return the primary-key value |
| `await obj.full_clean()` | Run field validators and clean() |
| `await obj.save(validate=True, update_fields=None, using=None)` | Insert or update the row |
| `await obj.delete()` | Delete the row and clear its primary key |
| `await obj.refresh_from_db(fields=None)` | Reload fields from the database |
| `Model.DoesNotExist` | Per-model exception raised by get() |
| `Model.MultipleObjectsReturned` | Per-model exception raised by get() |
Lifecycle hooks can be overridden:
from ryx import Model, CharField, ValidationError
class Post(Model):
    title = CharField(max_length=200)
    async def clean(self):
        # Reject titles that are empty or whitespace-only.
        if not self.title.strip():
            raise ValidationError({"title": ["Title cannot be empty"]})
    async def before_save(self, created: bool): ...
    async def after_save(self, created: bool): ...
    async def before_delete(self): ...
    async def after_delete(self): ...
Managers
Every concrete model receives objects = Manager() unless explicitly
overridden.
Post.objects.all()
Post.objects.filter(active=True)
Post.objects.exclude(status="draft")
Post.objects.order_by("-created_at")
Post.objects.using("replica")
Post.objects.cache(ttl=60)
Post.objects.stream(chunk_size=100)
await Post.objects.create(title="Hello")
await Post.objects.get(pk=1)
await Post.objects.first()
await Post.objects.last()
await Post.objects.exists()
await Post.objects.count()
await Post.objects.aggregate(total=Count("id"))
await Post.objects.get_or_create(slug="hello", defaults={"title": "Hello"})
await Post.objects.update_or_create(slug="hello", defaults={"title": "New"})
await Post.objects.bulk_create([Post(title="A"), Post(title="B")])
await Post.objects.bulk_update(posts, fields=["title"])
await Post.objects.bulk_delete(Post.objects.filter(active=False))
QuerySet
QuerySet is lazy, async, chainable, and immutable. SQL is executed only when
the queryset is awaited or when an async terminal method is called.
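To make "lazy, chainable, and immutable" concrete, here is a minimal stand-alone sketch. `LazyQuery` is a hypothetical toy class that filters an in-memory tuple; it is not Ryx's QuerySet and runs no SQL. It only shows the pattern: each chained call returns a new object, and work happens only on `await`.

```python
import asyncio
from dataclasses import dataclass, replace
from typing import Callable, Tuple


@dataclass(frozen=True)
class LazyQuery:
    """Hypothetical stand-in for a lazy, immutable query object."""

    source: Tuple[int, ...]
    predicates: Tuple[Callable[[int], bool], ...] = ()

    def filter(self, predicate: Callable[[int], bool]) -> "LazyQuery":
        # Return a modified copy; the original object is left untouched.
        return replace(self, predicates=self.predicates + (predicate,))

    async def _run(self) -> list:
        # The "query" only runs here, when the object is awaited.
        return [n for n in self.source if all(p(n) for p in self.predicates)]

    def __await__(self):
        return self._run().__await__()


async def main() -> list:
    base = LazyQuery(source=(1, 2, 3, 4))
    evens = base.filter(lambda n: n % 2 == 0)  # nothing executes yet
    return await evens


result = asyncio.run(main())
```

Because every call returns a copy, a base query can be shared and extended in several directions without one branch affecting another.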
Building Queries
qs = (
    Post.objects
    .filter(active=True, views__gte=100)
    .exclude(title__istartswith="draft")
    .order_by("-views", "title")
    .limit(20)
    .offset(40)
)
posts = await qs
| Method | Purpose |
|---|---|
| `.filter(*q, **kwargs)` | Add AND-ed filters and Q expressions |
| `.exclude(*q, **kwargs)` | Add negated filters |
| `.all()` | Clone the queryset |
| `.order_by(*fields)` | Add ordering; "-field" means descending |
| `.limit(n)` | Add LIMIT |
| `.offset(n)` | Add OFFSET |
| `.distinct()` | Add DISTINCT |
| `.values(*fields)` | Select fields and enable grouping for annotations |
| `.annotate(**aggs)` | Add aggregate expressions per row |
| `.join(table, on, alias=None, kind="INNER")` | Add an explicit SQL join |
| `.using(alias)` | Force a database alias |
| `.schema(schema)` | Target a PostgreSQL schema |
| `.cache(ttl=None, key=None)` | Cache the first evaluated result |
| `.stream(chunk_size=100, keyset=None, as_dict=False)` | Iterate in chunks |
Slicing is translated to LIMIT/OFFSET:
first_ten = await Post.objects.order_by("id")[:10]
page_two = await Post.objects.order_by("id")[10:20]
third = await Post.objects.order_by("id")[2]
Negative indexes and step slicing are not supported.
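The slicing rules can be pictured as a small translation from a Python index or slice to a LIMIT/OFFSET pair. The function below is a hypothetical sketch of that mapping, not Ryx's code. It rejects every explicit step, which is an assumption about how strictly "step slicing" is enforced.

```python
from typing import Optional, Tuple, Union


def slice_to_limit_offset(key: Union[int, slice]) -> Tuple[Optional[int], int]:
    """Translate an index or slice into (limit, offset).

    limit is None when the slice has no upper bound.
    """
    if isinstance(key, int):
        if key < 0:
            raise ValueError("negative indexes are not supported")
        # A single index selects one row at that offset.
        return 1, key
    if isinstance(key, slice):
        if key.step is not None:
            raise ValueError("step slicing is not supported")
        start = key.start or 0
        stop = key.stop
        if start < 0 or (stop is not None and stop < 0):
            raise ValueError("negative indexes are not supported")
        limit = None if stop is None else max(stop - start, 0)
        return limit, start
    raise TypeError("expected an int or a slice")


first_ten = slice_to_limit_offset(slice(None, 10))  # LIMIT 10 OFFSET 0
page_two = slice_to_limit_offset(slice(10, 20))     # LIMIT 10 OFFSET 10
third = slice_to_limit_offset(2)                    # LIMIT 1 OFFSET 2
```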
select_related() exists on the Python queryset but is currently a no-op in
the implementation. Use explicit .join() when you need an eager SQL join.
Terminal Methods
await qs.count()
await qs.exists()
await qs.first()
await qs.get(pk=1)
await qs.update(views=100)
await qs.delete()
await qs.bulk_delete()
await qs.in_bulk([1, 2, 3])
| Method | Return |
|---|---|
| `await qs` | `list[Model]` |
| `await qs.count()` | `int` |
| `await qs.exists()` | `bool` |
| `await qs.first()` | `Model` or `None` |
| `await qs.get(**filters)` | Exactly one model or per-model exception |
| `await qs.update(**fields)` | Number of updated rows |
| `await qs.delete()` | Number of deleted rows |
| `await qs.in_bulk(ids, field_name="pk")` | `dict[value, Model]` |
Debug SQL:
print(Post.objects.filter(active=True).query)
Q Objects
from ryx import Q
Post.objects.filter(Q(active=True) | Q(views__gte=1000))
Post.objects.filter(Q(active=True) & ~Q(status="draft"))
Post.objects.filter((Q(active=True) & Q(views__gte=100)) | Q(featured=True))
Multiple kwargs inside one Q() are AND-ed. Q objects can be combined with
regular filter kwargs.
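The combination rules can be illustrated with a tiny expression tree. `Node`, `q`, and `describe` are hypothetical names for this sketch and do not reflect how Ryx stores Q objects internally. The point is only how `&`, `|`, and `~` nest, and that several kwargs in one call are AND-ed.

```python
from dataclasses import dataclass
from typing import Any, Tuple


@dataclass(frozen=True)
class Node:
    """One node of a boolean filter tree (illustrative only)."""

    op: str  # "AND", "OR", "NOT", or "LEAF"
    children: Tuple["Node", ...] = ()
    lookup: str = ""
    value: Any = None

    def __and__(self, other: "Node") -> "Node":
        return Node("AND", (self, other))

    def __or__(self, other: "Node") -> "Node":
        return Node("OR", (self, other))

    def __invert__(self) -> "Node":
        return Node("NOT", (self,))


def q(**kwargs: Any) -> Node:
    """Build a leaf, or an AND node when several kwargs are given."""
    if not kwargs:
        raise ValueError("at least one condition is required")
    leaves = tuple(Node("LEAF", lookup=k, value=v) for k, v in kwargs.items())
    return leaves[0] if len(leaves) == 1 else Node("AND", leaves)


def describe(node: Node) -> str:
    """Render the tree as a readable boolean expression."""
    if node.op == "LEAF":
        return f"{node.lookup}={node.value!r}"
    if node.op == "NOT":
        return f"NOT ({describe(node.children[0])})"
    return "(" + f" {node.op} ".join(describe(c) for c in node.children) + ")"


either = describe(q(active=True) | q(views__gte=1000))
negated = describe(q(active=True) & ~q(status="draft"))
```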
Lookups and Transforms
Common lookups:
exact, gt, gte, lt, lte,
contains, icontains,
startswith, istartswith,
endswith, iendswith,
isnull, in, range
Date/time transforms:
date, year, month, day, hour, minute, second, week,
dow, quarter, time, iso_week, iso_dow
JSON transforms/lookups:
key, key_text, json,
has_key, has_any, has_all, contains, contained_by
Examples:
await Post.objects.filter(title__icontains="ryx")
await Post.objects.filter(created_at__year__gte=2025)
await Event.objects.filter(payload__key_text__icontains="error")
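A filter key such as created_at__year__gte reads as field, then transforms, then lookup. The sketch below splits a key along those lines using the names listed on this page. `split_lookup` is a hypothetical helper, and the fallback to `exact` when no lookup is given is an assumption borrowed from common ORM convention, not something this page states. It also ignores relation traversal.

```python
from typing import List, Tuple

# Names copied from the lists on this page.
LOOKUPS = {
    "exact", "gt", "gte", "lt", "lte", "contains", "icontains",
    "startswith", "istartswith", "endswith", "iendswith",
    "isnull", "in", "range",
    "has_key", "has_any", "has_all", "contained_by",
}
TRANSFORMS = {
    "date", "year", "month", "day", "hour", "minute", "second", "week",
    "dow", "quarter", "time", "iso_week", "iso_dow",
    "key", "key_text", "json",
}


def split_lookup(path: str) -> Tuple[str, List[str], str]:
    """Split 'field__transform__lookup' into its three parts."""
    field, *rest = path.split("__")
    lookup = "exact"  # assumption: default lookup when none is given
    if rest and rest[-1] in LOOKUPS:
        lookup = rest.pop()
    unknown = [part for part in rest if part not in TRANSFORMS]
    if unknown:
        raise ValueError(f"unknown transform(s): {unknown}")
    return field, rest, lookup


parts = split_lookup("created_at__year__gte")
```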
Runtime helpers:
ryx.available_lookups()
ryx.list_lookups()
ryx.available_transforms()
ryx.register_lookup("ne", "{col} <> ?")
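The template string passed to register_lookup suggests a simple substitution model: `{col}` stands for the column and `?` for the bound value. The following sketch mimics that idea with a local registry and checks the rendered SQL against SQLite. The `register` and `render` helpers are hypothetical, and the double-quoting of the column name is an assumption about how a backend might quote identifiers.

```python
import sqlite3
from typing import Dict

_TEMPLATES: Dict[str, str] = {}


def register(name: str, template: str) -> None:
    """Store a SQL template under a lookup name (illustrative registry)."""
    _TEMPLATES[name] = template


def render(name: str, column: str) -> str:
    """Substitute a quoted column name into the registered template."""
    quoted = '"' + column.replace('"', '""') + '"'
    return _TEMPLATES[name].replace("{col}", quoted)


register("ne", "{col} <> ?")
where_clause = render("ne", "status")

# Try the rendered clause against an in-memory SQLite table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO posts (status) VALUES (?)",
    [("draft",), ("published",), ("draft",)],
)
rows = conn.execute(
    f"SELECT id FROM posts WHERE {where_clause} ORDER BY id", ("draft",)
).fetchall()
```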
Decorator form:
@ryx.lookup("ne")
def not_equal():
    """{col} <> ?"""
Aggregates
from ryx import Count, Sum, Avg, Min, Max, RawAgg
await Post.objects.aggregate(
    total=Count("id"),
    total_views=Sum("views"),
    avg_views=Avg("views"),
    min_views=Min("views"),
    max_views=Max("views"),
)
rows = await (
    Post.objects
    .values("author_id")
    .annotate(post_count=Count("id"), views=Sum("views"))
)
Count("field", distinct=True) produces a distinct count. RawAgg(sql, alias) is available for backend-specific expressions.
Fields
from ryx import (
    AutoField, BigAutoField, SmallAutoField,
    IntField, SmallIntField, BigIntField, PositiveIntField,
    FloatField, DecimalField,
    BooleanField, NullBooleanField,
    CharField, TextField, SlugField, EmailField, URLField, IPAddressField,
    DateField, DateTimeField, TimeField, DurationField,
    UUIDField, JSONField, BinaryField, ArrayField,
    ForeignKey, OneToOneField, ManyToManyField,
)
Common field options:
CharField(
    max_length=200,
    null=False,
    blank=False,
    default="",
    primary_key=False,
    unique=False,
    db_index=False,
    choices=["draft", "published"],
    validators=[],
    editable=True,
    help_text="",
    verbose_name="",
    db_column="db_column_name",
    unique_for_date=None,
    unique_for_month=None,
    unique_for_year=None,
)
See Field Reference for the full field matrix.
Relationships
class Author(Model):
    name = CharField(max_length=100)
class Post(Model):
    author = ForeignKey(Author, on_delete="CASCADE", related_name="posts")
class Profile(Model):
    author = OneToOneField(Author, on_delete="CASCADE")
class Tag(Model):
    name = CharField(max_length=50)
class TaggedPost(Model):
    post = ForeignKey(Post, on_delete="CASCADE")
    tag = ForeignKey(Tag, on_delete="CASCADE")
class PostTag(Model):
    tags = ManyToManyField(Tag, through=TaggedPost)
Relationship helpers include forward descriptors, reverse FK managers, and
many-to-many managers. Reverse FK managers expose queryset-like helpers such as
all(), filter(), exclude(), order_by(), count(), exists(),
first(), get(), create(), add(), remove(), and delete().
Transactions
from ryx import transaction, get_active_transaction
async with transaction("default") as tx:
    await Account.objects.get(pk=1)
    tx.savepoint("before_transfer")
    tx.rollback_to("before_transfer")
    tx.release_savepoint("before_transfer")
current = get_active_transaction()
The transaction context commits on successful exit and rolls back on exception. Nested transactions are implemented with savepoints by the Rust backend.
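The commit-or-rollback behavior and the role of savepoints can be demonstrated with the standard library's sqlite3 module. This is a synchronous sketch of the general pattern, not Ryx's transaction class. The `atomic` helper and the accounts table are made up for the example.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def atomic(conn: sqlite3.Connection):
    """Commit on clean exit, roll back if the block raises."""
    conn.execute("BEGIN")
    try:
        yield conn
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    else:
        conn.execute("COMMIT")


def balance(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT balance FROM accounts WHERE id = 1").fetchone()[0]


# isolation_level=None puts sqlite3 in autocommit mode, so BEGIN is explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
conn.execute("INSERT INTO accounts VALUES (1, 100)")

with atomic(conn):
    conn.execute("SAVEPOINT before_transfer")
    conn.execute("UPDATE accounts SET balance = balance - 30 WHERE id = 1")
    conn.execute("ROLLBACK TO before_transfer")  # undoes only the -30 update
    conn.execute("RELEASE before_transfer")
    conn.execute("UPDATE accounts SET balance = balance + 5 WHERE id = 1")
balance_after_commit = balance(conn)

try:
    with atomic(conn):
        conn.execute("UPDATE accounts SET balance = 0 WHERE id = 1")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass
balance_after_rollback = balance(conn)
```

The first block commits with only the +5 update applied. The second block raises, so its update is discarded and the balance is unchanged.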
Bulk Operations and Streaming
from ryx import bulk_create, bulk_update, bulk_delete, stream
await bulk_create([Post(title="A"), Post(title="B")], batch_size=500)
await bulk_update(posts, fields=["title"], batch_size=500)
await bulk_delete(Post.objects.filter(active=False))
async for post in Post.objects.filter(active=True).stream(chunk_size=100):
    ...
async for row in Post.objects.stream(chunk_size=500, keyset="id", as_dict=True):
    ...
keyset streaming uses cursor-style pagination and is preferred for large
tables when the keyset column is indexed.
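Keyset (cursor-style) pagination replaces a growing OFFSET with a `WHERE key > last_seen` condition, so each chunk is an index range scan. The sqlite3 sketch below shows the general technique. The `keyset_chunks` helper and the table layout are hypothetical, and the SQL is not necessarily what Ryx generates.

```python
import sqlite3
from typing import Iterator, List, Tuple


def keyset_chunks(
    conn: sqlite3.Connection, chunk_size: int
) -> Iterator[List[Tuple[int, str]]]:
    """Yield rows in chunks, resuming after the last id seen."""
    last_id = 0  # assumes ids are positive integers
    while True:
        rows = conn.execute(
            "SELECT id, title FROM posts WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, chunk_size),
        ).fetchall()
        if not rows:
            return
        yield rows
        last_id = rows[-1][0]  # the cursor is the last key of the chunk


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT)")
conn.executemany(
    "INSERT INTO posts (title) VALUES (?)", [(f"post {i}",) for i in range(7)]
)

chunk_sizes = [len(chunk) for chunk in keyset_chunks(conn, 3)]
all_ids = [row[0] for chunk in keyset_chunks(conn, 3) for row in chunk]
```

Unlike OFFSET paging, the cost of each chunk does not grow with how far into the table the iteration has progressed, provided the key column is indexed.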
Validation
from ryx import (
    ValidationError, Validator, FunctionValidator,
    NotNullValidator, NotBlankValidator,
    MaxLengthValidator, MinLengthValidator,
    MinValueValidator, MaxValueValidator,
    RangeValidator, RegexValidator,
    EmailValidator, URLValidator, ChoicesValidator,
)
Validation runs in save() by default. Pass validate=False to skip it:
await post.save(validate=False)
Signals
from ryx import (
    Signal, receiver,
    pre_save, post_save,
    pre_delete, post_delete,
    pre_update, post_update,
    pre_bulk_delete, post_bulk_delete,
)
@receiver(post_save, sender=Post)
async def on_post_saved(sender, instance, created, **kwargs):
    ...
Signal methods:
signal.connect(handler, sender=None, weak=True)
signal.disconnect(handler, sender=None)
await signal.send(sender, **kwargs)
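The connect/disconnect/send trio follows the usual observer pattern. A minimal async dispatcher might look like the sketch below. `MiniSignal` is a hypothetical toy: it keeps strong references (so it ignores the `weak=True` option listed above) and it filters receivers by sender identity, which is an assumption about the matching rule.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional, Tuple

Handler = Callable[..., Awaitable[Any]]


class MiniSignal:
    """Toy async signal: stores handlers and awaits them on send()."""

    def __init__(self) -> None:
        self._receivers: List[Tuple[Handler, Optional[type]]] = []

    def connect(self, handler: Handler, sender: Optional[type] = None) -> None:
        self._receivers.append((handler, sender))

    def disconnect(self, handler: Handler, sender: Optional[type] = None) -> None:
        self._receivers = [
            (h, s) for h, s in self._receivers
            if not (h is handler and s is sender)
        ]

    async def send(self, sender: type, **kwargs: Any) -> List[Any]:
        results = []
        for handler, wanted in list(self._receivers):
            # A receiver registered without a sender hears every send().
            if wanted is None or wanted is sender:
                results.append(await handler(sender=sender, **kwargs))
        return results


class Post:
    """Placeholder sender class for the demo."""


calls: List[Tuple[str, str, bool]] = []


async def on_post_saved(sender, instance, created, **kwargs):
    calls.append((sender.__name__, instance, created))


post_saved = MiniSignal()
post_saved.connect(on_post_saved, sender=Post)


async def demo() -> None:
    await post_saved.send(Post, instance="post-1", created=True)
    await post_saved.send(object, instance="other", created=False)  # filtered out


asyncio.run(demo())
```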
Caching
from ryx import MemoryCache, configure_cache, get_cache
from ryx import invalidate, invalidate_model, invalidate_all
configure_cache(MemoryCache(max_size=1000, ttl=300))
posts = await Post.objects.filter(active=True).cache(ttl=60)
named = await Post.objects.all().cache(key="all_posts", ttl=300)
await invalidate("all_posts")
await invalidate_model(Post)
await invalidate_all()
cache = get_cache()
MemoryCache implements get, set, delete, delete_many, clear, and
keys. It is process-local and TTL-aware.
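What "process-local and TTL-aware" means in practice can be shown with a small in-memory cache. `TinyTTLCache` is a hypothetical, synchronous sketch and not the MemoryCache implementation. The oldest-first eviction policy and the injectable clock are assumptions made to keep the example short and testable.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, List, Optional


class TinyTTLCache:
    """In-process cache with per-entry expiry and a size cap."""

    def __init__(
        self,
        max_size: int = 1000,
        ttl: float = 300,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max_size = max_size
        self._ttl = ttl
        self._clock = clock
        self._data: "OrderedDict[str, tuple]" = OrderedDict()

    def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        expires = self._clock() + (self._ttl if ttl is None else ttl)
        self._data.pop(key, None)  # re-inserting moves the key to the end
        self._data[key] = (value, expires)
        while len(self._data) > self._max_size:
            self._data.popitem(last=False)  # evict the oldest entry

    def get(self, key: str, default: Any = None) -> Any:
        item = self._data.get(key)
        if item is None:
            return default
        value, expires = item
        if self._clock() >= expires:
            del self._data[key]  # expired entries are dropped lazily
            return default
        return value

    def delete(self, key: str) -> None:
        self._data.pop(key, None)

    def clear(self) -> None:
        self._data.clear()

    def keys(self) -> List[str]:
        return list(self._data)


cache = TinyTTLCache(max_size=2, ttl=60)
cache.set("all_posts", ["A", "B"])
cached = cache.get("all_posts")
```

Because the data lives in a plain dictionary inside one process, separate worker processes would each hold their own copy.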
Raw SQL and Parameterized Execution
from ryx.executor_helpers import raw_fetch, raw_execute
from ryx.pool_ext import fetch_with_params, execute_with_params
rows = await raw_fetch("SELECT * FROM posts WHERE active = true", alias="default")
await raw_execute("CREATE INDEX IF NOT EXISTS idx_posts_active ON posts(active)")
rows = await fetch_with_params("SELECT * FROM posts WHERE id = ?", [1])
affected = await execute_with_params("UPDATE posts SET views = ? WHERE id = ?", [10, 1])
Use parameterized helpers for user input. Raw SQL helpers execute the SQL string directly.
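The reason for that advice is easiest to see side by side. The sketch below uses the standard library's sqlite3 module rather than the Ryx helpers. The table and the malicious input are invented for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT)")
conn.executemany("INSERT INTO posts (title) VALUES (?)", [("A",), ("B",)])

user_input = "1 OR 1=1"  # attacker-controlled string

# Unsafe: the input becomes part of the SQL text and changes the query.
unsafe_rows = conn.execute(
    f"SELECT id FROM posts WHERE id = {user_input}"
).fetchall()

# Safe: the input is bound as a single value and never parsed as SQL.
safe_rows = conn.execute(
    "SELECT id FROM posts WHERE id = ?", (user_input,)
).fetchall()
```

The interpolated query matches every row because `OR 1=1` is executed as SQL. The bound query compares the id against the literal string and matches nothing.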
Database Routing
from ryx.router import BaseRouter, set_router
class Router(BaseRouter):
    def db_for_read(self, model, **hints):
        return "replica"
    def db_for_write(self, model, **hints):
        return getattr(model._meta, "database", None) or "default"
    def allow_migrate(self, db, app_label, model_name):
        return True
set_router(Router())
Read alias resolution order:
- QuerySet .using(alias)
- Router.db_for_read(model)
- Model.Meta.database
- "default"
Write alias resolution order:
- Explicit save(using=alias) or QuerySet.using(alias) where applicable
- Router.db_for_write(model)
- Model.Meta.database
- "default"
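Both orders amount to "first configured value wins, otherwise default". A hypothetical helper expressing that rule is sketched here. The function and parameter names are illustrative and are not part of the Ryx API.

```python
from typing import Optional


def resolve_alias(
    explicit: Optional[str] = None,
    router_alias: Optional[str] = None,
    meta_database: Optional[str] = None,
    fallback: str = "default",
) -> str:
    """Return the first non-empty alias in priority order."""
    for candidate in (explicit, router_alias, meta_database):
        if candidate:
            return candidate
    return fallback


# An explicit .using() / save(using=...) beats the router.
explicit_wins = resolve_alias(explicit="logs", router_alias="replica")
# Without an explicit alias, the router decides.
router_wins = resolve_alias(router_alias="replica", meta_database="analytics")
# With nothing configured, the fallback applies.
fallback_used = resolve_alias()
```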
Migrations
from ryx.migrations import MigrationRunner
from ryx.migrations.autodetect import Autodetector
from ryx.migrations.ddl import DDLGenerator, generate_schema_ddl, detect_backend
runner = MigrationRunner([Author, Post], migrations_dir="migrations")
changes = await runner.migrate()
detector = Autodetector([Author, Post], migrations_dir="migrations")
operations = detector.detect()
path = detector.write_migration(operations)
Operation classes:
CreateTable(table, columns, schema="", model=None)
AddField(table, column, schema="", model=None)
AlterField(table, new_col, old_col=None, schema="", model=None)
CreateIndex(table, name, fields, unique=False, schema="", model=None)
RunSQL(sql, reverse_sql="")
DDL helpers support PostgreSQL, MySQL, and SQLite and include schema-aware SQL generation for PostgreSQL.
Sync and Async Bridge
from ryx import run_sync, run_async, sync_to_async, async_to_sync
posts = run_sync(Post.objects.filter(active=True))
async_fn = sync_to_async(blocking_function)
result = await async_fn()
wrapped = async_to_sync(async_function)
value = wrapped()
result = await run_async(blocking_function, arg1, key="value")
Prefer native await inside async applications. Use the bridge helpers for
scripts, WSGI handlers, and synchronous integrations.
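For readers unfamiliar with the two directions of bridging, the standard library offers close analogues: `asyncio.run` drives a coroutine from synchronous code, and `asyncio.to_thread` (Python 3.9+) runs a blocking function without stalling the event loop. The sketch below uses only those stdlib calls and placeholder functions. It does not show how the Ryx helpers are implemented.

```python
import asyncio
import time


async def fetch_value() -> int:
    """Placeholder coroutine standing in for an async query."""
    await asyncio.sleep(0)
    return 42


def blocking_function(arg: str, key: str = "value") -> str:
    """Placeholder for blocking work such as file or legacy I/O."""
    time.sleep(0.01)
    return f"{arg}:{key}"


# Sync -> async: run a coroutine to completion from synchronous code.
value = asyncio.run(fetch_value())


# Async -> sync: push blocking work to a worker thread and await it.
async def main() -> str:
    return await asyncio.to_thread(blocking_function, "arg1", key="value")


result = asyncio.run(main())
```

Note that `asyncio.run` cannot be called from inside a running event loop, which is one reason to prefer native await in async applications.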
Exceptions
from ryx import (
    RyxError,
    DatabaseError,
    DoesNotExist,
    MultipleObjectsReturned,
    PoolNotInitialized,
    ValidationError,
)
DoesNotExist and MultipleObjectsReturned also exist as per-model subclasses:
try:
    await Post.objects.get(slug="missing")
except Post.DoesNotExist:
    ...
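Per-model subclasses let a handler catch one model's "not found" error without swallowing another's, while a handler for the base class still catches both. One way such classes could be generated is sketched below. `ModelBase` and its `__init_subclass__` hook are assumptions for illustration and may differ from how Ryx builds them.

```python
class DoesNotExist(Exception):
    """Base 'row not found' error shared by all models."""


class ModelBase:
    """Toy base class that gives each subclass its own DoesNotExist."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Create a distinct exception type per model class.
        cls.DoesNotExist = type("DoesNotExist", (DoesNotExist,), {})


class Post(ModelBase):
    pass


class Author(ModelBase):
    pass


def find_post():
    raise Post.DoesNotExist("no matching post")


caught_by = None
try:
    try:
        find_post()
    except Author.DoesNotExist:
        caught_by = "author handler"  # not reached: different subclass
except DoesNotExist:
    caught_by = "base handler"  # the shared base class still matches
```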
CLI Entrypoints
python -m ryx --url sqlite:///app.db migrate --models myapp.models
ryx --url sqlite:///app.db migrate --models myapp.models
See CLI for command options and config file formats.