190108-Quick-Fix 如何优雅的实现应用内外交互之接口设计篇

如何实现应用内外交互,是Quick-Fix框架的核心之一,我们常见的应用有提供web服务的(如Spring应用),有进行大数据计算的(如Storm应用),有提供rpc的后台服务(如通过dubbo提供rpc的数据服务),有纯jar服务等;基本上我们可以划分为两类

  • 应用本身,有一套健全的与外界交互的机制(这里不包括db/redis等数据的读写)
  • 应用只关注自己的服务功能(接收数据,产生数据,保存数据),本身不与第三方的应用进行交互

针对上面这两种case,我们应该怎么来设计一套应用内外交互的方案,来实现接收外部请求,执行应用内部方法或访问应用内部数据,并返回结果的目的?

190104-Quick-Fix 纯Jar应用及扩展手册

目前Quick-Fix框架提供了两种类型,三中不同场景下的Fixer,一种是以Jar方式启动的,一个是基于Spring生态体系玩法的,下面主要介绍这jar方式,如何使用QuickFix来实现应用内服务调用和数据订正

180807-Quick-Task 动态脚本支持框架之Groovy脚本加载执行

Quick-Task 动态脚本支持框架之Groovy脚本加载执行

上一篇简答说了如何判断有任务动态添加、删除或更新,归于一点就是监听文件的变化,判断目录下的Groovy文件是否有新增删除和改变,从而判定是否有任务的变更;

接下来的问题就比较明显了,当任务变更之后,就需要重新加载任务了,即如何动态的编译并执行Groovy文件呢?

相关系列博文:

180729-Quick-Task 动态脚本支持框架之任务动态加载

Quick-Task 动态脚本支持框架之任务动态加载

前面几篇博文分别介绍了整个项目的基本架构,使用说明,以及整体框架的设计与实现初稿,接下来则进入更细节的实现篇,将整个工程中核心实现捞出来,从为什么这么设计到最终的实现给予说明

相关系列博文:

180723-Quick-Task 动态脚本支持框架之结构设计篇

logo

文章链接:https://liuyueyi.github.io/hexblog/2018/07/23/180723-Quick-Task-动态脚本支持框架之结构设计篇/

Quick-Task 动态脚本支持框架之结构设计篇

相关博文:

前面两篇博文,主要是整体介绍和如何使用;接下来开始进入正题,逐步剖析,这个项目是怎么一步一步搭建起来的;本篇博文则主要介绍基本骨架的设计,围绕项目的核心点,实现一个基础的原型系统

180719-Quick-Task 动态脚本支持框架之使用介绍篇

logo

Quick-Task 动态脚本支持框架之使用介绍篇

相关博文:

QuickTask这个项目主要就是为了解决数据订正和接口验证不方便的场景,设计的一个及其简单的动态脚本调度框架,前面一篇整体介绍篇博文,主要介绍了这是个什么东西,整体的运行原理,以及一些简单的使用demo

本篇博文将主要放在应用场景的探讨上,在实际的项目环境中,可以怎么用

180702-QuickTask动态脚本支持框架整体介绍篇

logo

Quick-Task 动态脚本支持框架整体介绍篇

一个简单的动态脚本调度框架,支持运行时,实时增加,删除和修改动态脚本,可用于后端的进行接口验证、数据订正,执行定时任务或校验脚本

本项目主要涉及到的技术栈:

  • groovyEngine (groovy脚本加载执行)
  • commons-io (文件变动监听)

180701-计数时间窗口数据结构的设计

logo

相关博文:

维持计数时间窗口数据结构的设计

I. 背景

有这么一个场景,我需要维护每个商品在3min, 10mn, 1h, 6h, 24h这几个时间窗口内的贸易总额,那么可以怎么实现?

