diff --git a/.rubocop_todo/rspec/named_subject.yml b/.rubocop_todo/rspec/named_subject.yml
index 69a4316468ed172b3ec9c48ec5306e16d569ef0c..e7222c709738df72e4e04d3f7623389f69a4c4b7 100644
--- a/.rubocop_todo/rspec/named_subject.yml
+++ b/.rubocop_todo/rspec/named_subject.yml
@@ -1694,7 +1694,6 @@ RSpec/NamedSubject:
     - 'spec/lib/atlassian/jira_connect/serializers/repository_entity_spec.rb'
     - 'spec/lib/atlassian/jira_connect/serializers/reviewer_entity_spec.rb'
     - 'spec/lib/backup/database_backup_error_spec.rb'
-    - 'spec/lib/backup/database_model_spec.rb'
     - 'spec/lib/backup/database_spec.rb'
     - 'spec/lib/backup/dump/postgres_spec.rb'
     - 'spec/lib/backup/files_spec.rb'
diff --git a/.rubocop_todo/style/inline_disable_annotation.yml b/.rubocop_todo/style/inline_disable_annotation.yml
index cc101f02cf456cb05d653acbe21f4dcacd856548..8a47183544e4eb087f3d6d7398feea3e375c3c17 100644
--- a/.rubocop_todo/style/inline_disable_annotation.yml
+++ b/.rubocop_todo/style/inline_disable_annotation.yml
@@ -2867,7 +2867,6 @@ Style/InlineDisableAnnotation:
     - 'spec/lib/api/base_spec.rb'
     - 'spec/lib/api/entities/wiki_page_spec.rb'
     - 'spec/lib/api/helpers/packages/npm_spec.rb'
-    - 'spec/lib/backup/database_model_spec.rb'
     - 'spec/lib/backup/database_spec.rb'
     - 'spec/lib/backup/manager_spec.rb'
     - 'spec/lib/banzai/filter/footnote_filter_spec.rb'
diff --git a/Gemfile b/Gemfile
index a52c32f427844abb2b9742c6e03bc2d8d39b08c9..d3b630928d0f6e9d24b2c28abe131ddfc402633f 100644
--- a/Gemfile
+++ b/Gemfile
@@ -38,9 +38,10 @@ gem 'gitlab-safe_request_store', path: 'gems/gitlab-safe_request_store' # ruboco
 # GitLab Monorepo Gems
 group :monorepo do
   gem 'gitlab-utils', path: 'gems/gitlab-utils' # rubocop:todo Gemfile/MissingFeatureCategory
-  gem 'gitlab-backup-cli', path: 'gems/gitlab-backup-cli', feature_category: :backup_restore
+gem 'gitlab-backup-cli', path: 'gems/gitlab-backup-cli', require: 'gitlab/backup/cli', feature_category: :backup_restore
 gem 'gitlab-secret_detection', path: 'gems/gitlab-secret_detection', feature_category: :secret_detection
 # Responders respond_to and respond_with
diff --git a/gems/gitlab-backup-cli/lib/gitlab/backup/cli.rb b/gems/gitlab-backup-cli/lib/gitlab/backup/cli.rb
index 01ea934863f24b3c26ba181375d10bbfb2621c9a..4c46dff868c873efdc534193ba961d66b7a550f3 100644
--- a/gems/gitlab-backup-cli/lib/gitlab/backup/cli.rb
+++ b/gems/gitlab-backup-cli/lib/gitlab/backup/cli.rb
@@ -6,6 +6,7 @@ module Backup
     module Cli
       autoload :VERSION, 'gitlab/backup/cli/version'
       autoload :Runner, 'gitlab/backup/cli/runner'
+      autoload :Utils, 'gitlab/backup/cli/utils'
       Error = Class.new(StandardError)
       # Your code goes here...
diff --git a/gems/gitlab-backup-cli/lib/gitlab/backup/cli/utils.rb b/gems/gitlab-backup-cli/lib/gitlab/backup/cli/utils.rb
new file mode 100644
index 0000000000000000000000000000000000000000..84e6c605172a5b42867610d60b695d7d83c56656
--- /dev/null
+++ b/gems/gitlab-backup-cli/lib/gitlab/backup/cli/utils.rb
@@ -0,0 +1,11 @@
+# frozen_string_literal: true
+module Gitlab
+  module Backup
+    module Cli
+      module Utils
+        autoload :PgDump, 'gitlab/backup/cli/utils/pg_dump'
+      end
+    end
+  end
diff --git a/gems/gitlab-backup-cli/lib/gitlab/backup/cli/utils/pg_dump.rb b/gems/gitlab-backup-cli/lib/gitlab/backup/cli/utils/pg_dump.rb
new file mode 100644
index 0000000000000000000000000000000000000000..6c16ca5cf06901d1714cf1c16642087d8ff864bd
--- /dev/null
+++ b/gems/gitlab-backup-cli/lib/gitlab/backup/cli/utils/pg_dump.rb
@@ -0,0 +1,59 @@
+# frozen_string_literal: true
+module Gitlab
+  module Backup
+    module Cli
+      module Utils
+        class PgDump
+          # Expose snapshot_id to be used when creating a database dump
+          # See https://www.postgresql.org/docs/14/functions-admin.html#FUNCTIONS-SNAPSHOT-SYNCHRONIZATION
+          attr_reader :snapshot_id
+          # Dump only specified database schemas instead of everything
+          attr_reader :schemas
+          # Database name
+          attr_reader :database_name
+          # Additional ENV variables to use when running PgDump
+          attr_reader :env
+          # @param [String] database_name
+          # @param [String] snapshot_id the snapshot id to use when creating a database dump
+          # @param [Array<String>] schemas
+          # @param [Hash<String,String>] env
+          def initialize(database_name:, snapshot_id: nil, schemas: [], env: {})
+            @database_name = database_name
+            @snapshot_id = snapshot_id
+            @schemas = schemas
+            @env = env
+          end
+          # Spawn a pg_dump process and assign a given output IO
+          #
+          # @param [IO] output the output IO
+          def spawn(output:)
+            Process.spawn(env, 'pg_dump', *cmd_args, out: output)
+          end
+          private
+          # Returns a list of arguments used by the pg_dump command
+          #
+          # @return [Array<String (frozen)>]
+          def cmd_args
+            args = ["--clean"] # Pass '--clean' to include 'DROP TABLE' statements in the DB dump.
+            args << '--if-exists'
+            args << "--snapshot=#{snapshot_id}" if snapshot_id
+            schemas.each do |schema|
+              args << '-n'
+              args << schema
+            end
+            args << database_name
+            args
+          end
+        end
+      end
+    end
+  end
diff --git a/gems/gitlab-backup-cli/sig/gitlab/backup/cli.rbs b/gems/gitlab-backup-cli/sig/gitlab/backup/cli.rbs
index 25540c06400753d3121a7fb32d459e1a9e9ac430..76b68239e30b66fc6dd4e7ea8bc24cc6c5dd6149 100644
--- a/gems/gitlab-backup-cli/sig/gitlab/backup/cli.rbs
+++ b/gems/gitlab-backup-cli/sig/gitlab/backup/cli.rbs
@@ -1,6 +1,7 @@
 module Gitlab
   module Backup
     module Cli
+      Error: StandardError
       VERSION: String
