diff --git a/config/feature_flags/wip/bitbucket_import_resumable_worker.yml b/config/feature_flags/beta/bitbucket_import_resumable_worker.yml
similarity index 71%
rename from config/feature_flags/wip/bitbucket_import_resumable_worker.yml
rename to config/feature_flags/beta/bitbucket_import_resumable_worker.yml
index ae411b7ae0b99daac780c02d2a79e998cc935785..42b9b34d4e326073a3d3e85d29aa7549a2674e6f 100644
--- a/config/feature_flags/wip/bitbucket_import_resumable_worker.yml
+++ b/config/feature_flags/beta/bitbucket_import_resumable_worker.yml
@@ -2,8 +2,8 @@
 name: bitbucket_import_resumable_worker
 feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/466231
 introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/156797
-rollout_issue_url:
-milestone: '17.2'
+rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/471309
+milestone: '17.3'
 group: group::import and integrate
-type: wip
+type: beta
 default_enabled: false
diff --git a/lib/bitbucket/client.rb b/lib/bitbucket/client.rb
index c031851af3d7be1e13c67e9523f8412c2935ae2d..25894617af8644ccde36cb2a2c52929a217bd84e 100644
--- a/lib/bitbucket/client.rb
+++ b/lib/bitbucket/client.rb
@@ -73,9 +73,15 @@ def last_issue(repo)
       Bitbucket::Representation::Issue.new(parsed_response['values'].first)
     end
 
-    def issues(repo)
+    def issues(repo, options = {})
       path = "/repositories/#{repo}/issues?sort=created_on"
-      get_collection(path, :issue)
+
+      if options[:raw]
+        path = options[:next_url] if options[:next_url]
+        connection.get(path)
+      else
+        get_collection(path, :issue)
+      end
     end
 
     def issue_comments(repo, issue_id)
@@ -133,6 +139,7 @@ def users(workspace_key, page_number: nil, limit: nil)
     def fetch_data(method, *args)
       case method
       when :pull_requests then pull_requests(*args)
+      when :issues then issues(*args)
       else
         raise ArgumentError, "Unknown data method #{method}"
       end
diff --git a/lib/gitlab/bitbucket_import/importers/issues_importer.rb b/lib/gitlab/bitbucket_import/importers/issues_importer.rb
index 678cb4e129db2edaa294e66ccb5c955ea5c79cfa..bf457d23fc541d1cda0bfed98c7c0df2ee1204d0 100644
--- a/lib/gitlab/bitbucket_import/importers/issues_importer.rb
+++ b/lib/gitlab/bitbucket_import/importers/issues_importer.rb
@@ -11,6 +11,38 @@ def execute
 
           log_info(import_stage: 'import_issues', message: 'importing issues')
 
+          bitbucket_import_resumable_worker =
+            project.import_data&.data&.dig('bitbucket_import_resumable_worker')
+
+          if bitbucket_import_resumable_worker
+            resumable_execute
+          else
+            non_resumable_execute
+          end
+        end
+
+        private
+
+        def resumable_execute
+          labels = build_labels_hash
+
+          is_first = true
+          each_object_to_import do |object|
+            job_delay = calculate_job_delay(job_waiter.jobs_remaining)
+
+            if is_first
+              allocate_issues_internal_id!
+              is_first = false
+            end
+
+            issue_hash = object.to_hash.merge({ issue_type_id: default_issue_type_id, label_id: labels[object[:kind]] })
+            sidekiq_worker_class.perform_in(job_delay, project.id, issue_hash, job_waiter.key)
+          end
+
+          job_waiter
+        end
+
+        def non_resumable_execute
           issues = client.issues(project.import_source)
 
           labels = build_labels_hash
@@ -36,8 +68,6 @@ def execute
           job_waiter
         end
 
-        private
-
         def sidekiq_worker_class
           ImportIssueWorker
         end
@@ -46,8 +76,22 @@ def collection_method
           :issues
         end
 
+        def collection_options
+          { raw: true }
+        end
+
+        def representation_type
+          :issue
+        end
+
         def id_for_already_enqueued_cache(object)
-          object.iid
+          if object.is_a?(Hash)
+            # used for `resumable_execute`
+            object[:iid]
+          else
+            # used for `non_resumable_execute`
+            object.iid
+          end
         end
 
         def default_issue_type_id
diff --git a/spec/lib/bitbucket/client_spec.rb b/spec/lib/bitbucket/client_spec.rb
index a485ca5455827f4b574dc18e45a92958d9be5445..94b0785fa54f616a42c4c5b7ddad5217d5d8591c 100644
--- a/spec/lib/bitbucket/client_spec.rb
+++ b/spec/lib/bitbucket/client_spec.rb
@@ -15,66 +15,72 @@
   subject(:client) { described_class.new(options) }
 
   describe '#each_page' do
