Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: 修复代理模式下请求头没有正确携带的问题 #719

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 65 additions & 56 deletions app/core/service/ProxyCacheService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,17 @@
async getPackageManifest(fullname: string, fileType: DIST_NAMES.FULL_MANIFESTS| DIST_NAMES.ABBREVIATED_MANIFESTS): Promise<AbbreviatedPackageManifestType|PackageManifestType> {
const cachedStoreKey = (await this.proxyCacheRepository.findProxyCache(fullname, fileType))?.filePath;
if (cachedStoreKey) {
const nfsBytes = await this.nfsAdapter.getBytes(cachedStoreKey);
const nfsString = Buffer.from(nfsBytes!).toString();
const nfsPkgManifgest = JSON.parse(nfsString);
return nfsPkgManifgest;
try {
const nfsBytes = await this.nfsAdapter.getBytes(cachedStoreKey);
const nfsString = Buffer.from(nfsBytes!).toString();
const nfsPkgManifgest = JSON.parse(nfsString);
return nfsPkgManifgest;
} catch (error) {
/* c8 ignore next 3 */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看逻辑如果 getBytes 抛出异常,会直接清空之前存储的信息?

用其他办法来做数据清理?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json信息解析不出来,说明存入的信息就有错误或者文件在nfs上已经删掉了。
每天凌晨三点会去刷新一下已经缓存的依赖,也算是一种清理吧,这里删除是防止依赖还没刷新但是读取已经异常了,清除了就会再次请求上游缓存一遍,不用等每天的刷新。

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这种有点危险,不能因为 nfs 异常就删除数据吧。

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

需要将这个逻辑修改一下,要明确知道具体错误才能删除数据

await this.nfsAdapter.remove(cachedStoreKey);
await this.proxyCacheRepository.removeProxyCache(fullname, fileType);
throw error;
}

Check warning on line 74 in app/core/service/ProxyCacheService.ts

View check run for this annotation

Codecov / codecov/patch

app/core/service/ProxyCacheService.ts#L74

Added line #L74 was not covered by tests
}

const manifest = await this.getRewrittenManifest<typeof fileType>(fullname, fileType);
Expand All @@ -88,9 +95,16 @@
}
const cachedStoreKey = (await this.proxyCacheRepository.findProxyCache(fullname, fileType, version))?.filePath;
if (cachedStoreKey) {
const nfsBytes = await this.nfsAdapter.getBytes(cachedStoreKey);
const nfsString = Buffer.from(nfsBytes!).toString();
return JSON.parse(nfsString) as PackageJSONType | AbbreviatedPackageJSONType;
try {
const nfsBytes = await this.nfsAdapter.getBytes(cachedStoreKey);
const nfsString = Buffer.from(nfsBytes!).toString();
return JSON.parse(nfsString) as PackageJSONType | AbbreviatedPackageJSONType;
} catch (error) {
/* c8 ignore next 3 */
await this.nfsAdapter.remove(cachedStoreKey);
await this.proxyCacheRepository.removeProxyCache(fullname, fileType);
throw error;
}

Check warning on line 107 in app/core/service/ProxyCacheService.ts

View check run for this annotation

Codecov / codecov/patch

app/core/service/ProxyCacheService.ts#L107

Added line #L107 was not covered by tests
Comment on lines +98 to +107
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider extracting common error handling logic.

This error handling block is duplicated from getPackageManifest. Consider extracting it into a private method.

+ private async handleNFSError(cachedStoreKey: string, fullname: string, fileType: DIST_NAMES) {
+   this.logger.error('[ProxyCacheService] Failed to read manifest from NFS: %s', error);
+   await this.nfsAdapter.remove(cachedStoreKey);
+   await this.proxyCacheRepository.removeProxyCache(fullname, fileType);
+   throw error;
+ }

Committable suggestion was skipped due to low confidence.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 107-107: app/core/service/ProxyCacheService.ts#L107
Added line #L107 was not covered by tests

}
const manifest = await this.getRewrittenManifest(fullname, fileType, versionOrTag);
this.backgroundTaskHelper.run(async () => {
Expand All @@ -109,6 +123,27 @@
await this.proxyCacheRepository.removeProxyCache(fullname, fileType, version);
}

replaceTarballUrl<T extends DIST_NAMES>(manifest: GetSourceManifestAndCacheReturnType<T>, fileType: T) {
const { sourceRegistry, registry } = this.config.cnpmcore;
if (isPkgManifest(fileType)) {
// pkg manifest
const versionMap = (manifest as AbbreviatedPackageManifestType|PackageManifestType)?.versions;
for (const key in versionMap) {
const versionItem = versionMap[key];
if (versionItem?.dist?.tarball) {
versionItem.dist.tarball = versionItem.dist.tarball.replace(sourceRegistry, registry);
}
}
} else {
// pkg version manifest
const distItem = (manifest as AbbreviatedPackageJSONType | PackageJSONType).dist;
if (distItem?.tarball) {
distItem.tarball = distItem.tarball.replace(sourceRegistry, registry);
}
}
return manifest;
}

async createTask(targetName: string, options: UpdateProxyCacheTaskOptions): Promise<CreateUpdateProxyCacheTask> {
return await this.taskService.createTask(Task.createUpdateProxyCache(targetName, options), false) as CreateUpdateProxyCacheTask;
}
Expand Down Expand Up @@ -151,44 +186,37 @@
await this.taskService.finishTask(task, TaskState.Success, logs.join('\n'));
}

