目前各大科技公司都提供了各种云平台服务,对于普通地金融科技公司而言,从传统的内部维护基础架构的日子一去不复返。策略转向了开发面向多平台,多终端的服务开发,其中前端开发倾向于web,后端开发倾向于自包含而且能灵活大量部署的微服务架构。如何从传统的开发风格转变为适应云平台服务的应用开发成为了一个需要攻坚的课题。
采用微服务架构拥有诸多好处,在本文中将不做赘述,可参见微服务架构一文。目前,微服务架构已经从设计逐渐落地,开发者社区逐渐贡献出云生态的组件框架,能够让这一战略目标变为可执行方案。
Spring Cloud微服务框架
Spring家族拥有多个方向的项目,目前最为领先行业的就是Spring Framework, Spring Boot, Spring Cloud三大方向,分别致力于帮助开发者开发系统,简化应用初始搭建,以及实现微服务开发设计。
Spring Cloud提供了非常完整的一套微服务实施方案:
- 服务发现
- 分布式配置
- 客户端负载均衡
- 服务容错保护
- API网关
- 安全
- 事件驱动
- 分布式服务跟踪
当然,代码构建微服务只是微服务落地的第一步,为了支持灵活大量部署,微服务需要借助容器技术来快速部署到各个云平台服务提供商的虚拟机上,Docker则是容器技术实现的一个典范,我将在另一篇文章中介绍容器技术。
Spring Cloud常用组件
API网关
Spring Cloud Gateway和Netflix Zuul为所有微服务提供了一个单一入口点。API网关是一个单独的中间件。
Spring Cloud Gateway源码分析
处理流程:
- 基于webflux容器/Netty通信框架:NIO机制,事件循环监听端口请求
- 请求的Route Predicate函数式过滤匹配规则
- HTTP请求信息检查例如:Host,Query, Path, Header, Cookie
- 请求过滤器Filter处理和转发接收
- Pre型在请求转发前执行,可以做鉴权,限流等操作
- Post型过滤器可以对返回数据进行增强处理
- 下游服务可以为注册中心的地址/预先配置好的节点IP信息
安全——服务验证和授权
Spring Cloud Securty为微服务提供了一种灵活的用户验证机制,和授权模型。其中验证机制可以基于OAuth2.0标准下的OpenID协议完成,而用户服务授权模型则通过OAuth 2.0的token提供。云服务中的所有服务都应该引入服务验证和授权机制来保护内容服务的安全性。
如果需要更多的security功能,可以考虑引入spring security中的功能。本文将着重介绍基于OAuth2.0的微服务安全架构。
微服务中一般在gateway进行验证授权,而在下游微服务中只需要确认请求是经过认证的即可。关于鉴权的详细文章请见这里.
Spring Cloud Netflix
Netflix Eureka服务注册
Eureka是单独的服务组件,保证了A高可用,和P分区容忍性的中间件,与Zookeeper保证的C一致性,P分区容忍性不同,更加适合微服务架构,通过集群保证了高可用的动态服务注册以及心跳感知。Eureka一般和Ribbon放在同一个服务器上,所以在gateway上需要指向lb://abc-service就能保证负载均衡到对应的已注册资源服务中。
Netflix Ribbon负载均衡
Netflix Ribbon 采用拦截器将请求的Url进行负载均衡分发,从而达到内容微服务的负载均衡效果。Ribbon框架并不是单独执行,往往在前端服务中会通过(服务注册中心获取/API网关中写死)得到提供服务的IP地址,所以服务调用方引入Ribbon通过一定的均衡策略动态生成最终访问的IP地址。
全局使用既定的某种负载均衡策略:
1 2 3 4
| @Bean public RandomRule randomRule() { return new RandomRule(); }
|
通过配置方式灵活配置服务提供者的负载均衡策略
1 2 3 4 5
|
service-provider-name: ribbon: NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
|
Ribbon负载均衡策略比较
| 策略名 |
策略声明 |
策略描述 |
实现说明 |
| BestAvailableRule |
public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule |
选择一个最小的并发请求的server |
逐个考察Server: 如果Server被tripped了,则忽略,再选择其中ActiveRequestsCount最小的server |
| AvailabilityFilteringRule |
public class AvailabilityFilteringRule extends PredicateBasedRule |
过滤掉那些因为一直连接失败的被标记为circuit tripped的后端server,并过滤掉那些高并发的的后端server(active connections 超过配置的阈值) |
使用一个AvailabilityPredicate来包含过滤server的逻辑, 其实就就是检查status里记录的各个server的运行状态 |
| WeightedResponseTimeRule |
public class WeightedResponseTimeRule extends RoundRobinRule |
根据相应时间分配一个weight,相应时间越长,weight越小,被选中的可能性越低。 |
一个后台线程定期的从status里面读取评价响应时间,为每个server计算一个weight;Weight的计算也比较简单responsetime 减去每个server自己平均的responsetime是server的权重;当刚开始运行,没有形成statas时,使用roudrobin策略选择server。 |
| RetryRule |
public class RetryRule extends AbstractLoadBalancerRule |
对选定的负载均衡策略机上重试机制。 |
在一个配置时间段内当选择server不成功,则一直尝试使用subRule的方式选择一个可用的server |
| RoundRobinRule |
public class RoundRobinRule extends AbstractLoadBalancerRule |
roundRobin方式轮询选择server |
轮询index,选择index对应位置的server |
| RandomRule |
public class RandomRule extends AbstractLoadBalancerRule |
随机选择一个server |
在index上随机,选择index对应位置的server |
| ZoneAvoidanceRule |
public class ZoneAvoidanceRule extends PredicateBasedRule |
复合判断server所在区域的性能和server的可用性选择server |
使用ZoneAvoidancePredicate和AvailabilityPredicate来判断是否选择某个server,前一个判断判定一个zone的运行性能是否可用,剔除不可用的zone(的所有server),AvailabilityPredicate用于过滤掉连接数过多的Server。 |
Netflix Feign动态代理RPC调用
RPC远程调用的一种经典实现,为了让服务调用的代码跟普通方法调用一样方便,可以使用Netflix Feign动态代理被调用的服务接口,并且在底层实际使用HTTPClient进行调用。
Netflix Hystrix服务弹性保证
对于微服务的调用失败需要进行动态的感知,当大量请求失败后需要主动断路避免延迟。而且需要后备方式记录或者进行服务降级。在.NETcore 中对等的实现是steeltoe.
Hystrix熔断机制源码分析
基于注解@HystrixCommand和AOP实现,在方法执行前拦截的动态代理执行。对于有弹性机制需要的节点,需要引入Hystrix进行失败回退方法编写。
Hystrix手写代码示例
- 自定义注解 @WuzzHystrixCommand
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface WuzzHystrixCommand {
int timeout() default 1000;
String fallback() default "";
}
|
- 编写切面类,实现简易的逻辑处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| @Component @Aspect public class WuzzHystrixCommandAspect { ExecutorService executorService = Executors.newFixedThreadPool(10);
@Pointcut(value = "@annotation(com.wuzz.demo.custom.hystrix.WuzzHystrixCommand)") public void pointCut() { }
@Around(value = "pointCut()&&@annotation(hystrixCommand)") public Object doPointCut(ProceedingJoinPoint joinPoint, WuzzHystrixCommand hystrixCommand) throws InterruptedException, ExecutionException, TimeoutException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { int timeout = hystrixCommand.timeout(); Future future = executorService.submit(() -> { try { return joinPoint.proceed(); } catch (Throwable throwable) { throwable.printStackTrace(); } return null; }); Object result; try { result = future.get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); future.cancel(true); if (StringUtils.isBlank(hystrixCommand.fallback())) { throw e; } result = invokeFallback(joinPoint, hystrixCommand.fallback()); } return result; }
private Object invokeFallback(ProceedingJoinPoint joinPoint, String fallback) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); Class<?>[] parameterTypes = method.getParameterTypes(); try { Method fallbackMethod = joinPoint.getTarget().getClass().getMethod(fallback, parameterTypes); fallbackMethod.setAccessible(true); return fallbackMethod.invoke(joinPoint.getTarget(), joinPoint.getArgs()); } catch (Exception e) { e.printStackTrace(); throw e; } } }
|
- 编写测试,调用:
1 2 3 4 5 6 7 8 9 10 11
| @WuzzHystrixCommand(fallback = "customFallback", timeout = 3000) @GetMapping("/custom/hystrix/test") public String test() { Map map = new HashMap<>(); map.put("id", 666); return restTemplate.getForObject(REST_URL_PREFIX + "/hello?id={id}", String.class, map); }
public String customFallback() { return "custom 请求被降级"; }
|
正常得调用是没有问题的,这个时候我们把服务提供方的服务接口里 sleep 3秒来模仿调用超时,在访问接口就会得到降级服务后的返回。
Spring Cloud Alibaba
Nacos 配置中心,负载均衡,服务注册
Nacos服务注册源码分析
- Nacos服务注册Naming Service源码分析
* 接受Nacos客户端的API调用注册生成Instance实例
* 将Instance放入serviceMap中ConcurrentHashMap集合中
* consitencyService.listen实现数据定期检查
* 通过Namespace对已注册服务的隔离
* 定时检查HeartBeat对已注册Instance实例进行检查,更新实例状态
* 对出现异常的服务进行基于UDP协议推送更新于PushService
- Nacos服务方客户端注册源码分析
- Spring Boot自动启动NacosAutoServiceRegistration进行注册调用
- 监听ApplicationStartedEvent事件调用NacosServiceRegistry.register方法
- namingService.regiterInstance中创建BeanInfo,定时发送心跳包:executorService.schedule(task, period, unit)
- Nacos前端客户端服务消费者源码分析
- 客户端发起订阅请求会定期发起UpdateTask来获得最新地址
- 客户端也会提供本地EventListener回调实例处理出现异常的服务
Sentinel 弹性限流/熔断模式
Sentinel和Hystrix对比

