Skip to content

Commit

Permalink
Aws 1.0 transform plans along with more sql validations. (#154)
Browse files Browse the repository at this point in the history
Signed-off-by: Varun Mittal <varunmittal91@gmail.com>
  • Loading branch information
varunmittal91 authored Nov 27, 2023
1 parent 46f7fa7 commit 555b254
Show file tree
Hide file tree
Showing 46 changed files with 498 additions and 37 deletions.
28 changes: 28 additions & 0 deletions focus_converter_base/focus_converter/configs/base_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,20 @@ class UnnestValueConversionArgs(BaseModel):
] = "first"


class MissingColumnDType(BaseModel):
    """Conversion arguments naming the data type of a column that may be missing.

    Used to validate the ``conversion_args`` of
    ``apply_default_if_column_missing`` plans.
    """

    # one of the supported data type names: "string", "float" or "int"
    data_type: Literal["string", "float", "int"]


class DTypeConversionArg(BaseModel):
    """A single column-to-dtype assignment inside a ``set_column_dtypes`` plan."""

    # name of the column whose dtype is being set
    column_name: str
    # target data type name: "string", "float" or "int"
    dtype: Literal["string", "float", "int"]
    # strictness flag for the conversion; defaults to False
    strict: bool = False


class SetColumnDTypesConversionArgs(BaseModel):
    """Conversion arguments for ``set_column_dtypes`` plans."""

    # one entry per column whose dtype should be set
    dtype_args: List[DTypeConversionArg]


# Plan file names look like "<name>_S<three digits>.yaml"; group 1 captures <name>.
# A raw string keeps ``\d`` from being read as an (invalid) string escape, and the
# dot is escaped so that only a literal ".yaml" suffix matches.
CONFIG_FILE_PATTERN = re.compile(r"(.+)_S\d{3}\.yaml")


Expand Down Expand Up @@ -157,6 +171,20 @@ def conversion_args_validation(cls, v: Any, field_info: ValidationInfo) -> str:
raise ValueError(
e, f"Missing or bad static value argument: {field_info.data}"
)
elif conversion_type == STATIC_CONVERSION_TYPES.APPLY_DEFAULT_IF_COLUMN_MISSING:
try:
MissingColumnDType.model_validate(v)
except ValidationError as e:
raise ValueError(
e, f"Missing or bad unnest value argument: {field_info.data}"
)
elif conversion_type == STATIC_CONVERSION_TYPES.SET_COLUMN_DTYPES:
try:
SetColumnDTypesConversionArgs.model_validate(v)
except ValidationError as e:
raise ValueError(
e, f"Missing or bad set column dtype argument: {field_info.data}"
)
return v

@field_validator("column_prefix")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Casts the listed columns to the given dtype; a listed column that is absent
# from the dataset is added as a null column of that dtype.
plan_name: adds dtypes to the columns required for the conversion
conversion_type: set_column_dtypes
column: PlaceHolder
focus_column: PlaceHolder
conversion_args:
  dtype_args:
    - column_name: line_item_unblended_cost
      dtype: float
    # NOTE(review): savings_plan_used_commitment was listed twice; the duplicate
    # was removed. If it was meant to name a different column, add that column here.
    - column_name: savings_plan_used_commitment
      dtype: float
    - column_name: savings_plan_total_commitment_to_date
      dtype: float
    - column_name: reservation_unused_amortized_upfront_fee_for_billing_period
      dtype: float
    - column_name: reservation_unused_recurring_fee
      dtype: float
    - column_name: pricing_public_on_demand_rate
      dtype: float
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
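# Later plans fail if this column is absent from the dataset, so it is added as a NULL column of the declared data_type.
# TODO: support a configurable default value instead of NULL.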
plan_name: add default value to line_item_net_unblended_cost if not present in CUR dataset
conversion_type: apply_default_if_column_missing
column: line_item_net_unblended_cost
focus_column: PlaceHolder
conversion_args:
data_type: float
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
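# Later plans fail if this column is absent from the dataset, so it is added as a NULL column of the declared data_type.
# TODO: support a configurable default value instead of NULL.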
plan_name: add default value to reservation_reservation_a_r_n if not present in CUR dataset
conversion_type: apply_default_if_column_missing
column: reservation_reservation_a_r_n
focus_column: PlaceHolder
conversion_args:
data_type: string
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
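# Later plans fail if this column is absent from the dataset, so it is added as a NULL column of the declared data_type.
# TODO: support a configurable default value instead of NULL.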
plan_name: add default value to ReservationARN if not present in CUR dataset
conversion_type: apply_default_if_column_missing
column: reservation_reservation_arn
focus_column: PlaceHolder
conversion_args:
data_type: string
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
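# Later plans fail if this column is absent from the dataset, so it is added as a NULL column of the declared data_type.
# TODO: support a configurable default value instead of NULL.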
plan_name: add default value to SavingsPlanArn if not present in CUR dataset
conversion_type: apply_default_if_column_missing
column: savings_plan_savings_plan_arn
focus_column: PlaceHolder
conversion_args:
data_type: string

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
plan_name: convert lineItem/NetUnblendedCost, lineItem/UnblendedCost to BilledCost
conversion_type: sql_condition
conversion_args:
conditions:
- WHEN line_item_net_unblended_cost is not null THEN line_item_net_unblended_cost
default_value: line_item_unblended_cost
column: NA
focus_column: BilledCost
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
plan_name: convert lineItem/LineItemDescription to ChargeDescription
conversion_type: rename_column
column: line_item_line_item_description
focus_column: ChargeDescription
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
plan_name: convert Bill/BillType to ChargeFrequency
conversion_type: map_values
conversion_args:
value_list:
- key: Refund
value: One-Time
- key: Purchase
value: One-Time
- key: Anniversary
value: Recurring
default_value: ""
column: bill_bill_type
focus_column: ChargeFrequency
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
plan_name: convert ChargeType and lineItem/LineItemType to ChargeSubcategory
conversion_type: sql_condition
conversion_args:
  conditions:
    # AND binds tighter than OR in SQL: without the parentheses every row with a
    # reservation ARN would be labelled 'Used Commitment' regardless of ChargeType.
    - WHEN ChargeType = 'Usage' AND (savings_plan_savings_plan_arn is not null OR reservation_reservation_arn is not null) THEN 'Used Commitment'
    - WHEN ChargeType = 'Usage' THEN 'On-Demand'
    - WHEN ChargeType = 'Adjustment' AND line_item_line_item_type = 'BundledDiscount' THEN 'Credit'
    - WHEN ChargeType = 'Adjustment' AND line_item_line_item_type = 'Credit' THEN 'Credit'
    - WHEN ChargeType = 'Adjustment' AND line_item_line_item_type = 'Discount' THEN 'Credit'
    - WHEN ChargeType = 'Adjustment' AND line_item_line_item_type = 'DiscountedUsage' THEN 'Credit'
    - WHEN ChargeType = 'Adjustment' AND line_item_line_item_type = 'Refund' THEN 'Refund'
    - WHEN ChargeType = 'Adjustment' THEN 'General Adjustment'
  default_value: "NULL"
column: NA
focus_column: ChargeSubcategory
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# fails if the column is not present, TODO: add a default value
plan_name: convert ReservationARN/SavingsPlanArn to CommitmentDiscountCategory
conversion_type: sql_condition
conversion_args:
conditions:
- WHEN savings_plan_savings_plan_arn is not null THEN 'Spend'
- WHEN reservation_reservation_arn is not null THEN 'Usage'
default_value: "null"
column: NA
focus_column: CommitmentDiscountCategory
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# fails if the column is not present, TODO: add a default value
plan_name: convert ReservationARN/SavingsPlanArn to CommitmentDiscountId
conversion_type: sql_condition
conversion_args:
conditions:
- WHEN savings_plan_savings_plan_arn is not null THEN savings_plan_savings_plan_arn
default_value: reservation_reservation_arn
column: NA
focus_column: CommitmentDiscountId
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# fails if the column is not present, TODO: add a default value
plan_name: convert ReservationARN/SavingsPlanArn to CommitmentDiscountType
conversion_type: sql_condition
conversion_args:
conditions:
- WHEN savings_plan_savings_plan_arn is not null THEN 'Savings Plan'
- WHEN reservation_reservation_arn is not null THEN 'Reserved Instances (RI)'
default_value: "null"
column: NA
focus_column: CommitmentDiscountType
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
plan_name: conversion plan for EffectiveCost
conversion_type: sql_query
conversion_args: >
SELECT
*,
CASE
WHEN (line_item_line_item_type = 'SavingsPlanCoveredUsage') THEN savings_plan_savings_plan_effective_cost
WHEN (line_item_line_item_type = 'SavingsPlanRecurringFee') THEN (savings_plan_total_commitment_to_date - savings_plan_used_commitment)
WHEN (line_item_line_item_type = 'SavingsPlanNegation') THEN 0
WHEN (line_item_line_item_type = 'SavingsPlanUpfrontFee') THEN 0
WHEN (line_item_line_item_type = 'DiscountedUsage') THEN reservation_effective_cost
WHEN (line_item_line_item_type = 'RIFee') THEN (reservation_unused_amortized_upfront_fee_for_billing_period + reservation_unused_recurring_fee)
WHEN ((line_item_line_item_type = 'Fee') AND (reservation_reservation_a_r_n <> '')) THEN 0
ELSE line_item_unblended_cost
END AS EffectiveCost
FROM {{ TABLE_NAME }}
column: NA
focus_column: EffectiveCost
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
plan_name: convert Pricing/publicOnDemandCost to ListCost
conversion_type: sql_condition
conversion_args:
conditions:
- WHEN pricing_public_on_demand_rate is not null THEN pricing_public_on_demand_rate * line_item_usage_amount
default_value: pricing_public_on_demand_cost
column: NA
focus_column: ListCost
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
plan_name: convert Pricing/publicOnDemandRate to ListUnitPrice
conversion_type: rename_column
column: pricing_public_on_demand_rate
focus_column: ListUnitPrice
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
plan_name: convert LineItem/UsageAmount to PricingQuantity
conversion_type: sql_condition
conversion_args:
conditions:
- WHEN reservation_total_reserved_units is not null THEN reservation_total_reserved_units
default_value: line_item_usage_amount
column: line_item_usage_amount
focus_column: PricingQuantity
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
plan_name: convert Pricing/PricingUnit to PricingUnit
conversion_type: rename_column
column: pricing_unit
focus_column: PricingUnit
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
plan_name: convert Product/sku to SkuId
conversion_type: rename_column
column: product_sku
focus_column: SkuId
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
plan_name: convert Pricing/RateCode, Pricing/RateId to SkuPriceId
conversion_type: sql_condition
conversion_args:
conditions:
- WHEN pricing_rate_code is not null THEN pricing_rate_code
default_value: pricing_rate_id
column: NA
focus_column: SkuPriceId
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
plan_name: convert LineItem/UsageAmount to UsageQuantity
conversion_type: rename_column
column: line_item_usage_amount
focus_column: UsageQuantity
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
plan_name: convert Pricing/Unit to UsageUnit
conversion_type: rename_column
column: pricing_unit
focus_column: UsageUnit
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class STATIC_CONVERSION_TYPES(Enum):
# apply default values if column not present
APPLY_DEFAULT_IF_COLUMN_MISSING = "apply_default_if_column_missing"

# set column dtypes
SET_COLUMN_DTYPES = "set_column_dtypes"


__all__ = [
"STATIC_CONVERSION_TYPES",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,3 @@ def assign_static_value(
)

return pl.lit(conversion_args.static_value).alias(column_alias)

@staticmethod
def apply_default_if_column_missing(
    plan: ConversionPlan, column_alias, column_validator: ColumnValidator
):
    """Register ``plan`` with ``column_validator`` as a default-if-missing mapping.

    Delegates to ``map_static_default_value_if_not_present`` and returns nothing.
    """
    # TODO: add option to set a default value instead of just NULL
    column_validator.map_static_default_value_if_not_present(
        column_alias=column_alias, plan=plan
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from typing import List, Tuple

import polars as pl

from focus_converter.configs.base_config import (
ConversionPlan,
MissingColumnDType,
SetColumnDTypesConversionArgs,
)
from focus_converter.conversion_functions.validations import ColumnValidator


class DeferredColumnFunctions:
    """Conversion steps that can only run once the LazyFrame's columns are known.

    Plans are registered with the ``map_*`` methods and later replayed against
    the loaded LazyFrame with the matching ``apply_*`` methods.
    """

    def __init__(self):
        # (column_alias, plan) pairs registered through map_missing_column_plan
        self.__missing_column_plans__: List[Tuple[str, ConversionPlan]] = []

        # enforced column dtypes, need to be applied before any other conversion
        # if column is present then a cast operation can be applied, if not then
        # a new column can be added with null values and then cast
        self.__enforced_column_dtypes__: List[ConversionPlan] = []

    @staticmethod
    def convert_focus_data_type_polars_dtype(focus_data_type):
        """Return the polars dtype for a data type name.

        :param focus_data_type: one of ``"string"``, ``"float"`` or ``"int"``.
        :raises RuntimeError: for any other value.
        """
        if focus_data_type == "string":
            return pl.Utf8
        elif focus_data_type == "float":
            return pl.Float64
        elif focus_data_type == "int":
            return pl.Int64
        else:
            raise RuntimeError(f"data_type: {focus_data_type} not implemented")

    def map_missing_column_plan(
        self, plan: ConversionPlan, column_alias, column_validator: ColumnValidator
    ):
        """Remember ``plan`` for ``apply_missing_column_plan`` and register it
        with ``column_validator``."""
        self.__missing_column_plans__.append((column_alias, plan))
        column_validator.map_static_default_value_if_not_present(
            plan=plan, column_alias=column_alias
        )

    def map_dtype_plan(self, plan: ConversionPlan, column_validator: ColumnValidator):
        """Remember ``plan`` for ``apply_dtype_plan`` and register it with
        ``column_validator``."""
        self.__enforced_column_dtypes__.append(plan)
        column_validator.map_dtype_enforced_node(plan=plan)

    def apply_missing_column_plan(self, lf: pl.LazyFrame):
        """Add a typed null column for every registered column missing from ``lf``.

        Columns that already exist are left untouched.

        :param lf: LazyFrame to extend.
        :return: the LazyFrame with the missing columns added.
        """
        # Read the column names once and track additions locally instead of
        # re-reading lf.columns on every iteration.
        known_columns = set(lf.columns)

        for _, missing_column_plan in self.__missing_column_plans__:
            column_name = missing_column_plan.column
            if column_name in known_columns:
                continue

            conversion_arg: MissingColumnDType = MissingColumnDType.model_validate(
                missing_column_plan.conversion_args
            )
            dtype = self.convert_focus_data_type_polars_dtype(conversion_arg.data_type)

            lf = lf.with_columns(pl.lit(None).cast(dtype).alias(column_name))
            known_columns.add(column_name)
        return lf

    def apply_dtype_plan(self, lf: pl.LazyFrame):
        """Cast every column named in the registered dtype plans.

        A column that is not present is first created as a null column, then
        cast. The ``strict`` flag configured for each column is passed to the
        cast; it defaults to ``False``.

        :param lf: LazyFrame to cast.
        :return: the LazyFrame with the dtype casts applied.
        """
        known_columns = set(lf.columns)

        for plan in self.__enforced_column_dtypes__:
            conversion_args = SetColumnDTypesConversionArgs.model_validate(
                plan.conversion_args
            )
            # Columns are applied one at a time so a column listed more than
            # once does not produce duplicate output names in a single call.
            for column_obj in conversion_args.dtype_args:
                column_name = column_obj.column_name
                dtype = self.convert_focus_data_type_polars_dtype(column_obj.dtype)

                if column_name in known_columns:
                    source = pl.col(column_name)
                else:
                    source = pl.lit(None)
                    known_columns.add(column_name)

                lf = lf.with_columns(
                    source.cast(dtype, strict=column_obj.strict).alias(column_name)
                )
        return lf
Loading

0 comments on commit 555b254

Please sign in to comment.