From 855149fc3eb9e58e8d4a1fb922c0e17b3af9003a Mon Sep 17 00:00:00 2001 From: Ajeet Yadav Date: Sun, 31 May 2026 21:38:58 +0530 Subject: [PATCH] feat(hubble): add HubbleDiscoverer and related tests for flow observation - Implemented HubbleDiscoverer to emit edges for observed pod communication flows. - Added tests for HubbleDiscoverer to verify edge emission, flow skipping, and deduplication. - Introduced a stub FlowGetter for testing purposes. - Enhanced Manager with negative cache handling and flow retrieval logic. - Updated provider detection to include Cilium and Hubble Relay. - Modified NetworkPanel to support flow window configuration and display flow data. - Added UI elements for managing flow visibility and flow window duration. - Updated types and state management to accommodate new providers and flow features. --- CHANGELOG.md | 32 ++ go-core/cmd/podscape-core/main.go | 4 + go-core/cmd/podscape-mcp/tools_diag.go | 13 +- go-core/go.mod | 45 +- go-core/go.sum | 85 ++-- go-core/internal/graph/discoverers.go | 16 +- go-core/internal/graph/engine.go | 23 +- go-core/internal/graph/graph.go | 13 +- go-core/internal/graph/graph_test.go | 120 ++++- go-core/internal/handlers/handlers.go | 3 + go-core/internal/handlers/network.go | 12 + go-core/internal/handlers/providers.go | 17 +- go-core/internal/handlers/security.go | 8 +- go-core/internal/hubble/client.go | 448 ++++++++++++++++++ go-core/internal/hubble/client_test.go | 84 ++++ go-core/internal/hubble/discoverer.go | 92 ++++ go-core/internal/hubble/discoverer_test.go | 91 ++++ go-core/internal/hubble/export_test.go | 13 + go-core/internal/providers/detect.go | 17 +- go-core/internal/providers/detect_test.go | 78 ++- package.json | 2 +- src/main/ipc/kubectl.ts | 8 +- src/preload/index.ts | 4 +- .../components/panels/NetworkPanel.tsx | 101 +++- .../components/panels/NetworkPanel.utils.ts | 3 +- src/renderer/store/slices/analysisSlice.ts | 8 +- .../store/slices/clusterSlice.test.ts | 3 + src/renderer/store/slices/clusterSlice.ts | 17 +- .../store/slices/providersSlice.test.ts | 18 +- src/renderer/store/slices/providersSlice.ts | 20 +- src/renderer/store/slices/resourceSlice.ts | 6 +- src/renderer/types/api.ts | 2 + 32 files changed, 1270 insertions(+), 136 deletions(-) create mode 100644 go-core/internal/hubble/client.go create mode 100644 go-core/internal/hubble/client_test.go create mode 100644 go-core/internal/hubble/discoverer.go create mode 100644 go-core/internal/hubble/discoverer_test.go create mode 100644 go-core/internal/hubble/export_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6af2bef..c17ebe6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,35 @@ +## [3.3.0] — 2026-05-30 + +### New features + +#### Cilium Hubble network flows + +- **Live traffic edges in the network map** (`NetworkPanel.tsx`, `NetworkPanel.utils.ts`) — When Cilium with Hubble Relay is present in the cluster, the network map now overlays observed pod-to-pod traffic as teal `hubble-flow` edges. DROPPED and ERROR flows are labelled and visually distinct from forwarded traffic. Edges are opt-in via a **Hubble Flows** filter pill that only appears when `providers.hubbleRelay` is true, so the panel is unaffected on clusters without Hubble. + +- **Hubble Relay gRPC client** (`go-core/internal/hubble/client.go`) — Lazy-connecting `Manager` singleton that programmatically port-forwards to a `hubble-relay` pod in `kube-system` on first use, then streams flows via the Cilium Observer gRPC API. Key design properties: + - Connection setup runs outside the manager lock — concurrent context switches (`Reset`) are never blocked for up to the 15 s port-forward timeout. + - A generation counter on every `Reset` lets in-flight fetches detect a cluster switch and discard stale results. + - A per-generation negative cache (pod-not-found only) prevents repeated API calls on clusters without Hubble; transient port-forward and gRPC failures do not cache and retry on the next request. + - Concurrent dial race handled: if two goroutines both attempt to create a connection, the loser discards its tunnel and streams from the winner's connection. + +- **`HubbleDiscoverer`** (`go-core/internal/hubble/discoverer.go`) — Implements `graph.Discoverer`; queries all flows from the last 60 s, deduplicates by pod-pair (DROPPED/ERROR wins over FORWARDED), and emits `hubble-flow` edges into the topology graph. Always registered — returns zero edges silently when Hubble is unavailable. + +- **`EdgeHubbleFlow` edge kind** (`go-core/internal/graph/graph.go`) — New `EdgeKind = "hubble-flow"` constant; rendered in teal with a 1.2 s animated stroke. + +- **Provider detection extended** (`go-core/internal/providers/detect.go`, `go-core/internal/handlers/providers.go`) — `ProviderSet` gains `Cilium` (detected via `cilium.io` API group) and `HubbleRelay` (detected by the existence of the `hubble-relay` Service in `kube-system`). The renderer `ProviderSet` type and `providersSlice` default state updated to match. Context-switch reset in `clusterSlice` now includes both new fields. + +### Bug fixes + +#### Go sidecar + +- **`tools_diag.go` `detect_providers` silently swallowed hubble-relay probe errors** — Added `k8serrors.IsNotFound` check; unexpected API errors are now logged rather than silently treated as "not installed". + +#### Renderer + +- **Context-switch provider reset missing `cilium` and `hubbleRelay`** (`clusterSlice.ts`) — The inline reset object in `selectContext` did not include the two new provider fields, so stale Hubble values persisted across context switches until `fetchProviders` resolved. Both fields are now explicitly reset to `false`. + +--- + ## [3.2.2] — 2026-05-02 ### New features diff --git a/go-core/cmd/podscape-core/main.go b/go-core/cmd/podscape-core/main.go index f32c885..645a4be 100644 --- a/go-core/cmd/podscape-core/main.go +++ b/go-core/cmd/podscape-core/main.go @@ -10,6 +10,7 @@ import ( "github.com/podscape/go-core/internal/client" "github.com/podscape/go-core/internal/handlers" + "github.com/podscape/go-core/internal/hubble" "github.com/podscape/go-core/internal/informers" "github.com/podscape/go-core/internal/portforward" "github.com/podscape/go-core/internal/store" @@ -174,6 +175,7 @@ func main() { // Initialize the portforward manager early (with nil clients) so handlers // like HandleSwitchContext can safely call Manager.StopAll() immediately. portforward.Init(nil, nil) + hubble.Init(nil, nil) // Build the k8s client; fail gracefully into setup mode if no valid kubeconfig exists. // If the file is missing (fresh install, CI, new machine), run in no-kubeconfig @@ -205,6 +207,8 @@ func main() { // Update the portforward manager with valid clients now that we have them. portforward.Manager.UpdateClients(bundle.Clientset, bundle.Config) + hubble.Init(bundle.Clientset, bundle.Config) + hubble.DefaultManager.WarmUp() // Start the HTTP server — /health returns 503 until informers sync, so // startSidecar() keeps polling. portforward is already initialised above. diff --git a/go-core/cmd/podscape-mcp/tools_diag.go b/go-core/cmd/podscape-mcp/tools_diag.go index cfc36f8..14f4e5a 100644 --- a/go-core/cmd/podscape-mcp/tools_diag.go +++ b/go-core/cmd/podscape-mcp/tools_diag.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log" "strings" "sync" "time" @@ -15,6 +16,7 @@ import ( "github.com/podscape/go-core/internal/providers" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -544,6 +546,15 @@ func handleDetectProviders(ctx context.Context, req mcp.CallToolRequest) (*mcp.C if icList != nil { items = icList.Items } - ps := providers.Detect(b.Clientset.Discovery(), items) + + hubbleRelayPresent := false + _, hubbleErr := b.Clientset.CoreV1().Services("kube-system").Get(apiCtx, "hubble-relay", metav1.GetOptions{}) + if hubbleErr == nil { + hubbleRelayPresent = true + } else if !k8serrors.IsNotFound(hubbleErr) { + log.Printf("[providers] unexpected error checking hubble-relay service: %v", hubbleErr) + } + + ps := providers.Detect(b.Clientset.Discovery(), items, hubbleRelayPresent) return jsonResult(ps) } diff --git a/go-core/go.mod b/go-core/go.mod index 8353759..84beca6 100644 --- a/go-core/go.mod +++ b/go-core/go.mod @@ -3,16 +3,22 @@ module github.com/podscape/go-core go 1.25.5 require ( + github.com/cilium/cilium v1.19.4 github.com/controlplaneio/kubesec/v2 v2.14.2 github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 go.uber.org/zap v1.27.1 helm.sh/helm/v3 v3.20.2 - k8s.io/api v0.35.2 - k8s.io/apimachinery v0.35.2 - k8s.io/client-go v0.35.2 + k8s.io/api v0.35.4 + k8s.io/apimachinery v0.35.4 + k8s.io/client-go v0.35.4 sigs.k8s.io/yaml v1.6.0 ) +require ( + github.com/stretchr/testify v1.11.1 // indirect + go.opentelemetry.io/otel/sdk/metric v1.43.0 // indirect +) + require ( github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/buger/jsonparser v1.1.2 // indirect @@ -25,7 +31,6 @@ require ( require ( dario.cat/mergo v1.0.2 // indirect - github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 // indirect github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect github.com/BurntSushi/toml v1.6.0 // indirect github.com/MakeNowJust/heredoc v1.0.0 // indirect @@ -40,7 +45,6 @@ require ( github.com/containerd/errdefs v1.0.0 // indirect github.com/containerd/log v0.1.0 // indirect github.com/containerd/platforms v1.0.0-rc.2 // indirect - github.com/coreos/go-systemd/v22 v22.6.0 // indirect github.com/creack/pty v1.1.24 // indirect github.com/cyphar/filepath-securejoin v0.6.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -81,7 +85,6 @@ require ( github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-retryablehttp v0.7.8 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/huandu/xstrings v1.5.0 // indirect github.com/in-toto/in-toto-golang v0.9.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect @@ -96,7 +99,6 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect github.com/mattn/go-sqlite3 v1.14.28 // indirect - github.com/miekg/dns v1.1.61 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-wordwrap v1.0.1 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect @@ -112,8 +114,6 @@ require ( github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/common v0.67.4 // indirect - github.com/prometheus/procfs v0.17.0 // indirect github.com/redis/go-redis/extra/redisotel/v9 v9.5.3 // indirect github.com/redis/go-redis/v9 v9.17.2 // indirect github.com/rivo/uniseg v0.4.7 // indirect @@ -141,19 +141,16 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.60.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.36.0 // indirect - go.opentelemetry.io/otel/metric v1.43.0 // indirect - go.opentelemetry.io/otel/sdk v1.43.0 // indirect - go.opentelemetry.io/otel/trace v1.43.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect - golang.org/x/crypto v0.49.0 // indirect - golang.org/x/net v0.52.0 // indirect + golang.org/x/crypto v0.50.0 // indirect + golang.org/x/net v0.53.0 // indirect golang.org/x/oauth2 v0.35.0 // indirect golang.org/x/sync v0.20.0 - golang.org/x/sys v0.42.0 // indirect - golang.org/x/term v0.41.0 // indirect - golang.org/x/text v0.35.0 // indirect + golang.org/x/sys v0.43.0 // indirect + golang.org/x/term v0.42.0 // indirect + golang.org/x/text v0.36.0 // indirect golang.org/x/time v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect google.golang.org/grpc v1.80.0 // indirect @@ -162,18 +159,18 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/apiextensions-apiserver v0.35.2 - k8s.io/apiserver v0.35.2 // indirect - k8s.io/cli-runtime v0.35.1 // indirect; indirect — TODO: upgrade to v0.35.x alongside kubectl to resolve k8s minor-version skew (used only by Helm CLI, unrelated to apiextensions path) - k8s.io/component-base v0.35.2 // indirect + k8s.io/apiextensions-apiserver v0.35.4 + k8s.io/apiserver v0.35.4 // indirect + k8s.io/cli-runtime v0.35.4 // indirect; indirect — TODO: upgrade to v0.35.x alongside kubectl to resolve k8s minor-version skew (used only by Helm CLI, unrelated to apiextensions path) + k8s.io/component-base v0.35.4 // indirect k8s.io/klog/v2 v2.130.1 k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect - k8s.io/kubectl v0.35.1 // indirect; indirect — TODO: upgrade to v0.35.x (see cli-runtime above) - k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect + k8s.io/kubectl v0.35.4 // indirect; indirect — TODO: upgrade to v0.35.x (see cli-runtime above) + k8s.io/utils v0.0.0-20260319190234-28399d86e0b5 // indirect oras.land/oras-go/v2 v2.6.0 // indirect sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect sigs.k8s.io/kustomize/api v0.20.1 // indirect sigs.k8s.io/kustomize/kyaml v0.20.1 // indirect sigs.k8s.io/randfill v1.0.0 // indirect - sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect + sigs.k8s.io/structured-merge-diff/v6 v6.3.2-0.20260122202528-d9cc6641c482 // indirect ) diff --git a/go-core/go.sum b/go-core/go.sum index 8826fad..c156100 100644 --- a/go-core/go.sum +++ b/go-core/go.sum @@ -42,6 +42,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chai2010/gettext-go v1.0.2 h1:1Lwwip6Q2QGsAdl/ZKPCwTe9fe0CjlUbqj5bFNSjIRk= github.com/chai2010/gettext-go v1.0.2/go.mod h1:y+wnP2cHYaVj19NZhYKAwEMH2CI1gNHeQQ+5AjwawxA= +github.com/cilium/cilium v1.19.4 h1:TxDZW+27NqLbuenPlWd8y8cnAmDNFga/FCMM8zP1qLQ= +github.com/cilium/cilium v1.19.4/go.mod h1:9j8LLVACyWe8bbtlUPCjPSARbmOS+tqo9GRNh5SHjJc= github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb h1:EDmT6Q9Zs+SbUoc7Ik9EfrFqcylYqgPZ9ANSbTAntnE= github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb/go.mod h1:ZjrT6AXHbDs86ZSdt/osfBi5qfexBrKUdONk989Wnk4= github.com/containerd/containerd v1.7.32 h1:S54xuVcPxeLaYgaRABtpJ2VyVUVsy0IGf7qHBs+sbY8= @@ -231,8 +233,8 @@ github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mattn/go-sqlite3 v1.14.28 h1:ThEiQrnbtumT+QMknw63Befp/ce/nUPgBPMlRFEum7A= github.com/mattn/go-sqlite3 v1.14.28/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= -github.com/miekg/dns v1.1.61 h1:nLxbwF3XxhwVSm8g9Dghm9MHPaUZuqhPiGL+675ZmEs= -github.com/miekg/dns v1.1.61/go.mod h1:mnAarhS3nWaW+NVP2wTkYVIZyHNJ098SJZUki3eykwQ= +github.com/miekg/dns v1.1.68 h1:jsSRkNozw7G/mnmXULynzMNIsgY2dHC8LO6U6Ij2JEA= +github.com/miekg/dns v1.1.68/go.mod h1:fujopn7TB3Pu3JM69XaawiU0wqjpL9/8xGop5UrTPps= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0= @@ -255,6 +257,7 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.27.2 h1:LzwLj0b89qtIy6SSASkzlNvX6WktqurSHwkk2ipF/Ns= github.com/onsi/ginkgo/v2 v2.27.2/go.mod h1:ArE1D/XhNXBXCBkKOLkbsb2c81dQHCRcF5zwn/ykDRo= github.com/onsi/gomega v1.38.2 h1:eZCjf2xjZAqe+LeWvKb5weQ+NcPwX84kqJ0cZNxok2A= @@ -278,12 +281,12 @@ github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= -github.com/prometheus/common v0.67.4 h1:yR3NqWO1/UyO1w2PhUvXlGQs/PtFmoveVO0KZ4+Lvsc= -github.com/prometheus/common v0.67.4/go.mod h1:gP0fq6YjjNCLssJCQp0yk4M8W6ikLURwkdd/YKtTbyI= +github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4= +github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= github.com/prometheus/otlptranslator v0.0.2 h1:+1CdeLVrRQ6Psmhnobldo0kTp96Rj80DRXRd5OSnMEQ= github.com/prometheus/otlptranslator v0.0.2/go.mod h1:P8AwMgdD7XEr6QRUJ2QWLpiAZTgTE2UYgjlu3svompI= -github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0= -github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw= +github.com/prometheus/procfs v0.19.2 h1:zUMhqEW66Ex7OXIiDkll3tl9a1ZdilUOd/F6ZXw4Vws= +github.com/prometheus/procfs v0.19.2/go.mod h1:M0aotyiemPhBCM0z5w87kL22CxfcH05ZpYlu+b4J7mw= github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 h1:1/BDligzCa40GTllkDnY3Y5DTHuKCONbB2JcRyIfl20= github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3/go.mod h1:3dZmcLn3Qw6FLlWASn1g4y+YO9ycEFUOM+bhBmzLVKQ= github.com/redis/go-redis/extra/redisotel/v9 v9.5.3 h1:kuvuJL/+MZIEdvtb/kTBRiRgYaOmx1l+lYJyVdrRUOs= @@ -398,12 +401,12 @@ go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= -golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4= -golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA= -golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= -golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= -golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= -golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= +golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= +golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= +golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI= +golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY= +golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= +golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/oauth2 v0.35.0 h1:Mv2mzuHuZuY2+bkyWXIHMfhNdJAdwW3FuWeCPYN5GVQ= golang.org/x/oauth2 v0.35.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= @@ -411,16 +414,18 @@ golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= -golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= -golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU= -golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A= -golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= -golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY= +golang.org/x/term v0.42.0/go.mod h1:Dq/D+snpsbazcBG5+F9Q1n2rXV8Ma+71xEjTRufARgY= +golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= +golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= -golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= -golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= +golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s= +golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0= +gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= +gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= google.golang.org/genproto v0.0.0-20231211222908-989df2bf70f3 h1:1hfbdAfFbkmpg41000wDVqr7jUpK/Yo+LPnIxxGzmkg= google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 h1:VPWxll4HlMw1Vs/qXtN7BvhZqsS9cdAittCNvVENElA= google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:7QBABkRtR8z+TEnmXTqIqwJLlzrZKVfAUm7tY3yGv0M= @@ -446,28 +451,28 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= helm.sh/helm/v3 v3.20.2 h1:binM4rvPx5DcNsa1sIt7UZi55lRbu3pZUFmQkSoRh48= helm.sh/helm/v3 v3.20.2/go.mod h1:Fl1kBaWCpkUrM6IYXPjQ3bdZQfFrogKArqptvueZ6Ww= -k8s.io/api v0.35.2 h1:tW7mWc2RpxW7HS4CoRXhtYHSzme1PN1UjGHJ1bdrtdw= -k8s.io/api v0.35.2/go.mod h1:7AJfqGoAZcwSFhOjcGM7WV05QxMMgUaChNfLTXDRE60= -k8s.io/apiextensions-apiserver v0.35.2 h1:iyStXHoJZsUXPh/nFAsjC29rjJWdSgUmG1XpApE29c0= -k8s.io/apiextensions-apiserver v0.35.2/go.mod h1:OdyGvcO1FtMDWQ+rRh/Ei3b6X3g2+ZDHd0MSRGeS8rU= -k8s.io/apimachinery v0.35.2 h1:NqsM/mmZA7sHW02JZ9RTtk3wInRgbVxL8MPfzSANAK8= -k8s.io/apimachinery v0.35.2/go.mod h1:jQCgFZFR1F4Ik7hvr2g84RTJSZegBc8yHgFWKn//hns= -k8s.io/apiserver v0.35.2 h1:rb52v0CZGEL0FkhjS+I6jHflAp7fZ4MIaKcEHX7wmDk= -k8s.io/apiserver v0.35.2/go.mod h1:CROJUAu0tfjZLyYgSeBsBan2T7LUJGh0ucWwTCSSk7g= -k8s.io/cli-runtime v0.35.1 h1:uKcXFe8J7AMAM4Gm2JDK4mp198dBEq2nyeYtO+JfGJE= -k8s.io/cli-runtime v0.35.1/go.mod h1:55/hiXIq1C8qIJ3WBrWxEwDLdHQYhBNRdZOz9f7yvTw= -k8s.io/client-go v0.35.2 h1:YUfPefdGJA4aljDdayAXkc98DnPkIetMl4PrKX97W9o= -k8s.io/client-go v0.35.2/go.mod h1:4QqEwh4oQpeK8AaefZ0jwTFJw/9kIjdQi0jpKeYvz7g= -k8s.io/component-base v0.35.2 h1:btgR+qNrpWuRSuvWSnQYsZy88yf5gVwemvz0yw79pGc= -k8s.io/component-base v0.35.2/go.mod h1:B1iBJjooe6xIJYUucAxb26RwhAjzx0gHnqO9htWIX+0= +k8s.io/api v0.35.4 h1:P7nFYKl5vo9AGUp1Z+Pmd3p2tA7bX2wbFWCvDeRv988= +k8s.io/api v0.35.4/go.mod h1:yl4lqySWOgYJJf9RERXKUwE9g2y+CkuwG+xmcOK8wXU= +k8s.io/apiextensions-apiserver v0.35.4 h1:HeP+Upp7ItdvnyGmub0yoix+2z5+ev4M5cE5TCgtOUU= +k8s.io/apiextensions-apiserver v0.35.4/go.mod h1:ogQlk+stIE8mnoRthSYCwlOS12fVqgWFiErMwPaXA7c= +k8s.io/apimachinery v0.35.4 h1:xtdom9RG7e+yDp71uoXoJDWEE2eOiHgeO4GdBzwWpds= +k8s.io/apimachinery v0.35.4/go.mod h1:NNi1taPOpep0jOj+oRha3mBJPqvi0hGdaV8TCqGQ+cc= +k8s.io/apiserver v0.35.4 h1:vtuFqNFmF9bPRdHDL2lpK6qCTPWDreZJL4LRPwVM6ho= +k8s.io/apiserver v0.35.4/go.mod h1:JnBcb+J8kFXKpZkgcbcUnPBBHi4qgBii1I7dLxFY/oo= +k8s.io/cli-runtime v0.35.4 h1:8QRCXSDvopflFNM65Vkkdv42BljPdRSiqf6HFyI1iik= +k8s.io/cli-runtime v0.35.4/go.mod h1:MKLFuZxiJpm87UxjVeQRNy3sCaczHrSOPKN9pinlrM0= +k8s.io/client-go v0.35.4 h1:DN6fyaGuzK64UvnKO5fOA6ymSjvfGAnCAHAR0C66kD8= +k8s.io/client-go v0.35.4/go.mod h1:2Pg9WpsS4NeOpoYTfHHfMxBG8zFMSAUi4O/qoiJC3nY= +k8s.io/component-base v0.35.4 h1:6n1tNJ87johN0Hif0Fs8K2GMthsaUwMqCebUDLYyv7U= +k8s.io/component-base v0.35.4/go.mod h1:qaDJgz5c1KYKla9occFmlJEfPpkuA55s90G509R+PeY= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 h1:Y3gxNAuB0OBLImH611+UDZcmKS3g6CthxToOb37KgwE= k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912/go.mod h1:kdmbQkyfwUagLfXIad1y2TdrjPFWp2Q89B3qkRwf/pQ= -k8s.io/kubectl v0.35.1 h1:zP3Er8C5i1dcAFUMh9Eva0kVvZHptXIn/+8NtRWMxwg= -k8s.io/kubectl v0.35.1/go.mod h1:cQ2uAPs5IO/kx8R5s5J3Ihv3VCYwrx0obCXum0CvnXo= -k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 h1:SjGebBtkBqHFOli+05xYbK8YF1Dzkbzn+gDM4X9T4Ck= -k8s.io/utils v0.0.0-20251002143259-bc988d571ff4/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/kubectl v0.35.4 h1:IHitney6OUeH29rBQnt6Cas6az8HpFeSAohormITNMc= +k8s.io/kubectl v0.35.4/go.mod h1:CGWAaof9ae4vGDAyhnSf1bSQN/U7jiWQHLVbMbLMjRI= +k8s.io/utils v0.0.0-20260319190234-28399d86e0b5 h1:kBawHLSnx/mYHmRnNUf9d4CpjREbeZuxoSGOX/J+aYM= +k8s.io/utils v0.0.0-20260319190234-28399d86e0b5/go.mod h1:xDxuJ0whA3d0I4mf/C4ppKHxXynQ+fxnkmQH0vTHnuk= oras.land/oras-go/v2 v2.6.0 h1:X4ELRsiGkrbeox69+9tzTu492FMUu7zJQW6eJU+I2oc= oras.land/oras-go/v2 v2.6.0/go.mod h1:magiQDfG6H1O9APp+rOsvCPcW1GD2MM7vgnKY0Y+u1o= sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5EXP7sU1kvOlxwZh5txg= @@ -478,7 +483,7 @@ sigs.k8s.io/kustomize/kyaml v0.20.1 h1:PCMnA2mrVbRP3NIB6v9kYCAc38uvFLVs8j/CD567A sigs.k8s.io/kustomize/kyaml v0.20.1/go.mod h1:0EmkQHRUsJxY8Ug9Niig1pUMSCGHxQ5RklbpV/Ri6po= sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU= sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= -sigs.k8s.io/structured-merge-diff/v6 v6.3.0 h1:jTijUJbW353oVOd9oTlifJqOGEkUw2jB/fXCbTiQEco= -sigs.k8s.io/structured-merge-diff/v6 v6.3.0/go.mod h1:M3W8sfWvn2HhQDIbGWj3S099YozAsymCo/wrT5ohRUE= +sigs.k8s.io/structured-merge-diff/v6 v6.3.2-0.20260122202528-d9cc6641c482 h1:2WOzJpHUBVrrkDjU4KBT8n5LDcj824eX0I5UKcgeRUs= +sigs.k8s.io/structured-merge-diff/v6 v6.3.2-0.20260122202528-d9cc6641c482/go.mod h1:M3W8sfWvn2HhQDIbGWj3S099YozAsymCo/wrT5ohRUE= sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs= sigs.k8s.io/yaml v1.6.0/go.mod h1:796bPqUfzR/0jLAl6XjHl3Ck7MiyVv8dbTdyT3/pMf4= diff --git a/go-core/internal/graph/discoverers.go b/go-core/internal/graph/discoverers.go index a36b7c9..b346230 100644 --- a/go-core/internal/graph/discoverers.go +++ b/go-core/internal/graph/discoverers.go @@ -121,7 +121,13 @@ func (d *VolumeDiscoverer) Discover(nodes []Node, cache ResourceCache) []Edge { for _, vol := range pod.Spec.Volumes { if vol.PersistentVolumeClaim != nil { - pvcID := fmt.Sprintf("%s:%s:%s", KindPVC, node.Namespace, vol.PersistentVolumeClaim.ClaimName) + claimName := vol.PersistentVolumeClaim.ClaimName + pvcID := fmt.Sprintf("%s:%s:%s", KindPVC, node.Namespace, claimName) + if pvcObj, ok := cache.GetRawObject(KindPVC, node.Namespace, claimName); ok { + if pvc, ok := pvcObj.(*corev1.PersistentVolumeClaim); ok && pvc.UID != "" { + pvcID = fmt.Sprintf("%s:%s", KindPVC, pvc.UID) + } + } edges = append(edges, Edge{ ID: fmt.Sprintf("volume:%s->%s", node.ID, pvcID), Source: node.ID, @@ -155,7 +161,13 @@ func (d *NodeDiscoverer) Discover(nodes []Node, cache ResourceCache) []Edge { continue } - nodeID := fmt.Sprintf("node:%s", pod.Spec.NodeName) + nodeName := pod.Spec.NodeName + nodeID := fmt.Sprintf("node:%s", nodeName) + if kubeNodeObj, ok := cache.GetRawObject(KindNode, "", nodeName); ok { + if kubeNode, ok := kubeNodeObj.(*corev1.Node); ok && kubeNode.UID != "" { + nodeID = fmt.Sprintf("node:%s", kubeNode.UID) + } + } edges = append(edges, Edge{ ID: fmt.Sprintf("node:%s->%s", node.ID, nodeID), Source: node.ID, diff --git a/go-core/internal/graph/engine.go b/go-core/internal/graph/engine.go index b591792..726cebc 100644 --- a/go-core/internal/graph/engine.go +++ b/go-core/internal/graph/engine.go @@ -81,7 +81,7 @@ func (b *GraphBuilder) Build(initialNodes []Node) *Graph { func (b *GraphBuilder) collapseResources(graph *Graph) { groups := make(map[string][]int) // key -> indices in graph.Nodes for i, n := range graph.Nodes { - if (n.Kind == KindPod || n.Kind == KindReplicaSet) && n.OwnerUID != "" { + if (n.Kind == KindPod || n.WorkloadKind == "ReplicaSet") && n.OwnerUID != "" { key := fmt.Sprintf("%s:%s:%s", n.Kind, n.Namespace, n.OwnerUID) groups[key] = append(groups[key], i) } @@ -127,10 +127,6 @@ func (b *GraphBuilder) collapseResources(graph *Graph) { collapsedNode.ID = collapsedID collapsedNode.ReplicaCount = len(indices) collapsedNode.ReplicaNames = names - // Shorten name if it ends with a hash/suffix, usually owners share a prefix - if len(collapsedNode.Name) > 10 { - collapsedNode.Name = collapsedNode.Name[:len(collapsedNode.Name)-6] + ".." - } newNodes = append(newNodes, collapsedNode) } @@ -143,10 +139,14 @@ func (b *GraphBuilder) collapseResources(graph *Graph) { } graph.Nodes = newNodes - // Update edges + // Update edges: remap endpoints, re-ID, and merge duplicate labels. + // edgeSeen maps new edge ID → index in newEdges so that when multiple + // pre-collapse edges land on the same post-collapse endpoint pair we can + // merge their labels rather than silently discarding one. "dropped" always + // wins, matching the same priority rule used by HubbleDiscoverer. if len(idMap) > 0 { newEdges := make([]Edge, 0, len(graph.Edges)) - edgeSeen := make(map[string]bool) + edgeSeen := make(map[string]int) for _, e := range graph.Edges { if newSrc, ok := idMap[e.Source]; ok { e.Source = newSrc @@ -154,11 +154,14 @@ func (b *GraphBuilder) collapseResources(graph *Graph) { if newTarget, ok := idMap[e.Target]; ok { e.Target = newTarget } - // Update edge ID to reflect new endpoints e.ID = fmt.Sprintf("edge:%s:%s:%s", e.Source, e.Target, e.Kind) - if !edgeSeen[e.ID] { + if idx, seen := edgeSeen[e.ID]; seen { + if e.Label == "dropped" && newEdges[idx].Label != "dropped" { + newEdges[idx].Label = "dropped" + } + } else { + edgeSeen[e.ID] = len(newEdges) newEdges = append(newEdges, e) - edgeSeen[e.ID] = true } } graph.Edges = newEdges diff --git a/go-core/internal/graph/graph.go b/go-core/internal/graph/graph.go index 9a27bad..a649b00 100644 --- a/go-core/internal/graph/graph.go +++ b/go-core/internal/graph/graph.go @@ -46,13 +46,14 @@ type Node struct { type EdgeKind string const ( - EdgeOwner EdgeKind = "controller-pod" + EdgeOwner EdgeKind = "controller-pod" EdgeControllerWorkload EdgeKind = "controller-workload" - EdgeSelector EdgeKind = "svc-pod" - EdgeVolume EdgeKind = "pod-pvc" - EdgeConnection EdgeKind = "ing-svc" - EdgePolicy EdgeKind = "policy-pod" - EdgePodNode EdgeKind = "pod-node" + EdgeSelector EdgeKind = "svc-pod" + EdgeVolume EdgeKind = "pod-pvc" + EdgeConnection EdgeKind = "ing-svc" + EdgePolicy EdgeKind = "policy-pod" + EdgePodNode EdgeKind = "pod-node" + EdgeHubbleFlow EdgeKind = "hubble-flow" ) // Edge represents a directed relationship between two nodes. diff --git a/go-core/internal/graph/graph_test.go b/go-core/internal/graph/graph_test.go index 5215ca4..7883320 100644 --- a/go-core/internal/graph/graph_test.go +++ b/go-core/internal/graph/graph_test.go @@ -194,12 +194,17 @@ func TestConnectivityDiscovery(t *testing.T) { }, } + kubeNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-1", UID: "node-uid-1"}, + } + mock.objects["pod:ns:pod-1"] = pod mock.objects["ingress:ns:ing-1"] = ing + mock.objects["node::node-1"] = kubeNode nodes := []Node{ {ID: "pod:pod-uid", Kind: KindPod, Name: "pod-1", Namespace: "ns"}, - {ID: "node:node-1", Kind: KindNode, Name: "node-1"}, + {ID: "node:node-uid-1", Kind: KindNode, Name: "node-1", UID: "node-uid-1"}, {ID: "ingress:ns:ing-1", Kind: KindIngress, Name: "ing-1", Namespace: "ns"}, {ID: "service:ns:svc-1", Kind: KindService, Name: "svc-1", Namespace: "ns"}, } @@ -207,16 +212,16 @@ func TestConnectivityDiscovery(t *testing.T) { builder := NewGraphBuilder(mock) g := builder.Build(nodes) - // Check Pod -> Node + // Check Pod -> Node (must use UID-based node ID) foundNodeEdge := false for _, e := range g.Edges { - if e.Source == "pod:pod-uid" && e.Target == "node:node-1" { + if e.Source == "pod:pod-uid" && e.Target == "node:node-uid-1" { foundNodeEdge = true break } } if !foundNodeEdge { - t.Errorf("did not find pod-to-node edge") + t.Errorf("did not find pod-to-node edge with UID-based target") } // Check Ingress -> Service @@ -234,6 +239,113 @@ func TestConnectivityDiscovery(t *testing.T) { t.Errorf("did not find ingress-to-service edge") } } + +func TestVolumeDiscoverer_EdgeUsesUID(t *testing.T) { + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: "data-pvc", Namespace: "ns", UID: "pvc-uid-42"}, + } + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "app-pod", Namespace: "ns", UID: "pod-uid-1"}, + Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{ + { + Name: "data", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: "data-pvc"}, + }, + }, + }, + }, + } + + mock := &mockCache{objects: map[string]interface{}{ + "pod:ns:app-pod": pod, + "pvc:ns:data-pvc": pvc, + }} + + nodes := []Node{ + {ID: "pod:pod-uid-1", Kind: KindPod, Name: "app-pod", Namespace: "ns"}, + {ID: "pvc:pvc-uid-42", Kind: KindPVC, Name: "data-pvc", Namespace: "ns", UID: "pvc-uid-42"}, + } + + builder := &GraphBuilder{cache: mock, discoverers: []Discoverer{&VolumeDiscoverer{}}} + g := builder.Build(nodes) + + found := false + for _, e := range g.Edges { + if e.Kind == EdgeVolume && e.Source == "pod:pod-uid-1" && e.Target == "pvc:pvc-uid-42" { + found = true + break + } + } + if !found { + t.Errorf("expected volume edge target to use PVC UID (pvc:pvc-uid-42), edges: %+v", g.Edges) + } +} + +func TestNodeDiscoverer_EdgeUsesUID(t *testing.T) { + kubeNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "worker-1", UID: "node-uid-99"}, + } + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "my-pod", Namespace: "ns", UID: "pod-uid-2"}, + Spec: corev1.PodSpec{NodeName: "worker-1"}, + } + + mock := &mockCache{objects: map[string]interface{}{ + "pod:ns:my-pod": pod, + "node::worker-1": kubeNode, + }} + + nodes := []Node{ + {ID: "pod:pod-uid-2", Kind: KindPod, Name: "my-pod", Namespace: "ns"}, + {ID: "node:node-uid-99", Kind: KindNode, Name: "worker-1", UID: "node-uid-99"}, + } + + builder := &GraphBuilder{cache: mock, discoverers: []Discoverer{&NodeDiscoverer{}}} + g := builder.Build(nodes) + + found := false + for _, e := range g.Edges { + if e.Kind == EdgePodNode && e.Source == "pod:pod-uid-2" && e.Target == "node:node-uid-99" { + found = true + break + } + } + if !found { + t.Errorf("expected pod-node edge target to use Node UID (node:node-uid-99), edges: %+v", g.Edges) + } +} + +func TestCollapseResources_DroppedLabelWins(t *testing.T) { + cache := &mockCache{objects: make(map[string]interface{})} + builder := NewGraphBuilder(cache) + + ownerUID := "owner-abc" + nodes := []Node{ + {ID: "pod-1", Kind: KindPod, Name: "app-a", Namespace: "ns", OwnerUID: ownerUID}, + {ID: "pod-2", Kind: KindPod, Name: "app-b", Namespace: "ns", OwnerUID: ownerUID}, + {ID: "svc-1", Kind: KindService, Name: "my-svc", Namespace: "ns"}, + } + + // Two edges to the same pod pair that will collapse to one endpoint: + // one forwarded, one dropped. "dropped" must win after merging. + g := &Graph{ + Nodes: nodes, + Edges: []Edge{ + {ID: "edge:svc-1:pod-1:hubble-flow", Source: "svc-1", Target: "pod-1", Kind: EdgeHubbleFlow, Label: "forwarded"}, + {ID: "edge:svc-1:pod-2:hubble-flow", Source: "svc-1", Target: "pod-2", Kind: EdgeHubbleFlow, Label: "dropped"}, + }, + } + builder.collapseResources(g) + + if len(g.Edges) != 1 { + t.Fatalf("expected 1 merged edge after collapse, got %d: %+v", len(g.Edges), g.Edges) + } + if g.Edges[0].Label != "dropped" { + t.Errorf("expected merged edge label to be 'dropped', got %q", g.Edges[0].Label) + } +} func TestResourceCollapsing(t *testing.T) { cache := &mockCache{objects: make(map[string]interface{})} builder := NewGraphBuilder(cache) diff --git a/go-core/internal/handlers/handlers.go b/go-core/internal/handlers/handlers.go index 6b481e5..c0f84ab 100644 --- a/go-core/internal/handlers/handlers.go +++ b/go-core/internal/handlers/handlers.go @@ -10,6 +10,7 @@ import ( "github.com/gorilla/websocket" "github.com/podscape/go-core/internal/helm" + "github.com/podscape/go-core/internal/hubble" "github.com/podscape/go-core/internal/informers" "github.com/podscape/go-core/internal/portforward" "github.com/podscape/go-core/internal/prometheus" @@ -339,6 +340,8 @@ func HandleSwitchContext(w http.ResponseWriter, r *http.Request) { // the Prometheus probe to return "Connected" for the wrong cluster. portforward.Manager.StopAll() portforward.Manager.UpdateClients(clientset, restConfig) + hubble.DefaultManager.Reset(clientset, restConfig) + hubble.DefaultManager.WarmUp() // Clear the Prometheus query cache so cluster A results are never served // to cluster B even if the PromQL strings and time range happen to match. prometheus.ClearCache() diff --git a/go-core/internal/handlers/network.go b/go-core/internal/handlers/network.go index 6875608..6331a3e 100644 --- a/go-core/internal/handlers/network.go +++ b/go-core/internal/handlers/network.go @@ -7,6 +7,7 @@ import ( "net/http" "sort" "strconv" + "time" corev1 "k8s.io/api/core/v1" appsv1 "k8s.io/api/apps/v1" @@ -15,6 +16,7 @@ import ( "github.com/gorilla/websocket" "github.com/podscape/go-core/internal/graph" + "github.com/podscape/go-core/internal/hubble" "github.com/podscape/go-core/internal/logs" "github.com/podscape/go-core/internal/portforward" "github.com/podscape/go-core/internal/store" @@ -492,7 +494,17 @@ func HandleTopology(w http.ResponseWriter, r *http.Request) { }) // 2. Build Graph using the new Discovery Engine + // HubbleDiscoverer is always registered; it lazily connects on first use + // and returns zero edges when Hubble Relay is absent or unreachable. + // flowWindow defaults to 60 s; callers may pass ?flowWindow= (5–3600). + flowWindowSecs := 60 + if fw := r.URL.Query().Get("flowWindow"); fw != "" { + if n, err := strconv.Atoi(fw); err == nil && n >= 5 && n <= 3600 { + flowWindowSecs = n + } + } builder := graph.NewGraphBuilder(snap) + builder.AddDiscoverer(hubble.NewDiscoverer(hubble.DefaultManager, time.Duration(flowWindowSecs)*time.Second)) g := builder.Build(initialNodes) w.Header().Set("Content-Type", "application/json") diff --git a/go-core/internal/handlers/providers.go b/go-core/internal/handlers/providers.go index 1b1c7ab..60000cf 100644 --- a/go-core/internal/handlers/providers.go +++ b/go-core/internal/handlers/providers.go @@ -7,8 +7,9 @@ import ( "github.com/podscape/go-core/internal/providers" "github.com/podscape/go-core/internal/store" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" networkingv1 "k8s.io/api/networking/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // HandleProviders detects which ingress controllers and service mesh providers @@ -31,7 +32,19 @@ func HandleProviders(w http.ResponseWriter, r *http.Request) { icList = icl.Items } - ps := providers.Detect(cs.Discovery(), icList) + const ( + hubbleNamespace = "kube-system" + hubbleServiceName = "hubble-relay" + ) + hubbleRelayPresent := false + _, hubbleErr := cs.CoreV1().Services(hubbleNamespace).Get(r.Context(), hubbleServiceName, metav1.GetOptions{}) + if hubbleErr == nil { + hubbleRelayPresent = true + } else if !k8serrors.IsNotFound(hubbleErr) { + log.Printf("[providers] failed to check hubble-relay service: %v", hubbleErr) + } + + ps := providers.Detect(cs.Discovery(), icList, hubbleRelayPresent) w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(ps); err != nil { diff --git a/go-core/internal/handlers/security.go b/go-core/internal/handlers/security.go index ae87973..5db7e49 100644 --- a/go-core/internal/handlers/security.go +++ b/go-core/internal/handlers/security.go @@ -140,6 +140,8 @@ func HandleSecurityScan(w http.ResponseWriter, r *http.Request) { msg := "trivy scan failed" if waitErr != nil { msg = waitErr.Error() + } else if readErr != nil { + msg = readErr.Error() } sseEvent(w, flusher, "error", msg) return @@ -305,7 +307,11 @@ func HandleTrivyImages(w http.ResponseWriter, r *http.Request) { Kind string `json:"kind"` } `json:"workloads"` } - if err := json.NewDecoder(r.Body).Decode(&req); err != nil || len(req.Workloads) == 0 { + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "invalid request body: "+err.Error(), http.StatusBadRequest) + return + } + if len(req.Workloads) == 0 { w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{"Resources": []interface{}{}}) return diff --git a/go-core/internal/hubble/client.go b/go-core/internal/hubble/client.go new file mode 100644 index 0000000..ffe021a --- /dev/null +++ b/go-core/internal/hubble/client.go @@ -0,0 +1,448 @@ +// Package hubble provides a lazy gRPC client for Cilium Hubble Relay. +// It port-forwards to a hubble-relay pod in kube-system on first use and +// establishes a gRPC connection to observe network flows. +package hubble + +import ( + "context" + "fmt" + "io" + "log" + "net" + "net/http" + "net/url" + "sync" + "time" + + flowpb "github.com/cilium/cilium/api/v1/flow" + observerpb "github.com/cilium/cilium/api/v1/observer" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/timestamppb" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/transport/spdy" +) + +const ( + hubbleRelayNamespace = "kube-system" + hubbleRelayPort = 4245 + tunnelReadyTimeout = 15 * time.Second + dialFailedTTL = 2 * time.Minute +) + +// Flow captures the essential routing metadata for a single observed network flow. +type Flow struct { + SrcNamespace string + SrcPod string + DstNamespace string + DstPod string + DstService string + Verdict string // "FORWARDED" | "DROPPED" | "ERROR" | ... +} + +// Manager lazily creates a port-forward to hubble-relay and a gRPC connection +// on first GetFlows call. It is safe for concurrent use. +type Manager struct { + mu sync.Mutex + clientset kubernetes.Interface + restConfig *rest.Config + // generation increments on every Reset, letting in-flight fetches detect + // that the cluster switched and discard stale results. + generation uint64 + // dialFailedGen records the generation for which the last dial attempt + // failed. When dialFailed is true and dialFailedGen == generation, further + // attempts are suppressed until the next Reset (context switch) or until + // dialFailedTTL elapses — allowing self-healing when Hubble is installed + // mid-session without a context switch. + dialFailed bool + dialFailedGen uint64 + dialFailedAt time.Time + // dialFailedTTL controls how long a cached dial failure suppresses retries. + // Defaults to dialFailedTTL constant; injectable via SetDialFailedTTL for tests. + dialFailedTTL time.Duration + + // active tunnel state + stopCh chan struct{} + localPort int + grpcConn *grpc.ClientConn +} + +// NewManager returns a new idle Manager. Call Reset to attach Kubernetes clients. +func NewManager() *Manager { + return &Manager{dialFailedTTL: dialFailedTTL} +} + +// SetDialFailedTTL overrides the negative-cache timeout. Intended for testing; +// the default (2 minutes) is set by NewManager. +func (m *Manager) SetDialFailedTTL(d time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + m.dialFailedTTL = d +} + +// DefaultManager is the package-level singleton used by the rest of the sidecar. +var DefaultManager = NewManager() + +// Init attaches Kubernetes clients to the DefaultManager, tearing down any +// existing connection first. +func Init(clientset kubernetes.Interface, config *rest.Config) { + DefaultManager.Reset(clientset, config) +} + +// WarmUp fires a background goroutine that establishes the port-forward and +// gRPC connection proactively so the first /topology request does not block +// waiting for tunnel setup. It is a no-op when clients are nil. +func (m *Manager) WarmUp() { + m.mu.Lock() + if m.clientset == nil || m.restConfig == nil { + m.mu.Unlock() + return + } + m.mu.Unlock() + go func() { + ctx, cancel := context.WithTimeout(context.Background(), tunnelReadyTimeout+5*time.Second) + defer cancel() + _, _ = m.GetFlows(ctx, "", 1*time.Second) + }() +} + +// Reset tears down any existing tunnel and gRPC connection, then records new +// Kubernetes clients. Passing nil clients is safe — subsequent GetFlows calls +// will return empty slices without error. +func (m *Manager) Reset(clientset kubernetes.Interface, config *rest.Config) { + m.mu.Lock() + defer m.mu.Unlock() + m.teardownLocked() + m.generation++ + m.dialFailed = false + m.dialFailedGen = 0 + m.dialFailedAt = time.Time{} + m.clientset = clientset + m.restConfig = config +} + +// IsConnected returns true when an active gRPC connection exists. +func (m *Manager) IsConnected() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.grpcConn != nil +} + +// GetFlows returns a snapshot of network flows for the given namespace over the +// trailing window. It returns nil (nil error) when no Kubernetes clients are +// configured or when Hubble is unreachable. +// +// Connection setup (port-forward + gRPC dial) is performed outside the manager +// lock so concurrent Reset calls are never blocked waiting for the tunnel. +func (m *Manager) GetFlows(ctx context.Context, namespace string, window time.Duration) ([]Flow, error) { + m.mu.Lock() + if m.clientset == nil || m.restConfig == nil { + m.mu.Unlock() + return nil, nil + } + + // Fast path: reuse an existing connection. + if m.grpcConn != nil { + conn := m.grpcConn + gen := m.generation + m.mu.Unlock() + return m.fetchFlows(ctx, conn, namespace, window, gen) + } + + // Negative cache: if dial already failed for this generation (e.g. Hubble + // not installed), skip re-dialing until the TTL expires or the next context + // switch. The TTL allows self-healing when Hubble is installed mid-session. + if m.dialFailed && m.dialFailedGen == m.generation { + if time.Since(m.dialFailedAt) < m.dialFailedTTL { + m.mu.Unlock() + return nil, nil + } + // TTL expired — clear the cache and let the dial proceed. + m.dialFailed = false + } + + // Slow path: establish port-forward + gRPC connection outside the lock so + // concurrent Reset() calls (e.g. context switch) are not blocked for up to + // tunnelReadyTimeout. + cs := m.clientset + cfg := m.restConfig + gen := m.generation + m.mu.Unlock() + + conn, stopCh, localPort, cacheable, err := dial(ctx, cs, cfg) + if err != nil { + log.Printf("[hubble] dial failed: %v", err) + if cacheable { + m.mu.Lock() + if m.generation == gen { + m.dialFailed = true + m.dialFailedGen = gen + m.dialFailedAt = time.Now() + } + m.mu.Unlock() + } + return nil, nil + } + + // Store the connection under lock. Two outcomes require cleanup: + // 1. Reset() fired while we were dialing (generation changed). + // 2. Another goroutine won the concurrent-dial race (grpcConn now set). + // In both cases close what we just created and use what's already there. + m.mu.Lock() + if m.generation != gen { + m.mu.Unlock() + close(stopCh) + _ = conn.Close() + return nil, nil + } + if m.grpcConn != nil { + // Another goroutine beat us. Discard ours and use the winner's. + // m.generation == gen is already confirmed above, so gen is correct. + winner := m.grpcConn + m.mu.Unlock() + close(stopCh) + _ = conn.Close() + return m.fetchFlows(ctx, winner, namespace, window, gen) + } + m.grpcConn = conn + m.stopCh = stopCh + m.localPort = localPort + // Clear any stale negative-cache flag. A concurrent loser may have set + // dialFailed=true for this generation after we succeeded; that would + // permanently disable Hubble once teardown fires. A stored connection + // contradicts a cached failure — clear it. + m.dialFailed = false + m.dialFailedGen = 0 + m.mu.Unlock() + + return m.fetchFlows(ctx, conn, namespace, window, gen) +} + +// fetchFlows streams flows from the Observer gRPC service and collects them. +// gen is the generation captured before the lock was released; results are +// discarded if generation changes mid-stream (cross-cluster bleed guard). +func (m *Manager) fetchFlows(ctx context.Context, conn *grpc.ClientConn, namespace string, window time.Duration, gen uint64) ([]Flow, error) { + client := observerpb.NewObserverClient(conn) + + req := &observerpb.GetFlowsRequest{ + Follow: false, + Since: timestamppb.New(time.Now().Add(-window)), + } + // Only filter by namespace when one is specified. An empty namespace prefix + // ("/" or "/podname") is not a valid Hubble pod selector and returns nothing. + if namespace != "" { + req.Whitelist = []*observerpb.FlowFilter{ + {SourcePod: []string{namespace + "/"}}, + {DestinationPod: []string{namespace + "/"}}, + } + } + + stream, err := client.GetFlows(ctx, req) + if err != nil { + // Log the RPC error so operators can distinguish "Hubble not installed" + // (handled by the negative cache / pod-not-found path) from "Hubble + // requires TLS" (grpc transport error) or other misconfiguration. + log.Printf("[hubble] GetFlows RPC failed: %v", err) + m.mu.Lock() + if m.generation == gen { + m.teardownLocked() + } + m.mu.Unlock() + return nil, nil + } + + var flows []Flow + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + // Tear down so the next call reconnects cleanly. A transient + // mid-stream error on the current generation still yields whatever + // flows were collected so far — partial data beats an empty map for + // the same cluster. Staleness is checked below. + m.mu.Lock() + if m.generation == gen { + m.teardownLocked() + } + m.mu.Unlock() + break + } + + f := resp.GetFlow() + if f == nil { + continue + } + + flow := Flow{Verdict: verdictString(f.GetVerdict())} + if src := f.GetSource(); src != nil { + flow.SrcNamespace = src.GetNamespace() + flow.SrcPod = src.GetPodName() + } + if dst := f.GetDestination(); dst != nil { + flow.DstNamespace = dst.GetNamespace() + flow.DstPod = dst.GetPodName() + } + if svc := f.GetDestinationService(); svc != nil { + flow.DstService = svc.GetName() + } + flows = append(flows, flow) + } + + // Discard results if the cluster switched mid-stream. + m.mu.Lock() + stale := m.generation != gen + m.mu.Unlock() + if stale { + return nil, nil + } + return flows, nil +} + +// dial creates a port-forward tunnel and gRPC connection to hubble-relay. +// Called outside the manager lock to avoid blocking Reset. +// cacheable is true only when the failure indicates Hubble is not installed +// (pod not found). Port-forward and gRPC errors are transient — do not cache +// them, or a momentary port collision blocks Hubble until the next Reset. +func dial(ctx context.Context, cs kubernetes.Interface, cfg *rest.Config) (conn *grpc.ClientConn, stopCh chan struct{}, localPort int, cacheable bool, err error) { + podName, podErr := findHubbleRelayPod(ctx, cs) + if podErr != nil { + return nil, nil, 0, true, fmt.Errorf("hubble-relay pod not found: %w", podErr) + } + + localPort, err = freeLocalPort() + if err != nil { + return nil, nil, 0, false, fmt.Errorf("no free local port: %w", err) + } + + stopCh, err = startPortForward(ctx, cfg, podName, localPort) + if err != nil { + return nil, nil, 0, false, fmt.Errorf("port-forward failed: %w", err) + } + + addr := fmt.Sprintf("127.0.0.1:%d", localPort) + conn, err = grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + close(stopCh) + return nil, nil, 0, false, fmt.Errorf("grpc dial: %w", err) + } + + return conn, stopCh, localPort, false, nil +} + +// startPortForward creates a SPDY port-forward tunnel to the given pod +// and waits up to tunnelReadyTimeout for it to be ready. +func startPortForward(ctx context.Context, cfg *rest.Config, podName string, localPort int) (chan struct{}, error) { + rawURL := fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/portforward", + cfg.Host, hubbleRelayNamespace, podName) + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + + transport, upgrader, err := spdy.RoundTripperFor(cfg) + if err != nil { + return nil, err + } + + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport, Timeout: tunnelReadyTimeout + 5*time.Second}, http.MethodPost, u) + ports := []string{fmt.Sprintf("%d:%d", localPort, hubbleRelayPort)} + + stopCh := make(chan struct{}) + readyCh := make(chan struct{}) + errCh := make(chan error, 1) + + pf, err := portforward.New(dialer, ports, stopCh, readyCh, io.Discard, io.Discard) + if err != nil { + close(stopCh) + return nil, err + } + + go func() { + if err := pf.ForwardPorts(); err != nil { + select { + case <-readyCh: + // Already ready — normal teardown. + default: + select { + case errCh <- err: + default: + } + } + } + }() + + select { + case <-readyCh: + return stopCh, nil + case err := <-errCh: + close(stopCh) + return nil, err + case <-time.After(tunnelReadyTimeout): + close(stopCh) + return nil, fmt.Errorf("hubble port-forward did not become ready within %s", tunnelReadyTimeout) + case <-ctx.Done(): + close(stopCh) + return nil, ctx.Err() + } +} + +// findHubbleRelayPod returns the name of a Running hubble-relay pod in kube-system. +// It tries the label selector "k8s-app=hubble-relay" first, falling back to +// "app=hubble-relay". +func findHubbleRelayPod(ctx context.Context, cs kubernetes.Interface) (string, error) { + var lastErr error + for _, selector := range []string{"k8s-app=hubble-relay", "app=hubble-relay"} { + pods, err := cs.CoreV1().Pods(hubbleRelayNamespace).List(ctx, metav1.ListOptions{ + LabelSelector: selector, + }) + if err != nil { + lastErr = err + continue + } + for i := range pods.Items { + if pods.Items[i].Status.Phase == corev1.PodRunning { + return pods.Items[i].Name, nil + } + } + } + if lastErr != nil { + return "", fmt.Errorf("no running hubble-relay pod found in %s: %w", hubbleRelayNamespace, lastErr) + } + return "", fmt.Errorf("no running hubble-relay pod found in %s", hubbleRelayNamespace) +} + +// teardownLocked closes the gRPC connection and stops the port-forward. +// The caller must hold m.mu. +func (m *Manager) teardownLocked() { + if m.grpcConn != nil { + _ = m.grpcConn.Close() + m.grpcConn = nil + } + if m.stopCh != nil { + close(m.stopCh) + m.stopCh = nil + } + m.localPort = 0 +} + +// freeLocalPort picks an available port on localhost by briefly listening on :0. +func freeLocalPort() (int, error) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return 0, err + } + port := ln.Addr().(*net.TCPAddr).Port + _ = ln.Close() + return port, nil +} + +// verdictString converts a Cilium flow Verdict to its canonical string name. +func verdictString(v flowpb.Verdict) string { + return v.String() +} diff --git a/go-core/internal/hubble/client_test.go b/go-core/internal/hubble/client_test.go new file mode 100644 index 0000000..72d87c0 --- /dev/null +++ b/go-core/internal/hubble/client_test.go @@ -0,0 +1,84 @@ +package hubble_test + +import ( + "context" + "testing" + "time" + + "github.com/podscape/go-core/internal/hubble" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + fakeclient "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/rest" +) + +func TestManager_NilClientsReturnsEmptyFlows(t *testing.T) { + m := hubble.NewManager() + flows, err := m.GetFlows(context.Background(), "default", 60*time.Second) + require.NoError(t, err) + assert.Empty(t, flows) +} + +func TestManager_ResetWithNilClientsIsNoop(t *testing.T) { + m := hubble.NewManager() + m.Reset(nil, nil) + flows, err := m.GetFlows(context.Background(), "default", 60*time.Second) + require.NoError(t, err) + assert.Empty(t, flows) +} + +func TestManager_IsConnectedFalseWhenNoConnection(t *testing.T) { + m := hubble.NewManager() + assert.False(t, m.IsConnected()) +} + +func TestManager_ResetClearsConnection(t *testing.T) { + m := hubble.NewManager() + // Reset with nil should not panic and IsConnected should remain false + m.Reset(nil, nil) + assert.False(t, m.IsConnected()) +} + +func TestManager_ResetUnblocksNegativeCache(t *testing.T) { + // Verify that Reset() allows a fresh dial attempt even after a previous + // failure was cached for the prior generation. + m := hubble.NewManager() + // Two consecutive Resets with nil clients prove generation increments: + // the negative-cache flag from nil-client early-return (if any) must not + // persist across Reset calls. + m.Reset(nil, nil) + m.Reset(nil, nil) + flows, err := m.GetFlows(context.Background(), "default", 60*time.Second) + require.NoError(t, err) + assert.Empty(t, flows) + // IsConnected must still be false — nil clients short-circuit before dial. + assert.False(t, m.IsConnected()) +} + +func TestManager_NegativeCacheTTLExpiry(t *testing.T) { + // Verify that an expired TTL allows a fresh dial attempt, observable via + // the fake client's action recorder: if the dial was attempted, at least + // one Pods.List call will appear in cs.Actions(). + cs := fakeclient.NewSimpleClientset() // empty cluster — no hubble-relay pods + cfg := &rest.Config{Host: "http://127.0.0.1:1"} + + m := hubble.NewManager() + m.SetDialFailedTTL(time.Nanosecond) // expire almost immediately + m.Reset(cs, cfg) + + // Inject a negative-cache entry that is already past the TTL. + past := time.Now().Add(-time.Hour) + m.ForceNegativeCache(past) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + actionsBefore := len(cs.Actions()) + _, err := m.GetFlows(ctx, "", time.Second) + require.NoError(t, err) + + // TTL expired → dial was attempted → at least one API call (Pods.List to find hubble-relay). + if len(cs.Actions()) <= actionsBefore { + t.Errorf("expected at least one API call after TTL expiry, got none (negative cache was not cleared)") + } +} diff --git a/go-core/internal/hubble/discoverer.go b/go-core/internal/hubble/discoverer.go new file mode 100644 index 0000000..425e8e3 --- /dev/null +++ b/go-core/internal/hubble/discoverer.go @@ -0,0 +1,92 @@ +package hubble + +import ( + "context" + "fmt" + "time" + + "github.com/podscape/go-core/internal/graph" +) + +const FlowWindow = 60 * time.Second + +// FlowGetter is the interface for retrieving observed flows. +// Implemented by *Manager in production and stub types in tests. +type FlowGetter interface { + GetFlows(ctx context.Context, namespace string, window time.Duration) ([]Flow, error) +} + +// HubbleDiscoverer implements graph.Discoverer and emits hubble-flow edges +// for pod pairs observed communicating in the last flowWindow. +type HubbleDiscoverer struct { + getter FlowGetter + flowWindow time.Duration +} + +func NewDiscoverer(getter FlowGetter, flowWindow time.Duration) *HubbleDiscoverer { + return &HubbleDiscoverer{getter: getter, flowWindow: flowWindow} +} + +func (d *HubbleDiscoverer) Name() string { return "HubbleDiscoverer" } + +func (d *HubbleDiscoverer) Discover(nodes []graph.Node, _ graph.ResourceCache) []graph.Edge { + if d.getter == nil { + return nil + } + + // Build lookup: "namespace/podName" -> node.ID for all pod nodes. + podIndex := make(map[string]string, len(nodes)) + for _, n := range nodes { + if n.Kind == graph.KindPod { + podIndex[n.Namespace+"/"+n.Name] = n.ID + } + } + if len(podIndex) == 0 { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + flows, err := d.getter.GetFlows(ctx, "", d.flowWindow) + if err != nil || len(flows) == 0 { + return nil + } + + type edgeKey struct{ src, dst string } + type edgeMeta struct{ dropped bool } + seen := make(map[edgeKey]edgeMeta) + + for _, f := range flows { + srcID, srcOK := podIndex[f.SrcNamespace+"/"+f.SrcPod] + dstID, dstOK := podIndex[f.DstNamespace+"/"+f.DstPod] + if !srcOK || !dstOK || srcID == dstID { + continue + } + k := edgeKey{srcID, dstID} + existing := seen[k] + // DROPPED and ERROR both indicate a traffic problem; DROPPED takes + // priority but ERROR is also surfaced rather than silently counted as + // forwarded traffic. + if f.Verdict == "DROPPED" || f.Verdict == "ERROR" { + existing.dropped = true + } + seen[k] = existing + } + + edges := make([]graph.Edge, 0, len(seen)) + for k, m := range seen { + label := "" + if m.dropped { + label = "dropped" + } + edges = append(edges, graph.Edge{ + ID: fmt.Sprintf("hubble:%s->%s", k.src, k.dst), + Source: k.src, + Target: k.dst, + Kind: graph.EdgeHubbleFlow, + Label: label, + }) + } + return edges +} diff --git a/go-core/internal/hubble/discoverer_test.go b/go-core/internal/hubble/discoverer_test.go new file mode 100644 index 0000000..9e62740 --- /dev/null +++ b/go-core/internal/hubble/discoverer_test.go @@ -0,0 +1,91 @@ +package hubble_test + +import ( + "context" + "testing" + "time" + + "github.com/podscape/go-core/internal/graph" + "github.com/podscape/go-core/internal/hubble" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type stubGetter struct { + flows []hubble.Flow +} + +func (s *stubGetter) GetFlows(_ context.Context, _ string, _ time.Duration) ([]hubble.Flow, error) { + return s.flows, nil +} + +func TestHubbleDiscoverer_EmitsEdgeForObservedFlow(t *testing.T) { + stub := &stubGetter{flows: []hubble.Flow{ + {SrcNamespace: "default", SrcPod: "frontend-abc", DstNamespace: "default", DstPod: "backend-xyz", Verdict: "FORWARDED"}, + }} + d := hubble.NewDiscoverer(stub, hubble.FlowWindow) + nodes := []graph.Node{ + {ID: "pod:uid-frontend", Kind: graph.KindPod, Name: "frontend-abc", Namespace: "default"}, + {ID: "pod:uid-backend", Kind: graph.KindPod, Name: "backend-xyz", Namespace: "default"}, + } + edges := d.Discover(nodes, nil) + require.Len(t, edges, 1) + assert.Equal(t, graph.EdgeHubbleFlow, edges[0].Kind) + assert.Equal(t, "pod:uid-frontend", edges[0].Source) + assert.Equal(t, "pod:uid-backend", edges[0].Target) +} + +func TestHubbleDiscoverer_SkipsFlowsForUnknownNodes(t *testing.T) { + stub := &stubGetter{flows: []hubble.Flow{ + {SrcNamespace: "default", SrcPod: "ghost-pod", DstNamespace: "default", DstPod: "backend-xyz", Verdict: "FORWARDED"}, + }} + d := hubble.NewDiscoverer(stub, hubble.FlowWindow) + nodes := []graph.Node{ + {ID: "pod:uid-backend", Kind: graph.KindPod, Name: "backend-xyz", Namespace: "default"}, + } + edges := d.Discover(nodes, nil) + assert.Empty(t, edges) +} + +func TestHubbleDiscoverer_DeduplicatesParallelFlows(t *testing.T) { + stub := &stubGetter{flows: []hubble.Flow{ + {SrcNamespace: "default", SrcPod: "frontend-abc", DstNamespace: "default", DstPod: "backend-xyz", Verdict: "FORWARDED"}, + {SrcNamespace: "default", SrcPod: "frontend-abc", DstNamespace: "default", DstPod: "backend-xyz", Verdict: "FORWARDED"}, + {SrcNamespace: "default", SrcPod: "frontend-abc", DstNamespace: "default", DstPod: "backend-xyz", Verdict: "FORWARDED"}, + }} + d := hubble.NewDiscoverer(stub, hubble.FlowWindow) + nodes := []graph.Node{ + {ID: "pod:uid-frontend", Kind: graph.KindPod, Name: "frontend-abc", Namespace: "default"}, + {ID: "pod:uid-backend", Kind: graph.KindPod, Name: "backend-xyz", Namespace: "default"}, + } + edges := d.Discover(nodes, nil) + assert.Len(t, edges, 1) +} + +func TestHubbleDiscoverer_DroppedFlowGetsDroppedLabel(t *testing.T) { + stub := &stubGetter{flows: []hubble.Flow{ + {SrcNamespace: "default", SrcPod: "frontend-abc", DstNamespace: "default", DstPod: "backend-xyz", Verdict: "DROPPED"}, + }} + d := hubble.NewDiscoverer(stub, hubble.FlowWindow) + nodes := []graph.Node{ + {ID: "pod:uid-frontend", Kind: graph.KindPod, Name: "frontend-abc", Namespace: "default"}, + {ID: "pod:uid-backend", Kind: graph.KindPod, Name: "backend-xyz", Namespace: "default"}, + } + edges := d.Discover(nodes, nil) + require.Len(t, edges, 1) + assert.Equal(t, "dropped", edges[0].Label) +} + +func TestHubbleDiscoverer_ErrorFlowGetsDroppedLabel(t *testing.T) { + stub := &stubGetter{flows: []hubble.Flow{ + {SrcNamespace: "default", SrcPod: "frontend-abc", DstNamespace: "default", DstPod: "backend-xyz", Verdict: "ERROR"}, + }} + d := hubble.NewDiscoverer(stub, hubble.FlowWindow) + nodes := []graph.Node{ + {ID: "pod:uid-frontend", Kind: graph.KindPod, Name: "frontend-abc", Namespace: "default"}, + {ID: "pod:uid-backend", Kind: graph.KindPod, Name: "backend-xyz", Namespace: "default"}, + } + edges := d.Discover(nodes, nil) + require.Len(t, edges, 1) + assert.Equal(t, "dropped", edges[0].Label) +} diff --git a/go-core/internal/hubble/export_test.go b/go-core/internal/hubble/export_test.go new file mode 100644 index 0000000..ee20569 --- /dev/null +++ b/go-core/internal/hubble/export_test.go @@ -0,0 +1,13 @@ +package hubble + +import "time" + +// ForceNegativeCache injects negative-cache state directly for testing, +// bypassing the normal dial path. +func (m *Manager) ForceNegativeCache(at time.Time) { + m.mu.Lock() + defer m.mu.Unlock() + m.dialFailed = true + m.dialFailedGen = m.generation + m.dialFailedAt = at +} diff --git a/go-core/internal/providers/detect.go b/go-core/internal/providers/detect.go index 84858a5..615bc8d 100644 --- a/go-core/internal/providers/detect.go +++ b/go-core/internal/providers/detect.go @@ -17,15 +17,24 @@ type ProviderSet struct { NginxInc bool `json:"nginxInc"` // kubernetes-ingress (NGINX Inc, CRD-based) NginxCommunity bool `json:"nginxCommunity"` // ingress-nginx (community, annotation-based) Keda bool `json:"keda"` + Cilium bool `json:"cilium"` + HubbleRelay bool `json:"hubbleRelay"` } // Detect probes the cluster's API group list and IngressClass resources to // determine which ingress and service mesh providers are installed. -// It is intentionally best-effort: discovery failures return an empty -// ProviderSet so the rest of the app keeps working regardless. -func Detect(disco discovery.DiscoveryInterface, ingressClasses []networkingv1.IngressClass) ProviderSet { +// hubbleRelayPresent should be true when the hubble-relay Service exists in +// kube-system; it is set independently of API-group discovery so it survives +// clusters with restricted discovery RBAC. All other providers require a +// successful ServerGroups call — failures leave them false so the app degrades +// gracefully. +func Detect(disco discovery.DiscoveryInterface, ingressClasses []networkingv1.IngressClass, hubbleRelayPresent bool) ProviderSet { var ps ProviderSet + // HubbleRelay is based on a Service existence check, not API-group + // discovery, so assign it before the ServerGroups error guard. + ps.HubbleRelay = hubbleRelayPresent + groups, err := disco.ServerGroups() if err != nil { return ps @@ -51,6 +60,8 @@ func Detect(disco discovery.DiscoveryInterface, ingressClasses []networkingv1.In ps.NginxInc = true case "keda.sh": ps.Keda = true + case "cilium.io": + ps.Cilium = true } } diff --git a/go-core/internal/providers/detect_test.go b/go-core/internal/providers/detect_test.go index 45385bb..40a8c26 100644 --- a/go-core/internal/providers/detect_test.go +++ b/go-core/internal/providers/detect_test.go @@ -35,14 +35,20 @@ func makeDiscovery(groupVersions ...string) *fakediscovery.FakeDiscovery { } func TestDetect_NoProviders(t *testing.T) { - ps := providers.Detect(makeDiscovery(), nil) + ps := providers.Detect(makeDiscovery(), nil, false) if ps.Istio || ps.Traefik || ps.NginxInc || ps.NginxCommunity || ps.Keda { t.Errorf("expected all false, got %+v", ps) } + if ps.Cilium { + t.Errorf("expected Cilium=false, got %+v", ps) + } + if ps.HubbleRelay { + t.Errorf("expected HubbleRelay=false, got %+v", ps) + } } func TestDetect_Istio(t *testing.T) { - ps := providers.Detect(makeDiscovery("networking.istio.io/v1alpha3"), nil) + ps := providers.Detect(makeDiscovery("networking.istio.io/v1alpha3"), nil, false) if !ps.Istio { t.Error("expected Istio=true") } @@ -55,7 +61,7 @@ func TestDetect_Istio(t *testing.T) { } func TestDetect_TraefikV3(t *testing.T) { - ps := providers.Detect(makeDiscovery("traefik.io/v1alpha1"), nil) + ps := providers.Detect(makeDiscovery("traefik.io/v1alpha1"), nil, false) if !ps.Traefik { t.Error("expected Traefik=true") } @@ -65,7 +71,7 @@ func TestDetect_TraefikV3(t *testing.T) { } func TestDetect_TraefikV2(t *testing.T) { - ps := providers.Detect(makeDiscovery("traefik.containo.us/v1alpha1"), nil) + ps := providers.Detect(makeDiscovery("traefik.containo.us/v1alpha1"), nil, false) if !ps.Traefik { t.Error("expected Traefik=true") } @@ -76,7 +82,7 @@ func TestDetect_TraefikV2(t *testing.T) { func TestDetect_TraefikV3WinsOverV2(t *testing.T) { // Both API groups present — v3 must win regardless of iteration order. - ps := providers.Detect(makeDiscovery("traefik.io/v1alpha1", "traefik.containo.us/v1alpha1"), nil) + ps := providers.Detect(makeDiscovery("traefik.io/v1alpha1", "traefik.containo.us/v1alpha1"), nil, false) if !ps.Traefik { t.Error("expected Traefik=true") } @@ -86,7 +92,7 @@ func TestDetect_TraefikV3WinsOverV2(t *testing.T) { } func TestDetect_NginxInc(t *testing.T) { - ps := providers.Detect(makeDiscovery("k8s.nginx.org/v1"), nil) + ps := providers.Detect(makeDiscovery("k8s.nginx.org/v1"), nil, false) if !ps.NginxInc { t.Error("expected NginxInc=true") } @@ -99,7 +105,7 @@ func TestDetect_NginxCommunity_ViaIngressClass(t *testing.T) { ic := networkingv1.IngressClass{ Spec: networkingv1.IngressClassSpec{Controller: "k8s.io/ingress-nginx"}, } - ps := providers.Detect(makeDiscovery(), []networkingv1.IngressClass{ic}) + ps := providers.Detect(makeDiscovery(), []networkingv1.IngressClass{ic}, false) if !ps.NginxCommunity { t.Error("expected NginxCommunity=true") } @@ -112,7 +118,7 @@ func TestDetect_NginxCommunity_PartialControllerString(t *testing.T) { ic := networkingv1.IngressClass{ Spec: networkingv1.IngressClassSpec{Controller: "some-vendor/ingress-nginx"}, } - ps := providers.Detect(makeDiscovery(), []networkingv1.IngressClass{ic}) + ps := providers.Detect(makeDiscovery(), []networkingv1.IngressClass{ic}, false) if !ps.NginxCommunity { t.Error("expected NginxCommunity=true for partial controller match") } @@ -122,14 +128,14 @@ func TestDetect_NginxCommunity_CaseInsensitive(t *testing.T) { ic := networkingv1.IngressClass{ Spec: networkingv1.IngressClassSpec{Controller: "INGRESS-NGINX"}, } - ps := providers.Detect(makeDiscovery(), []networkingv1.IngressClass{ic}) + ps := providers.Detect(makeDiscovery(), []networkingv1.IngressClass{ic}, false) if !ps.NginxCommunity { t.Error("expected NginxCommunity=true for uppercase controller") } } func TestDetect_Keda(t *testing.T) { - ps := providers.Detect(makeDiscovery("keda.sh/v1alpha1"), nil) + ps := providers.Detect(makeDiscovery("keda.sh/v1alpha1"), nil, false) if !ps.Keda { t.Error("expected Keda=true when keda.sh API group present") } @@ -139,17 +145,63 @@ func TestDetect_Keda(t *testing.T) { } func TestDetect_NoKeda_WhenGroupAbsent(t *testing.T) { - ps := providers.Detect(makeDiscovery("networking.istio.io/v1alpha3"), nil) + ps := providers.Detect(makeDiscovery("networking.istio.io/v1alpha3"), nil, false) if ps.Keda { t.Errorf("expected Keda=false when keda.sh not in groups, got %+v", ps) } } func TestDetect_DiscoveryFailure_ReturnsEmptySet(t *testing.T) { - ps := providers.Detect(&errorDiscovery{}, nil) - if ps.Istio || ps.Traefik || ps.NginxInc || ps.NginxCommunity || ps.Keda { + ps := providers.Detect(&errorDiscovery{}, nil, false) + if ps.Istio || ps.Traefik || ps.NginxInc || ps.NginxCommunity || ps.Keda || ps.Cilium { t.Errorf("expected all false on discovery error, got %+v", ps) } + if ps.HubbleRelay { + t.Errorf("expected HubbleRelay=false when hubbleRelayPresent=false, got %+v", ps) + } +} + +func TestDetect_DiscoveryFailure_PreservesHubbleRelay(t *testing.T) { + // HubbleRelay is detected via a Service check independent of API-group + // discovery — it must remain true even when ServerGroups() fails. + ps := providers.Detect(&errorDiscovery{}, nil, true) + if !ps.HubbleRelay { + t.Errorf("expected HubbleRelay=true when hubbleRelayPresent=true even on discovery error, got %+v", ps) + } + // All API-group-dependent providers must still be false. + if ps.Istio || ps.Traefik || ps.NginxInc || ps.NginxCommunity || ps.Keda || ps.Cilium { + t.Errorf("expected all API-group providers false on discovery error, got %+v", ps) + } +} + +func TestDetect_CiliumPresent(t *testing.T) { + ps := providers.Detect(makeDiscovery("cilium.io/v2"), nil, false) + if !ps.Cilium { + t.Error("expected Cilium=true when cilium.io API group present") + } + if ps.HubbleRelay { + t.Error("expected HubbleRelay=false when hubbleRelayPresent=false") + } +} + +func TestDetect_HubbleRelayPresent(t *testing.T) { + ps := providers.Detect(makeDiscovery(), nil, true) + if ps.Cilium { + t.Error("expected Cilium=false when cilium.io API group absent") + } + if !ps.HubbleRelay { + t.Error("expected HubbleRelay=true when hubbleRelayPresent=true") + } +} + +func TestDetect_CiliumAndHubble(t *testing.T) { + ps := providers.Detect(makeDiscovery("cilium.io/v2"), nil, true) + if !ps.Cilium { + t.Error("expected Cilium=true") + } + if !ps.HubbleRelay { + t.Error("expected HubbleRelay=true") + } } // errorDiscovery is a minimal discovery.DiscoveryInterface that always errors on ServerGroups. diff --git a/package.json b/package.json index a81086b..4b9dfe8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "podscape-electron", - "version": "3.2.2", + "version": "3.3.0", "description": "Kubernetes management desktop app — Electron + React + TypeScript", "main": "./out/main/index.js", "author": "Coding Protocols Private Limited ", diff --git a/src/main/ipc/kubectl.ts b/src/main/ipc/kubectl.ts index dbfcaa9..7f415f0 100644 --- a/src/main/ipc/kubectl.ts +++ b/src/main/ipc/kubectl.ts @@ -282,8 +282,10 @@ export class KubectlProvider { } } -async function getTopology(ns: string): Promise { - const url = `/topology?namespace=${encodeURIComponent(ns)}` +async function getTopology(ns: string, flowWindow?: number): Promise { + const params = new URLSearchParams({ namespace: ns }) + if (flowWindow !== undefined && flowWindow !== 60) params.set('flowWindow', String(flowWindow)) + const url = `/topology?${params.toString()}` const res = await checkedSidecarFetch(url) return await res.json() } @@ -474,7 +476,7 @@ export function registerKubectlHandlers(): void { cancelAllExecStreams() }) - ipcMain.handle('kubectl:getTopology', (_e, ns) => getTopology(ns)) + ipcMain.handle('kubectl:getTopology', (_e, ns, flowWindow) => getTopology(ns, flowWindow)) // Track alive-poll timers keyed by forward id so we can clear them on // stopPortForward or when the tunnel exits on its own. diff --git a/src/preload/index.ts b/src/preload/index.ts index 5811f67..7d3fadd 100644 --- a/src/preload/index.ts +++ b/src/preload/index.ts @@ -136,8 +136,8 @@ const kubectl = { isReady: (): Promise => ipcRenderer.invoke('kubectl:isReady'), - getTopology: (namespace: string) => - ipcRenderer.invoke('kubectl:getTopology', namespace), + getTopology: (namespace: string, flowWindow?: number) => + ipcRenderer.invoke('kubectl:getTopology', namespace, flowWindow), getProviders: () => ipcRenderer.invoke('kubectl:getProviders'), diff --git a/src/renderer/components/panels/NetworkPanel.tsx b/src/renderer/components/panels/NetworkPanel.tsx index 0d236b6..316df09 100644 --- a/src/renderer/components/panels/NetworkPanel.tsx +++ b/src/renderer/components/panels/NetworkPanel.tsx @@ -1213,6 +1213,7 @@ const WORKLOAD_KIND_TO_SECTION: Record = { export default function NetworkPanel(): JSX.Element { const { selectedContext, loadingNamespaces, namespaces, theme, setSection } = useAppStore() + const providers = useAppStore(s => s.providers) const dark = theme === 'dark' const [panelNs, setPanelNs] = useState('default') @@ -1226,39 +1227,90 @@ export default function NetworkPanel(): JSX.Element { const [fitTrigger, setFitTrigger] = useState(0) const [visibleFilters, setVisibleFilters] = useState>(() => new Set(KIND_DEFS.map(d => d.filterKey))) const [searchQuery, setSearchQuery] = useState('') + const [flowWindowSecs, setFlowWindowSecs] = useState(60) + const graphFingerprintRef = useRef('') + const loadingRef = useRef(false) + const abortControllerRef = useRef(null) const load = useCallback((ns: string) => { if (!selectedContext || loadingNamespaces) return + // Cancel any in-flight request so stale responses never overwrite fresh ones. + abortControllerRef.current?.abort() + const controller = new AbortController() + abortControllerRef.current = controller setLoading(true) + loadingRef.current = true const nsArg = ns === '_all' ? '' : ns // @ts-ignore - window.kubectl.getTopology(nsArg) + window.kubectl.getTopology(nsArg, flowWindowSecs) .then((data: Graph) => { - setRawGraph(data) + if (controller.signal.aborted) return + // Only update state when the graph actually changed to prevent the + // force-directed layout from jumping on stable data during polling. + const fp = data.nodes.map(n => n.id).sort().join(',') + '|' + data.edges.map(e => e.id).sort().join(',') + if (fp !== graphFingerprintRef.current) { + graphFingerprintRef.current = fp + setRawGraph(data) + } setLoading(false) + loadingRef.current = false }) .catch((err) => { + if (controller.signal.aborted) return console.error('Failed to load topology:', err) setLoading(false) + loadingRef.current = false }) - }, [selectedContext, loadingNamespaces]) + }, [selectedContext, loadingNamespaces, flowWindowSecs]) + + // Clear fingerprint on namespace/param change so fresh data always renders. + // Polling loads skip this — they go through the interval, not this effect. + useEffect(() => { + graphFingerprintRef.current = '' + load(panelNs) + return () => { + abortControllerRef.current?.abort() + setLoading(false) + loadingRef.current = false + } + }, [panelNs, load]) - useEffect(() => { load(panelNs) }, [panelNs, load]) + // Auto-refresh topology every 30 s when Hubble Relay is available so flow + // edges stay current without requiring manual navigation away and back. + // `loading` is intentionally excluded from deps — accessed via loadingRef so + // the 30 s timer is not reset on every fetch start/finish. + useEffect(() => { + if (!providers.hubbleRelay) return + const id = setInterval(() => { if (!loadingRef.current) load(panelNs) }, 30_000) + return () => { clearInterval(id); abortControllerRef.current?.abort() } + }, [providers.hubbleRelay, panelNs, load]) const graph = useMemo(() => { const rNodes = rawGraph?.nodes || [] const rEdges = rawGraph?.edges || [] const rNss = rawGraph?.namespaces || [] - if (visibleFilters.size === KIND_DEFS.length) { + const showHubbleFlows = visibleFilters.has('hubble-flow') + const allNodeFiltersActive = KIND_DEFS.every(d => visibleFilters.has(d.filterKey)) + + if (allNodeFiltersActive && showHubbleFlows) { return { nodes: rNodes, edges: rEdges, namespaces: rNss } } + if (allNodeFiltersActive && !showHubbleFlows) { + // All node filters active but hubble-flow edges hidden + const edges = rEdges.filter(e => e.kind !== 'hubble-flow') + return { nodes: rNodes, edges, namespaces: rNss } + } + const nodes = rNodes.filter(n => n.kind === 'workload' ? visibleFilters.has(n.workloadKind ?? '') : visibleFilters.has(n.kind) ) const nodeIds = new Set(nodes.map(n => n.id)) - const edges = rEdges.filter(e => nodeIds.has(e.source) && nodeIds.has(e.target)) + const edges = rEdges.filter(e => + nodeIds.has(e.source) && nodeIds.has(e.target) && + (e.kind !== 'hubble-flow' || showHubbleFlows) + ) const nss = [...new Set(nodes.map(n => n.namespace))].sort() return { nodes, edges, namespaces: nss } }, [rawGraph, visibleFilters]) @@ -1366,6 +1418,29 @@ export default function NetworkPanel(): JSX.Element { onToggle={() => toggleFilter(filterKey)} /> ))} + {providers.hubbleRelay && ( + <> + e.kind === 'hubble-flow').length} + active={visibleFilters.has('hubble-flow')} + onToggle={() => toggleFilter('hubble-flow')} + /> + + + )} @@ -1436,6 +1511,20 @@ export default function NetworkPanel(): JSX.Element { )} + {/* Hubble zero-flows notice — shown when the filter is active but no flows + were observed in the lookback window (covers both "no traffic" and TLS). */} + {!loading && providers.hubbleRelay && visibleFilters.has('hubble-flow') && + graph.nodes.length > 0 && (rawGraph?.edges || []).filter(e => e.kind === 'hubble-flow').length === 0 && ( +
+ + + + No flows observed in the last {flowWindowSecs < 60 ? `${flowWindowSecs} s` : `${flowWindowSecs / 60} min`} — no traffic occurred, or Hubble Relay may require TLS +
+ )} {/* Legend overlay */} {showLegend && } diff --git a/src/renderer/components/panels/NetworkPanel.utils.ts b/src/renderer/components/panels/NetworkPanel.utils.ts index 2ef7de8..f2fbc1a 100644 --- a/src/renderer/components/panels/NetworkPanel.utils.ts +++ b/src/renderer/components/panels/NetworkPanel.utils.ts @@ -1,7 +1,7 @@ // ─── Shared types ──────────────────────────────────────────────────────────── export type NodeKind = 'ingress' | 'service' | 'pod' | 'policy' | 'workload' | 'pvc' | 'node' -export type EdgeKind = 'ing-svc' | 'svc-pod' | 'policy-pod' | 'pol-ingress' | 'pol-egress' | 'pod-pvc' | 'pod-node' | 'controller-pod' | 'controller-workload' +export type EdgeKind = 'ing-svc' | 'svc-pod' | 'policy-pod' | 'pol-ingress' | 'pol-egress' | 'pod-pvc' | 'pod-node' | 'controller-pod' | 'controller-workload' | 'hubble-flow' export type EdgeClass = 'traffic' | 'infra' | 'policy' export interface GraphNode { @@ -56,6 +56,7 @@ export function edgeStyle(kind: EdgeKind): EdgeStyleResult { case 'pod-node': return { color: '#06b6d4', dur: '3.0s', class: 'infra' } case 'controller-pod': return { color: '#fbbf24', dur: '2.0s', class: 'infra' } case 'controller-workload': return { color: '#fbbf24', dur: '2.0s', class: 'infra' } + case 'hubble-flow': return { color: '#2dd4bf', dur: '1.2s', class: 'traffic' } default: return { color: '#9ca3af', dur: '2.5s', class: 'infra' } } } diff --git a/src/renderer/store/slices/analysisSlice.ts b/src/renderer/store/slices/analysisSlice.ts index e8bc4b4..c668f48 100644 --- a/src/renderer/store/slices/analysisSlice.ts +++ b/src/renderer/store/slices/analysisSlice.ts @@ -23,6 +23,10 @@ export interface AnalysisSlice { // called again before a previous scan completes. let activeProgressUnsub: (() => void) | null = null +// Monotonic counter — prevents a completed scan's finally block from clearing the +// securityScanning flag when a newer scan has already started (A→B overlap race). +let scanSeq = 0 + export const createAnalysisSlice: StoreSlice = (set, get) => ({ scanResults: {}, isScanning: false, @@ -58,6 +62,7 @@ export const createAnalysisSlice: StoreSlice = (set, get) => ({ // Snapshot the context at scan start. Trivy + kubesec can take several // minutes; discard results if the user switched context mid-scan. const scanCtx = get().selectedContext + const mySeq = ++scanSeq set({ securityScanning: true, scanInBackground: background, error: null, securityScanProgressLines: [] }) // Synthetic milestone helper — prefixed with '› ' so the UI can style them distinctly. @@ -198,7 +203,8 @@ export const createAnalysisSlice: StoreSlice = (set, get) => ({ } finally { unsubProgress() activeProgressUnsub = null - set({ securityScanning: false, scanInBackground: false }) + // Only clear the scanning flag if no newer scan has started since this one. + if (mySeq === scanSeq) set({ securityScanning: false, scanInBackground: false }) } }, }) diff --git a/src/renderer/store/slices/clusterSlice.test.ts b/src/renderer/store/slices/clusterSlice.test.ts index d6a2fb9..fc38d25 100644 --- a/src/renderer/store/slices/clusterSlice.test.ts +++ b/src/renderer/store/slices/clusterSlice.test.ts @@ -17,6 +17,9 @@ describe('clusterSlice', () => { fetchProviders: vi.fn(), fetchAllowedVerbs: vi.fn(), stopAllPortForwards: vi.fn(), + closeExec: vi.fn(), + providers: { istio: false, traefik: false, nginxInc: false, nginxCommunity: false, keda: false, cilium: false, hubbleRelay: false }, + providersLoading: false, hotbarContexts: [], prodContexts: [], } diff --git a/src/renderer/store/slices/clusterSlice.ts b/src/renderer/store/slices/clusterSlice.ts index 7bfbab4..2ff8a56 100644 --- a/src/renderer/store/slices/clusterSlice.ts +++ b/src/renderer/store/slices/clusterSlice.ts @@ -152,6 +152,8 @@ export const createClusterSlice: StoreSlice = (set, get) => ({ // Snapshot namespace state so we can restore it if the connection attempt fails. const previousNamespaces = get().namespaces const previousSelectedNamespace = get().selectedNamespace + // Snapshot provider state so it can be restored if the connection attempt fails. + const previousProviders = get().providers const isProd = get().prodContexts.includes(name) // Unblock any section fetches that were in-flight for the previous context. // Without this, loadSection for the new context can see a stale in-flight key @@ -174,7 +176,13 @@ export const createClusterSlice: StoreSlice = (set, get) => ({ selectedContext: name, isProduction: isProd, loadingNamespaces: true, loadingResources: true, namespaces: [], selectedNamespace: null, selectedResource: null, error: null, // Reset security scan state so stale results from the previous context are not shown. - securityScanResults: null, kubesecBatchResults: null, trivyAvailable: null, + securityScanResults: null, kubesecBatchResults: null, trivyAvailable: null, securityScanProgressLines: [], + // Reset scanning flags so the new context's scan button is not stuck disabled + // if a background scan was in-flight when the context switched. + securityScanning: false, scanInBackground: false, + // Reset provider loading flag so it doesn't get stuck if the previous fetch + // was in-flight and the stale-guard fires without resetting it. + providersLoading: false, // Reset freshness timestamps so next dashboard/preload fetch always runs. lastPreloadedAt: 0, lastDashboardLoadedAt: 0, // Clear owner chains cached from previous context. @@ -185,7 +193,7 @@ export const createClusterSlice: StoreSlice = (set, get) => ({ metricsError: null, // Reset provider detection so stale flags from the old cluster don't // briefly show sidebar groups that don't exist in the new cluster. - providers: { istio: false, traefik: false, nginxInc: false, nginxCommunity: false, keda: false }, + providers: { istio: false, traefik: false, nginxInc: false, nginxCommunity: false, keda: false, cilium: false, hubbleRelay: false }, // Navigate away from provider-specific sections so ProviderResourcePanel // doesn't attempt a fetch against a cluster that may lack those CRDs. ...(isProviderSection ? { section: 'dashboard' as const } : {}), @@ -204,6 +212,8 @@ export const createClusterSlice: StoreSlice = (set, get) => ({ try { await window.kubectl.cancelAllStreams() } catch {} // Stop all active port forwards — they belong to the previous context. get().stopAllPortForwards() + // Close any open exec sessions — their PTY connections belong to the previous context. + get().closeExec() // Tell the sidecar to switch its clientset + informer cache to the new // context BEFORE fetching any data. Without this the sidecar keeps // serving the previous context's cache. @@ -240,6 +250,9 @@ export const createClusterSlice: StoreSlice = (set, get) => ({ // Restore namespace list so the user is not left with an empty sidebar. namespaces: previousNamespaces, selectedNamespace: previousSelectedNamespace, + // Restore provider state so sidebar groups are not wiped on a failed switch. + providers: previousProviders, + providersLoading: false, }) } const msg = (connectErr as Error).message diff --git a/src/renderer/store/slices/providersSlice.test.ts b/src/renderer/store/slices/providersSlice.test.ts index 6a55dee..84b941b 100644 --- a/src/renderer/store/slices/providersSlice.test.ts +++ b/src/renderer/store/slices/providersSlice.test.ts @@ -12,7 +12,7 @@ describe('providersSlice', () => { beforeEach(() => { state = { selectedContext: 'ctx-a', - providers: { istio: false, traefik: false, nginxInc: false, nginxCommunity: false, keda: false }, + providers: { istio: false, traefik: false, nginxInc: false, nginxCommunity: false, keda: false, cilium: false, hubbleRelay: false }, providersLoading: false, } set = vi.fn((update: any) => { @@ -42,6 +42,9 @@ describe('providersSlice', () => { istio: false, nginxInc: false, nginxCommunity: false, + keda: false, + cilium: false, + hubbleRelay: false, } windowMock.kubectl.getProviders.mockResolvedValue(mockProviders) @@ -64,7 +67,7 @@ describe('providersSlice', () => { expect(set).toHaveBeenCalledWith({ providersLoading: true }) // Error path: reset to defaults expect(set).toHaveBeenCalledWith({ - providers: { istio: false, traefik: false, nginxInc: false, nginxCommunity: false, keda: false }, + providers: { istio: false, traefik: false, nginxInc: false, nginxCommunity: false, keda: false, cilium: false, hubbleRelay: false }, providersLoading: false, }) }) @@ -84,14 +87,15 @@ describe('providersSlice', () => { state.selectedContext = 'ctx-b' // Resolve the in-flight fetch. - resolveProviders({ traefik: true, istio: false, nginxInc: false, nginxCommunity: false }) + resolveProviders({ traefik: true, istio: false, nginxInc: false, nginxCommunity: false, keda: false, cilium: false, hubbleRelay: false }) await fetchPromise - // Only the initial { providersLoading: true } call should have happened. - // The result should have been discarded due to the stale-context guard. - expect(set).toHaveBeenCalledTimes(1) + // Two set calls: { providersLoading: true } then { providersLoading: false } from the + // stale-guard reset. providers must not have been written. + expect(set).toHaveBeenCalledTimes(2) expect(set).toHaveBeenCalledWith({ providersLoading: true }) + expect(set).toHaveBeenCalledWith({ providersLoading: false }) // providers state must not have been updated. - expect(state.providers).toEqual({ istio: false, traefik: false, nginxInc: false, nginxCommunity: false, keda: false }) + expect(state.providers).toEqual({ istio: false, traefik: false, nginxInc: false, nginxCommunity: false, keda: false, cilium: false, hubbleRelay: false }) }) }) diff --git a/src/renderer/store/slices/providersSlice.ts b/src/renderer/store/slices/providersSlice.ts index 03b65fa..5de4d26 100644 --- a/src/renderer/store/slices/providersSlice.ts +++ b/src/renderer/store/slices/providersSlice.ts @@ -13,8 +13,15 @@ const defaultProviders: ProviderSet = { nginxInc: false, nginxCommunity: false, keda: false, + cilium: false, + hubbleRelay: false, } +// Monotonic counter incremented on every fetchProviders() call. Used alongside +// the string-context guard to reject results from A→B→A switch sequences where +// the string comparison alone would pass for the original stale fetch. +let fetchSeq = 0 + export const createProvidersSlice: StoreSlice = (set, get) => ({ providers: defaultProviders, providersLoading: false, @@ -22,16 +29,21 @@ export const createProvidersSlice: StoreSlice = (set, get) => ({ fetchProviders: async () => { const ctx = get().selectedContext if (!ctx) return + const mySeq = ++fetchSeq set({ providersLoading: true }) try { const ps = await window.kubectl.getProviders() - // Discard result if context switched while the request was in-flight - // (same guard as probePrometheus to prevent stale-context overwrites). - if (get().selectedContext !== ctx) return + if (mySeq !== fetchSeq || get().selectedContext !== ctx) { + set({ providersLoading: false }) + return + } set({ providers: ps, providersLoading: false }) } catch (err) { console.error('[providers] detection failed:', err) - if (get().selectedContext !== ctx) return + if (mySeq !== fetchSeq || get().selectedContext !== ctx) { + set({ providersLoading: false }) + return + } set({ providers: defaultProviders, providersLoading: false }) } }, diff --git a/src/renderer/store/slices/resourceSlice.ts b/src/renderer/store/slices/resourceSlice.ts index 7f608f9..02d4146 100644 --- a/src/renderer/store/slices/resourceSlice.ts +++ b/src/renderer/store/slices/resourceSlice.ts @@ -191,7 +191,7 @@ export const createResourceSlice: StoreSlice = (set, get) => ({ window.kubectl.getNodes(ctx), window.kubectl.getHPAs(ctx, nsArg) ]) - if (get().selectedContext !== snapshotCtx) return + if (get().selectedContext !== snapshotCtx) { set({ loadingResources: false }); return } set({ podMetrics: Array.isArray(pm) ? pm : [], nodeMetrics: Array.isArray(nm) ? nm : [], @@ -258,7 +258,7 @@ export const createResourceSlice: StoreSlice = (set, get) => ({ window.kubectl.getNamespaces(ctx), window.kubectl.getNetworkPolicies(ctx, nsArg) ]) - if (get().selectedContext !== snapshotCtx) return + if (get().selectedContext !== snapshotCtx) { set({ loadingResources: false }); return } set({ services: svcs as KubeService[], ingresses: ings as KubeIngress[], @@ -286,7 +286,7 @@ export const createResourceSlice: StoreSlice = (set, get) => ({ window.kubectl.getJobs(ctx, nsArg), window.kubectl.getCronJobs(ctx, nsArg) ]) - if (get().selectedContext !== snapshotCtx) return + if (get().selectedContext !== snapshotCtx) { set({ loadingResources: false }); return } set({ pods: pds as KubePod[], deployments: depls as KubeDeployment[], diff --git a/src/renderer/types/api.ts b/src/renderer/types/api.ts index 6d5d1cf..cd68de3 100644 --- a/src/renderer/types/api.ts +++ b/src/renderer/types/api.ts @@ -98,4 +98,6 @@ export interface ProviderSet { nginxInc: boolean // kubernetes-ingress (NGINX Inc, CRD-based) nginxCommunity: boolean // ingress-nginx (community, annotation-based) keda: boolean + cilium: boolean + hubbleRelay: boolean }