[Elasticsearch] Bring back the proxy object

This commit is contained in:
Earlopain 2023-09-16 17:09:33 +02:00
parent 0bd651c08b
commit b6dc473a55
No known key found for this signature in database
GPG Key ID: 48860312319ADF61
14 changed files with 118 additions and 89 deletions

View File

@ -4,7 +4,7 @@ class PostVersionsController < ApplicationController
respond_to :js, only: [:undo] respond_to :js, only: [:undo]
def index def index
@post_versions = PostVersion.document_store_search(PostVersion.build_query(search_params)).paginate(params[:page], limit: params[:limit], max_count: 10_000, search_count: params[:search], includes: [:updater, post: [:versions]]) @post_versions = PostVersion.document_store.search(PostVersion.build_query(search_params)).paginate(params[:page], limit: params[:limit], max_count: 10_000, search_count: params[:search], includes: [:updater, post: [:versions]])
respond_with(@post_versions) respond_with(@post_versions)
end end

View File

@ -2,7 +2,7 @@
module PostIndex module PostIndex
def self.included(base) def self.included(base)
base.document_store_index = { base.document_store.index = {
settings: { settings: {
index: { index: {
number_of_shards: 5, number_of_shards: 5,
@ -76,13 +76,13 @@ module PostIndex
}, },
} }
base.extend ClassMethods base.document_store.extend ClassMethods
end end
module ClassMethods module ClassMethods
# Denormalizing the input can be made significantly more # Denormalizing the input can be made significantly more
# efficient when processing large numbers of posts. # efficient when processing large numbers of posts.
def document_store_import(options = {}) def import(options = {})
batch_size = options[:batch_size] || 1000 batch_size = options[:batch_size] || 1000
relation = all relation = all
@ -206,8 +206,8 @@ module PostIndex
} }
end end
document_store_client.bulk({ client.bulk({
index: document_store_index_name, index: index_name,
body: batch, body: batch,
}) })
end end

View File

@ -2,7 +2,7 @@
module PostVersionIndex module PostVersionIndex
def self.included(base) def self.included(base)
base.document_store_index = { base.document_store.index = {
settings: { settings: {
index: { index: {
number_of_shards: 8, number_of_shards: 8,
@ -40,11 +40,11 @@ module PostVersionIndex
}, },
} }
base.extend ClassMethods base.document_store.extend ClassMethods
end end
module ClassMethods module ClassMethods
def document_store_import(options = {}) def import(options = {})
q = all q = all
q = q.where("id >= ?", options[:from]) if options[:from] q = q.where("id >= ?", options[:from]) if options[:from]
q = q.where("id <= ?", options[:to]) if options[:to] q = q.where("id <= ?", options[:to]) if options[:to]
@ -62,8 +62,8 @@ module PostVersionIndex
} }
end end
document_store_client.bulk({ client.bulk({
index: document_store_index_name, index: index_name,
body: batch body: batch
}) })
end end

View File

@ -6,7 +6,7 @@ class IndexUpdateJob < ApplicationJob
def perform(klass, id) def perform(klass, id)
obj = klass.constantize.find(id) obj = klass.constantize.find(id)
obj.document_store_update_index obj.document_store.update_index
rescue ActiveRecord::RecordNotFound rescue ActiveRecord::RecordNotFound
# Do nothing # Do nothing
end end

View File

@ -0,0 +1,44 @@
module DocumentStore
class ClassMethodProxy
delegate_missing_to :@target
attr_accessor :index, :index_name
def initialize(target)
@target = target
end
def search(body)
search = SearchRequest.new({ index: index_name, body: body }, client)
Response.new(@target, search)
end
def create_index!(delete_existing: false)
exists = index_exist?
return if exists && !delete_existing
delete_index! if exists && delete_existing
client.indices.create(index: index_name, body: index)
end
def delete_index!
client.indices.delete(index: index_name, ignore: 404)
end
def index_exist?
client.indices.exists(index: index_name)
end
def refresh_index!
client.indices.refresh(index: index_name)
end
def delete_by_query(query:, body:)
client.delete_by_query(index: index_name, q: query, body: body)
end
def client
DocumentStore.client
end
end
end

View File

@ -0,0 +1,24 @@
module DocumentStore
class InstanceMethodProxy
delegate :client, :index_name, to: :class_document_store
delegate_missing_to :@target
def initialize(target)
@target = target
end
def update_index(refresh: "false")
client.index(index: index_name, id: id, body: as_indexed_json, refresh: refresh)
end
def delete_document(refresh: "false")
client.delete(index: index_name, id: id, refresh: refresh)
end
private
def class_document_store
@target.class.document_store
end
end
end

