Convert remaining delay jobs to activejob

This commit is contained in:
Kira 2019-02-18 14:32:18 -08:00
parent 22a077a2dd
commit bd69a08179
22 changed files with 65 additions and 275 deletions

View File

@ -1,41 +0,0 @@
class DelayedJobsController < ApplicationController
respond_to :html, :xml, :json, :js
before_action :admin_only, except: [:index]
def index
@delayed_jobs = Delayed::Job.order("run_at asc").paginate(params[:page], :limit => params[:limit])
respond_with(@delayed_jobs)
end
def cancel
@job = Delayed::Job.find(params[:id])
if !@job.locked_at?
@job.fail!
end
respond_with(@job)
end
def retry
@job = Delayed::Job.find(params[:id])
if !@job.locked_at?
@job.update(failed_at: nil, attempts: 0)
end
respond_with(@job)
end
def run
@job = Delayed::Job.find(params[:id])
if !@job.locked_at?
@job.update(run_at: Time.now)
end
respond_with(@job)
end
def destroy
@job = Delayed::Job.find(params[:id])
if !@job.locked_at?
@job.destroy
end
respond_with(@job)
end
end

View File

@ -1,138 +0,0 @@
module DelayedJobsHelper
def print_name(job)
case job.name
when "Tag.increment_post_counts"
"<strong>increment post counts</strong>"
when "Tag.decrement_post_counts"
"<strong>decrement post counts</strong>"
when "Post.expire_cache"
"<strong>expire post cache</strong>"
when "Moderator::TagBatchChange"
"<strong>tag batch change</strong>"
when "Class#expire_cache"
"<strong>expire post count cache</strong>"
when "Upload#process!"
"<strong>upload post</strong>"
when "Tag#update_related"
"<strong>update related tags</strong>"
when "TagAlias#process!"
"<strong>alias</strong>"
when "TagImplication#process!"
"<strong>implication</strong>"
when "Class#clear_cache_for"
"<strong>expire tag alias cache</strong>"
when "Tag#update_category_cache"
"<strong>update tag category cache</strong>"
when "Tag#update_category_post_counts"
"<strong>update category post counts</strong>"
when "Class#remove_iqdb"
"<strong>remove from iqdb</strong>"
when "Post#update_iqdb"
"<strong>update iqdb</strong>"
when "Class#convert"
"<strong>convert ugoira</strong>"
when "Class#increment_post_counts"
"<strong>increment post counts</strong>"
when "Class#decrement_post_counts"
"<strong>decrement post counts</strong>"
when "Pool#update_category_pseudo_tags_for_posts"
"<strong>update pool category pseudo tags for posts</strong>"
when "Post.delete_files"
"<strong>delete old files</strong>"
when "BulkRevert#process"
"<strong>bulk revert</strong>"
when "PostKeeperManager.check_and_update"
"<strong>update top tagger</strong>"
else
h(job.name)
end
end
def print_handler(job)
case job.name
when "PostKeeperManager.check_and_assign"
""
when "Tag.increment_post_counts", "Tag.decrement_post_counts"
""
when "Post.expire_cache"
""
when "Moderator::TagBatchChange"
h(job.payload_object.antecedent) + " -> " + h(job.payload_object.consequent)
when "Class#expire_cache"
h(job.payload_object.args.flatten.join(" "))
when "Upload#process!"
%{<a href="/uploads/#{job.payload_object.object.id}">record</a>}
when "Tag#update_related"
h(job.payload_object.name)
when "TagAlias#process!"
h(job.payload_object.antecedent_name) + " -> " + h(job.payload_object.consequent_name)
when "TagImplication#process!"
h(job.payload_object.antecedent_name) + " -> " + h(job.payload_object.consequent_name)
when "Class#clear_cache_for"
h(job.payload_object.args.flatten.join(" "))
when "Tag#update_category_cache"
h(job.payload_object.name)
when "Tag#update_category_post_counts"
h(job.payload_object.name)
when "Class#process", "Class#remove_iqdb"
h(job.payload_object.args.flatten.join(" "))
when "Post#update_iqdb"
h(job.payload_object.id)
when "Class#convert"
h(job.payload_object.args[0])
when "Class#increment_post_counts"
h(job.payload_object.args.join(" "))
when "Class#decrement_post_counts"
h(job.payload_object.args.join(" "))
when "Pool#update_category_pseudo_tags_for_posts"
%{<a href="/pools/#{job.payload_object.id}">#{h(job.payload_object.name)}</a>}
when "Post.delete_files"
%{<a href="/posts/#{job.payload_object.args.first}">post ##{job.payload_object.args.first}</a>}
when "BulkRevert#process"
h(job.payload_object.args.join(" "))
when "PostKeeperManager.check_and_update"
h(job.payload_object.args[0])
end
end
end

View File

@ -0,0 +1,7 @@
class SavedSearchJob < ApplicationJob
queue_as :default
def perform(*args)
SavedSearch.populate(args[0])
end
end

View File

@ -0,0 +1,7 @@
class UploadDeleteFilesJob < ApplicationJob
queue_as :default
def perform(*args)
UploadService::Utils::delete_file(args[0], args[1], args[2])
end
end

View File

@ -0,0 +1,9 @@
class UploadPreprocessJob < ApplicationJob
queue_as :default
def perform(*args)
user_id = args[0]
params = args[1]
Preprocessor.new(source: params[:url], referer_url: params[:ref]).delay_start(user_id)
end
end

View File

@ -0,0 +1,7 @@
class UploadProcessJob < ApplicationJob
queue_as :default
def perform(*args)
# Do something later
end
end

View File

@ -1,22 +0,0 @@
#!/usr/bin/env ruby
require 'mail'
class DelayedJobErrorChecker
def check!
errors = Delayed::Job.where("last_error is not null").limit(100).pluck(:last_error).map {|x| x[0..100]}
if errors.size == 100
mail = Mail.new do
from Danbooru.config.contact_email
to Danbooru.config.contact_email
CurrentUser.as_system do
subject "[#{Danbooru.config.app_name}] Delayed job error count at #{errors}"
end
body errors.uniq.join("\n")
end
mail.delivery_method :sendmail
mail.deliver
end
end
end

View File

@ -3,7 +3,6 @@ module Maintenance
def hourly
UploadErrorChecker.new.check!
DelayedJobErrorChecker.new.check!
rescue Exception => exception
rescue_exception(exception)
end
@ -12,7 +11,6 @@ module Maintenance
ActiveRecord::Base.connection.execute("set statement_timeout = 0")
PostPruner.new.prune!
Upload.where('created_at < ?', 1.day.ago).delete_all
Delayed::Job.where('created_at < ?', 45.days.ago).delete_all
PostVote.prune!
CommentVote.prune!
ApiCacheGenerator.new.generate_tag_cache

View File

@ -22,7 +22,7 @@ class UploadService
preprocessor = Preprocessor.new(params)
if preprocessor.in_progress?
delay(queue: "default", priority: -1, run_at: 5.seconds.from_now).delayed_start(CurrentUser.id)
UploadProcessJob.set(wait: 5.seconds).perform_later(self, CurrentUser.id)
return preprocessor.predecessor
end
@ -91,9 +91,9 @@ class UploadService
upload.update(status: "completed", post_id: @post.id)
if @post.is_pending? && Automod::UpdateDynamoDbJob.enabled?
Delayed::Job.enqueue(Automod::UpdateDynamoDbJob.new(@post.id), run_at: 84.hours.from_now, queue: "default")
end
# if @post.is_pending? && Automod::UpdateDynamoDbJob.enabled?
# Delayed::Job.enqueue(Automod::UpdateDynamoDbJob.new(@post.id), run_at: 84.hours.from_now, queue: "default")
# end
@post
end

View File

@ -5,7 +5,7 @@ class UploadService
if Utils.is_downloadable?(url) && file.nil?
# this gets called from UploadsController#new so we need to preprocess async
Preprocessor.new(source: url, referer_url: ref).delay(priority: -1, queue: "default").delayed_start(CurrentUser.id)
UploadPreprocessJob.perform_later(CurrentUser.id, {source: uri, referer_url: ref})
begin
download = Downloads::File.new(url, ref)

View File

@ -166,7 +166,7 @@ class UploadService
# in case this upload never finishes processing, we need to delete the
# distributed files in the future
UploadService::Utils.delay(priority: -1, queue: "default", run_at: 24.hours.from_now).delete_file(upload.md5, upload.file_ext, upload.id)
UploadDeleteFilesJob.set(wait: 24.hours).perform_later(upload.md5, upload.file_ext, upload.id)
end
# these methods are only really used during upload processing even

View File

@ -26,6 +26,7 @@ class SavedSearch < ApplicationRecord
post_ids.merge(sub_ids)
redis.expire(redis_key, REDIS_EXPIRY)
else
SavedSearchJob.perform_later(query)
SavedSearch.delay(queue: "default").populate(query)
end
end

View File

@ -1 +0,0 @@
location.reload();

View File

@ -1 +0,0 @@
location.reload();

View File

@ -1,61 +0,0 @@
<div id="c-delayed-jobs">
<div id="a-index">
<h1>Delayed Jobs</h1>
<table class="striped autofit">
<thead>
<tr>
<th>Queue</th>
<th>Name</th>
<% if CurrentUser.is_admin? %>
<th>Handler</th>
<% end %>
<th>Attempts</th>
<th>Last error</th>
<th>Failed at</th>
<th>Run at</th>
<th></th>
</tr>
</thead>
<tbody>
<% @delayed_jobs.each do |job| %>
<tr>
<td><%= job.queue %></td>
<td><%= raw print_name(job) %></td>
<% if CurrentUser.is_admin? %>
<td class="col-expand"><%= raw print_handler(job) %></td>
<% end %>
<td><%= job.attempts %></td>
<td class="col-expand">
<% if job.last_error %>
<%= job.last_error.split(/\n/)[0] %>
<%= job.last_error.split(/\n/)[1..-1].grep(/releases/).join("\n") %>
<% end %>
</td>
<td><%= time_ago_in_words_tagged(job.failed_at) if job.failed_at %></td>
<td><%= time_ago_in_words_tagged(job.run_at) %></td>
<td>
<% if CurrentUser.is_admin? %>
<% if job.locked_at? %>
Running
<% elsif job.failed? %>
<%= link_to "Retry", retry_delayed_job_path(job), method: :put, remote: true %> |
<%= link_to "Delete", delayed_job_path(job), method: :delete, remote: true %>
<% else %>
<%= link_to "Run", run_delayed_job_path(job), method: :put, remote: true %> |
<%= link_to "Cancel", cancel_delayed_job_path(job), method: :put, remote: true %>
<% end %>
<% end %>
</td>
</tr>
<% end %>
</tbody>
</table>
<%= numbered_paginator(@delayed_jobs) %>
</div>
</div>
<% content_for(:page_title) do %>
Delayed Jobs - <%= Danbooru.config.app_name %>
<% end %>

View File

@ -1 +0,0 @@
location.reload();

View File

@ -1 +0,0 @@
location.reload();

View File

@ -124,7 +124,6 @@
<ul>
<li><h1>Admin</h1></li>
<li><%= link_to("Mod Actions", mod_actions_path) %></li>
<li><%= link_to("Jobs", delayed_jobs_path) %></li>
<li><%= link_to("Bulk Update Requests", bulk_update_requests_path) %></li>
<li><%= link_to("Janitor Trials", janitor_trials_path) %></li>

View File

@ -0,0 +1,7 @@
require 'test_helper'
class SavedSearchJobTest < ActiveJob::TestCase
# test "the truth" do
# assert true
# end
end

View File

@ -0,0 +1,7 @@
require 'test_helper'
class UploadDeleteFilesJobTest < ActiveJob::TestCase
# test "the truth" do
# assert true
# end
end

View File

@ -0,0 +1,7 @@
require 'test_helper'
class UploadPreprocessJobTest < ActiveJob::TestCase
# test "the truth" do
# assert true
# end
end

View File

@ -0,0 +1,7 @@
require 'test_helper'
class UploadProcessJobTest < ActiveJob::TestCase
# test "the truth" do
# assert true
# end
end