在讲解 [Kafka的副本同步限流机制三部曲(源码篇)] 第二篇(原理篇) 之前
我想先讲解一下 Kafka中的数据采集和统计机制 当你了解这个机制之后才会更容易理解限流机制
你会不会好奇,kafka监控中,那些数据都是怎么计算出来的 比如下图这些指标
LogiKM监控图
这些数据都是通过Jmx获取的kafka监控指标, 那么我们今天来探讨一下,这些指标都是怎么被计算出来的
在开始分析之前,我们可以 自己思考一下
如果让你统计前一分钟内的流速,你会怎么统计才能够让数字更加精确呢?
我相信你脑海中肯定出现了一个词:滑动窗口
在kafka的数据采样和统计中,也是用了这个方法, 通过多个样本<span style="letter-spacing: 1px;">Sample
进行采样,并合并统计
当然这一个过程少不了滑动窗口的影子
我们先看下整个Kafka的数据采集和统计机制的类图
数据采集和统计全类图
看着整个类图好像很复杂,但是最核心的就是两个Interface接口
Measurable:
可测量的、可统计的 Interface。这个Interface 有一个方法, 专门用来计算需要被统计的值的
/**
* 测量这个数量并将结果作为双精度返回
* 参数:
* config – 此指标的配置
* now – 进行测量的 POSIX 时间(以毫秒为单位)
* 返回:
* 测量值
*/
double measure(MetricConfig config, long now);
比如说返回 <span style="letter-spacing: 1px;">近一分钟的bytesIn
Stat:
记录数据, 上面的是统计,但是统计需要数据来支撑, 这个Interface就是用来做记录的,这个Interface有一个方法
/**
* 记录给定的值
* 参数:
* config – 用于该指标的配置
* value – 要记录的值
* timeMs – 此值发生的 POSIX 时间(以毫秒为单位)
*/
void record(MetricConfig config, double value, long timeMs);
有了这两个接口,就基本上可以记录数据和数据统计了
当然这两个接口都有一个 MetricConfig
对象
MetricConfig
这是一个统计配置类, 主要是定义采样的样本数、单个样本的时间窗口大小、单个样本的事件窗口大小、限流机制有了这样一个配置了,就可以自由定义时间窗口的大小,和采样的样本数之类的影响最终数据精度的变量。
这里我需要对两个参数重点说明一下
单个样本的时间窗口大小: 当前记录时间 - 当前样本的开始时间 >= 此值 则需要使用下一个样本。单个样本的事件窗口大小: 当前样本窗口时间次数 >= 此值 则需要使用下一个样本
在整个统计中,不一定是按照时间窗口来统计的, 也可以按照事件窗口来统计, 具体按照不同需求选择配置
好了,大家脑海里面已经有了最基本的概念了,我们接下来就以一个kafka内部经常使用的 SampledStat
记录和统计的抽象类来好好的深入分析理解一下。
这个记录统计抽象类,是按照采样的形式来计算的。里面使用了一个或者多个样本进行采样统计
List<Sample> samples
; 当前使用的样本:current
样本初始化的值:initialValue
SampledStat :
实现了MeasurableStat
的抽象类,说明它又能采集记录数据,又能统计分析数据
当然它自身也定义了有两个抽象方法
/** 更新具体样本的数值 (单个样本)**/
protected abstract void update(Sample sample, MetricConfig config, double value, long timeMs);
/**组合所有样本的数据 来统计出想要的数据 **/
public abstract double combine(List<Sample> samples, MetricConfig config, long now);
SampledStat图形化展示
如上图所示, 是一个SampledStat
的图形化展示, 其中定义了 若干个样本 Sample
记录数据
@Override
public void record(MetricConfig config, double value, long timeMs) {
Sample sample = current(timeMs);
if (sample.isComplete(timeMs, config))
sample = advance(config, timeMs);
update(sample, config, value, timeMs);
sample.eventCount += 1;
}
当前时间 - 当前Sample的开始时间 >= 配置的时间窗口值 或者 事件总数 >= 配置的事件窗口值
/** 当前时间 - 当前Sample的开始时间 >= 配置的时间窗口值 或者 事件总数 >= 配置的事件窗口值 **/
public boolean isComplete(long timeMs, MetricConfig config) {
return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
}
3 . 如果这个Sample已经完成(超过窗口期), 则开始选择下一个窗口,如果下一个还没创建则创建新的,如果下一个已经存在,则重置这个Sample
4 . 拿到最终要使用的Sample后, 将数据记录到这个Sample中。具体怎么记录是让具体的实现类来实现的,因为想要最终统计的数据可以不一样,比如你只想记录Sample中的最大值,那么更新的时候判断是不是比之前的值大则更新,如果你想统计平均值,那么这里就让单个Sample中所有的值累加(最终会 除以 Sample数量 求平均数的)
5 . 记录事件次数+1。
记录数据的展示图
统计数据
/** 测量 统计 数据**/
@Override
public double measure(MetricConfig config, long now) {
// 重置过期样本
purgeObsoleteSamples(config, now);
// 组合所有样本数据,并展示最终统计数据,具体实现类来实现该方法
return combine(this.samples, config, now);
}
1 . 先重置 过期样本 , 过期样本的意思是:当前时间 - 每个样本的起始事件 > 样本数量 * 每个样本的窗口时间 ; 就是滑动窗口的概念,只统计这个滑动窗口的样本数据, 过期的样本数据会被重置(过期数据不采纳), 如下图所示
滑动窗口重置过期数据
2 . 组合所有样本数据并进行不同维度的统计并返回数值, 因为不同场景想要得到的数据不同,所以这个只是一个抽象方法,需要实现类来实现这个计算逻辑,比如如果是计算平均值 Avg
, 它的计算逻辑就是把所有的样本数据值累加并除以累积的次数
那我们再来看看不同的统计实现类
一个简单的
SampledStat
实现类 它统计所有样本最终的平均值 每个样本都会累加每一次的记录值, 最后把所有样本数据叠加 / 总共记录的次数
在这里插入图片描述
每个样本都保存这个样本的最大值, 然后最后再对比所有样本值的最大值
在这里插入图片描述
每个样本累积每一次的记录值, 统计的时候 把所有样本的累计值 再累积返回
在这里插入图片描述
Rate
也是实现了MeasurableStat
接口的,说明 它也有 记录record
和 统计measure
的方法, 实际上这个类 是一个组合类 ,里面组合了SampledStat
和TimeUnit unit
,这不是很明显了么, SampledStat负责记录和统计, 得到的数据 跟时间TimeUnit
做一下处理就得出来速率了, 比如SampledStat
的实现类AVG
可以算出来 被统计的 评价值, 但是如果我们再除以 一个时间维度, 是不是就可以得出 平均速率 了
这个有效时间 的计算会影响着最终速率的结果
public long windowSize(MetricConfig config, long now) {
// 将过期的样本给重置掉
stat.purgeObsoleteSamples(config, now);
// 总共运行的时候 = 当前时间 - 最早的样本的开始时间
long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
// 总时间/单个创建时间 = 多少个完整的窗口时间
int numFullWindows = (int) (totalElapsedTimeMs / config.timeWindowMs());
int minFullWindows = config.samples() - 1;
// If the available windows are less than the minimum required, add the difference to the totalElapsedTime
if (numFullWindows < minFullWindows)
totalElapsedTimeMs += (minFullWindows - numFullWindows) * config.timeWindowMs();
return totalElapsedTimeMs;
}
这是Rate的有效时间的计算逻辑,当然Rate
还有一个子类是 SampleRate
SampleRate的窗口Size计算逻辑
这个子类,将 有效时间的计算逻辑改的更简单, 如果运行时间<一个样本窗口的时间 则他的运行时间就是单个样本的窗口时间, 否则就直接用这个运行的时间, 这个计算逻辑更简单 它跟Rate
的区别就是, 不考虑采样的时间是否足够多,我们用图来简单描述一下
SampleRate
SampleRate 速率逻辑
Rate
Rate 速率逻辑
这是一个
CompoundStat
的实现类, 说明它是一个复合统计, 可以统计很多指标在这里面 它包含速率指标和累积总指标的复合统计数据
底层实现的逻辑还是上面讲解过的
我们知道 在分区副本重分配过程中,有一个限流机制,就是指定某个限流值,副本同步过程不能超过这个阈值。做限流,那么肯定首先就需要统计 副本同步 的流速;那么上面我们讲了这么多,你应该很容易能够想到如果统计了吧?流速 bytes/s , 统计一秒钟同步了多少流量, 那么我们可以把样本窗口设置为
1s
,然后多设置几个样本窗口求平均值。
接下来我们看看 Kafka是怎么统计的, 首先找到记录 Follower Fetch 副本流量的地方如下
ReplicaFetcherThread#processPartitionData
if(quota.isThrottled(topicPartition))
quota.record(records.sizeInBytes)
设置时间窗口配置
这里设置的timeWindowMs
单个样本窗口时间= 1 snumQuotaSamples
样本数 = 11 当然这些都是可以配置的
查看使用了哪个实现类
我们可以看到最终是使用了 SampleRate
来统计流量 !
上面我们起始是主要讲解了
Measurable
接口, 它的父类是MetricValueProvider<Double>
,它没有方法,只是定义,当还有一个子接口是Gauge
,它并不是上面那种采样的形式来统计数据, 它返回的是当前的值, 瞬时值它提供的方法是value()
,Measurable
提供的是measure()
这个在kafka中使用场景很少,就不详细介绍了。
好了,这一篇我们主要讲解了一下 Kafka中的数据采集和统计机制
那么 接下来下一篇,我们来聊聊 Kafka的监控机制, 如何把这些采集
到的信息给保存起来并对外提供!!!
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/PkbKgWhEzy5dqg4QLwwDhA
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。