Java NIO

Java异步计算方法与异步网络通信框架

Java异步方法

异步方法(Java asynchronized programming)是提升程序性能的一个有效手段,采用已经封装好的方法,让JVM去控制线程的创建和回收管理,减少了线程的阻塞,让程序员更加关注方法的设计。

Callable,Future, 和FutureTask

  • Callable

Callable接口使用泛型去定义它的返回类型。Executors类提供了一些有用的方法去在线程池中执行Callable内的任务。由于Callable任务是并行的,必须等待它返回的结果。java.util.concurrent.Future对象解决了这个问题。

在线程池提交Callable任务后返回了一个Future对象,使用它可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供了get()方法,等待Callable结束并获取它的执行结果。

Callable与Runnable接口很相似,不同之处在于一个需要返回值,一个不需要。而且Callable可以抛出异常。

  • Future和FutureTask

FutureTask是Future接口的一个唯一实现类。

FutureTask类实现了RunnableFuture接口,RunnableFuture继承了Runnable接口和Future接口,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

Future接口的不足:

Future接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:

* 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
* 等待 Future 集合中的所有任务都完成。
* 仅等待 Future 集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。
* 通过编程方式完成一个 Future 任务的执行(即以手工设定异步操作结果的方式)。
* 应对 Future 的完成事件(即当 Future 的完成事件发生时会收到通知,并能使用 Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)

在CompletableFuture中,满足了上述的目的。

https://www.cnblogs.com/dolphin0520/p/3949310.html

CompletableFuture

通过静态方法产生CompletableFuture实例。CompletableFuture的底层实现原理:

静态方法 内容
runAsync(Runnable runnable) 使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。
runAsync(Runnable runnable, Executor executor) 使用指定的thread pool执行异步代码。
supplyAsync(Supplier<U> supplier) 使用ForkJoinPool.commonPool()作为它的线程池执行异步代码,异步操作有返回值
supplyAsync(Supplier<U> supplier, Executor executor) 使用指定的thread pool执行异步代码,异步操作有返回值

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// CompletableFuture 在调用runAsync后会立刻执行,定义的Runnable输入参入与.NET中定义的Task.Run(Action a)很相似。
CompletableFuture cf = CompletableFuture.runAsync(()=> System.out.println("Hello"));
CompletableFuture<String> cf2 = CompletableFuture.supplierAsync(()=>"Hello");

try {
// get()方法会阻塞当前线程等待结果,类似于.NET的Task.Result属性。此接口继承于Future的get()方法,运行方式和CompletableFuture的join()一致,与之不相同的是get()会抛出InterruptedException 和ExecutionException异常需要开发者手动处理,而join()抛出的异常则交由exceptionally()中的callback进行处理。
cf.get();
// join()方法也会阻塞当前线程,同步执行等待结果,和.NET的Task.wait()方法相似,如果有异常/在等待时被取消,会抛出相应未经过检查的exception,比如CompletionException异常 /CancellationException异常。
System.out.println(cf2.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

底层是通过一个线程池接收提交的任务,源码是用一个Executor实例执行lamda表达式,一般默认是运用ForJoinPool线程池中的线程执行。

1
2
3
4
5
6
7
8
9
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();


/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}

实例方法 内容
thenRun(Runnable runnable) 可以传入一个Lamda表达式作为后续处理方法
thenAccept(Comsummer<? super T> consummer) 在异步task执行完成后传入一个Action方法来处理输出参数,如果void输出是不需要thenAccept(**),也有async版本
thenApply(Function<? super T, ? super E> function) 在异步task执行完成后传入一个Func来处理输出参数,并且得出返回值,也有async版本
thenCombine(CompletionStage<? extends U> taskToBeCombined, Function<? super T, ? super U> function) Task并联,两个都执行完成了再继续fucntion)
thenCompose(Function<? extends U> function, CompletionStage<U> cf) Task串联,本task执行完成后再执行下一步
complete(T t) 立刻返回Task的结果,并且返回值是t。当然如果Task已经异步执行完毕,则设置失效
completeExceptionally(Throwable x) 立刻返回Task并且抛出对应的Exception。如果Task已经结束,则不变。
get() 尝试获取结果,如果没有结束/抛出exception/cancel掉,都会抛出对应的exception.
join() join()方法会阻塞当前线程,同步执行等待结果,和.NET的Task.wait()方法相似,如果有异常/被取消,会抛出相应的exception。
handle(BiFunction<? super T, Throwable, ? extends U> fn) 将event handler传入,待到task执行完毕后进行处理/exception handling。也有async版本
exceptionally​(Function<Throwable,? extends T> fn) 为异步方法注册exception的callback,当然也可以给该task注册下一步的方法在thenApply(..)中。
  • Lambda表达式

