Replace Google Cloud with Elasticsearch

Most of this was related to the use of BigQuery for post versions, report
generation, and bulk reverts, but Elasticsearch can easily handle returning
5k results in a few milliseconds, so this isn't a big deal.

There is also some pubsub code that hasn't been used for a while now.
I don't remember exactly what it was being used for, but it was likely
reporting items, so it's not an immediate priority.
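
For context, a rough sketch of the kind of query Elasticsearch now answers for bulk reverts (the user id, tag, and version bounds here are made up; the real filters are assembled in BulkRevert#find_post_versions below):

# Sketch only, not part of the diff; all values are placeholders.
response = PostArchive.__elasticsearch__.search(
  query: {
    bool: {
      filter: [
        {term: {updater_id: 12_345}},                  # hypothetical updater
        {term: {tags_added: "tagme"}},                 # hypothetical tag
        {range: {version: {gte: 1_000, lte: 50_000}}}, # hypothetical version bounds
      ],
    },
  },
  sort: {id: :desc},
  size: 5_000 + 1,  # BIG_QUERY_LIMIT + 1, so an over-broad constraint can be detected
  from: 0,
  _source: false,
)
raise "constraint too general" if response.results.total > 5_000
ids = response.records.map(&:id)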
Kira 2019-05-16 16:29:08 -07:00
parent 2e1403f58a
commit a2a26e25d4
10 changed files with 115 additions and 180 deletions

View File

@@ -0,0 +1,85 @@
# frozen_string_literal: true

module PostVersionIndex
  def self.included(base)
    base.settings index: {number_of_shards: 5, number_of_replicas: 1} do
      mappings dynamic: false, _all: {enabled: false} do
        indexes :id, type: 'integer'
        indexes :version, type: 'integer'
        indexes :updater_id, type: 'integer'
        indexes :parent_id, type: 'integer'
        indexes :rating, type: 'keyword'
        indexes :source, type: 'keyword'
        indexes :description, type: 'text'
        indexes :description_changed, type: 'boolean'
        indexes :parent_id_changed, type: 'boolean'
        indexes :source_changed, type: 'boolean'
        indexes :rating_changed, type: 'boolean'
        indexes :tags_added, type: 'keyword'
        indexes :tags_removed, type: 'keyword'
        indexes :tags, type: 'keyword'
        indexes :updated_at, type: 'date'
        indexes :locked_tags_added, type: 'keyword'
        indexes :locked_tags_removed, type: 'keyword'
        indexes :locked_tags, type: 'keyword'
      end
    end

    base.__elasticsearch__.extend ClassMethods
  end

  module ClassMethods
    def import(options = {})
      q = all
      q = q.where("id <= ?", options[:from]) if options[:from]
      q = q.where("id >= ?", options[:to]) if options[:to]
      q = q.where(options[:query]) if options[:query]

      q.find_in_batches do |batch|
        batch.map! do |pv|
          {
            index: {
              _id: pv.id,
              data: pv.as_indexed_json,
            }
          }
        end

        client.bulk({
          index: index_name,
          type: document_type,
          body: batch,
        })
      end
    end
  end

  def as_indexed_json(options = {})
    {
      id: id,
      updated_at: updated_at,
      version: version,
      updater_id: updater_id,
      parent_id: parent_id,
      rating: rating,
      source: source,
      description_changed: false,
      parent_id_changed: parent_changed,
      source_changed: source_changed,
      rating_changed: rating_changed,
      tags_added: added_tags,
      tags_removed: removed_tags,
      tags: tag_array,
      locked_tags_added: added_locked_tags,
      locked_tags_removed: removed_locked_tags,
      locked_tags: locked_tag_array,
    }
  end
end
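
For reference, the new index is created and backfilled the same way the Post index is (see the bin/setup change further down); from a console that looks roughly like this, with the bounded arguments being purely illustrative:

# Sketch only; mirrors the commands added to bin/setup below.
PostArchive.__elasticsearch__.create_index!
PostArchive.import                                   # batched bulk import defined above
PostArchive.import(from: 2_000_000, to: 1_000_000)   # hypothetical bounded run (from = highest id, to = lowest id)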

View File

@@ -2,7 +2,8 @@ class BulkRevert
  BIG_QUERY_LIMIT = 5_000

  attr_reader :constraints

  class ConstraintTooGeneralError < Exception ; end
  class ConstraintTooGeneralError < Exception;
  end

  def process(creator, constraints = {})
    @constraints = constraints
@@ -26,42 +27,30 @@ class BulkRevert
    @_preview ||= find_post_versions
  end

  def query_gbq(user_id, added_tags, removed_tags, min_version_id, max_version_id)
    GoogleBigQuery::PostVersion.new.find(user_id, added_tags, removed_tags, min_version_id, max_version_id, BIG_QUERY_LIMIT)
  end

  def find_post_versions
    q = PostArchive.where("true")

    if constraints[:user_name]
      constraints[:user_id] = User.find_by_name(constraints[:user_name]).try(:id)
    end

    if constraints[:user_id]
      q = q.where("post_versions.updater_id = ?", constraints[:user_id])
    end

    must = []
    must << {term: {updater_id: constraints[:user_id]}} if constraints[:user_id]

    version_range = {range: {version: {}}}
    version_range[:range][:version][:gte] = constraints[:min_version_id].to_i if constraints[:min_version_id]
    version_range[:range][:version][:lte] = constraints[:max_version_id].to_i if constraints[:max_version_id]
    must << version_range if constraints[:min_version_id] || constraints[:max_version_id]

    must += constraints[:added_tags].split.map { |x| {term: {tags_added: x}} } if constraints[:added_tags]
    must += constraints[:removed_tags].split.map { |x| {term: {tags_removed: x}} } if constraints[:removed_tags]

    q = PostArchive.__elasticsearch__.search({
      query: {bool: {filter: must}},
      sort: {id: :desc},
      size: BIG_QUERY_LIMIT + 1,
      from: 0,
      _source: false,
    })

    if constraints[:added_tags] || constraints[:removed_tags]
      hash = CityHash.hash64("#{constraints[:added_tags]} #{constraints[:removed_tags]} #{constraints[:min_version_id]} #{constraints[:max_version_id]}").to_s(36)
      sub_ids = Cache.get("br/fpv/#{hash}", 300) do
        query_gbq(constraints[:user_id], constraints[:added_tags], constraints[:removed_tags], constraints[:min_version_id], constraints[:max_version_id])
      end

      if sub_ids.size >= BIG_QUERY_LIMIT
        raise ConstraintTooGeneralError.new
      end

      raise ConstraintTooGeneralError.new if q.results.total > BIG_QUERY_LIMIT

      q = q.where("post_versions.id in (?)", sub_ids)
    end

    if constraints[:min_version_id].present?
      q = q.where("post_versions.id >= ?", constraints[:min_version_id])
    end

    if constraints[:max_version_id].present?
      q = q.where("post_versions.id <= ?", constraints[:max_version_id])
    end

    q
    q.records
  end
end
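
A hedged usage sketch of the reworked class; the user, tag, and id values are invented, but the constraint keys match the ones handled above:

# Sketch only; CurrentUser.user and all constraint values are placeholders.
begin
  BulkRevert.new.process(CurrentUser.user,
    user_name: "some_editor",   # resolved to user_id in find_post_versions
    added_tags: "tagme",        # becomes a {term: {tags_added: ...}} filter
    min_version_id: 1_000_000)
rescue BulkRevert::ConstraintTooGeneralError
  # more than BIG_QUERY_LIMIT (5,000) matching versions; narrow the constraints
end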

View File

@ -1,44 +0,0 @@
require "big_query"
module GoogleBigQuery
class Base
def self.enabled?
File.exists?(Danbooru.config.google_api_json_key_path)
end
def initialize
raise NotImplementedError.new("Google Big Query is not configured.") unless GoogleBigQuery::Base.enabled?
end
def query(q)
client.query(q)
end
def escape(s)
Regexp.escape(s).gsub(/\\/, '\0\0').gsub(/['"]/, '\\\\\0')
end
def client
@_client ||= BigQuery::Client.new(
"json_key" => client_options[:google_key_path],
"project_id" => google_config["project_id"],
"dataset" => client_options[:google_data_set]
)
end
def client_options
@_client_options ||= {
google_key_path: Danbooru.config.google_api_json_key_path,
google_data_set: "danbooru_#{Rails.env}"
}
end
def google_config
@_google_config ||= JSON.parse(File.read(client_options[:google_key_path]))
end
def data_set
client_options[:google_data_set]
end
end
end

View File

@ -1,66 +0,0 @@
module GoogleBigQuery
class PostVersion < Base
def find_removed(tag, limit = 1_000)
limit = limit.to_i
query("select id, post_id, updated_at, updater_id, updater_ip_addr, tags, added_tags, removed_tags, parent_id, rating, source from [#{data_set}.post_versions] where #{remove_tag_condition(tag)} order by updated_at desc limit #{limit}")
end
def find_added(tag, limit = 1_000)
limit = limit.to_i
query("select id, post_id, updated_at, updater_id, updater_ip_addr, tags, added_tags, removed_tags, parent_id, rating, source from [#{data_set}.post_versions] where #{add_tag_condition(tag)} order by updated_at desc limit #{limit}")
end
def add_tag_condition(t)
es = escape(t)
"regexp_match(added_tags, \"(?:^| )#{es}(?:$| )\")"
end
def remove_tag_condition(t)
es = escape(t)
"regexp_match(removed_tags, \"(?:^| )#{es}(?:$| )\")"
end
def find_for_post(post_id, created_at)
post_id = post_id.to_i
btime = created_at.strftime("%Y-%m-%d 00:00:00", created_at)
etime = 1.day.from(created_at).strftime("%Y-%m-%d 00:00:00")
"select updater_id, added_tag from [danbooru_#{Rails.env}].post_versions_flat_part where _partitiontime >= #{btime} and _partitiontime <= #{etime} and post_id = #{post_id}"
end
def find(user_id, added_tags, removed_tags, min_version_id, max_version_id, limit = 1_000)
constraints = []
constraints << "updater_id = #{user_id.to_i}"
if added_tags
added_tags.split.each do |tag|
constraints << add_tag_condition(tag)
end
end
if removed_tags
removed_tags.split.each do |tag|
constraints << remove_tag_condition(tag)
end
end
if min_version_id
constraints << "id >= #{min_version_id.to_i}"
end
if max_version_id
constraints << "id <= #{max_version_id.to_i}"
end
limit = limit.to_i
sql = "select id from [#{data_set}.post_versions] where " + constraints.join(" and ") + " order by updated_at desc limit #{limit}"
result = query(sql)
if result["rows"]
result["rows"].map {|x| x["f"][0]["v"].to_i}
else
[]
end
end
end
end

View File

@@ -5,11 +5,12 @@ module Reports
    def initialize(tag, query_type)
      @tag = tag

      if query_type == "added"
        @query = GoogleBigQuery::PostVersion.new.find_added(tag)
      else
        @query = GoogleBigQuery::PostVersion.new.find_removed(tag)
      end
      # TODO: Convert to Elasticsearch
      # if query_type == "added"
      #   @query = GoogleBigQuery::PostVersion.new.find_added(tag)
      # else
      #   @query = GoogleBigQuery::PostVersion.new.find_removed(tag)
      # end
    end

    def mock_version(row)
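
The TODO above would presumably end up mirroring the bulk-revert query against the new post version index; a rough sketch of one way it could look (hypothetical, not part of this commit):

# Hypothetical conversion of find_added, reusing the tags_added keyword field indexed above.
def find_added_via_elasticsearch(tag, limit = 1_000)
  PostArchive.__elasticsearch__.search(
    query: {bool: {filter: [{term: {tags_added: tag}}]}},
    sort: {updated_at: :desc},
    size: limit.to_i,
  ).records
end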

View File

@@ -1,5 +1,4 @@
require 'danbooru/has_bit_flags'
require 'google/apis/pubsub_v1'

class Post < ApplicationRecord
  class ApprovalError < Exception ; end
@@ -1476,9 +1475,7 @@ class Post < ApplicationRecord
  end

  def notify_pubsub
    return unless Danbooru.config.google_api_project

    # PostUpdate.insert(id)
    # NOTE: Left as a potentially useful hook into post updating.
  end
end

View File

@@ -54,6 +54,8 @@ class PostArchive < ApplicationRecord
  end

  extend SearchMethods
  include Indexable
  include PostVersionIndex

  def self.queue(post)
    self.create({
View File

@ -1,23 +0,0 @@
class PostUpdate
def self.insert(post_id)
ActiveRecord::Base.execute_sql("insert into post_updates (post_id) values (?)", post_id)
end
def self.get
ActiveRecord::Base.select_values_sql("delete from post_updates returning post_id").uniq
end
def self.push
return unless Danbooru.config.google_api_project
pubsub = Google::Apis::PubsubV1::PubsubService.new
pubsub.authorization = Google::Auth.get_application_default([Google::Apis::PubsubV1::AUTH_PUBSUB])
topic = "projects/#{Danbooru.config.google_api_project}/topics/post_updates"
post_ids = get()
post_ids.in_groups_of(1_000, false).each do |group|
request = Google::Apis::PubsubV1::PublishRequest.new(messages: group.map {|x| Google::Apis::PubsubV1::Message.new(data: x.to_s)})
pubsub.publish_topic(topic, request)
end
end
end

bin/setup Executable file → Normal file
View File

@@ -31,6 +31,8 @@ chdir APP_ROOT do
  puts "\n== Preparing search indices =="
  system! 'bin/rails r Post.__elasticsearch__.create_index!'
  system! 'bin/rails r Post.import'
  system! 'bin/rails r PostArchive.__elasticsearch__.create_index!'
  system! 'bin/rails r PostArchive.import'

  puts "\n== Removing old logs and tempfiles =="
  system! 'bin/rails log:clear tmp:clear'

View File

@@ -705,14 +705,6 @@ module Danbooru
    def iqdbs_server
    end

    # google api options
    def google_api_project
    end

    def google_api_json_key_path
      "/var/www/danbooru2/shared/config/google-key.json"
    end

    # Use a recaptcha on the signup page to protect against spambots creating new accounts.
    # https://developers.google.com/recaptcha/intro
    def enable_recaptcha?