Skip to content

Commit

Permalink
fix(core-p2p): only accept valid http path (SC http server) (#3537)
Browse files Browse the repository at this point in the history
  • Loading branch information
air1one authored Feb 26, 2020
1 parent 021ad47 commit f6462ef
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 23 deletions.
31 changes: 31 additions & 0 deletions __tests__/integration/core-p2p/socket-server/peer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { defaults } from "../mocks/p2p-options";
import { Blocks, Managers } from "@arkecosystem/crypto/src";
import unitnetMilestones from "@arkecosystem/crypto/src/networks/unitnet/milestones.json";
import delay from "delay";
import net from "net";
import SocketCluster from "socketcluster";
import socketCluster from "socketcluster-client";
import { startSocketServer } from "../../../../packages/core-p2p/src/socket-server";
Expand Down Expand Up @@ -533,6 +534,36 @@ describe("Peer socket endpoint", () => {
send(stringifiedPayload);
await delay(500);
expect(socket.state).not.toBe("open");

// kill workers to reset ipLastError (or we won't pass handshake for 1 minute)
server.killWorkers({ immediate: true });
await delay(2000); // give time to workers to respawn
});

it("should close the connection when the HTTP url is not valid", async () => {
const socket = new net.Socket();
socket.connect(4007, "127.0.0.1", function() {
socket.write("GET /invalid/ HTTP/1.0\r\n\r\n");
});
await delay(500);
expect(socket.destroyed).toBe(true);

socket.connect(4007, "127.0.0.1");
await delay(500);
expect(socket.destroyed).toBe(true);

// kill workers to reset ipLastError (or we won't pass handshake for 1 minute)
server.killWorkers({ immediate: true });
await delay(2000); // give time to workers to respawn
});

it("should close the connection if the initial HTTP request is not processed within 2 seconds", async () => {
const socket = new net.Socket();
socket.connect(4007, "127.0.0.1");
await delay(500);
expect(socket.destroyed).toBe(false);
await delay(2000);
expect(socket.destroyed).toBe(true);
});
});
});
10 changes: 9 additions & 1 deletion __tests__/unit/core-p2p/socket-server/worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@ import { Worker } from "../../../../packages/core-p2p/src/socket-server/worker";
const worker = new Worker();

// @ts-ignore
worker.scServer.wsServer = { on: () => undefined };
worker.scServer.wsServer = {
on: () => undefined,
_server: {
on: () => undefined,
},
};
worker.httpServer = {
on: () => undefined,
} as any;
worker.scServer.setCodecEngine = codec => undefined;

