Skip to content

Commit 77943f7

Browse files
committed
Implemented worker pool for concurrent scanning for neighbouring points in DBSCAN
1 parent b3642fe commit 77943f7

5 files changed

Lines changed: 225 additions & 65 deletions

File tree

clusters.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,15 @@ type HardClusterer interface {
4141
/* Returns index of cluster to which the observation was assigned */
4242
Predict(observation []float64) int
4343

44-
/* Provides a method to train the algorithm online and receive intermediate results of computation */
45-
Online(observations chan []float64, done chan struct{}) chan *HCEvent
44+
/* Whether algorithm supports online learning */
45+
IsOnline() bool
4646

4747
/* Allows configuring the algorithm for online learning */
4848
WithOnline(Online) HardClusterer
4949

50+
/* Provides a method to train the algorithm online and receive intermediate results of computation */
51+
Online(observations chan []float64, done chan struct{}) chan *HCEvent
52+
5053
Clusterer
5154
}
5255

@@ -61,12 +64,15 @@ type SoftClusterer interface {
6164
/* Returns probabilities of the observation being assigned to respective clusters */
6265
Predict(observation []float64) []float64
6366

64-
/* Provides a method to train the algorithm online and receive intermediate results of computation */
65-
Online(observations chan []float64, done chan struct{}) chan *SCEvent
67+
/* Whether algorithm supports online learning */
68+
IsOnline() bool
6669

6770
/* Allows configuring the algorithm for online learning */
6871
WithOnline(Online) SoftClusterer
6972

73+
/* Provides a method to train the algorithm online and receive intermediate results of computation */
74+
Online(observations chan []float64, done chan struct{}) chan *SCEvent
75+
7076
Clusterer
7177
}
7278

dbscan.go

Lines changed: 84 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,40 @@ type dbscanClusterer struct {
1010

1111
l, s, o, f int
1212

13-
// For online learning only
14-
alpha float64
15-
dimension int
16-
1713
distance DistanceFunc
1814

1915
mu sync.RWMutex
2016
a, b []int
2117

18+
// channel for distributed searching for nearest neighbours
19+
j chan *nearestJob
20+
21+
// variables for calculating nearest neighbours concurrently
22+
m *sync.Mutex
23+
w *sync.WaitGroup
24+
p *[]float64
25+
r *[]int
26+
2227
// visited points
2328
v []bool
2429

2530
d [][]float64
2631
}
2732

