Skip to content
代码片段 群组 项目
未验证 提交 8c4bec83 编辑于 作者: Robert Burke's avatar Robert Burke 提交者: GitHub
浏览文件

[prism] Support AnyOf in Prism. (#33705)

上级 7daccb50
No related branches found
No related tags found
无相关合并请求
......@@ -82,6 +82,8 @@
* Initial support for AllowedLateness added. ([#33542](https://github.com/apache/beam/pull/33542))
* The Go SDK's inprocess Prism runner (AKA the Go SDK default runner) now supports non-loopback mode environment types. ([#33572](https://github.com/apache/beam/pull/33572))
* Support the Process Environment for execution in Prism ([#33651](https://github.com/apache/beam/pull/33651))
* Support the AnyOf Environment for execution in Prism ([#33705](https://github.com/apache/beam/pull/33705))
* This improves support for developing cross-language (Xlang) pipelines when using a compatible cross-language service.
## Breaking Changes
......
......@@ -23,6 +23,7 @@ import (
"log/slog"
"os"
"os/exec"
"slices"
"time"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
......@@ -46,16 +47,26 @@ import (
func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *worker.W) error {
logger := j.Logger.With(slog.String("envID", wk.Env))
// TODO fix broken abstraction.
// We're starting a worker pool here, because that's the loopback environment.
// It's sort of a mess, largely because of loopback, which has
// a different flow from a provisioned docker container.
e := j.Pipeline.GetComponents().GetEnvironments()[env]
if e.GetUrn() == urns.EnvAnyOf {
// We've been given a choice!
ap := &pipepb.AnyOfEnvironmentPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), ap); err != nil {
logger.Error("unmarshaling any environment payload", "error", err)
return err
}
e = selectAnyOfEnv(ap)
logger.Info("AnyEnv resolved", "selectedUrn", e.GetUrn(), "worker", wk.ID)
// Process the environment as normal.
}
switch e.GetUrn() {
case urns.EnvExternal:
ep := &pipepb.ExternalPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), ep); err != nil {
logger.Error("unmarshing external environment payload", "error", err)
logger.Error("unmarshaling external environment payload", "error", err)
return err
}
go func() {
externalEnvironment(ctx, ep, wk)
......@@ -65,13 +76,15 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor
case urns.EnvDocker:
dp := &pipepb.DockerPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), dp); err != nil {
logger.Error("unmarshing docker environment payload", "error", err)
logger.Error("unmarshaling docker environment payload", "error", err)
return err
}
return dockerEnvironment(ctx, logger, dp, wk, j.ArtifactEndpoint())
case urns.EnvProcess:
pp := &pipepb.ProcessPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), pp); err != nil {
logger.Error("unmarshing docker environment payload", "error", err)
logger.Error("unmarshaling process environment payload", "error", err)
return err
}
go func() {
processEnvironment(ctx, pp, wk)
......@@ -83,6 +96,33 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor
}
}
// selectAnyOfEnv picks the environment this runner prefers out of the
// alternatives listed in an AnyOf environment payload.
//
// Preference order is external, then process, then docker. Environments with
// any other URN rank lowest. When several alternatives share the highest rank,
// the one listed first in the payload is returned.
//
// It returns nil when the payload lists no environments. The payload's
// environment slice is only read, never reordered.
func selectAnyOfEnv(ap *pipepb.AnyOfEnvironmentPayload) *pipepb.Environment {
	envs := ap.GetEnvironments()
	if len(envs) == 0 {
		// Nothing to choose from; indexing or slices.MaxFunc would panic here.
		return nil
	}
	// Higher rank means more preferred. URNs missing from the map get the
	// zero value, so unknown environments rank below all known ones.
	ranks := map[string]int{
		urns.EnvDocker:   1,
		urns.EnvProcess:  5,
		urns.EnvExternal: 10,
	}
	// slices.MaxFunc returns the first maximal element, which preserves the
	// "earliest listed wins" tie-break without sorting the caller's slice.
	return slices.MaxFunc(envs, func(a, b *pipepb.Environment) int {
		return ranks[a.GetUrn()] - ranks[b.GetUrn()]
	})
}
func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *worker.W) {
conn, err := grpc.Dial(ep.GetEndpoint().GetUrl(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
......
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"testing"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
)
// TestSelectAnyOf is a table-driven test of selectAnyOfEnv. Each case supplies
// a list of candidate environments and checks the URN of the selected one,
// along with its payload (used as a tag to tell apart candidates that share a
// URN).
func TestSelectAnyOf(t *testing.T) {
	// env builds a candidate with only a URN set.
	env := func(urn string) *pipepb.Environment {
		return &pipepb.Environment{Urn: urn}
	}
	// tagged builds a candidate whose payload carries an identifying tag.
	tagged := func(urn, tag string) *pipepb.Environment {
		return &pipepb.Environment{Urn: urn, Payload: []byte(tag)}
	}
	type envList = []*pipepb.Environment

	cases := []struct {
		name    string
		envs    envList
		wantUrn string
		wantTag string
	}{
		{name: "singleDefault", envs: envList{env(urns.EnvDefault)}, wantUrn: urns.EnvDefault},
		{name: "singleDocker", envs: envList{env(urns.EnvDocker)}, wantUrn: urns.EnvDocker},
		{name: "singleProcess", envs: envList{env(urns.EnvProcess)}, wantUrn: urns.EnvProcess},
		{name: "singleExternal", envs: envList{env(urns.EnvExternal)}, wantUrn: urns.EnvExternal},
		{
			name:    "multiplePickExternal_1",
			envs:    envList{env(urns.EnvExternal), env(urns.EnvDocker), env(urns.EnvProcess)},
			wantUrn: urns.EnvExternal,
		},
		{
			name:    "multiplePickExternal_2",
			envs:    envList{env(urns.EnvDocker), env(urns.EnvProcess), env(urns.EnvExternal)},
			wantUrn: urns.EnvExternal,
		},
		{
			name:    "multiplePickProcess",
			envs:    envList{env(urns.EnvDocker), env(urns.EnvProcess)},
			wantUrn: urns.EnvProcess,
		},
		{
			name:    "multiplePickDocker",
			envs:    envList{env(urns.EnvDefault), env(urns.EnvDocker)},
			wantUrn: urns.EnvDocker,
		},
		{
			name:    "multiplePickFirstExternal",
			envs:    envList{tagged(urns.EnvExternal, "first"), tagged(urns.EnvExternal, "second")},
			wantUrn: urns.EnvExternal,
			wantTag: "first",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			payload := &pipepb.AnyOfEnvironmentPayload{Environments: tc.envs}
			picked := selectAnyOfEnv(payload)
			if urn := picked.GetUrn(); urn != tc.wantUrn {
				t.Errorf("expected %v, got %v", tc.wantUrn, urn)
			}
			if tag := string(picked.GetPayload()); tag != tc.wantTag {
				t.Errorf("expected payload with tag %v, got %v", tc.wantTag, tag)
			}
		})
	}
}
......@@ -147,4 +147,5 @@ var (
EnvProcess = envUrn(pipepb.StandardEnvironments_PROCESS)
EnvExternal = envUrn(pipepb.StandardEnvironments_EXTERNAL)
EnvDefault = envUrn(pipepb.StandardEnvironments_DEFAULT)
EnvAnyOf = envUrn(pipepb.StandardEnvironments_ANYOF)
)
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册