diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index ff9c00e0ed59..b682c4bfaa65 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -294,7 +294,7 @@ func (c *Config) Validate() error { return nil } -func (c *Config) isModuleEnabled(m string) bool { +func (c *Config) isTarget(m string) bool { return util.StringsContain(c.Target, m) } @@ -726,24 +726,24 @@ func (t *Loki) setupModuleManager() error { } // Add IngesterQuerier as a dependency for store when target is either querier, ruler, read, or backend. - if t.Cfg.isModuleEnabled(Querier) || t.Cfg.isModuleEnabled(Ruler) || t.Cfg.isModuleEnabled(Read) || t.Cfg.isModuleEnabled(Backend) { + if t.Cfg.isTarget(Querier) || t.Cfg.isTarget(Ruler) || t.Cfg.isTarget(Read) || t.Cfg.isTarget(Backend) { deps[Store] = append(deps[Store], IngesterQuerier) } // If the query scheduler and querier are running together, make sure the scheduler goes // first to initialize the ring that will also be used by the querier - if (t.Cfg.isModuleEnabled(Querier) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(All) { + if (t.Cfg.isTarget(Querier) && t.Cfg.isTarget(QueryScheduler)) || t.Cfg.isTarget(All) { deps[Querier] = append(deps[Querier], QueryScheduler) } // If the query scheduler and query frontend are running together, make sure the scheduler goes // first to initialize the ring that will also be used by the query frontend - if (t.Cfg.isModuleEnabled(QueryFrontend) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(All) { + if (t.Cfg.isTarget(QueryFrontend) && t.Cfg.isTarget(QueryScheduler)) || t.Cfg.isTarget(All) { deps[QueryFrontend] = append(deps[QueryFrontend], QueryScheduler) } // Initialise query tags interceptors on targets running ingester - if t.Cfg.isModuleEnabled(Ingester) || t.Cfg.isModuleEnabled(Write) || t.Cfg.isModuleEnabled(All) { + if t.Cfg.isTarget(Ingester) || t.Cfg.isTarget(Write) || t.Cfg.isTarget(All) { deps[Server] = append(deps[Server], IngesterGRPCInterceptors) } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 458d7c9e3f5c..a509fbe263e0 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -295,7 +295,7 @@ func (t *Loki) initOverrides() (_ services.Service, err error) { } func (t *Loki) initOverridesExporter() (services.Service, error) { - if t.Cfg.isModuleEnabled(OverridesExporter) && t.TenantLimits == nil || t.Overrides == nil { + if t.Cfg.isTarget(OverridesExporter) && t.TenantLimits == nil || t.Overrides == nil { // This target isn't enabled by default ("all") and requires per-tenant limits to run. return nil, errors.New("overrides-exporter has been enabled, but no runtime configuration file was configured") } @@ -347,7 +347,7 @@ func (t *Loki) initDistributor() (services.Service, error) { // Register the distributor to receive Push requests over GRPC // EXCEPT when running with `-target=all` or `-target=` contains `ingester` - if !t.Cfg.isModuleEnabled(All) && !t.Cfg.isModuleEnabled(Write) && !t.Cfg.isModuleEnabled(Ingester) { + if !t.Cfg.isTarget(All) && !t.Cfg.isTarget(Write) && !t.Cfg.isTarget(Ingester) { logproto.RegisterPusherServer(t.Server.GRPC, t.distributor) } @@ -407,13 +407,13 @@ func (t *Loki) initQuerier() (services.Service, error) { } querierWorkerServiceConfig := querier.WorkerServiceConfig{ - AllEnabled: t.Cfg.isModuleEnabled(All), - ReadEnabled: t.Cfg.isModuleEnabled(Read), + AllEnabled: t.Cfg.isTarget(All), + ReadEnabled: t.Cfg.isTarget(Read), GrpcListenAddress: t.Cfg.Server.GRPCListenAddress, GrpcListenPort: t.Cfg.Server.GRPCListenPort, QuerierWorkerConfig: &t.Cfg.Worker, - QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend), - QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler), + QueryFrontendEnabled: t.Cfg.isTarget(QueryFrontend), + QuerySchedulerEnabled: t.Cfg.isTarget(QueryScheduler), SchedulerRing: scheduler.SafeReadRing(t.Cfg.QueryScheduler, t.querySchedulerRingManager), } @@ -727,7 +727,7 @@ func (t *Loki) initBloomStore() (services.Service, error) { bsCfg := t.Cfg.StorageConfig.BloomShipperConfig var metasCache cache.Cache - if cache.IsCacheConfigured(bsCfg.MetasCache) { + if t.Cfg.isTarget(IndexGateway) && cache.IsCacheConfigured(bsCfg.MetasCache) { metasCache, err = cache.New(bsCfg.MetasCache, reg, logger, stats.BloomMetasCache, constants.Loki) // always enable LRU cache @@ -781,7 +781,7 @@ func (t *Loki) updateConfigForShipperStore() { } switch true { - case t.Cfg.isModuleEnabled(Ingester), t.Cfg.isModuleEnabled(Write): + case t.Cfg.isTarget(Ingester), t.Cfg.isTarget(Write): // Use embedded cache for caching index in memory, this also significantly helps performance. t.Cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{ EmbeddedCache: cache.EmbeddedCacheConfig{ @@ -803,7 +803,7 @@ func (t *Loki) updateConfigForShipperStore() { t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeWriteOnly t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval) - case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.Cfg.isModuleEnabled(Backend), t.isModuleActive(IndexGateway), t.Cfg.isModuleEnabled(BloomCompactor): + case t.Cfg.isTarget(Querier), t.Cfg.isTarget(Ruler), t.Cfg.isTarget(Read), t.Cfg.isTarget(Backend), t.isModuleActive(IndexGateway), t.Cfg.isTarget(BloomCompactor): // We do not want query to do any updates to index t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly @@ -844,7 +844,7 @@ func (t *Loki) setupAsyncStore() error { ) switch true { - case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read): + case t.Cfg.isTarget(Querier), t.Cfg.isTarget(Ruler), t.Cfg.isTarget(Read): // Do not use the AsyncStore if the querier is configured with QueryStoreOnly set to true if t.Cfg.Querier.QueryStoreOnly { break @@ -855,16 +855,16 @@ func (t *Loki) setupAsyncStore() error { asyncStore = true // The legacy Read target includes the index gateway, so disable the index-gateway client in that configuration. - if t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read) { + if t.Cfg.LegacyReadTarget && t.Cfg.isTarget(Read) { t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled = true t.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled = true } // Backend target includes the index gateway - case t.Cfg.isModuleEnabled(IndexGateway), t.Cfg.isModuleEnabled(Backend): + case t.Cfg.isTarget(IndexGateway), t.Cfg.isTarget(Backend): // we want to use the actual storage when running the index-gateway, so we remove the Addr from the config t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled = true t.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled = true - case t.Cfg.isModuleEnabled(All): + case t.Cfg.isTarget(All): // We want ingester to also query the store when using boltdb-shipper but only when running with target All. // We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store. // ToDo: See if we can avoid doing this when not running loki in clustered mode. @@ -985,8 +985,8 @@ func (t *Loki) supportIndexDeleteRequest() bool { // compactorAddress returns the configured address of the compactor. // It prefers grpc address over http. If the address is grpc then the bool would be true otherwise false func (t *Loki) compactorAddress() (string, bool, error) { - legacyReadMode := t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read) - if t.Cfg.isModuleEnabled(All) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) { + legacyReadMode := t.Cfg.LegacyReadTarget && t.Cfg.isTarget(Read) + if t.Cfg.isTarget(All) || legacyReadMode || t.Cfg.isTarget(Backend) { // In single binary or read modes, this module depends on Server return net.JoinHostPort(t.Cfg.Server.GRPCListenAddress, strconv.Itoa(t.Cfg.Server.GRPCListenPort)), true, nil } @@ -1151,8 +1151,8 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) { // unfortunately there is no way to generate a "default" config and compare default against actual // to determine if it's unconfigured. the following check, however, correctly tests this. // Single binary integration tests will break if this ever drifts - legacyReadMode := t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read) - if (t.Cfg.isModuleEnabled(All) || legacyReadMode || t.Cfg.isModuleEnabled(Backend)) && t.Cfg.Ruler.StoreConfig.IsDefaults() { + legacyReadMode := t.Cfg.LegacyReadTarget && t.Cfg.isTarget(Read) + if (t.Cfg.isTarget(All) || legacyReadMode || t.Cfg.isTarget(Backend)) && t.Cfg.Ruler.StoreConfig.IsDefaults() { level.Info(util_log.Logger).Log("msg", "Ruler storage is not configured; ruler will not be started.") return } @@ -1476,7 +1476,7 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) { t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly managerMode := lokiring.ClientMode - if t.Cfg.isModuleEnabled(IndexGateway) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) { + if t.Cfg.isTarget(IndexGateway) || legacyReadMode || t.Cfg.isTarget(Backend) { managerMode = lokiring.ServerMode } rm, err := lokiring.NewRingManager(indexGatewayRingKey, managerMode, t.Cfg.IndexGateway.Ring, t.Cfg.IndexGateway.Ring.ReplicationFactor, indexgateway.NumTokens, util_log.Logger, prometheus.DefaultRegisterer) @@ -1497,7 +1497,7 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) { func (t *Loki) initIndexGatewayInterceptors() (services.Service, error) { // Only expose per-tenant metric if index gateway runs as standalone service - if t.Cfg.isModuleEnabled(IndexGateway) { + if t.Cfg.isTarget(IndexGateway) { interceptors := indexgateway.NewServerInterceptors(prometheus.DefaultRegisterer) t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, interceptors.PerTenantRequestCount) } @@ -1572,7 +1572,7 @@ func (t *Loki) initQuerySchedulerRing() (_ services.Service, err error) { t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort managerMode := lokiring.ClientMode - if t.Cfg.isModuleEnabled(QueryScheduler) || t.Cfg.isModuleEnabled(Backend) || t.Cfg.isModuleEnabled(All) || (t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read)) { + if t.Cfg.isTarget(QueryScheduler) || t.Cfg.isTarget(Backend) || t.Cfg.isTarget(All) || (t.Cfg.LegacyReadTarget && t.Cfg.isTarget(Read)) { managerMode = lokiring.ServerMode } rm, err := lokiring.NewRingManager(schedulerRingKey, managerMode, t.Cfg.QueryScheduler.SchedulerRing, scheduler.ReplicationFactor, scheduler.NumTokens, util_log.Logger, prometheus.DefaultRegisterer) diff --git a/pkg/loki/validation.go b/pkg/loki/validation.go index 1acb8d20afe1..6e7e19cc4480 100644 --- a/pkg/loki/validation.go +++ b/pkg/loki/validation.go @@ -14,7 +14,7 @@ func validateBackendAndLegacyReadMode(c *Config) []error { var errs []error // Honor the legacy scalable deployment topology if c.LegacyReadTarget { - if c.isModuleEnabled(Backend) { + if c.isTarget(Backend) { errs = append(errs, fmt.Errorf("CONFIG ERROR: invalid target, cannot run backend target with legacy read mode")) } }