file level parallelization of the dataflow analysis #2088
Draft: EagleoutIce wants to merge 45 commits into main from 2042-file-level-parallelization-of-the-dataflow-analysis
Conversation
- worker.ts: handles each task by calling the appropriate workerTasks handler
- task-registry.ts: contains all definitions for the workerTasks, used by worker.ts
- threadpool.ts: wrapper for tinypool; handles dispatching tasks and creating/deleting the worker threads
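For orientation, a minimal sketch of what such a tinypool wrapper can look like. Only the submitTasks/closePool names are taken from this PR; the class body is illustrative, assuming tinypool's documented run/destroy API, and the payload shape is a guess:

```ts
import Tinypool from 'tinypool';

export class ThreadpoolSketch {
	private readonly pool = new Tinypool({
		// tinypool resolves the worker entry point by file path/URL
		filename: new URL('./worker.js', import.meta.url).href,
	});

	/** fan out one task per payload and collect all results */
	public submitTasks<TIn, TOut>(taskName: string, payloads: readonly TIn[]): Promise<TOut[]> {
		return Promise.all(
			payloads.map(payload => this.pool.run({ taskName, payload }) as Promise<TOut>)
		);
	}

	/** tear down all worker threads */
	public closePool(): Promise<void> {
		return this.pool.destroy();
	}
}
```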
basic dispatch of all files to analyze to the threadpool. Currently fails because the path to the worker file is not correctly resolved.

diff --git a/src/dataflow/extractor.ts b/src/dataflow/extractor.ts
index 34dfe00..83f6be3 100644
--- a/src/dataflow/extractor.ts
+++ b/src/dataflow/extractor.ts
@@ -26,6 +26,8 @@ import type { ControlFlowInformation } from '../control-flow/control-flow-graph'
 import { getBuiltInDefinitions } from './environments/built-in-config';
 import type { FlowrAnalyzerContext } from '../project/context/flowr-analyzer-context';
 import { FlowrFile } from '../project/context/flowr-file';
+import { Threadpool } from './parallel/threadpool';
+import { SourceFilePayload } from './parallel/task-registry';
 
 /**
  * The best friend of {@link produceDataFlowGraph} and {@link processDataflowFor}.
@@ -118,6 +120,20 @@ export function produceDataFlowGraph<OtherInfo>(
 	};
 
 	let df = processDataflowFor<OtherInfo>(files[0].root, dfData);
+	// first call with threadpool
+	const pool = new Threadpool();
+
+	// submit all files
+	const result = pool.submitTasks<SourceFilePayload<OtherInfo>, void>(
+		"testPool",
+		files.map((file, i) => ({
+			index: i,
+			file,
+			data: dfData,
+			dataflowInfo: df,
+		}))
+	)
+
 	for(let i = 1; i < files.length; i++) {
 		/* source requests register automatically */
 		df = standaloneSourceFile(i, files[i], dfData, df);
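The path failure described above is a classic worker-thread pitfall: a relative worker filename resolves against the process working directory rather than the module that spawns the worker. A common fix (a sketch, not necessarily the one used in this PR) anchors the path to the module location:

```ts
import path from 'node:path';

// resolve the worker file relative to this module instead of the process CWD;
// in an ESM context, new URL('./worker.js', import.meta.url) does the same job
const workerFile = path.resolve(__dirname, 'worker.js');
```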
- threadpool.ts: contains the piscina wrapper; now handles worker creation with MessagePorts correctly, and tasks can submit more tasks into the queue
- worker.ts: actual worker file for the threadpool; handles port registration to the main thread, chooses the appropriate handler for each task, and handles subtask submission and collection
- task-registry.ts: handler definitions for tasks; contains the relevant types and interfaces and specifies each handler for a given task
- extractor.ts: dataflow extractor; modified to dispatch a dummy call for all files, threadpool creation currently happens on each call -> needs to be moved out
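On the main-thread side, the port registration plausibly looks like the sketch below. It assumes the 'register-port' message shape visible in the worker.ts diff further down; the actual queueing logic is elided:

```ts
import { Worker } from 'node:worker_threads';
import type { MessagePort } from 'node:worker_threads';

interface RegisterPortMessage {
	type: 'register-port';
	workerId: number;
	port: MessagePort;
}

const workerPorts = new Map<number, MessagePort>();

function watchWorker(worker: Worker): void {
	worker.on('message', (msg: Partial<RegisterPortMessage>) => {
		if(msg.type === 'register-port' && msg.port && msg.workerId !== undefined) {
			// keep the transferred port; subtasks submitted by the worker arrive here
			workerPorts.set(msg.workerId, msg.port);
			msg.port.on('message', subtask => {
				// enqueue the subtask, run it, and post the result back on the same port
			});
		}
	});
}
```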
- extractor.ts: aggregates all dataflow information and then merges it back together via reduction
- built-in-source.ts: dataflow merging is now in a separate function
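The reduction mentioned in the first bullet amounts to a fold over the per-file results. A minimal sketch, where the type and the merge function are stand-ins for flowR's real ones:

```ts
// stand-ins for the real types; mergeDataflowInformation is hypothetical
type DataflowInformation = unknown;
declare function mergeDataflowInformation(a: DataflowInformation, b: DataflowInformation): DataflowInformation;

// fold the per-file results back into one dataflow information object
function mergeAll(perFile: readonly DataflowInformation[]): DataflowInformation {
	// assumes at least one file was analysed
	return perFile.reduce((acc, info) => mergeDataflowInformation(acc, info));
}
```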
- feature-def.ts: contains all features and their default values
- feature-manager.ts: exposes functionality for setting and checking the feature flags
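Pieced together from the setFlag/isEnabled calls in the later diffs, the pair of files plausibly looks like this. It is a sketch: only the paralleliseFiles flag name is attested, and the defaults layout is a guess:

```ts
// assumed flag name taken from the diffs in this PR
export type FeatureFlag = 'paralleliseFiles';

// feature-def.ts: every feature with its default value
const featureDefaults: Record<FeatureFlag, boolean> = {
	paralleliseFiles: false,
};

// feature-manager.ts: set and check flags
export class FeatureManager {
	private readonly flags = new Map<FeatureFlag, boolean>(
		Object.entries(featureDefaults) as [FeatureFlag, boolean][]
	);

	public setFlag(feature: FeatureFlag, state: boolean): void {
		this.flags.set(feature, state);
	}

	public isEnabled(feature: FeatureFlag): boolean {
		return this.flags.get(feature) ?? false;
	}
}
```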
…s' of github.com:flowr-analysis/flowr into 2042-file-level-parallelization-of-the-dataflow-analysis
now uses the threadId provided by piscina; more logging statements <- remove later

diff --git a/src/dataflow/parallel/worker.ts b/src/dataflow/parallel/worker.ts
index 5fd490b..bb0bba2 100644
--- a/src/dataflow/parallel/worker.ts
+++ b/src/dataflow/parallel/worker.ts
@@ -1,4 +1,4 @@
-import { parentPort, MessageChannel, workerData } from 'node:worker_threads';
+import { parentPort, MessageChannel, workerData, threadId } from 'node:worker_threads';
 import type { TaskName } from './task-registry';
 import { workerTasks } from './task-registry';
 import type { SubtaskReceivedMessage } from './threadpool';
@@ -15,27 +15,27 @@ const pending = new Map<
 	PendingEntry<unknown>
 >();
+
 const { port1: workerPort, port2: mainPort } = new MessageChannel();
 
 if(!parentPort){
 	dataflowLogger.error('Worker started without parentPort present, Aborting worker');
 } else {
+	//console.log(`Worker ${workerData.workerId} registering port to main thread.`);
+	console.log(threadId);
 	parentPort.postMessage({
 		type: 'register-port',
-		workerId: typeof workerData === 'object' &&
-			workerData !== null &&
-			typeof (workerData as { id?: number }).id === 'number'
-			? (workerData as { id: number }).id : Math.floor(Math.random() * 1e9),
+		workerId: threadId,
 		port: mainPort,
 	},
 	[mainPort] // transfer port to main thread
 	);
 }
-
 workerPort.on('message', (msg: unknown) => {
 	if(isSubtaskResponseMessage(msg)){
 		const { id, result, error } = msg;
+		console.log(`got response for ${id}`);
 		const entry = pending.get(id);
 		if(!entry) {
 			return;
@@ -59,7 +59,7 @@ async function runSubtask<TInput, TOutput>(taskName: TaskName, taskPayload: TInp
 	//return undefined as unknown as TOutput;
 	return new Promise((resolve, reject) => {
 		pending.set(id, { resolve: resolve as (value: unknown) => void, reject });
-
+		console.log(`submitting subtask with ${id} from ${threadId}`);
 		// submit the subtask to main thread
 		workerPort.postMessage({
 			type: 'subtask',
- fix for broken worker path
- fixed subtask resolution
- allowed deferred dataflow merge
- definitions for clonable dataflow data
- example usage of clonable data
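The reason a "clonable" variant is needed: structuredClone, which is used when shipping data to workers, rejects functions and exotic class instances, so the processor info must first be mapped to plain data. A toy illustration; the real toClonableDataflowProcessorInfo is of course richer:

```ts
// a stand-in processor info: the parser handle is a function and cannot be cloned
interface InfoSketch {
	files: string[];
	flags: Record<string, boolean>;
	parser: () => void;
}

// drop the non-clonable parts, keep the plain data
function toClonableSketch(info: InfoSketch): Omit<InfoSketch, 'parser'> {
	const { parser: _ignored, ...rest } = info;
	return rest;
}

const copy = structuredClone(toClonableSketch({
	files: [ 'a.R' ],
	flags: { paralleliseFiles: true },
	parser: () => { /* not clonable */ },
}));
```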
- use workerWrapper to load and register ts-node for the worker file
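The wrapper trick works because worker threads load plain JavaScript entry points without extra loaders. A minimal sketch of such a wrapper (assumed; only ts-node's register API is real):

```ts
// workerWrapper sketch (assumed): a plain CommonJS entry point that registers
// ts-node so the worker thread can load the TypeScript worker file directly
require('ts-node').register();
module.exports = require('./worker.ts');
```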
threadpool now waits for worker initialization to conclude
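One plausible way to implement that wait, sketched here: count each completed 'register-port' handshake and resolve a readiness promise once all expected workers have checked in.

```ts
// notifyRegistered would be called from the 'register-port' handler on the main thread
function waitForWorkers(expected: number): { ready: Promise<void>, notifyRegistered: () => void } {
	let registered = 0;
	let resolveReady!: () => void;
	const ready = new Promise<void>(resolve => { resolveReady = resolve; });
	return {
		ready,
		notifyRegistered: () => {
			if(++registered === expected) {
				resolveReady();
			}
		},
	};
}
```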
feature manager is now a class and not a global object
integrated the feature manager into the flowrAnalyzerBuilder and flowrAnalyzer.
Feature flags and the threadpool are now initialised outside the pipeline; the info is passed along inside the flowr context.

diff --git a/src/core/steps/all/core/20-dataflow.ts b/src/core/steps/all/core/20-dataflow.ts
index fb4dc34..9deed5927 100644
--- a/src/core/steps/all/core/20-dataflow.ts
+++ b/src/core/steps/all/core/20-dataflow.ts
@@ -11,6 +11,8 @@ import type { NormalizedAst } from '../../../../r-bridge/lang-4.x/ast/model/proc
 import { produceDataFlowGraph } from '../../../../dataflow/extractor';
 import type { KnownParserType, Parser } from '../../../../r-bridge/parser';
 import type { FlowrAnalyzerContext } from '../../../../project/context/flowr-analyzer-context';
+import { FeatureManager } from '../../../feature-flags/feature-manager';
+import { Threadpool } from '../../../../dataflow/parallel/threadpool';
 
 const staticDataflowCommon = {
 	name: 'dataflow',
@@ -26,8 +28,15 @@ const staticDataflowCommon = {
 	dependencies: [ 'normalize' ],
 } as const;
 
-function processor(results: { normalize?: NormalizedAst }, input: { parser?: Parser<KnownParserType>, context?: FlowrAnalyzerContext }) {
-	return produceDataFlowGraph(input.parser as Parser<KnownParserType>, results.normalize as NormalizedAst, input.context as FlowrAnalyzerContext);
+function processor(
+	results: { normalize?: NormalizedAst },
+	input: { parser?: Parser<KnownParserType>,
+		context?: FlowrAnalyzerContext,}) {
+	return produceDataFlowGraph(
+		input.parser as Parser<KnownParserType>,
+		results.normalize as NormalizedAst,
+		input.context as FlowrAnalyzerContext,
+	);
 }
 
 export const STATIC_DATAFLOW = {

diff --git a/src/dataflow/extractor.ts b/src/dataflow/extractor.ts
index ae99208..f0025f033 100644
--- a/src/dataflow/extractor.ts
+++ b/src/dataflow/extractor.ts
@@ -28,6 +28,9 @@ import type { NodeId } from '../r-bridge/lang-4.x/ast/model/processing/node-id';
 import type { DataflowGraphVertexFunctionCall } from './graph/vertex';
 import { Threadpool } from './parallel/threadpool';
 import { toClonableDataflowProcessorInfo } from './parallel/clonable-data';
+import { FeatureManager } from '../core/feature-flags/feature-manager';
+import { featureFlags } from '../core/feature-flags/feature-def';
+import { dataflowLogger } from './logger';
 
 /**
  * The best friend of {@link produceDataFlowGraph} and {@link processDataflowFor}.
@@ -107,7 +110,7 @@ function resolveLinkToSideEffects(ast: NormalizedAst, graph: DataflowGraph) {
 export function produceDataFlowGraph<OtherInfo>(
 	parser: Parser<KnownParserType>,
 	completeAst: NormalizedAst<OtherInfo & ParentInformation>,
-	ctx: FlowrAnalyzerContext
+	ctx: FlowrAnalyzerContext,
 ): DataflowInformation & { cfgQuick: ControlFlowInformation | undefined } {
 
 	// we freeze the files here to avoid endless modifications during processing
@@ -115,6 +118,10 @@
 	ctx.files.addConsideredFile(files[0].filePath ? files[0].filePath : FlowrFile.INLINE_PATH);
 
+	const features = ctx.features;
+	const fileParallelization = features.isEnabled('paralleliseFiles');
+	const workerPool = fileParallelization ? ctx.workerPool ?? new Threadpool() : undefined;
+
 	const dfData: DataflowProcessorInformation<OtherInfo & ParentInformation> = {
 		parser,
 		completeAst,
@@ -140,8 +147,14 @@
 	structuredClone(dfData.controlDependencies);
 	structuredClone(dfData.referenceChain);
-	console.log('Cloning CTX');
-	//structuredClone(JSON.stringify(dfData.ctx));
+	//console.log('Cloning CTX files');
+	//structuredClone(dfData.ctx.files);
+	//console.log('Cloning CTX deps');
+	//structuredClone(dfData.ctx.deps);
+	//console.log('Cloning CTX config');
+	//structuredClone(dfData.ctx.config);
+	//console.log('Cloning CTX env');
+	//structuredClone(dfData.ctx.env);
 	const clonable = toClonableDataflowProcessorInfo(dfData);
 	structuredClone(clonable);
@@ -159,23 +172,27 @@
 	// construct clonable DataflowInformation
 
-	// first call with threadpool
-	const pool = new Threadpool();
-
-	// submit all files
-	const _result = pool.submitTasks(
-		'testPool',
-		files.map((file, i) => ({
-			index: i,
-			file,
-			data: undefined as unknown as DataflowProcessorInformation<OtherInfo & ParentInformation>,
-			dataflowInfo: undefined as unknown as DataflowInformation
-		}))
-	);
+	if(fileParallelization){
+		// parallelise the dataflow graph analysis
+		// submit all files
+		const _result = workerPool!.submitTasks(
+			'testPool',
+			files.map((file, i) => ({
+				index: i,
+				file,
+				data: undefined as unknown as DataflowProcessorInformation<OtherInfo & ParentInformation>,
+				dataflowInfo: undefined as unknown as DataflowInformation
+			}))
+		);
+
+		void _result.then( () => {
+			workerPool!.closePool();
+		});
+
+	} else {
+		// use the sequential analysis
+	}
-
-	void _result.then( () => {
-		pool.closePool();
-	});
 
 	for(let i = 1; i < files.length; i++) {
 		/* source requests register automatically */

diff --git a/src/project/context/flowr-analyzer-context.ts b/src/project/context/flowr-analyzer-context.ts
index ebff6ad..41e07b8cd 100644
--- a/src/project/context/flowr-analyzer-context.ts
+++ b/src/project/context/flowr-analyzer-context.ts
@@ -28,6 +28,8 @@ import type { FlowrFileProvider } from './flowr-file';
 import { FlowrInlineTextFile } from './flowr-file';
 import type { ReadOnlyFlowrAnalyzerEnvironmentContext } from './flowr-analyzer-environment-context';
 import { FlowrAnalyzerEnvironmentContext } from './flowr-analyzer-environment-context';
+import { FeatureManager } from '../../core/feature-flags/feature-manager';
+import { Threadpool } from '../../dataflow/parallel/threadpool';
 
 /**
  * This is a read-only interface to the {@link FlowrAnalyzerContext}.
@@ -69,16 +71,20 @@ export class FlowrAnalyzerContext implements ReadOnlyFlowrAnalyzerContext {
 	public readonly files: FlowrAnalyzerFilesContext;
 	public readonly deps: FlowrAnalyzerDependenciesContext;
 	public readonly env: FlowrAnalyzerEnvironmentContext;
+	public readonly features: FeatureManager;
+	public readonly workerPool?: Threadpool;
 	public readonly config: FlowrConfigOptions;
 
-	constructor(config: FlowrConfigOptions, plugins: ReadonlyMap<PluginType, readonly FlowrAnalyzerPlugin[]>) {
+	constructor(config: FlowrConfigOptions, plugins: ReadonlyMap<PluginType, readonly FlowrAnalyzerPlugin[]>, features = new FeatureManager(), workerPool?: Threadpool) {
 		this.config = config;
 		const loadingOrder = new FlowrAnalyzerLoadingOrderContext(this, plugins.get(PluginType.LoadingOrder) as FlowrAnalyzerLoadingOrderPlugin[]);
 		this.files = new FlowrAnalyzerFilesContext(loadingOrder, (plugins.get(PluginType.ProjectDiscovery) ?? []) as FlowrAnalyzerProjectDiscoveryPlugin[], (plugins.get(PluginType.FileLoad) ?? []) as FlowrAnalyzerFilePlugin[]);
 		this.deps = new FlowrAnalyzerDependenciesContext(this, (plugins.get(PluginType.DependencyIdentification) ?? []) as FlowrAnalyzerPackageVersionsPlugin[]);
 		this.env = new FlowrAnalyzerEnvironmentContext(this);
+		this.features = features;
+		this.workerPool = workerPool;
 	}
 
 	/** delegate request addition */

diff --git a/src/project/flowr-analyzer-builder.ts b/src/project/flowr-analyzer-builder.ts
index 4cb19f8..e127a54b1 100644
--- a/src/project/flowr-analyzer-builder.ts
+++ b/src/project/flowr-analyzer-builder.ts
@@ -13,6 +13,7 @@ import type { BuiltInFlowrPluginName, PluginToRegister } from './plugins/plugin-
 import { makePlugin } from './plugins/plugin-registry';
 import { FeatureManager } from '../core/feature-flags/feature-manager';
 import { FeatureFlag } from '../core/feature-flags/feature-def';
+import { Threadpool } from '../dataflow/parallel/threadpool';
 
 /**
  * Builder for the {@link FlowrAnalyzer}, use it to configure all analysis aspects before creating the analyzer instance
@@ -188,7 +189,10 @@ export class FlowrAnalyzerBuilder {
 	public buildSync(): FlowrAnalyzer {
 		guard(this.parser !== undefined, 'No parser set, please use the setParser or setEngine method to set a parser before building the analyzer');
 
-		const context = new FlowrAnalyzerContext(this.flowrConfig, this.plugins);
+		let workerPool = undefined;
+		if(this.features.isEnabled('paralleliseFiles'))workerPool = new Threadpool();
+
+		const context = new FlowrAnalyzerContext(this.flowrConfig, this.plugins, this.features, workerPool);
 		const cache = FlowrAnalyzerCache.create({
 			parser: this.parser,
 			context,
@@ -199,7 +203,6 @@
 			this.parser,
 			context,
 			cache,
-			this.features,
 		);
 
 		// we do it here to save time later if the analyzer is to be duplicated

diff --git a/src/project/flowr-analyzer.ts b/src/project/flowr-analyzer.ts
index ac63d91..b37217d9c 100644
--- a/src/project/flowr-analyzer.ts
+++ b/src/project/flowr-analyzer.ts
@@ -142,8 +142,6 @@ export class FlowrAnalyzer<Parser extends KnownParser = KnownParser> implements
 	private readonly cache: FlowrAnalyzerCache<Parser>;
 	private readonly ctx: FlowrAnalyzerContext;
 	private parserInfo: KnownParserInformation | undefined;
-	private features: FeatureManager;
-	private workerPool: Threadpool | undefined;
 
 	/**
	 * Create a new analyzer instance.
@@ -152,15 +150,10 @@
 	 * @param ctx - The context to use for the analyses.
 	 * @param cache - The caching layer to use for storing analysis results.
 	 */
-	constructor(parser: Parser, ctx: FlowrAnalyzerContext, cache: FlowrAnalyzerCache<Parser>, features: FeatureManager) {
+	constructor(parser: Parser, ctx: FlowrAnalyzerContext, cache: FlowrAnalyzerCache<Parser>) {
 		this.parser = parser;
 		this.ctx = ctx;
 		this.cache = cache;
-		this.features = features;
-		if(this.features.isEnabled('paralleliseFiles')){
-			// create worker Pool
-			this.workerPool = new Threadpool();
-		}
 	}
 
 	public get flowrConfig(): FlowrConfigOptions {
@@ -278,7 +271,7 @@
 	 * @param state - boolean state
 	 */
 	public setFeatureState(feature: FeatureFlag, state: boolean): this{
-		this.features.setFlag(feature, state);
+		this.ctx.features.setFlag(feature, state);
 		return this;
 	}
 
@@ -287,7 +280,7 @@
 	 * @param feature - feature to set
 	 */
 	public setFeature(feature: FeatureFlag): this{
-		this.features.setFlag(feature, true);
+		this.ctx.features.setFlag(feature, true);
 		return this;
 	}
 
@@ -296,7 +289,7 @@
 	 * @param feature - feature to set
 	 */
 	public unsetFeature(feature: FeatureFlag): this{
-		this.features.setFlag(feature, false);
+		this.ctx.features.setFlag(feature, false);
 		return this;
 	}
changed default settings of parallelization to disabled. Sequential analysis is used as the fallback for parallelization.
linter added breaking type imports (why)
- env (even builtin) is now serializable
- added env diff function to compare environments
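A diff over serialisable environments can be as simple as comparing bindings key by key. A hypothetical sketch; flowR's real environments are nested, so the actual function is certainly more careful:

```ts
type SerializedEnv = Record<string, unknown>;

// report every binding whose (serialised) definition differs between two environments
function diffEnv(a: SerializedEnv, b: SerializedEnv): string[] {
	const keys = new Set([ ...Object.keys(a), ...Object.keys(b) ]);
	return [ ...keys ].filter(k => JSON.stringify(a[k]) !== JSON.stringify(b[k]));
}
```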
defined several simple tests:
- single file dataflow
- multifile dataflow
- serialize dataflow graph
- serialize dataflow env
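The serialization tests boil down to round-trip checks. A framework-agnostic sketch using node:assert, with a stand-in object in place of a real dataflow graph:

```ts
import assert from 'node:assert';

// stand-in graph; the real tests serialize actual dataflow graph/env objects
const graph = { vertices: [ 'x', 'y' ], edges: [ [ 'x', 'y' ] ] };

// serialization must round-trip without losing structure
const roundTripped = structuredClone(graph);
assert.deepStrictEqual(roundTripped, graph);
```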
- added serialization for DataflowInformation
- task submission to the workqueue
- merging of all information
- tests for file parallelization
- tests now behave correctly
- workers are closed if the analyzer is closed
- logging pipe from worker to main thread
- threadpool asserts it is alive
- plugins get correctly serialized
Previous feature flags are now integrated into FlowrConfigOptions. Added standard workerPool options to the flowr config.
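How that config section could look, as a sketch: only paralleliseFiles is attested in the diffs above, the option names around it are illustrative.

```ts
// assumed, illustrative shape of the new config section
interface WorkerPoolOptions {
	minThreads?: number;
	maxThreads?: number;
}

interface FlowrParallelConfigSketch {
	/** previously a standalone feature flag, now a config option */
	paralleliseFiles: boolean;
	/** standard pool options passed through to the threadpool */
	workerPool?: WorkerPoolOptions;
}
```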