Flink的基础代码示例(更新中)

分组聚合、分流与合流(connect / union)操作
import org.apache.flink.api.common.functions.{FilterFunction, MapFunction, ReduceFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

/**
 * Flink DataStream demo covering keyed aggregation, stream splitting and
 * stream merging (connect / union) over temperature sensor readings that are
 * loaded from a comma-separated text file.
 *
 * Usage: the optional first program argument is the path of the input file;
 * when it is omitted the original hard-coded sample path is used.
 */
object rua01 {

  /** Input file used when no path is passed on the command line. */
  private val DefaultInputPath = "C:\\tool\\dev\\JAVA\\2020.09\\day0911_work01\\data\\sensor.txt"

  /** One parsed input record: sensor id, temperature and the record's time field. */
  case class TemperatureSensorReanding(id: String, temperature: Double, time: Long)

  /**
   * Builds and runs the demo pipeline.
   *
   * @param args optional; `args(0)` overrides the input file path
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputPath: String = args.headOption.getOrElse(DefaultInputPath)
    val dataStream: DataStream[String] = env.readTextFile(inputPath)

    // Each line is expected to be "id,temperature,time". Fields are trimmed so
    // that "a1, 35.8, 1547718199" parses too: toLong rejects surrounding
    // whitespace, and an untrimmed id would end up as a different key.
    val tData: DataStream[TemperatureSensorReanding] = dataStream.map(data => {
      val arr = data.split(",").map(_.trim)
      TemperatureSensorReanding(arr(0), arr(1).toDouble, arr(2).toLong)
    }) //.filter(new testFilterFunction)

    // Keyed aggregation: per sensor, emit the reading with the lowest temperature so far.
    tData.keyBy(_.id).minBy("temperature").print()

    // Same grouping with an inline reduce: keeps the lowest temperature seen so
    // far but carries the time of the most recent reading.
    tData.keyBy(_.id).reduce((curData, newData) =>
      TemperatureSensorReanding(curData.id, curData.temperature.min(newData.temperature), newData.time)
    ).print()

    // Same reduction expressed as a custom ReduceFunction class.
    tData.keyBy(_.id).reduce(new testReduceFunction01).print()

    // Splitting: tag readings above 30.0 as "high", everything else as "low".
    // NOTE(review): split/select is deprecated in later Flink releases in favour
    // of side outputs; it is kept here because the demo is about this API.
    val splitStream: SplitStream[TemperatureSensorReanding] = tData.split(data => {
      if (data.temperature > 30.0) Seq("high") else Seq("low")
    })
    val highStream: DataStream[TemperatureSensorReanding] = splitStream.select("high")
    val lowStream: DataStream[TemperatureSensorReanding] = splitStream.select("low")
    val allStream: DataStream[TemperatureSensorReanding] = splitStream.select("high", "low")

    highStream.print("high")
    lowStream.print("low")
    allStream.print("all")

    // Merging with connect: the two inputs may have different element types.
    val warningStream: DataStream[(String, Double)] = highStream.map(data => (data.id, data.temperature))
    val connectedStreams: ConnectedStreams[(String, Double), TemperatureSensorReanding] = warningStream.connect(lowStream)
    // CoMap: one mapping function per input. The two functions return tuples of
    // different arity, which is why the result element type is Any.
    val coMapResultStream: DataStream[Any] = connectedStreams.map(
      waringData => (waringData._1, waringData._2, "warn"),
      lowTempData => (lowTempData.id, "healthy")
    )
    coMapResultStream.print("coMAp")

    // Merging with union: all inputs must share one element type; several
    // streams can be passed at once.
    val lowhighStream: DataStream[TemperatureSensorReanding] = highStream.union(lowStream)
    lowhighStream.print()

    env.execute("test")
  }


  /**
   * Reduce step that keeps the first reading's id, the smaller of the two
   * temperatures, and the time of the second (newer) reading.
   */
  class testReduceFunction01 extends ReduceFunction[TemperatureSensorReanding] {
    override def reduce(t: TemperatureSensorReanding, t1: TemperatureSensorReanding): TemperatureSensorReanding = {
      TemperatureSensorReanding(t.id, t.temperature.min(t1.temperature), t1.time)
    }
  }

  /** Custom filter that only lets readings with id "a1" through. */
  class testFilterFunction extends FilterFunction[TemperatureSensorReanding] {
    override def filter(value: TemperatureSensorReanding): Boolean = value.id.equals("a1")

  }

  /**
   * Rich function variant of the mapper: rich functions expose lifecycle hooks
   * (open/close) and access to the runtime context.
   */
  class testRichMongoMapper extends RichMapFunction[TemperatureSensorReanding, String] {

    // Initialisation hook, e.g. for opening a database connection.
    override def open(parameters: Configuration): Unit = {
      //getRuntimeContext
    }

    // Teardown hook, e.g. for closing connections or clearing state.
    override def close(): Unit = super.close()

    override def map(value: TemperatureSensorReanding): String = value.id + "Temperture"
  }

  /** Plain (non-rich) mapper producing the sensor id followed by a fixed suffix. */
  class testMongoMapper extends MapFunction[TemperatureSensorReanding, String] {
    override def map(value: TemperatureSensorReanding): String = value.id + "Temperture"
  }
}
自定义SourceFunction
package com.kami.demo02

import java.util.Date

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}

import scala.collection.immutable
import scala.util.Random

/**
 * Flink demo of a user-defined source: an endless stream of randomly
 * generated temperature readings for ten simulated sensors is printed to
 * standard output.
 */
object rua04 {


  /** Wires the custom source to a print sink and runs the job. */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val dataStream: DataStream[TemperatureSensorReanding] = env.addSource(new SensorSource())
    dataStream.print()
    env.execute("test")
  }

  /** One generated record: sensor id, temperature and emission time in epoch milliseconds. */
  case class TemperatureSensorReanding(id: String, temperature: Double, time: Long)

  /**
   * Custom SourceFunction that simulates ten sensors. Each sensor starts at a
   * random temperature in [0, 100) and then drifts by a Gaussian step on every
   * round; one reading per sensor is emitted roughly every 100 ms until the
   * source is cancelled.
   */
  class SensorSource() extends SourceFunction[TemperatureSensorReanding] {

    // Run flag polled by the emit loop. cancel() is invoked from a different
    // thread than run(), so the flag must be volatile for the write to become
    // visible to the loop; otherwise the source may never stop.
    @volatile var running: Boolean = true

    /** Asks the emit loop in `run` to stop after its current round. */
    override def cancel(): Unit = running = false

    /**
     * Emits readings through `ctx` until `cancel()` is called.
     *
     * @param ctx source context used to collect the generated readings
     */
    override def run(ctx: SourceFunction.SourceContext[TemperatureSensorReanding]): Unit = {
      val random = new Random()
      // Initial state: (sensor id, temperature) for sensors 1..10.
      var curTemp = 1.to(10).map(i => ("sersor_" + i, random.nextDouble() * 100))

      while (running) {
        // Random walk: nudge every sensor's temperature by a Gaussian step.
        curTemp = curTemp.map(
          data => (data._1, data._2 + random.nextGaussian())
        )
        // All readings of one round share the same timestamp.
        val curTime: Long = System.currentTimeMillis()
        curTemp.foreach(data => ctx.collect(TemperatureSensorReanding(data._1, data._2, curTime)))
        // Throttle the generator between rounds.
        Thread.sleep(100)
      }
    }
  }

}
点赞

发表评论

电子邮件地址不会被公开。必填项已用 * 标注