Fix uploadChunk and add generic retry method

This commit is contained in:
Dave Hadka 2020-05-08 11:37:53 -04:00
parent ce9276c90e
commit aced43a650
4 changed files with 338 additions and 74 deletions

View file

@ -1,4 +1,4 @@
import { getCacheVersion } from "../src/cacheHttpClient"; import { getCacheVersion, retry } from "../src/cacheHttpClient";
import { CompressionMethod, Inputs } from "../src/constants"; import { CompressionMethod, Inputs } from "../src/constants";
import * as testUtils from "../src/utils/testUtils"; import * as testUtils from "../src/utils/testUtils";
@ -37,3 +37,131 @@ test("getCacheVersion with gzip compression does not change vesion", async () =>
test("getCacheVersion with no input throws", async () => { test("getCacheVersion with no input throws", async () => {
expect(() => getCacheVersion()).toThrow(); expect(() => getCacheVersion()).toThrow();
}); });
interface TestResponse {
statusCode: number;
result: string | null;
}
function handleResponse(
response: TestResponse | undefined
): Promise<TestResponse> {
if (!response) {
fail("Retry method called too many times");
}
if (response.statusCode === 999) {
throw Error("Test Error");
} else {
return Promise.resolve(response);
}
}
async function testRetryExpectingResult(
responses: Array<TestResponse>,
expectedResult: string
): Promise<void> {
responses = responses.reverse(); // Reverse responses since we pop from end
const actualResult = await retry(
"test",
() => handleResponse(responses.pop()),
(response: TestResponse) => response.statusCode,
(response: TestResponse) => response.result,
(statusCode: number) => statusCode === 200,
(statusCode: number) => statusCode === 503
);
expect(actualResult).toEqual(expectedResult);
}
async function testRetryExpectingError(
responses: Array<TestResponse>
): Promise<void> {
responses = responses.reverse(); // Reverse responses since we pop from end
expect(
retry(
"test",
() => handleResponse(responses.pop()),
(response: TestResponse) => response.statusCode,
(response: TestResponse) => response.result,
(statusCode: number) => statusCode === 200,
(statusCode: number) => statusCode === 503
)
).rejects.toBeInstanceOf(Error);
}
test("retry works on successful response", async () => {
await testRetryExpectingResult(
[
{
statusCode: 200,
result: "Ok"
}
],
"Ok"
);
});
test("retry works after retryable status code", async () => {
await testRetryExpectingResult(
[
{
statusCode: 503,
result: null
},
{
statusCode: 200,
result: "Ok"
}
],
"Ok"
);
});
test("retry fails after exhausting retries", async () => {
await testRetryExpectingError([
{
statusCode: 503,
result: null
},
{
statusCode: 503,
result: null
},
{
statusCode: 200,
result: "Ok"
}
]);
});
test("retry fails after non-retryable status code", async () => {
await testRetryExpectingError([
{
statusCode: 500,
result: null
},
{
statusCode: 200,
result: "Ok"
}
]);
});
test("retry works after error", async () => {
await testRetryExpectingResult(
[
{
statusCode: 999,
result: null
},
{
statusCode: 200,
result: "Ok"
}
],
"Ok"
);
});

79
dist/restore/index.js vendored
View file

@ -2246,19 +2246,60 @@ function getCacheVersion(compressionMethod) {
.digest("hex"); .digest("hex");
} }
exports.getCacheVersion = getCacheVersion; exports.getCacheVersion = getCacheVersion;
function retry(name, method, getStatusCode, getReturnValue, isSuccessStatusCode, isRetryableStatusCode, maxAttempts = 2) {
return __awaiter(this, void 0, void 0, function* () {
let response = undefined;
let statusCode = undefined;
let isRetryable = false;
let errorMessage = "";
let attempt = 1;
while (attempt <= maxAttempts) {
try {
response = yield method();
statusCode = getStatusCode(response);
if (isSuccessStatusCode(statusCode)) {
return getReturnValue(response);
}
isRetryable = isRetryableStatusCode(statusCode);
errorMessage = `Cache service responded with ${statusCode}`;
}
catch (error) {
isRetryable = true;
errorMessage = error.message;
}
core.debug(`${name} - Attempt ${attempt} of ${maxAttempts} failed with error: ${errorMessage}`);
if (!isRetryable) {
core.debug(`${name} - Error is not retryable`);
break;
}
attempt++;
}
throw Error(`${name} failed: ${errorMessage}`);
});
}
exports.retry = retry;
function retryTypedResponse(name, method, maxAttempts = 2) {
return __awaiter(this, void 0, void 0, function* () {
return yield retry(name, method, (response) => response.statusCode, (response) => response, isSuccessStatusCode, isRetryableStatusCode, maxAttempts);
});
}
exports.retryTypedResponse = retryTypedResponse;
function retryHttpClientResponse(name, method, maxAttempts = 2) {
return __awaiter(this, void 0, void 0, function* () {
return yield retry(name, method, (response) => response.message.statusCode, (response) => response, isSuccessStatusCode, isRetryableStatusCode, maxAttempts);
});
}
exports.retryHttpClientResponse = retryHttpClientResponse;
function getCacheEntry(keys, options) { function getCacheEntry(keys, options) {
var _a, _b; var _a, _b;
return __awaiter(this, void 0, void 0, function* () { return __awaiter(this, void 0, void 0, function* () {
const httpClient = createHttpClient(); const httpClient = createHttpClient();
const version = getCacheVersion((_a = options) === null || _a === void 0 ? void 0 : _a.compressionMethod); const version = getCacheVersion((_a = options) === null || _a === void 0 ? void 0 : _a.compressionMethod);
const resource = `cache?keys=${encodeURIComponent(keys.join(","))}&version=${version}`; const resource = `cache?keys=${encodeURIComponent(keys.join(","))}&version=${version}`;
const response = yield httpClient.getJson(getCacheApiUrl(resource)); const response = yield retryTypedResponse("getCacheEntry", () => httpClient.getJson(getCacheApiUrl(resource)));
if (response.statusCode === 204) { if (response.statusCode === 204) {
return null; return null;
} }
if (!isSuccessStatusCode(response.statusCode)) {
throw new Error(`Cache service responded with ${response.statusCode}`);
}
const cacheResult = response.result; const cacheResult = response.result;
const cacheDownloadUrl = (_b = cacheResult) === null || _b === void 0 ? void 0 : _b.archiveLocation; const cacheDownloadUrl = (_b = cacheResult) === null || _b === void 0 ? void 0 : _b.archiveLocation;
if (!cacheDownloadUrl) { if (!cacheDownloadUrl) {
@ -2326,7 +2367,7 @@ function getContentRange(start, end) {
// Content-Range: bytes 0-199/* // Content-Range: bytes 0-199/*
return `bytes ${start}-${end}/*`; return `bytes ${start}-${end}/*`;
} }
function uploadChunk(httpClient, resourceUrl, data, start, end) { function uploadChunk(httpClient, resourceUrl, openStream, start, end) {
return __awaiter(this, void 0, void 0, function* () { return __awaiter(this, void 0, void 0, function* () {
core.debug(`Uploading chunk of size ${end - core.debug(`Uploading chunk of size ${end -
start + start +
@ -2336,20 +2377,9 @@ function uploadChunk(httpClient, resourceUrl, data, start, end) {
"Content-Range": getContentRange(start, end) "Content-Range": getContentRange(start, end)
}; };
const uploadChunkRequest = () => __awaiter(this, void 0, void 0, function* () { const uploadChunkRequest = () => __awaiter(this, void 0, void 0, function* () {
return yield httpClient.sendStream("PATCH", resourceUrl, data, additionalHeaders); return yield httpClient.sendStream("PATCH", resourceUrl, openStream(), additionalHeaders);
}); });
const response = yield uploadChunkRequest(); yield retryHttpClientResponse(`uploadChunk (start: ${start}, end: ${end})`, uploadChunkRequest);
if (isSuccessStatusCode(response.message.statusCode)) {
return;
}
if (isRetryableStatusCode(response.message.statusCode)) {
core.debug(`Received ${response.message.statusCode}, retrying chunk at offset ${start}.`);
const retryResponse = yield uploadChunkRequest();
if (isSuccessStatusCode(retryResponse.message.statusCode)) {
return;
}
}
throw new Error(`Cache service responded with ${response.message.statusCode} during chunk upload.`);
}); });
} }
function parseEnvNumber(key) { function parseEnvNumber(key) {
@ -2379,13 +2409,12 @@ function uploadFile(httpClient, cacheId, archivePath) {
const start = offset; const start = offset;
const end = offset + chunkSize - 1; const end = offset + chunkSize - 1;
offset += MAX_CHUNK_SIZE; offset += MAX_CHUNK_SIZE;
const chunk = fs.createReadStream(archivePath, { yield uploadChunk(httpClient, resourceUrl, () => fs.createReadStream(archivePath, {
fd, fd,
start, start,
end, end,
autoClose: false autoClose: false
}); }), start, end);
yield uploadChunk(httpClient, resourceUrl, chunk, start, end);
} }
}))); })));
} }
@ -3642,6 +3671,12 @@ class HttpClientResponse {
this.message.on('data', (chunk) => { this.message.on('data', (chunk) => {
output = Buffer.concat([output, chunk]); output = Buffer.concat([output, chunk]);
}); });
this.message.on('aborted', () => {
reject("Request was aborted or closed prematurely");
});
this.message.on('timeout', (socket) => {
reject("Request timed out");
});
this.message.on('end', () => { this.message.on('end', () => {
resolve(output.toString()); resolve(output.toString());
}); });
@ -3763,6 +3798,7 @@ class HttpClient {
let response; let response;
while (numTries < maxTries) { while (numTries < maxTries) {
response = await this.requestRaw(info, data); response = await this.requestRaw(info, data);
// Check if it's an authentication challenge // Check if it's an authentication challenge
if (response && response.message && response.message.statusCode === HttpCodes.Unauthorized) { if (response && response.message && response.message.statusCode === HttpCodes.Unauthorized) {
let authenticationHandler; let authenticationHandler;
@ -3874,6 +3910,7 @@ class HttpClient {
req.on('error', function (err) { req.on('error', function (err) {
// err has statusCode property // err has statusCode property
// res should have headers // res should have headers
console.log(`Caught error on request: ${err}`);
handleResult(err, null); handleResult(err, null);
}); });
if (data && typeof (data) === 'string') { if (data && typeof (data) === 'string') {

79
dist/save/index.js vendored
View file

@ -2246,19 +2246,60 @@ function getCacheVersion(compressionMethod) {
.digest("hex"); .digest("hex");
} }
exports.getCacheVersion = getCacheVersion; exports.getCacheVersion = getCacheVersion;
function retry(name, method, getStatusCode, getReturnValue, isSuccessStatusCode, isRetryableStatusCode, maxAttempts = 2) {
return __awaiter(this, void 0, void 0, function* () {
let response = undefined;
let statusCode = undefined;
let isRetryable = false;
let errorMessage = "";
let attempt = 1;
while (attempt <= maxAttempts) {
try {
response = yield method();
statusCode = getStatusCode(response);
if (isSuccessStatusCode(statusCode)) {
return getReturnValue(response);
}
isRetryable = isRetryableStatusCode(statusCode);
errorMessage = `Cache service responded with ${statusCode}`;
}
catch (error) {
isRetryable = true;
errorMessage = error.message;
}
core.debug(`${name} - Attempt ${attempt} of ${maxAttempts} failed with error: ${errorMessage}`);
if (!isRetryable) {
core.debug(`${name} - Error is not retryable`);
break;
}
attempt++;
}
throw Error(`${name} failed: ${errorMessage}`);
});
}
exports.retry = retry;
function retryTypedResponse(name, method, maxAttempts = 2) {
return __awaiter(this, void 0, void 0, function* () {
return yield retry(name, method, (response) => response.statusCode, (response) => response, isSuccessStatusCode, isRetryableStatusCode, maxAttempts);
});
}
exports.retryTypedResponse = retryTypedResponse;
function retryHttpClientResponse(name, method, maxAttempts = 2) {
return __awaiter(this, void 0, void 0, function* () {
return yield retry(name, method, (response) => response.message.statusCode, (response) => response, isSuccessStatusCode, isRetryableStatusCode, maxAttempts);
});
}
exports.retryHttpClientResponse = retryHttpClientResponse;
function getCacheEntry(keys, options) { function getCacheEntry(keys, options) {
var _a, _b; var _a, _b;
return __awaiter(this, void 0, void 0, function* () { return __awaiter(this, void 0, void 0, function* () {
const httpClient = createHttpClient(); const httpClient = createHttpClient();
const version = getCacheVersion((_a = options) === null || _a === void 0 ? void 0 : _a.compressionMethod); const version = getCacheVersion((_a = options) === null || _a === void 0 ? void 0 : _a.compressionMethod);
const resource = `cache?keys=${encodeURIComponent(keys.join(","))}&version=${version}`; const resource = `cache?keys=${encodeURIComponent(keys.join(","))}&version=${version}`;
const response = yield httpClient.getJson(getCacheApiUrl(resource)); const response = yield retryTypedResponse("getCacheEntry", () => httpClient.getJson(getCacheApiUrl(resource)));
if (response.statusCode === 204) { if (response.statusCode === 204) {
return null; return null;
} }
if (!isSuccessStatusCode(response.statusCode)) {
throw new Error(`Cache service responded with ${response.statusCode}`);
}
const cacheResult = response.result; const cacheResult = response.result;
const cacheDownloadUrl = (_b = cacheResult) === null || _b === void 0 ? void 0 : _b.archiveLocation; const cacheDownloadUrl = (_b = cacheResult) === null || _b === void 0 ? void 0 : _b.archiveLocation;
if (!cacheDownloadUrl) { if (!cacheDownloadUrl) {
@ -2326,7 +2367,7 @@ function getContentRange(start, end) {
// Content-Range: bytes 0-199/* // Content-Range: bytes 0-199/*
return `bytes ${start}-${end}/*`; return `bytes ${start}-${end}/*`;
} }
function uploadChunk(httpClient, resourceUrl, data, start, end) { function uploadChunk(httpClient, resourceUrl, openStream, start, end) {
return __awaiter(this, void 0, void 0, function* () { return __awaiter(this, void 0, void 0, function* () {
core.debug(`Uploading chunk of size ${end - core.debug(`Uploading chunk of size ${end -
start + start +
@ -2336,20 +2377,9 @@ function uploadChunk(httpClient, resourceUrl, data, start, end) {
"Content-Range": getContentRange(start, end) "Content-Range": getContentRange(start, end)
}; };
const uploadChunkRequest = () => __awaiter(this, void 0, void 0, function* () { const uploadChunkRequest = () => __awaiter(this, void 0, void 0, function* () {
return yield httpClient.sendStream("PATCH", resourceUrl, data, additionalHeaders); return yield httpClient.sendStream("PATCH", resourceUrl, openStream(), additionalHeaders);
}); });
const response = yield uploadChunkRequest(); yield retryHttpClientResponse(`uploadChunk (start: ${start}, end: ${end})`, uploadChunkRequest);
if (isSuccessStatusCode(response.message.statusCode)) {
return;
}
if (isRetryableStatusCode(response.message.statusCode)) {
core.debug(`Received ${response.message.statusCode}, retrying chunk at offset ${start}.`);
const retryResponse = yield uploadChunkRequest();
if (isSuccessStatusCode(retryResponse.message.statusCode)) {
return;
}
}
throw new Error(`Cache service responded with ${response.message.statusCode} during chunk upload.`);
}); });
} }
function parseEnvNumber(key) { function parseEnvNumber(key) {
@ -2379,13 +2409,12 @@ function uploadFile(httpClient, cacheId, archivePath) {
const start = offset; const start = offset;
const end = offset + chunkSize - 1; const end = offset + chunkSize - 1;
offset += MAX_CHUNK_SIZE; offset += MAX_CHUNK_SIZE;
const chunk = fs.createReadStream(archivePath, { yield uploadChunk(httpClient, resourceUrl, () => fs.createReadStream(archivePath, {
fd, fd,
start, start,
end, end,
autoClose: false autoClose: false
}); }), start, end);
yield uploadChunk(httpClient, resourceUrl, chunk, start, end);
} }
}))); })));
} }
@ -3642,6 +3671,12 @@ class HttpClientResponse {
this.message.on('data', (chunk) => { this.message.on('data', (chunk) => {
output = Buffer.concat([output, chunk]); output = Buffer.concat([output, chunk]);
}); });
this.message.on('aborted', () => {
reject("Request was aborted or closed prematurely");
});
this.message.on('timeout', (socket) => {
reject("Request timed out");
});
this.message.on('end', () => { this.message.on('end', () => {
resolve(output.toString()); resolve(output.toString());
}); });
@ -3763,6 +3798,7 @@ class HttpClient {
let response; let response;
while (numTries < maxTries) { while (numTries < maxTries) {
response = await this.requestRaw(info, data); response = await this.requestRaw(info, data);
// Check if it's an authentication challenge // Check if it's an authentication challenge
if (response && response.message && response.message.statusCode === HttpCodes.Unauthorized) { if (response && response.message && response.message.statusCode === HttpCodes.Unauthorized) {
let authenticationHandler; let authenticationHandler;
@ -3874,6 +3910,7 @@ class HttpClient {
req.on('error', function (err) { req.on('error', function (err) {
// err has statusCode property // err has statusCode property
// res should have headers // res should have headers
console.log(`Caught error on request: ${err}`);
handleResult(err, null); handleResult(err, null);
}); });
if (data && typeof (data) === 'string') { if (data && typeof (data) === 'string') {

View file

@ -99,6 +99,84 @@ export function getCacheVersion(compressionMethod?: CompressionMethod): string {
.digest("hex"); .digest("hex");
} }
export async function retry<R, T>(
name: string,
method: () => Promise<R>,
getStatusCode: (R) => number | undefined,
getReturnValue: (R) => T,
isSuccessStatusCode: (number) => boolean,
isRetryableStatusCode: (number) => boolean,
maxAttempts = 2
): Promise<T> {
let response: R | undefined = undefined;
let statusCode: number | undefined = undefined;
let isRetryable = false;
let errorMessage = "";
let attempt = 1;
while (attempt <= maxAttempts) {
try {
response = await method();
statusCode = getStatusCode(response);
if (isSuccessStatusCode(statusCode)) {
return getReturnValue(response);
}
isRetryable = isRetryableStatusCode(statusCode);
errorMessage = `Cache service responded with ${statusCode}`;
} catch (error) {
isRetryable = true;
errorMessage = error.message;
}
core.debug(
`${name} - Attempt ${attempt} of ${maxAttempts} failed with error: ${errorMessage}`
);
if (!isRetryable) {
core.debug(`${name} - Error is not retryable`);
break;
}
attempt++;
}
throw Error(`${name} failed: ${errorMessage}`);
}
export async function retryTypedResponse<T>(
name: string,
method: () => Promise<ITypedResponse<T>>,
maxAttempts = 2
): Promise<ITypedResponse<T>> {
return await retry(
name,
method,
(response: ITypedResponse<T>) => response.statusCode,
(response: ITypedResponse<T>) => response,
isSuccessStatusCode,
isRetryableStatusCode,
maxAttempts
);
}
export async function retryHttpClientResponse<T>(
name: string,
method: () => Promise<IHttpClientResponse>,
maxAttempts = 2
): Promise<IHttpClientResponse> {
return await retry(
name,
method,
(response: IHttpClientResponse) => response.message.statusCode,
(response: IHttpClientResponse) => response,
isSuccessStatusCode,
isRetryableStatusCode,
maxAttempts
);
}
export async function getCacheEntry( export async function getCacheEntry(
keys: string[], keys: string[],
options?: CacheOptions options?: CacheOptions
@ -109,15 +187,13 @@ export async function getCacheEntry(
keys.join(",") keys.join(",")
)}&version=${version}`; )}&version=${version}`;
const response = await httpClient.getJson<ArtifactCacheEntry>( const response = await retryTypedResponse("getCacheEntry", () =>
getCacheApiUrl(resource) httpClient.getJson<ArtifactCacheEntry>(getCacheApiUrl(resource))
); );
if (response.statusCode === 204) { if (response.statusCode === 204) {
return null; return null;
} }
if (!isSuccessStatusCode(response.statusCode)) {
throw new Error(`Cache service responded with ${response.statusCode}`);
}
const cacheResult = response.result; const cacheResult = response.result;
const cacheDownloadUrl = cacheResult?.archiveLocation; const cacheDownloadUrl = cacheResult?.archiveLocation;
@ -206,7 +282,7 @@ function getContentRange(start: number, end: number): string {
async function uploadChunk( async function uploadChunk(
httpClient: HttpClient, httpClient: HttpClient,
resourceUrl: string, resourceUrl: string,
data: NodeJS.ReadableStream, openStream: () => NodeJS.ReadableStream,
start: number, start: number,
end: number end: number
): Promise<void> { ): Promise<void> {
@ -227,28 +303,14 @@ async function uploadChunk(
return await httpClient.sendStream( return await httpClient.sendStream(
"PATCH", "PATCH",
resourceUrl, resourceUrl,
data, openStream(),
additionalHeaders additionalHeaders
); );
}; };
const response = await uploadChunkRequest(); await retryHttpClientResponse(
if (isSuccessStatusCode(response.message.statusCode)) { `uploadChunk (start: ${start}, end: ${end})`,
return; uploadChunkRequest
}
if (isRetryableStatusCode(response.message.statusCode)) {
core.debug(
`Received ${response.message.statusCode}, retrying chunk at offset ${start}.`
);
const retryResponse = await uploadChunkRequest();
if (isSuccessStatusCode(retryResponse.message.statusCode)) {
return;
}
}
throw new Error(
`Cache service responded with ${response.message.statusCode} during chunk upload.`
); );
} }
@ -290,17 +352,17 @@ async function uploadFile(
const start = offset; const start = offset;
const end = offset + chunkSize - 1; const end = offset + chunkSize - 1;
offset += MAX_CHUNK_SIZE; offset += MAX_CHUNK_SIZE;
const chunk = fs.createReadStream(archivePath, {
fd,
start,
end,
autoClose: false
});
await uploadChunk( await uploadChunk(
httpClient, httpClient,
resourceUrl, resourceUrl,
chunk, () =>
fs.createReadStream(archivePath, {
fd,
start,
end,
autoClose: false
}),
start, start,
end end
); );