From 81584779cb1795d2fe7827e054bbe245712528a2 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Fri, 5 May 2017 02:23:01 +0200 Subject: [PATCH] More robust PuSH subscription refreshes (#2799) * Fix #2473 - Use sidekiq scheduler to refresh PuSH subscriptions instead of cron Fix an issue where / in domain would raise exception in TagManager#normalize_domain PuSH subscriptions refresh done in a round-robin way to avoid hammering a single server's hub in sequence. Correct handling of failures/retries through Sidekiq (see also #2613). Optimize Account#with_followers scope. Also, since subscriptions are now delegated to Sidekiq jobs, an uncaught exception will not stop the entire refreshing operation halfway through Fix #2702 - Correct user agent header on outgoing http requests * Add test for SubscribeService * Extract #expiring_accounts into method * Make mastodon:push:refresh no-op * Queues are now defined in sidekiq.yml * Queues are now in sidekiq.yml --- Gemfile | 3 +- Gemfile.lock | 14 ++++++- Procfile | 2 +- Procfile.dev | 1 + app/helpers/http_helper.rb | 10 +++-- app/lib/tag_manager.rb | 4 +- app/models/account.rb | 5 ++- app/services/follow_service.rb | 2 +- app/services/process_interaction_service.rb | 2 +- app/services/subscribe_service.rb | 28 +++++++++++--- app/services/update_remote_profile_service.rb | 2 +- app/workers/pubsubhubbub/delivery_worker.rb | 12 +++++- app/workers/pubsubhubbub/subscribe_worker.rb | 13 +++++++ .../scheduler/subscriptions_scheduler.rb | 20 ++++++++++ config/environments/development.rb | 3 -- config/sidekiq.yml | 9 +++++ lib/tasks/mastodon.rake | 6 +-- spec/services/follow_service_spec.rb | 7 +++- spec/services/subscribe_service_spec.rb | 38 +++++++++++++++++++ 19 files changed, 152 insertions(+), 29 deletions(-) create mode 100644 app/workers/pubsubhubbub/subscribe_worker.rb create mode 100644 app/workers/scheduler/subscriptions_scheduler.rb create mode 100644 spec/services/subscribe_service_spec.rb diff --git a/Gemfile b/Gemfile index 25979d0a4..54fb972ab 100644 --- a/Gemfile +++ b/Gemfile @@ -35,7 +35,7 @@ gem 'link_header' gem 'local_time' gem 'nokogiri' gem 'oj' -gem 'ostatus2', '~> 1.1' +gem 'ostatus2', '~> 2.0' gem 'ox' gem 'rabl' gem 'rack-attack' @@ -48,6 +48,7 @@ gem 'rqrcode' gem 'ruby-oembed', require: 'oembed' gem 'sanitize' gem 'sidekiq' +gem 'sidekiq-scheduler' gem 'sidekiq-unique-jobs' gem 'simple-navigation' gem 'simple_form' diff --git a/Gemfile.lock b/Gemfile.lock index aedf83433..10b47082c 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -143,6 +143,8 @@ GEM thread_safe encryptor (3.0.0) erubis (2.7.0) + et-orbi (1.0.3) + tzinfo execjs (2.7.0) fabrication (2.16.1) faker (1.7.3) @@ -251,7 +253,7 @@ GEM oj (3.0.5) openssl (2.0.3) orm_adapter (0.5.0) - ostatus2 (1.1.0) + ostatus2 (2.0.0) addressable (~> 2.4) http (~> 2.0) nokogiri (~> 1.6) @@ -386,6 +388,8 @@ GEM unicode-display_width (~> 1.0, >= 1.0.1) ruby-oembed (0.12.0) ruby-progressbar (1.8.1) + rufus-scheduler (3.4.0) + et-orbi (~> 1.0) safe_yaml (1.0.4) sanitize (4.4.0) crass (~> 1.0.2) @@ -396,6 +400,11 @@ GEM connection_pool (~> 2.2, >= 2.2.0) rack-protection (>= 1.5.0) redis (~> 3.3, >= 3.3.3) + sidekiq-scheduler (2.1.4) + redis (~> 3) + rufus-scheduler (~> 3.2) + sidekiq (>= 3) + tilt (>= 1.4.0) sidekiq-unique-jobs (5.0.7) sidekiq (>= 4.0, <= 6.0) thor (~> 0) @@ -499,7 +508,7 @@ DEPENDENCIES microformats2 nokogiri oj - ostatus2 (~> 1.1) + ostatus2 (~> 2.0) ox paperclip (~> 5.1) paperclip-av-transcoder @@ -527,6 +536,7 @@ DEPENDENCIES ruby-oembed sanitize sidekiq + sidekiq-scheduler sidekiq-unique-jobs simple-navigation simple_form diff --git a/Procfile b/Procfile index 646e26059..b18e4b6be 100644 --- a/Procfile +++ b/Procfile @@ -1,2 +1,2 @@ web: bundle exec puma -C config/puma.rb -worker: bundle exec sidekiq -q default -q push -q pull -q mailers +worker: bundle exec sidekiq diff --git a/Procfile.dev b/Procfile.dev index 87da7a7f4..80666280d 100644 --- a/Procfile.dev +++ b/Procfile.dev @@ -1,3 +1,4 @@ web: PORT=3000 bundle exec puma -C config/puma.rb +sidekiq: bundle exec sidekiq stream: PORT=4000 yarn run start webpack: ./bin/webpack-dev-server --host 0.0.0.0 diff --git a/app/helpers/http_helper.rb b/app/helpers/http_helper.rb index 1697de746..e39a52da0 100644 --- a/app/helpers/http_helper.rb +++ b/app/helpers/http_helper.rb @@ -1,13 +1,17 @@ # frozen_string_literal: true module HttpHelper - USER_AGENT = "#{HTTP::Request::USER_AGENT} (Mastodon/#{Mastodon::Version}; +http://#{Rails.configuration.x.local_domain}/)" - def http_client(options = {}) timeout = { write: 10, connect: 10, read: 10 }.merge(options) - HTTP.headers(user_agent: USER_AGENT) + HTTP.headers(user_agent: user_agent) .timeout(:per_operation, timeout) .follow end + + private + + def user_agent + @user_agent ||= "#{HTTP::Request::USER_AGENT} (Mastodon/#{Mastodon::Version}; +http://#{Rails.configuration.x.local_domain}/)" + end end diff --git a/app/lib/tag_manager.rb b/app/lib/tag_manager.rb index 3bddfba7c..6170a90de 100644 --- a/app/lib/tag_manager.rb +++ b/app/lib/tag_manager.rb @@ -65,8 +65,10 @@ class TagManager end def normalize_domain(domain) + return if domain.nil? + uri = Addressable::URI.new - uri.host = domain + uri.host = domain.gsub(/[\/]/, '') uri.normalize.host end diff --git a/app/models/account.rb b/app/models/account.rb index d64591641..87b97a20d 100644 --- a/app/models/account.rb +++ b/app/models/account.rb @@ -103,9 +103,10 @@ class Account < ApplicationRecord scope :remote, -> { where.not(domain: nil) } scope :local, -> { where(domain: nil) } - scope :without_followers, -> { where('(select count(f.id) from follows as f where f.target_account_id = accounts.id) = 0') } - scope :with_followers, -> { where('(select count(f.id) from follows as f where f.target_account_id = accounts.id) > 0') } + scope :without_followers, -> { where(followers_count: 0) } + scope :with_followers, -> { where('followers_count > 0') } scope :expiring, ->(time) { where(subscription_expires_at: nil).or(where('subscription_expires_at < ?', time)).remote.with_followers } + scope :partitioned, -> { order('row_number() over (partition by domain)') } scope :silenced, -> { where(silenced: true) } scope :suspended, -> { where(suspended: true) } scope :recent, -> { reorder(id: :desc) } diff --git a/app/services/follow_service.rb b/app/services/follow_service.rb index 844f5282d..23e721fac 100644 --- a/app/services/follow_service.rb +++ b/app/services/follow_service.rb @@ -40,7 +40,7 @@ class FollowService < BaseService if target_account.local? NotifyService.new.call(target_account, follow) else - SubscribeService.new.call(target_account) unless target_account.subscribed? + Pubsubhubbub::SubscribeWorker.perform_async(target_account.id) unless target_account.subscribed? NotificationWorker.perform_async(build_follow_xml(follow), source_account.id, target_account.id) AfterRemoteFollowWorker.perform_async(follow.id) end diff --git a/app/services/process_interaction_service.rb b/app/services/process_interaction_service.rb index 1f15a265d..16eac2353 100644 --- a/app/services/process_interaction_service.rb +++ b/app/services/process_interaction_service.rb @@ -77,7 +77,7 @@ class ProcessInteractionService < BaseService def authorize_follow_request!(account, target_account) follow_request = FollowRequest.find_by(account: target_account, target_account: account) follow_request&.authorize! - SubscribeService.new.call(account) unless account.subscribed? + Pubsubhubbub::SubscribeWorker.perform_async(account.id) unless account.subscribed? end def reject_follow_request!(account, target_account) diff --git a/app/services/subscribe_service.rb b/app/services/subscribe_service.rb index 820b208e9..138718f14 100644 --- a/app/services/subscribe_service.rb +++ b/app/services/subscribe_service.rb @@ -5,15 +5,31 @@ class SubscribeService < BaseService account.secret = SecureRandom.hex subscription = account.subscription(api_subscription_url(account.id)) - response = subscription.subscribe + response = subscription.subscribe - unless response.successful? + if response_failed_permanently?(response) + # An error in the 4xx range (except for 429, which is rate limiting) + # means we're not allowed to subscribe. Fail and move on account.secret = '' - Rails.logger.debug "PuSH subscription request for #{account.acct} failed: #{response.message}" + account.save! + elsif response_successful?(response) + # Anything in the 2xx range means the subscription will be confirmed + # asynchronously, we've done what we needed to do + account.save! + else + # What's left is the 5xx range and 429, which means we need to retry + # at a later time. Fail loudly! + raise "Subscription attempt failed for #{account.acct} (#{account.hub_url}): HTTP #{response.code}" end + end - account.save! - rescue HTTP::Error, OpenSSL::SSL::SSLError - Rails.logger.debug "PuSH subscription request for #{account.acct} could not be made due to HTTP or SSL error" + private + + def response_failed_permanently?(response) + response.code > 299 && response.code < 500 && response.code != 429 + end + + def response_successful?(response) + response.code > 199 && response.code < 300 end end diff --git a/app/services/update_remote_profile_service.rb b/app/services/update_remote_profile_service.rb index 31f4af2c1..f0c39ecc0 100644 --- a/app/services/update_remote_profile_service.rb +++ b/app/services/update_remote_profile_service.rb @@ -28,7 +28,7 @@ class UpdateRemoteProfileService < BaseService account.save_with_optional_avatar! - SubscribeService.new.call(account) if resubscribe && (account.hub_url != old_hub_url) + Pubsubhubbub::SubscribeWorker.perform_async(account.id) if resubscribe && (account.hub_url != old_hub_url) end private diff --git a/app/workers/pubsubhubbub/delivery_worker.rb b/app/workers/pubsubhubbub/delivery_worker.rb index f645b1e24..511ae14b3 100644 --- a/app/workers/pubsubhubbub/delivery_worker.rb +++ b/app/workers/pubsubhubbub/delivery_worker.rb @@ -25,8 +25,8 @@ class Pubsubhubbub::DeliveryWorker .headers(headers) .post(subscription.callback_url, body: payload) - return subscription.destroy! if response.code > 299 && response.code < 500 && response.code != 429 # HTTP 4xx means error is not temporary, except for 429 (throttling) - raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response.code > 199 && response.code < 300 + return subscription.destroy! if response_failed_permanently?(response) # HTTP 4xx means error is not temporary, except for 429 (throttling) + raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response_successful?(response) subscription.touch(:last_successful_delivery_at) end @@ -37,4 +37,12 @@ class Pubsubhubbub::DeliveryWorker hmac = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), secret, payload) "sha1=#{hmac}" end + + def response_failed_permanently?(response) + response.code > 299 && response.code < 500 && response.code != 429 + end + + def response_successful?(response) + response.code > 199 && response.code < 300 + end end diff --git a/app/workers/pubsubhubbub/subscribe_worker.rb b/app/workers/pubsubhubbub/subscribe_worker.rb new file mode 100644 index 000000000..0c4111a8c --- /dev/null +++ b/app/workers/pubsubhubbub/subscribe_worker.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class Pubsubhubbub::SubscribeWorker + include Sidekiq::Worker + + sidekiq_options queue: 'push' + + def perform(account_id) + account = Account.find(account_id) + Rails.logger.debug "PuSH re-subscribing to #{account.acct}" + ::SubscribeService.new.call(account) + end +end diff --git a/app/workers/scheduler/subscriptions_scheduler.rb b/app/workers/scheduler/subscriptions_scheduler.rb new file mode 100644 index 000000000..03622e95b --- /dev/null +++ b/app/workers/scheduler/subscriptions_scheduler.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true +require 'sidekiq-scheduler' + +class Scheduler::SubscriptionsScheduler + include Sidekiq::Worker + + def perform + Rails.logger.debug 'Queueing PuSH re-subscriptions' + + expiring_accounts.pluck(:id) do |id| + Pubsubhubbub::SubscribeWorker.perform_async(id) + end + end + + private + + def expiring_accounts + Account.expiring(1.day.from_now).partitioned + end +end diff --git a/config/environments/development.rb b/config/environments/development.rb index 58e8a0728..c20d08493 100644 --- a/config/environments/development.rb +++ b/config/environments/development.rb @@ -69,7 +69,4 @@ Rails.application.configure do end end -require 'sidekiq/testing' -Sidekiq::Testing.inline! - ActiveRecordQueryTrace.enabled = ENV.fetch('QUERY_TRACE_ENABLED') { false } diff --git a/config/sidekiq.yml b/config/sidekiq.yml index ee32b6317..5c700cb27 100644 --- a/config/sidekiq.yml +++ b/config/sidekiq.yml @@ -1,2 +1,11 @@ --- :concurrency: 5 +:queues: + - default + - push + - pull + - mailers +:schedule: + subscriptions_scheduler: + cron: '0 5 * * *' + class: Scheduler::SubscriptionsScheduler diff --git a/lib/tasks/mastodon.rake b/lib/tasks/mastodon.rake index 2cc1c29eb..290b28098 100644 --- a/lib/tasks/mastodon.rake +++ b/lib/tasks/mastodon.rake @@ -77,10 +77,8 @@ namespace :mastodon do desc 'Re-subscribes to soon expiring PuSH subscriptions' task refresh: :environment do - Account.expiring(1.day.from_now).find_each do |a| - Rails.logger.debug "PuSH re-subscribing to #{a.acct}" - SubscribeService.new.call(a) - end + # No-op + # This task is now executed via sidekiq-scheduler end end diff --git a/spec/services/follow_service_spec.rb b/spec/services/follow_service_spec.rb index 2ce0fa464..bda5daee1 100644 --- a/spec/services/follow_service_spec.rb +++ b/spec/services/follow_service_spec.rb @@ -53,10 +53,11 @@ RSpec.describe FollowService do end describe 'unlocked account' do - let(:bob) { Fabricate(:user, email: 'bob@example.com', account: Fabricate(:account, username: 'bob', domain: 'example.com', salmon_url: 'http://salmon.example.com')).account } + let(:bob) { Fabricate(:user, email: 'bob@example.com', account: Fabricate(:account, username: 'bob', domain: 'example.com', salmon_url: 'http://salmon.example.com', hub_url: 'http://hub.example.com')).account } before do stub_request(:post, "http://salmon.example.com/").to_return(:status => 200, :body => "", :headers => {}) + stub_request(:post, "http://hub.example.com/").to_return(status: 202) subject.call(sender, bob.acct) end @@ -70,6 +71,10 @@ RSpec.describe FollowService do xml.match(TagManager::VERBS[:follow]) }).to have_been_made.once end + + it 'subscribes to PuSH' do + expect(a_request(:post, "http://hub.example.com/")).to have_been_made.once + end end end end diff --git a/spec/services/subscribe_service_spec.rb b/spec/services/subscribe_service_spec.rb new file mode 100644 index 000000000..8cf0100c6 --- /dev/null +++ b/spec/services/subscribe_service_spec.rb @@ -0,0 +1,38 @@ +require 'rails_helper' + +RSpec.describe SubscribeService do + let(:account) { Fabricate(:account, username: 'bob', domain: 'example.com', hub_url: 'http://hub.example.com') } + subject { SubscribeService.new } + + it 'sends subscription request to PuSH hub' do + stub_request(:post, 'http://hub.example.com/').to_return(status: 202) + subject.call(account) + expect(a_request(:post, 'http://hub.example.com/')).to have_been_made.once + end + + it 'generates and keeps PuSH secret on successful call' do + stub_request(:post, 'http://hub.example.com/').to_return(status: 202) + subject.call(account) + expect(account.secret).to_not be_blank + end + + it 'fails silently if PuSH hub forbids subscription' do + stub_request(:post, 'http://hub.example.com/').to_return(status: 403) + subject.call(account) + end + + it 'fails silently if PuSH hub is not found' do + stub_request(:post, 'http://hub.example.com/').to_return(status: 404) + subject.call(account) + end + + it 'fails loudly if there is a network error' do + stub_request(:post, 'http://hub.example.com/').to_raise(HTTP::Error) + expect { subject.call(account) }.to raise_error HTTP::Error + end + + it 'fails loudly if PuSH hub is unavailable' do + stub_request(:post, 'http://hub.example.com/').to_return(status: 503) + expect { subject.call(account) }.to raise_error + end +end