目前各大科技公司都提供了各种云平台服务,对于普通地金融科技公司而言,从传统的内部维护基础架构的日子一去不复返。策略转向了开发面向多平台,多终端的服务开发,其中前端开发倾向于web,后端开发倾向于自包含而且能灵活大量部署的微服务架构。如何从传统的开发风格转变为适应云平台服务的应用开发成为了一个需要攻坚的课题。

采用微服务架构拥有诸多好处,在本文中将不做赘述,可参见微服务架构一文。目前,微服务架构已经从设计逐渐落地,开发者社区逐渐贡献出云生态的组件框架,能够让这一战略目标变为可执行方案。

Spring Cloud微服务框架

Spring家族拥有多个方向的项目,目前最为领先行业的就是Spring Framework, Spring Boot, Spring Cloud三大方向,分别致力于帮助开发者开发系统,简化应用初始搭建,以及实现微服务开发设计。

Spring Cloud提供了非常完整的一套微服务实施方案:

  • 服务发现
  • 分布式配置
  • 客户端负载均衡
  • 服务容错保护
  • API网关
  • 安全
  • 事件驱动
  • 分布式服务跟踪

当然,代码构建微服务只是微服务落地的第一步,为了支持灵活大量部署,微服务需要借助容器技术来快速部署到各个云平台服务提供商的虚拟机上,Docker则是容器技术实现的一个典范,我将在另一篇文章中介绍容器技术。

Spring Cloud常用组件

API网关

Spring Cloud Gateway和Netflix Zuul为所有微服务提供了一个单一入口点。API网关是一个单独的中间件。

Spring Cloud Gateway源码分析

处理流程:

  • 基于webflux容器/Netty通信框架:NIO机制,事件循环监听端口请求
  • 请求的Route Predicate函数式过滤匹配规则
    • HTTP请求信息检查例如:Host,Query, Path, Header, Cookie
  • 请求过滤器Filter处理和转发接收
    • Pre型在请求转发前执行,可以做鉴权,限流等操作
    • Post型过滤器可以对返回数据进行增强处理
    • 下游服务可以为注册中心的地址/预先配置好的节点IP信息

安全——服务验证和授权

Spring Cloud Securty为微服务提供了一种灵活的用户验证机制,和授权模型。其中验证机制可以基于OAuth2.0标准下的OpenID协议完成,而用户服务授权模型则通过OAuth 2.0的token提供。云服务中的所有服务都应该引入服务验证和授权机制来保护内容服务的安全性。

如果需要更多的security功能,可以考虑引入spring security中的功能。本文将着重介绍基于OAuth2.0的微服务安全架构。

微服务中一般在gateway进行验证授权,而在下游微服务中只需要确认请求是经过认证的即可。关于鉴权的详细文章请见这里.

Spring Cloud Netflix

Netflix Eureka服务注册

Eureka是单独的服务组件,保证了A高可用,和P分区容忍性的中间件,与Zookeeper保证的C一致性,P分区容忍性不同,更加适合微服务架构,通过集群保证了高可用的动态服务注册以及心跳感知。Eureka一般和Ribbon放在同一个服务器上,所以在gateway上需要指向lb://abc-service就能保证负载均衡到对应的已注册资源服务中。

Netflix Ribbon负载均衡

Netflix Ribbon 采用拦截器将请求的Url进行负载均衡分发,从而达到内容微服务的负载均衡效果。Ribbon框架并不是单独执行,往往在前端服务中会通过(服务注册中心获取/API网关中写死)得到提供服务的IP地址,所以服务调用方引入Ribbon通过一定的均衡策略动态生成最终访问的IP地址。

  • 全局使用既定的某种负载均衡策略:

    1
    2
    3
    4
    @Bean
    public RandomRule randomRule() {
    return new RandomRule();
    }
  • 通过配置方式灵活配置服务提供者的负载均衡策略

1
2
3
4
5
# 负载均衡策略
# service-provider-name 为调用的服务的名称
service-provider-name:
ribbon:
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule

Ribbon负载均衡策略比较

策略名 策略声明 策略描述 实现说明
BestAvailableRule public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule 选择一个最小的并发请求的server 逐个考察Server:
如果Server被tripped了,则忽略,
再选择其中ActiveRequestsCount最小的server
AvailabilityFilteringRule public class AvailabilityFilteringRule extends PredicateBasedRule 过滤掉那些因为一直连接失败的被标记为circuit tripped的后端server,并过滤掉那些高并发的的后端server(active connections 超过配置的阈值) 使用一个AvailabilityPredicate来包含过滤server的逻辑, 其实就就是检查status里记录的各个server的运行状态
WeightedResponseTimeRule public class WeightedResponseTimeRule extends RoundRobinRule 根据相应时间分配一个weight,相应时间越长,weight越小,被选中的可能性越低。 一个后台线程定期的从status里面读取评价响应时间,为每个server计算一个weight;
Weight的计算也比较简单responsetime 减去每个server自己平均的responsetime是server的权重;
当刚开始运行,没有形成statas时,使用roudrobin策略选择server。
RetryRule public class RetryRule extends AbstractLoadBalancerRule 对选定的负载均衡策略机上重试机制。 在一个配置时间段内当选择server不成功,则一直尝试使用subRule的方式选择一个可用的server
RoundRobinRule public class RoundRobinRule extends AbstractLoadBalancerRule roundRobin方式轮询选择server 轮询index,选择index对应位置的server
RandomRule public class RandomRule extends AbstractLoadBalancerRule 随机选择一个server 在index上随机,选择index对应位置的server
ZoneAvoidanceRule public class ZoneAvoidanceRule extends PredicateBasedRule 复合判断server所在区域的性能和server的可用性选择server 使用ZoneAvoidancePredicate和AvailabilityPredicate来判断是否选择某个server,前一个判断判定一个zone的运行性能是否可用,剔除不可用的zone(的所有server),AvailabilityPredicate用于过滤掉连接数过多的Server。

Netflix Feign动态代理RPC调用

RPC远程调用的一种经典实现,为了让服务调用的代码跟普通方法调用一样方便,可以使用Netflix Feign动态代理被调用的服务接口,并且在底层实际使用HTTPClient进行调用。

Netflix Hystrix服务弹性保证

对于微服务的调用失败需要进行动态的感知,当大量请求失败后需要主动断路避免延迟。而且需要后备方式记录或者进行服务降级。在.NETcore 中对等的实现是steeltoe.

Hystrix熔断机制源码分析

基于注解@HystrixCommand和AOP实现,在方法执行前拦截的动态代理执行。对于有弹性机制需要的节点,需要引入Hystrix进行失败回退方法编写。

Hystrix手写代码示例

  1. 自定义注解 @WuzzHystrixCommand
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface WuzzHystrixCommand {

/**
* 默认超时时间
*
* @return
*/
int timeout() default 1000;

/**
* 回退方法
*
* @return
*/
String fallback() default "";

}
  1. 编写切面类,实现简易的逻辑处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Component
@Aspect
public class WuzzHystrixCommandAspect {
//线程池的处理,基于这个线程池的处理统计可以达到 THREAD 资源限流
ExecutorService executorService = Executors.newFixedThreadPool(10);

//注解切点
@Pointcut(value = "@annotation(com.wuzz.demo.custom.hystrix.WuzzHystrixCommand)")
public void pointCut() {
}

//环绕通知
@Around(value = "pointCut()&&@annotation(hystrixCommand)")
public Object doPointCut(ProceedingJoinPoint joinPoint, WuzzHystrixCommand hystrixCommand) throws InterruptedException, ExecutionException, TimeoutException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
int timeout = hystrixCommand.timeout();
//前置的判断逻辑
Future future = executorService.submit(() -> {
try {
return joinPoint.proceed(); //执行目标方法
} catch (Throwable throwable) {
throwable.printStackTrace();
}
return null;
});
Object result;
try {// 使用 future 来实现超时
result = future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
future.cancel(true);
// ?
if (StringUtils.isBlank(hystrixCommand.fallback())) {
throw e;
}
//调用fallback
result = invokeFallback(joinPoint, hystrixCommand.fallback());
}
return result;
}

private Object invokeFallback(ProceedingJoinPoint joinPoint, String fallback) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
//获取被代理的方法的参数和Method
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Class<?>[] parameterTypes = method.getParameterTypes();
//得到fallback方法
try {
Method fallbackMethod = joinPoint.getTarget().getClass().getMethod(fallback, parameterTypes);
fallbackMethod.setAccessible(true);
//完成反射调用
return fallbackMethod.invoke(joinPoint.getTarget(), joinPoint.getArgs());
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
}
  1. 编写测试,调用:
1
2
3
4
5
6
7
8
9
10
11
@WuzzHystrixCommand(fallback = "customFallback", timeout = 3000)
@GetMapping("/custom/hystrix/test")
public String test() {
Map map = new HashMap<>();
map.put("id", 666);
return restTemplate.getForObject(REST_URL_PREFIX + "/hello?id={id}", String.class, map);
}

public String customFallback() {
return "custom 请求被降级";
}

正常得调用是没有问题的,这个时候我们把服务提供方的服务接口里 sleep 3秒来模仿调用超时,在访问接口就会得到降级服务后的返回。

Spring Cloud Alibaba

Nacos 配置中心,负载均衡,服务注册

Nacos服务注册源码分析

  • Nacos服务注册Naming Service源码分析
    * 接受Nacos客户端的API调用注册生成Instance实例
    * 将Instance放入serviceMap中ConcurrentHashMap集合中
    * consitencyService.listen实现数据定期检查
    * 通过Namespace对已注册服务的隔离
    * 定时检查HeartBeat对已注册Instance实例进行检查,更新实例状态
    * 对出现异常的服务进行基于UDP协议推送更新于PushService
    • Nacos服务方客户端注册源码分析
      • Spring Boot自动启动NacosAutoServiceRegistration进行注册调用
      • 监听ApplicationStartedEvent事件调用NacosServiceRegistry.register方法
      • namingService.regiterInstance中创建BeanInfo,定时发送心跳包:executorService.schedule(task, period, unit)
    • Nacos前端客户端服务消费者源码分析
      • 客户端发起订阅请求会定期发起UpdateTask来获得最新地址
      • 客户端也会提供本地EventListener回调实例处理出现异常的服务

Sentinel 弹性限流/熔断模式

Sentinel和Hystrix对比

SH

Seata 分布式事务

分布式事务解决了分布式系统中存储数据(数据库/缓存)一致性问题。

分布式事务解决理论

X/Open分布式事务模型

  • AP: Application 应用程序
  • RM: Resource Manager 资源管理者,数据库
  • TM: Transaction Manager事务管理器/协调者

两段提交协议

  • 准备阶段: TM同之RM准备事务,并告知准备结果
  • 提交/回滚阶段:如果所有RM返回成功则执行提交完成指令,反之执行回滚指令。

存在问题:

  1. 同步阻塞数据库
  2. 容易失败,一个节点失败就回滚
  3. TM单点故障问题,造成RM锁死。
  4. 脑裂问题,二阶段部分提交问题。

三段提交协议

  • CanCommit询问阶段: TM询问是否可以参与事务/超时。
  • PreCommit准备阶段:如果所有RM确认可以,则发起事务,并返回结果/超时。
  • DoCommit提交/回滚阶段:如果均成功提交则发起提交/回滚指令。

改进部分:

  • 超时即失败机制,避免两阶段提交锁死等待问题。
  • 提前确认节点状态

分布式事务解决方案

  • 基于MQ的消息中间件实现TCC (Try-Confirm-Cancel)模型补偿型方案(幂等性实现,最大努力通知机制)
  • 基于Seata模式的分布式事务框架(AT, TCC, Sega 和XA事务模式)

Seata源码分析

Spring Cloud Stream 发布/订阅流处理

Spring Cloud Stream支持消息中间件通信,因而可以支持多种高并发消息发布/消费场景:

Spring Cloud Stream包含如下四个核心部分:

  • Spring Messaging
    • Message, 消息对象,包含消息头和消息体
    • MessageChannel, 消息通道接口,用于接收消息,提供send方法将消息发送至消息通道
    • MessageHandler, 消息处理器接口,用于处理消息逻辑
  • Spring Integration
    • MessageDispatcher:消息分发接口,用于分发消息和添加删除消息处理器
    • MessageRouter: 消息路由接口,定义默认的输出消息通道
    • Filter:消息的过滤注解,用于配置消息过滤表达式
    • Aggregator: 消息的聚合注解,用于将多个消息聚合成一条
    • Splitter: 消息的分割,用于将一条消息拆分成多条
  • Binders 目标绑定器,负责于外部消息中间件系统集成的组件
    • doBindProducer: 为中间件绑定发送消息模块,让中间件能从MessageChannel接受到符合中间件格式的消息
    • doBindConsumer:为中间件绑定接受消息模块,让中间件能够发送符合Spring Message标准的消息到MessageChannel
  • Bindings 绑定生成的桥梁,支持Kafka,RabbitMQ中间件

RocketMQ 分布式消息通信源码分析

消息发送流程源码分析:

MQProducer

除了负责和Spring Cloud服务器中的Messaging集成之外,RocketMQ Binder还负责和MQ中间件集群通信,源码分发布/订阅两部分,分别如下:

  • 使用RocketMQTemplate真正发送MQ消息到中间件
  • 同时创建ConsumerEndpoint和input MessageChannel监听MQ订阅消息,并且负责转发给下游
  • 消息的消费分为顺序消费和并发消费,分别由DefaultMessageListenerOrderly,DefaultMessageListenerConcurrently实现,通过binders的配置设定。

消息订阅流程源码分析:

MQProducer

其中,服务器对消息的接收,是基于注解方式注入到响应的业务方法中的。这就是在业务代码中,不需要为接收信息创建MessageChannel,却能拿到信息体中的反序列化后信息。

RocketMQ 消息使用场景与实现

RocketMQ 顺序消息实现

顺序发送消费场景:订单创建、支付、退款流程处理,数据库BinLog信息消费等等。

  • 顺序发送需要将消息发送到同一队列即可,通过基于消息ID的哈希分队选择器即可完成。
  • 顺序消费需要binders中配置好ConsumerMQ的集群消费模式,即每条消息只会被ConsumerGroup中的一个Consumer消费。通过Consumer拿Broker独占锁实现。消费成功后会提交并更新消费进程,避免重复消费。代码如下:
1
2
3
4
5
6
7
8
9
10
11
try{
this.processQueue.getLockConsume().lock();
if(this.processQueue.isDropped()){
break;
}
status = messageListener.consumeMessage(Collections.unmodifieableList(msgs), context);
}catch(Throwable e){
hasException - true;
}finally{
this.processQueue.getLockConsume().unlock();
}

RocketMQ 普通消息发送实现

普通消息在队列选择可以由两种机制:

  • 轮询机制:轮流使用每个队列发送消息
  • 故障规避机制:

RocketMQ 消息并发消费实现

并发消费场景下,消息队列允许Consumer的线程消费池可以向同一个队列消费信息,并且每个消费线程消费信息会有自己的进度信息。

RocketMQ 分布式事务消息实现

为分布式事务处理提供了通信基础。

RMQ

Rocket发送事务消息:

Rocket发送事务消息是二次提交的,第一次发送prepare提交到服务器时消息主题会替换为RMQ_SYS_TRANS_HALF_TOPIC。等到本地事务执行完毕以后才进行二次提交,这时会发送给原本消息的topic。

  1. 由producer发送prepare(半消息)给MQ的broker。MQ会把消息记录到本地,然后回复prepare消息状态给producer。

  2. prepare消息发送以后获取发送状态,如果是成功则执行本地业务(本地事务),根据本地事务执行结果手动返回相应状态(RocketMQLocalTransactionState.COMMIT、RocketMQLocalTransactionState.ROLLBACK等)给MQ。

  3. 如果是COMMIT则说明本地事务执行成功,prepare为可提交状态,MQ收到COMMIT消息就会发送给consumer,然后consumer执行本地业务。如果是ROLLBACK则会删除prepare消息。

  4. 如果MQ一直没收到返回状态则会启动定时任务检查本地事务状态

  5. 消费者、生产者的事务各由开发者自己保证。MQ的事务是由MQ保证,这里会根据使用者配置的参数来决定如何执行。

