Skip to main content
Bun.serve() 支持服务器端 WebSockets,具备实时压缩、TLS 支持以及 Bun 原生的发布-订阅 API。
⚡️ 吞吐量提升 7 倍Bun 的 WebSockets 速度非常快。以 Linux x64 上的简单聊天室为例,Bun 每秒能处理的请求数是 Node.js + "ws" 的 7 倍。
每秒发送消息数运行时客户端数
~700,000(Bun.serve) Bun v0.2.1 (x64)16
~100,000(ws) Node v18.10.0 (x64)16
Bun 内部的 WebSocket 实现基于 uWebSockets

启动 WebSocket 服务器

下面是一个使用 Bun.serve 创建的简单 WebSocket 服务器示例,所有传入请求都会在 fetch 处理函数中被升级为 WebSocket 连接。Socket 处理函数在 websocket 参数中声明。
https://mintcdn.com/ikxin/RzFFGbzo0-4huILA/icons/typescript.svg?fit=max&auto=format&n=RzFFGbzo0-4huILA&q=85&s=a3dffd2241f05776d3bd25171d0c5a79server.ts
Bun.serve({
  fetch(req, server) {
    // 将请求升级为 WebSocket
    if (server.upgrade(req)) {
      return; // 不返回 Response
    }
    return new Response("升级失败", { status: 500 });
  },
  websocket: {}, // 处理程序
});
支持以下 WebSocket 事件处理函数:
https://mintcdn.com/ikxin/RzFFGbzo0-4huILA/icons/typescript.svg?fit=max&auto=format&n=RzFFGbzo0-4huILA&q=85&s=a3dffd2241f05776d3bd25171d0c5a79server.ts
Bun.serve({
  fetch(req, server) {}, // 升级逻辑
  websocket: {
    message(ws, message) {}, // 接收到消息
    open(ws) {}, // 连接打开
    close(ws, code, message) {}, // 连接关闭
    drain(ws) {}, // 套接字准备好接收更多数据
  },
});
在 Bun 中,处理函数只需为服务器声明一次,而不是为每个套接字重复声明。ServerWebSocket 要求在 Bun.serve() 中传入一个 WebSocketHandler 对象,该对象包含 openmessageclosedrainerror 方法。这与客户端的 WebSocket 类不同,后者是继承自 EventTarget 的,使用 onmessageonopenonclose 等事件回调。客户端通常不会打开大量连接,因此事件驱动的 API 设计合理。但服务器端往往会打开 大量 连接,这意味着:
  • 为每个连接添加/移除事件监听器的时间开销累计很大;
  • 为每个连接存储回调函数的引用,消耗额外内存;
  • 开发者通常为每个连接创建新的函数,也会占用更多内存。
因此,ServerWebSocket 采用非事件驱动的 API,要求在 Bun.serve() 中传入包含事件方法的单个对象,并且复用该对象来处理所有连接。这样可以减少内存使用,同时减少添加和移除事件监听器的时间开销。
每个处理函数的第一个参数是处理该事件的 ServerWebSocket 实例。ServerWebSocket 类是基于 Bun 的快速、本地原生实现的 WebSocket,并附加了一些额外功能。
https://mintcdn.com/ikxin/RzFFGbzo0-4huILA/icons/typescript.svg?fit=max&auto=format&n=RzFFGbzo0-4huILA&q=85&s=a3dffd2241f05776d3bd25171d0c5a79server.ts
Bun.serve({
  fetch(req, server) {}, // 升级逻辑
  websocket: {
    message(ws, message) {
      ws.send(message); // 将消息原样返回
    },
  },
});

发送消息

每个 ServerWebSocket 实例都有 .send() 方法用于向客户端发送消息,支持多种输入类型。
https://mintcdn.com/ikxin/RzFFGbzo0-4huILA/icons/typescript.svg?fit=max&auto=format&n=RzFFGbzo0-4huILA&q=85&s=a3dffd2241f05776d3bd25171d0c5a79server.ts
Bun.serve({
  fetch(req, server) {}, // 升级逻辑
  websocket: {
    message(ws, message) {
      ws.send("Hello world"); // 发送字符串
      ws.send(response.arrayBuffer()); // 发送 ArrayBuffer
      ws.send(new Uint8Array([1, 2, 3])); // 发送 TypedArray 或 DataView
    },
  },
});

头信息

升级成功后,Bun 会按照规范发送 101 Switching Protocols 响应。可在调用 server.upgrade() 时附加额外 headers 到响应中。
https://mintcdn.com/ikxin/RzFFGbzo0-4huILA/icons/typescript.svg?fit=max&auto=format&n=RzFFGbzo0-4huILA&q=85&s=a3dffd2241f05776d3bd25171d0c5a79server.ts
Bun.serve({
  fetch(req, server) {
    const sessionId = await generateSessionId();
    server.upgrade(req, {
      headers: { 
        "Set-Cookie": `SessionId=${sessionId}`, 
      }, 
    });
  },
  websocket: {}, // 处理程序
});

