diff --git a/common/authorization/default_authorizer.go b/common/authorization/default_authorizer.go index 28454004bcc..6ed2b8fd53b 100644 --- a/common/authorization/default_authorizer.go +++ b/common/authorization/default_authorizer.go @@ -46,14 +46,6 @@ var resultDeny = Result{Decision: DecisionDeny} func (a *defaultAuthorizer) Authorize(_ context.Context, claims *Claims, target *CallTarget) (Result, error) { - // TODO: This is a temporary workaround to allow calls to system namespace and - // calls with no namespace to pass through. When handling of mTLS data is added, - // we should remove "temporal-system" from here. Handling of call with - // no namespace will need to be performed at the API level, so that data would - // be filtered based of caller's permissions to namespaces and system. - if target.Namespace == "temporal-system" || target.Namespace == "" { - return resultAllow, nil - } if claims == nil { return resultDeny, nil } diff --git a/common/config/config.go b/common/config/config.go index 2fcce3ae8e4..c6485990417 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -26,6 +26,7 @@ package config import ( "bytes" + "fmt" "strings" "time" @@ -38,6 +39,7 @@ import ( "go.temporal.io/server/common/masker" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" + "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/telemetry" ) @@ -111,9 +113,12 @@ type ( // RootTLS contains all TLS settings for the Temporal server RootTLS struct { - // Internode controls backend service communication TLS settings. + // Internode controls backend service (history, matching, internal-frontend) + // communication TLS settings. Internode GroupTLS `yaml:"internode"` - // Frontend controls SDK Client to Frontend communication TLS settings. + // Frontend controls frontend server TLS settings. To control system worker -> frontend + // TLS, use the SystemWorker field. 
(Frontend.Client is accepted for backwards + // compatibility.) Frontend GroupTLS `yaml:"frontend"` // SystemWorker controls TLS setting for System Workers connecting to Frontend. SystemWorker WorkerTLS `yaml:"systemWorker"` @@ -481,16 +486,30 @@ type ( S3ForcePathStyle bool `yaml:"s3ForcePathStyle"` } - // PublicClient is config for internal nodes (history/matching/worker) connecting to - // temporal frontend. There are two methods of connecting: - // Explicit endpoint: Supply a host:port to connect to. This can resolve to multiple IPs, - // or a single IP that is a load-balancer. - // Membership resolver (new in 1.18): Leave this empty, and other nodes will use the - // membership service resolver to find the frontend. - // TODO: remove this and always use membership resolver + // PublicClient is the config for internal nodes (history/matching/worker) connecting to + // frontend. There are three methods of connecting: + // 1. Use membership to locate "internal-frontend" and connect to them using the Internode + // TLS config (which can be "no TLS"). This is recommended for deployments that use an + // Authorizer and ClaimMapper. To use this, leave this section out of your config, and + // make sure there is an "internal-frontend" section in Services. + // 2. Use membership to locate "frontend" and connect to them using the Frontend TLS config + // (which can be "no TLS"). This is recommended for deployments that don't use an + // Authorizer or ClaimMapper, or have implemented a custom ClaimMapper that correctly + // identifies the system worker using mTLS and assigns it an Admin-level claim. + // To use this, leave this section out of your config and make sure there is _no_ + // "internal-frontend" section in Services. + // 3. Connect to an explicit endpoint using the SystemWorker (falling back to Frontend) TLS + // config (which can be "no TLS"). You can use this if you want to force frontend + // connections to go through an external load balancer. 
If you use this with a + // ClaimMapper+Authorizer, you need to ensure that your ClaimMapper assigns Admin + // claims to worker nodes, and your Authorizer correctly handles those claims. PublicClient struct { - // HostPort is the host port to connect on. Host can be DNS name + // HostPort is the host port to connect on. Host can be DNS name. See the above + // comment: in many situations you can leave this empty. HostPort string `yaml:"hostPort"` + // ForceTLSConfig forces use of either the "internode" or "frontend" TLS config for these + // connections. Leave it empty to choose automatically; any other value fails validation. + ForceTLSConfig string `yaml:"forceTLSConfig"` } // NamespaceDefaults is the default config for each namespace @@ -551,6 +570,12 @@ const ( ClusterMDStoreName DataStoreName = "ClusterMDStore" ) +const ( + ForceTLSConfigAuto = "" + ForceTLSConfigInternode = "internode" + ForceTLSConfigFrontend = "frontend" +) + // Validate validates this config func (c *Config) Validate() error { if err := c.Persistence.Validate(); err != nil { @@ -561,6 +586,17 @@ func (c *Config) Validate() error { return err } + _, hasIFE := c.Services[string(primitives.InternalFrontendService)] + if hasIFE && (c.PublicClient.HostPort != "" || c.PublicClient.ForceTLSConfig != "") { + return fmt.Errorf("when using internal-frontend, publicClient must be empty") + } + + switch c.PublicClient.ForceTLSConfig { + case ForceTLSConfigAuto, ForceTLSConfigInternode, ForceTLSConfigFrontend: + default: + return fmt.Errorf("invalid value for publicClient.forceTLSConfig: %q", c.PublicClient.ForceTLSConfig) + } + return nil } diff --git a/common/membership/rpMonitor.go b/common/membership/rpMonitor.go index c788da387bf..ec2890e9f7e 100644 --- a/common/membership/rpMonitor.go +++ b/common/membership/rpMonitor.go @@ -160,6 +160,8 @@ func ServiceNameToServiceTypeEnum(name primitives.ServiceName) (persistence.Serv return persistence.All, nil case primitives.FrontendService: return persistence.Frontend, nil + case
primitives.InternalFrontendService: + return persistence.InternalFrontend, nil case primitives.HistoryService: return persistence.History, nil case primitives.MatchingService: diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 2b49b6aef55..52f4cc81e0d 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -1306,4 +1306,5 @@ const ( History Matching Worker + InternalFrontend ) diff --git a/common/primitives/role.go b/common/primitives/role.go index 0800b7c022c..c143d22e747 100644 --- a/common/primitives/role.go +++ b/common/primitives/role.go @@ -28,11 +28,12 @@ type ServiceName string // These constants represent service roles const ( - AllServices ServiceName = "all" - FrontendService ServiceName = "frontend" - HistoryService ServiceName = "history" - MatchingService ServiceName = "matching" - WorkerService ServiceName = "worker" - ServerService ServiceName = "server" - UnitTestService ServiceName = "unittest" + AllServices ServiceName = "all" + FrontendService ServiceName = "frontend" + InternalFrontendService ServiceName = "internal-frontend" + HistoryService ServiceName = "history" + MatchingService ServiceName = "matching" + WorkerService ServiceName = "worker" + ServerService ServiceName = "server" + UnitTestService ServiceName = "unittest" ) diff --git a/common/resource/fx.go b/common/resource/fx.go index 1008a8eec75..f2b88928a4e 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -26,6 +26,7 @@ package resource import ( "context" + "crypto/tls" "fmt" "net" "os" @@ -93,7 +94,6 @@ type ( // See LifetimeHooksModule for detail var Module = fx.Options( persistenceClient.Module, - fx.Provide(SnTaggedLoggerProvider), fx.Provide(HostNameProvider), fx.Provide(TimeSourceProvider), cluster.MetadataLifetimeHooksModule, @@ -136,7 +136,7 @@ var DefaultOptions = fx.Options( fx.Provide(DCRedirectionPolicyProvider), ) -func SnTaggedLoggerProvider(logger log.Logger, sn 
primitives.ServiceName) log.SnTaggedLogger { +func DefaultSnTaggedLoggerProvider(logger log.Logger, sn primitives.ServiceName) log.SnTaggedLogger { return log.With(logger, tag.Service(sn)) } @@ -395,19 +395,13 @@ func SdkClientFactoryProvider( logger log.SnTaggedLogger, resolver membership.GRPCResolver, ) (sdk.ClientFactory, error) { - tlsFrontendConfig, err := tlsConfigProvider.GetFrontendClientConfig() + frontendURL, frontendTLSConfig, err := getFrontendConnectionDetails(cfg, tlsConfigProvider, resolver) if err != nil { - return nil, fmt.Errorf("unable to load frontend TLS configuration: %w", err) - } - - hostPort := cfg.PublicClient.HostPort - if hostPort == "" { - hostPort = resolver.MakeURL(primitives.FrontendService) + return nil, err } - return sdk.NewClientFactory( - hostPort, - tlsFrontendConfig, + frontendURL, + frontendTLSConfig, metricsHandler, logger, ), nil @@ -426,24 +420,70 @@ func RPCFactoryProvider( svcName primitives.ServiceName, logger log.Logger, tlsConfigProvider encryption.TLSConfigProvider, - dc *dynamicconfig.Collection, resolver membership.GRPCResolver, traceInterceptor telemetry.ClientTraceInterceptor, -) common.RPCFactory { +) (common.RPCFactory, error) { svcCfg := cfg.Services[string(svcName)] - hostPort := cfg.PublicClient.HostPort - if hostPort == "" { - hostPort = resolver.MakeURL(primitives.FrontendService) + frontendURL, frontendTLSConfig, err := getFrontendConnectionDetails(cfg, tlsConfigProvider, resolver) + if err != nil { + return nil, err } return rpc.NewFactory( &svcCfg.RPC, svcName, logger, tlsConfigProvider, - dc, - hostPort, + frontendURL, + frontendTLSConfig, []grpc.UnaryClientInterceptor{ grpc.UnaryClientInterceptor(traceInterceptor), }, - ) + ), nil +} + +func getFrontendConnectionDetails( + cfg *config.Config, + tlsConfigProvider encryption.TLSConfigProvider, + resolver membership.GRPCResolver, +) (string, *tls.Config, error) { + // To simplify the static config, we switch default values based on whether the config + 
// defines an "internal-frontend" service. The default for TLS config can be overridden + // with publicClient.forceTLSConfig, and the default for hostPort can be overridden by + // explicitly setting hostPort to "membership://internal-frontend" or + // "membership://frontend". + _, hasIFE := cfg.Services[string(primitives.InternalFrontendService)] + + forceTLS := cfg.PublicClient.ForceTLSConfig + if forceTLS == config.ForceTLSConfigAuto { + if hasIFE { + forceTLS = config.ForceTLSConfigInternode + } else { + forceTLS = config.ForceTLSConfigFrontend + } + } + + var frontendTLSConfig *tls.Config + var err error + switch forceTLS { + case config.ForceTLSConfigInternode: + frontendTLSConfig, err = tlsConfigProvider.GetInternodeClientConfig() + case config.ForceTLSConfigFrontend: + frontendTLSConfig, err = tlsConfigProvider.GetFrontendClientConfig() + default: + err = fmt.Errorf("invalid forceTLSConfig") + } + if err != nil { + return "", nil, fmt.Errorf("unable to load TLS configuration: %w", err) + } + + frontendURL := cfg.PublicClient.HostPort + if frontendURL == "" { + if hasIFE { + frontendURL = resolver.MakeURL(primitives.InternalFrontendService) + } else { + frontendURL = resolver.MakeURL(primitives.FrontendService) + } + } + + return frontendURL, frontendTLSConfig, nil } diff --git a/common/resourcetest/resourceTest.go b/common/resourcetest/resourceTest.go index b8850e72f70..93eae2f6784 100644 --- a/common/resourcetest/resourceTest.go +++ b/common/resourcetest/resourceTest.go @@ -160,6 +160,7 @@ func NewTest( historyServiceResolver := membership.NewMockServiceResolver(controller) workerServiceResolver := membership.NewMockServiceResolver(controller) membershipMonitor.EXPECT().GetResolver(primitives.FrontendService).Return(frontendServiceResolver, nil).AnyTimes() + membershipMonitor.EXPECT().GetResolver(primitives.InternalFrontendService).Return(nil, membership.ErrUnknownService).AnyTimes() 
membershipMonitor.EXPECT().GetResolver(primitives.MatchingService).Return(matchingServiceResolver, nil).AnyTimes() membershipMonitor.EXPECT().GetResolver(primitives.HistoryService).Return(historyServiceResolver, nil).AnyTimes() membershipMonitor.EXPECT().GetResolver(primitives.WorkerService).Return(workerServiceResolver, nil).AnyTimes() diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index 0186ad39359..abb6b96b071 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -35,7 +35,6 @@ import ( "go.temporal.io/server/common" "go.temporal.io/server/common/config" "go.temporal.io/server/common/convert" - "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/primitives" @@ -49,8 +48,9 @@ type RPCFactory struct { config *config.RPC serviceName primitives.ServiceName logger log.Logger - dc *dynamicconfig.Collection - frontendURL string + + frontendURL string + frontendTLSConfig *tls.Config initListener sync.Once grpcListener net.Listener @@ -65,16 +65,16 @@ func NewFactory( sName primitives.ServiceName, logger log.Logger, tlsProvider encryption.TLSConfigProvider, - dc *dynamicconfig.Collection, frontendURL string, + frontendTLSConfig *tls.Config, clientInterceptors []grpc.UnaryClientInterceptor, ) *RPCFactory { return &RPCFactory{ config: cfg, serviceName: sName, logger: logger, - dc: dc, frontendURL: frontendURL, + frontendTLSConfig: frontendTLSConfig, tlsFactory: tlsProvider, clientInterceptors: clientInterceptors, } @@ -201,19 +201,9 @@ func (d *RPCFactory) CreateRemoteFrontendGRPCConnection(rpcAddress string) *grpc return d.dial(rpcAddress, tlsClientConfig) } -// CreateLocalFrontendGRPCConnection creates connection for internal calls +// CreateLocalFrontendGRPCConnection creates connection for internal frontend calls func (d *RPCFactory) CreateLocalFrontendGRPCConnection() *grpc.ClientConn { - var tlsClientConfig *tls.Config - var err error - if d.tlsFactory != nil { - 
tlsClientConfig, err = d.tlsFactory.GetFrontendClientConfig() - if err != nil { - d.logger.Fatal("Failed to create tls config for gRPC connection", tag.Error(err)) - return nil - } - } - - return d.dial(d.frontendURL, tlsClientConfig) + return d.dial(d.frontendURL, d.frontendTLSConfig) } // CreateInternodeGRPCConnection creates connection for gRPC calls diff --git a/common/rpc/test/rpc_localstore_tls_test.go b/common/rpc/test/rpc_localstore_tls_test.go index 65f12b0e746..31dc09b01f6 100644 --- a/common/rpc/test/rpc_localstore_tls_test.go +++ b/common/rpc/test/rpc_localstore_tls_test.go @@ -38,7 +38,6 @@ import ( "google.golang.org/grpc/credentials" "go.temporal.io/server/common/config" - "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/rpc" @@ -136,7 +135,7 @@ func (s *localStoreRPCSuite) SetupSuite() { provider, err := encryption.NewTLSConfigProviderFromConfig(serverCfgInsecure.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - insecureFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), frontendURL, noExtraInterceptors) + insecureFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, frontendURL, nil, noExtraInterceptors) s.NotNil(insecureFactory) s.insecureRPCFactory = i(insecureFactory) @@ -344,22 +343,28 @@ func (s *localStoreRPCSuite) setupFrontend() { provider, err := encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - frontendMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), frontendURL, noExtraInterceptors) + tlsConfig, err := provider.GetFrontendClientConfig() + s.NoError(err) + frontendMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, frontendURL, tlsConfig, noExtraInterceptors) 
s.NotNil(frontendMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - frontendServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), frontendURL, noExtraInterceptors) + frontendServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, frontendURL, nil, noExtraInterceptors) s.NotNil(frontendServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSSystemWorker.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), frontendURL, noExtraInterceptors) + tlsConfig, err = provider.GetFrontendClientConfig() + s.NoError(err) + frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, frontendURL, tlsConfig, noExtraInterceptors) s.NotNil(frontendSystemWorkerMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - frontendMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), frontendURL, noExtraInterceptors) + tlsConfig, err = provider.GetFrontendClientConfig() + s.NoError(err) + frontendMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, frontendURL, tlsConfig, noExtraInterceptors) s.NotNil(frontendMutualTLSRefreshFactory) s.frontendMutualTLSRPCFactory = f(frontendMutualTLSFactory) @@ -374,7 +379,9 @@ func (s *localStoreRPCSuite) setupFrontend() { s.dynamicCACertPool, s.wrongCACertPool) s.NoError(err) - dynamicServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, s.dynamicConfigProvider, 
dynamicconfig.NewNoopCollection(), frontendURL, noExtraInterceptors) + tlsConfig, err = s.dynamicConfigProvider.GetFrontendClientConfig() + s.NoError(err) + dynamicServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, s.dynamicConfigProvider, frontendURL, tlsConfig, noExtraInterceptors) s.frontendDynamicTLSFactory = f(dynamicServerTLSFactory) s.internodeDynamicTLSFactory = i(dynamicServerTLSFactory) @@ -382,13 +389,17 @@ func (s *localStoreRPCSuite) setupFrontend() { provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreRootCAForceTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - frontendRootCAForceTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), frontendURL, noExtraInterceptors) + tlsConfig, err = provider.GetFrontendClientConfig() + s.NoError(err) + frontendRootCAForceTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, frontendURL, tlsConfig, noExtraInterceptors) s.NotNil(frontendServerTLSFactory) s.frontendConfigRootCAForceTLSFactory = f(frontendRootCAForceTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSRemoteCluster.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - remoteClusterMutualTLSRPCFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), frontendURL, noExtraInterceptors) + tlsConfig, err = provider.GetFrontendClientConfig() + s.NoError(err) + remoteClusterMutualTLSRPCFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, frontendURL, tlsConfig, noExtraInterceptors) s.NotNil(remoteClusterMutualTLSRPCFactory) s.remoteClusterMutualTLSRPCFactory = r(remoteClusterMutualTLSRPCFactory) } @@ -424,22 +435,30 @@ func (s *localStoreRPCSuite) setupInternode() { provider, err := encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) 
s.NoError(err) - internodeMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), frontendURL, noExtraInterceptors) + tlsConfig, err := provider.GetFrontendClientConfig() + s.NoError(err) + internodeMutualTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, frontendURL, tlsConfig, noExtraInterceptors) s.NotNil(internodeMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - internodeServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), frontendURL, noExtraInterceptors) + tlsConfig, err = provider.GetFrontendClientConfig() + s.NoError(err) + internodeServerTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, frontendURL, tlsConfig, noExtraInterceptors) s.NotNil(internodeServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreAltMutualTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - internodeMutualAltTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), frontendURL, noExtraInterceptors) + tlsConfig, err = provider.GetFrontendClientConfig() + s.NoError(err) + internodeMutualAltTLSFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, frontendURL, tlsConfig, noExtraInterceptors) s.NotNil(internodeMutualAltTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - internodeMutualTLSRefreshFactory := rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, dynamicconfig.NewNoopCollection(), frontendURL, noExtraInterceptors) + tlsConfig, err = provider.GetFrontendClientConfig() + s.NoError(err) + internodeMutualTLSRefreshFactory := 
rpc.NewFactory(rpcTestCfgDefault, "tester", s.logger, provider, frontendURL, tlsConfig, noExtraInterceptors) s.NotNil(internodeMutualTLSRefreshFactory) s.internodeMutualTLSRPCFactory = i(internodeMutualTLSFactory) diff --git a/config/development-active.yaml b/config/development-active.yaml index 6cde02785b9..0130d13e769 100644 --- a/config/development-active.yaml +++ b/config/development-active.yaml @@ -44,6 +44,12 @@ services: membershipPort: 6933 bindOnLocalHost: true + internal-frontend: + rpc: + grpcPort: 7236 + membershipPort: 6936 + bindOnLocalHost: true + matching: rpc: grpcPort: 7235 @@ -117,9 +123,6 @@ namespaceDefaults: state: "disabled" URI: "file:///tmp/temporal_vis_archival/development" -publicClient: - hostPort: "localhost:7233" - dynamicConfigClient: filepath: "config/dynamicconfig/development-cass.yaml" pollInterval: "10s" diff --git a/config/development-cass-archival.yaml b/config/development-cass-archival.yaml index f623495062f..a81486f7cb6 100644 --- a/config/development-cass-archival.yaml +++ b/config/development-cass-archival.yaml @@ -36,6 +36,12 @@ services: membershipPort: 6933 bindOnLocalHost: true + internal-frontend: + rpc: + grpcPort: 7236 + membershipPort: 6936 + bindOnLocalHost: true + matching: rpc: grpcPort: 7235 @@ -97,9 +103,6 @@ namespaceDefaults: state: "enabled" URI: "file:///tmp/temporal_vis_archival/development" -publicClient: - hostPort: "localhost:7233" - dynamicConfigClient: filepath: "./config/dynamicconfig/development-cass.yaml" pollInterval: "10s" diff --git a/config/development-cass-es.yaml b/config/development-cass-es.yaml index 95ba894d987..a71f8907a0f 100644 --- a/config/development-cass-es.yaml +++ b/config/development-cass-es.yaml @@ -44,6 +44,12 @@ services: membershipPort: 6933 bindOnLocalHost: true + internal-frontend: + rpc: + grpcPort: 7236 + membershipPort: 6936 + bindOnLocalHost: true + matching: rpc: grpcPort: 7235 @@ -105,9 +111,6 @@ namespaceDefaults: state: "disabled" URI: 
"file:///tmp/temporal_vis_archival/development" -publicClient: - hostPort: "localhost:7233" - dynamicConfigClient: filepath: "config/dynamicconfig/development-cass.yaml" pollInterval: "10s" diff --git a/config/development-cass.yaml b/config/development-cass.yaml index 43710853b5d..cad52c1c256 100644 --- a/config/development-cass.yaml +++ b/config/development-cass.yaml @@ -60,6 +60,12 @@ services: membershipPort: 6933 bindOnLocalHost: true + internal-frontend: + rpc: + grpcPort: 7236 + membershipPort: 6936 + bindOnLocalHost: true + matching: rpc: grpcPort: 7235 @@ -121,9 +127,6 @@ namespaceDefaults: state: "disabled" URI: "file:///tmp/temporal_vis_archival/development" -publicClient: - hostPort: "localhost:7233" - dynamicConfigClient: filepath: "./config/dynamicconfig/development-cass.yaml" pollInterval: "10s" diff --git a/config/development-mysql-es.yaml b/config/development-mysql-es.yaml index dee3d793ad4..1e625791340 100644 --- a/config/development-mysql-es.yaml +++ b/config/development-mysql-es.yaml @@ -50,6 +50,12 @@ services: membershipPort: 6933 bindOnLocalHost: true + internal-frontend: + rpc: + grpcPort: 7236 + membershipPort: 6936 + bindOnLocalHost: true + matching: rpc: grpcPort: 7235 @@ -111,9 +117,6 @@ namespaceDefaults: state: "disabled" URI: "file:///tmp/temporal_vis_archival/development" -publicClient: - hostPort: "localhost:7233" - dynamicConfigClient: filepath: "config/dynamicconfig/development-sql.yaml" pollInterval: "10s" diff --git a/config/development-mysql.yaml b/config/development-mysql.yaml index c8297886efd..9063247cac4 100644 --- a/config/development-mysql.yaml +++ b/config/development-mysql.yaml @@ -51,6 +51,12 @@ services: membershipPort: 6933 bindOnLocalHost: true + internal-frontend: + rpc: + grpcPort: 7236 + membershipPort: 6936 + bindOnLocalHost: true + matching: rpc: grpcPort: 7235 @@ -112,9 +118,6 @@ namespaceDefaults: state: "disabled" URI: "file:///tmp/temporal_vis_archival/development" -publicClient: - hostPort: "localhost:7233" 
- dynamicConfigClient: filepath: "config/dynamicconfig/development-sql.yaml" pollInterval: "10s" diff --git a/config/development-other.yaml b/config/development-other.yaml index 0ce203f0873..87b5da81b0e 100644 --- a/config/development-other.yaml +++ b/config/development-other.yaml @@ -44,6 +44,12 @@ services: membershipPort: 9933 bindOnLocalHost: true + internal-frontend: + rpc: + grpcPort: 9236 + membershipPort: 9936 + bindOnLocalHost: true + matching: rpc: grpcPort: 9235 @@ -122,9 +128,6 @@ namespaceDefaults: state: "disabled" URI: "file:///tmp/temporal_vis_archival/development" -publicClient: - hostPort: "localhost:9233" - dynamicConfigClient: filepath: "config/dynamicconfig/development-cass.yaml" pollInterval: "10s" diff --git a/config/development-postgres-es.yaml b/config/development-postgres-es.yaml index 47fda6a4cda..06f64113f33 100644 --- a/config/development-postgres-es.yaml +++ b/config/development-postgres-es.yaml @@ -63,6 +63,12 @@ services: membershipPort: 6933 bindOnLocalHost: true + internal-frontend: + rpc: + grpcPort: 7236 + membershipPort: 6936 + bindOnLocalHost: true + matching: rpc: grpcPort: 7235 @@ -124,9 +130,6 @@ namespaceDefaults: state: "disabled" URI: "file:///tmp/temporal_vis_archival/development" -publicClient: - hostPort: "localhost:7233" - dynamicConfigClient: filepath: "config/dynamicconfig/development-sql.yaml" pollInterval: "10s" diff --git a/config/development-postgres.yaml b/config/development-postgres.yaml index 717248dede0..79dd13998df 100644 --- a/config/development-postgres.yaml +++ b/config/development-postgres.yaml @@ -51,6 +51,12 @@ services: membershipPort: 6933 bindOnLocalHost: true + internal-frontend: + rpc: + grpcPort: 7236 + membershipPort: 6936 + bindOnLocalHost: true + matching: rpc: grpcPort: 7235 @@ -112,9 +118,6 @@ namespaceDefaults: state: "disabled" URI: "file:///tmp/temporal_vis_archival/development" -publicClient: - hostPort: "localhost:7233" - dynamicConfigClient: filepath: 
"config/dynamicconfig/development-sql.yaml" pollInterval: "10s" diff --git a/config/development-sqlite-file.yaml b/config/development-sqlite-file.yaml index 8e409a090ef..29ca7743f7a 100644 --- a/config/development-sqlite-file.yaml +++ b/config/development-sqlite-file.yaml @@ -75,6 +75,12 @@ services: membershipPort: 6933 bindOnLocalHost: true + internal-frontend: + rpc: + grpcPort: 7236 + membershipPort: 6936 + bindOnLocalHost: true + matching: rpc: grpcPort: 7235 @@ -136,9 +142,6 @@ namespaceDefaults: state: "disabled" URI: "file:///tmp/temporal_vis_archival/development" -publicClient: - hostPort: "localhost:7233" - dynamicConfigClient: filepath: "config/dynamicconfig/development-sql.yaml" pollInterval: "10s" diff --git a/config/development-sqlite.yaml b/config/development-sqlite.yaml index c2ada09611e..2f87d2aaa17 100644 --- a/config/development-sqlite.yaml +++ b/config/development-sqlite.yaml @@ -71,6 +71,12 @@ services: membershipPort: 6933 bindOnLocalHost: true + internal-frontend: + rpc: + grpcPort: 7236 + membershipPort: 6936 + bindOnLocalHost: true + matching: rpc: grpcPort: 7235 @@ -132,9 +138,6 @@ namespaceDefaults: state: "disabled" URI: "file:///tmp/temporal_vis_archival/development" -publicClient: - hostPort: "localhost:7233" - dynamicConfigClient: filepath: "config/dynamicconfig/development-sql.yaml" pollInterval: "10s" diff --git a/config/development-standby.yaml b/config/development-standby.yaml index 798e70f95f1..4db51c2669a 100644 --- a/config/development-standby.yaml +++ b/config/development-standby.yaml @@ -44,6 +44,12 @@ services: membershipPort: 8933 bindOnLocalHost: true + internal-frontend: + rpc: + grpcPort: 8236 + membershipPort: 8936 + bindOnLocalHost: true + matching: rpc: grpcPort: 8235 @@ -122,9 +128,6 @@ namespaceDefaults: state: "disabled" URI: "file:///tmp/temporal_vis_archival/development" -publicClient: - hostPort: "localhost:8233" - dynamicConfigClient: filepath: "config/dynamicconfig/development-cass.yaml" pollInterval: "10s" 
diff --git a/docker/config_template.yaml b/docker/config_template.yaml index 53713b678fc..046e7f7d601 100644 --- a/docker/config_template.yaml +++ b/docker/config_template.yaml @@ -199,7 +199,8 @@ global: checkInterval: {{ default .Env.TEMPORAL_TLS_EXPIRATION_CHECKS_CHECK_INTERVAL "0s" }} internode: # This server section configures the TLS certificate that internal temporal - # cluster nodes (history or matching) present to other clients within the Temporal Cluster. + # cluster nodes (history, matching, and internal-frontend) present to other + # clients within the Temporal Cluster. server: requireClientAuth: {{ default .Env.TEMPORAL_TLS_REQUIRE_CLIENT_AUTH "false" }} @@ -218,7 +219,8 @@ global: {{- end }} # This client section is used to configure the TLS clients within - # the Temporal Cluster that connect to an Internode (history or matching) + # the Temporal Cluster that connect to an Internode (history, matching, or + # internal-frontend) client: serverName: {{ default .Env.TEMPORAL_TLS_INTERNODE_SERVER_NAME "" }} disableHostVerification: {{ default .Env.TEMPORAL_TLS_INTERNODE_DISABLE_HOST_VERIFICATION "false"}} @@ -232,8 +234,7 @@ global: {{- end }} frontend: # This server section configures the TLS certificate that the Frontend - # server presents to all clients (specifically the Worker role within - # the Temporal Cluster and all External SDKs connecting to the Cluster) + # server presents to external clients. 
server: requireClientAuth: {{ default .Env.TEMPORAL_TLS_REQUIRE_CLIENT_AUTH "false" }} certFile: {{ default .Env.TEMPORAL_TLS_FRONTEND_CERT "" }} @@ -298,6 +299,14 @@ services: membershipPort: {{ default .Env.FRONTEND_MEMBERSHIP_PORT "6933" }} bindOnIP: {{ default .Env.BIND_ON_IP "127.0.0.1" }} + {{- if .Env.USE_INTERNAL_FRONTEND }} + internal-frontend: + rpc: + grpcPort: {{ default .Env.INTERNAL_FRONTEND_GRPC_PORT "7236" }} + membershipPort: {{ default .Env.INTERNAL_FRONTEND_MEMBERSHIP_PORT "6936" }} + bindOnIP: {{ default .Env.BIND_ON_IP "127.0.0.1" }} + {{- end }} + matching: rpc: grpcPort: {{ default .Env.MATCHING_GRPC_PORT "7235" }} @@ -357,10 +366,14 @@ namespaceDefaults: state: "disabled" URI: "file:///tmp/temporal_vis_archival/development" +{{- if or (.Env.USE_INTERNAL_FRONTEND) (and (not .Env.TEMPORAL_AUTH_AUTHORIZER) (not .Env.TEMPORAL_AUTH_CLAIM_MAPPER)) }} +{{/* publicClient is not needed with internal frontend, or if not using authorizer + claim mapper */}} +{{- else }} {{ $publicIp := default .Env.BIND_ON_IP "127.0.0.1" -}} {{- $defaultPublicHostPost := (print $publicIp ":" $temporalGrpcPort) -}} publicClient: hostPort: "{{ default .Env.PUBLIC_FRONTEND_ADDRESS $defaultPublicHostPost }}" +{{- end }} dynamicConfigClient: filepath: "{{ default .Env.DYNAMIC_CONFIG_FILE_PATH "/etc/temporal/config/dynamicconfig/docker.yaml" }}" diff --git a/host/onebox.go b/host/onebox.go index 66e3d40bb36..d2c43d6bc37 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -392,6 +392,7 @@ func (c *temporalImpl) startFrontend(hosts map[primitives.ServiceName][]string, fx.Provide(func() persistenceClient.AbstractDataStoreFactory { return nil }), fx.Provide(func() dynamicconfig.Client { return c.dcClient }), fx.Provide(func() log.Logger { return c.logger }), + fx.Provide(resource.DefaultSnTaggedLoggerProvider), fx.Provide(func() *esclient.Config { return c.esConfig }), fx.Provide(func() esclient.Client { return c.esClient }), fx.Supply(c.spanExporters), @@ -479,6 +480,7 @@ func 
(c *temporalImpl) startHistory( fx.Provide(func() persistenceClient.AbstractDataStoreFactory { return nil }), fx.Provide(func() dynamicconfig.Client { return c.dcClient }), fx.Provide(func() log.Logger { return c.logger }), + fx.Provide(resource.DefaultSnTaggedLoggerProvider), fx.Provide(func() *esclient.Config { return c.esConfig }), fx.Provide(func() esclient.Client { return c.esClient }), fx.Provide(workflow.NewTaskGeneratorProvider), @@ -562,6 +564,7 @@ func (c *temporalImpl) startMatching(hosts map[primitives.ServiceName][]string, fx.Provide(func() persistenceClient.AbstractDataStoreFactory { return nil }), fx.Provide(func() dynamicconfig.Client { return c.dcClient }), fx.Provide(func() log.Logger { return c.logger }), + fx.Provide(resource.DefaultSnTaggedLoggerProvider), fx.Supply(c.spanExporters), temporal.ServiceTracingModule, matching.Module, @@ -647,6 +650,7 @@ func (c *temporalImpl) startWorker(hosts map[primitives.ServiceName][]string, st fx.Provide(func() persistenceClient.AbstractDataStoreFactory { return nil }), fx.Provide(func() dynamicconfig.Client { return c.dcClient }), fx.Provide(func() log.Logger { return c.logger }), + fx.Provide(resource.DefaultSnTaggedLoggerProvider), fx.Provide(func() esclient.Client { return c.esClient }), fx.Provide(func() *esclient.Config { return c.esConfig }), fx.Supply(c.spanExporters), diff --git a/host/simpleMonitor.go b/host/simpleMonitor.go index 80eadc348b4..886da95f905 100644 --- a/host/simpleMonitor.go +++ b/host/simpleMonitor.go @@ -62,7 +62,11 @@ func (s *simpleMonitor) WhoAmI() (*membership.HostInfo, error) { } func (s *simpleMonitor) GetResolver(service primitives.ServiceName) (membership.ServiceResolver, error) { - return s.resolvers[service], nil + resolver, ok := s.resolvers[service] + if !ok { + return nil, membership.ErrUnknownService + } + return resolver, nil } func (s *simpleMonitor) Lookup(service primitives.ServiceName, key string) (*membership.HostInfo, error) { diff --git 
a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index 3469477f141..3e8c687959a 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -806,12 +806,16 @@ func (adh *AdminHandler) DescribeCluster( var rings []*clusterspb.RingInfo for _, role := range []primitives.ServiceName{ primitives.FrontendService, + primitives.InternalFrontendService, primitives.HistoryService, primitives.MatchingService, primitives.WorkerService, } { resolver, err := monitor.GetResolver(role) if err != nil { + if role == primitives.InternalFrontendService { + continue // this one is optional + } return nil, err } diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 0081b7e1588..8e6f79a8379 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -26,6 +26,7 @@ package frontend import ( "context" + "fmt" "net" "go.uber.org/fx" @@ -130,6 +131,7 @@ func NewServiceProvider( func GrpcServerOptionsProvider( logger log.Logger, serviceConfig *Config, + serviceName primitives.ServiceName, rpcFactory common.RPCFactory, namespaceLogInterceptor *interceptor.NamespaceLogInterceptor, namespaceRateLimiterInterceptor *interceptor.NamespaceRateLimitInterceptor, @@ -158,7 +160,16 @@ func GrpcServerOptionsProvider( Time: serviceConfig.KeepAliveTime(), Timeout: serviceConfig.KeepAliveTimeout(), } - grpcServerOptions, err := rpcFactory.GetFrontendGRPCServerOptions() + var grpcServerOptions []grpc.ServerOption + var err error + switch serviceName { + case primitives.FrontendService: + grpcServerOptions, err = rpcFactory.GetFrontendGRPCServerOptions() + case primitives.InternalFrontendService: + grpcServerOptions, err = rpcFactory.GetInternodeGRPCServerOptions() + default: + err = fmt.Errorf("unexpected frontend service name %q", serviceName) + } if err != nil { logger.Fatal("creating gRPC server options failed", tag.Error(err)) } diff --git a/temporal/fx.go b/temporal/fx.go index 6350fce59ab..7c547e23604 100644 --- a/temporal/fx.go +++ 
b/temporal/fx.go @@ -143,6 +143,7 @@ func NewServerFx(opts ...ServerOption) (*ServerFx, error) { fx.Provide(HistoryServiceProvider), fx.Provide(MatchingServiceProvider), fx.Provide(FrontendServiceProvider), + fx.Provide(InternalFrontendServiceProvider), fx.Provide(WorkerServiceProvider), fx.Provide(ApplyClusterMetadataConfigProvider), @@ -366,6 +367,7 @@ func HistoryServiceProvider( fx.Provide(func() encryption.TLSConfigProvider { return params.TlsConfigProvider }), fx.Provide(func() dynamicconfig.Client { return params.DynamicConfigClient }), fx.Provide(func() log.Logger { return params.Logger }), + fx.Provide(resource.DefaultSnTaggedLoggerProvider), fx.Provide(func() metrics.Handler { return params.MetricsHandler.WithTags(metrics.ServiceNameTag(serviceName)) }), @@ -428,6 +430,7 @@ func MatchingServiceProvider( fx.Provide(func() encryption.TLSConfigProvider { return params.TlsConfigProvider }), fx.Provide(func() dynamicconfig.Client { return params.DynamicConfigClient }), fx.Provide(func() log.Logger { return params.Logger }), + fx.Provide(resource.DefaultSnTaggedLoggerProvider), fx.Provide(func() metrics.Handler { return params.MetricsHandler.WithTags(metrics.ServiceNameTag(serviceName)) }), @@ -453,8 +456,19 @@ func MatchingServiceProvider( func FrontendServiceProvider( params ServiceProviderParamsCommon, ) (ServicesGroupOut, error) { - serviceName := primitives.FrontendService + return genericFrontendServiceProvider(params, primitives.FrontendService) +} +func InternalFrontendServiceProvider( + params ServiceProviderParamsCommon, +) (ServicesGroupOut, error) { + return genericFrontendServiceProvider(params, primitives.InternalFrontendService) +} + +func genericFrontendServiceProvider( + params ServiceProviderParamsCommon, + serviceName primitives.ServiceName, +) (ServicesGroupOut, error) { if _, ok := params.ServiceNames[serviceName]; !ok { params.Logger.Info("Service is not requested, skipping initialization.", tag.Service(serviceName)) return 
ServicesGroupOut{ @@ -483,11 +497,30 @@ func FrontendServiceProvider( fx.Provide(func() searchattribute.Mapper { return params.SearchAttributesMapper }), fx.Provide(func() []grpc.UnaryServerInterceptor { return params.CustomInterceptors }), fx.Provide(func() authorization.Authorizer { return params.Authorizer }), - fx.Provide(func() authorization.ClaimMapper { return params.ClaimMapper }), + fx.Provide(func() authorization.ClaimMapper { + switch serviceName { + case primitives.FrontendService: + return params.ClaimMapper + case primitives.InternalFrontendService: + return authorization.NewNoopClaimMapper() + default: + panic("Unexpected frontend service name") + } + }), fx.Provide(func() encryption.TLSConfigProvider { return params.TlsConfigProvider }), fx.Provide(func() dynamicconfig.Client { return params.DynamicConfigClient }), fx.Provide(func() log.Logger { return params.Logger }), + fx.Provide(func() log.SnTaggedLogger { + // Use "frontend" for logs even if serviceName is "internal-frontend", but add an + // extra tag to differentiate. + tags := []tag.Tag{tag.Service(primitives.FrontendService)} + if serviceName == primitives.InternalFrontendService { + tags = append(tags, tag.NewBoolTag("internal-frontend", true)) + } + return log.With(params.Logger, tags...) 
+ }), fx.Provide(func() metrics.Handler { + // Use either "frontend" or "internal-frontend" for metrics return params.MetricsHandler.WithTags(metrics.ServiceNameTag(serviceName)) }), fx.Provide(func() resource.NamespaceLogger { return params.NamespaceLogger }), @@ -547,6 +580,7 @@ func WorkerServiceProvider( fx.Provide(func() encryption.TLSConfigProvider { return params.TlsConfigProvider }), fx.Provide(func() dynamicconfig.Client { return params.DynamicConfigClient }), fx.Provide(func() log.Logger { return params.Logger }), + fx.Provide(resource.DefaultSnTaggedLoggerProvider), fx.Provide(func() metrics.Handler { return params.MetricsHandler.WithTags(metrics.ServiceNameTag(serviceName)) }), @@ -844,6 +878,10 @@ var ServiceTracingModule = fx.Options( fx.Provide( fx.Annotate( func(rsn primitives.ServiceName, rsi resource.InstanceID) (*otelresource.Resource, error) { + // map "internal-frontend" to "frontend" for the purpose of tracing + if rsn == primitives.InternalFrontendService { + rsn = primitives.FrontendService + } serviceName := string(rsn) if !strings.HasPrefix(serviceName, "io.temporal.") { serviceName = fmt.Sprintf("io.temporal.%s", serviceName) diff --git a/temporal/server.go b/temporal/server.go index 9658c6d127a..e745c6751e1 100644 --- a/temporal/server.go +++ b/temporal/server.go @@ -48,6 +48,7 @@ type ( var ( Services = []string{ string(primitives.FrontendService), + string(primitives.InternalFrontendService), string(primitives.HistoryService), string(primitives.MatchingService), string(primitives.WorkerService),