Skip to content

Commit

Permalink
Odata opaque cursor (#934)
Browse files Browse the repository at this point in the history
* added integration tests to capture requirement

* Made changes for Submission OData
need to add more unit tests

* changes for Entities Odata

* refactored a bit and added unit tests

* prefix skiptoken with 01

* just use repeatId for subtables

* fix entity count

* datasets/:name is extendable

* added test for extended datasets/:name

* polish some of the tests
  • Loading branch information
sadiqkhoja authored Sep 12, 2023
1 parent f02dd3a commit 211dd59
Show file tree
Hide file tree
Showing 19 changed files with 623 additions and 114 deletions.
10 changes: 7 additions & 3 deletions lib/data/entity.js
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,13 @@ const extractSelectedProperties = (query, properties) => {
};

// Pagination is done at the database level
const streamEntityOdata = (inStream, properties, domain, originalUrl, query, tableCount) => {
const streamEntityOdata = (inStream, properties, domain, originalUrl, query, tableCount, tableRemaining) => {
const serviceRoot = getServiceRoot(originalUrl);
const { limit, offset, shouldCount } = extractPaging(query);
const { limit, offset, shouldCount, skipToken } = extractPaging(query);
const selectedProperties = extractSelectedProperties(query, properties);

let isFirstEntity = true;
let lastUuid;
const rootStream = new Transform({
writableObjectMode: true, // we take a stream of objects from the db, but
readableObjectMode: false, // we put out a stream of text.
Expand All @@ -281,6 +282,8 @@ const streamEntityOdata = (inStream, properties, domain, originalUrl, query, tab
this.push(',');
}

lastUuid = entity.uuid;

this.push(JSON.stringify(selectFields(entity, properties, selectedProperties)));

done();
Expand All @@ -290,8 +293,9 @@ const streamEntityOdata = (inStream, properties, domain, originalUrl, query, tab
}, flush(done) {
this.push((isFirstEntity) ? '{"value":[],' : '],'); // open object or close row array.

const remaining = skipToken ? tableRemaining - limit : tableCount - (limit + offset);
// @odata.count and nextUrl.
const nextUrl = nextUrlFor(limit, offset, tableCount, originalUrl);
const nextUrl = nextUrlFor(remaining, originalUrl, { uuid: lastUuid });

this.push(jsonDataFooter({ table: 'Entities', domain, serviceRoot, nextUrl, count: (shouldCount ? tableCount.toString() : null) }));
done();
Expand Down
12 changes: 4 additions & 8 deletions lib/data/odata.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,7 @@ const submissionToOData = (fields, table, submission, options = {}) => new Promi
// on option.metadata even though they are not part of the form's own schema.
// So rather than try to inject them into the xml transformation below, we just
// formulate them here in advance:
if (!options.metadata || options.metadata.__id) {
root.__id = submission.instanceId;
}
root.__id = submission.instanceId;
if (table === 'Submissions') {
const systemObj = {
submissionDate: submission.createdAt,
Expand Down Expand Up @@ -175,7 +173,7 @@ const submissionToOData = (fields, table, submission, options = {}) => new Promi
}

// bail out without doing any work if we are encrypted.
if (encrypted === true) return resolve(result);
if (encrypted === true) return resolve({ data: result, instanceId: submission.instanceId });

// we keep a dataStack, so we build an appropriate nested structure overall, and
// we can select the appropriate layer of that nesting at will.
Expand Down Expand Up @@ -220,9 +218,7 @@ const submissionToOData = (fields, table, submission, options = {}) => new Promi

// create our new databag, push into result data, and set it as our result ptr.
const bag = generateDataFrame(schemaStack);
if (!options.metadata || options.metadata.__id) {
bag.__id = hashId(schemaStack, submission.instanceId);
}
bag.__id = hashId(schemaStack, submission.instanceId);
dataPtr[outname].push(bag);
dataStack.push(bag);

Expand Down Expand Up @@ -342,7 +338,7 @@ const submissionToOData = (fields, table, submission, options = {}) => new Promi

if (schemaStack.hasExited()) {
parser.reset();
resolve(result);
resolve({ data: result, instanceId: submission.instanceId });
}
}
}, { xmlMode: true, decodeEntities: true });
Expand Down
93 changes: 79 additions & 14 deletions lib/formats/odata.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const { sanitizeOdataIdentifier, without } = require('../util/util');
const { jsonDataFooter, extractOptions, nextUrlFor } = require('../util/odata');
const { submissionToOData, systemFields } = require('../data/odata');
const { SchemaStack } = require('../data/schema');
const { QueryOptions } = require('../util/db');


////////////////////////////////////////////////////////////////////////////////
// UTIL
Expand Down Expand Up @@ -52,9 +54,10 @@ const extractPathContext = (subpath) =>
const extractPaging = (table, query) => {
const parsedLimit = parseInt(query.$top, 10);
const limit = Number.isNaN(parsedLimit) ? Infinity : parsedLimit;
const offset = parseInt(query.$skip, 10) || 0;
const offset = (!query.$skiptoken && parseInt(query.$skip, 10)) || 0;
const shouldCount = isTrue(query.$count);
const result = { limit: max(0, limit), offset: max(0, offset), shouldCount };
const skipToken = query.$skiptoken ? QueryOptions.parseSkiptoken(query.$skiptoken) : null;
const result = { limit: max(0, limit), offset: max(0, offset), shouldCount, skipToken };

return Object.assign(result, (table === 'Submissions')
? { doLimit: Infinity, doOffset: 0 }
Expand Down Expand Up @@ -366,38 +369,66 @@ const edmxForEntities = (datasetName, properties) => {
// originalUrl: String is the request URL; we need it as well to formulate response URLs.
// query: Object is the Express Request query object indicating request querystring parameters.
// inStream: Stream[Row] is the postgres Submissions rowstream.
const rowStreamToOData = (fields, table, domain, originalUrl, query, inStream, tableCount) => {
const rowStreamToOData = (fields, table, domain, originalUrl, query, inStream, tableCount, tableRemaining) => {
// cache values we'll need repeatedly.
const serviceRoot = getServiceRoot(originalUrl);
const { limit, offset, doLimit, doOffset, shouldCount } = extractPaging(table, query);
const { doLimit, doOffset, shouldCount, skipToken } = extractPaging(table, query);
const options = extractOptions(query);
const isSubTable = table !== 'Submissions';

// make sure the target table actually exists.
// TODO: now that this doesn't require schema computation, should we move it up
// to the service level, along with the equivalent for singleRowToOData? probably.
if (!verifyTablePath(table.split('.'), fields)) throw Problem.user.notFound();

// write the header, then transform and stream each row.
// To count total number of items for subtable (repeats)
let counted = 0;
// To count items added to the downstream, required only for subtable
let added = 0;
// To count remaining items in case of subtable
let remainingItems = 0;

// skipToken is created based on following two variables
let lastInstanceId = null;
let lastRepeatId = null;

// For Submissions table, it is true because cursor is handled at database level
let cursorPredicate = !isSubTable || !skipToken;

const parserStream = new Transform({
writableObjectMode: true, // we take a stream of objects from the db, but
readableObjectMode: false, // we put out a stream of text.
transform(row, _, done) {
// per row, we do our asynchronous parsing, jam the result onto the
// text resultstream, and call done to indicate that the row is processed.
submissionToOData(fields, table, row, options).then((data) => {
submissionToOData(fields, table, row, options).then(({ data, instanceId }) => {

const parentIdProperty = data[0] ? Object.keys(data[0]).find(p => /^__.*-id$/.test(p)) : null;

// In case of subtable we are reading all Submissions without pagination because we have to
// count repeat items in each Submission
for (const field of data) {

// if $select is there and parentId is not requested then remove it
const fieldRefined = options.metadata && !options.metadata[parentIdProperty] ? without([parentIdProperty], field) : field;
let fieldRefined = options.metadata && !options.metadata[parentIdProperty] ? without([parentIdProperty], field) : field;
// if $select is there and __id is not requested then remove it
fieldRefined = options.metadata && !options.metadata.__id ? without(['__id'], fieldRefined) : fieldRefined;

if (added === doLimit) remainingItems += 1;

if ((counted >= doOffset) && (counted < (doOffset + doLimit))) {
this.push((counted === doOffset) ? '{"value":[' : ','); // header or fencepost.
if (added < doLimit && counted >= doOffset && cursorPredicate) {
this.push((added === 0) ? '{"value":[' : ','); // header or fencepost.
this.push(JSON.stringify(fieldRefined));
lastInstanceId = instanceId;
if (isSubTable) lastRepeatId = field.__id;
added += 1;
}

// Controls the rows to be skipped based on skipToken
// Once set to true remains true
cursorPredicate = cursorPredicate || skipToken.repeatId === field.__id;

counted += 1;
}
done(); // signifies that this stream element is fully processed.
Expand All @@ -406,12 +437,20 @@ const rowStreamToOData = (fields, table, domain, originalUrl, query, inStream, t
flush(done) {
// flush is called just before the transform stream is done and closed; we write
// our footer information, close the object, and tell the stream we are done.
this.push((counted <= doOffset) ? '{"value":[],' : '],'); // open object or close row array.
this.push((added === 0) ? '{"value":[],' : '],'); // open object or close row array.

// if we were given an explicit count, use it from here out, to create
// @odata.count and nextUrl.
const totalCount = (tableCount != null) ? tableCount : counted;
const nextUrl = nextUrlFor(limit, offset, totalCount, originalUrl);

// How many items are remaining for the next page?
// if there aren't any then we don't need nextUrl
const remaining = (tableRemaining != null) ? tableRemaining - added : remainingItems;

let skipTokenData = { instanceId: lastInstanceId };
if (isSubTable) skipTokenData = { repeatId: lastRepeatId };

const nextUrl = nextUrlFor(remaining, originalUrl, skipTokenData);

// we do toString on the totalCount because mustache won't bother rendering
// the block if it sees integer zero.
Expand Down Expand Up @@ -448,8 +487,10 @@ const singleRowToOData = (fields, row, domain, originalUrl, query) => {
const table = tableParts.join('.');
if (!verifyTablePath(tableParts, fields)) throw Problem.user.notFound();

const isSubTable = table !== 'Submissions';

// extract all our fields first, the field extractor doesn't know about target contexts.
return submissionToOData(fields, table, row, options).then((subrows) => {
return submissionToOData(fields, table, row, options).then(({ data: subrows, instanceId }) => {
// now we actually filter to the requested set. we actually only need to compare
// the very last specified id, since it is fully unique.
const filterContextIdx = targetContext.reduce(((extant, pair, idx) => ((pair[1] != null) ? idx : extant)), -1);
Expand All @@ -462,12 +503,36 @@ const singleRowToOData = (fields, row, domain, originalUrl, query) => {
const count = filtered.length;

// now we can process $top/$skip/$count:
const { limit, offset, shouldCount } = extractPaging(table, query);
const nextUrl = nextUrlFor(limit, offset, count, originalUrl);
const paging = extractPaging(table, query);
const { limit, shouldCount, skipToken } = paging;
let { offset } = paging;

if (skipToken) {
offset = filtered.findIndex(s => skipToken.repeatId === s.__id);
}

const pared = filtered.slice(offset, offset + limit);

let nextUrl = null;

if (pared.length > 0) {
const remaining = count - (offset + limit);

let skipTokenData = {
instanceId
};

if (isSubTable) skipTokenData = { repeatId: pared[pared.length - 1].__id };

nextUrl = nextUrlFor(remaining, originalUrl, skipTokenData);
}


// if $select is there and parentId is not requested then remove it
let paredRefined = options.metadata && !options.metadata[filterField] ? pared.map(p => without([filterField], p)) : pared;

// if $select is there and __id is not requested then remove it
const paredRefined = options.metadata && !options.metadata[filterField] ? pared.map(p => without([filterField], p)) : pared;
paredRefined = options.metadata && !options.metadata.__id ? paredRefined.map(p => without(['__id'], p)) : paredRefined;

// and finally splice together and return our result:
const dataContents = paredRefined.map(JSON.stringify).join(',');
Expand Down
2 changes: 1 addition & 1 deletion lib/http/endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ const isJsonType = (x) => /(^|,)(application\/json|json)($|;|,)/i.test(x);
const isXmlType = (x) => /(^|,)(application\/(atom(svc)?\+)?xml|atom|xml)($|;|,)/i.test(x);

// various supported odata constants:
const supportedParams = [ '$format', '$count', '$skip', '$top', '$filter', '$wkt', '$expand', '$select' ];
const supportedParams = [ '$format', '$count', '$skip', '$top', '$filter', '$wkt', '$expand', '$select', '$skiptoken' ];
const supportedFormats = {
json: [ 'application/json', 'json' ],
xml: [ 'application/xml', 'atom' ]
Expand Down
29 changes: 26 additions & 3 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ INNER JOIN
SELECT "entityId", (COUNT(id) - 1) AS "updates" FROM entity_defs GROUP BY "entityId"
) stats ON stats."entityId"=entity_defs."entityId"
LEFT JOIN actors ON entities."creatorId"=actors.id
${options.skiptoken ? sql`
INNER JOIN
( SELECT id, "createdAt" FROM entities WHERE "uuid" = ${options.skiptoken.uuid}) AS cursor
ON entities."createdAt" <= cursor."createdAt" AND entities.id < cursor.id
`: sql``}
WHERE
entities."datasetId" = ${datasetId}
AND entities."deletedAt" IS NULL
Expand All @@ -324,10 +329,28 @@ ORDER BY entities."createdAt" DESC, entities.id DESC
${page(options)}`)
.then(stream.map(_exportUnjoiner));

const countByDatasetId = (datasetId, options = QueryOptions.none) => ({ oneFirst }) => oneFirst(sql`
SELECT count(*) FROM entities
const countByDatasetId = (datasetId, options = QueryOptions.none) => ({ one }) => one(sql`
SELECT * FROM
(
SELECT count(*) count FROM entities
WHERE "datasetId" = ${datasetId}
AND "deletedAt" IS NULL
AND ${odataFilter(options.filter, odataToColumnMap)}
) AS "all"
CROSS JOIN
(
SELECT COUNT(*) remaining FROM entities
${options.skiptoken ? sql`
INNER JOIN
( SELECT id, "createdAt" FROM entities WHERE "uuid" = ${options.skiptoken.uuid}) AS cursor
ON entities."createdAt" <= cursor."createdAt" AND entities.id < cursor.id
`: sql``}
WHERE "datasetId" = ${datasetId}
AND ${odataFilter(options.filter, odataToColumnMap)}`);
AND "deletedAt" IS NULL
AND ${odataFilter(options.filter, odataToColumnMap)}
) AS skiptoken`);


////////////////////////////////////////////////////////////////////////////////
Expand Down
23 changes: 20 additions & 3 deletions lib/model/query/submissions.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,21 @@ const getById = (submissionId) => ({ maybeOne }) =>
maybeOne(sql`select * from submissions where id=${submissionId} and "deletedAt" is null`)
.then(map(construct(Submission)));

const countByFormId = (formId, draft, options = QueryOptions.none) => ({ oneFirst }) => oneFirst(sql`
select count(*) from submissions
where ${equals({ formId, draft })} and "deletedAt" is null and ${odataFilter(options.filter, odataToColumnMap)}`);
const countByFormId = (formId, draft, options = QueryOptions.none) => ({ one }) => one(sql`
SELECT * FROM
( SELECT COUNT(*) count FROM submissions
WHERE ${equals({ formId, draft })} AND "deletedAt" IS NULL AND ${odataFilter(options.filter, odataToColumnMap)}) AS "all"
CROSS JOIN
( SELECT COUNT(*) remaining FROM submissions
${options.skiptoken ? sql`
INNER JOIN
( SELECT id, "createdAt" FROM submissions WHERE "instanceId" = ${options.skiptoken.instanceId}) AS cursor
ON submissions."createdAt" <= cursor."createdAt" AND submissions.id < cursor.id
`: sql``}
WHERE ${equals({ formId, draft })}
AND "deletedAt" IS NULL
AND ${odataFilter(options.filter, odataToColumnMap)}) AS skiptoken
`);

const verifyVersion = (formId, rootId, instanceId, draft) => ({ maybeOne }) => maybeOne(sql`
select true from submissions
Expand Down Expand Up @@ -337,6 +349,11 @@ inner join
(select "submissionId", (count(id) - 1) as count from submission_defs
group by "submissionId") as edits
on edits."submissionId"=submission_defs."submissionId"
${options.skiptoken && !options.skiptoken.repeatId ? sql` -- in case of subtable we are fetching all Submissions without pagination
inner join
(select id, "createdAt" from submissions where "instanceId" = ${options.skiptoken.instanceId}) as cursor
on submissions."createdAt" <= cursor."createdAt" and submissions.id < cursor.id
`: sql``}
where
${encrypted ? sql`(form_defs."encKeyId" is null or form_defs."encKeyId" in (${sql.join(keyIds, sql`,`)})) and` : sql``}
${odataFilter(options.filter, options.isSubmissionsTable ? odataToColumnMap : odataSubTableToColumnMap)} and
Expand Down
4 changes: 2 additions & 2 deletions lib/resources/datasets.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ module.exports = (service, endpoint) => {
.then((project) => auth.canOrReject('dataset.list', project))
.then(() => Datasets.getList(params.id, queryOptions))));

service.get('/projects/:projectId/datasets/:name', endpoint(({ Datasets }, { params, auth }) =>
Datasets.get(params.projectId, params.name)
service.get('/projects/:projectId/datasets/:name', endpoint(({ Datasets }, { params, auth, queryOptions }) =>
Datasets.get(params.projectId, params.name, true, queryOptions.extended)
.then(getOrNotFound)
.then((dataset) => auth.canOrReject('dataset.read', dataset)
.then(() => Datasets.getMetadata(dataset)))));
Expand Down
4 changes: 2 additions & 2 deletions lib/resources/odata-entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ module.exports = (service, endpoint) => {
const dataset = await Datasets.get(params.projectId, params.name, true).then(getOrNotFound);
const properties = await Datasets.getProperties(dataset.id);
const options = QueryOptions.fromODataRequestEntities(query);
const count = await Entities.countByDatasetId(dataset.id, options);
const { count, remaining } = await Entities.countByDatasetId(dataset.id, options);
const entities = await Entities.streamForExport(dataset.id, options);
return json(streamEntityOdata(entities, properties, env.domain, originalUrl, query, count));
return json(streamEntityOdata(entities, properties, env.domain, originalUrl, query, count, remaining));
}));


Expand Down
6 changes: 3 additions & 3 deletions lib/resources/odata.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ module.exports = (service, endpoint) => {
Forms.getFields(form.def.id).then(selectFields(query, params.table)),
Submissions.streamForExport(form.id, draft, undefined, options),
((params.table === 'Submissions') && options.hasPaging())
? Submissions.countByFormId(form.id, draft, options) : resolve(null)
? Submissions.countByFormId(form.id, draft, options) : resolve({})
])
.then(([fields, stream, count]) =>
json(rowStreamToOData(fields, params.table, env.domain, originalUrl, query, stream, count)));
.then(([fields, stream, { count, remaining }]) =>
json(rowStreamToOData(fields, params.table, env.domain, originalUrl, query, stream, count, remaining)));
})));
};

Expand Down
Loading

0 comments on commit 211dd59

Please sign in to comment.