Seata 分布式事务
分布式事务解决了分布式系统中存储数据(数据库/缓存)一致性问题。
分布式事务解决理论
X/Open分布式事务模型
- AP: Application 应用程序
- RM: Resource Manager 资源管理者,数据库
- TM: Transaction Manager事务管理器/协调者
两段提交协议
- 准备阶段: TM同之RM准备事务,并告知准备结果
- 提交/回滚阶段:如果所有RM返回成功则执行提交完成指令,反之执行回滚指令。
存在问题:
- 同步阻塞数据库
- 容易失败,一个节点失败就回滚
- TM单点故障问题,造成RM锁死。
- 脑裂问题,二阶段部分提交问题。
三段提交协议
- CanCommit询问阶段: TM询问是否可以参与事务/超时。
- PreCommit准备阶段:如果所有RM确认可以,则发起事务,并返回结果/超时。
- DoCommit提交/回滚阶段:如果均成功提交则发起提交/回滚指令。
改进部分:
- 超时即失败机制,避免两阶段提交锁死等待问题。
- 提前确认节点状态
分布式事务解决方案
- 基于MQ的消息中间件实现TCC (Try-Confirm-Cancel)模型补偿型方案(幂等性实现,最大努力通知机制)
- 基于Seata模式的分布式事务框架(AT, TCC, Sega 和XA事务模式)
Seata源码分析
Spring Cloud Stream 发布/订阅流处理
Spring Cloud Stream支持消息中间件通信,因而可以支持多种高并发消息发布/消费场景:
Spring Cloud Stream包含如下四个核心部分:
- Spring Messaging
- Message, 消息对象,包含消息头和消息体
- MessageChannel, 消息通道接口,用于接收消息,提供send方法将消息发送至消息通道
- MessageHandler, 消息处理器接口,用于处理消息逻辑
- Spring Integration
- MessageDispatcher:消息分发接口,用于分发消息和添加删除消息处理器
- MessageRouter: 消息路由接口,定义默认的输出消息通道
- Filter:消息的过滤注解,用于配置消息过滤表达式
- Aggregator: 消息的聚合注解,用于将多个消息聚合成一条
- Splitter: 消息的分割,用于将一条消息拆分成多条
- Binders 目标绑定器,负责于外部消息中间件系统集成的组件
- doBindProducer: 为中间件绑定发送消息模块,让中间件能从MessageChannel接受到符合中间件格式的消息
- doBindConsumer:为中间件绑定接受消息模块,让中间件能够发送符合Spring Message标准的消息到MessageChannel
- Bindings 绑定生成的桥梁,支持Kafka,RabbitMQ中间件
RocketMQ 分布式消息通信源码分析
消息发送流程源码分析:

