Skip to content
代码片段 群组 项目
未验证 提交 62b33a5b 编辑于 作者: Tianyang Hu's avatar Tianyang Hu 提交者: GitHub
浏览文件

Change sql.Options to an interface under sqlx. (#15790)

The Options interface isn't supposed to be exposed to end users. This can also avoid potential circular dependencies.
上级 84e24ea0
No related branches found
No related tags found
无相关合并请求
......@@ -32,10 +32,10 @@ import (
)
// Option is the base type of all the SQL transform options.
type Option func(*Options)
type Option func(sqlx.Options)
// Options contain all the options for a SQL transform.
type Options struct {
// options contain all the options for a SQL transform.
type options struct {
dialect string
expansionAddr string
inputs map[string]beam.PCollection
......@@ -43,15 +43,14 @@ type Options struct {
customs []sqlx.Option
}
// Add adds a custom option.
func (o *Options) Add(opt sqlx.Option) {
func (o *options) Add(opt sqlx.Option) {
o.customs = append(o.customs, opt)
}
// Input adds a named PCollection input to the transform.
func Input(name string, in beam.PCollection) Option {
return func(o *Options) {
o.inputs[name] = in
return func(o sqlx.Options) {
o.(*options).inputs[name] = in
}
}
......@@ -61,22 +60,22 @@ func Input(name string, in beam.PCollection) Option {
// There is currently no default output type, so users must set this option.
// In the future, Row, once implemented, may become the default output type.
func OutputType(t reflect.Type) Option {
return func(o *Options) {
o.outType = typex.New(t)
return func(o sqlx.Options) {
o.(*options).outType = typex.New(t)
}
}
// Dialect specifies the SQL dialect, e.g. use 'zetasql' for ZetaSQL.
func Dialect(dialect string) Option {
return func(o *Options) {
o.dialect = dialect
return func(o sqlx.Options) {
o.(*options).dialect = dialect
}
}
// ExpansionAddr is the URL of the expansion service to use.
func ExpansionAddr(addr string) Option {
return func(o *Options) {
o.expansionAddr = addr
return func(o sqlx.Options) {
o.(*options).expansionAddr = addr
}
}
......@@ -96,27 +95,27 @@ func ExpansionAddr(addr string) Option {
// sql.OutputType(reflect.TypeOf(int64(0))))
// // `out` is a PCollection<int64> with a single element 3.
func Transform(s beam.Scope, query string, opts ...Option) beam.PCollection {
options := &Options{
o := &options{
inputs: make(map[string]beam.PCollection),
}
for _, opt := range opts {
opt(options)
opt(o)
}
if options.outType == nil {
if o.outType == nil {
panic("output type must be specified for sql.Transform")
}
payload := beam.CrossLanguagePayload(&sqlx.ExpansionPayload{
Query: query,
Dialect: options.dialect,
Options: options.customs,
Dialect: o.dialect,
Options: o.customs,
})
expansionAddr := sqlx.DefaultExpansionAddr
if options.expansionAddr != "" {
expansionAddr = xlangx.Require(options.expansionAddr)
if o.expansionAddr != "" {
expansionAddr = xlangx.Require(o.expansionAddr)
}
out := beam.CrossLanguage(s, sqlx.Urn, payload, expansionAddr, options.inputs, beam.UnnamedOutput(options.outType))
out := beam.CrossLanguage(s, sqlx.Urn, payload, expansionAddr, o.inputs, beam.UnnamedOutput(o.outType))
return out[graph.UnnamedOutputTag]
}
......@@ -22,13 +22,21 @@
package sqlx
const (
// Urn is the URN for SQL transforms.
Urn = "beam:external:java:sql:v1"
// DefaultExpansionAddr is the default expansion service address for SQL.
// TODO: Change this to the Beam Java expansion address once Beam SQL
// is implemented in Beam Go.
DefaultExpansionAddr = "undefined"
)
// Options is the interface for adding SQL transform options.
type Options interface {
// Add adds a custom option.
Add(opt Option)
}
// Option represents a custom SQL transform option. The option provider is
// responsible for marshaling and unmarshaling the option.
type Option struct {
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册