diff --git a/app/indexes/post_version_index.rb b/app/indexes/post_version_index.rb
new file mode 100644
index 000000000..161725c3e
--- /dev/null
+++ b/app/indexes/post_version_index.rb
@@ -0,0 +1,85 @@
+# frozen_string_literal: true
+
+module PostVersionIndex
+  def self.included(base)
+    base.settings index: {number_of_shards: 5, number_of_replicas: 1} do
+      mappings dynamic: false, _all: {enabled: false} do
+        indexes :id, type: 'integer'
+        indexes :version, type: 'integer'
+        indexes :updater_id, type: 'integer'
+        indexes :parent_id, type: 'integer'
+        indexes :rating, type: 'keyword'
+        indexes :source, type: 'keyword'
+        indexes :description, type: 'text'
+
+        indexes :description_changed, type: 'boolean'
+        indexes :parent_id_changed, type: 'boolean'
+        indexes :source_changed, type: 'boolean'
+        indexes :rating_changed, type: 'boolean'
+
+        indexes :tags_added, type: 'keyword'
+        indexes :tags_removed, type: 'keyword'
+        indexes :tags, type: 'keyword'
+
+        indexes :updated_at, type: 'date'
+
+        indexes :locked_tags_added, type: 'keyword'
+        indexes :locked_tags_removed, type: 'keyword'
+        indexes :locked_tags, type: 'keyword'
+      end
+    end
+
+    base.__elasticsearch__.extend ClassMethods
+  end
+
+  module ClassMethods
+    def import(options = {})
+      q = all
+      q = q.where("id <= ?", options[:from]) if options[:from]
+      q = q.where("id >= ?", options[:to]) if options[:to]
+      q = q.where(options[:query]) if options[:query]
+
+      q.find_in_batches do |batch|
+        batch.map! do |pv|
+          {
+            index: {
+              _id: pv.id,
+              data: pv.as_indexed_json,
+            }
+          }
+        end
+
+        client.bulk({
+          index: index_name,
+          type: document_type,
+          body: batch
+        })
+      end
+    end
+  end
+
+  def as_indexed_json(options = {})
+    {
+      id: id,
+      updated_at: updated_at,
+      version: version,
+      updater_id: updater_id,
+      parent_id: parent_id,
+      rating: rating,
+      source: source,
+
+      description_changed: false,
+      parent_id_changed: parent_changed,
+      source_changed: source_changed,
+      rating_changed: rating_changed,
+
+      tags_added: added_tags,
+      tags_removed: removed_tags,
+      tags: tag_array,
+
+      locked_tags_added: added_locked_tags,
+      locked_tags_removed: removed_locked_tags,
+      locked_tags: locked_tag_array
+    }
+  end
+end
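Usage note: existing installs need a one-time backfill of the new index. A minimal console sketch, using the same elasticsearch-model calls that `bin/setup` invokes below; note that in the `import` method above `:from` is the upper id bound and `:to` the lower, and the id values here are purely illustrative:

```ruby
# One-time backfill from a Rails console or `bin/rails runner`.
# force: true drops and recreates the index if it already exists.
PostArchive.__elasticsearch__.create_index!(force: true)

# Index every post version, or restrict to an id range.
PostArchive.import
PostArchive.import(from: 200_000, to: 100_000)  # ids 100_000..200_000
```
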
diff --git a/app/logical/bulk_revert.rb b/app/logical/bulk_revert.rb
index 320df0619..ef6950f9f 100644
--- a/app/logical/bulk_revert.rb
+++ b/app/logical/bulk_revert.rb
@@ -2,7 +2,7 @@ class BulkRevert
   BIG_QUERY_LIMIT = 5_000
   attr_reader :constraints
 
-  class ConstraintTooGeneralError < Exception ; end
+  class ConstraintTooGeneralError < Exception; end
 
   def process(creator, constraints = {})
     @constraints = constraints
@@ -26,42 +26,29 @@ class BulkRevert
     @_preview ||= find_post_versions
   end
 
-  def query_gbq(user_id, added_tags, removed_tags, min_version_id, max_version_id)
-    GoogleBigQuery::PostVersion.new.find(user_id, added_tags, removed_tags, min_version_id, max_version_id, BIG_QUERY_LIMIT)
-  end
-
   def find_post_versions
-    q = PostArchive.where("true")
-
     if constraints[:user_name]
      constraints[:user_id] = User.find_by_name(constraints[:user_name]).try(:id)
     end
 
-    if constraints[:user_id]
-      q = q.where("post_versions.updater_id = ?", constraints[:user_id])
-    end
+    must = []
+    must << {term: {updater_id: constraints[:user_id]}} if constraints[:user_id]
+    version_range = {range: {version: {}}}
+    version_range[:range][:version][:gte] = constraints[:min_version_id].to_i if constraints[:min_version_id]
+    version_range[:range][:version][:lte] = constraints[:max_version_id].to_i if constraints[:max_version_id]
+    must << version_range if constraints[:min_version_id] || constraints[:max_version_id]
+    must += constraints[:added_tags].split.map { |x| {term: {tags_added: x}} } if constraints[:added_tags]
+    must += constraints[:removed_tags].split.map { |x| {term: {tags_removed: x}} } if constraints[:removed_tags]
+    q = PostArchive.__elasticsearch__.search({
+      query: {bool: {filter: must}},
+      sort: {id: :desc},
+      size: BIG_QUERY_LIMIT + 1,
+      from: 0,
+      _source: false,
+    })
 
-    if constraints[:added_tags] || constraints[:removed_tags]
-      hash = CityHash.hash64("#{constraints[:added_tags]} #{constraints[:removed_tags]} #{constraints[:min_version_id]} #{constraints[:max_version_id]}").to_s(36)
-      sub_ids = Cache.get("br/fpv/#{hash}", 300) do
-        query_gbq(constraints[:user_id], constraints[:added_tags], constraints[:removed_tags], constraints[:min_version_id], constraints[:max_version_id])
-      end
-
-      if sub_ids.size >= BIG_QUERY_LIMIT
-        raise ConstraintTooGeneralError.new
-      end
-
-      q = q.where("post_versions.id in (?)", sub_ids)
-    end
-
-    if constraints[:min_version_id].present?
-      q = q.where("post_versions.id >= ?", constraints[:min_version_id])
-    end
-
-    if constraints[:max_version_id].present?
-      q = q.where("post_versions.id <= ?", constraints[:max_version_id])
-    end
-
-    q
+    raise ConstraintTooGeneralError.new if q.results.total > BIG_QUERY_LIMIT
+
+    q.records
   end
 end
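For reference, with a hypothetical constraint set of `user_id: 42`, `added_tags: "tagme"`, and `max_version_id: 90_000`, the `must` array above yields this search body (values illustrative only; `bool`/`filter` skips scoring, and `size` is one past the limit so the overflow check can fire):

```ruby
{
  query: {bool: {filter: [
    {term: {updater_id: 42}},
    {range: {version: {lte: 90_000}}},
    {term: {tags_added: "tagme"}},
  ]}},
  sort: {id: :desc},
  size: 5_001,     # BIG_QUERY_LIMIT + 1
  from: 0,
  _source: false,  # only hit ids are needed; q.records refetches rows from Postgres
}
```
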
diff --git a/app/logical/google_big_query/base.rb b/app/logical/google_big_query/base.rb
deleted file mode 100644
index 59ea23fd6..000000000
--- a/app/logical/google_big_query/base.rb
+++ /dev/null
@@ -1,44 +0,0 @@
-require "big_query"
-
-module GoogleBigQuery
-  class Base
-    def self.enabled?
-      File.exists?(Danbooru.config.google_api_json_key_path)
-    end
-
-    def initialize
-      raise NotImplementedError.new("Google Big Query is not configured.") unless GoogleBigQuery::Base.enabled?
-    end
-
-    def query(q)
-      client.query(q)
-    end
-
-    def escape(s)
-      Regexp.escape(s).gsub(/\\/, '\0\0').gsub(/['"]/, '\\\\\0')
-    end
-
-    def client
-      @_client ||= BigQuery::Client.new(
-        "json_key" => client_options[:google_key_path],
-        "project_id" => google_config["project_id"],
-        "dataset" => client_options[:google_data_set]
-      )
-    end
-
-    def client_options
-      @_client_options ||= {
-        google_key_path: Danbooru.config.google_api_json_key_path,
-        google_data_set: "danbooru_#{Rails.env}"
-      }
-    end
-
-    def google_config
-      @_google_config ||= JSON.parse(File.read(client_options[:google_key_path]))
-    end
-
-    def data_set
-      client_options[:google_data_set]
-    end
-  end
-end
diff --git a/app/logical/google_big_query/post_version.rb b/app/logical/google_big_query/post_version.rb
deleted file mode 100644
index 7d30e8e63..000000000
--- a/app/logical/google_big_query/post_version.rb
+++ /dev/null
@@ -1,66 +0,0 @@
-module GoogleBigQuery
-  class PostVersion < Base
-    def find_removed(tag, limit = 1_000)
-      limit = limit.to_i
-      query("select id, post_id, updated_at, updater_id, updater_ip_addr, tags, added_tags, removed_tags, parent_id, rating, source from [#{data_set}.post_versions] where #{remove_tag_condition(tag)} order by updated_at desc limit #{limit}")
-    end
-
-    def find_added(tag, limit = 1_000)
-      limit = limit.to_i
-      query("select id, post_id, updated_at, updater_id, updater_ip_addr, tags, added_tags, removed_tags, parent_id, rating, source from [#{data_set}.post_versions] where #{add_tag_condition(tag)} order by updated_at desc limit #{limit}")
-    end
-
-    def add_tag_condition(t)
-      es = escape(t)
-      "regexp_match(added_tags, \"(?:^| )#{es}(?:$| )\")"
-    end
-
-    def remove_tag_condition(t)
-      es = escape(t)
-      "regexp_match(removed_tags, \"(?:^| )#{es}(?:$| )\")"
-    end
-
-    def find_for_post(post_id, created_at)
-      post_id = post_id.to_i
-      btime = created_at.strftime("%Y-%m-%d 00:00:00", created_at)
-      etime = 1.day.from(created_at).strftime("%Y-%m-%d 00:00:00")
-      "select updater_id, added_tag from [danbooru_#{Rails.env}].post_versions_flat_part where _partitiontime >= #{btime} and _partitiontime <= #{etime} and post_id = #{post_id}"
-    end
-
-    def find(user_id, added_tags, removed_tags, min_version_id, max_version_id, limit = 1_000)
-      constraints = []
-
-      constraints << "updater_id = #{user_id.to_i}"
-
-      if added_tags
-        added_tags.split.each do |tag|
-          constraints << add_tag_condition(tag)
-        end
-      end
-
-      if removed_tags
-        removed_tags.split.each do |tag|
-          constraints << remove_tag_condition(tag)
-        end
-      end
-
-      if min_version_id
-        constraints << "id >= #{min_version_id.to_i}"
-      end
-
-      if max_version_id
-        constraints << "id <= #{max_version_id.to_i}"
-      end
-
-      limit = limit.to_i
-      sql = "select id from [#{data_set}.post_versions] where " + constraints.join(" and ") + " order by updated_at desc limit #{limit}"
-      result = query(sql)
-
-      if result["rows"]
-        result["rows"].map {|x| x["f"][0]["v"].to_i}
-      else
-        []
-      end
-    end
-  end
-end
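The deleted `find_added`/`find_removed` helpers are what the `TODO: Convert to Elasticsearch` in the next file refers to. A rough, untested sketch of an equivalent against the new index: the escaped `regexp_match` over the flat space-delimited tag string becomes a plain `term` query on the `tags_added`/`tags_removed` keyword fields (the method shape here is an assumption, not part of the patch):

```ruby
# Hypothetical Elasticsearch replacement for GoogleBigQuery::PostVersion#find_added.
def find_added(tag, limit = 1_000)
  PostArchive.__elasticsearch__.search({
    query: {bool: {filter: [{term: {tags_added: tag}}]}},
    sort: {updated_at: :desc},  # the BigQuery query also ordered by updated_at desc
    size: limit.to_i,
  })
end
```
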
diff --git a/app/logical/reports/post_versions.rb b/app/logical/reports/post_versions.rb
index 5e58c66a0..a3c3fe02b 100644
--- a/app/logical/reports/post_versions.rb
+++ b/app/logical/reports/post_versions.rb
@@ -5,11 +5,12 @@ module Reports
 
     def initialize(tag, query_type)
       @tag = tag
-      if query_type == "added"
-        @query = GoogleBigQuery::PostVersion.new.find_added(tag)
-      else
-        @query = GoogleBigQuery::PostVersion.new.find_removed(tag)
-      end
+      # TODO: Convert to Elasticsearch
+      # if query_type == "added"
+      #   @query = GoogleBigQuery::PostVersion.new.find_added(tag)
+      # else
+      #   @query = GoogleBigQuery::PostVersion.new.find_removed(tag)
+      # end
     end
 
     def mock_version(row)
diff --git a/app/models/post.rb b/app/models/post.rb
index 67de89308..69d2f3e15 100644
--- a/app/models/post.rb
+++ b/app/models/post.rb
@@ -1,5 +1,4 @@
 require 'danbooru/has_bit_flags'
-require 'google/apis/pubsub_v1'
 
 class Post < ApplicationRecord
   class ApprovalError < Exception ; end
@@ -1476,9 +1475,7 @@ class Post < ApplicationRecord
     end
 
     def notify_pubsub
-      return unless Danbooru.config.google_api_project
-
-      # PostUpdate.insert(id)
+      # NOTE: Left as a potentially useful hook into post updating.
     end
   end
 
diff --git a/app/models/post_archive.rb b/app/models/post_archive.rb
index f10322ab0..c762e9590 100644
--- a/app/models/post_archive.rb
+++ b/app/models/post_archive.rb
@@ -54,6 +54,8 @@ class PostArchive < ApplicationRecord
   end
 
   extend SearchMethods
+  include Indexable
+  include PostVersionIndex
 
   def self.queue(post)
     self.create({
diff --git a/app/models/post_update.rb b/app/models/post_update.rb
deleted file mode 100644
index c755b06b1..000000000
--- a/app/models/post_update.rb
+++ /dev/null
@@ -1,23 +0,0 @@
-class PostUpdate
-  def self.insert(post_id)
-    ActiveRecord::Base.execute_sql("insert into post_updates (post_id) values (?)", post_id)
-  end
-
-  def self.get
-    ActiveRecord::Base.select_values_sql("delete from post_updates returning post_id").uniq
-  end
-
-  def self.push
-    return unless Danbooru.config.google_api_project
-
-    pubsub = Google::Apis::PubsubV1::PubsubService.new
-    pubsub.authorization = Google::Auth.get_application_default([Google::Apis::PubsubV1::AUTH_PUBSUB])
-    topic = "projects/#{Danbooru.config.google_api_project}/topics/post_updates"
-    post_ids = get()
-
-    post_ids.in_groups_of(1_000, false).each do |group|
-      request = Google::Apis::PubsubV1::PublishRequest.new(messages: group.map {|x| Google::Apis::PubsubV1::Message.new(data: x.to_s)})
-      pubsub.publish_topic(topic, request)
-    end
-  end
-end
diff --git a/bin/setup b/bin/setup
old mode 100755
new mode 100644
index 98bdf974b..1715b2ba3
--- a/bin/setup
+++ b/bin/setup
@@ -31,6 +31,8 @@ chdir APP_ROOT do
   puts "\n== Preparing search indices =="
   system! 'bin/rails r Post.__elasticsearch__.create_index!'
   system! 'bin/rails r Post.import'
+  system! 'bin/rails r PostArchive.__elasticsearch__.create_index!'
+  system! 'bin/rails r PostArchive.import'
 
   puts "\n== Removing old logs and tempfiles =="
   system! 'bin/rails log:clear tmp:clear'
diff --git a/config/danbooru_default_config.rb b/config/danbooru_default_config.rb
index 4455e5ecb..2e1488f3d 100644
--- a/config/danbooru_default_config.rb
+++ b/config/danbooru_default_config.rb
@@ -705,14 +705,6 @@ module Danbooru
     def iqdbs_server
     end
 
-    # google api options
-    def google_api_project
-    end
-
-    def google_api_json_key_path
-      "/var/www/danbooru2/shared/config/google-key.json"
-    end
-
     # Use a recaptcha on the signup page to protect against spambots creating new accounts.
     # https://developers.google.com/recaptcha/intro
     def enable_recaptcha?
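Once `bin/setup` has run, the backfill can be sanity-checked from a console. A sketch assuming the standard elasticsearch-model response API used elsewhere in this patch:

```ruby
# The indexed document count should match the table row count.
indexed = PostArchive.__elasticsearch__.search({query: {match_all: {}}, size: 0}).results.total
indexed == PostArchive.count
```
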