33+
type nearestJob struct {
34+
a, b int
35+
}
36+
37+
/* Implementation of the DBSCAN algorithm with concurrent modification */
2838
func DbscanClusterer(minpts int, eps float64, distance DistanceFunc) (HardClusterer, error) {
39+
if minpts < 1 {
40+
return nil, ErrZeroMinpts
41+
}
42+
43+
if eps <= 0 {
44+
return nil, ErrZeroEpsilon
45+
}
46+
2947
var d DistanceFunc
3048
{
3149
if distance != nil {
@@ -42,12 +60,11 @@ func DbscanClusterer(minpts int, eps float64, distance DistanceFunc) (HardCluste
4260
}, nil
4361
}
4462

45-
func (c *dbscanClusterer) WithOnline(o Online) HardClusterer {
46-
c.alpha = o.Alpha
47-
c.dimension = o.Dimension
48-
49-
c.d = make([][]float64, 0, 100)
63+
func (c *dbscanClusterer) IsOnline() bool {
64+
return false
65+
}
5066

67+
func (c *dbscanClusterer) WithOnline(o Online) HardClusterer {
5168
return c
5269
}
5370

@@ -59,7 +76,7 @@ func (c *dbscanClusterer) Learn(data [][]float64) error {
5976
c.mu.Lock()
6077

6178
c.l = len(data)
62-
c.s = numGoroutines(c.l)
79+
c.s = numWorkers(c.l)
6380
c.o = c.s - 1
6481
c.f = c.l / c.s
6582

@@ -70,9 +87,17 @@ func (c *dbscanClusterer) Learn(data [][]float64) error {
7087
c.a = make([]int, c.l)
7188
c.b = make([]int, 0)
7289

90+
c.startWorkers()
91+
7392
c.run()
7493

94+
c.endWorkers()
95+
7596
c.v = nil
97+
c.m = nil
98+
c.w = nil
99+
c.p = nil
100+
c.r = nil
76101

77102
c.mu.Unlock()
78103

@@ -111,30 +136,10 @@ func (c *dbscanClusterer) Predict(p []float64) int {
111136
}
112137

113138
func (c *dbscanClusterer) Online(observations chan []float64, done chan struct{}) chan *HCEvent {
114-
c.mu.Lock()
115-
116-
var (
117-
r chan *HCEvent = make(chan *HCEvent)
118-
)
119-
120-
go func() {
121-
for {
122-
select {
123-
case o := <-observations:
124-
c.d = append(c.d, o)
125-
case <-done:
126-
go func() {
127-
c.mu.Unlock()
128-
}()
129-
130-
return
131-
}
132-
}
133-
}()
134-
135-
return r
139+
return nil
136140
}
137141

142+
// private
138143
func (c *dbscanClusterer) run() {
139144
var (
140145
n, m, l, k = 1, 0, 0, 0
@@ -148,7 +153,7 @@ func (c *dbscanClusterer) run() {
148153

149154
c.v[i] = true
150155

151-
c.nearest(i, &l, &ns)
156+
c.nearest(&i, &l, &ns)
152157

153158
if l < c.minpts {
154159
c.a[i] = -1
@@ -162,7 +167,7 @@ func (c *dbscanClusterer) run() {
162167
if !c.v[ns[j]] {
163168
c.v[ns[j]] = true
164169

165-
c.nearest(ns[j], &k, &nss)
170+
c.nearest(&ns[j], &k, &nss)
166171

167172
if k >= c.minpts {
168173
l += k
@@ -182,18 +187,21 @@ func (c *dbscanClusterer) run() {
182187
}
183188
}
184189

185-
func (c *dbscanClusterer) nearest(p int, l *int, r *[]int) {
186-
var (
187-
m sync.Mutex
188-
w sync.WaitGroup
189-
190-
b int
191-
v []float64 = c.d[p]
192-
)
190+
/* Divide work among c.s workers, where c.s is determined
191+
* by the size of the data. This is based on an assumption that neighbour points of p
192+
* are located in a relatively small subsection of the input data, so the dataset can be scanned
193+
* concurrently without blocking a big number of goroutines trying to write to r */
194+
func (c *dbscanClusterer) nearest(p *int, l *int, r *[]int) {
195+
var b int
193196

194197
*r = (*r)[:0]
195198

196-
w.Add(c.s)
199+
c.m = &sync.Mutex{}
200+
c.w = &sync.WaitGroup{}
201+
c.p = &c.d[*p]
202+
c.r = r
203+
204+
c.w.Add(c.s)
197205

198206
for i := 0; i < c.s; i++ {
199207
if i == c.o {
@@ -202,25 +210,44 @@ func (c *dbscanClusterer) nearest(p int, l *int, r *[]int) {
202210
b = (i + 1) * c.f
203211
}
204212

205-
go func(a, b int) {
206-
for j := a; j < b; j++ {
207-
if c.distance(v, c.d[j]) < c.eps {
208-
m.Lock()
209-
*r = append(*r, j)
210-
m.Unlock()
211-
}
212-
}
213-
214-
w.Done()
215-
}(i*c.f, b)
213+
c.j <- &nearestJob{
214+
a: i * c.f,
215+
b: b,
216+
}
216217
}
217218

218-
w.Wait()
219+
c.w.Wait()
219220

220221
*l = len(*r)
221222
}
222223

223-
func numGoroutines(a int) int {
224+
func (c *dbscanClusterer) startWorkers() {
225+
c.j = make(chan *nearestJob, c.s)
226+
227+
for i := 0; i < c.s; i++ {
228+
go c.nearestWorker()
229+
}
230+
}
231+
232+
func (c *dbscanClusterer) endWorkers() {
233+
close(c.j)
234+
}
235+
236+
func (c *dbscanClusterer) nearestWorker() {
237+
for j := range c.j {
238+
for i := j.a; i < j.b; i++ {
239+
if c.distance(*c.p, c.d[i]) < c.eps {
240+
c.m.Lock()
241+
*c.r = append(*c.r, i)
242+
c.m.Unlock()
243+
}
244+
}
245+
246+
c.w.Done()
247+
}
248+
}
249+
250+
func numWorkers(a int) int {
224251
if a < 1000 {
225252
return 1
226253
} else if a < 10000 {
@@ -229,11 +256,7 @@ func numGoroutines(a int) int {
229256
return 100
230257
} else if a < 1000000 {
231258
return 1000
232-
} else if a < 10000000 {
233-
return 10000
234-
} else if a < 100000000 {
235-
return 100000
236259
} else {
237-
return 1000000
260+
return 10000
238261
}
239262
}

errors.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@ var (
77
ErrNotTrained = errors.New("You need to train the algorithm first")
88
ErrZeroIterations = errors.New("Number of iterations cannot be less than 1")
99
ErrOneCluster = errors.New("Number of clusters cannot be less than 2")
10+
ErrZeroEpsilon = errors.New("Epsilon cannot be 0")
11+
ErrZeroMinpts = errors.New("MinPts cannot be 0")
1012
)

kmeans.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const (
1313
CHANGES_THRESHOLD = 2
1414
)
1515

16+
/* Implementation of k-means++ algorithm with online learning */
1617
type kmeansClusterer struct {
1718
iterations int
1819
number int
@@ -61,6 +62,10 @@ func KmeansClusterer(iterations, clusters int, distance DistanceFunc) (HardClust
6162
}, nil
6263
}
6364

65+
func (c *kmeansClusterer) IsOnline() bool {
66+
return true
67+
}
68+
6469
func (c *kmeansClusterer) WithOnline(o Online) HardClusterer {
6570
c.alpha = o.Alpha
6671
c.dimension = o.Dimension

0 commit comments

Comments
 (0)