Jetty IO model

Jetty I/O Model

Jetty是一个开源的轻量级webserver框架,实现了大部分javax标准下的接口,并且也提供了很多有用的协议支持,例如http1.1, http2, 和websocket等。Jetty采用了最新的Java Nio接口,从底层解决了阻塞式处理I/O的问题,利用SelectionKey实现了多路复用socketChannel,得到了高效的I/O吞吐性能。本文将着重从源代码层面去讲解Jetty是如何做到。

Jetty Server

Jetty Server实例是整个服务的容器,包含了connectors负责管理客户端连接。

Jetty ServerConnector

ServerConnector扩展了既有的AbstractConnector,实现了异步处理ServerSocketChannel的selectedKeys事件回调。主要是通过SelectorManager管理一系列ManagedSelector,根据不同的协议生成不同的连接实例。

Jetty AbstractConnector

AbstractConnector是Connector的基本实现,主要包含了一组ConnectionFactory负责构造不同类型的connection,也包含了一组AbstractCoonector.Acceptor实例,负责监听所有的accept事件,accept事件是客户端连接的第一个事件。

Jetty SelectorManagerManagedSelector

SelectorManager是所有ManagedSelector的管理员,负责异步执行所有ManagedSelector提交的Task,是多路复用一个ServerSocketChannel的核心。底层是依赖Java Nio的ServerSocketChannel多路复用技术实现。

Java Nio的多路复用技术

Reference

  • ServerSocketChannel

    • 创建:通过ServerSocketChannel类的静态方法open()获得。
    • 绑定端口:每个ServerSocketChannel都有一个对应的ServerSocket,通过其socket()方法获得。获得ServerSocket是为了使用其bind()方法绑定监听端口号。若是使用其accept()方法监听请求就和普通Socket的处理模式无异。
    • 设置是否使用阻塞模式:true/false。configureBlocking(false)——不适用阻塞模式。阻塞模式不能使用Selector!
    • 注册选择器以及选择器关心的操作类型:register(Selector,int) 第一个参数可以传入一个选择器对象,第二个可传入SelectionKey代表操作类型的四个静态整型常量中的一个,表示该选择器关心的操作类型。
  • Selector

    • 创建:通过Selector的静态方法open()获得。

    • 等待请求:select(long)——long代表最长等待时间,超过该时间程序继续向下执行。若设为0或者不传参数,表示一直阻塞,直到有请求。

    • 获得选择结果集合:selectedKeys(),返回一个SelectionKey集合。SelectionKey对象保存处理当前请求的Channel、Selector、操作类型以及附加对象等等。
      SelectionKey对象有四个静态常量代表四种操作类型以及对应的判断是否是该种操作的方法:

      • SelectionKey.OP_ACCEPT——代表接收请求操作 isAcceptable()

      • SelectionKey.OP_CONNECT——代表连接操作 isConnectable()

      • SelectionKey.OP_READ——代表读操作 isReadable()

      • SelectionKey.OP_WRITE——代表写操作 isWritable()

  • NioSocket中服务端的处理过程分为5步:

    1. 创建ServerScoketChannel对象并设置相关参数(绑定监听端口号,是否使用阻塞模式)
    2. 创建Selector并注册到服务端套接字信道(ServerScoketChannel)上
    3. 使用Selector的select方法等待请求
    4. 接收到请求后使用selectedKeys方法获得selectionKey集合
    5. 根据选择键获得Channel、Selector和操作类型进行具体处理。
  • Code Sample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