除了负责和Spring Cloud服务器中的Messaging集成之外,RocketMQ Binder还负责和MQ中间件集群通信,源码分发布/订阅两部分,分别如下:
- 使用RocketMQTemplate真正发送MQ消息到中间件
- 同时创建ConsumerEndpoint和input MessageChannel监听MQ订阅消息,并且负责转发给下游
- 消息的消费分为顺序消费和并发消费,分别由DefaultMessageListenerOrderly,DefaultMessageListenerConcurrently实现,通过binders的配置设定。
消息订阅流程源码分析:

其中,服务器对消息的接收,是基于注解方式注入到响应的业务方法中的。这就是在业务代码中,不需要为接收信息创建MessageChannel,却能拿到信息体中的反序列化后信息。
RocketMQ 消息使用场景与实现
RocketMQ 顺序消息实现
顺序发送消费场景:订单创建、支付、退款流程处理,数据库BinLog信息消费等等。
- 顺序发送需要将消息发送到同一队列即可,通过基于消息ID的哈希分队选择器即可完成。
- 顺序消费需要binders中配置好ConsumerMQ的集群消费模式,即每条消息只会被ConsumerGroup中的一个Consumer消费。通过Consumer拿Broker独占锁实现。消费成功后会提交并更新消费进程,避免重复消费。代码如下:
1 2 3 4 5 6 7 8 9 10 11
| try{ this.processQueue.getLockConsume().lock(); if(this.processQueue.isDropped()){ break; } status = messageListener.consumeMessage(Collections.unmodifieableList(msgs), context); }catch(Throwable e){ hasException - true; }finally{ this.processQueue.getLockConsume().unlock(); }
|
RocketMQ 普通消息发送实现
普通消息在队列选择可以由两种机制:
- 轮询机制:轮流使用每个队列发送消息
- 故障规避机制:
RocketMQ 消息并发消费实现
并发消费场景下,消息队列允许Consumer的线程消费池可以向同一个队列消费信息,并且每个消费线程消费信息会有自己的进度信息。
RocketMQ 分布式事务消息实现
为分布式事务处理提供了通信基础。

