Merge pull request #548 from e621ng/remove-elasticsearch-model

Remove elasticsearch-model
This commit is contained in:
Earlopain 2023-09-18 20:29:56 +02:00 committed by GitHub
commit a0c56446fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 427 additions and 216 deletions

View File

@ -29,7 +29,7 @@ gem 'sidekiq-unique-jobs'
gem 'redis'
gem 'request_store'
gem 'elasticsearch-model'
gem 'elasticsearch'
gem 'mailgun-ruby'
gem 'resolv'

View File

@ -130,10 +130,6 @@ GEM
elasticsearch-transport (= 7.17.7)
elasticsearch-api (7.17.7)
multi_json
elasticsearch-model (7.2.1)
activesupport (> 3)
elasticsearch (~> 7)
hashie
elasticsearch-transport (7.17.7)
faraday (~> 1)
multi_json
@ -172,7 +168,6 @@ GEM
globalid (1.1.0)
activesupport (>= 5.0)
hashdiff (1.0.1)
hashie (5.0.0)
http-accept (1.7.0)
http-cookie (1.0.5)
domain_name (~> 0.5)
@ -406,7 +401,7 @@ DEPENDENCIES
dotenv-rails
draper
dtext_rb!
elasticsearch-model
elasticsearch
factory_bot_rails
httparty
listen

View File

@ -17,14 +17,13 @@ module Admin
end
moved_post_ids = []
Post.tag_match("user:!#{@old_user.id} #{query}").limit(300).records.each do |p|
Post.tag_match("user:!#{@old_user.id} #{query}").limit(300).each do |p|
moved_post_ids << p.id
p.do_not_version_changes = true
p.update({ uploader_id: @new_user.id })
p.versions.where(updater_id: @old_user.id).each do |pv|
pv.update_column(:updater_id, @new_user.id)
pv.reload
pv.__elasticsearch__.index_document
pv.update_index
end
end

View File

@ -16,7 +16,7 @@ module Admin
dnp_tags = %w[avoid_posting conditional_dnp]
post_ids = []
Post.tag_match_system("#{query} ~avoid_posting ~conditional_dnp").limit(1000).records.each do |p|
Post.tag_match_system("#{query} ~avoid_posting ~conditional_dnp").limit(1000).each do |p|
previous_tags = p.fetch_tags(*dnp_tags)
p.do_not_version_changes = true

View File

@ -4,7 +4,7 @@ class PostVersionsController < ApplicationController
respond_to :js, only: [:undo]
def index
@post_versions = PostVersion.__elasticsearch__.search(PostVersion.build_query(search_params)).paginate(params[:page], limit: params[:limit], max_count: 10_000, search_count: params[:search], includes: [:updater, post: [:versions]])
@post_versions = PostVersion.document_store.search(PostVersion.build_query(search_params)).paginate(params[:page], limit: params[:limit], max_count: 10_000, search_count: params[:search], includes: [:updater, post: [:versions]])
respond_with(@post_versions)
end

View File

@ -89,7 +89,7 @@ class PostsController < ApplicationController
def random
tags = params[:tags] || ''
@post = Post.tag_match(tags + " order:random").limit(1).records[0]
@post = Post.tag_match(tags + " order:random").limit(1).first
raise ActiveRecord::RecordNotFound if @post.nil?
respond_with(@post) do |format|
format.html { redirect_to post_path(@post, :tags => params[:tags]) }

View File

@ -1,36 +0,0 @@
# frozen_string_literal: true
# Base Elasticsearch indexing definitions
#
# Make sure to include your custom index file
# in your model alongside Indexable.
module Indexable
def self.included(base)
base.include Elasticsearch::Model
base.index_name("#{base.model_name.plural}_#{Rails.env}")
base.after_commit on: [:create] do
__elasticsearch__.index_document(Rails.env.test? ? { refresh: "true" } : {})
end
base.after_commit on: [:update] do
update_index # XXX
end
base.after_commit on: [:destroy] do
__elasticsearch__.delete_document(Rails.env.test? ? { refresh: "true" } : {})
end
end
def update_index(queue: :high_prio)
# TODO: race condition hack, makes tests SLOW!!!
return __elasticsearch__.index_document refresh: "true" if Rails.env.test?
IndexUpdateJob.set(queue: queue).perform_later(self.class.to_s, id)
end
def update_index!
__elasticsearch__.index_document
end
end

