Signals
Signals let you react to model lifecycle events without coupling code to the model itself.
Built-in Signals
| Signal | When | Kwargs |
|---|---|---|
| pre_save | Before INSERT/UPDATE | sender, instance, created |
| post_save | After INSERT/UPDATE | sender, instance, created |
| pre_delete | Before DELETE | sender, instance |
| post_delete | After DELETE | sender, instance |
| pre_update | Before bulk .update() | sender, queryset, fields |
| post_update | After bulk .update() | sender, queryset, updated_count, fields |
| pre_bulk_delete | Before bulk .delete() | sender, queryset |
| post_bulk_delete | After bulk .delete() | sender, queryset, deleted_count |
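The bulk signals pass a queryset rather than a single instance, so their receivers look slightly different from the per-instance ones. Here is a minimal sketch of a post_update handler that uses only the kwargs listed in the table. The `Post` stand-in class, the handler name, and the assumption that `fields` is an iterable of field names are illustrative and not confirmed by the ryx documentation.

```python
import asyncio


class Post:
    """Stand-in for an application model (hypothetical, not a real ryx model)."""


async def on_posts_updated(sender, queryset, updated_count, fields, **kwargs):
    # Bulk signals carry no `instance`: only the queryset and summary data.
    # Assumption: `fields` is an iterable of the field names that were updated.
    names = ", ".join(sorted(fields))
    return f"{sender.__name__}: {updated_count} row(s) updated ({names})"


# With ryx installed, the handler would presumably be connected like any other
# receiver, assuming post_update is importable the same way post_save is:
#     post_update.connect(on_posts_updated, sender=Post)

# Calling the handler directly shows the message it builds.
message = asyncio.run(
    on_posts_updated(sender=Post, queryset=None, updated_count=3, fields=["title", "status"])
)
print(message)
```

Keeping `**kwargs` in the signature means the handler keeps working if a signal later passes extra arguments.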
Connecting via Decorator
from ryx import receiver, post_save, post_delete

# Post is an application model; send_notification, invalidate_cache and
# cleanup_files are application helpers defined elsewhere.

@receiver(post_save, sender=Post)
async def on_post_saved(sender, instance, created, **kwargs):
    if created:
        await send_notification(f"New post: {instance.title}")
    else:
        await invalidate_cache(f"post:{instance.pk}")

@receiver(post_delete, sender=Post)
async def on_post_deleted(sender, instance, **kwargs):
    await cleanup_files(instance.pk)
Connecting via Method
async def log_change(sender, instance, **kwargs):
    await audit_log.log(f"{sender.__name__} #{instance.pk} changed")

post_save.connect(log_change, sender=Post, weak=False)
Tip: Use weak=False for long-lived receivers. A receiver connected through a weak reference is dropped once nothing else holds a reference to it.
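To see why a weakly held receiver can vanish, the following sketch uses Python's standard `weakref` module directly rather than ryx. It assumes ryx keeps weak receivers roughly the way `weakref.ref` does, which the tip implies but does not spell out. `make_receiver` and the variable names are hypothetical.

```python
import gc
import weakref


def make_receiver():
    # A receiver defined inside another function has no module-level name,
    # so the caller holds the only strong reference to it.
    async def on_saved(sender, instance, **kwargs):
        pass

    return on_saved


handler = make_receiver()
weak_handle = weakref.ref(handler)  # roughly what a weak connection would keep
strong_handle = handler             # roughly what weak=False would keep

alive_before = weak_handle() is not None

del handler
gc.collect()
# strong_handle still references the function, so it survives.
alive_with_strong_ref = weak_handle() is not None

del strong_handle
gc.collect()
# No strong references remain, so the weak reference is now dead.
alive_after = weak_handle() is not None

print(alive_before, alive_with_strong_ref, alive_after)
```

Receivers defined at module level are kept alive by the module itself, so the risk mainly concerns closures, lambdas, and functions created inside other functions.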
Disconnecting
post_save.disconnect(on_post_saved, sender=Post)
Custom Signals
from ryx import Signal, receiver

post_published = Signal("post_published")

@receiver(post_published, sender=Post)
async def notify_subscribers(sender, instance, **kwargs):
    subscribers = await get_subscribers(instance.author_id)
    for sub in subscribers:
        await send_email(sub.email, f"New post: {instance.title}")

# Fire the signal (from inside an async function)
await post_published.send(sender=Post, instance=post)
Concurrent Execution
Signal handlers run concurrently via asyncio.gather. If one handler raises, others still execute.
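The behaviour described above matches `asyncio.gather` called with `return_exceptions=True`. The sketch below uses plain asyncio rather than ryx to show it. Whether ryx passes that flag, and what `send` returns, is not stated here, so `dispatch` and the handler names are hypothetical.

```python
import asyncio

calls = []


async def failing_handler(sender, **kwargs):
    calls.append("failing")
    raise RuntimeError("handler failed")


async def slow_handler(sender, **kwargs):
    await asyncio.sleep(0.01)
    calls.append("slow")
    return "slow done"


async def dispatch(handlers, sender, **kwargs):
    # return_exceptions=True collects exceptions as results instead of
    # propagating the first one, so every handler runs to completion.
    return await asyncio.gather(
        *(handler(sender=sender, **kwargs) for handler in handlers),
        return_exceptions=True,
    )


results = asyncio.run(dispatch([failing_handler, slow_handler], sender=object))
print(calls)
print(results)
```

Here `results` holds the `RuntimeError` instance in the first slot and `"slow done"` in the second, in the same order as the handlers. Without the flag, the caller would see only the first exception and lose the other handlers' results.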
Next Steps
→ Hooks — Per-instance lifecycle methods