Rocket发送事务消息:
Rocket发送事务消息是二次提交的,第一次发送prepare提交到服务器时消息主题会替换为RMQ_SYS_TRANS_HALF_TOPIC。等到本地事务执行完毕以后才进行二次提交,这时会发送给原本消息的topic。
由producer发送prepare(半消息)给MQ的broker。MQ会把消息记录到本地,然后回复prepare消息状态给producer。
prepare消息发送以后获取发送状态,如果是成功则执行本地业务(本地事务),根据本地事务执行结果手动返回相应状态(RocketMQLocalTransactionState.COMMIT、RocketMQLocalTransactionState.ROLLBACK等)给MQ。
如果是COMMIT则说明本地事务执行成功,prepare为可提交状态,MQ收到COMMIT消息就会发送给consumer,然后consumer执行本地业务。如果是ROLLBACK则会删除prepare消息。
如果MQ一直没收到返回状态则会启动定时任务检查本地事务状态
消费者、生产者的事务各由开发者自己保证。MQ的事务是由MQ保证,这里会根据使用者配置的参数来决定如何执行。
RocketMQ消费模式
- at-most-once最多一次
- at-least-once最少一次,RocketMQ通过消费者ACK机制支持至少一次
- exactly-only-once仅此一次
RocketMQ实现原理
高性能设计
高可用设计