11package clusters
22
33import (
4- "fmt"
54 "math"
65 "sync"
76)
@@ -11,8 +10,8 @@ type steepDownArea struct {
1110 mib float64
1211}
1312
14- type clusterBounds struct {
15- start , end int
13+ type clusterJob struct {
14+ a , b , n int
1615}
1716
1817type opticsClusterer struct {
@@ -28,9 +27,10 @@ type opticsClusterer struct {
2827 mu sync.RWMutex
2928 a , b []int
3029
31- // variables used for concurrent computation of nearest neighbours
30+ // variables used for concurrent computation of nearest neighbours and producing final mapping
3231 l , s , o , f int
3332 j chan * rangeJob
33+ c chan * clusterJob
3434 m * sync.Mutex
3535 w * sync.WaitGroup
3636 p * []float64
@@ -114,21 +114,21 @@ func (c *opticsClusterer) Learn(data [][]float64) error {
114114 c .a = make ([]int , c .l )
115115 c .b = make ([]int , 0 )
116116
117- c .startWorkers ()
117+ c .startNearestWorkers ()
118118
119119 c .run ()
120120
121- c .endWorkers ()
122-
123- fmt .Printf ("Done running\n " )
121+ c .endNearestWorkers ()
124122
125123 c .v = nil
126124 c .p = nil
127125 c .r = nil
128126
127+ c .startClusterWorkers ()
128+
129129 c .extract ()
130130
131- fmt . Printf ( "Done extracting \n " )
131+ c . endClusterWorkers ( )
132132
133133 c .re = nil
134134 c .so = nil
@@ -254,10 +254,10 @@ func (c *opticsClusterer) update(p int, d float64, l *int, r *[]int, q *priority
254254
255255func (c * opticsClusterer ) extract () {
256256 var (
257- i , e , us , ue , cs , ce , s int
258- mib , d float64
259- areas []* steepDownArea = make ([]* steepDownArea , 0 )
260- clusters map [int ]* clusterBounds = make (map [int ]* clusterBounds )
257+ i , e , us , ue , cs , ce , s , k int
258+ mib , d float64
259+ areas []* steepDownArea = make ([]* steepDownArea , 0 )
260+ clusters map [int ]bool = make (map [int ]bool )
261261 )
262262
263263 for i < c .l - 1 {
@@ -345,17 +345,28 @@ func (c *opticsClusterer) extract() {
345345
346346 s = cs + ce
347347
348- if _ , ok := clusters [s ]; ! ok {
349- clusters [s ] = & clusterBounds {
350- start : cs ,
351- end : ce ,
348+ if ! clusters [s ] {
349+ clusters [s ] = true
350+
351+ c .b [k ] = ce - cs
352+
353+ c .w .Add (1 )
354+
355+ c .c <- & clusterJob {
356+ a : cs ,
357+ b : ce ,
358+ n : k ,
352359 }
360+
361+ k ++
353362 }
354363 }
355364 } else {
356365 i ++
357366 }
358367 }
368+
369+ c .w .Wait ()
359370}
360371
361372func (c * opticsClusterer ) isSteepDown (i int , e * int ) bool {
@@ -420,6 +431,34 @@ func (c *opticsClusterer) isSteepUp(i int, e *int) bool {
420431 return * e != i + 1
421432}
422433
434+ func (c * opticsClusterer ) startClusterWorkers () {
435+ c .c = make (chan * clusterJob , c .l )
436+
437+ c .w = & sync.WaitGroup {}
438+
439+ for i := 0 ; i < c .s ; i ++ {
440+ go c .clusterWorker ()
441+ }
442+ }
443+
444+ func (c * opticsClusterer ) endClusterWorkers () {
445+ close (c .c )
446+
447+ c .c = nil
448+
449+ c .w = nil
450+ }
451+
452+ func (c * opticsClusterer ) clusterWorker () {
453+ for j := range c .c {
454+ for i := j .a ; i < j .b ; i ++ {
455+ c .a [i ] = j .n
456+ }
457+
458+ c .w .Done ()
459+ }
460+ }
461+
423462/* Divide work among c.s workers, where c.s is determined
424463 * by the size of the data. This is based on an assumption that neighbour points of p
425464 * are located in relatively small subsection of the input data, so the dataset can be scanned
@@ -452,7 +491,7 @@ func (c *opticsClusterer) nearest(p int, l *int, r *[]int) {
452491 * l = len (* r )
453492}
454493
455- func (c * opticsClusterer ) startWorkers () {
494+ func (c * opticsClusterer ) startNearestWorkers () {
456495 c .j = make (chan * rangeJob , c .l )
457496
458497 c .m = & sync.Mutex {}
@@ -463,7 +502,7 @@ func (c *opticsClusterer) startWorkers() {
463502 }
464503}
465504
466- func (c * opticsClusterer ) endWorkers () {
505+ func (c * opticsClusterer ) endNearestWorkers () {
467506 close (c .j )
468507
469508 c .j = nil
0 commit comments