Skip to content

Commit

Permalink
add Storm samples
Browse files Browse the repository at this point in the history
  • Loading branch information
Srinath Perera authored and Srinath Perera committed Jun 28, 2015
1 parent f4bf431 commit 9e12516
Show file tree
Hide file tree
Showing 17 changed files with 1,079 additions and 0 deletions.
33 changes: 33 additions & 0 deletions storm-samples/src/main/java/org/storm/sample/PatternUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2014, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.storm.sample;

import backtype.storm.tuple.Tuple;

public class PatternUtils {
public static long getInt(Tuple t, int index){
return Integer.parseInt(t.getString(index).trim());
}


public static String getString(Tuple t, int index){
return t.getString(index).trim();
}

}
45 changes: 45 additions & 0 deletions storm-samples/src/main/java/org/storm/sample/PrintBolt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.storm.sample;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

/**
* Created on 6/27/15.
*/
public class PrintBolt extends BaseBasicBolt {
Log log = LogFactory.getLog(PrintBolt.class);
//private OutputProcessor printer = new OutputProcessor();


@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println("Output>" + tuple);
//printer.print(">" + tuple + " Done");

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.storm.sample.p1;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;


/**
* Created on 6/27/15.
*/
public class PreprocessingBolt extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String priceAsStr = tuple.getString(1);
Integer price = Integer.parseInt(priceAsStr.trim());
List<Object> output = new ArrayList<Object>();
if(price > 100){
output.add(tuple);
collector.emit(tuple.getValues());
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("symbol", "price", "volume", "timestamp"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.storm.sample.p1;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.*;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;
import java.util.Map;

public class PreprocessingSprout extends BaseRichSpout {

private BufferedReader reader;
SpoutOutputCollector _collector;

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("symbol", "price", "volume", "timestamp"));

}

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this._collector = collector;
URL url = PreprocessingSprout.class.getResource("/stockquotes.txt");
try {
String filePath = new File(url.toURI()).getAbsolutePath();
reader = new BufferedReader(new FileReader(filePath));
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}

@Override
public void nextTuple() {
try {
String line = reader.readLine();
if (line != null) {
String [] values = line.split(",");
_collector.emit(new Values((Object[])values));
System.out.println("Input>"+Arrays.toString(values));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.storm.sample.p1;

import org.storm.sample.PrintBolt;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

/**
* This topology demonstrates Storm's stream groupings capabilities using word counting as an example.
*/
public class PreprocessingTopology {

public static void main(String[] args) throws Exception {

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("read", new PreprocessingSprout(), 1);
builder.setBolt("filter", new PreprocessingBolt(), 2).shuffleGrouping("read");
builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("filter");


Config conf = new Config();
conf.setDebug(false);


if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("preporcess", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
}
90 changes: 90 additions & 0 deletions storm-samples/src/main/java/org/storm/sample/p3/WindowBolt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.storm.sample.p3;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


/**
* Created on 6/27/15.
*/
public class WindowBolt extends BaseBasicBolt {
TimeWindow timeWindow = new TimeWindow(60000);

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String tsAsStr = tuple.getString(3);
long ts = Long.parseLong(tsAsStr.trim());

if(timeWindow.hasExpired(ts)){
List<Tuple> collectedTuples = timeWindow.reset(ts);
long sum = 0;
for(Tuple t: collectedTuples){
String priceAsS = t.getString(1);
String amountAsS = t.getString(2);
long price = Long.parseLong(priceAsS.trim());
long amount = Long.parseLong(amountAsS.trim());
sum = sum + (price * amount);
}
collector.emit(new Values(sum));
}

timeWindow.add(tuple, ts);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("60Secvalue"));
}


public static class TimeWindow implements Serializable{
List<Tuple> window = new ArrayList<Tuple>();
long startTime = -1;
long windowSize;
public TimeWindow(long windowSize){
this.windowSize = windowSize;
}

public void add(Tuple t, long time){
window.add(t);
}

public boolean hasExpired(long time){
if(startTime == -1){
startTime = time;
}
return time - startTime > windowSize;
}

public List<Tuple> reset(long time){
startTime = time;
List<Tuple> copy = window;
window = new ArrayList<Tuple>();
return copy;
}
}
}
Loading

0 comments on commit 9e12516

Please sign in to comment.