Skip to content

第12章:中间件和信号

12.2 Django信号

信号系统概念

Django信号系统允许某些发送者通知一组接收者发生了某些操作。当某些代码执行时,信号可以让解耦的应用得到通知。信号是Django中实现观察者模式的一种方式。

信号的主要用途:

  1. 在模型保存或删除时执行某些操作
  2. 在用户登录或注销时执行某些操作
  3. 在请求处理过程中执行某些操作
  4. 实现应用间的解耦通信

内置信号

Django提供了许多内置信号:

python
# 常用的内置信号
from django.db.models.signals import pre_save, post_save, pre_delete, post_delete
from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed
from django.core.signals import request_started, request_finished
from django.db.backends.signals import connection_created

# 模型保存信号
@receiver(pre_save, sender=MyModel)
def my_model_pre_save(sender, instance, **kwargs):
    """在模型保存之前执行"""
    print(f"即将保存 {instance}")

@receiver(post_save, sender=MyModel)
def my_model_post_save(sender, instance, created, **kwargs):
    """在模型保存之后执行"""
    if created:
        print(f"新创建了 {instance}")
    else:
        print(f"更新了 {instance}")

# 模型删除信号
@receiver(pre_delete, sender=MyModel)
def my_model_pre_delete(sender, instance, **kwargs):
    """在模型删除之前执行"""
    print(f"即将删除 {instance}")

@receiver(post_delete, sender=MyModel)
def my_model_post_delete(sender, instance, **kwargs):
    """在模型删除之后执行"""
    print(f"已删除 {instance}")

# 用户认证信号
@receiver(user_logged_in)
def user_logged_in_callback(sender, request, user, **kwargs):
    """用户登录时执行"""
    print(f"{user.username} 已登录")
    # 记录登录日志
    LoginLog.objects.create(user=user, action='login')

@receiver(user_logged_out)
def user_logged_out_callback(sender, request, user, **kwargs):
    """用户注销时执行"""
    print(f"{user.username} 已注销")
    # 记录注销日志
    LoginLog.objects.create(user=user, action='logout')

@receiver(user_login_failed)
def user_login_failed_callback(sender, credentials, request, **kwargs):
    """用户登录失败时执行"""
    print(f"登录失败: {credentials.get('username')}")
    # 记录失败日志
    FailedLoginLog.objects.create(
        username=credentials.get('username'),
        ip_address=request.META.get('REMOTE_ADDR')
    )

# 请求处理信号
@receiver(request_started)
def request_started_callback(sender, **kwargs):
    """请求开始时执行"""
    print("请求开始处理")

@receiver(request_finished)
def request_finished_callback(sender, **kwargs):
    """请求完成时执行"""
    print("请求处理完成")

自定义信号

创建和使用自定义信号:

python
# signals.py
import django.dispatch

# 定义自定义信号
payment_completed = django.dispatch.Signal(providing_args=["order", "amount", "user"])
user_profile_updated = django.dispatch.Signal(providing_args=["user", "old_profile", "new_profile"])
email_sent = django.dispatch.Signal(providing_args=["recipient", "subject", "template"])

# 在Django 3.1+中,推荐不指定providing_args
payment_completed = django.dispatch.Signal()
user_profile_updated = django.dispatch.Signal()
email_sent = django.dispatch.Signal()

# 发送信号
# 方式1:使用send()
payment_completed.send(sender=None, order=order, amount=amount, user=user)

# 方式2:使用send_robust()(更安全)
payment_completed.send_robust(sender=None, order=order, amount=amount, user=user)

# models.py
from django.db import models
from django.contrib.auth.models import User
from .signals import payment_completed

class Order(models.Model):
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    amount = models.DecimalField(max_digits=10, decimal_places=2)
    status = models.CharField(max_length=20, default='pending')
    created_at = models.DateTimeField(auto_now_add=True)
    
    def complete_payment(self):
        """完成支付"""
        self.status = 'completed'
        self.save()
        
        # 发送支付完成信号
        payment_completed.send(
            sender=self.__class__,
            order=self,
            amount=self.amount,
            user=self.user
        )

# receivers.py
from django.dispatch import receiver
from .signals import payment_completed, user_profile_updated
from .models import Order, UserProfile

@receiver(payment_completed)
def handle_payment_completed(sender, order, amount, user, **kwargs):
    """处理支付完成"""
    # 发送确认邮件
    send_payment_confirmation_email(user, order)
    
    # 更新用户积分
    user.profile.points += int(amount)
    user.profile.save()
    
    # 记录日志
    PaymentLog.objects.create(
        user=user,
        order=order,
        amount=amount,
        status='completed'
    )

@receiver(user_profile_updated)
def handle_profile_updated(sender, user, old_profile, new_profile, **kwargs):
    """处理用户资料更新"""
    # 检查重要信息变更
    if old_profile.email != new_profile.email:
        # 发送邮箱验证邮件
        send_email_verification(user, new_profile.email)
    
    # 记录变更日志
    ProfileChangeLog.objects.create(
        user=user,
        old_data=old_profile.__dict__,
        new_data=new_profile.__dict__
    )

信号处理器

信号处理器的实现和最佳实践:

python
# apps.py
from django.apps import AppConfig

class MyAppConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'myapp'
    
    def ready(self):
        """应用准备就绪时注册信号处理器"""
        import myapp.signals  # 确保信号处理器被加载

# signals.py
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.core.cache import cache
from django.contrib.auth.models import User
from .models import Article, Comment

# 使用weak=False确保信号处理器不会被垃圾回收
@receiver(post_save, sender=Article, weak=False)
def invalidate_article_cache(sender, instance, **kwargs):
    """文章保存时清除缓存"""
    cache.delete(f'article_{instance.pk}')
    cache.delete('article_list')
    print(f"已清除文章 {instance.pk} 的缓存")

# 条件性接收信号
@receiver(post_save, sender=Comment)
def notify_on_comment(sender, instance, created, **kwargs):
    """新评论时发送通知"""
    if created and instance.article.author != instance.author:
        # 只有新评论且评论者不是文章作者时才发送通知
        send_notification(
            recipient=instance.article.author,
            message=f"您的文章 '{instance.article.title}' 有新评论"
        )

# 使用sender参数接收多个模型的信号
@receiver(post_save, sender=Article)
@receiver(post_save, sender=Comment)
def update_last_modified(sender, instance, **kwargs):
    """更新最后修改时间"""
    from django.utils import timezone
    # 更新站点最后修改时间
    SiteSettings.objects.update(last_modified=timezone.now())

# 异步信号处理器
import threading

@receiver(post_save, sender=Article)
def send_email_async(sender, instance, created, **kwargs):
    """异步发送邮件"""
    if created:
        # 在新线程中发送邮件,避免阻塞主请求
        email_thread = threading.Thread(
            target=send_welcome_email,
            args=(instance.author.email,)
        )
        email_thread.daemon = True
        email_thread.start()

# 信号处理器中的异常处理
@receiver(post_save, sender=Article)
def risky_signal_handler(sender, instance, **kwargs):
    """可能出错的信号处理器"""
    try:
        # 可能失败的操作
        external_api_call(instance)
    except Exception as e:
        # 记录错误但不中断其他信号处理器
        import logging
        logger = logging.getLogger(__name__)
        logger.error(f"信号处理器出错: {e}")
        # 可以选择重新抛出异常或静默处理
        # raise  # 重新抛出会中断其他信号处理器

# 使用dispatch_uid避免重复注册
@receiver(post_save, sender=Article, dispatch_uid="unique_article_handler")
def unique_article_handler(sender, instance, **kwargs):
    """确保只注册一次的信号处理器"""
    print("处理文章保存")

# 基于条件的信号处理器
from django.conf import settings

@receiver(post_save, sender=Article)
def production_only_handler(sender, instance, **kwargs):
    """仅在生产环境中执行"""
    if not settings.DEBUG:
        # 生产环境特定的处理
        send_to_analytics(instance)

# 信号处理器中的数据库事务
from django.db import transaction

@receiver(post_save, sender=Article)
def transactional_handler(sender, instance, **kwargs):
    """事务性信号处理器"""
    with transaction.atomic():
        try:
            # 执行相关操作
            update_statistics(instance)
            create_audit_log(instance)
        except Exception as e:
            # 回滚事务并记录错误
            import logging
            logger = logging.getLogger(__name__)
            logger.error(f"事务性信号处理器出错: {e}")
            raise  # 重新抛出异常

# 信号处理器的最佳实践示例
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
    """创建用户资料"""
    if created:
        # 新用户创建时创建资料
        from .models import UserProfile
        UserProfile.objects.create(user=instance)

@receiver(post_delete, sender=Article)
def cleanup_article_files(sender, instance, **kwargs):
    """删除文章时清理相关文件"""
    # 删除文章相关的上传文件
    if instance.image:
        instance.image.delete(save=False)
    
    # 清理缓存
    cache.delete_many([
        f'article_{instance.pk}',
        'article_list',
        f'user_articles_{instance.author.pk}'
    ])

# 复杂信号处理器示例
@receiver(post_save, sender=Article)
def complex_article_handler(sender, instance, created, **kwargs):
    """复杂的文章处理逻辑"""
    # 1. 更新搜索索引
    update_search_index.delay(instance.pk)  # 使用Celery异步任务
    
    # 2. 发送通知
    if created:
        send_notification_to_followers(instance.author, instance)
    
    # 3. 更新统计信息
    update_user_stats(instance.author)
    
    # 4. 生成缩略图(如果是图片文章)
    if instance.image and created:
        generate_thumbnail.delay(instance.image.path)
    
    # 5. 记录活动日志
    ActivityLog.objects.create(
        user=instance.author,
        action='created' if created else 'updated',
        content_type='article',
        object_id=instance.pk
    )

通过合理使用信号系统,可以实现应用间的解耦,使代码更加模块化和可维护。

小结

Django信号系统提供了强大的解耦机制:

  1. ✅ 理解信号系统的基本概念和工作原理
  2. ✅ 掌握常用内置信号的使用方法
  3. ✅ 能够创建和使用自定义信号
  4. ✅ 实现信号处理器处理特定业务逻辑
  5. ✅ 遵循信号处理的最佳实践

合理运用信号系统能够构建更加灵活和可维护的Django应用。

下一篇

我们将学习Django项目实战开发。

13.1 项目规划 →

目录

返回课程目录

Released under the Apache 2.0 License.