七月网

集群计算(集群计算特点)

七月网2750

一、如何计算Flink集群规模:信封背计算法

2017年柏林Flink Forward大会上Robert Metger的"Keep It Going: How to Reiably and Efficiently Operate Apache Flink"的演讲很受欢迎。Robert的其中一个主题演讲涉及到了如何估算Flink集群规模。Flink Forward大会的观众们认为这个计算方法对他们很有用,因此我们把他的演讲主题转变成这篇博客。

集群计算(集群计算特点)

Flink社区上经常被问起的一个问题是当从开发转到线上时,如何估算集群规模大小。当然,最准确的的答案是根据需要,但是这并没有什么用。这篇博客提出了一系列问题使你能够计算出一些基准。

首先,思考一下你的应用操作需要的资源基准的指标。

最后,考虑一下你的服务等级协议(SLAS),比如宕机时间、延迟和最大的吞吐量。这些指标将直接影响你的容量计算。

接下来,看一下基于预算可用的资源大小。比如:

基于上述这些因素,你现在能够估算正常流程的的资源基准。另外,还需要增加一些资源用作异常的恢复和checkpointing。

我现在通过一个集群上的虚拟job部署来描述整个资源基准的建立过程。信封背计算法的所用到的数字是不精准的,同时并没有考虑的很全面。在后面,我会指出在做计算时的忽视的一些点。

在这个案列中,我将部署典型的Flink流式应用,Kafka Topic的数据作为数据源。这个流接着使用keyed, aggregating window操作转换。窗口操作执行5分钟的窗口聚合。同时假设有源源不断的数据进来,window被设置成1分钟滑动一次。

这表示每分钟执行一次过去5分钟内的窗口聚合。这个流式应用根据userId字段进行聚合。Kafka Topic中消息的大小平均是2KB。

吞吐量是每秒100万条消息。为了理解窗口操作的state大小,你需要知道distinct Keys的数量,就是userIds的数量,这边大约是5000万个不同的用户ID。对于每个用户,你需要计算4个数字,通过longs(8 byte)存储。

现在,让我们总结一下这个任务的关键指标:

总共有5台机器运行这个job,每台机器上运行一个TaskManager。磁盘通过网络挂载,同时有10 gigabit的以太网接入。同时Kafka是独立部署在其他机器上。

每台机器有16CPU核。为了简化的需要,这边不考虑CPU和内存的使用情况。在实际情况下,你需要根据应用逻辑和state backend的使用,来考虑内存的使用。这个例子使用RocksDB state backend。(它是健壮的,同时对内存需求比较低)。

为了理解整个job运行部署的资源需求,最容易的方式是关注单台机器和TaskManager的操作。你可以通过单台机器计算出来的数字来推断整个集群的资源需求。

默认(所有的操作都有并行度和没有特殊的调度限制)所有的操作在每台机器上都有运行。

在这个例子中,Kafka source,窗口操作和Kafka sink都运行在每台机器上。

keyBy是一个分离的操作,因此资源需求计算比较容易。在现实中,keyBy是一个API,连接了Kafak Source和窗口操作。

我现在将从头到底理解这些操作的网络资源需求。

为了计算Kafka source收到的数据量,首先需要计算Kafka的聚合输入。sources每秒收到100万消息,每条消息2KB大小。

2GB/s除以5台机器,得到如下结果:

集群中每台机器上TaskManager的source收到400MB/s的数据。

接下来,你需要确保同一个key的所有事件落在一些机器上。这边,你从kafka中读取的数据可能被重新分区。

shuffle过程发送所有拥有相同key的数据到同一台机器,因此这边把400MB/s的数据分割成一个根据userId分区的流。

平均来看,你将发送80MB/s的数据到每一台机器。这个分析是从单台机器的角度,但是一些数据已经在目标机器上了,因此要减去80MB/s。

接下去的问题是窗口操作发送多少数据到Kafka Sink。结果是67MB/s,让我们看一下如何计算。

窗口操作为每个key保持了4个数字(longs)聚合。每一分钟,操作将发送当前的聚合值。每个key发送2ints(user_id, window_ts)和4 longs。

然后乘以keys数量(500000000除以机器数量)

这表示每个TaskManager从窗口操作中平均发送67MB/s的用户数据。因为Kafka sink运行在每个TaskManager上,所以没有进一步的分区操作。这就是从Flink到Kafka的发送的数据量。