上下文数据

可以在 .upgrade() 调用中附加上下文 data 到新的 WebSocket。这些数据可在 WebSocket 处理函数中通过 ws.data 访问。 要为 ws.data 进行强类型定义,可在 websocket 处理对象里添加 data 属性,从而为所有生命周期钩子中的 ws.data 指定类型。
https://mintcdn.com/ikxin/RzFFGbzo0-4huILA/icons/typescript.svg?fit=max&auto=format&n=RzFFGbzo0-4huILA&q=85&s=a3dffd2241f05776d3bd25171d0c5a79server.ts
type WebSocketData = {
  createdAt: number;
  channelId: string;
  authToken: string;
};

Bun.serve({
  fetch(req, server) {
    const cookies = new Bun.CookieMap(req.headers.get("cookie")!);

    server.upgrade(req, {
      // 此对象必须符合 WebSocketData 类型
      data: {
        createdAt: Date.now(),
        channelId: new URL(req.url).searchParams.get("channelId"),
        authToken: cookies.get("X-Token"),
      },
    });

    return undefined;
  },
  websocket: {
    // TypeScript: 如此指定 ws.data 类型
    data: {} as WebSocketData,
    // 收到消息时调用的处理函数
    async message(ws, message) {
      // ws.data 现已正确类型为 WebSocketData
      const user = getUserFromToken(ws.data.authToken);

      await saveMessageToDatabase({
        channel: ws.data.channelId,
        message: String(message),
        userId: user.id,
      });
    },
  },
});
注意: 在之前版本中,你可以通过类型参数 Bun.serve<MyData>({...}) 来指定 ws.data 的类型。但因 TypeScript 的限制,该用法被移除,推荐使用上面展示的 data 属性方式来定义类型。
要从浏览器连接到此服务器,可以创建一个新的 WebSocket
browser.js
const socket = new WebSocket("ws://localhost:3000/chat");

socket.addEventListener("message", event => {
  console.log(event.data);
});
用户身份识别当前页面设置的 cookies 会随 WebSocket 升级请求发送,并可在 fetch 处理函数中的 req.headers 访问。解析这些 cookies 以确定连接用户的身份,并相应设置 data 的值。

发布/订阅

Bun 的 ServerWebSocket 实现了原生的主题广播发布-订阅 API。单个套接字可以 .subscribe() 订阅指定主题(以字符串标识),并使用 .publish() 向该主题的其他订阅者广播消息(不包括自身)。此基于主题的广播 API 类似于 MQTTRedis Pub/Sub
https://mintcdn.com/ikxin/RzFFGbzo0-4huILA/icons/typescript.svg?fit=max&auto=format&n=RzFFGbzo0-4huILA&q=85&s=a3dffd2241f05776d3bd25171d0c5a79server.ts
const server = Bun.serve({
  fetch(req, server) {
    const url = new URL(req.url);
    if (url.pathname === "/chat") {
      console.log(`开始升级!`);
      const username = getUsernameFromReq(req);
      const success = server.upgrade(req, { data: { username } });
      return success ? undefined : new Response("WebSocket 升级错误", { status: 400 });
    }

    return new Response("Hello world");
  },
  websocket: {
    // TypeScript: 如此指定 ws.data 类型
    data: {} as { username: string },
    open(ws) {
      const msg = `${ws.data.username} 加入了聊天室`;
      ws.subscribe("the-group-chat");
      server.publish("the-group-chat", msg);
    },
    message(ws, message) {
      // 群聊功能
      // 服务器将收到的消息转发给所有人
      server.publish("the-group-chat", `${ws.data.username}: ${message}`);

      // 查看当前订阅
      console.log(ws.subscriptions); // ["the-group-chat"]
    },
    close(ws) {
      const msg = `${ws.data.username} 离开了聊天室`;
      ws.unsubscribe("the-group-chat");
      server.publish("the-group-chat", msg);
    },
  },
});

console.log(`监听在 ${server.hostname}:${server.port}`);
调用 .publish(data) 会将消息发送给除调用该方法的套接字以外的所有主题订阅者。要向所有订阅者发送消息(包括自身),请调用 Server 实例上的 .publish() 方法。
const server = Bun.serve({
  websocket: {
    // ...
  },
});

// 监听某个外部事件
server.publish("the-group-chat", "Hello world");

压缩

