Skip to content

Commit fde05e8

Browse files
committed
fix: prevent reconciler from silently dropping state transitions
1 parent 73c4c85 commit fde05e8

2 files changed

Lines changed: 173 additions & 3 deletions

File tree

src/lib/orchestrator.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ export class Orchestrator {
159159
private reconcilerInterval: Timer | null = null;
160160
private isRunning = false;
161161
private isReconciling = false;
162+
private reconcilePending = false;
162163
private activePlanId: string | null = null;
163164
private planModelSnapshot: string | undefined;
164165
private planPlacement: 'session' | 'window' | null = null;
@@ -403,11 +404,22 @@ export class Orchestrator {
403404

404405
private async reconcile(): Promise<void> {
405406
if (this.isReconciling) {
407+
this.reconcilePending = true;
406408
return;
407409
}
408410

409411
this.isReconciling = true;
410412
try {
413+
do {
414+
this.reconcilePending = false;
415+
await this._doReconcile();
416+
} while (this.reconcilePending);
417+
} finally {
418+
this.isReconciling = false;
419+
}
420+
}
421+
422+
private async _doReconcile(): Promise<void> {
411423
const plan = await loadPlan();
412424

413425
if (!plan || isTerminalPlanStatus(plan.status)) {
@@ -652,9 +664,6 @@ export class Orchestrator {
652664
}
653665
await savePlan(latestPlan);
654666
}
655-
} finally {
656-
this.isReconciling = false;
657-
}
658667
}
659668

660669
private async launchJob(job: JobSpec): Promise<void> {

tests/lib/orchestrator.test.ts

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -785,6 +785,167 @@ describe('orchestrator', () => {
785785
});
786786
});
787787

