Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions api/dbv1/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 41 additions & 5 deletions ddl/functions/handle_play.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ begin
insert into aggregate_plays (play_item_id, count) values (new.play_item_id, 0) on conflict do nothing;

update aggregate_plays
set count = count + 1
set count = count + 1
where play_item_id = new.play_item_id
returning count into new_listen_count;

Expand All @@ -24,8 +24,8 @@ begin
and timestamp = date_trunc('month', new.created_at)
and country = coalesce(new.country, '');

select new_listen_count
into milestone
select new_listen_count
into milestone
where new_listen_count in (10,25,50,100,250,500,1000,2500,5000,10000,25000,50000,100000,250000,500000,1000000);

if milestone is not null then
Expand All @@ -51,6 +51,43 @@ begin
on conflict do nothing;
end if;
end if;

-- update listener's aggregates if applicable
if new.user_id is not null then
-- Insert or update user_distinct_play_hours
-- Only increment if the play's hour is newer than the user's last updated hour
insert into user_distinct_play_hours (user_id, hours_with_play, updated_at)
values (new.user_id, 1, date_trunc('hour', new.created_at))
on conflict (user_id) do update set
hours_with_play = case
when date_trunc('hour', new.created_at) > date_trunc('hour', user_distinct_play_hours.updated_at)
then user_distinct_play_hours.hours_with_play + 1
else user_distinct_play_hours.hours_with_play
end,
updated_at = case
when date_trunc('hour', new.created_at) > date_trunc('hour', user_distinct_play_hours.updated_at)
then new.created_at
else user_distinct_play_hours.updated_at
end;

-- update user_distinct_play_tracks
-- Only increment if this is the first time this user has played this track
insert into user_distinct_play_tracks (user_id, track_count, updated_at)
values (new.user_id, 1, new.created_at)
on conflict (user_id) do update set
track_count = case
when not exists (
select 1 from plays p
where p.user_id = new.user_id
and p.play_item_id = new.play_item_id
and p.id != new.id
)
then user_distinct_play_tracks.track_count + 1
else user_distinct_play_tracks.track_count
end,
updated_at = new.created_at;
end if;

return null;

exception
Expand All @@ -67,5 +104,4 @@ do $$ begin
for each row execute procedure handle_play();
exception
when others then null;
end $$;

end $$;
26 changes: 23 additions & 3 deletions ddl/functions/handle_user_challenges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ begin
'challenge_reward',
'challenge_reward:' || new.user_id || ':challenge:' || new.challenge_id || ':specifier:' || new.specifier,
new.user_id,
case
case
when new.challenge_id = 'e' then
json_build_object(
'specifier', new.specifier,
Expand All @@ -62,9 +62,9 @@ begin
)
on conflict do nothing;
else
-- transactional notifications cover this
-- transactional notifications cover this
if (new.challenge_id != 'b' and new.challenge_id != 's') then
select id into existing_notification
select id into existing_notification
from notification
where
type = 'reward_in_cooldown' and
Expand All @@ -89,6 +89,26 @@ begin
end if;
end if;
end if;

-- update user fast challenge count
INSERT INTO user_score_features (user_id, challenge_count, updated_at)
SELECT
NEW.user_id,
COUNT(*)::int AS challenge_count,
now()
FROM user_challenges uc
JOIN users u
ON u.user_id = uc.user_id
WHERE uc.user_id = NEW.user_id
AND uc.is_complete
AND uc.challenge_id NOT IN ('m','b')
AND uc.completed_at <= (u.created_at + interval '3 minutes')
ON CONFLICT (user_id) DO UPDATE
SET challenge_count = EXCLUDED.challenge_count,
updated_at = EXCLUDED.updated_at
WHERE user_score_features.challenge_count IS DISTINCT FROM EXCLUDED.challenge_count;


end if;

return new;
Expand Down
38 changes: 38 additions & 0 deletions ddl/migrations/0175_add_user_score_tables.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
begin;

CREATE INDEX IF NOT EXISTS ix_plays_user_hour ON plays (user_id, date_trunc('hour', created_at)) WHERE user_id IS NOT NULL;
COMMENT ON INDEX ix_plays_user_hour IS 'Helps compute distinct hourly plays by user id';


