完善资料让更多小伙伴认识你,还能领取20积分哦, 立即完善>
|
|
相关推荐
2个回答
|
|
Sentinel中的星流控制
Sentinel 1.4.0开始引入了流控模块,Sentinel的流控中,应用分为两个角色
是一种嵌入模式,即Token Server嵌入普通web应用中,在这个模式下同时,多个实例都是对等的token server和client可以随时进行生成,相反是需要限制令牌服务器的总和 QPS,防止影响应用程序。图PS 嵌入模式适用于某些 独立应用程序内部的对应应用程序,架构如下:另一种是模式,即作为独立的令牌服务器进程启动,独立部署,隔离独立模式适合作为全球速率限制器给对应应用程序提供操作流控服务,架构如下 本文会基于嵌入模式,结合Apollo,应用自定义实现流控,直接进入主题 基本概念介绍, 针对流控,Sentinel 多个基本类需要介绍,方便大家理解转中的代码 先贴上一个重要令牌服务器配置的流控,图片来自Sentinel官方维基 先介绍下最的,也就是图片 机器分配的JSON串,也就是Sentinel Dashboard后台的JSON串是Sentinel Dashboard后台时产生的,分配Token Server时的界面 就像图示的serverId:对应选择的TokenServer的及其Sentinel端口 ip:对应的TokenServer的机器ip 端口:对应的TokenServer的提供Token服务的端口 clientSet:TokenClient的机器列表 这个JSON串是实现星流控制的关键,同时需要TokenServer和TokenClient监听到,因此需要配置中点并持久化到配置中心。 所以第一个关键是,需要将TokenServer分配配置传输到Apollo中,具体代码实现两个分别做介绍 几个类,TokenClient和TokenServer端来做介绍 TokenServer端:
ClusterStateManager:设置当前应用为TokenClient,或者TokenServer配置的 基本思路和流程 先根据一个例子来简单说明下实现 想法192.168.0.1,192.168.0.2,192.168.0.3,TokenServer的JSON对应如下,即在Dashboard分配192.168.0.1为TokenServer,其他为TokenClient [ { "clientSet": [ "192.168.0.2@8719", "192.168.0.3@8719", ], "ip": "192.168.0.1", "machineId": "192.168.0.1@8719", "maxAllowedQps": 20000, "port": 18730 } ] 现在的问题来了,应用端都监听到了这个TokenServer的配置
if(currentMachineId == 配置的machineId){ 设置当前应用为TokenServer(ClusterStateManager.registerProperty) 设置TokenServer最大QPS(ClusterServerConfigManager.loadGlobalFlowConfig) 设置TokenServer端口(ClusterServerConfigManager.registerServerTransportProperty) 设置TokenServer根据nameSpace监听集群限流规则的变化(ClusterFlowRuleManager.setPropertySupplier) }else{ // 当前机器在clientSet中 if(clientSet.contains(currentMachineId)){ 设置当前应用为TokenClient(ClusterStateManager.registerProperty) 设置TokenClient链接的TokenServer地址(ClusterClientConfigManager.registerServerAssignProperty) 设置TokenClient获取Token的超时时间(ClusterClientConfigManager.registerClientConfigProperty) } } 所以192.168.0.1的应用启动时,判断192.168.0.1@8719 = confg.machineId,所以分配其为TokenServer,而192.168.0.2和192.168.0.3的应用启动,时中包含192.168.0.2@8719和192.168.0。3@8719,所以其为TokenClient 所以第二个关键点就是客户端初始化代码,实现如上逻辑 Dashboard修改 先来解决第一个关键点,如何将Dboard中的TokenServer配置推送到Apollo中,让应用监听到,直接上代码,修改ClusterGroupEntity ,加入maxAllowedQps字段 新建ClusterMapDataApolloPublisher,TokenServer配置的服务类,代码如下,其中的BaseApolloRulePublisher参考上篇文章实现,不再重复贴代码 @Component("clusterMapDataApolloPublisher") public class ClusterMapDataApolloPublisher extends BaseApolloRulePublisher { @Override public void publish(String app, Object rules) throws Exception { if (rules == null) { return; } List List for (ClusterAppAssignMap clusterAppAssignMap : clusterAppAssignMapList) { ClusterGroupEntity clusterGroupEntity = new ClusterGroupEntity(); clusterGroupEntity.setMachineId(clusterAppAssignMap.getMachineId()); clusterGroupEntity.setIp(clusterAppAssignMap.getIp()); clusterGroupEntity.setPort(clusterAppAssignMap.getPort()); clusterGroupEntity.setClientSet(clusterAppAssignMap.getClientSet()); clusterGroupEntity.setMaxAllowedQps(clusterAppAssignMap.getMaxAllowedQps()); clusterGroupEntityList.add(clusterGroupEntity); } super.publish(app, clusterGroupEntityList); } @Override protected String getDataId() { return ApolloConfigUtil.getTokenServerClusterMapDataId(); } } 那么这个配置又在哪里呢?修改com.alibaba.csp.sentinel.dashboard.service.ClusterAssignServiceImpl#applyAssignToApp方法,加入如下代码 删除配置的地方呢?修改com.alibaba.csp.sentinel.dashboard.service.ClusterAssignServiceImpl#unbindClusterServers,加入如下代码 到这里,TokenServer相关配置就已经传输到Apollo中了,看下我的Apollo中存储结构,key可以自己指定 客户端初始化代码 下一个关键点是应用端本地配置的TokenServer来当前应用是TokenServer还是TokenClient,同样可以根据SPI确定实现中来进行初始化,直接上代码,具体逻辑不懂看上面的伪代码,最核心的就是根据配置的ip端口和当前应用的ip端口做,判断其是TokenServer还是TokenClient /** * 初始化集群限流规则 */ private void initClusterConfig() { //初始化token server端口相关配置 initServerTransportConfigProperty(); //最大token qps配置 initServerFlowConfig(); //为token server添加命名空间动态监听器 initClusterRuleSupplier(); //为每个client设置目标token server initClientServerAssignProperty(); //初始化token client通用超时配置 initClientConfigProperty(); //等待transport端口分配完毕 while (TransportConfig.getRuntimePort() == -1) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } //初始化客户端状态为client 或者 server initStateProperty(); } |
|
|
|
初始化代码
private void initServerFlowConfig() { ClusterServerFlowConfigParser serverFlowConfigParser = new ClusterServerFlowConfigParser(); ReadableDataSource ApolloConfigUtil.getTokenServerClusterMapDataId(), defaultRules, s -> { ServerFlowConfig config = serverFlowConfigParser.convert(s); if (config != null) { ClusterServerConfigManager.loadGlobalFlowConfig(config); } return config; }); } private void initStateProperty() { // Cluster map format: // [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","machineId":"112.12.88.68@8728","port":11111}] // machineId: ReadableDataSource ApolloConfigUtil.getTokenServerClusterMapDataId(), defaultRules, new ClusterAssignStateParser()); ClusterStateManager.registerProperty(clusterModeDs.getProperty()); } private void initServerTransportConfigProperty() { ReadableDataSource ApolloConfigUtil.getTokenServerClusterMapDataId(), defaultRules, new ClusterTransportConfigParser()); ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty()); } private void initClientServerAssignProperty() { // Cluster map format: // [{"clientSet":["112.12.88.66@8729","112.12.88.67@8727"],"ip":"112.12.88.68","machineId":"112.12.88.68@8728","port":11111}] // machineId: ReadableDataSource ApolloConfigUtil.getTokenServerClusterMapDataId(), defaultRules, new ClusterAssignConfigParser()); ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty()); } private void initClientConfigProperty() { ReadableDataSource ApolloConfigUtil.getTokenClientConfigDataId(), defaultRules, source -> JSON.parseObject(source, new TypeReference })); ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty()); } /** * 初始化集群限流规则监听器 */ private void initClusterRuleSupplier() { // Register cluster flow rule property supplier which creates data source by namespace. ClusterFlowRuleManager.setPropertySupplier(namespace -> { ReadableDataSource ApolloConfigUtil.getFlowDataId(), defaultRules, source -> JSON.parseObject(source, new TypeReference
})); return ds.getProperty(); }); // Register cluster parameter flow rule property supplier. ClusterParamFlowRuleManager.setPropertySupplier(namespace -> { ReadableDataSource ApolloConfigUtil.getParamFlowDataId(), defaultRules, source -> JSON.parseObject(source, new TypeReference
})); return ds.getProperty(); }); } 解析配置相关代码如下 public class ClusterAssignConfigParser implements Converter @Override public ClusterClientAssignConfig convert(String source) { if (source == null) { return null; } RecordLog.info("[ClusterClientAssignConfigParser] Get data: " + source); List
}); if (groupList == null || groupList.isEmpty()) { return null; } return extractClientAssignment(groupList); } private ClusterClientAssignConfig extractClientAssignment(List // Skip the token servers. for (ClusterGroupEntity group : groupList) { if (MachineUtils.isCurrentMachineEqual(group)) { return null; } } // Build client assign config from the client set of target server group. for (ClusterGroupEntity group : groupList) { if (group.getClientSet().contains(MachineUtils.getCurrentMachineId())) { String ip = group.getIp(); Integer port = group.getPort(); return new ClusterClientAssignConfig(ip, port); } } // Not assigned. return null; } } public class ClusterAssignStateParser implements Converter @Override public Integer convert(String source) { if (source == null) { return null; } RecordLog.info("[ClusterClientAssignConfigParser] Get data: " + source); List
}); if (groupList == null || groupList.isEmpty()) { return ClusterStateManager.CLUSTER_NOT_STARTED; } return extractMode(groupList); } private int extractMode(List // If any server group machine matches current, then it's token server. for (ClusterGroupEntity group : groupList) { if (MachineUtils.isCurrentMachineEqual(group)) { return ClusterStateManager.CLUSTER_SERVER; } if (group.getClientSet() != null) { for (String client : group.getClientSet()) { if (client != null && client.equals(MachineUtils.getCurrentMachineId())) { return ClusterStateManager.CLUSTER_CLIENT; } } } } return ClusterStateManager.CLUSTER_NOT_STARTED; } } public class ClusterServerFlowConfigParser implements Converter @Override public ServerFlowConfig convert(String source) { if (source == null) { return null; } RecordLog.info("[ClusterServerFlowConfigParser] Get data: " + source); List
}); if (groupList == null || groupList.isEmpty()) { return null; } return extractServerFlowConfig(groupList); } private ServerFlowConfig extractServerFlowConfig(List for (ClusterGroupEntity group : groupList) { if (MachineUtils.isCurrentMachineEqual(group)) { return new ServerFlowConfig() .setExceedCount(ClusterServerConfigManager.getExceedCount()) .setIntervalMs(ClusterServerConfigManager.getIntervalMs()) .setMaxAllowedQps(group.getMaxAllowedQps()) .setMaxOccupyRatio(ClusterServerConfigManager.getMaxOccupyRatio()) .setSampleCount(ClusterServerConfigManager.getSampleCount()); } } return null; } } public class ClusterTransportConfigParser implements Converter @Override public ServerTransportConfig convert(String source) { if (source == null) { return null; } RecordLog.info("[ClusterServerTransportConfigParser] Get data: " + source); List
}); if (groupList == null || groupList.isEmpty()) { return null; } return extractServerTransportConfig(groupList); } private ServerTransportConfig extractServerTransportConfig(List for (ClusterGroupEntity group : groupList) { if (MachineUtils.isCurrentMachineEqual(group)) { return new ServerTransportConfig().setPort(group.getPort()).setIdleSeconds(600); } } return null; } } 缺点及不足 侧面先人提出的大背景 虚拟仪表板发送规则到业务的appId中的某个名称空间,那么TokenServer无法监听多个业务appId中的规则变化, 仪表板发送规则到统一的Sentinel配置appId中,那么业务应用如何同时监听自己业务的appId和SentinelappId呢? 这是一个配置点,暂时我没有好想法,有想法可以私信我啊 |
|
|
|
只有小组成员才能发言,加入小组>>
863 浏览 0 评论
1191 浏览 1 评论
2566 浏览 5 评论
2901 浏览 9 评论
移植了freeRTOS到STMf103之后显示没有定义的原因?
2761 浏览 6 评论
keil5中manage run-time environment怎么是灰色,不可以操作吗?
1202浏览 3评论
214浏览 2评论
486浏览 2评论
399浏览 2评论
M0518 PWM的电压输出只有2V左右,没有3.3V是怎么回事?
482浏览 1评论
小黑屋| 手机版| Archiver| 电子发烧友 ( 湘ICP备2023018690号 )
GMT+8, 2025-1-23 02:21 , Processed in 0.569336 second(s), Total 80, Slave 61 queries .
Powered by 电子发烧友网
© 2015 bbs.elecfans.com
关注我们的微信
下载发烧友APP
电子发烧友观察
版权所有 © 湖南华秋数字科技有限公司
电子发烧友 (电路图) 湘公网安备 43011202000918 号 电信与信息服务业务经营许可证:合字B2-20210191 工商网监 湘ICP备2023018690号