Skip to content

Commit

Permalink
Delay concat of read chunks into frame
Browse files Browse the repository at this point in the history
Buffer.concat is slow on large frames. Collect read chunks into list until
full frame has been received.
  • Loading branch information
kohtala authored and grs committed Jul 1, 2022
1 parent 228ad64 commit 63634b4
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 4 deletions.
27 changes: 23 additions & 4 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -547,15 +547,34 @@ Connection.prototype.input = function (buff) {
try {
if (this.heartbeat_in) clearTimeout(this.heartbeat_in);
log.io('[%s] read %d bytes', this.options.id, buff.length);
if (this.previous_input) {
buffer = Buffer.concat([this.previous_input, buff], this.previous_input.length + buff.length);
if (this.frame_size) {
this.received_bytes += buff.length;
this.chunks.push(buff);
if (this.frame_size <= this.received_bytes) {
buffer = Buffer.concat(this.chunks, this.received_bytes);
this.chunks = null;
this.frame_size = undefined;
} else {
log.io('[%s] pushed %d bytes', this.options.id, buff.length);
return;
}
} else if (this.previous_input) {
buffer = Buffer.concat([this.previous_input, buff]);
this.previous_input = null;
} else {
buffer = buff;
}
var read = this.transport.read(buffer, this);
const read = this.transport.read(buffer, this);
if (read < buffer.length) {
this.previous_input = buffer.slice(read);
const previous_input = buffer.slice(read);
this.frame_size = this.transport.peek_size(previous_input);
if (this.frame_size) {
this.chunks = [previous_input];
this.received_bytes = previous_input.length;
log.io('[%s] waiting frame_size %s', this.options.id, this.frame_size);
} else {
this.previous_input = previous_input;
}
}
if (this.local.open.idle_time_out) this.heartbeat_in = setTimeout(this.idle.bind(this), this.local.open.idle_time_out);
if (this.transport.has_writes_pending()) {
Expand Down
23 changes: 23 additions & 0 deletions lib/sasl.js
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ SaslServer.prototype.write = function (socket) {
}
};

SaslServer.prototype.peek_size = function (buffer) {
if (this.transport.read_complete) {
return this.next.peek_size(buffer);
} else {
return this.transport.peek_size(buffer);
}
};

SaslServer.prototype.read = function (buffer) {
if (this.transport.read_complete) {
return this.next.read(buffer);
Expand Down Expand Up @@ -323,6 +331,14 @@ SaslClient.prototype.write = function (socket) {
}
};

SaslClient.prototype.peek_size = function (buffer) {
if (this.transport.read_complete) {
return this.next.peek_size(buffer);
} else {
return this.transport.peek_size(buffer);
}
};

SaslClient.prototype.read = function (buffer) {
if (this.transport.read_complete) {
return this.next.read(buffer);
Expand Down Expand Up @@ -352,6 +368,13 @@ SelectiveServer.prototype.write = function (socket) {
}
};

SelectiveServer.prototype.peek_size = function (buffer) {
if (this.header_received) {
return this.selected.peek_size(buffer);
}
return undefined;
};

SelectiveServer.prototype.read = function (buffer) {
if (!this.header_received) {
if (buffer.length < 8) {
Expand Down
8 changes: 8 additions & 0 deletions lib/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ Transport.prototype.write = function (socket) {
this.pending = [];
};

Transport.prototype.peek_size = function (buffer) {
log.frames('[%s] peek_size %o, %d', this.identifier, this.header_received, buffer.length);
if (this.header_received && buffer.length >= 4) {
return buffer.readUInt32BE();
}
return undefined;
};

Transport.prototype.read = function (buffer) {
var offset = 0;
if (!this.header_received) {
Expand Down
3 changes: 3 additions & 0 deletions test/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ describe('message content', function() {
it('sends and receives body of 50k', transfer_test({body:new Array(1024*50+1).join('z')}, function(message: rhea.Message) {
assert.equal(message.body, new Array(1024*50+1).join('z'));
}));
it('sends and receives body of 50M', transfer_test({body:new Array(1024*1024*50+1).join('z')}, function(message: rhea.Message) {
assert.equal(message.body, new Array(1024*1024*50+1).join('z'));
}));
it('sends and receives map body', transfer_test({body:{colour:'green',age:8,happy:true, sad:false}}, function(message: rhea.Message) {
assert.equal(message.body.colour, 'green');
assert.equal(message.body.age, 8);
Expand Down
3 changes: 3 additions & 0 deletions typings/sasl.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ declare class SaslServer {
on_sasl_response(frame: frames): void;
has_writes_pending(): boolean;
write(socket: Socket): void;
peek_size(buffer: Buffer): number | undefined;
read(buffer: Buffer): number;
}

Expand All @@ -87,6 +88,7 @@ declare class SaslClient {
on_sasl_outcome(frame: frames): void;
has_writes_pending(): boolean;
write(socket: Socket): void;
peek_size(buffer: Buffer): number | undefined;
read(buffer: Buffer): number;
}

Expand All @@ -100,6 +102,7 @@ declare class SelectiveServer {
constructor(connection: Connection, mechanisms: Mechanisms);
has_writes_pending(): boolean;
write(socket: Socket): void | number;
peek_size(buffer: Buffer): number | undefined;
read(buffer: Buffer): number;
}

Expand Down
1 change: 1 addition & 0 deletions typings/transport.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ export declare interface Transport {
has_writes_pending(): boolean;
encode(frame: frames): void;
write(socket: Socket): void;
peek_size(buffer: Buffer): number | undefined;
read(buffer: Buffer): number;
}

0 comments on commit 63634b4

Please sign in to comment.