CREATE TABLE IF NOT EXISTS user_distinct_play_hours (
user_id integer PRIMARY KEY,
hours_with_play integer NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

COMMENT ON TABLE user_distinct_play_hours IS 'Tracks the number of distinct hours in which a user has listened to a track';

CREATE TABLE IF NOT EXISTS user_distinct_play_tracks (
user_id integer PRIMARY KEY,
track_count integer NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

COMMENT ON TABLE user_distinct_play_tracks IS 'Tracks the number of distinct tracks a user has listened to';


CREATE TABLE IF NOT EXISTS user_score_features (
user_id integer PRIMARY KEY,
challenge_count integer default 0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

COMMENT ON TABLE user_score_features IS 'Tracks some features used in user score calculation';
COMMENT ON COLUMN user_score_features.challenge_count IS 'Tracks the number of fast challenges auser has completed';

CREATE INDEX IF NOT EXISTS ix_au_user_follows
ON aggregate_user (user_id)
INCLUDE (follower_count, following_count);
COMMENT ON INDEX ix_au_user_follows IS 'Fast lookup for fields use in karma calculation';

COMMIT;
62 changes: 62 additions & 0 deletions indexer/aggregates_calculator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package indexer

import (
"context"

dbv1 "api.audius.co/api/dbv1"
"api.audius.co/config"
"api.audius.co/database"
"api.audius.co/jobs"
"api.audius.co/logging"
"github.com/jackc/pgx/v5/pgxpool"
"go.uber.org/zap"
)

type AggregatesCalculator struct {
logger *zap.Logger
readPool *dbv1.DBPools
writePool database.DbPool
updateAggregatesJob *jobs.UpdateAggregatesJob
}

func NewAggregatesCalculator(config config.Config) *AggregatesCalculator {
logger := logging.NewZapLogger(config).Named("AggregatesCalculator")
readPool, err := dbv1.NewDBPools([]string{config.ReadDbUrl}, logger, config.Env, config.ZapLevel)
if err != nil {
panic(err)
}
writePool, err := pgxpool.New(context.Background(), config.WriteDbUrl)
if err != nil {
panic(err)
}

return &AggregatesCalculator{
logger: logger,
readPool: readPool,
writePool: writePool,
updateAggregatesJob: jobs.NewUpdateAggregatesJob(jobs.UpdateAggregatesJobConfig{
WritePool: writePool,
ReadPool: readPool,
Logger: logger,
}),
}
}

func (a *AggregatesCalculator) Start(ctx context.Context) error {
a.logger.Info("Starting aggregates calculator")
// This job runs in a continous loop until the context is cancelled.
for {
select {
case <-ctx.Done():
a.logger.Info("Shutting down aggregates calculator")
return ctx.Err()
default:
a.updateAggregatesJob.Run(ctx)
}
}
}

func (a *AggregatesCalculator) Close() {
a.readPool.Close()
a.writePool.Close()
}
29 changes: 23 additions & 6 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type CoreIndexer struct {
pool dbv1.DbPool
Config config.Config
logger *zap.Logger
closeCh chan struct{}
aggregatesCalculator *AggregatesCalculator
pool dbv1.DbPool
Config config.Config
logger *zap.Logger
closeCh chan struct{}
}

const (
Expand All @@ -39,9 +41,12 @@ func NewIndexer(config config.Config) *CoreIndexer {
panic(fmt.Errorf("error connecting to database: %w", err))
}

aggregatesCalculator := NewAggregatesCalculator(config)

ci := &CoreIndexer{
pool: pool,
Config: config,
aggregatesCalculator: aggregatesCalculator,
pool: pool,
Config: config,
logger: logging.NewZapLogger(config).
Named("CoreIndexer"),
}
Expand All @@ -50,6 +55,17 @@ func NewIndexer(config config.Config) *CoreIndexer {
}

func (ci *CoreIndexer) Start(ctx context.Context) error {
eg := errgroup.Group{}
eg.Go(func() error {
return ci.aggregatesCalculator.Start(ctx)
})
eg.Go(func() error {
return ci.run(ctx)
})
return eg.Wait()
}

func (ci *CoreIndexer) run(ctx context.Context) error {
sdk := sdk.NewAudiusdSDK(ci.Config.AudiusdURL)
go logging.SyncOnTicks(ctx, ci.logger, time.Second*10)

Expand Down Expand Up @@ -157,6 +173,7 @@ func (ci *CoreIndexer) handleManageEntity(dbTx dbv1.DBTX, logger *zap.Logger, em
}

func (ci *CoreIndexer) Close() {
ci.aggregatesCalculator.Close()
ci.pool.Close()
ci.logger.Sync()
}
Loading
Loading