View File

@ -1,79 +1,25 @@
module DocumentStore module DocumentStore
module Model module Model
def self.included(klass) def self.included(klass)
klass.extend(ClassMethods) klass.include(Proxy)
klass.document_store_index_name = "#{klass.model_name.plural}_#{Rails.env}" klass.document_store.index_name = "#{klass.model_name.plural}_#{Rails.env}"
klass.after_commit on: %i[create update] do klass.after_commit on: %i[create update] do
update_index update_index
end end
klass.after_commit on: [:destroy] do klass.after_commit on: [:destroy] do
document_store_delete_document(refresh: Rails.env.test?.to_s) document_store.delete_document(refresh: Rails.env.test?.to_s)
end end
end end
def update_index(queue: :high_prio) def update_index(queue: :high_prio)
# TODO: race condition hack, makes tests SLOW!!! # TODO: race condition hack, makes tests SLOW!!!
return document_store_update_index refresh: "true" if Rails.env.test? return document_store.update_index refresh: "true" if Rails.env.test?
IndexUpdateJob.set(queue: queue).perform_later(self.class.to_s, id) IndexUpdateJob.set(queue: queue).perform_later(self.class.to_s, id)
end end
def document_store_update_index(refresh: "false")
document_store_client.index(index: document_store_index_name, id: id, body: as_indexed_json, refresh: refresh)
end
def document_store_delete_document(refresh: "false")
document_store_client.delete(index: document_store_index_name, id: id, refresh: refresh)
end
def document_store_index_name
self.class.document_store_index_name
end
def document_store_client
DocumentStore.client
end
module ClassMethods
attr_accessor :document_store_index, :document_store_index_name
def document_store_search(body)
search = SearchRequest.new({ index: document_store_index_name, body: body }, document_store_client)
Response.new(self, search)
end
def document_store_create_index!(delete_existing: false)
exists = document_store_index_exist?
return if exists && !delete_existing
document_store_delete_index! if exists && delete_existing
document_store_client.indices.create(index: document_store_index_name, body: document_store_index)
end
def document_store_delete_index!
document_store_client.indices.delete(index: document_store_index_name, ignore: 404)
end
def document_store_index_exist?
document_store_client.indices.exists(index: document_store_index_name)
end
def document_store_refresh_index!
document_store_client.indices.refresh(index: document_store_index_name)
end
def document_store_delete_by_query(query:, body:)
document_store_client.delete_by_query(index: document_store_index_name, q: query, body: body)
end
def document_store_client
DocumentStore.client
end
end
end end
def self.client def self.client

View File

@ -0,0 +1,15 @@
module DocumentStore
module Proxy
def self.included(base)
base.class_eval do
def self.document_store
@document_store ||= ClassMethodProxy.new(self)
end
def document_store
@document_store ||= InstanceMethodProxy.new(self)
end
end
end
end
end

View File

@ -36,7 +36,7 @@ class ElasticQueryBuilder
timeout: "#{CurrentUser.user.try(:statement_timeout) || 3_000}ms", timeout: "#{CurrentUser.user.try(:statement_timeout) || 3_000}ms",
} }
model_class.document_store_search(search_body) model_class.document_store.search(search_body)
end end
def match_any(*args) def match_any(*args)

View File

@ -19,7 +19,7 @@ FileUtils.chdir APP_ROOT do
end end
puts "== Creating elasticsearch indices ==\n" puts "== Creating elasticsearch indices ==\n"
system! "RAILS_ENV=development bin/rails runner '[Post, PostVersion].each { |model| model.document_store_create_index! }'" system! "RAILS_ENV=development bin/rails runner '[Post, PostVersion].each { |model| model.document_store.create_index! }'"
puts "\n== Preparing database ==" puts "\n== Preparing database =="
# Create the test database, since only development exists at this point # Create the test database, since only development exists at this point

View File

@ -2,5 +2,5 @@
require File.expand_path(File.join(File.dirname(__FILE__), '..', '..', 'config', 'environment')) require File.expand_path(File.join(File.dirname(__FILE__), '..', '..', 'config', 'environment'))
client = Post.document_store_client client = Post.document_store.client
client.indices.put_mapping index: Post.document_store_index_name, body: { properties: { has_pending_replacements: { type: "boolean" } } } client.indices.put_mapping index: Post.document_store.index_name, body: { properties: { has_pending_replacements: { type: "boolean" } } }

View File