diff --git a/gems/gitlab-backup-cli/sig/gitlab/backup/cli/utils/pg_dump.rbs b/gems/gitlab-backup-cli/sig/gitlab/backup/cli/utils/pg_dump.rbs
new file mode 100644
index 0000000000000000000000000000000000000000..1718d0df6c0686e57d111754a506020125fb281e
--- /dev/null
+++ b/gems/gitlab-backup-cli/sig/gitlab/backup/cli/utils/pg_dump.rbs
@@ -0,0 +1,25 @@
+module Gitlab
+  module Backup
+    module Cli
+      module Utils
+        class PgDump
+          attr_reader snapshot_id: String
+          attr_reader schemas: Array[String]
+          attr_reader database_name: String
+          attr_reader env: Hash[String,String]
+          def initialize: (database_name: String, ?snapshot_id: String?, ?schemas: Array[String], ?env: Hash[String, String]) -> void
+          # Spawn a pg_dump process and assign a given output IO
+          #
+          # @param [IO] output the output IO
+          def spawn: (output: IO) -> Integer
+          private
+          def cmd_args: () -> Array[String]
+        end
+      end
+    end
+  end
diff --git a/gems/gitlab-backup-cli/spec/gitlab/backup/cli/utils/pg_dump_spec.rb b/gems/gitlab-backup-cli/spec/gitlab/backup/cli/utils/pg_dump_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..b1e8495c6375d53d5fe541a6be2cddc212ba1f9b
--- /dev/null
+++ b/gems/gitlab-backup-cli/spec/gitlab/backup/cli/utils/pg_dump_spec.rb
@@ -0,0 +1,81 @@
+# frozen_string_literal: true
+require 'spec_helper'
+RSpec.describe Gitlab::Backup::Cli::Utils::PgDump do
+  let(:cmd_args) { pg_dump.send(:cmd_args) }
+  let(:database_name) { 'gitlab_database' }
+  let(:env) do
+    {
+      'PGHOST' => '',
+      'PGPORT' => '5434'
+    }
+  end
+  subject(:pg_dump) { described_class.new(database_name: database_name) }
+  context 'with accessors' do
+    it { respond_to :database_name }
+    it { respond_to :snapshot_id }
+    it { respond_to :schemas }
+    it { respond_to :env }
+  end
+  describe '#cmd_args' do
+    let(:default_args) { %w[--clean --if-exists] }
+    context 'when no optional parameter is provided' do
+      it 'returns default arguments' do
+        expect(cmd_args).to eq(default_args << database_name)
+      end
+    end
+    context 'with custom snapshot_id' do
+      let(:snapshot_id) { '00000003-000001BF-1' }
+      subject(:pg_dump) { described_class.new(database_name: database_name, snapshot_id: snapshot_id) }
+      it 'adds a flag between default_args and the database name' do
+        expect(cmd_args).to eq(default_args + %W[--snapshot=#{snapshot_id} #{database_name}])
+      end
+    end
+    context 'with custom schemas' do
+      let(:schemas) { %w[public gitlab_partitions_dynamic gitlab_partitions_static] }
+      subject(:pg_dump) { described_class.new(database_name: database_name, schemas: schemas) }
+      it 'adds additional flags for each schema' do
+        schemas_args = %W[-n #{schemas[0]} -n #{schemas[1]} -n #{schemas[2]}]
+        expected_args = (default_args + schemas_args) << database_name
+        expect(cmd_args).to eq(expected_args)
+      end
+    end
+  end
+  describe '#spawn' do
+    it 'returns a spawned process' do
+      process = instance_double(Process)
+      expect(Process).to receive(:spawn).and_return(process)
+      expect(pg_dump.spawn(output: StringIO)).to eq(process)
+    end
+    it 'forwards cmd_args to Process spawn' do
+      expect(Process).to receive(:spawn).with({}, 'pg_dump', *cmd_args, any_args)
+      pg_dump.spawn(output: StringIO)
+    end
+    context 'when env variables are provided' do
+      subject(:pg_dump) { described_class.new(database_name: database_name, env: env) }
+      it 'forwards provided env variables to Process spawn' do
+        expect(Process).to receive(:spawn).with(env, 'pg_dump', any_args)
+        pg_dump.spawn(output: StringIO)
+      end
+    end
+  end
diff --git a/lib/backup/database.rb b/lib/backup/database.rb
index 27b672ff6b6d821c888413d3df0c5c6db2633276..a0eaccb1ca4f7f8a4ce4a912c08647ee5d38ad20 100644
--- a/lib/backup/database.rb
+++ b/lib/backup/database.rb
@@ -24,44 +24,37 @@ def initialize(progress, force:)
     override :dump
-    def dump(destination_dir, backup_id)
+    def dump(destination_dir, _)
-      each_database(destination_dir) do |database_name, current_db|
-        model = current_db[:model]
-        snapshot_id = current_db[:snapshot_id]
+      each_database(destination_dir) do |backup_connection|
+        pg_env = backup_connection.database_configuration.pg_env_variables
+        active_record_config = backup_connection.database_configuration.activerecord_variables
+        pg_database_name = active_record_config[:database]
-        pg_env = model.config[:pg_env]
-        connection = model.connection
-        active_record_config = model.config[:activerecord]
-        pg_database = active_record_config[:database]
+        dump_file_name = file_name(destination_dir, backup_connection.connection_name)
+        FileUtils.rm_f(dump_file_name)
-        db_file_name = file_name(destination_dir, database_name)
-        FileUtils.rm_f(db_file_name)
-        progress.print "Dumping PostgreSQL database #{pg_database} ... "
+        progress.print "Dumping PostgreSQL database #{pg_database_name} ... "
-        pgsql_args = ["--clean"] # Pass '--clean' to include 'DROP TABLE' statements in the DB dump.
-        pgsql_args << '--if-exists'
-        pgsql_args << "--snapshot=#{snapshot_id}" if snapshot_id
+        schemas = []
         if Gitlab.config.backup.pg_schema
-          pgsql_args << '-n'
-          pgsql_args << Gitlab.config.backup.pg_schema
-          Gitlab::Database::EXTRA_SCHEMAS.each do |schema|
-            pgsql_args << '-n'
-            pgsql_args << schema.to_s
-          end
+          schemas << Gitlab.config.backup.pg_schema
+          schemas.push(*Gitlab::Database::EXTRA_SCHEMAS.map(&:to_s))
-        success = with_transient_pg_env(pg_env) do
-          Backup::Dump::Postgres.new.dump(pg_database, db_file_name, pgsql_args)
-        end
+        pg_dump = ::Gitlab::Backup::Cli::Utils::PgDump.new(
+          database_name: pg_database_name,
+          snapshot_id: backup_connection.snapshot_id,
+          schemas: schemas,
+          env: pg_env)
+        success = Backup::Dump::Postgres.new.dump(dump_file_name, pg_dump)
-        connection.rollback_transaction if snapshot_id
+        backup_connection.release_snapshot! if backup_connection.snapshot_id
-        raise DatabaseBackupError.new(active_record_config, db_file_name) unless success
+        raise DatabaseBackupError.new(active_record_config, dump_file_name) unless success
@@ -76,10 +69,10 @@ def dump(destination_dir, backup_id)
     override :restore
     def restore(destination_dir, backup_id)
-      base_models_for_backup.each do |database_name, _base_model|
-        backup_model = Backup::DatabaseModel.new(database_name)
+      base_models_for_backup.each do |database_name, _|
+        backup_connection = Backup::DatabaseConnection.new(database_name)
-        config = backup_model.config[:activerecord]
+        config = backup_connection.database_configuration.activerecord_variables
         db_file_name = file_name(destination_dir, database_name)
         database = config[:database]
@@ -100,7 +93,7 @@ def restore(destination_dir, backup_id)
         # hanging out from a failed upgrade
-        pg_env = backup_model.config[:pg_env]
+        pg_env = backup_connection.database_configuration.pg_env_variables
         success = with_transient_pg_env(pg_env) do
           decompress_rd, decompress_wr = IO.pipe
           decompress_pid = spawn(decompress_cmd, out: decompress_wr, in: db_file_name)
@@ -235,6 +228,7 @@ def drop_tables(database_name)
       puts_time 'done'.color(:green)
+    # @deprecated This will be removed when restore operation is refactored to use extended_env directly
     def with_transient_pg_env(extended_env)
       result = yield
@@ -248,32 +242,36 @@ def pg_restore_cmd(database)
     def each_database(destination_dir, &block)
-      databases = {}
+      databases = []
+      # each connection will loop through all database connections defined in `database.yml`
+      # and reject the ones that are shared, so we don't get duplicates
+      #
+      # we consider a connection to be shared when it has `database_tasks: false`
         only: base_models_for_backup.keys, include_shared: false
-      ) do |_connection, name|
-        next if databases[name]
-        backup_model = Backup::DatabaseModel.new(name)
-        databases[name] = {
-          model: backup_model
-        }
+      ) do |_, database_connection_name|
+        backup_connection = Backup::DatabaseConnection.new(database_connection_name)
+        databases << backup_connection
-        next unless Gitlab::Database.database_mode == Gitlab::Database::MODE_MULTIPLE_DATABASES
-        connection = backup_model.connection
+        next unless multiple_databases?
-          Gitlab::Database::TransactionTimeoutSettings.new(connection).disable_timeouts
-          connection.begin_transaction(isolation: :repeatable_read)
-          databases[name][:snapshot_id] = connection.select_value("SELECT pg_export_snapshot()")
+          # Trigger a transaction snapshot export that will be used by pg_dump later on
+          backup_connection.export_snapshot!
         rescue ActiveRecord::ConnectionNotEstablished
-          raise Backup::DatabaseBackupError.new(backup_model.config[:activerecord], file_name(destination_dir, name))
+          raise Backup::DatabaseBackupError.new(
+            backup_connection.database_configuration.activerecord_variables,
+            file_name(destination_dir, database_connection_name)
+          )
+    def multiple_databases?
+      Gitlab::Database.database_mode == Gitlab::Database::MODE_MULTIPLE_DATABASES
+    end
diff --git a/lib/backup/database_configuration.rb b/lib/backup/database_configuration.rb
new file mode 100644
index 0000000000000000000000000000000000000000..1a6a476f9c104baf62a8a6059b299144a378aadd
--- /dev/null
+++ b/lib/backup/database_configuration.rb
@@ -0,0 +1,107 @@
+# frozen_string_literal: true
+module Backup
+  class DatabaseConfiguration
+    # Connection name is the key used in `config/database.yml` for multi-database connection configuration
+    #
+    # @return [String]
+    attr_reader :connection_name
+    # ActiveRecord base model that is configured to connect to the database identified by connection_name key
+    #
+    # @return [ActiveRecord::Base]
+    attr_reader :source_model
+    # Initializes configuration
+    #
+    # @param [String] connection_name the key from `database.yml` for multi-database connection configuration
+    def initialize(connection_name)
+      @connection_name = connection_name
+      @source_model = Gitlab::Database.database_base_models_with_gitlab_shared[connection_name] ||
+        Gitlab::Database.database_base_models_with_gitlab_shared['main']
+      @activerecord_database_config = ActiveRecord::Base.configurations.find_db_config(connection_name)
+    end
+    # ENV variables that can override each database configuration
+    # These are used along with OVERRIDE_PREFIX and database name
+    # @see #process_config_overrides!
+      username: 'PGUSER',
+      host: 'PGHOST',
+      port: 'PGPORT',
+      password: 'PGPASSWORD',
+      # SSL
+      sslmode: 'PGSSLMODE',
+      sslkey: 'PGSSLKEY',
+      sslcert: 'PGSSLCERT',
+      sslrootcert: 'PGSSLROOTCERT',
+      sslcrl: 'PGSSLCRL',
+      sslcompression: 'PGSSLCOMPRESSION'
+    }.freeze
+    # Prefixes used for ENV variables overriding database configuration
+    # Return the HashConfig for the database
+    #
+    # @return [ActiveRecord::DatabaseConfigurations::HashConfig]
+    def activerecord_configuration
+      ActiveRecord::DatabaseConfigurations::HashConfig.new(
+        @activerecord_database_config.env_name,
+        connection_name,
+        activerecord_variables
+      )
+    end
+    # Return postgres ENV variable values for current database with overrided values
+    #
+    # @return[Hash<String,String>] hash of postgres ENV variables
+    def pg_env_variables
+      process_config_overrides! unless @pg_env_variables
+      @pg_env_variables
+    end
+    # Return activerecord configuration values for current database with overrided values
+    #
+    # @return[Hash<String,String>] activerecord database.yml configuration compatible values
+    def activerecord_variables
+      process_config_overrides! unless @activerecord_variables
+      @activerecord_variables
+    end
+    private
+    def process_config_overrides!
+      @activerecord_variables = original_activerecord_config
+      @pg_env_variables = {}
+      SUPPORTED_OVERRIDES.each do |config_key, env_variable_name|
+        # This enables the use of different PostgreSQL settings in
+        # case PgBouncer is used. PgBouncer clears the search path,
+        # which wreaks havoc on Rails if connections are reused.
+        OVERRIDE_PREFIXES.each do |override_prefix|
+          override_all = "#{override_prefix}#{env_variable_name}"
+          override_db = "#{override_prefix}#{connection_name.upcase}_#{env_variable_name}"
+          val = ENV[override_db].presence ||
+            ENV[override_all].presence ||
+            @activerecord_variables[config_key].to_s.presence
+          next unless val
+          @pg_env_variables[env_variable_name] = val
+          @activerecord_variables[config_key] = val
+        end
+      end
+    end
+    # Return the database configuration from rails config/database.yml file
+    # in the format expected by ActiveRecord::DatabaseConfigurations::HashConfig
+    #
+    # @return [Hash] configuration hash
+    def original_activerecord_config
+      @activerecord_database_config.configuration_hash.dup
+    end
+  end
diff --git a/lib/backup/database_connection.rb b/lib/backup/database_connection.rb
new file mode 100644
index 0000000000000000000000000000000000000000..f3f0a5dfcb5f9fd1583908cc5dfc3794238a6b29
--- /dev/null
+++ b/lib/backup/database_connection.rb
@@ -0,0 +1,59 @@
+# frozen_string_literal: true
+module Backup
+  class DatabaseConnection
+    attr_reader :database_configuration, :snapshot_id
+    delegate :connection_name, to: :database_configuration
+    delegate :connection, to: :@backup_model
+    # Initializes a database connection
+    #
+    # @param [String] connection_name the key from `database.yml` for multi-database connection configuration
+    def initialize(connection_name)
+      @database_configuration = Backup::DatabaseConfiguration.new(connection_name)
+      @backup_model = backup_model
+      @snapshot_id = nil
+      configure_backup_model
+    end
+    # Start a new transaction and run pg_export_snapshot()
+    # Returns the snapshot identifier
+    #
+    # @return [String] snapshot identifier
+    def export_snapshot!
+      Gitlab::Database::TransactionTimeoutSettings.new(connection).disable_timeouts
+      connection.begin_transaction(isolation: :repeatable_read)
+      @snapshot_id = connection.select_value("SELECT pg_export_snapshot()")
+    end
+    # Rollback the transaction to release the effects of pg_export_snapshot()
+    def release_snapshot!
+      return unless snapshot_id
+      connection.rollback_transaction
+      @snapshot_id = nil
+    end
+    private
+    delegate :activerecord_configuration, to: :database_configuration, private: true
+    def configure_backup_model
+      @backup_model.establish_connection(activerecord_configuration)
+      Gitlab::Database::LoadBalancing::Setup.new(@backup_model).setup
+    end
+    # Creates a disposable model to be used to host the Backup connection only
+    def backup_model
+      klass_name = connection_name.camelize
+      return "#{self.class.name}::#{klass_name}".constantize if self.class.const_defined?(klass_name.to_sym, false)
+      self.class.const_set(klass_name, Class.new(ApplicationRecord))
+    end
+  end
diff --git a/lib/backup/database_model.rb b/lib/backup/database_model.rb
deleted file mode 100644
index 228a7fa53838ee79776fa1eb7265976b31a7c90b..0000000000000000000000000000000000000000
--- a/lib/backup/database_model.rb
+++ /dev/null
@@ -1,86 +0,0 @@
-# frozen_string_literal: true
-module Backup
-  class DatabaseModel
-      username: 'PGUSER',
-      host: 'PGHOST',
-      port: 'PGPORT',
-      password: 'PGPASSWORD',
-      # SSL
-      sslmode: 'PGSSLMODE',
-      sslkey: 'PGSSLKEY',
-      sslcert: 'PGSSLCERT',
-      sslrootcert: 'PGSSLROOTCERT',
-      sslcrl: 'PGSSLCRL',
-      sslcompression: 'PGSSLCOMPRESSION'
-    }.freeze
-    attr_reader :config
-    def initialize(name)
-      configure_model(name)
-    end
-    def connection
-      @model.connection
-    end
-    private
-    def configure_model(name)
-      source_model = Gitlab::Database.database_base_models_with_gitlab_shared[name] ||
-        Gitlab::Database.database_base_models_with_gitlab_shared['main']
-      @model = backup_model_for(name)
-      original_config = source_model.connection_db_config.configuration_hash.dup
-      @config = config_for_backup(name, original_config)
-      @model.establish_connection(
-        ActiveRecord::DatabaseConfigurations::HashConfig.new(
-          source_model.connection_db_config.env_name,
-          name.to_s,
-          original_config.merge(@config[:activerecord])
-        )
-      )
-      Gitlab::Database::LoadBalancing::Setup.new(@model).setup
-    end
-    def backup_model_for(name)
-      klass_name = name.camelize
-      return "#{self.class.name}::#{klass_name}".constantize if self.class.const_defined?(klass_name.to_sym, false)
-      self.class.const_set(klass_name, Class.new(ApplicationRecord))
-    end
-    def config_for_backup(name, config)
-      db_config = {
-        activerecord: config,
-        pg_env: {}
-      }
-      SUPPORTED_OVERRIDES.each do |opt, arg|
-        # This enables the use of different PostgreSQL settings in
-        # case PgBouncer is used. PgBouncer clears the search path,
-        # which wreaks havoc on Rails if connections are reused.
-        OVERRIDE_PREFIXES.each do |override_prefix|
-          override_all = "#{override_prefix}#{arg}"
-          override_db = "#{override_prefix}#{name.upcase}_#{arg}"
-          val = ENV[override_db].presence || ENV[override_all].presence || config[opt].to_s.presence
-          next unless val
-          db_config[:pg_env][arg] = val
-          db_config[:activerecord][opt] = val
-        end
-      end
-      db_config
-    end
-  end
diff --git a/lib/backup/dump/postgres.rb b/lib/backup/dump/postgres.rb
index 52690d242df036b07e3f2fa6ce7c91402b376d2d..80a499711404b06389a7a0050541deba1fe8953a 100644
--- a/lib/backup/dump/postgres.rb
+++ b/lib/backup/dump/postgres.rb
@@ -4,14 +4,21 @@ module Dump
     class Postgres
       include Backup::Helper
+      # Owner can read/write, group no permission, others no permission
       FILE_PERMISSION = 0o600
-      def dump(database_name, output_file, pgsql_args)
+      # Triggers PgDump and outputs to the provided file path
+      #
+      # @param [String] output_file_path full path to the output destination
+      # @param [Gitlab::Backup::Cli::Utils::PgDump] pg_dump
+      # @return [Boolean] whether pg_dump finished with success
+      def dump(output_file_path, pg_dump)
         compress_rd, compress_wr = IO.pipe
-        compress_pid = spawn(compress_cmd, in: compress_rd, out: [output_file, 'w', FILE_PERMISSION])
+        compress_pid = spawn(compress_cmd, in: compress_rd, out: [output_file_path, 'w', FILE_PERMISSION])
-        dump_pid = Process.spawn('pg_dump', *pgsql_args, database_name, out: compress_wr)
+        dump_pid = pg_dump.spawn(output: compress_wr)
         [compress_pid, dump_pid].all? do |pid|
diff --git a/spec/lib/backup/database_configuration_spec.rb b/spec/lib/backup/database_configuration_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..b7fa9f161c1ec60110862fecd9586c2ccc346624
--- /dev/null
+++ b/spec/lib/backup/database_configuration_spec.rb
@@ -0,0 +1,239 @@
+# frozen_string_literal: true
+require 'spec_helper'
+RSpec.describe Backup::DatabaseConfiguration, :reestablished_active_record_base, feature_category: :backup_restore do
+  using RSpec::Parameterized::TableSyntax
+  let(:connection_name) { 'main' }
+  subject(:config) { described_class.new(connection_name) }
+  describe '#initialize' do
+    it 'initializes with the provided connection_name' do
+      expect_next_instance_of(described_class) do |config|
+        expect(config.connection_name).to eq(connection_name)
+      end
+      config
+    end
+  end
+  describe '#activerecord_configuration' do
+    it 'returns a ActiveRecord::DatabaseConfigurations::HashConfig' do
+      expect(config.activerecord_configuration).to be_a ActiveRecord::DatabaseConfigurations::HashConfig
+    end
+  end
+  context 'with configuration override feature' do
+    let(:application_config) do
+      {
+        adapter: 'postgresql',
+        host: 'some_host',
+        port: '5432'
+      }
+    end
+    let(:active_record_key) { described_class::SUPPORTED_OVERRIDES.invert[pg_env] }
+    before do
+      allow(config).to receive(:original_activerecord_config).and_return(application_config)
+    end
+    shared_context 'with generic database with overridden values' do
+      where(:env_variable, :overridden_value) do
+        'GITLAB_BACKUP_PGHOST' | 'test.invalid.'
+        'GITLAB_BACKUP_PGUSER' | 'some_user'
+        'GITLAB_BACKUP_PGPORT' | '1543'
+        'GITLAB_BACKUP_PGPASSWORD' | 'secret'
+        'GITLAB_BACKUP_PGSSLMODE' | 'allow'
+        'GITLAB_BACKUP_PGSSLKEY' | 'some_key'
+        'GITLAB_BACKUP_PGSSLCERT' | '/path/to/cert'
+        'GITLAB_BACKUP_PGSSLROOTCERT' | '/path/to/root/cert'
+        'GITLAB_BACKUP_PGSSLCRL' | '/path/to/crl'
+        'GITLAB_OVERRIDE_PGHOST' | 'test.invalid.'
+        'GITLAB_OVERRIDE_PGUSER' | 'some_user'
+        'GITLAB_OVERRIDE_PGPORT' | '1543'
+        'GITLAB_OVERRIDE_PGPASSWORD' | 'secret'
+        'GITLAB_OVERRIDE_PGSSLMODE' | 'allow'
+        'GITLAB_OVERRIDE_PGSSLKEY' | 'some_key'
+        'GITLAB_OVERRIDE_PGSSLCERT' | '/path/to/cert'
+        'GITLAB_OVERRIDE_PGSSLROOTCERT' | '/path/to/root/cert'
+        'GITLAB_OVERRIDE_PGSSLCRL' | '/path/to/crl'
+      end
+    end
+    shared_context 'with generic database with overridden values using current database prefix' do
+      where(:env_variable, :overridden_value) do
+        'GITLAB_BACKUP_MAIN_PGHOST' | 'test.invalid.'
+        'GITLAB_BACKUP_MAIN_PGUSER' | 'some_user'
+        'GITLAB_BACKUP_MAIN_PGPORT' | '1543'
+        'GITLAB_BACKUP_MAIN_PGSSLKEY' | 'some_key'
+        'GITLAB_BACKUP_MAIN_PGSSLCERT' | '/path/to/cert'
+        'GITLAB_BACKUP_MAIN_PGSSLROOTCERT' | '/path/to/root/cert'
+        'GITLAB_BACKUP_MAIN_PGSSLCRL' | '/path/to/crl'
+        'GITLAB_OVERRIDE_MAIN_PGHOST' | 'test.invalid.'
+        'GITLAB_OVERRIDE_MAIN_PGUSER' | 'some_user'
+        'GITLAB_OVERRIDE_MAIN_PGSSLKEY' | 'some_key'
+        'GITLAB_OVERRIDE_MAIN_PGSSLCERT' | '/path/to/cert'
+        'GITLAB_OVERRIDE_MAIN_PGSSLROOTCERT' | '/path/to/root/cert'
+        'GITLAB_OVERRIDE_MAIN_PGSSLCRL' | '/path/to/crl'
+      end
+    end
+    shared_context 'with generic database with overridden values for a different database prefix' do
+      where(:env_variable, :overridden_value) do
+        'GITLAB_BACKUP_CI_PGHOST' | 'test.invalid.'
+        'GITLAB_BACKUP_CI_PGUSER' | 'some_user'
+        'GITLAB_BACKUP_CI_PGPORT' | '1543'
+        'GITLAB_BACKUP_CI_PGPASSWORD' | 'secret'
+        'GITLAB_BACKUP_CI_PGSSLMODE' | 'allow'
+        'GITLAB_BACKUP_CI_PGSSLKEY' | 'some_key'
+        'GITLAB_BACKUP_CI_PGSSLCERT' | '/path/to/cert'
+        'GITLAB_BACKUP_CI_PGSSLROOTCERT' | '/path/to/root/cert'
+        'GITLAB_BACKUP_CI_PGSSLCRL' | '/path/to/crl'
+        'GITLAB_OVERRIDE_CI_PGHOST' | 'test.invalid.'
+        'GITLAB_OVERRIDE_CI_PGUSER' | 'some_user'
+        'GITLAB_OVERRIDE_CI_PGPORT' | '1543'
+        'GITLAB_OVERRIDE_CI_PGSSLKEY' | 'some_key'
+        'GITLAB_OVERRIDE_CI_PGSSLCERT' | '/path/to/cert'
+        'GITLAB_OVERRIDE_CI_PGSSLROOTCERT' | '/path/to/root/cert'
+        'GITLAB_OVERRIDE_CI_PGSSLCRL' | '/path/to/crl'
+      end
+    end
+    describe('#pg_env_variables') do
+      context 'with provided ENV variables' do
+        before do
+          stub_env(env_variable, overridden_value)
+        end
+        context 'when generic database configuration is overridden' do
+          include_context "with generic database with overridden values"
+          with_them do
+            let(:pg_env) { env_variable[/GITLAB_(BACKUP|OVERRIDE)_(\w+)/, 2] }
+            it 'PostgreSQL ENV overrides application configuration' do
+              expect(config.pg_env_variables).to include({ pg_env => overridden_value })
+            end
+          end
+        end
+        context 'when specific database configuration is overridden' do
+          context 'and environment variables are for the current database name' do
+            include_context 'with generic database with overridden values using current database prefix'
+            with_them do
+              let(:pg_env) { env_variable[/GITLAB_(BACKUP|OVERRIDE)_MAIN_(\w+)/, 2] }
+              it 'PostgreSQL ENV overrides application configuration' do
+                expect(config.pg_env_variables).to include({ pg_env => overridden_value })
+              end
+            end
+          end
+          context 'and environment variables are for another database' do
+            include_context 'with generic database with overridden values for a different database prefix'
+            with_them do
+              let(:pg_env) { env_variable[/GITLAB_(BACKUP|OVERRIDE)_CI_(\w+)/, 1] }
+              it 'PostgreSQL ENV is expected to equal application configuration' do
+                expect(config.pg_env_variables).to eq(
+                  {
+                    'PGHOST' => application_config[:host],
+                    'PGPORT' => application_config[:port]
+                  }
+                )
+              end
+            end
+          end
+        end
+      end
+      context 'when both GITLAB_BACKUP_PGUSER and GITLAB_BACKUP_MAIN_PGUSER variable are present' do
+        it 'prefers more specific GITLAB_BACKUP_MAIN_PGUSER' do
+          stub_env('GITLAB_BACKUP_PGUSER', 'generic_user')
+          stub_env('GITLAB_BACKUP_MAIN_PGUSER', 'specific_user')
+          expect(config.pg_env_variables['PGUSER']).to eq('specific_user')
+        end
+      end
+    end
+    describe('#activerecord_variables') do
+      context 'with provided ENV variables' do
+        before do
+          stub_env(env_variable, overridden_value)
+        end
+        context 'when generic database configuration is overridden' do
+          include_context "with generic database with overridden values"
+          with_them do
+            let(:pg_env) { env_variable[/GITLAB_(BACKUP|OVERRIDE)_(\w+)/, 2] }
+            it 'ActiveRecord backup configuration overrides application configuration' do
+              expect(config.activerecord_variables).to eq(
+                application_config.merge(active_record_key => overridden_value)
+              )
+            end
+          end
+        end
+        context 'when specific database configuration is overridden' do
+          context 'and environment variables are for the current database name' do
+            include_context 'with generic database with overridden values using current database prefix'
+            with_them do
+              let(:pg_env) { env_variable[/GITLAB_(BACKUP|OVERRIDE)_MAIN_(\w+)/, 2] }
+              it 'ActiveRecord backup configuration overrides application configuration' do
+                expect(config.activerecord_variables).to eq(
+                  application_config.merge(active_record_key => overridden_value)
+                )
+              end
+            end
+          end
+          context 'and environment variables are for another database' do
+            include_context 'with generic database with overridden values for a different database prefix'
+            with_them do
+              let(:pg_env) { env_variable[/GITLAB_(BACKUP|OVERRIDE)_CI_(\w+)/, 1] }
+              it 'ActiveRecord backup configuration is expected to equal application configuration' do
+                expect(config.activerecord_variables).to eq(application_config)
+              end
+            end
+          end
+        end
+      end
+      context 'when both GITLAB_BACKUP_PGUSER and GITLAB_BACKUP_MAIN_PGUSER variable are present' do
+        with_them do
+          it 'prefers more specific GITLAB_BACKUP_MAIN_PGUSER' do
+            stub_env('GITLAB_BACKUP_PGUSER', 'generic_user')
+            stub_env('GITLAB_BACKUP_MAIN_PGUSER', 'specific_user')
+            expect(config.activerecord_variables[:username]).to eq('specific_user')
+          end
+        end
+      end
+    end
+  end
diff --git a/spec/lib/backup/database_connection_spec.rb b/spec/lib/backup/database_connection_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..b56da3d99f724171e5eac6ee6e45e8f69bcde990
--- /dev/null
+++ b/spec/lib/backup/database_connection_spec.rb
@@ -0,0 +1,103 @@
+# frozen_string_literal: true
+require 'spec_helper'
+RSpec.describe Backup::DatabaseConnection, :reestablished_active_record_base, feature_category: :backup_restore do
+  let(:connection_name) { 'main' }
+  let(:snapshot_id_pattern) { /[A-Z0-9]{8}-[A-Z0-9]{8}-[0-9]/ }
+  subject(:backup_connection) { described_class.new(connection_name) }
+  describe '#initialize' do
+    it 'initializes database_configuration with the provided connection_name' do
+      expect(Backup::DatabaseConfiguration).to receive(:new).with(connection_name).and_call_original
+      backup_connection
+    end
+  end
+  describe '#connection_name' do
+    it 'returns the same connection name used during initialization' do
+      expect(backup_connection.connection_name).to eq(connection_name)
+    end
+  end
+  describe '#connection' do
+    it 'is an instance of a ActiveRecord::Base.connection' do
+      backup_connection.connection.is_a? Gitlab::Database::LoadBalancing::ConnectionProxy
+    end
+  end
+  describe '#database_configuration' do
+    it 'returns database configuration' do
+      expect(backup_connection.database_configuration).to be_a(Backup::DatabaseConfiguration)
+    end
+  end
+  describe '#snapshot_id' do
+    it "returns nil when snapshot has not been triggered" do
+      expect(backup_connection.snapshot_id).to be_nil
+    end
+    context 'when a snapshot transaction is open', :delete do
+      let!(:snapshot_id) { backup_connection.export_snapshot! }
+      it 'returns the snapshot_id in the expected format' do
+        expect(backup_connection.snapshot_id).to match(snapshot_id_pattern)
+      end
+      it 'returns the snapshot_id equal to the one returned by #export_snapshot!' do
+        expect(backup_connection.snapshot_id).to eq(snapshot_id)
+      end
+      it "returns nil after a snapshot is released" do
+        backup_connection.release_snapshot!
+        expect(backup_connection.snapshot_id).to be_nil
+      end
+    end
+  end
+  describe '#export_snapshot!', :delete do
+    it 'returns a snapshot_id in the expected format' do
+      expect(backup_connection.export_snapshot!).to match(snapshot_id_pattern)
+    end
+    it 'opens a transaction with correct isolation format and triggers a snapshot generation' do
+      expect(backup_connection.connection).to receive(:begin_transaction).with(
+        isolation: :repeatable_read
+      ).and_call_original
+      expect(backup_connection.connection).to receive(:select_value).with(
+        "SELECT pg_export_snapshot()"
+      ).and_call_original
+      backup_connection.export_snapshot!
+    end
+    it 'disables transaction time out' do
+      expect_next_instance_of(Gitlab::Database::TransactionTimeoutSettings) do |transaction_settings|
+        expect(transaction_settings).to receive(:disable_timeouts).and_call_original
+      end
+      backup_connection.export_snapshot!
+    end
+  end
+  describe '#release_snapshot!', :delete do
+    it 'clears out existing snapshot_id' do
+      snapshot_id = backup_connection.export_snapshot!
+      expect { backup_connection.release_snapshot! }.to change { backup_connection.snapshot_id }
+        .from(snapshot_id).to(nil)
+    end
+    it 'executes a transaction rollback' do
+      backup_connection.export_snapshot!
+      expect(backup_connection.connection).to receive(:rollback_transaction).and_call_original
+      backup_connection.release_snapshot!
+    end
+  end
diff --git a/spec/lib/backup/database_model_spec.rb b/spec/lib/backup/database_model_spec.rb
deleted file mode 100644
index 522bba8097230951623ffdde3a29b83fc684d053..0000000000000000000000000000000000000000
--- a/spec/lib/backup/database_model_spec.rb
+++ /dev/null
@@ -1,186 +0,0 @@
-# frozen_string_literal: true
-require 'spec_helper'
-RSpec.describe Backup::DatabaseModel, :reestablished_active_record_base, feature_category: :backup_restore do
-  using RSpec::Parameterized::TableSyntax
-  let(:gitlab_database_name) { 'main' }
-  describe '#connection' do
-    subject { described_class.new(gitlab_database_name).connection }
-    it 'an instance of a ActiveRecord::Base.connection' do
-      subject.is_a? ActiveRecord::Base.connection.class # rubocop:disable Database/MultipleDatabases
-    end
-  end
-  describe '#config' do
-    let(:application_config) do
-      {
-        adapter: 'postgresql',
-        host: 'some_host',
-        port: '5432'
-      }
-    end
-    subject { described_class.new(gitlab_database_name).config }
-    before do
-      allow(
-        Gitlab::Database.database_base_models_with_gitlab_shared[gitlab_database_name].connection_db_config
-      ).to receive(:configuration_hash).and_return(application_config)
-    end
-    shared_examples 'no configuration is overridden' do
-      it 'ActiveRecord backup configuration is expected to equal application configuration' do
-        expect(subject[:activerecord]).to eq(application_config)
-      end
-      it 'PostgreSQL ENV is expected to equal application configuration' do
-        expect(subject[:pg_env]).to eq(
-          {
-            'PGHOST' => application_config[:host],
-            'PGPORT' => application_config[:port]
-          }
-        )
-      end
-    end
-    shared_examples 'environment variables override application configuration' do
-      let(:active_record_key) { described_class::SUPPORTED_OVERRIDES.invert[pg_env] }
-      it 'ActiveRecord backup configuration overrides application configuration' do
-        expect(subject[:activerecord]).to eq(application_config.merge(active_record_key => overridden_value))
-      end
-      it 'PostgreSQL ENV overrides application configuration' do
-        expect(subject[:pg_env]).to include({ pg_env => overridden_value })
-      end
-    end
-    context 'when no GITLAB_BACKUP_PG* variables are set' do
-      it_behaves_like 'no configuration is overridden'
-    end
-    context 'when generic database configuration is overridden' do
-      where(:env_variable, :overridden_value) do
-        'GITLAB_BACKUP_PGHOST'           | 'test.invalid.'
-        'GITLAB_BACKUP_PGUSER'           | 'some_user'
-        'GITLAB_BACKUP_PGPORT'           | '1543'
-        'GITLAB_BACKUP_PGPASSWORD'       | 'secret'
-        'GITLAB_BACKUP_PGSSLMODE'        | 'allow'
-        'GITLAB_BACKUP_PGSSLKEY'         | 'some_key'
-        'GITLAB_BACKUP_PGSSLCERT'        | '/path/to/cert'
-        'GITLAB_BACKUP_PGSSLROOTCERT'    | '/path/to/root/cert'
-        'GITLAB_BACKUP_PGSSLCRL'         | '/path/to/crl'
-        'GITLAB_OVERRIDE_PGHOST'           | 'test.invalid.'
-        'GITLAB_OVERRIDE_PGUSER'           | 'some_user'
-        'GITLAB_OVERRIDE_PGPORT'           | '1543'
-        'GITLAB_OVERRIDE_PGPASSWORD'       | 'secret'
-        'GITLAB_OVERRIDE_PGSSLMODE'        | 'allow'
-        'GITLAB_OVERRIDE_PGSSLKEY'         | 'some_key'
-        'GITLAB_OVERRIDE_PGSSLCERT'        | '/path/to/cert'
-        'GITLAB_OVERRIDE_PGSSLROOTCERT'    | '/path/to/root/cert'
-        'GITLAB_OVERRIDE_PGSSLCRL'         | '/path/to/crl'
-      end
-      with_them do
-        let(:pg_env) { env_variable[/GITLAB_(BACKUP|OVERRIDE)_(\w+)/, 2] }
-        before do
-          stub_env(env_variable, overridden_value)
-        end
-        it_behaves_like 'environment variables override application configuration'
-      end
-    end
-    context 'when specific database configuration is overridden' do
-      context 'and environment variables are for the current database name' do
-        where(:env_variable, :overridden_value) do
-          'GITLAB_BACKUP_MAIN_PGHOST'           | 'test.invalid.'
-          'GITLAB_BACKUP_MAIN_PGUSER'           | 'some_user'
-          'GITLAB_BACKUP_MAIN_PGPORT'           | '1543'
-          'GITLAB_BACKUP_MAIN_PGPASSWORD'       | 'secret'
-          'GITLAB_BACKUP_MAIN_PGSSLMODE'        | 'allow'
-          'GITLAB_BACKUP_MAIN_PGSSLKEY'         | 'some_key'
-          'GITLAB_BACKUP_MAIN_PGSSLCERT'        | '/path/to/cert'
-          'GITLAB_BACKUP_MAIN_PGSSLROOTCERT'    | '/path/to/root/cert'
-          'GITLAB_BACKUP_MAIN_PGSSLCRL'         | '/path/to/crl'
-          'GITLAB_OVERRIDE_MAIN_PGHOST'           | 'test.invalid.'
-          'GITLAB_OVERRIDE_MAIN_PGUSER'           | 'some_user'
-          'GITLAB_OVERRIDE_MAIN_PGPORT'           | '1543'
-          'GITLAB_OVERRIDE_MAIN_PGPASSWORD'       | 'secret'
-          'GITLAB_OVERRIDE_MAIN_PGSSLMODE'        | 'allow'
-          'GITLAB_OVERRIDE_MAIN_PGSSLKEY'         | 'some_key'
-          'GITLAB_OVERRIDE_MAIN_PGSSLCERT'        | '/path/to/cert'
-          'GITLAB_OVERRIDE_MAIN_PGSSLROOTCERT'    | '/path/to/root/cert'
-          'GITLAB_OVERRIDE_MAIN_PGSSLCRL'         | '/path/to/crl'
-        end
-        with_them do
-          let(:pg_env) { env_variable[/GITLAB_(BACKUP|OVERRIDE)_MAIN_(\w+)/, 2] }
-          before do
-            stub_env(env_variable, overridden_value)
-          end
-          it_behaves_like 'environment variables override application configuration'
-        end
-      end
-      context 'and environment variables are for another database' do
-        where(:env_variable, :overridden_value) do
-          'GITLAB_BACKUP_CI_PGHOST'           | 'test.invalid.'
-          'GITLAB_BACKUP_CI_PGUSER'           | 'some_user'
-          'GITLAB_BACKUP_CI_PGPORT'           | '1543'
-          'GITLAB_BACKUP_CI_PGPASSWORD'       | 'secret'
-          'GITLAB_BACKUP_CI_PGSSLMODE'        | 'allow'
-          'GITLAB_BACKUP_CI_PGSSLKEY'         | 'some_key'
-          'GITLAB_BACKUP_CI_PGSSLCERT'        | '/path/to/cert'
-          'GITLAB_BACKUP_CI_PGSSLROOTCERT'    | '/path/to/root/cert'
-          'GITLAB_BACKUP_CI_PGSSLCRL'         | '/path/to/crl'
-          'GITLAB_OVERRIDE_CI_PGHOST'           | 'test.invalid.'
-          'GITLAB_OVERRIDE_CI_PGUSER'           | 'some_user'
-          'GITLAB_OVERRIDE_CI_PGPORT'           | '1543'
-          'GITLAB_OVERRIDE_CI_PGPASSWORD'       | 'secret'
-          'GITLAB_OVERRIDE_CI_PGSSLMODE'        | 'allow'
-          'GITLAB_OVERRIDE_CI_PGSSLKEY'         | 'some_key'
-          'GITLAB_OVERRIDE_CI_PGSSLCERT'        | '/path/to/cert'
-          'GITLAB_OVERRIDE_CI_PGSSLROOTCERT'    | '/path/to/root/cert'
-          'GITLAB_OVERRIDE_CI_PGSSLCRL'         | '/path/to/crl'
-        end
-        with_them do
-          let(:pg_env) { env_variable[/GITLAB_(BACKUP|OVERRIDE)_CI_(\w+)/, 1] }
-          before do
-            stub_env(env_variable, overridden_value)
-          end
-          it_behaves_like 'no configuration is overridden'
-        end
-      end
-      context 'when both GITLAB_BACKUP_PGUSER and GITLAB_BACKUP_MAIN_PGUSER variable are present' do
-        before do
-          stub_env('GITLAB_BACKUP_PGUSER', 'generic_user')
-          stub_env('GITLAB_BACKUP_MAIN_PGUSER', 'specfic_user')
-        end
-        it 'prefers more specific GITLAB_BACKUP_MAIN_PGUSER' do
-          config = subject
-          expect(config.dig(:activerecord, :username)).to eq('specfic_user')
-          expect(config.dig(:pg_env, 'PGUSER')).to eq('specfic_user')
-        end
-      end
-    end
-  end
diff --git a/spec/lib/backup/database_spec.rb b/spec/lib/backup/database_spec.rb
index 26d09c275c1f793dc5c221ea5d2b23cb19b9174e..86468689f76e1191728aac49dc46603dddc2f3ba 100644
--- a/spec/lib/backup/database_spec.rb
+++ b/spec/lib/backup/database_spec.rb
@@ -48,28 +48,16 @@
       it 'uses snapshots' do
         Dir.mktmpdir do |dir|