RocketMQ消费模式

  • at-most-once最多一次
  • at-least-once最少一次,RocketMQ通过消费者ACK机制支持至少一次
  • exactly-only-once仅此一次

RocketMQ实现原理

高性能设计

高可用设计

project bootstrap

Demo项目是通过VS2019自带的Addin模板生成。默认debug模式部署在一个o365 dev tenant上。目前已有Github demo项目也可以直接下载,相关链接

Demo 项目原型简述

引入demo项目是一个查看邮件各种属性的ESCPOC项目,在读取一封邮件时,可以点击ESCPOC按钮查看邮件的属性。在启动debug并上传manifest后,点击Ribbon上的MyAddinGroup按钮,显示如下图所示:

ESCPOCDemo

我们的目标POC项目是一个非常简单的发送弹框程序,需要订阅每一封邮件的发送事件itemsend并且根据Web API调用结果显示一个web UI.

ESCPOC.xml 修改剖析

Manifest文件是O365插件加载的关键配置,需要完全符合schema定义的规则才能正确显式UI以及相应的回调API。

  1. Validation
    Manifest 文件具有很强的格式要求,需要运用微软提供的工具对自己的manifest文件进行语法检查。
1
2
3
#install latest version
npm install -g office-addin-manifest
office-addin-manifest validate PATH_MANIFEST_FILE
  1. 创建ItemSend相关配置
    目前微软已经有开源的示例代码于https://github.com/OfficeDev/outlook-add-in-command-demo,可以作为参考。
    1
    <Event Type="ItemSend" FunctionExecution="synchronous" FunctionName="itemSendHandler" /> 

注意配置上下文需要在VersionOverrides 1.1框架下。

O365 环境准备

对于跨平台的O365 Addin,一个合法的O365账号是需要的,而且需要拥有上传插件的权限。在用用MSDN订阅的情况下,可以创建合适的E3等级Office Tenant。例如本文则使用huangsun@sunnyhll.onmicrosoft.com作为测试账号,密码为系统密码。

Web Addin查看可以点击Outlook桌面客户端的Manage Addin按钮,也可以在https://outlook.office365.com/owa/?path=/options/manageapps 链接中看到。可以看到即使安装了很多插件,这些插件却不是实时安装在本地Outlook桌面客户端上的, 而是在需要访问的时候才进行加载执行的。

开启ItemSend事件监听

相关链接.

“My Custom Roles”权限

对于个人O365账号,这里不需要进行权限获取,Microsoft Tenant默认每个客户的SideLoad权限开启。

Web Addin部署

对于Outlook Web Addin, 主要分成两部分部署:

  • Manifest配置文件部署,在Exchange Mailbox 上注册插件
  • 插件服务部署,必须采用https协议,插件服务前后端本身需要在同一域名下。
    部署完成即可进行测试。

Excel Addin原型

Excel Addin属于Office Addin一部分,example link: https://github.com/OfficeDev/Office-Add-in-samples/

本文是Effetive Java一书提出地90条Java代码规范建议,具体内容需要查看相应地书籍内容,本文仅作为索引复习回忆。

创建与销毁对象

  1. 用静态工厂方法代替构造器

  2. 遇到多个构造器参数时要考虑使用构建器(builder)

  3. 用私有构造器或者枚举类型强化Singleton属性

  4. 通过私有构造器强化不可实例化的能力

  5. 优先考虑依赖注入来引用资源

  6. 避免创建不必要的对象

  7. 消除过期的对象引用

  8. 避免使用中介方法和清除方法

  9. try-with-resources优先于try-finally

对于所有对象都通用的方法

  1. 覆盖Equals时请遵守通用约定

  2. 覆盖equals时总要覆盖hashCode

  3. 始终要覆盖toString

  4. 谨慎的覆盖clone

  5. 考虑实现Comparable接口

类和接口

  1. 使用类和成员的可访问性最小化

  2. 要在公有类而非公有域中使用访问方法

  3. 使可变性最小化

  4. 复合优先于集成

  5. 要么设计继承并提供文档说明,要么继承

  6. 接口优于抽象类

  7. 为后代设计接口

  8. 接口只用于定义类型

  9. 类层次优于标签类

  10. 静态成员类由于非静态成员类

  11. 限制源文件为单个顶级类

泛型

  1. 请不要使用原生态类型

  2. 消除非受检的警告

  3. 列表由于数组

  4. 优先考虑泛型

  5. 优先考虑泛型方法

  6. 利用有限限制通配符来提升API的灵活性

  7. 谨慎并用泛型和可变参数

  8. 优先考虑类型安全的异构容器

枚举和注解

  1. 用enum代替int常量

  2. 用实例域代替序数

  3. 用EnumSet代替位域

  4. 用EnumMap代替序数索引

  5. 用接口模拟可扩展的枚举

  6. 注解优先于明明模式

  7. 坚持使用Override注解

  8. 用标记接口定义类型

Lamda和stream

  1. Lamda优先于匿名类

  2. 方法引用优先于Lamda

  3. 坚持使用标准的函数接口

  4. 谨慎使用Stream

  5. 优先选择Stream中无副作用的函数

  6. Stream要优先用Collection作为返回类型

  7. 谨慎使用Stream并行

方法

  1. 检查参数的有效性

  2. 必要时进行保护性拷贝

  3. 谨慎设计方法签名

  4. 慎用重载

  5. 慎用可变参数

  6. 返回零长度的数组或者集合,而不是null

  7. 谨慎返回optional

  8. 为所有导出的API元素编写文档注释

通用编程

  1. 将局部变量的作用域最小化

  2. for-each循环优先传统的for循环

  3. 了解和使用类库

  4. 如果需要精确的答案,请避免使用float和double

  5. 基本类型优先于装箱基本类型

  6. 如果其他类型更适合,则尽量避免使用字符串

  7. 了解字符串连接的性能

  8. 通过接口引用对象

  9. 接口优先于反射机制

  10. 谨慎地使用本地方法

  11. 谨慎地进行优化

  12. 遵守普遍接受的命名惯例

异常

  1. 只针对异常的情况才使用异常

  2. 对可恢复的情况使用受检异常,对编程错误使用运行时异常

  3. 避免不必要地使用受检异常

  4. 优先使用标准的异常

  5. 抛出与抽象对应的异常

  6. 每个方法抛出的所有异常都要建立文档

  7. 在细节消息中包含失败捕获信息

  8. 努力使失败保持原子性

  9. 不要忽略异常

并发

  1. 同步访问共享的可变数据

  2. 避免过度同步

  3. executor,task和stream优先于线程

  4. 并发工具优先于wait和notify

  5. 线程安全性的文档化

  6. 慎用延迟初始化

  7. 不要依赖于线程调度器

序列化

  1. 其他方法优先于Java序列化

  2. 谨慎地实现Serializable接口

  3. 考虑使用自定义地序列化形式

  4. 保护性地编写readOjbect方法

  5. 对于实例控制,枚举类型优先于readResolve

  6. 考虑用序列化代理代替序列化实例

Why Microservice

  • Heterogenious technical stack:
    支持多重技术栈,由于HTTP协议被多种编程语言支持,微服务可以用多种语言实现。
  • Robustness:整个App mesh网络不会因为单一的微服务的奔溃而停止工作。
  • Extensions:可以针对单一微服务进行扩展。
  • Simple deployment:微服务可以进行独立部署,不需要因为单一服务升级而多次部署。
  • Efficiency:微服务小团队开发高效敏捷。
  • More client support:多种前端客户端支持,可以重复利用同一个微服务后端网络。
  • Migration:基于单一微服务迁移的整体升级,阻碍更小。

What is Microservice

微服务就是一些协同工作的小而自治的服务。

How to architect Microservices

微服务特点

  • Loose coupling松耦合

  • High Cohesion高内聚

上下文边界确定

业务或者职能的界限,往往也可以作为微服务架构中的服务边界。不同的业务只能之间的内部细节并不需要相互知晓。同一个事件,在不同的业务职能会有不同的体现,但是在内部细节上却没有相互交叉。

共享和隐藏模型设计

限界上下文的确立,能够帮助确立共享模块,对于处于两个上下文边界上需要共同的消息,可以确立一个共享模块,专门进行信息共享。同时,对于上下文内部的逻辑,也能进行对应的逻辑模块设计,从而完成对于整个上下文的设计。

