WebSocket实时聊天应用

基于Next.js和WebSocket构建的高性能实时聊天系统,支持多房间、在线状态同步和消息持久化存储

2024年2月15日
源代码
Next.jsWebSocket实时通信全栈

项目简介

这是一个功能完整的实时聊天应用,使用WebSocket实现低延迟的双向通信。项目支持多房间管理、用户在线状态实时同步、消息持久化存储,并使用Redis作为消息队列来确保高可用性。

核心功能

1. 实时通信

  • WebSocket双向通信
  • 消息实时推送
  • 输入状态指示器
  • 消息已读回执

2. 房间管理

  • 创建和加入房间
  • 房间成员列表
  • 房间消息历史
  • 私聊和群聊支持

3. 用户功能

  • 用户认证和授权
  • 在线状态实时同步
  • 用户头像和昵称
  • 消息通知

4. 消息功能

  • 文本消息发送
  • 图片上传和预览
  • 消息编辑和删除
  • 消息搜索

技术架构

前端

  • Next.js 14 (App Router)
  • TypeScript
  • TailwindCSS
  • Socket.IO Client
  • Zustand (状态管理)
  • React Query

后端

  • Next.js API Routes
  • Socket.IO Server
  • Prisma ORM
  • PostgreSQL
  • Redis (消息队列)
  • NextAuth.js (认证)

数据库设计

model User {
  id        String    @id @default(cuid())
  email     String    @unique
  name      String
  avatar    String?
  status    UserStatus @default(OFFLINE)
  createdAt DateTime  @default(now())
 
  messages  Message[]
  rooms     RoomMember[]
}
 
model Room {
  id        String   @id @default(cuid())
  name      String
  type      RoomType @default(GROUP)
  createdAt DateTime @default(now())
 
  members   RoomMember[]
  messages  Message[]
}
 
model Message {
  id        String      @id @default(cuid())
  content   String
  type      MessageType @default(TEXT)
  createdAt DateTime    @default(now())
  readBy    String[]    @default([])
 
  sender    User        @relation(fields: [senderId], references: [id])
  senderId  String
 
  room      Room        @relation(fields: [roomId], references: [id])
  roomId    String
 
  @@index([roomId, createdAt])
}
 
model RoomMember {
  id        String   @id @default(cuid())
  role      MemberRole @default(MEMBER)
  joinedAt  DateTime @default(now())
 
  user      User     @relation(fields: [userId], references: [id])
  userId    String
 
  room      Room     @relation(fields: [roomId], references: [id])
  roomId    String
 
  @@unique([userId, roomId])
}
 
enum UserStatus {
  ONLINE
  OFFLINE
  AWAY
}
 
enum RoomType {
  DIRECT
  GROUP
}
 
enum MessageType {
  TEXT
  IMAGE
  SYSTEM
}
 
enum MemberRole {
  ADMIN
  MEMBER
}

技术实现

1. WebSocket连接管理

使用Socket.IO实现可靠的WebSocket连接:

// lib/socket-server.ts
import { Server as HTTPServer } from 'http'
import { Server as SocketIOServer } from 'socket.io'
import { prisma } from '@/lib/prisma'
import { redis } from '@/lib/redis'
 