View File

@ -2,71 +2,81 @@
module PostIndex
def self.included(base)
base.settings index: { number_of_shards: 5, number_of_replicas: 1, max_result_window: 250_000 } do
mappings dynamic: false do
indexes :created_at, type: 'date'
indexes :updated_at, type: 'date'
indexes :commented_at, type: 'date'
indexes :comment_bumped_at, type: 'date'
indexes :noted_at, type: 'date'
indexes :id, type: 'integer'
indexes :up_score, type: 'integer'
indexes :down_score, type: 'integer'
indexes :score, type: 'integer'
indexes :fav_count, type: 'integer'
indexes :tag_count, type: 'integer'
indexes :change_seq, type: 'long'
base.document_store.index = {
settings: {
index: {
number_of_shards: 5,
number_of_replicas: 1,
max_result_window: 250_000,
},
},
mappings: {
dynamic: false,
properties: {
created_at: { type: "date" },
updated_at: { type: "date" },
commented_at: { type: "date" },
comment_bumped_at: { type: "date" },
noted_at: { type: "date" },
id: { type: "integer" },
up_score: { type: "integer" },
down_score: { type: "integer" },
score: { type: "integer" },
fav_count: { type: "integer" },
tag_count: { type: "integer" },
change_seq: { type: "long" },
indexes :tag_count_general, type: 'integer'
indexes :tag_count_artist, type: 'integer'
indexes :tag_count_character, type: 'integer'
indexes :tag_count_copyright, type: 'integer'
indexes :tag_count_meta, type: 'integer'
indexes :tag_count_species, type: 'integer'
indexes :tag_count_invalid, type: 'integer'
indexes :tag_count_lore, type: 'integer'
indexes :comment_count, type: 'integer'
tag_count_general: { type: "integer" },
tag_count_artist: { type: "integer" },
tag_count_character: { type: "integer" },
tag_count_copyright: { type: "integer" },
tag_count_meta: { type: "integer" },
tag_count_species: { type: "integer" },
tag_count_invalid: { type: "integer" },
tag_count_lore: { type: "integer" },
comment_count: { type: "integer" },
indexes :file_size, type: 'integer'
indexes :parent, type: 'integer'
indexes :pools, type: 'integer'
indexes :sets, type: 'integer'
indexes :commenters, type: 'integer'
indexes :noters, type: 'integer'
indexes :faves, type: 'integer'
indexes :upvotes, type: 'integer'
indexes :downvotes, type: 'integer'
indexes :children, type: 'integer'
indexes :uploader, type: 'integer'
indexes :approver, type: 'integer'
indexes :deleter, type: 'integer'
indexes :width, type: 'integer'
indexes :height, type: 'integer'
indexes :mpixels, type: 'float'
indexes :aspect_ratio, type: 'float'
indexes :duration, type: 'float'
file_size: { type: "integer" },
parent: { type: "integer" },
pools: { type: "integer" },
sets: { type: "integer" },
commenters: { type: "integer" },
noters: { type: "integer" },
faves: { type: "integer" },
upvotes: { type: "integer" },
downvotes: { type: "integer" },
children: { type: "integer" },
uploader: { type: "integer" },
approver: { type: "integer" },
deleter: { type: "integer" },
width: { type: "integer" },
height: { type: "integer" },
mpixels: { type: "float" },
aspect_ratio: { type: "float" },
duration: { type: "float" },
indexes :tags, type: 'keyword'
indexes :md5, type: 'keyword'
indexes :rating, type: 'keyword'
indexes :file_ext, type: 'keyword'
indexes :source, type: 'keyword'
indexes :description, type: 'text'
indexes :notes, type: 'text'
indexes :del_reason, type: 'keyword'
tags: { type: "keyword" },
md5: { type: "keyword" },
rating: { type: "keyword" },
file_ext: { type: "keyword" },
source: { type: "keyword" },
description: { type: "text" },
notes: { type: "text" },
del_reason: { type: "keyword" },
indexes :rating_locked, type: 'boolean'
indexes :note_locked, type: 'boolean'
indexes :status_locked, type: 'boolean'
indexes :flagged, type: 'boolean'
indexes :pending, type: 'boolean'
indexes :deleted, type: 'boolean'
indexes :has_children, type: 'boolean'
indexes :has_pending_replacements, type: 'boolean'
end
end
rating_locked: { type: "boolean" },
note_locked: { type: "boolean" },
status_locked: { type: "boolean" },
flagged: { type: "boolean" },
pending: { type: "boolean" },
deleted: { type: "boolean" },
has_children: { type: "boolean" },
has_pending_replacements: { type: "boolean" },
},
},
}
base.__elasticsearch__.extend ClassMethods
base.document_store.extend ClassMethods
end
module ClassMethods
@ -198,7 +208,6 @@ module PostIndex
client.bulk({
index: index_name,
type: document_type,
body: batch,
})
end

