diff --git a/app/services/ci/collect_pipeline_analytics_service.rb b/app/services/ci/collect_pipeline_analytics_service.rb
new file mode 100644
index 0000000000000000000000000000000000000000..3508df9df685b636b4f798a4e66b32c11189153c
--- /dev/null
+++ b/app/services/ci/collect_pipeline_analytics_service.rb
@@ -0,0 +1,66 @@
+# frozen_string_literal: true
+
+module Ci
+  class CollectPipelineAnalyticsService
+    TIME_BUCKETS_LIMIT = 1.week.in_hours + 1 # +1 to add some error margin
+
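+    # Pipeline statuses are reported in three groups: success, failed, and other (canceled or skipped).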
+    STATUS_GROUP_TO_STATUSES = { success: %w[success], failed: %w[failed], other: %w[canceled skipped] }.freeze
+    STATUS_GROUPS = STATUS_GROUP_TO_STATUSES.keys.freeze
+    STATUS_TO_STATUS_GROUP = STATUS_GROUP_TO_STATUSES.flat_map { |k, v| v.product([k]) }.to_h.freeze
+
+    def initialize(current_user:, project:, from_time:, to_time:, status_groups: [:all])
+      @current_user = current_user
+      @project = project
+      @status_groups = status_groups
+      @from_time = from_time || 1.week.ago.utc
+      @to_time = to_time || Time.now.utc
+    end
+
+    def execute
+      return ServiceResponse.error(message: 'Project must be specified') unless @project
+
+      unless ::Gitlab::ClickHouse.configured?
+        return ServiceResponse.error(message: 'ClickHouse database is not configured')
+      end
+
+      return ServiceResponse.error(message: 'Not allowed') unless allowed?
+
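+      # The requested window may span at most TIME_BUCKETS_LIMIT 1-hour buckets (one week plus a margin).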
+      if (@to_time - @from_time) / 1.hour > TIME_BUCKETS_LIMIT
+        return ServiceResponse.error(message: "Maximum of #{TIME_BUCKETS_LIMIT} 1-hour intervals can be requested")
+      end
+
+      ServiceResponse.success(payload: { aggregate: calculate_aggregate })
+    end
+
+    private
+
+    def allowed?
+      @current_user&.can?(:read_ci_cd_analytics, @project)
+    end
+
+    def clickhouse_model
+      ::ClickHouse::Models::Ci::FinishedPipelinesHourly
+    end
+
+    def calculate_aggregate
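+      # Seed every requested status group with 0 so groups without matching pipelines still appear in the payload.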
+      result = @status_groups.index_with(0)
+      query = clickhouse_model.for_project(@project).within_dates(@from_time, @to_time)
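+      # :all is computed with a single countMerge over every bucket in range, regardless of status.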
+      if @status_groups.include?(:all)
+        all_query = query.select(query.count_pipelines_function.as('all'))
+        result[:all] = ::ClickHouse::Client.select(all_query.to_sql, :main).first['all']
+      end
+
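+      # The remaining groups are expanded to raw statuses, counted per status, then folded back into group totals.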
+      if @status_groups.intersect?(STATUS_GROUPS)
+        query = query
+          .select(:status, query.count_pipelines_function.as('count'))
+          .by_status(@status_groups.flat_map(&STATUS_GROUP_TO_STATUSES).compact)
+          .group_by_status
+
+        result_by_status = ::ClickHouse::Client.select(query.to_sql, :main).map(&:values).to_h
+        result_by_status.each_pair { |status, count| result[STATUS_TO_STATUS_GROUP[status]] += count }
+      end
+
+      result
+    end
+  end
+end
diff --git a/ee/spec/services/ci/collect_queueing_history_service_spec.rb b/ee/spec/services/ci/collect_queueing_history_service_spec.rb
index 7b09bdae88bde006ebd674f91db2a250e139d26a..62f4b97c60a0dd7fe516e442963b193124c6c7e2 100644
--- a/ee/spec/services/ci/collect_queueing_history_service_spec.rb
+++ b/ee/spec/services/ci/collect_queueing_history_service_spec.rb
@@ -14,7 +14,7 @@
   let(:percentiles) { [50, 75, 90, 95, 99] }
   let(:runner_type) { nil }
   let(:from_time) { starting_time }
-  let(:to_time) { starting_time + 3.hours }
+  let(:to_time) { 3.hours.after(starting_time) }
   let(:owner_namespace) { nil }
 
   let(:service) do
@@ -200,13 +200,13 @@
       builds = [from_time - 1.second,
         from_time,
         to_time,
-        to_time + 5.minutes + 1.second].map do |started_at|
+        (5.minutes + 1.second).after(to_time)].map do |started_at|
         build_stubbed(:ci_build,
           :success,
-          created_at: started_at - 1.minute,
-          queued_at: started_at - 1.minute,
+          created_at: 1.minute.before(started_at),
+          queued_at: 1.minute.before(started_at),
           started_at: started_at,
-          finished_at: started_at + 10.minutes,
+          finished_at: 10.minutes.after(started_at),
           runner: runner,
           runner_manager: runner.runner_managers.first)
       end
@@ -264,7 +264,7 @@
     end
 
     context 'when requesting more than TIME_BUCKETS_LIMIT' do
-      let(:to_time) { starting_time + 190.minutes }
+      let(:to_time) { 190.minutes.after(starting_time) }
 
       it 'returns error' do
         expect(result.error?).to eq(true)
diff --git a/lib/click_house/models/ci/finished_pipelines_base.rb b/lib/click_house/models/ci/finished_pipelines_base.rb
index 86e5ab92e80945487160395da36df6e8b02a194a..e457851391c8423b5d483c6fff7e517f1d5242b9 100644
--- a/lib/click_house/models/ci/finished_pipelines_base.rb
+++ b/lib/click_house/models/ci/finished_pipelines_base.rb
@@ -20,6 +20,18 @@ def for_project(project)
           where(path: project.project_namespace.traversal_path)
         end
 
+        def within_dates(from_time, to_time)
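+          # Buckets within [from_time, to_time): lower bound inclusive, upper bound exclusive; nil bounds are skipped.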
+          query = self
+          started_at_bucket = @query_builder.table[:started_at_bucket]
+
+          # rubocop: disable CodeReuse/ActiveRecord -- this is a ClickHouse model
+          query = query.where(started_at_bucket.gteq(format_time(from_time))) if from_time
+          query = query.where(started_at_bucket.lt(format_time(to_time))) if to_time
+          # rubocop: enable CodeReuse/ActiveRecord
+
+          query
+        end
+
         def by_status(statuses)
           where(status: statuses)
         end
@@ -31,6 +43,16 @@ def group_by_status
         def count_pipelines_function
           Arel::Nodes::NamedFunction.new('countMerge', [@query_builder.table[:count_pipelines]])
         end
+
+        private
+
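+        # Renders a time as a ClickHouse toDateTime64 literal with microsecond precision in UTC.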
+        def format_time(date)
+          Arel::Nodes::NamedFunction.new('toDateTime64', [
+            Arel::Nodes::SqlLiteral.new(date.utc.strftime("'%Y-%m-%d %H:%M:%S'")),
+            6,
+            Arel::Nodes.build_quoted('UTC')
+          ])
+        end
       end
     end
   end
diff --git a/spec/services/ci/collect_pipeline_analytics_service_spec.rb b/spec/services/ci/collect_pipeline_analytics_service_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..fbf56774258eeaf2f3ce9352912c8a288d6bcd71
--- /dev/null
+++ b/spec/services/ci/collect_pipeline_analytics_service_spec.rb
@@ -0,0 +1,195 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe ::Ci::CollectPipelineAnalyticsService, :click_house, :enable_admin_mode,
+  feature_category: :fleet_visibility do
+  include ClickHouseHelpers
+
+  let_it_be(:project1) { create(:project).tap(&:reload) } # reload required to calculate traversal path
+  let_it_be(:project2) { create(:project).tap(&:reload) }
+  let_it_be(:current_user) { create(:user, reporter_of: [project1, project2]) }
+
+  let_it_be(:starting_time) { Time.utc(2023) }
+  let_it_be(:ending_time) { 1.week.after(Time.utc(2023)) }
+
+  let(:project) { project1 }
+  let(:status_groups) { [:all] }
+  let(:from_time) { starting_time }
+  let(:to_time) { ending_time }
+
+  let(:service) do
+    described_class.new(
+      current_user: current_user,
+      project: project,
+      from_time: from_time,
+      to_time: to_time,
+      status_groups: status_groups)
+  end
+
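+  # The default window is [starting_time, ending_time): the last two pipelines below fall just outside it.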
+  let(:pipelines) do
+    [
+      create_pipeline(project1, :running, 35.minutes.before(ending_time), 30.minutes),
+      create_pipeline(project1, :success, 1.day.before(ending_time), 30.minutes),
+      create_pipeline(project1, :canceled, 1.hour.before(ending_time), 1.minute),
+      create_pipeline(project1, :failed, 5.days.before(ending_time), 2.hours),
+      create_pipeline(project1, :failed, 1.week.before(ending_time), 45.minutes),
+      create_pipeline(project1, :skipped, 5.days.before(ending_time), 1.second),
+      create_pipeline(project1, :skipped, 1.second.before(starting_time), 45.minutes),
+      create_pipeline(project1, :success, ending_time, 30.minutes)
+    ]
+  end
+
+  subject(:result) { service.execute }
+
+  before do
+    insert_ci_pipelines_to_click_house(pipelines)
+  end
+
+  context 'when ClickHouse database is not configured' do
+    before do
+      allow(::Gitlab::ClickHouse).to receive(:configured?).and_return(false)
+    end
+
+    it 'returns error' do
+      expect(result.error?).to eq(true)
+      expect(result.errors).to contain_exactly('ClickHouse database is not configured')
+    end
+  end
+
+  shared_examples 'returns Not allowed error' do
+    it 'returns error' do
+      expect(result.error?).to eq(true)
+      expect(result.errors).to contain_exactly('Not allowed')
+    end
+  end
+
+  shared_examples 'a service returning aggregate analytics' do
+    using RSpec::Parameterized::TableSyntax
+
+    where(:status_groups, :expected_aggregate) do
+      %i[all]           | { all: 6 }
+      %i[all success]   | { all: 6, success: 1 }
+      %i[success other] | { success: 1, other: 2 }
+      %i[failed]        | { failed: 2 }
+    end
+
+    with_them do
+      it 'returns aggregate analytics' do
+        expect(result.success?).to eq(true)
+        expect(result.errors).to eq([])
+        expect(result.payload[:aggregate]).to eq(expected_aggregate)
+      end
+    end
+
+    context 'when dates are not specified' do
+      let(:from_time) { nil }
+      let(:to_time) { nil }
+
+      context 'and there are pipelines in the last week', time_travel_to: '2023-01-08' do
+        it 'returns aggregate analytics from last week' do
+          expect(result.errors).to eq([])
+          expect(result.success?).to eq(true)
+          expect(result.payload[:aggregate]).to eq({ all: 6 })
+        end
+      end
+
+      context 'and there are no pipelines in the last week', time_travel_to: '2023-01-15 00:00:01' do
+        it 'returns empty aggregate analytics' do
+          expect(result.errors).to eq([])
+          expect(result.success?).to eq(true)
+          expect(result.payload[:aggregate]).to eq({ all: 0 })
+        end
+      end
+    end
+
+    context 'when requesting statistics starting one second before beginning of week' do
+      let(:from_time) { 1.second.before(starting_time) }
+
+      it 'does not include pipeline starting 1 second before start of week' do
+        expect(result.errors).to eq([])
+        expect(result.success?).to eq(true)
+        expect(result.payload[:aggregate]).to eq({ all: 6 })
+      end
+    end
+
+    context 'when requesting statistics starting one hour before beginning of week' do
+      let(:from_time) { 1.hour.before(starting_time) }
+
+      it 'includes pipeline starting 1 second before start of week' do
+        expect(result.errors).to eq([])
+        expect(result.success?).to eq(true)
+        expect(result.payload[:aggregate]).to eq({ all: 7 })
+      end
+    end
+
+    context 'when requesting hourly statistics that span more than one week' do
+      let(:from_time) { (1.hour + 1.second).before(starting_time) }
+
+      it 'returns an error' do
+        expect(result.errors).to contain_exactly(
+          "Maximum of #{described_class::TIME_BUCKETS_LIMIT} 1-hour intervals can be requested")
+        expect(result.error?).to eq(true)
+      end
+    end
+
+    context 'when a different project is specified' do
+      let(:project) { project2 }
+      let(:status_groups) { %i[all success failed] }
+
+      before do
+        insert_ci_pipelines_to_click_house([
+          create_pipeline(project2, :failed, 1.week.before(ending_time), 45.minutes)
+        ])
+      end
+
+      it 'returns aggregate analytics for the specified project only' do
+        expect(result.success?).to eq(true)
+        expect(result.errors).to eq([])
+        expect(result.payload[:aggregate]).to eq({ all: 1, success: 0, failed: 1 })
+      end
+    end
+  end
+
+  it_behaves_like 'a service returning aggregate analytics'
+
+  context 'when user is nil' do
+    let(:current_user) { nil }
+
+    include_examples 'returns Not allowed error'
+  end
+
+  context 'when project has analytics disabled' do
+    let_it_be(:project) { create(:project, :analytics_disabled) }
+
+    include_examples 'returns Not allowed error'
+  end
+
+  context 'when project is not specified' do
+    let(:project) { nil }
+
+    it 'returns error' do
+      expect(result.error?).to eq(true)
+      expect(result.errors).to contain_exactly('Project must be specified')
+    end
+  end
+
+  context 'when user is an admin' do
+    let(:current_user) { create(:admin) }
+
+    it_behaves_like 'a service returning aggregate analytics'
+  end
+
+  context 'when user is a guest' do
+    let_it_be(:current_user) { create(:user, guest_of: project1) }
+
+    include_examples 'returns Not allowed error'
+  end
+
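+  # Builds a stubbed pipeline with created_at 1 second before started_at and finished_at derived from duration.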
+  def create_pipeline(project, status, started_at, duration)
+    build_stubbed(:ci_pipeline, status,
+      project: project,
+      created_at: 1.second.before(started_at), started_at: started_at, finished_at: duration.after(started_at),
+      duration: duration)
+  end
+end
diff --git a/spec/support/helpers/click_house_helpers.rb b/spec/support/helpers/click_house_helpers.rb
index 469112988d25cecc526658486ed7317bac398477..455fb17869b128438b9edebd01f240d96a3b1ba6 100644
--- a/spec/support/helpers/click_house_helpers.rb
+++ b/spec/support/helpers/click_house_helpers.rb
@@ -41,6 +41,18 @@ def insert_ci_builds_to_click_house(builds)
   # rubocop:enable Metrics/CyclomaticComplexity
   # rubocop:enable Metrics/PerceivedComplexity
 
+  def insert_ci_pipelines_to_click_house(pipelines)
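+    # Maps pipelines to the ci_finished_pipelines fixture schema; path falls back to '0/' when no project is set.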
+    result = clickhouse_fixture(:ci_finished_pipelines, pipelines.map do |pipeline|
+      pipeline
+        .slice(%i[id duration status source ref committed_at created_at started_at finished_at])
+        .symbolize_keys
+        .merge(path: pipeline.project&.project_namespace&.traversal_path || '0/')
+    end)
+
+    expect(result).to eq(true)
+  end
+
   def self.default_timezone
     ActiveRecord.default_timezone
   end
diff --git a/spec/support/shared_examples/click_house/models/ci/ci_finished_builds_shared_examples.rb b/spec/support/shared_examples/click_house/models/ci/ci_finished_builds_shared_examples.rb
index dcfc1ccc27100d259c14455ff80862e7f15e2653..80bf4060b8df16179ced709cd3731ee64f051b36 100644
--- a/spec/support/shared_examples/click_house/models/ci/ci_finished_builds_shared_examples.rb
+++ b/spec/support/shared_examples/click_house/models/ci/ci_finished_builds_shared_examples.rb
@@ -22,6 +22,51 @@
     end
   end
 
+  describe '#within_dates' do
+    let(:from_time) { 1.hour.ago }
+    let(:to_time) { Time.current }
+
+    subject(:result_sql) { instance.within_dates(from_time, to_time).to_sql }
+
+    it 'builds the correct SQL' do
+      expected_sql = <<~SQL.lines(chomp: true).join(' ')
+        SELECT * FROM "#{table_name}"
+        WHERE "#{table_name}"."started_at_bucket" >= toDateTime64('#{from_time.utc.strftime('%Y-%m-%d %H:%M:%S')}', 6, 'UTC')
+        AND "#{table_name}"."started_at_bucket" < toDateTime64('#{to_time.utc.strftime('%Y-%m-%d %H:%M:%S')}', 6, 'UTC')
+      SQL
+
+      expect(result_sql.strip).to eq(expected_sql.strip)
+    end
+
+    context 'when only from_time is passed' do
+      let(:from_time) { 1.hour.ago }
+      let(:to_time) { nil }
+
+      it 'builds the correct SQL' do
+        expected_sql = <<~SQL.lines(chomp: true).join(' ')
+          SELECT * FROM "#{table_name}"
+          WHERE "#{table_name}"."started_at_bucket" >= toDateTime64('#{from_time.utc.strftime('%Y-%m-%d %H:%M:%S')}', 6, 'UTC')
+        SQL
+
+        expect(result_sql.strip).to eq(expected_sql.strip)
+      end
+    end
+
+    context 'when only to_time is passed' do
+      let(:from_time) { nil }
+      let(:to_time) { Time.current }
+
+      it 'builds the correct SQL' do
+        expected_sql = <<~SQL.lines(chomp: true).join(' ')
+          SELECT * FROM "#{table_name}"
+          WHERE "#{table_name}"."started_at_bucket" < toDateTime64('#{to_time.utc.strftime('%Y-%m-%d %H:%M:%S')}', 6, 'UTC')
+        SQL
+
+        expect(result_sql.strip).to eq(expected_sql.strip)
+      end
+    end
+  end
+
   describe '#by_status' do
     subject(:result_sql) { instance.by_status(%i[failed success]).to_sql }
 
@@ -98,13 +143,25 @@
     end
 
     it 'builds the correct SQL with chained methods' do
+      from_time = 1.hour.ago
+      to_time = Time.current
+
       expected_sql = <<~SQL.lines(chomp: true).join(' ')
         SELECT "#{table_name}"."status" FROM "#{table_name}"
         WHERE "#{table_name}"."path" = '#{path}'
+        AND "#{table_name}"."started_at_bucket" >= toDateTime64('#{from_time.utc.strftime('%Y-%m-%d %H:%M:%S')}', 6, 'UTC')
+        AND "#{table_name}"."started_at_bucket" < toDateTime64('#{to_time.utc.strftime('%Y-%m-%d %H:%M:%S')}', 6, 'UTC')
         AND "#{table_name}"."status" IN ('failed', 'success')
         GROUP BY "#{table_name}"."status"
       SQL
 
+      result_sql = instance
+        .for_project(project)
+        .select(:status)
+        .within_dates(from_time, to_time)
+        .by_status(%i[failed success])
+        .group_by_status.to_sql
+
       expect(result_sql.strip).to eq(expected_sql.strip)
     end
   end