Add new public status index (#26344)
Co-authored-by: Eugen Rochko <eugen@zeonfederated.com> Co-authored-by: Claire <claire.github-309c@sitedethib.com>local
parent
96bcee66fb
commit
30c191aaa0
28 changed files with 584 additions and 87 deletions
@ -0,0 +1,50 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
# Chewy index over statuses selected by the `kept.indexable` Status scope.
#
# The `text` field is analysed with the built-in `whitespace` analyzer and
# carries a `stemmed` sub-field analysed with the custom `content` analyzer
# declared in the settings below.
class PublicStatusesIndex < Chewy::Index
  settings(
    index: index_preset(refresh_interval: '30s', number_of_shards: 5),
    analysis: {
      # Token filters referenced by the `content` analyzer.
      filter: {
        english_stop: { type: 'stop', stopwords: '_english_' },
        english_stemmer: { type: 'stemmer', language: 'english' },
        english_possessive_stemmer: { type: 'stemmer', language: 'possessive_english' },
      },

      analyzer: {
        content: {
          tokenizer: 'uax_url_email',
          # Filters are listed in the order they are handed to Elasticsearch.
          filter: %w(
            english_possessive_stemmer
            lowercase
            asciifolding
            cjk_width
            english_stop
            english_stemmer
          ),
        },
      },
    }
  )

  # Source records for the index, with the associations read while building
  # a document (media, poll, preview cards) eager-loaded.
  index_scope ::Status
    .unscoped
    .kept
    .indexable
    .includes(:media_attachments, :preloadable_poll, :preview_cards)

  root date_detection: false do
    field :id, type: 'keyword'
    field :account_id, type: 'long'

    field(:text, type: 'text', analyzer: 'whitespace', value: ->(status) { status.searchable_text }) do
      field :stemmed, type: 'text', analyzer: 'content'
    end

    field :language, type: 'keyword'
    field :properties, type: 'keyword', value: ->(status) { status.searchable_properties }
    field :created_at, type: 'date'
  end
end
@ -0,0 +1,41 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
# Imports indexable statuses into PublicStatusesIndex in batches.
class Importer::PublicStatusesIndexImporter < Importer::BaseImporter
  # Iterates over the indexable statuses in batches of `@batch_size`. For each
  # batch a work unit builds the bulk body, submits it to the index and yields
  # `[indexed, deleted]` counts. `wait!` is called once all batches have been
  # handed off.
  def import!
    indexable_statuses_scope.find_in_batches(batch_size: @batch_size) do |batch|
      in_work_unit(batch.map(&:status_id)) do |status_ids|
        bulk = ActiveRecord::Base.connection_pool.with_connection do
          Chewy::Index::Import::BulkBuilder.new(index, to_index: statuses_to_index(status_ids)).bulk_body
        end

        # Entries carrying an :index key are counted as indexed; every other
        # entry is counted as a deletion.
        indexed = bulk.count { |entry| entry[:index] }
        deleted = bulk.size - indexed

        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)

        [indexed, deleted]
      end
    end

    wait!
  end

  private

  # The index this importer writes to.
  def index
    PublicStatusesIndex
  end

  # Indexable statuses, selecting only the id plus a `status_id` alias that
  # coalesces `reblog_of_id` and `id`.
  def indexable_statuses_scope
    Status.indexable.select('"statuses"."id", COALESCE("statuses"."reblog_of_id", "statuses"."id") AS status_id')
  end

  # Statuses to serialise for one batch. Preloads the same associations as the
  # index scope, including preview cards, which `searchable_properties` reads
  # for the 'link' and 'embed' properties; without the preload every status
  # would issue its own preview-card query.
  def statuses_to_index(status_ids)
    Status.includes(:media_attachments, :preloadable_poll, :preview_cards).where(id: status_ids)
  end