View File

@ -2,36 +2,45 @@
module PostVersionIndex
def self.included(base)
base.settings index: {number_of_shards: 8, number_of_replicas: 1} do
mappings dynamic: false do
indexes :id, type: 'integer'
indexes :post_id, type: 'integer'
indexes :version, type: 'integer'
indexes :updater_id, type: 'integer'
indexes :parent_id, type: 'integer'
indexes :rating, type: 'keyword'
indexes :source, type: 'keyword'
indexes :description, type: 'text'
indexes :reason, type: "text"
base.document_store.index = {
settings: {
index: {
number_of_shards: 8,
number_of_replicas: 1,
},
},
mappings: {
dynamic: false,
properties: {
id: { type: "integer" },
post_id: { type: "integer" },
version: { type: "integer" },
updater_id: { type: "integer" },
parent_id: { type: "integer" },
rating: { type: "keyword" },
source: { type: "keyword" },
description: { type: "text" },
reason: { type: "text" },
indexes :description_changed, type: 'boolean'
indexes :parent_id_changed, type: 'boolean'
indexes :source_changed, type: 'boolean'
indexes :rating_changed, type: 'boolean'
description_changed: { type: "boolean" },
parent_id_changed: { type: "boolean" },
source_changed: { type: "boolean" },
rating_changed: { type: "boolean" },
indexes :tags_added, type: 'keyword'
indexes :tags_removed, type: 'keyword'
indexes :tags, type: 'keyword'
tags_added: { type: "keyword" },
tags_removed: { type: "keyword" },
tags: { type: "keyword" },
indexes :updated_at, type: 'date'
updated_at: { type: "date" },
indexes :locked_tags_added, type: 'keyword'
indexes :locked_tags_removed, type: 'keyword'
indexes :locked_tags, type: 'keyword'
end
end
locked_tags_added: { type: "keyword" },
locked_tags_removed: { type: "keyword" },
locked_tags: { type: "keyword" },
},
},
}
base.__elasticsearch__.extend ClassMethods
base.document_store.extend ClassMethods
end
module ClassMethods
@ -55,7 +64,6 @@ module PostVersionIndex
client.bulk({
index: index_name,
type: document_type,
body: batch
})
end

View File

@ -6,7 +6,7 @@ class IndexUpdateJob < ApplicationJob
def perform(klass, id)
obj = klass.constantize.find(id)
obj.update_index!
obj.document_store.update_index
rescue ActiveRecord::RecordNotFound
# Do nothing
end

View File

@ -1,63 +1,6 @@
module Danbooru
module Paginator
class PaginatedArray < Array
attr_reader :pagination_mode, :max_numbered_pages, :orig_size, :current_page, :records_per_page, :total_count
def initialize(orig_array, options = {})
@current_page = options[:current_page]
@records_per_page = options[:records_per_page]
@total_count = options[:total_count]
@max_numbered_pages = options[:max_numbered_pages] || Danbooru.config.max_numbered_pages
@pagination_mode = options[:pagination_mode]
real_array = orig_array || []
@orig_size = real_array.size
case @pagination_mode
when :sequential_before, :sequential_after
real_array = orig_array.first(records_per_page)
if @pagination_mode == :sequential_before
super(real_array)
else
super(real_array.reverse)
end
when :numbered
super(real_array)
end
end
def is_first_page?
case @pagination_mode
when :numbered
current_page == 1
when :sequential_before
false
when :sequential_after
orig_size <= records_per_page
end
end
def is_last_page?
case @pagination_mode
when :numbered
current_page >= total_pages
when :sequential_before
orig_size <= records_per_page
when :sequential_after
false
end
end
def total_pages
if records_per_page > 0
(total_count.to_f / records_per_page).ceil
else
1
end
end
end
module ElasticsearchExtensions
module DocumentStoreExtensions
include BaseExtension
def paginate(page, options)
@ -71,7 +14,7 @@ module Danbooru
current_page: current_page,
}
PaginatedArray.new(records(includes: options[:includes]).to_a, new_opts)
PaginatedArray.new(records(includes: options[:includes]), new_opts)
end
def paginate_numbered