async getRewrittenManifest<T extends DIST_NAMES>(fullname:string, fileType: T, versionOrTag?:string): Promise<GetSourceManifestAndCacheReturnType<T>> {
// only used by schedule task
private async getRewrittenManifest<T extends DIST_NAMES>(fullname:string, fileType: T, versionOrTag?:string): Promise<GetSourceManifestAndCacheReturnType<T>> {
let responseResult;
const USER_AGENT = 'npm_service.cnpmjs.org/cnpmcore';
switch (fileType) {
case DIST_NAMES.FULL_MANIFESTS:
responseResult = await this.getUpstreamFullManifests(fullname);
case DIST_NAMES.FULL_MANIFESTS: {
const url = `/${encodeURIComponent(fullname)}?t=${Date.now()}&cache=0`;
responseResult = await this.getProxyResponse({ url, headers: { accept: 'application/json', 'user-agent': USER_AGENT } }, { dataType: 'json' });
break;
case DIST_NAMES.ABBREVIATED_MANIFESTS:
responseResult = await this.getUpstreamAbbreviatedManifests(fullname);
}
case DIST_NAMES.ABBREVIATED_MANIFESTS: {
const url = `/${encodeURIComponent(fullname)}?t=${Date.now()}&cache=0`;
responseResult = await this.getProxyResponse({ url, headers: { accept: ABBREVIATED_META_TYPE, 'user-agent': USER_AGENT } }, { dataType: 'json' });
break;
case DIST_NAMES.MANIFEST:
responseResult = await this.getUpstreamPackageVersionManifest(fullname, versionOrTag!);
}
case DIST_NAMES.MANIFEST: {
const url = `/${encodeURIComponent(fullname)}/${encodeURIComponent(versionOrTag!)}`;
responseResult = await this.getProxyResponse({ url, headers: { accept: 'application/json', 'user-agent': USER_AGENT } }, { dataType: 'json' });
break;
case DIST_NAMES.ABBREVIATED:
responseResult = await this.getUpstreamAbbreviatedPackageVersionManifest(fullname, versionOrTag!);
}
case DIST_NAMES.ABBREVIATED: {
const url = `/${encodeURIComponent(fullname)}/${encodeURIComponent(versionOrTag!)}`;
responseResult = await this.getProxyResponse({ url, headers: { accept: ABBREVIATED_META_TYPE, 'user-agent': USER_AGENT } }, { dataType: 'json' });
break;
}
default:
break;
}

// replace tarball url
const manifest = responseResult.data;
const { sourceRegistry, registry } = this.config.cnpmcore;
if (isPkgManifest(fileType)) {
// pkg manifest
const versionMap = manifest.versions || {};
for (const key in versionMap) {
const versionItem = versionMap[key];
if (versionItem?.dist?.tarball) {
versionItem.dist.tarball = versionItem.dist.tarball.replace(sourceRegistry, registry);
}
}
} else {
// pkg version manifest
const distItem = manifest.dist || {};
if (distItem.tarball) {
distItem.tarball = distItem.tarball.replace(sourceRegistry, registry);
}
}
const manifest = this.replaceTarballUrl(responseResult.data, fileType);
return manifest;
Comment on lines +189 to 220
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider extracting common URL pattern.

The URL construction is similar across cases. Consider extracting the pattern.

+ private buildProxyUrl(fullname: string, versionOrTag?: string): string {
+   const base = `/${encodeURIComponent(fullname)}`;
+   return versionOrTag 
+     ? `${base}/${encodeURIComponent(versionOrTag)}`
+     : `${base}?t=${Date.now()}&cache=0`;
+ }

Committable suggestion was skipped due to low confidence.

}

Expand All @@ -204,7 +232,7 @@
await this.nfsAdapter.uploadBytes(storeKey, nfsBytes);
}

