Convert delayed_job to ActiveJob

Add Sidekiq and create ActiveJob classes for the majority of delayed actions.

Temporarily hook up the Sidekiq web UI in routes for testing purposes.
Kira committed 2019-02-17 02:01:12 -08:00
parent 8820570205
commit 22a077a2dd
41 changed files with 339 additions and 155 deletions
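The conversion follows one pattern throughout the diff: every call that previously went through delayed_job's .delay proxy or Delayed::Job.enqueue is replaced by a small ActiveJob subclass enqueued with perform_later and executed by Sidekiq. A minimal sketch of that pattern, with an illustrative job name that is not part of this commit:

class ExampleFixCountJob < ApplicationJob
  queue_as :tags

  def perform(tag_id)
    # Look the record up by id so the job arguments stay serializable.
    Tag.find(tag_id).fix_post_count
  end
end

# Before: tag.delay(:queue => "default").fix_post_count
# After:  ExampleFixCountJob.perform_later(tag.id)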

@@ -10,8 +10,6 @@ gem "pg"
gem "dalli", :platforms => :ruby
gem "memcache-client", :platforms => [:mswin, :mingw, :x64_mingw]
gem "tzinfo-data", :platforms => [:mswin, :mingw, :x64_mingw]
gem "delayed_job"
gem "delayed_job_active_record"
gem "simple_form"
gem "mechanize"
gem "whenever", :require => false
@@ -51,6 +49,10 @@ gem 'ptools'
gem 'jquery-rails'
gem 'webpacker', '>= 4.0.x'
gem 'retriable'
gem 'sidekiq'
# bookmarks for later, if they are needed
# gem 'sidekiq-worker-killer'
# gem 'sidekiq-unique-jobs'
gem 'redis'
# needed for looser jpeg header compat

@@ -112,16 +112,12 @@ GEM
cityhash (0.9.0)
coderay (1.1.2)
concurrent-ruby (1.1.3)
connection_pool (2.2.2)
crack (0.4.3)
safe_yaml (~> 1.0.0)
crass (1.0.4)
daemons (1.2.6)
dalli (2.7.8)
delayed_job (4.1.5)
activesupport (>= 3.0, < 5.3)
delayed_job_active_record (4.1.3)
activerecord (>= 3.0, < 5.3)
delayed_job (>= 3.0, < 5)
diff-lcs (1.3)
docile (1.3.0)
domain_name (0.5.20180417)
@@ -347,6 +343,11 @@ GEM
shoulda-context (1.2.2)
shoulda-matchers (3.1.2)
activesupport (>= 4.0.0)
sidekiq (5.2.5)
connection_pool (~> 2.2, >= 2.2.2)
rack (>= 1.5.0)
rack-protection (>= 1.5.0)
redis (>= 3.3.5, < 5)
signet (0.8.1)
addressable (~> 2.3)
faraday (~> 0.9)
@@ -461,8 +462,6 @@ DEPENDENCIES
cityhash
daemons
dalli
delayed_job
delayed_job_active_record
diff-lcs
dotenv-rails
dtext_rb!
@@ -502,6 +501,7 @@ DEPENDENCIES
sass-rails
shoulda-context
shoulda-matchers
sidekiq
simple_form
simplecov
sinatra

@@ -15,7 +15,7 @@ module Moderator
@bulk_revert.preview
render action: "new"
else
@bulk_revert.delay(:queue => "default", :priority => 15).process(CurrentUser.user, @constraints)
BulkRevertJob.perform_later(CurrentUser.id, @constraints)
flash[:notice] = "Reverts queued"
redirect_to new_moderator_bulk_revert_path
end

@@ -7,7 +7,7 @@ module Moderator
end
def update
Delayed::Job.enqueue(TagBatchChange.new(params[:tag][:antecedent], params[:tag][:consequent], CurrentUser.user.id, CurrentUser.ip_addr), :queue => "default")
TagBatchJob.perform_later(params[:tag][:antecedent], params[:tag][:consequent], CurrentUser.user.id, CurrentUser.ip_addr)
redirect_to edit_moderator_tag_path, :notice => "Post changes queued"
end

@@ -0,0 +1,8 @@
class ApplicationJob < ActiveJob::Base
class JobError < Exception ; end
# Automatically retry jobs that encountered a deadlock
retry_on ActiveRecord::Deadlocked
# Most jobs are safe to ignore if the underlying records are no longer available
# discard_on ActiveJob::DeserializationError
end
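Every job added below inherits from ApplicationJob, so retry_on ActiveRecord::Deadlocked applies to all of them. A hypothetical sketch of how an individual job could layer on its own handling with the same ActiveJob hooks (not part of this commit; the job name and exceptions are illustrative):

class ExampleCleanupJob < ApplicationJob
  queue_as :low_prio

  # Silently drop the job if its record was deleted before a worker ran it.
  discard_on ActiveJob::DeserializationError

  # Retry transient failures a few times, pausing between attempts.
  retry_on Timeout::Error, wait: 30.seconds, attempts: 3

  def perform(record_id)
    # ... work with the record identified by record_id ...
  end
end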

@@ -0,0 +1,11 @@
class BulkRevertJob < ApplicationJob
queue_as :low_prio
def perform(*args)
user = User.find(args[0])
constraints = args[1]
reverter = BulkRevert.new
reverter.perform(user, constraints)
end
end

@@ -0,0 +1,7 @@
class DeletePostFilesJob < ApplicationJob
queue_as :low_prio
def perform(*args)
Post.delete_files(args[0], args[1], args[2])
end
end

@@ -0,0 +1,9 @@
class PostUpdatePoolsJob < ApplicationJob
queue_as :tags
def perform(*args)
pool = Pool.find(args[0])
pool.update_category_pseudo_tags_for_posts
end
end

@@ -0,0 +1,8 @@
class TagAliasJob < ApplicationJob
queue_as :tags
def perform(*args)
ta = TagAlias.find(args[0])
ta.process!(update_topic: args[1])
end
end

@@ -0,0 +1,9 @@
class TagAliasUpdatePostsJob < ApplicationJob
queue_as :tags
def perform(*args)
ta = TagAlias.find(args[0])
ta.update_posts
end
end

app/jobs/tag_batch_job.rb (new file)
@@ -0,0 +1,82 @@
class TagBatchJob < ApplicationJob
queue_as :tags
def perform(*args)
@antecedent = args[0]
@consequent = args[1]
@updater_id = args[2]
@updater_ip_addr = args[3]
raise JobError.new("antecedent is missing") if @antecedent.blank?
normalized_antecedent = TagAlias.to_aliased(::Tag.scan_tags(@antecedent.mb_chars.downcase))
normalized_consequent = TagAlias.to_aliased(::Tag.scan_tags(@consequent.mb_chars.downcase))
updater = User.find(@updater_id)
CurrentUser.without_safe_mode do
CurrentUser.scoped(updater, @updater_ip_addr) do
migrate_posts(normalized_antecedent, normalized_consequent)
migrate_saved_searches(normalized_antecedent, normalized_consequent)
migrate_blacklists(normalized_antecedent, normalized_consequent)
end
end
ModAction.log("processed mass update: #{@antecedent} -> #{@consequent}",:mass_update)
end
def estimate_update_count
PostReadOnly.tag_match(@antecedent).count
end
def migrate_posts(normalized_antecedent, normalized_consequent)
::Post.tag_match(normalized_antecedent.join(" ")).find_each do |post|
post.reload
tags = (post.tag_array - normalized_antecedent + normalized_consequent).join(" ")
post.update(tag_string: tags)
end
end
def migrate_saved_searches(normalized_antecedent, normalized_consequent)
if SavedSearch.enabled?
tags = Tag.scan_tags(normalized_antecedent.join(" "), strip_metatags: true)
# https://www.postgresql.org/docs/current/static/functions-array.html
saved_searches = SavedSearch.where("string_to_array(query, ' ') @> ARRAY[?]", tags)
saved_searches.find_each do |ss|
ss.query = (ss.query.split - tags + normalized_consequent).uniq.join(" ")
ss.save
end
end
end
# this can't handle negated tags or other special cases
def migrate_blacklists(normalized_antecedent, normalized_consequent)
query = normalized_antecedent
adds = normalized_consequent
arel = query.inject(User.none) do |scope, x|
scope.or(User.where("blacklisted_tags like ?", "%" + x.to_escaped_for_sql_like + "%"))
end
arel.find_each do |user|
changed = false
begin
repl = user.blacklisted_tags.split(/\r\n|\r|\n/).map do |line|
list = Tag.scan_tags(line)
if (list & query).size != query.size
next line
end
changed = true
(list - query + adds).join(" ")
end
if changed
user.update(blacklisted_tags: repl.join("\n"))
end
rescue Exception => e
NewRelic::Agent.notice_error(e)
end
end
end
end
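For concreteness, a worked example of what migrate_posts does to a single post, using hypothetical tag names: with antecedent "longhair" and consequent "long_hair", a post tagged "longhair solo" ends up tagged "solo long_hair".

# Inside migrate_posts, for one matching post (illustrative values):
post.tag_array                                   #=> ["longhair", "solo"]
tags = (post.tag_array - ["longhair"] + ["long_hair"]).join(" ")
tags                                             #=> "solo long_hair"
post.update(tag_string: tags)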

@@ -0,0 +1,8 @@
class TagImplicationJob < ApplicationJob
queue_as :tags
def perform(*args)
ti = TagImplication.find(args[0])
ti.process!(update_topic: args[1])
end
end

@@ -0,0 +1,9 @@
class TagPostCountJob < ApplicationJob
queue_as :tags
def perform(*args)
tag = Tag.find(args[0])
tag.fix_post_count
end
end

@@ -0,0 +1,9 @@
class TagUpdateRelatedJob < ApplicationJob
queue_as :tags
def perform(*args)
tag = Tag.find(args[0])
tag.update_related
end
end

@@ -0,0 +1,39 @@
class UserDeletionJob < ApplicationJob
queue_as :low_prio
def perform(*args)
user = User.find(args[0])
remove_favorites(user)
remove_saved_searches(user)
rename(user)
end
def remove_favorites(user)
Post.without_timeout do
Post.raw_tag_match("fav:#{user.id}").where("true /* UserDeletion.remove_favorites_for */").find_each do |post|
Favorite.remove(post: post, user: user)
end
end
end
def remove_saved_searches(user)
SavedSearch.where(user_id: user.id).destroy_all
end
def rename(user)
name = "user_#{user.id}"
n = 0
while User.where(:name => name).exists? && (n < 10)
name += "~"
n += 1
end
if n == 10
raise JobError.new("New name could not be found")
end
user.name = name
user.save!
end
end

@@ -156,7 +156,7 @@ private
tag_implication.reject!(update_topic: false)
when :mass_update
Delayed::Job.enqueue(Moderator::TagBatchChange.new(token[1], token[2], CurrentUser.id, CurrentUser.ip_addr), :queue => "default")
TagBatchJob.perform_later(token[1], token[2], CurrentUser.id, CurrentUser.ip_addr)
when :change_category
tag = Tag.find_by_name(token[1])

@@ -1,80 +0,0 @@
module Moderator
class TagBatchChange < Struct.new(:antecedent, :consequent, :updater_id, :updater_ip_addr)
class Error < Exception ; end
def perform
raise Error.new("antecedent is missing") if antecedent.blank?
normalized_antecedent = TagAlias.to_aliased(::Tag.scan_tags(antecedent.mb_chars.downcase))
normalized_consequent = TagAlias.to_aliased(::Tag.scan_tags(consequent.mb_chars.downcase))
updater = User.find(updater_id)
CurrentUser.without_safe_mode do
CurrentUser.scoped(updater, updater_ip_addr) do
migrate_posts(normalized_antecedent, normalized_consequent)
migrate_saved_searches(normalized_antecedent, normalized_consequent)
migrate_blacklists(normalized_antecedent, normalized_consequent)
end
end
ModAction.log("processed mass update: #{antecedent} -> #{consequent}",:mass_update)
end
def estimate_update_count
PostReadOnly.tag_match(antecedent).count
end
def migrate_posts(normalized_antecedent, normalized_consequent)
::Post.tag_match(normalized_antecedent.join(" ")).find_each do |post|
post.reload
tags = (post.tag_array - normalized_antecedent + normalized_consequent).join(" ")
post.update(tag_string: tags)
end
end
def migrate_saved_searches(normalized_antecedent, normalized_consequent)
if SavedSearch.enabled?
tags = Tag.scan_tags(normalized_antecedent.join(" "), strip_metatags: true)
# https://www.postgresql.org/docs/current/static/functions-array.html
saved_searches = SavedSearch.where("string_to_array(query, ' ') @> ARRAY[?]", tags)
saved_searches.find_each do |ss|
ss.query = (ss.query.split - tags + normalized_consequent).uniq.join(" ")
ss.save
end
end
end
# this can't handle negated tags or other special cases
def migrate_blacklists(normalized_antecedent, normalized_consequent)
query = normalized_antecedent
adds = normalized_consequent
arel = query.inject(User.none) do |scope, x|
scope.or(User.where("blacklisted_tags like ?", "%" + x.to_escaped_for_sql_like + "%"))
end
arel.find_each do |user|
changed = false
begin
repl = user.blacklisted_tags.split(/\r\n|\r|\n/).map do |line|
list = Tag.scan_tags(line)
if (list & query).size != query.size
next line
end
changed = true
(list - query + adds).join(" ")
end
if changed
user.update(blacklisted_tags: repl.join("\n"))
end
rescue Exception => e
NewRelic::Agent.notice_error(e)
end
end
end
end
end

@@ -26,6 +26,6 @@ class TagAliasCorrection
def fix!
clear_cache
tag_alias.delay(:queue => "default").update_posts
TagAliasUpdatePostsJob.perform_later(tag_alias.id)
end
end

@@ -19,7 +19,7 @@ class TagCorrection
end
def fix!
tag.delay(:queue => "default").fix_post_count
TagPostCountJob.perform_later(tag.id)
tag.update_category_cache
end
end

@@ -3,15 +3,6 @@ class UserDeletion
attr_reader :user, :password
def self.remove_favorites_for(user_id)
user = User.find(user_id)
Post.without_timeout do
Post.raw_tag_match("fav:#{user_id}").where("true /* UserDeletion.remove_favorites_for */").find_each do |post|
Favorite.remove(post: post, user: user)
end
end
end
def initialize(user, password)
@user = user
@password = password
@@ -20,11 +11,9 @@ class UserDeletion
def delete!
validate
clear_user_settings
remove_favorites
clear_saved_searches
rename
reset_password
create_mod_action
UserDeletionJob.perform_later(user.id)
end
private
@@ -33,10 +22,6 @@ private
ModAction.log("user ##{user.id} deleted",:user_delete)
end
def clear_saved_searches
SavedSearch.where(user_id: user.id).destroy_all
end
def clear_user_settings
user.email = nil
user.last_logged_in_at = nil
@@ -56,25 +41,6 @@ private
user.save!
end
def remove_favorites
UserDeletion.delay(:queue => "default").remove_favorites_for(user.id)
end
def rename
name = "user_#{user.id}"
n = 0
while User.where(:name => name).exists? && (n < 10)
name += "~"
end
if n == 10
raise ValidationError.new("New name could not be found")
end
user.name = name
user.save!
end
def validate
if !User.authenticate(user.name, password)
raise ValidationError.new("Password is incorrect")

@@ -302,7 +302,7 @@ class Pool < ApplicationRecord
def update_category_pseudo_tags_for_posts_async
if saved_change_to_category?
delay(:queue => "default").update_category_pseudo_tags_for_posts
PostUpdatePoolsJob.perform_later(id)
end
end

@@ -88,7 +88,7 @@ class Post < ApplicationRecord
end
def queue_delete_files(grace_period)
Post.delay(queue: "default", run_at: Time.now + grace_period).delete_files(id, md5, file_ext)
DeletePostFilesJob.set(wait: grace_period).perform_later(id, md5, file_ext)
end
def delete_files
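The old delayed_job call pinned an absolute time with run_at: Time.now + grace_period; ActiveJob expresses the same delay with set(wait:) as above, or with set(wait_until:) when an exact timestamp is wanted. Illustrative only, not part of this commit:

# Equivalent scheduling against an absolute timestamp:
DeletePostFilesJob.set(wait_until: Time.now + grace_period).perform_later(id, md5, file_ext)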

@@ -876,7 +876,7 @@ class Tag < ApplicationRecord
if Cache.get("urt:#{key}").nil? && should_update_related?
if post_count < COSINE_SIMILARITY_RELATED_TAG_THRESHOLD
delay(:queue => "default").update_related
TagUpdateRelatedJob.perform_later(id)
else
sqs = SqsService.new(Danbooru.config.aws_sqs_reltagcalc_url)
sqs.send_message("calculate #{name}")

@@ -27,7 +27,7 @@ class TagAlias < TagRelationship
def approve!(update_topic: true, approver: CurrentUser.user)
CurrentUser.scoped(approver) do
update(status: "queued", approver_id: approver.id)
delay(:queue => "default").process!(update_topic: update_topic)
TagAliasJob.perform_later(id, update_topic)
end
end
end

@@ -166,7 +166,7 @@ class TagImplication < TagRelationship
def approve!(approver: CurrentUser.user, update_topic: true)
update(status: "queued", approver_id: approver.id)
delay(:queue => "default").process!(update_topic: update_topic)
TagImplicationJob.perform_later(id, update_topic)
end
def reject!(update_topic: true)

@@ -24,6 +24,7 @@ module Danbooru
config.log_tags = [->(req) {"PID:#{Process.pid}"}]
config.action_controller.action_on_unpermitted_parameters = :raise
config.force_ssl = true
config.active_job.queue_adapter = :sidekiq
if Rails.env.production? && Danbooru.config.ssl_options.present?
config.ssl_options = Danbooru.config.ssl_options

@@ -1,5 +1,5 @@
class Delayed::Job
def hidden_attributes
[:handler]
end
end
# class Delayed::Job
# def hidden_attributes
# [:handler]
# end
# end

@@ -1,14 +1,14 @@
require 'delayed/plugin'
class DelayedJobTimeoutPlugin < ::Delayed::Plugin
callbacks do |lifecycle|
lifecycle.before(:execute) do |job|
Delayed::Job.connection.execute "set statement_timeout = 0"
end
end
end
Delayed::Worker.default_queue_name = "default"
Delayed::Worker.destroy_failed_jobs = false
Delayed::Worker.plugins << DelayedJobTimeoutPlugin
Delayed::Job.include(Danbooru::Paginator::ActiveRecordExtension)
# require 'delayed/plugin'
#
# class DelayedJobTimeoutPlugin < ::Delayed::Plugin
# callbacks do |lifecycle|
# lifecycle.before(:execute) do |job|
# Delayed::Job.connection.execute "set statement_timeout = 0"
# end
# end
# end
#
# Delayed::Worker.default_queue_name = "default"
# Delayed::Worker.destroy_failed_jobs = false
# Delayed::Worker.plugins << DelayedJobTimeoutPlugin
# Delayed::Job.include(Danbooru::Paginator::ActiveRecordExtension)

@@ -1,5 +1,9 @@
Rails.application.routes.draw do
require 'sidekiq/web'
Sidekiq::Web.set :session_secret, Rails.application.credentials[:secret_key_base]
mount Sidekiq::Web => '/sidekiq'
namespace :admin do
resources :users, :only => [:edit, :update]
resource :alias_and_implication_import, :only => [:new, :create]

config/sidekiq.yml (new file)
@@ -0,0 +1,21 @@
---
:verbose: false
:concurrency: 5
# Set timeout to 8 on Heroku, longer if you manage your own systems.
:timeout: 30
# Sidekiq will run this file through ERB when reading it so you can
# even put in dynamic logic, like a host-specific queue.
# http://www.mikeperham.com/2013/11/13/advanced-sidekiq-host-specific-queues/
:queues:
- [low_prio, 1]
- [tags, 2]
- [default, 3]
- [high_prio, 5]
# you can override concurrency based on environment
production:
:concurrency: 25
staging:
:concurrency: 5
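As the comments above note, Sidekiq reads this file through ERB, so values can be computed at load time. A hypothetical variant (not part of this commit) that takes concurrency from an environment variable:

:concurrency: <%= ENV.fetch("SIDEKIQ_CONCURRENCY", 5) %>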

@@ -0,0 +1,7 @@
require 'test_helper'
class BulkRevertJobTest < ActiveJob::TestCase
# test "the truth" do
# assert true
# end
end

@@ -0,0 +1,7 @@
require 'test_helper'
class DeletePostFilesJobTest < ActiveJob::TestCase
# test "the truth" do
# assert true
# end
end

@@ -0,0 +1,7 @@
require 'test_helper'
class PostUpdatePoolsJobTest < ActiveJob::TestCase
# test "the truth" do
# assert true
# end
end

@@ -0,0 +1,7 @@
require 'test_helper'
class TagAliasJobTest < ActiveJob::TestCase
# test "the truth" do
# assert true
# end
end

@@ -0,0 +1,7 @@
require 'test_helper'
class TagAliasUpdatePostsJobTest < ActiveJob::TestCase
# test "the truth" do
# assert true
# end
end

@@ -0,0 +1,7 @@
require 'test_helper'
class TagBatchJobTest < ActiveJob::TestCase
# test "the truth" do
# assert true
# end
end

@@ -0,0 +1,7 @@
require 'test_helper'
class TagImplicationJobTest < ActiveJob::TestCase
# test "the truth" do
# assert true
# end
end

@@ -0,0 +1,7 @@
require 'test_helper'
class TagPostCountJobTest < ActiveJob::TestCase
# test "the truth" do
# assert true
# end
end

@@ -0,0 +1,7 @@
require 'test_helper'
class TagUpdateRelatedJobTest < ActiveJob::TestCase
# test "the truth" do
# assert true
# end
end

@@ -0,0 +1,7 @@
require 'test_helper'
class UserDeletionJobTest < ActiveJob::TestCase
# test "the truth" do
# assert true
# end
end

@@ -13,6 +13,9 @@ require 'rails/test_help'
require 'cache'
require 'webmock/minitest'
require 'sidekiq/testing'
Sidekiq::Testing.fake!
Dir[File.expand_path(File.dirname(__FILE__) + "/factories/*.rb")].each {|file| require file}
Dir[File.expand_path(File.dirname(__FILE__) + "/test_helpers/*.rb")].each {|file| require file}
@@ -131,6 +134,4 @@ class ActionDispatch::IntegrationTest
end
end
Delayed::Worker.delay_jobs = false
Rails.application.load_seed
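The job test stubs added in this commit could later assert enqueueing with ActiveJob's built-in test helpers; a hypothetical example for one of them (not part of this commit, and the id is an arbitrary illustration):

require 'test_helper'

class TagPostCountJobTest < ActiveJob::TestCase
  test "enqueues a post count fix on the tags queue" do
    assert_enqueued_with(job: TagPostCountJob, args: [123]) do
      TagPostCountJob.perform_later(123)
    end
  end
end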