@ -4,5 +4,5 @@ require File.expand_path(File.join(File.dirname(__FILE__), '..', '..', 'config',
Post.find_each do |post| Post.find_each do |post|
puts post.id puts post.id
post.document_store_client.update_document_attributes has_pending_replacements: post.replacements.pending.any? post.document_store.client.update_document_attributes has_pending_replacements: post.replacements.pending.any?
end end

View File

@ -38,8 +38,8 @@ BCrypt::Engine.send(:remove_const, :DEFAULT_COST)
BCrypt::Engine::DEFAULT_COST = BCrypt::Engine::MIN_COST BCrypt::Engine::DEFAULT_COST = BCrypt::Engine::MIN_COST
# Clear the elastic indicies completly # Clear the elastic indicies completly
Post.document_store_create_index!(delete_existing: true) Post.document_store.create_index!(delete_existing: true)
PostVersion.document_store_create_index!(delete_existing: true) PostVersion.document_store.create_index!(delete_existing: true)
class ActiveSupport::TestCase class ActiveSupport::TestCase
include ActionDispatch::TestProcess::FixtureFile include ActionDispatch::TestProcess::FixtureFile
@ -86,8 +86,8 @@ class ActiveSupport::TestCase
def reset_post_index def reset_post_index
# This seems slightly faster than deleting and recreating the index # This seems slightly faster than deleting and recreating the index
Post.document_store_delete_by_query(query: "*", body: {}) Post.document_store.delete_by_query(query: "*", body: {})
Post.document_store_refresh_index! Post.document_store.refresh_index!
end end
end end

View File

@ -17,28 +17,28 @@ module DocumentStore
test "it deletes the index" do test "it deletes the index" do
delete_request = stub_elastic(:delete, "/posts_test") delete_request = stub_elastic(:delete, "/posts_test")
Post.document_store_delete_index! Post.document_store.delete_index!
assert_requested delete_request assert_requested delete_request
end end
test "it checks for the existance of the index" do test "it checks for the existance of the index" do
head_request = stub_elastic(:head, "/posts_test") head_request = stub_elastic(:head, "/posts_test")
Post.document_store_index_exist? Post.document_store.index_exist?
assert_requested head_request assert_requested head_request
end end
test "it skips creating the index if it already exists" do test "it skips creating the index if it already exists" do
head_request = stub_elastic(:head, "/posts_test").to_return(status: 200) head_request = stub_elastic(:head, "/posts_test").to_return(status: 200)
Post.document_store_create_index! Post.document_store.create_index!
assert_requested head_request assert_requested head_request
end end
test "it creates the index if it doesn't exist" do test "it creates the index if it doesn't exist" do
head_request = stub_elastic(:head, "/posts_test").to_return(status: 404) head_request = stub_elastic(:head, "/posts_test").to_return(status: 404)
put_request = stub_elastic(:put, "/posts_test").with(body: Post.document_store_index) put_request = stub_elastic(:put, "/posts_test").with(body: Post.document_store.index)
assert(Post.document_store_index.present?) assert(Post.document_store.index.present?)
Post.document_store_create_index! Post.document_store.create_index!
assert_requested(head_request) assert_requested(head_request)
assert_requested(put_request) assert_requested(put_request)
@ -49,7 +49,7 @@ module DocumentStore
delete_request = stub_elastic(:delete, "/posts_test") delete_request = stub_elastic(:delete, "/posts_test")
put_request = stub_elastic(:put, "/posts_test") put_request = stub_elastic(:put, "/posts_test")
Post.document_store_create_index!(delete_existing: true) Post.document_store.create_index!(delete_existing: true)
assert_requested(head_request) assert_requested(head_request)
assert_requested(delete_request) assert_requested(delete_request)
@ -58,18 +58,18 @@ module DocumentStore
test "it deletes by query" do test "it deletes by query" do
post_request = stub_elastic(:post, "/posts_test/_delete_by_query?q=*").with(body: "{}") post_request = stub_elastic(:post, "/posts_test/_delete_by_query?q=*").with(body: "{}")
Post.document_store_delete_by_query(query: "*", body: {}) Post.document_store.delete_by_query(query: "*", body: {})
assert_requested(post_request) assert_requested(post_request)
end end
test "it refreshes the index" do test "it refreshes the index" do
post_request = stub_elastic(:post, "/posts_test/_refresh") post_request = stub_elastic(:post, "/posts_test/_refresh")
Post.document_store_refresh_index! Post.document_store.refresh_index!
assert_requested(post_request) assert_requested(post_request)
end end
test "models share the same client" do test "models share the same client" do
assert_equal(Post.document_store_client.object_id, PostVersion.document_store_client.object_id) assert_equal(Post.document_store.client.object_id, PostVersion.document_store.client.object_id)
end end
end end
end end