From b1a0f17b3a72dd5706578bf4049dc3ec6462ed55 Mon Sep 17 00:00:00 2001 From: Randy Schott <1815175+schottra@users.noreply.github.com> Date: Tue, 21 Oct 2025 11:55:24 -0400 Subject: [PATCH 01/10] add aggregates indexer and user score updater --- Makefile | 3 + indexer/aggregates_indexer.go | 53 +++++++ jobs/job_runner.go | 82 +++++++++++ jobs/update_aggregates.go | 252 ++++++++++++++++++++++++++++++++++ main.go | 18 +++ 5 files changed, 408 insertions(+) create mode 100644 indexer/aggregates_indexer.go create mode 100644 jobs/job_runner.go create mode 100644 jobs/update_aggregates.go diff --git a/Makefile b/Makefile index 62895f03..98453218 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,9 @@ pretty:: indexer:: wgo run -file .go -debounce 10ms main.go indexer +aggregates-indexer:: + wgo run -file .go -debounce 10ms main.go aggregates-indexer + solana-indexer:: wgo run -file .go -debounce 10ms main.go solana-indexer diff --git a/indexer/aggregates_indexer.go b/indexer/aggregates_indexer.go new file mode 100644 index 00000000..53740b69 --- /dev/null +++ b/indexer/aggregates_indexer.go @@ -0,0 +1,53 @@ +package indexer + +import ( + "context" + "time" + + dbv1 "api.audius.co/api/dbv1" + "api.audius.co/config" + "api.audius.co/database" + "api.audius.co/jobs" + "api.audius.co/logging" + "github.com/jackc/pgx/v5/pgxpool" +) + +type AggregatesIndexer struct { + readPool *dbv1.DBPools + writePool database.DbPool + updateAggregatesJob *jobs.UpdateAggregatesJob +} + +func NewAggregatesIndexer(config config.Config) *AggregatesIndexer { + logger := logging.NewZapLogger(config).Named("AggregatesIndexer") + readPool, err := dbv1.NewDBPools([]string{config.ReadDbUrl}, logger, config.Env, config.ZapLevel) + if err != nil { + panic(err) + } + writePool, err := pgxpool.New(context.Background(), config.WriteDbUrl) + if err != nil { + panic(err) + } + + return &AggregatesIndexer{ + readPool: readPool, + writePool: writePool, + updateAggregatesJob: jobs.NewUpdateAggregatesJob(config, writePool, readPool), + } +} + +func (a *AggregatesIndexer) Start(ctx context.Context) error { + // try to run every 5 minutes. Job may take longer than 5 minutes to complete + // and result in every 10 minutes. + a.updateAggregatesJob.ScheduleEvery(ctx, 5*time.Minute) + go a.updateAggregatesJob.Run(ctx) + + // wait on context to be cancelled + <-ctx.Done() + return ctx.Err() +} + +func (a *AggregatesIndexer) Close() { + a.readPool.Close() + a.writePool.Close() +} diff --git a/jobs/job_runner.go b/jobs/job_runner.go new file mode 100644 index 00000000..dc611fec --- /dev/null +++ b/jobs/job_runner.go @@ -0,0 +1,82 @@ +package jobs + +import ( + "context" + "fmt" + "sync" + "time" + + "api.audius.co/config" + "api.audius.co/logging" + "go.uber.org/zap" +) + +type JobRunner interface { + Execute(ctx context.Context, logger *zap.Logger) error +} + +type BaseJob struct { + logger *zap.Logger + runner JobRunner + + mutex sync.Mutex + isRunning bool +} + +type BaseJobConfig struct { + config config.Config + jobName string + runner JobRunner +} + +func NewBaseJob(cfg BaseJobConfig) *BaseJob { + logger := logging.NewZapLogger(cfg.config).Named(cfg.jobName) + return &BaseJob{ + logger: logger, + runner: cfg.runner, + } +} + +// Run executes the job once +func (j *BaseJob) Run(ctx context.Context) error { + if err := j.run(ctx); err != nil { + j.logger.Error("Job run failed", zap.Error(err)) + return err + } + j.logger.Info("Job completed successfully") + return nil +} + +func (j *BaseJob) run(ctx context.Context) error { + j.mutex.Lock() + if j.isRunning { + j.mutex.Unlock() + return fmt.Errorf("job is already running") + } + j.isRunning = true + j.mutex.Unlock() + defer func() { + j.mutex.Lock() + j.isRunning = false + j.mutex.Unlock() + }() + + return j.runner.Execute(ctx, j.logger) +} + +// ScheduleEvery runs the job every `duration` until the context is cancelled. +func (j *BaseJob) ScheduleEvery(ctx context.Context, duration time.Duration) *BaseJob { + go func() { + ticker := time.NewTicker(duration) + defer ticker.Stop() + for { + select { + case <-ticker.C: + j.Run(ctx) + case <-ctx.Done(): + return + } + } + }() + return j +} diff --git a/jobs/update_aggregates.go b/jobs/update_aggregates.go new file mode 100644 index 00000000..70d21236 --- /dev/null +++ b/jobs/update_aggregates.go @@ -0,0 +1,252 @@ +package jobs + +import ( + "context" + "strings" + "time" + + dbv1 "api.audius.co/api/dbv1" + "api.audius.co/config" + "api.audius.co/database" + "github.com/jackc/pgx/v5" + "go.uber.org/zap" +) + +type UpdateAggregatesJobRunner struct { + writePool database.DbPool + readPool *dbv1.DBPools +} + +const ( + BatchSize = 10000 +) + +func (r *UpdateAggregatesJobRunner) Execute(ctx context.Context, logger *zap.Logger) error { + startTime := time.Now() + logger.Info("Starting user score update job") + + type UserScoreRecord struct { + UserID int32 + CreatedAt time.Time + Score int64 + } + + var lastUserID *int32 + var lastCreatedAt *time.Time + processedCount := 0 + scoreUpdatedCount := int64(0) + + for { + filters := []string{ + "u.is_current = TRUE", + "u.handle_lc IS NOT NULL", + } + if lastUserID != nil && lastCreatedAt != nil { + filters = append(filters, `((u.created_at, u.user_id) < (@cursorTime::timestamptz, @cursorUserId::int))`) + logger.Info("Processing batch", zap.String("lastCreatedAt", lastCreatedAt.Format(time.RFC3339)), zap.Int32("lastUserID", *lastUserID)) + } else { + logger.Info("Processing first batch") + } + + query := ` + WITH batch AS MATERIALIZED ( + SELECT u.user_id, u.created_at + FROM users u + WHERE ` + strings.Join(filters, " AND ") + ` + ORDER BY u.created_at DESC, u.user_id DESC + LIMIT @batchSize + ), + ids AS MATERIALIZED ( + SELECT array_agg(user_id) AS ids FROM batch + ), + + /* plays: split into two small per-user aggregates */ + hours_agg AS ( + SELECT h.user_id, COUNT(*)::bigint AS play_hours + FROM ( + SELECT DISTINCT p.user_id, date_trunc('hour', p.created_at) AS hr + FROM plays p, ids + WHERE p.user_id = ANY(ids.ids) + ) h + GROUP BY h.user_id + ), + tracks_agg AS ( + SELECT p.user_id, COUNT(DISTINCT p.play_item_id)::bigint AS distinct_tracks + FROM plays p, ids + WHERE p.user_id = ANY(ids.ids) + GROUP BY p.user_id + ), + + fast_challenge_completion AS ( + SELECT b.user_id, COUNT(*)::bigint AS challenge_count + FROM batch b + JOIN user_challenges uc + ON uc.user_id = b.user_id + AND uc.is_complete + AND uc.challenge_id NOT IN ('m','b') + AND uc.completed_at <= (b.created_at + interval '3 minutes') + GROUP BY b.user_id + ), + chat_blocks AS ( + SELECT b.user_id, COUNT(*)::bigint AS block_count + FROM batch b + JOIN chat_blocked_users c ON c.blockee_user_id = b.user_id + GROUP BY b.user_id + ), + followers_karma AS ( + SELECT b.user_id, + LEAST((SUM(fau.follower_count) / 100)::bigint, 100::bigint) AS karma_sum + FROM batch b + JOIN follows f + ON f.followee_user_id = b.user_id + AND f.is_delete = FALSE + JOIN aggregate_user fau + ON fau.user_id = f.follower_user_id + AND fau.following_count < 10000 + GROUP BY b.user_id + ), + + /* compute features for scoring */ + features AS ( + SELECT + u.user_id, + b.created_at, + COALESCE(h.play_hours, 0)::bigint AS play_count, -- distinct hours + COALESCE(t.distinct_tracks, 0)::bigint AS distinct_tracks_played, -- distinct tracks + COALESCE(c.challenge_count, 0)::bigint AS challenge_count, + COALESCE(au.following_count, 0)::bigint AS following_count, + COALESCE(au.follower_count, 0)::bigint AS follower_count, + COALESCE(cb.block_count, 0)::bigint AS chat_block_count, + ( ((u.handle_lc ILIKE '%audius%') OR (lower(u.name) ILIKE '%audius%')) + AND u.is_verified = FALSE ) AS is_audius_impersonator, + ( u.is_verified = FALSE + AND (u.handle_lc ILIKE '%airdrop%' OR lower(u.name) LIKE '%airdrop%') ) + AS has_badwords, + CASE + WHEN COALESCE(au.follower_count, 0) > 1000 THEN 100 + WHEN COALESCE(au.follower_count, 0) = 0 THEN 0 + ELSE COALESCE(k.karma_sum, 0) + END::bigint AS karma + FROM batch b + JOIN users u ON u.user_id = b.user_id + LEFT JOIN hours_agg h ON h.user_id = b.user_id + LEFT JOIN tracks_agg t ON t.user_id = b.user_id + LEFT JOIN fast_challenge_completion c ON c.user_id = b.user_id + LEFT JOIN chat_blocks cb ON cb.user_id = b.user_id + LEFT JOIN aggregate_user au ON au.user_id = b.user_id + LEFT JOIN followers_karma k ON k.user_id = b.user_id + ) + + SELECT + f.user_id, + f.created_at, + compute_user_score( + f.play_count, + f.follower_count, + f.challenge_count, + f.chat_block_count, + f.following_count, + f.is_audius_impersonator, + f.has_badwords, + f.distinct_tracks_played, + f.karma + ) AS score + FROM features f + ORDER BY f.created_at DESC, f.user_id DESC + ` + + readQueryStart := time.Now() + res, err := r.readPool.Query(ctx, query, pgx.NamedArgs{ + "batchSize": BatchSize, + "cursorTime": lastCreatedAt, + "cursorUserId": lastUserID, + }) + if err != nil { + logger.Error("Failed to execute batch read query", zap.Error(err)) + return err + } + readQueryDuration := time.Since(readQueryStart) + + userIDs := make([]int32, 0) + scores := make([]int64, 0) + seenUserIDs := make(map[int32]bool) + createdAt := time.Time{} + score := int64(0) + userID := int32(0) + fetchedRows := 0 + pgx.ForEachRow(res, []any{&userID, &createdAt, &score}, func() error { + fetchedRows++ + // We get dupes on some user_ids due to multiple is_current user rows. + // Upsert query will fail if we don't remove them. + if !seenUserIDs[userID] { + userIDs = append(userIDs, userID) + scores = append(scores, score) + seenUserIDs[userID] = true + } + return nil + }) + res.Close() + + lastCreatedAt = &createdAt + lastUserID = &userID + + writeQueryStart := time.Now() + tag, err := r.writePool.Exec(ctx, ` + WITH s AS ( + SELECT * FROM unnest($1::bigint[], $2::double precision[]) AS t(user_id, score) + ) + INSERT INTO aggregate_user (user_id, score) + SELECT s.user_id, s.score + FROM s + ON CONFLICT (user_id) + DO UPDATE SET + score = EXCLUDED.score + WHERE aggregate_user.score IS DISTINCT FROM EXCLUDED.score + `, userIDs, scores) + writeQueryDuration := time.Since(writeQueryStart) + if err != nil { + logger.Error("Failed to execute update query", zap.Error(err)) + return err + } + + processedCount += fetchedRows + scoreUpdatedCount += tag.RowsAffected() + logger.Info("Processed batch", + zap.Int("batch_size", fetchedRows), + zap.Int32("last_user_id", userID), + zap.String("last_created_at", lastCreatedAt.Format(time.RFC3339)), + zap.Int("total_processed", processedCount), + zap.Int64("total_scores_changes", tag.RowsAffected()), + zap.Duration("read_query_duration", readQueryDuration), + zap.Duration("write_query_duration", writeQueryDuration)) + + if fetchedRows < BatchSize { + logger.Info("Finished processing all users", zap.Int("total_processed", processedCount), zap.Int64("total_score_changes", scoreUpdatedCount), zap.Duration("duration", time.Since(startTime))) + break + } + } + + return nil +} + +type UpdateAggregatesJob struct { + *BaseJob +} + +func NewUpdateAggregatesJob(config config.Config, writePool database.DbPool, readPool *dbv1.DBPools) *UpdateAggregatesJob { + runner := &UpdateAggregatesJobRunner{writePool: writePool, readPool: readPool} + baseJob := NewBaseJob(BaseJobConfig{ + config: config, + jobName: "UpdateAggregatesJob", + runner: runner, + }) + + return &UpdateAggregatesJob{ + BaseJob: baseJob, + } +} + +func (j *UpdateAggregatesJob) ScheduleEvery(ctx context.Context, duration time.Duration) *UpdateAggregatesJob { + j.BaseJob.ScheduleEvery(ctx, duration) + return j +} diff --git a/main.go b/main.go index b8560362..a35c8fc3 100644 --- a/main.go +++ b/main.go @@ -37,6 +37,24 @@ func main() { as := api.NewApiServer(config.Cfg) as.Serve() } + case "aggregates-indexer": + { + fmt.Println("Running aggregates-indexer...") + + aggregatesIndexer := core_indexer.NewAggregatesIndexer(config.Cfg) + + defer aggregatesIndexer.Close() + + // Capture termination signals for graceful shutdown of the indexer + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGINT, os.Interrupt) + defer stop() + + if err := aggregatesIndexer.Start(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + panic(err) + } + } + } case "indexer": { fmt.Println("Running indexer...") From 1dab5943b152e8436f71be477f18b0c26e4553a6 Mon Sep 17 00:00:00 2001 From: Randy Schott <1815175+schottra@users.noreply.github.com> Date: Tue, 21 Oct 2025 16:02:11 -0400 Subject: [PATCH 02/10] update batch size --- jobs/update_aggregates.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/jobs/update_aggregates.go b/jobs/update_aggregates.go index 70d21236..377fbc63 100644 --- a/jobs/update_aggregates.go +++ b/jobs/update_aggregates.go @@ -18,7 +18,7 @@ type UpdateAggregatesJobRunner struct { } const ( - BatchSize = 10000 + UpdateUserScoresBatchSize = 3000 ) func (r *UpdateAggregatesJobRunner) Execute(ctx context.Context, logger *zap.Logger) error { @@ -157,7 +157,7 @@ func (r *UpdateAggregatesJobRunner) Execute(ctx context.Context, logger *zap.Log readQueryStart := time.Now() res, err := r.readPool.Query(ctx, query, pgx.NamedArgs{ - "batchSize": BatchSize, + "batchSize": UpdateUserScoresBatchSize, "cursorTime": lastCreatedAt, "cursorUserId": lastUserID, }) @@ -220,7 +220,7 @@ func (r *UpdateAggregatesJobRunner) Execute(ctx context.Context, logger *zap.Log zap.Duration("read_query_duration", readQueryDuration), zap.Duration("write_query_duration", writeQueryDuration)) - if fetchedRows < BatchSize { + if fetchedRows < UpdateUserScoresBatchSize { logger.Info("Finished processing all users", zap.Int("total_processed", processedCount), zap.Int64("total_score_changes", scoreUpdatedCount), zap.Duration("duration", time.Since(startTime))) break } From 511d13a0fa5a1c005e36c6c2d4a565bef3e61457 Mon Sep 17 00:00:00 2001 From: Randy Schott <1815175+schottra@users.noreply.github.com> Date: Fri, 24 Oct 2025 12:07:02 -0400 Subject: [PATCH 03/10] reorganize to make aggregates part of core indexer --- indexer/aggregates_indexer.go | 33 +++++++++----- indexer/indexer.go | 29 ++++++++++--- jobs/job_runner.go | 82 ----------------------------------- jobs/update_aggregates.go | 71 +++++++++++++++--------------- main.go | 18 -------- 5 files changed, 81 insertions(+), 152 deletions(-) delete mode 100644 jobs/job_runner.go diff --git a/indexer/aggregates_indexer.go b/indexer/aggregates_indexer.go index 53740b69..7ca03e18 100644 --- a/indexer/aggregates_indexer.go +++ b/indexer/aggregates_indexer.go @@ -2,7 +2,6 @@ package indexer import ( "context" - "time" dbv1 "api.audius.co/api/dbv1" "api.audius.co/config" @@ -10,9 +9,11 @@ import ( "api.audius.co/jobs" "api.audius.co/logging" "github.com/jackc/pgx/v5/pgxpool" + "go.uber.org/zap" ) type AggregatesIndexer struct { + logger *zap.Logger readPool *dbv1.DBPools writePool database.DbPool updateAggregatesJob *jobs.UpdateAggregatesJob @@ -30,21 +31,29 @@ func NewAggregatesIndexer(config config.Config) *AggregatesIndexer { } return &AggregatesIndexer{ - readPool: readPool, - writePool: writePool, - updateAggregatesJob: jobs.NewUpdateAggregatesJob(config, writePool, readPool), + logger: logger, + readPool: readPool, + writePool: writePool, + updateAggregatesJob: jobs.NewUpdateAggregatesJob(jobs.UpdateAggregatesJobConfig{ + WritePool: writePool, + ReadPool: readPool, + Logger: logger, + }), } } func (a *AggregatesIndexer) Start(ctx context.Context) error { - // try to run every 5 minutes. Job may take longer than 5 minutes to complete - // and result in every 10 minutes. - a.updateAggregatesJob.ScheduleEvery(ctx, 5*time.Minute) - go a.updateAggregatesJob.Run(ctx) - - // wait on context to be cancelled - <-ctx.Done() - return ctx.Err() + a.logger.Info("Starting aggregates indexer") + // This job runs in a continous loop until the context is cancelled. + for { + select { + case <-ctx.Done(): + a.logger.Info("Shutting down aggregates indexer") + return ctx.Err() + default: + a.updateAggregatesJob.Run(ctx) + } + } } func (a *AggregatesIndexer) Close() { diff --git a/indexer/indexer.go b/indexer/indexer.go index bc622739..b6155dd0 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -14,13 +14,15 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) type CoreIndexer struct { - pool dbv1.DbPool - Config config.Config - logger *zap.Logger - closeCh chan struct{} + aggregatesIndexer *AggregatesIndexer + pool dbv1.DbPool + Config config.Config + logger *zap.Logger + closeCh chan struct{} } const ( @@ -39,9 +41,12 @@ func NewIndexer(config config.Config) *CoreIndexer { panic(fmt.Errorf("error connecting to database: %w", err)) } + aggregatesIndexer := NewAggregatesIndexer(config) + ci := &CoreIndexer{ - pool: pool, - Config: config, + aggregatesIndexer: aggregatesIndexer, + pool: pool, + Config: config, logger: logging.NewZapLogger(config). Named("CoreIndexer"), } @@ -50,6 +55,17 @@ func NewIndexer(config config.Config) *CoreIndexer { } func (ci *CoreIndexer) Start(ctx context.Context) error { + eg := errgroup.Group{} + eg.Go(func() error { + return ci.aggregatesIndexer.Start(ctx) + }) + eg.Go(func() error { + return ci.run(ctx) + }) + return eg.Wait() +} + +func (ci *CoreIndexer) run(ctx context.Context) error { sdk := sdk.NewAudiusdSDK(ci.Config.AudiusdURL) go logging.SyncOnTicks(ctx, ci.logger, time.Second*10) @@ -157,6 +173,7 @@ func (ci *CoreIndexer) handleManageEntity(dbTx dbv1.DBTX, logger *zap.Logger, em } func (ci *CoreIndexer) Close() { + ci.aggregatesIndexer.Close() ci.pool.Close() ci.logger.Sync() } diff --git a/jobs/job_runner.go b/jobs/job_runner.go deleted file mode 100644 index dc611fec..00000000 --- a/jobs/job_runner.go +++ /dev/null @@ -1,82 +0,0 @@ -package jobs - -import ( - "context" - "fmt" - "sync" - "time" - - "api.audius.co/config" - "api.audius.co/logging" - "go.uber.org/zap" -) - -type JobRunner interface { - Execute(ctx context.Context, logger *zap.Logger) error -} - -type BaseJob struct { - logger *zap.Logger - runner JobRunner - - mutex sync.Mutex - isRunning bool -} - -type BaseJobConfig struct { - config config.Config - jobName string - runner JobRunner -} - -func NewBaseJob(cfg BaseJobConfig) *BaseJob { - logger := logging.NewZapLogger(cfg.config).Named(cfg.jobName) - return &BaseJob{ - logger: logger, - runner: cfg.runner, - } -} - -// Run executes the job once -func (j *BaseJob) Run(ctx context.Context) error { - if err := j.run(ctx); err != nil { - j.logger.Error("Job run failed", zap.Error(err)) - return err - } - j.logger.Info("Job completed successfully") - return nil -} - -func (j *BaseJob) run(ctx context.Context) error { - j.mutex.Lock() - if j.isRunning { - j.mutex.Unlock() - return fmt.Errorf("job is already running") - } - j.isRunning = true - j.mutex.Unlock() - defer func() { - j.mutex.Lock() - j.isRunning = false - j.mutex.Unlock() - }() - - return j.runner.Execute(ctx, j.logger) -} - -// ScheduleEvery runs the job every `duration` until the context is cancelled. -func (j *BaseJob) ScheduleEvery(ctx context.Context, duration time.Duration) *BaseJob { - go func() { - ticker := time.NewTicker(duration) - defer ticker.Stop() - for { - select { - case <-ticker.C: - j.Run(ctx) - case <-ctx.Done(): - return - } - } - }() - return j -} diff --git a/jobs/update_aggregates.go b/jobs/update_aggregates.go index 377fbc63..732f1cad 100644 --- a/jobs/update_aggregates.go +++ b/jobs/update_aggregates.go @@ -6,24 +6,46 @@ import ( "time" dbv1 "api.audius.co/api/dbv1" - "api.audius.co/config" "api.audius.co/database" "github.com/jackc/pgx/v5" "go.uber.org/zap" ) -type UpdateAggregatesJobRunner struct { +type UpdateAggregatesJob struct { writePool database.DbPool readPool *dbv1.DBPools + logger *zap.Logger } const ( UpdateUserScoresBatchSize = 3000 ) -func (r *UpdateAggregatesJobRunner) Execute(ctx context.Context, logger *zap.Logger) error { +type UpdateAggregatesJobConfig struct { + WritePool database.DbPool + ReadPool *dbv1.DBPools + Logger *zap.Logger +} + +func NewUpdateAggregatesJob(config UpdateAggregatesJobConfig) *UpdateAggregatesJob { + return &UpdateAggregatesJob{ + writePool: config.WritePool, + readPool: config.ReadPool, + logger: config.Logger, + } +} + +func (j *UpdateAggregatesJob) Run(ctx context.Context) error { + if err := j.updateScores(ctx); err != nil { + j.logger.Error("Job run failed", zap.Error(err)) + return err + } + return nil +} + +func (j *UpdateAggregatesJob) updateScores(ctx context.Context) error { startTime := time.Now() - logger.Info("Starting user score update job") + j.logger.Info("Starting user score update job") type UserScoreRecord struct { UserID int32 @@ -43,9 +65,9 @@ func (r *UpdateAggregatesJobRunner) Execute(ctx context.Context, logger *zap.Log } if lastUserID != nil && lastCreatedAt != nil { filters = append(filters, `((u.created_at, u.user_id) < (@cursorTime::timestamptz, @cursorUserId::int))`) - logger.Info("Processing batch", zap.String("lastCreatedAt", lastCreatedAt.Format(time.RFC3339)), zap.Int32("lastUserID", *lastUserID)) + j.logger.Info("Processing batch", zap.String("lastCreatedAt", lastCreatedAt.Format(time.RFC3339)), zap.Int32("lastUserID", *lastUserID)) } else { - logger.Info("Processing first batch") + j.logger.Info("Processing first batch") } query := ` @@ -156,13 +178,13 @@ func (r *UpdateAggregatesJobRunner) Execute(ctx context.Context, logger *zap.Log ` readQueryStart := time.Now() - res, err := r.readPool.Query(ctx, query, pgx.NamedArgs{ + res, err := j.readPool.Query(ctx, query, pgx.NamedArgs{ "batchSize": UpdateUserScoresBatchSize, "cursorTime": lastCreatedAt, "cursorUserId": lastUserID, }) if err != nil { - logger.Error("Failed to execute batch read query", zap.Error(err)) + j.logger.Error("Failed to execute batch read query", zap.Error(err)) return err } readQueryDuration := time.Since(readQueryStart) @@ -191,7 +213,7 @@ func (r *UpdateAggregatesJobRunner) Execute(ctx context.Context, logger *zap.Log lastUserID = &userID writeQueryStart := time.Now() - tag, err := r.writePool.Exec(ctx, ` + tag, err := j.writePool.Exec(ctx, ` WITH s AS ( SELECT * FROM unnest($1::bigint[], $2::double precision[]) AS t(user_id, score) ) @@ -205,13 +227,13 @@ func (r *UpdateAggregatesJobRunner) Execute(ctx context.Context, logger *zap.Log `, userIDs, scores) writeQueryDuration := time.Since(writeQueryStart) if err != nil { - logger.Error("Failed to execute update query", zap.Error(err)) + j.logger.Error("Failed to execute update query", zap.Error(err)) return err } processedCount += fetchedRows scoreUpdatedCount += tag.RowsAffected() - logger.Info("Processed batch", + j.logger.Info("Processed batch", zap.Int("batch_size", fetchedRows), zap.Int32("last_user_id", userID), zap.String("last_created_at", lastCreatedAt.Format(time.RFC3339)), @@ -221,32 +243,13 @@ func (r *UpdateAggregatesJobRunner) Execute(ctx context.Context, logger *zap.Log zap.Duration("write_query_duration", writeQueryDuration)) if fetchedRows < UpdateUserScoresBatchSize { - logger.Info("Finished processing all users", zap.Int("total_processed", processedCount), zap.Int64("total_score_changes", scoreUpdatedCount), zap.Duration("duration", time.Since(startTime))) + j.logger.Info("Finished processing all users", + zap.Int("total_processed", processedCount), + zap.Int64("total_score_changes", scoreUpdatedCount), + zap.Duration("duration", time.Since(startTime))) break } } return nil } - -type UpdateAggregatesJob struct { - *BaseJob -} - -func NewUpdateAggregatesJob(config config.Config, writePool database.DbPool, readPool *dbv1.DBPools) *UpdateAggregatesJob { - runner := &UpdateAggregatesJobRunner{writePool: writePool, readPool: readPool} - baseJob := NewBaseJob(BaseJobConfig{ - config: config, - jobName: "UpdateAggregatesJob", - runner: runner, - }) - - return &UpdateAggregatesJob{ - BaseJob: baseJob, - } -} - -func (j *UpdateAggregatesJob) ScheduleEvery(ctx context.Context, duration time.Duration) *UpdateAggregatesJob { - j.BaseJob.ScheduleEvery(ctx, duration) - return j -} diff --git a/main.go b/main.go index a35c8fc3..b8560362 100644 --- a/main.go +++ b/main.go @@ -37,24 +37,6 @@ func main() { as := api.NewApiServer(config.Cfg) as.Serve() } - case "aggregates-indexer": - { - fmt.Println("Running aggregates-indexer...") - - aggregatesIndexer := core_indexer.NewAggregatesIndexer(config.Cfg) - - defer aggregatesIndexer.Close() - - // Capture termination signals for graceful shutdown of the indexer - ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGINT, os.Interrupt) - defer stop() - - if err := aggregatesIndexer.Start(ctx); err != nil { - if !errors.Is(err, context.Canceled) { - panic(err) - } - } - } case "indexer": { fmt.Println("Running indexer...") From 33baae4565e6de9ddfffda92a7301eb89a66d9a1 Mon Sep 17 00:00:00 2001 From: Randy Schott <1815175+schottra@users.noreply.github.com> Date: Fri, 24 Oct 2025 12:12:01 -0400 Subject: [PATCH 04/10] add db index to help with scores calculations --- ddl/migrations/0175_add_plays_hourly_index.sql | 6 ++++++ sql/01_schema.sql | 11 +++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) create mode 100644 ddl/migrations/0175_add_plays_hourly_index.sql diff --git a/ddl/migrations/0175_add_plays_hourly_index.sql b/ddl/migrations/0175_add_plays_hourly_index.sql new file mode 100644 index 00000000..bf0d48a5 --- /dev/null +++ b/ddl/migrations/0175_add_plays_hourly_index.sql @@ -0,0 +1,6 @@ +begin; + +-- Create index for plays by user ID and hourly timestamp +CREATE INDEX IF NOT EXISTS ix_plays_user_hour ON plays (user_id, date_trunc('hour', created_at)) WHERE user_id IS NOT NULL; + +COMMIT; \ No newline at end of file diff --git a/sql/01_schema.sql b/sql/01_schema.sql index 8e0c69b7..b9b93197 100644 --- a/sql/01_schema.sql +++ b/sql/01_schema.sql @@ -3,8 +3,8 @@ -- --- Dumped from database version 17.6 (Debian 17.6-2.pgdg13+1) --- Dumped by pg_dump version 17.6 (Debian 17.6-2.pgdg13+1) +-- Dumped from database version 17.6 (Debian 17.6-1.pgdg13+1) +-- Dumped by pg_dump version 17.6 (Debian 17.6-1.pgdg13+1) SET statement_timeout = 0; SET lock_timeout = 0; @@ -10245,6 +10245,13 @@ CREATE INDEX ix_plays_slot ON public.plays USING btree (slot); CREATE INDEX ix_plays_sol_signature ON public.plays USING btree (signature); +-- +-- Name: ix_plays_user_hour; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX ix_plays_user_hour ON public.plays USING btree (user_id, date_trunc('hour'::text, created_at)) WHERE (user_id IS NOT NULL); + + -- -- Name: ix_plays_user_track_date; Type: INDEX; Schema: public; Owner: - -- From ce3d93f4918d8d8731dbf08b55941e627820b4f4 Mon Sep 17 00:00:00 2001 From: Randy Schott <1815175+schottra@users.noreply.github.com> Date: Fri, 24 Oct 2025 18:58:28 -0400 Subject: [PATCH 05/10] more optimizations --- Makefile | 3 - ddl/functions/handle_play.sql | 46 +++++- ddl/functions/handle_user_challenges.sql | 26 +++- .../0175_add_plays_hourly_index.sql | 6 - ddl/migrations/0175_add_user_score_tables.sql | 38 +++++ jobs/update_aggregates.go | 142 ++++++++---------- 6 files changed, 164 insertions(+), 97 deletions(-) delete mode 100644 ddl/migrations/0175_add_plays_hourly_index.sql create mode 100644 ddl/migrations/0175_add_user_score_tables.sql diff --git a/Makefile b/Makefile index 60e28965..96cb7847 100644 --- a/Makefile +++ b/Makefile @@ -9,9 +9,6 @@ pretty:: indexer:: wgo run -file .go -debounce 10ms main.go indexer -aggregates-indexer:: - wgo run -file .go -debounce 10ms main.go aggregates-indexer - solana-indexer:: wgo run -file .go -debounce 10ms main.go solana-indexer diff --git a/ddl/functions/handle_play.sql b/ddl/functions/handle_play.sql index c3f8bc8a..d5319f43 100644 --- a/ddl/functions/handle_play.sql +++ b/ddl/functions/handle_play.sql @@ -9,7 +9,7 @@ begin insert into aggregate_plays (play_item_id, count) values (new.play_item_id, 0) on conflict do nothing; update aggregate_plays - set count = count + 1 + set count = count + 1 where play_item_id = new.play_item_id returning count into new_listen_count; @@ -24,8 +24,8 @@ begin and timestamp = date_trunc('month', new.created_at) and country = coalesce(new.country, ''); - select new_listen_count - into milestone + select new_listen_count + into milestone where new_listen_count in (10,25,50,100,250,500,1000,2500,5000,10000,25000,50000,100000,250000,500000,1000000); if milestone is not null then @@ -51,6 +51,43 @@ begin on conflict do nothing; end if; end if; + + -- update listener's aggregates if applicable + if new.user_id is not null then + -- Insert or update user_distinct_play_hours + -- Only increment if the play's hour is newer than the user's last updated hour + insert into user_distinct_play_hours (user_id, hours_with_play, updated_at) + values (new.user_id, 1, date_trunc('hour', new.created_at)) + on conflict (user_id) do update set + hours_with_play = case + when date_trunc('hour', new.created_at) > date_trunc('hour', user_distinct_play_hours.updated_at) + then user_distinct_play_hours.hours_with_play + 1 + else user_distinct_play_hours.hours_with_play + end, + updated_at = case + when date_trunc('hour', new.created_at) > date_trunc('hour', user_distinct_play_hours.updated_at) + then new.created_at + else user_distinct_play_hours.updated_at + end; + + -- update user_distinct_play_tracks + -- Only increment if this is the first time this user has played this track + insert into user_distinct_play_tracks (user_id, track_count, updated_at) + values (new.user_id, 1, new.created_at) + on conflict (user_id) do update set + track_count = case + when not exists ( + select 1 from plays p + where p.user_id = new.user_id + and p.play_item_id = new.play_item_id + and p.id != new.id + ) + then user_distinct_play_tracks.track_count + 1 + else user_distinct_play_tracks.track_count + end, + updated_at = new.created_at; + end if; + return null; exception @@ -67,5 +104,4 @@ do $$ begin for each row execute procedure handle_play(); exception when others then null; -end $$; - +end $$; \ No newline at end of file diff --git a/ddl/functions/handle_user_challenges.sql b/ddl/functions/handle_user_challenges.sql index ed4e2490..a701c113 100644 --- a/ddl/functions/handle_user_challenges.sql +++ b/ddl/functions/handle_user_challenges.sql @@ -44,7 +44,7 @@ begin 'challenge_reward', 'challenge_reward:' || new.user_id || ':challenge:' || new.challenge_id || ':specifier:' || new.specifier, new.user_id, - case + case when new.challenge_id = 'e' then json_build_object( 'specifier', new.specifier, @@ -62,9 +62,9 @@ begin ) on conflict do nothing; else - -- transactional notifications cover this + -- transactional notifications cover this if (new.challenge_id != 'b' and new.challenge_id != 's') then - select id into existing_notification + select id into existing_notification from notification where type = 'reward_in_cooldown' and @@ -89,6 +89,26 @@ begin end if; end if; end if; + + -- update user fast challenge count + INSERT INTO user_score_features (user_id, challenge_count, updated_at) + SELECT + NEW.user_id, + COUNT(*)::int AS challenge_count, + now() + FROM user_challenges uc + JOIN users u + ON u.user_id = uc.user_id + WHERE uc.user_id = NEW.user_id + AND uc.is_complete + AND uc.challenge_id NOT IN ('m','b') + AND uc.completed_at <= (u.created_at + interval '3 minutes') + ON CONFLICT (user_id) DO UPDATE + SET challenge_count = EXCLUDED.challenge_count, + updated_at = EXCLUDED.updated_at + WHERE user_score_features.challenge_count IS DISTINCT FROM EXCLUDED.challenge_count; + + end if; return new; diff --git a/ddl/migrations/0175_add_plays_hourly_index.sql b/ddl/migrations/0175_add_plays_hourly_index.sql deleted file mode 100644 index bf0d48a5..00000000 --- a/ddl/migrations/0175_add_plays_hourly_index.sql +++ /dev/null @@ -1,6 +0,0 @@ -begin; - --- Create index for plays by user ID and hourly timestamp -CREATE INDEX IF NOT EXISTS ix_plays_user_hour ON plays (user_id, date_trunc('hour', created_at)) WHERE user_id IS NOT NULL; - -COMMIT; \ No newline at end of file diff --git a/ddl/migrations/0175_add_user_score_tables.sql b/ddl/migrations/0175_add_user_score_tables.sql new file mode 100644 index 00000000..ffe4f48e --- /dev/null +++ b/ddl/migrations/0175_add_user_score_tables.sql @@ -0,0 +1,38 @@ +begin; + +-- Create index for plays by user ID and hourly timestamp +CREATE INDEX IF NOT EXISTS ix_plays_user_hour ON plays (user_id, date_trunc('hour', created_at)) WHERE user_id IS NOT NULL; + + +CREATE TABLE IF NOT EXISTS user_distinct_play_hours ( + user_id integer PRIMARY KEY, + hours_with_play integer NOT NULL DEFAULT 0, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +COMMENT ON TABLE user_distinct_play_hours IS 'Tracks the number of distinct hours in which a user has listened to a track'; + +CREATE TABLE IF NOT EXISTS user_distinct_play_tracks ( + user_id integer PRIMARY KEY, + track_count integer NOT NULL DEFAULT 0, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +COMMENT ON TABLE user_distinct_play_tracks IS 'Tracks the number of distinct tracks a user has listened to'; + + +CREATE TABLE IF NOT EXISTS user_score_features ( + user_id integer PRIMARY KEY, + challenge_count integer default 0, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +COMMENT ON TABLE user_score_features IS 'Tracks some features used in user score calculation'; +COMMENT ON COLUMN user_score_features.challenge_count IS 'Tracks the number of fast challenges auser has completed'; + +-- Helps with karma calculation in user score queries +CREATE INDEX IF NOT EXISTS ix_au_user_follows + ON aggregate_user (user_id) + INCLUDE (follower_count, following_count); + +COMMIT; \ No newline at end of file diff --git a/jobs/update_aggregates.go b/jobs/update_aggregates.go index 732f1cad..093d235e 100644 --- a/jobs/update_aggregates.go +++ b/jobs/update_aggregates.go @@ -18,7 +18,7 @@ type UpdateAggregatesJob struct { } const ( - UpdateUserScoresBatchSize = 3000 + UpdateUserScoresBatchSize = 5000 ) type UpdateAggregatesJobConfig struct { @@ -71,94 +71,76 @@ func (j *UpdateAggregatesJob) updateScores(ctx context.Context) error { } query := ` - WITH batch AS MATERIALIZED ( - SELECT u.user_id, u.created_at - FROM users u - WHERE ` + strings.Join(filters, " AND ") + ` - ORDER BY u.created_at DESC, u.user_id DESC - LIMIT @batchSize + WITH batch AS ( + SELECT u.user_id, u.created_at + FROM users u + WHERE ` + strings.Join(filters, " AND ") + ` + ORDER BY u.created_at DESC, u.user_id DESC + LIMIT @batchSize ), - ids AS MATERIALIZED ( - SELECT array_agg(user_id) AS ids FROM batch - ), - - /* plays: split into two small per-user aggregates */ - hours_agg AS ( - SELECT h.user_id, COUNT(*)::bigint AS play_hours - FROM ( - SELECT DISTINCT p.user_id, date_trunc('hour', p.created_at) AS hr - FROM plays p, ids - WHERE p.user_id = ANY(ids.ids) - ) h - GROUP BY h.user_id - ), - tracks_agg AS ( - SELECT p.user_id, COUNT(DISTINCT p.play_item_id)::bigint AS distinct_tracks - FROM plays p, ids - WHERE p.user_id = ANY(ids.ids) - GROUP BY p.user_id - ), - fast_challenge_completion AS ( - SELECT b.user_id, COUNT(*)::bigint AS challenge_count - FROM batch b - JOIN user_challenges uc - ON uc.user_id = b.user_id - AND uc.is_complete - AND uc.challenge_id NOT IN ('m','b') - AND uc.completed_at <= (b.created_at + interval '3 minutes') - GROUP BY b.user_id + SELECT b.user_id, COUNT(*)::bigint AS challenge_count + FROM batch b + JOIN user_challenges uc + ON uc.user_id = b.user_id + AND uc.is_complete + AND uc.challenge_id NOT IN ('m','b') + AND uc.completed_at <= (b.created_at + interval '3 minutes') + GROUP BY b.user_id ), chat_blocks AS ( - SELECT b.user_id, COUNT(*)::bigint AS block_count - FROM batch b - JOIN chat_blocked_users c ON c.blockee_user_id = b.user_id - GROUP BY b.user_id + SELECT b.user_id, COUNT(*)::bigint AS block_count + FROM batch b + JOIN chat_blocked_users c ON c.blockee_user_id = b.user_id + GROUP BY b.user_id ), followers_karma AS ( - SELECT b.user_id, - LEAST((SUM(fau.follower_count) / 100)::bigint, 100::bigint) AS karma_sum - FROM batch b - JOIN follows f - ON f.followee_user_id = b.user_id - AND f.is_delete = FALSE - JOIN aggregate_user fau - ON fau.user_id = f.follower_user_id - AND fau.following_count < 10000 - GROUP BY b.user_id + SELECT b.user_id, + LEAST((SUM(fau.follower_count) / 100)::bigint, 100::bigint) AS karma_sum + FROM batch b + JOIN follows f + ON f.followee_user_id = b.user_id + AND f.is_delete = FALSE + JOIN aggregate_user fau + ON fau.user_id = f.follower_user_id + AND fau.following_count < 10000 + GROUP BY b.user_id ), - - /* compute features for scoring */ features AS ( - SELECT - u.user_id, - b.created_at, - COALESCE(h.play_hours, 0)::bigint AS play_count, -- distinct hours - COALESCE(t.distinct_tracks, 0)::bigint AS distinct_tracks_played, -- distinct tracks - COALESCE(c.challenge_count, 0)::bigint AS challenge_count, - COALESCE(au.following_count, 0)::bigint AS following_count, - COALESCE(au.follower_count, 0)::bigint AS follower_count, - COALESCE(cb.block_count, 0)::bigint AS chat_block_count, - ( ((u.handle_lc ILIKE '%audius%') OR (lower(u.name) ILIKE '%audius%')) - AND u.is_verified = FALSE ) AS is_audius_impersonator, - ( u.is_verified = FALSE - AND (u.handle_lc ILIKE '%airdrop%' OR lower(u.name) LIKE '%airdrop%') ) - AS has_badwords, - CASE - WHEN COALESCE(au.follower_count, 0) > 1000 THEN 100 - WHEN COALESCE(au.follower_count, 0) = 0 THEN 0 - ELSE COALESCE(k.karma_sum, 0) - END::bigint AS karma - FROM batch b - JOIN users u ON u.user_id = b.user_id - LEFT JOIN hours_agg h ON h.user_id = b.user_id - LEFT JOIN tracks_agg t ON t.user_id = b.user_id - LEFT JOIN fast_challenge_completion c ON c.user_id = b.user_id - LEFT JOIN chat_blocks cb ON cb.user_id = b.user_id - LEFT JOIN aggregate_user au ON au.user_id = b.user_id - LEFT JOIN followers_karma k ON k.user_id = b.user_id + SELECT + u.user_id, + b.created_at, + COALESCE(( + SELECT udph.hours_with_play + FROM user_distinct_play_hours udph + WHERE udph.user_id = b.user_id + ), 0) AS play_count, + COALESCE(( + SELECT t.track_count + FROM user_distinct_play_tracks t + WHERE t.user_id = b.user_id + ),0)::bigint AS distinct_tracks_played, + COALESCE(usf.challenge_count, 0)::bigint AS challenge_count, + COALESCE(au.following_count, 0)::bigint AS following_count, + COALESCE(au.follower_count, 0)::bigint AS follower_count, + COALESCE(cb.block_count, 0)::bigint AS chat_block_count, + ( ((u.handle_lc ILIKE '%audius%') OR (lower(u.name) ILIKE '%audius%')) + AND u.is_verified = FALSE ) AS is_audius_impersonator, + ( u.is_verified = FALSE + AND (u.handle_lc ILIKE '%airdrop%' OR lower(u.name) LIKE '%airdrop%') ) + AS has_badwords, + CASE + WHEN COALESCE(au.follower_count, 0) > 1000 THEN 100 + WHEN COALESCE(au.follower_count, 0) = 0 THEN 0 + ELSE COALESCE(k.karma_sum, 0) + END::bigint AS karma + FROM batch b + JOIN users u ON u.user_id = b.user_id + LEFT JOIN user_score_features usf ON usf.user_id = b.user_id + LEFT JOIN chat_blocks cb ON cb.user_id = b.user_id + LEFT JOIN aggregate_user au ON au.user_id = b.user_id + LEFT JOIN followers_karma k ON k.user_id = b.user_id ) - SELECT f.user_id, f.created_at, From 021b74dddc24c78bfac7d12111f8dcece1e3c94a Mon Sep 17 00:00:00 2001 From: Randy Schott <1815175+schottra@users.noreply.github.com> Date: Fri, 24 Oct 2025 19:10:12 -0400 Subject: [PATCH 06/10] comments on indexes --- ddl/migrations/0175_add_user_score_tables.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/migrations/0175_add_user_score_tables.sql b/ddl/migrations/0175_add_user_score_tables.sql index ffe4f48e..5c6bb155 100644 --- a/ddl/migrations/0175_add_user_score_tables.sql +++ b/ddl/migrations/0175_add_user_score_tables.sql @@ -1,7 +1,7 @@ begin; --- Create index for plays by user ID and hourly timestamp CREATE INDEX IF NOT EXISTS ix_plays_user_hour ON plays (user_id, date_trunc('hour', created_at)) WHERE user_id IS NOT NULL; +COMMENT ON INDEX ix_plays_user_hour IS 'Helps compute distinct hourly plays by user id'; CREATE TABLE IF NOT EXISTS user_distinct_play_hours ( @@ -30,9 +30,9 @@ CREATE TABLE IF NOT EXISTS user_score_features ( COMMENT ON TABLE user_score_features IS 'Tracks some features used in user score calculation'; COMMENT ON COLUMN user_score_features.challenge_count IS 'Tracks the number of fast challenges auser has completed'; --- Helps with karma calculation in user score queries CREATE INDEX IF NOT EXISTS ix_au_user_follows ON aggregate_user (user_id) INCLUDE (follower_count, following_count); +COMMENT ON INDEX ix_au_user_follows IS 'Fast lookup for fields use in karma calculation'; COMMIT; \ No newline at end of file From abc15bb7fa96e5befd17e38368a9b98fd6757283 Mon Sep 17 00:00:00 2001 From: Randy Schott <1815175+schottra@users.noreply.github.com> Date: Fri, 24 Oct 2025 19:11:49 -0400 Subject: [PATCH 07/10] update schema --- sql/01_schema.sql | 175 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 169 insertions(+), 6 deletions(-) diff --git a/sql/01_schema.sql b/sql/01_schema.sql index b9b93197..b02f2067 100644 --- a/sql/01_schema.sql +++ b/sql/01_schema.sql @@ -2498,7 +2498,7 @@ begin 'challenge_reward', 'challenge_reward:' || new.user_id || ':challenge:' || new.challenge_id || ':specifier:' || new.specifier, new.user_id, - case + case when new.challenge_id = 'e' then json_build_object( 'specifier', new.specifier, @@ -2516,9 +2516,9 @@ begin ) on conflict do nothing; else - -- transactional notifications cover this + -- transactional notifications cover this if (new.challenge_id != 'b' and new.challenge_id != 's') then - select id into existing_notification + select id into existing_notification from notification where type = 'reward_in_cooldown' and @@ -2543,6 +2543,26 @@ begin end if; end if; end if; + + -- update user fast challenge count + INSERT INTO user_score_features (user_id, challenge_count, updated_at) + SELECT + NEW.user_id, + COUNT(*)::int AS challenge_count, + now() + FROM user_challenges uc + JOIN users u + ON u.user_id = uc.user_id + WHERE uc.user_id = NEW.user_id + AND uc.is_complete + AND uc.challenge_id NOT IN ('m','b') + AND uc.completed_at <= (u.created_at + interval '3 minutes') + ON CONFLICT (user_id) DO UPDATE + SET challenge_count = EXCLUDED.challenge_count, + updated_at = EXCLUDED.updated_at + WHERE user_score_features.challenge_count IS DISTINCT FROM EXCLUDED.challenge_count; + + end if; return new; @@ -2568,7 +2588,7 @@ begin insert into aggregate_plays (play_item_id, count) values (new.play_item_id, 0) on conflict do nothing; update aggregate_plays - set count = count + 1 + set count = count + 1 where play_item_id = new.play_item_id returning count into new_listen_count; @@ -2583,8 +2603,8 @@ begin and timestamp = date_trunc('month', new.created_at) and country = coalesce(new.country, ''); - select new_listen_count - into milestone + select new_listen_count + into milestone where new_listen_count in (10,25,50,100,250,500,1000,2500,5000,10000,25000,50000,100000,250000,500000,1000000); if milestone is not null then @@ -2610,6 +2630,43 @@ begin on conflict do nothing; end if; end if; + + -- update listener's aggregates if applicable + if new.user_id is not null then + -- Insert or update user_distinct_play_hours + -- Only increment if the play's hour is newer than the user's last updated hour + insert into user_distinct_play_hours (user_id, hours_with_play, updated_at) + values (new.user_id, 1, date_trunc('hour', new.created_at)) + on conflict (user_id) do update set + hours_with_play = case + when date_trunc('hour', new.created_at) > date_trunc('hour', user_distinct_play_hours.updated_at) + then user_distinct_play_hours.hours_with_play + 1 + else user_distinct_play_hours.hours_with_play + end, + updated_at = case + when date_trunc('hour', new.created_at) > date_trunc('hour', user_distinct_play_hours.updated_at) + then new.created_at + else user_distinct_play_hours.updated_at + end; + + -- update user_distinct_play_tracks + -- Only increment if this is the first time this user has played this track + insert into user_distinct_play_tracks (user_id, track_count, updated_at) + values (new.user_id, 1, new.created_at) + on conflict (user_id) do update set + track_count = case + when not exists ( + select 1 from plays p + where p.user_id = new.user_id + and p.play_item_id = new.play_item_id + and p.id != new.id + ) + then user_distinct_play_tracks.track_count + 1 + else user_distinct_play_tracks.track_count + end, + updated_at = new.created_at; + end if; + return null; exception @@ -8338,6 +8395,42 @@ CREATE TABLE public.user_delist_statuses ( ); +-- +-- Name: user_distinct_play_hours; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.user_distinct_play_hours ( + user_id integer NOT NULL, + hours_with_play integer DEFAULT 0 NOT NULL, + updated_at timestamp with time zone DEFAULT now() NOT NULL +); + + +-- +-- Name: TABLE user_distinct_play_hours; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON TABLE public.user_distinct_play_hours IS 'Tracks the number of distinct hours in which a user has listened to a track'; + + +-- +-- Name: user_distinct_play_tracks; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.user_distinct_play_tracks ( + user_id integer NOT NULL, + track_count integer DEFAULT 0 NOT NULL, + updated_at timestamp with time zone DEFAULT now() NOT NULL +); + + +-- +-- Name: TABLE user_distinct_play_tracks; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON TABLE public.user_distinct_play_tracks IS 'Tracks the number of distinct tracks a user has listened to'; + + -- -- Name: user_events; Type: TABLE; Schema: public; Owner: - -- @@ -8427,6 +8520,31 @@ CREATE TABLE public.user_pubkeys ( ); +-- +-- Name: user_score_features; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.user_score_features ( + user_id integer NOT NULL, + challenge_count integer DEFAULT 0, + updated_at timestamp with time zone DEFAULT now() NOT NULL +); + + +-- +-- Name: TABLE user_score_features; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON TABLE public.user_score_features IS 'Tracks some features used in user score calculation'; + + +-- +-- Name: COLUMN user_score_features.challenge_count; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.user_score_features.challenge_count IS 'Tracks the number of fast challenges auser has completed'; + + -- -- Name: user_tips; Type: TABLE; Schema: public; Owner: - -- @@ -9686,6 +9804,22 @@ ALTER TABLE ONLY public.user_delist_statuses ADD CONSTRAINT user_delist_statuses_pkey PRIMARY KEY (created_at, user_id, delisted); +-- +-- Name: user_distinct_play_hours user_distinct_play_hours_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.user_distinct_play_hours + ADD CONSTRAINT user_distinct_play_hours_pkey PRIMARY KEY (user_id); + + +-- +-- Name: user_distinct_play_tracks user_distinct_play_tracks_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.user_distinct_play_tracks + ADD CONSTRAINT user_distinct_play_tracks_pkey PRIMARY KEY (user_id); + + -- -- Name: user_events user_events_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -9718,6 +9852,14 @@ ALTER TABLE ONLY public.user_pubkeys ADD CONSTRAINT user_pubkeys_pkey PRIMARY KEY (user_id); +-- +-- Name: user_score_features user_score_features_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.user_score_features + ADD CONSTRAINT user_score_features_pkey PRIMARY KEY (user_id); + + -- -- Name: user_tips user_tips_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -10182,6 +10324,20 @@ CREATE INDEX ix_associated_wallets_user_id ON public.associated_wallets USING bt CREATE INDEX ix_associated_wallets_wallet ON public.associated_wallets USING btree (wallet); +-- +-- Name: ix_au_user_follows; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX ix_au_user_follows ON public.aggregate_user USING btree (user_id) INCLUDE (follower_count, following_count); + + +-- +-- Name: INDEX ix_au_user_follows; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON INDEX public.ix_au_user_follows IS 'Fast lookup for fields use in karma calculation'; + + -- -- Name: ix_audio_transactions_history_slot; Type: INDEX; Schema: public; Owner: - -- @@ -10252,6 +10408,13 @@ CREATE INDEX ix_plays_sol_signature ON public.plays USING btree (signature); CREATE INDEX ix_plays_user_hour ON public.plays USING btree (user_id, date_trunc('hour'::text, created_at)) WHERE (user_id IS NOT NULL); +-- +-- Name: INDEX ix_plays_user_hour; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON INDEX public.ix_plays_user_hour IS 'Helps compute distinct hourly plays by user id'; + + -- -- Name: ix_plays_user_track_date; Type: INDEX; Schema: public; Owner: - -- From 9915392f28ef544b72c0315a99a004b5b1337fde Mon Sep 17 00:00:00 2001 From: Randy Schott <1815175+schottra@users.noreply.github.com> Date: Fri, 24 Oct 2025 19:14:40 -0400 Subject: [PATCH 08/10] rename --- ...es_indexer.go => aggregates_calculator.go} | 16 +++++++------- indexer/indexer.go | 22 +++++++++---------- 2 files changed, 19 insertions(+), 19 deletions(-) rename indexer/{aggregates_indexer.go => aggregates_calculator.go} (71%) diff --git a/indexer/aggregates_indexer.go b/indexer/aggregates_calculator.go similarity index 71% rename from indexer/aggregates_indexer.go rename to indexer/aggregates_calculator.go index 7ca03e18..a0044824 100644 --- a/indexer/aggregates_indexer.go +++ b/indexer/aggregates_calculator.go @@ -12,15 +12,15 @@ import ( "go.uber.org/zap" ) -type AggregatesIndexer struct { +type AggregatesCalculator struct { logger *zap.Logger readPool *dbv1.DBPools writePool database.DbPool updateAggregatesJob *jobs.UpdateAggregatesJob } -func NewAggregatesIndexer(config config.Config) *AggregatesIndexer { - logger := logging.NewZapLogger(config).Named("AggregatesIndexer") +func NewAggregatesCalculator(config config.Config) *AggregatesCalculator { + logger := logging.NewZapLogger(config).Named("AggregatesCalculator") readPool, err := dbv1.NewDBPools([]string{config.ReadDbUrl}, logger, config.Env, config.ZapLevel) if err != nil { panic(err) @@ -30,7 +30,7 @@ func NewAggregatesIndexer(config config.Config) *AggregatesIndexer { panic(err) } - return &AggregatesIndexer{ + return &AggregatesCalculator{ logger: logger, readPool: readPool, writePool: writePool, @@ -42,13 +42,13 @@ func NewAggregatesIndexer(config config.Config) *AggregatesIndexer { } } -func (a *AggregatesIndexer) Start(ctx context.Context) error { - a.logger.Info("Starting aggregates indexer") +func (a *AggregatesCalculator) Start(ctx context.Context) error { + a.logger.Info("Starting aggregates calculator") // This job runs in a continous loop until the context is cancelled. for { select { case <-ctx.Done(): - a.logger.Info("Shutting down aggregates indexer") + a.logger.Info("Shutting down aggregates calculator") return ctx.Err() default: a.updateAggregatesJob.Run(ctx) @@ -56,7 +56,7 @@ func (a *AggregatesIndexer) Start(ctx context.Context) error { } } -func (a *AggregatesIndexer) Close() { +func (a *AggregatesCalculator) Close() { a.readPool.Close() a.writePool.Close() } diff --git a/indexer/indexer.go b/indexer/indexer.go index b6155dd0..9cd504ba 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -18,11 +18,11 @@ import ( ) type CoreIndexer struct { - aggregatesIndexer *AggregatesIndexer - pool dbv1.DbPool - Config config.Config - logger *zap.Logger - closeCh chan struct{} + aggregatesCalculator *AggregatesCalculator + pool dbv1.DbPool + Config config.Config + logger *zap.Logger + closeCh chan struct{} } const ( @@ -41,12 +41,12 @@ func NewIndexer(config config.Config) *CoreIndexer { panic(fmt.Errorf("error connecting to database: %w", err)) } - aggregatesIndexer := NewAggregatesIndexer(config) + aggregatesCalculator := NewAggregatesCalculator(config) ci := &CoreIndexer{ - aggregatesIndexer: aggregatesIndexer, - pool: pool, - Config: config, + aggregatesCalculator: aggregatesCalculator, + pool: pool, + Config: config, logger: logging.NewZapLogger(config). Named("CoreIndexer"), } @@ -57,7 +57,7 @@ func NewIndexer(config config.Config) *CoreIndexer { func (ci *CoreIndexer) Start(ctx context.Context) error { eg := errgroup.Group{} eg.Go(func() error { - return ci.aggregatesIndexer.Start(ctx) + return ci.aggregatesCalculator.Start(ctx) }) eg.Go(func() error { return ci.run(ctx) @@ -173,7 +173,7 @@ func (ci *CoreIndexer) handleManageEntity(dbTx dbv1.DBTX, logger *zap.Logger, em } func (ci *CoreIndexer) Close() { - ci.aggregatesIndexer.Close() + ci.aggregatesCalculator.Close() ci.pool.Close() ci.logger.Sync() } From 7097c6237b541146b5e9d07085e7359b00e4af14 Mon Sep 17 00:00:00 2001 From: Randy Schott <1815175+schottra@users.noreply.github.com> Date: Mon, 27 Oct 2025 10:29:17 -0400 Subject: [PATCH 09/10] update models --- api/dbv1/models.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/api/dbv1/models.go b/api/dbv1/models.go index 8a243b3c..e098a11a 100644 --- a/api/dbv1/models.go +++ b/api/dbv1/models.go @@ -2300,6 +2300,20 @@ type UserDelistStatus struct { Reason DelistUserReason `json:"reason"` } +// Tracks the number of distinct hours in which a user has listened to a track +type UserDistinctPlayHour struct { + UserID int32 `json:"user_id"` + HoursWithPlay int32 `json:"hours_with_play"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` +} + +// Tracks the number of distinct tracks a user has listened to +type UserDistinctPlayTrack struct { + UserID int32 `json:"user_id"` + TrackCount int32 `json:"track_count"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` +} + type UserEvent struct { ID int32 `json:"id"` Blockhash pgtype.Text `json:"blockhash"` @@ -2329,6 +2343,14 @@ type UserPubkey struct { PubkeyBase64 string `json:"pubkey_base64"` } +// Tracks some features used in user score calculation +type UserScoreFeature struct { + UserID int32 `json:"user_id"` + // Tracks the number of fast challenges auser has completed + ChallengeCount pgtype.Int4 `json:"challenge_count"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` +} + type UserTip struct { Slot int32 `json:"slot"` Signature string `json:"signature"` From 3de5f79501ec484676d43462c6c0504edaff35f7 Mon Sep 17 00:00:00 2001 From: Randy Schott <1815175+schottra@users.noreply.github.com> Date: Mon, 27 Oct 2025 10:52:40 -0400 Subject: [PATCH 10/10] less logs, recover from panic --- jobs/update_aggregates.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/jobs/update_aggregates.go b/jobs/update_aggregates.go index 093d235e..5f09f599 100644 --- a/jobs/update_aggregates.go +++ b/jobs/update_aggregates.go @@ -36,6 +36,12 @@ func NewUpdateAggregatesJob(config UpdateAggregatesJobConfig) *UpdateAggregatesJ } func (j *UpdateAggregatesJob) Run(ctx context.Context) error { + defer func() { + if r := recover(); r != nil { + j.logger.Error("Job run panicked", zap.Any("panic", r)) + } + }() + if err := j.updateScores(ctx); err != nil { j.logger.Error("Job run failed", zap.Error(err)) return err @@ -65,9 +71,9 @@ func (j *UpdateAggregatesJob) updateScores(ctx context.Context) error { } if lastUserID != nil && lastCreatedAt != nil { filters = append(filters, `((u.created_at, u.user_id) < (@cursorTime::timestamptz, @cursorUserId::int))`) - j.logger.Info("Processing batch", zap.String("lastCreatedAt", lastCreatedAt.Format(time.RFC3339)), zap.Int32("lastUserID", *lastUserID)) + j.logger.Debug("Processing batch", zap.String("lastCreatedAt", lastCreatedAt.Format(time.RFC3339)), zap.Int32("lastUserID", *lastUserID)) } else { - j.logger.Info("Processing first batch") + j.logger.Debug("Processing first batch") } query := ` @@ -215,7 +221,7 @@ func (j *UpdateAggregatesJob) updateScores(ctx context.Context) error { processedCount += fetchedRows scoreUpdatedCount += tag.RowsAffected() - j.logger.Info("Processed batch", + j.logger.Debug("Processed batch", zap.Int("batch_size", fetchedRows), zap.Int32("last_user_id", userID), zap.String("last_created_at", lastCreatedAt.Format(time.RFC3339)),