切勿过早划分

对于一些过早划分的业务产品,警惕由于后期需求用例改变造成多个上下文之间的重叠。比较推荐的是先进行单体设计,不进行划分,在服务稳定之后再进行划分设计迁移,基于以有代码进行划分,比从头开始构建微服务简单得多。

逐步划分上下文

不断对上下文的界限进行迭代,从复杂的共享模型中慢慢抽出简单的共享模型,将嵌套的模块逐步上升到顶层模型,对其中某些紧密关联模型进行共享。当然,模型共享的粒度取决于代码是分开维护还是集中维护,如果是集中维护也未尝不可进行嵌套模型共享。

微服务的集成

微服务之间的通信机制,可以成为微服务的集成。微服务间的API相互调用设计需要注意一下几个方面:

  • 为用户创建接口

    用户上下文会触发一定特定的流程。

  • 共享数据库

    数据库是多个微服务共享的,因为每个微服务都可能对同一个数据库进行修改。

  • 同步与异步通信模式

    两种不同的通信模式中,同步模式会阻塞线程,而异步模式是基于事件响应请求的,能提降低耦合性,提高吞吐量。

  • 编排与协同

    编排和协同是两种管理流程的架构风格。(同步)编排是通过一个中心执行者将每一步的步骤执行,知道获取最终的结果。(异步)协同则是以事件为机制进行执行,执行单元会订阅事件,API调用会触发一个事件,事件订阅者则会自动执行响应的流程。后者会需要监控服务,监控结果要映射到流程中,troubleshooting难度增加,这是一个难题。ATOM是一个符合REST规范的协议可以通过它提供资源feed的发布服务,客户端可以消费该信息。

  • RPC与REST

    远程调用允许进行本地调用,事实上是由某个远程服务器产生。RPC会有一定的技术耦合要求。protocol buffers, Thrift是比较推荐的RPC框架。REST则是RPC的一个替代方案。通过URI对客户端与服务器进行了松耦合。

  • Json与XML

    JSON与XML都是一种有效的数据序列化格式,前者更加流行XML对超媒体控制更加好。

  • API重定向

    API的重定向常常发生在多个API版本共存的时候,当老版本的API准备deco时,需要绞杀者模式将旧的API拦截,选择是否替换成新版本的实现。当旧版本的访问完全消失时,再删除旧的API。

拆分成微服务

  1. 寻找独立的上下文边界——接缝
  2. 拆分数据库表格的混合加载功能,放弃直接利用数据库命令访问上下文边界间的外键,改为由代码(微服务服务API)
  3. 共享的静态数据改由配置维护关联。
  4. 针对共享数据,建立合适的领域进行关联,例如代码中建立客户关系,来维护财务——客户——仓库关系。
  5. 共享表格,对于有大量列信息的表格,可以根据上下文边界将表格拆分成两个。
  6. 自数据库开始进行代码重构。
  7. 事务的边界重构,对于多表的修改操作,需要协同处理错误,或者利用分布式事务处理工具代为管理事务。
  8. 拆分后的数据库在制作报表时会出现问题,因为不再能用sql语言进行表格操作。解决方法可以是主动定期导出表格合并,或者基于修改时间订阅导出行为。

部署微服务

  1. 准备CI系统,流水线,自动化流程
  2. Paas,Docker打包部署

测试

  1. 单元测试
  2. 服务测试(mock/打桩)
  3. E2E测试 —— 消费者驱动的测试
  4. 上线 —— 金丝雀发布法

监控

微服务的监控难度高于单一服务器应用。微服务包含多个服务,而每个服务的实例个数不等。关联标识可以帮忙关联同一个事件服务调用的日志,采用统一标准化的格式能够更快的辅助错误定位。

监控的内容包括CPU,响应时间,以及合理的语义监控(合成事务监控)。

微服务安全

  1. 身份验证和授权

身份雅正确认了登录者的身份。授权机制能够确定登录者可以访问和进行的操作。常见的单点登录(Single Sign-On),企业级标准为SAML和OpenID Connect,前者基于SOAP标准,后者基于OAuth2.0. 身份提供者可以是外部系统或者内部目录服务,如LDAP/AD等。

微服务的身份认证和授权部分可以依托于网关,网关可以作为认证代理,通过网管认证的所有查询和操作都可以发送到微服务集群任意一个实例中处理。微服务内部可以决定身份可以授权的操作。

  1. 服务间的身份验证和授权

