PostgreSQL数据库设计与查询优化

从数据建模到查询优化,全面讲解PostgreSQL数据库设计的最佳实践、索引策略、慢查询分析和Prisma ORM使用技巧

2024年3月1日
12 min read
PostgreSQL · 数据库 · 性能优化 · 后端开发

前言

数据库设计是后端开发的核心技能之一。良好的数据库设计不仅能确保数据一致性,还能大幅提升查询性能。本文将系统性地介绍PostgreSQL数据库设计的最佳实践,从范式设计到查询优化,帮助你构建高性能的数据库系统。

数据库范式与反范式

第三范式(3NF)设计

-- ✅ Schema in third normal form (3NF)
-- Users
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    name VARCHAR(100) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);
 
-- Posts: each post references exactly one author
CREATE TABLE posts (
    id SERIAL PRIMARY KEY,
    title VARCHAR(255) NOT NULL,
    content TEXT NOT NULL,
    author_id INTEGER REFERENCES users(id),
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);
 
-- Comments: deleting a post cascades to its comments
CREATE TABLE comments (
    id SERIAL PRIMARY KEY,
    content TEXT NOT NULL,
    post_id INTEGER REFERENCES posts(id) ON DELETE CASCADE,
    author_id INTEGER REFERENCES users(id),
    created_at TIMESTAMP DEFAULT NOW()
);
 
-- Tags
CREATE TABLE tags (
    id SERIAL PRIMARY KEY,
    name VARCHAR(50) UNIQUE NOT NULL
);
 
-- Post/tag junction table (many-to-many); the composite primary key
-- prevents assigning the same tag to a post twice
CREATE TABLE post_tags (
    post_id INTEGER REFERENCES posts(id) ON DELETE CASCADE,
    tag_id INTEGER REFERENCES tags(id) ON DELETE CASCADE,
    PRIMARY KEY (post_id, tag_id)
);

反范式设计的场景

-- Scenario: a post's comment count and like count are read very frequently
-- ❌ Counting on every read (poor performance: two correlated subqueries per row)
SELECT
    p.*,
    (SELECT COUNT(*) FROM comments WHERE post_id = p.id) as comment_count,
    (SELECT COUNT(*) FROM likes WHERE post_id = p.id) as like_count
FROM posts p;
 
-- ✅ Denormalized counter columns (trade storage space for read speed)
ALTER TABLE posts
ADD COLUMN comment_count INTEGER DEFAULT 0,
ADD COLUMN like_count INTEGER DEFAULT 0;
 
-- Keep the counter in sync with a trigger
CREATE OR REPLACE FUNCTION update_post_comment_count()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        UPDATE posts SET comment_count = comment_count + 1 WHERE id = NEW.post_id;
    ELSIF TG_OP = 'DELETE' THEN
        UPDATE posts SET comment_count = comment_count - 1 WHERE id = OLD.post_id;
    END IF;
    -- the return value of a row-level AFTER trigger is ignored; NULL is conventional
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
 
CREATE TRIGGER trigger_update_comment_count
AFTER INSERT OR DELETE ON comments
FOR EACH ROW EXECUTE FUNCTION update_post_comment_count();

索引设计策略

单列索引

-- Create indexes on frequently queried columns
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_posts_author_id ON posts(author_id);
CREATE INDEX idx_comments_post_id ON comments(post_id);
 
-- Verify the index is actually used
EXPLAIN ANALYZE
SELECT * FROM posts WHERE author_id = 1;
 
/*
Bitmap Heap Scan on posts  (cost=4.30..15.98 rows=10 width=...)
  Recheck Cond: (author_id = 1)
  ->  Bitmap Index Scan on idx_posts_author_id  (cost=0.00..4.29...)
*/

复合索引