export function initializeSocketServer(httpServer: HTTPServer) {
  const io = new SocketIOServer(httpServer, {
    cors: {
      origin: process.env.NEXT_PUBLIC_APP_URL,
      methods: ['GET', 'POST'],
    },
    transports: ['websocket', 'polling'],
  })
 
  // 认证中间件
  io.use(async (socket, next) => {
    const token = socket.handshake.auth.token
    if (!token) {
      return next(new Error('Authentication error'))
    }
 
    try {
      const user = await verifyToken(token)
      socket.data.userId = user.id
      next()
    } catch (error) {
      next(new Error('Authentication error'))
    }
  })
 
  io.on('connection', async (socket) => {
    const userId = socket.data.userId
 
    // 更新用户在线状态
    await prisma.user.update({
      where: { id: userId },
      data: { status: 'ONLINE' },
    })
 
    // 订阅用户房间
    const rooms = await getUserRooms(userId)
    rooms.forEach((room) => {
      socket.join(room.id)
    })
 
    // 广播用户上线
    io.emit('user:online', { userId })
 
    // 监听消息发送
    socket.on('message:send', async (data) => {
      const message = await createMessage(data)
 
      // 广播到房间
      io.to(data.roomId).emit('message:new', message)
 
      // 存入Redis消息队列
      await redis.lpush(`room:${data.roomId}:messages`, JSON.stringify(message))
    })
 
    // 监听输入状态
    socket.on('typing:start', (data) => {
      socket.to(data.roomId).emit('typing:user', {
        userId,
        roomId: data.roomId,
      })
    })
 
    socket.on('typing:stop', (data) => {
      socket.to(data.roomId).emit('typing:stop', {
        userId,
        roomId: data.roomId,
      })
    })
 
    // 断开连接处理
    socket.on('disconnect', async () => {
      await prisma.user.update({
        where: { id: userId },
        data: { status: 'OFFLINE' },
      })
 
      io.emit('user:offline', { userId })
    })
  })
 
  return io
}

2. 客户端Socket连接

// lib/socket-client.ts
import { io, Socket } from 'socket.io-client'
import { useEffect, useState } from 'react'
 
let socket: Socket | null = null
 
export function getSocket() {
  if (!socket) {
    socket = io(process.env.NEXT_PUBLIC_SOCKET_URL!, {
      auth: {
        token: localStorage.getItem('token'),
      },
      transports: ['websocket', 'polling'],
      reconnection: true,
      reconnectionDelay: 1000,
      reconnectionAttempts: 5,
    })
  }
  return socket
}
 
// React Hook for socket events
export function useSocket() {
  const [isConnected, setIsConnected] = useState(false)
 
  useEffect(() => {
    const socket = getSocket()
 
    socket.on('connect', () => {
      setIsConnected(true)
    })
 
    socket.on('disconnect', () => {
      setIsConnected(false)
    })
 
    return () => {
      socket.off('connect')
      socket.off('disconnect')
    }
  }, [])
 
  return { socket: getSocket(), isConnected }
}

3. 消息组件实现

// components/chat/message-list.tsx
'use client'
 
import { useEffect, useRef } from 'react'
import { useSocket } from '@/lib/socket-client'
import { useMessageStore } from '@/stores/message-store'
 
export function MessageList({ roomId }: { roomId: string }) {
  const { socket } = useSocket()
  const messagesEndRef = useRef<HTMLDivElement>(null)
  const { messages, addMessage } = useMessageStore()
 
  useEffect(() => {
    // 监听新消息
    socket.on('message:new', (message) => {
      if (message.roomId === roomId) {
        addMessage(message)
        scrollToBottom()
      }
    })
 
    return () => {
      socket.off('message:new')
    }
  }, [socket, roomId])
 
  const scrollToBottom = () => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' })
  }
 
  return (
    <div className="flex-1 overflow-y-auto p-4 space-y-4">
      {messages.map((message) => (
        <div
          key={message.id}
          className={`flex ${
            message.senderId === currentUserId ? 'justify-end' : 'justify-start'
          }`}
        >
          <div className="max-w-[70%] rounded-lg p-3 bg-gray-100">
            <p className="text-sm">{message.content}</p>
            <span className="text-xs text-gray-500">
              {formatTime(message.createdAt)}
            </span>
          </div>
        </div>
      ))}
      <div ref={messagesEndRef} />
    </div>
  )
}

4. 消息输入组件

// components/chat/message-input.tsx
'use client'
 
import { useState } from 'react'
import { useSocket } from '@/lib/socket-client'
 
