-
-
Notifications
You must be signed in to change notification settings - Fork 161
Expand file tree
/
Copy pathkubernetes.go
More file actions
95 lines (79 loc) · 2.5 KB
/
kubernetes.go
File metadata and controls
95 lines (79 loc) · 2.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package main
import (
"sync"
piperv1beta1 "github.com/tg123/sshpiper/plugin/kubernetes/apis/sshpiper/v1beta1"
sshpiper "github.com/tg123/sshpiper/plugin/kubernetes/generated/clientset/versioned"
piperlister "github.com/tg123/sshpiper/plugin/kubernetes/generated/listers/sshpiper/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
type plugin struct {
k8sclient corev1.CoreV1Interface
lister piperlister.PipeLister
stop chan<- struct{}
kubeCfg *rest.Config
kubeExecMu sync.Mutex
kubeExecBridgeAddr string
kubeExecPipeToKey map[string]string
kubeExecPrivateKeys map[string]string
kubeExecTargets map[string]kubectlExecTarget
}
func newKubernetesPlugin(allNamespaces bool, kubeConfigPath string) (*plugin, error) {
loader := clientcmd.NewDefaultClientConfigLoadingRules()
loader.ExplicitPath = kubeConfigPath
kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
loader,
&clientcmd.ConfigOverrides{},
)
config, err := kubeConfig.ClientConfig()
if err != nil {
return nil, err
}
ns, _, err := kubeConfig.Namespace()
if err != nil {
return nil, err
}
if allNamespaces {
ns = metav1.NamespaceAll
}
k8sclient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
piperclient, err := sshpiper.NewForConfig(config)
if err != nil {
return nil, err
}
listWatcher := cache.NewListWatchFromClient(piperclient.SshpiperV1beta1().RESTClient(), "pipes", ns, fields.Everything())
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
lister := piperlister.NewPipeLister(store)
reflector := cache.NewReflector(listWatcher, &piperv1beta1.Pipe{}, store, 0)
stop := make(chan struct{})
go reflector.Run(stop)
return &plugin{
k8sclient: k8sclient.CoreV1(),
lister: lister,
stop: stop,
kubeCfg: config,
kubeExecPipeToKey: make(map[string]string),
kubeExecPrivateKeys: make(map[string]string),
kubeExecTargets: make(map[string]kubectlExecTarget),
}, nil
}
func (p *plugin) Stop() {
p.stop <- struct{}{}
}
func (p *plugin) list() ([]*piperv1beta1.Pipe, error) {
pipes, err := p.lister.List(labels.Everything())
if err != nil {
return nil, err
}
p.syncKubectlExecState(pipes)
return pipes, nil
}