-          expect_next_instances_of(Backup::DatabaseModel, 2) do |adapter|
-            expect(adapter.connection).to receive(:begin_transaction).with(
-              isolation: :repeatable_read
-            ).and_call_original
-            expect(adapter.connection).to receive(:select_value).with(
-              "SELECT pg_export_snapshot()"
-            ).and_call_original
-            expect(adapter.connection).to receive(:rollback_transaction).and_call_original
-          end
+          expect_next_instances_of(Backup::DatabaseConnection, 2) do |backup_connection|
+            expect(backup_connection).to receive(:export_snapshot!).and_call_original
-          subject.dump(dir, backup_id)
-        end
-      end
+            expect_next_instance_of(::Gitlab::Backup::Cli::Utils::PgDump) do |pgdump|
+              expect(pgdump.snapshot_id).to eq(backup_connection.snapshot_id)
+            end
-      it 'disables transaction time out' do
-        number_of_databases = base_models_for_backup.count
-        expect(Gitlab::Database::TransactionTimeoutSettings)
-          .to receive(:new).exactly(2 * number_of_databases).times.and_return(timeout_service)
-        expect(timeout_service).to receive(:disable_timeouts).exactly(number_of_databases).times
-        expect(timeout_service).to receive(:restore_timeouts).exactly(number_of_databases).times
+            expect(backup_connection).to receive(:release_snapshot!).and_call_original
+          end
-        Dir.mktmpdir do |dir|
           subject.dump(dir, backup_id)