服务之间的身份验证是指微服务的各个服务之间的身份验证和授权,一般来说有如下几种方式管理:

  • 在微服务边界内允许一切
  • HTTP(S)基本身份验证(HTTP明文传输认证信息,如果基于SSL认证则需要管理多台服务器之间的自签发证书。)
  • 使用SAML或OpenID Connect认证授权,可以有效的避免中间人攻击,这样每个服务也需要一个身份。
  • 客户端证书(采用客户端X.509证书,通过TLS层协议对服务器验证进行保证。)
  • HMAC(Hash-based Message Authentication Code,请求主题和私有密钥一起被哈希处理后进行发送,服务器使用请求主题和自己私钥副本重建哈希值。如果匹配则接收,防止的中间人攻击。)
  • JWT(JSON Web Token,JWT的原则是在服务器身份验证之后,将生成一个JSON对象并将其发送回用户。当用户与服务器通信时,客户在请求中发回JSON对象。服务器仅依赖于这个JSON对象来标识用户。为了防止用户篡改数据,服务器将在生成对象时添加签名。)
  • API密钥(API密钥是给予某种形式的秘密令牌的名称,该秘密令牌与Web服务(或类似)请求一起提交以便识别请求的来源。密钥可以包括在请求内容的一些摘要中,以进一步验证原点并防止篡改值。)
  1. 数据加密(现存多种算法进行数据加密,密钥可以通过加盐哈希保护)
  2. 深度防御
  • 防火墙
  • 日志
  • 入侵监测
  • 网络隔离
  • 操作系统安全
  • OWASP标准(Open Web Application Security Project开放式Web应用程序安全项目,https://www.owasp.org)

系统设计与组织架构

公司团队的组织架构会影响系统设计。

  • 单地域的团队拥有更加好的灵活性和效率,因此对于异地团队最好的办法是想办法合理拆分,让不同的团队负责不同的松耦合模块。

  • 每个团队需要负责对服务的需求,更改,构建,部署到运维,自治的团队能够很好的激励团队的效率。

  • 小团队规模,少于10个人的团队能够对其所负责的系统整个生命周期负责,技术选择和实现上具有灵活性。当然这个对服务的高效构建部署效率很高,比如利用云服务的Infra来轻松构建CI/CD流程。

规模化微服务

  • 允许故障无处不在,故障永远会在意想不到的时候发生,所以微服务本身需要能够允许故障出现,硬件上也不用为避免故障作特殊设计。

  • 服务平行扩容阈值控制,需要参考一些系统参数,响应时间/延迟,可用性,数据持久性(丢包率)。

  • 当故障出现时,适当的服务功能降级,允许应用能够在其他方面能够正确运行而不是直接返回错误界面。

  • 微服务延迟的影响控制,通常会导致worker线程池的阻塞队列超长,最终线程池没有可用的线程而宕机。合理设置线程池的舱壁bulkhead,在舱位线程用尽后断路该服务,避免因为单个服务的延迟影响导致所有的线程都阻塞于同一个服务。

  • 数据库扩展,当数据库需要服务于高吞吐量服务时,可以通过数据库副本,RDBMS系统,扩展写操作,独立CQRS读写分离系统,等多种方式扩展。

  • 缓存可以优化重复请求,氛围哭护短、代理和服务器端缓存。

  • 自动伸缩,当云管理能够强大到自动调整微服务实例个数,就可以基于当前流量进行自动调整。

  • CAP定理(一致性consistency、可用性availability和分区容忍性partition tolerance)是分布式系统中需要控制好的三个平衡。

  • 服务发现,可以通过DNS服务器进行关联,负载均衡服务器将查询分发到不同的IP服务器上。

  • 动态服务注册,新加入的微服务实例的IP应用信息需要能共享给其他服务,Zookeeper,consul和Eureka等服务可以管理配置管理和服务发现。

  • 文档服务,为API构建合适的文档,Swagger可以很好的自动生成API文档,HAL和HAL浏览器也可以客户端逐步探索API。

  • 自描述系统(UDDI Universal Description Discovery and Integration通用描述发现与集成服务),这个标准能帮助了解哪些服务正在运行。

本文将从计算机底层实现的角度描述Java目前的并发工具的实现细节。

Synchronized实现细节

JVM基于进入和退出Monitor对象来实现方法同步和代码块同步。即使用monitorenter和monitorexit指令实现的。

  • monitorenter指令是在编译后插入到同步代码块的开始位置,而monitorexit是插入到方法结束处和异常处。
  • 线程执行到monitorenter指令时会尝试获取对象所对应的monitor的所有权/对象锁。

Synchronized用的锁是存在与Java对象头里。Java对象头的结构于这里

锁一共有4种状态,级别从低到高分别是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态。

偏向锁:

1. 设置:当一个线程获取锁,对象锁从无锁状态变成偏向锁,允许该线程反复拿锁。(已经有identity hashcode的对象不会使用偏向锁)

2. 撤销:当其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,撤销后变成无锁状态,膨胀成轻量级锁。(偏向对象如果需要identity hashcode则会膨胀成重量级锁)

** identity hash code: 未被覆写的 java.lang.Object.hashCode() 或者 java.lang.System.identityHashCode(Object) 所返回的值。

轻量级锁:

1. 设置:JVM在当前线程栈帧中创建用于存储锁纪录的空间,并将对象头的MarkWord复制到锁纪录中。然后尝试用CAS操作将指向锁纪录指针放入MarkWord空间。如果成功,则获得锁,如果失败,则自旋来获取锁。

2. 解锁: CAS操作将Displaced MarkWord
放回到对象头,如果成功,表示没有竞争。如果失败说明存在竞争,即两个线程中解锁线程无法将MarkWord取回,因为另一个自旋线程已经尝试将MarkWord放到自己线程的锁记录中,CAS会发现MW的owner不是自己,膨胀成重量级锁。阻塞所有没拿到锁的线程。

重量级锁:

1. 设置:重量级锁会阻塞拿不到锁的所有线程,减少了自旋带来的CPU开销。一旦升级成重量级锁,将不会降级锁。

原子操作的实现原理

原子操作主要依赖的是计算机的如下几个CPU基本操作:

AtomicFundamentals

为了达到原子操作的目的,CPU会通过如下两种方式确保在某一时刻,只有一个CPU对共享内存中的数据进行写操作:

  1. 通过总线锁保证原子性:CPU会提供一个LOCK #信号,将其他处理器的内存访问请求阻塞住,从而独占共享内存。
  2. 通过缓存所定保证原子性:需要处理器支持,开销较小,CPU修改缓存行进行缓存锁定,那么另一个CPU就不能同时缓存该内存数据,这是通过处理器的缓存以执行机制来保证的。

JVM中是可以通过锁和循环CAS操作来保证该变量的赋值成功的。

其中循环CAS操作可能带来如下问题:

  1. ABA问题,值已经变化,但CAS的旧值比较返回true,解决方法是加入变量版本号。
  2. 循环时间开销大,可以通过JVM支持CPU的pause指令提升效率。
  3. 只能保证单一共享变量的原子性,可以通过AtomicReference方式合并变量成为一个新的对象处理。

锁机制,除了偏向锁,锁本身就是通过循环CAS实现拿/释放锁。

Lock锁实现细节

对比与synchronized中采用的锁, Lock有如下不同之处:

LockSynchronized

以下是Lock的API介绍,本文将展示API的实现细节:
LockAPI

ReentrantLock的实现依赖于Java同步器框架,下面将着重讲解同步器框架的实现。对于公平锁和非公平锁的实现,AQS基本实现都一样,除了在公平锁释放节点是会调用hasQueuedPredecessors()方法判定是否队列中用等待节点,从而保证不会有线程在释放阶段竞争到锁,让线程串行化拿锁。

队列同步器AQS框架实现细节

http://www.cnblogs.com/waterystone/p/4920797.html

类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch。

实现细节在另一篇文章已讲述。

同步器本身是一个抽象类,实现了同步器的类可以完成线程同步,包括:同步队列,独占式同步状态获取与释放,共享式同步状态获取与释放及超时获取同步状态。队列同步器使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
abstract class AbstractQueueSynchronizer {

// 标记锁状态, 0是无锁状态,非0则是同步队列有被阻塞节点/线程。
private volatile int state;
//同步状态相关方法
protected final in getState(); // 获取当前的同步状态
protected final void setState(int newState); //设置当前的同步状态
protected final boolean compareAndSetState(int expect, int update); //使用CAS设置当前状态,能够保证状态设置的原子性

//支持重写的方法
protected boolean tryAcquire(int arg);
protected boolean tryRelease(int arg);
protected int tryAcquireShared(int arg);
protected boolean tryReleaseShared(int arg);
protected boolean isHeldExclusive();

//模板方法
public final void acquire(int arg);
public void acquireInterruptibly(int arg);
public boolean tryAcquireNanos(int arg, long nanos);
public void acquireShared(int arg);
public void acquireSharedInterruptibly(int arg);
public boolean tryAcquireSharedNanos(int arg, long nanos);
public boolean release(int arg);
public boolean releaseShared(int arg);
public Collection<Thread> getQueuedThreads();
}

支持重写方法的不同重写可以实现不同的锁,具体如下:
AQSoverride

在实现重写方法可以调用模板方法,具体如下:
AQStemplate

下文将分类别讲述模板方法的实现:

同步队列

同步器内部有一个同步队列(FIFO双向队列)进行同步状态管理。当前线程获取同步状态失败时,同步器会将当前线程及等待状态等信息构造成为一个节点并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把节点中的线程唤醒,使其再次尝试获取同步状态。

AQSQueue

compareAndSetTail(…)方法基于CAS设置尾节点,保证队列更新的线程安全。
而头节点的更新是由获得锁的线程更新的,因此不需要作同步处理,是线程安全的。

1
2
3
4
5
6
7
public class Node {
private int waitStatus;
private Node prev;
private Node next;
private Node nextWaiter;
private Thread thread;
}

具体描述如下图所示:

Node

  1. 当首节点的线程成功地获取了同步状态/锁,该线程会将首节点设置为后继节点,并且将本节点的next引用断开即可。然后线程执行任务,任务完成后唤醒后继节点。具体的节点处理实现由锁的类型(独占/共享)的模板方法Release/ReleaseShared实现。

Dequeue

  1. 当一个线程无法获取到同步状态,会被构造成节点加入到同步队列尾部,CAS设置能保证节点加入过程的线程安全。过程图如图所示。

Enqueue

节点操作由具体的模板方法AcquireQueued/AcquireShared等实现,而节点封装和入队细节如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public abstract class AbstractQueuedSynchronizer {

private Node head;
private Node tail;

private Node addWaiter(Node mode){
Node node = new Node(Thread.currentThread(), mode);
//快速尝试在尾部添加
Node pred = tail;
if( pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)){
pred.next = node;
return node;
}
}
enq(node);
return node;
}

private Node enq(final Node node){
// 死循环保证了节点的添加在有并发冲突的情况也能成功
for(;;){
Node t = tail;
if(t == null){//Must initialize
if(compareAndSetHead(new Node()))
tail = head;
}else{
node.prev = t;
if(compareAndSetTail(t, node)){
t.next = node;
return t;
}
}
}
}
}

独占式同步状态获取与释放

独占式同步状态,是指在同一时刻只能有一个线程成功获取同步状态,锁的获取是排他的,不是共享的。

  1. 独占式同步状态获取流程如图所示:
    Acquire

