diff --git a/change/change-72232b60-d75b-4512-934d-a1d346e6658a.json b/change/change-72232b60-d75b-4512-934d-a1d346e6658a.json new file mode 100644 index 000000000..cb159c109 --- /dev/null +++ b/change/change-72232b60-d75b-4512-934d-a1d346e6658a.json @@ -0,0 +1,25 @@ +{ + "changes": [ + { + "type": "minor", + "comment": "Add support for a local cache that uses tar", + "packageName": "@lage-run/cache", + "email": "1581488+christiango@users.noreply.github.com", + "dependentChangeType": "patch" + }, + { + "type": "minor", + "comment": "Add support for a local cache that uses tar", + "packageName": "@lage-run/config", + "email": "1581488+christiango@users.noreply.github.com", + "dependentChangeType": "patch" + }, + { + "type": "minor", + "comment": "Add support for a local cache that uses tar", + "packageName": "@lage-run/scheduler", + "email": "1581488+christiango@users.noreply.github.com", + "dependentChangeType": "patch" + } + ] +} \ No newline at end of file diff --git a/packages/cache/package.json b/packages/cache/package.json index 8a7f38baa..ed2756957 100644 --- a/packages/cache/package.json +++ b/packages/cache/package.json @@ -21,12 +21,14 @@ "@azure/core-auth": "1.9.0", "@azure/identity": "4.9.1", "@lage-run/config": "workspace:^", + "@lage-run/globby": "workspace:^", "@lage-run/logger": "workspace:^", "@lage-run/target-graph": "workspace:^", "backfill-cache": "workspace:^", "backfill-config": "workspace:^", "backfill-logger": "workspace:^", - "glob-hasher": "^1.4.2" + "glob-hasher": "^1.4.2", + "tar": "7.5.11" }, "devDependencies": { "@lage-run/monorepo-scripts": "workspace:^", diff --git a/packages/cache/src/__tests__/LocalTarCacheProvider.test.ts b/packages/cache/src/__tests__/LocalTarCacheProvider.test.ts new file mode 100644 index 000000000..47816548a --- /dev/null +++ b/packages/cache/src/__tests__/LocalTarCacheProvider.test.ts @@ -0,0 +1,215 @@ +import createLogger from "@lage-run/logger"; +import { Monorepo } from "@lage-run/test-utilities"; +import 
type { Target } from "@lage-run/target-graph"; +import path from "path"; +import fs from "fs"; +import { LocalTarCacheProvider, type LocalTarCacheProviderOptions } from "../providers/LocalTarCacheProvider.js"; +import { getCacheDirectoryRoot } from "../getCacheDirectory.js"; + +describe("LocalTarCacheProvider", () => { + let monorepo: Monorepo | undefined; + + afterEach(async () => { + await monorepo?.cleanup(); + monorepo = undefined; + }); + + it("should put outputs into a tar and fetch them back", async () => { + const logger = createLogger(); + monorepo = new Monorepo("tar-cache-roundtrip"); + + await monorepo.init({ packages: { a: {} } }); + + const options: LocalTarCacheProviderOptions = { + logger, + root: monorepo.root, + outputGlob: ["output.txt"], + }; + + const provider = new LocalTarCacheProvider(options); + + const target: Target = { + id: "a", + cwd: path.join(monorepo.root, "packages/a"), + depSpecs: [], + dependents: [], + dependencies: [], + task: "command", + label: "a - command", + }; + + const hash = "some-hash"; + + // Write an output file in the package directory + monorepo.writeFiles({ + "packages/a/output.txt": "hello from tar cache", + }); + + // Put the output into the cache + await provider.put(hash, target); + + // Verify the tar file was created + const tarPath = path.join(getCacheDirectoryRoot(monorepo.root), "cache", hash.substring(0, 4), `${hash}.tar`); + expect(fs.existsSync(tarPath)).toBe(true); + + // Remove the output file so we can verify fetch restores it + fs.unlinkSync(path.join(monorepo.root, "packages/a/output.txt")); + expect(fs.existsSync(path.join(monorepo.root, "packages/a/output.txt"))).toBe(false); + + // Fetch from cache + const fetchResult = await provider.fetch(hash, target); + expect(fetchResult).toBe(true); + + // Verify the file was restored + const contents = monorepo.readFiles(["packages/a/output.txt"]); + expect(contents["packages/a/output.txt"]).toBe("hello from tar cache"); + }); + + it("should return false 
when cache entry does not exist", async () => { + const logger = createLogger(); + monorepo = new Monorepo("tar-cache-miss"); + + await monorepo.init({ packages: { a: {} } }); + + const options: LocalTarCacheProviderOptions = { + logger, + root: monorepo.root, + outputGlob: ["output.txt"], + }; + + const provider = new LocalTarCacheProvider(options); + + const target: Target = { + id: "a", + cwd: path.join(monorepo.root, "packages/a"), + depSpecs: [], + dependents: [], + dependencies: [], + task: "command", + label: "a - command", + }; + + const fetchResult = await provider.fetch("nonexistent-hash", target); + expect(fetchResult).toBe(false); + }); + + it("should handle multiple output files in subdirectories", async () => { + const logger = createLogger(); + monorepo = new Monorepo("tar-cache-multi"); + + await monorepo.init({ packages: { a: {} } }); + + const options: LocalTarCacheProviderOptions = { + logger, + root: monorepo.root, + outputGlob: ["lib/**/*"], + }; + + const provider = new LocalTarCacheProvider(options); + + const target: Target = { + id: "a", + cwd: path.join(monorepo.root, "packages/a"), + depSpecs: [], + dependents: [], + dependencies: [], + task: "build", + label: "a - build", + }; + + const hash = "multi-file-hash"; + + // Write multiple output files + monorepo.writeFiles({ + "packages/a/lib/index.js": "module.exports = {};", + "packages/a/lib/utils/helper.js": "module.exports = { help: true };", + }); + + await provider.put(hash, target); + + // Remove output files + fs.rmSync(path.join(monorepo.root, "packages/a/lib"), { recursive: true }); + expect(fs.existsSync(path.join(monorepo.root, "packages/a/lib"))).toBe(false); + + // Fetch and verify + const fetchResult = await provider.fetch(hash, target); + expect(fetchResult).toBe(true); + + const contents = monorepo.readFiles(["packages/a/lib/index.js", "packages/a/lib/utils/helper.js"]); + expect(contents["packages/a/lib/index.js"]).toBe("module.exports = {};"); + 
expect(contents["packages/a/lib/utils/helper.js"]).toBe("module.exports = { help: true };"); + }); + + it("should use target.outputs when available", async () => { + const logger = createLogger(); + monorepo = new Monorepo("tar-cache-target-outputs"); + + await monorepo.init({ packages: { a: {} } }); + + const options: LocalTarCacheProviderOptions = { + logger, + root: monorepo.root, + outputGlob: ["should-not-match/**/*"], + }; + + const provider = new LocalTarCacheProvider(options); + + const target: Target = { + id: "a", + cwd: path.join(monorepo.root, "packages/a"), + depSpecs: [], + dependents: [], + dependencies: [], + task: "build", + label: "a - build", + outputs: ["dist/**/*"], + }; + + const hash = "target-outputs-hash"; + + monorepo.writeFiles({ + "packages/a/dist/bundle.js": "console.log('bundled');", + }); + + await provider.put(hash, target); + + fs.rmSync(path.join(monorepo.root, "packages/a/dist"), { recursive: true }); + + const fetchResult = await provider.fetch(hash, target); + expect(fetchResult).toBe(true); + + const contents = monorepo.readFiles(["packages/a/dist/bundle.js"]); + expect(contents["packages/a/dist/bundle.js"]).toBe("console.log('bundled');"); + }); + + it("should skip put when no files match the output glob", async () => { + const logger = createLogger(); + monorepo = new Monorepo("tar-cache-no-match"); + + await monorepo.init({ packages: { a: {} } }); + + const options: LocalTarCacheProviderOptions = { + logger, + root: monorepo.root, + outputGlob: ["nonexistent/**/*"], + }; + + const provider = new LocalTarCacheProvider(options); + + const target: Target = { + id: "a", + cwd: path.join(monorepo.root, "packages/a"), + depSpecs: [], + dependents: [], + dependencies: [], + task: "build", + label: "a - build", + }; + + const hash = "no-match-hash"; + await provider.put(hash, target); + + const tarPath = path.join(getCacheDirectoryRoot(monorepo.root), "cache", hash.substring(0, 4), `${hash}.tar`); + 
expect(fs.existsSync(tarPath)).toBe(false); + }); +}); diff --git a/packages/cache/src/index.ts b/packages/cache/src/index.ts index 8e3611b4d..7b2cfbb88 100644 --- a/packages/cache/src/index.ts +++ b/packages/cache/src/index.ts @@ -1,4 +1,5 @@ export { BackfillCacheProvider } from "./providers/BackfillCacheProvider.js"; +export { LocalTarCacheProvider } from "./providers/LocalTarCacheProvider.js"; export { RemoteFallbackCacheProvider } from "./providers/RemoteFallbackCacheProvider.js"; export type { CacheOptions } from "@lage-run/config"; export type { CacheProvider } from "./types/CacheProvider.js"; diff --git a/packages/cache/src/providers/LocalTarCacheProvider.ts b/packages/cache/src/providers/LocalTarCacheProvider.ts new file mode 100644 index 000000000..fda6813f3 --- /dev/null +++ b/packages/cache/src/providers/LocalTarCacheProvider.ts @@ -0,0 +1,294 @@ +import * as fs from "fs"; +import * as fsp from "fs/promises"; +import * as path from "path"; +import * as tar from "tar"; +import { globAsync } from "@lage-run/globby"; +import type { CacheProvider } from "../types/CacheProvider.js"; +import type { Target } from "@lage-run/target-graph"; +import type { Logger } from "@lage-run/logger"; +import { getCacheDirectoryRoot } from "../getCacheDirectory.js"; +import { chunkPromise } from "../chunkPromise.js"; + +const MS_IN_A_DAY = 1000 * 60 * 60 * 24; +const TAR_BLOCK_SIZE = 512; + +/** + * Options for creating a {@link LocalTarCacheProvider}. + */ +export interface LocalTarCacheProviderOptions { + /** + * Absolute path to the monorepo root. + * Used to derive the cache directory (`/node_modules/.cache/lage/cache/`). + */ + root: string; + + /** Logger instance for diagnostic messages (e.g. cache miss/error details). */ + logger: Logger; + + /** + * Default glob patterns (relative to each package's `cwd`) that identify + * output files to cache. Overridden per-target by `target.outputs` when set. 
+ * + * @example ["lib/**\/*", "dist/**\/*"] + * @default ["**\/*"] + */ + outputGlob?: string[]; +} + +/** + * In-memory representation of a single file entry within a tar archive. + * Used as an intermediate structure when building or parsing tar buffers. + */ +interface TarFileEntry { + /** + * File path relative to the package's `cwd` (posix separators, e.g. `"lib/index.js"`). + * This is the path stored inside the tar header and used to reconstruct + * the file on extraction. + */ + path: string; + + /** Complete file contents as a raw buffer. */ + data: Buffer; + + /** + * Unix file-mode bits (e.g. `0o644`). Only the lower 12 bits + * (permission + sticky/setuid/setgid) are written to the tar header. + */ + mode: number; + + /** Last-modified timestamp, preserved in the tar header. */ + mtime: Date; +} + +/** + * Tar-based local cache provider. + * + * Instead of copying every output file individually (one syscall per file), + * this provider packs all outputs into a single `.tar` file on `put()` and + * extracts on `fetch()`. This dramatically reduces I/O overhead on file + * systems where per-file operations are expensive (e.g. NTFS). + * + * Both `put()` and `fetch()` use optimised bulk I/O paths: + * + * - **put()** reads all output files into memory in parallel, constructs + * the tar archive as a single buffer, and writes it with one `writeFile` + * call. This avoids the per-file `stat` → `read` → stream pipeline + * that `tar.create()` performs sequentially via Minipass. + * + * - **fetch()** parses the tar into memory buffers, then writes every file + * out in parallel via `Promise.all(writeFile(...))`. This avoids the + * sequential `mkdir` + `lstat` + `chmod` + stream-write that + * `tar.extract()` performs per entry. 
+ */ +export class LocalTarCacheProvider implements CacheProvider { + constructor(private options: LocalTarCacheProviderOptions) {} + + private getTarCacheDir(): string { + return path.join(getCacheDirectoryRoot(this.options.root), "cache"); + } + + private getTarPath(hash: string): string { + return path.join(this.getTarCacheDir(), hash.substring(0, 4), `${hash}.tar`); + } + + /** + * Build a POSIX.1-1988 (ustar) tar archive as a single buffer from in-memory + * file entries. + * + * A tar archive is a sequence of 512-byte blocks with the following layout + * for each file entry: + * + * ┌──────────────────────────────────────────────────┐ + * │ Header block (512 bytes) │ + * │ - file name, size, mode, mtime, checksum, etc. │ + * ├──────────────────────────────────────────────────┤ + * │ Data blocks (⌈size / 512⌉ × 512 bytes) │ + * │ - raw file contents, zero-padded to block │ + * │ boundary │ + * └──────────────────────────────────────────────────┘ + * + * After all entries, two consecutive 512-byte zero blocks signal + * end-of-archive. + * + * Header encoding is delegated to `tar.Header` from the `node-tar` package + * which handles ustar field layout, octal encoding, and checksum + * calculation. 
+ * + * @see https://pubs.opengroup.org/onlinepubs/9699919799/utilities/pax.html#tag_20_92_13_06 + * — POSIX pax/tar header specification + * @see https://www.gnu.org/software/tar/manual/html_node/Standard.html + * — GNU tar documentation on the ustar header format + */ + private buildTarBuffer(entries: TarFileEntry[]): Buffer { + const parts: Buffer[] = []; + + for (const entry of entries) { + const header = new tar.Header({ + path: entry.path, + size: entry.data.length, + mode: entry.mode & 0o7777, + mtime: entry.mtime, + type: "File", + uid: 0, + gid: 0, + }); + header.encode(); + + parts.push(header.block!); + parts.push(entry.data); + + const remainder = entry.data.length % TAR_BLOCK_SIZE; + if (remainder > 0) { + parts.push(Buffer.alloc(TAR_BLOCK_SIZE - remainder)); + } + } + + // End-of-archive: two 512-byte zero blocks + parts.push(Buffer.alloc(TAR_BLOCK_SIZE * 2)); + + return Buffer.concat(parts); + } + + public async fetch(hash: string, target: Target): Promise<boolean> { + if (!hash) { + return false; + } + + const tarPath = this.getTarPath(hash); + if (!fs.existsSync(tarPath)) { + return false; + } + + try { + // Parse the tar into memory, then write all files in parallel. + // This is ~5-7x faster than tar.extract() which does sequential + // mkdir + lstat + chmod + stream-write per entry. + const entries: TarFileEntry[] = []; + const dirs = new Set<string>(); + const parser = new tar.Parser(); + + parser.on("entry", (entry) => { + if (entry.type === "Directory") { + dirs.add(entry.path); + entry.resume(); + } else { + const chunks: Buffer[] = []; + entry.on("data", (chunk: Buffer) => chunks.push(chunk)); + entry.on("end", () => { + entries.push({ + path: entry.path, + data: Buffer.concat(chunks), + mode: entry.mode ?? 0o666, + mtime: entry.mtime ?? 
new Date(), + }); + }); + } + }); + + await new Promise<void>((resolve, reject) => { + const readStream = fs.createReadStream(tarPath); + readStream.pipe(parser); + parser.on("end", resolve); + parser.on("error", reject); + readStream.on("error", reject); + }); + + // Collect and create all needed directories + const allDirs = new Set<string>(); + for (const entry of entries) { + allDirs.add(path.dirname(path.join(target.cwd, entry.path))); + } + for (const d of dirs) { + allDirs.add(path.join(target.cwd, d)); + } + for (const d of [...allDirs].sort()) { + fs.mkdirSync(d, { recursive: true }); + } + + // Write all files in parallel + await Promise.all(entries.map((entry) => fsp.writeFile(path.join(target.cwd, entry.path), entry.data))); + + return true; + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.options.logger.silly(`Tar cache fetch failed: ${message}`, { target }); + return false; + } + } + + public async put(hash: string, target: Target): Promise<void> { + if (!hash) { + return; + } + + const outputGlob = target.outputs ?? this.options.outputGlob ?? ["**/*"]; + + try { + const files = await globAsync(outputGlob, { cwd: target.cwd }); + if (files.length === 0) { + return; + } + + // Read all output files in parallel, then build and write the tar + // in one shot. This is dramatically faster than tar.create() which + // processes each file sequentially through Minipass streams. 
+ const fileEntries: TarFileEntry[] = await Promise.all( + files.map(async (filePath) => { + const fullPath = path.join(target.cwd, filePath); + const [data, fileStat] = await Promise.all([fsp.readFile(fullPath), fsp.stat(fullPath)]); + return { + path: filePath, + data, + mode: fileStat.mode, + mtime: fileStat.mtime, + }; + }) + ); + + const tarBuffer = this.buildTarBuffer(fileEntries); + const tarPath = this.getTarPath(hash); + fs.mkdirSync(path.dirname(tarPath), { recursive: true }); + await fsp.writeFile(tarPath, tarBuffer); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.options.logger.silly(`Tar cache put failed: ${message}`, { target }); + } + } + + public async clear(concurrency = 10): Promise<void> { + return this.purge(0, concurrency); + } + + public async purge(prunePeriod = 30, concurrency = 10): Promise<void> { + const now = new Date(); + + const cacheTypes = ["cache", "logs"]; + const entries: string[] = []; + + for (const cacheType of cacheTypes) { + const cacheTypeDirectory = path.join(getCacheDirectoryRoot(this.options.root), cacheType); + if (fs.existsSync(cacheTypeDirectory)) { + const prefixes = await fsp.readdir(cacheTypeDirectory); + for (const prefix of prefixes) { + entries.push(path.join(cacheTypeDirectory, prefix)); + } + } + } + + await chunkPromise( + entries.map((entry) => { + return async () => { + const entryStat = await fsp.stat(entry); + if (now.getTime() - entryStat.mtime.getTime() > prunePeriod * MS_IN_A_DAY) { + if (entryStat.isDirectory()) { + await fsp.rm(entry, { recursive: true }); + } else { + await fsp.rm(entry); + } + } + }; + }), + concurrency + ); + } +} diff --git a/packages/config/src/types/CacheOptions.ts b/packages/config/src/types/CacheOptions.ts index be3b79094..1559834eb 100644 --- a/packages/config/src/types/CacheOptions.ts +++ b/packages/config/src/types/CacheOptions.ts @@ -62,4 +62,11 @@ export type CacheOptions = Omit & { * to generate the cache key. 
*/ cacheKey?: string; + + /** + * Use a tar-based local cache instead of copying files individually. + * Packs all outputs into a single `.tar` file per cache entry, reducing + * I/O overhead on file systems where per-file operations are expensive (e.g. NTFS). + */ + useLocalTarCache?: boolean; }; diff --git a/packages/scheduler/src/cache/createCacheProvider.ts b/packages/scheduler/src/cache/createCacheProvider.ts index 59b028c0d..8293dba01 100644 --- a/packages/scheduler/src/cache/createCacheProvider.ts +++ b/packages/scheduler/src/cache/createCacheProvider.ts @@ -1,5 +1,5 @@ import type { CacheOptions } from "@lage-run/cache"; -import { BackfillCacheProvider, RemoteFallbackCacheProvider } from "@lage-run/cache"; +import { BackfillCacheProvider, LocalTarCacheProvider, RemoteFallbackCacheProvider } from "@lage-run/cache"; import { isRunningFromCI } from "@lage-run/config"; import type { Logger } from "@lage-run/logger"; @@ -20,12 +20,17 @@ export function createCache(options: CreateCacheOptions): { !!cacheOptions?.cacheStorageConfig || !!process.env.BACKFILL_CACHE_PROVIDER || !!process.env.BACKFILL_CACHE_PROVIDER_OPTIONS; // Create Cache Provider - const cacheProvider = new RemoteFallbackCacheProvider({ - root, - logger, - localCacheProvider: - skipLocalCache === true - ? undefined + const useLocalTarCache = cacheOptions?.useLocalTarCache === true; + + const localCacheProvider = + skipLocalCache === true + ? undefined + : useLocalTarCache + ? 
new LocalTarCacheProvider({ + logger, + root, + outputGlob: cacheOptions?.outputGlob, + }) : new BackfillCacheProvider({ logger, root, @@ -34,7 +39,12 @@ export function createCache(options: CreateCacheOptions): { ...(cacheOptions?.internalCacheFolder && { internalCacheFolder: cacheOptions.internalCacheFolder }), ...(cacheOptions?.incrementalCaching && { incrementalCaching: cacheOptions.incrementalCaching }), }, - }), + }); + + const cacheProvider = new RemoteFallbackCacheProvider({ + root, + logger, + localCacheProvider, remoteCacheProvider: hasRemoteCacheConfig ? new BackfillCacheProvider({ logger, root, cacheOptions: cacheOptions ?? {} }) : undefined, writeRemoteCache: (cacheOptions?.writeRemoteCache === true || String(process.env.LAGE_WRITE_CACHE).toLowerCase() === "true" || isRunningFromCI) && diff --git a/yarn.lock b/yarn.lock index f47cbb12b..1b6db9520 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1556,6 +1556,7 @@ __metadata: "@azure/core-auth": "npm:1.9.0" "@azure/identity": "npm:4.9.1" "@lage-run/config": "workspace:^" + "@lage-run/globby": "workspace:^" "@lage-run/logger": "workspace:^" "@lage-run/monorepo-scripts": "workspace:^" "@lage-run/target-graph": "workspace:^" @@ -1564,6 +1565,7 @@ __metadata: backfill-config: "workspace:^" backfill-logger: "workspace:^" glob-hasher: "npm:^1.4.2" + tar: "npm:7.5.11" languageName: unknown linkType: soft @@ -8372,7 +8374,7 @@ __metadata: languageName: node linkType: hard -"tar@npm:^7.5.11": +"tar@npm:7.5.11, tar@npm:^7.5.11": version: 7.5.11 resolution: "tar@npm:7.5.11" dependencies: