From f1c35f3a9c8bc277b13da400d0e4984d626d3a65 Mon Sep 17 00:00:00 2001 From: air1one <36802613+air1one@users.noreply.github.com> Date: Thu, 12 Mar 2020 14:13:55 +0400 Subject: [PATCH] fix(core-p2p): block incomplete sockets (#3596) --- .../core-p2p/socket-server/peer.test.ts | 77 ++++++++++++++----- packages/core-p2p/src/socket-server/worker.ts | 51 ++++++------ 2 files changed, 87 insertions(+), 41 deletions(-) diff --git a/__tests__/integration/core-p2p/socket-server/peer.test.ts b/__tests__/integration/core-p2p/socket-server/peer.test.ts index 9c4334d0ee..e41ba3c7da 100644 --- a/__tests__/integration/core-p2p/socket-server/peer.test.ts +++ b/__tests__/integration/core-p2p/socket-server/peer.test.ts @@ -326,6 +326,23 @@ describe("Peer socket endpoint", () => { await delay(2000); // give time to workers to respawn }); + it("should disconnect the client if it sends multiple handshakes", async () => { + connect(); // this automatically sends the first handshake + await delay(1000); + + expect(socket.state).toBe("open"); + + // this is the second handshake + send('{"event": "#handshake", "data": {}, "cid": 1}'); + await delay(500); + + expect(socket.state).toBe("closed"); + + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn + }); + it("should accept the request when below rate limit", async () => { connect(); await delay(1000); @@ -496,8 +513,6 @@ describe("Peer socket endpoint", () => { connect(); await delay(1000); - expect(socket.state).toBe("open"); - send('{"event":"#disconnect","data":{"code":4000}}'); await expect( emit("p2p.peer.getStatus", { @@ -540,30 +555,56 @@ describe("Peer socket endpoint", () => { await delay(2000); // give time to workers to respawn }); - it("should close the connection when the HTTP url is not valid", async () => { + it("should close the connection when the HTTP url is not valid", async done => { const socket = new net.Socket(); - socket.connect(4007, "127.0.0.1", function() { + socket.connect(4007, "127.0.0.1", async () => { socket.write("GET /invalid/ HTTP/1.0\r\n\r\n"); - }); - await delay(500); - expect(socket.destroyed).toBe(true); + await delay(500); + expect(socket.destroyed).toBe(true); - socket.connect(4007, "127.0.0.1"); - await delay(500); - expect(socket.destroyed).toBe(true); + socket.connect(4007, "127.0.0.1"); + await delay(500); + expect(socket.destroyed).toBe(true); - // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) - server.killWorkers({ immediate: true }); - await delay(2000); // give time to workers to respawn + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn + done(); + }); }); - it("should close the connection if the initial HTTP request is not processed within 2 seconds", async () => { + it("should close the connection if the initial HTTP request is not processed within 2 seconds", async done => { const socket = new net.Socket(); - socket.connect(4007, "127.0.0.1"); - await delay(500); - expect(socket.destroyed).toBe(false); await delay(2000); - expect(socket.destroyed).toBe(true); + socket.connect(4007, "127.0.0.1", async () => { + await delay(500); + expect(socket.destroyed).toBe(false); + await delay(2000); + expect(socket.destroyed).toBe(true); + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn + done(); + }); + }); + + it("should close the connection if is is not fully established from start to finish within 4 seconds", async done => { + const socket = new net.Socket(); + await delay(2000); + socket.connect(4007, "127.0.0.1", async () => { + expect(socket.destroyed).toBe(false); + // @ts-ignore + socket.write(`GET /${server.options.path}/ HTTP/1.0\r\n`); + socket.write("Host: 127.0.0.1"); + await delay(1500); + expect(socket.destroyed).toBe(false); + socket.write("Host: 127.0.0.1"); + await delay(1500); + expect(socket.destroyed).toBe(false); + socket.write("Host: 127.0.0.1"); + await delay(1500); + expect(socket.destroyed).toBe(true); + done(); + }); }); }); }); diff --git a/packages/core-p2p/src/socket-server/worker.ts b/packages/core-p2p/src/socket-server/worker.ts index 2414c39887..84b97c6bf4 100644 --- a/packages/core-p2p/src/socket-server/worker.ts +++ b/packages/core-p2p/src/socket-server/worker.ts @@ -9,6 +9,7 @@ import { requestSchemas } from "../schemas"; import { codec } from "../utils/sc-codec"; import { validateTransactionLight } from "./utils/validate"; +const SOCKET_TIMEOUT = 2000; const MINUTE_IN_MILLISECONDS = 1000 * 60; const HOUR_IN_MILLISECONDS = MINUTE_IN_MILLISECONDS * 60; @@ -41,7 +42,7 @@ export class Worker extends SCWorker { await this.loadHandlers(); // @ts-ignore - this.scServer.wsServer._server.timeout = 2000; + this.scServer.wsServer._server.timeout = SOCKET_TIMEOUT; // @ts-ignore this.scServer.wsServer.on("connection", (ws, req) => { @@ -59,8 +60,7 @@ export class Worker extends SCWorker { this.httpServer.on("request", req => { // @ts-ignore if (req.method !== "GET" || req.url !== this.scServer.wsServer.options.path) { - this.setErrorForIpAndTerminate(req); - req.destroy(); + this.setErrorForIpAndDestroy(req.socket); } }); // @ts-ignore @@ -99,43 +99,44 @@ export class Worker extends SCWorker { ws.removeAllListeners("ping"); ws.removeAllListeners("pong"); ws.prependListener("ping", () => { - this.setErrorForIpAndTerminate(req, ws); + this.setErrorForIpAndDestroy(req.socket); }); ws.prependListener("pong", () => { - this.setErrorForIpAndTerminate(req, ws); + this.setErrorForIpAndDestroy(req.socket); }); ws.prependListener("error", error => { if (error instanceof RangeError) { - this.setErrorForIpAndTerminate(req, ws); + this.setErrorForIpAndDestroy(req.socket); } }); const messageListeners = ws.listeners("message"); ws.removeAllListeners("message"); ws.prependListener("message", message => { - if (ws._disconnected) { - return this.setErrorForIpAndTerminate(req, ws); + if (req.socket._disconnected) { + return this.setErrorForIpAndDestroy(req.socket); } else if (message === "#2") { const timeNow: number = new Date().getTime() / 1000; - if (ws._lastPingTime && timeNow - ws._lastPingTime < 1) { - return this.setErrorForIpAndTerminate(req, ws); + if (req.socket._lastPingTime && timeNow - req.socket._lastPingTime < 1) { + return this.setErrorForIpAndDestroy(req.socket); } - ws._lastPingTime = timeNow; + req.socket._lastPingTime = timeNow; } else if (message.length < 10) { // except for #2 message, we should have JSON with some required properties // (see below) which implies that message length should be longer than 10 chars - return this.setErrorForIpAndTerminate(req, ws); + return this.setErrorForIpAndDestroy(req.socket); } else { try { const parsed = JSON.parse(message); if (parsed.event === "#disconnect") { - ws._disconnected = true; + req.socket._disconnected = true; } else if (parsed.event === "#handshake") { - if (ws._handshake) { - return this.setErrorForIpAndTerminate(req, ws); + if (req.socket._handshake) { + return this.setErrorForIpAndDestroy(req.socket); } - ws._handshake = true; + req.socket._handshake = true; + clearTimeout(req.socket._connectionTimer); } else if ( typeof parsed.event !== "string" || typeof parsed.data !== "object" || @@ -144,10 +145,10 @@ export class Worker extends SCWorker { (parsed.event === "#disconnect" && typeof parsed.cid !== "undefined")) || !this.handlers.includes(parsed.event) ) { - return this.setErrorForIpAndTerminate(req, ws); + return this.setErrorForIpAndDestroy(req.socket); } } catch (error) { - return this.setErrorForIpAndTerminate(req, ws); + return this.setErrorForIpAndDestroy(req.socket); } } @@ -219,11 +220,9 @@ export class Worker extends SCWorker { return false; } - private setErrorForIpAndTerminate(req, ws?): void { - this.ipLastError[req.socket.remoteAddress] = Date.now(); - if (ws) { - ws.terminate(); - } + private setErrorForIpAndDestroy(socket): void { + this.ipLastError[socket.remoteAddress] = Date.now(); + socket.destroy(); } private async handleConnection(socket): Promise { @@ -246,6 +245,12 @@ export class Worker extends SCWorker { return; } + socket._connectionTimer = setTimeout(() => { + if (!socket._handshake) { + this.setErrorForIpAndDestroy(socket); + } + }, SOCKET_TIMEOUT * 2); + const { data }: { data: { blocked: boolean } } = await this.sendToMasterAsync( "p2p.internal.isBlockedByRateLimit", {