相关的实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void acquire(int arg) {
if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

private final boolean acquireQueued(final Node node, int arg){
boolean failed = true;
try {
boolean interrupted = false;
//在同步队列入队完成后,每个节点都在自旋等待,但是只有前驱是头节点的节点(头节点的next节点)才能有终端自旋的可能
for(;;){
final Node p = node.predecessor();
if(p == head $$ tryAcquire(arg)){
//更新头节点
setHead(node);
//断开头节点对后续节点的引用
p.next = null; //help GC
failed = false;
return interrupted;
}
if(shouldParkAfterFailedAcquiredAcquire(p, node) && parkAndCheckInterrupe())
interrupted = true;
}
}finally {
if(failed)
cancelAcquire(node);
}
}

  1. 独占式同步状态释放
    在当前线程获取了同步状态并执行后,需要释放同步状态,从而使得后续的同步节点可以获取同步状态。
1
2
3
4
5
6
7
8
9
10

public final void release(int arg){
if(tryRelease(arg)){
Node h = head;
if(h != null && h.waitStatus != 0)
unparkSuccessor(h);//使用lockSupport去唤醒等待状态的线程。
return true;
}
return false;
}

共享式同步状态获取与释放

共享式获取与独占式获取最主要的区别在与同一时刻能否有多个线程同时获取到同步状态。以文件的读写为例,读操作可以是共享式访问,写操作则是独占式访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public final void acquireShared(int arg){
if(tryAcquireShared(arg) < 0>)
doAcquireShared(arg);
}

private void doAcquireShared(int arg){
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 自旋获取共享锁
for(;;) {
final Node p = node.predecessor();
if (p == head){
int r = tryAcquireShared(arg);
//获取到了同步状态
if( r >= 0){
setHeadAndPropagate(node, r);
p.next = null;
if(interrupted)
selfInterrupt();
failed = false;
return;
}
}
if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if(failed)
cancelAcquire(node);
}
}

public final boolean releaseShared(int arg) {
//tryReleaseShared通过自旋和CAS保证释放状态成功
if (tryReleaseShared(arg)) {
//释放同步状态后,会唤醒处于等待状态的节点。
doReleaseShared();
return true;
}
return false;
}

独占式超时获取同步状态

超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则,返回false。具体流程如下:

Workflow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{
long lastTime = System.nanoTime();
final Node node = addWaiter(Node,EXCLUSIVE);
boolean failed = true;
try {
for(;;){
final Node p = node.predecessor();
//确定时间内获取成功
if(p == head && tryAcquire(arg)){
setHead(node);
p.next = null;
failed = false;
return true;
}
//确定时间内获取失败
if(nanosTimeout <= 0)
return false;
if(shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this nanosTimeout);
long now = System.nanoTime();
// 计算lastTime到现在的睡眠时间,并且更新还应该睡眠的时间
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if(failed)
cancelAcquire(node);
}
}

Condition实现细节

每个Condition对象都包含一个等待队列,Object包含一个AQS队列,两个队列节点是AQS中的Node,并用这个两个队列共同实现了wait/notify功能。

等待

Condition.await()让线程释放锁,构造新节点加入等待队列进入等待状态。返回的前提是重新获取了condition相关联的锁。底层是通过LockSupport的park()方法释放。

通知

Condition.signal()将等待队列中等待时间最长的节点加入同步队列,并用LockSupport.unpark()唤醒该节点。加入同步队列的节点通过tryAcquire()竞争获取锁,获取锁后从await()中返回继续执行。

并发容器实现细节

ConcurrentHashMap实现细节

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

ConcurrentHashMap

读取HashEntry信息是不需要拿锁,从而保证了并发读的高效性。HashMap中读取Map.Entry<K,V>是需要拿锁的。

1
2
3
4
5
6
static final class HashEntry<K,V> {  
final K key; // immutable field保证链表稳定性,新节点头插入,或者替换已有的值(不能重复value)。
final int hash;
volatile V value; //volatile保证可见性,删除需要O(n)倒序复制被删节点前的list。
final HashEntry<K,V> next;
}

在Hash中,会进行分段哈希从而保证segment中数组的均匀性,会对hash值的高字段和低字段进行分段处理,前半段获取segment位置,后端确定segment中数组中未知。

  • get()操作不需要枷锁,除非读到的值为空才会加锁重读。
  • put()操作需要对加锁操作。扩容时,只会对某segment中的数组进行扩种。
  • size()操作会两次不加锁计算,当操作数没有变化,则直接返回,如果变化则加锁获取size。

阻塞队列实现原理

线程池并行执行机制Executor

Java线程即是工作单元,也是执行机制。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。应用程序通过Executor框架控制上层的调度,下层的调度由操作系统内核控制,下层调度不受应用程序的控制。
Executor

Executor接收Runnable/Callable<T>接口的实例的任务,返回Future<T>接口的FutureTask<T>实例,当任务完成时,可以获取任务执行结果。

ThreadPoolExecutor (线程池)框架和实现原理

ThreadPoolExecutor可以有三种: FixedThreadPool, SingleThreadExecutor, CachedThreadPool。这三种是不同配置的ThreadPoolExecutor,并非不同子类型。

  • FixedThreadPool:
    FixedThreadPool是使用固定线程数的线程池,适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,适用于负载较重的服务器。
    FixedThreadPool

    1
    2
    3
    4
    public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());//无界队列
    };
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory);
  • SingleThreadExecutor:
    SingleThreadExecutor是单个线程的线程“池”,适用于需要保证顺序执行的各个任务,并且保证没有多个线程活动的应用场景。
    SingleThreadExecutor

1
2
3
4
5
public static ExecutorService SingleThreadExecutor(){
// FinalizableDelegatedExecutorService 增加对gc时停掉线程池的功能
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
};
public static ExecutorService SingleThreadExecutor(ThreadFactory threadFactory);
  • CachedThreadPool:
    CachedThreadPool是大小无界的线程池,适用于执行多的短期异步任务的小程序,适合负载较轻的服务器。
    CachedThreadPool
1
2
3
4
public static ExecutorService CachedThreadPool(){
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());//使用没有容量的工作队列,因为线程数没有限制。
};
public static ExecutorService CachedThreadPool(ThreadFactory threadFactory);

ThreadPoolExecutor实现原理

1
2
3
4
5
6
7
8
9
10
public Class ThreadPoolExecutor{
private int corePool; // 核心线程池大小,当前运行线程数少于此则创建新线程至预热完成。
private int maximumPool; //最大线程池的大小,当前线程数不会超过此,多的任务会挂起在工作阻塞队列中。
private BlockingQueue<Runnable> workQueue; //保存任务的工作队列,只要队列不为空,空闲的线程会获取任务并执行,直到任务队列为空。
private RejectedExecutionHandler handler; // 工作队列饱和时调用的handler

private long keepAliveTime; //空闲线程存活时间
private TimeUnit unit; //空闲时间单位
...
}

ScheduledThreadPoolExecutor (调度线程池)框架实现原理

调度线程池是线程池实现的扩展,主要在给定的延迟后运行任务,或者定期执行任务。
ScheduledThreadPoolExecutor有两种: ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor。

  • ScheduledThreadPoolExecutor:
    ScheduledThreadPoolExecutor是固定个数线程的调度线程池,适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。

    1
    2
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory);
  • SingleThreadScheduledExecutor:
    SingleThreadScheduledExecutor是单个后台线程执行周期任务,同事需要保证顺序地执行各个任务的应用场景。

    1
    2
    public static ScheduledExecutorService SingleThreadScheduledExecutor();
    public static ScheduledExecutorService SingleThreadScheduledExecutor(ThreadFactory threadFactory);

ScheduledThreadPoolExecutor实现原理

1
2
3
4
5
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor{
private long time; // 任务要被执行的具体时间,当前时间大于等于time值时被执行
private long sequenceNumber; // 任务被添加到Executor时的序号
private long period; // 任务执行的间隔周期
}
  1. 提交任务
    ScheduledThreadPoolExecutor接受实现了RunnableScheduledFuture接口的ScheduledFutureTask实例。通过scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法提交,放在DelayQueue<RunnableScheduledFuture>中。
    ScheduledTask

  2. 执行任务
    DelayQueue封装了一个PriorityQueue, 对队列中的ScheduledFutureTask进行排序,time小的排在前面。如果time值相同,则sequenceNumber小的在前面。

  3. 放回任务
    对于执行过的周期任务,time值被重置为下一次的执行时间,并且将task放回DelayQueue中。