模拟服务端 -nio-Socket 实现
*/
public class NIOServer {
public static void main ( String [] args ) {
try {
// 创建 ServerSocketChannel 通道,绑定监听端口为 8080
ServerSocketChannel serverSocketChannel = ServerSocketChannel . open () ;
serverSocketChannel. socket ().bind( new InetSocketAddress( 8080 )) ;
// 设置为非阻塞模式
serverSocketChannel.configureBlocking( false ) ;
// 注册选择器 , 设置选择器选择的操作类型
Selector selector = Selector . open () ;
serverSocketChannel.register(selector , SelectionKey . OP_ACCEPT ) ;
// 创建处理器
Handler handler = new Handler( 1204 ) ;
while ( true ) {
// 等待请求,每次等待阻塞 3s ,超过时间则向下执行,若传入 0 或不传值,则在接收到请求前一直阻塞
if (selector. select ( 3000 ) == 0 ) {
System . out .println( " 等待请求超时 ......" ) ;
continue ;
}
System . out .println( "----- 处理请求 -----" ) ;
// 获取待处理的选择键集合
Iterator< SelectionKey > keyIterator = selector. selectedKeys (). iterator () ;
while (keyIterator. hasNext ()) {
SelectionKey selectionKey = keyIterator. next () ;
try {
// 如果是连接请求,调用处理器的连接处理方法
if (selectionKey.isAcceptable()){
handler.handleAccept(selectionKey) ;
}
// 如果是读请求,调用对应的读方法
if (selectionKey.isReadable()) {
handler.handleRead(selectionKey) ;
}
} catch ( IOException e ) {
keyIterator. remove () ;
continue ;
}
}
// 处理完毕从待处理集合移除该选择键
keyIterator. remove () ;
}
} catch ( IOException e ) {
e .printStackTrace() ;
}
}

/*
处理器类
*/
private static class Handler {
private int bufferSize = 1024 ; // 缓冲器容量
private String localCharset = "UTF-8" ; // 编码格式

public Handler (){}
public Handler ( int bufferSize ){
this ( bufferSize , null ) ;
}
public Handler ( String localCharset ){
this (- 1 , localCharset ) ;
}
public Handler ( int bufferSize , String localCharset ){
if ( bufferSize > 0 ){
this . bufferSize = bufferSize ;
}
if ( localCharset != null ){
this . localCharset = localCharset ;
}
}
/*
连接请求处理方法
*/
public void handleAccept ( SelectionKey selectionKey ) throws IOException {
// 通过选择器键获取服务器套接字通道,通过 accept() 方法获取套接字通道连接
SocketChannel socketChannel = (( ServerSocketChannel ) selectionKey . channel ()). accept () ;
// 设置套接字通道为非阻塞模式
socketChannel.configureBlocking( false ) ;
// 为套接字通道注册选择器,该选择器为服务器套接字通道的选择器,即选择到该 SocketChannel 的选择器
// 设置选择器关心请求为读操作,设置数据读取的缓冲器容量为处理器初始化时候的缓冲器容量
socketChannel.register( selectionKey . selector () , SelectionKey . OP_READ , ByteBuffer . allocate ( bufferSize )) ;
}

public void handleRead ( SelectionKey selectionKey ) throws IOException {
// 获取套接字通道
SocketChannel socketChannel = ( SocketChannel ) selectionKey . channel () ;
// 获取缓冲器并进行重置 ,selectionKey.attachment() 为获取选择器键的附加对象
ByteBuffer byteBuffer = ( ByteBuffer ) selectionKey .attachment() ;
byteBuffer.clear() ;
// 没有内容则关闭通道
if (socketChannel. read (byteBuffer) == - 1 ) {
socketChannel.close() ;
} else {
// 将缓冲器转换为读状态
byteBuffer.flip() ;
// 将缓冲器中接收到的值按 localCharset 格式编码保存
String receivedRequestData = Charset . forName ( localCharset ). newDecoder ().decode(byteBuffer).toString() ;
System . out .println( " 接收到客户端的请求数据: " +receivedRequestData) ;
// 返回响应数据给客户端
String responseData = " 已接收到你的请求数据,响应数据为: ( 响应数据 )" ;
byteBuffer = ByteBuffer . wrap (responseData.getBytes( localCharset )) ;
socketChannel. write (byteBuffer) ;
// 关闭通道
socketChannel.close() ;
}
}
}
}

Jetty对socket的多路复用技术实现

