第12章:中间件和信号
12.2 Django信号
信号系统概念
Django信号系统允许某些发送者通知一组接收者发生了某些操作。当某些代码执行时,信号可以让解耦的应用得到通知。信号是Django中实现观察者模式的一种方式。
信号的主要用途:
- 在模型保存或删除时执行某些操作
- 在用户登录或注销时执行某些操作
- 在请求处理过程中执行某些操作
- 实现应用间的解耦通信
内置信号
Django提供了许多内置信号:
python
# 常用的内置信号
from django.db.models.signals import pre_save, post_save, pre_delete, post_delete
from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed
from django.core.signals import request_started, request_finished
from django.db.backends.signals import connection_created
# 模型保存信号
@receiver(pre_save, sender=MyModel)
def my_model_pre_save(sender, instance, **kwargs):
"""在模型保存之前执行"""
print(f"即将保存 {instance}")
@receiver(post_save, sender=MyModel)
def my_model_post_save(sender, instance, created, **kwargs):
"""在模型保存之后执行"""
if created:
print(f"新创建了 {instance}")
else:
print(f"更新了 {instance}")
# 模型删除信号
@receiver(pre_delete, sender=MyModel)
def my_model_pre_delete(sender, instance, **kwargs):
"""在模型删除之前执行"""
print(f"即将删除 {instance}")
@receiver(post_delete, sender=MyModel)
def my_model_post_delete(sender, instance, **kwargs):
"""在模型删除之后执行"""
print(f"已删除 {instance}")
# 用户认证信号
@receiver(user_logged_in)
def user_logged_in_callback(sender, request, user, **kwargs):
"""用户登录时执行"""
print(f"{user.username} 已登录")
# 记录登录日志
LoginLog.objects.create(user=user, action='login')
@receiver(user_logged_out)
def user_logged_out_callback(sender, request, user, **kwargs):
"""用户注销时执行"""
print(f"{user.username} 已注销")
# 记录注销日志
LoginLog.objects.create(user=user, action='logout')
@receiver(user_login_failed)
def user_login_failed_callback(sender, credentials, request, **kwargs):
"""用户登录失败时执行"""
print(f"登录失败: {credentials.get('username')}")
# 记录失败日志
FailedLoginLog.objects.create(
username=credentials.get('username'),
ip_address=request.META.get('REMOTE_ADDR')
)
# 请求处理信号
@receiver(request_started)
def request_started_callback(sender, **kwargs):
"""请求开始时执行"""
print("请求开始处理")
@receiver(request_finished)
def request_finished_callback(sender, **kwargs):
"""请求完成时执行"""
print("请求处理完成")自定义信号
创建和使用自定义信号:
python
# signals.py
import django.dispatch
# 定义自定义信号
payment_completed = django.dispatch.Signal(providing_args=["order", "amount", "user"])
user_profile_updated = django.dispatch.Signal(providing_args=["user", "old_profile", "new_profile"])
email_sent = django.dispatch.Signal(providing_args=["recipient", "subject", "template"])
# 在Django 3.1+中,推荐不指定providing_args
payment_completed = django.dispatch.Signal()
user_profile_updated = django.dispatch.Signal()
email_sent = django.dispatch.Signal()
# 发送信号
# 方式1:使用send()
payment_completed.send(sender=None, order=order, amount=amount, user=user)
# 方式2:使用send_robust()(更安全)
payment_completed.send_robust(sender=None, order=order, amount=amount, user=user)
# models.py
from django.db import models
from django.contrib.auth.models import User
from .signals import payment_completed
class Order(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE)
amount = models.DecimalField(max_digits=10, decimal_places=2)
status = models.CharField(max_length=20, default='pending')
created_at = models.DateTimeField(auto_now_add=True)
def complete_payment(self):
"""完成支付"""
self.status = 'completed'
self.save()
# 发送支付完成信号
payment_completed.send(
sender=self.__class__,
order=self,
amount=self.amount,
user=self.user
)
# receivers.py
from django.dispatch import receiver
from .signals import payment_completed, user_profile_updated
from .models import Order, UserProfile
@receiver(payment_completed)
def handle_payment_completed(sender, order, amount, user, **kwargs):
"""处理支付完成"""
# 发送确认邮件
send_payment_confirmation_email(user, order)
# 更新用户积分
user.profile.points += int(amount)
user.profile.save()
# 记录日志
PaymentLog.objects.create(
user=user,
order=order,
amount=amount,
status='completed'
)
@receiver(user_profile_updated)
def handle_profile_updated(sender, user, old_profile, new_profile, **kwargs):
"""处理用户资料更新"""
# 检查重要信息变更
if old_profile.email != new_profile.email:
# 发送邮箱验证邮件
send_email_verification(user, new_profile.email)
# 记录变更日志
ProfileChangeLog.objects.create(
user=user,
old_data=old_profile.__dict__,
new_data=new_profile.__dict__
)信号处理器
信号处理器的实现和最佳实践:
python
# apps.py
from django.apps import AppConfig
class MyAppConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'myapp'
def ready(self):
"""应用准备就绪时注册信号处理器"""
import myapp.signals # 确保信号处理器被加载
# signals.py
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.core.cache import cache
from django.contrib.auth.models import User
from .models import Article, Comment
# 使用weak=False确保信号处理器不会被垃圾回收
@receiver(post_save, sender=Article, weak=False)
def invalidate_article_cache(sender, instance, **kwargs):
"""文章保存时清除缓存"""
cache.delete(f'article_{instance.pk}')
cache.delete('article_list')
print(f"已清除文章 {instance.pk} 的缓存")
# 条件性接收信号
@receiver(post_save, sender=Comment)
def notify_on_comment(sender, instance, created, **kwargs):
"""新评论时发送通知"""
if created and instance.article.author != instance.author:
# 只有新评论且评论者不是文章作者时才发送通知
send_notification(
recipient=instance.article.author,
message=f"您的文章 '{instance.article.title}' 有新评论"
)
# 使用sender参数接收多个模型的信号
@receiver(post_save, sender=Article)
@receiver(post_save, sender=Comment)
def update_last_modified(sender, instance, **kwargs):
"""更新最后修改时间"""
from django.utils import timezone
# 更新站点最后修改时间
SiteSettings.objects.update(last_modified=timezone.now())
# 异步信号处理器
import threading
@receiver(post_save, sender=Article)
def send_email_async(sender, instance, created, **kwargs):
"""异步发送邮件"""
if created:
# 在新线程中发送邮件,避免阻塞主请求
email_thread = threading.Thread(
target=send_welcome_email,
args=(instance.author.email,)
)
email_thread.daemon = True
email_thread.start()
# 信号处理器中的异常处理
@receiver(post_save, sender=Article)
def risky_signal_handler(sender, instance, **kwargs):
"""可能出错的信号处理器"""
try:
# 可能失败的操作
external_api_call(instance)
except Exception as e:
# 记录错误但不中断其他信号处理器
import logging
logger = logging.getLogger(__name__)
logger.error(f"信号处理器出错: {e}")
# 可以选择重新抛出异常或静默处理
# raise # 重新抛出会中断其他信号处理器
# 使用dispatch_uid避免重复注册
@receiver(post_save, sender=Article, dispatch_uid="unique_article_handler")
def unique_article_handler(sender, instance, **kwargs):
"""确保只注册一次的信号处理器"""
print("处理文章保存")
# 基于条件的信号处理器
from django.conf import settings
@receiver(post_save, sender=Article)
def production_only_handler(sender, instance, **kwargs):
"""仅在生产环境中执行"""
if not settings.DEBUG:
# 生产环境特定的处理
send_to_analytics(instance)
# 信号处理器中的数据库事务
from django.db import transaction
@receiver(post_save, sender=Article)
def transactional_handler(sender, instance, **kwargs):
"""事务性信号处理器"""
with transaction.atomic():
try:
# 执行相关操作
update_statistics(instance)
create_audit_log(instance)
except Exception as e:
# 回滚事务并记录错误
import logging
logger = logging.getLogger(__name__)
logger.error(f"事务性信号处理器出错: {e}")
raise # 重新抛出异常
# 信号处理器的最佳实践示例
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
"""创建用户资料"""
if created:
# 新用户创建时创建资料
from .models import UserProfile
UserProfile.objects.create(user=instance)
@receiver(post_delete, sender=Article)
def cleanup_article_files(sender, instance, **kwargs):
"""删除文章时清理相关文件"""
# 删除文章相关的上传文件
if instance.image:
instance.image.delete(save=False)
# 清理缓存
cache.delete_many([
f'article_{instance.pk}',
'article_list',
f'user_articles_{instance.author.pk}'
])
# 复杂信号处理器示例
@receiver(post_save, sender=Article)
def complex_article_handler(sender, instance, created, **kwargs):
"""复杂的文章处理逻辑"""
# 1. 更新搜索索引
update_search_index.delay(instance.pk) # 使用Celery异步任务
# 2. 发送通知
if created:
send_notification_to_followers(instance.author, instance)
# 3. 更新统计信息
update_user_stats(instance.author)
# 4. 生成缩略图(如果是图片文章)
if instance.image and created:
generate_thumbnail.delay(instance.image.path)
# 5. 记录活动日志
ActivityLog.objects.create(
user=instance.author,
action='created' if created else 'updated',
content_type='article',
object_id=instance.pk
)通过合理使用信号系统,可以实现应用间的解耦,使代码更加模块化和可维护。
小结
Django信号系统提供了强大的解耦机制:
- ✅ 理解信号系统的基本概念和工作原理
- ✅ 掌握常用内置信号的使用方法
- ✅ 能够创建和使用自定义信号
- ✅ 实现信号处理器处理特定业务逻辑
- ✅ 遵循信号处理的最佳实践
合理运用信号系统能够构建更加灵活和可维护的Django应用。
下一篇
我们将学习Django项目实战开发。