@@ -2,34 +2,40 @@ import { dirname, normalize } from 'node:path'
22
33import { WdioExtensionWorker } from './worker.js'
44import { log } from '../utils/logger.js'
5-
65import type * as vscode from 'vscode'
76
87export class ServerManager implements vscode . Disposable {
98 private _serverPool = new Map < string , WdioExtensionWorker > ( )
9+ private _pendingOperations = new Map < string , Promise < WdioExtensionWorker | void > > ( )
1010 private latestId = 0
11+ // Semaphore to track the overall operation (for complete sequential execution)
12+ private _operationLock = false
13+ private _operationQueue : ( ( ) => Promise < void > ) [ ] = [ ]
1114
1215 /**
1316 * Start worker process directory by directory which is located the wdio config file.
1417 * @param configPaths path to the configuration file for wdio (e.g. /path/to/wdio.config.js)
1518 */
1619 public async start ( configPaths : string [ ] ) {
17- const duplicatedWorkerCwds = new Set < string > ( )
18- configPaths . forEach ( ( configPath ) => {
19- const normalizedConfigPath = normalize ( configPath )
20- const wdioDirName = dirname ( normalizedConfigPath )
21- duplicatedWorkerCwds . add ( wdioDirName )
22- } )
20+ // Add to queue and then execute the process
21+ return this . queueOperation ( async ( ) => {
22+ const duplicatedWorkerCwds = new Set < string > ( )
23+ configPaths . forEach ( ( configPath ) => {
24+ const normalizedConfigPath = normalize ( configPath )
25+ const wdioDirName = dirname ( normalizedConfigPath )
26+ duplicatedWorkerCwds . add ( wdioDirName )
27+ } )
2328
24- const workerCwds = Array . from ( duplicatedWorkerCwds )
25- const ids = Array . from ( { length : workerCwds . length } , ( _ , i ) => i )
26- this . latestId = ids [ ids . length - 1 ]
29+ const workerCwds = Array . from ( duplicatedWorkerCwds )
30+ const ids = Array . from ( { length : workerCwds . length } , ( _ , i ) => i )
31+ this . latestId = ids [ ids . length - 1 ]
2732
28- await Promise . all (
29- workerCwds . map ( async ( workerCwd , index ) => {
30- await this . startWorker ( ids [ index ] , workerCwd )
31- } )
32- )
33+ await Promise . all (
34+ workerCwds . map ( async ( workerCwd , index ) => {
35+ await this . startWorker ( ids [ index ] , workerCwd )
36+ } )
37+ )
38+ } )
3339 }
3440
3541 /**
@@ -38,31 +44,179 @@ export class ServerManager implements vscode.Disposable {
3844 * @returns proper the connection server and worker
3945 */
4046 public async getConnection ( configPaths : string ) {
41- const normalizedConfigPath = normalize ( configPaths )
42- const wdioDirName = dirname ( normalizedConfigPath )
43- log . debug ( `[server manager] detecting server: ${ wdioDirName } ` )
44- const server = this . _serverPool . get ( wdioDirName )
45- if ( server ) {
47+ return this . queueOperation ( async ( ) => {
48+ const normalizedConfigPath = normalize ( configPaths )
49+ const wdioDirName = dirname ( normalizedConfigPath )
50+ log . debug ( `[server manager] detecting server: ${ wdioDirName } ` )
51+ const server = this . _serverPool . get ( wdioDirName )
52+ if ( server ) {
53+ return server
54+ }
55+ this . latestId ++
56+ return this . startWorker ( this . latestId , dirname ( configPaths ) )
57+ } )
58+ }
59+
60+ /**
61+ * Reorganize workers by stopping unnecessary ones and starting new ones
62+ * @param configPaths new configuration paths to maintain
63+ */
64+ public async reorganize ( configPaths : string [ ] ) {
65+ return this . queueOperation ( async ( ) => {
66+ // Create a set of new configuration paths
67+ const newConfigDirs = new Set < string > ( )
68+ configPaths . forEach ( ( configPath ) => {
69+ const normalizedConfigPath = normalize ( configPath )
70+ const wdioDirName = dirname ( normalizedConfigPath )
71+ newConfigDirs . add ( wdioDirName )
72+ } )
73+
74+ // Find workers that need to be stopped
75+ const stoppingPromises : Promise < void > [ ] = [ ]
76+ for ( const [ cwd , worker ] of this . _serverPool . entries ( ) ) {
77+ if ( ! newConfigDirs . has ( cwd ) ) {
78+ log . debug ( `[server manager] stopping unnecessary worker: ${ cwd } ` )
79+ stoppingPromises . push ( this . stopWorker ( cwd , worker ) )
80+ }
81+ }
82+
83+ // Stop unnecessary workers
84+ if ( stoppingPromises . length > 0 ) {
85+ await Promise . all ( stoppingPromises )
86+ }
87+
88+ // Start new workers
89+ const startingPromises : Promise < WdioExtensionWorker > [ ] = [ ]
90+ const workerCwds = Array . from ( newConfigDirs )
91+ for ( const cwd of workerCwds ) {
92+ if ( ! this . _serverPool . has ( cwd ) ) {
93+ this . latestId ++
94+ startingPromises . push ( this . startWorker ( this . latestId , cwd ) )
95+ }
96+ }
97+
98+ // Wait for new workers to start
99+ if ( startingPromises . length > 0 ) {
100+ await Promise . all ( startingPromises )
101+ }
102+ } )
103+ }
104+
105+ private async queueOperation < T > ( operation : ( ) => Promise < T > ) : Promise < T > {
106+ // Execute immediately if no operation is in progress
107+ if ( ! this . _operationLock ) {
108+ this . _operationLock = true
109+ try {
110+ return await operation ( )
111+ } finally {
112+ this . _operationLock = false
113+ this . processQueue ( )
114+ }
115+ }
116+
117+ // Add operation to the queue
118+ return new Promise < T > ( ( resolve , reject ) => {
119+ this . _operationQueue . push ( async ( ) => {
120+ try {
121+ resolve ( await operation ( ) )
122+ } catch ( error ) {
123+ reject ( error )
124+ }
125+ } )
126+ } )
127+ }
128+
129+ private async processQueue ( ) {
130+ // Do nothing if an operation is in progress or the queue is empty
131+ if ( this . _operationLock || this . _operationQueue . length === 0 ) {
132+ return
133+ }
134+
135+ // Get and execute the next operation from the queue
136+ this . _operationLock = true
137+ const nextOperation = this . _operationQueue . shift ( ) !
138+
139+ try {
140+ await nextOperation ( )
141+ } finally {
142+ this . _operationLock = false
143+ // Process the next operation (recursive call)
144+ this . processQueue ( )
145+ }
146+ }
147+
148+ private async startWorker ( id : number , configPaths : string ) : Promise < WdioExtensionWorker > {
149+ // Return existing server if already created
150+ const existingServer = this . _serverPool . get ( configPaths )
151+ if ( existingServer ) {
152+ return existingServer
153+ }
154+
155+ // Return pending operation if one is in progress
156+ const pendingOperation = this . _pendingOperations . get ( `start:${ configPaths } ` )
157+ if ( pendingOperation ) {
158+ return pendingOperation as Promise < WdioExtensionWorker >
159+ }
160+
161+ // Start a new process and track it
162+ const serverPromise = this . createWorker ( id , configPaths )
163+ this . _pendingOperations . set ( `start:${ configPaths } ` , serverPromise )
164+
165+ try {
166+ const server = await serverPromise
46167 return server
168+ } finally {
169+ // Remove from pending list when completed
170+ this . _pendingOperations . delete ( `start:${ configPaths } ` )
47171 }
48- this . latestId ++
49- return this . startWorker ( this . latestId , dirname ( configPaths ) )
50172 }
51173
52- private async startWorker ( id : number , configPaths : string ) {
174+ private async createWorker ( id : number , configPaths : string ) : Promise < WdioExtensionWorker > {
53175 const strId = `#${ String ( id ) } `
54176 const server = new WdioExtensionWorker ( strId , configPaths )
55177 await server . start ( )
56178 await server . waitForStart ( )
57- log . debug ( `[server manager] server was resisted : ${ configPaths } ` )
179+ log . debug ( `[server manager] server was registered : ${ configPaths } ` )
58180 this . _serverPool . set ( configPaths , server )
59181 return server
60182 }
61183
62- public async dispose ( ) {
63- for ( const [ cwd , worker ] of this . _serverPool . entries ( ) ) {
64- log . trace ( `shutdown the worker ${ worker . cid } for ${ cwd } ` )
65- await worker . stop ( )
184+ private async stopWorker ( configPath : string , worker : WdioExtensionWorker ) : Promise < void > {
185+ // Return pending stop operation if one is in progress
186+ const pendingOperation = this . _pendingOperations . get ( `stop:${ configPath } ` )
187+ if ( pendingOperation ) {
188+ return pendingOperation as Promise < void >
189+ }
190+
191+ // Start a new stop process and track it
192+ const stopPromise = this . executeStopWorker ( configPath , worker )
193+ this . _pendingOperations . set ( `stop:${ configPath } ` , stopPromise )
194+
195+ try {
196+ await stopPromise
197+ } finally {
198+ // Remove from pending list when completed
199+ this . _pendingOperations . delete ( `stop:${ configPath } ` )
66200 }
67201 }
202+
203+ private async executeStopWorker ( configPath : string , worker : WdioExtensionWorker ) : Promise < void > {
204+ log . trace ( `shutdown the worker ${ worker . cid } for ${ configPath } ` )
205+ await worker . stop ( )
206+ this . _serverPool . delete ( configPath )
207+ }
208+
209+ public async dispose ( ) {
210+ return this . queueOperation ( async ( ) => {
211+ const stopPromises : Promise < void > [ ] = [ ]
212+ for ( const [ cwd , worker ] of this . _serverPool . entries ( ) ) {
213+ log . trace ( `shutdown the worker ${ worker . cid } for ${ cwd } ` )
214+ stopPromises . push ( this . stopWorker ( cwd , worker ) )
215+ }
216+
217+ if ( stopPromises . length > 0 ) {
218+ await Promise . all ( stopPromises )
219+ }
220+ } )
221+ }
68222}
0 commit comments