在Java里,lamda表达式的实质是函数。Runnable是一种特殊的lamda表达式,也就是.NET 中的Action类型委托实例,()-> { ….; return void;}

https://www.jianshu.com/p/dff9063e1ab6

题外话: C#异步编程方法

C#异步编程框架依赖于CLR的实现,更加高效和简单明了,而Java的异步方法依赖线程池实现。

Java NIO网络编程框架

在网络编程领域,节点间式依赖TCP/UDP协议进行通信,Java提供的Socket实现在JDK 1.4以前是阻塞I/O, 也就是说每一个Socket连接,JVM中会分配一个线程阻塞等待连接建立,数据通信,在高并发场景下会发生线程池线程用尽的情况,为了优化网络通信效率,出现了非阻塞/异步I/O的设计方案。

Reactor模式设计思想

Netty通信框架的设计思想是基于Reactor模式思想,这是一种基于事件驱动的设计思想。能够高效的解决很多程序设计中的线程不必要阻塞问题。Java NIO也是基于Reactor设计思想实现的,在JVM本地方法层面做到了多路复用I/O。

NettyNIO

Reactor模式中的五个角色:

  1. Handle(句柄或描述符,在Windows下称为句柄,在Linux下称为描述符):
    本质上表示一种资源(比如说文件描述符,或是针对网络编程中的socket描述符),是由操作系统提供的;该资源用于表示一个个的事件,事件既可以来自于外部,也可以来自于内部;外部事件比如说客户端的连接请求,客户端发送过来的数据等;内部事件比如说操作系统产生的定时事件等。它本质上就是一个文件描述符,Handle是事件产生的发源地。
  2. Synchronous Event Demultiplexer(同步事件分离器):
    它本身是一个系统调用,用于等待事件的发生(事件可能是一个,也可能是多个)。调用方在调用它的时候会被阻塞,一直阻塞到同步事件分离器上有事件产生为止。对于Linux来说,同步事件分离器指的就是常用的I/O多路复用机制,比如说select、poll、epoll等。在Java NIO领域中,同步事件分离器对应的组件就是Selector;对应的阻塞方法就是select方法。
  3. Event Handler(事件处理器):
    本身由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的反馈机制。在Java NIO领域中并没有提供事件处理器机制让我们调用或去进行回调,是由我们自己编写代码完成的。Netty相比于Java NIO来说,在事件处理器这个角色上进行了一个升级,它为我们开发者提供了大量的回调方法,供我们在特定事件产生时实现相应的回调方法进行业务逻辑的处理,即,ChannelHandler。ChannelHandler中的方法对应的都是一个个事件的回调。
  4. Concrete Event Handler(具体事件处理器):
    是事件处理器的实现。它本身实现了事件处理器所提供的各种回调方法,从而实现了特定于业务的逻辑。它本质上就是我们所编写的一个个的处理器实现。
  5. Initiation Dispatcher(初始分发器):
    实际上就是Reactor角色。它本身定义了一些规范,这些规范用于控制事件的调度方式,同时又提供了应用进行事件处理器的注册、删除等设施。它本身是整个事件处理器的核心所在,Initiation Dispatcher会通过Synchronous Event Demultiplexer来等待事件的发生。一旦事件发生,Initiation Dispatcher首先会分离出每一个事件,然后调用事件处理器,最后调用相关的回调方法来处理这些事件。Netty中ChannelHandler里的一个个回调方法都是由bossGroup或workGroup中的某个EventLoop来调用的。

