@@ -68,6 +68,10 @@ func indexTokens(ctx context.Context, info *indexMutationInfo) ([]string, error)
6868
6969 var tokens []string
7070 for _ , it := range info .tokenizers {
71+ // BM25 tokenizer is handled separately in addBM25IndexMutations.
72+ if it .Identifier () == tok .IdentBM25 {
73+ continue
74+ }
7175 toks , err := tok .BuildTokens (sv .Value , tok .GetTokenizerForLang (it , lang ))
7276 if err != nil {
7377 return tokens , err
@@ -179,6 +183,17 @@ func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo)
179183 }
180184 }
181185
186+ // Check if any tokenizer is BM25 and handle separately.
187+ for _ , it := range info .tokenizers {
188+ if _ , ok := tok .GetTokenizerForLang (it , info .edge .GetLang ()).(tok.BM25Tokenizer ); ok {
189+ if err := txn .addBM25IndexMutations (ctx , info ); err != nil {
190+ return []* pb.DirectedEdge {}, err
191+ }
192+ // Continue to process remaining non-BM25 tokenizers below.
193+ continue
194+ }
195+ }
196+
182197 tokens , err := indexTokens (ctx , info )
183198 if err != nil {
184199 // This data is not indexable
@@ -215,6 +230,174 @@ func (txn *Txn) addIndexMutation(ctx context.Context, edge *pb.DirectedEdge, tok
215230 return nil
216231}
217232
233+ // addBM25IndexMutations handles index mutations for the BM25 tokenizer.
234+ // It stores term frequencies, document lengths, and corpus statistics.
235+ func (txn * Txn ) addBM25IndexMutations (ctx context.Context , info * indexMutationInfo ) error {
236+ attr := info .edge .Attr
237+ uid := info .edge .Entity
238+ lang := info .edge .GetLang ()
239+
240+ schemaType , err := schema .State ().TypeOf (attr )
241+ if err != nil || ! schemaType .IsScalar () {
242+ return errors .Errorf ("Cannot BM25 index attribute %s of type object." , attr )
243+ }
244+
245+ sv , err := types .Convert (info .val , schemaType )
246+ if err != nil {
247+ return err
248+ }
249+
250+ bm25Tok := tok.BM25Tokenizer {}
251+ termFreqs , docLen , err := bm25Tok .TokensWithFrequency (sv .Value , lang )
252+ if err != nil {
253+ return err
254+ }
255+
256+ // Skip documents that tokenize to zero terms (e.g., all stopwords).
257+ if docLen == 0 {
258+ return nil
259+ }
260+
261+ if info .op == pb .DirectedEdge_DEL {
262+ // For DELETE: remove uid from all BM25 term posting lists, doc length list,
263+ // and decrement corpus stats.
264+ for term := range termFreqs {
265+ encodedTerm := string ([]byte {tok .IdentBM25 }) + term
266+ key := x .BM25IndexKey (attr , encodedTerm )
267+ plist , err := txn .cache .GetFromDelta (key )
268+ if err != nil {
269+ return err
270+ }
271+ edge := & pb.DirectedEdge {
272+ ValueId : uid ,
273+ Attr : attr ,
274+ Op : pb .DirectedEdge_DEL ,
275+ }
276+ if err := plist .addMutation (ctx , txn , edge ); err != nil {
277+ return err
278+ }
279+ }
280+ // Remove doc length entry.
281+ dlKey := x .BM25DocLenKey (attr )
282+ dlPlist , err := txn .cache .GetFromDelta (dlKey )
283+ if err != nil {
284+ return err
285+ }
286+ dlEdge := & pb.DirectedEdge {
287+ ValueId : uid ,
288+ Attr : attr ,
289+ Op : pb .DirectedEdge_DEL ,
290+ }
291+ if err := dlPlist .addMutation (ctx , txn , dlEdge ); err != nil {
292+ return err
293+ }
294+
295+ // Update corpus stats: decrement doc count and total terms.
296+ return txn .updateBM25Stats (ctx , attr , - 1 , - int64 (docLen ))
297+ }
298+
299+ // For SET: store term frequencies, doc length, and update corpus stats.
300+ for term , tf := range termFreqs {
301+ encodedTerm := string ([]byte {tok .IdentBM25 }) + term
302+ key := x .BM25IndexKey (attr , encodedTerm )
303+ plist , err := txn .cache .GetFromDelta (key )
304+ if err != nil {
305+ return err
306+ }
307+ // Store uid in the posting list. The TF is encoded in the Value field.
308+ tfBuf := make ([]byte , 4 )
309+ binary .BigEndian .PutUint32 (tfBuf , tf )
310+ edge := & pb.DirectedEdge {
311+ ValueId : uid ,
312+ Attr : attr ,
313+ Value : tfBuf ,
314+ ValueType : pb .Posting_INT ,
315+ Op : pb .DirectedEdge_SET ,
316+ }
317+ if err := plist .addMutation (ctx , txn , edge ); err != nil {
318+ return err
319+ }
320+ }
321+
322+ // Store document length.
323+ dlKey := x .BM25DocLenKey (attr )
324+ dlPlist , err := txn .cache .GetFromDelta (dlKey )
325+ if err != nil {
326+ return err
327+ }
328+ dlBuf := make ([]byte , 4 )
329+ binary .BigEndian .PutUint32 (dlBuf , docLen )
330+ dlEdge := & pb.DirectedEdge {
331+ ValueId : uid ,
332+ Attr : attr ,
333+ Value : dlBuf ,
334+ ValueType : pb .Posting_INT ,
335+ Op : pb .DirectedEdge_SET ,
336+ }
337+ if err := dlPlist .addMutation (ctx , txn , dlEdge ); err != nil {
338+ return err
339+ }
340+
341+ // Update corpus stats: increment doc count by 1 and total terms by docLen.
342+ return txn .updateBM25Stats (ctx , attr , 1 , int64 (docLen ))
343+ }
344+
345+ // updateBM25Stats reads the current corpus statistics for a BM25-indexed attribute,
346+ // applies the given deltas, and writes back.
347+ func (txn * Txn ) updateBM25Stats (ctx context.Context , attr string , docCountDelta int64 , totalTermsDelta int64 ) error {
348+ statsKey := x .BM25StatsKey (attr )
349+ plist , err := txn .cache .GetFromDelta (statsKey )
350+ if err != nil {
351+ return err
352+ }
353+
354+ // Read existing stats from posting with uid=1.
355+ var docCount , totalTerms uint64
356+ val , err := plist .Value (txn .StartTs )
357+ if err == nil && val .Value != nil {
358+ data , ok := val .Value .([]byte )
359+ if ok && len (data ) == 16 {
360+ docCount = binary .BigEndian .Uint64 (data [0 :8 ])
361+ totalTerms = binary .BigEndian .Uint64 (data [8 :16 ])
362+ }
363+ }
364+
365+ // Apply deltas.
366+ if docCountDelta >= 0 {
367+ docCount += uint64 (docCountDelta )
368+ } else {
369+ dec := uint64 (- docCountDelta )
370+ if dec > docCount {
371+ docCount = 0
372+ } else {
373+ docCount -= dec
374+ }
375+ }
376+ if totalTermsDelta >= 0 {
377+ totalTerms += uint64 (totalTermsDelta )
378+ } else {
379+ dec := uint64 (- totalTermsDelta )
380+ if dec > totalTerms {
381+ totalTerms = 0
382+ } else {
383+ totalTerms -= dec
384+ }
385+ }
386+
387+ // Write back stats.
388+ statsBuf := make ([]byte , 16 )
389+ binary .BigEndian .PutUint64 (statsBuf [0 :8 ], docCount )
390+ binary .BigEndian .PutUint64 (statsBuf [8 :16 ], totalTerms )
391+ edge := & pb.DirectedEdge {
392+ Entity : 1 ,
393+ Attr : attr ,
394+ Value : statsBuf ,
395+ ValueType : pb .Posting_ValType (0 ),
396+ Op : pb .DirectedEdge_SET ,
397+ }
398+ return plist .addMutation (ctx , txn , edge )
399+ }
400+
218401// countParams is sent to updateCount function. It is used to update the count index.
219402// It deletes the uid from the key corresponding to <attr, countBefore> and adds it
220403// to <attr, countAfter>.
0 commit comments