From b45d91cc4bac5c58ff24b19cd3b6711b2c5eca8f Mon Sep 17 00:00:00 2001 From: Josh Gross Date: Mon, 6 Jan 2020 13:05:50 -0500 Subject: [PATCH] Chunked Cache Upload APIs (#128) * Initial pass at chunked upload apis * Fix cacheEntry type * Linting * Fix download cache entry tests * Linting tests * Pull in fixes from testing branch * Fix typo in ReserveCacheResponse * Add test convering reserve cache failure * Add retries to upload chunk * PR feedback * Format default chunk size * Remove responses array --- __tests__/restore.test.ts | 15 ++- __tests__/save.test.ts | 80 +++++++++++- package-lock.json | 2 +- package.json | 2 +- src/cacheHttpClient.ts | 250 +++++++++++++++++++++++++++++++------- src/contracts.d.ts | 13 ++ src/restore.ts | 7 +- src/save.ts | 16 ++- 8 files changed, 324 insertions(+), 61 deletions(-) diff --git a/__tests__/restore.test.ts b/__tests__/restore.test.ts index 15e0ba5..c96a2d6 100644 --- a/__tests__/restore.test.ts +++ b/__tests__/restore.test.ts @@ -248,7 +248,10 @@ test("restore with cache found", async () => { expect(getCacheMock).toHaveBeenCalledWith([key]); expect(setCacheStateMock).toHaveBeenCalledWith(cacheEntry); expect(createTempDirectoryMock).toHaveBeenCalledTimes(1); - expect(downloadCacheMock).toHaveBeenCalledWith(cacheEntry, archivePath); + expect(downloadCacheMock).toHaveBeenCalledWith( + cacheEntry.archiveLocation, + archivePath + ); expect(getArchiveFileSizeMock).toHaveBeenCalledWith(archivePath); expect(extractTarMock).toHaveBeenCalledTimes(1); @@ -312,7 +315,10 @@ test("restore with a pull request event and cache found", async () => { expect(getCacheMock).toHaveBeenCalledWith([key]); expect(setCacheStateMock).toHaveBeenCalledWith(cacheEntry); expect(createTempDirectoryMock).toHaveBeenCalledTimes(1); - expect(downloadCacheMock).toHaveBeenCalledWith(cacheEntry, archivePath); + expect(downloadCacheMock).toHaveBeenCalledWith( + cacheEntry.archiveLocation, + archivePath + ); expect(getArchiveFileSizeMock).toHaveBeenCalledWith(archivePath); expect(infoMock).toHaveBeenCalledWith(`Cache Size: ~60 MB (62915000 B)`); @@ -377,7 +383,10 @@ test("restore with cache found for restore key", async () => { expect(getCacheMock).toHaveBeenCalledWith([key, restoreKey]); expect(setCacheStateMock).toHaveBeenCalledWith(cacheEntry); expect(createTempDirectoryMock).toHaveBeenCalledTimes(1); - expect(downloadCacheMock).toHaveBeenCalledWith(cacheEntry, archivePath); + expect(downloadCacheMock).toHaveBeenCalledWith( + cacheEntry.archiveLocation, + archivePath + ); expect(getArchiveFileSizeMock).toHaveBeenCalledWith(archivePath); expect(infoMock).toHaveBeenCalledWith(`Cache Size: ~0 MB (142 B)`); diff --git a/__tests__/save.test.ts b/__tests__/save.test.ts index ba76cda..b355076 100644 --- a/__tests__/save.test.ts +++ b/__tests__/save.test.ts @@ -194,7 +194,7 @@ test("save with large cache outputs warning", async () => { const createTarMock = jest.spyOn(tar, "createTar"); - const cacheSize = 1024 * 1024 * 1024; //~1GB, over the 400MB limit + const cacheSize = 4 * 1024 * 1024 * 1024; //~4GB, over the 2GB limit jest.spyOn(actionUtils, "getArchiveFileSize").mockImplementationOnce(() => { return cacheSize; }); @@ -208,12 +208,63 @@ test("save with large cache outputs warning", async () => { expect(logWarningMock).toHaveBeenCalledTimes(1); expect(logWarningMock).toHaveBeenCalledWith( - "Cache size of ~1024 MB (1073741824 B) is over the 400MB limit, not saving cache." + "Cache size of ~4096 MB (4294967296 B) is over the 2GB limit, not saving cache." ); expect(failedMock).toHaveBeenCalledTimes(0); }); +test("save with reserve cache failure outputs warning", async () => { + const infoMock = jest.spyOn(core, "info"); + const logWarningMock = jest.spyOn(actionUtils, "logWarning"); + const failedMock = jest.spyOn(core, "setFailed"); + + const primaryKey = "Linux-node-bb828da54c148048dd17899ba9fda624811cfb43"; + const cacheEntry: ArtifactCacheEntry = { + cacheKey: "Linux-node-", + scope: "refs/heads/master", + creationTime: "2019-11-13T19:18:02+00:00", + archiveLocation: "www.actionscache.test/download" + }; + + jest.spyOn(core, "getState") + // Cache Entry State + .mockImplementationOnce(() => { + return JSON.stringify(cacheEntry); + }) + // Cache Key State + .mockImplementationOnce(() => { + return primaryKey; + }); + + const inputPath = "node_modules"; + testUtils.setInput(Inputs.Path, inputPath); + + const reserveCacheMock = jest + .spyOn(cacheHttpClient, "reserveCache") + .mockImplementationOnce(() => { + return Promise.resolve(-1); + }); + + const createTarMock = jest.spyOn(tar, "createTar"); + + const saveCacheMock = jest.spyOn(cacheHttpClient, "saveCache"); + + await run(); + + expect(reserveCacheMock).toHaveBeenCalledTimes(1); + expect(reserveCacheMock).toHaveBeenCalledWith(primaryKey); + + expect(infoMock).toHaveBeenCalledWith( + `Unable to reserve cache with key ${primaryKey}, another job may be creating this cache.` + ); + + expect(createTarMock).toHaveBeenCalledTimes(0); + expect(saveCacheMock).toHaveBeenCalledTimes(0); + expect(logWarningMock).toHaveBeenCalledTimes(0); + expect(failedMock).toHaveBeenCalledTimes(0); +}); + test("save with server error outputs warning", async () => { const logWarningMock = jest.spyOn(actionUtils, "logWarning"); const failedMock = jest.spyOn(core, "setFailed"); @@ -240,6 +291,13 @@ test("save with server error outputs warning", async () => { const cachePath = path.resolve(inputPath); testUtils.setInput(Inputs.Path, inputPath); + const cacheId = 4; + const reserveCacheMock = jest + .spyOn(cacheHttpClient, "reserveCache") + .mockImplementationOnce(() => { + return Promise.resolve(cacheId); + }); + const createTarMock = jest.spyOn(tar, "createTar"); const saveCacheMock = jest @@ -250,13 +308,16 @@ test("save with server error outputs warning", async () => { await run(); + expect(reserveCacheMock).toHaveBeenCalledTimes(1); + expect(reserveCacheMock).toHaveBeenCalledWith(primaryKey); + const archivePath = path.join("/foo/bar", "cache.tgz"); expect(createTarMock).toHaveBeenCalledTimes(1); expect(createTarMock).toHaveBeenCalledWith(archivePath, cachePath); expect(saveCacheMock).toHaveBeenCalledTimes(1); - expect(saveCacheMock).toHaveBeenCalledWith(primaryKey, archivePath); + expect(saveCacheMock).toHaveBeenCalledWith(cacheId, archivePath); expect(logWarningMock).toHaveBeenCalledTimes(1); expect(logWarningMock).toHaveBeenCalledWith("HTTP Error Occurred"); @@ -289,18 +350,29 @@ test("save with valid inputs uploads a cache", async () => { const cachePath = path.resolve(inputPath); testUtils.setInput(Inputs.Path, inputPath); + const cacheId = 4; + const reserveCacheMock = jest + .spyOn(cacheHttpClient, "reserveCache") + .mockImplementationOnce(() => { + return Promise.resolve(cacheId); + }); + const createTarMock = jest.spyOn(tar, "createTar"); + const saveCacheMock = jest.spyOn(cacheHttpClient, "saveCache"); await run(); + expect(reserveCacheMock).toHaveBeenCalledTimes(1); + expect(reserveCacheMock).toHaveBeenCalledWith(primaryKey); + const archivePath = path.join("/foo/bar", "cache.tgz"); expect(createTarMock).toHaveBeenCalledTimes(1); expect(createTarMock).toHaveBeenCalledWith(archivePath, cachePath); expect(saveCacheMock).toHaveBeenCalledTimes(1); - expect(saveCacheMock).toHaveBeenCalledWith(primaryKey, archivePath); + expect(saveCacheMock).toHaveBeenCalledWith(cacheId, archivePath); expect(failedMock).toHaveBeenCalledTimes(0); }); diff --git a/package-lock.json b/package-lock.json index 986b08b..37e50d2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "cache", - "version": "1.0.3", + "version": "1.1.0", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index facc0af..7de321b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "cache", - "version": "1.0.3", + "version": "1.1.0", "private": true, "description": "Cache dependencies and build outputs", "main": "dist/restore/index.js", diff --git a/src/cacheHttpClient.ts b/src/cacheHttpClient.ts index 8a2014f..89bca63 100644 --- a/src/cacheHttpClient.ts +++ b/src/cacheHttpClient.ts @@ -1,26 +1,49 @@ import * as core from "@actions/core"; import * as fs from "fs"; import { BearerCredentialHandler } from "typed-rest-client/Handlers"; -import { HttpClient } from "typed-rest-client/HttpClient"; +import { HttpClient, HttpCodes } from "typed-rest-client/HttpClient"; import { IHttpClientResponse } from "typed-rest-client/Interfaces"; -import { IRequestOptions, RestClient } from "typed-rest-client/RestClient"; -import { ArtifactCacheEntry } from "./contracts"; +import { + IRequestOptions, + RestClient, + IRestResponse +} from "typed-rest-client/RestClient"; +import { + ArtifactCacheEntry, + CommitCacheRequest, + ReserveCacheRequest, + ReserveCacheResponse +} from "./contracts"; +import * as utils from "./utils/actionUtils"; -function getCacheUrl(): string { +function isSuccessStatusCode(statusCode: number): boolean { + return statusCode >= 200 && statusCode < 300; +} + +function isRetryableStatusCode(statusCode: number): boolean { + const retryableStatusCodes = [ + HttpCodes.BadGateway, + HttpCodes.ServiceUnavailable, + HttpCodes.GatewayTimeout + ]; + return retryableStatusCodes.includes(statusCode); +} + +function getCacheApiUrl(): string { // Ideally we just use ACTIONS_CACHE_URL - const cacheUrl: string = ( + const baseUrl: string = ( process.env["ACTIONS_CACHE_URL"] || process.env["ACTIONS_RUNTIME_URL"] || "" ).replace("pipelines", "artifactcache"); - if (!cacheUrl) { + if (!baseUrl) { throw new Error( "Cache Service Url not found, unable to restore cache." ); } - core.debug(`Cache Url: ${cacheUrl}`); - return cacheUrl; + core.debug(`Cache Url: ${baseUrl}`); + return `${baseUrl}_apis/artifactcache/`; } function createAcceptHeader(type: string, apiVersion: string): string { @@ -29,26 +52,26 @@ function createAcceptHeader(type: string, apiVersion: string): string { function getRequestOptions(): IRequestOptions { const requestOptions: IRequestOptions = { - acceptHeader: createAcceptHeader("application/json", "5.2-preview.1") + acceptHeader: createAcceptHeader("application/json", "6.0-preview.1") }; return requestOptions; } -export async function getCacheEntry( - keys: string[] -): Promise { - const cacheUrl = getCacheUrl(); +function createRestClient(): RestClient { const token = process.env["ACTIONS_RUNTIME_TOKEN"] || ""; const bearerCredentialHandler = new BearerCredentialHandler(token); - const resource = `_apis/artifactcache/cache?keys=${encodeURIComponent( - keys.join(",") - )}`; - - const restClient = new RestClient("actions/cache", cacheUrl, [ + return new RestClient("actions/cache", getCacheApiUrl(), [ bearerCredentialHandler ]); +} + +export async function getCacheEntry( + keys: string[] +): Promise { + const restClient = createRestClient(); + const resource = `cache?keys=${encodeURIComponent(keys.join(","))}`; const response = await restClient.get( resource, @@ -57,14 +80,15 @@ export async function getCacheEntry( if (response.statusCode === 204) { return null; } - if (response.statusCode !== 200) { + if (!isSuccessStatusCode(response.statusCode)) { throw new Error(`Cache service responded with ${response.statusCode}`); } const cacheResult = response.result; - if (!cacheResult || !cacheResult.archiveLocation) { + const cacheDownloadUrl = cacheResult?.archiveLocation; + if (!cacheDownloadUrl) { throw new Error("Cache not found."); } - core.setSecret(cacheResult.archiveLocation); + core.setSecret(cacheDownloadUrl); core.debug(`Cache Result:`); core.debug(JSON.stringify(cacheResult)); @@ -83,46 +107,178 @@ async function pipeResponseToStream( } export async function downloadCache( - cacheEntry: ArtifactCacheEntry, + archiveLocation: string, archivePath: string ): Promise { const stream = fs.createWriteStream(archivePath); const httpClient = new HttpClient("actions/cache"); - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const downloadResponse = await httpClient.get(cacheEntry.archiveLocation!); + const downloadResponse = await httpClient.get(archiveLocation); await pipeResponseToStream(downloadResponse, stream); } -export async function saveCache( - key: string, - archivePath: string +// Reserve Cache +export async function reserveCache(key: string): Promise { + const restClient = createRestClient(); + + const reserveCacheRequest: ReserveCacheRequest = { + key + }; + const response = await restClient.create( + "caches", + reserveCacheRequest, + getRequestOptions() + ); + + return response?.result?.cacheId ?? -1; +} + +function getContentRange(start: number, end: number): string { + // Format: `bytes start-end/filesize + // start and end are inclusive + // filesize can be * + // For a 200 byte chunk starting at byte 0: + // Content-Range: bytes 0-199/* + return `bytes ${start}-${end}/*`; +} + +async function uploadChunk( + restClient: RestClient, + resourceUrl: string, + data: NodeJS.ReadableStream, + start: number, + end: number ): Promise { - const stream = fs.createReadStream(archivePath); - - const cacheUrl = getCacheUrl(); - const token = process.env["ACTIONS_RUNTIME_TOKEN"] || ""; - const bearerCredentialHandler = new BearerCredentialHandler(token); - - const resource = `_apis/artifactcache/cache/${encodeURIComponent(key)}`; - const postUrl = cacheUrl + resource; - - const restClient = new RestClient("actions/cache", undefined, [ - bearerCredentialHandler - ]); - + core.debug( + `Uploading chunk of size ${end - + start + + 1} bytes at offset ${start} with content range: ${getContentRange( + start, + end + )}` + ); const requestOptions = getRequestOptions(); requestOptions.additionalHeaders = { - "Content-Type": "application/octet-stream" + "Content-Type": "application/octet-stream", + "Content-Range": getContentRange(start, end) }; - const response = await restClient.uploadStream( - "POST", - postUrl, - stream, + const uploadChunkRequest = async (): Promise> => { + return await restClient.uploadStream( + "PATCH", + resourceUrl, + data, + requestOptions + ); + }; + + const response = await uploadChunkRequest(); + if (isSuccessStatusCode(response.statusCode)) { + return; + } + + if (isRetryableStatusCode(response.statusCode)) { + core.debug( + `Received ${response.statusCode}, retrying chunk at offset ${start}.` + ); + const retryResponse = await uploadChunkRequest(); + if (isSuccessStatusCode(retryResponse.statusCode)) { + return; + } + } + + throw new Error( + `Cache service responded with ${response.statusCode} during chunk upload.` + ); +} + +async function uploadFile( + restClient: RestClient, + cacheId: number, + archivePath: string +): Promise { + // Upload Chunks + const fileSize = fs.statSync(archivePath).size; + const resourceUrl = getCacheApiUrl() + "caches/" + cacheId.toString(); + const fd = fs.openSync(archivePath, "r"); + + const concurrency = Number(process.env["CACHE_UPLOAD_CONCURRENCY"]) ?? 4; // # of HTTP requests in parallel + const MAX_CHUNK_SIZE = + Number(process.env["CACHE_UPLOAD_CHUNK_SIZE"]) ?? 32 * 1024 * 1024; // 32 MB Chunks + core.debug(`Concurrency: ${concurrency} and Chunk Size: ${MAX_CHUNK_SIZE}`); + + const parallelUploads = [...new Array(concurrency).keys()]; + core.debug("Awaiting all uploads"); + let offset = 0; + + try { + await Promise.all( + parallelUploads.map(async () => { + while (offset < fileSize) { + const chunkSize = Math.min( + fileSize - offset, + MAX_CHUNK_SIZE + ); + const start = offset; + const end = offset + chunkSize - 1; + offset += MAX_CHUNK_SIZE; + const chunk = fs.createReadStream(archivePath, { + fd, + start, + end, + autoClose: false + }); + + await uploadChunk( + restClient, + resourceUrl, + chunk, + start, + end + ); + } + }) + ); + } finally { + fs.closeSync(fd); + } + return; +} + +async function commitCache( + restClient: RestClient, + cacheId: number, + filesize: number +): Promise> { + const requestOptions = getRequestOptions(); + const commitCacheRequest: CommitCacheRequest = { size: filesize }; + return await restClient.create( + `caches/${cacheId.toString()}`, + commitCacheRequest, requestOptions ); - if (response.statusCode !== 200) { - throw new Error(`Cache service responded with ${response.statusCode}`); +} + +export async function saveCache( + cacheId: number, + archivePath: string +): Promise { + const restClient = createRestClient(); + + core.debug("Upload cache"); + await uploadFile(restClient, cacheId, archivePath); + + // Commit Cache + core.debug("Commiting cache"); + const cacheSize = utils.getArchiveFileSize(archivePath); + const commitCacheResponse = await commitCache( + restClient, + cacheId, + cacheSize + ); + if (!isSuccessStatusCode(commitCacheResponse.statusCode)) { + throw new Error( + `Cache service responded with ${commitCacheResponse.statusCode} during commit cache.` + ); } core.info("Cache saved successfully"); diff --git a/src/contracts.d.ts b/src/contracts.d.ts index 8478b83..269c7d9 100644 --- a/src/contracts.d.ts +++ b/src/contracts.d.ts @@ -4,3 +4,16 @@ export interface ArtifactCacheEntry { creationTime?: string; archiveLocation?: string; } + +export interface CommitCacheRequest { + size: number; +} + +export interface ReserveCacheRequest { + key: string; + version?: string; +} + +export interface ReserveCacheResponse { + cacheId: number; +} diff --git a/src/restore.ts b/src/restore.ts index 599dbd7..4911e7e 100644 --- a/src/restore.ts +++ b/src/restore.ts @@ -60,7 +60,7 @@ async function run(): Promise { try { const cacheEntry = await cacheHttpClient.getCacheEntry(keys); - if (!cacheEntry) { + if (!cacheEntry?.archiveLocation) { core.info( `Cache not found for input keys: ${keys.join(", ")}.` ); @@ -77,7 +77,10 @@ async function run(): Promise { utils.setCacheState(cacheEntry); // Download the cache from the cache entry - await cacheHttpClient.downloadCache(cacheEntry, archivePath); + await cacheHttpClient.downloadCache( + cacheEntry.archiveLocation, + archivePath + ); const archiveFileSize = utils.getArchiveFileSize(archivePath); core.info( diff --git a/src/save.ts b/src/save.ts index 56198a7..ee64e42 100644 --- a/src/save.ts +++ b/src/save.ts @@ -34,6 +34,15 @@ async function run(): Promise { return; } + core.debug("Reserving Cache"); + const cacheId = await cacheHttpClient.reserveCache(primaryKey); + if (cacheId == -1) { + core.info( + `Unable to reserve cache with key ${primaryKey}, another job may be creating this cache.` + ); + return; + } + core.debug(`Cache ID: ${cacheId}`); const cachePath = utils.resolvePath( core.getInput(Inputs.Path, { required: true }) ); @@ -47,19 +56,20 @@ async function run(): Promise { await createTar(archivePath, cachePath); - const fileSizeLimit = 400 * 1024 * 1024; // 400MB + const fileSizeLimit = 2 * 1024 * 1024 * 1024; // 2GB per repo limit const archiveFileSize = utils.getArchiveFileSize(archivePath); core.debug(`File Size: ${archiveFileSize}`); if (archiveFileSize > fileSizeLimit) { utils.logWarning( `Cache size of ~${Math.round( archiveFileSize / (1024 * 1024) - )} MB (${archiveFileSize} B) is over the 400MB limit, not saving cache.` + )} MB (${archiveFileSize} B) is over the 2GB limit, not saving cache.` ); return; } - await cacheHttpClient.saveCache(primaryKey, archivePath); + core.debug(`Saving Cache (ID: ${cacheId})`); + await cacheHttpClient.saveCache(cacheId, archivePath); } catch (error) { utils.logWarning(error.message); }