DelayQueue部分实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class DelayQueue<E extends Delayed>{
private ReentrantLoc lock;
private PriorityQueue<E> q;
private Thread leader;
private Condition available;

public E take(){
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 处理队列需要获得锁
try {
for(;;){
E first = q.peek();// 线程锁住等待直到有task出现
if( first == null) {
available.await();
}else {
long deplay = first.getDelay(TimeUnit.NANOSECONDS);
if(delay > 0){
long tl = available.awaitNanos(delay); // wait nanoseonds for task to be ready to execute.
}else {
E x = q.poll(); // when task is ready, take the task and do
assert x != null;
if( q.size() != 0)
available.signalAll(); //release condition
return x;
}
}
}
} finally {
lock.unlock();
}
}

public void offer(E e){
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
q.offer(e); // 拿到锁后放入这个queue
if(first == null) || e.compareTo(first) < 0)
available.singalAll(); // 检查task是否到期可以执行,如果ready触发take的线程。
return true;
} finally {
lock.unlock();
}
}

}

ForkJoinPool 框架实现原理

ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,前者用于存放程序提交的任务,ForkJoinWorkerThread数组负责执行任务。

算法执行分两段,fork阶段将任务分割到足够小,创建/唤醒一个工作线程执行;join阶段将任务的结果收集合并结果得到最后的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class ForkJoinTask<V> {
public final ForkJoinTask<V> fork() {
((ForkJoinWorkerThread) Thread.currentThread()).pushTask(this);
return this;
}

public final V join() {
if(doJoin() != NORMAL)
return reportResult();
else
return getRawResult();
}

private V reportResult(){
int s; Throwable ex;
if((s = status) == CANCELLED)
throw new CancellationException();
if(s == EXCEPTIONAL && (ex = getThrowableException()) != null)
UNSAFE.throwException(ex);
return getRawResult();
}

private int doJoin(){
Thread t; ForkJoinWorkerTHread w; int s; boolean completed;
if((t = Thread.currentThread()) instanceof ForkJoinWorkThread) {
if (( s = STATUS) < 0)
return s;
if( ((w = (ForkJoineWorkerThread)t).unpushTask(this))) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion( rex);
}
if(completed)
return setCompletion(NORMAL);
}
return w.joinTask(this);
}
else
return exeternalAwaitDone();
}
}

public class ForkJoinWorkerThread{
final void pushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q; int s, m;
if((q = quque) != null) {
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
UNSAFE.putOrderObject(q, u, t);
queueTop = s + 1;
if ((s -= queueBase) <= 2)
pool.singalWork();
else if (s == m)
growQuque();
}
}
}

异步执行机制

FutureTask实现原理

FutureTask实现了Future, Runnable接口。是Executor的执行任务单元,也可以由调用线程直接执行FutureTask.run()。FutureTask的get方法能阻塞当前线程,等待任务执行结果再执行下文。

FutureTask通过内部聚合的AQS的子类实现完成FUtureTask的获取和释放操作。队列中的每个FutureTask实例的get方法并不以一定需要在run方法之前执行,get方法会阻塞调用线程直到run方法被执行完成。每个任务只会执行一次,并且会有定义好的执行顺序,从get方法调用线程恢复执行上下文,从而达到异步调用的效果。类似.NET中的beginInvoke和endInvoke方法。
Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class FutureTask<V> implements Future<V>, Runnable{
private Sync sync;

public void run() {
...
Callable.call();
...
AQS.compareAndSetState(int expect, int update);
...
AQS.releaseShared(int arg);
...
FutureTask.done();
}
public boolean cancel();
public void get() {
...
AQS.acquireSharedInterruptily(int arg);
...
};

class Sync extends AbstractQueuedSynchronizer {
public V innerGet();
public void innerRun();
public boolean innerCancel();
}
}

CompletableFuture实现原理

CompletableFuture实现了Future,CompletionStage接口,后者接口增加了更多对任务流程的控制接口,可以直接通过接口实现回调函数的定义。

What is Observable

Observable利用观察者模式,建立了发布者publisher和订阅者subscriber之间的联系。联系本身不会修改发布的信息,只是定义声明了订阅者对发布信息的处理方式。

使用目的:

  1. 流式处理本身是延迟执行的,即在需要数据的一刻进行处理,并且不改变数据本身而是生成一个新的流。多流处理的支持。

  2. 异步回调保证了流式处理结果的实时渲染,因此Observable是高性能前端的重要概念。可以将同步方法转换成异步方法。

  3. 支持事件处理,动态注册事件及句柄。

创建Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import { Observable} from 'rxjs';
// subscriber/observer is callback functions for this observable.
// {next, error, complete}
const observable = new Observable(subscriber => {
// observable can decide how frequency next/error/complete callback is called.
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);

setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);

return {unsubscribe() {}};
})

传入Observable回调函数/创建一个subscriber

1
2
3
4
5
observable.subscribe({
x => console.log('got value ' + x),
err => console.error('got error ' + err),
() => console.log('done')
})

创建multicast Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
function multicastSequenceSubscriber(){
const observers = [];

let timeoutId;

return (observer) => {
observers.push(observer);
// start the sequence when firstly subscribed.

if( observers.length === 1) {
timeoutId = doSequence({
next(val) {
// 将所有observer封装成一个observer
observers.forEach(obs => obs.next(val));
},

complete() {
observers.slice(0).forEach( obs => obs.complete());
}
}, seq, 0)
}

return {
unsubscribe() {
//remove observer
observers.splice(observers.indexOf(observer), 1);
// cancel subscription if no observers.
if(observers.length === 0){
clearTimeout(timeoutId);
}

}
}
}
}

const multicastObservable = new Observerble(multicastSequenceSubscriber())

// subscribe to this observable
multicastObservable.subscribe({
next(num) { console.log(num)}
complete(){}
})

multicastObservable.subscribe({
next(num) { console.log('second: ' + num)}
complete(){}
})

Observable关系操作符

Area Operator
Creation from, fromEvent, of
Comibination combineLatest,concat, merge, startWith, withLatestFrom, zip
Filtering debounceTime, distinctUtilChanged, filter, take, takeUtil
Transformation bufferTime, concatMap, map, mergeMap, scan, switchMap
Utility tap
Multicasting share

Observable的关系操作符,是通过.pipe()引入的,更多的操作符,可以参考rxjs官方API文档here

Using observables in Angular

  • EventEmitter
  • HTTP模块处理AJAX requests
  • Router, Forms 模块监听/响应用户输入

Observable v.s. Promise

  • 共同点
方面 细节
异步处理 subscribe/then callback
使用语言 支持typescript, javascript
支持HTTP 通过Angular httpClient支持, observable.toPromise()
  • 不同点
方面 细节
使用场景 O: 事件处理句柄, 流式处理; P: 链式处理
操作符复杂度 O:多样化; P:单一

Reactive Programming

使用异步数据流进行编程,通过代码来忠实反映业务之间的关系。

HTTP 协议

HTTP协议属于应用层协议,其设计目的是为了方便文本内容的分享与发布。HTTP协议是Web资源共享的基础,与URL(文档地址定位符),HTML共同构建了Web世界。

HTTP协议属于点到点通信协议,请求访问资源的一端称为客户端,提供资源相应的一端称为服务器。

HTTP协议方法

  • GET:请求访问资源。
  • POST:传输主体内容。
  • PUT:传输内容,幂等传输。
  • HEAD: 获得报文首部,用于确认资源有效性和更新日期。
  • DELETE: 删除内容。
  • OPTIONS: 询问资源支持方法。
  • TRACE:追踪路径,用于计算客户端到服务器端的跳数。
  • CONNECT: 要求用socket协议连接代理。

HTTP方法本身是无状态协议,为了支持复杂有状态场景,引入了Cookie技术,确保认证客户端通信的上下文是连续的。

HTTP状态码

类别 原因短语
1XX Informational(信息性状态码) 接收的请求正在处理
2XX Success(成功状态码) 请求正常处理完毕
3XX Redirection(重定向状态码) 需要进行附加操作以完成请求
4XX Client Error(客户端错误状态码) 服务器无法处理请求
5XX Server Error(服务器错误状态码) 服务器处理请求出错

HTTP协议架构中的转发功能节点

  • 代理:代理服务器的基本行为是接收客户端发送的请求转发给其他服务器,不改变请求URI。分为两类:
    1. 缓存代理:缓存内容。
    2. 透明代理:纯粹内容转发,不做缓存。
  • 网关:网关为通信链路上的服务器提供非HTTP协议连接,例如与信用卡结算系统联动,数据库联动等。
  • 隧道:隧道的建立能够确保客户端与服务器之间安全的通信。也能扩展HTTP通信协议,例如支持推送功能等等。