View File

@ -0,0 +1,60 @@
module Danbooru
  module Paginator
    # Array subclass that carries pagination metadata alongside the page of
    # records it holds. Two families of modes are supported:
    #   :numbered                              — classic page-number pagination
    #   :sequential_before / :sequential_after — keyset pagination, where one
    #     extra look-ahead record is fetched to detect whether another page
    #     exists in that direction.
    class PaginatedArray < Array
      attr_reader :pagination_mode, :max_numbered_pages, :orig_size, :current_page, :records_per_page, :total_count

      # orig_array - fetched records (may include one look-ahead record in the
      #              sequential modes); nil is treated as an empty result.
      # options    - :current_page, :records_per_page, :total_count,
      #              :pagination_mode (one of the modes above), and
      #              :max_numbered_pages (falls back to the site config).
      def initialize(orig_array, options = {})
        @current_page = options[:current_page]
        @records_per_page = options[:records_per_page]
        @total_count = options[:total_count]
        @max_numbered_pages = options[:max_numbered_pages] || Danbooru.config.max_numbered_pages
        @pagination_mode = options[:pagination_mode]

        real_array = orig_array || []
        # orig_size deliberately includes the look-ahead record so the
        # boundary checks below can detect its presence.
        @orig_size = real_array.size

        case @pagination_mode
        when :sequential_before, :sequential_after
          # Trim the look-ahead record. Use real_array (not orig_array) so a
          # nil input stays safe on this path too.
          real_array = real_array.first(records_per_page)
          if @pagination_mode == :sequential_before
            super(real_array)
          else
            # "after" queries arrive in reverse order; restore display order.
            super(real_array.reverse)
          end
        when :numbered
          super(real_array)
        end
        # NOTE: an unrecognized mode leaves the array empty (super not called).
      end

      def is_first_page?
        case @pagination_mode
        when :numbered
          current_page == 1
        when :sequential_before
          false
        when :sequential_after
          # No look-ahead record ⇒ this is the boundary page.
          orig_size <= records_per_page
        end
      end

      def is_last_page?
        case @pagination_mode
        when :numbered
          current_page >= total_pages
        when :sequential_before
          orig_size <= records_per_page
        when :sequential_after
          false
        end
      end

      # Number of numbered pages; at least 1, even for an empty result set.
      def total_pages
        if records_per_page > 0
          (total_count.to_f / records_per_page).ceil
        else
          1
        end
      end
    end
  end
end

View File

@ -0,0 +1,44 @@
module DocumentStore
  # Class-level facade over a model's Elasticsearch index. Owns the index
  # name and definition, wraps index management and searching, and forwards
  # anything it does not understand to the model class itself.
  class ClassMethodProxy
    delegate_missing_to :@target

    attr_accessor :index, :index_name

    # target - the model class this proxy manages an index for.
    def initialize(target)
      @target = target
    end

    # Builds a lazy search against this index and wraps it in a Response
    # bound to the model class, so hits can be resolved back into records.
    def search(body)
      request = SearchRequest.new({ index: index_name, body: body }, client)
      Response.new(@target, request)
    end

    # Creates the index from the stored definition. An existing index is
    # kept untouched unless delete_existing is true, in which case it is
    # dropped and rebuilt.
    def create_index!(delete_existing: false)
      if index_exist?
        return unless delete_existing

        delete_index!
      end
      client.indices.create(index: index_name, body: index)
    end

    # Drops the index; a missing index (404) is not an error.
    def delete_index!
      client.indices.delete(index: index_name, ignore: 404)
    end

    def index_exist?
      client.indices.exists(index: index_name)
    end

    # Forces pending writes to become visible to searches.
    def refresh_index!
      client.indices.refresh(index: index_name)
    end

    def delete_by_query(query:, body:)
      client.delete_by_query(index: index_name, q: query, body: body)
    end

    # Shared client owned by the DocumentStore module.
    def client
      DocumentStore.client
    end
  end
end

View File