-- Scenario: filter posts by category and status
-- (NOTE: fixed the comment markers — SQL uses --, not //)
-- ✅ Create a composite index
CREATE INDEX idx_posts_category_status ON posts(category_id, status);
 
-- Index usage rule: leftmost-prefix principle
-- ✅ Uses the index
SELECT * FROM posts WHERE category_id = 1 AND status = 'published';
SELECT * FROM posts WHERE category_id = 1;
 
-- ❌ Does not use the index (status is not a leftmost prefix of the index)
SELECT * FROM posts WHERE status = 'published';
 
-- ✅ Optimization: if you often filter by status alone, create a dedicated index
CREATE INDEX idx_posts_status ON posts(status);

部分索引

-- ✅ Index only published posts (a smaller, cheaper-to-maintain index)
CREATE INDEX idx_posts_published ON posts(created_at)
WHERE status = 'published';
 
-- The query's WHERE clause must imply the index predicate
-- (here: it must include status = 'published') for the partial index to be used
SELECT * FROM posts
WHERE status = 'published' AND created_at > '2024-01-01';

表达式索引

-- ✅ Create an index on a frequently used expression
CREATE INDEX idx_users_lower_email ON users(LOWER(email));
 
-- The query must use the exact same expression for the index to apply
SELECT * FROM users WHERE LOWER(email) = 'john@example.com';
 
-- ✅ Index a JSONB field.
-- NOTE: use -> (returns jsonb), not ->> (returns text) — the @> containment
-- operator and the GIN jsonb operator classes require a jsonb operand.
CREATE INDEX idx_metadata_tags ON posts USING GIN ((metadata->'tags'));
 
SELECT * FROM posts WHERE metadata->'tags' @> '["react"]';

查询性能分析

EXPLAIN ANALYZE详解

-- Basic usage
EXPLAIN ANALYZE
SELECT p.*, u.name as author_name
FROM posts p
JOIN users u ON p.author_id = u.id
WHERE p.created_at > '2024-01-01'
ORDER BY p.created_at DESC
LIMIT 10;
 
/*
Limit  (cost=0.56..23.45 rows=10 width=...)
  ->  Nested Loop  (cost=0.56..45.67 rows=20 width=...)
        ->  Index Scan Backward using idx_posts_created ON posts p
             Filter: (created_at > '2024-01-01'::date)
        ->  Index Scan using users_pkey on users u
             Index Cond: (id = p.author_id)
*/
 
-- How to read the key numbers:
-- cost: planner cost estimate (first number = startup cost, second = total cost)
-- rows: estimated number of rows returned
-- width: average row size in bytes
-- actual time: measured execution time (only present with ANALYZE)

识别慢查询

-- Enable slow-query logging (log statements slower than 1s)
ALTER DATABASE mydb SET log_min_duration_statement = 1000; -- 1 second
 
-- Find the slowest queries (requires the pg_stat_statements extension).
-- NOTE: on PostgreSQL 13+ the timing columns are named *_exec_time;
-- on PostgreSQL 12 and earlier use total_time / mean_time / max_time.
SELECT
    query,
    calls,
    total_exec_time,
    mean_exec_time,
    max_exec_time
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 10;
 
-- Table-level access statistics
-- (pg_stat_user_tables exposes the table name as relname, not tablename)
SELECT
    schemaname,
    relname,       -- table name
    seq_scan,      -- number of sequential scans
    seq_tup_read,  -- rows read by sequential scans
    idx_scan,      -- number of index scans
    idx_tup_fetch  -- rows fetched by index scans
FROM pg_stat_user_tables
ORDER BY seq_scan DESC;

优化案例

-- ❌ Slow: the N+1 problem
SELECT * FROM posts;
-- ...then, in application code, one extra query per post for its author:
SELECT * FROM users WHERE id = ?;
 
-- ✅ Optimization: a single JOIN
SELECT p.*, u.name as author_name
FROM posts p
LEFT JOIN users u ON p.author_id = u.id;
 
-- ❌ Slow: IN + subquery
SELECT *
FROM posts
WHERE id IN (
    SELECT post_id FROM comments WHERE author_id = 1
);
 
-- ✅ Optimization: EXISTS (clearer NULL semantics, often a better plan)
SELECT p.*
FROM posts p
WHERE EXISTS (
    SELECT 1 FROM comments c
    WHERE c.post_id = p.id AND c.author_id = 1
);
 
-- ❌ Slow: wrapping an indexed column in a function defeats the index.
-- (YEAR() is MySQL-only; in PostgreSQL the equivalent is EXTRACT)
SELECT * FROM posts WHERE EXTRACT(YEAR FROM created_at) = 2024;
 
-- ✅ Optimization: rewrite as an index-friendly half-open range
SELECT * FROM posts
WHERE created_at >= '2024-01-01'
  AND created_at < '2025-01-01';

事务和锁机制

事务隔离级别

// Using transactions in Prisma
import { PrismaClient } from '@prisma/client'
 
const prisma = new PrismaClient()
 
// ✅ Transaction example: transfer money between two users.
// NOTE(review): the read-check-then-decrement below is racy under the default
// Read Committed isolation — two concurrent transfers can both pass the balance
// check. Run it with isolationLevel 'Serializable' (shown below) or use a
// conditional updateMany (balance: { gte: amount }) to be safe — confirm.
async function transfer(fromUserId: string, toUserId: string, amount: number) {
  await prisma.$transaction(async (tx) => {
    // Check the sender's balance
    const fromUser = await tx.user.findUnique({
      where: { id: fromUserId },
      select: { balance: true },
    })
 
    if (!fromUser || fromUser.balance < amount) {
      throw new Error('余额不足')
    }
 
    // Debit the sender
    await tx.user.update({
      where: { id: fromUserId },
      data: { balance: { decrement: amount } },
    })
 
    // Credit the recipient
    await tx.user.update({
      where: { id: toUserId },
      data: { balance: { increment: amount } },
    })
 
    // Record the transfer
    await tx.transaction.create({
      data: {
        fromUserId,
        toUserId,
        amount,
        type: 'TRANSFER',
      },
    })
  })
}
 
// ✅ Setting the transaction isolation level
await prisma.$transaction(
  async (tx) => {
    // run operations here
  },
  {
    isolationLevel: 'Serializable', // strictest isolation level
  }
)

悲观锁和乐观锁

-- Pessimistic locking: SELECT ... FOR UPDATE
BEGIN;
SELECT * FROM products WHERE id = 1 FOR UPDATE;
-- Other transactions block here until this transaction releases the row lock
UPDATE products SET stock = stock - 1 WHERE id = 1;
COMMIT;
 
-- Optimistic locking: a version column
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255),
    stock INTEGER,
    version INTEGER DEFAULT 0
);
 
