feat: Result set streaming #702

Draft · wants to merge 19 commits into base: main
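This draft adds a streaming execution path to `onSELECT`: when a request carries `hasPostProcessing === false` or an `iterator`, the services call the prepared statement's `stream` method instead of materializing all rows via `ps.all`. A minimal sketch of that contract, assuming a hypothetical wrapper (`runSelect` is not part of this diff):

```js
// Sketch only: illustrates the new req properties read by onSELECT below.
async function runSelect(srv, query, { stream = false, iterator } = {}) {
  return srv.onSELECT({
    query,
    data: [],
    // hasPostProcessing === false -> driver-level ps.stream(values, isOne, iterator)
    // anything else               -> classic ps.all(values) returning buffered rows
    hasPostProcessing: stream ? false : undefined,
    iterator, // optional object-mode consumer (see the hana-client.js changes)
  })
}
```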
11 changes: 6 additions & 5 deletions db-service/lib/SQLService.js
@@ -115,23 +115,24 @@ class SQLService extends DatabaseService {
* Handler for SELECT
* @type {Handler}
*/
async onSELECT({ query, data }) {
async onSELECT({ query, data, hasPostProcessing, iterator }) {
// REVISIT: for custom joins, infer is called twice, which is bad
// --> make cds.infer properly work with custom joins and remove this
if (!query.target) {
try { this.infer(query) } catch { /**/ }
}
if (query.target && !query.target._unresolved) {
if (query.SELECT.expand !== false && query.target && !query.target._unresolved) {
// Will return multiple rows with objects inside
query.SELECT.expand = 'root'
}

const { sql, values, cqn } = this.cqn2sql(query, data)
const expand = query.SELECT.expand
delete query.SELECT.expand
const isOne = cqn.SELECT.one || query.SELECT.from?.ref?.[0].cardinality?.max === 1

let ps = await this.prepare(sql)
let rows = await ps.all(values)
let rows = (hasPostProcessing === false || iterator) ? await ps.stream(values, isOne, iterator) : await ps.all(values)
if (rows.length)
if (expand) rows = rows.map(r => (typeof r._json_ === 'string' ? JSON.parse(r._json_) : r._json_ || r))

@@ -157,7 +158,7 @@
return SQLService._arrayWithCount(rows, await this.count(query, rows))
}

return cqn.SELECT.one || query.SELECT.from?.ref?.[0].cardinality?.max === 1 ? rows[0] : rows
return hasPostProcessing !== false && isOne ? rows[0] : rows
}

/**
@@ -287,7 +288,7 @@ class SQLService extends DatabaseService {
* @returns {Promise<number>}
*/
async count(query, ret) {
if (ret) {
if (ret?.length) {
const { one, limit: _ } = query.SELECT,
n = ret.length
const [max, offset = 0] = one ? [1] : _ ? [_.rows?.val, _.offset?.val] : []
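The `count` guard change (`ret?.length`) matters for streaming: a streamed result is a `Readable`, not an array, so `ret.length` is `undefined` and the shortcut that derives the count from the returned page has to be skipped. A self-contained illustration:

```js
const { Readable } = require('stream')

// Buffered result: count can be derived from the page that was just read.
const buffered = [{ ID: 1 }, { ID: 2 }]
console.log(buffered.length) // 2 -> count(query, ret) avoids an extra round trip

// Streamed result: no length property, so count() now falls through to its
// separate counting query instead of reading ret.length.
const streamed = Readable.from('[{"ID":1},{"ID":2}]')
console.log(streamed.length) // undefined
```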
10 changes: 6 additions & 4 deletions hana/lib/HANAService.js
@@ -118,7 +118,7 @@ class HANAService extends SQLService {
}

async onSELECT(req) {
const { query, data } = req
const { query, data, hasPostProcessing, iterator } = req

if (!query.target) {
try { this.infer(query) } catch { /**/ }
@@ -138,13 +138,15 @@
delete query.SELECT.expand

const isSimple = temporary.length + blobs.length + withclause.length === 0
const isOne = cqn.SELECT.one || query.SELECT.from.ref?.[0].cardinality?.max === 1
const canStream = (hasPostProcessing === false || iterator) && !isLockQuery

// REVISIT: add prepare options when param:true is used
const sqlScript = isLockQuery || isSimple ? sql : this.wrapTemporary(temporary, withclause, blobs)
let rows
if (values?.length || blobs.length > 0) {
if (values?.length || blobs.length > 0 || canStream) {
const ps = await this.prepare(sqlScript, blobs.length)
rows = this.ensureDBC() && await ps.all(values || [])
rows = this.ensureDBC() && await ps[canStream ? 'stream' : 'all'](values || [], isOne, iterator)
} else {
rows = await this.exec(sqlScript)
}
@@ -164,7 +166,7 @@
// REVISIT: the runtime always expects that the count is preserved with .map, required for renaming in mocks
return HANAService._arrayWithCount(rows, await this.count(query, rows))
}
return cqn.SELECT.one || query.SELECT.from.ref?.[0].cardinality?.max === 1 ? rows[0] : rows
return isOne && !canStream ? rows[0] : rows
}

async onINSERT({ query, data }) {
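Note the changed return value: with `canStream` active the result stays a stream even for `SELECT.one`, hence the `isOne && !canStream` guard before unwrapping `rows[0]`. A sketch of how a caller has to treat the two shapes (assuming the stream yields serialized JSON, as the driver changes below suggest):

```js
const { Readable } = require('stream')

// Sketch: the two result shapes produced by the onSELECT above.
function consume(rows, { isOne, canStream }) {
  if (canStream) return rows    // Readable of JSON text, pipe it onward as-is
  return isOne ? rows[0] : rows // buffered array, unwrap single-row results
}

consume(Readable.from('[{"ID":1}]'), { isOne: true, canStream: true }) // -> Readable
consume([{ ID: 1 }], { isOne: true, canStream: false })                // -> { ID: 1 }
```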
9 changes: 8 additions & 1 deletion hana/lib/drivers/base.js
@@ -221,7 +221,7 @@ const handleLevel = function (levels, path, expands) {
const property = path.slice(level.path.length + 2, -7)
if (property && property in level.expands) {
const is2Many = level.expands[property]
delete level.expands[property]
// delete level.expands[property]
if (level.hasProperties) {
buffer += ','
} else {
@@ -236,6 +236,7 @@
index: 1,
suffix: is2Many ? ']' : '',
path: path.slice(0, -6),
result: level.expands[property],
expands,
})
} else {
@@ -247,6 +248,7 @@
index: 0,
suffix: '}',
path: path,
result: levels.at(-1).result,
expands,
})
break
@@ -261,6 +263,11 @@
}
}
if (level.suffix) buffer += level.suffix
if (level.expands) {
for (const expand in level.expands) {
if (level.expands[expand]?.push) level.expands[expand]?.push(null)
}
}
}
}
return buffer
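The trailing `push(null)` per open expand follows Node's object-mode convention: pushing `null` signals end-of-data, so a consumer iterating a child result knows it is complete. A self-contained sketch of that contract (the `sink` here stands in for an expand's `result`):

```js
const { Readable } = require('stream')

// Anything with a .push method works as an expand sink: rows go in via
// push(row), and push(null), as added in handleLevel above, marks the end.
const sink = new Readable({ objectMode: true, read() {} })
sink.push({ ID: 1 })
sink.push(null)

;(async () => {
  for await (const row of sink) console.log(row) // logs { ID: 1 }, then ends
})()
```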
142 changes: 66 additions & 76 deletions hana/lib/drivers/hana-client.js
@@ -1,8 +1,11 @@
const { Readable, Stream } = require('stream')
const { pipeline } = require('stream/promises')

const cds = require('@sap/cds')
const hdb = require('@sap/hana-client')
const { driver, prom, handleLevel } = require('./base')
const { resultSetStream } = require('./stream')

const LOG = cds.log('@sap/hana-client')
if (process.env.NODE_ENV === 'production' && !process.env.HDB_NODEJS_THREADPOOL_SIZE && !process.env.UV_THREADPOOL_SIZE) LOG.warn("When using @sap/hana-client, it's strongly recommended to adjust its thread pool size with environment variable `HDB_NODEJS_THREADPOOL_SIZE`, otherwise it might lead to performance issues.\nLearn more: https://help.sap.com/docs/SAP_HANA_CLIENT/f1b440ded6144a54ada97ff95dac7adf/31a8c93a574b4f8fb6a8366d2c758f21.html")

@@ -134,7 +137,7 @@ class HANAClientDriver extends driver {
return this._getResultForProcedure(rows, outParameters, stmt)
}

ret.stream = async (values, one) => {
ret.stream = async (values, one, objectMode) => {
const stmt = await ret._prep
values = Array.isArray(values) ? values : []
// Uses the native exec method instead of executeQuery to initialize a full stream
@@ -159,7 +162,7 @@
if (rs.isNull(0)) return null
return Readable.from(streamBlob(rs, undefined, 0), { objectMode: false })
}
return Readable.from(rsIterator(rs, one), { objectMode: false })
return rsIterator(rs, one, objectMode)
}
return ret
}
@@ -232,10 +235,13 @@ class HANAClientDriver extends driver {

HANAClientDriver.pool = true

async function* rsIterator(rs, one) {
const next = prom(rs, 'next') // () => rs.next()
const getValue = prom(rs, 'getValue') // nr => rs.getValue(nr)
const getData = prom(rs, 'getData') // (nr, pos, buf, zero, bufSize) => rs.getData(nr, pos, buf, zero, bufSize) //
async function rsIterator(rs, one, objectMode) {
rs._rowPosition = -1
rs.nextAsync = prom(rs, 'next')
rs.getValueAsync = prom(rs, 'getValue')
rs.getDataAsync = prom(rs, 'getData')

const blobs = rs.getColumnInfo().slice(4).map(b => b.columnName)
const levels = [
{
index: 0,
@@ -244,83 +250,67 @@
expands: {},
},
]

const binaryBuffer = new Buffer.alloc(1 << 16)

const blobColumns = {}
rs.getColumnInfo()
.slice(4)
.forEach((c, i) => {
blobColumns[c.columnName] = i + 4
})

if (!one) {
yield '['
}

let buffer = ''
// Load next row of the result set (starts before the first row)
while (await next()) {
const values = await Promise.all([getValue(0), getValue(1), getValue(2)])

const [path, _blobs, _expands] = values
const expands = JSON.parse(_expands)
const blobs = JSON.parse(_blobs)

yield handleLevel(levels, path, expands)

let hasProperties = false
let jsonPosition = 0
while (true) {
const read = await getData(3, jsonPosition, binaryBuffer, 0, binaryBuffer.byteLength)
if (read < binaryBuffer.byteLength) {
if (read > 2) hasProperties = true
// Pipe json stream.slice(0,-1) removing the } to keep the object open
yield binaryBuffer.slice(0, read - 1).toString('utf-8')
break
}
jsonPosition += read
yield binaryBuffer.toString('utf-8')
}

for (const key of Object.keys(blobs)) {
if (hasProperties) buffer += ','
hasProperties = true
buffer += `${JSON.stringify(key)}:`

const columnIndex = blobColumns[key]
if (rs.isNull(columnIndex)) {
buffer += 'null'
continue
const state = {
rs,
levels,
blobs,
columnIndex: 0,
binaryBuffer: Buffer.alloc(1 << 16),
done() {
this.columnIndex = 0
this.rs._rowPosition++
return this.rs.nextCanBlock()
? this.rs.nextAsync().then(a => !a)
: !this.rs.next()
},
inject(str) {
if (str == null) return
this.stream.push(str)
},
read() {
this.columnIndex++
},
readString() {
const index = this.columnIndex++
if (index === 3) {
const _inject = str => {
this.inject(str.slice(0, -1))
return str.length
}
return this.rs.getValuesCanBlock()
? this.rs.getValueAsync(index).then(_inject)
: _inject(this.rs.getValue(index))
}

buffer += '"'
yield buffer
buffer = ''

const stream = Readable.from(streamBlob(rs, undefined, columnIndex, binaryBuffer), { objectMode: false })
return this.rs.getValuesCanBlock()
? this.rs.getValueAsync(index)
: this.rs.getValue(index)
},
readBlob() {
const index = this.columnIndex++
const stream = Readable.from(streamBlob(this.rs, undefined, index, this.binaryBuffer), { objectMode: false })
stream.setEncoding('base64')
for await (const chunk of stream) {
yield chunk
}
buffer += '"'
this.stream.push('"')
return pipeline(stream, this.stream, { end: false }).then(() => { this.stream.push('"') })
}
}

if (buffer) {
yield buffer
buffer = ''
if (objectMode) {
state.inject = function inject() { }
state.readString = function readString() {
const index = this.columnIndex++
return this.rs.getValuesCanBlock()
? this.rs.getValueAsync(index)
: this.rs.getValue(index)
}
state.readBlob = function readBlob() {
const index = this.columnIndex++
const stream = Readable.from(streamBlob(this.rs, rs._rowPosition, index, this.binaryBuffer), { objectMode: false })
stream.setEncoding('base64')
return stream
}

const level = levels[levels.length - 1]
level.hasProperties = hasProperties
}

// Close all left over levels
buffer += levels
.reverse()
.map(l => l.suffix)
.join('')
yield buffer
return resultSetStream(state, one, objectMode)
}

async function* streamBlob(rs, rowIndex = -1, columnIndex, binaryBuffer = Buffer.allocUnsafe(1 << 16)) {
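For reference, the `prom` helper behind `nextAsync` and `getDataAsync` wraps hana-client's callback APIs into promise-returning functions. A minimal sketch of what `./base` presumably provides (an assumption, the real implementation may differ):

```js
// Sketch, not the actual ./base implementation:
function prom(obj, method) {
  return (...args) =>
    new Promise((resolve, reject) => {
      obj[method](...args, (err, result) => (err ? reject(err) : resolve(result)))
    })
}

// Usage mirroring the diff: rs.nextAsync = prom(rs, 'next'), then await rs.nextAsync()
```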