@ -0,0 +1,24 @@
module DocumentStore
  # Per-record proxy that writes or removes a single document in the model's
  # Elasticsearch index. Unknown methods (e.g. +id+, +as_indexed_json+) are
  # forwarded to the wrapped model instance.
  class InstanceMethodProxy
    delegate :client, :index_name, to: :class_document_store
    delegate_missing_to :@target

    # target - the model instance this proxy indexes.
    def initialize(target)
      @target = target
    end

    # Indexes the record's serialized form. Pass refresh: "true" to make the
    # write immediately visible to searches (used by the test environment).
    def update_index(refresh: "false")
      client.index(index: index_name, id: id, body: as_indexed_json, refresh: refresh)
    end

    # Removes the record's document from the index.
    def delete_document(refresh: "false")
      client.delete(index: index_name, id: id, refresh: refresh)
    end

    private

    # The class-level proxy, which owns the index name and the client.
    def class_document_store
      @target.class.document_store
    end
  end
end

View File

@ -0,0 +1,32 @@
module DocumentStore
  # Mixin that wires a model into its Elasticsearch index: installs the
  # document_store proxies, derives the index name from the model name and
  # Rails environment, and keeps the index in sync via after_commit hooks.
  module Model
    def self.included(klass)
      klass.include(Proxy)
      klass.document_store.index_name = "#{klass.model_name.plural}_#{Rails.env}"

      # In tests the write is refreshed synchronously so searches see it
      # immediately; elsewhere refresh is left to Elasticsearch's schedule.
      klass.after_commit on: [:create] do
        document_store.update_index(refresh: Rails.env.test?.to_s)
      end

      # Updates go through update_index below, which defers to a job outside
      # of tests.
      klass.after_commit on: [:update] do
        update_index
      end

      klass.after_commit on: [:destroy] do
        document_store.delete_document(refresh: Rails.env.test?.to_s)
      end
    end

    # Queues an asynchronous reindex of this record. In tests it indexes
    # inline with an immediate refresh to avoid races.
    def update_index(queue: :high_prio)
      # TODO: race condition hack, makes tests SLOW!!!
      return document_store.update_index refresh: "true" if Rails.env.test?

      IndexUpdateJob.set(queue: queue).perform_later(self.class.to_s, id)
    end
  end

  # Shared Elasticsearch client for every document-store model.
  def self.client
    @client ||= Elasticsearch::Client.new(host: Danbooru.config.elasticsearch_host)
  end
end

View File

@ -0,0 +1,15 @@
module DocumentStore
  # Gives an including model lazily-built, memoized document_store accessors:
  # one on the class (index management and search) and one per instance
  # (single-document writes).
  module Proxy
    def self.included(base)
      base.class_eval do
        # Class-level proxy; memoized on the class object itself.
        def self.document_store
          @document_store ||= ClassMethodProxy.new(self)
        end

        # Instance-level proxy; memoized per record.
        def document_store
          @document_store ||= InstanceMethodProxy.new(self)
        end
      end
    end
  end
end

View File

@ -0,0 +1,33 @@
module DocumentStore
  # Lazy wrapper around a search request: executes on first access and
  # resolves the Elasticsearch hits back into ActiveRecord objects. Unknown
  # methods are forwarded to the loaded records, so a Response can be used
  # like a collection.
  class Response
    include Danbooru::Paginator::DocumentStoreExtensions
    delegate_missing_to :records

    attr_reader :klass, :search

    # klass  - model class the hits belong to
    # search - a SearchRequest (anything responding to #execute!)
    def initialize(klass, search)
      @klass = klass
      @search = search
    end

    # Raw Elasticsearch response hash; memoized so the request runs at most
    # once per Response.
    def response
      @response ||= @search.execute!
    end

    def hits
      response["hits"]["hits"]
    end

    # Unordered relation over the matching ids.
    def relation
      klass.where(id: hits.pluck("_id"))
    end

    # Loads the records and restores Elasticsearch's ranking order.
    # NOTE(review): memoization ignores +includes+ — a later call with a
    # different includes value returns the first result; confirm callers only
    # pass includes on the first access.
    # NOTE(review): the sort is O(n^2) in the page size via Array#index —
    # presumably fine for page-sized result sets.
    def records(includes: nil)
      @records ||= begin
        sql_records = relation
        sql_records = sql_records.includes(includes) if includes
        sql_records.records.sort_by { |sql_record| hits.index { |hit| hit["_id"] == sql_record.id.to_s } }
      end
    end
  end
end

View File

