|
|
@ -4,7 +4,7 @@ class Scheduler::IndexingScheduler |
|
|
|
include Sidekiq::Worker |
|
|
|
include Sidekiq::Worker |
|
|
|
include Redisable |
|
|
|
include Redisable |
|
|
|
|
|
|
|
|
|
|
|
sidekiq_options retry: 0 |
|
|
|
sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i |
|
|
|
|
|
|
|
|
|
|
|
IMPORT_BATCH_SIZE = 1000 |
|
|
|
IMPORT_BATCH_SIZE = 1000 |
|
|
|
SCAN_BATCH_SIZE = 10 * IMPORT_BATCH_SIZE |
|
|
|
SCAN_BATCH_SIZE = 10 * IMPORT_BATCH_SIZE |
|
|
@ -16,9 +16,7 @@ class Scheduler::IndexingScheduler |
|
|
|
with_redis do |redis| |
|
|
|
with_redis do |redis| |
|
|
|
redis.sscan_each("chewy:queue:#{type.name}", count: SCAN_BATCH_SIZE).each_slice(IMPORT_BATCH_SIZE) do |ids| |
|
|
|
redis.sscan_each("chewy:queue:#{type.name}", count: SCAN_BATCH_SIZE).each_slice(IMPORT_BATCH_SIZE) do |ids| |
|
|
|
type.import!(ids) |
|
|
|
type.import!(ids) |
|
|
|
redis.pipelined do |pipeline| |
|
|
|
redis.srem("chewy:queue:#{type.name}", ids) |
|
|
|
pipeline.srem("chewy:queue:#{type.name}", ids) |
|
|
|
|
|
|
|
end |
|
|
|
|
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|