non-OOP pivoter
so the package doesn't have to be installed on executor nodes (Spark doesn't serialize custom classes)
Tagar committed Feb 18, 2018
1 parent c586fc8 commit c661d71
Showing 2 changed files with 60 additions and 69 deletions.
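
The rationale in the commit message is worth a line of context: passing a bound method such as `self.merge_two_dicts` to `reduceByKey` pickles `self` along with it, and executors can only unpickle that instance if the class's package is importable there; a standalone function avoids the class reference altogether. A minimal sketch of the two styles, assuming a running PySpark session (`SomePivoter` is a hypothetical stand-in for a class installed on the driver only; data is illustrative):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    rdd = spark.sparkContext.parallelize([(1, {'a': 1.0}), (1, {'b': 2.0})])

    # OOP style: shipping a bound method pickles the instance, and executors
    # unpickle it by re-importing its class from the package -- an ImportError
    # if the package exists on the driver only:
    # rdd.reduceByKey(SomePivoter().merge_two_dicts)

    # non-OOP style: a plain function carries no reference to a custom class,
    # so it can be shipped to executors without the package:
    def merge_two_dicts(x, y):
        x.update(y)
        return x

    print(rdd.reduceByKey(merge_two_dicts).collect())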
127 changes: 59 additions & 68 deletions abalon/spark/pivoter.py
@@ -48,98 +48,89 @@
 
 from abalon.spark.sparkutils import get_spark
 from pyspark.sql.types import *
+import operator
 
 ###########################################################################################################
 
-class BasicSparkPivoter (object):
-
-    def __new__ (cls, df, idx_col=None, all_vars=None):
-        '''
-        Pivots a dataframe without aggregation.
-
-        Limitations:
-        - {index, colname} is a unique/ "PK" for this dataset
-          (there is no aggregation happens for value - use AggSparkPivoter instead if this is needed)
-
-        :param df: dataframe to pivot (see expected schema of the df above)
-        :param idx_col: name of the index column; if not specified, will be taked from df
-        :param all_vars: list of all distinct values of `colname` column;
-            the only reason it's passed to this function is so you can redefine order of pivoted columns;
-            if not specified, datset will be scanned for all possible colnames
-        :return: resulting dataframe
-        '''
-
-        self = super(BasicSparkPivoter, cls).__new__(cls)
-
-        return self.pivot_df(df, idx_col, all_vars)
-
-    def merge_two_dicts(self, x, y):
-        x.update(y)    # modifies x with y's keys and values & returns None
-        return x
-
-    def map_dict_to_denseArray (self, idx, d):
-        yield idx
-        for var in self.all_vars:
-            if var in d:
-                yield float(d[var])    # assuming all variables can be cast to float/double
-            else:
-                yield None             # this is what makes array 'dense'.. even non-existent vars are represented with nulls
-
-    def pivot_df (self, df, idx_col, all_vars):
-
-        spark = get_spark()
-
-        if not all_vars:
-            # get list of variables from the dataset:
-            all_vars = sorted([row[0] for row in df.rdd.map(lambda (idx, k, v): k).distinct().collect()])
-        self.all_vars = all_vars
-
-        if not idx_col:
-            idx_col = df.columns[1]    # take 2nd column name
-
-        pivoted_rdd = (df.rdd
-                       .map(lambda (idx, k, v): (idx, {k: v}))    # convert k,v to a 1-element dict
-                       .reduceByKey(self.merge_two_dicts)         # merge into a single dict for all vars for this idx
-                       .map(lambda (idx, d): list(self.map_dict_to_denseArray(idx, d)))
-                                                                  # create final rdd with dense array of all variables
-                       )
-
-        fields = [StructField(idx_col, StringType(), False)]
-        fields += [StructField(field_name, DoubleType(), True) for field_name in self.all_vars]
-
-        schema = StructType(fields)
-
-        pivoted_df = spark.createDataFrame(pivoted_rdd, schema)
-        return pivoted_df
-
-
-###########################################################################################################
-
-import operator
-
-class AggSparkPivoter (BasicSparkPivoter):
-
-    def __new__ (df, idx_col=None, all_vars=None, agg_op=operator.add):
-        '''
-        Pivots a dataframe without aggregation.
-
-        :param df: dataframe to pivot (see expected schema of the df above)
-        :param idx_col: name of the index column; if not specified, will be taked from df
-        :param all_vars: list of all distinct values of `colname` column;
-            the only reason it's passed to this function is so you can redefine order of pivoted columns;
-            if not specified, datset will be scanned for all possible colnames
-        :param agg_op: aggregation operation/function, defaults to `add`
-        :return: resulting dataframe
-        '''
-
-        self = super(AggSparkPivoter, cls).__new__(cls)
-
-        self.agg_op = agg_op
-
-        return self.pivot_df(df, idx_col, all_vars)
-
-    def merge_two_dicts(self, x, y):
-        return {k: self.agg_op(x.get(k, 0.0),
-                               y.get(k, 0.0))
-                for k in set(x).union(y)
-               }
+def simple_merge_two_dicts (x, y, agg_op):
+    x.update(y)    # modifies x with y's keys and values & returns None
+    return x
+
+def BasicSparkPivoter (df, all_vars=None):
+    '''
+    Pivots a dataframe without aggregation.
+
+    Limitations:
+    - {index, colname} is a unique/ "PK" for this dataset
+      (there is no aggregation happens for value - use AggSparkPivoter instead if this is needed)
+
+    :param df: dataframe to pivot (see expected schema of the df above)
+    :param all_vars: list of all distinct values of `colname` column;
+        if not specified, datset will be scanned for all possible colnames;
+        the only reason it's passed to this function is so you can redefine order of pivoted columns;
+    :return: resulting dataframe
+    '''
+
+    return pivot_df(df, simple_merge_two_dicts, all_vars)
+
+def agg_merge_two_dicts(x, y, agg_op):
+    return {k: agg_op(x.get(k, 0.0),
+                      y.get(k, 0.0))
+            for k in set(x).union(y)
+           }
+
+def AggSparkPivoter (df, all_vars=None, agg_op=operator.add):
+    '''
+    Pivots a dataframe with aggregation.
+
+    :param df: dataframe to pivot (see expected schema of the df above)
+    :param all_vars: list of all distinct values of `colname` column;
+        if not specified, datset will be scanned for all possible colnames;
+        the only reason it's passed to this function is so you can redefine order of pivoted columns;
+    :return: resulting dataframe
+    '''
+
+    return pivot_df(df, agg_merge_two_dicts, all_vars, agg_op)
+
+
+###########################################################################################################
+
+def pivot_df (df, merger_func, all_vars=None, agg_op=None):
+
+    def map_dict_to_denseArray(idx, d):
+        yield idx
+        for var in all_vars:
+            if var in d:
+                yield float(d[var])    # assuming all variables can be cast to float/double
+            else:
+                yield None             # this is what makes array 'dense'.. even non-existent vars are represented with nulls
+
+    spark = get_spark()
+
+    # assuming 2nd column is index column
+    idx_col = df.columns[1]
+
+    if not all_vars:
+        # get list of variables from the dataset:
+        all_vars = sorted([row[0] for row in df.select(idx_col).distinct().collect()])
+
+    pivoted_rdd = (df.rdd
+                   .map(lambda (idx, k, v): (idx, {k: v}))                  # convert k,v to a 1-element dict
+                   .reduceByKey(lambda x, y: merger_func(x, y, agg_op))     # merge into a single dict for all vars for this idx
+                   .map(lambda (idx, d): list(map_dict_to_denseArray(idx, d)))
+                                                                            # create final rdd with dense array of all variables
+                   )
+
+    fields = [StructField(idx_col, StringType(), False)]
+    fields += [StructField(field_name, DoubleType(), True) for field_name in all_vars]
+
+    schema = StructType(fields)
+
+    pivoted_df = spark.createDataFrame(pivoted_rdd, schema)
+    return pivoted_df
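The core of `pivot_df` is easy to trace outside Spark: each (idx, colname, value) row becomes a one-element dict, dicts sharing an idx are merged by `merger_func`, and the merged dict is flattened into a dense row over `all_vars`. A pure-Python sketch of those three steps (data is illustrative):

    all_vars = ['x', 'y', 'z']              # pivoted column order

    def simple_merge_two_dicts(x, y, agg_op=None):
        x.update(y)                         # last value wins, as in BasicSparkPivoter
        return x

    def map_dict_to_denseArray(idx, d):
        yield idx
        for var in all_vars:
            yield float(d[var]) if var in d else None    # None fills missing vars

    merged = simple_merge_two_dicts({'x': 1.0}, {'z': 3.0})
    print(list(map_dict_to_denseArray('id1', merged)))   # ['id1', 1.0, None, 3.0]
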
2 changes: 1 addition & 1 deletion abalon/version.py
@@ -12,6 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-version = '2.1.5'
+version = '2.2.0'
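
With the classes gone, each pivoter is now a plain call. A hypothetical usage sketch against the 2.2.0 API (column names and data are illustrative, and `get_spark` is assumed to have been configured per abalon's sparkutils):

    from abalon.spark.sparkutils import get_spark
    from abalon.spark.pivoter import BasicSparkPivoter, AggSparkPivoter

    spark = get_spark()

    # long format expected by the pivoters: (index, colname, value)
    df = spark.createDataFrame(
        [('id1', 'x', 1.0), ('id1', 'y', 2.0), ('id2', 'x', 3.0), ('id2', 'x', 4.0)],
        ['idx', 'colname', 'value'])

    wide = BasicSparkPivoter(df)    # no aggregation; assumes (idx, colname) pairs are unique
    summed = AggSparkPivoter(df)    # duplicate (id2, 'x') values are added: 7.0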