从窗口操作中得到的数据每分钟会发送一次。在实际中,这个操作不会发以67MB/s的发送数据,而是在一分钟之内的几秒间到达最大带宽。

到目前为止,我们仅仅计算了Flink处理的用户数据。你同时还需要考虑磁盘的使用,比如存储state和checkpointing。为了计算磁盘的花销,你需要查看窗口计算如何进入state。Kafka Source也需要保持一些state,但是跟窗口操作的state相比,可以忽略不计。

为了理解窗口操作的state大小,让我们换一个角度看这个问题。Flink计算5分钟的时间窗口,并且1分钟滑动一次。Flink是通过保持5个窗口来实现滑动窗口。根据先前提到的,在使用窗口时,你需要为每个窗口保持40bytes的状态,并且窗口是提前聚合的。对于每一条到来的事件,你首先需要取出当前聚合值,再更新聚合值,然后把新值写回去。

有40MB/s的磁盘读写(每台机器上)。根据先前说的,磁盘是通过网络挂载的。因此需要在先前的基础上增加这个值。

上述的计算只考虑了事件到达窗口操作时触发时state的进入。此外,你还需要checkpoint和容错机制。因为,如果一台机器或者其他任何东西挂掉,你需要恢复你的窗口并继续处理。

根据先前所说,Checkpointing是每隔1分钟执行一次,并且每个checkpoint会复制整个job的状态到(通过网络挂载)文件系统。

现在,让我们快速的计算一下每台机器的state大小:

和窗口操作类似,checkpointing也是每分钟执行一次。它尝试全速发送数据到外部存储。Checkpointing引起了额外的state进入。(自从Flink1.3后,RocksDB支持增量checkpointing来降低每次checkpoint时所需的网络传输。)

400是80MB的state读写乘以5台机器。2335是Kafka进和出的总值。

免责声明:上述这些计算没有包含协议的花销,比如TCP、Ethernet和RPC(在Flink、Kafka和HDFS等中)。但是上述的计算仍旧对如何计算一个job的资源有指导意义。

基于我的分析,这个例子中,5个节点的集群,在典型的操作中,每个机器需要处理760MB/s的数据进出,同时每台机器可以处理的容量是1250MB/s。这样保留了40%的网络容量来应对我刚才提到的复杂度,比如网络协议花销,事件重放,数据倾斜引起的不平均的负载等。

当然,没有一个标准答案来说明留40%的余量是否合适。但是这个算法可以给你一个好的开始。尝试上述的计算,修改上述的参数为你自己的参数。Happy scaling!

How To Size Your Apache Flink® Cluster: A Back-of-the-Envelope Calculation

二、服务器集群是什么

1、服务器集群就是指将很多服务器集中起来一起进行同一种服务,在客户端看来就像是只有一个服务器。集群可以利用多个计算机进行并行计算从而获得很高的计算速度,也可以用多个计算机做备份,从而使得任何一个机器坏了整个系统还是能正常运行。

2、建立在现有网络结构之上,它提供了一种廉价有效透明的方法扩展网络设备和服务器的带宽、增加吞吐量、加强网络数据处理能力、提高网络的灵活性和可用性。

3、所谓分布式资源共享服务器就是指数据和程序可以不位于一个服务器上,而是分散到多个服务器,以网络上分散分布的地理信息数据及受其影响的数据库操作为研究对象的一种理论计算模型服务器形式。分布式有利于任务在整个计算机系统上进行分配与优化,克服了传统集中式系统会导致中心主机资源紧张与响应瓶颈的缺陷,解决了网络GIS

4、中存在的数据异构、数据共享、运算复杂等问题,是地理信息系统技术的一大进步。

5、这个三种架构都是常见的服务器架构,集群的主要是IT公司在做,可以保障重要数据安全;负载均衡主要是为了分担访问量,避免临时的网络堵塞,主要用于电子商务类型的网站;分布式服务器主要是解决跨区域,多个单个节点达到高速访问的目前,一般是类似CDN的用途的话,会采用分布式服务器。

6、纯手工打字,希望可以帮的到你!

三、什么是集群

集群主要分成三大类(高可用集群,负载均衡集群,科学计算集群)

高可用集群( High Availability Cluster)

负载均衡集群(Load Balance Cluster)

科学计算集群(High Performance Computing Cluster)

1、高可用集群(High Availability Cluster)