Jetty主要是依赖底层Java Nio的多路复用技术,并且封装成为高吞吐量的服务器。封装主要分成两部分:

  • Connector -> Acceptor: 当新的请求到达Jetty时,Jetty ServerConnector会拿到一个非阻塞、新的_acceptChannel:ServerSocketChannel,其关联的selector用于接收所有后续Accept/Update/onSelected/close/ReplaceKey事件。Acceptor作为Selector的回调类,提供了所有socketchannel事件的回调方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* Called by {@link #open()} to obtain the accepting channel.
*
* @return ServerSocketChannel used to accept connections.
* @throws IOException if unable to obtain or configure the server channel
*/
protected ServerSocketChannel openAcceptChannel() throws IOException
{
ServerSocketChannel serverChannel = null;
if (isInheritChannel())
{
Channel channel = System.inheritedChannel();
if (channel instanceof ServerSocketChannel)
serverChannel = (ServerSocketChannel)channel;
else
LOG.warn("Unable to use System.inheritedChannel() [{}]. Trying a new ServerSocketChannel at {}:{}", channel, getHost(), getPort());
}

if (serverChannel == null)
{
InetSocketAddress bindAddress = getHost() == null ? new InetSocketAddress(getPort()) : new InetSocketAddress(getHost(), getPort());
serverChannel = ServerSocketChannel.open();
setSocketOption(serverChannel, StandardSocketOptions.SO_REUSEADDR, getReuseAddress());
setSocketOption(serverChannel, StandardSocketOptions.SO_REUSEPORT, isReusePort());
try
{
serverChannel.bind(bindAddress, getAcceptQueueSize());
}
catch (Throwable e)
{
IO.close(serverChannel);
throw new IOException("Failed to bind to " + bindAddress, e);
}
}

return serverChannel;
}

  • SelectorManager -> ManagedSelector -> Selector: SelectorManager负责维护一组可以复用的Selector实例,对于每个_acceptChannel:ServerSocketChannel非阻塞实例,可以注册一个Selector实例用于监听相应的SelectionKey事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

// 创建一个selector实例,并且用一个新的生产者线程不断监听channel中的事件。
@Override
protected void doStart() throws Exception
{
super.doStart();

_selector = _selectorManager.newSelector();

// The producer used by the strategies will never
// be idle (either produces a task or blocks).

// The normal strategy obtains the produced task, schedules
// a new thread to produce more, runs the task and then exits.
_selectorManager.execute(_strategy::produce);

// Set started only if we really are started
Start start = new Start();
submit(start);
start._started.await();
}

  • Selector事件的生产者/消费者模式:选择器本身是可以复用的,也就是大量请求创建的大量socketchannel是可以共享一个选择器实例,并且通过selectedKey.channel()获得相应的channel实例。这相较于传统的一个请求一个线程模式提高了线程利用率,避免了大量请求带来的线程池饥饿问题。同时,selector本身的事件也需要有很好的资源复用,否则,生产者产生的大量任务同时也会让消费者线程饥饿造成阻塞。Jetty为了解决这个问题采用了一种新的消费模式: EPC,即 eat what you kill消费者和生产者同线程优先处理模式。

    ExectuteProduceConsume

    1. SelectorProducer类提供了一个循环loop负责阻塞查询select()并且将查询到的事件/任务提交到_updates队列中。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Override
    public Runnable produce()
    {
    while (true)
    {
    Runnable task = processSelected();
    if (task != null)
    return task;

    processUpdates();

    updateKeys();

    if (!select())
    return null;
    }
    }
    1. 产生的任务主要分四种:
    • processSelected(): 处理selected事件产生真正的task,或者处理connect事件。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    private Runnable processSelected()
    {
    while (_cursor.hasNext())
    {
    SelectionKey key = _cursor.next();
    Object attachment = key.attachment();
    SelectableChannel channel = key.channel();
    if (key.isValid())
    {
    if (LOG.isDebugEnabled())
    LOG.debug("selected {} {} {} ", safeReadyOps(key), key, attachment);
    try
    {
    if (attachment instanceof Selectable)
    {
    // Try to produce a task
    Runnable task = ((Selectable)attachment).onSelected();
    if (task != null)
    return task;
    }
    else if (key.isConnectable())
    {
    processConnect(key, (Connect)attachment);
    }
    else
    {
    throw new IllegalStateException("key=" + key + ", att=" + attachment + ", iOps=" + safeInterestOps(key) + ", rOps=" + safeReadyOps(key));
    }
    }
    catch (CancelledKeyException x)
    {
    if (LOG.isDebugEnabled())
    LOG.debug("Ignoring cancelled key for channel {}", channel);
    IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel);
    }
    catch (Throwable x)
    {
    LOG.warn("Could not process key for channel {}", channel, x);
    IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel);
    }
    }
    else
    {
    if (LOG.isDebugEnabled())
    LOG.debug("Selector loop ignoring invalid key for channel {}", channel);
    IO.close(attachment instanceof EndPoint ? (EndPoint)attachment : channel);
    }
    }
    return null;
    }

    • processUpdates(): 将updates中的所有SelectorUpdate实例处理完,应该是Selector内部的事件,例如Start,DumpKeys,Acceptor,Accept,Connect,CloseConnection,StopSelector等,有update方法实现具体update内容。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
     private void processUpdates()
    {
    try (AutoLock l = _lock.lock())
    {
    Deque<SelectorUpdate> updates = _updates;
    _updates = _updateable;
    _updateable = updates;
    }

    if (LOG.isDebugEnabled())
    LOG.debug("updateable {}", _updateable.size());

    for (SelectorUpdate update : _updateable)
    {
    if (_selector == null)
    break;
    try
    {
    if (LOG.isDebugEnabled())
    LOG.debug("update {}", update);
    update.update(_selector);
    }
    catch (Throwable x)
    {
    LOG.warn("Cannot update selector {}", ManagedSelector.this, x);
    }
    }
    _updateable.clear();

    Selector selector;
    int updates;
    try (AutoLock l = _lock.lock())
    {
    updates = _updates.size();
    _selecting = updates == 0;
    selector = _selecting ? null : _selector;
    }

    if (LOG.isDebugEnabled())
    LOG.debug("updates {}", updates);

    if (selector != null)
    {
    if (LOG.isDebugEnabled())
    LOG.debug("wakeup on updates {}", this);
    selector.wakeup();
    }
    }
    • updateKeys(): 处理Selectable实例的updateKey和_keys清理
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
     private void updateKeys()
    {
    // Do update keys for only previously selected keys.
    // This will update only those keys whose selection did not cause an
    // updateKeys update to be submitted.
    for (SelectionKey key : _keys)
    {
    Object attachment = key.attachment();
    if (attachment instanceof Selectable)
    ((Selectable)attachment).updateKey();
    }
    _keys.clear();
    }

    • processConnect(key, attachment): 处理connect请求
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
            private void processConnect(SelectionKey key, Connect connect)
    {
    SelectableChannel channel = key.channel();
    try
    {
    key.attach(connect.attachment);
    boolean connected = _selectorManager.doFinishConnect(channel);
    if (LOG.isDebugEnabled())
    LOG.debug("Connected {} {}", connected, channel);
    if (connected)
    {
    if (connect.timeout.cancel())
    {
    key.interestOps(0);
    execute(new CreateEndPoint(connect, key));
    }
    else
    {
    throw new SocketTimeoutException("Concurrent Connect Timeout");
    }
    }
    else
    {
    throw new ConnectException();
    }
    }
    catch (Throwable x)
    {
    connect.failed(x);
    }
    }
    • select(): 阻塞调用Selector.select()方法获取selectionKeys事件。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59

    private boolean select()
    {
    try
    {
    Selector selector = _selector;
    if (selector != null)
    {
    if (LOG.isDebugEnabled())
    LOG.debug("Selector {} waiting with {} keys", selector, selector.keys().size());
    //阻塞等待select()返回
    int selected = ManagedSelector.this.select(selector);
    // The selector may have been recreated.
    selector = _selector;
    if (selector != null)
    {
    if (LOG.isDebugEnabled())
    LOG.debug("Selector {} woken up from select, {}/{}/{} selected", selector, selected, selector.selectedKeys().size(), selector.keys().size());

    int updates;
    try (AutoLock l = _lock.lock())
    {
    // finished selecting
    _selecting = false;
    updates = _updates.size();
    }

    _keys = selector.selectedKeys();
    int selectedKeys = _keys.size();
    if (selectedKeys > 0)
    _keyStats.record(selectedKeys);
    _cursor = selectedKeys > 0 ? _keys.iterator() : Collections.emptyIterator();
    if (LOG.isDebugEnabled())
    LOG.debug("Selector {} processing {} keys, {} updates", selector, selectedKeys, updates);

    return true;
    }
    }
    }
    catch (Throwable x)
    {
    IO.close(_selector);
    _selector = null;

    if (isRunning())
    {
    LOG.warn("Fatal select() failure", x);
    onSelectFailed(x);
    }
    else
    {
    if (LOG.isDebugEnabled())
    LOG.warn("select() failure", x);
    else
    LOG.warn("select() failure {}", x.toString());
    }
    }
    return false;
    }