问题拆解:

  • 每个商品,对应多个时间窗口
  • 商品可能很多,因此时间窗口的量可能很大
  • 时间窗口的数据为总数,因此不需要记录每个订单的情况,只需要维护一个交易总额即可
  • 热点数据,可能每分钟都有很多的成交
  • 冷门数据,可能一天都没有几笔成交

180628-动态任务执行框架想法篇

数据订正带来的动态任务执行框架的想法

I. 背景

对于后端而言,数据订正可算是非常非常频繁且常见的事情了,常见的有DB、缓存、内存等数据源中的数据订正,对于非应用内存而言,其他有实体或者可以直接通过官方的提供的控制台连接进行修改的数据订正,相对比较简单,而对于应用内存,如果没有应用内通知并处理相关逻辑,多半就只能重启应用来实现刷新内存缓存了

当然我这里说的也不是内存数据更新,最近遇到的一个问题就是redis缓存中的数据有问题,需要订正,而并不是简单的把数据删了就行,需要根据某些数据,做一些计算,然后得出新的数据,并写回到缓存

这样看来好像也不太麻烦,如果没有第三方依赖,大不了写个python脚本或者php脚本,重新算一下,也没什么毛病

然而实际情况却并不是这样,问题有以下几点:

  • 数据经过ProtoBuf进行编码存入redis,反序列化是个问题
  • 数据计算有依赖外部服务,如只能通过rpc调用第三方接口,而rpc框架没有提供php或python的sdk

基于此,就想也米有办法,可以直接搞一个项目,可以执行Groovy脚本,在Groovy脚本中实现数据订正逻辑?需求如下

  • 支持Groovy脚本的动态更新(支持动态新增,删除和修改脚本)
  • Groovy脚本可友好的访问我们需要的外部资源

180621-一个简单的时间窗口设计与实现

如何设计一个计数的时间窗口

时间窗口,通常对于一些实时信息展示中用得比较多,比如维持一个五分钟的交易明细时间窗口,就需要记录当前时间,到五分钟之前的所有交易明细,而五分钟之前的数据,则丢掉

一个简单的实现就是用一个队列来做,新的数据在对头添加;同时起一个线程,不断的询问队尾的数据是否过期,如果过期则丢掉

另外一中场景需要利用到这个时间窗口内的数据进行计算,如计算着五分钟交易中资金的流入流出总和,如果依然用上面的这种方式,会有什么问题?

  • 如果时间窗口大,需要存储大量的明细数据
  • 我们主要关心的只有资金流入流出;存这些明细数据得不偿失
  • 每次新增or删除过期数据时,实时计算流入流出消耗性能

针对这种特殊的场景,是否有什么取巧的实现方式呢?

熔断Hystrix使用尝鲜

熔断Hystrix使用尝鲜

当服务有较多外部依赖时,如果其中某个服务的不可用,导致整个集群会受到影响(比如超时,导致大量的请求被阻塞,从而导致外部请求无法进来),这种情况下采用hystrix就很有用了

出于这个目的,了解了下hystrix框架,下面记录下,框架尝新的历程

I. 原理探究

通过官网和相关博文,可以简单的说一下这个工作机制,大致流程如下

首先是请求过来 -> 判断熔断器是否开 -> 服务调用 -> 异常则走fallback,失败计数+1 -> 结束

下面是主流程图

流程图

1
2
3
4
5
6
7
8
9
10
graph LR
A(请求)-->B{熔断器是否已开}
B --> | 熔断 | D[fallback逻辑]
B --> | 未熔断 | E[线程池/Semphore]
E --> F{线程池满/无可用信号量}
F --> | yes | D
F --> | no | G{创建线程执行/本线程运行}
G --> | yes | I(结束)
G --> | no | D
D --> I(结束)

熔断机制主要提供了两种,一个是基于线程池的隔离方式来做;还有一个则是根据信号量的抢占来做

线程池方式 : 支持异步,支持超时设置,支持限流

信号量方式 : 本线程执行,无异步,无超时,支持限流,消耗更小

