diff --git a/abalon/spark/pivoter.py b/abalon/spark/pivoter.py index 1f9e41f..e6ceac1 100644 --- a/abalon/spark/pivoter.py +++ b/abalon/spark/pivoter.py @@ -90,11 +90,12 @@ def AggSparkPivoter (df, all_vars=None, agg_op=operator.add): ''' def agg_merge_two_dicts(x, y, agg_op): - return {k: agg_op(float(x.get(k, 0)), + z = {k: agg_op(float(x.get(k, 0)), float(y.get(k, 0)) ) - for k in set(x).union(y) + for k in set(x).union(y) } + return z return pivot_df(df, agg_merge_two_dicts, all_vars, agg_op) @@ -120,6 +121,9 @@ def map_dict_to_denseArray(idx, d): # get list of variables from the dataset: all_vars = sorted([row[0] for row in df.select(key_col).distinct().collect()]) + # key becomes column names, so make it sql-compliant + columns = [field_name.strip().replace('.','_') for field_name in all_vars] + pivoted_rdd = (df.rdd .map(lambda (idx, k, v): (idx, {k: v})) # convert k,v to a 1-element dict .reduceByKey(lambda x,y: merger_func(x, y, agg_op)) # merge into a single dict for all vars for this idx @@ -127,8 +131,8 @@ def map_dict_to_denseArray(idx, d): # create final rdd with dense array of all variables ) - fields = [StructField(idx_col, StringType(), False)] - fields += [StructField(field_name, DoubleType(), True) for field_name in all_vars] + fields = [StructField(idx_col, StringType(), False)] + fields += [StructField(column, DoubleType(), True) for column in columns] schema = StructType(fields) diff --git a/abalon/version.py b/abalon/version.py index 527aad2..af51104 100644 --- a/abalon/version.py +++ b/abalon/version.py @@ -12,6 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -version = '2.2.4' +version = '2.2.5'