Jetty多协议支持

Jetty的多协议支持包括HTTP0.9 1.1 2.0和ws等等,其实这些协议本质上都是基于TCP/IP层socket协议上的应用层协议,也就是说,在I/O层面都是通过socket协议的握手和通信规则,只是在内容传输上会有不同。例如早期的http协议是不面向连接的,也就是说即使建立了socket连接,也会在http回应发出后被断开,从而在每次http请求产生更多的IO开销。http也可以复用socket连接,只需要池化管理socket连接即可。而websocket协议则是通过http请求后的upgrade请求建立长连接,从而达到双向全双工通信的效果,所以能够高效的利用socket连接。

JettyWebsocketServlet

在socket连接建立后,Jetty会将request真正的进行处理,主要是通过servlet和filter中的逻辑进行内容处理。主要是通过service方法为入口。当Jetty处理完socket连接IO事件processConnect后,会创建CreateEndPoint任务让selectormanager创建客户端点,并且调用newConnection方法->getDefaultConnectionFactory().newConnection方法创建Connection实例,最终调用connectionOpened -> connection.onOpen方法处理不同协议Connection onOpen后续逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException
{
EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
Object context = selectionKey.attachment();
Connection connection = _selectorManager.newConnection(channel, endPoint, context);
endPoint.setConnection(connection);
submit(selector ->
{
SelectionKey key = selectionKey;
if (key.selector() != selector)
{
key = channel.keyFor(selector);
if (key != null && endPoint instanceof Selectable)
((Selectable)endPoint).replaceKey(key);
}
if (key != null)
key.attach(endPoint);
}, true);
endPoint.onOpen();
endPointOpened(endPoint);
_selectorManager.connectionOpened(connection, context);
if (LOG.isDebugEnabled())
LOG.debug("Created {}", endPoint);
}

