第16章:性能优化
16.1 数据库优化
查询优化
数据库查询优化是提升Django应用性能的关键:
python
# models.py
from django.db import models
class Category(models.Model):
name = models.CharField(max_length=100)
slug = models.SlugField(unique=True)
class Meta:
indexes = [
models.Index(fields=['slug']),
]
class Tag(models.Model):
name = models.CharField(max_length=50)
slug = models.SlugField(unique=True)
class Meta:
indexes = [
models.Index(fields=['slug']),
]
class Article(models.Model):
title = models.CharField(max_length=200)
author = models.ForeignKey(User, on_delete=models.CASCADE, db_index=True)
category = models.ForeignKey(Category, on_delete=models.SET_NULL, null=True, blank=True, db_index=True)
tags = models.ManyToManyField(Tag, blank=True)
content = models.TextField()
view_count = models.PositiveIntegerField(default=0, db_index=True)
created_at = models.DateTimeField(auto_now_add=True, db_index=True)
updated_at = models.DateTimeField(auto_now=True)
status = models.CharField(max_length=10, default='draft', db_index=True)
class Meta:
indexes = [
models.Index(fields=['status', 'created_at']),
models.Index(fields=['author', 'created_at']),
models.Index(fields=['view_count']),
models.Index(fields=['-created_at']),
]
ordering = ['-created_at']
# views.py - 优化前的查询
def article_list_bad(request):
"""性能较差的文章列表查询"""
articles = Article.objects.all() # 获取所有字段
for article in articles:
# N+1查询问题
print(article.author.username) # 每次都查询作者
print(article.category.name) # 每次都查询分类
for tag in article.tags.all(): # 每次都查询标签
print(tag.name)
return render(request, 'article_list.html', {'articles': articles})
# views.py - 优化后的查询
def article_list_good(request):
"""优化后的文章列表查询"""
articles = Article.objects.select_related(
'author', 'category' # 预加载外键关联对象
).prefetch_related(
'tags' # 预加载多对多关系
).filter(
status='published'
).only( # 只获取需要的字段
'title', 'content', 'author__username', 'category__name'
)[:20] # 限制结果数量
return render(request, 'article_list.html', {'articles': articles})
# 使用Prefetch进行更精细的控制
from django.db.models import Prefetch
def article_list_advanced(request):
"""高级优化的文章列表查询"""
# 预加载标签并添加过滤条件
articles = Article.objects.select_related(
'author', 'category'
).prefetch_related(
Prefetch(
'tags',
queryset=Tag.objects.filter(name__startswith='tech'),
to_attr='tech_tags' # 自定义属性名
)
).filter(
status='published'
).annotate(
# 添加注解字段
comment_count=models.Count('comments', distinct=True)
).order_by('-created_at')[:20]
return render(request, 'article_list.html', {'articles': articles})
# 避免不必要的查询
def article_detail_optimized(request, slug):
"""优化的文章详情查询"""
try:
# 使用select_related和prefetch_related
article = Article.objects.select_related(
'author__profile', 'category'
).prefetch_related(
Prefetch(
'comments',
queryset=Comment.objects.select_related('author').filter(
is_approved=True, parent=None
),
to_attr='approved_comments'
),
'tags'
).get(slug=slug, status='published')
# 获取相关文章(避免N+1查询)
related_articles = Article.objects.filter(
category=article.category,
status='published'
).exclude(
pk=article.pk
).select_related('author').only(
'title', 'slug', 'created_at'
)[:5]
return render(request, 'article_detail.html', {
'article': article,
'related_articles': related_articles
})
except Article.DoesNotExist:
raise Http404("文章不存在")
# 使用聚合查询减少数据库访问
def category_stats(request):
"""分类统计查询"""
# 使用annotate进行聚合
categories = Category.objects.annotate(
article_count=models.Count('article', filter=models.Q(article__status='published')),
total_views=models.Sum('article__view_count', filter=models.Q(article__status='published'))
).filter(
article_count__gt=0
).order_by('-article_count')
return render(request, 'category_stats.html', {'categories': categories})
# 批量操作优化
def bulk_create_articles(request):
"""批量创建文章优化"""
articles_data = [
{'title': f'文章{i}', 'content': f'内容{i}', 'author_id': 1}
for i in range(1000)
]
# 使用bulk_create减少SQL查询次数
articles = [Article(**data) for data in articles_data]
Article.objects.bulk_create(articles, batch_size=100)
return JsonResponse({'status': 'success'})
def bulk_update_articles(request):
"""批量更新文章优化"""
# 获取需要更新的文章
articles = Article.objects.filter(status='draft')
# 使用bulk_update减少SQL查询次数
for article in articles:
article.status = 'published'
Article.objects.bulk_update(articles, ['status'], batch_size=100)
return JsonResponse({'status': 'success'})select_related和prefetch_related
详细使用select_related和prefetch_related:
python
# select_related示例 - 用于外键和一对一关系
def select_related_examples():
"""select_related使用示例"""
# 基本用法
articles = Article.objects.select_related('author').all()
# 生成的SQL: SELECT * FROM article INNER JOIN auth_user ON ...
# 多层关联
articles = Article.objects.select_related('author__profile', 'category').all()
# 生成的SQL: SELECT * FROM article
# INNER JOIN auth_user ON ...
# INNER JOIN user_profile ON ...
# INNER JOIN category ON ...
# 结合过滤
published_articles = Article.objects.select_related(
'author', 'category'
).filter(
status='published'
).order_by('-created_at')[:10]
# prefetch_related示例 - 用于多对多和反向外键关系
def prefetch_related_examples():
"""prefetch_related使用示例"""
# 基本用法
articles = Article.objects.prefetch_related('tags').all()
# 生成两条SQL:
# 1. SELECT * FROM article
# 2. SELECT * FROM tag INNER JOIN article_tags ON ...
# 多个关系
articles = Article.objects.prefetch_related('tags', 'comments').all()
# 嵌套预加载
articles = Article.objects.prefetch_related(
'tags',
'comments__author' # 评论的作者
).all()
# 使用Prefetch对象进行精细控制
from django.db.models import Prefetch
articles = Article.objects.prefetch_related(
Prefetch(
'comments',
queryset=Comment.objects.select_related('author').filter(is_approved=True),
to_attr='approved_comments' # 自定义属性名
)
).all()
# 性能对比示例
def performance_comparison():
"""性能对比示例"""
# 未优化的查询 - N+1问题
print("未优化查询:")
start_time = time.time()
articles = Article.objects.all()
for article in articles[:10]: # 只查看前10篇文章
print(f"文章: {article.title}")
print(f"作者: {article.author.username}") # 每次都查询数据库
print(f"分类: {article.category.name}") # 每次都查询数据库
for tag in article.tags.all(): # 每次都查询数据库
print(f"标签: {tag.name}")
end_time = time.time()
print(f"耗时: {end_time - start_time:.2f}秒")
# 优化后的查询
print("\n优化后查询:")
start_time = time.time()
articles = Article.objects.select_related(
'author', 'category'
).prefetch_related('tags').all()[:10]
for article in articles:
print(f"文章: {article.title}")
print(f"作者: {article.author.username}")
print(f"分类: {article.category.name}")
for tag in article.tags.all():
print(f"标签: {tag.name}")
end_time = time.time()
print(f"耗时: {end_time - start_time:.2f}秒")数据库索引
创建和优化数据库索引:
python
# models.py
class Article(models.Model):
title = models.CharField(max_length=200)
slug = models.SlugField(unique=True)
author = models.ForeignKey(User, on_delete=models.CASCADE)
category = models.ForeignKey(Category, on_delete=models.SET_NULL, null=True)
content = models.TextField()
view_count = models.PositiveIntegerField(default=0)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
status = models.CharField(max_length=10, default='draft')
is_featured = models.BooleanField(default=False)
class Meta:
# 单字段索引
indexes = [
models.Index(fields=['slug']), # 唯一索引已在字段定义中指定
models.Index(fields=['view_count']),
models.Index(fields=['created_at']),
models.Index(fields=['status']),
models.Index(fields=['is_featured']),
]
# 复合索引
indexes += [
models.Index(fields=['status', 'created_at']), # 状态和创建时间
models.Index(fields=['author', 'created_at']), # 作者和创建时间
models.Index(fields=['category', 'status', 'created_at']), # 分类、状态和创建时间
]
# 部分索引(PostgreSQL)
# indexes += [
# models.Index(
# fields=['created_at'],
# condition=models.Q(status='published'),
# name='published_articles_created_at_idx'
# ),
# ]
# 自定义索引名称
class Category(models.Model):
name = models.CharField(max_length=100)
slug = models.SlugField(unique=True)
class Meta:
indexes = [
models.Index(
fields=['slug'],
name='category_slug_idx' # 自定义索引名称
),
]
# 函数索引示例(PostgreSQL)
# class Article(models.Model):
# title = models.CharField(max_length=200)
#
# class Meta:
# indexes = [
# models.Index(
# models.functions.Lower('title'),
# name='article_title_lower_idx'
# ),
# ]
# 数据库查询分析
def analyze_queries():
"""分析数据库查询"""
from django.db import connection
from django.conf import settings
# 启用查询日志
settings.DEBUG = True
# 执行查询
articles = Article.objects.select_related('author', 'category').filter(
status='published'
).order_by('-created_at')[:20]
# 分析查询
print(f"执行了 {len(connection.queries)} 个查询")
for i, query in enumerate(connection.queries):
print(f"查询 {i+1}: {query['sql']}")
print(f"耗时: {query['time']}秒")
print("-" * 50)
# 重置查询日志
connection.queries_log.clear()
# 使用django-debug-toolbar监控查询
# 安装: pip install django-debug-toolbar
# settings.py
# INSTALLED_APPS = [
# # ...
# 'debug_toolbar',
# ]
# MIDDLEWARE = [
# # ...
# 'debug_toolbar.middleware.DebugToolbarMiddleware',
# ]
# INTERNAL_IPS = [
# '127.0.0.1',
# ]通过这些数据库优化技术,可以显著提升Django应用的查询性能,减少数据库负载。
小结
Django数据库优化的核心要点:
- ✅ 使用select_related和prefetch_related解决N+1查询问题
- ✅ 合理设计数据库索引提升查询效率
- ✅ 实施批量操作减少数据库访问次数
- ✅ 使用聚合查询和注解减少数据传输
- ✅ 监控和分析查询性能找出瓶颈
数据库优化是提升应用性能的重要手段。
下一篇
我们将学习缓存系统的使用。