Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Race top n nodes when making requests #25

Merged
merged 40 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
804978c
feat: implement a nodes list for clients
AmeanAsad Oct 11, 2023
f4d1844
feat: implement a storage interface using indexedDb
AmeanAsad Oct 11, 2023
894d16d
feat: implement a test suite for fallback
AmeanAsad Oct 11, 2023
7009c7a
fix: remove unused code
AmeanAsad Oct 11, 2023
3336a87
fix: eslint an jsdoc
AmeanAsad Oct 11, 2023
696cd8a
fix: formatting and consistency
AmeanAsad Oct 11, 2023
a3dec56
fix: indexDbCheck
AmeanAsad Oct 11, 2023
747d780
Merge branch 'main' into feat/fallback
AmeanAsad Oct 12, 2023
8dec1c7
chore: change storage implementation
AmeanAsad Oct 12, 2023
b0dc673
enhancement: simplify node loading
AmeanAsad Oct 12, 2023
78651fc
naive fallback implementation
AmeanAsad Oct 12, 2023
d15c0ad
modify fallback
AmeanAsad Oct 12, 2023
29e8da1
fix formatting and typos
AmeanAsad Oct 12, 2023
194ce70
typos
AmeanAsad Oct 12, 2023
1d97dc9
Update .eslintrc
AmeanAsad Oct 12, 2023
84a1b8e
enhancement: edit storage impl
AmeanAsad Oct 12, 2023
f688406
enhancement: deal with overlapping byte chunks
AmeanAsad Oct 12, 2023
5b7c215
feat: add fallback test suite
AmeanAsad Oct 14, 2023
6068a90
fix: tests running
AmeanAsad Oct 15, 2023
2c3cc79
cleanup content fetch with fallback
AmeanAsad Oct 16, 2023
4cf0c00
add initial origin fetch to fallback
AmeanAsad Oct 16, 2023
d81038f
formatting and file re-org
AmeanAsad Oct 16, 2023
a1ecd50
feat: merge main into fallback branch (#22)
AmeanAsad Oct 16, 2023
55d56d1
Merge branch 'main' into feat/fallback
AmeanAsad Oct 16, 2023
d2c5c37
load nodes on first success
AmeanAsad Oct 16, 2023
e0065d8
add fallback limit
AmeanAsad Oct 16, 2023
109019b
fix: fallback bug
AmeanAsad Oct 17, 2023
b84ce4a
put eslint settings in package.json
AmeanAsad Oct 17, 2023
7731fe8
add nodesListKey as static
AmeanAsad Oct 17, 2023
e0fcfd8
fix: resolve process in browser
AmeanAsad Oct 18, 2023
ac1b9f8
Merge branch 'main' into feat/fallback
AmeanAsad Oct 19, 2023
489842a
feat: add fetching with a race
AmeanAsad Oct 23, 2023
9de35fc
enhancement: add backward compatibility for racing
AmeanAsad Oct 23, 2023
c06a2b6
tests and cleanup
AmeanAsad Oct 24, 2023
ceda4c2
Merge branch 'main' into feat/race-nodes
AmeanAsad Oct 24, 2023
7249aea
fixes and enhancements
AmeanAsad Oct 25, 2023
57697d2
add typings
AmeanAsad Oct 25, 2023
c0e3e93
Merge branch 'main' into feat/race-nodes
AmeanAsad Oct 26, 2023
7d0b34e
add typings
AmeanAsad Oct 26, 2023
4ef930c
Merge branch 'main' into feat/race-nodes
AmeanAsad Oct 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 117 additions & 6 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { isBrowserContext } from './utils/runtime.js'

export class Saturn {
static nodesListKey = 'saturn-nodes'
static defaultRaceCount = 3
/**
*
* @param {object} [opts={}]
Expand Down Expand Up @@ -51,6 +52,101 @@ export class Saturn {
this.loadNodesPromise = this._loadNodes(this.opts)
}

/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @returns {Promise<object>}
*/
async fetchCIDWithRace (cidPath, opts = {}) {
const [cid] = (cidPath ?? '').split('/')
CID.parse(cid)

const jwt = await getJWT(this.opts, this.storage)

const options = Object.assign({}, this.opts, { format: 'car', jwt }, opts)

if (!isBrowserContext) {
options.headers = {
...(options.headers || {}),
Authorization: 'Bearer ' + options.jwt
}
}

const origins = options.origins
const controllers = []

const createFetchPromise = async (origin) => {
options.url = origin
Copy link
Collaborator

@guanzo guanzo Oct 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better for each function call to make a copy of the options object instead of sharing it and mutating fields.

Suggested change
options.url = origin
const fetchOptions = { ...options, url: origin } // need to rename the rest of the references

This is only a shallow copy but should suffice.

const url = this.createRequestURL(cidPath, options)

const controller = new AbortController()
controllers.push(controller)
const connectTimeout = setTimeout(() => {
controller.abort()
}, options.connectTimeout)

try {
res = await fetch(parseUrl(url), { signal: controller.signal, ...options })
clearTimeout(connectTimeout)
return { res, url, controller }
} catch (err) {
throw new Error(
`Non OK response received: ${res.status} ${res.statusText}`
)
}
}

const abortRemainingFetches = async (res, controllers) => {
return controllers.forEach((controller) => {
if (res.controller !== controller) {
AmeanAsad marked this conversation as resolved.
Show resolved Hide resolved
controller.abort('Request race unsuccessful')
}
})
}

const fetchPromises = Promise.any(origins.map((origin) => createFetchPromise(origin)))

const log = {
startTime: new Date()
}

let res, url, controller
try {
({ res, url, controller } = await fetchPromises)

abortRemainingFetches(res, controllers)

const { headers } = res
log.url = url
log.ttfbMs = new Date() - log.startTime
guanzo marked this conversation as resolved.
Show resolved Hide resolved
log.httpStatusCode = res.status
log.cacheHit = headers.get('saturn-cache-status') === 'HIT'
log.nodeId = headers.get('saturn-node-id')
log.requestId = headers.get('saturn-transfer-id')
log.httpProtocol = headers.get('quic-status')
AmeanAsad marked this conversation as resolved.
Show resolved Hide resolved

if (!res.ok) {
throw new Error(
`Non OK response received: ${res.status} ${res.statusText}`
)
}
} catch (err) {
if (!res) {
log.error = err.message
}
// Report now if error, otherwise report after download is done.
this._finalizeLog(log)

throw err
}

return { res, controller, log }
}

/**
*
* @param {string} cidPath
Expand All @@ -68,7 +164,6 @@ export class Saturn {

const options = Object.assign({}, this.opts, { format: 'car', jwt }, opts)
const url = this.createRequestURL(cidPath, options)

const log = {
url,
startTime: new Date()
Expand Down Expand Up @@ -122,6 +217,7 @@ export class Saturn {
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {boolean} [opts.raceNodes]
* @param {string} [opts.url]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
Expand Down Expand Up @@ -166,11 +262,18 @@ export class Saturn {
}

let fallbackCount = 0
for (const origin of this.nodes) {
const nodes = this.nodes
for (let i = 0; i < nodes.length; i++) {
if (fallbackCount > this.opts.fallbackLimit) {
return
}
opts.url = origin.url
if (opts.raceNodes) {
const origins = nodes.slice(i, i + Saturn.defaultRaceCount).map((node) => node.url)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if we have nodes a,b,c,d,e, then the race groups are:

  • a,b,c
  • 1st fallback: b,c,d
  • 2nd fallback: c,d,e

Is that right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct. Will also note that this is not necessarily the best way to do it and has some obvious room for improvement. For example:

  • If node c fails, it we might hit it twice again before another good fallback is contacted.

I think as a next step, the best way to do this is to remove nodes that fail and such that:

  • initial group is a, b, c. Assume node c fails
  • 1st fallback is a, b, d and so on

I think this can be better achieved with a hashring implementation but we can re-evaluate.

opts.origins = origins
} else {
opts.url = nodes[i].url
}

try {
yield * fetchContent()
return
Expand All @@ -189,13 +292,20 @@ export class Saturn {
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {('car'|'raw')} [opts.format]- -
* @param {boolean} [opts.raceNodes]- -
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @returns {Promise<AsyncIterable<Uint8Array>>}
*/
async * fetchContent (cidPath, opts = {}) {
const { res, controller, log } = await this.fetchCID(cidPath, opts)
let res, controller, log

if (opts.raceNodes) {
({ res, controller, log } = await this.fetchCIDWithRace(cidPath, opts))
} else {
({ res, controller, log } = await this.fetchCID(cidPath, opts))
}

async function * metricsIterable (itr) {
log.numBytesSent = 0
Expand Down Expand Up @@ -224,6 +334,7 @@ export class Saturn {
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {boolean} [opts.raceNodes]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @returns {Promise<Uint8Array>}
Expand All @@ -239,7 +350,7 @@ export class Saturn {
* @returns {URL}
*/
createRequestURL (cidPath, opts) {
let origin = opts.url || opts.cdnURL
let origin = opts.url || (opts.origins && opts.origins[0]) || opts.cdnURL
origin = addHttpPrefix(origin)
const url = new URL(`${origin}/ipfs/${cidPath}`)

Expand Down
69 changes: 68 additions & 1 deletion test/fallback.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,74 @@ describe('Client Fallback', () => {

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test' })

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4', { url: 'node1.saturn.ms' })
const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4')

const buffer = await concatChunks(cid)
const actualContent = String.fromCharCode(...buffer)
const expectedContent = 'hello world\n'

assert.strictEqual(actualContent, expectedContent)
server.close()
mock.reset()
})

test('Content Fallback fetches a cid properly with race', async (t) => {
const handlers = [
mockOrchHandler(5, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockJWT(TEST_AUTH),
mockSaturnOriginHandler(TEST_ORIGIN_DOMAIN, 0, true),
...mockNodesHandlers(5, TEST_ORIGIN_DOMAIN)
]
const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)

const expectedNodes = generateNodes(3, TEST_ORIGIN_DOMAIN)

// Mocking storage object
const mockStorage = {
get: async (key) => expectedNodes,
set: async (key, value) => { return null }
}
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test' })
// const origins =

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4', { raceNodes: true })

const buffer = await concatChunks(cid)
const actualContent = String.fromCharCode(...buffer)
const expectedContent = 'hello world\n'

assert.strictEqual(actualContent, expectedContent)
server.close()
mock.reset()
})

test('Content Fallback with race fetches from consecutive nodes on failure', async (t) => {
const handlers = [
mockOrchHandler(5, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockJWT(TEST_AUTH),
mockSaturnOriginHandler(TEST_ORIGIN_DOMAIN, 0, true),
...mockNodesHandlers(5, TEST_ORIGIN_DOMAIN, 2)
]
const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)

const expectedNodes = generateNodes(5, TEST_ORIGIN_DOMAIN)

// Mocking storage object
const mockStorage = {
get: async (key) => expectedNodes,
set: async (key, value) => { return null }
}
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test' })

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4', { raceNodes: true })

const buffer = await concatChunks(cid)
const actualContent = String.fromCharCode(...buffer)
Expand Down
14 changes: 12 additions & 2 deletions test/test-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import fs from 'fs'
import { addHttpPrefix } from '../src/utils/url.js'

const HTTP_STATUS_OK = 200
const HTTP_STATUS_TIMEOUT = 504

const __dirname = dirname(fileURLToPath(import.meta.url))
process.env.TESTING = 'true'
Expand Down Expand Up @@ -123,14 +124,23 @@ export function mockJWT (authURL) {
*
* @param {number} count - amount of nodes to mock
* @param {string} originDomain - saturn origin domain.
* @param {number} failures
* @returns {RestHandler<any>[]}
*/
export function mockNodesHandlers (count, originDomain) {
export function mockNodesHandlers (count, originDomain, failures = 0) {
if (failures > count) {
throw Error('failures number cannot exceed node count')
}
const nodes = generateNodes(count, originDomain)

const handlers = nodes.map((node) => {
const handlers = nodes.map((node, idx) => {
const url = `${node.url}/ipfs/:cid`
return rest.get(url, (req, res, ctx) => {
if (idx < failures) {
return res(
ctx.status(HTTP_STATUS_TIMEOUT)
)
}
const filepath = getFixturePath('hello.car')
const fileContents = fs.readFileSync(filepath)
return res(
Expand Down
Loading