@@ -82,79 +70,18 @@
       it 'does not use snapshots' do
         Dir.mktmpdir do |dir|
-          base_model = Backup::DatabaseModel.new('main')
-          expect(base_model.connection).not_to receive(:begin_transaction).with(
-            isolation: :repeatable_read
-          ).and_call_original
-          expect(base_model.connection).not_to receive(:select_value).with(
-            "SELECT pg_export_snapshot()"
-          ).and_call_original
-          expect(base_model.connection).not_to receive(:rollback_transaction).and_call_original
-          subject.dump(dir, backup_id)
-        end
-      end
-    end
-    describe 'pg_dump arguments' do
-      let(:snapshot_id) { 'fake_id' }
-      let(:default_pg_args) do
-        args = [
-          '--clean',
-          '--if-exists'
-        ]
-        if Gitlab::Database.database_mode == Gitlab::Database::MODE_MULTIPLE_DATABASES
-          args + ["--snapshot=#{snapshot_id}"]
-        else
-          args
-        end
-      end
+          expect_next_instance_of(Backup::DatabaseConnection) do |backup_connection|
+            expect(backup_connection).not_to receive(:export_snapshot!)
-      let(:dumper) { double }
-      let(:destination_dir) { 'tmp' }
-      before do
-        allow(Backup::Dump::Postgres).to receive(:new).and_return(dumper)
-        allow(dumper).to receive(:dump).with(any_args).and_return(true)
-      end
-      shared_examples 'pg_dump arguments' do
-        it 'calls Backup::Dump::Postgres with correct pg_dump arguments' do
-          number_of_databases = base_models_for_backup.count
-          if number_of_databases > 1
-            expect_next_instances_of(Backup::DatabaseModel, number_of_databases) do |model|
-              expect(model.connection).to receive(:select_value).with(
-                "SELECT pg_export_snapshot()"
-              ).and_return(snapshot_id)
+            expect_next_instance_of(::Gitlab::Backup::Cli::Utils::PgDump) do |pgdump|
+              expect(pgdump.snapshot_id).to be_nil
-          end
-          expect(dumper).to receive(:dump).with(anything, anything, expected_pg_args)
-          subject.dump(destination_dir, backup_id)
-        end
-      end
-      context 'when no PostgreSQL schemas are specified' do
-        let(:expected_pg_args) { default_pg_args }
-        include_examples 'pg_dump arguments'
-      end
-      context 'when a PostgreSQL schema is used' do
-        let(:schema) { 'gitlab' }
-        let(:expected_pg_args) do
-          default_pg_args + ['-n', schema] + Gitlab::Database::EXTRA_SCHEMAS.flat_map do |schema|
-            ['-n', schema.to_s]
+            expect(backup_connection).not_to receive(:release_snapshot!)
-        end
-        before do
-          allow(Gitlab.config.backup).to receive(:pg_schema).and_return(schema)
+          subject.dump(dir, backup_id)
-        include_examples 'pg_dump arguments'
diff --git a/spec/lib/backup/dump/postgres_spec.rb b/spec/lib/backup/dump/postgres_spec.rb
index f7020c78c6c820b98b8a1073cf3f8039c662143f..1da2ee950db6da7da13e16a751804b4eec7f5c4c 100644
--- a/spec/lib/backup/dump/postgres_spec.rb
+++ b/spec/lib/backup/dump/postgres_spec.rb
@@ -3,17 +3,22 @@
 require 'spec_helper'
 RSpec.describe Backup::Dump::Postgres, feature_category: :backup_restore do
