Skip to content

Commit a0d3a43

Browse files
committed
Compute graph as build progresses
1 parent a457d5d commit a0d3a43

10 files changed

Lines changed: 440 additions & 19 deletions

File tree

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io.jenkins.plugins.pipelinegraphview.livestate;
2+
3+
import edu.umd.cs.findbugs.annotations.NonNull;
4+
import hudson.Extension;
5+
import org.jenkinsci.plugins.workflow.flow.FlowExecution;
6+
import org.jenkinsci.plugins.workflow.flow.FlowExecutionListener;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
/**
11+
* Creates {@link LiveGraphState} entries at execution start / resume and evicts them on
12+
* completion. Without this, entries are still created lazily by {@link LiveGraphPopulator}
13+
* on first {@code onNewHead}, but {@code onResumed} guarantees the catch-up scan happens
14+
* once up-front rather than at the first event after restart.
15+
*/
16+
@Extension
17+
public class LiveGraphLifecycle extends FlowExecutionListener {
18+
19+
private static final Logger logger = LoggerFactory.getLogger(LiveGraphLifecycle.class);
20+
21+
@Override
22+
public void onRunning(@NonNull FlowExecution execution) {
23+
try {
24+
LiveGraphRegistry.get().getOrCreate(execution);
25+
} catch (Throwable t) {
26+
logger.warn("pipeline-graph-view live state onRunning failed", t);
27+
}
28+
}
29+
30+
@Override
31+
public void onResumed(@NonNull FlowExecution execution) {
32+
try {
33+
LiveGraphState state = LiveGraphRegistry.get().getOrCreate(execution);
34+
if (state != null) {
35+
LiveGraphPopulator.catchUp(execution, state);
36+
}
37+
} catch (Throwable t) {
38+
logger.warn("pipeline-graph-view live state onResumed failed", t);
39+
}
40+
}
41+
42+
@Override
43+
public void onCompleted(@NonNull FlowExecution execution) {
44+
try {
45+
LiveGraphRegistry.get().remove(execution);
46+
} catch (Throwable t) {
47+
logger.warn("pipeline-graph-view live state onCompleted failed", t);
48+
}
49+
}
50+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package io.jenkins.plugins.pipelinegraphview.livestate;
2+
3+
import hudson.Extension;
4+
import org.jenkinsci.plugins.workflow.flow.FlowExecution;
5+
import org.jenkinsci.plugins.workflow.flow.GraphListener;
6+
import org.jenkinsci.plugins.workflow.graph.FlowNode;
7+
import org.jenkinsci.plugins.workflow.graphanalysis.DepthFirstScanner;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
/**
12+
* Extension that captures every new {@link FlowNode} across every running execution and
13+
* feeds it to the corresponding {@link LiveGraphState}. The downstream {@code PipelineGraphApi}
14+
* path reads a snapshot of that state instead of walking the whole execution each time.
15+
*
16+
* <p>We use {@link GraphListener.Synchronous} rather than the async variant because callers
17+
* expect "once a node is a head, the next API read reflects it" — async delivery creates a
18+
* lag window where the snapshot is behind the execution, which breaks tests that check state
19+
* at precise trigger points and would surprise anyone hitting the REST API after an event.
20+
* The work done under the monitor is trivial ({@code ArrayList}/{@code HashSet} additions),
21+
* so the CPS VM thread is not meaningfully blocked. Every code path is still wrapped in
22+
* try/catch and poisons the state on failure so a bug here can never disrupt a build.
23+
*/
24+
@Extension
25+
public class LiveGraphPopulator implements GraphListener.Synchronous {
26+
27+
private static final Logger logger = LoggerFactory.getLogger(LiveGraphPopulator.class);
28+
29+
@Override
30+
public void onNewHead(FlowNode node) {
31+
LiveGraphState state = null;
32+
try {
33+
FlowExecution execution = node.getExecution();
34+
state = LiveGraphRegistry.get().getOrCreate(execution);
35+
if (state == null) {
36+
return; // feature disabled or execution not a WorkflowRun
37+
}
38+
// Lazy initial catch-up: if the listener is seeing nodes for an execution it's
39+
// never observed (plugin upgrade mid-build, Jenkins resume without onResumed
40+
// firing first), the early history is already in the FlowExecution's storage.
41+
// Backfill it once before processing this event.
42+
if (state.size() == 0 && !state.hasSeen(node.getId())) {
43+
catchUp(execution, state);
44+
}
45+
state.addNode(node);
46+
} catch (Throwable t) {
47+
// A thrown exception here propagates into the CPS VM and can abort the build.
48+
// Poison the state so subsequent reads fall back to the scanner; log the failure
49+
// but never rethrow.
50+
logger.warn("pipeline-graph-view live state failed; falling back to scanner", t);
51+
if (state != null) {
52+
state.poison();
53+
}
54+
}
55+
}
56+
57+
static void catchUp(FlowExecution execution, LiveGraphState state) {
58+
try {
59+
DepthFirstScanner scanner = new DepthFirstScanner();
60+
scanner.setup(execution.getCurrentHeads());
61+
for (FlowNode existing : scanner) {
62+
state.addNode(existing);
63+
}
64+
} catch (Throwable t) {
65+
logger.warn("pipeline-graph-view live state catch-up failed; poisoning", t);
66+
state.poison();
67+
}
68+
}
69+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package io.jenkins.plugins.pipelinegraphview.livestate;
2+
3+
import com.github.benmanes.caffeine.cache.Cache;
4+
import com.github.benmanes.caffeine.cache.Caffeine;
5+
import java.time.Duration;
6+
import jenkins.util.SystemProperties;
7+
import org.jenkinsci.plugins.workflow.flow.FlowExecution;
8+
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
9+
10+
/**
11+
* Singleton holding one {@link LiveGraphState} per in-progress run.
12+
* Entries are created on demand by the listener / lifecycle code, removed on completion,
13+
* and otherwise bounded by a Caffeine LRU so abandoned entries (deleted runs, listener
14+
* bugs) don't leak.
15+
*/
16+
public final class LiveGraphRegistry {
17+
18+
private static final LiveGraphRegistry INSTANCE = new LiveGraphRegistry();
19+
20+
public static LiveGraphRegistry get() {
21+
return INSTANCE;
22+
}
23+
24+
private final Cache<String, LiveGraphState> states = Caffeine.newBuilder()
25+
.maximumSize(256)
26+
.expireAfterAccess(Duration.ofMinutes(30))
27+
.build();
28+
29+
LiveGraphRegistry() {}
30+
31+
/**
32+
* Escape hatch. Setting this system property to {@code false} makes
33+
* {@link #snapshot(WorkflowRun)} always return {@code null}, forcing callers to use the
34+
* scanner fallback. Useful if a regression lands in the live-state path.
35+
*/
36+
private static boolean disabled() {
37+
return !SystemProperties.getBoolean(LiveGraphRegistry.class.getName() + ".enabled", true);
38+
}
39+
40+
LiveGraphState getOrCreate(FlowExecution execution) {
41+
if (disabled()) {
42+
return null;
43+
}
44+
String key = keyFor(execution);
45+
if (key == null) {
46+
return null;
47+
}
48+
return states.get(key, LiveGraphState::new);
49+
}
50+
51+
/**
52+
* Returns a snapshot of the live state for this run, or {@code null} if none exists
53+
* (feature disabled, state never populated, state poisoned). Callers must treat
54+
* {@code null} as "fall back to the scanner path."
55+
*/
56+
public LiveGraphSnapshot snapshot(WorkflowRun run) {
57+
if (disabled()) {
58+
return null;
59+
}
60+
LiveGraphState state = states.getIfPresent(run.getExternalizableId());
61+
return state == null ? null : state.snapshot();
62+
}
63+
64+
void remove(FlowExecution execution) {
65+
String key = keyFor(execution);
66+
if (key != null) {
67+
states.invalidate(key);
68+
}
69+
}
70+
71+
private static String keyFor(FlowExecution execution) {
72+
try {
73+
Object exec = execution.getOwner().getExecutable();
74+
if (exec instanceof WorkflowRun run) {
75+
return run.getExternalizableId();
76+
}
77+
return null;
78+
} catch (Exception e) {
79+
return null;
80+
}
81+
}
82+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.jenkins.plugins.pipelinegraphview.livestate;
2+
3+
import java.util.List;
4+
import org.jenkinsci.plugins.workflow.graph.FlowNode;
5+
6+
/**
7+
* Immutable projection of a {@link LiveGraphState} at a point in time.
8+
* Held briefly outside the state's monitor so HTTP callers can construct DTOs without
9+
* blocking the CPS VM thread that's feeding the live state.
10+
*/
11+
public record LiveGraphSnapshot(List<FlowNode> nodes, List<FlowNode> workspaceNodes) {}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package io.jenkins.plugins.pipelinegraphview.livestate;
2+
3+
import java.util.ArrayList;
4+
import java.util.HashSet;
5+
import java.util.List;
6+
import java.util.Set;
7+
import org.jenkinsci.plugins.workflow.actions.WorkspaceAction;
8+
import org.jenkinsci.plugins.workflow.graph.FlowNode;
9+
10+
/**
11+
* Per-run mutable state built up by {@link LiveGraphPopulator} as {@code GraphListener}
12+
* events arrive. Reads and writes are serialised on the instance monitor; holders should
13+
* snapshot and release quickly — the writer is the CPS VM thread and must not block.
14+
*
15+
* <p>This is a Phase 1 state: it records the raw node list plus the subset that carry a
16+
* {@link WorkspaceAction}. Snapshotting returns immutable copies of these so the downstream
17+
* relationship-finder / graph-builder path runs without the monitor.
18+
*/
19+
final class LiveGraphState {
20+
21+
private final String runId;
22+
private final List<FlowNode> nodes = new ArrayList<>();
23+
private final Set<String> seenIds = new HashSet<>();
24+
private final List<FlowNode> workspaceNodes = new ArrayList<>();
25+
private volatile boolean poisoned = false;
26+
27+
LiveGraphState(String runId) {
28+
this.runId = runId;
29+
}
30+
31+
String getRunId() {
32+
return runId;
33+
}
34+
35+
synchronized void addNode(FlowNode node) {
36+
if (!seenIds.add(node.getId())) {
37+
return;
38+
}
39+
nodes.add(node);
40+
if (node.getAction(WorkspaceAction.class) != null) {
41+
workspaceNodes.add(node);
42+
}
43+
}
44+
45+
synchronized boolean hasSeen(String nodeId) {
46+
return seenIds.contains(nodeId);
47+
}
48+
49+
synchronized int size() {
50+
return nodes.size();
51+
}
52+
53+
synchronized LiveGraphSnapshot snapshot() {
54+
if (poisoned) {
55+
return null;
56+
}
57+
return new LiveGraphSnapshot(List.copyOf(nodes), List.copyOf(workspaceNodes));
58+
}
59+
60+
void poison() {
61+
poisoned = true;
62+
}
63+
64+
boolean isPoisoned() {
65+
return poisoned;
66+
}
67+
}

src/main/java/io/jenkins/plugins/pipelinegraphview/treescanner/PipelineNodeGraphAdapter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
import io.jenkins.plugins.pipelinegraphview.utils.PipelineGraphBuilderApi;
55
import io.jenkins.plugins.pipelinegraphview.utils.PipelineStepBuilderApi;
66
import java.util.ArrayList;
7+
import java.util.Collection;
78
import java.util.Collections;
89
import java.util.HashMap;
910
import java.util.List;
1011
import java.util.Map;
1112
import java.util.stream.Collectors;
13+
import org.jenkinsci.plugins.workflow.graph.FlowNode;
1214
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
1315
import org.slf4j.Logger;
1416
import org.slf4j.LoggerFactory;
@@ -34,6 +36,14 @@ public PipelineNodeGraphAdapter(WorkflowRun run) {
3436
treeScanner = new PipelineNodeTreeScanner(run);
3537
}
3638

39+
/**
40+
* Builds the adapter over a pre-collected node set rather than walking the execution
41+
* graph. Used by the live-state path.
42+
*/
43+
public PipelineNodeGraphAdapter(WorkflowRun run, Collection<FlowNode> preCollectedNodes) {
44+
treeScanner = new PipelineNodeTreeScanner(run, preCollectedNodes);
45+
}
46+
3747
private final Object pipelineLock = new Object();
3848
private final Object stepLock = new Object();
3949
private final Object remapLock = new Object();

src/main/java/io/jenkins/plugins/pipelinegraphview/treescanner/PipelineNodeTreeScanner.java

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ public PipelineNodeTreeScanner(@NonNull WorkflowRun run) {
5959
this.build();
6060
}
6161

62+
/**
63+
* Alternate constructor that uses a caller-supplied node collection instead of walking
64+
* the execution graph with a {@link DepthFirstScanner}. Intended for use by the
65+
* live-state path, which has already observed every node via {@code GraphListener}.
66+
*/
67+
public PipelineNodeTreeScanner(@NonNull WorkflowRun run, @NonNull Collection<FlowNode> nodes) {
68+
this.run = run;
69+
this.execution = run.getExecution();
70+
this.declarative = run.getAction(ExecutionModelAction.class) != null;
71+
this.buildFrom(nodes);
72+
}
73+
6274
/**
6375
* Builds the flow node graph.
6476
*/
@@ -67,22 +79,7 @@ public void build() {
6779
logger.debug("Building graph");
6880
}
6981
if (execution != null) {
70-
Collection<FlowNode> nodes = getAllNodes();
71-
NodeRelationshipFinder finder = new NodeRelationshipFinder();
72-
Map<String, NodeRelationship> relationships = finder.getNodeRelationships(nodes);
73-
GraphBuilder builder = new GraphBuilder(nodes, relationships, this.run, this.execution);
74-
if (isDebugEnabled) {
75-
logger.debug("Original nodes:");
76-
logger.debug("{}", builder.getNodes());
77-
}
78-
this.stageNodeMap = builder.getStageMapping();
79-
this.stepNodeMap = builder.getStepMapping();
80-
List<FlowNodeWrapper> remappedNodes = new ArrayList<>(this.stageNodeMap.values());
81-
remappedNodes.addAll(this.stepNodeMap.values());
82-
if (isDebugEnabled) {
83-
logger.debug("Remapped nodes:");
84-
logger.debug("{}", remappedNodes);
85-
}
82+
buildFrom(getAllNodes());
8683
} else {
8784
this.stageNodeMap = new LinkedHashMap<>();
8885
this.stepNodeMap = new LinkedHashMap<>();
@@ -92,6 +89,29 @@ public void build() {
9289
}
9390
}
9491

92+
private void buildFrom(Collection<FlowNode> nodes) {
93+
if (execution == null || nodes.isEmpty()) {
94+
this.stageNodeMap = new LinkedHashMap<>();
95+
this.stepNodeMap = new LinkedHashMap<>();
96+
return;
97+
}
98+
NodeRelationshipFinder finder = new NodeRelationshipFinder();
99+
Map<String, NodeRelationship> relationships = finder.getNodeRelationships(nodes);
100+
GraphBuilder builder = new GraphBuilder(nodes, relationships, this.run, this.execution);
101+
if (isDebugEnabled) {
102+
logger.debug("Original nodes:");
103+
logger.debug("{}", builder.getNodes());
104+
}
105+
this.stageNodeMap = builder.getStageMapping();
106+
this.stepNodeMap = builder.getStepMapping();
107+
if (isDebugEnabled) {
108+
List<FlowNodeWrapper> remappedNodes = new ArrayList<>(this.stageNodeMap.values());
109+
remappedNodes.addAll(this.stepNodeMap.values());
110+
logger.debug("Remapped nodes:");
111+
logger.debug("{}", remappedNodes);
112+
}
113+
}
114+
95115
/**
96116
* Gets all the nodes that are reachable in the graph.
97117
*/

0 commit comments

Comments
 (0)