@@ -62,27 +62,28 @@ func (r *reducer) run() error {
6262
6363 if len (vectorIndexSpecs ) > 0 {
6464 fmt .Printf ("Creating shared vector database for %d vector predicate(s)\n " , len (vectorIndexSpecs ))
65+ // Track which predicates belong to which output shard
66+ predToOutputShard = make (map [string ]int )
6567
66- // Create single shared vectorTmpDb
6768 sharedVectorDb = r .createVectorTmpBadger ()
6869
69- // Initialize posting and schema ONCE (avoids race condition!)
7070 posting .Init (sharedVectorDb , 0 , false )
7171 schema .Init (sharedVectorDb )
7272 for pred , sch := range r .schema .schemaMap {
73+ _ , ok := vectorIndexSpecs [pred ]
74+ if ! ok {
75+ continue
76+ }
7377 schema .State ().Set (pred , sch )
7478 }
75-
76- // Track which predicates belong to which output shard
77- predToOutputShard = make (map [string ]int )
7879 }
7980
8081 thr := y .NewThrottle (r .opt .NumReducers )
8182 for i := range r .opt .ReduceShards {
8283 if err := thr .Do (); err != nil {
8384 return err
8485 }
85- go func (shardId int , db * badger.DB , tmpDb * badger.DB ) {
86+ go func (shardId int , db * badger.DB , tmpDb * badger.DB , vectorTmpDb * badger. DB ) {
8687 defer thr .Done (nil )
8788
8889 mapFiles := filenamesInTree (dirs [shardId ])
@@ -117,10 +118,10 @@ func (r *reducer) run() error {
117118
118119 // Create vector indexer using shared DB (if vectors exist)
119120 var vi * vectorIndexer
120- if sharedVectorDb != nil && len (vectorIndexSpecs ) > 0 {
121+ if vectorTmpDb != nil && len (vectorIndexSpecs ) > 0 {
121122 fmt .Printf ("Initializing vector indexer for shard %d with %d predicate(s)\n " ,
122123 shardId , len (vectorIndexSpecs ))
123- vi = newVectorIndexerShared (r , sharedVectorDb , vectorIndexSpecs ,
124+ vi = newVectorIndexerShared (r , vectorTmpDb , vectorIndexSpecs ,
124125 shardId , & predToShardMu , predToOutputShard )
125126 }
126127
@@ -149,7 +150,7 @@ func (r *reducer) run() error {
149150 fmt .Printf ("Error while closing iterator: %v" , err )
150151 }
151152 }
152- }(i , r .createBadger (i ), r .createTmpBadger ())
153+ }(i , r .createBadger (i ), r .createTmpBadger (), sharedVectorDb )
153154 }
154155 if err := thr .Finish (); err != nil {
155156 return err
@@ -294,8 +295,7 @@ func newMapIterator(filename string) (*pb.MapHeader, *mapIterator) {
294295type encodeRequest struct {
295296 cbuf * z.Buffer
296297 countBuf * z.Buffer
297- vectorBuf * z.Buffer // Buffer for vector entries to be indexed
298- vi * vectorIndexer // Vector indexer for routing vector predicates to tmpDb
298+ vectorBuf * z.Buffer // Buffer for vector entries to be indexed
299299 wg * sync.WaitGroup
300300 listCh chan * z.Buffer
301301 splitCh chan * bpb.KVList
@@ -318,11 +318,11 @@ func (r *reducer) streamIdFor(pred string) uint32 {
318318 return streamId
319319}
320320
// encode is a worker loop: it receives encodeRequests from entryCh until the
// channel is closed, runs r.toList on each request, and then calls
// req.wg.Done() for that request. closer.Done() is deferred, so it runs when
// the loop exits.
//
// NOTE(review): this span is a diff hunk; "-" lines are the old version and
// "+" lines are the new one. In the new version the vectorIndexer is passed in
// once as the vi parameter and forwarded to toList on every call.
321- func (r * reducer ) encode (entryCh chan * encodeRequest , closer * z.Closer ) {
321+ func (r * reducer ) encode (entryCh chan * encodeRequest , vi * vectorIndexer , closer * z.Closer ) {
322322 defer closer .Done ()
323323
324324 for req := range entryCh {
325- r .toList (req )
325+ r .toList (req , vi )
// Done is called only after toList has returned for this request.
326326 req .wg .Done ()
327327 }
328328}
@@ -470,6 +470,9 @@ func (r *reducer) startWriting(ci *countIndexer, vi *vectorIndexer, writerCh cha
470470
471471 count (req )
472472 if vi != nil {
473+ if err := vi .flushWriteBatch (); err != nil {
474+ glog .Errorf ("Error flushing vector write batch before HNSW insertion: %v" , err )
475+ }
473476 vector (req )
474477 }
475478 }
@@ -646,7 +649,7 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou
646649 for range cpu {
647650 // Start listening to encode entries
648651 // For time being let's lease 100 stream id for each encoder.
649- go r .encode (encoderCh , encoderCloser )
652+ go r .encode (encoderCh , vi , encoderCloser )
650653 }
651654 // Start listening to write the badger list.
652655 writerCloser := z .NewCloser (1 )
@@ -661,7 +664,6 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou
661664 listCh : make (chan * z.Buffer , 3 ),
662665 splitCh : ci .splitCh ,
663666 countBuf : getBuf (r .opt .TmpDir ),
664- vi : vi ,
665667 }
666668 // Only allocate vectorBuf when we have vector predicates to index
667669 if vi != nil {
@@ -733,7 +735,7 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou
733735 writerCloser .SignalAndWait ()
734736}
735737
736- func (r * reducer ) toList (req * encodeRequest ) {
738+ func (r * reducer ) toList (req * encodeRequest , vi * vectorIndexer ) {
737739 cbuf := req .cbuf
738740 defer func () {
739741 atomic .AddInt64 (& r .prog .numEncoding , - int64 (cbuf .LenNoPadding ()))
@@ -888,8 +890,8 @@ func (r *reducer) toList(req *encodeRequest) {
888890 }
889891 }
890892
891- // Check if this is a vector predicate that should be routed to tmpDb
892- isVectorPred := req . vi != nil && pk .IsData () && req . vi .isVectorPredicate (pk .Attr )
893+ // Check if this is a vector predicate that should be routed to vectorTmpDb
894+ isVectorPred := vi != nil && pk .IsData () && vi .isVectorPredicate (pk .Attr )
893895
894896 shouldSplit := proto .Size (pl ) > (1 << 20 )/ 2 && len (pl .Pack .Blocks ) > 1
895897 if shouldSplit {
@@ -908,7 +910,7 @@ func (r *reducer) toList(req *encodeRequest) {
908910 // Vector predicates go to vectorTmpDb
909911 for _ , kv := range kvs {
910912 kv .Version = writeVersionTs
911- if err := req . vi .writeVectorKV (kv ); err != nil {
913+ if err := vi .writeVectorKV (kv ); err != nil {
912914 glog .Errorf ("Error writing vector posting to tmpDb: %v" , err )
913915 }
914916 }
@@ -931,7 +933,7 @@ func (r *reducer) toList(req *encodeRequest) {
931933
932934 if isVectorPred {
933935 // Vector predicates go to vectorTmpDb
934- if err := req . vi .writeVectorKV (kv ); err != nil {
936+ if err := vi .writeVectorKV (kv ); err != nil {
935937 glog .Errorf ("Error writing vector posting to tmpDb: %v" , err )
936938 }
937939 } else {
0 commit comments