-  describe '#dump' do
-    let(:pg_database) { 'gitlabhq_test' }
-    let(:destination_dir) { Dir.mktmpdir }
-    let(:db_file_name) { File.join(destination_dir, 'output.gz') }
+  let(:pg_database) { 'gitlabhq_test' }
+  let(:pg_dump) { ::Gitlab::Backup::Cli::Utils::PgDump.new(database_name: pg_database) }
+  let(:default_compression_cmd) { 'gzip -c -1' }
-    let(:pipes) { IO.pipe }
-    let(:gzip_pid) { spawn('gzip -c -1', in: pipes[0], out: [db_file_name, 'w', 0o600]) }
-    let(:pg_dump_pid) { Process.spawn('pg_dump', *args, pg_database, out: pipes[1]) }
-    let(:args) { ['--help'] }
+  subject(:postgres) { described_class.new }
-    subject { described_class.new }
+  describe '#compress_cmd' do
+    it 'returns default compression command' do
+      expect(postgres.compress_cmd).to eq(default_compression_cmd)
+    end
+  end
+  describe '#dump' do
+    let(:pipes) { IO.pipe }
+    let(:destination_dir) { Dir.mktmpdir }
+    let(:dump_file_name) { File.join(destination_dir, 'output.gz') }
     before do
       allow(IO).to receive(:pipe).and_return(pipes)
