Skip to content

Commit

Permalink
make column name sql-compliant
Browse files Browse the repository at this point in the history
  • Loading branch information
Tagar committed Feb 19, 2018
1 parent bb589ed commit 7aff06b
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
12 changes: 8 additions & 4 deletions abalon/spark/pivoter.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,12 @@ def AggSparkPivoter (df, all_vars=None, agg_op=operator.add):
'''

def agg_merge_two_dicts(x, y, agg_op):
    """Merge two dicts, combining values of shared keys with agg_op.

    Every key from either dict appears in the result; a key missing
    from one side contributes 0.  Values are coerced to float before
    aggregation.

    :param x: first dict of {key: numeric value}
    :param y: second dict of {key: numeric value}
    :param agg_op: binary aggregation callable, e.g. operator.add
    :return: new dict {key: float} over the union of keys
    """
    # NOTE: the original diff replaced a bare `return {...}` with an
    # intermediate variable; behavior is identical either way.
    z = {k: agg_op(float(x.get(k, 0)),
                   float(y.get(k, 0)))
         for k in set(x).union(y)}
    return z

return pivot_df(df, agg_merge_two_dicts, all_vars, agg_op)

Expand All @@ -120,15 +121,18 @@ def map_dict_to_denseArray(idx, d):
# NOTE(review): fragment of pivot_df -- the function header is outside this
# view, and this span is a diff scrape: a few adjacent lines below are the
# pre-change and post-change versions of the same statement (marked inline).

# get list of variables from the dataset:
# NOTE(review): presumably key_col holds the variable names and one output
# column is created per distinct key -- confirm against pivot_df's callers.
all_vars = sorted([row[0] for row in df.select(key_col).distinct().collect()])

# key becomes column names, so make it sql-compliant
# (strip surrounding whitespace and replace '.' with '_' so the name is
# acceptable as a SQL identifier)
columns = [field_name.strip().replace('.','_') for field_name in all_vars]

# NOTE(review): the tuple-unpacking lambdas below, `lambda (idx, k, v): ...`,
# are Python 2-only syntax (removed in Python 3 by PEP 3113); this code will
# not parse under Python 3.
pivoted_rdd = (df.rdd
.map(lambda (idx, k, v): (idx, {k: v})) # convert k,v to a 1-element dict
.reduceByKey(lambda x,y: merger_func(x, y, agg_op)) # merge into a single dict for all vars for this idx
.map(lambda (idx, d): list(map_dict_to_denseArray(idx, d)))
# create final rdd with dense array of all variables
)

# NOTE(review): the next two lines are superseded pre-diff residue -- the
# pair after them is the committed version (using the sanitized `columns`
# as field names instead of the raw `all_vars` keys).
fields = [StructField(idx_col, StringType(), False)]
fields += [StructField(field_name, DoubleType(), True) for field_name in all_vars]
fields = [StructField(idx_col, StringType(), False)]
fields += [StructField(column, DoubleType(), True) for column in columns]

# schema: index column (non-nullable string) + one nullable double per variable
schema = StructType(fields)

Expand Down
2 changes: 1 addition & 1 deletion abalon/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# abalon package version; this commit bumped it 2.2.4 -> 2.2.5
# (the duplicate pre-diff assignment line from the scrape is dropped).
version = '2.2.5'


0 comments on commit 7aff06b

Please sign in to comment.