Skip to content
代码片段 群组 项目
提交 709e8889 编辑于 作者: Hannes Moser's avatar Hannes Moser
浏览文件

Add AwarenessChannel for Awareness session subscriptions

The awareness channel allows clients to subscribe to
changes in an Awareness session via a Websocket
connection. It also enables clients to report back any
activity (touch) from their side, and request updates
to the current state of the awareness session.
上级 17e8037e
No related branches found
No related tags found
无相关合并请求
# frozen_string_literal: true
class AwarenessChannel < ApplicationCable::Channel # rubocop:disable Gitlab/NamespacedClass
REFRESH_INTERVAL = ENV.fetch("GITLAB_AWARENESS_REFRESH_INTERVAL_SEC", 60)
private_constant :REFRESH_INTERVAL
# Produces a refresh interval value, based of the
# GITLAB_AWARENESS_REFRESH_INTERVAL_SEC environment variable or the given
# default. Makes sure, that the interval after a jitter is applied, is never
# less than half the predefined interval.
def self.refresh_interval(range: -10..10)
min = REFRESH_INTERVAL / 2.to_f
[min.to_i, REFRESH_INTERVAL.to_i + rand(range)].max.seconds
end
private_class_method :refresh_interval
# keep clients updated about session membership
periodically every: self.refresh_interval do
transmit payload
end
def subscribed
reject unless valid_subscription?
return if subscription_rejected?
stream_for session, coder: ActiveSupport::JSON
session.join(current_user)
AwarenessChannel.broadcast_to(session, payload)
end
def unsubscribed
return if subscription_rejected?
session.leave(current_user)
AwarenessChannel.broadcast_to(session, payload)
end
# Allows a client to let the server know they are still around. This is not
# like a heartbeat mechanism. This can be triggered by any action that results
# in a meaningful "presence" update. Like scrolling the screen (debounce),
# window becoming active, user starting to type in a text field, etc.
def touch
session.touch!(current_user)
transmit payload
end
private
def valid_subscription?
current_user.present? && path.present?
end
def payload
{ collaborators: collaborators }
end
def collaborators
session.online_users_with_last_activity.map do |user, last_activity|
collaborator(user, last_activity)
end
end
def collaborator(user, last_activity)
{
id: user.id,
name: user.name,
avatar_url: user.avatar_url(size: 36),
last_activity: last_activity,
last_activity_humanized: ActionController::Base.helpers.distance_of_time_in_words(
Time.zone.now, last_activity
)
}
end
def session
@session ||= AwarenessSession.for(path)
end
def path
params[:path]
end
end
...@@ -143,17 +143,34 @@ def size ...@@ -143,17 +143,34 @@ def size
end end
end end
def to_param
id&.to_s
end
def to_s
"awareness_session=#{id}"
end
def online_users_with_last_activity(threshold: PRESENCE_LIFETIME)
users_with_last_activity.filter do |_user, last_activity|
user_online?(last_activity, threshold: threshold)
end
end
def users def users
User.where(id: user_ids) User.where(id: user_ids)
end end
def users_with_last_activity def users_with_last_activity
# where in (x, y, [...z]) is a set and does not maintain any order, we need to # where in (x, y, [...z]) is a set and does not maintain any order, we need
# make sure to establish a stable order for both, the pairs returned from # to make sure to establish a stable order for both, the pairs returned from
# redis and the ActiveRecord query. Using IDs in ascending order. # redis and the ActiveRecord query. Using IDs in ascending order.
user_ids, last_activities = user_ids_with_last_activity user_ids, last_activities = user_ids_with_last_activity
.sort_by(&:first) .sort_by(&:first)
.transpose .transpose
return [] if user_ids.blank?
users = User.where(id: user_ids).order(id: :asc) users = User.where(id: user_ids).order(id: :asc)
users.zip(last_activities) users.zip(last_activities)
end end
...@@ -162,6 +179,10 @@ def users_with_last_activity ...@@ -162,6 +179,10 @@ def users_with_last_activity
attr_reader :id attr_reader :id
def user_online?(last_activity, threshold:)
last_activity.to_i + threshold.to_i > Time.zone.now.to_i
end
# converts session id from hex to integer representation # converts session id from hex to integer representation
def id_i def id_i
Integer(id, 16) if id.present? Integer(id, 16) if id.present?
......
# frozen_string_literal: true
require 'spec_helper'
RSpec.describe AwarenessChannel, :clean_gitlab_redis_shared_state, type: :channel do
before do
stub_action_cable_connection(current_user: user)
end
context "with user" do
let(:user) { create(:user) }
describe "when no path parameter given" do
it "rejects subscription" do
subscribe path: nil
expect(subscription).to be_rejected
end
end
describe "with valid path parameter" do
it "successfully subscribes" do
subscribe path: "/test"
session = AwarenessSession.for("/test")
expect(subscription).to be_confirmed
# check if we can use session object instead
expect(subscription).to have_stream_from("awareness:#{session.to_param}")
end
it "broadcasts set of collaborators when subscribing" do
session = AwarenessSession.for("/test")
freeze_time do
collaborator = {
id: user.id,
name: user.name,
avatar_url: user.avatar_url(size: 36),
last_activity: Time.zone.now,
last_activity_humanized: ActionController::Base.helpers.distance_of_time_in_words(
Time.zone.now, Time.zone.now
)
}
expect do
subscribe path: "/test"
end.to have_broadcasted_to("awareness:#{session.to_param}")
.with(collaborators: [collaborator])
end
end
it "transmits payload when user is touched" do
subscribe path: "/test"
perform :touch
expect(transmissions.size).to be 1
end
it "unsubscribes from channel" do
subscribe path: "/test"
session = AwarenessSession.for("/test")
expect { subscription.unsubscribe_from_channel }
.to change { session.size}.by(-1)
end
end
end
context "with guest" do
let(:user) { nil }
it "rejects subscription" do
subscribe path: "/test"
expect(subscription).to be_rejected
end
end
end
...@@ -2,14 +2,24 @@ ...@@ -2,14 +2,24 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe AwarenessSession do RSpec.describe AwarenessSession, :clean_gitlab_redis_shared_state do
subject { AwarenessSession.for(session_id) } subject { AwarenessSession.for(session_id) }
let!(:user) { create(:user) } let!(:user) { create(:user) }
let(:session_id) { 1 } let(:session_id) { 1 }
after do describe "when initiating a session" do
redis_shared_state_cleanup! it "provides a string representation of the model instance" do
expected = "awareness_session=6b86b273ff34fce"
expect(subject.to_s).to eql(expected)
end
it "provides a parameterized version of the session identifier" do
expected = "6b86b273ff34fce"
expect(subject.to_param).to eql(expected)
end
end end
describe "when a user joins a session" do describe "when a user joins a session" do
...@@ -103,6 +113,26 @@ ...@@ -103,6 +113,26 @@
expect(ttl_user).to be > 0 expect(ttl_user).to be > 0
end end
end end
it "fetches user(s) from database" do
subject.join(user)
expect(subject.users.first).to eql(user)
end
it "fetches and filters online user(s) from database" do
subject.join(user)
travel 2.hours do
subject.join(user2)
online_users = subject.online_users_with_last_activity
online_user, _ = online_users.first
expect(online_users.size).to be 1
expect(online_user).to eql(user2)
end
end
end end
describe "when a user leaves a session" do describe "when a user leaves a session" do
......
...@@ -2,15 +2,11 @@ ...@@ -2,15 +2,11 @@
require 'spec_helper' require 'spec_helper'
RSpec.describe Awareness do RSpec.describe Awareness, :clean_gitlab_redis_shared_state do
subject { create(:user) } subject { create(:user) }
let(:session) { AwarenessSession.for(1) } let(:session) { AwarenessSession.for(1) }
after do
redis_shared_state_cleanup!
end
describe "when joining a session" do describe "when joining a session" do
it "increases the number of sessions" do it "increases the number of sessions" do
expect { subject.join(session) } expect { subject.join(session) }
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册