Skip to content

Commit

Permalink
Add internal-frontend role (#3706)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr authored Jan 11, 2023
1 parent c5254f3 commit 1a9695e
Show file tree
Hide file tree
Showing 28 changed files with 312 additions and 119 deletions.
8 changes: 0 additions & 8 deletions common/authorization/default_authorizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,6 @@ var resultDeny = Result{Decision: DecisionDeny}

func (a *defaultAuthorizer) Authorize(_ context.Context, claims *Claims, target *CallTarget) (Result, error) {

// TODO: This is a temporary workaround to allow calls to system namespace and
// calls with no namespace to pass through. When handling of mTLS data is added,
// we should remove "temporal-system" from here. Handling of call with
// no namespace will need to be performed at the API level, so that data would
// be filtered based of caller's permissions to namespaces and system.
if target.Namespace == "temporal-system" || target.Namespace == "" {
return resultAllow, nil
}
if claims == nil {
return resultDeny, nil
}
Expand Down
56 changes: 46 additions & 10 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package config

import (
"bytes"
"fmt"
"strings"
"time"

Expand All @@ -38,6 +39,7 @@ import (
"go.temporal.io/server/common/masker"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/telemetry"
)

Expand Down Expand Up @@ -111,9 +113,12 @@ type (

// RootTLS contains all TLS settings for the Temporal server
RootTLS struct {
// Internode controls backend service communication TLS settings.
// Internode controls backend service (history, matching, internal-frontend)
// communication TLS settings.
Internode GroupTLS `yaml:"internode"`
// Frontend controls SDK Client to Frontend communication TLS settings.
// Frontend controls frontend server TLS settings. To control system worker -> frontend
// TLS, use the SystemWorker field. (Frontend.Client is accepted for backwards
// compatibility.)
Frontend GroupTLS `yaml:"frontend"`
// SystemWorker controls TLS setting for System Workers connecting to Frontend.
SystemWorker WorkerTLS `yaml:"systemWorker"`
Expand Down Expand Up @@ -481,16 +486,30 @@ type (
S3ForcePathStyle bool `yaml:"s3ForcePathStyle"`
}

// PublicClient is config for internal nodes (history/matching/worker) connecting to
// temporal frontend. There are two methods of connecting:
// Explicit endpoint: Supply a host:port to connect to. This can resolve to multiple IPs,
// or a single IP that is a load-balancer.
// Membership resolver (new in 1.18): Leave this empty, and other nodes will use the
// membership service resolver to find the frontend.
// TODO: remove this and always use membership resolver
// PublicClient is the config for internal nodes (history/matching/worker) connecting to
// frontend. There are three methods of connecting:
// 1. Use membership to locate "internal-frontend" and connect to them using the Internode
// TLS config (which can be "no TLS"). This is recommended for deployments that use an
// Authorizer and ClaimMapper. To use this, leave this section out of your config, and
// make sure there is an "internal-frontend" section in Services.
// 2. Use membership to locate "frontend" and connect to them using the Frontend TLS config
// (which can be "no TLS"). This is recommended for deployments that don't use an
// Authorizer or ClaimMapper, or have implemented a custom ClaimMapper that correctly
// identifies the system worker using mTLS and assigns it an Admin-level claim.
// To use this, leave this section out of your config and make sure there is _no_
// "internal-frontend" section in Services.
// 3. Connect to an explicit endpoint using the SystemWorker (falling back to Frontend) TLS
// config (which can be "no TLS"). You can use this if you want to force frontend
// connections to go through an external load balancer. If you use this with a
// ClaimMapper+Authorizer, you need to ensure that your ClaimMapper assigns Admin
// claims to worker nodes, and your Authorizer correctly handles those claims.
PublicClient struct {
// HostPort is the host port to connect on. Host can be DNS name
// HostPort is the host port to connect on. Host can be DNS name. See the above
// comment: in many situations you can leave this empty.
HostPort string `yaml:"hostPort"`
// Force selection of either the "internode" or "frontend" TLS configs for these
// connections (only those two strings are valid).
ForceTLSConfig string `yaml:"forceTLSConfig"`
}

// NamespaceDefaults is the default config for each namespace
Expand Down Expand Up @@ -551,6 +570,12 @@ const (
ClusterMDStoreName DataStoreName = "ClusterMDStore"
)

const (
ForceTLSConfigAuto = ""
ForceTLSConfigInternode = "internode"
ForceTLSConfigFrontend = "frontend"
)

// Validate validates this config
func (c *Config) Validate() error {
if err := c.Persistence.Validate(); err != nil {
Expand All @@ -561,6 +586,17 @@ func (c *Config) Validate() error {
return err
}

_, hasIFE := c.Services[string(primitives.InternalFrontendService)]
if hasIFE && (c.PublicClient.HostPort != "" || c.PublicClient.ForceTLSConfig != "") {
return fmt.Errorf("when using internal-frontend, publicClient must be empty")
}

switch c.PublicClient.ForceTLSConfig {
case ForceTLSConfigAuto, ForceTLSConfigInternode, ForceTLSConfigFrontend:
default:
return fmt.Errorf("invalid value for publicClient.forceTLSConfig: %q", c.PublicClient.ForceTLSConfig)
}

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions common/membership/rpMonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func ServiceNameToServiceTypeEnum(name primitives.ServiceName) (persistence.Serv
return persistence.All, nil
case primitives.FrontendService:
return persistence.Frontend, nil
case primitives.InternalFrontendService:
return persistence.InternalFrontend, nil
case primitives.HistoryService:
return persistence.History, nil
case primitives.MatchingService:
Expand Down
1 change: 1 addition & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1306,4 +1306,5 @@ const (
History
Matching
Worker
InternalFrontend
)
15 changes: 8 additions & 7 deletions common/primitives/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ type ServiceName string

// These constants represent service roles
const (
AllServices ServiceName = "all"
FrontendService ServiceName = "frontend"
HistoryService ServiceName = "history"
MatchingService ServiceName = "matching"
WorkerService ServiceName = "worker"
ServerService ServiceName = "server"
UnitTestService ServiceName = "unittest"
AllServices ServiceName = "all"
FrontendService ServiceName = "frontend"
InternalFrontendService ServiceName = "internal-frontend"
HistoryService ServiceName = "history"
MatchingService ServiceName = "matching"
WorkerService ServiceName = "worker"
ServerService ServiceName = "server"
UnitTestService ServiceName = "unittest"
)
80 changes: 60 additions & 20 deletions common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package resource

import (
"context"
"crypto/tls"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -93,7 +94,6 @@ type (
// See LifetimeHooksModule for detail
var Module = fx.Options(
persistenceClient.Module,
fx.Provide(SnTaggedLoggerProvider),
fx.Provide(HostNameProvider),
fx.Provide(TimeSourceProvider),
cluster.MetadataLifetimeHooksModule,
Expand Down Expand Up @@ -136,7 +136,7 @@ var DefaultOptions = fx.Options(
fx.Provide(DCRedirectionPolicyProvider),
)

func SnTaggedLoggerProvider(logger log.Logger, sn primitives.ServiceName) log.SnTaggedLogger {
func DefaultSnTaggedLoggerProvider(logger log.Logger, sn primitives.ServiceName) log.SnTaggedLogger {
return log.With(logger, tag.Service(sn))
}

Expand Down Expand Up @@ -395,19 +395,13 @@ func SdkClientFactoryProvider(
logger log.SnTaggedLogger,
resolver membership.GRPCResolver,
) (sdk.ClientFactory, error) {
tlsFrontendConfig, err := tlsConfigProvider.GetFrontendClientConfig()
frontendURL, frontendTLSConfig, err := getFrontendConnectionDetails(cfg, tlsConfigProvider, resolver)
if err != nil {
return nil, fmt.Errorf("unable to load frontend TLS configuration: %w", err)
}

hostPort := cfg.PublicClient.HostPort
if hostPort == "" {
hostPort = resolver.MakeURL(primitives.FrontendService)
return nil, err
}

return sdk.NewClientFactory(
hostPort,
tlsFrontendConfig,
frontendURL,
frontendTLSConfig,
metricsHandler,
logger,
), nil
Expand All @@ -426,24 +420,70 @@ func RPCFactoryProvider(
svcName primitives.ServiceName,
logger log.Logger,
tlsConfigProvider encryption.TLSConfigProvider,
dc *dynamicconfig.Collection,
resolver membership.GRPCResolver,
traceInterceptor telemetry.ClientTraceInterceptor,
) common.RPCFactory {
) (common.RPCFactory, error) {
svcCfg := cfg.Services[string(svcName)]
hostPort := cfg.PublicClient.HostPort
if hostPort == "" {
hostPort = resolver.MakeURL(primitives.FrontendService)
frontendURL, frontendTLSConfig, err := getFrontendConnectionDetails(cfg, tlsConfigProvider, resolver)
if err != nil {
return nil, err
}
return rpc.NewFactory(
&svcCfg.RPC,
svcName,
logger,
tlsConfigProvider,
dc,
hostPort,
frontendURL,
frontendTLSConfig,
[]grpc.UnaryClientInterceptor{
grpc.UnaryClientInterceptor(traceInterceptor),
},
)
), nil
}

func getFrontendConnectionDetails(
cfg *config.Config,
tlsConfigProvider encryption.TLSConfigProvider,
resolver membership.GRPCResolver,
) (string, *tls.Config, error) {
// To simplify the static config, we switch default values based on whether the config
// defines an "internal-frontend" service. The default for TLS config can be overridden
// with publicClient.forceTLSConfig, and the default for hostPort can be overridden by
// explicitly setting hostPort to "membership://internal-frontend" or
// "membership://frontend".
_, hasIFE := cfg.Services[string(primitives.InternalFrontendService)]

forceTLS := cfg.PublicClient.ForceTLSConfig
if forceTLS == config.ForceTLSConfigAuto {
if hasIFE {
forceTLS = config.ForceTLSConfigInternode
} else {
forceTLS = config.ForceTLSConfigFrontend
}
}

var frontendTLSConfig *tls.Config
var err error
switch forceTLS {
case config.ForceTLSConfigInternode:
frontendTLSConfig, err = tlsConfigProvider.GetInternodeClientConfig()
case config.ForceTLSConfigFrontend:
frontendTLSConfig, err = tlsConfigProvider.GetFrontendClientConfig()
default:
err = fmt.Errorf("invalid forceTLSConfig")
}
if err != nil {
return "", nil, fmt.Errorf("unable to load TLS configuration: %w", err)
}

frontendURL := cfg.PublicClient.HostPort
if frontendURL == "" {
if hasIFE {
frontendURL = resolver.MakeURL(primitives.InternalFrontendService)
} else {
frontendURL = resolver.MakeURL(primitives.FrontendService)
}
}

return frontendURL, frontendTLSConfig, nil
}
1 change: 1 addition & 0 deletions common/resourcetest/resourceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func NewTest(
historyServiceResolver := membership.NewMockServiceResolver(controller)
workerServiceResolver := membership.NewMockServiceResolver(controller)
membershipMonitor.EXPECT().GetResolver(primitives.FrontendService).Return(frontendServiceResolver, nil).AnyTimes()
membershipMonitor.EXPECT().GetResolver(primitives.InternalFrontendService).Return(nil, membership.ErrUnknownService).AnyTimes()
membershipMonitor.EXPECT().GetResolver(primitives.MatchingService).Return(matchingServiceResolver, nil).AnyTimes()
membershipMonitor.EXPECT().GetResolver(primitives.HistoryService).Return(historyServiceResolver, nil).AnyTimes()
membershipMonitor.EXPECT().GetResolver(primitives.WorkerService).Return(workerServiceResolver, nil).AnyTimes()
Expand Down
24 changes: 7 additions & 17 deletions common/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/primitives"
Expand All @@ -49,8 +48,9 @@ type RPCFactory struct {
config *config.RPC
serviceName primitives.ServiceName
logger log.Logger
dc *dynamicconfig.Collection
frontendURL string

frontendURL string
frontendTLSConfig *tls.Config

initListener sync.Once
grpcListener net.Listener
Expand All @@ -65,16 +65,16 @@ func NewFactory(
sName primitives.ServiceName,
logger log.Logger,
tlsProvider encryption.TLSConfigProvider,
dc *dynamicconfig.Collection,
frontendURL string,
frontendTLSConfig *tls.Config,
clientInterceptors []grpc.UnaryClientInterceptor,
) *RPCFactory {
return &RPCFactory{
config: cfg,
serviceName: sName,
logger: logger,
dc: dc,
frontendURL: frontendURL,
frontendTLSConfig: frontendTLSConfig,
tlsFactory: tlsProvider,
clientInterceptors: clientInterceptors,
}
Expand Down Expand Up @@ -201,19 +201,9 @@ func (d *RPCFactory) CreateRemoteFrontendGRPCConnection(rpcAddress string) *grpc
return d.dial(rpcAddress, tlsClientConfig)
}

// CreateLocalFrontendGRPCConnection creates connection for internal calls
// CreateLocalFrontendGRPCConnection creates connection for internal frontend calls
func (d *RPCFactory) CreateLocalFrontendGRPCConnection() *grpc.ClientConn {
var tlsClientConfig *tls.Config
var err error
if d.tlsFactory != nil {
tlsClientConfig, err = d.tlsFactory.GetFrontendClientConfig()
if err != nil {
d.logger.Fatal("Failed to create tls config for gRPC connection", tag.Error(err))
return nil
}
}

return d.dial(d.frontendURL, tlsClientConfig)
return d.dial(d.frontendURL, d.frontendTLSConfig)
}

// CreateInternodeGRPCConnection creates connection for gRPC calls
Expand Down
Loading

0 comments on commit 1a9695e

Please sign in to comment.