基本上有上面这个简单的概念之后,开始进入我们的使用测试流程

II. 使用尝鲜

1. 引入依赖

1
2
3
4
5
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.12</version>
</dependency>

2. 简单使用

从官方文档来看,支持两种Command方式,一个是基于观察者模式的ObserverCommand, 一个是基本的Command,先用简单的看以下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
public class HystrixConfigTest extends HystrixCommand<String> {

private final String name;

public HystrixConfigTest(String name, boolean ans) {
// 注意的是同一个任务,
super(Setter.withGroupKey(
// CommandGroup是每个命令最少配置的必选参数,在不指定ThreadPoolKey的情况下,字面值用于对不同依赖的线程池/信号区分
HystrixCommandGroupKey.Factory.asKey("CircuitBreakerTestGroup"))
// 每个CommandKey代表一个依赖抽象,相同的依赖要使用相同的CommandKey名称。依赖隔离的根本就是对相同CommandKey的依赖做隔离.
.andCommandKey(HystrixCommandKey.Factory.asKey("CircuitBreakerTestKey_" + ans))
// 当对同一业务依赖做隔离时使用CommandGroup做区分,但是对同一依赖的不同远程调用如(一个是redis 一个是http),可以使用HystrixThreadPoolKey做隔离区分
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("CircuitBreakerTest_" + ans))
.andThreadPoolPropertiesDefaults( // 配置线程池
HystrixThreadPoolProperties.Setter()
.withCoreSize(12) // 配置线程池里的线程数,设置足够多线程,以防未熔断却打满threadpool
)
.andCommandPropertiesDefaults( // 配置熔断器
HystrixCommandProperties.Setter()
.withCircuitBreakerEnabled(true)
.withCircuitBreakerRequestVolumeThreshold(3)
.withCircuitBreakerErrorThresholdPercentage(80)
// .withCircuitBreakerForceOpen(true) // 置为true时,所有请求都将被拒绝,直接到fallback
// .withCircuitBreakerForceClosed(true) // 置为true时,将忽略错误
// .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) // 信号量隔离
.withExecutionIsolationSemaphoreMaxConcurrentRequests(20)
.withExecutionTimeoutEnabled(true)
.withExecutionTimeoutInMilliseconds(200)
.withCircuitBreakerSleepWindowInMilliseconds(1000) //熔断器打开到关闭的时间窗长度
// .withExecutionTimeoutInMilliseconds(5000)
)
);
this.name = name;
}

@Override
protected String run() throws Exception {
System.out.println("running run():" + name + " thread: " + Thread.currentThread().getName());
int num = Integer.valueOf(name);
if (num % 2 == 0 && num < 10) { // 直接返回
return name;
} else if (num < 40) {
Thread.sleep(300);
return "sleep+"+ name;
} else { // 无限循环模拟超时
return name;
}
}
//
// @Override
// protected String getFallback() {
// Throwable t = this.getExecutionException();
// if(t instanceof HystrixRuntimeException) {
// System.out.println(Thread.currentThread() + " --> " + ((HystrixRuntimeException) t).getFailureType());
// } else if (t instanceof HystrixTimeoutException) {
// System.out.println(t.getCause());
// } else {
// t.printStackTrace();
// }
// System.out.println(Thread.currentThread() + " --> ----------over------------");
// return "CircuitBreaker fallback: " + name;
// }

public static class UnitTest {

@Test
public void testSynchronous() throws IOException, InterruptedException {
for (int i = 0; i < 50; i++) {
if (i == 41) {
Thread.sleep(2000);
}
try {
System.out.println("===========" + new HystrixConfigTest(String.valueOf(i), i % 2 == 0).execute());
} catch (HystrixRuntimeException e) {
System.out.println(i + " : " + e.getFailureType() + " >>>> " + e.getCause() + " <<<<<");
} catch (Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
}
}

System.out.println("------开始打印现有线程---------");
Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
for (Thread thread : map.keySet()) {
System.out.println("--->name-->" + thread.getName());
}
System.out.println("thread num: " + map.size());

System.in.read();
}
}
}

