
Streams Processing with Apache Flink

This workshop builds on top of the "edge2AI Workshop" (a prerequisite) and adds enhanced stream-processing features.

Labs summary

  • Lab 1 - Count by sensor_id

  • Lab 2 - Filtering on sensor_0 value

Lab 1 - Apache Flink - Count by sensor_id

  1. Let’s use the iot stream (Kafka topic "iot") from the sensors in the previous lab

    1. Dataflow:

      [Image: iot streaming Flink dataflow, count by sensor_id]
    2. Open two SSH connections to your environment

      [Image: Flink SSH sessions]

      Let’s have a look at the code:

      Source: read the "iot" Kafka topic

      // get the iot stream from Kafka (topic "iot")
      DataStream<String> iotStream = env.addSource(
          new FlinkKafkaConsumer<>("iot", new SimpleStringSchema(), properties));

      Transformation: count events per sensor_id

      // deserialize each JSON event, key the stream by 'sensor_id' and keep a running sum
      DataStream<Tuple5<Long, Integer, Integer, Integer, Integer>> aggStream = iotStream
          .flatMap(new trxJSONDeserializer())
          .keyBy(1) // field 1 = sensor_id
          .sum(4);  // running sum of field 4, emitted for every incoming event

      Sink: write the results to Kafka

      // write the aggregated data stream to a Kafka sink
      FlinkKafkaProducer<Tuple5<Long, Integer, Integer, Integer, Integer>> myProducer = new FlinkKafkaProducer<>(
          topic, new serializeSum2String(), propertiesProducer);
      aggStream.addSink(myProducer); // attach the sink so the results are actually written
  2. Let’s run the application

    Use the first SSH connection to run the Flink application:

    cd /opt/cloudera/parcels/FLINK
    sudo wget -P /opt/cloudera/parcels/FLINK/lib/flink/examples/streaming
    ./bin/flink run -m yarn-cluster -c consumer.IoTUC1CountEventsPerSensorId -ynm IoTUC1CountEventsPerSensorId lib/flink/examples/streaming/streaming-flink- edge2ai-1.dim.local:9092
  3. Let’s see how the application works

    1. Use the second SSH connection to see the result:

      $ cd /opt/cloudera/parcels/CDH
      $ ./bin/kafka-console-consumer --bootstrap-server edge2ai-1.dim.local:9092 --topic result_iot_uc1_Count_EventsPerSensorId

      [Image: SSH connection, Kafka topic simulation sum]
    2. Streams Messaging Manager (SMM) view:

      [Image: SMM topic simulation sum]
    3. YARN and Flink UI view:

      The Flink UI provides more details about the running job and lets you monitor it.

      [Image: Flink running jobs]
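To see what the Lab 1 pipeline emits, here is a plain-Java sketch (no Flink dependency) that simulates the keyed rolling sum. It assumes, hypothetically, that the deserializer puts a 1 into the summed field for every event, so the running sum behaves as a per-sensor_id counter; `RollingCountSketch` and `Event` are illustrative names, not part of the workshop code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink) of what keyBy(sensor_id).sum(count) emits.
public class RollingCountSketch {

    // Hypothetical minimal event: only sensor_id and the count field being summed
    // (assumed to be 1 per event, so the running sum is an event count).
    record Event(int sensorId, int count) {}

    // Emits one updated total per incoming event, like a rolling sum on a keyed stream.
    static List<String> rollingSum(List<Event> events) {
        Map<Integer, Integer> totals = new HashMap<>(); // running state kept per key (sensor_id)
        List<String> emitted = new ArrayList<>();
        for (Event e : events) {
            // merge() adds the event's count to the key's current total and returns the new total
            int total = totals.merge(e.sensorId(), e.count(), Integer::sum);
            emitted.add("sensor_id=" + e.sensorId() + " count=" + total);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event(1, 1), new Event(2, 1), new Event(1, 1), new Event(1, 1));
        rollingSum(events).forEach(System.out::println);
    }
}
```

Running `main` prints one line per input event (sensor_id=1 count=1, sensor_id=2 count=1, sensor_id=1 count=2, sensor_id=1 count=3). As with `sum` on a Flink keyed stream, the total is updated and emitted for every event rather than once at the end, which is why the result topic keeps receiving growing counts.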

Lab 2 - Filtering on sensor_0 value

  1. Let’s use the iot stream (Kafka topic "iot") from the sensors in the previous lab

    1. Dataflow:

      [Image: iot streaming Flink dataflow, filter on sensor_0]

      Transformation: filter on the sensor_0 value

      // key on 'sensor_id' and keep only events whose sensor_0 value is at least 50
      DataStream<Tuple5<Long, Integer, Integer, Integer, Integer>> aggStream = iotStream
          .flatMap(new trxJSONDeserializer())
          .keyBy(1) // field 1 = sensor_id
          .filter(new FilterFunction<Tuple5<Long, Integer, Integer, Integer, Integer>>() {
              @Override
              public boolean filter(Tuple5<Long, Integer, Integer, Integer, Integer> value) throws Exception {
                  return value.f2 >= 50; // field 2 = sensor_0
              }
          });
  2. Let’s run the application

    Use the first SSH connection to run the new Flink application:

    cd /opt/cloudera/parcels/FLINK
    ./bin/flink run -m yarn-cluster -c consumer.IoTUC2CountEventsPerSensorIdFilter -ynm IoTUC2CountEventsPerSensorIdFilter lib/flink/examples/streaming/streaming-flink- edge2ai-1.dim.local:9092
  3. Let’s see how the application works

    1. Use the second SSH connection to see the result:

      $ cd /opt/cloudera/parcels/CDH
      $ ./bin/kafka-console-consumer --bootstrap-server edge2ai-1.dim.local:9092 --topic result_iot_Consumer_Filter
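The filter predicate itself can be tried outside Flink. This plain-Java sketch applies the same test as the `FilterFunction` (sensor_0 >= 50) to a few made-up events; `SensorFilterSketch` and its `Event` record are hypothetical stand-ins for the `Tuple5` records produced by `trxJSONDeserializer`.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java sketch (no Flink) of the Lab 2 filter predicate.
public class SensorFilterSketch {

    // Hypothetical event holding only sensor_id and the sensor_0 reading (tuple field f2).
    record Event(int sensorId, int sensor0) {}

    // Same threshold as "value.f2 >= 50" in the lab code.
    static final int THRESHOLD = 50;

    // Same predicate as the FilterFunction: keep events whose sensor_0 value is at least 50.
    static boolean keep(Event e) {
        return e.sensor0() >= THRESHOLD;
    }

    // Applies the predicate to every event independently, in input order.
    static List<Event> filter(List<Event> events) {
        return events.stream().filter(SensorFilterSketch::keep).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Event> events = List.of(new Event(1, 49), new Event(2, 50), new Event(3, 87));
        filter(events).forEach(System.out::println);
    }
}
```

Only the events with sensor_0 values 50 and 87 are printed. Because the predicate looks at one event at a time, the preceding `keyBy(1)` does not change which events pass; it only determines how the stream is partitioned across parallel tasks.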