Skip to content

Commit 9fed7b1

Browse files
committed
cancel goroutine when has error
1 parent d9a9da5 commit 9fed7b1

1 file changed

Lines changed: 28 additions & 2 deletions

File tree

worker/task.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -798,12 +798,19 @@ func (qs *queryState) handleUidPostings(
798798
errCh := make(chan error, numGo)
799799
outputs := make([]*pb.Result, numGo)
800800

801+
cctx, ccancel := context.WithCancel(ctx)
802+
defer ccancel()
801803
calculate := func(start, end int) error {
802804
x.AssertTrue(start%width == 0)
803805
out := &pb.Result{}
804806
outputs[start/width] = out
805807

806808
for i := start; i < end; i++ {
809+
select {
810+
case <-cctx.Done():
811+
return cctx.Err()
812+
default:
813+
}
807814
if i%100 == 0 {
808815
select {
809816
case <-ctx.Done():
@@ -951,7 +958,13 @@ func (qs *queryState) handleUidPostings(
951958
end = srcFn.n
952959
}
953960
go func(start, end int) {
954-
errCh <- calculate(start, end)
961+
if err := calculate(start, end); err != nil {
962+
errCh <- err
963+
ccancel()
964+
return
965+
} else {
966+
errCh <- nil
967+
}
955968
}(start, end)
956969
}
957970
for range numGo {
@@ -1597,11 +1610,18 @@ func (qs *queryState) filterGeoFunction(ctx context.Context, arg funcArgs) error
15971610
attribute.Int("num_go", numGo),
15981611
attribute.Int("width", width)))
15991612

1613+
cctx, ccancel := context.WithCancel(ctx)
1614+
defer ccancel()
16001615
filtered := make([]*pb.List, numGo)
16011616
filter := func(idx, start, end int) error {
16021617
filtered[idx] = &pb.List{}
16031618
out := filtered[idx]
16041619
for _, uid := range uids.Uids[start:end] {
1620+
select {
1621+
case <-cctx.Done():
1622+
return cctx.Err()
1623+
default:
1624+
}
16051625
pl, err := qs.cache.Get(x.DataKey(attr, uid))
16061626
if err != nil {
16071627
return err
@@ -1631,7 +1651,13 @@ func (qs *queryState) filterGeoFunction(ctx context.Context, arg funcArgs) error
16311651
end = len(uids.Uids)
16321652
}
16331653
go func(idx, start, end int) {
1634-
errCh <- filter(idx, start, end)
1654+
if err := filter(idx, start, end); err != nil {
1655+
errCh <- err
1656+
ccancel()
1657+
return
1658+
} else {
1659+
errCh <- nil
1660+
}
16351661
}(i, start, end)
16361662
}
16371663
for range numGo {

0 commit comments

Comments
 (0)