diff --git a/config/feature_flags/development/workhorse_use_sidechannel.yml b/config/feature_flags/development/workhorse_use_sidechannel.yml new file mode 100644 index 0000000000000000000000000000000000000000..f39d313bf1a34e42ee23af01017a8c9e3ec81297 --- /dev/null +++ b/config/feature_flags/development/workhorse_use_sidechannel.yml @@ -0,0 +1,8 @@ +--- +name: workhorse_use_sidechannel +introduced_by_url: +rollout_issue_url: https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1193 +milestone: '14.4' +type: development +group: 'group::scalability' +default_enabled: false diff --git a/lib/gitlab/workhorse.rb b/lib/gitlab/workhorse.rb index 0f33c3aa68eb765940ca9b9b06e6f30981c88cfc..5780e4d6da85ebd3174ff9bfdfbf6d6faa5de46d 100644 --- a/lib/gitlab/workhorse.rb +++ b/lib/gitlab/workhorse.rb @@ -32,7 +32,8 @@ def git_http_ok(repository, repo_type, user, action, show_all_refs: false) GitalyServer: { address: Gitlab::GitalyClient.address(repository.storage), token: Gitlab::GitalyClient.token(repository.storage), - features: Feature::Gitaly.server_feature_flags(repository.project) + features: Feature::Gitaly.server_feature_flags(repository.project), + sidechannel: Feature.enabled?(:workhorse_use_sidechannel, repository.project, default_enabled: :yaml) } } diff --git a/spec/lib/gitlab/workhorse_spec.rb b/spec/lib/gitlab/workhorse_spec.rb index 09f90a3e5b687787cca28c40535b86a5b3e5bd48..8ba56af561dc92d83f087fa2cf7b2e46b84c31dd 100644 --- a/spec/lib/gitlab/workhorse_spec.rb +++ b/spec/lib/gitlab/workhorse_spec.rb @@ -244,13 +244,15 @@ def call_verify(headers) GitalyServer: { features: { 'gitaly-feature-enforce-requests-limits' => 'true' }, address: Gitlab::GitalyClient.address('default'), - token: Gitlab::GitalyClient.token('default') + token: Gitlab::GitalyClient.token('default'), + sidechannel: false } } end before do allow(Gitlab.config.gitaly).to receive(:enabled).and_return(true) + stub_feature_flags(workhorse_use_sidechannel: false) end it 'includes a Repository param' do @@ -332,6 +334,46 @@ def call_verify(headers) it { expect { subject }.to raise_exception('Unsupported action: download') } end + + context 'when workhorse_use_sidechannel flag is set' do + context 'when a feature flag is set globally' do + before do + stub_feature_flags(workhorse_use_sidechannel: true) + end + + it 'sets the flag to true' do + response = described_class.git_http_ok(repository, Gitlab::GlRepository::PROJECT, user, action) + + expect(response.dig(:GitalyServer, :sidechannel)).to eq(true) + end + end + + context 'when a feature flag is set for a single project' do + before do + stub_feature_flags(workhorse_use_sidechannel: project) + end + + it 'sets the flag to true for that project' do + response = described_class.git_http_ok(repository, Gitlab::GlRepository::PROJECT, user, action) + + expect(response.dig(:GitalyServer, :sidechannel)).to eq(true) + end + + it 'sets the flag to false for other projects' do + other_project = create(:project, :public, :repository) + response = described_class.git_http_ok(other_project.repository, Gitlab::GlRepository::PROJECT, user, action) + + expect(response.dig(:GitalyServer, :sidechannel)).to eq(false) + end + + it 'sets the flag to false when there is no project' do + snippet = create(:personal_snippet, :repository) + response = described_class.git_http_ok(snippet.repository, Gitlab::GlRepository::SNIPPET, user, action) + + expect(response.dig(:GitalyServer, :sidechannel)).to eq(false) + end + end + end end context 'when receive_max_input_size has been updated' do diff --git a/workhorse/gitaly_test.go b/workhorse/gitaly_test.go index 42aaad6e02db02231b24adad703f6a82542bc635..4ace925001a2b32aec0d7846a329f90c418d7c5e 100644 --- a/workhorse/gitaly_test.go +++ b/workhorse/gitaly_test.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "io" "io/ioutil" "math/rand" "net" @@ -20,12 +21,16 @@ import ( "github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + gitalyclient "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/api" "gitlab.com/gitlab-org/gitlab/workhorse/internal/git" "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly" @@ -375,12 +380,24 @@ func TestPostReceivePackRouting(t *testing.T) { } } +type gitalyServerStarter func(*testing.T, codes.Code) (*combinedServer, string) + // ReaderFunc is an adapter to turn a conforming function into an io.Reader. type ReaderFunc func(b []byte) (int, error) func (r ReaderFunc) Read(b []byte) (int, error) { return r(b) } func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) { + testPostUploadPackProxiedToGitalySuccessfully(t, startGitalyServer, gitOkBody(t)) +} + +func TestPostUploadPackWithSidechannelProxiedToGitalySuccessfully(t *testing.T) { + testPostUploadPackProxiedToGitalySuccessfully( + t, startGitalyServerWithSideChannel(testhelper.PostUploadPackWithSidechannel), gitOkBodyWithSidechannel(t), + ) +} + +func testPostUploadPackProxiedToGitalySuccessfully(t *testing.T, startGitaly gitalyServerStarter, apiResponse *api.Response) { for i, tc := range []struct { showAllRefs bool code codes.Code @@ -391,10 +408,9 @@ func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) { {false, codes.Unavailable}, } { t.Run(fmt.Sprintf("Case %d", i), func(t *testing.T) { - apiResponse := gitOkBody(t) apiResponse.ShowAllRefs = tc.showAllRefs - gitalyServer, socketPath := startGitalyServer(t, tc.code) + gitalyServer, socketPath := startGitaly(t, tc.code) defer gitalyServer.GracefulStop() apiResponse.GitalyServer.Address = "unix:" + socketPath @@ -460,8 +476,16 @@ func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) { func TestPostUploadPackProxiedToGitalyInterrupted(t *testing.T) { apiResponse := gitOkBody(t) + testPostUploadPackProxiedToGitalyInterrupted(t, startGitalyServer, apiResponse) +} - gitalyServer, socketPath := startGitalyServer(t, codes.OK) +func TestPostUploadPackWithSidechannelProxiedToGitalyInterrupted(t *testing.T) { + apiResponse := gitOkBodyWithSidechannel(t) + testPostUploadPackProxiedToGitalyInterrupted(t, startGitalyServerWithSideChannel(testhelper.PostUploadPackWithSidechannel), apiResponse) +} + +func testPostUploadPackProxiedToGitalyInterrupted(t *testing.T, startGitaly gitalyServerStarter, apiResponse *api.Response) { + gitalyServer, socketPath := startGitaly(t, codes.OK) defer gitalyServer.GracefulStop() apiResponse.GitalyServer.Address = "unix:" + socketPath @@ -493,10 +517,19 @@ func TestPostUploadPackProxiedToGitalyInterrupted(t *testing.T) { } func TestPostUploadPackRouting(t *testing.T) { - gitalyServer, socketPath := startGitalyServer(t, codes.OK) + apiResponse := gitOkBody(t) + testPostUploadPackRouting(t, startGitalyServer, apiResponse) +} + +func TestPostUploadPackWithSidechannelRouting(t *testing.T) { + apiResponse := gitOkBodyWithSidechannel(t) + testPostUploadPackRouting(t, startGitalyServerWithSideChannel(testhelper.PostUploadPackWithSidechannel), apiResponse) +} + +func testPostUploadPackRouting(t *testing.T, startGitaly gitalyServerStarter, apiResponse *api.Response) { + gitalyServer, socketPath := startGitaly(t, codes.OK) defer gitalyServer.GracefulStop() - apiResponse := gitOkBody(t) apiResponse.GitalyServer.Address = "unix:" + socketPath ts := testAuthServer(t, nil, nil, 200, apiResponse) defer ts.Close() @@ -869,3 +902,21 @@ func startGitalyServer(t *testing.T, finalMessageCode codes.Code) (*combinedServ return &combinedServer{Server: server, GitalyTestServer: gitalyServer}, socketPath } + +func startGitalyServerWithSideChannel(handler func(interface{}, grpc.ServerStream, io.ReadWriteCloser) error) gitalyServerStarter { + return func(t *testing.T, finalMessageCode codes.Code) (*combinedServer, string) { + socketPath := path.Join(scratchDir, fmt.Sprintf("gitaly-%d.sock", rand.Int())) + if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { + t.Fatal(err) + } + server := grpc.NewServer(gitalyclient.TestSidechannelServer(logrus.NewEntry(logrus.StandardLogger()), insecure.NewCredentials(), handler)...) + listener, err := net.Listen("unix", socketPath) + require.NoError(t, err) + + gitalyServer := testhelper.NewGitalyServer(finalMessageCode) + + go server.Serve(listener) + + return &combinedServer{Server: server, GitalyTestServer: gitalyServer}, socketPath + } +} diff --git a/workhorse/internal/gitaly/gitaly.go b/workhorse/internal/gitaly/gitaly.go index 6ea9996205660ef5f9bbd0bb0306b4ada569ae56..362f380dc4d24661d5019f7ed644373879af4492 100644 --- a/workhorse/internal/gitaly/gitaly.go +++ b/workhorse/internal/gitaly/gitaly.go @@ -11,6 +11,7 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -23,15 +24,19 @@ import ( ) type Server struct { - Address string `json:"address"` - Token string `json:"token"` - Features map[string]string `json:"features"` + Address string `json:"address"` + Token string `json:"token"` + Features map[string]string `json:"features"` + Sidechannel bool `json:"sidechannel"` } -type cacheKey struct{ address, token string } +type cacheKey struct { + address, token string + sidechannel bool +} func (server Server) cacheKey() cacheKey { - return cacheKey{address: server.Address, token: server.Token} + return cacheKey{address: server.Address, token: server.Token, sidechannel: server.Sidechannel} } type connectionsCache struct { @@ -41,9 +46,17 @@ type connectionsCache struct { var ( jsonUnMarshaler = jsonpb.Unmarshaler{AllowUnknownFields: true} - cache = connectionsCache{ + // This connection cache map contains two types of connections: + // - Normal gRPC connections + // - Sidechannel connections. When client dials to the Gitaly server, the + // server multiplexes the connection using Yamux. In the future, the server + // can open another stream to transfer data without gRPC. Besides, we apply + // a framing protocol to add the half-close capability to Yamux streams. + // Hence, we cannot use those connections interchangeably. + cache = connectionsCache{ connections: make(map[cacheKey]*grpc.ClientConn), } + sidechannelRegistry *gitalyclient.SidechannelRegistry connectionsTotal = promauto.NewCounterVec( prometheus.CounterOpts{ @@ -54,6 +67,12 @@ var ( ) ) +func InitializeSidechannelRegistry(logger *logrus.Logger) { + if sidechannelRegistry == nil { + sidechannelRegistry = gitalyclient.NewSidechannelRegistry(logrus.NewEntry(logger)) + } +} + func withOutgoingMetadata(ctx context.Context, features map[string]string) context.Context { md := metadata.New(nil) for k, v := range features { @@ -72,7 +91,12 @@ func NewSmartHTTPClient(ctx context.Context, server Server) (context.Context, *S return nil, nil, err } grpcClient := gitalypb.NewSmartHTTPServiceClient(conn) - return withOutgoingMetadata(ctx, server.Features), &SmartHTTPClient{grpcClient}, nil + smartHTTPClient := &SmartHTTPClient{ + SmartHTTPServiceClient: grpcClient, + sidechannelRegistry: sidechannelRegistry, + useSidechannel: server.Sidechannel, + } + return withOutgoingMetadata(ctx, server.Features), smartHTTPClient, nil } func NewBlobClient(ctx context.Context, server Server) (context.Context, *BlobClient, error) { @@ -173,7 +197,13 @@ func newConnection(server Server) (*grpc.ClientConn, error) { ), ) - conn, connErr := gitalyclient.Dial(server.Address, connOpts) + var conn *grpc.ClientConn + var connErr error + if server.Sidechannel { + conn, connErr = gitalyclient.DialSidechannel(context.Background(), server.Address, sidechannelRegistry, connOpts) // lint:allow context.Background + } else { + conn, connErr = gitalyclient.Dial(server.Address, connOpts) + } label := "ok" if connErr != nil { diff --git a/workhorse/internal/gitaly/gitaly_test.go b/workhorse/internal/gitaly/gitaly_test.go index b17fb5c1d7b952b162b0a3bfe5c36e7f6ad7c442..9c54caae8c6c2a1a4f519d9b8d2f102cdb5dcdb6 100644 --- a/workhorse/internal/gitaly/gitaly_test.go +++ b/workhorse/internal/gitaly/gitaly_test.go @@ -4,14 +4,32 @@ import ( "context" "testing" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "google.golang.org/grpc/metadata" ) func TestNewSmartHTTPClient(t *testing.T) { - ctx, _, err := NewSmartHTTPClient(context.Background(), serverFixture()) + ctx, client, err := NewSmartHTTPClient(context.Background(), serverFixture()) require.NoError(t, err) testOutgoingMetadata(t, ctx) + + require.False(t, client.useSidechannel) + require.Nil(t, client.sidechannelRegistry) +} + +func TestNewSmartHTTPClientWithSidechannel(t *testing.T) { + InitializeSidechannelRegistry(logrus.StandardLogger()) + + fixture := serverFixture() + fixture.Sidechannel = true + + ctx, client, err := NewSmartHTTPClient(context.Background(), fixture) + require.NoError(t, err) + testOutgoingMetadata(t, ctx) + + require.True(t, client.useSidechannel) + require.NotNil(t, client.sidechannelRegistry) } func TestNewBlobClient(t *testing.T) { diff --git a/workhorse/internal/gitaly/smarthttp.go b/workhorse/internal/gitaly/smarthttp.go index 69656ab0a924290ceabaf71dabb66c6ea8b72794..de6954efa602d7f1205de45c1f8d608145c39d5a 100644 --- a/workhorse/internal/gitaly/smarthttp.go +++ b/workhorse/internal/gitaly/smarthttp.go @@ -5,11 +5,14 @@ import ( "fmt" "io" + gitalyclient "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" ) type SmartHTTPClient struct { + useSidechannel bool + sidechannelRegistry *gitalyclient.SidechannelRegistry gitalypb.SmartHTTPServiceClient } @@ -93,6 +96,14 @@ func (client *SmartHTTPClient) ReceivePack(ctx context.Context, repo *gitalypb.R } func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { + if client.useSidechannel { + return client.runUploadPackWithSidechannel(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol) + } + + return client.runUploadPack(ctx, repo, clientRequest, clientResponse, gitConfigOptions, gitProtocol) +} + +func (client *SmartHTTPClient) runUploadPack(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { stream, err := client.PostUploadPack(ctx) if err != nil { return err @@ -137,3 +148,38 @@ func (client *SmartHTTPClient) UploadPack(ctx context.Context, repo *gitalypb.Re return nil } + +func (client *SmartHTTPClient) runUploadPackWithSidechannel(ctx context.Context, repo *gitalypb.Repository, clientRequest io.Reader, clientResponse io.Writer, gitConfigOptions []string, gitProtocol string) error { + ctx, waiter := client.sidechannelRegistry.Register(ctx, func(conn gitalyclient.SidechannelConn) error { + if _, err := io.Copy(conn, clientRequest); err != nil { + return err + } + + if err := conn.CloseWrite(); err != nil { + return fmt.Errorf("fail to signal sidechannel half-close: %w", err) + } + + if _, err := io.Copy(clientResponse, conn); err != nil { + return err + } + + return nil + }) + defer waiter.Close() + + rpcRequest := &gitalypb.PostUploadPackWithSidechannelRequest{ + Repository: repo, + GitConfigOptions: gitConfigOptions, + GitProtocol: gitProtocol, + } + + if _, err := client.PostUploadPackWithSidechannel(ctx, rpcRequest); err != nil { + return err + } + + if err := waiter.Close(); err != nil { + return fmt.Errorf("fail to close sidechannel connection: %w", err) + } + + return nil +} diff --git a/workhorse/internal/testhelper/gitaly.go b/workhorse/internal/testhelper/gitaly.go index 13e71c570a7cc125c12077f83d38d6af7460f968..da2fbf30785329e5cd95ddacd28360c95d51e54f 100644 --- a/workhorse/internal/testhelper/gitaly.go +++ b/workhorse/internal/testhelper/gitaly.go @@ -1,6 +1,7 @@ package testhelper import ( + "bytes" "fmt" "io" "io/ioutil" @@ -11,6 +12,7 @@ import ( "github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868 "golang.org/x/net/context" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -23,6 +25,7 @@ type GitalyTestServer struct { finalMessageCode codes.Code sync.WaitGroup LastIncomingMetadata metadata.MD + gitalypb.UnimplementedSmartHTTPServiceServer gitalypb.UnimplementedRepositoryServiceServer gitalypb.UnimplementedBlobServiceServer gitalypb.UnimplementedDiffServiceServer @@ -191,13 +194,14 @@ func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostU return err } - jsonString, err := marshalJSON(req) - if err != nil { + marshaler := &jsonpb.Marshaler{} + jsonBytes := &bytes.Buffer{} + if err := marshaler.Marshal(jsonBytes, req); err != nil { return err } if err := stream.Send(&gitalypb.PostUploadPackResponse{ - Data: []byte(strings.Join([]string{jsonString}, "\000") + "\000"), + Data: append(jsonBytes.Bytes(), 0), }); err != nil { return err } @@ -229,6 +233,43 @@ func (s *GitalyTestServer) PostUploadPack(stream gitalypb.SmartHTTPService_PostU return s.finalError() } +// PostUploadPackWithSidechannel should be a part of smarthttp server in real +// server. In workhorse, setting up a real sidechannel server is troublesome. +// Therefore, we bring up a sidechannel server with a mock server exported via +// gitalyclient.TestSidechannelServer. This is the handler for that mock +// server. +func PostUploadPackWithSidechannel(srv interface{}, stream grpc.ServerStream, conn io.ReadWriteCloser) error { + if method, ok := grpc.Method(stream.Context()); !ok || method != "/gitaly.SmartHTTPService/PostUploadPackWithSidechannel" { + return fmt.Errorf("unexpected method: %s", method) + } + + var req gitalypb.PostUploadPackWithSidechannelRequest + if err := stream.RecvMsg(&req); err != nil { + return err + } + + if err := validateRepository(req.GetRepository()); err != nil { + return err + } + + marshaler := &jsonpb.Marshaler{} + jsonBytes := &bytes.Buffer{} + if err := marshaler.Marshal(jsonBytes, &req); err != nil { + return err + } + + // Bounce back all data back to the client, plus flushing bytes + if _, err := conn.Write(append(jsonBytes.Bytes(), 0)); err != nil { + return err + } + + if _, err := io.Copy(conn, conn); err != nil { + return err + } + + return stream.SendMsg(&gitalypb.PostUploadPackWithSidechannelResponse{}) +} + func (s *GitalyTestServer) CommitIsAncestor(ctx context.Context, in *gitalypb.CommitIsAncestorRequest) (*gitalypb.CommitIsAncestorResponse, error) { return nil, nil } diff --git a/workhorse/main.go b/workhorse/main.go index 6e7b80bc8c66fbbd682c2d5a20fa152a5522bd13..123d21596e240d0a1320a444a93724ad446a7f90 100644 --- a/workhorse/main.go +++ b/workhorse/main.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/labkit/tracing" "gitlab.com/gitlab-org/gitlab/workhorse/internal/config" + "gitlab.com/gitlab-org/gitlab/workhorse/internal/gitaly" "gitlab.com/gitlab-org/gitlab/workhorse/internal/queueing" "gitlab.com/gitlab-org/gitlab/workhorse/internal/redis" "gitlab.com/gitlab-org/gitlab/workhorse/internal/secret" @@ -233,6 +234,8 @@ func run(boot bootConfig, cfg config.Config) error { } defer accessCloser.Close() + gitaly.InitializeSidechannelRegistry(accessLogger) + up := wrapRaven(upstream.NewUpstream(cfg, accessLogger)) done := make(chan os.Signal, 1) diff --git a/workhorse/main_test.go b/workhorse/main_test.go index 6e61e2fc65afd246ae7619430d02ec0ab0fa2df8..435e1e0e15d77efe465abb5b9494354945228768 100644 --- a/workhorse/main_test.go +++ b/workhorse/main_test.go @@ -814,6 +814,8 @@ func startWorkhorseServerWithConfig(cfg *config.Config) *httptest.Server { testhelper.ConfigureSecret() u := upstream.NewUpstream(*cfg, logrus.StandardLogger()) + gitaly.InitializeSidechannelRegistry(logrus.StandardLogger()) + return httptest.NewServer(u) } @@ -834,6 +836,20 @@ func gitOkBody(t *testing.T) *api.Response { } } +func gitOkBodyWithSidechannel(t *testing.T) *api.Response { + return &api.Response{ + GL_ID: "user-123", + GL_USERNAME: "username", + Repository: gitalypb.Repository{ + StorageName: "default", + RelativePath: "foo/bar.git", + }, + GitalyServer: gitaly.Server{ + Sidechannel: true, + }, + } +} + func httpGet(t *testing.T, url string, headers map[string]string) (*http.Response, string) { req, err := http.NewRequest("GET", url, nil) require.NoError(t, err)