private async getProxyResponse(ctx: Partial<EggContext>, options?: HttpClientRequestOptions): Promise<HttpClientResponse> {
async getProxyResponse(ctx: Partial<EggContext>, options?: HttpClientRequestOptions): Promise<HttpClientResponse> {
const registry = this.npmRegistry.registry;
const remoteAuthToken = await this.registryManagerService.getAuthTokenByRegistryHost(registry);
const authorization = this.npmRegistry.genAuthorizationHeader(remoteAuthToken);
Expand All @@ -221,8 +249,8 @@
compressed: true,
...options,
headers: {
accept: ctx.header?.accept,
'user-agent': ctx.header?.['user-agent'],
accept: ctx.headers?.accept,
'user-agent': ctx.headers?.['user-agent'],
authorization,
'x-forwarded-for': ctx?.ip,
via: `1.1, ${this.config.cnpmcore.registry}`,
Expand All @@ -231,23 +259,4 @@
this.logger.info('[ProxyCacheService:getProxyStreamResponse] %s, status: %s', url, res.status);
return res;
}

private async getUpstreamFullManifests(fullname: string): Promise<HttpClientResponse> {
const url = `/${encodeURIComponent(fullname)}?t=${Date.now()}&cache=0`;
return await this.getProxyResponse({ url }, { dataType: 'json' });
}

private async getUpstreamAbbreviatedManifests(fullname: string): Promise<HttpClientResponse> {
const url = `/${encodeURIComponent(fullname)}?t=${Date.now()}&cache=0`;
return await this.getProxyResponse({ url, headers: { accept: ABBREVIATED_META_TYPE } }, { dataType: 'json' });
}
private async getUpstreamPackageVersionManifest(fullname: string, versionOrTag: string): Promise<HttpClientResponse> {
const url = `/${encodeURIComponent(fullname)}/${encodeURIComponent(versionOrTag)}`;
return await this.getProxyResponse({ url }, { dataType: 'json' });
}
private async getUpstreamAbbreviatedPackageVersionManifest(fullname: string, versionOrTag: string): Promise<HttpClientResponse> {
const url = `/${encodeURIComponent(fullname)}/${encodeURIComponent(versionOrTag)}`;
return await this.getProxyResponse({ url, headers: { accept: ABBREVIATED_META_TYPE } }, { dataType: 'json' });
}

}
39 changes: 13 additions & 26 deletions app/port/controller/ProxyCacheController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
Context,
EggContext,
} from '@eggjs/tegg';
import { ForbiddenError, NotFoundError, UnauthorizedError } from 'egg-errors';
import { ForbiddenError, NotFoundError, UnauthorizedError, NotImplementedError } from 'egg-errors';
import { AbstractController } from './AbstractController';
import { ProxyCacheRepository } from '../../repository/ProxyCacheRepository';
import { Static } from 'egg-typebox-validate/typebox';
Expand All @@ -18,8 +18,8 @@ import {
ProxyCacheService,
isPkgManifest,
} from '../../core/service/ProxyCacheService';
import { SyncMode, PROXY_CACHE_DIR_NAME } from '../../common/constants';
import { NFSAdapter } from '../../common/adapter/NFSAdapter';
import { SyncMode } from '../../common/constants';
import { CacheService } from '../../core/service/CacheService';