Reactor线程处理流程理解:

  1. 初始化Initiation Dispatcher,然后将若干个Concrete Event Handler注册到Initiation Dispatcher中。当应用向Initiation Dispatcher注册Concrete Event Handler时,会在注册的同时指定感兴趣的事件,即,应用会标识出该事件处理器希望Initiation Dispatcher在某些事件发生时向其发出通知,事件通过Handle来标识,而Concrete Event Handler又持有该Handle。这样,事件 ————> Handle ————> Concrete Event Handler 就关联起来了。

  2. Initiation Dispatcher 会要求每个事件处理器向其传递内部的Handle。该Handle向操作系统标识了事件处理器。

  3. 当所有的Concrete Event Handler都注册完毕后,应用会调用handle_events方法来启动Initiation Dispatcher的事件循环。这是,Initiation Dispatcher会将每个注册的Concrete Event Handler的Handle合并起来,并使用Synchronous Event Demultiplexer(同步事件分离器)同步阻塞的等待事件的发生。比如说,TCP协议层会使用select同步事件分离器操作来等待客户端发送的数据到达连接的socket handler上。

比如,在Java中通过Selector的select()方法来实现这个同步阻塞等待事件发生的操作。在Linux操作系统下,select()的实现中:

  • 会将已经注册到Initiation Dispatcher的事件调用epollCtl(epfd, opcode, fd, events)注册到linux系统中,这里fd表示Handle,events表示我们所感兴趣的Handle的事件;

  • 通过调用epollWait方法同步阻塞的等待已经注册的事件的发生。不同事件源上的事件可能同时发生,一旦有事件被触发了,epollWait方法就会返回;

  • 最后通过发生的事件找到相关联的SelectorKeyImpl对象,并设置其发生的事件为就绪状态,然后将SelectorKeyImpl放入selectedSet中。这样一来我们就可以通过Selector.selectedKeys()方法得到事件就绪的SelectorKeyImpl集合了。

  1. 当与某个事件源对应的Handle变为ready状态时(比如说,TCP socket变为等待读状态时),Synchronous Event Demultiplexer就会通知Initiation Dispatcher。

  2. Initiation Dispatcher会触发事件处理器的回调方法,从而响应这个处于ready状态的Handle。当事件发生时,Initiation Dispatcher会将被事件源激活的Handle作为『key』来寻找并分发恰当的事件处理器回调方法。

  3. Initiation Dispatcher会回调事件处理器的handle_event(type)回调方法来执行特定于应用的功能(开发者自己所编写的功能),从而相应这个事件。所发生的事件类型可以作为该方法参数并被该方法内部使用来执行额外的特定于服务的分离与分发。

Netty通信框架之NIO的源码分析

Netty服务器单线程Reactor处理模型源码分析

  1. 服务器端的Reactor是一个线程对象(EventLoopGroup),该线程会启动事件循环,并使用Selector来实现IO的多路复用。注册Channel通道(包含Acceptor事件处理器Selector)到Reactor中,Acceptor事件处理器所关注的事件是ACCEPT(IO)事件,这样Reactor会循环监听客户端向服务器端发起的连接请求事件(ACCEPT事件)。

  2. 当客户端向服务器端发起一个连接请求,Reactor监听到了该ACCEPT事件的发生并将该ACCEPT事件派发给相应的Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将该连接所关注的READ(IO)事件以及对应的READ事件处理器注册到Reactor中,这样一来Reactor就会监听该连接的READ事件了。或者当你需要向客户端发送数据时,就向Reactor注册该连接的WRITE事件和其处理器。

  3. 当Reactor监听到有读或者写事件发生时,将相关的事件派发给对应的处理器(另一个EventLoopGroup工作循环线程组)进行处理。比如,读处理器会通过SocketChannel的read()方法读取数据,此时read()操作可以直接读取到数据,而不会堵塞与等待可读的数据到来。

  4. 每当处理完所有就绪的感兴趣的I/O事件后,Reactor线程会再次执行select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理。

注意,Reactor的单线程模式的单线程主要是针对于I/O操作而言,也就是所以的I/O的accept()、read()、write()以及connect()操作都在一个线程上完成的。

但在目前的单线程Reactor模式中,不仅I/O操作在该Reactor线程上,连非I/O的业务操作也在该线程上进行处理了,这可能会大大延迟I/O请求的响应。所以我们应该将非I/O的业务逻辑操作从Reactor线程上卸载,以此来加速Reactor线程对I/O请求的响应。

服务器多线程Reactor处理流程:

Reactor线程池中的每一Reactor线程都会有自己的多路复用器Selector、线程和分发的事件循环逻辑。
mainReactor可以只有一个,但subReactor一般会有多个。mainReactor线程主要负责接收客户端的连接请求,然后将接收到的SocketChannel传递给subReactor,由subReactor来完成和客户端的通信。

  1. 注册一个Acceptor事件处理器到mainReactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样mainReactor会监听客户端向服务器端发起的连接请求事件(ACCEPT事件)。启动mainReactor的事件循环。

  2. 客户端向服务器端发起一个连接请求,mainReactor监听到了该ACCEPT事件并将该ACCEPT事件派发给Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将这个SocketChannel传递给subReactor线程池。

  3. subReactor线程池分配一个subReactor线程给这个SocketChannel,即,将SocketChannel关注的READ事件以及对应的READ事件处理器注册到subReactor线程中。当然你也注册WRITE事件以及WRITE事件处理器到subReactor线程中以完成I/O写操作。Reactor线程池中的每一Reactor线程都会有自己的Selector、线程和分发的循环逻辑。

  4. 当有I/O事件就绪时,相关的subReactor就将事件派发给响应的处理器处理。注意,这里subReactor线程只负责完成I/O的read()操作,在读取到数据后将非I/O事件——业务逻辑的处理放入到EventExecutorGroup工作线程池的EventExecutor中完成,若完成业务逻辑后需要返回数据给客户端,则相关的I/O的write操作还是会被提交回subReactor线程来完成。

扩展知识:Tomcat 同步/阻塞BIO的源码分析

基于阻塞I/O服务器流程:

  1. 服务器端的Server是一个线程,线程中执行一个死循环来阻塞的监听客户端的连接请求和通信。

  2. 当客户端向服务器端发送一个连接请求后,服务器端的Server会接受客户端的请求,ServerSocket.accept()从阻塞中返回,得到一个与客户端连接相对于的Socket。

  3. 构建一个handler,将Socket传入该handler。创建一个线程并启动该线程,在线程中执行handler,这样与客户端的所有的通信以及数据处理都在该线程中执行。当该客户端和服务器端完成通信关闭连接后,线程就会被销毁。

  4. 然后Server继续执行accept()操作等待新的连接请求。

  • 优点:

    • 使用简单,容易编程
    • 在多核系统下,能够充分利用了多核CPU的资源。即,当I/O阻塞系统,但CPU空闲的时候,可以利用多线程使用CPU资源。
  • 缺点:该模式的本质问题在于严重依赖线程,但线程Java虚拟机非常宝贵的资源。随着客户端并发访问量的急剧增加,线程数量的不断膨胀将服务器端的性能将急剧下降。

    • 线程生命周期的开销非常高。线程的创建与销毁并不是没有代价的。在Linux这样的操作系统中,线程本质上就是一个进程,创建和销毁都是重量级的系统函数。
    • 资源消耗。内存:大量空闲的线程会占用许多内存,给垃圾回收器带来压力。;CPU:如果你已经拥有足够多的线程使所有CPU保持忙碌状态,那么再创建更过的线程反而会降低性能。
    • 稳定性。在可创建线程的数量上存在一个限制。这个限制值将随着平台的不同而不同,并且受多个因素制约:a)JVM的启动参数、b)Threa的构造函数中请求的栈大小、c)底层操作系统对线程的限制 等。如果破坏了这些限制,那么很可能抛出OutOfMemoryError异常。
    • 线程的切换成本是很高的。操作系统发生线程切换的时候,需要保留线程的上下文,然后执行系统调用。如果线程数过高,不仅会带来许多无用的上下文切换,还可能导致执行线程切换的时间甚至会大于线程执行的时间,这时候带来的表现往往是系统负载偏高、CPU sy(系统CPU)使用率特别高,导致系统几乎陷入不可用的状态。
    • 容易造成锯齿状的系统负载。一旦线程数量高但外部网络环境不是很稳定,就很容易造成大量请求的结果同时返回,激活大量阻塞线程从而使系统负载压力过大。
    • 若是长连接的情况下并且客户端与服务器端交互并不频繁的,那么客户端和服务器端的连接会一直保留着,对应的线程也就一直存在在,但因为不频繁的通信,导致大量线程在大量时间内都处于空置状态。
  • 适用场景:如果你有少量的连接使用非常高的带宽,一次发送大量的数据,也许典型的IO服务器实现可能非常契合。