-- Check the version on every update
UPDATE products
SET stock = stock - 1, version = version + 1
WHERE id = 1 AND version = 10;
 
-- If no row was updated (version mismatch), a concurrent write happened:
-- re-read the row and retry

死锁避免

// ❌ Potential deadlock:
// transaction 1 locks A then B; transaction 2 locks B then A
 
// ✅ Avoid deadlocks: always acquire locks in a consistent order
async function updateMultipleRecords() {
  await prisma.$transaction(async (tx) => {
    // Always lock rows in ascending ID order.
    // NOTE: Array.prototype.sort() with no comparator sorts lexicographically
    // (e.g. [10, 2] -> [10, 2]); a numeric comparator is required for IDs.
    const ids = [5, 3, 8, 1].sort((a, b) => a - b)

    for (const id of ids) {
      await tx.record.update({
        where: { id },
        data: { /* ... */ },
      })
    }
  })
}

数据库连接池优化

// prisma/schema.prisma
datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}
 
// .env
DATABASE_URL="postgresql://user:password@localhost:5432/mydb?connection_limit=20&pool_timeout=10"
 
// ✅ Connection-pool settings explained
const poolConfig = {
  connection_limit: 20,     // maximum number of connections in the pool
  pool_timeout: 10,         // seconds to wait for a free connection
  connect_timeout: 10,      // connection-establishment timeout
  socket_timeout: 10,       // socket read/write timeout
}
 
// ✅ Size the pool per environment
// Development
const DEV_POOL_SIZE = 5
 
// Production (a common starting-point formula)
// connections = ((core_count * 2) + effective_spindle_count)
const PROD_POOL_SIZE = 20

Prisma ORM最佳实践

高效查询

// ❌ The N+1 query problem
const posts = await prisma.post.findMany()
for (const post of posts) {
  const author = await prisma.user.findUnique({
    where: { id: post.authorId },
  })
}
 
