diff --git a/lib/internal/modules/cjs/loader.js b/lib/internal/modules/cjs/loader.js index 827655bedb65bf..d1ab0bf54d0a2b 100644 --- a/lib/internal/modules/cjs/loader.js +++ b/lib/internal/modules/cjs/loader.js @@ -329,6 +329,28 @@ function reportModuleNotFoundToWatchMode(basePath, extensions) { } } +/** + * Tell the watch mode that a module was required, from within a worker thread. + * @param {string} filename Absolute path of the module + * @returns {void} + */ +function reportModuleToWatchModeFromWorker(filename) { + if (!shouldReportRequiredModules()) { + return; + } + const { isMainThread } = internalBinding('worker'); + if (isMainThread) { + return; + } + // Lazy require to avoid circular dependency: worker_threads is loaded after + // the CJS loader is fully set up. + const { parentPort } = require('worker_threads'); + if (!parentPort) { + return; + } + parentPort.postMessage({ 'watch:require': [filename] }); +} + /** * Create a new module instance. * @param {string} id @@ -1245,6 +1267,7 @@ Module._load = function(request, parent, isMain, internalResolveOptions = kEmpty relResolveCacheIdentifier = `${parent.path}\x00${request}`; const filename = relativeResolveCache[relResolveCacheIdentifier]; reportModuleToWatchMode(filename); + reportModuleToWatchModeFromWorker(filename); if (filename !== undefined) { const cachedModule = Module._cache[filename]; if (cachedModule !== undefined) { @@ -1335,6 +1358,7 @@ Module._load = function(request, parent, isMain, internalResolveOptions = kEmpty } reportModuleToWatchMode(filename); + reportModuleToWatchModeFromWorker(filename); Module._cache[filename] = module; module[kIsCachedByESMLoader] = false; // If there are resolve hooks, carry the context information into the diff --git a/lib/internal/modules/esm/loader.js b/lib/internal/modules/esm/loader.js index 04c374c00cfc3e..92e2adce9d1a16 100644 --- a/lib/internal/modules/esm/loader.js +++ b/lib/internal/modules/esm/loader.js @@ -531,6 +531,19 @@ class ModuleLoader { const type = requestType === kRequireInImportedCJS ? 'require' : 'import'; process.send({ [`watch:${type}`]: [url] }); } + + // Relay Events from worker to main thread + if (process.env.WATCH_REPORT_DEPENDENCIES && !process.send) { + const { isMainThread } = internalBinding('worker'); + if (isMainThread) { + return; + } + const { parentPort } = require('worker_threads'); + if (!parentPort) { + return; + } + parentPort.postMessage({ 'watch:import': [url] }); + } // TODO(joyeecheung): update the module requests to use importAttributes as property names. const importAttributes = resolveResult.importAttributes ?? request.attributes; diff --git a/lib/internal/worker.js b/lib/internal/worker.js index 2a4caed82cf7c5..a173fc466ceb54 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -1,6 +1,7 @@ 'use strict'; const { + ArrayIsArray, ArrayPrototypeForEach, ArrayPrototypeMap, ArrayPrototypePush, @@ -333,6 +334,18 @@ class Worker extends EventEmitter { this[kPublicPort].on(event, (message) => this.emit(event, message)); }); setupPortReferencing(this[kPublicPort], this, 'message'); + + // relay events from worker thread to watcher + if (process.env.WATCH_REPORT_DEPENDENCIES && process.send) { + this[kPublicPort].on('message', (message) => { + if (ArrayIsArray(message?.['watch:require'])) { + process.send({ 'watch:require': message['watch:require'] }); + } + if (ArrayIsArray(message?.['watch:import'])) { + process.send({ 'watch:import': message['watch:import'] }); + } + }); + } this[kPort].postMessage({ argv, type: messageTypes.LOAD_SCRIPT, diff --git a/test/sequential/test-watch-mode.mjs b/test/sequential/test-watch-mode.mjs index a5cac129ad1c21..12705573396376 100644 --- a/test/sequential/test-watch-mode.mjs +++ b/test/sequential/test-watch-mode.mjs @@ -922,4 +922,238 @@ process.on('message', (message) => { await done(); } }); -}); + + it('should watch changes to worker - cjs', async () => { + const dir = tmpdir.resolve(`watch-worker-cjs-${Date.now()}`); + mkdirSync(dir); + + const worker = path.join(dir, 'worker.js'); + + writeFileSync(worker, ` + console.log("worker running"); + `); + + const file = createTmpFile(` + const { Worker } = require('node:worker_threads'); + + const w = new Worker(${JSON.stringify(worker)}); + w.on('exit', () => { + console.log('running'); + }); + `, '.js', dir); + + const { stderr, stdout } = await runWriteSucceed({ + file, + watchedFile: worker, + }); + + assert.strictEqual(stderr, ''); + assert.deepStrictEqual(stdout, [ + 'worker running', + 'running', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + `Restarting ${inspect(file)}`, + 'worker running', + 'running', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + ]); + }); + + it('should watch changes to worker dependencies - cjs', async () => { + const dir = tmpdir.resolve(`watch-worker-dep-cjs-${Date.now()}`); + mkdirSync(dir); + + const dep = path.join(dir, 'dep.js'); + const worker = path.join(dir, 'worker.js'); + + writeFileSync(dep, ` + module.exports = 'dep v1'; + `); + + writeFileSync(worker, ` + const dep = require('./dep.js'); + console.log(dep); + `); + + const file = createTmpFile(` + const { Worker } = require('node:worker_threads'); + + const w = new Worker(${JSON.stringify(worker)}); + w.on('exit', () => { + console.log('running'); + }); + `, '.js', dir); + + const { stderr, stdout } = await runWriteSucceed({ + file, + watchedFile: dep, + }); + + assert.strictEqual(stderr, ''); + assert.deepStrictEqual(stdout, [ + 'dep v1', + 'running', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + `Restarting ${inspect(file)}`, + 'dep v1', + 'running', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + ]); + }); + + it('should watch changes to nested worker dependencies - cjs', async () => { + const dir = tmpdir.resolve(`watch-worker-dep-cjs-${Date.now()}`); + mkdirSync(dir); + + const subDep = path.join(dir, 'sub-dep.js'); + const dep = path.join(dir, 'dep.js'); + const worker = path.join(dir, 'worker.js'); + + writeFileSync(subDep, ` + module.exports = 'sub-dep v1'; + `); + + writeFileSync(dep, ` + const subDep = require('./sub-dep.js'); + console.log(subDep); + module.exports = 'dep v1'; + `); + + writeFileSync(worker, ` + const dep = require('./dep.js'); + `); + + const file = createTmpFile(` + const { Worker } = require('node:worker_threads'); + + const w = new Worker(${JSON.stringify(worker)}); + w.on('exit', () => { + console.log('running'); + }); + `, '.js', dir); + + const { stderr, stdout } = await runWriteSucceed({ + file, + watchedFile: subDep, + }); + + assert.strictEqual(stderr, ''); + assert.deepStrictEqual(stdout, [ + 'sub-dep v1', + 'running', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + `Restarting ${inspect(file)}`, + 'sub-dep v1', + 'running', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + ]); + }); + + it('should watch changes to worker - esm', async () => { + const dir = tmpdir.resolve(`watch-worker-esm-${Date.now()}`); + mkdirSync(dir); + + const worker = path.join(dir, 'worker.mjs'); + + writeFileSync(worker, ` + console.log("worker running"); + `); + + const file = createTmpFile(` + import { Worker } from 'node:worker_threads'; + new Worker(new URL(${JSON.stringify(pathToFileURL(worker))})); + `, '.mjs', dir); + + const { stderr, stdout } = await runWriteSucceed({ + file, + watchedFile: worker, + }); + + assert.strictEqual(stderr, ''); + assert.deepStrictEqual(stdout, [ + 'worker running', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + `Restarting ${inspect(file)}`, + 'worker running', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + ]); + }); + + it('should watch changes to worker dependencies - esm', async () => { + const dir = tmpdir.resolve(`watch-worker-dep-esm-${Date.now()}`); + mkdirSync(dir); + + const dep = path.join(dir, 'dep.mjs'); + const worker = path.join(dir, 'worker.mjs'); + + writeFileSync(dep, ` + export default 'dep v1'; + `); + + writeFileSync(worker, ` + import dep from ${JSON.stringify(pathToFileURL(dep))}; + console.log(dep); + `); + + const file = createTmpFile(` + import { Worker } from 'node:worker_threads'; + new Worker(new URL(${JSON.stringify(pathToFileURL(worker))})); + `, '.mjs', dir); + + const { stderr, stdout } = await runWriteSucceed({ + file, + watchedFile: dep, + }); + + assert.strictEqual(stderr, ''); + assert.deepStrictEqual(stdout, [ + 'dep v1', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + `Restarting ${inspect(file)}`, + 'dep v1', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + ]); + }); + + it('should watch changes to nested worker dependencies - esm', async () => { + const dir = tmpdir.resolve(`watch-worker-dep-esm-${Date.now()}`); + mkdirSync(dir); + + const subDep = path.join(dir, 'sub-dep.mjs'); + const dep = path.join(dir, 'dep.mjs'); + const worker = path.join(dir, 'worker.mjs'); + + writeFileSync(subDep, ` + export default 'sub-dep v1'; + `); + + writeFileSync(dep, ` + import subDep from ${JSON.stringify(pathToFileURL(subDep))}; + console.log(subDep); + export default 'dep v1'; + `); + + writeFileSync(worker, ` + import dep from ${JSON.stringify(pathToFileURL(dep))}; + `); + + const file = createTmpFile(` + import { Worker } from 'node:worker_threads'; + new Worker(new URL(${JSON.stringify(pathToFileURL(worker))})); + `, '.mjs', dir); + + const { stderr, stdout } = await runWriteSucceed({ + file, + watchedFile: subDep, + }); + + assert.strictEqual(stderr, ''); + assert.deepStrictEqual(stdout, [ + 'sub-dep v1', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + `Restarting ${inspect(file)}`, + 'sub-dep v1', + `Completed running ${inspect(file)}. Waiting for file changes before restarting...`, + ]); + }); +}); \ No newline at end of file