@@ -23,33 +28,54 @@
       FileUtils.remove_entry destination_dir
-    it 'creates gzipped dump using supplied arguments' do
-      expect(subject).to receive(:spawn).with('gzip -c -1', in: pipes.first,
-                                                            out: [db_file_name, 'w', 0o600]).and_return(gzip_pid)
-      expect(Process).to receive(:spawn).with('pg_dump', *args, pg_database, out: pipes[1]).and_return(pg_dump_pid)
+    context 'with default compression method' do
+      it 'creates a dump file' do
+        postgres.dump(dump_file_name, pg_dump)
-      subject.dump(pg_database, db_file_name, args)
+        expect(File.exist?(dump_file_name)).to eq(true)
+      end
+      it 'default compression command is used' do
+        compressor_pid = spawn(default_compression_cmd, in: pipes[0], out: [dump_file_name, 'w', 0o600])
+        expect(postgres).to receive(:spawn).with(
+          default_compression_cmd,
+          in: pipes.first,
+          out: [dump_file_name, 'w', 0o600]).and_return(compressor_pid)
-      expect(File.exist?(db_file_name)).to eq(true)
+        postgres.dump(dump_file_name, pg_dump)
+        expect(File.exist?(dump_file_name)).to eq(true)
+      end
     context 'when COMPRESS_CMD is set to tee' do