// ✅ Eager-load relations with include
// (renamed so the snippets can coexist in one module scope —
// `const posts` was declared three times, a redeclaration error)
const postsWithRelations = await prisma.post.findMany({
  include: {
    author: true,
    comments: {
      include: {
        author: true,
      },
    },
  },
})
 
// ✅ Use select to fetch only the columns you need
const postSummaries = await prisma.post.findMany({
  select: {
    id: true,
    title: true,
    author: {
      select: {
        name: true,
      },
    },
  },
})
 
// ✅ Bulk insert
await prisma.post.createMany({
  data: [
    { title: 'Post 1', content: '...', authorId: 1 },
    { title: 'Post 2', content: '...', authorId: 1 },
  ],
  skipDuplicates: true, // skip rows that violate unique constraints
})
 
// ✅ Bulk update
await prisma.post.updateMany({
  where: { authorId: 1 },
  data: { published: true },
})

原始SQL查询

// 复杂查询使用原始SQL
const result = await prisma.$queryRaw`
  SELECT
    u.name,
    COUNT(p.id) as post_count,
    AVG(p.view_count) as avg_views
  FROM users u
  LEFT JOIN posts p ON p.author_id = u.id
  GROUP BY u.id, u.name
  HAVING COUNT(p.id) > 10
  ORDER BY post_count DESC
  LIMIT 10
`
 
// ✅ 使用参数化查询防止SQL注入
const email = 'user@example.com'
const user = await prisma.$queryRaw`
  SELECT * FROM users WHERE email = ${email}
`

数据库备份和恢复

# Back up a database
pg_dump -h localhost -U postgres mydb > backup.sql
 
# Back up specific tables only
pg_dump -h localhost -U postgres -t users -t posts mydb > backup.sql
 
# Compressed backup
pg_dump -h localhost -U postgres mydb | gzip > backup.sql.gz
 
# Restore a database
psql -h localhost -U postgres mydb < backup.sql
 
# Scheduled backup script
#!/bin/bash
set -euo pipefail                      # fail fast on errors and unset variables
BACKUP_DIR="/backups/postgres"
DATE=$(date +%Y%m%d_%H%M%S)
mkdir -p "$BACKUP_DIR"                 # make sure the target directory exists
pg_dump -h localhost -U postgres mydb | gzip > "$BACKUP_DIR/backup_$DATE.sql.gz"
 
# Keep only the last 7 days of backups (quote the path to survive spaces)
find "$BACKUP_DIR" -name "backup_*.sql.gz" -mtime +7 -delete

监控和维护

-- Database sizes
SELECT
    pg_database.datname,
    pg_size_pretty(pg_database_size(pg_database.datname)) AS size
FROM pg_database
ORDER BY pg_database_size(pg_database.datname) DESC;
 
-- Table sizes (public schema only)
SELECT
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
 
-- Reclaim dead tuples and refresh planner statistics
VACUUM ANALYZE posts;
 
-- Rebuild indexes
REINDEX TABLE posts;
 
-- Find bloated tables: share of the total size taken by the heap itself.
-- NOTE: 100.0 forces numeric division (100 * bigint / bigint would truncate
-- before round), and NULLIF guards against division by zero on empty relations.
SELECT
    schemaname,
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as total_size,
    round(100.0 * pg_relation_size(schemaname||'.'||tablename) /
          NULLIF(pg_total_relation_size(schemaname||'.'||tablename), 0), 1) as table_percentage
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;

总结

PostgreSQL数据库优化是一个系统工程,需要从设计、查询、索引、事务多个层面综合考虑:

  1. 设计阶段:合理的范式设计,必要时进行反范式优化
  2. 索引策略:为常用查询创建合适的索引,避免过度索引
  3. 查询优化:使用EXPLAIN分析,避免N+1问题
  4. 事务管理:选择合适的隔离级别,避免死锁
  5. 连接池:根据负载合理配置连接池大小
  6. 持续监控:定期检查慢查询和数据库性能指标

记住:过早优化是万恶之源,先保证功能正确,再根据实际性能瓶颈进行优化。