Skip to content

Commit

Permalink
fix(core-p2p): block incomplete sockets (#3596)
Browse files Browse the repository at this point in the history
  • Loading branch information
air1one authored Mar 12, 2020
1 parent 9b6f101 commit f1c35f3
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 41 deletions.
77 changes: 59 additions & 18 deletions __tests__/integration/core-p2p/socket-server/peer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,23 @@ describe("Peer socket endpoint", () => {
await delay(2000); // give time to workers to respawn
});

it("should disconnect the client if it sends multiple handshakes", async () => {
connect(); // this automatically sends the first handshake
await delay(1000);

expect(socket.state).toBe("open");

// this is the second handshake
send('{"event": "#handshake", "data": {}, "cid": 1}');
await delay(500);

expect(socket.state).toBe("closed");

// kill workers to reset ipLastError (or we won't pass handshake for 1 minute)
server.killWorkers({ immediate: true });
await delay(2000); // give time to workers to respawn
});

it("should accept the request when below rate limit", async () => {
connect();
await delay(1000);
Expand Down Expand Up @@ -496,8 +513,6 @@ describe("Peer socket endpoint", () => {
connect();
await delay(1000);

expect(socket.state).toBe("open");

send('{"event":"#disconnect","data":{"code":4000}}');
await expect(
emit("p2p.peer.getStatus", {
Expand Down Expand Up @@ -540,30 +555,56 @@ describe("Peer socket endpoint", () => {
await delay(2000); // give time to workers to respawn
});

it("should close the connection when the HTTP url is not valid", async () => {
it("should close the connection when the HTTP url is not valid", async done => {
const socket = new net.Socket();
socket.connect(4007, "127.0.0.1", function() {
socket.connect(4007, "127.0.0.1", async () => {
socket.write("GET /invalid/ HTTP/1.0\r\n\r\n");
});
await delay(500);
expect(socket.destroyed).toBe(true);
await delay(500);
expect(socket.destroyed).toBe(true);

socket.connect(4007, "127.0.0.1");
await delay(500);
expect(socket.destroyed).toBe(true);
socket.connect(4007, "127.0.0.1");
await delay(500);
expect(socket.destroyed).toBe(true);

// kill workers to reset ipLastError (or we won't pass handshake for 1 minute)
server.killWorkers({ immediate: true });
await delay(2000); // give time to workers to respawn
// kill workers to reset ipLastError (or we won't pass handshake for 1 minute)
server.killWorkers({ immediate: true });
await delay(2000); // give time to workers to respawn
done();
});
});

it("should close the connection if the initial HTTP request is not processed within 2 seconds", async () => {
it("should close the connection if the initial HTTP request is not processed within 2 seconds", async done => {
const socket = new net.Socket();
socket.connect(4007, "127.0.0.1");
await delay(500);
expect(socket.destroyed).toBe(false);
await delay(2000);
expect(socket.destroyed).toBe(true);
socket.connect(4007, "127.0.0.1", async () => {
await delay(500);
expect(socket.destroyed).toBe(false);
await delay(2000);
expect(socket.destroyed).toBe(true);
server.killWorkers({ immediate: true });
await delay(2000); // give time to workers to respawn
done();
});
});

it("should close the connection if is is not fully established from start to finish within 4 seconds", async done => {
const socket = new net.Socket();
await delay(2000);
socket.connect(4007, "127.0.0.1", async () => {
expect(socket.destroyed).toBe(false);
// @ts-ignore
socket.write(`GET /${server.options.path}/ HTTP/1.0\r\n`);
socket.write("Host: 127.0.0.1");
await delay(1500);
expect(socket.destroyed).toBe(false);
socket.write("Host: 127.0.0.1");
await delay(1500);
expect(socket.destroyed).toBe(false);
socket.write("Host: 127.0.0.1");
await delay(1500);
expect(socket.destroyed).toBe(true);
done();
});
});
});
});
51 changes: 28 additions & 23 deletions packages/core-p2p/src/socket-server/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { requestSchemas } from "../schemas";
import { codec } from "../utils/sc-codec";
import { validateTransactionLight } from "./utils/validate";

const SOCKET_TIMEOUT = 2000;
const MINUTE_IN_MILLISECONDS = 1000 * 60;
const HOUR_IN_MILLISECONDS = MINUTE_IN_MILLISECONDS * 60;

Expand Down Expand Up @@ -41,7 +42,7 @@ export class Worker extends SCWorker {
await this.loadHandlers();

// @ts-ignore
this.scServer.wsServer._server.timeout = 2000;
this.scServer.wsServer._server.timeout = SOCKET_TIMEOUT;

// @ts-ignore
this.scServer.wsServer.on("connection", (ws, req) => {
Expand All @@ -59,8 +60,7 @@ export class Worker extends SCWorker {
this.httpServer.on("request", req => {
// @ts-ignore
if (req.method !== "GET" || req.url !== this.scServer.wsServer.options.path) {
this.setErrorForIpAndTerminate(req);
req.destroy();
this.setErrorForIpAndDestroy(req.socket);
}
});
// @ts-ignore
Expand Down Expand Up @@ -99,43 +99,44 @@ export class Worker extends SCWorker {
ws.removeAllListeners("ping");
ws.removeAllListeners("pong");
ws.prependListener("ping", () => {
this.setErrorForIpAndTerminate(req, ws);
this.setErrorForIpAndDestroy(req.socket);
});
ws.prependListener("pong", () => {
this.setErrorForIpAndTerminate(req, ws);
this.setErrorForIpAndDestroy(req.socket);
});

ws.prependListener("error", error => {
if (error instanceof RangeError) {
this.setErrorForIpAndTerminate(req, ws);
this.setErrorForIpAndDestroy(req.socket);
}
});

const messageListeners = ws.listeners("message");
ws.removeAllListeners("message");
ws.prependListener("message", message => {
if (ws._disconnected) {
return this.setErrorForIpAndTerminate(req, ws);
if (req.socket._disconnected) {
return this.setErrorForIpAndDestroy(req.socket);
} else if (message === "#2") {
const timeNow: number = new Date().getTime() / 1000;
if (ws._lastPingTime && timeNow - ws._lastPingTime < 1) {
return this.setErrorForIpAndTerminate(req, ws);
if (req.socket._lastPingTime && timeNow - req.socket._lastPingTime < 1) {
return this.setErrorForIpAndDestroy(req.socket);
}
ws._lastPingTime = timeNow;
req.socket._lastPingTime = timeNow;
} else if (message.length < 10) {
// except for #2 message, we should have JSON with some required properties
// (see below) which implies that message length should be longer than 10 chars
return this.setErrorForIpAndTerminate(req, ws);
return this.setErrorForIpAndDestroy(req.socket);
} else {
try {
const parsed = JSON.parse(message);
if (parsed.event === "#disconnect") {
ws._disconnected = true;
req.socket._disconnected = true;
} else if (parsed.event === "#handshake") {
if (ws._handshake) {
return this.setErrorForIpAndTerminate(req, ws);
if (req.socket._handshake) {
return this.setErrorForIpAndDestroy(req.socket);
}
ws._handshake = true;
req.socket._handshake = true;
clearTimeout(req.socket._connectionTimer);
} else if (
typeof parsed.event !== "string" ||
typeof parsed.data !== "object" ||
Expand All @@ -144,10 +145,10 @@ export class Worker extends SCWorker {
(parsed.event === "#disconnect" && typeof parsed.cid !== "undefined")) ||
!this.handlers.includes(parsed.event)
) {
return this.setErrorForIpAndTerminate(req, ws);
return this.setErrorForIpAndDestroy(req.socket);
}
} catch (error) {
return this.setErrorForIpAndTerminate(req, ws);
return this.setErrorForIpAndDestroy(req.socket);
}
}

Expand Down Expand Up @@ -219,11 +220,9 @@ export class Worker extends SCWorker {
return false;
}

private setErrorForIpAndTerminate(req, ws?): void {
this.ipLastError[req.socket.remoteAddress] = Date.now();
if (ws) {
ws.terminate();
}
private setErrorForIpAndDestroy(socket): void {
this.ipLastError[socket.remoteAddress] = Date.now();
socket.destroy();
}

private async handleConnection(socket): Promise<void> {
Expand All @@ -246,6 +245,12 @@ export class Worker extends SCWorker {
return;
}

socket._connectionTimer = setTimeout(() => {
if (!socket._handshake) {
this.setErrorForIpAndDestroy(socket);
}
}, SOCKET_TIMEOUT * 2);

const { data }: { data: { blocked: boolean } } = await this.sendToMasterAsync(
"p2p.internal.isBlockedByRateLimit",
{
Expand Down

0 comments on commit f1c35f3

Please sign in to comment.