Skip to content

Commit

Permalink
fix(blooms): Disable metas cache on bloom gateway (#12959)
Browse files Browse the repository at this point in the history
The bloom gateway does not fetch any metas any more since the index gateway resolves them.
Renamed isModuleEnabled() to isTarget(), because the function name is confusing.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed May 14, 2024
1 parent 7cc9a93 commit 00bdd2f
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 26 deletions.
10 changes: 5 additions & 5 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (c *Config) Validate() error {
return nil
}

func (c *Config) isModuleEnabled(m string) bool {
func (c *Config) isTarget(m string) bool {
return util.StringsContain(c.Target, m)
}

Expand Down Expand Up @@ -726,24 +726,24 @@ func (t *Loki) setupModuleManager() error {
}

// Add IngesterQuerier as a dependency for store when target is either querier, ruler, read, or backend.
if t.Cfg.isModuleEnabled(Querier) || t.Cfg.isModuleEnabled(Ruler) || t.Cfg.isModuleEnabled(Read) || t.Cfg.isModuleEnabled(Backend) {
if t.Cfg.isTarget(Querier) || t.Cfg.isTarget(Ruler) || t.Cfg.isTarget(Read) || t.Cfg.isTarget(Backend) {
deps[Store] = append(deps[Store], IngesterQuerier)
}

// If the query scheduler and querier are running together, make sure the scheduler goes
// first to initialize the ring that will also be used by the querier
if (t.Cfg.isModuleEnabled(Querier) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(All) {
if (t.Cfg.isTarget(Querier) && t.Cfg.isTarget(QueryScheduler)) || t.Cfg.isTarget(All) {
deps[Querier] = append(deps[Querier], QueryScheduler)
}

// If the query scheduler and query frontend are running together, make sure the scheduler goes
// first to initialize the ring that will also be used by the query frontend
if (t.Cfg.isModuleEnabled(QueryFrontend) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(All) {
if (t.Cfg.isTarget(QueryFrontend) && t.Cfg.isTarget(QueryScheduler)) || t.Cfg.isTarget(All) {
deps[QueryFrontend] = append(deps[QueryFrontend], QueryScheduler)
}

// Initialise query tags interceptors on targets running ingester
if t.Cfg.isModuleEnabled(Ingester) || t.Cfg.isModuleEnabled(Write) || t.Cfg.isModuleEnabled(All) {
if t.Cfg.isTarget(Ingester) || t.Cfg.isTarget(Write) || t.Cfg.isTarget(All) {
deps[Server] = append(deps[Server], IngesterGRPCInterceptors)
}

Expand Down
40 changes: 20 additions & 20 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (t *Loki) initOverrides() (_ services.Service, err error) {
}

func (t *Loki) initOverridesExporter() (services.Service, error) {
if t.Cfg.isModuleEnabled(OverridesExporter) && t.TenantLimits == nil || t.Overrides == nil {
if t.Cfg.isTarget(OverridesExporter) && t.TenantLimits == nil || t.Overrides == nil {
// This target isn't enabled by default ("all") and requires per-tenant limits to run.
return nil, errors.New("overrides-exporter has been enabled, but no runtime configuration file was configured")
}
Expand Down Expand Up @@ -347,7 +347,7 @@ func (t *Loki) initDistributor() (services.Service, error) {

// Register the distributor to receive Push requests over GRPC
// EXCEPT when running with `-target=all` or `-target=` contains `ingester`
if !t.Cfg.isModuleEnabled(All) && !t.Cfg.isModuleEnabled(Write) && !t.Cfg.isModuleEnabled(Ingester) {
if !t.Cfg.isTarget(All) && !t.Cfg.isTarget(Write) && !t.Cfg.isTarget(Ingester) {
logproto.RegisterPusherServer(t.Server.GRPC, t.distributor)
}

Expand Down Expand Up @@ -407,13 +407,13 @@ func (t *Loki) initQuerier() (services.Service, error) {
}

querierWorkerServiceConfig := querier.WorkerServiceConfig{
AllEnabled: t.Cfg.isModuleEnabled(All),
ReadEnabled: t.Cfg.isModuleEnabled(Read),
AllEnabled: t.Cfg.isTarget(All),
ReadEnabled: t.Cfg.isTarget(Read),
GrpcListenAddress: t.Cfg.Server.GRPCListenAddress,
GrpcListenPort: t.Cfg.Server.GRPCListenPort,
QuerierWorkerConfig: &t.Cfg.Worker,
QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend),
QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler),
QueryFrontendEnabled: t.Cfg.isTarget(QueryFrontend),
QuerySchedulerEnabled: t.Cfg.isTarget(QueryScheduler),
SchedulerRing: scheduler.SafeReadRing(t.Cfg.QueryScheduler, t.querySchedulerRingManager),
}

Expand Down Expand Up @@ -727,7 +727,7 @@ func (t *Loki) initBloomStore() (services.Service, error) {
bsCfg := t.Cfg.StorageConfig.BloomShipperConfig

var metasCache cache.Cache
if cache.IsCacheConfigured(bsCfg.MetasCache) {
if t.Cfg.isTarget(IndexGateway) && cache.IsCacheConfigured(bsCfg.MetasCache) {
metasCache, err = cache.New(bsCfg.MetasCache, reg, logger, stats.BloomMetasCache, constants.Loki)

// always enable LRU cache
Expand Down Expand Up @@ -781,7 +781,7 @@ func (t *Loki) updateConfigForShipperStore() {
}

switch true {
case t.Cfg.isModuleEnabled(Ingester), t.Cfg.isModuleEnabled(Write):
case t.Cfg.isTarget(Ingester), t.Cfg.isTarget(Write):
// Use embedded cache for caching index in memory, this also significantly helps performance.
t.Cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{
EmbeddedCache: cache.EmbeddedCacheConfig{
Expand All @@ -803,7 +803,7 @@ func (t *Loki) updateConfigForShipperStore() {
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeWriteOnly
t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval)

case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.Cfg.isModuleEnabled(Backend), t.isModuleActive(IndexGateway), t.Cfg.isModuleEnabled(BloomCompactor):
case t.Cfg.isTarget(Querier), t.Cfg.isTarget(Ruler), t.Cfg.isTarget(Read), t.Cfg.isTarget(Backend), t.isModuleActive(IndexGateway), t.Cfg.isTarget(BloomCompactor):
// We do not want query to do any updates to index
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly
Expand Down Expand Up @@ -844,7 +844,7 @@ func (t *Loki) setupAsyncStore() error {
)

switch true {
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read):
case t.Cfg.isTarget(Querier), t.Cfg.isTarget(Ruler), t.Cfg.isTarget(Read):
// Do not use the AsyncStore if the querier is configured with QueryStoreOnly set to true
if t.Cfg.Querier.QueryStoreOnly {
break
Expand All @@ -855,16 +855,16 @@ func (t *Loki) setupAsyncStore() error {
asyncStore = true

// The legacy Read target includes the index gateway, so disable the index-gateway client in that configuration.
if t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read) {
if t.Cfg.LegacyReadTarget && t.Cfg.isTarget(Read) {
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled = true
t.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled = true
}
// Backend target includes the index gateway
case t.Cfg.isModuleEnabled(IndexGateway), t.Cfg.isModuleEnabled(Backend):
case t.Cfg.isTarget(IndexGateway), t.Cfg.isTarget(Backend):
// we want to use the actual storage when running the index-gateway, so we remove the Addr from the config
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled = true
t.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled = true
case t.Cfg.isModuleEnabled(All):
case t.Cfg.isTarget(All):
// We want ingester to also query the store when using boltdb-shipper but only when running with target All.
// We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store.
// ToDo: See if we can avoid doing this when not running loki in clustered mode.
Expand Down Expand Up @@ -985,8 +985,8 @@ func (t *Loki) supportIndexDeleteRequest() bool {
// compactorAddress returns the configured address of the compactor.
// It prefers grpc address over http. If the address is grpc then the bool would be true otherwise false
func (t *Loki) compactorAddress() (string, bool, error) {
legacyReadMode := t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read)
if t.Cfg.isModuleEnabled(All) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) {
legacyReadMode := t.Cfg.LegacyReadTarget && t.Cfg.isTarget(Read)
if t.Cfg.isTarget(All) || legacyReadMode || t.Cfg.isTarget(Backend) {
// In single binary or read modes, this module depends on Server
return net.JoinHostPort(t.Cfg.Server.GRPCListenAddress, strconv.Itoa(t.Cfg.Server.GRPCListenPort)), true, nil
}
Expand Down Expand Up @@ -1151,8 +1151,8 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) {
// unfortunately there is no way to generate a "default" config and compare default against actual
// to determine if it's unconfigured. the following check, however, correctly tests this.
// Single binary integration tests will break if this ever drifts
legacyReadMode := t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read)
if (t.Cfg.isModuleEnabled(All) || legacyReadMode || t.Cfg.isModuleEnabled(Backend)) && t.Cfg.Ruler.StoreConfig.IsDefaults() {
legacyReadMode := t.Cfg.LegacyReadTarget && t.Cfg.isTarget(Read)
if (t.Cfg.isTarget(All) || legacyReadMode || t.Cfg.isTarget(Backend)) && t.Cfg.Ruler.StoreConfig.IsDefaults() {
level.Info(util_log.Logger).Log("msg", "Ruler storage is not configured; ruler will not be started.")
return
}
Expand Down Expand Up @@ -1476,7 +1476,7 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) {
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly

managerMode := lokiring.ClientMode
if t.Cfg.isModuleEnabled(IndexGateway) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) {
if t.Cfg.isTarget(IndexGateway) || legacyReadMode || t.Cfg.isTarget(Backend) {
managerMode = lokiring.ServerMode
}
rm, err := lokiring.NewRingManager(indexGatewayRingKey, managerMode, t.Cfg.IndexGateway.Ring, t.Cfg.IndexGateway.Ring.ReplicationFactor, indexgateway.NumTokens, util_log.Logger, prometheus.DefaultRegisterer)
Expand All @@ -1497,7 +1497,7 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) {

func (t *Loki) initIndexGatewayInterceptors() (services.Service, error) {
// Only expose per-tenant metric if index gateway runs as standalone service
if t.Cfg.isModuleEnabled(IndexGateway) {
if t.Cfg.isTarget(IndexGateway) {
interceptors := indexgateway.NewServerInterceptors(prometheus.DefaultRegisterer)
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, interceptors.PerTenantRequestCount)
}
Expand Down Expand Up @@ -1572,7 +1572,7 @@ func (t *Loki) initQuerySchedulerRing() (_ services.Service, err error) {
t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort

managerMode := lokiring.ClientMode
if t.Cfg.isModuleEnabled(QueryScheduler) || t.Cfg.isModuleEnabled(Backend) || t.Cfg.isModuleEnabled(All) || (t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read)) {
if t.Cfg.isTarget(QueryScheduler) || t.Cfg.isTarget(Backend) || t.Cfg.isTarget(All) || (t.Cfg.LegacyReadTarget && t.Cfg.isTarget(Read)) {
managerMode = lokiring.ServerMode
}
rm, err := lokiring.NewRingManager(schedulerRingKey, managerMode, t.Cfg.QueryScheduler.SchedulerRing, scheduler.ReplicationFactor, scheduler.NumTokens, util_log.Logger, prometheus.DefaultRegisterer)
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func validateBackendAndLegacyReadMode(c *Config) []error {
var errs []error
// Honor the legacy scalable deployment topology
if c.LegacyReadTarget {
if c.isModuleEnabled(Backend) {
if c.isTarget(Backend) {
errs = append(errs, fmt.Errorf("CONFIG ERROR: invalid target, cannot run backend target with legacy read mode"))
}
}
Expand Down

0 comments on commit 00bdd2f

Please sign in to comment.