From 18d66dddedb4c6c14975a0ab47377258fea59e2e Mon Sep 17 00:00:00 2001 From: Omar Roth Date: Sat, 1 Jun 2019 10:19:18 -0500 Subject: [PATCH] Add 'needs_update' column for scheduling feed refresh --- config/migrate-scripts/migrate-db-701b5ea.sh | 3 + config/sql/users.sql | 1 + src/invidious.cr | 60 +++++------------- src/invidious/channels.cr | 29 ++++----- src/invidious/helpers/helpers.cr | 1 - src/invidious/helpers/jobs.cr | 66 ++------------------ src/invidious/users.cr | 11 ++-- 7 files changed, 46 insertions(+), 125 deletions(-) create mode 100755 config/migrate-scripts/migrate-db-701b5ea.sh diff --git a/config/migrate-scripts/migrate-db-701b5ea.sh b/config/migrate-scripts/migrate-db-701b5ea.sh new file mode 100755 index 00000000..429531a2 --- /dev/null +++ b/config/migrate-scripts/migrate-db-701b5ea.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +psql invidious kemal -c "ALTER TABLE users ADD COLUMN feed_needs_update boolean" diff --git a/config/sql/users.sql b/config/sql/users.sql index 536508a4..0f2cdba2 100644 --- a/config/sql/users.sql +++ b/config/sql/users.sql @@ -12,6 +12,7 @@ CREATE TABLE public.users password text, token text, watched text[], + feed_needs_update boolean, CONSTRAINT users_email_key UNIQUE (email) ); diff --git a/src/invidious.cr b/src/invidious.cr index de134947..822f7b85 100644 --- a/src/invidious.cr +++ b/src/invidious.cr @@ -1710,18 +1710,12 @@ post "/subscription_ajax" do |env| when .starts_with? "action_create" if !user.subscriptions.includes? channel_id get_channel(channel_id, PG_DB, false, false) - PG_DB.exec("UPDATE users SET subscriptions = array_append(subscriptions, $1) WHERE email = $2", channel_id, email) + PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = array_append(subscriptions, $1) WHERE email = $2", channel_id, email) end when .starts_with? "action_remove" - PG_DB.exec("UPDATE users SET subscriptions = array_remove(subscriptions, $1) WHERE email = $2", channel_id, email) + PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = array_remove(subscriptions, $1) WHERE email = $2", channel_id, email) end - payload = { - "email" => user.email, - "action" => "refresh", - }.to_json - PG_DB.exec("NOTIFY feeds, E'#{payload}'") - if redirect env.redirect referer else @@ -1884,7 +1878,7 @@ post "/data_control" do |env| user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false) - PG_DB.exec("UPDATE users SET subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) + PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) end if body["watch_history"]? @@ -1906,7 +1900,7 @@ post "/data_control" do |env| user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false) - PG_DB.exec("UPDATE users SET subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) + PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) when "import_freetube" user.subscriptions += body.scan(/"channelId":"(?[a-zA-Z0-9_-]{24})"/).map do |md| md["channel_id"] @@ -1915,7 +1909,7 @@ post "/data_control" do |env| user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false) - PG_DB.exec("UPDATE users SET subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) + PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) when "import_newpipe_subscriptions" body = JSON.parse(body) user.subscriptions += body["subscriptions"].as_a.compact_map do |channel| @@ -1939,7 +1933,7 @@ post "/data_control" do |env| user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false) - PG_DB.exec("UPDATE users SET subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) + PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) when "import_newpipe" Zip::Reader.open(IO::Memory.new(body)) do |file| file.each_entry do |entry| @@ -1958,7 +1952,7 @@ post "/data_control" do |env| user.subscriptions = get_batch_channels(user.subscriptions, PG_DB, false, false) - PG_DB.exec("UPDATE users SET subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) + PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = $1 WHERE email = $2", user.subscriptions, user.email) db.close tempfile.delete @@ -1967,12 +1961,6 @@ post "/data_control" do |env| end end end - - payload = { - "email" => user.email, - "action" => "refresh", - }.to_json - PG_DB.exec("NOTIFY feeds, E'#{payload}'") end env.redirect referer @@ -2874,7 +2862,7 @@ post "/feed/webhook/:token" do |env| views: video.views, ) - users = PG_DB.query_all("UPDATE users SET notifications = notifications || $1 \ + emails = PG_DB.query_all("UPDATE users SET notifications = notifications || $1 \ WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications) RETURNING email", video.id, video.published, video.ucid, as: String) @@ -2886,13 +2874,14 @@ post "/feed/webhook/:token" do |env| updated = $4, ucid = $5, author = $6, length_seconds = $7, \ live_now = $8, premiere_timestamp = $9, views = $10", video_array) - users.each do |user| - payload = { - "email" => user, - "action" => "refresh", - }.to_json - PG_DB.exec("NOTIFY feeds, E'#{payload}'") + # Update all users affected by insert + if emails.empty? + values = "'{}'" + else + values = "VALUES #{emails.map { |id| %(('#{id}')) }.join(",")}" end + + PG_DB.exec("UPDATE users SET feed_needs_update = true WHERE email = ANY($1)", emails) end end @@ -4490,7 +4479,6 @@ post "/api/v1/auth/preferences" do |env| PG_DB.exec("UPDATE users SET preferences = $1 WHERE email = $2", preferences.to_json, user.email) env.response.status_code = 204 - "" end get "/api/v1/auth/subscriptions" do |env| @@ -4525,13 +4513,7 @@ post "/api/v1/auth/subscriptions/:ucid" do |env| if !user.subscriptions.includes? ucid get_channel(ucid, PG_DB, false, false) - PG_DB.exec("UPDATE users SET subscriptions = array_append(subscriptions,$1) WHERE email = $2", ucid, user.email) - - payload = { - "email" => user.email, - "action" => "refresh", - }.to_json - PG_DB.exec("NOTIFY feeds, E'#{payload}'") + PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = array_append(subscriptions,$1) WHERE email = $2", ucid, user.email) end # For Google accounts, access tokens don't have enough information to @@ -4539,7 +4521,6 @@ post "/api/v1/auth/subscriptions/:ucid" do |env| # YouTube. env.response.status_code = 204 - "" end delete "/api/v1/auth/subscriptions/:ucid" do |env| @@ -4548,15 +4529,9 @@ delete "/api/v1/auth/subscriptions/:ucid" do |env| ucid = env.params.url["ucid"] - PG_DB.exec("UPDATE users SET subscriptions = array_remove(subscriptions, $1) WHERE email = $2", ucid, user.email) - payload = { - "email" => user.email, - "action" => "refresh", - }.to_json - PG_DB.exec("NOTIFY feeds, E'#{payload}'") + PG_DB.exec("UPDATE users SET feed_needs_update = true, subscriptions = array_remove(subscriptions, $1) WHERE email = $2", ucid, user.email) env.response.status_code = 204 - "" end get "/api/v1/auth/tokens" do |env| @@ -4663,7 +4638,6 @@ post "/api/v1/auth/tokens/unregister" do |env| end env.response.status_code = 204 - "" end get "/api/manifest/dash/id/videoplayback" do |env| diff --git a/src/invidious/channels.cr b/src/invidious/channels.cr index d0eb7dd3..d33cd9c3 100644 --- a/src/invidious/channels.cr +++ b/src/invidious/channels.cr @@ -184,7 +184,7 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil) views: views, ) - users = db.query_all("UPDATE users SET notifications = notifications || $1 \ + emails = db.query_all("UPDATE users SET notifications = notifications || $1 \ WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications) RETURNING email", video.id, video.published, ucid, as: String) @@ -198,13 +198,14 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil) updated = $4, ucid = $5, author = $6, length_seconds = $7, \ live_now = $8, views = $10", video_array) - users.each do |user| - payload = { - "email" => user, - "action" => "refresh", - }.to_json - PG_DB.exec("NOTIFY feeds, E'#{payload}'") + # Update all users affected by insert + if emails.empty? + values = "'{}'" + else + values = "VALUES #{emails.map { |id| %(('#{id}')) }.join(",")}" end + + db.exec("UPDATE users SET feed_needs_update = true WHERE email = ANY($1)", emails) end if pull_all_videos @@ -252,7 +253,7 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil) # We are notified of Red videos elsewhere (PubSub), which includes a correct published date, # so since they don't provide a published date here we can safely ignore them. if Time.now - video.published > 1.minute - users = db.query_all("UPDATE users SET notifications = notifications || $1 \ + emails = db.query_all("UPDATE users SET notifications = notifications || $1 \ WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications) RETURNING email", video.id, video.published, video.ucid, as: String) @@ -266,13 +267,13 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil) live_now = $8, views = $10", video_array) # Update all users affected by insert - users.each do |user| - payload = { - "email" => user, - "action" => "refresh", - }.to_json - PG_DB.exec("NOTIFY feeds, E'#{payload}'") + if emails.empty? + values = "'{}'" + else + values = "VALUES #{emails.map { |id| %(('#{id}')) }.join(",")}" end + + db.exec("UPDATE users SET feed_needs_update = true WHERE email = ANY($1)", emails) end end diff --git a/src/invidious/helpers/helpers.cr b/src/invidious/helpers/helpers.cr index 3155cb67..476038c7 100644 --- a/src/invidious/helpers/helpers.cr +++ b/src/invidious/helpers/helpers.cr @@ -105,7 +105,6 @@ struct Config hmac_key: String?, # HMAC signing key for CSRF tokens and verifying pubsub subscriptions domain: String?, # Domain to be used for links to resources on the site where an absolute URL is required use_pubsub_feeds: {type: Bool | Int32, default: false}, # Subscribe to channels using PubSubHubbub (requires domain, hmac_key) - use_feed_events: {type: Bool | Int32, default: false}, # Update feeds on receiving notifications default_home: {type: String, default: "Top"}, feed_menu: {type: Array(String), default: ["Popular", "Top", "Trending", "Subscriptions"]}, top_enabled: {type: Bool, default: true}, diff --git a/src/invidious/helpers/jobs.cr b/src/invidious/helpers/jobs.cr index b9f9a86f..1dd81cf5 100644 --- a/src/invidious/helpers/jobs.cr +++ b/src/invidious/helpers/jobs.cr @@ -43,66 +43,6 @@ def refresh_channels(db, logger, config) end def refresh_feeds(db, logger, config) - # Spawn thread to handle feed events - if config.use_feed_events - case config.use_feed_events - when Bool - max_feed_event_threads = config.use_feed_events.as(Bool).to_unsafe - when Int32 - max_feed_event_threads = config.use_feed_events.as(Int32) - end - max_feed_event_channel = Channel(Int32).new - - spawn do - queue = Deque(String).new(30) - PG.connect_listen(PG_URL, "feeds") do |event| - if !queue.includes? event.payload - queue << event.payload - end - end - - max_threads = max_feed_event_channel.receive - active_threads = 0 - active_channel = Channel(Bool).new - - loop do - until queue.empty? - event = queue.shift - - if active_threads >= max_threads - if active_channel.receive - active_threads -= 1 - end - end - - active_threads += 1 - - spawn do - begin - feed = JSON.parse(event) - email = feed["email"].as_s - action = feed["action"].as_s - - view_name = "subscriptions_#{sha256(email)}" - - case action - when "refresh" - db.exec("REFRESH MATERIALIZED VIEW #{view_name}") - end - rescue ex - end - - active_channel.send(true) - end - end - - sleep 5.seconds - end - end - - max_feed_event_channel.send(max_feed_event_threads.as(Int32)) - end - max_channel = Channel(Int32).new spawn do max_threads = max_channel.receive @@ -110,7 +50,7 @@ def refresh_feeds(db, logger, config) active_channel = Channel(Bool).new loop do - db.query("SELECT email FROM users") do |rs| + db.query("SELECT email FROM users WHERE feed_needs_update = true OR feed_needs_update IS NULL") do |rs| rs.each do email = rs.read(String) view_name = "subscriptions_#{sha256(email)}" @@ -135,6 +75,7 @@ def refresh_feeds(db, logger, config) end db.exec("REFRESH MATERIALIZED VIEW #{view_name}") + db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email) rescue ex # Rename old views begin @@ -152,6 +93,7 @@ def refresh_feeds(db, logger, config) SELECT * FROM channel_videos WHERE \ ucid = ANY ((SELECT subscriptions FROM users WHERE email = E'#{email.gsub("'", "\\'")}')::text[]) \ ORDER BY published DESC;") + db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email) end rescue ex logger.write("REFRESH #{email} : #{ex.message}\n") @@ -164,7 +106,7 @@ def refresh_feeds(db, logger, config) end end - sleep 1.minute + sleep 5.seconds end end diff --git a/src/invidious/users.cr b/src/invidious/users.cr index 298d6b0d..ceaac9f1 100644 --- a/src/invidious/users.cr +++ b/src/invidious/users.cr @@ -20,9 +20,10 @@ struct User type: Preferences, converter: PreferencesConverter, }, - password: String?, - token: String, - watched: Array(String), + password: String?, + token: String, + watched: Array(String), + feed_needs_update: Bool?, }) end @@ -205,7 +206,7 @@ def fetch_user(sid, headers, db) token = Base64.urlsafe_encode(Random::Secure.random_bytes(32)) - user = User.new(Time.now, [] of String, channels, email, CONFIG.default_user_preferences, nil, token, [] of String) + user = User.new(Time.now, [] of String, channels, email, CONFIG.default_user_preferences, nil, token, [] of String, true) return user, sid end @@ -213,7 +214,7 @@ def create_user(sid, email, password) password = Crypto::Bcrypt::Password.create(password, cost: 10) token = Base64.urlsafe_encode(Random::Secure.random_bytes(32)) - user = User.new(Time.now, [] of String, [] of String, email, CONFIG.default_user_preferences, password.to_s, token, [] of String) + user = User.new(Time.now, [] of String, [] of String, email, CONFIG.default_user_preferences, password.to_s, token, [] of String, true) return user, sid end