@HTTPController()
export class ProxyCacheController extends AbstractController {
Expand All @@ -28,7 +28,7 @@ export class ProxyCacheController extends AbstractController {
@Inject()
private readonly proxyCacheService: ProxyCacheService;
@Inject()
private readonly nfsAdapter: NFSAdapter;
private readonly cacheService: CacheService;

@HTTPMethod({
method: HTTPMethodEnum.GET,
Expand Down Expand Up @@ -77,11 +77,12 @@ export class ProxyCacheController extends AbstractController {
if (refreshList.length === 0) {
throw new NotFoundError();
}
await this.cacheService.removeCache(fullname);
const taskList = refreshList
// 仅manifests需要更新,指定版本的package.json文件发布后不会改变
// only refresh package.json and abbreviated.json
.filter(i => isPkgManifest(i.fileType))
.map(async item => {
const task = await this.proxyCacheService.createTask(
.map(item => {
const task = this.proxyCacheService.createTask(
`${item.fullname}/${item.fileType}`,
{
fullname: item.fullname,
Expand All @@ -90,22 +91,18 @@ export class ProxyCacheController extends AbstractController {
);
return task;
});
const tasks = await Promise.all(taskList);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

加一个 pMap 来限制一下并发量?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个接口会查询这个依赖所有缓存的json文件,但是只有manifest 和 abbrivated_manifest会更新,因为具体版本的包发布之后就不会再更改(正常情况下),所以创建的任务数量最多也只有2个。不需要特别控制并发

return {
ok: true,
tasks: await Promise.all(taskList),
tasks,
};
}

@HTTPMethod({
method: HTTPMethodEnum.DELETE,
path: `/-/proxy-cache/:fullname(${FULLNAME_REG_STRING})`,
})
async removeProxyCaches(@Context() ctx: EggContext, @HTTPParam() fullname: string) {
const isAdmin = await this.userRoleManager.isAdmin(ctx);
if (!isAdmin) {
throw new UnauthorizedError('only admin can do this');
}

async removeProxyCaches(@HTTPParam() fullname: string) {
if (this.config.cnpmcore.syncMode !== SyncMode.proxy) {
throw new ForbiddenError('proxy mode is not enabled');
}
Expand All @@ -116,6 +113,7 @@ export class ProxyCacheController extends AbstractController {
if (proxyCachesList.length === 0) {
throw new NotFoundError();
}
await this.cacheService.removeCache(fullname);
const removingList = proxyCachesList.map(item => {
return this.proxyCacheService.removeProxyCache(item.fullname, item.fileType, item.version);
});
Expand All @@ -140,17 +138,6 @@ export class ProxyCacheController extends AbstractController {
throw new ForbiddenError('proxy mode is not enabled');
}

await this.proxyCacheRepository.truncateProxyCache();
// 尝试删除proxy cache目录,若失败可手动管理
ctx.runInBackground(async () => {
try {
await this.nfsAdapter.remove(`/${PROXY_CACHE_DIR_NAME}`);
} catch (err) {
this.logger.error('[ProxyCacheService.truncateProxyCaches] remove proxy cache dir error: %s', err);
}
});
return {
ok: true,
};
throw new NotImplementedError('not implemented yet');
}
}
3 changes: 2 additions & 1 deletion app/port/controller/package/ShowPackageController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ export class ShowPackageController extends AbstractController {
if (this.config.cnpmcore.syncMode === SyncMode.proxy) {
// proxy mode
const fileType = isFullManifests ? DIST_NAMES.FULL_MANIFESTS : DIST_NAMES.ABBREVIATED_MANIFESTS;
const pkgManifest = await this.proxyCacheService.getPackageManifest(fullname, fileType);
const { data: sourceManifest } = await this.proxyCacheService.getProxyResponse(ctx, { dataType: 'json' });
const pkgManifest = this.proxyCacheService.replaceTarballUrl(sourceManifest, fileType);

const nfsBytes = Buffer.from(JSON.stringify(pkgManifest));
const { shasum: etag } = await calculateIntegrity(nfsBytes);
Expand Down
88 changes: 1 addition & 87 deletions test/core/service/ProxyCacheService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,92 +127,6 @@ describe('test/core/service/ProxyCacheService/index.test.ts', () => {
});
});

describe('getRewrittenManifest()', () => {
it('should get full package manifest', async () => {
const data = await TestUtil.readJSONFile(
TestUtil.getFixtures('registry.npmjs.org/foobar.json'),
);
mock(proxyCacheService, 'getUpstreamFullManifests', async () => {
return {
status: 200,
data,
};
});
const manifest = await proxyCacheService.getRewrittenManifest(
'foobar',
DIST_NAMES.FULL_MANIFESTS,
);
const versionArr = Object.values(manifest.versions);
for (const i of versionArr) {
assert(i.dist.tarball.includes('http://localhost:7001'));
}
});

it('should get abbreviated package manifest', async () => {
const data = await TestUtil.readJSONFile(
TestUtil.getFixtures('registry.npmjs.org/abbreviated_foobar.json'),
);
mock(proxyCacheService, 'getUpstreamAbbreviatedManifests', async () => {
return {
status: 200,
data,
};
});
const manifest = await proxyCacheService.getRewrittenManifest(
'foobar',
DIST_NAMES.ABBREVIATED_MANIFESTS,
);
const versionArr = Object.values(manifest.versions);
for (const i of versionArr) {
assert(i.dist.tarball.includes('http://localhost:7001'));
}
});

it('should get full package version manifest', async () => {
const data = await TestUtil.readJSONFile(
TestUtil.getFixtures('registry.npmjs.org/foobar/1.0.0/package.json'),
);
mock(proxyCacheService, 'getUpstreamPackageVersionManifest', async () => {
return {
status: 200,
data,
};
});
const manifest = await proxyCacheService.getRewrittenManifest(
'foobar',
DIST_NAMES.MANIFEST,
'1.0.0',
);
assert(manifest.dist);
assert(manifest.dist.tarball.includes('http://localhost:7001'));
});

it('should get abbreviated package version manifest', async () => {
const data = await TestUtil.readJSONFile(
TestUtil.getFixtures(
'registry.npmjs.org/foobar/1.0.0/abbreviated.json',
),
);
mock(
proxyCacheService,
'getUpstreamAbbreviatedPackageVersionManifest',
async () => {
return {
status: 200,
data,
};
},
);
const manifest = await proxyCacheService.getRewrittenManifest(
'foobar',
DIST_NAMES.ABBREVIATED,
'1.0.0',
);
assert(manifest.dist);
assert(manifest.dist.tarball.includes('http://localhost:7001'));
});
});

describe('removeProxyCache()', () => {
it('should remove cache', async () => {
await proxyCacheRepository.saveProxyCache(
Expand Down Expand Up @@ -300,7 +214,7 @@ describe('test/core/service/ProxyCacheService/index.test.ts', () => {
fileType: DIST_NAMES.FULL_MANIFESTS,
},
);
mock(proxyCacheService, 'getUpstreamFullManifests', async () => {
mock(proxyCacheService, 'getPackageVersionManifest', async () => {
return {
status: 200,
data: await TestUtil.readJSONFile(
Expand Down
Loading
Loading