可以通过 perMessageDeflate 参数启用每条消息压缩。
https://mintcdn.com/ikxin/RzFFGbzo0-4huILA/icons/typescript.svg?fit=max&auto=format&n=RzFFGbzo0-4huILA&q=85&s=a3dffd2241f05776d3bd25171d0c5a79server.ts
Bun.serve({
  websocket: {
    perMessageDeflate: true, 
  },
});
通过在 .send() 方法的第二个参数传递布尔值,可以为单条消息开启压缩。
ws.send("Hello world", true);
要对压缩特性进行精细控制,请参考参考文档

背压管理

ServerWebSocket.send(message) 方法返回一个 number 表示操作结果。
  • -1 — 消息已入队,但存在背压(backpressure)
  • 0 — 由于连接问题,消息已丢弃
  • 1+ — 发送的字节数
这让你能够更好地控制服务器的背压状态。

超时与限制

默认情况下,如果 WebSocket 连接空闲超过 120 秒,Bun 会主动关闭连接。可通过 idleTimeout 参数自行配置。
Bun.serve({
  fetch(req, server) {}, // 升级逻辑
  websocket: {
    idleTimeout: 60, // 60 秒
  },
});
若接收到的消息体积超过 16 MB,Bun 也会关闭连接。此限制可通过 maxPayloadLength 参数配置。
Bun.serve({
  fetch(req, server) {}, // 升级逻辑
  websocket: {
    maxPayloadLength: 1024 * 1024, // 1 MB
  },
});

连接到 WebSocket 服务器

Bun 实现了 WebSocket 类。创建连接至 ws://wss:// 服务器的 WebSocket 客户端实例类似于浏览器中的用法。
const socket = new WebSocket("ws://localhost:3000");

// 支持子协议协商
const socket2 = new WebSocket("ws://localhost:3000", ["soap", "wamp"]);
在浏览器中,当前页面设置的 cookies 会随 WebSocket 升级请求一起发送,这是 WebSocket API 的标准行为。 为方便起见,Bun 允许在构造函数中直接设置自定义头部。这是 Bun 对 WebSocket 标准的扩展。该功能在浏览器中无效
const socket = new WebSocket("ws://localhost:3000", {
  headers: {
    /* 自定义头部 */
  }, 
});
给套接字添加事件监听器的方法:
// 收到消息
socket.addEventListener("message", event => {});

// 连接打开
socket.addEventListener("open", event => {});

// 连接关闭
socket.addEventListener("close", event => {});

// 错误处理
socket.addEventListener("error", event => {});

参考

See Typescript Definitions
namespace Bun {
  export function serve(params: {
    fetch: (req: Request, server: Server) => Response | Promise<Response>;
    websocket?: {
      message: (ws: ServerWebSocket, message: string | ArrayBuffer | Uint8Array) => void;
      open?: (ws: ServerWebSocket) => void;
      close?: (ws: ServerWebSocket, code: number, reason: string) => void;
      error?: (ws: ServerWebSocket, error: Error) => void;
      drain?: (ws: ServerWebSocket) => void;

      maxPayloadLength?: number; // 默认值: 16 * 1024 * 1024 = 16 MB
      idleTimeout?: number; // 默认值: 120 秒
      backpressureLimit?: number; // 默认值: 1024 * 1024 = 1 MB
      closeOnBackpressureLimit?: boolean; // 默认值: false
      sendPings?: boolean; // 默认值: true
      publishToSelf?: boolean; // 默认值: false

      perMessageDeflate?:
        | boolean
        | {
            compress?: boolean | Compressor;
            decompress?: boolean | Compressor;
          };
    };
  }): Server;
}

type Compressor =
  | `"disable"`
  | `"shared"`
  | `"dedicated"`
  | `"3KB"`
  | `"4KB"`
  | `"8KB"`
  | `"16KB"`
  | `"32KB"`
  | `"64KB"`
  | `"128KB"`
  | `"256KB"`;

interface Server {
  pendingWebSockets: number;
  publish(topic: string, data: string | ArrayBufferView | ArrayBuffer, compress?: boolean): number;
  upgrade(
    req: Request,
    options?: {
      headers?: HeadersInit;
      data?: any;
    },
  ): boolean;
}

interface ServerWebSocket {
  readonly data: any;
  readonly readyState: number;
  readonly remoteAddress: string;
  readonly subscriptions: string[];
  send(message: string | ArrayBuffer | Uint8Array, compress?: boolean): number;
  close(code?: number, reason?: string): void;
  subscribe(topic: string): void;
  unsubscribe(topic: string): void;
  publish(topic: string, message: string | ArrayBuffer | Uint8Array): void;
  isSubscribed(topic: string): boolean;
  cork(cb: (ws: ServerWebSocket) => void): void;
}