Jetty IO model
Jetty I/O Model
Jetty是一个开源的轻量级webserver框架,实现了大部分javax标准下的接口,并且也提供了很多有用的协议支持,例如http1.1, http2, 和websocket等。Jetty采用了最新的Java Nio接口,从底层解决了阻塞式处理I/O的问题,利用SelectionKey实现了多路复用socketChannel,得到了高效的I/O吞吐性能。本文将着重从源代码层面去讲解Jetty是如何做到。
Jetty Server
Jetty Server实例是整个服务的容器,包含了connectors负责管理客户端连接。
Jetty ServerConnector
ServerConnector扩展了既有的AbstractConnector,实现了异步处理ServerSocketChannel的selectedKeys事件回调。主要是通过SelectorManager管理一系列ManagedSelector,根据不同的协议生成不同的连接实例。
Jetty AbstractConnector
AbstractConnector是Connector的基本实现,主要包含了一组ConnectionFactory负责构造不同类型的connection,也包含了一组AbstractCoonector.Acceptor实例,负责监听所有的accept事件,accept事件是客户端连接的第一个事件。
Jetty SelectorManager与ManagedSelector
SelectorManager是所有ManagedSelector的管理员,负责异步执行所有ManagedSelector提交的Task,是多路复用一个ServerSocketChannel的核心。底层是依赖Java Nio的ServerSocketChannel多路复用技术实现。
Java Nio的多路复用技术
ServerSocketChannel
- 创建:通过ServerSocketChannel类的静态方法open()获得。
- 绑定端口:每个ServerSocketChannel都有一个对应的ServerSocket,通过其socket()方法获得。获得ServerSocket是为了使用其bind()方法绑定监听端口号。若是使用其accept()方法监听请求就和普通Socket的处理模式无异。
- 设置是否使用阻塞模式:true/false。configureBlocking(false)——不适用阻塞模式。阻塞模式不能使用Selector!
- 注册选择器以及选择器关心的操作类型:register(Selector,int) 第一个参数可以传入一个选择器对象,第二个可传入SelectionKey代表操作类型的四个静态整型常量中的一个,表示该选择器关心的操作类型。
Selector
创建:通过Selector的静态方法open()获得。
等待请求:select(long)——long代表最长等待时间,超过该时间程序继续向下执行。若设为0或者不传参数,表示一直阻塞,直到有请求。
获得选择结果集合:selectedKeys(),返回一个SelectionKey集合。SelectionKey对象保存处理当前请求的Channel、Selector、操作类型以及附加对象等等。
SelectionKey对象有四个静态常量代表四种操作类型以及对应的判断是否是该种操作的方法:SelectionKey.OP_ACCEPT——代表接收请求操作 isAcceptable()
SelectionKey.OP_CONNECT——代表连接操作 isConnectable()
SelectionKey.OP_READ——代表读操作 isReadable()
SelectionKey.OP_WRITE——代表写操作 isWritable()
NioSocket中服务端的处理过程分为5步:
- 创建ServerScoketChannel对象并设置相关参数(绑定监听端口号,是否使用阻塞模式)
- 创建Selector并注册到服务端套接字信道(ServerScoketChannel)上
- 使用Selector的select方法等待请求
- 接收到请求后使用selectedKeys方法获得selectionKey集合
- 根据选择键获得Channel、Selector和操作类型进行具体处理。
Code Sample
1 | /* |
Jetty对socket的多路复用技术实现
Jetty主要是依赖底层Java Nio的多路复用技术,并且封装成为高吞吐量的服务器。封装主要分成两部分:
- Connector -> Acceptor: 当新的请求到达Jetty时,Jetty ServerConnector会拿到一个非阻塞、新的_acceptChannel:ServerSocketChannel,其关联的selector用于接收所有后续Accept/Update/onSelected/close/ReplaceKey事件。Acceptor作为Selector的回调类,提供了所有socketchannel事件的回调方法。
1 | /** |
- SelectorManager -> ManagedSelector -> Selector: SelectorManager负责维护一组可以复用的Selector实例,对于每个_acceptChannel:ServerSocketChannel非阻塞实例,可以注册一个Selector实例用于监听相应的SelectionKey事件。
1 |
|
Selector事件的生产者/消费者模式:选择器本身是可以复用的,也就是大量请求创建的大量socketchannel是可以共享一个选择器实例,并且通过selectedKey.channel()获得相应的channel实例。这相较于传统的一个请求一个线程模式提高了线程利用率,避免了大量请求带来的线程池饥饿问题。同时,selector本身的事件也需要有很好的资源复用,否则,生产者产生的大量任务同时也会让消费者线程饥饿造成阻塞。Jetty为了解决这个问题采用了一种新的消费模式: EPC,即 eat what you kill消费者和生产者同线程优先处理模式。
- SelectorProducer类提供了一个循环loop负责阻塞查询select()并且将查询到的事件/任务提交到_updates队列中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public Runnable produce()
{
while (true)
{
Runnable task = processSelected();
if (task != null)
return task;
processUpdates();
updateKeys();
if (!select())
return null;
}
}- 产生的任务主要分四种:
- processSelected(): 处理selected事件产生真正的task,或者处理connect事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51private Runnable processSelected()
{
while (_cursor.hasNext())
{
SelectionKey key = _cursor.next();
Object attachment = key.attachment();
SelectableChannel channel = key.channel();
if (key.isValid())
{
if (LOG.isDebugEnabled())
LOG.debug("selected {} {} {} ", safeReadyOps(key), key, attachment);
try
{
if (attachment instanceof Selectable)
{
// Try to produce a task
Runnable task = ((Selectable)attachment).onSelected();
if (task != null)
return task;
}
else if (key.isConnectable())
{
processConnect(key, (Connect)attachment);
}
else
{
throw new IllegalStateException("key=" + key + ", att=" + attachment + ", iOps=" + safeInterestOps(key) + ", rOps=" + safeReadyOps(key));
}
}
catch (CancelledKeyException x)
{
if (LOG.isDebugEnabled())
LOG.debug("Ignoring cancelled key for channel {}", channel);
IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel);
}
catch (Throwable x)
{
LOG.warn("Could not process key for channel {}", channel, x);
IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Selector loop ignoring invalid key for channel {}", channel);
IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel);
}
}
return null;
}- processUpdates(): 将updates中的所有SelectorUpdate实例处理完,应该是Selector内部的事件,例如Start,DumpKeys,Acceptor,Accept,Connect,CloseConnection,StopSelector等,有update方法实现具体update内容。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48private void processUpdates()
{
try (AutoLock l = _lock.lock())
{
Deque<SelectorUpdate> updates = _updates;
_updates = _updateable;
_updateable = updates;
}
if (LOG.isDebugEnabled())
LOG.debug("updateable {}", _updateable.size());
for (SelectorUpdate update : _updateable)
{
if (_selector == null)
break;
try
{
if (LOG.isDebugEnabled())
LOG.debug("update {}", update);
update.update(_selector);
}
catch (Throwable x)
{
LOG.warn("Cannot update selector {}", ManagedSelector.this, x);
}
}
_updateable.clear();
Selector selector;
int updates;
try (AutoLock l = _lock.lock())
{
updates = _updates.size();
_selecting = updates == 0;
selector = _selecting ? null : _selector;
}
if (LOG.isDebugEnabled())
LOG.debug("updates {}", updates);
if (selector != null)
{
if (LOG.isDebugEnabled())
LOG.debug("wakeup on updates {}", this);
selector.wakeup();
}
}- updateKeys(): 处理Selectable实例的updateKey和_keys清理
1
2
3
4
5
6
7
8
9
10
11
12
13
14private void updateKeys()
{
// Do update keys for only previously selected keys.
// This will update only those keys whose selection did not cause an
// updateKeys update to be submitted.
for (SelectionKey key : _keys)
{
Object attachment = key.attachment();
if (attachment instanceof Selectable)
((Selectable)attachment).updateKey();
}
_keys.clear();
}- processConnect(key, attachment): 处理connect请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31private void processConnect(SelectionKey key, Connect connect)
{
SelectableChannel channel = key.channel();
try
{
key.attach(connect.attachment);
boolean connected = _selectorManager.doFinishConnect(channel);
if (LOG.isDebugEnabled())
LOG.debug("Connected {} {}", connected, channel);
if (connected)
{
if (connect.timeout.cancel())
{
key.interestOps(0);
execute(new CreateEndPoint(connect, key));
}
else
{
throw new SocketTimeoutException("Concurrent Connect Timeout");
}
}
else
{
throw new ConnectException();
}
}
catch (Throwable x)
{
connect.failed(x);
}
}- select(): 阻塞调用Selector.select()方法获取selectionKeys事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
private boolean select()
{
try
{
Selector selector = _selector;
if (selector != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Selector {} waiting with {} keys", selector, selector.keys().size());
//阻塞等待select()返回
int selected = ManagedSelector.this.select(selector);
// The selector may have been recreated.
selector = _selector;
if (selector != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Selector {} woken up from select, {}/{}/{} selected", selector, selected, selector.selectedKeys().size(), selector.keys().size());
int updates;
try (AutoLock l = _lock.lock())
{
// finished selecting
_selecting = false;
updates = _updates.size();
}
_keys = selector.selectedKeys();
int selectedKeys = _keys.size();
if (selectedKeys > 0)
_keyStats.record(selectedKeys);
_cursor = selectedKeys > 0 ? _keys.iterator() : Collections.emptyIterator();
if (LOG.isDebugEnabled())
LOG.debug("Selector {} processing {} keys, {} updates", selector, selectedKeys, updates);
return true;
}
}
}
catch (Throwable x)
{
IO.close(_selector);
_selector = null;
if (isRunning())
{
LOG.warn("Fatal select() failure", x);
onSelectFailed(x);
}
else
{
if (LOG.isDebugEnabled())
LOG.warn("select() failure", x);
else
LOG.warn("select() failure {}", x.toString());
}
}
return false;
}
Jetty多协议支持
Jetty的多协议支持包括HTTP0.9 1.1 2.0和ws等等,其实这些协议本质上都是基于TCP/IP层socket协议上的应用层协议,也就是说,在I/O层面都是通过socket协议的握手和通信规则,只是在内容传输上会有不同。例如早期的http协议是不面向连接的,也就是说即使建立了socket连接,也会在http回应发出后被断开,从而在每次http请求产生更多的IO开销。http也可以复用socket连接,只需要池化管理socket连接即可。而websocket协议则是通过http请求后的upgrade请求建立长连接,从而达到双向全双工通信的效果,所以能够高效的利用socket连接。
JettyWebsocketServlet
在socket连接建立后,Jetty会将request真正的进行处理,主要是通过servlet和filter中的逻辑进行内容处理。主要是通过service方法为入口。当Jetty处理完socket连接IO事件processConnect后,会创建CreateEndPoint任务让selectormanager创建客户端点,并且调用newConnection方法->getDefaultConnectionFactory().newConnection方法创建Connection实例,最终调用connectionOpened -> connection.onOpen方法处理不同协议Connection onOpen后续逻辑。
1 |
|
http请求会采用HttpConnection实例,而websocket会采用NegotiatingClientConnection实例。HTTP协议的handle是基于状态机HttpHandleState实现,根据不同状态生成不同的action,如dispatch/handling/async_error/blocking等。最终request应该会dispatch用到不同servlet的service方法,进行websocket升级处理。
1 |
|
这是HttpChannel实例中具体的handle方法,负责执行不同的action,其中最主要的就是dispatch方法,负责分发request到不同的servelet进行服务器用户逻辑处理。
1 |
|