是什么
这是一个视频传输协议,实现浏览器 / 应用间的低延迟点对点(P2P)实时音视频通信,无需插件,适用于视频会议、在线聊天等互动场景。
怎么玩
a b两个客户端,可以是本机浏览器上的两个页面、也可以是不同网络下的两台机器;信令服务器 需要自己开发,两个客户端都能访问到。
通常,a,b通过websocket与信令服务器建立链接,交换彼此支持的视频信息SDP(如:编解码器、分辨率等),同时通过ICE协议,在复杂网络环境(如家用路由器后的设备)中找到双方可达的网络路径(生成 ICE 候选者)。
这里有两点需要解释一下,信令服务器更多是转发客户端信息,可能是一个客户端要发给某个客户端的信息,也可能是一个广播信息;ICE可以理解为是两个客户端互相通信的ip:port地址。
拿到ICE候选者之后,两个客户端就可以建立P2P连接了。webRTC的offer、answer消息交换完成之后,开始传输视频流。如果有多个客户端,那么每2个客户端都要建立P2P连接,房间信息也是保存在信令服务器中,同时客户端的加入/退出消息,也是由信令服务器转发给所有客户端。而客户端需要做的就是根据信令服务器传递的控制信息,增加/移除P2P连接,渲染视频流数据。
一些概念
信令(Signaling)的作用,在 P2P 连接建立前,双方需要交换两类关键信息:
- SDP:描述本地媒体能力(如支持的编解码器、分辨率等)。
- ICE 候选者:描述本地网络地址(如公网 IP、内网 IP 等),用于找到彼此的网络路径。
webRTC-js核心API,RTCPeerConnection,专门负责点对点(P2P)实时媒体流(音视频、数据)的传输管理,它是 WebRTC 实现低延迟通信的关键。其主要功能包括:
- NAT 穿透:通过 ICE(Interactive Connectivity Establishment)协议,在复杂网络环境(如家用路由器后的设备)中找到双方可达的网络路径(生成 ICE 候选者)。
- 媒体协商:通过交换 SDP(Session Description Protocol)描述,协商音视频的编解码器、传输格式等参数。
- 数据传输:最终的音视频数据通过 UDP 协议 传输(因为 UDP 延迟低,适合实时场景),同时通过 RTCP(RTP Control Protocol)监控传输质量(如丢包率、抖动),动态调整传输策略。
ICE的工作流程
- 为什么需要ICE
互联网中,大多数设备并非直接暴露在公网中,而是通过NAT(网络地址转换) 设备(如家用路由器)接入公网。NAT 会隐藏设备的私有 IP 地址,对外只展示路由器的公网 IP,这导致直接的 P2P 连接难以建立(设备不知道彼此的实际网络地址)。
ICE 的作用就是:在 NAT 环境中,通过一系列机制找到两个设备都能访问的网络路径,最终建立 P2P 连接。 - ICE 的核心组成
- STUN 服务器(Session Traversal Utilities for NAT)作用,帮助设备发现自己在公网中的 “映射地址”(即 NAT 分配的公网 IP 和端口)。工作流程:设备向 STUN 服务器发送请求,STUN 服务器返回该设备经过 NAT 转换后的公网地址(称为 “服务器自反地址”),设备可将此地址作为自己的 “候选连接地址”。
- TURN 服务器(Traversal Using Relays around NAT)作用,当 P2P 连接失败时(如严格型 NAT 阻止直接通信),作为 “中继服务器” 转发数据。工作流程:如果两个设备无法直接连接,TURN 服务器会成为中间转发节点,所有数据通过 TURN 服务器中转(此时不再是纯 P2P,而是客户端 - 服务器 - 客户端模式)。
- ICE 候选者(ICE Candidates)设备可能的网络地址,包括:本地地址(私有 IP,如 192.168.x.x);服务器自反地址(通过 STUN 获得的公网映射地址);中继地址(通过 TURN 服务器获得的转发地址)。
- ICE 的工作流程
收集候选者:设备通过 STUN 服务器获取自反地址,通过 TURN 服务器获取中继地址,再加上本地地址,形成一组 “ICE 候选者”。
交换候选者:通过信令服务器(如之前 Demo 中的 WebSocket 服务器),两个设备互相交换各自的候选者列表。
连通性检查:设备对彼此的候选者进行两两配对(如 A 的候选者 1 与 B 的候选者 1、A 的候选者 1 与 B 的候选者 2 等),通过发送 “绑定请求” 测试是否能建立连接。
选择最优路径:ICE 会优先选择延迟最低的直接连接(如本地地址或自反地址),只有当所有直接连接失败时,才会使用 TURN 中继。
其他架构方案
在 WebRTC 多人通信场景中(如 3 人以上视频会议),纯 P2P 架构会面临带宽爆炸问题:每个设备需要向其他所有设备发送自己的视频流(例如 5 人会议,每个设备需上传 4 路流、下载 4 路流),带宽消耗随人数呈指数增长。解决多人通信的 “带宽爆炸” 问题,其中 MCU 适合高人数、低客户端性能场景,SFU 适合中低人数、低延迟场景。
- MCU(Multipoint Control Unit,多点控制单元)
所有客户端将自己的音视频流发送到 MCU;MCU 对多路视频流进行 “混合”(如拼接成网格布局),对多路音频流进行 “混音”(只保留当前说话人的声音);MCU 将混合后的单路音视频流发送给每个客户端。 - SFU(Selective Forwarding Unit,选择性转发单元)
所有客户端将自己的音视频流发送到 SFU;SFU 不混合流,而是根据每个客户端的需求(如 “想看谁的视频”),将对应的原始流转发给该客户端;客户端接收多路流后,在本地进行布局渲染(如网格排列)。
优化
- 网络适应性优化(最关键)
动态调整码率(Adaptive Bitrate),优化分辨率和帧率,使用更高效的编码格式 - 网络连接优化
优化 ICE 穿透,减少网络抖动和丢包 - 架构优化(多人场景)
使用 SFU 替代纯 P2P,区域部署服务器 - 设备和性能优化
启用硬件编解码,减少不必要的视频处理 - 其他细节优化
优先保障音频流畅,快速重连机制,监控和告警
关键示例代码
信令服务器,维护房间状态,转发消息
{// 通知房间内已有用户新用户加入for id, existingClient := range room.clients {err := existingClient.conn.WriteJSON(map[string]interface{}{"type": "user-joined","userID": userID,"username": username,"from": id, // 添加发送者ID})if err != nil {log.Printf("通知用户 %s 新用户加入失败: %v", id, err)}}room.clients[userID] = client// 向新用户发送房间内已有用户列表var existingUsers []map[string]stringfor id, c := range room.clients {if id != userID { // 不包含自己existingUsers = append(existingUsers, map[string]string{"userID": id,"username": c.username,})}}// 确保即使没有用户也发送空数组而不是nilerr = client.conn.WriteJSON(map[string]interface{}{"type": "existing-users","users": existingUsers,"yourUserID": userID,"yourUsername": username,})if err != nil {log.Printf("向新用户 %s 发送用户列表失败: %v", userID, err)}room.mu.Unlock()log.Printf("用户 %s(%s) 加入房间 %s,当前房间人数: %d", username, userID, roomID, len(room.clients))// 处理消息for {var msg map[string]interface{}err := conn.ReadJSON(&msg)if err != nil {log.Printf("用户 %s 读取消息错误: %v", userID, err)break}// 添加消息发送者信息msg["from"] = userID// 将消息转发给目标用户或所有用户room.mu.Lock()if target, ok := msg["target"].(string); ok && target != "" {// 发送给指定用户if c, exists := room.clients[target]; exists {if err := c.conn.WriteJSON(msg); err != nil {log.Printf("向用户 %s 发送消息错误: %v", target, err)}}} else {// 广播给所有其他用户for id, c := range room.clients {if id != userID { // 不发送给自己if err := c.conn.WriteJSON(msg); err != nil {log.Printf("向用户 %s 发送消息错误: %v", id, err)}}}}room.mu.Unlock()}// 客户端断开连接后从房间移除room.mu.Lock()delete(room.clients, userID)// 通知其他用户该用户离开for id, c := range room.clients {c.conn.WriteJSON(map[string]interface{}{"type": "user-left","userID": userID,"from": id,})}room.mu.Unlock()log.Printf("用户 %s 离开房间 %s,当前房间人数: %d", userID, roomID, len(room.clients))// 如果房间为空,删除房间room.mu.Lock()if len(room.clients) == 0 {roomsMu.Lock()delete(rooms, roomID)roomsMu.Unlock()log.Printf("房间 %s 已空,已删除", roomID)}room.mu.Unlock()
}
客户端监听信息做不同处理
ws.onmessage = handleSignalingMessage;// 处理信令消息
async function handleSignalingMessage(event) {try {const message = JSON.parse(event.data);console.log('收到信令消息:', message);// 验证消息类型if (!message.type) {console.error('收到无效消息,缺少type字段:', message);return;}switch (message.type) {case 'existing-users':// 处理已存在的用户列表,添加空数组检查myUserID = message.yourUserID;myUsername = message.yourUsername;currentRoomElement.textContent = currentRoom;roomInfo.classList.remove('hidden');statusElement.innerHTML = `<i class="fa fa-check-circle text-green-500"></i> 已加入房间: ${currentRoom}`;// 为每个已存在的用户创建连接,确保users是数组const existingUsers = message.users || [];if (Array.isArray(existingUsers)) {existingUsers.forEach(user => {if (user && user.userID && user.username) {addUser(user.userID, user.username);createPeerConnection(user.userID, true); // 作为发起方创建连接}});} else {console.warn('existing-users消息中的users不是数组:', message.users);}updateUserList();break;case 'user-joined':// 新用户加入,验证必要字段if (!message.userID || !message.username) {console.error('收到无效的user-joined消息:', message);return;}addUser(message.userID, message.username);createPeerConnection(message.userID, false); // 作为接收方创建连接updateUserList();showNotification(`${message.username} 加入了房间`);break;case 'user-left':// 用户离开,验证必要字段if (!message.userID) {console.error('收到无效的user-left消息:', message);return;}const username = users.get(message.userID) || '未知用户';removeUser(message.userID);updateUserList();showNotification(`${username} 离开了房间`);break;case 'offer':// 收到offer,验证必要字段if (!message.from || !message.sdp) {console.error('收到无效的offer消息:', message);return;}await handleOffer(message);break;case 'answer':// 收到answer,验证必要字段if (!message.from || !message.sdp) {console.error('收到无效的answer消息:', message);return;}await handleAnswer(message);break;case 'candidate':// 收到ICE候选者,验证必要字段if (!message.from || !message.candidate) {console.error('收到无效的candidate消息:', message);return;}await handleCandidate(message);break;case 'media-type-changed':// 处理媒体类型变更(摄像头/屏幕共享切换)if (!message.from || !message.mediaType) {console.error('收到无效的media-type-changed消息:', message);return;}const userId = message.from;const mediaType = message.mediaType;remoteMediaTypes.set(userId, mediaType);// 更新UI显示const videoContainer = document.getElementById(`video-${userId}`);if (videoContainer) {const statusElement = videoContainer.querySelector('.video-status');if (mediaType === 'screen') {if (!statusElement) {const newStatus = document.createElement('div');newStatus.className = 'video-status';newStatus.textContent = '屏幕共享';videoContainer.appendChild(newStatus);} else {statusElement.textContent = '屏幕共享';}} else {if (statusElement) {statusElement.remove();}}}break;default:console.log('收到未知类型的消息:', message.type);}} catch (error) {console.error('处理信令消息错误:', error);showNotification(`发生错误: ${error.message}`, 'error');}
}
客户端建立P2P连接
// 创建对等连接
async function createPeerConnection(peerID, initiator = true) {// 如果已存在连接,则不重复创建if (peerConnections.has(peerID)) {return peerConnections.get(peerID);}console.log(`创建与 ${peerID} 的连接,initiator: ${initiator}`);// 创建新的RTCPeerConnectionconst pc = new RTCPeerConnection(configuration);peerConnections.set(peerID, pc);// 添加本地媒体流到连接if (localStream) {localStream.getTracks().forEach(track => {try {pc.addTrack(track, localStream);console.log(`已添加轨道到与 ${peerID} 的连接`);} catch (error) {console.error(`添加轨道到与 ${peerID} 的连接失败:`, error);}});}// 监听远程流pc.ontrack = (event) => {console.log('收到来自', peerID, '的远程流,轨道数量:', event.streams[0].getTracks().length);if (!remoteStreams.has(peerID)) {remoteStreams.set(peerID, event.streams[0]);const username = users.get(peerID) || '未知用户';const mediaType = remoteMediaTypes.get(peerID) || 'camera';createVideoElement(peerID, username, false, event.streams[0], mediaType);}};// 监听连接状态变化pc.onconnectionstatechange = () => {console.log(`与 ${peerID} 的连接状态:`, pc.connectionState);const username = users.get(peerID) || '未知用户';switch (pc.connectionState) {case 'connected':showNotification(`已与 ${username} 建立连接`);break;case 'disconnected':showNotification(`与 ${username} 的连接已断开`);break;case 'failed':showNotification(`与 ${username} 的连接失败,正在尝试重连...`);// 连接失败时尝试重新连接setTimeout(() => {if (peerConnections.has(peerID)) {peerConnections.delete(peerID);createPeerConnection(peerID, initiator);}}, 3000);break;case 'closed':console.log(`与 ${peerID} 的连接已关闭`);break;}};// 发送ICE候选者pc.onicecandidate = (event) => {if (event.candidate && ws && ws.readyState === WebSocket.OPEN) {try {ws.send(JSON.stringify({type: 'candidate',target: peerID,candidate: event.candidate}));console.log('已向', peerID, '发送ICE候选者');} catch (error) {console.error('发送ICE候选者失败:', error);}}};// ICE连接状态变化pc.oniceconnectionstatechange = () => {console.log(`与 ${peerID} 的ICE连接状态:`, pc.iceConnectionState);};// 如果是发起者,创建并发送offerif (initiator) {try {const offer = await pc.createOffer();await pc.setLocalDescription(offer);ws.send(JSON.stringify({type: 'offer',target: peerID,sdp: offer}));console.log('已向', peerID, '发送offer');} catch (error) {console.error('创建offer失败:', error);showNotification(`创建连接失败: ${error.message}`, 'error');}}return pc;
}