Dubbo 是一款优秀的微服务框架,它以其高性能、简单易用、易扩展等特点,广泛应用于互联网、金融保险、科技公司、制造业、零售物流等多个领域。如今,Dubbo 框架已经成了互联网开发中比较常用的技术框架。
在Dubbo框架中,当客户端调用服务端的时候,请求抵达了服务端之后,会有专门的线程池去接收参数并且处理。所以如果要实现Dubbo的线程池监控,就需要先了解下Dubbo底层对于业务线程池的实现原理。
Dubbo底层对于线程池的查看
这里我所使用的框架是 Dubbo 2.7.8 版本,它在底层对于线程池的管理是通过一个叫做ExecutorRepository 的类处理的,这个类负责创建并管理 Dubbo 中的线程池,通过该扩展接口,我们可以获取到Dubbo在实际运行中的业务线程池对象。具体的处理逻辑部分如下所示:
package org.idea.dubbo.monitor.core.collect;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import java.lang.reflect.Field;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Author idea
* @Date created in 7:04 下午 2022/6/29
*/
public class DubboThreadPoolCollector {
/**
* 获取Dubbo的线程池
* @return
*/
public static ThreadPoolExecutor getDubboThreadPoolInfo(){
//dubbo线程池数量监控
try {
ExtensionLoader<ExecutorRepository> executorRepositoryExtensionLoader = ExtensionLoader.getExtensionLoader(ExecutorRepository.class);
DefaultExecutorRepository defaultExecutorRepository = (DefaultExecutorRepository) executorRepositoryExtensionLoader.getDefaultExtension();
Field dataField = defaultExecutorRepository.getClass().getDeclaredField("data");
dataField.setAccessible(true);
ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = (ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>>) dataField.get(defaultExecutorRepository);
ConcurrentMap<Integer, ExecutorService> executorServiceConcurrentMap = data.get("java.util.concurrent.ExecutorService");
//获取到默认的线程池模型
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorServiceConcurrentMap.get(9090);
return threadPoolExecutor;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
好了,现在我们知道如何在代码中实时查看Dubbo线程池的信息了,那么接下来要做的就是如何采集这些线程池的数据,并且进行上报,最后将上报存储的数据通过统计图的方式展示出来。下边我们按照采集,上报,展示三个环节来展示数据。
采集数据
在采集数据这块,有两种思路去采集,分别如下:
采用两种不同的模式采集出来的数据,可能会有些差异,下边是两种方式的比对:
统计方式 | 实现难度 | 可能存在的问题 |
---|---|---|
定时任务采集数据 | 简单 | 定时任务执行间隙中的数据无法采集,导致数据失真。 |
请求抵达时采集数据 | 稍为复杂一些 | 在每次请求的时候都需要采集数据,会对性能有一定损耗。 |
通过对实际的业务场景分析,其实第二种方式对应用的性能损耗极微,甚至可以忽略,所以使用这种方式去采集数据的话会比较合适。
下边让我们一起来看看这种方式采集数据的话,该如何实现。
首先我们需要自己定义一个filter过滤器:
package org.idea.dubbo.monitor.core.filter;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;
import org.idea.dubbo.monitor.core.DubboMonitorHandler;
import java.util.concurrent.ThreadPoolExecutor;
import static org.idea.dubbo.monitor.core.config.CommonCache.DUBBO_INFO_STORE_CENTER;
/**
* @Author idea
* @Date created in 2:33 下午 2022/7/1
*/
@Activate(group = CommonConstants.PROVIDER)
public class DubboRecordFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
ThreadPoolExecutor threadPoolExecutor = DubboMonitorHandler.getDubboThreadPoolInfo();
//请求的时候趣统计线程池,当请求量太小的时候,这块的数据可能不准确,但是如果请求量大的话,就接近准确了
DUBBO_INFO_STORE_CENTER.reportInfo(9090,threadPoolExecutor.getActiveCount(),threadPoolExecutor.getQueue().size());
return invoker.invoke(invocation);
}
}
并且在dubbo的spi配置文件中指定好它们
dubboRecordFilter=org.idea.dubbo.monitor.core.filter.DubboRecordFilter
当provider加入了这个过滤器以后,若有请求抵达服务端,则会通过这个filter触发采集操作。
package org.idea.dubbo.monitor.core.collect;
import org.idea.dubbo.monitor.core.bo.DubboInfoStoreBO;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Dubbo数据存储中心
*
* @Author idea
* @Date created in 11:15 上午 2022/7/1
*/
public class DubboInfoStoreCenter {
private static Map<Integer, DubboInfoStoreBO> dubboInfoStoreBOMap = new ConcurrentHashMap<>();
public void reportInfo(Integer port, Integer corePoolSize, Integer queueLength) {
synchronized (this) {
DubboInfoStoreBO dubboInfoStoreBO = dubboInfoStoreBOMap.get(port);
if (dubboInfoStoreBO != null) {
boolean hasChange = false;
int currentMaxPoolSize = dubboInfoStoreBO.getMaxCorePoolSize();
int currentMaxQueueLength = dubboInfoStoreBO.getMaxCorePoolSize();
if (corePoolSize > currentMaxPoolSize) {
dubboInfoStoreBO.setMaxCorePoolSize(corePoolSize);
hasChange = true;
}
if (queueLength > currentMaxQueueLength) {
dubboInfoStoreBO.setMaxQueueLength(queueLength);
hasChange = true;
}
if (hasChange) {
dubboInfoStoreBOMap.put(port, dubboInfoStoreBO);
}
} else {
dubboInfoStoreBO = new DubboInfoStoreBO();
dubboInfoStoreBO.setMaxQueueLength(queueLength);
dubboInfoStoreBO.setMaxCorePoolSize(corePoolSize);
dubboInfoStoreBOMap.put(port, dubboInfoStoreBO);
}
}
}
public DubboInfoStoreBO getInfo(Integer port){
return dubboInfoStoreBOMap.get(port);
}
public void cleanInfo(Integer port) {
dubboInfoStoreBOMap.remove(port);
}
}
注意这个采集类只会采集一段时间的数据,然后定期会清空重置。之所以这么做,是希望用这个map统计指定时间内的最大线程数和最大队列数,接着当这些峰值数据被上报到存储中心后就进行清空。
关于DubboInfoStoreCenter对象的定义,我将它放置在了一个叫做CommonCache的类里面,具体如下:
package org.idea.dubbo.monitor.core.config;
import org.idea.dubbo.monitor.core.store.DubboInfoStoreCenter;
/**
* @Author idea
* @Date created in 12:15 下午 2022/7/1
*/
public class CommonCache {
public static DubboInfoStoreCenter DUBBO_INFO_STORE_CENTER = new DubboInfoStoreCenter();
}
因为这里使用了static关键字去修饰DubboInfoStoreCenter,所以在上边的过滤器中,我们才可以直接通过静态类引用去调用它的采集接口。
好了,现在整体来看,我们已经实现了在过滤器中去实时采集线程池的数据,并且将它暂存在了一个Map表中,这个map的数据主要是记录了某段时间内的线程池峰值,供采集器角色去使用。
那么接下来,我们就来看看上报器模块主要做了哪些操作。
上报数据
上报数据前,最重要的就是选择合适的存储组件了。首先上报的数据本身体量并不大,我们可以将采集时间短设置为15秒,那么设计一个上报任务,每隔15秒采集一次dubbo线程池的数据。那么一天的时间就需上报5760次,假设一次上报存储一条记录的话,那么一天下来所需要存储的数据也并不是特别多。
并且存储下来的服务数据实际上也并不需要保留太长的时间,一般存储个一周时间也就足够了,所以最终我选用了Redis进行这方面的存储。
我们实际每次关注的数据字段主要有三个,关于它们的定义我整理成了下边这个对象:
package org.idea.dubbo.monitor.core.bo;
/**
* @Author idea
* @Date created in 7:17 下午 2022/6/29
*/
public class ThreadInfoBO {
private Integer activePoolSize;
private Integer queueLength;
private long saveTime;
public Integer getActivePoolSize() {
return activePoolSize;
}
public void setActivePoolSize(Integer activePoolSize) {
this.activePoolSize = activePoolSize;
}
public Integer getQueueLength() {
return queueLength;
}
public void setQueueLength(Integer queueLength) {
this.queueLength = queueLength;
}
public long getSaveTime() {
return saveTime;
}
public void setSaveTime(long saveTime) {
this.saveTime = saveTime;
}
@Override
public String toString() {
return "ThreadInfoBO{" +
", queueLength=" + queueLength +
", saveTime=" + saveTime +
'}';
}
}
接着会开启一个线程任务,每间隔15秒就会执行一轮上报数据的动作:
package org.idea.dubbo.monitor.core.report;
import com.alibaba.fastjson.JSON;
import org.idea.dubbo.monitor.core.bo.DubboInfoStoreBO;
import org.idea.dubbo.monitor.core.bo.ThreadInfoBO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.idea.dubbo.monitor.core.config.CommonCache.DUBBO_INFO_STORE_CENTER;
/**
* @Author idea
* @Date created in 12:13 下午 2022/7/1
*/
public class DubboInfoReportHandler implements CommandLineRunner {
@Autowired
private IReportTemplate reportTemplate;
private static final Logger LOGGER = LoggerFactory.getLogger(DubboInfoReportHandler.class);
public static ExecutorService executorService = Executors.newFixedThreadPool(1);
public static int DUBBO_PORT = 9090;
@Override
public void run(String... args) throws Exception {
executorService.submit(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(10000);
DubboInfoStoreBO dubboInfoStoreBO = DUBBO_INFO_STORE_CENTER.getInfo(DUBBO_PORT);
ThreadInfoBO threadInfoBO = new ThreadInfoBO();
threadInfoBO.setSaveTime(System.currentTimeMillis());
if(dubboInfoStoreBO!=null){
threadInfoBO.setQueueLength(dubboInfoStoreBO.getMaxQueueLength());
threadInfoBO.setActivePoolSize(dubboInfoStoreBO.getMaxCorePoolSize());
} else {
//这种情况可能是对应的时间段内没有流量请求到provider上
threadInfoBO.setQueueLength(0);
threadInfoBO.setActivePoolSize(0);
}
//这里是上报器上报数据到redis中
reportTemplate.reportData(JSON.toJSONString(threadInfoBO));
//上报之后,这里会重置map中的数据
DUBBO_INFO_STORE_CENTER.cleanInfo(DUBBO_PORT);
LOGGER.info(" =========== Dubbo线程池数据上报 =========== ");
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
}
}
这类要注意下,Dubbo应用的线程池上报任务应当等整个SpringBoot应用启动成功之后再去触发,否则可能会有些许数据不准确性。所以在定义Bean初始化线程的时候,我选择了CommandLineRunner接口。
细心查看代码的你可能会看到这么一个类
org.idea.dubbo.monitor.core.report.IReportTemplate
这个类定义了数据上报器的基本动作,下边是它的具体代码:
package org.idea.dubbo.monitor.core.report;
/**
* 上报模版
*
* @Author idea
* @Date created in 7:10 下午 2022/6/29
*/
public interface IReportTemplate {
/**
* 上报数据
*
* @return
*/
boolean reportData(String json);
}
实现类部分如下所示:
package org.idea.dubbo.monitor.core.report.impl;
import org.idea.dubbo.monitor.core.report.IReportTemplate;
import org.idea.qiyu.cache.redis.service.IRedisService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.util.concurrent.TimeUnit;
/**
* @Author idea
* @Date created in 7:12 下午 2022/6/29
*/
@Component
public class RedisTemplateImpl implements IReportTemplate {
@Resource
private IRedisService redisService;
private static String queueKey = "dubbo:threadpool:info:";
@Override
public boolean reportData(String json) {
redisService.lpush(queueKey + LocalDate.now().toString(), json);
redisService.expire(queueKey + LocalDate.now().toString(),7, TimeUnit.DAYS);
return true;
}
}
这里面我采用的是list的结构去存储这些数据指标,设定了一个过期时间为一周,最终存储到redis之后的格式如下所示:
好了,现在我们已经完成了对线程池的监控,最后只需要设计一个管理台,从缓存中提取上报的数据并且进行页面的展示即可。
实现的逻辑比较简单,只需要定义好统计图所需要的数据结构,然后在controller返回即可,例如下图所示:
最终展现出来的效果如下图:
随着请求dubbo接口的量发生变化,统计图可以展示出dubbo线程池的数据变动情况。如果希望统计图以实时的方式展示数据的话,其实只需要在js中写一个定时调用的函数即可。
这里我是使用的是echart插件做的图表渲染,我选用的是最简单的统计图类型,大家也可以根据自己的具体所需在echart的官网上选择合适的模型进行渲染,下边这是echart的官网地址:
https://echarts.apache.org/examples/zh/index.html
本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/H9Y8Z2_ivZl_K0wDgslIjQ
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。