http请求会采用HttpConnection实例,而websocket会采用NegotiatingClientConnection实例。HTTP协议的handle是基于状态机HttpHandleState实现,根据不同状态生成不同的action,如dispatch/handling/async_error/blocking等。最终request应该会dispatch用到不同servlet的service方法,进行websocket升级处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

public Action handling()
{
try (AutoLock l = lock())
{
if (LOG.isDebugEnabled())
LOG.debug("handling {}", toStringLocked());

switch (_state)
{
case IDLE:
if (_requestState != RequestState.BLOCKING)
throw new IllegalStateException(getStatusStringLocked());
_initial = true;
_state = State.HANDLING;
return Action.DISPATCH;

case WOKEN:
if (_event != null && _event.getThrowable() != null && !_sendError)
{
_state = State.HANDLING;
return Action.ASYNC_ERROR;
}

Action action = nextAction(true);
if (LOG.isDebugEnabled())
LOG.debug("nextAction(true) {} {}", action, toStringLocked());
return action;

default:
throw new IllegalStateException(getStatusStringLocked());
}
}
}

这是HttpChannel实例中具体的handle方法,负责执行不同的action,其中最主要的就是dispatch方法,负责分发request到不同的servelet进行服务器用户逻辑处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208

public boolean handle()
{
if (LOG.isDebugEnabled())
LOG.debug("handle {} {} ", _request.getHttpURI(), this);

HttpChannelState.Action action = _state.handling();

// Loop here to handle async request redispatches.
// The loop is controlled by the call to async.unhandle in the
// finally block below. Unhandle will return false only if an async dispatch has
// already happened when unhandle is called.
loop:
while (!getServer().isStopped())
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("action {} {}", action, this);

switch (action)
{
case TERMINATED:
onCompleted();
break loop;

case WAIT:
// break loop without calling unhandle
break loop;

case DISPATCH:
{
if (!_request.hasMetaData())
throw new IllegalStateException("state=" + _state);

dispatch(DispatcherType.REQUEST, () ->
{
for (HttpConfiguration.Customizer customizer : _configuration.getCustomizers())
{
customizer.customize(getConnector(), _configuration, _request);
if (_request.isHandled())
return;
}
getServer().handle(HttpChannel.this);
});

break;
}

case ASYNC_DISPATCH:
{
dispatch(DispatcherType.ASYNC, () -> getServer().handleAsync(this));
break;
}

case ASYNC_TIMEOUT:
_state.onTimeout();
break;

case SEND_ERROR:
{
try
{
// Get ready to send an error response
_response.resetContent();

// the following is needed as you cannot trust the response code and reason
// as those could have been modified after calling sendError
Integer code = (Integer)_request.getAttribute(RequestDispatcher.ERROR_STATUS_CODE);
if (code == null)
code = HttpStatus.INTERNAL_SERVER_ERROR_500;
_response.setStatus(code);

// The handling of the original dispatch failed and we are now going to either generate
// and error response ourselves or dispatch for an error page. If there is content left over
// from the failed dispatch, then we try to consume it here and if we fail we add a
// Connection:close. This can't be deferred to COMPLETE as the response will be committed
// by then.
ensureConsumeAllOrNotPersistent();

ContextHandler.Context context = (ContextHandler.Context)_request.getAttribute(ErrorHandler.ERROR_CONTEXT);
ErrorHandler errorHandler = ErrorHandler.getErrorHandler(getServer(), context == null ? null : context.getContextHandler());

// If we can't have a body, then create a minimal error response.
if (HttpStatus.hasNoBody(_response.getStatus()) || errorHandler == null || !errorHandler.errorPageForMethod(_request.getMethod()))
{
sendResponseAndComplete();
break;
}

dispatch(DispatcherType.ERROR, () ->
{
errorHandler.handle(null, _request, _request, _response);
_request.setHandled(true);
});
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not perform ERROR dispatch, aborting", x);
if (_state.isResponseCommitted())
abort(x);
else
{
try
{
_response.resetContent();
sendResponseAndComplete();
}
catch (Throwable t)
{
if (x != t)
x.addSuppressed(t);
abort(x);
}
}
}
finally
{
// clean up the context that was set in Response.sendError
_request.removeAttribute(ErrorHandler.ERROR_CONTEXT);
}
break;
}

case ASYNC_ERROR:
{
throw _state.getAsyncContextEvent().getThrowable();
}

case READ_CALLBACK:
{
ContextHandler handler = _state.getContextHandler();
if (handler != null)
handler.handle(_request, _request.getHttpInput());
else
_request.getHttpInput().run();
break;
}

case WRITE_CALLBACK:
{
ContextHandler handler = _state.getContextHandler();
if (handler != null)
handler.handle(_request, _response.getHttpOutput());
else
_response.getHttpOutput().run();
break;
}

case COMPLETE:
{
if (!_response.isCommitted())
{
if (!_request.isHandled() && !_response.getHttpOutput().isClosed())
{
// The request was not actually handled
_response.sendError(HttpStatus.NOT_FOUND_404);
break;
}

// Indicate Connection:close if we can't consume all.
if (_response.getStatus() >= 200)
ensureConsumeAllOrNotPersistent();
}

// RFC 7230, section 3.3.
if (!_request.isHead() &&
_response.getStatus() != HttpStatus.NOT_MODIFIED_304 &&
!_response.isContentComplete(_response.getHttpOutput().getWritten()))
{
if (sendErrorOrAbort("Insufficient content written"))
break;
}

// If send error is called we need to break.
if (checkAndPrepareUpgrade())
break;

// Set a close callback on the HttpOutput to make it an async callback
_response.completeOutput(Callback.from(NON_BLOCKING, () -> _state.completed(null), _state::completed));

break;
}

default:
throw new IllegalStateException(this.toString());
}
}
catch (Throwable failure)
{
if ("org.eclipse.jetty.continuation.ContinuationThrowable".equals(failure.getClass().getName()))
LOG.trace("IGNORED", failure);
else
handleException(failure);
}

action = _state.unhandle();
}

if (LOG.isDebugEnabled())
LOG.debug("!handle {} {}", action, this);

boolean suspended = action == Action.WAIT;
return !suspended;
}