-    let_it_be(:item1) do
-      { 'username' => 'Ben' }
-    end
+    shared_examples 'fetching bitbucket data' do |params|
+      let_it_be(:item1) do
+        { 'username' => 'Ben' }
+      end
 
-    let_it_be(:item2) do
-      { 'username' => 'Affleck' }
-    end
+      let_it_be(:item2) do
+        { 'username' => 'Affleck' }
+      end
 
-    let_it_be(:item3) do
-      { 'username' => 'Jane' }
-    end
+      let_it_be(:item3) do
+        { 'username' => 'Jane' }
+      end
 
-    let_it_be(:response1) do
-      { 'values' => [item1], 'next' => 'https://example.com/next' }
-    end
+      let_it_be(:response1) do
+        { 'values' => [item1], 'next' => 'https://example.com/next' }
+      end
 
-    let_it_be(:response2) do
-      { 'values' => [item2], 'next' => 'https://example.com/next2' }
-    end
+      let_it_be(:response2) do
+        { 'values' => [item2], 'next' => 'https://example.com/next2' }
+      end
 
-    let_it_be(:response3) do
-      { 'values' => [item3], 'next' => nil }
-    end
+      let_it_be(:response3) do
+        { 'values' => [item3], 'next' => nil }
+      end
 
-    before do
-      allow(client)
-        .to receive(:pull_requests)
-        .with('repo')
-        .and_return(response1)
+      before do
+        allow(client)
+          .to receive(params[:fetch_type])
+          .with('repo')
+          .and_return(response1)
+
+        allow(client)
+          .to receive(params[:fetch_type])
+          .with('repo', { next_url: 'https://example.com/next' })
+          .and_return(response2)
+
+        allow(client)
+          .to receive(params[:fetch_type])
+          .with('repo', { next_url: 'https://example.com/next2' })
+          .and_return(response3)
+      end
 
-      allow(client)
-        .to receive(:pull_requests)
-        .with('repo', { next_url: 'https://example.com/next' })
-        .and_return(response2)
+      it 'yields every retrieved page to the supplied block' do
+        pages = []
 
-      allow(client)
-        .to receive(:pull_requests)
-        .with('repo', { next_url: 'https://example.com/next2' })
-        .and_return(response3)
-    end
+        client.each_page(params[:fetch_type], params[:representation_type], 'repo') { |page| pages << page }
 
-    it 'yields every retrieved page to the supplied block' do
-      pages = []
+        expect(pages[0]).to be_an_instance_of(Bitbucket::Page)
 
-      client.each_page(:pull_requests, :pull_request, 'repo') { |page| pages << page }
+        expect(pages[0].items.count).to eq(1)
+        expect(pages[0].items.first.raw).to eq(item1)
+        expect(pages[0].attrs[:next]).to eq('https://example.com/next')
 
-      expect(pages[0]).to be_an_instance_of(Bitbucket::Page)
+        expect(pages[1].items.count).to eq(1)
+        expect(pages[1].items.first.raw).to eq(item2)
+        expect(pages[1].attrs[:next]).to eq('https://example.com/next2')
 
-      expect(pages[0].items.count).to eq(1)
-      expect(pages[0].items.first.raw).to eq(item1)
-      expect(pages[0].attrs[:next]).to eq('https://example.com/next')
+        expect(pages[2].items.count).to eq(1)
+        expect(pages[2].items.first.raw).to eq(item3)
+        expect(pages[2].attrs[:next]).to eq(nil)
+      end
+    end
 
-      expect(pages[1].items.count).to eq(1)
-      expect(pages[1].items.first.raw).to eq(item2)
-      expect(pages[1].attrs[:next]).to eq('https://example.com/next2')
+    it_behaves_like 'fetching bitbucket data', { fetch_type: :pull_requests, representation_type: :pull_request }
 
-      expect(pages[2].items.count).to eq(1)
-      expect(pages[2].items.first.raw).to eq(item3)
-      expect(pages[2].attrs[:next]).to eq(nil)
-    end
+    it_behaves_like 'fetching bitbucket data', { fetch_type: :issues, representation_type: :issue }
 
     context 'when fetch_data not defined' do
       it 'raises argument error' do
@@ -108,6 +114,18 @@
 
       client.issues(repo)
     end
