HOME | EDIT | RSS | INDEX | ABOUT | GITHUB

使用 Flink 解救多线程 Scala 应用

问题

从 Zipkin Zipkin 是 twitter 用的分布式系统 tracing 收集服务 https://zipkin.io/ 读取 Traces 信息然后计算 pXX pecentile XX https://en.wikipedia.org/wiki/Percentile 数据

Scala

看似很简单, 拿所有traces算就好了啊?

比如这样的过程

  1. 拿到数据源
  2. 按数据源从 zipkin 抓 traces
  3. 计算 pXX
  4. 结果写入某个数据存储
val datasources = CollectorService.datasources
def singleProcess(datasource) = for {
  traces <- ZipkinService.getTraces(source)
  metrics = MetricService.calculateMetrics(traces, source)
  _ <- CollectorService.writeMetrics(metrics, source)
} yield ()

val program = datasources.map(_ map singleProcess)

datasouressingleProcess 都是 Free[Program, ?] Free Monad, 免费获得自定义数据类型的 Monad, 好将副作用与业务逻辑分开 https://typelevel.org/cats/datatypes/freemonad.html

由于是函数式, 很简单的可以多线程跑起来.

这里我用 Natural Transformation Program ~> IO 将代码翻译成 Cats Effect的 IO, 然后再 parSequence 多线程并发执行计算. 并行化代码基本上只需要这么一行 (program unsafeRunSync ()).parSequence

但是当拿超级大流量的 api 的一天时间的 traces 时, 会发现内存短缺, 也就是说, 对于大数据量计算 显然这样是不 scalable 的.

Flink Scala

要想应用能处理一天的大流量 api 的 traces, 自然直接的想法当然是找一个 MapReduce 的 Cluster 将一天的数据 map 成多份来进行处理, 最后 reduce 到一起来算 pXX.

 1: lazy val env = ExecutionEnvironment.getExecutionEnvironment // <= 1
 2: 
 3: lazy val rawSources = (ZipkinClient.datasources foldMap interp) unsafeRunSync ()
 4: 
 5: lazy val datasources =
 6:   env.fromCollection(rawSources.toList) // <= 2
 7: 
 8: lazy val program: DataSet[Vector[String]] = datasources
 9:   .faltMap(splitByHour _)        // <= 3
10:   .rebalance()                   // <= 4
11:   .map(source =>
12:     (ZipkinClient.fetchTraces(source) foldMap interp) unsafeRunSync ()) // <= 5
13:   .groupBy(_._1.endpoint.id)   // <= 6
14:   .reduce((s1, s2) => s1 |+| s2) // <= 7
15:   .map((traces: (DataSource, Vector[Trace])) =>
16:     (ZipkinClient.aggregate(traces) foldMap interp) unsafeRunSync ())  // <= 8
17:   .map((metrics: (Vector[LatencyMetrics], DataSource)) =>
18:     (CollectorService
19:       .writeMetrics(metrics._1, Vector(metrics._2)) foldMap interp) unsafeRunSync ())
20: 
  1. 当前执行环境, 例如跑在 cluster 上, 还是 jvm 里
  2. 初始化DataSet, DataSet是 flink batch job 的数据结果, DataSet 中的数据意味着可以并行跑在 cluster 中
  3. 拆分数据源, 类似大数据的 helloworld 计算字符个数例子, 你得先把句子拆成字符
  4. 负载均衡
  5. 合并, 类似于把同样的字符都归到一组, 方便下来计算
  6. reduce, 把每一组数据合并成一条数据
  7. 计算pXX
46709689-8d226a80-cc90-11e8-9f7c-5c110d48302c.png

有意思的是, 你是可以本地直接 sbt run 的, 此时 flink 的 cluster 会跑在 jvm 中.

但是 跑cluster 在单个 jvm 中会耗费大量资源, 所以最佳实践其实是根据运行环境, 选择不同的 ExecutionEnvironment

lazy val env = envOrNone("EXECUTION_IN_LOCAL")
  .map(_ => ExecutionEnvironment.createCollectionsEnvironment)
  .getOrElse(ExecutionEnvironment.getExecutionEnvironment)

若是在本地运行, 比如跑功能测试或集成测试, 使用 CollectionsEnvironment 会省不少资源

Flink Cluster in Docker

若是想要把 cluster 运行在 docker 中, 也是十分简易的事情

flink:
  image: flink
  volumes:
    - ./target:/target
  working_dir: /target
  command: flink run -m jobmanager:8081 -p 4 /target/scala-2.11/app.jar
  depends_on:
    - taskmanager
jobmanager:
  image: flink:1.6.1-scala_2.11
  expose:
    - "6123"
  ports:
    - "8082:8081"
  command: jobmanager
  environment:
    - JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
  image: flink:1.6.1-scala_2.11
  expose:
    - "6121"
    - "6122"
  depends_on:
    - jobmanager
  command: taskmanager
  links:
    - "jobmanager:jobmanager"
  environment:
    - JOB_MANAGER_RPC_ADDRESS=jobmanager

其中 flink 是启动 flink job 的服务, 里面跑 flink run 命令

jobmanager 相当于 zookeeper, taskmanager 是真正跑任务的 slave

Flink Cluster on K8s

当然可以在docker中跑起来意味着部署到 k8s 也就是一个配置文件的事情

基本上, 照着 官方的 yaml 部署就好了

通常部署一个jobmanager, 两个 taskmanager 这样就有 8 个 slots

4279a294-bbb8-4367-980b-f542b063709e.png

Footnotes:

1

Zipkin 是 twitter 用的分布式系统 tracing 收集服务 https://zipkin.io/

3

Free Monad, 免费获得自定义数据类型的 Monad, 好将副作用与业务逻辑分开 https://typelevel.org/cats/datatypes/freemonad.html

4

并行化代码基本上只需要这么一行

(program unsafeRunSync ()).parSequence