
Conversation

@EagleoutIce (Member)

No description provided.

jang1er and others added 30 commits November 27, 2025 21:45
worker.ts: handles each task by calling the appropriate workerTasks
task-registry.ts: contains all definitions for the workerTasks, used by
worker.ts
threadpool.ts: wrapper for tinypool, handles dispatching tasks and
creating / deleting the worker threads
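A minimal sketch of that registry shape, with the task name and the payload fields simplified as assumptions (the real definitions live in task-registry.ts):

// task-registry.ts (sketch): task names mapped to their handlers.
// 'analyzeFile' and the payload fields are simplified assumptions.
interface SourceFilePayload { index: number; path: string }

type TaskHandler<I, O> = (payload: I) => O | Promise<O>;

const workerTasks = {
	analyzeFile: ((p: SourceFilePayload) => ({ index: p.index })) as TaskHandler<SourceFilePayload, { index: number }>,
} as const;

type TaskName = keyof typeof workerTasks;

// worker.ts (sketch): look the handler up by name and run it.
async function handleTask(name: TaskName, payload: unknown): Promise<unknown> {
	return workerTasks[name](payload as SourceFilePayload);
}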
basic dispatch of all files to analyze to the threadpool.
Currently fails because the path to the worker file is not correctly resolved.
diff --git a/src/dataflow/extractor.ts b/src/dataflow/extractor.ts
index 34dfe00..83f6be3 100644
--- a/src/dataflow/extractor.ts
+++ b/src/dataflow/extractor.ts
@@ -26,6 +26,8 @@ import type { ControlFlowInformation } from '../control-flow/control-flow-graph'
 import { getBuiltInDefinitions } from './environments/built-in-config';
 import type { FlowrAnalyzerContext } from '../project/context/flowr-analyzer-context';
 import { FlowrFile } from '../project/context/flowr-file';
+import { Threadpool } from './parallel/threadpool';
+import { SourceFilePayload } from './parallel/task-registry';

 /**
  * The best friend of {@link produceDataFlowGraph} and {@link processDataflowFor}.
@@ -118,6 +120,20 @@ export function produceDataFlowGraph<OtherInfo>(
 	};
 	let df = processDataflowFor<OtherInfo>(files[0].root, dfData);

+	// first call with threadpool
+	const pool = new Threadpool();
+
+	// submit all files
+	const result = pool.submitTasks<SourceFilePayload<OtherInfo>, void>(
+		"testPool",
+		files.map((file, i) => ({
+			index: i,
+			file,
+			data: dfData,
+			dataflowInfo: df,
+		}))
+	)
+
 	for(let i = 1; i < files.length; i++) {
 		/* source requests register automatically */
 		df = standaloneSourceFile(i, files[i], dfData, df);
threadpool.ts: contains piscina wrapper
> now handles worker creation with MessagePorts correctly, tasks can
submit more tasks into the queue

worker.ts: actual worker file for threadpool
> handles port registration to main thread, chooses appropriate handler
for tasks, handles subtask submission and collection

task-registry.ts: handler definitions for tasks
> contains relevant types and interfaces, specifies each handler for a
given task

extractor.ts: dataflow extractor
> modified to dispatch a dummy call for all files; the threadpool is
currently created for each call -> needs to be moved out
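The main-thread counterpart of the registration handshake described above is not shown in the diffs, so here is a hedged sketch of how the wrapper could store each transferred port and route subtasks back into the queue (everything beyond the register-port message shape is an assumption):

import { Worker } from 'node:worker_threads';
import type { MessagePort } from 'node:worker_threads';

// threadpool.ts (sketch): keep the port each worker transfers on startup.
const workerPorts = new Map<number, MessagePort>();

function wireWorker(worker: Worker): void {
	worker.on('message', (msg: { type?: string, workerId?: number, port?: MessagePort }) => {
		if(msg.type === 'register-port' && msg.port && msg.workerId !== undefined) {
			workerPorts.set(msg.workerId, msg.port);
			// subtasks submitted by the worker arrive on this port and
			// would be re-queued here; results are posted back on it
			msg.port.on('message', _sub => { /* enqueue + respond (sketch) */ });
		}
	});
}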
- extractor.ts: aggregates all dataflow information and then merges it
  back together via reduction (see the sketch after this list)
- built-in-source.ts: dataflow merging is now in a separate function
- feature-def.ts: contains all features and their default values
- feature-manager.ts: exposes functionality for setting and checking the feature flags (flag-layer sketch below)
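The reduction mentioned above can be pictured as a plain fold over the per-file results; the binary merge function stands in for the one factored out into built-in-source.ts (its real name may differ):

// Sketch: fold all per-file dataflow results into one, assuming a
// binary merge function.
function reduceDataflow<T>(perFile: readonly T[], merge: (acc: T, next: T) => T): T {
	if(perFile.length === 0) {
		throw new Error('no files were analyzed');
	}
	return perFile.reduce((acc, cur) => merge(acc, cur));
}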
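And a sketch of the flag layer itself; isEnabled and setFlag match the calls visible in the later diffs, while the concrete flag set and its defaults are assumptions:

type FeatureFlag = 'paralleliseFiles'; // hypothetical subset of feature-def.ts

const featureFlags: Record<FeatureFlag, boolean> = {
	paralleliseFiles: false, // a later commit makes disabled the default
};

class FeatureManager {
	private readonly flags: Record<FeatureFlag, boolean> = { ...featureFlags };
	public isEnabled(feature: FeatureFlag): boolean {
		return this.flags[feature];
	}
	public setFlag(feature: FeatureFlag, state: boolean): void {
		this.flags[feature] = state;
	}
}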
…s' of github.com:flowr-analysis/flowr into 2042-file-level-parallelization-of-the-dataflow-analysis
now uses the threadId provided by piscina
more logging statements <- remove later
diff --git a/src/dataflow/parallel/worker.ts b/src/dataflow/parallel/worker.ts
index 5fd490b..bb0bba2 100644
--- a/src/dataflow/parallel/worker.ts
+++ b/src/dataflow/parallel/worker.ts
@@ -1,4 +1,4 @@
-import { parentPort, MessageChannel, workerData } from 'node:worker_threads';
+import { parentPort, MessageChannel, workerData, threadId } from 'node:worker_threads';
 import type { TaskName } from './task-registry';
 import { workerTasks } from './task-registry';
 import type { SubtaskReceivedMessage } from './threadpool';
@@ -15,27 +15,27 @@ const pending = new Map<
         PendingEntry<unknown>
     >();

+
 const { port1: workerPort, port2: mainPort } = new MessageChannel();

 if(!parentPort){
 	dataflowLogger.error('Worker started without parentPort present, Aborting worker');
 } else {
+	//console.log(`Worker ${workerData.workerId} registering port to main thread.`);
+	console.log(threadId);
 	parentPort.postMessage({
 		type:     'register-port',
-		workerId: typeof workerData === 'object' &&
-					workerData !== null &&
-					typeof (workerData as { id?: number }).id === 'number'
-			? (workerData as { id: number }).id : Math.floor(Math.random() * 1e9),
+		workerId: threadId,
 		port: mainPort,
 	},
 	[mainPort] // transfer port to main thread
 	);
 }

-
 workerPort.on('message', (msg: unknown) => {
 	if(isSubtaskResponseMessage(msg)){
 		const { id, result, error } = msg;
+		console.log(`got response for ${id}`);
 		const entry = pending.get(id);
 		if(!entry) {
 			return;
@@ -59,7 +59,7 @@ async function runSubtask<TInput, TOutput>(taskName: TaskName, taskPayload: TInp
 	//return undefined as unknown as TOutput;
 	return new Promise((resolve, reject) => {
 		pending.set(id, { resolve: resolve as (value: unknown) => void, reject });
-
+		console.log(`submitting subtask with ${id} from ${threadId}`);
 		// submit the subtask to main thread
 		workerPort.postMessage({
 			type: 'subtask',
- fixed the broken worker path
- fixed subtask resolution
- allowed deferred dataflow merge
- definitions for clonable dataflow data
- example usage of clonable data
- use workerWrapper to load and register ts-node for the worker file (sketch below)
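The wrapper trick works because the pool needs a plain JavaScript entry file it can resolve at runtime; a minimal sketch (the file name and relative path are assumptions):

// worker-wrapper.js (sketch): register ts-node, then hand off to the
// actual TypeScript worker so piscina/tinypool can load it directly.
require('ts-node').register();
module.exports = require('./worker');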
threadpool now waits for worker initialization to conclude
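One way to realize that wait, sketched under the assumption that the register-port handshake shown earlier is the first message each worker sends:

import type { Worker } from 'node:worker_threads';

// Sketch: the pool only starts dispatching once every worker has
// announced itself via 'register-port'.
async function awaitWorkersReady(workers: readonly Worker[]): Promise<void> {
	await Promise.all(workers.map(w => new Promise<void>(resolve => {
		w.once('message', (msg: { type?: string }) => {
			if(msg.type === 'register-port') {
				resolve();
			}
		});
	})));
}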
The feature manager is now a class and not a global object.
Integrated the feature manager into the flowrAnalyzerBuilder and flowrAnalyzer.
Feature flags and the threadpool are now initialised outside the pipeline;
the info is passed along inside the flowr context.
diff --git a/src/core/steps/all/core/20-dataflow.ts b/src/core/steps/all/core/20-dataflow.ts
index fb4dc34..9deed5927 100644
--- a/src/core/steps/all/core/20-dataflow.ts
+++ b/src/core/steps/all/core/20-dataflow.ts
@@ -11,6 +11,8 @@ import type { NormalizedAst } from '../../../../r-bridge/lang-4.x/ast/model/proc
 import { produceDataFlowGraph } from '../../../../dataflow/extractor';
 import type { KnownParserType, Parser } from '../../../../r-bridge/parser';
 import type { FlowrAnalyzerContext } from '../../../../project/context/flowr-analyzer-context';
+import { FeatureManager } from '../../../feature-flags/feature-manager';
+import { Threadpool } from '../../../../dataflow/parallel/threadpool';

 const staticDataflowCommon = {
 	name:        'dataflow',
@@ -26,8 +28,15 @@ const staticDataflowCommon = {
 	dependencies: [ 'normalize' ],
 } as const;

-function processor(results: { normalize?: NormalizedAst }, input: { parser?: Parser<KnownParserType>, context?: FlowrAnalyzerContext }) {
-	return produceDataFlowGraph(input.parser as Parser<KnownParserType>, results.normalize as NormalizedAst, input.context as FlowrAnalyzerContext);
+function processor(
+	results: { normalize?: NormalizedAst },
+	input: { parser?: Parser<KnownParserType>,
+	context?: FlowrAnalyzerContext,}) {
+	return produceDataFlowGraph(
+		input.parser as Parser<KnownParserType>,
+		results.normalize as NormalizedAst,
+		input.context as FlowrAnalyzerContext,
+	);
 }

 export const STATIC_DATAFLOW = {
diff --git a/src/dataflow/extractor.ts b/src/dataflow/extractor.ts
index ae99208..f0025f033 100644
--- a/src/dataflow/extractor.ts
+++ b/src/dataflow/extractor.ts
@@ -28,6 +28,9 @@ import type { NodeId } from '../r-bridge/lang-4.x/ast/model/processing/node-id';
 import type { DataflowGraphVertexFunctionCall } from './graph/vertex';
 import { Threadpool } from './parallel/threadpool';
 import { toClonableDataflowProcessorInfo } from './parallel/clonable-data';
+import { FeatureManager } from '../core/feature-flags/feature-manager';
+import { featureFlags } from '../core/feature-flags/feature-def';
+import { dataflowLogger } from './logger';

 /**
  * The best friend of {@link produceDataFlowGraph} and {@link processDataflowFor}.
@@ -107,7 +110,7 @@ function resolveLinkToSideEffects(ast: NormalizedAst, graph: DataflowGraph) {
 export function produceDataFlowGraph<OtherInfo>(
 	parser:      Parser<KnownParserType>,
 	completeAst: NormalizedAst<OtherInfo & ParentInformation>,
-	ctx:         FlowrAnalyzerContext
+	ctx:         FlowrAnalyzerContext,
 ): DataflowInformation & { cfgQuick: ControlFlowInformation | undefined } {

 	// we freeze the files here to avoid endless modifications during processing
@@ -115,6 +118,10 @@ export function produceDataFlowGraph<OtherInfo>(

 	ctx.files.addConsideredFile(files[0].filePath ? files[0].filePath : FlowrFile.INLINE_PATH);

+	const features = ctx.features;
+	const fileParallelization = features.isEnabled('paralleliseFiles');
+	const workerPool = fileParallelization ? ctx.workerPool ?? new Threadpool() : undefined;
+
 	const dfData: DataflowProcessorInformation<OtherInfo & ParentInformation> = {
 		parser,
 		completeAst,
@@ -140,8 +147,14 @@ export function produceDataFlowGraph<OtherInfo>(
 	structuredClone(dfData.controlDependencies);
 	structuredClone(dfData.referenceChain);

-	console.log('Cloning CTX');
-	//structuredClone(JSON.stringify(dfData.ctx));
+	//console.log('Cloning CTX files');
+	//structuredClone(dfData.ctx.files);
+	//console.log('Cloning CTX deps');
+	//structuredClone(dfData.ctx.deps);
+	//console.log('Cloning CTX config');
+	//structuredClone(dfData.ctx.config);
+	//console.log('Cloning CTX env');
+	//structuredClone(dfData.ctx.env);

 	const clonable = toClonableDataflowProcessorInfo(dfData);
 	structuredClone(clonable);
@@ -159,23 +172,27 @@ export function produceDataFlowGraph<OtherInfo>(
 	// construct clonable DataflowInformation

-	// first call with threadpool
-	const pool = new Threadpool();
-
-	// submit all files
-	const _result = pool.submitTasks(
-		'testPool',
-		files.map((file, i) => ({
-			index:        i,
-			file,
-			data:         undefined as unknown as DataflowProcessorInformation<OtherInfo & ParentInformation>,
-			dataflowInfo: undefined as unknown as DataflowInformation
-		}))
-	);
+	if(fileParallelization){
+		// parallelise the dataflow graph analysis
+		// submit all files
+		const _result = workerPool!.submitTasks(
+			'testPool',
+			files.map((file, i) => ({
+				index:        i,
+				file,
+				data:         undefined as unknown as DataflowProcessorInformation<OtherInfo & ParentInformation>,
+				dataflowInfo: undefined as unknown as DataflowInformation
+			}))
+		);
+
+		void _result.then( () => {
+			workerPool!.closePool();
+		});
+
+	} else {
+		// use the sequential analysis
+	}

-	void _result.then( () => {
-		pool.closePool();
-	});

 	for(let i = 1; i < files.length; i++) {
 		/* source requests register automatically */
diff --git a/src/project/context/flowr-analyzer-context.ts b/src/project/context/flowr-analyzer-context.ts
index ebff6ad..41e07b8cd 100644
--- a/src/project/context/flowr-analyzer-context.ts
+++ b/src/project/context/flowr-analyzer-context.ts
@@ -28,6 +28,8 @@ import type { FlowrFileProvider } from './flowr-file';
 import { FlowrInlineTextFile } from './flowr-file';
 import type { ReadOnlyFlowrAnalyzerEnvironmentContext } from './flowr-analyzer-environment-context';
 import { FlowrAnalyzerEnvironmentContext } from './flowr-analyzer-environment-context';
+import { FeatureManager } from '../../core/feature-flags/feature-manager';
+import { Threadpool } from '../../dataflow/parallel/threadpool';

 /**
  * This is a read-only interface to the {@link FlowrAnalyzerContext}.
@@ -69,16 +71,20 @@ export class FlowrAnalyzerContext implements ReadOnlyFlowrAnalyzerContext {
 	public readonly files: FlowrAnalyzerFilesContext;
 	public readonly deps:  FlowrAnalyzerDependenciesContext;
 	public readonly env:   FlowrAnalyzerEnvironmentContext;
+	public readonly features: FeatureManager;
+	public readonly workerPool?: Threadpool;

 	public readonly config: FlowrConfigOptions;

-	constructor(config: FlowrConfigOptions, plugins: ReadonlyMap<PluginType, readonly FlowrAnalyzerPlugin[]>) {
+	constructor(config: FlowrConfigOptions, plugins: ReadonlyMap<PluginType, readonly FlowrAnalyzerPlugin[]>, features = new FeatureManager(), workerPool?: Threadpool) {
 		this.config = config;
 		const loadingOrder = new FlowrAnalyzerLoadingOrderContext(this, plugins.get(PluginType.LoadingOrder) as FlowrAnalyzerLoadingOrderPlugin[]);
 		this.files = new FlowrAnalyzerFilesContext(loadingOrder, (plugins.get(PluginType.ProjectDiscovery) ?? []) as FlowrAnalyzerProjectDiscoveryPlugin[],
             (plugins.get(PluginType.FileLoad) ?? []) as FlowrAnalyzerFilePlugin[]);
 		this.deps  = new FlowrAnalyzerDependenciesContext(this, (plugins.get(PluginType.DependencyIdentification) ?? []) as FlowrAnalyzerPackageVersionsPlugin[]);
 		this.env   = new FlowrAnalyzerEnvironmentContext(this);
+		this.features = features;
+		this.workerPool = workerPool;
 	}

 	/** delegate request addition */
diff --git a/src/project/flowr-analyzer-builder.ts b/src/project/flowr-analyzer-builder.ts
index 4cb19f8..e127a54b1 100644
--- a/src/project/flowr-analyzer-builder.ts
+++ b/src/project/flowr-analyzer-builder.ts
@@ -13,6 +13,7 @@ import type { BuiltInFlowrPluginName, PluginToRegister } from './plugins/plugin-
 import { makePlugin } from './plugins/plugin-registry';
 import { FeatureManager } from '../core/feature-flags/feature-manager';
 import { FeatureFlag } from '../core/feature-flags/feature-def';
+import { Threadpool } from '../dataflow/parallel/threadpool';

 /**
  * Builder for the {@link FlowrAnalyzer}, use it to configure all analysis aspects before creating the analyzer instance
@@ -188,7 +189,10 @@ export class FlowrAnalyzerBuilder {
 	public buildSync(): FlowrAnalyzer {
 		guard(this.parser !== undefined, 'No parser set, please use the setParser or setEngine method to set a parser before building the analyzer');

-		const context = new FlowrAnalyzerContext(this.flowrConfig, this.plugins);
+		let workerPool = undefined;
+		if(this.features.isEnabled('paralleliseFiles'))workerPool = new Threadpool();
+
+		const context = new FlowrAnalyzerContext(this.flowrConfig, this.plugins, this.features, workerPool);
 		const cache = FlowrAnalyzerCache.create({
 			parser: this.parser,
 			context,
@@ -199,7 +203,6 @@ export class FlowrAnalyzerBuilder {
 			this.parser,
 			context,
 			cache,
-			this.features,
 		);

 		// we do it here to save time later if the analyzer is to be duplicated
diff --git a/src/project/flowr-analyzer.ts b/src/project/flowr-analyzer.ts
index ac63d91..b37217d9c 100644
--- a/src/project/flowr-analyzer.ts
+++ b/src/project/flowr-analyzer.ts
@@ -142,8 +142,6 @@ export class FlowrAnalyzer<Parser extends KnownParser = KnownParser> implements
 	private readonly cache:  FlowrAnalyzerCache<Parser>;
 	private readonly ctx:    FlowrAnalyzerContext;
 	private parserInfo:      KnownParserInformation | undefined;
-	private features: FeatureManager;
-	private workerPool: Threadpool | undefined;

 	/**
 	 * Create a new analyzer instance.
@@ -152,15 +150,10 @@ export class FlowrAnalyzer<Parser extends KnownParser = KnownParser> implements
 	 * @param ctx    - The context to use for the analyses.
 	 * @param cache  - The caching layer to use for storing analysis results.
 	 */
-	constructor(parser: Parser, ctx: FlowrAnalyzerContext, cache: FlowrAnalyzerCache<Parser>, features: FeatureManager) {
+	constructor(parser: Parser, ctx: FlowrAnalyzerContext, cache: FlowrAnalyzerCache<Parser>) {
 		this.parser = parser;
 		this.ctx = ctx;
 		this.cache = cache;
-		this.features = features;
-		if(this.features.isEnabled('paralleliseFiles')){
-			// create worker Pool
-			this.workerPool = new Threadpool();
-		}
 	}

 	public get flowrConfig(): FlowrConfigOptions {
@@ -278,7 +271,7 @@ export class FlowrAnalyzer<Parser extends KnownParser = KnownParser> implements
 	 * @param state - boolean state
 	 */
 	public setFeatureState(feature: FeatureFlag, state: boolean): this{
-		this.features.setFlag(feature, state);
+		this.ctx.features.setFlag(feature, state);
 		return this;
 	}

@@ -287,7 +280,7 @@ export class FlowrAnalyzer<Parser extends KnownParser = KnownParser> implements
 	 * @param feature - feature to set
 	 */
 	public setFeature(feature: FeatureFlag): this{
-		this.features.setFlag(feature, true);
+		this.ctx.features.setFlag(feature, true);
 		return this;
 	}

@@ -296,7 +289,7 @@ export class FlowrAnalyzer<Parser extends KnownParser = KnownParser> implements
 	 * @param feature - feature to set
 	 */
 	public unsetFeature(feature: FeatureFlag): this{
-		this.features.setFlag(feature, false);
+		this.ctx.features.setFlag(feature, false);
 		return this;
 	}
Changed the default setting of parallelization to disabled.
Sequential analysis is used as the fallback for parallelization.
@EagleoutIce EagleoutIce changed the base branch from staging/jonas to main December 16, 2025 11:07
jang1er and others added 12 commits December 16, 2025 15:42
linter added breaking type imports (why)
- env (even builtin) is now serializable
- added an env diff function for comparison (round-trip sketch below)
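A sketch of the round trip this enables; environmentDiff is a hypothetical name for the added comparison helper:

// Sketch: a serializable env must survive structuredClone, and the
// diff helper should then report no differences.
function assertEnvRoundTrips<E>(env: E, environmentDiff: (a: E, b: E) => string[]): void {
	const clone = structuredClone(env);
	const diffs = environmentDiff(env, clone);
	if(diffs.length > 0) {
		throw new Error(`env changed during serialization: ${diffs.join(', ')}`);
	}
}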
defined several simple tests (a test sketch follows the list)
- single file dataflow
- multifile dataflow
- serialize dataflow graph
- serialize dataflow env
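A sketch of what the serialization tests could look like, assuming a vitest-style runner (the actual test setup in the repository may differ):

import { describe, expect, test } from 'vitest';

describe('dataflow serialization', () => {
	test('dataflow graph survives structuredClone', () => {
		// placeholder graph shape; the real DataflowGraph is richer
		const graph = { vertices: [{ id: 0 }], edges: [] };
		expect(structuredClone(graph)).toEqual(graph);
	});
});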
- added serialization for DataflowInformation
- task submission to the work queue
- merging of all information
- tests for file parallelization
- tests now behave correctly
- workers are closed when the analyzer is closed
- logging pipe from worker to main thread (sketched below)
- threadpool asserts that it is alive
- plugins get correctly serialized
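The logging pipe can be pictured as workers posting structured log records over their MessagePort instead of writing to stdout, with the main thread re-emitting them; the message shape here is an assumption:

interface LogMessage { type: 'log', level: 'info' | 'error', text: string }

// worker side (sketch): forward log lines over the registered port
function workerLog(port: { postMessage(m: LogMessage): void }, level: LogMessage['level'], text: string): void {
	port.postMessage({ type: 'log', level, text });
}

// main-thread side (sketch): re-emit records received from a worker port
function onWorkerMessage(msg: unknown): void {
	const m = msg as Partial<LogMessage>;
	if(m.type === 'log' && typeof m.text === 'string') {
		(m.level === 'error' ? console.error : console.log)(`[worker] ${m.text}`);
	}
}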
Previous feature flags are now integrated into flowrConfigOptions.
Added standard workerPool options to flowrConfig (shape sketched below).
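A sketch of the resulting config shape; the field names are assumptions, only the general structure follows the commit message:

interface WorkerPoolOptions {
	minThreads?: number;
	maxThreads?: number;
}

interface FlowrConfigSketch {
	features:   { paralleliseFiles: boolean };
	workerPool: WorkerPoolOptions;
}

const exampleConfig: FlowrConfigSketch = {
	features:   { paralleliseFiles: false }, // disabled by default, see above
	workerPool: { maxThreads: 4 },
};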