end
@ -0,0 +1,44 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
# Keeps PublicStatusesIndex in step with an account's `indexable` flag.
#
# The `enqueue_*` methods schedule background workers; the bang methods do the
# index work directly. All of the index-touching methods are no-ops when Chewy
# is disabled.
module AccountStatusesSearch
  extend ActiveSupport::Concern

  included do
    after_update_commit :enqueue_update_public_statuses_index, if: :saved_change_to_indexable?
    after_destroy_commit :enqueue_remove_from_public_statuses_index, if: :indexable?
  end

  # Schedules an add or a removal depending on the current `indexable?` value.
  def enqueue_update_public_statuses_index
    if indexable?
      enqueue_add_to_public_statuses_index
    else
      enqueue_remove_from_public_statuses_index
    end
  end

  # Schedules AddToPublicStatusesIndexWorker for this account.
  def enqueue_add_to_public_statuses_index
    return unless Chewy.enabled?

    AddToPublicStatusesIndexWorker.perform_async(id)
  end

  # Schedules RemoveFromPublicStatusesIndexWorker for this account.
  def enqueue_remove_from_public_statuses_index
    return unless Chewy.enabled?

    RemoveFromPublicStatusesIndexWorker.perform_async(id)
  end

  # Imports this account's indexable statuses into PublicStatusesIndex, one
  # batch at a time.
  def add_to_public_statuses_index!
    return unless Chewy.enabled?

    statuses.indexable.find_in_batches do |batch|
      # The batch must be passed positionally: `import` takes the records to
      # index as arguments, while keyword arguments are import options. Passing
      # it as `query:` left the collection empty, so each iteration imported
      # the index's entire default scope instead of this batch.
      PublicStatusesIndex.import(batch)
    end
  end

  # Deletes every document whose `account_id` matches this account.
  def remove_from_public_statuses_index!
    return unless Chewy.enabled?

    PublicStatusesIndex.filter(term: { account_id: id }).delete_all
  end
end
@ -0,0 +1,54 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
# Search-related helpers for statuses: the `indexable` scope plus the values
# that end up in search documents (who may find the status, its text and its
# property tags).
module StatusSearchConcern
  extend ActiveSupport::Concern

  included do
    # Public, non-reblog statuses whose account has `indexable` set.
    scope :indexable, lambda {
      without_reblogs
        .where(visibility: :public)
        .joins(:account)
        .where(account: { indexable: true })
    }
  end

  # Returns the unique ids of the accounts that may find this status: the
  # author (when local) plus local accounts that were mentioned in it or that
  # favourited, reblogged, bookmarked or voted on it.
  #
  # When `preloaded` is nil the ids are queried from the database; otherwise
  # they are read from the per-status lookups on `preloaded`.
  def searchable_by(preloaded = nil)
    ids = local? ? [account_id] : []

    if preloaded.nil?
      ids.concat(mentions.joins(:account).merge(Account.local).active.pluck(:account_id))
      ids.concat(favourites.joins(:account).merge(Account.local).pluck(:account_id))
      ids.concat(reblogs.joins(:account).merge(Account.local).pluck(:account_id))
      ids.concat(bookmarks.joins(:account).merge(Account.local).pluck(:account_id))
      ids.concat(poll.votes.joins(:account).merge(Account.local).pluck(:account_id)) if poll.present?
    else
      ids.concat(preloaded.mentions[id] || [])
      ids.concat(preloaded.favourites[id] || [])
      ids.concat(preloaded.reblogs[id] || [])
      ids.concat(preloaded.bookmarks[id] || [])
      ids.concat(preloaded.votes[id] || [])
    end

    ids.uniq
  end

  # Returns the text to index: spoiler text, plain-text body, poll options and
  # media descriptions, with nil parts dropped and the rest separated by blank
  # lines.
  def searchable_text
    spoiler = spoiler_text
    body = FormattingHelper.extract_status_plain_text(self)
    poll_options = preloadable_poll&.options&.join("\n\n")
    media_descriptions = ordered_media_attachments.map(&:description).join("\n\n")

    [spoiler, body, poll_options, media_descriptions].compact.join("\n\n")
  end

  # Returns the property tags that apply to this status, in a fixed order.
  def searchable_properties
    # Each tag is paired with the check that decides whether it applies; the
    # checks run in declaration order.
    checks = {
      'image' => -> { ordered_media_attachments.any?(&:image?) },
      'video' => -> { ordered_media_attachments.any?(&:video?) },
      'audio' => -> { ordered_media_attachments.any?(&:audio?) },
      'media' => -> { with_media? },
      'poll' => -> { with_poll? },
      'link' => -> { with_preview_card? },
      'embed' => -> { preview_cards.any?(&:video?) },
      'sensitive' => -> { sensitive? },
      'reply' => -> { reply? },
    }

    checks.select { |_tag, applies| applies.call }.keys
  end
