Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package indexer
import (
"context"
"fmt"
"log"
"time"

"api.audius.co/config"
Expand Down Expand Up @@ -52,6 +51,7 @@ func NewIndexer(config config.Config) *CoreIndexer {

func (ci *CoreIndexer) Start(ctx context.Context) error {
sdk := sdk.NewAudiusdSDK(ci.Config.AudiusdURL)
go logging.SyncOnTicks(ctx, ci.logger, time.Second*10)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rickyrombo didn't i say something about the caller forgetting to do this 😤

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To balance the argument: I like the fiber model where it flushes at the end of the request using a middleware.
For indexing I think I may update this to flush after each block so we don't need to run a ticker at all.

But I do agree that this bit me and kept biting me and I didn't notice until I went to find the logs and found only bite marks.

@rickyrombo rickyrombo Oct 16, 2025

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i feel like syncing on ticks is an antipattern that we're abusing in both indexers... but maybe it should be opt-out instead of opt-in tho?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What makes it an anti-pattern? I think there is probably value in flushing on a ticker if the code that's generating the logs may take a long time to reach whatever point it's flushing at. And it feels difficult to pick the right spot in forever loop that's processing events without just sprinkling calls to sync() in a bunch of places and hoping you don't miss somewhere important.
It also feels like it mimics the sidecar behavior you would expect elsewhere. I was surprised to learn that I had to manually flush these logs...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not an anti pattern but just kinda like a lazy workaround... Like, you wuldn't flushes SQL statements on a ticker. But it's logs so w/e


var height int64
err := ci.pool.QueryRow(context.Background(), `select last_checkpoint from indexing_checkpoints where tablename = $1`, CoreIndexerCheckpointName).Scan(&height)
Expand All @@ -76,14 +76,15 @@ func (ci *CoreIndexer) Start(ctx context.Context) error {
select {
case <-ctx.Done():
ci.logger.Info("Shutting down core indexer")
return nil
return ctx.Err()
default:
}
block, err := sdk.Core.GetBlock(context.Background(), connect.NewRequest(&corev1.GetBlockRequest{
Height: height,
}))
if err != nil {
log.Fatal(err)
ci.logger.Error("failed to get block", zap.Error(err))
return err
}

if block.Msg.Block.Height < 0 {
Expand All @@ -93,7 +94,8 @@ func (ci *CoreIndexer) Start(ctx context.Context) error {

err = ci.handleBlock(block.Msg.Block)
if err != nil {
log.Fatal(err)
ci.logger.Error("failed to handle block", zap.Error(err))
return err
}

height++
Expand Down Expand Up @@ -156,4 +158,5 @@ func (ci *CoreIndexer) handleManageEntity(dbTx dbv1.DBTX, logger *zap.Logger, em

func (ci *CoreIndexer) Close() {
ci.pool.Close()
ci.logger.Sync()
}
Loading