diff --git a/indexer/indexer.go b/indexer/indexer.go index 616d450e..bc622739 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -3,7 +3,6 @@ package indexer import ( "context" "fmt" - "log" "time" "api.audius.co/config" @@ -52,6 +51,7 @@ func NewIndexer(config config.Config) *CoreIndexer { func (ci *CoreIndexer) Start(ctx context.Context) error { sdk := sdk.NewAudiusdSDK(ci.Config.AudiusdURL) + go logging.SyncOnTicks(ctx, ci.logger, time.Second*10) var height int64 err := ci.pool.QueryRow(context.Background(), `select last_checkpoint from indexing_checkpoints where tablename = $1`, CoreIndexerCheckpointName).Scan(&height) @@ -76,14 +76,15 @@ func (ci *CoreIndexer) Start(ctx context.Context) error { select { case <-ctx.Done(): ci.logger.Info("Shutting down core indexer") - return nil + return ctx.Err() default: } block, err := sdk.Core.GetBlock(context.Background(), connect.NewRequest(&corev1.GetBlockRequest{ Height: height, })) if err != nil { - log.Fatal(err) + ci.logger.Error("failed to get block", zap.Error(err)) + return err } if block.Msg.Block.Height < 0 { @@ -93,7 +94,8 @@ func (ci *CoreIndexer) Start(ctx context.Context) error { err = ci.handleBlock(block.Msg.Block) if err != nil { - log.Fatal(err) + ci.logger.Error("failed to handle block", zap.Error(err)) + return err } height++ @@ -156,4 +158,5 @@ func (ci *CoreIndexer) handleManageEntity(dbTx dbv1.DBTX, logger *zap.Logger, em func (ci *CoreIndexer) Close() { ci.pool.Close() + ci.logger.Sync() }