end
@ -0,0 +1,75 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
# Full-text status search across StatusesIndex and PublicStatusesIndex.
class StatusesSearchService < BaseService
  # Runs `query` on behalf of `account`.
  #
  # `options[:limit]` and `options[:offset]` are coerced with `to_i`.
  # Returns the matching statuses that are not filtered for `account`, or an
  # empty array when the search backend is unreachable or the query fails to
  # parse.
  def call(query, account = nil, options = {})
    @query   = query&.strip
    @account = account
    @options = options
    @limit   = options[:limit].to_i
    @offset  = options[:offset].to_i

    status_search_results
  end

  private

  def status_search_results
    # Parse first so a malformed query is rescued below before anything else
    # is evaluated.
    transformed_query = parsed_query

    definition = transformed_query.apply(
      StatusesIndex.filter(
        bool: {
          should: [publicly_searchable, non_publicly_searchable],
          minimum_should_match: 1,
        }
      )
    )

    # Chewy offers no public way to send one query to several indices, so the
    # second index is appended to the request's internal index list.
    definition.instance_variable_get(:@parameters)[:indices].value[:indices] << PublicStatusesIndex

    results = definition
              .collapse(field: :id)
              .order(_id: { order: :desc })
              .limit(@limit)
              .offset(@offset)
              .objects
              .compact

    author_ids = results.map(&:account_id)
    author_domains = results.map(&:account_domain)
    preloaded_relations = @account.relations_map(author_ids, author_domains)

    results.reject do |status|
      StatusFilter.new(status, @account, preloaded_relations).filtered?
    end
  rescue Faraday::ConnectionFailed, Parslet::ParseFailed
    []
  end

  # Clause matching documents that have no `searchable_by` field.
  def publicly_searchable
    { bool: { must_not: { exists: { field: 'searchable_by' } } } }
  end

  # Clause matching documents whose `searchable_by` field exists and contains
  # the searching account's id.
  def non_publicly_searchable
    {
      bool: {
        must: [
          { exists: { field: 'searchable_by' } },
          { term: { searchable_by: @account.id } },
        ],
      },
    }
  end

  def parsed_query
    syntax_tree = SearchQueryParser.new.parse(@query)
    SearchQueryTransformer.new.apply(syntax_tree)
  end
end
@ -0,0 +1,15 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
# Background job that adds an account's statuses to the public statuses index.
class AddToPublicStatusesIndexWorker
  include Sidekiq::Worker

  # Looks the account up and indexes its statuses only if it is indexable at
  # the time the job runs. A missing account is not an error: the job returns
  # true.
  def perform(account_id)
    account = Account.find(account_id)
    account.add_to_public_statuses_index! if account.indexable?
  rescue ActiveRecord::RecordNotFound
    true
  end
end
@ -0,0 +1,15 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
# Background job that removes an account's statuses from the public statuses
# index.
class RemoveFromPublicStatusesIndexWorker
  include Sidekiq::Worker

  # When the account still exists, its documents are removed unless it is
  # indexable at the time the job runs. When the account no longer exists,
  # any documents still carrying its id are purged directly, so a job enqueued
  # for a deleted account does not leave entries behind. Returns true in the
  # missing-account case.
  def perform(account_id)
    account = Account.find(account_id)

    return if account.indexable?

    account.remove_from_public_statuses_index!
  rescue ActiveRecord::RecordNotFound
    purge_documents_for(account_id)
    true
  end

  private

  # Deletes every document whose `account_id` matches, without needing the
  # account record. No-op when Chewy is disabled.
  def purge_documents_for(account_id)
    return unless Chewy.enabled?

    PublicStatusesIndex.filter(term: { account_id: account_id }).delete_all
  end
end
@ -0,0 +1,31 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
require 'rails_helper' |
||||
|
||||
describe PublicStatusesIndex do
  # Canned Elasticsearch payload: a single hit carrying only an id and a score.
  let(:raw_response) do
    {
      took: 3,
      hits: {
        hits: [
          { _id: '0', _score: 1.6375021 },
        ],
      },
    }
  end

  describe 'Searching the index' do
    before { mock_elasticsearch_response(described_class, raw_response) }

    it 'returns results from a query' do
      expect(described_class.query(match: { name: 'status' })).to eq []
    end
  end
end
@ -0,0 +1,16 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
require 'rails_helper' |
||||
|
||||
describe Importer::PublicStatusesIndexImporter do
  describe 'import!' do
    let(:pool) { Concurrent::FixedThreadPool.new(5) }
    let(:importer) { described_class.new(batch_size: 123, executor: pool) }

    # One status owned by an indexable account gives the importer work to do.
    before do
      indexable_account = Fabricate(:account, indexable: true)
      Fabricate(:status, account: indexable_account)
    end

    it 'indexes relevant statuses' do
      expect { importer.import! }.to update_index(PublicStatusesIndex)
    end
  end