describe("Worker", () => {
Expand Down
54 changes: 32 additions & 22 deletions packages/core-p2p/src/socket-server/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ export class Worker extends SCWorker {

await this.loadHandlers();

// @ts-ignore
this.scServer.wsServer._server.timeout = 2000;

// @ts-ignore
this.scServer.wsServer.on("connection", (ws, req) => {
const clients = [...Object.values(this.scServer.clients), ...Object.values(this.scServer.pendingClients)];
Expand All @@ -52,10 +55,17 @@ export class Worker extends SCWorker {
}
this.handlePayload(ws, req);
});
// @ts-ignore
this.httpServer.on("request", req => {
// @ts-ignore
if (req.method !== "GET" || req.url !== this.scServer.wsServer.options.path) {
this.setErrorForIpAndTerminate(req);
req.destroy();
}
});
// @ts-ignore
this.scServer.wsServer._server.on("connection", socket => this.handleSocket(socket));
this.scServer.on("connection", socket => this.handleConnection(socket));
this.scServer.addMiddleware(this.scServer.MIDDLEWARE_HANDSHAKE_WS, (req, next) =>
this.handleHandshake(req, next),
);
this.scServer.addMiddleware(this.scServer.MIDDLEWARE_EMIT, (req, next) => this.handleEmit(req, next));
}

Expand Down Expand Up @@ -89,41 +99,41 @@ export class Worker extends SCWorker {
ws.removeAllListeners("ping");
ws.removeAllListeners("pong");
ws.prependListener("ping", () => {
this.setErrorForIpAndTerminate(ws, req);
this.setErrorForIpAndTerminate(req, ws);
});
ws.prependListener("pong", () => {
this.setErrorForIpAndTerminate(ws, req);
this.setErrorForIpAndTerminate(req, ws);
});

ws.prependListener("error", error => {
if (error instanceof RangeError) {
this.setErrorForIpAndTerminate(ws, req);
this.setErrorForIpAndTerminate(req, ws);
}
});

const messageListeners = ws.listeners("message");
ws.removeAllListeners("message");
ws.prependListener("message", message => {
if (ws._disconnected) {
return this.setErrorForIpAndTerminate(ws, req);
return this.setErrorForIpAndTerminate(req, ws);
} else if (message === "#2") {
const timeNow: number = new Date().getTime() / 1000;
if (ws._lastPingTime && timeNow - ws._lastPingTime < 1) {
return this.setErrorForIpAndTerminate(ws, req);
return this.setErrorForIpAndTerminate(req, ws);
}
ws._lastPingTime = timeNow;
} else if (message.length < 10) {
// except for #2 message, we should have JSON with some required properties
// (see below) which implies that message length should be longer than 10 chars
return this.setErrorForIpAndTerminate(ws, req);
return this.setErrorForIpAndTerminate(req, ws);
} else {
try {
const parsed = JSON.parse(message);
if (parsed.event === "#disconnect") {
ws._disconnected = true;
} else if (parsed.event === "#handshake") {
if (ws._handshake) {
return this.setErrorForIpAndTerminate(ws, req);
return this.setErrorForIpAndTerminate(req, ws);
}
ws._handshake = true;
} else if (
Expand All @@ -134,10 +144,10 @@ export class Worker extends SCWorker {
(parsed.event === "#disconnect" && typeof parsed.cid !== "undefined")) ||
!this.handlers.includes(parsed.event)
) {
return this.setErrorForIpAndTerminate(ws, req);
return this.setErrorForIpAndTerminate(req, ws);
}
} catch (error) {
return this.setErrorForIpAndTerminate(ws, req);
return this.setErrorForIpAndTerminate(req, ws);
}
}

Expand Down Expand Up @@ -209,9 +219,11 @@ export class Worker extends SCWorker {
return false;
}

private setErrorForIpAndTerminate(ws, req): void {
private setErrorForIpAndTerminate(req, ws?): void {
this.ipLastError[req.socket.remoteAddress] = Date.now();
ws.terminate();
if (ws) {
ws.terminate();
}
}

private async handleConnection(socket): Promise<void> {
Expand All @@ -227,10 +239,10 @@ export class Worker extends SCWorker {
}
}

private async handleHandshake(req, next): Promise<void> {
const ip = req.socket.remoteAddress;
if (this.ipLastError[ip] && this.ipLastError[ip] > Date.now() - MINUTE_IN_MILLISECONDS) {
req.socket.destroy();
private async handleSocket(socket): Promise<void> {
const ip = socket.remoteAddress;
if (!ip || (this.ipLastError[ip] && this.ipLastError[ip] > Date.now() - MINUTE_IN_MILLISECONDS)) {
socket.destroy();
return;
}

Expand All @@ -243,7 +255,7 @@ export class Worker extends SCWorker {

const isBlacklisted: boolean = (this.config.blacklist || []).includes(ip);
if (data.blocked || isBlacklisted) {
req.socket.destroy();
socket.destroy();
return;
}

Expand All @@ -252,11 +264,9 @@ export class Worker extends SCWorker {
client => cidr(`${client.remoteAddress}/24`) === cidrRemoteAddress,
);
if (sameSubnetSockets.length > this.config.maxSameSubnetPeers) {
req.socket.destroy();
socket.destroy();
return;
}

next();
}

private async handleEmit(req, next): Promise<void> {
Expand Down

0 comments on commit f6462ef

Please sign in to comment.