@ -0,0 +1,14 @@
module DocumentStore
  # Pairs a search definition (index + body) with the client that will run
  # it. Execution is deferred until #execute! is called, which lets Response
  # stay lazy.
  class SearchRequest
    attr_reader :definition

    # definition - hash passed verbatim to the client's search API
    # client     - an Elasticsearch client (anything responding to #search)
    def initialize(definition, client)
      @definition = definition
      @client = client
    end

    # Runs the stored definition and returns the raw response.
    def execute!
      client.search(definition)
    end

    private

    attr_reader :client
  end
end

View File

@ -36,7 +36,7 @@ class ElasticQueryBuilder
timeout: "#{CurrentUser.user.try(:statement_timeout) || 3_000}ms",
}
model_class.__elasticsearch__.search(search_body)
model_class.document_store.search(search_body)
end
def match_any(*args)

View File

@ -240,7 +240,7 @@ class Artist < ApplicationRecord
Cache.fetch("artist-domains-#{id}", expires_in: 1.day) do
re = /\.(png|jpeg|jpg|webm|mp4)$/m
counted = Hash.new(0)
sources = Post.tag_match(name, resolve_aliases: false).limit(100).records.pluck(:source).each do |source_string|
sources = Post.tag_match(name, resolve_aliases: false).limit(100).pluck(:source).each do |source_string|
sources = source_string.split("\n")
# try to filter out direct file urls
domains = sources.filter {|s| !re.match?(s) }.map do |x|

View File