HTTPS

HTTP协议的缺点:

  1. 明文信息会被窃听;
  2. 无法验证通信方身份,会被伪装;
  3. 无法验证报文的完整性,会被篡改。

HTTP over SSL/TSL

SSL/TSL协议独立与HTTP协议,存在于HTTP和TCP协议之间,在建立HTTP连接之前,先建立SSL通信机制,交换加密密钥,从而在HTTP通信报文可以进行加密传输。

HTTPS 协议通信握手/分手协议步骤:

HTTPS

握手主要分为4个阶段:

  1. SSL证书请求,及公钥获取。<= 由于非对称加密机制的效率较低,安全性高,只用作密钥交互。
  2. 客户端密钥加密发送。
  3. 服务器端密钥确认。
  4. 数据传输… <= 数据传输阶段的加密是基于协商确定的对称密钥发送,效率较高。

WebSocket协议

Web浏览器支持的全双工通信协议,在HTTP部首加入upgrade:websocket字段,服务器通过返回状态码101 swtiching protocols响应连接,一旦服务器与客户端建立WebSocket协议的通信连接,之后所有的通信都用这个专用协议进行。

  • 推送功能: 服务器可以直接发送数据到客户端。
  • 减少通信量:保持连接的状态减少HTTP协议的连接开销。

用户认证问题

HTTP/1.1常见的认证方式如下:

  • BASIC认证:直接提交用户名密码完成认证。
    BASIC
  • DIGEST认证:客户端根据服务器端的质询码生成响应码完成认证。
    DIGEST
  • SSL客户端认证:凭借HTTPS的客户端认证证书完成认证。
  • FormBase认证:在用户登录信息以表单形式提交后,服务器端发放Session ID用于用户认证状态的绑定和保持。
    FORM
  • Kerberos认证:
  • NTLM认证:
  • SSO认证:

Web安全问题

  • 跨站脚本工具XSS:通过执行非法HTML标签/JS脚本进行攻击。
    1. 表单中插入非法标签执行JS脚本。
    2. 基于用户Cookie的窃取攻击
  • SQL注入攻击:通过加入SQL结束符,强制加入SQL语句执行。
  • OS命令攻击:通过Shell脚本注入执行系统命令。
  • HTTP部首注入攻击:通过访问URL加入换行符,注入HTTP首部字段进行攻击。
    1. Cookie设置字段生效。
    2. HTTP响应截断攻击,强制显示伪造内容。
  • 邮件首部注入攻击:
    1. BCC邮件泄露。
  • 目录遍历攻击: 访问无疑公开的文件目录。
  • 远程文件包含漏洞:引入其他包,例如System包进行代码污染。

本文描述了当前主流的Java Xml libraries,以及对比区别。

下文摘取自https://www.baeldung.com/java-xml-libraries

  • SAX: It is an event based parsing API, it provides a low level access, is memory efficient and faster than DOM since it doesn’t load the whole document tree in memory but it doesn’t provide support for navigation like the one provided by XPath, although it is more efficient it is harder to use too.

  • DOM(DOM4J and JDOM): It as model based parser that loads a tree structure document in memory, so we have the original elements order, we can navigate our document both directions, it provides an API for reading and writing, it offers XML manipulation and it is very easy to use although the price is high strain on memory resources.

  • StAX: It offers the ease of DOM and the efficiency of SAX but it lacks of some functionality provided by DOM like XML manipulation and it only allows us to navigate the document forward.

  • JAXB: It allows us to navigate the document in both directions, it is more efficient than DOM, it allows conversion from XML to java types and it supports XML manipulation but it can only parse a valid XML document.

JavaXmlLibs

This is a guide for how to develop Office 365 cross platform addin.

Front end UI framework

Microsoft has provided offical Office UI framework: Office Fabric UI. The office site is at https://developer.microsoft.com/en-us/fabric. The are three directions of the UI project future roadmap. All the three projects are implementation of UI components, demos are available at https://developer.microsoft.com/en-us/fabric#/components.

  1. Fabric React (Official support)
  2. AngularJS (Community version, no updates)
  3. Fabric iOS
  4. Fabric JS (Stop support from MSFT)

New UI design sytem is called fluent, the website is at https://www.microsoft.com/design/fluent/. It can help let developer know how to design an Windows style application UI and the guide of UWP web app design.

Fabric.js is an open source js framework using canvas as the basic style of UI components, the official site is at http://fabricjs.com/.

Directly reuse existing Microsoft designed and implemented components is very convinient as long as your application can work well with these codes.

Except Windows style UI frameworks, there are other frameworks using morden components:

Front end Office.js framework

Office.js is the JavaScript based Office model provided by Microsoft, there are also Excel-15.js or Outlook-15.js for application respectively.

Office JavaScript API object model

This doc https://docs.microsoft.com/en-us/office/dev/add-ins/develop/office-javascript-api-object-model explains the API model for new O365 addin JS model.

The object is still under active development, there are more and more new features added, reading the documents can get the updated information. As for now, there are below bullet points:

JSAddinFeatures

This is also guide for how to develop Addin with Angular front end. https://docs.microsoft.com/en-us/office/dev/add-ins/develop/add-ins-with-angular2

Office appication has its own object model, so developers can read the related documents for detailed guide.

Outlook Backend service

Except the general Office JS API model, Outlook has its own API. The development guide is at https://docs.microsoft.com/en-us/outlook/add-ins/.

Manifest version and corresponding clients support

ClientSupport

Troubleshooting manifest file here

Outlook Web Addin Catalogs

Outlook Addin has its own concepts and there are many aspects:

OutlookCatalog

Sample Addins for Outlook

https://developer.microsoft.com/zh-cn/outlook/gallery/?filterBy=Outlook,Samples,Add-ins

Other Office Web Addin Demos

Git 基本操作

创建Git仓库

1
git init

查看代码情况

1
2
git status
git diff {filepath}

提交代码

1
2
3
4
git clone {clone.git}
git add -A
git commit -m "message"
git push orgin master

保存/恢复临时修改文件

1
2
git stash
git checkout stash@{0} -- {filepath}

读取文件历史版本

1
git checkout {commithash} -- {filepath}

创建标签

1
2
3
git tag -a v1.0  # tag most recent commit
git tag -a v0.9 85fc7e7 # tag on a specific commit
git tag # view tags

Pull/Fork工作流

  1. fork from the github
  2. clone to local and commit changes
    1
    2
    3
    4
    5
    6
    git clone {clone.git}
    git checkout -b feature
    # make changes to this branch
    git add -p
    git commit -m "feature message"
    git push origin feature
  3. rebase commits after remote master
    1
    2
    3
    4
    5
    6
    7
    git remote add upstream {remote.git}
    git pull upstream master
    # no changes on master, then automatically git history is updated
    git checkout feature
    git pull --rebase origin master
    # apply changes from origin master to feature newly commits
    git push origin feature --force
  4. raise pull request for code review

提交历史管理

缩减冗余commit log

  • 通过rebase修改commit history
1
2
3
# rewrite last 10 commit logs
git rebase -i HEAD~10 feature
git push orign feature --force
  • 通过squash命令压缩commit history
1
2
3
4
5
6
7
8
git reset --hard HEAD~12 # 将git指针定位到历史版本

git merge --squash HEAD@{1} # 从该点开始merge到最新的版本

git commmit -m "xxx" # 提交commit重写

git push origin master --force # 强制改写历史

分支管理

创建新分支

在创建新的local分支时,也可以添加commit hash告诉git分支最新的HEAD指向

1
2
git checkout -b feature
# create a branch based on current branch

切换分支/从Detached HEAD切换

1
git checkout feature

删除本地分支

1
2
git branch -d feature
git branch -D feature # force delete

删除远程分支

1
git push orgin --delete feature

合并分支

1
2
git merge feature
# merge changes from feature branch to current master branch

CherryPick别的分支commit/branch

在需要apply的分支上保证没有unstaged change,运行如下命令

1
git cherry-pick <commitHash>/<feature branch name>
0%