Skip to content
代码片段 群组 项目
提交 8d80a0c9 编辑于 作者: Rodrigo Tomonari's avatar Rodrigo Tomonari 提交者: Douglas Barbosa Alexandre
浏览文件

Add service and worker to export projects in parallel

This change is part of a series of changes that will allow projects to
be exported using multiple jobs.

Changelog: other
上级 f4fb67db
No related branches found
No related tags found
无相关合并请求
......@@ -6,6 +6,13 @@ class ProjectExportJob < ApplicationRecord
validates :project, :jid, :status, presence: true
# Integer value backing each `status` state. The `state` declarations of the
# state machine below reference these entries instead of repeating the
# literals, and the hash is frozen so the mapping cannot be altered at runtime.
STATUS = {
queued: 0,
started: 1,
finished: 2,
failed: 3
}.freeze
state_machine :status, initial: :queued do
event :start do
transition [:queued] => :started
......@@ -19,9 +26,9 @@ class ProjectExportJob < ApplicationRecord
transition [:queued, :started] => :failed
end
state :queued, value: 0
state :started, value: 1
state :finished, value: 2
state :failed, value: 3
state :queued, value: STATUS[:queued]
state :started, value: STATUS[:started]
state :finished, value: STATUS[:finished]
state :failed, value: STATUS[:failed]
end
end
# frozen_string_literal: true
module Projects
  module ImportExport
    # Produces the final export archive for a project whose export is tracked
    # by a ProjectExportJob.
    #
    # The service runs every exporter (version saver, exported relations
    # merger), saves the export archive and then runs the given after-export
    # strategy. When any of those steps reports failure, the error is logged
    # and the user is notified through NotificationService. The export and
    # archive paths are removed afterwards in every case, including when an
    # exception is raised.
    class ParallelExportService
      # @param export_job [ProjectExportJob] job record; its project is the one exported
      # @param current_user [User] user passed to the after-export strategy and notified on failure
      # @param after_export_strategy [#execute] object invoked as `execute(current_user, project)`
      def initialize(export_job, current_user, after_export_strategy)
        @export_job = export_job
        @current_user = current_user
        @after_export_strategy = after_export_strategy
        # `project` is delegated to the export job, so it must be read after
        # @export_job has been assigned.
        @shared = project.import_export_shared
        @logger = Gitlab::Export::Logger.build
      end

      # Runs the export. Failures reported by the exporters, the archive saver
      # or the after-export strategy result in a logged error and a user
      # notification; exceptions are not rescued here and propagate to the
      # caller after cleanup has run.
      def execute
        log_info('Parallel project export started')

        if save_exporters && save_export_archive
          log_info('Parallel project export finished successfully')
          execute_after_export_action(after_export_strategy)
        else
          notify_error
        end
      ensure
        cleanup
      end

      private

      attr_reader :export_job, :current_user, :after_export_strategy, :shared, :logger

      delegate :project, to: :export_job

      # Runs the strategy and reports an error when it returns a falsy value.
      def execute_after_export_action(after_export_strategy)
        return if after_export_strategy.execute(current_user, project)

        notify_error
      end

      # Exporters are run in this order.
      def exporters
        [version_saver, exported_relations_merger]
      end

      # Returns true only when every exporter's #save is truthy. `all?` stops
      # at the first failing exporter, so later exporters are not run.
      def save_exporters
        exporters.all? do |exporter|
          log_info("Parallel project export - #{exporter.class.name} saver started")

          exporter.save
        end
      end

      def save_export_archive
        Gitlab::ImportExport::Saver.save(exportable: project, shared: shared)
      end

      def version_saver
        @version_saver ||= Gitlab::ImportExport::VersionSaver.new(shared: shared)
      end

      # Memoized under an instance variable named after the method, matching
      # the convention used by #version_saver.
      def exported_relations_merger
        @exported_relations_merger ||= Gitlab::ImportExport::Project::ExportedRelationsMerger.new(
          export_job: export_job,
          shared: shared
        )
      end

      # Removes the working directory and the archive location, if present.
      def cleanup
        FileUtils.rm_rf(shared.export_path) if File.exist?(shared.export_path)
        FileUtils.rm_rf(shared.archive_path) if File.exist?(shared.archive_path)
      end

      def log_info(message)
        logger.info(
          message: message,
          **log_base_data
        )
      end

      # Logs the errors collected on `shared` and notifies the user about the
      # failed export.
      def notify_error
        logger.error(
          message: 'Parallel project export error',
          export_errors: shared.errors.join(', '),
          export_job_id: export_job.id,
          **log_base_data
        )

        NotificationService.new.project_not_exported(project, current_user, shared.errors)
      end

      # Project attributes attached to every log entry.
      def log_base_data
        {
          project_id: project.id,
          project_name: project.name,
          project_path: project.full_path
        }
      end
    end
  end