@ -1430,7 +1430,7 @@ class Post < ApplicationRecord
end
def sample(query, sample_size)
tag_match_system("#{query} order:random", free_tags_count: 1).limit(sample_size).records
tag_match_system("#{query} order:random", free_tags_count: 1).limit(sample_size).relation
end
# unflattens the tag_string into one tag per row.
@ -1644,7 +1644,7 @@ class Post < ApplicationRecord
include ValidationMethods
include PostEventMethods
include Danbooru::HasBitFlags
include Indexable
include DocumentStore::Model
include PostIndex
BOOLEAN_ATTRIBUTES = %w(

View File

@ -119,7 +119,7 @@ class PostVersion < ApplicationRecord
end
extend SearchMethods
include Indexable
include DocumentStore::Model
include PostVersionIndex
def self.queue(post)

View File

@ -96,7 +96,7 @@ class Takedown < ApplicationRecord
end
def add_posts_by_tags!(tag_string)
new_ids = Post.tag_match_system("#{tag_string} -status:deleted").limit(1000).results.map(&:id)
new_ids = Post.tag_match_system("#{tag_string} -status:deleted").limit(1000).pluck(&:id)
add_posts_by_ids!(new_ids.join(" "))
end

View File

@ -52,7 +52,7 @@ class UserPresenter
end
def uploads
Post.tag_match("user:#{user.name}").limit(6).records
Post.tag_match("user:#{user.name}").limit(6)
end
def has_uploads?

View File

@ -19,7 +19,7 @@ FileUtils.chdir APP_ROOT do
end
puts "== Creating elasticsearch indices ==\n"
system! "RAILS_ENV=development bin/rails runner '[Post, PostVersion].each { |model| model.__elasticsearch__.create_index! }'"
system! "RAILS_ENV=development bin/rails runner '[Post, PostVersion].each { |model| model.document_store.create_index! }'"
puts "\n== Preparing database =="
# Create the test database, since only development exists at this point

View File

@ -1,4 +0,0 @@
Elasticsearch::Model.client = Elasticsearch::Client.new host: Danbooru.config.elasticsearch_host
Rails.configuration.to_prepare do
Elasticsearch::Model::Response::Response.include(Danbooru::Paginator::ElasticsearchExtensions)
end

View File

@ -2,5 +2,5 @@
require File.expand_path(File.join(File.dirname(__FILE__), '..', '..', 'config', 'environment'))
client = Post.__elasticsearch__.client
client.indices.put_mapping index: Post.index_name, body: { properties: { has_pending_replacements: { type: "boolean" } } }
client = Post.document_store.client
client.indices.put_mapping index: Post.document_store.index_name, body: { properties: { has_pending_replacements: { type: "boolean" } } }

View File

@ -4,5 +4,5 @@ require File.expand_path(File.join(File.dirname(__FILE__), '..', '..', 'config',
Post.find_each do |post|
puts post.id
post.__elasticsearch__.update_document_attributes has_pending_replacements: post.replacements.pending.any?
post.document_store.client.update_document_attributes has_pending_replacements: post.replacements.pending.any?
end

View File

@ -38,8 +38,8 @@ BCrypt::Engine.send(:remove_const, :DEFAULT_COST)
BCrypt::Engine::DEFAULT_COST = BCrypt::Engine::MIN_COST
# Clear the elastic indices completely
Post.__elasticsearch__.create_index!(force: true)
PostVersion.__elasticsearch__.create_index!(force: true)
Post.document_store.create_index!(delete_existing: true)
PostVersion.document_store.create_index!(delete_existing: true)
class ActiveSupport::TestCase
include ActionDispatch::TestProcess::FixtureFile
@ -86,8 +86,8 @@ class ActiveSupport::TestCase
def reset_post_index
# This seems slightly faster than deleting and recreating the index
Post.__elasticsearch__.client.delete_by_query(index: Post.index_name, q: "*", body: {})
Post.__elasticsearch__.refresh_index!
Post.document_store.delete_by_query(query: "*", body: {})
Post.document_store.refresh_index!
end
end

View File

@ -0,0 +1,75 @@
require "test_helper"
module DocumentStore
  # Unit tests for the document-store index management methods. WebMock
  # stubs the Elasticsearch HTTP API so no live cluster is touched.
  class ModelTest < ActiveSupport::TestCase
    setup do
      # Block all real HTTP so every Elasticsearch call must be stubbed.
      WebMock.disable_net_connect!
      WebMock.reset_executed_requests!
    end

    teardown do
      # Restore access to the real test cluster for the rest of the suite.
      WebMock.disable_net_connect!(allow: [Danbooru.config.elasticsearch_host])
    end

    # Stubs one Elasticsearch endpoint on the default port 9200.
    def stub_elastic(method, path)
      stub_request(method, "http://#{Danbooru.config.elasticsearch_host}:9200#{path}")
    end

    test "it deletes the index" do
      delete_request = stub_elastic(:delete, "/posts_test")
      Post.document_store.delete_index!
      assert_requested delete_request
    end

    test "it checks for the existance of the index" do
      head_request = stub_elastic(:head, "/posts_test")
      Post.document_store.index_exist?
      assert_requested head_request
    end

    test "it skips creating the index if it already exists" do
      # Only the existence check should fire — no PUT is stubbed, so an
      # attempted create would fail the WebMock net-connect ban.
      head_request = stub_elastic(:head, "/posts_test").to_return(status: 200)
      Post.document_store.create_index!
      assert_requested head_request
    end

    test "it creates the index if it doesn't exist" do
      head_request = stub_elastic(:head, "/posts_test").to_return(status: 404)
      # The PUT body must be the stored index definition.
      put_request = stub_elastic(:put, "/posts_test").with(body: Post.document_store.index)
      assert(Post.document_store.index.present?)
      Post.document_store.create_index!
      assert_requested(head_request)
      assert_requested(put_request)
    end

    test "it recreates the index if delete_existing is true and the index already exists" do
      head_request = stub_elastic(:head, "/posts_test").to_return(status: 200)
      delete_request = stub_elastic(:delete, "/posts_test")
      put_request = stub_elastic(:put, "/posts_test")
      Post.document_store.create_index!(delete_existing: true)
      assert_requested(head_request)
      assert_requested(delete_request)
      assert_requested(put_request)
    end

    test "it deletes by query" do
      post_request = stub_elastic(:post, "/posts_test/_delete_by_query?q=*").with(body: "{}")
      Post.document_store.delete_by_query(query: "*", body: {})
      assert_requested(post_request)
    end

    test "it refreshes the index" do
      post_request = stub_elastic(:post, "/posts_test/_refresh")
      Post.document_store.refresh_index!
      assert_requested(post_request)
    end

    test "models share the same client" do
      # DocumentStore.client memoizes a single client across all models.
      assert_equal(Post.document_store.client.object_id, PostVersion.document_store.client.object_id)
    end
  end
end

View File

@ -2,7 +2,7 @@ require 'test_helper'
class PostTest < ActiveSupport::TestCase
def assert_tag_match(posts, query)
assert_equal(posts.map(&:id), Post.tag_match(query).records.pluck(:id))
assert_equal(posts.map(&:id), Post.tag_match(query).pluck(:id))
end
setup do