flame

Java NIO

Preface

This article summarizes NIO (non-blocking I/O) in Java:

  • An overview of network services

  • The Reactor pattern

  • The java.nio API

  • Code implementation

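NIO: New or Non-blocking?

NIO originally stood for New Input/Output. The Java API has been around long enough, however, that it is no longer "new", so most users now read NIO as Non-blocking I/O and refer to blocking I/O as old input/output (OIO). You may also see blocking I/O called plain I/O.

java.nio.channels.Selector is the key to Java's non-blocking I/O implementation.

It uses an event notification API to determine which sockets in a set of non-blocking sockets are ready for I/O operations.

Because a small number of threads can handle many connections, the overhead of memory management and context switching is reduced.

When there is no I/O to handle, a thread can also be used for other tasks.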

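Network Services

The figure below shows the classic network service design, in which each handler runs in its own thread.

Classic Network Service Designs

As the figure shows, the work in a network service can be divided into the following tasks: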

  • Read request

  • Decode request

  • Process service

  • Encode reply

  • Send reply

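The nature and cost of these tasks differ between kinds of network services, for example: parsing XML, transferring files, generating web pages, or pure computation.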
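The classic thread-per-connection design described above can be sketched roughly as follows. This is only an illustration under assumptions that are not in the original: the class name `ClassicServer`, the newline-delimited request framing, and the "service" itself (upper-casing the request) are all hypothetical.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class ClassicServer {
    // Read -> decode -> process -> encode -> send, all on the calling thread.
    static void handle(InputStream in, OutputStream out) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8));
        String request = reader.readLine();      // read + decode (blocks the thread)
        if (request == null) {
            return;                              // peer closed without sending anything
        }
        String reply = request.toUpperCase();    // process (hypothetical service)
        out.write((reply + "\n").getBytes(StandardCharsets.UTF_8)); // encode + send
        out.flush();
    }

    // Accept loop: every accepted connection gets its own thread.
    static void serve(ServerSocket server) throws IOException {
        while (!server.isClosed()) {
            Socket socket = server.accept();     // blocks until a client connects
            new Thread(() -> {
                try (Socket s = socket) {
                    handle(s.getInputStream(), s.getOutputStream());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    public static void main(String[] args) throws IOException {
        // Exercise the handler with in-memory streams instead of a real socket.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        handle(new ByteArrayInputStream("ping\n".getBytes(StandardCharsets.UTF_8)), out);
        System.out.print(out.toString("UTF-8"));
    }
}
```

Each connection holds a thread for its whole lifetime, even while it is only waiting for input, which is the cost the event-driven designs below try to avoid.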

The C10K and C10M Problems

See the classic C10K and C10M problems.

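Scalability Goals

In a network service, we care about the following goals:

  • Graceful degradation as load keeps increasing
  • Continuous improvement in processing capacity as hardware resources (CPU, memory, disk, bandwidth) are added
  • Low latency
  • Meeting peak demand
  • Tunable quality of service

To reach these scalability goals, the best strategy is usually divide and conquer.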

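Design Strategy: Divide and Conquer

  • Divide the work

    Split processing into smaller tasks, each of which runs without blocking

  • Event triggering

    An IO event serves as the trigger for running a task

  • Basic mechanisms: supported by java.nio

    Non-blocking reads and writes

    Sensing IO events and dispatching them to the tasks associated with them

  • Room for variation

    A family of event-driven designs

Event-driven Designs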

Usually more efficient than alternatives

  • Fewer resources

    Don’t usually need a thread per client

  • Less overhead

    Less context switching, often less locking

  • But dispatching can be slower

    Must manually bind actions to events

Usually harder to program

  • Must break up into simple non-blocking actions

    Similar to GUI event-driven actions

    Cannot eliminate all blocking: GC, page faults, etc

  • Must keep track of logical state of service

The Reactor Pattern

Reactor is an event-driven pattern that is commonly used in network programming.

Roles in the Reactor pattern:

  • Reactor

    responds to IO events by dispatching the appropriate handler

  • Handlers

    perform non-blocking actions

  • Manage

    by binding handlers to events
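The three roles above can be sketched in a few lines. This is a minimal illustration, not the implementation from the slides or the paper: `MiniReactor`, `bind`, and `dispatchOnce` are hypothetical names, and a `java.nio.channels.Pipe` stands in for a socket so that the sketch does not need a network.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class MiniReactor {
    private final Selector selector;

    MiniReactor() throws IOException {
        selector = Selector.open();
    }

    // "Manage": bind a handler to an event by attaching it to the selection key.
    void bind(SelectableChannel channel, int ops, Runnable handler) throws IOException {
        channel.configureBlocking(false);
        channel.register(selector, ops, handler);
    }

    // "Reactor": wait for ready events once and dispatch each one to its handler.
    int dispatchOnce(long timeoutMillis) throws IOException {
        selector.select(timeoutMillis);
        int dispatched = 0;
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();                          // selected keys must be removed by hand
            ((Runnable) key.attachment()).run();
            dispatched++;
        }
        return dispatched;
    }

    void close() throws IOException {
        selector.close();
    }

    static String demo() throws IOException {
        MiniReactor reactor = new MiniReactor();
        Pipe pipe = Pipe.open();
        StringBuilder received = new StringBuilder();
        // "Handler": performs one non-blocking read when the source becomes readable.
        reactor.bind(pipe.source(), SelectionKey.OP_READ, () -> {
            ByteBuffer buf = ByteBuffer.allocate(64);
            try {
                pipe.source().read(buf);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            buf.flip();
            received.append(StandardCharsets.UTF_8.decode(buf));
        });
        pipe.sink().write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
        reactor.dispatchOnce(1000);
        pipe.sink().close();
        pipe.source().close();
        reactor.close();
        return received.toString();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```

Storing the handler as the key's attachment keeps the dispatch loop free of any knowledge about what each channel is used for.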

The Reactor Paper

Paper: An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events

The solution proposed in the Reactor paper:

Solution

Integrate the synchronous demultiplexing of events and the dispatching of their corresponding event handlers that process the events. In addition, decouple the application-specific dispatching and implementation of services from the general-purpose event demultiplexing and dispatching mechanisms. For each service the application offers, introduce a separate Event Handler that processes certain types of events. All Event Handlers implement the same interface. Event Handlers register with an Initiation Dispatcher, which uses a Synchronous Event Demultiplexer to wait for events to occur. When events occur, the Synchronous Event Demultiplexer notifies the Initiation Dispatcher, which synchronously calls back to the Event Handler associated with the event. The Event Handler then dispatches the event to the method that implements the requested service.

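The figure below shows the basic structure summarized in the Reactor paper.

Basic structure from the Reactor paper

The figure below shows the collaborations summarized in the Reactor paper.

Reactor Paper Collaboration Scenarios

The figure below shows an example from the Reactor paper: Client Connects to a Reactive Logging Server

Scenarios: Client Connects to a Reactive Logging Server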

This sequence of steps can be summarized as follows:

  1. The logging server (1) registers the Logging Acceptor with the Initiation Dispatcher to handle connection requests;
  2. The logging server invokes the handle events method (2) of the Initiation Dispatcher;
  3. The Initiation Dispatcher invokes the synchronous event demultiplexing select (3) operation to wait for connection requests or logging data to arrive;
  4. A client connects (4) to the logging server;
  5. The Logging Acceptor is notified by the Initiation Dispatcher (5) of the new connection request;
  6. The Logging Acceptor accepts (6) the new connection;
  7. The Logging Acceptor creates (7) a Logging Handler to service the new client;
  8. Logging Handler registers (8) its socket handle with the Initiation Dispatcher and instructs the dispatcher to notify it when the socket becomes “ready for reading.”

The figure below shows another Reactor example: Client Sends Logging Record to a Reactive Logging Server

Scenarios: Client Sends Logging Record to a Reactive Logging Server

The sequence of steps that the reactive logging server takes to service a logging record can be summarized as follows:

  1. The client sends (1) a logging record;
  2. The Initiation Dispatcher notifies (2) the associated Logging Handler when a client logging record is queued on its socket handle by OS;
  3. The record is received (3) in a non-blocking manner (steps 2 and 3 repeat until the logging record has been received completely);
  4. The Logging Handler processes the logging record and writes (4) it to the standard output.
  5. The Logging Handler returns (5) control to the Initiation Dispatcher’s event loop.

Basic version

Single thread version

Multithreaded versions

Worker Thread Pools

Using multiple reactors

Using Multiple Reactors

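Java NIO

Doug Lea: Scalable IO in Java

Java NIO is based on the synchronous non-blocking I/O model (see I/O models) and addresses the performance of I/O execution.

The features of Java NIO fall into four main categories: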

  • Channel

    Connections to files, sockets etc that support non-blocking reads

  • Buffer

    Array-like objects that can be directly read or written by Channels

  • Selector

    Tell which of a set of Channels have IO events

  • SelectionKey

    Maintain IO event status and bindings

API Details

Buffer

Buffer and ByteBuffer (CharBuffer, LongBuffer, etc. not shown)

Internal structure of a buffer

abstract class Buffer {
    int capacity();
    int position();
    Buffer position(int newPosition);
    int limit();
    Buffer limit(int newLimit);
    Buffer mark();
    Buffer reset();
    Buffer clear();
    Buffer flip();
    Buffer rewind();
    int remaining();
    boolean hasRemaining();
    boolean isReadOnly();
}

abstract class ByteBuffer extends Buffer {
    static ByteBuffer allocateDirect(int capacity);
    static ByteBuffer allocate(int capacity);
    static ByteBuffer wrap(byte[] src, int offset, int len);
    static ByteBuffer wrap(byte[] src);
    boolean isDirect();
    ByteOrder order();
    ByteBuffer order(ByteOrder bo);
    ByteBuffer slice();
    ByteBuffer duplicate();
    ByteBuffer compact();
    ByteBuffer asReadOnlyBuffer();
    byte get();
    byte get(int index);
    ByteBuffer get(byte[] dst, int offset, int length);
    ByteBuffer get(byte[] dst);
    ByteBuffer put(byte b);
    ByteBuffer put(int index, byte b);
    ByteBuffer put(byte[] src, int offset, int length);
    ByteBuffer put(ByteBuffer src);
    ByteBuffer put(byte[] src);
    char getChar();
    char getChar(int index);
    ByteBuffer putChar(char value);
    ByteBuffer putChar(int index, char value);
    CharBuffer asCharBuffer();
    short getShort();
    short getShort(int index);
    ByteBuffer putShort(short value);
    ByteBuffer putShort(int index, short value);
    ShortBuffer asShortBuffer();
    int getInt();
    int getInt(int index);
    ByteBuffer putInt(int value);
    ByteBuffer putInt(int index, int value);
    IntBuffer asIntBuffer();
    long getLong();
    long getLong(int index);
    ByteBuffer putLong(long value);
    ByteBuffer putLong(int index, long value);
    LongBuffer asLongBuffer();
    float getFloat();
    float getFloat(int index);
    ByteBuffer putFloat(float value);
    ByteBuffer putFloat(int index, float value);
    FloatBuffer asFloatBuffer();
    double getDouble();
    double getDouble(int index);
    ByteBuffer putDouble(double value);
    ByteBuffer putDouble(int index, double value);
    DoubleBuffer asDoubleBuffer();
}
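How `position`, `limit`, and `capacity` move is easiest to see by running a few calls. The following is a small sketch; the class name `BufferDemo` and the helper `state` are made up for illustration.

```java
import java.nio.ByteBuffer;

public class BufferDemo {
    // Render the three indices that describe a buffer's state.
    static String state(ByteBuffer b) {
        return "pos=" + b.position() + " lim=" + b.limit() + " cap=" + b.capacity();
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        System.out.println(state(buf));   // pos=0 lim=8 cap=8

        buf.put((byte) 1).put((byte) 2).put((byte) 3);
        System.out.println(state(buf));   // pos=3 lim=8 cap=8

        buf.flip();                       // switch from writing to reading
        System.out.println(state(buf));   // pos=0 lim=3 cap=8

        buf.get();                        // consume one byte
        buf.compact();                    // keep the 2 unread bytes, get ready to write again
        System.out.println(state(buf));   // pos=2 lim=8 cap=8

        buf.clear();                      // reset indices; the contents are not erased
        System.out.println(state(buf));   // pos=0 lim=8 cap=8
    }
}
```

Forgetting `flip()` between filling a buffer and draining it is a common mistake: the reader would then see the unwritten region between `position` and `limit` instead of the data.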

Channel

interface Channel {
    boolean isOpen();
    void close() throws IOException;
}

interface ReadableByteChannel extends Channel {
    int read(ByteBuffer dst) throws IOException;
}

interface WritableByteChannel extends Channel {
    int write(ByteBuffer src) throws IOException;
}

interface ScatteringByteChannel extends ReadableByteChannel {
    long read(ByteBuffer[] dsts, int offset, int length)
        throws IOException;
    long read(ByteBuffer[] dsts) throws IOException;
}

interface GatheringByteChannel extends WritableByteChannel {
    long write(ByteBuffer[] srcs, int offset, int length)
        throws IOException;
    long write(ByteBuffer[] srcs) throws IOException;
}
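A typical use of `ReadableByteChannel` and `WritableByteChannel` together with a `ByteBuffer` is a copy loop. The sketch below wraps in-memory streams with `Channels.newChannel` so it runs without files or sockets; the class name `ChannelCopy` and the deliberately tiny buffer size are assumptions made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

public class ChannelCopy {
    // Copy everything from src to dst and return the number of bytes written.
    static long copy(ReadableByteChannel src, WritableByteChannel dst) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(4);  // tiny on purpose, to force several rounds
        long total = 0;
        while (src.read(buf) != -1) {
            buf.flip();                 // make the bytes just read available for writing
            total += dst.write(buf);    // may write only part of the buffer
            buf.compact();              // keep whatever was not written
        }
        buf.flip();
        while (buf.hasRemaining()) {    // drain anything left after end-of-stream
            total += dst.write(buf);
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "hello, channels".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long n = copy(Channels.newChannel(new ByteArrayInputStream(data)),
                      Channels.newChannel(sink));
        System.out.println(n + " bytes: " + sink.toString("UTF-8"));
    }
}
```

Using `compact()` rather than `clear()` matters when the destination accepts only part of the buffer, as a non-blocking socket may.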

SelectableChannel

abstract class SelectableChannel implements Channel {
    int validOps();
    boolean isRegistered();
    SelectionKey keyFor(Selector sel);
    SelectionKey register(Selector sel, int ops)
        throws ClosedChannelException;
    SelectableChannel configureBlocking(boolean block)
        throws IOException;
    boolean isBlocking();
    Object blockingLock();
}
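One detail of `SelectableChannel` that is easy to miss: a channel must be switched to non-blocking mode before it can be registered with a selector, otherwise `register` throws `IllegalBlockingModeException`. The sketch below shows this with a `Pipe` source channel; the class name `BlockingModeDemo` is hypothetical.

```java
import java.io.IOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class BlockingModeDemo {
    static String demo() throws IOException {
        Pipe pipe = Pipe.open();
        Pipe.SourceChannel source = pipe.source();
        try (Selector selector = Selector.open()) {
            boolean rejected = false;
            try {
                // New channels start in blocking mode, so this registration is refused.
                source.register(selector, SelectionKey.OP_READ);
            } catch (IllegalBlockingModeException e) {
                rejected = true;
            }
            source.configureBlocking(false);
            // validOps() reports the operations this channel supports (OP_READ for a pipe source).
            SelectionKey key = source.register(selector, source.validOps());
            return "rejectedWhileBlocking=" + rejected
                    + " blocking=" + source.isBlocking()
                    + " registered=" + source.isRegistered()
                    + " sameKey=" + (source.keyFor(selector) == key);
        } finally {
            source.close();
            pipe.sink().close();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```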

SocketChannel

abstract class SocketChannel implements ByteChannel ... {
    static SocketChannel open() throws IOException;
    Socket socket();
    int validOps();
    boolean isConnected();
    boolean isConnectionPending();
    boolean connect(SocketAddress remote) throws IOException;
    boolean finishConnect() throws IOException;
    SocketChannel shutdownInput() throws IOException;
    SocketChannel shutdownOutput() throws IOException;
    int read(ByteBuffer dst) throws IOException;
    long read(ByteBuffer[] dsts, int offset, int length)
        throws IOException;
    long read(ByteBuffer[] dsts) throws IOException;
    int write(ByteBuffer src) throws IOException;
    long write(ByteBuffer[] srcs, int offset, int length)
        throws IOException;
    long write(ByteBuffer[] srcs) throws IOException;
}

ServerSocketChannel

abstract class ServerSocketChannel extends ... {
    static ServerSocketChannel open() throws IOException;
    int validOps();
    ServerSocket socket();
    SocketChannel accept() throws IOException;
}

FileChannel

abstract class FileChannel implements ... {
    int read(ByteBuffer dst);
    int read(ByteBuffer dst, long position);
    long read(ByteBuffer[] dsts, int offset, int length);
    long read(ByteBuffer[] dsts);
    int write(ByteBuffer src);
    int write(ByteBuffer src, long position);
    long write(ByteBuffer[] srcs, int offset, int length);
    long write(ByteBuffer[] srcs);
    long position();
    FileChannel position(long newPosition);
    long size();
    FileChannel truncate(long size);
    void force(boolean flushMetaDataToo);
    long transferTo(long position, long count,
                    WritableByteChannel dst);
    long transferFrom(ReadableByteChannel src,
                      long position, long count);
    FileLock lock(long position, long size, boolean shared);
    FileLock lock();
    FileLock tryLock(long pos, long size, boolean shared);
    FileLock tryLock();
    // FileChannel.MapMode constants: READ_ONLY, READ_WRITE, PRIVATE
    MappedByteBuffer map(FileChannel.MapMode mode, long position, long size);
}
NOTE: ALL methods throw IOException
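As a rough illustration of the positional `read` and `transferTo` calls listed above, the following sketch writes a small temporary file, reads four bytes at a fixed offset, and then transfers the whole file into an in-memory channel. The class name `FileChannelDemo`, the file contents, and the temp-file prefix are assumptions made for the example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FileChannelDemo {
    static String demo() throws IOException {
        Path tmp = Files.createTempFile("nio-demo", ".txt");
        try (FileChannel ch = FileChannel.open(tmp,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            ch.write(ByteBuffer.wrap("hello file channel".getBytes(StandardCharsets.US_ASCII)));

            // Positional read: reads at offset 6 without moving the channel's own position.
            ByteBuffer word = ByteBuffer.allocate(4);
            ch.read(word, 6);
            word.flip();
            String middle = StandardCharsets.US_ASCII.decode(word).toString();

            // transferTo may move fewer bytes than requested, so loop until done.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            WritableByteChannel target = Channels.newChannel(out);
            long pos = 0;
            while (pos < ch.size()) {
                pos += ch.transferTo(pos, ch.size() - pos, target);
            }
            return middle + "|" + pos + "|" + out.toString("US-ASCII");
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());   // file|18|hello file channel
    }
}
```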

Selector

abstract class Selector {
    static Selector open() throws IOException;
    Set<SelectionKey> keys();
    Set<SelectionKey> selectedKeys();
    int selectNow() throws IOException;
    int select(long timeout) throws IOException;
    int select() throws IOException;
    Selector wakeup();
    void close() throws IOException;
}
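The three `select` variants and `wakeup()` can be tried out without any sockets. The following is a sketch under assumptions: the class name `SelectorDemo` is hypothetical and a `Pipe` is used as the event source.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class SelectorDemo {
    static String demo() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open()) {
            pipe.source().configureBlocking(false);
            pipe.source().register(selector, SelectionKey.OP_READ);

            // Nothing has been written yet, so no channel is ready.
            int nothingReady = selector.selectNow();

            // A pending wakeup makes the next select return at once, with no ready keys.
            // The timeout here is only a safety net.
            selector.wakeup();
            int afterWakeup = selector.select(5000);

            // Once a byte is written to the sink, the source becomes readable.
            pipe.sink().write(ByteBuffer.wrap(new byte[] {42}));
            int afterWrite = selector.select(1000);

            // The selected-key set is never cleared by the selector itself.
            selector.selectedKeys().clear();
            return nothingReady + "," + afterWakeup + "," + afterWrite;
        } finally {
            pipe.sink().close();
            pipe.source().close();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());   // 0,0,1
    }
}
```

`wakeup()` is what lets another thread interrupt a blocked `select()`, for example to register a new channel or to shut the loop down.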

SelectionKey

abstract class SelectionKey {
    static final int OP_READ, OP_WRITE,
                     OP_CONNECT, OP_ACCEPT;
    SelectableChannel channel();
    Selector selector();
    boolean isValid();
    void cancel();
    int interestOps();
    SelectionKey interestOps(int ops);
    int readyOps();
    boolean isReadable();
    boolean isWritable();
    boolean isConnectable();
    boolean isAcceptable();
    Object attach(Object ob);
    Object attachment();
}
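A key's interest set, ready set, and attachment can be observed on a single channel. The sketch below registers the sink end of a `Pipe` with an empty interest set and then enables `OP_WRITE`; the class name `SelectionKeyDemo` and the attachment string are made up for illustration.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class SelectionKeyDemo {
    static String demo() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open()) {
            pipe.sink().configureBlocking(false);
            // Register with an empty interest set and attach some per-channel state.
            SelectionKey key = pipe.sink().register(selector, 0, "per-connection state");
            int idle = selector.selectNow();          // no interest, so nothing is selected

            key.interestOps(SelectionKey.OP_WRITE);   // now ask to be told when writing is possible
            int ready = selector.selectNow();         // an empty pipe can accept data
            boolean writable = key.isWritable();
            Object state = key.attachment();

            key.cancel();                             // the key becomes invalid immediately
            return idle + "," + ready + "," + writable + "," + state + "," + key.isValid();
        } finally {
            pipe.sink().close();
            pipe.source().close();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```

Servers commonly enable `OP_WRITE` only while they have pending output, because a writable socket is ready almost all the time and would otherwise keep waking the selector.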

Code Example

The client sends Ping, and the server responds with Pong.
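A minimal sketch of such a Ping/Pong exchange might look like the following. It is not the original article's code: the class name `PingPong`, the fixed 4-byte ASCII message framing, the loopback address with an ephemeral port, and the direct write of the reply (instead of registering `OP_WRITE`) are all simplifying assumptions.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class PingPong {
    // Server loop: one thread accepts connections and answers each "Ping" with "Pong".
    static void serve(Selector selector) throws IOException {
        while (!Thread.currentThread().isInterrupted()) {
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
                    if (client != null) {
                        client.configureBlocking(false);
                        // The attachment buffers one 4-byte message per connection.
                        client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(4));
                    }
                } else if (key.isReadable()) {
                    handleRead(key);
                }
            }
        }
    }

    static void handleRead(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        if (client.read(buf) == -1) {
            client.close();                 // peer closed; closing also cancels the key
            return;
        }
        if (buf.hasRemaining()) {
            return;                         // message not complete yet, wait for more
        }
        buf.flip();
        String msg = StandardCharsets.US_ASCII.decode(buf).toString();
        buf.clear();
        if (msg.equals("Ping")) {
            ByteBuffer reply = ByteBuffer.wrap("Pong".getBytes(StandardCharsets.US_ASCII));
            while (reply.hasRemaining()) {
                client.write(reply);        // tiny reply; larger ones should wait for OP_WRITE
            }
        }
    }

    // Start the server on a free loopback port, send one Ping, and return the reply.
    static String pingOnce() throws IOException, InterruptedException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            Thread serverThread = new Thread(() -> {
                try {
                    serve(selector);
                } catch (IOException e) {
                    // demo only: an I/O failure simply stops the server loop
                }
            });
            serverThread.setDaemon(true);
            serverThread.start();
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                client.write(ByteBuffer.wrap("Ping".getBytes(StandardCharsets.US_ASCII)));
                ByteBuffer reply = ByteBuffer.allocate(4);
                while (reply.hasRemaining() && client.read(reply) != -1) {
                    // blocking client: keep reading until 4 bytes have arrived
                }
                reply.flip();
                return StandardCharsets.US_ASCII.decode(reply).toString();
            } finally {
                serverThread.interrupt();   // wakes the selector and ends the loop
                serverThread.join(1000);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(pingOnce());
    }
}
```

The client here is a plain blocking `SocketChannel`; only the server side uses the selector, which is enough to show the accept and read events being dispatched on one thread.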