Skip to content

Commit

Permalink
Refactor cron scaler config
Browse files Browse the repository at this point in the history
Signed-off-by: Rick Brouwer <rickbrouwer@gmail.com>
  • Loading branch information
rickbrouwer committed Aug 22, 2024
1 parent f4261e3 commit 1a90c80
Showing 1 changed file with 50 additions and 63 deletions.
113 changes: 50 additions & 63 deletions pkg/scalers/cron_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scalers
import (
"context"
"fmt"
"strconv"
"strings"
"time"

Expand All @@ -23,28 +22,58 @@ const (

type cronScaler struct {
metricType v2.MetricTargetType
metadata *cronMetadata
metadata cronMetadata
logger logr.Logger
}

type cronMetadata struct {
start string
end string
timezone string
desiredReplicas int64
triggerIndex int
Start string `keda:"name=start, order=triggerMetadata"`
End string `keda:"name=end, order=triggerMetadata"`
Timezone string `keda:"name=timezone, order=triggerMetadata"`
DesiredReplicas int64 `keda:"name=desiredReplicas, order=triggerMetadata"`
TriggerIndex int
}

func (m *cronMetadata) Validate() error {
if m.Timezone == "" {
return fmt.Errorf("no timezone specified")
}

parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
if m.Start == "" {
return fmt.Errorf("no start schedule specified")
}
if _, err := parser.Parse(m.Start); err != nil {
return fmt.Errorf("error parsing start schedule: %w", err)
}

if m.End == "" {
return fmt.Errorf("no end schedule specified")
}
if _, err := parser.Parse(m.End); err != nil {
return fmt.Errorf("error parsing end schedule: %w", err)
}

if m.Start == m.End {
return fmt.Errorf("start and end can not have exactly same time input")
}

if m.DesiredReplicas == 0 {
return fmt.Errorf("no desiredReplicas specified")
}

return nil
}

// NewCronScaler creates a new cronScaler
func NewCronScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

meta, parseErr := parseCronMetadata(config)
if parseErr != nil {
return nil, fmt.Errorf("error parsing cron metadata: %w", parseErr)
meta, err := parseCronMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing cron metadata: %w", err)
}

return &cronScaler{
Expand All @@ -68,51 +97,12 @@ func getCronTime(location *time.Location, spec string) (int64, error) {
return cronTime, nil
}

func parseCronMetadata(config *scalersconfig.ScalerConfig) (*cronMetadata, error) {
if len(config.TriggerMetadata) == 0 {
return nil, fmt.Errorf("invalid Input Metadata. %s", config.TriggerMetadata)
func parseCronMetadata(config *scalersconfig.ScalerConfig) (cronMetadata, error) {
meta := cronMetadata{TriggerIndex: config.TriggerIndex}
if err := config.TypedConfig(&meta); err != nil {
return meta, err
}

meta := cronMetadata{}
if val, ok := config.TriggerMetadata["timezone"]; ok && val != "" {
meta.timezone = val
} else {
return nil, fmt.Errorf("no timezone specified. %s", config.TriggerMetadata)
}
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
if val, ok := config.TriggerMetadata["start"]; ok && val != "" {
_, err := parser.Parse(val)
if err != nil {
return nil, fmt.Errorf("error parsing start schedule: %w", err)
}
meta.start = val
} else {
return nil, fmt.Errorf("no start schedule specified. %s", config.TriggerMetadata)
}
if val, ok := config.TriggerMetadata["end"]; ok && val != "" {
_, err := parser.Parse(val)
if err != nil {
return nil, fmt.Errorf("error parsing end schedule: %w", err)
}
meta.end = val
} else {
return nil, fmt.Errorf("no end schedule specified. %s", config.TriggerMetadata)
}
if meta.start == meta.end {
return nil, fmt.Errorf("error parsing schedule. %s: start and end can not have exactly same time input", config.TriggerMetadata)
}
if val, ok := config.TriggerMetadata["desiredReplicas"]; ok && val != "" {
metadataDesiredReplicas, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("error parsing desiredReplicas metadata. %s", config.TriggerMetadata)
}

meta.desiredReplicas = int64(metadataDesiredReplicas)
} else {
return nil, fmt.Errorf("no DesiredReplicas specified. %s", config.TriggerMetadata)
}
meta.triggerIndex = config.TriggerIndex
return &meta, nil
return meta, nil
}

func (s *cronScaler) Close(context.Context) error {
Expand All @@ -127,37 +117,34 @@ func parseCronTimeFormat(s string) string {
return s
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *cronScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
var specReplicas int64 = 1
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("cron-%s-%s-%s", s.metadata.timezone, parseCronTimeFormat(s.metadata.start), parseCronTimeFormat(s.metadata.end)))),
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, kedautil.NormalizeString(fmt.Sprintf("cron-%s-%s-%s", s.metadata.Timezone, parseCronTimeFormat(s.metadata.Start), parseCronTimeFormat(s.metadata.End)))),
},
Target: GetMetricTarget(s.metricType, specReplicas),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: cronMetricType}
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *cronScaler) GetMetricsAndActivity(_ context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
var defaultDesiredReplicas = int64(defaultDesiredReplicas)

location, err := time.LoadLocation(s.metadata.timezone)
location, err := time.LoadLocation(s.metadata.Timezone)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to load timezone. Error: %w", err)
}

// Since we are considering the timestamp here and not the exact time, timezone does matter.
currentTime := time.Now().Unix()

nextStartTime, startTimecronErr := getCronTime(location, s.metadata.start)
nextStartTime, startTimecronErr := getCronTime(location, s.metadata.Start)
if startTimecronErr != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error initializing start cron: %w", startTimecronErr)
}

nextEndTime, endTimecronErr := getCronTime(location, s.metadata.end)
nextEndTime, endTimecronErr := getCronTime(location, s.metadata.End)
if endTimecronErr != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error intializing end cron: %w", endTimecronErr)
}
Expand All @@ -167,7 +154,7 @@ func (s *cronScaler) GetMetricsAndActivity(_ context.Context, metricName string)
metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, false, nil
case currentTime <= nextEndTime:
metric := GenerateMetricInMili(metricName, float64(s.metadata.desiredReplicas))
metric := GenerateMetricInMili(metricName, float64(s.metadata.DesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, true, nil
default:
metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas))
Expand Down

0 comments on commit 1a90c80

Please sign in to comment.