使用起来还是比较简单的,一般步骤如下:

  • 继承 HsytrixCommand
  • 重载构造方法,内部需要指定各种配置
  • 实现run方法,这个里面主要执行熔断监控的方法

写上面的代码比较简单,但是有几个地方不太好处理

  1. 配置项的具体含义,又是怎么生效的?
  2. 某些异常不进入熔断逻辑怎么办?
  3. 监控数据如何获取?
  4. 如何模拟各种不同的case(超时?服务异常?熔断已开启?线程池满?无可用信号量?半熔断的重试?)

3. 实测理解

根据上面那一段代码的删删改改,貌似理解了以下几个点,不知道对误

a. 配置相关

  • groupKey 用于区分线程池和信号量,即一个group对应一个
  • commandKey 很重要,这个是用于区分业务
    • 简单来讲,group类似提供服务的app,command则对应app提供的service,一个app可以有多个service,这里就是将一个app的所有请求都放在一个线程池(or共享一个信号量)
  • 开启熔断机制,指定触发熔断的最小请求数(10s内),指定打开熔断的条件(失败率)
  • 设置熔断策略(线程池or信号量)
  • 设置重试时间(默认熔断开启后5s,放几个请求进去,看服务是否恢复)
  • 设置线程池大小,设置信号量大小,设置队列大小
  • 设置超时时间,设置允许超时设置

b. 使用相关

run方法是核心执行服务调用,如果需要某些服务不统计到熔断的失败率(比如因为调用姿势不对导致服务内部的异常抛上来了,但是服务本身是正常的),这个时候,就需要包装下调用逻辑,将不需要的异常包装到 HystrixBadRequestException 类里

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
protected String run() {
try {
return func.apply(route, parameterDescs);
} catch (Exception e) {
if (exceptionExcept(e)) {
// 如果是不关注的异常case, 不进入熔断逻辑
throw new HystrixBadRequestException("unexpected exception!", e);
} else {
throw e;
}
}
}

c. 如何获取失败的原因

当发生失败时,hystrix会把原生的异常包装到 HystrixRuntimeException 这个类里,所以我们可以在调用的地方如下处理

1
2
3
4
5
6
7
try {
System.out.println("===========" + new HystrixConfigTest(String.valueOf(i), i % 2 == 0).execute());
} catch (HystrixRuntimeException e) {
System.out.println(i + " : " + e.getFailureType() + " >>>> " + e.getCause() + " <<<<<");
} catch (Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
}

当定义了fallback逻辑时,异常则不会抛到具体的调用方,所以在 fallback 方法内,则有必要获取对应的异常信息

1
2
// 获取异常信息
Throwable t = this.getExecutionException();

然后下一步就是需要获取对应的异常原因了,通过FailureType来表明失败的根源

1
((HystrixRuntimeException) t).getFailureType()

d.如何获取统计信息

hystrix自己提供了一套监控插件,基本上公司内都会有自己的监控统计信息,因此需要对这个数据进行和自定义,目前还没看到可以如何优雅的处理这些统计信息

4. 小结

主要是看了下这个东西可以怎么玩,整个用下来的感觉就是,设计的比较有意思,但是配置参数太多,很多都没有完全摸透

其次就是一些特殊的case(如监控,报警,特殊情况过滤)需要处理时,用起来并不是很顺手,主要问题还是没有理解清楚这个框架的内部工作机制的问题

III. 其他

个人博客: Z+|blog

基于hexo + github pages搭建的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

声明

尽信书则不如,已上内容,纯属一家之言,因本人能力一般,见识有限,如发现bug或者有更好的建议,随时欢迎批评指正

扫描关注

QrCode

7. 报警系统QuickAlarm之默认报警规则扩展