常见的就是2个节点做成的HA集群,有很多通俗的不科学的名称,比如”双机热备”,“双机互备”,“双机”。高可用集群解决的是保障用户的应用程序持续对外提供服务的能力。(请注意高可用集群既不是用来保护业务数据的,保护的是用户的业务程序对外不间断提供服务,把因软件/硬件/人为造成的故障对业务的影响降低到最小程度)。

2、负载均衡集群(Load Balance Cluster)

负载均衡系统:集群中所有的节点都处于活动状态,它们分摊系统的工作负载。一般Web服务器集群、数据库集群和应用服务器集群都属于这种类型。

负载均衡集群一般用于相应网络请求的网页服务器,数据库服务器。这种集群可以在接到请求时,检查接受请求较少,不繁忙的服务器,并把请求转到这些服务器上。从检查其他服务器状态这一点上看,负载均衡和容错集群很接近,不同之处是数量上更多。

3、科学计算集群(High Performance Computing Cluster)

高性能计算(High Perfermance Computing)集群,简称HPC集群。这类集群致力于提供单个计算机所不能提供的强大的计算能力。

3.1、高吞吐计算(High-throughput Computing)

有一类高性能计算,可以把它分成若干可以并行的子任务,而且各个子任务彼此间没有什么关联。象在家搜寻外星人( SETI@HOME– Search for Extraterrestrial Intelligence at Home)就是这一类型应用。

这一项目是利用Internet上的闲置的计算资源来搜寻外星人。SETI项目的服务器将一组数据和数据模式发给Internet上参加SETI的计算节点,计算节点在给定的数据上用给定的模式进行搜索,然后将搜索的结果发给服务器。服务器负责将从各个计算节点返回的数据汇集成完整的数据。因为这种类型应用的一个共同特征是在海量数据上搜索某些模式,所以把这类计算称为高吞吐计算。

所谓的Internet计算都属于这一类。按照 Flynn的分类,高吞吐计算属于SIMD(Single Instruction/Multiple Data)的范畴。

3.2、分布计算(Distributed Computing)

另一类计算刚好和高吞吐计算相反,它们虽然可以给分成若干并行的子任务,但是子任务间联系很紧密,需要大量的数据交换。按照Flynn的分类,分布式的高性能计算属于MIMD(Multiple Instruction/Multiple Data)的范畴。

下面说说这几种集群的应用场景:

想Dubbo是比较偏向于负载均衡集群,用过的猿友应该知道(不知道的可以自行了解一下),Dubbo同一个服务是可以有多个提供者的,当一个消费者过来,它要消费那个提供者,这里是有负载均衡机制在里面的。

搜索引擎Elasticsearch比较偏向于科学计算集群的分布计算。

而到这里,可能不少猿友都知道,集群的一些术语:集群容错、负载均衡。

集群容错(http://dubbo.io/User+Guide-zh.htm#UserGuide-zh-%E9%9B%86%E7%BE%A4%E5%AE%B9%E9%94%99)

可以自行扩展集群容错策略,参见:集群扩展

失败自动切换,当出现失败,重试其它服务器。(缺省)

通常用于读操作,但重试会带来更长延迟。

可通过retries="2"来设置重试次数(不含第一次)。

快速失败,只发起一次调用,失败立即报错。

通常用于非幂等性的写操作,比如新增记录。

失败安全,出现异常时,直接忽略。

失败自动恢复,后台记录失败请求,定时重发。

并行调用多个服务器,只要一个成功即返回。

通常用于实时性要求较高的读操作,但需要浪费更多服务资源。

可通过forks="2"来设置最大并行数。

广播调用所有提供者,逐个调用,任意一台报错则报错。(2.1.0开始支持)

通常用于通知所有提供者更新缓存或日志等本地资源信息。

负载均衡(http://dubbo.io/User+Guide-zh.htm#UserGuide-zh-%E8%B4%9F%E8%BD%BD%E5%9D%87%E8%A1%A1)

在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

轮循,按公约后的权重设置轮循比率。

存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。

使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

一致性Hash,相同参数的请求总是发到同一提供者。

当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

算法参见:http://en.wikipedia.org/wiki/Consistent_hashing。

缺省只对第一个参数Hash,如果要修改,请配置<dubbo:parameter key="hash.arguments" value="0,1"/>

缺省用160份虚拟节点,如果要修改,请配置<dubbo:parameter key="hash.nodes" value="320"/>

好了,文章到这里就结束啦,如果本次分享的集群计算和集群计算特点问题对您有所帮助,还望关注下本站哦!