-      let(:tee_pid) { spawn('tee', in: pipes[0], out: [db_file_name, 'w', 0o600]) }
+      let(:tee_pid) { spawn('tee', in: pipes[0], out: [dump_file_name, 'w', 0o600]) }
       before do
         stub_env('COMPRESS_CMD', 'tee')
+      it 'creates a dump file' do
+        postgres.dump(dump_file_name, pg_dump)
+        expect(File.exist?(dump_file_name)).to eq(true)
+      end
       it 'passes through tee instead of gzip' do
-        expect(subject).to receive(:spawn).with('tee', in: pipes.first,
-                                                       out: [db_file_name, 'w', 0o600]).and_return(tee_pid)
-        expect(Process).to receive(:spawn).with('pg_dump', *args, pg_database, out: pipes[1]).and_return(pg_dump_pid)
+        custom_compression_command = 'tee'
+        compressor_pid = spawn(custom_compression_command, in: pipes[0], out: [dump_file_name, 'w', 0o600])
+        expect(postgres).to receive(:spawn).with(
+          custom_compression_command,
+          in: pipes.first,
+          out: [dump_file_name, 'w', 0o600]).and_return(compressor_pid)
         expect do
-          subject.dump(pg_database, db_file_name, args)
+          postgres.dump(dump_file_name, pg_dump)
         end.to output(/Using custom COMPRESS_CMD 'tee'/).to_stdout
-        expect(File.exist?(db_file_name)).to eq(true)
+        expect(File.exist?(dump_file_name)).to eq(true)