export function MessageInput({ roomId }: { roomId: string }) {
  const [message, setMessage] = useState('')
  const [isTyping, setIsTyping] = useState(false)
  const { socket } = useSocket()
 
  const handleChange = (e: React.ChangeEvent<HTMLTextAreaElement>) => {
    setMessage(e.target.value)
 
    if (!isTyping) {
      setIsTyping(true)
      socket.emit('typing:start', { roomId })
    }
 
    // 3秒后停止输入状态
    clearTimeout(typingTimer)
    typingTimer = setTimeout(() => {
      setIsTyping(false)
      socket.emit('typing:stop', { roomId })
    }, 3000)
  }
 
  const handleSend = async () => {
    if (!message.trim()) return
 
    socket.emit('message:send', {
      roomId,
      content: message,
      type: 'TEXT',
    })
 
    setMessage('')
    setIsTyping(false)
    socket.emit('typing:stop', { roomId })
  }
 
  return (
    <div className="border-t p-4">
      <textarea
        value={message}
        onChange={handleChange}
        onKeyDown={(e) => {
          if (e.key === 'Enter' && !e.shiftKey) {
            e.preventDefault()
            handleSend()
          }
        }}
        placeholder="输入消息..."
        className="w-full p-2 border rounded-lg resize-none"
        rows={3}
      />
      <button
        onClick={handleSend}
        disabled={!message.trim()}
        className="mt-2 px-4 py-2 bg-blue-500 text-white rounded-lg disabled:opacity-50"
      >
        发送
      </button>
    </div>
  )
}

遇到的挑战

问题1:消息顺序不一致

描述:在高并发场景下,客户端接收到的消息顺序与服务器发送顺序不一致

解决方案

  • 为每条消息添加序列号
  • 客户端按序列号排序消息
  • 使用Redis保证消息顺序
// 消息序列号管理
async function getNextMessageSeq(roomId: string): Promise<number> {
  const seq = await redis.incr(`room:${roomId}:seq`)
  return seq
}
 
// 发送消息时添加序列号
socket.on('message:send', async (data) => {
  const seq = await getNextMessageSeq(data.roomId)
  const message = await createMessage({ ...data, seq })
  io.to(data.roomId).emit('message:new', message)
})

问题2:连接状态不稳定

描述:移动端网络切换导致WebSocket频繁断开重连

解决方案

  • 实现自动重连机制
  • 使用心跳保持连接
  • 断线后自动重新加入房间
// 心跳机制
setInterval(() => {
  socket.emit('ping')
}, 25000)
 
socket.on('pong', () => {
  // 连接正常
})
 
// 自动重连后恢复状态
socket.on('connect', async () => {
  const rooms = await getUserRooms(userId)
  rooms.forEach((room) => {
    socket.join(room.id)
  })
})

问题3:内存泄漏

描述:长时间运行后服务器内存持续增长

解决方案

  • 正确清理事件监听器
  • 使用Redis存储临时数据
  • 定期清理过期连接
// 清理事件监听器
socket.on('disconnect', () => {
  socket.removeAllListeners('message:send')
  socket.removeAllListeners('typing:start')
  socket.removeAllListeners('typing:stop')
})
 
// 定期清理Redis中的过期数据
setInterval(async () => {
  const keys = await redis.keys('room:*:messages')
  for (const key of keys) {
    const length = await redis.llen(key)
    if (length > 1000) {
      await redis.ltrim(key, 0, 999)
    }
  }
}, 3600000) // 每小时执行一次

性能优化

  1. 消息分页加载

    • 实现虚拟滚动
    • 按需加载历史消息
    • 使用React Query缓存
  2. Redis缓存策略

    • 缓存用户在线状态
    • 缓存房间成员列表
    • 缓存最近消息
  3. 数据库优化

    • 添加复合索引
    • 使用连接池
    • 批量插入消息

部署

使用Docker Compose进行部署:

version: '3.8'
 
services:
  app:
    build: .
    ports:
      - "3000:3000"
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/chatapp
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
 
  db:
    image: postgres:15
    environment:
      - POSTGRES_DB=chatapp
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
 
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
 
volumes:
  postgres_data:
  redis_data:

后续计划

  • 语音消息支持
  • 视频通话功能
  • 消息加密
  • AI智能回复
  • 移动端原生应用

总结

这个项目让我深入理解了实时通信的技术栈和最佳实践。通过WebSocket和Redis的配合,实现了高性能、高可用的聊天系统。项目中遇到的并发、连接稳定性等问题,都通过合理的架构设计得到了解决。