本篇主要是扩展默认的报警规则,使其能更加友好的支持同时选择多种报警方式

扩展遵循两个原则

  • 不影响原有的配置文件格式
  • 简化规则解析复杂度

6. 报警系统QuickAlarm使用手册

本文将主要说明QuickAlarm该如何使用,以及使用时需要注意事项

5. 报警系统QuickAlarm之频率统计及接口封装

前面将报警规则的制定加载解析,以及报警执行器的定义加载和扩展进行了讲解,基本上核心的内容已经完结,接下来剩下内容就比较简单了

  • 报警频率的统计
  • 报警线程池
  • 对外封装统一可用的解耦

4. 报警系统QuickAlarm之报警规则解析

前面两篇分别说了报警执行器和报警规则的定义及用户扩展加载,接下来就是比较核心的一块了,如何将报警规则和报警执行器关联起来,即当发生报警时,应该call哪一个报警执行器

3. 报警系统QuickAlarm之报警规则的设定与加载

前面一篇是报警执行器的定义与加载已经完成,但与之对应的报警规则有是如何定义和加载的呢?

此外,既然命名为规则,那么就需要有对应的解析器,以根据报警规则和报警类型等相关输入条件,来选择对应的报警执行器,因此本文主要包括的内容就比较清晰了

  • 报警规则的定义
  • 报警规则的加载
  • 报警规则的解析以及报警执行器选择

2. 报警系统QuickAlarm之报警执行器的设计与实现

根据前面一篇总纲的博文,将整体结构划分为了四大块,本文则主要目标集中在第一块,报警执行器(AlarmExecute)的设计与加载上了

主要的关注点无外乎 定义-》加载-》实现逻辑三块了:

  • AlarmExecute 的接口定义
  • 如何加载用户自定义的AlarmExecute
  • AlarmExecute的内部实现

1. 报警系统QuickAlarm设计总纲

背景

日常的系统中,报警是不可缺少的一环,目前报警方式很多,最常见的有直接打日志,微信报警,短信报警,邮件报警等;而涉及到报警,一般不可避免的需要提前设置一些基本信息,如报警方式,报警频率,报警用户,开关等;

另外一个常见的问题是一般采用的是单一的报警方式,比如不管什么类型的报警全部都用短信方式触达,然后就会发现手机时常处于被淹没的状态了,久而久之对报警短信就不会敏感了

4. SPI框架实现之旅四:使用测试

使用测试

前面三篇主要是介绍如何设计的,如何实现的,这一篇,则主要集中在如何使用。实现得再好,如果不好用,也白搭

本篇介绍几个简单的使用case,包括静态使用,动态适配,自定义选择器等

3. SPI框架实现之旅三:实现说明

实现说明

前一篇 《SPI框架实现之旅二:整体设计》中,介绍了几个定义的接口,注解;叙述了实现流程;并简单的介绍了 SpiLoader中的部分实现; 本篇则主要介绍SpiLoader类的实现

类图结构如下:

https://static.oschina.net/uploads/img/201705/27183336_TOny.png

2. SPI框架实现之旅二:整体设计

整体设计

上一篇简单的说了一下spi相关的东西, 接下来我们准备开动,本篇博文主要集中在一些术语,使用规范的约定和使用方式

1. SPI框架实现之旅一:背景介绍

背景介绍

SPI的全名为Service Provider Interface,简单的总结下java spi机制的思想。我们系统里抽象的各个模块,往往有很多不同的实现方案,比如日志模块的方案,xml解析模块、jdbc模块的方案等。面向的对象的设计里,我们一般推荐模块之间基于接口编程,模块之间不对实现类进行硬编码。一旦代码里涉及具体的实现类,就违反了可拔插的原则,如果需要替换一种实现,就需要修改代码。为了实现在模块装配的时候能不在程序里动态指明,这就需要一种服务发现机制。 java spi就是提供这样的一个机制:为某个接口寻找服务实现的机制

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×