From 63634b4e2e9bc713d4ee2849963bfeb198d14499 Mon Sep 17 00:00:00 2001 From: Marko Kohtala Date: Fri, 1 Jul 2022 08:03:18 +0300 Subject: [PATCH] Delay concat of read chunks into frame Buffer.concat is slow on large frames. Collect read chunks into list until full frame has been received. --- lib/connection.js | 27 +++++++++++++++++++++++---- lib/sasl.js | 23 +++++++++++++++++++++++ lib/transport.js | 8 ++++++++ test/messages.ts | 3 +++ typings/sasl.d.ts | 3 +++ typings/transport.d.ts | 1 + 6 files changed, 61 insertions(+), 4 deletions(-) diff --git a/lib/connection.js b/lib/connection.js index 766a17b..7f06e1b 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -547,15 +547,34 @@ Connection.prototype.input = function (buff) { try { if (this.heartbeat_in) clearTimeout(this.heartbeat_in); log.io('[%s] read %d bytes', this.options.id, buff.length); - if (this.previous_input) { - buffer = Buffer.concat([this.previous_input, buff], this.previous_input.length + buff.length); + if (this.frame_size) { + this.received_bytes += buff.length; + this.chunks.push(buff); + if (this.frame_size <= this.received_bytes) { + buffer = Buffer.concat(this.chunks, this.received_bytes); + this.chunks = null; + this.frame_size = undefined; + } else { + log.io('[%s] pushed %d bytes', this.options.id, buff.length); + return; + } + } else if (this.previous_input) { + buffer = Buffer.concat([this.previous_input, buff]); this.previous_input = null; } else { buffer = buff; } - var read = this.transport.read(buffer, this); + const read = this.transport.read(buffer, this); if (read < buffer.length) { - this.previous_input = buffer.slice(read); + const previous_input = buffer.slice(read); + this.frame_size = this.transport.peek_size(previous_input); + if (this.frame_size) { + this.chunks = [previous_input]; + this.received_bytes = previous_input.length; + log.io('[%s] waiting frame_size %s', this.options.id, this.frame_size); + } else { + this.previous_input = previous_input; + } } if (this.local.open.idle_time_out) this.heartbeat_in = setTimeout(this.idle.bind(this), this.local.open.idle_time_out); if (this.transport.has_writes_pending()) { diff --git a/lib/sasl.js b/lib/sasl.js index 99d239f..d61bbae 100644 --- a/lib/sasl.js +++ b/lib/sasl.js @@ -227,6 +227,14 @@ SaslServer.prototype.write = function (socket) { } }; +SaslServer.prototype.peek_size = function (buffer) { + if (this.transport.read_complete) { + return this.next.peek_size(buffer); + } else { + return this.transport.peek_size(buffer); + } +}; + SaslServer.prototype.read = function (buffer) { if (this.transport.read_complete) { return this.next.read(buffer); @@ -323,6 +331,14 @@ SaslClient.prototype.write = function (socket) { } }; +SaslClient.prototype.peek_size = function (buffer) { + if (this.transport.read_complete) { + return this.next.peek_size(buffer); + } else { + return this.transport.peek_size(buffer); + } +}; + SaslClient.prototype.read = function (buffer) { if (this.transport.read_complete) { return this.next.read(buffer); @@ -352,6 +368,13 @@ SelectiveServer.prototype.write = function (socket) { } }; +SelectiveServer.prototype.peek_size = function (buffer) { + if (this.header_received) { + return this.selected.peek_size(buffer); + } + return undefined; +}; + SelectiveServer.prototype.read = function (buffer) { if (!this.header_received) { if (buffer.length < 8) { diff --git a/lib/transport.js b/lib/transport.js index 561c7bd..4219823 100644 --- a/lib/transport.js +++ b/lib/transport.js @@ -64,6 +64,14 @@ Transport.prototype.write = function (socket) { this.pending = []; }; +Transport.prototype.peek_size = function (buffer) { + log.frames('[%s] peek_size %o, %d', this.identifier, this.header_received, buffer.length); + if (this.header_received && buffer.length >= 4) { + return buffer.readUInt32BE(); + } + return undefined; +}; + Transport.prototype.read = function (buffer) { var offset = 0; if (!this.header_received) { diff --git a/test/messages.ts b/test/messages.ts index a7e747d..b7e39a5 100644 --- a/test/messages.ts +++ b/test/messages.ts @@ -154,6 +154,9 @@ describe('message content', function() { it('sends and receives body of 50k', transfer_test({body:new Array(1024*50+1).join('z')}, function(message: rhea.Message) { assert.equal(message.body, new Array(1024*50+1).join('z')); })); + it('sends and receives body of 50M', transfer_test({body:new Array(1024*1024*50+1).join('z')}, function(message: rhea.Message) { + assert.equal(message.body, new Array(1024*1024*50+1).join('z')); + })); it('sends and receives map body', transfer_test({body:{colour:'green',age:8,happy:true, sad:false}}, function(message: rhea.Message) { assert.equal(message.body.colour, 'green'); assert.equal(message.body.age, 8); diff --git a/typings/sasl.d.ts b/typings/sasl.d.ts index 206ae08..ccb517c 100644 --- a/typings/sasl.d.ts +++ b/typings/sasl.d.ts @@ -69,6 +69,7 @@ declare class SaslServer { on_sasl_response(frame: frames): void; has_writes_pending(): boolean; write(socket: Socket): void; + peek_size(buffer: Buffer): number | undefined; read(buffer: Buffer): number; } @@ -87,6 +88,7 @@ declare class SaslClient { on_sasl_outcome(frame: frames): void; has_writes_pending(): boolean; write(socket: Socket): void; + peek_size(buffer: Buffer): number | undefined; read(buffer: Buffer): number; } @@ -100,6 +102,7 @@ declare class SelectiveServer { constructor(connection: Connection, mechanisms: Mechanisms); has_writes_pending(): boolean; write(socket: Socket): void | number; + peek_size(buffer: Buffer): number | undefined; read(buffer: Buffer): number; } diff --git a/typings/transport.d.ts b/typings/transport.d.ts index 2695ab9..11a868e 100644 --- a/typings/transport.d.ts +++ b/typings/transport.d.ts @@ -16,5 +16,6 @@ export declare interface Transport { has_writes_pending(): boolean; encode(frame: frames): void; write(socket: Socket): void; + peek_size(buffer: Buffer): number | undefined; read(buffer: Buffer): number; }