end
@ -0,0 +1,66 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
require 'rails_helper' |
||||
|
||||
describe AccountStatusesSearch do
  let(:account) { Fabricate(:account, indexable: indexable) }

  # Every example runs with search reported as enabled.
  before { allow(Chewy).to receive(:enabled?).and_return(true) }

  describe '#enqueue_update_public_statuses_index' do
    # Both enqueue helpers are stubbed so only the dispatch decision is checked.
    before do
      allow(account).to receive_messages(
        enqueue_add_to_public_statuses_index: nil,
        enqueue_remove_from_public_statuses_index: nil
      )
    end

    context 'when account is indexable' do
      let(:indexable) { true }

      it 'enqueues add_to_public_statuses_index and not to remove_from_public_statuses_index' do
        account.enqueue_update_public_statuses_index

        expect(account).to have_received(:enqueue_add_to_public_statuses_index)
        expect(account).to_not have_received(:enqueue_remove_from_public_statuses_index)
      end
    end

    context 'when account is not indexable' do
      let(:indexable) { false }

      it 'enqueues remove_from_public_statuses_index and not to add_to_public_statuses_index' do
        account.enqueue_update_public_statuses_index

        expect(account).to have_received(:enqueue_remove_from_public_statuses_index)
        expect(account).to_not have_received(:enqueue_add_to_public_statuses_index)
      end
    end
  end

  describe '#enqueue_add_to_public_statuses_index' do
    let(:indexable) { true }
    let(:worker) { AddToPublicStatusesIndexWorker }

    before { allow(worker).to receive(:perform_async) }

    it 'enqueues AddToPublicStatusesIndexWorker' do
      account.enqueue_add_to_public_statuses_index

      expect(worker).to have_received(:perform_async).with(account.id)
    end
  end

  describe '#enqueue_remove_from_public_statuses_index' do
    let(:indexable) { false }
    let(:worker) { RemoveFromPublicStatusesIndexWorker }

    before { allow(worker).to receive(:perform_async) }

    it 'enqueues RemoveFromPublicStatusesIndexWorker' do
      account.enqueue_remove_from_public_statuses_index

      expect(worker).to have_received(:perform_async).with(account.id)
    end
  end
end
@ -0,0 +1,42 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
require 'rails_helper' |
||||
|
||||
describe AddToPublicStatusesIndexWorker do
  describe '#perform' do
    let(:account) { Fabricate(:account, indexable: indexable) }
    let(:account_id) { account.id }

    # With a real account, the lookup is stubbed to return this exact
    # instance so the spy on it can be asserted against. The missing-account
    # context skips the stubs and exercises the real lookup.
    before do
      next if account.nil?

      allow(Account).to receive(:find).with(account_id).and_return(account)
      allow(account).to receive(:add_to_public_statuses_index!)
    end

    context 'when account is indexable' do
      let(:indexable) { true }

      it 'adds the account to the public statuses index' do
        subject.perform(account_id)

        expect(account).to have_received(:add_to_public_statuses_index!)
      end
    end

    context 'when account is not indexable' do
      let(:indexable) { false }

      it 'does not add the account to public statuses index' do
        subject.perform(account_id)

        expect(account).to_not have_received(:add_to_public_statuses_index!)
      end
    end

    context 'when account does not exist' do
      let(:account) { nil }
      let(:account_id) { 999 }

      it 'does not raise an error' do
        expect { subject.perform(account_id) }.to_not raise_error
      end
    end
  end
end
@ -0,0 +1,42 @@ |
||||
# frozen_string_literal: true |
||||
|
||||
require 'rails_helper' |
||||
|
||||
describe RemoveFromPublicStatusesIndexWorker do
  describe '#perform' do
    let(:account) { Fabricate(:account, indexable: indexable) }
    let(:account_id) { account.id }

    # With a real account, the lookup is stubbed to return this exact
    # instance so the spy on it can be asserted against. The missing-account
    # context skips the stubs and exercises the real lookup.
    before do
      next if account.nil?

      allow(Account).to receive(:find).with(account_id).and_return(account)
      allow(account).to receive(:remove_from_public_statuses_index!)
    end

    context 'when account is not indexable' do
      let(:indexable) { false }

      it 'removes the account from public statuses index' do
        subject.perform(account_id)

        expect(account).to have_received(:remove_from_public_statuses_index!)
      end
    end

    context 'when account is indexable' do
      let(:indexable) { true }

      it 'does not remove the account from public statuses index' do
        subject.perform(account_id)

        expect(account).to_not have_received(:remove_from_public_statuses_index!)
      end
    end

    context 'when account does not exist' do
      let(:account) { nil }
      let(:account_id) { 999 }

      it 'does not raise an error' do
        expect { subject.perform(account_id) }.to_not raise_error
      end
    end
  end
end
Loading…
Reference in new issue