Skip to content

Commit 5b6e9f8

Browse files
committed
TerminateDanglingWorkflows flow written for both argo and system executor
1 parent 177520a commit 5b6e9f8

5 files changed

Lines changed: 83 additions & 33 deletions

File tree

pkg/pipeline/CiHandler.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,7 @@ func (impl *CiHandlerImpl) CancelBuild(workflowId int, forceAbort bool) (int, er
607607
// Terminate workflow
608608
cancelWfDtoRequest := &types.CancelWfRequestDto{
609609
ExecutorType: workflow.ExecutorType,
610-
Name: workflow.Name,
610+
WorkflowName: workflow.Name,
611611
Namespace: workflow.Namespace,
612612
RestConfig: restConfig,
613613
IsExt: isExt,
@@ -617,10 +617,12 @@ func (impl *CiHandlerImpl) CancelBuild(workflowId int, forceAbort bool) (int, er
617617
err = impl.workflowService.TerminateWorkflow(cancelWfDtoRequest)
618618
if err != nil && forceAbort {
619619
impl.Logger.Errorw("error in terminating workflow, with force abort flag flag as true", "workflowName", workflow.Name, "err", err)
620+
621+
cancelWfDtoRequest.WorkflowGenerateName = fmt.Sprintf("%d-%s-", workflowId, workflow.Name)
620622
err1 := impl.workflowService.TerminateDanglingWorkflows(cancelWfDtoRequest)
621623
if err1 != nil {
622624
impl.Logger.Errorw("error in terminating dangling workflows", "cancelWfDtoRequest", cancelWfDtoRequest, "err", err)
623-
return 0, err1
625+
// ignoring error here in case of force abort, confirmed from product
624626
}
625627
} else if err != nil && strings.Contains(err.Error(), "cannot find workflow") {
626628
return 0, &util.ApiError{Code: "200", HttpStatusCode: http.StatusBadRequest, UserMessage: err.Error()}

pkg/pipeline/WorkflowService.go

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ func (impl *WorkflowServiceImpl) GetWorkflowStatus(executorType cdWorkflow.Workf
354354
}
355355

356356
func (impl *WorkflowServiceImpl) TerminateWorkflow(cancelWfDtoRequest *types.CancelWfRequestDto) error {
357-
impl.Logger.Debugw("terminating wf", "name", cancelWfDtoRequest.Name)
357+
impl.Logger.Debugw("terminating wf", "name", cancelWfDtoRequest.WorkflowName)
358358
var err error
359359
if cancelWfDtoRequest.ExecutorType != "" {
360360
workflowExecutor := impl.getWorkflowExecutor(cancelWfDtoRequest.ExecutorType)
@@ -364,36 +364,28 @@ func (impl *WorkflowServiceImpl) TerminateWorkflow(cancelWfDtoRequest *types.Can
364364
if cancelWfDtoRequest.RestConfig == nil {
365365
cancelWfDtoRequest.RestConfig = impl.config
366366
}
367-
err = workflowExecutor.TerminateWorkflow(cancelWfDtoRequest.Name, cancelWfDtoRequest.Namespace, cancelWfDtoRequest.RestConfig)
367+
err = workflowExecutor.TerminateWorkflow(cancelWfDtoRequest.WorkflowName, cancelWfDtoRequest.Namespace, cancelWfDtoRequest.RestConfig)
368368
} else {
369369
wfClient, err := impl.getWfClient(cancelWfDtoRequest.Environment, cancelWfDtoRequest.Namespace, cancelWfDtoRequest.IsExt)
370370
if err != nil {
371371
return err
372372
}
373-
err = util.TerminateWorkflow(context.Background(), wfClient, cancelWfDtoRequest.Name)
373+
err = util.TerminateWorkflow(context.Background(), wfClient, cancelWfDtoRequest.WorkflowName)
374374
}
375375
return err
376376
}
377377

378378
func (impl *WorkflowServiceImpl) TerminateDanglingWorkflows(cancelWfDtoRequest *types.CancelWfRequestDto) error {
379-
impl.Logger.Debugw("terminating dangling wf", "name", cancelWfDtoRequest.Name)
379+
impl.Logger.Debugw("terminating dangling wf", "name", cancelWfDtoRequest.WorkflowName)
380380
var err error
381-
if cancelWfDtoRequest.ExecutorType != "" {
382-
workflowExecutor := impl.getWorkflowExecutor(cancelWfDtoRequest.ExecutorType)
383-
if workflowExecutor == nil {
384-
return errors.New("workflow executor not found")
385-
}
386-
if cancelWfDtoRequest.RestConfig == nil {
387-
cancelWfDtoRequest.RestConfig = impl.config
388-
}
389-
err = workflowExecutor.TerminateWorkflow(cancelWfDtoRequest.Name, cancelWfDtoRequest.Namespace, cancelWfDtoRequest.RestConfig)
390-
} else {
391-
wfClient, err := impl.getWfClient(cancelWfDtoRequest.Environment, cancelWfDtoRequest.Namespace, cancelWfDtoRequest.IsExt)
392-
if err != nil {
393-
return err
394-
}
395-
err = util.TerminateWorkflow(context.Background(), wfClient, cancelWfDtoRequest.Name)
381+
workflowExecutor := impl.getWorkflowExecutor(cancelWfDtoRequest.ExecutorType)
382+
if workflowExecutor == nil {
383+
return errors.New("workflow executor not found")
384+
}
385+
if cancelWfDtoRequest.RestConfig == nil {
386+
cancelWfDtoRequest.RestConfig = impl.config
396387
}
388+
err = workflowExecutor.TerminateDanglingWorkflow(cancelWfDtoRequest.WorkflowGenerateName, cancelWfDtoRequest.Namespace, cancelWfDtoRequest.RestConfig)
397389
return err
398390
}
399391

pkg/pipeline/executors/ArgoWorkflowExecutor.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ type WorkflowExecutor interface {
5959
TerminateWorkflow(workflowName string, namespace string, clusterConfig *rest.Config) error
6060
GetWorkflow(workflowName string, namespace string, clusterConfig *rest.Config) (*unstructured.UnstructuredList, error)
6161
GetWorkflowStatus(workflowName string, namespace string, clusterConfig *rest.Config) (*types.WorkflowStatus, error)
62-
TerminateDanglingWorkflow(workflowName string, namespace string, clusterConfig *rest.Config) error
62+
TerminateDanglingWorkflow(workflowGenerateName string, namespace string, clusterConfig *rest.Config) error
6363
}
6464

6565
type ArgoWorkflowExecutor interface {
@@ -90,8 +90,36 @@ func (impl *ArgoWorkflowExecutorImpl) TerminateWorkflow(workflowName string, nam
9090
return err
9191
}
9292

93-
func (impl *ArgoWorkflowExecutorImpl) TerminateDanglingWorkflow(workflowName string, namespace string, clusterConfig *rest.Config) error {
94-
93+
func (impl *ArgoWorkflowExecutorImpl) TerminateDanglingWorkflow(workflowGenerateName string, namespace string, clusterConfig *rest.Config) error {
94+
impl.logger.Debugw("terminating dangling wf", "workflowGenerateName", workflowGenerateName)
95+
wfClient, err := impl.getClientInstance(namespace, clusterConfig)
96+
if err != nil {
97+
impl.logger.Errorw("cannot build wf client", "workflowGenerateName", workflowGenerateName, "err", err)
98+
return err
99+
}
100+
wfList, err := wfClient.List(context.Background(), v1.ListOptions{})
101+
if err != nil {
102+
impl.logger.Errorw("error in fetching list of workflows", "namespace", namespace, "err", err)
103+
return err
104+
}
105+
var wfToDelete v1alpha1.Workflow
106+
for _, wf := range wfList.Items {
107+
if wf.GenerateName == workflowGenerateName {
108+
wfToDelete = wf
109+
break
110+
}
111+
}
112+
_, err = wfClient.Get(context.Background(), wfToDelete.Name, v1.GetOptions{})
113+
if err != nil {
114+
impl.logger.Errorw("cannot find workflow", "name", wfToDelete.Name, "err", err)
115+
return errors.New("cannot find workflow " + wfToDelete.Name)
116+
}
117+
err = util.TerminateWorkflow(context.Background(), wfClient, wfToDelete.Name)
118+
if err != nil {
119+
impl.logger.Errorw("error in terminating argo executor workflow", "name", wfToDelete.Name, "err", err)
120+
return err
121+
}
122+
return nil
95123
}
96124

97125
func (impl *ArgoWorkflowExecutorImpl) ExecuteWorkflow(workflowTemplate bean.WorkflowTemplate) (*unstructured.UnstructuredList, error) {

pkg/pipeline/executors/SystemWorkflowExecutor.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,35 @@ func (impl *SystemWorkflowExecutorImpl) TerminateWorkflow(workflowName string, n
114114
return err
115115
}
116116

117-
func (impl *SystemWorkflowExecutorImpl) TerminateDanglingWorkflow(workflowName string, namespace string, clusterConfig *rest.Config) error {
118-
117+
func (impl *SystemWorkflowExecutorImpl) TerminateDanglingWorkflow(workflowGenerateName string, namespace string, clusterConfig *rest.Config) error {
118+
_, clientset, err := impl.k8sUtil.GetK8sConfigAndClientsByRestConfig(clusterConfig)
119+
if err != nil {
120+
impl.logger.Errorw("error occurred while creating k8s client", "workflowGenerateName", workflowGenerateName, "namespace", namespace, "err", err)
121+
return err
122+
}
123+
jobList, err := clientset.BatchV1().Jobs(namespace).List(context.Background(), v12.ListOptions{})
124+
if err != nil {
125+
impl.logger.Errorw("error occurred while fetching jobs list for terminating dangling workflows", "namespace", namespace, "err", err)
126+
return err
127+
}
128+
var jobToDelete v1.Job
129+
for _, job := range jobList.Items {
130+
if job.ObjectMeta.GenerateName == workflowGenerateName {
131+
jobToDelete = job
132+
break
133+
}
134+
}
135+
if len(jobToDelete.Name) > 0 {
136+
err = clientset.BatchV1().Jobs(namespace).Delete(context.Background(), jobToDelete.Name, v12.DeleteOptions{})
137+
if err != nil {
138+
if errors.IsNotFound(err) {
139+
err = fmt.Errorf("cannot find job workflow %s", jobToDelete.Name)
140+
}
141+
impl.logger.Errorw("error occurred while deleting workflow", "workflowName", jobToDelete.Name, "namespace", namespace, "err", err)
142+
return err
143+
}
144+
}
145+
return nil
119146
}
120147

121148
func (impl *SystemWorkflowExecutorImpl) GetWorkflow(workflowName string, namespace string, clusterConfig *rest.Config) (*unstructured.UnstructuredList, error) {

pkg/pipeline/types/CiCdConfig.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,14 @@ import (
3939
)
4040

4141
type CancelWfRequestDto struct {
42-
ExecutorType cdWorkflow.WorkflowExecutorType
43-
Name string
44-
Namespace string
45-
RestConfig *rest.Config
46-
IsExt bool
47-
Environment *repository.Environment
48-
ForceAbort bool
42+
ExecutorType cdWorkflow.WorkflowExecutorType
43+
WorkflowName string
44+
Namespace string
45+
RestConfig *rest.Config
46+
IsExt bool
47+
Environment *repository.Environment
48+
ForceAbort bool
49+
WorkflowGenerateName string
4950
}
5051

5152
// build infra configurations like ciTimeout,ciCpuLimit,ciMemLimit,ciCpuReq,ciMemReq are being managed by infraConfig service

0 commit comments

Comments
 (0)