WebSocket实时聊天应用
基于Next.js和WebSocket构建的高性能实时聊天系统,支持多房间、在线状态同步和消息持久化存储
2024年2月15日
源代码Next.jsWebSocket实时通信全栈
项目简介
这是一个功能完整的实时聊天应用,使用WebSocket实现低延迟的双向通信。项目支持多房间管理、用户在线状态实时同步、消息持久化存储,并使用Redis作为消息队列来确保高可用性。
核心功能
1. 实时通信
- WebSocket双向通信
- 消息实时推送
- 输入状态指示器
- 消息已读回执
2. 房间管理
- 创建和加入房间
- 房间成员列表
- 房间消息历史
- 私聊和群聊支持
3. 用户功能
- 用户认证和授权
- 在线状态实时同步
- 用户头像和昵称
- 消息通知
4. 消息功能
- 文本消息发送
- 图片上传和预览
- 消息编辑和删除
- 消息搜索
技术架构
前端
- Next.js 14 (App Router)
- TypeScript
- TailwindCSS
- Socket.IO Client
- Zustand (状态管理)
- React Query
后端
- Next.js API Routes
- Socket.IO Server
- Prisma ORM
- PostgreSQL
- Redis (消息队列)
- NextAuth.js (认证)
数据库设计
model User {
id String @id @default(cuid())
email String @unique
name String
avatar String?
status UserStatus @default(OFFLINE)
createdAt DateTime @default(now())
messages Message[]
rooms RoomMember[]
}
model Room {
id String @id @default(cuid())
name String
type RoomType @default(GROUP)
createdAt DateTime @default(now())
members RoomMember[]
messages Message[]
}
model Message {
id String @id @default(cuid())
content String
type MessageType @default(TEXT)
createdAt DateTime @default(now())
readBy String[] @default([])
sender User @relation(fields: [senderId], references: [id])
senderId String
room Room @relation(fields: [roomId], references: [id])
roomId String
@@index([roomId, createdAt])
}
model RoomMember {
id String @id @default(cuid())
role MemberRole @default(MEMBER)
joinedAt DateTime @default(now())
user User @relation(fields: [userId], references: [id])
userId String
room Room @relation(fields: [roomId], references: [id])
roomId String
@@unique([userId, roomId])
}
enum UserStatus {
ONLINE
OFFLINE
AWAY
}
enum RoomType {
DIRECT
GROUP
}
enum MessageType {
TEXT
IMAGE
SYSTEM
}
enum MemberRole {
ADMIN
MEMBER
}技术实现
1. WebSocket连接管理
使用Socket.IO实现可靠的WebSocket连接:
// lib/socket-server.ts
import { Server as HTTPServer } from 'http'
import { Server as SocketIOServer } from 'socket.io'
import { prisma } from '@/lib/prisma'
import { redis } from '@/lib/redis'
export function initializeSocketServer(httpServer: HTTPServer) {
const io = new SocketIOServer(httpServer, {
cors: {
origin: process.env.NEXT_PUBLIC_APP_URL,
methods: ['GET', 'POST'],
},
transports: ['websocket', 'polling'],
})
// 认证中间件
io.use(async (socket, next) => {
const token = socket.handshake.auth.token
if (!token) {
return next(new Error('Authentication error'))
}
try {
const user = await verifyToken(token)
socket.data.userId = user.id
next()
} catch (error) {
next(new Error('Authentication error'))
}
})
io.on('connection', async (socket) => {
const userId = socket.data.userId
// 更新用户在线状态
await prisma.user.update({
where: { id: userId },
data: { status: 'ONLINE' },
})
// 订阅用户房间
const rooms = await getUserRooms(userId)
rooms.forEach((room) => {
socket.join(room.id)
})
// 广播用户上线
io.emit('user:online', { userId })
// 监听消息发送
socket.on('message:send', async (data) => {
const message = await createMessage(data)
// 广播到房间
io.to(data.roomId).emit('message:new', message)
// 存入Redis消息队列
await redis.lpush(`room:${data.roomId}:messages`, JSON.stringify(message))
})
// 监听输入状态
socket.on('typing:start', (data) => {
socket.to(data.roomId).emit('typing:user', {
userId,
roomId: data.roomId,
})
})
socket.on('typing:stop', (data) => {
socket.to(data.roomId).emit('typing:stop', {
userId,
roomId: data.roomId,
})
})
// 断开连接处理
socket.on('disconnect', async () => {
await prisma.user.update({
where: { id: userId },
data: { status: 'OFFLINE' },
})
io.emit('user:offline', { userId })
})
})
return io
}2. 客户端Socket连接
// lib/socket-client.ts
import { io, Socket } from 'socket.io-client'
import { useEffect, useState } from 'react'
let socket: Socket | null = null
export function getSocket() {
if (!socket) {
socket = io(process.env.NEXT_PUBLIC_SOCKET_URL!, {
auth: {
token: localStorage.getItem('token'),
},
transports: ['websocket', 'polling'],
reconnection: true,
reconnectionDelay: 1000,
reconnectionAttempts: 5,
})
}
return socket
}
// React Hook for socket events
export function useSocket() {
const [isConnected, setIsConnected] = useState(false)
useEffect(() => {
const socket = getSocket()
socket.on('connect', () => {
setIsConnected(true)
})
socket.on('disconnect', () => {
setIsConnected(false)
})
return () => {
socket.off('connect')
socket.off('disconnect')
}
}, [])
return { socket: getSocket(), isConnected }
}3. 消息组件实现
// components/chat/message-list.tsx
'use client'
import { useEffect, useRef } from 'react'
import { useSocket } from '@/lib/socket-client'
import { useMessageStore } from '@/stores/message-store'
export function MessageList({ roomId }: { roomId: string }) {
const { socket } = useSocket()
const messagesEndRef = useRef<HTMLDivElement>(null)
const { messages, addMessage } = useMessageStore()
useEffect(() => {
// 监听新消息
socket.on('message:new', (message) => {
if (message.roomId === roomId) {
addMessage(message)
scrollToBottom()
}
})
return () => {
socket.off('message:new')
}
}, [socket, roomId])
const scrollToBottom = () => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' })
}
return (
<div className="flex-1 overflow-y-auto p-4 space-y-4">
{messages.map((message) => (
<div
key={message.id}
className={`flex ${
message.senderId === currentUserId ? 'justify-end' : 'justify-start'
}`}
>
<div className="max-w-[70%] rounded-lg p-3 bg-gray-100">
<p className="text-sm">{message.content}</p>
<span className="text-xs text-gray-500">
{formatTime(message.createdAt)}
</span>
</div>
</div>
))}
<div ref={messagesEndRef} />
</div>
)
}4. 消息输入组件
// components/chat/message-input.tsx
'use client'
import { useState } from 'react'
import { useSocket } from '@/lib/socket-client'
export function MessageInput({ roomId }: { roomId: string }) {
const [message, setMessage] = useState('')
const [isTyping, setIsTyping] = useState(false)
const { socket } = useSocket()
const handleChange = (e: React.ChangeEvent<HTMLTextAreaElement>) => {
setMessage(e.target.value)
if (!isTyping) {
setIsTyping(true)
socket.emit('typing:start', { roomId })
}
// 3秒后停止输入状态
clearTimeout(typingTimer)
typingTimer = setTimeout(() => {
setIsTyping(false)
socket.emit('typing:stop', { roomId })
}, 3000)
}
const handleSend = async () => {
if (!message.trim()) return
socket.emit('message:send', {
roomId,
content: message,
type: 'TEXT',
})
setMessage('')
setIsTyping(false)
socket.emit('typing:stop', { roomId })
}
return (
<div className="border-t p-4">
<textarea
value={message}
onChange={handleChange}
onKeyDown={(e) => {
if (e.key === 'Enter' && !e.shiftKey) {
e.preventDefault()
handleSend()
}
}}
placeholder="输入消息..."
className="w-full p-2 border rounded-lg resize-none"
rows={3}
/>
<button
onClick={handleSend}
disabled={!message.trim()}
className="mt-2 px-4 py-2 bg-blue-500 text-white rounded-lg disabled:opacity-50"
>
发送
</button>
</div>
)
}遇到的挑战
问题1:消息顺序不一致
描述:在高并发场景下,客户端接收到的消息顺序与服务器发送顺序不一致
解决方案:
- 为每条消息添加序列号
- 客户端按序列号排序消息
- 使用Redis保证消息顺序
// 消息序列号管理
async function getNextMessageSeq(roomId: string): Promise<number> {
const seq = await redis.incr(`room:${roomId}:seq`)
return seq
}
// 发送消息时添加序列号
socket.on('message:send', async (data) => {
const seq = await getNextMessageSeq(data.roomId)
const message = await createMessage({ ...data, seq })
io.to(data.roomId).emit('message:new', message)
})问题2:连接状态不稳定
描述:移动端网络切换导致WebSocket频繁断开重连
解决方案:
- 实现自动重连机制
- 使用心跳保持连接
- 断线后自动重新加入房间
// 心跳机制
setInterval(() => {
socket.emit('ping')
}, 25000)
socket.on('pong', () => {
// 连接正常
})
// 自动重连后恢复状态
socket.on('connect', async () => {
const rooms = await getUserRooms(userId)
rooms.forEach((room) => {
socket.join(room.id)
})
})问题3:内存泄漏
描述:长时间运行后服务器内存持续增长
解决方案:
- 正确清理事件监听器
- 使用Redis存储临时数据
- 定期清理过期连接
// 清理事件监听器
socket.on('disconnect', () => {
socket.removeAllListeners('message:send')
socket.removeAllListeners('typing:start')
socket.removeAllListeners('typing:stop')
})
// 定期清理Redis中的过期数据
setInterval(async () => {
const keys = await redis.keys('room:*:messages')
for (const key of keys) {
const length = await redis.llen(key)
if (length > 1000) {
await redis.ltrim(key, 0, 999)
}
}
}, 3600000) // 每小时执行一次性能优化
-
消息分页加载
- 实现虚拟滚动
- 按需加载历史消息
- 使用React Query缓存
-
Redis缓存策略
- 缓存用户在线状态
- 缓存房间成员列表
- 缓存最近消息
-
数据库优化
- 添加复合索引
- 使用连接池
- 批量插入消息
部署
使用Docker Compose进行部署:
version: '3.8'
services:
app:
build: .
ports:
- "3000:3000"
environment:
- DATABASE_URL=postgresql://user:pass@db:5432/chatapp
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
db:
image: postgres:15
environment:
- POSTGRES_DB=chatapp
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
volumes:
postgres_data:
redis_data:后续计划
- 语音消息支持
- 视频通话功能
- 消息加密
- AI智能回复
- 移动端原生应用
总结
这个项目让我深入理解了实时通信的技术栈和最佳实践。通过WebSocket和Redis的配合,实现了高性能、高可用的聊天系统。项目中遇到的并发、连接稳定性等问题,都通过合理的架构设计得到了解决。