Skip to content

Commit

Permalink
Add count the hashtags
Browse files Browse the repository at this point in the history
  • Loading branch information
jhanMum committed Mar 16, 2018
1 parent 30fd52d commit 0552e7c
Show file tree
Hide file tree
Showing 2 changed files with 3,292 additions and 12 deletions.
24 changes: 12 additions & 12 deletions Codes/sparkKafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

# Driver setup: create the SparkContext for this application and restrict
# log output to the "WARN" level.
sc = SparkContext(appName="CS523FinalProject")
sc.setLogLevel("WARN")
# NOTE(review): `ssc` is assigned twice with different second arguments (2,
# then 60 — presumably the batch interval in seconds; confirm against the
# StreamingContext API). These look like the removed/added sides of the diff
# hunk rather than two live statements; if both run, the later one wins.
ssc = StreamingContext(sc, 2)
ssc = StreamingContext(sc, 60)

# Kafka input stream. The positional arguments are presumably the ZooKeeper
# quorum, the consumer group id and a {topic: partitions} map — confirm
# against the KafkaUtils.createStream documentation.
kvs = KafkaUtils.createStream(ssc, 'localhost:2181', 'Spark-Streaming', {'tweets':1})
# Decode element 1 of each record as JSON; presumably each record is a
# (key, message) pair, so v[1] is the message payload — TODO confirm.
parsed = kvs.map(lambda v: json.loads(v[1]))
Expand Down Expand Up @@ -42,18 +42,18 @@
# plt.bar(data['bins'], data['freq'], width=2000)
# plt.title('Histogram of \'balance\'')

# Count the hashtags: build (hashtag_text, occurrences) pairs from the
# tweets whose 'lang' is 'en', keeping only hashtags whose UTF-8 encoded
# text is purely alphabetic, then print a sample of the result.
hashtags = (
    parsed.filter(lambda t: t.get('lang') == 'en')
    .map(lambda tweet: tweet.get('entities'))
    # Identity test is the idiomatic (and __eq__-proof) way to check for None.
    .filter(lambda e: e is not None)
    # `or []` makes an entities dict without a 'hashtags' list contribute
    # nothing instead of raising TypeError when slicing None.
    .flatMap(lambda e: e.get('hashtags') or [])
    # Encode once and filter on the encoded value (the original encoded the
    # same text twice). A missing 'text' becomes an empty string, which
    # isalpha() rejects, so such entries are dropped rather than crashing.
    .map(lambda d: (d.get('text') or u'').encode('utf-8'))
    # NOTE(review): isalpha() runs on the encoded byte string, so hashtags
    # containing non-ASCII letters are presumably filtered out — confirm
    # that this is intended.
    .filter(lambda s: s.isalpha())
    .map(lambda s: (s, 1))
    .reduceByKey(lambda x, y: x + y)
)
hashtags.pprint()

# hashtags = parsed.filter(lambda t: t.get('lang') == 'en') \
# .map(lambda tweet: tweet.get('entities')) \
# .filter(lambda e: e != None) \
# .map(lambda e: e.get('hashtags')) \
# .flatMap(lambda a: a[:]) \
# .filter(lambda d: re.findall(ur'^\\[u]', d.get('text')) == None)
# .map(lambda d: d.get('text').encode('utf-8', 'ignore').decode('utf-8'))
# hashtags.pprint()
# .filter(lambda d: d != None).map(lambda d: d.get('text'))
# hashtagCounts = hashtags.map(lambda t: (t,1)).reduceByKey(lambda x,y: x+y)
# hashtagCounts.pprint()



Expand Down
Loading

0 comments on commit 0552e7c

Please sign in to comment.