end
......@@ -3009,6 +3009,15 @@
:weight: 1
:idempotent: false
:tags: []
- :name: projects_import_export_parallel_project_export
:worker_name: Projects::ImportExport::ParallelProjectExportWorker
:feature_category: :importers
:has_external_dependencies: false
:urgency: :low
:resource_boundary: :memory
:weight: 1
:idempotent: true
:tags: []
- :name: projects_import_export_relation_export
:worker_name: Projects::ImportExport::RelationExportWorker
:feature_category: :importers
......
# frozen_string_literal: true
module Projects
  module ImportExport
    # Sidekiq worker that runs ParallelExportService for a ProjectExportJob
    # and records the outcome on the job record.
    class ParallelProjectExportWorker
      include ApplicationWorker
      include ExceptionBacktrace

      idempotent!
      data_consistency :always
      deduplicate :until_executed
      feature_category :importers
      worker_resource_boundary :memory
      urgency :low
      loggable_arguments 1, 2
      sidekiq_options retries: 3, dead: false, status_expiration: StuckExportJobsWorker::EXPORT_JOBS_EXPIRATION

      # Once all retries are used up, mark the export job as failed and log
      # the error together with the job and project identifiers.
      sidekiq_retries_exhausted do |sidekiq_job, error|
        failed_export = ProjectExportJob.find(sidekiq_job['args'].first)
        failed_export.fail_op!

        exported_project = failed_export.project
        payload = {
          message: 'Parallel project export error',
          export_error: sidekiq_job['error_message'],
          project_export_job_id: failed_export.id,
          project_name: exported_project.name,
          project_id: exported_project.id
        }

        Gitlab::ExceptionLogFormatter.format!(error, payload)
        Gitlab::Export::Logger.error(payload)
      end

      # @param project_export_job_id [Integer] id of the ProjectExportJob to process
      # @param user_id [Integer] id of the user the export runs for
      # @param after_export_strategy [Hash, nil] strategy attributes; the
      #   'klass' entry, when present, names the strategy class
      #
      # Does nothing when the export job is already finished. Otherwise stores
      # this worker's jid on the job, runs the export service and marks the job
      # as finished. An unknown strategy marks the job as failed instead.
      def perform(project_export_job_id, user_id, after_export_strategy = {})
        export_job = ProjectExportJob.find(project_export_job_id)
        return if export_job.finished?

        export_job.update_attribute(:jid, jid)

        exporting_user = User.find(user_id)
        strategy = build!(after_export_strategy)

        ::Projects::ImportExport::ParallelExportService.new(export_job, exporting_user, strategy).execute

        export_job.finish!
      rescue Gitlab::ImportExport::AfterExportStrategyBuilder::StrategyNotFoundError
        export_job.fail_op!
      end

      private

      # Builds the after-export strategy. The 'klass' entry is removed from the
      # given hash (in place) and the remaining entries are passed along as the
      # strategy attributes.
      def build!(after_export_strategy)
        klass_name = after_export_strategy&.delete('klass')

        Gitlab::ImportExport::AfterExportStrategyBuilder.build!(klass_name, after_export_strategy)
      end
    end
  end
end
......@@ -405,6 +405,8 @@
- 1
- - projects_git_garbage_collect
- 1
- - projects_import_export_parallel_project_export
- 1
- - projects_import_export_relation_export
- 1
- - projects_inactive_projects_deletion_notification
......
......@@ -245,9 +245,9 @@
it 'expands multiple queue groups correctly' do
expected_workers =
if Gitlab.ee?
[%w[chat_notification], %w[project_export projects_import_export_relation_export project_template_export]]
[%w[chat_notification], %w[project_export projects_import_export_parallel_project_export projects_import_export_relation_export project_template_export]]
else
[%w[chat_notification], %w[project_export projects_import_export_relation_export]]
[%w[chat_notification], %w[project_export projects_import_export_parallel_project_export projects_import_export_relation_export]]
end
expect(Gitlab::SidekiqCluster)
......
......@@ -4,5 +4,21 @@
factory :project_export_job do
  project
  jid { SecureRandom.hex(8) }

  # One trait per export state; each sets `status` to the matching
  # ProjectExportJob::STATUS value, looked up when the attribute is evaluated.
  %i[queued started finished failed].each do |state_name|
    trait state_name do
      status { ProjectExportJob::STATUS[state_name] }
    end
  end
