diff --git a/api/dbv1/models.go b/api/dbv1/models.go index 8a243b3c..e098a11a 100644 --- a/api/dbv1/models.go +++ b/api/dbv1/models.go @@ -2300,6 +2300,20 @@ type UserDelistStatus struct { Reason DelistUserReason `json:"reason"` } +// Tracks the number of distinct hours in which a user has listened to a track +type UserDistinctPlayHour struct { + UserID int32 `json:"user_id"` + HoursWithPlay int32 `json:"hours_with_play"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` +} + +// Tracks the number of distinct tracks a user has listened to +type UserDistinctPlayTrack struct { + UserID int32 `json:"user_id"` + TrackCount int32 `json:"track_count"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` +} + type UserEvent struct { ID int32 `json:"id"` Blockhash pgtype.Text `json:"blockhash"` @@ -2329,6 +2343,14 @@ type UserPubkey struct { PubkeyBase64 string `json:"pubkey_base64"` } +// Tracks some features used in user score calculation +type UserScoreFeature struct { + UserID int32 `json:"user_id"` + // Tracks the number of fast challenges auser has completed + ChallengeCount pgtype.Int4 `json:"challenge_count"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` +} + type UserTip struct { Slot int32 `json:"slot"` Signature string `json:"signature"` diff --git a/ddl/functions/handle_play.sql b/ddl/functions/handle_play.sql index c3f8bc8a..d5319f43 100644 --- a/ddl/functions/handle_play.sql +++ b/ddl/functions/handle_play.sql @@ -9,7 +9,7 @@ begin insert into aggregate_plays (play_item_id, count) values (new.play_item_id, 0) on conflict do nothing; update aggregate_plays - set count = count + 1 + set count = count + 1 where play_item_id = new.play_item_id returning count into new_listen_count; @@ -24,8 +24,8 @@ begin and timestamp = date_trunc('month', new.created_at) and country = coalesce(new.country, ''); - select new_listen_count - into milestone + select new_listen_count + into milestone where new_listen_count in (10,25,50,100,250,500,1000,2500,5000,10000,25000,50000,100000,250000,500000,1000000); if milestone is not null then @@ -51,6 +51,43 @@ begin on conflict do nothing; end if; end if; + + -- update listener's aggregates if applicable + if new.user_id is not null then + -- Insert or update user_distinct_play_hours + -- Only increment if the play's hour is newer than the user's last updated hour + insert into user_distinct_play_hours (user_id, hours_with_play, updated_at) + values (new.user_id, 1, date_trunc('hour', new.created_at)) + on conflict (user_id) do update set + hours_with_play = case + when date_trunc('hour', new.created_at) > date_trunc('hour', user_distinct_play_hours.updated_at) + then user_distinct_play_hours.hours_with_play + 1 + else user_distinct_play_hours.hours_with_play + end, + updated_at = case + when date_trunc('hour', new.created_at) > date_trunc('hour', user_distinct_play_hours.updated_at) + then new.created_at + else user_distinct_play_hours.updated_at + end; + + -- update user_distinct_play_tracks + -- Only increment if this is the first time this user has played this track + insert into user_distinct_play_tracks (user_id, track_count, updated_at) + values (new.user_id, 1, new.created_at) + on conflict (user_id) do update set + track_count = case + when not exists ( + select 1 from plays p + where p.user_id = new.user_id + and p.play_item_id = new.play_item_id + and p.id != new.id + ) + then user_distinct_play_tracks.track_count + 1 + else user_distinct_play_tracks.track_count + end, + updated_at = new.created_at; + end if; + return null; exception @@ -67,5 +104,4 @@ do $$ begin for each row execute procedure handle_play(); exception when others then null; -end $$; - +end $$; \ No newline at end of file diff --git a/ddl/functions/handle_user_challenges.sql b/ddl/functions/handle_user_challenges.sql index ed4e2490..a701c113 100644 --- a/ddl/functions/handle_user_challenges.sql +++ b/ddl/functions/handle_user_challenges.sql @@ -44,7 +44,7 @@ begin 'challenge_reward', 'challenge_reward:' || new.user_id || ':challenge:' || new.challenge_id || ':specifier:' || new.specifier, new.user_id, - case + case when new.challenge_id = 'e' then json_build_object( 'specifier', new.specifier, @@ -62,9 +62,9 @@ begin ) on conflict do nothing; else - -- transactional notifications cover this + -- transactional notifications cover this if (new.challenge_id != 'b' and new.challenge_id != 's') then - select id into existing_notification + select id into existing_notification from notification where type = 'reward_in_cooldown' and @@ -89,6 +89,26 @@ begin end if; end if; end if; + + -- update user fast challenge count + INSERT INTO user_score_features (user_id, challenge_count, updated_at) + SELECT + NEW.user_id, + COUNT(*)::int AS challenge_count, + now() + FROM user_challenges uc + JOIN users u + ON u.user_id = uc.user_id + WHERE uc.user_id = NEW.user_id + AND uc.is_complete + AND uc.challenge_id NOT IN ('m','b') + AND uc.completed_at <= (u.created_at + interval '3 minutes') + ON CONFLICT (user_id) DO UPDATE + SET challenge_count = EXCLUDED.challenge_count, + updated_at = EXCLUDED.updated_at + WHERE user_score_features.challenge_count IS DISTINCT FROM EXCLUDED.challenge_count; + + end if; return new; diff --git a/ddl/migrations/0175_add_user_score_tables.sql b/ddl/migrations/0175_add_user_score_tables.sql new file mode 100644 index 00000000..5c6bb155 --- /dev/null +++ b/ddl/migrations/0175_add_user_score_tables.sql @@ -0,0 +1,38 @@ +begin; + +CREATE INDEX IF NOT EXISTS ix_plays_user_hour ON plays (user_id, date_trunc('hour', created_at)) WHERE user_id IS NOT NULL; +COMMENT ON INDEX ix_plays_user_hour IS 'Helps compute distinct hourly plays by user id'; + + +CREATE TABLE IF NOT EXISTS user_distinct_play_hours ( + user_id integer PRIMARY KEY, + hours_with_play integer NOT NULL DEFAULT 0, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +COMMENT ON TABLE user_distinct_play_hours IS 'Tracks the number of distinct hours in which a user has listened to a track'; + +CREATE TABLE IF NOT EXISTS user_distinct_play_tracks ( + user_id integer PRIMARY KEY, + track_count integer NOT NULL DEFAULT 0, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +COMMENT ON TABLE user_distinct_play_tracks IS 'Tracks the number of distinct tracks a user has listened to'; + + +CREATE TABLE IF NOT EXISTS user_score_features ( + user_id integer PRIMARY KEY, + challenge_count integer default 0, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +COMMENT ON TABLE user_score_features IS 'Tracks some features used in user score calculation'; +COMMENT ON COLUMN user_score_features.challenge_count IS 'Tracks the number of fast challenges auser has completed'; + +CREATE INDEX IF NOT EXISTS ix_au_user_follows + ON aggregate_user (user_id) + INCLUDE (follower_count, following_count); +COMMENT ON INDEX ix_au_user_follows IS 'Fast lookup for fields use in karma calculation'; + +COMMIT; \ No newline at end of file diff --git a/indexer/aggregates_calculator.go b/indexer/aggregates_calculator.go new file mode 100644 index 00000000..a0044824 --- /dev/null +++ b/indexer/aggregates_calculator.go @@ -0,0 +1,62 @@ +package indexer + +import ( + "context" + + dbv1 "api.audius.co/api/dbv1" + "api.audius.co/config" + "api.audius.co/database" + "api.audius.co/jobs" + "api.audius.co/logging" + "github.com/jackc/pgx/v5/pgxpool" + "go.uber.org/zap" +) + +type AggregatesCalculator struct { + logger *zap.Logger + readPool *dbv1.DBPools + writePool database.DbPool + updateAggregatesJob *jobs.UpdateAggregatesJob +} + +func NewAggregatesCalculator(config config.Config) *AggregatesCalculator { + logger := logging.NewZapLogger(config).Named("AggregatesCalculator") + readPool, err := dbv1.NewDBPools([]string{config.ReadDbUrl}, logger, config.Env, config.ZapLevel) + if err != nil { + panic(err) + } + writePool, err := pgxpool.New(context.Background(), config.WriteDbUrl) + if err != nil { + panic(err) + } + + return &AggregatesCalculator{ + logger: logger, + readPool: readPool, + writePool: writePool, + updateAggregatesJob: jobs.NewUpdateAggregatesJob(jobs.UpdateAggregatesJobConfig{ + WritePool: writePool, + ReadPool: readPool, + Logger: logger, + }), + } +} + +func (a *AggregatesCalculator) Start(ctx context.Context) error { + a.logger.Info("Starting aggregates calculator") + // This job runs in a continous loop until the context is cancelled. + for { + select { + case <-ctx.Done(): + a.logger.Info("Shutting down aggregates calculator") + return ctx.Err() + default: + a.updateAggregatesJob.Run(ctx) + } + } +} + +func (a *AggregatesCalculator) Close() { + a.readPool.Close() + a.writePool.Close() +} diff --git a/indexer/indexer.go b/indexer/indexer.go index bc622739..9cd504ba 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -14,13 +14,15 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) type CoreIndexer struct { - pool dbv1.DbPool - Config config.Config - logger *zap.Logger - closeCh chan struct{} + aggregatesCalculator *AggregatesCalculator + pool dbv1.DbPool + Config config.Config + logger *zap.Logger + closeCh chan struct{} } const ( @@ -39,9 +41,12 @@ func NewIndexer(config config.Config) *CoreIndexer { panic(fmt.Errorf("error connecting to database: %w", err)) } + aggregatesCalculator := NewAggregatesCalculator(config) + ci := &CoreIndexer{ - pool: pool, - Config: config, + aggregatesCalculator: aggregatesCalculator, + pool: pool, + Config: config, logger: logging.NewZapLogger(config). Named("CoreIndexer"), } @@ -50,6 +55,17 @@ func NewIndexer(config config.Config) *CoreIndexer { } func (ci *CoreIndexer) Start(ctx context.Context) error { + eg := errgroup.Group{} + eg.Go(func() error { + return ci.aggregatesCalculator.Start(ctx) + }) + eg.Go(func() error { + return ci.run(ctx) + }) + return eg.Wait() +} + +func (ci *CoreIndexer) run(ctx context.Context) error { sdk := sdk.NewAudiusdSDK(ci.Config.AudiusdURL) go logging.SyncOnTicks(ctx, ci.logger, time.Second*10) @@ -157,6 +173,7 @@ func (ci *CoreIndexer) handleManageEntity(dbTx dbv1.DBTX, logger *zap.Logger, em } func (ci *CoreIndexer) Close() { + ci.aggregatesCalculator.Close() ci.pool.Close() ci.logger.Sync() } diff --git a/jobs/update_aggregates.go b/jobs/update_aggregates.go new file mode 100644 index 00000000..5f09f599 --- /dev/null +++ b/jobs/update_aggregates.go @@ -0,0 +1,243 @@ +package jobs + +import ( + "context" + "strings" + "time" + + dbv1 "api.audius.co/api/dbv1" + "api.audius.co/database" + "github.com/jackc/pgx/v5" + "go.uber.org/zap" +) + +type UpdateAggregatesJob struct { + writePool database.DbPool + readPool *dbv1.DBPools + logger *zap.Logger +} + +const ( + UpdateUserScoresBatchSize = 5000 +) + +type UpdateAggregatesJobConfig struct { + WritePool database.DbPool + ReadPool *dbv1.DBPools + Logger *zap.Logger +} + +func NewUpdateAggregatesJob(config UpdateAggregatesJobConfig) *UpdateAggregatesJob { + return &UpdateAggregatesJob{ + writePool: config.WritePool, + readPool: config.ReadPool, + logger: config.Logger, + } +} + +func (j *UpdateAggregatesJob) Run(ctx context.Context) error { + defer func() { + if r := recover(); r != nil { + j.logger.Error("Job run panicked", zap.Any("panic", r)) + } + }() + + if err := j.updateScores(ctx); err != nil { + j.logger.Error("Job run failed", zap.Error(err)) + return err + } + return nil +} + +func (j *UpdateAggregatesJob) updateScores(ctx context.Context) error { + startTime := time.Now() + j.logger.Info("Starting user score update job") + + type UserScoreRecord struct { + UserID int32 + CreatedAt time.Time + Score int64 + } + + var lastUserID *int32 + var lastCreatedAt *time.Time + processedCount := 0 + scoreUpdatedCount := int64(0) + + for { + filters := []string{ + "u.is_current = TRUE", + "u.handle_lc IS NOT NULL", + } + if lastUserID != nil && lastCreatedAt != nil { + filters = append(filters, `((u.created_at, u.user_id) < (@cursorTime::timestamptz, @cursorUserId::int))`) + j.logger.Debug("Processing batch", zap.String("lastCreatedAt", lastCreatedAt.Format(time.RFC3339)), zap.Int32("lastUserID", *lastUserID)) + } else { + j.logger.Debug("Processing first batch") + } + + query := ` + WITH batch AS ( + SELECT u.user_id, u.created_at + FROM users u + WHERE ` + strings.Join(filters, " AND ") + ` + ORDER BY u.created_at DESC, u.user_id DESC + LIMIT @batchSize + ), + fast_challenge_completion AS ( + SELECT b.user_id, COUNT(*)::bigint AS challenge_count + FROM batch b + JOIN user_challenges uc + ON uc.user_id = b.user_id + AND uc.is_complete + AND uc.challenge_id NOT IN ('m','b') + AND uc.completed_at <= (b.created_at + interval '3 minutes') + GROUP BY b.user_id + ), + chat_blocks AS ( + SELECT b.user_id, COUNT(*)::bigint AS block_count + FROM batch b + JOIN chat_blocked_users c ON c.blockee_user_id = b.user_id + GROUP BY b.user_id + ), + followers_karma AS ( + SELECT b.user_id, + LEAST((SUM(fau.follower_count) / 100)::bigint, 100::bigint) AS karma_sum + FROM batch b + JOIN follows f + ON f.followee_user_id = b.user_id + AND f.is_delete = FALSE + JOIN aggregate_user fau + ON fau.user_id = f.follower_user_id + AND fau.following_count < 10000 + GROUP BY b.user_id + ), + features AS ( + SELECT + u.user_id, + b.created_at, + COALESCE(( + SELECT udph.hours_with_play + FROM user_distinct_play_hours udph + WHERE udph.user_id = b.user_id + ), 0) AS play_count, + COALESCE(( + SELECT t.track_count + FROM user_distinct_play_tracks t + WHERE t.user_id = b.user_id + ),0)::bigint AS distinct_tracks_played, + COALESCE(usf.challenge_count, 0)::bigint AS challenge_count, + COALESCE(au.following_count, 0)::bigint AS following_count, + COALESCE(au.follower_count, 0)::bigint AS follower_count, + COALESCE(cb.block_count, 0)::bigint AS chat_block_count, + ( ((u.handle_lc ILIKE '%audius%') OR (lower(u.name) ILIKE '%audius%')) + AND u.is_verified = FALSE ) AS is_audius_impersonator, + ( u.is_verified = FALSE + AND (u.handle_lc ILIKE '%airdrop%' OR lower(u.name) LIKE '%airdrop%') ) + AS has_badwords, + CASE + WHEN COALESCE(au.follower_count, 0) > 1000 THEN 100 + WHEN COALESCE(au.follower_count, 0) = 0 THEN 0 + ELSE COALESCE(k.karma_sum, 0) + END::bigint AS karma + FROM batch b + JOIN users u ON u.user_id = b.user_id + LEFT JOIN user_score_features usf ON usf.user_id = b.user_id + LEFT JOIN chat_blocks cb ON cb.user_id = b.user_id + LEFT JOIN aggregate_user au ON au.user_id = b.user_id + LEFT JOIN followers_karma k ON k.user_id = b.user_id + ) + SELECT + f.user_id, + f.created_at, + compute_user_score( + f.play_count, + f.follower_count, + f.challenge_count, + f.chat_block_count, + f.following_count, + f.is_audius_impersonator, + f.has_badwords, + f.distinct_tracks_played, + f.karma + ) AS score + FROM features f + ORDER BY f.created_at DESC, f.user_id DESC + ` + + readQueryStart := time.Now() + res, err := j.readPool.Query(ctx, query, pgx.NamedArgs{ + "batchSize": UpdateUserScoresBatchSize, + "cursorTime": lastCreatedAt, + "cursorUserId": lastUserID, + }) + if err != nil { + j.logger.Error("Failed to execute batch read query", zap.Error(err)) + return err + } + readQueryDuration := time.Since(readQueryStart) + + userIDs := make([]int32, 0) + scores := make([]int64, 0) + seenUserIDs := make(map[int32]bool) + createdAt := time.Time{} + score := int64(0) + userID := int32(0) + fetchedRows := 0 + pgx.ForEachRow(res, []any{&userID, &createdAt, &score}, func() error { + fetchedRows++ + // We get dupes on some user_ids due to multiple is_current user rows. + // Upsert query will fail if we don't remove them. + if !seenUserIDs[userID] { + userIDs = append(userIDs, userID) + scores = append(scores, score) + seenUserIDs[userID] = true + } + return nil + }) + res.Close() + + lastCreatedAt = &createdAt + lastUserID = &userID + + writeQueryStart := time.Now() + tag, err := j.writePool.Exec(ctx, ` + WITH s AS ( + SELECT * FROM unnest($1::bigint[], $2::double precision[]) AS t(user_id, score) + ) + INSERT INTO aggregate_user (user_id, score) + SELECT s.user_id, s.score + FROM s + ON CONFLICT (user_id) + DO UPDATE SET + score = EXCLUDED.score + WHERE aggregate_user.score IS DISTINCT FROM EXCLUDED.score + `, userIDs, scores) + writeQueryDuration := time.Since(writeQueryStart) + if err != nil { + j.logger.Error("Failed to execute update query", zap.Error(err)) + return err + } + + processedCount += fetchedRows + scoreUpdatedCount += tag.RowsAffected() + j.logger.Debug("Processed batch", + zap.Int("batch_size", fetchedRows), + zap.Int32("last_user_id", userID), + zap.String("last_created_at", lastCreatedAt.Format(time.RFC3339)), + zap.Int("total_processed", processedCount), + zap.Int64("total_scores_changes", tag.RowsAffected()), + zap.Duration("read_query_duration", readQueryDuration), + zap.Duration("write_query_duration", writeQueryDuration)) + + if fetchedRows < UpdateUserScoresBatchSize { + j.logger.Info("Finished processing all users", + zap.Int("total_processed", processedCount), + zap.Int64("total_score_changes", scoreUpdatedCount), + zap.Duration("duration", time.Since(startTime))) + break + } + } + + return nil +} diff --git a/sql/01_schema.sql b/sql/01_schema.sql index 8e0c69b7..b02f2067 100644 --- a/sql/01_schema.sql +++ b/sql/01_schema.sql @@ -3,8 +3,8 @@ -- --- Dumped from database version 17.6 (Debian 17.6-2.pgdg13+1) --- Dumped by pg_dump version 17.6 (Debian 17.6-2.pgdg13+1) +-- Dumped from database version 17.6 (Debian 17.6-1.pgdg13+1) +-- Dumped by pg_dump version 17.6 (Debian 17.6-1.pgdg13+1) SET statement_timeout = 0; SET lock_timeout = 0; @@ -2498,7 +2498,7 @@ begin 'challenge_reward', 'challenge_reward:' || new.user_id || ':challenge:' || new.challenge_id || ':specifier:' || new.specifier, new.user_id, - case + case when new.challenge_id = 'e' then json_build_object( 'specifier', new.specifier, @@ -2516,9 +2516,9 @@ begin ) on conflict do nothing; else - -- transactional notifications cover this + -- transactional notifications cover this if (new.challenge_id != 'b' and new.challenge_id != 's') then - select id into existing_notification + select id into existing_notification from notification where type = 'reward_in_cooldown' and @@ -2543,6 +2543,26 @@ begin end if; end if; end if; + + -- update user fast challenge count + INSERT INTO user_score_features (user_id, challenge_count, updated_at) + SELECT + NEW.user_id, + COUNT(*)::int AS challenge_count, + now() + FROM user_challenges uc + JOIN users u + ON u.user_id = uc.user_id + WHERE uc.user_id = NEW.user_id + AND uc.is_complete + AND uc.challenge_id NOT IN ('m','b') + AND uc.completed_at <= (u.created_at + interval '3 minutes') + ON CONFLICT (user_id) DO UPDATE + SET challenge_count = EXCLUDED.challenge_count, + updated_at = EXCLUDED.updated_at + WHERE user_score_features.challenge_count IS DISTINCT FROM EXCLUDED.challenge_count; + + end if; return new; @@ -2568,7 +2588,7 @@ begin insert into aggregate_plays (play_item_id, count) values (new.play_item_id, 0) on conflict do nothing; update aggregate_plays - set count = count + 1 + set count = count + 1 where play_item_id = new.play_item_id returning count into new_listen_count; @@ -2583,8 +2603,8 @@ begin and timestamp = date_trunc('month', new.created_at) and country = coalesce(new.country, ''); - select new_listen_count - into milestone + select new_listen_count + into milestone where new_listen_count in (10,25,50,100,250,500,1000,2500,5000,10000,25000,50000,100000,250000,500000,1000000); if milestone is not null then @@ -2610,6 +2630,43 @@ begin on conflict do nothing; end if; end if; + + -- update listener's aggregates if applicable + if new.user_id is not null then + -- Insert or update user_distinct_play_hours + -- Only increment if the play's hour is newer than the user's last updated hour + insert into user_distinct_play_hours (user_id, hours_with_play, updated_at) + values (new.user_id, 1, date_trunc('hour', new.created_at)) + on conflict (user_id) do update set + hours_with_play = case + when date_trunc('hour', new.created_at) > date_trunc('hour', user_distinct_play_hours.updated_at) + then user_distinct_play_hours.hours_with_play + 1 + else user_distinct_play_hours.hours_with_play + end, + updated_at = case + when date_trunc('hour', new.created_at) > date_trunc('hour', user_distinct_play_hours.updated_at) + then new.created_at + else user_distinct_play_hours.updated_at + end; + + -- update user_distinct_play_tracks + -- Only increment if this is the first time this user has played this track + insert into user_distinct_play_tracks (user_id, track_count, updated_at) + values (new.user_id, 1, new.created_at) + on conflict (user_id) do update set + track_count = case + when not exists ( + select 1 from plays p + where p.user_id = new.user_id + and p.play_item_id = new.play_item_id + and p.id != new.id + ) + then user_distinct_play_tracks.track_count + 1 + else user_distinct_play_tracks.track_count + end, + updated_at = new.created_at; + end if; + return null; exception @@ -8338,6 +8395,42 @@ CREATE TABLE public.user_delist_statuses ( ); +-- +-- Name: user_distinct_play_hours; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.user_distinct_play_hours ( + user_id integer NOT NULL, + hours_with_play integer DEFAULT 0 NOT NULL, + updated_at timestamp with time zone DEFAULT now() NOT NULL +); + + +-- +-- Name: TABLE user_distinct_play_hours; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON TABLE public.user_distinct_play_hours IS 'Tracks the number of distinct hours in which a user has listened to a track'; + + +-- +-- Name: user_distinct_play_tracks; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.user_distinct_play_tracks ( + user_id integer NOT NULL, + track_count integer DEFAULT 0 NOT NULL, + updated_at timestamp with time zone DEFAULT now() NOT NULL +); + + +-- +-- Name: TABLE user_distinct_play_tracks; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON TABLE public.user_distinct_play_tracks IS 'Tracks the number of distinct tracks a user has listened to'; + + -- -- Name: user_events; Type: TABLE; Schema: public; Owner: - -- @@ -8427,6 +8520,31 @@ CREATE TABLE public.user_pubkeys ( ); +-- +-- Name: user_score_features; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.user_score_features ( + user_id integer NOT NULL, + challenge_count integer DEFAULT 0, + updated_at timestamp with time zone DEFAULT now() NOT NULL +); + + +-- +-- Name: TABLE user_score_features; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON TABLE public.user_score_features IS 'Tracks some features used in user score calculation'; + + +-- +-- Name: COLUMN user_score_features.challenge_count; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON COLUMN public.user_score_features.challenge_count IS 'Tracks the number of fast challenges auser has completed'; + + -- -- Name: user_tips; Type: TABLE; Schema: public; Owner: - -- @@ -9686,6 +9804,22 @@ ALTER TABLE ONLY public.user_delist_statuses ADD CONSTRAINT user_delist_statuses_pkey PRIMARY KEY (created_at, user_id, delisted); +-- +-- Name: user_distinct_play_hours user_distinct_play_hours_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.user_distinct_play_hours + ADD CONSTRAINT user_distinct_play_hours_pkey PRIMARY KEY (user_id); + + +-- +-- Name: user_distinct_play_tracks user_distinct_play_tracks_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.user_distinct_play_tracks + ADD CONSTRAINT user_distinct_play_tracks_pkey PRIMARY KEY (user_id); + + -- -- Name: user_events user_events_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -9718,6 +9852,14 @@ ALTER TABLE ONLY public.user_pubkeys ADD CONSTRAINT user_pubkeys_pkey PRIMARY KEY (user_id); +-- +-- Name: user_score_features user_score_features_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.user_score_features + ADD CONSTRAINT user_score_features_pkey PRIMARY KEY (user_id); + + -- -- Name: user_tips user_tips_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -10182,6 +10324,20 @@ CREATE INDEX ix_associated_wallets_user_id ON public.associated_wallets USING bt CREATE INDEX ix_associated_wallets_wallet ON public.associated_wallets USING btree (wallet); +-- +-- Name: ix_au_user_follows; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX ix_au_user_follows ON public.aggregate_user USING btree (user_id) INCLUDE (follower_count, following_count); + + +-- +-- Name: INDEX ix_au_user_follows; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON INDEX public.ix_au_user_follows IS 'Fast lookup for fields use in karma calculation'; + + -- -- Name: ix_audio_transactions_history_slot; Type: INDEX; Schema: public; Owner: - -- @@ -10245,6 +10401,20 @@ CREATE INDEX ix_plays_slot ON public.plays USING btree (slot); CREATE INDEX ix_plays_sol_signature ON public.plays USING btree (signature); +-- +-- Name: ix_plays_user_hour; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX ix_plays_user_hour ON public.plays USING btree (user_id, date_trunc('hour'::text, created_at)) WHERE (user_id IS NOT NULL); + + +-- +-- Name: INDEX ix_plays_user_hour; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON INDEX public.ix_plays_user_hour IS 'Helps compute distinct hourly plays by user id'; + + -- -- Name: ix_plays_user_track_date; Type: INDEX; Schema: public; Owner: - --