788+
describe('orchestrator reconcile pending (dirty re-reconcile)', () => {
789+
let planState: PlanSpec | null;
790+
let runningJobs: Job[];
791+
let monitor: FakeMonitor;
792+
793+
beforeEach(() => {
794+
planState = null;
795+
runningJobs = [];
796+
monitor = new FakeMonitor();
797+
798+
spyOn(planStateMod, 'loadPlan').mockImplementation(async () => clone(planState));
799+
spyOn(planStateMod, 'savePlan').mockImplementation(async (plan: PlanSpec) => {
800+
planState = clone(plan);
801+
});
802+
spyOn(planStateMod, 'updatePlanJob').mockImplementation(
803+
async (planId: string, jobName: string, updates: Partial<JobSpec>) => {
804+
if (!planState || planState.id !== planId) {
805+
return;
806+
}
807+
planState.jobs = planState.jobs.map((job) =>
808+
job.name === jobName ? { ...job, ...updates } : job,
809+
);
810+
},
811+
);
812+
spyOn(planStateMod, 'clearPlan').mockImplementation(async () => {
813+
planState = null;
814+
});
815+
spyOn(planStateMod, 'validateGhAuth').mockResolvedValue(true);
816+
817+
spyOn(integrationMod, 'createIntegrationBranch').mockResolvedValue({
818+
branch: 'mc/integration-plan-1',
819+
worktreePath: '/tmp/integration-plan-1',
820+
});
821+
spyOn(integrationMod, 'deleteIntegrationBranch').mockResolvedValue();
822+
823+
spyOn(jobStateMod, 'getRunningJobs').mockImplementation(async () => clone(runningJobs));
824+
spyOn(jobStateMod, 'addJob').mockResolvedValue();
825+
spyOn(jobStateMod, 'updateJob').mockResolvedValue();
826+
spyOn(jobStateMod, 'loadJobState').mockImplementation(async () => {
827+
const state: JobState = {
828+
version: 2,
829+
jobs: runningJobs,
830+
updatedAt: new Date().toISOString(),
831+
};
832+
return state;
833+
});
834+
835+
spyOn(worktreeMod, 'createWorktree').mockResolvedValue('/tmp/wt/job-a');
836+
spyOn(worktreeMod, 'removeWorktree').mockResolvedValue();
837+
838+
spyOn(tmuxMod, 'createSession').mockResolvedValue();
839+
spyOn(tmuxMod, 'createWindow').mockResolvedValue();
840+
spyOn(tmuxMod, 'getCurrentSession').mockReturnValue('main');
841+
spyOn(tmuxMod, 'isInsideTmux').mockReturnValue(true);
842+
spyOn(tmuxMod, 'isPaneRunning').mockResolvedValue(true);
843+
spyOn(tmuxMod, 'killSession').mockResolvedValue();
844+
spyOn(tmuxMod, 'killWindow').mockResolvedValue();
845+
spyOn(tmuxMod, 'sendKeys').mockResolvedValue();
846+
spyOn(tmuxMod, 'setPaneDiedHook').mockResolvedValue();
847+
848+
spyOn(mergeTrainMod, 'checkMergeability').mockResolvedValue({ canMerge: true });
849+
});
850+
851+
afterEach(() => {
852+
mock.restore();
853+
});
854+
855+
it('reconcile runs normally when not already reconciling', async () => {
856+
planState = makePlan({
857+
status: 'running',
858+
jobs: [makeJob('job-a', { status: 'queued' })],
859+
});
860+
861+
const orchestrator = new Orchestrator(monitor as any, {
862+
defaultPlacement: 'session',
863+
pollInterval: 10000,
864+
idleThreshold: 300000,
865+
worktreeBasePath: '/tmp',
866+
omo: { enabled: false, defaultMode: 'vanilla' },
867+
maxParallel: 3,
868+
} as any);
869+
const launchSpy = spyOn(orchestrator as any, 'launchJob').mockResolvedValue(undefined);
870+
871+
await (orchestrator as any).reconcile();
872+
873+
expect(launchSpy).toHaveBeenCalledTimes(1);
874+
expect((orchestrator as any).isReconciling).toBe(false);
875+
});
876+
877+
it('concurrent reconcile call sets pending flag instead of dropping', async () => {
878+
const orchestrator = new Orchestrator(monitor as any, {
879+
defaultPlacement: 'session',
880+
pollInterval: 10000,
881+
idleThreshold: 300000,
882+
worktreeBasePath: '/tmp',
883+
omo: { enabled: false, defaultMode: 'vanilla' },
884+
} as any);
885+
886+
(orchestrator as any).isReconciling = true;
887+
888+
await (orchestrator as any).reconcile();
889+
890+
expect((orchestrator as any).reconcilePending).toBe(true);
891+
});
892+
893+
it('reconciler re-runs when pending flag is set during execution', async () => {
894+
let doReconcileCallCount = 0;
895+
896+
planState = makePlan({
897+
status: 'running',
898+
jobs: [makeJob('job-a', { status: 'queued' })],
899+
});
900+
901+
const orchestrator = new Orchestrator(monitor as any, {
902+
defaultPlacement: 'session',
903+
pollInterval: 10000,
904+
idleThreshold: 300000,
905+
worktreeBasePath: '/tmp',
906+
omo: { enabled: false, defaultMode: 'vanilla' },
907+
maxParallel: 3,
908+
} as any);
909+
910+
spyOn(orchestrator as any, '_doReconcile').mockImplementation(async () => {
911+
doReconcileCallCount++;
912+
if (doReconcileCallCount === 1) {
913+
(orchestrator as any).reconcilePending = true;
914+
}
915+
});
916+
917+
await (orchestrator as any).reconcile();
918+
919+
expect(doReconcileCallCount).toBe(2);
920+
expect((orchestrator as any).isReconciling).toBe(false);
921+
});
922+
923+
it('pending flag is cleared before each re-run cycle', async () => {
924+
const pendingValues: boolean[] = [];
925+
926+
const orchestrator = new Orchestrator(monitor as any, {
927+
defaultPlacement: 'session',
928+
pollInterval: 10000,
929+
idleThreshold: 300000,
930+
worktreeBasePath: '/tmp',
931+
omo: { enabled: false, defaultMode: 'vanilla' },
932+
} as any);
933+
934+
let callCount = 0;
935+
spyOn(orchestrator as any, '_doReconcile').mockImplementation(async () => {
936+
pendingValues.push((orchestrator as any).reconcilePending);
937+
callCount++;
938+
if (callCount === 1) {
939+
(orchestrator as any).reconcilePending = true;
940+
}
941+
});
942+
943+
await (orchestrator as any).reconcile();
944+
945+
expect(pendingValues).toEqual([false, false]);
946+
});
947+
});
948+
788949
describe('orchestrator DAG helpers', () => {
789950
it('detects circular dependencies', () => {
790951
const jobs = [

0 commit comments

Comments
 (0)