end
end
# frozen_string_literal: true
require 'spec_helper'
# Specs for ParallelExportService#execute: archive creation, progress logging,
# after-export strategy execution, cleanup and error notification.
RSpec.describe Projects::ImportExport::ParallelExportService, feature_category: :importers do
  let_it_be(:user) { create(:user) }

  let(:export_job) { create(:project_export_job) }
  let(:after_export_strategy) { Gitlab::ImportExport::AfterExportStrategies::DownloadNotificationStrategy.new }
  let(:project) { export_job.project }

  # Both exporters are stubbed to succeed so each example only controls the
  # archive saver and the after-export strategy.
  before do
    allow_next_instance_of(Gitlab::ImportExport::Project::ExportedRelationsMerger) do |saver|
      allow(saver).to receive(:save).and_return(true)
    end

    allow_next_instance_of(Gitlab::ImportExport::VersionSaver) do |saver|
      allow(saver).to receive(:save).and_return(true)
    end
  end

  describe '#execute' do
    subject(:service) { described_class.new(export_job, user, after_export_strategy) }

    it 'creates a project export archive file' do
      expect(Gitlab::ImportExport::Saver).to receive(:save)
        .with(exportable: project, shared: project.import_export_shared)

      service.execute
    end

    it 'logs export progress' do
      allow(Gitlab::ImportExport::Saver).to receive(:save).and_return(true)

      # The service builds its own logger, so the instance is read back from it.
      logger = service.instance_variable_get(:@logger)
      messages = [
        'Parallel project export started',
        'Parallel project export - Gitlab::ImportExport::VersionSaver saver started',
        'Parallel project export - Gitlab::ImportExport::Project::ExportedRelationsMerger saver started',
        'Parallel project export finished successfully'
      ]

      messages.each do |message|
        expect(logger).to receive(:info).ordered.with(hash_including(message: message))
      end

      service.execute
    end

    it 'executes after export strategy on export success' do
      allow(Gitlab::ImportExport::Saver).to receive(:save).and_return(true)

      expect(after_export_strategy).to receive(:execute)

      service.execute
    end

    it 'ensures files are cleaned up' do
      shared = project.import_export_shared
      FileUtils.mkdir_p(shared.archive_path)
      FileUtils.mkdir_p(shared.export_path)

      # Cleanup must run even when the export raises.
      allow(Gitlab::ImportExport::Saver).to receive(:save).and_raise(StandardError)

      expect { service.execute }.to raise_error(StandardError)

      expect(File.exist?(shared.export_path)).to eq(false)
      expect(File.exist?(shared.archive_path)).to eq(false)
    end

    context 'when export fails' do
      it 'notifies the error to the user' do
        allow(Gitlab::ImportExport::Saver).to receive(:save).and_return(false)

        allow(project.import_export_shared).to receive(:errors).and_return(['Error'])

        expect_next_instance_of(NotificationService) do |instance|
          expect(instance).to receive(:project_not_exported).with(project, user, ['Error'])
        end

        service.execute
      end
    end

    context 'when after export strategy fails' do
      it 'notifies the error to the user' do
        allow(Gitlab::ImportExport::Saver).to receive(:save).and_return(true)
        allow(after_export_strategy).to receive(:execute).and_return(false)
        allow(project.import_export_shared).to receive(:errors).and_return(['Error'])

        expect_next_instance_of(NotificationService) do |instance|
          expect(instance).to receive(:project_not_exported).with(project, user, ['Error'])
        end

        service.execute
      end
    end
  end
end
# frozen_string_literal: true
require 'spec_helper'
# Specs for ParallelProjectExportWorker: the status transitions performed by
# #perform and the handler that runs once Sidekiq retries are exhausted.
RSpec.describe Projects::ImportExport::ParallelProjectExportWorker, feature_category: :importers do
let_it_be(:user) { create(:user) }
let(:export_job) { create(:project_export_job, :started) }
let(:after_export_strategy) { {} }
# Positional arguments for #perform: export job id, user id, strategy hash.
let(:job_args) { [export_job.id, user.id, after_export_strategy] }
before do
# Give every new worker instance a random jid.
allow_next_instance_of(described_class) do |job|
allow(job).to receive(:jid) { SecureRandom.hex(8) }
end
end
describe '#perform' do
# NOTE(review): the 'an idempotent worker' shared examples are defined
# elsewhere; presumably they supply `subject` and read `job_args` - verify.
it_behaves_like 'an idempotent worker' do
it 'sets the export job status to finished' do
subject
expect(export_job.reload.finished?).to eq(true)
end
end
context 'when after export strategy does not exist' do
let(:after_export_strategy) { { 'klass' => 'InvalidStrategy' } }
it 'sets the export job status to failed' do
described_class.new.perform(*job_args)
expect(export_job.reload.failed?).to eq(true)
end
end
end
describe '.sidekiq_retries_exhausted' do
# Minimal job hash with the two keys these examples rely on.
let(:job) { { 'args' => job_args, 'error_message' => 'Error message' } }
it 'sets export_job status to failed' do
# The block is called with the job hash only; no exception is passed.
described_class.sidekiq_retries_exhausted_block.call(job)
expect(export_job.reload.failed?).to eq(true)
end
it 'logs an error message' do
expect_next_instance_of(Gitlab::Export::Logger) do |logger|
expect(logger).to receive(:error).with(
hash_including(
message: 'Parallel project export error',
export_error: 'Error message'
)
)
end
described_class.sidekiq_retries_exhausted_block.call(job)
end
end
end
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册