Reading Records
Get a Single Recordโ
# By primary key
post = await Post.objects.get(pk=1)
# By any field
post = await Post.objects.get(slug="hello-world")
# Raises DoesNotExist if not found
# Raises MultipleObjectsReturned if more than one matches
Safe Single Record Accessโ
# Returns None if not found
post = await Post.objects.filter(slug="hello-world").first()
# Returns None if empty
post = await Post.objects.last()
Get All Recordsโ
all_posts = await Post.objects.all()
Filtered Resultsโ
active_posts = await Post.objects.filter(active=True)
popular_posts = await Post.objects.filter(views__gte=1000)
Check Existenceโ
if await Post.objects.filter(active=True).exists():
print("There are active posts")
Countโ
count = await Post.objects.filter(active=True).count()
Bulk Lookupโ
posts = await Post.objects.in_bulk([1, 2, 3, 4, 5])
# โ {1: <Post pk=1>, 2: <Post pk=2>, ...}
Slicingโ
# First 10
first_ten = await Post.objects.all()[:10]
# Pagination
page_two = await Post.objects.all()[10:20]
# Single by index
third = await Post.objects.all()[2]
Async Iterationโ
async for post in Post.objects.filter(active=True):
process(post)
Streaming Large Resultsโ
Via bulk.streamโ
from ryx.bulk import stream
async for batch in stream(Post.objects.filter(active=True), chunk_size=500):
for post in batch:
process(post)
Via QuerySet.streamโ
# Keyset-based pagination (efficient for large datasets)
async for batch in (
Post.objects
.filter(active=True)
.order_by("id")
.stream(chunk_size=100, keyset="id")
):
for post in batch:
process(post)
# As dicts instead of model instances
async for batch in (
Post.objects
.filter(active=True)
.stream(chunk_size=100, as_dict=True)
):
for row in batch:
print(row["title"])
Next Stepsโ
โ Updating โ Modify existing records