Skip to content

Commit

Permalink
fix(webtransport): add proper framing
Browse files Browse the repository at this point in the history
WebTransport being a stream-based protocol, the chunking boundaries are
not necessarily preserved. That's why we need a header indicating the
type of the payload (plain text or binary) and its length.

We will use a format inspired by the WebSocket frame:

- first bit indicates whether the payload is binary
- the next 7 bits are either:
  - 125 or less: that's the length of the payload
  - 126: the next 2 bytes represent the length of the payload
  - 127: the next 8 bytes represent the length of the payload

Reference: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#decoding_payload_length

Related:

- #687
- #688
  • Loading branch information
darrachequesne committed Aug 1, 2023
1 parent 7dd1350 commit a306db0
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 116 deletions.
38 changes: 20 additions & 18 deletions lib/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ import type { CookieSerializeOptions } from "cookie";
import type { CorsOptions, CorsOptionsDelegate } from "cors";
import type { Duplex } from "stream";
import { WebTransport } from "./transports/webtransport";
import { TextDecoder } from "util";
import { createPacketDecoderStream } from "engine.io-parser";

const debug = debugModule("engine");

const kResponseHeaders = Symbol("responseHeaders");
const TEXT_DECODER = new TextDecoder();

type Transport = "polling" | "websocket";

Expand Down Expand Up @@ -149,15 +148,13 @@ type Middleware = (
next: (err?: any) => void
) => void;

function parseSessionId(handshake: string) {
if (handshake.startsWith("0{")) {
try {
const parsed = JSON.parse(handshake.substring(1));
if (typeof parsed.sid === "string") {
return parsed.sid;
}
} catch (e) {}
}
function parseSessionId(data: string) {
try {
const parsed = JSON.parse(data);
if (typeof parsed.sid === "string") {
return parsed.sid;
}
} catch (e) {}
}

export abstract class BaseServer extends EventEmitter {
Expand Down Expand Up @@ -536,7 +533,11 @@ export abstract class BaseServer extends EventEmitter {
}

const stream = result.value;
const reader = stream.readable.getReader();
const transformStream = createPacketDecoderStream(
this.opts.maxHttpBufferSize,
"nodebuffer"
);
const reader = stream.readable.pipeThrough(transformStream).getReader();

// reading the first packet of the stream
const { value, done } = await reader.read();
Expand All @@ -546,12 +547,13 @@ export abstract class BaseServer extends EventEmitter {
}

clearTimeout(timeout);
const handshake = TEXT_DECODER.decode(value);

// handshake is either
// "0" => new session
// '0{"sid":"xxxx"}' => upgrade
if (handshake === "0") {
if (value.type !== "open") {
debug("invalid WebTransport handshake");
return session.close();
}

if (value.data === undefined) {
const transport = new WebTransport(session, stream, reader);

// note: we cannot use "this.generateId()", because there is no "req" argument
Expand All @@ -572,7 +574,7 @@ export abstract class BaseServer extends EventEmitter {
return;
}

const sid = parseSessionId(handshake);
const sid = parseSessionId(value.data);

if (!sid) {
debug("invalid WebTransport handshake");
Expand Down
74 changes: 28 additions & 46 deletions lib/transports/webtransport.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,9 @@
import { Transport } from "../transport";
import debugModule from "debug";
import { createPacketEncoderStream } from "engine.io-parser";

const debug = debugModule("engine:webtransport");

const BINARY_HEADER = Buffer.of(54);

function shouldIncludeBinaryHeader(packet, encoded) {
// 48 === "0".charCodeAt(0) (OPEN packet type)
// 54 === "6".charCodeAt(0) (NOOP packet type)
return (
packet.type === "message" &&
typeof packet.data !== "string" &&
encoded[0] >= 48 &&
encoded[0] <= 54
);
}

/**
* Reference: https://developer.mozilla.org/en-US/docs/Web/API/WebTransport_API
*/
Expand All @@ -24,24 +12,24 @@ export class WebTransport extends Transport {

constructor(private readonly session, stream, reader) {
super({ _query: { EIO: "4" } });
this.writer = stream.writable.getWriter();

const transformStream = createPacketEncoderStream();
transformStream.readable.pipeTo(stream.writable);
this.writer = transformStream.writable.getWriter();

(async () => {
let binaryFlag = false;
while (true) {
const { value, done } = await reader.read();
if (done) {
debug("session is closed");
break;
}
debug("received chunk: %o", value);
if (!binaryFlag && value.byteLength === 1 && value[0] === 54) {
binaryFlag = true;
continue;
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
debug("session is closed");
break;
}
debug("received chunk: %o", value);
this.onPacket(value);
}
this.onPacket(
this.parser.decodePacketFromBinary(value, binaryFlag, "nodebuffer")
);
binaryFlag = false;
} catch (e) {
debug("error while reading: %s", e.message);
}
})();

Expand All @@ -58,26 +46,20 @@ export class WebTransport extends Transport {
return true;
}

send(packets) {
async send(packets) {
this.writable = false;

for (let i = 0; i < packets.length; i++) {
const packet = packets[i];
const isLast = i + 1 === packets.length;

this.parser.encodePacketToBinary(packet, (data) => {
if (shouldIncludeBinaryHeader(packet, data)) {
debug("writing binary header");
this.writer.write(BINARY_HEADER);
}
debug("writing chunk: %o", data);
this.writer.write(data);
if (isLast) {
this.writable = true;
this.emit("drain");
}
});
try {
for (let i = 0; i < packets.length; i++) {
const packet = packets[i];
await this.writer.write(packet);
}
} catch (e) {
debug("error while writing: %s", e.message);
}

this.writable = true;
this.emit("drain");
}

doClose(fn) {
Expand Down
33 changes: 25 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"cookie": "~0.4.1",
"cors": "~2.8.5",
"debug": "~4.3.1",
"engine.io-parser": "~5.1.0",
"engine.io-parser": "~5.2.1",
"ws": "~8.11.0"
},
"devDependencies": {
Expand Down
Loading

0 comments on commit a306db0

Please sign in to comment.