+
+    context 'with options raw' do
+      let(:url) { "#{root_url}#{path}" }
+
+      it 'returns raw result' do
+        stub_request(:get, url).to_return(status: 200, headers: headers, body: '{}')
+
+        client.issues(repo, raw: true)
+
+        expect(WebMock).to have_requested(:get, url)
+      end
+    end
   end
 
   describe '#issue_comments' do
diff --git a/spec/lib/gitlab/bitbucket_import/importers/issues_importer_spec.rb b/spec/lib/gitlab/bitbucket_import/importers/issues_importer_spec.rb
index b4c6fb085c63e1f5e084b37833496934ac579e4d..677295ff8d25591dbd63884b710294c02e36b95a 100644
--- a/spec/lib/gitlab/bitbucket_import/importers/issues_importer_spec.rb
+++ b/spec/lib/gitlab/bitbucket_import/importers/issues_importer_spec.rb
@@ -3,33 +3,22 @@
 require 'spec_helper'
 
 RSpec.describe Gitlab::BitbucketImport::Importers::IssuesImporter, :clean_gitlab_redis_shared_state, feature_category: :importers do
-  let_it_be(:project) do
-    create(:project, :import_started,
-      import_data_attributes: {
-        data: { 'project_key' => 'key', 'repo_slug' => 'slug' },
-        credentials: { 'base_uri' => 'http://bitbucket.org/', 'user' => 'bitbucket', 'password' => 'password' }
-      }
-    )
-  end
-
-  let(:client) { Bitbucket::Client.new(project.import_data.credentials) }
-
-  before do
-    allow(Bitbucket::Client).to receive(:new).and_return(client)
-    allow(client).to receive(:repo).and_return(Bitbucket::Representation::Repo.new({ 'has_issues' => true }))
-    allow(client).to receive(:last_issue).and_return(Bitbucket::Representation::Issue.new({ 'id' => 2 }))
-    allow(client).to receive(:issues).and_return(
-      [
-        Bitbucket::Representation::Issue.new({ 'id' => 1 }),
-        Bitbucket::Representation::Issue.new({ 'id' => 2 })
-      ],
-      []
-    )
-  end
-
   subject(:importer) { described_class.new(project) }
 
-  describe '#execute' do
+  shared_examples 'import bitbucket IssuesImporter' do |params|
+    let_it_be(:project) do
+      create(:project, :import_started,
+        import_data_attributes: {
+          data: {
+            'project_key' => 'key',
+            'repo_slug' => 'slug',
+            'bitbucket_import_resumable_worker' => params[:resumable]
+          },
+          credentials: { 'base_uri' => 'http://bitbucket.org/', 'user' => 'bitbucket', 'password' => 'password' }
+        }
+      )
+    end
+
     context 'when the repo does not have issue tracking enabled' do
       before do
         allow(client).to receive(:repo).and_return(Bitbucket::Representation::Repo.new({ 'has_issues' => false }))
@@ -59,18 +48,6 @@
       importer.execute
     end
 
-    context 'when the client raises an error' do
-      before do
-        allow(client).to receive(:issues).and_raise(StandardError)
-      end
-
-      it 'tracks the failure and does not fail' do
-        expect(Gitlab::Import::ImportFailureService).to receive(:track).once
-
-        expect(importer.execute).to be_a(Gitlab::JobWaiter)
-      end
-    end
-
     context 'when issue was already enqueued' do
       before do
         Gitlab::Cache::Import::Caching.set_add(importer.already_enqueued_cache_key, 1)
@@ -86,4 +63,72 @@
       end
     end
   end
+
+  describe '#resumable_execute' do
+    let(:client) { Bitbucket::Client.new(project.import_data.credentials) }
+
+    before do
+      allow(Bitbucket::Client).to receive(:new).and_return(client)
+      allow(client).to receive(:repo).and_return(Bitbucket::Representation::Repo.new({ 'has_issues' => true }))
+      allow(client).to receive(:last_issue).and_return(Bitbucket::Representation::Issue.new({ 'id' => 2 }))
+      page = instance_double('Bitbucket::Page', attrs: [], items: [
+        Bitbucket::Representation::Issue.new({ 'id' => 1 }),
+        Bitbucket::Representation::Issue.new({ 'id' => 2 })
+      ])
+      allow(client).to receive(:each_page).and_yield(page)
+      allow(page).to receive(:next?).and_return(true)
+      allow(page).to receive(:next).and_return('https://example.com/next')
+    end
+
+    it_behaves_like 'import bitbucket IssuesImporter', { resumable: true } do
+      context 'when the client raises an error' do
+        let(:exception) { StandardError.new('error fetching issues') }
+
+        before do
+          allow_next_instance_of(Bitbucket::Client) do |client|
+            allow(client).to receive(:repo).and_raise(exception)
+          end
+        end
+
+        it 'raises the error' do
+          expect { importer.execute }.to raise_error(StandardError, 'error fetching issues')
+        end
+      end
+    end
+  end
+
+  describe '#non_resumable_execute' do
+    let(:client) { Bitbucket::Client.new(project.import_data.credentials) }
+
+    before do
+      allow(Bitbucket::Client).to receive(:new).and_return(client)
+      allow(client).to receive(:repo).and_return(Bitbucket::Representation::Repo.new({ 'has_issues' => true }))
+      allow(client).to receive(:last_issue).and_return(Bitbucket::Representation::Issue.new({ 'id' => 2 }))
+      allow(client).to receive(:issues).and_return(
+        [
+          Bitbucket::Representation::Issue.new({ 'id' => 1 }),
+          Bitbucket::Representation::Issue.new({ 'id' => 2 })
+        ],
+        []
+      )
+    end
+
+    it_behaves_like 'import bitbucket IssuesImporter', { resumable: false } do
+      context 'when the client raises an error' do
+        let(:exception) { StandardError.new('error fetching issues') }
+
+        before do
+          allow(client).to receive(:issues).and_raise(exception)
+        end
+
+        it 'tracks the failure and does not fail' do
+          expect(Gitlab::Import::ImportFailureService).to receive(:track)
+            .once
+            .with(a_hash_including(exception: exception))
+
+          expect(importer.execute).to be_a(Gitlab::JobWaiter)
+        end
+      end
+    end
+  end
 end
diff --git a/spec/lib/gitlab/bitbucket_import/importers/pull_requests_importer_spec.rb b/spec/lib/gitlab/bitbucket_import/importers/pull_requests_importer_spec.rb
index cb292a7d6f3671364fba7c888c97ed05bda177c8..5a2f151a976326aebe33b2edccedc23d8d0fd856 100644
--- a/spec/lib/gitlab/bitbucket_import/importers/pull_requests_importer_spec.rb
+++ b/spec/lib/gitlab/bitbucket_import/importers/pull_requests_importer_spec.rb
@@ -5,14 +5,14 @@
 RSpec.describe Gitlab::BitbucketImport::Importers::PullRequestsImporter, :clean_gitlab_redis_shared_state, feature_category: :importers do
   subject(:importer) { described_class.new(project) }
 
-  shared_examples 'import bitbucket PullRequestsImporter' do |bitbucket_import_resumable_worker|
+  shared_examples 'import bitbucket PullRequestsImporter' do |params|
     let_it_be(:project) do
       create(:project, :import_started,
         import_data_attributes: {
           data: {
             'project_key' => 'key',
             'repo_slug' => 'slug',
-            'bitbucket_import_resumable_worker' => bitbucket_import_resumable_worker
+            'bitbucket_import_resumable_worker' => params[:resumable]
           },
           credentials: { 'base_uri' => 'http://bitbucket.org/', 'user' => 'bitbucket', 'password' => 'password' }
         }
@@ -61,7 +61,7 @@
       end
     end
 
-    it_behaves_like 'import bitbucket PullRequestsImporter', true do
+    it_behaves_like 'import bitbucket PullRequestsImporter', { resumable: true } do
       context 'when the client raises an error' do
         before do
           allow_next_instance_of(Bitbucket::Client) do |client|
@@ -90,7 +90,7 @@
       end
     end
 
-    it_behaves_like 'import bitbucket PullRequestsImporter', false do
+    it_behaves_like 'import bitbucket PullRequestsImporter', { resumable: false } do
       context 'when the client raises an error' do
         let(:exception) { StandardError.new('error fetching PRs') }
 
diff --git a/spec/support/shared_examples/lib/gitlab/bitbucket_import/stage_methods_shared_examples.rb b/spec/support/shared_examples/lib/gitlab/bitbucket_import/stage_methods_shared_examples.rb
index 486c7b72abd40c686d56088bd04649521542ba96..92bf5861186faf1e77627cb432e9f015987c2439 100644
--- a/spec/support/shared_examples/lib/gitlab/bitbucket_import/stage_methods_shared_examples.rb
+++ b/spec/support/shared_examples/lib/gitlab/bitbucket_import/stage_methods_shared_examples.rb
@@ -82,6 +82,8 @@
       end
 
       it 'logs stage start and finish' do
+        allow(worker).to receive(:import)
+
         expect(import_logger_double)
           .to receive(:info)
           .with(