Java的 I/O机制

UNIX网络编程对I/O模型的分类

Linux 的内核将所有外部设备都看做一个文件来操作(一切皆文件),对一个文件的读写操作会调用内核提供的系统命令,返回一个file descriptor(fd,文件描述符)。而对一个socket的读写也会有响应的描述符,称为socket fd(socket文件描述符),描述符就是一个数字,指向内核中的一个结构体(文件路径,数据区等一些属性)。

根据UNIX网络编程对I/O模型的分类,UNIX提供了5种I/O模型。

进程是无法直接操作I/O设备的,其必须通过系统调用请求内核来协助完成I/O动作,而内核会为每个I/O设备维护一个buffer。

image

整个请求过程为: 用户进程发起请求,内核接受到请求后,从I/O设备中获取数据到buffer中,再将buffer中的数据copy到用户进程的地址空间,该用户进程获取到数据后再响应客户端。

阻塞I/O模型

最常用的I/O模型,默认情况下,所有文件操作都是阻塞的。

比如I/O模型下的套接字接口:在进程空间中调用recvfrom,其系统调用直到数据包到达(比如,还没有收到一个完整的UDP包)且被复制到应用进程的缓冲区中或者发生错误时才返回,而在用户进程这边,整个进程会被阻塞。当内核一直等到数据准备好了,它就会将数据从内核中拷贝到用户内存,然后内核返回结果,用户进程才解除block的状态,重新运行起来。

进程在调用recvfrom开始到它返回的整段时间内都是被阻塞的,所以叫阻塞I/O模型。所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

image

非阻塞I/O模型

当用户进程调用recvfrom时,系统不会阻塞用户进程,而是立刻返回一个ewouldblock错误,从用户进程角度讲 ,并不需要等待,而是马上就得到了一个结果。用户进程判断标志是ewouldblock时,就知道数据还没准备好,于是它就可以去做其他的事了,于是它可以再次发送recvfrom,一旦内核中的数据准备好了。并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。

当一个应用程序在一个循环里对一个非阻塞调用recvfrom,我们称为轮询。应用程序不断轮询内核,看看是否已经准备好了某些操作。这通常是浪费CPU时间,但这种模式偶尔会遇到。

image

I/O复用模型

单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

Linux提供select/poll,进程通过将一个或多个fd(文件描述符)传递给select或poll系统调用,阻塞在select操作上,这样,select/poll可以帮我们侦测多个fd是否处于就绪状态。

select/poll是顺序扫描fd是否就绪,而且支持的fd数量有限,因此它的使用受到了一些制约。

Linux还提供一个epoll系统调用,epoll使用基于事件驱动方式代替顺序扫描,因此性能更高。当有fd就绪时,立即回调函数rollback。

image

I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,pselect,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

IO多路复用之select、poll、epoll详解

当用户进程调用了select,那么整个进程会被block,而同时,内核会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从内核拷贝到用户进程。

这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。(多说一句。所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)

在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

信号驱动I/O模型

首先开启套接口信号驱动I/O功能,并通过系统调用sigaction执行一个信号处理函数(此系统调用立即返回,进程继续工作,非阻塞)。当数据准备就绪时,就为改进程生成一个SIGIO信号,通过信号回调通知应用程序调用recvfrom来读取数据,并通知主循环函数处理树立。

image

异步I/O

告知内核启动某个操作,并让内核在整个操作完成后(包括数据的复制)通知进程。

信号驱动I/O模型通知的是何时可以开始一个I/O操作,异步I/O模型有内核通知I/O操作何时已经完成。

image

I/O多路复用技术

I/O编程中,需要处理多个客户端接入请求时,可以利用多线程或者I/O多路复用技术进行处理。

正如前面的简介,I/O多路复用技术通过把多个I/O的阻塞复用到同一个select的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。

与传统的多线程模型相比,I/O多路复用的最大优势就是系统开销小,系统不需要创建新的额外线程,也不需要维护这些线程的运行,降低了系统的维护工作量,节省了系统资源。

主要的应用场景:

  • 服务器需要同时处理多个处于监听状态或多个连接状态的套接字。
  • 服务器需要同时处理多种网络协议的套接字。
  • 支持I/O多路复用的系统调用主要有select、pselect、poll、epoll。

而当前推荐使用的是epoll,优势如下:

  • 支持一个进程打开的socket fd不受限制。
  • I/O效率不会随着fd数目的增加而线性下将。
  • 使用mmap加速内核与用户空间的消息传递。
  • epoll拥有更加简单的API。

image

image

image

Java I/O 概览

Java 的 I/O 大概可以分成以下几类:

  • 磁盘操作:File
  • 字节操作:InputStream 和 OutputStream
  • 字符操作:Reader 和 Writer
  • 对象操作:Serializable:序列化就是将一个对象转换成字节序列,方便存储和传输。
  • 网络操作:Socket
  • 新的输入/输出:NIO

Java I/O 使用了装饰者模式来实现

Java磁盘操作

File 类可以用于表示文件和目录的信息,但是它不表示文件的内容。

递归地列出一个目录下所有文件:

1
2
3
4
5
6
7
8
9
10
11
12
public static void listAllFiles(File dir) {
if (dir == null || !dir.exists()) {
return;
}
if (dir.isFile()) {
System.out.println(dir.getName());
return;
}
for (File file : dir.listFiles()) {
listAllFiles(file);
}
}

从 Java7 开始,可以使用 Paths 和 Files 代替 File。

Java字节操作

实现文件复制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void copyFile(String src, String dist) throws IOException {
FileInputStream in = new FileInputStream(src);
FileOutputStream out = new FileOutputStream(dist);

byte[] buffer = new byte[20 * 1024];
int cnt;

// read() 最多读取 buffer.length 个字节
// 返回的是实际读取的个数
// 返回 -1 的时候表示读到 eof,即文件尾
while ((cnt = in.read(buffer, 0, buffer.length)) != -1) {
out.write(buffer, 0, cnt);
}

in.close();
out.close();
}

装饰者模式

Java I/O 使用了装饰者模式来实现。以 InputStream 为例,

  • InputStream 是抽象组件;
  • FileInputStream 是 InputStream 的子类,属于具体组件,提供了字节流的输入操作;
  • FilterInputStream 属于抽象装饰者,装饰者用于装饰组件,为组件提供额外的功能。例如 BufferedInputStream 为 FileInputStream 提供缓存的功能。

image

实例化一个具有缓存功能的字节流对象时,只需要在 FileInputStream 对象上再套一层 BufferedInputStream 对象即可。

1
2
FileInputStream fileInputStream = new FileInputStream(filePath);
BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);

DataInputStream 装饰者提供了对更多数据类型进行输入的操作,比如 int、double 等基本类型。

Java字符操作

编码与解码

编码就是把字符转换为字节,而解码是把字节重新组合成字符。

如果编码和解码过程使用不同的编码方式那么就出现了乱码。

  • GBK 编码中,中文字符占 2 个字节,英文字符占 1 个字节;
  • UTF-8 编码中,中文字符占 3 个字节,英文字符占 1 个字节;
  • UTF-16be 编码中,中文字符和英文字符都占 2 个字节。

UTF-16be 中的 be 指的是 Big Endian,也就是大端。相应地也有 UTF-16le,le 指的是 Little Endian,也就是小端。

Java 的内存编码使用双字节编码 UTF-16be,这不是指 Java 只支持这一种编码方式,而是说 char 这种类型使用 UTF-16be 进行编码。char 类型占 16 位,也就是两个字节,Java 使用这种双字节编码是为了让一个中文或者一个英文都能使用一个 char 来存储。

String 的编码方式

String 可以看成一个字符序列,可以指定一个编码方式将它编码为字节序列,也可以指定一个编码方式将一个字节序列解码为 String。

1
2
3
4
String str1 = "中文";
byte[] bytes = str1.getBytes("UTF-8");
String str2 = new String(bytes, "UTF-8");
System.out.println(str2);

在调用无参数 getBytes() 方法时,默认的编码方式不是 UTF-16be。双字节编码的好处是可以使用一个 char 存储中文和英文,而将 String 转为 bytes[] 字节数组就不再需要这个好处,因此也就不再需要双字节编码。getBytes() 的默认编码方式与平台有关,一般为 UTF-8。

1
byte[] bytes = str1.getBytes();

Reader 与 Writer

不管是磁盘还是网络传输,最小的存储单元都是字节,而不是字符。但是在程序中操作的通常是字符形式的数据,因此需要提供对字符进行操作的方法。

  • InputStreamReader 实现从字节流解码成字符流;
  • OutputStreamWriter 实现字符流编码成为字节流。

实现逐行输出文本文件的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void readFileContent(String filePath) throws IOException {

FileReader fileReader = new FileReader(filePath);
BufferedReader bufferedReader = new BufferedReader(fileReader);

String line;
while ((line = bufferedReader.readLine()) != null) {
System.out.println(line);
}

// 装饰者模式使得 BufferedReader 组合了一个 Reader 对象
// 在调用 BufferedReader 的 close() 方法时会去调用 Reader 的 close() 方法
// 因此只要一个 close() 调用即可
bufferedReader.close();
}

Java对象操作

序列化

序列化就是将一个对象转换成字节序列,方便存储和传输。

  • 序列化:ObjectOutputStream.writeObject()
  • 反序列化:ObjectInputStream.readObject()

不会对静态变量进行序列化,因为序列化只是保存对象的状态,静态变量属于类的状态。

Serializable

序列化的类需要实现 Serializable 接口,它只是一个标准,没有任何方法需要实现,但是如果不去实现它的话而进行序列化,会抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) throws IOException, ClassNotFoundException {

A a1 = new A(123, "abc");
String objectFile = "file/a1";

ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream(objectFile));
objectOutputStream.writeObject(a1);
objectOutputStream.close();

ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream(objectFile));
A a2 = (A) objectInputStream.readObject();
objectInputStream.close();
System.out.println(a2);
}

private static class A implements Serializable {

private int x;
private String y;

A(int x, String y) {
this.x = x;
this.y = y;
}

@Override
public String toString() {
return "x = " + x + " " + "y = " + y;
}
}

transient

transient 关键字可以使一些属性不会被序列化。

ArrayList 中存储数据的数组 elementData 是用 transient 修饰的,因为这个数组是动态扩展的,并不是所有的空间都被使用,因此就不需要所有的内容都被序列化。通过重写序列化和反序列化方法,使得可以只序列化数组中有内容的那部分数据。

1
private transient Object[] elementData;

想序列化ArrayList的话,把elementdata的元素一个个读出来,一个个序列化。

补充阅读

Java网络操作

Java 中的网络支持:

  • InetAddress:用于表示网络上的硬件资源,即 IP 地址;
  • URL:统一资源定位符;
  • Sockets:使用 TCP 协议实现网络通信;
  • Datagram:使用 UDP 协议实现网络通信。

InetAddress

没有公有的构造函数,只能通过静态方法来创建实例。

1
2
InetAddress.getByName(String host);
InetAddress.getByAddress(byte[] address);

URL

可以直接从 URL 中读取字节流数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws IOException {

URL url = new URL("http://www.baidu.com");

/* 字节流 */
InputStream is = url.openStream();

/* 字符流 */
InputStreamReader isr = new InputStreamReader(is, "utf-8");

/* 提供缓存功能 */
BufferedReader br = new BufferedReader(isr);

String line;
while ((line = br.readLine()) != null) {
System.out.println(line);
}

br.close();
}

Sockets

  • ServerSocket:服务器端类
  • Socket:客户端类
  • 服务器和客户端通过 InputStream 和 OutputStream 进行输入输出。

image

Datagram

  • DatagramSocket:通信类
  • DatagramPacket:数据包类

零拷贝 zero-copy

概述

考虑这样一种常用的情形:你需要将静态内容(类似图片、文件)展示给用户。那么这个情形就意味着你需要先将静态内容从磁盘中拷贝出来放到一个内存buf中,然后将这个buf通过socket传输给用户,进而用户或者静态内容的展示。这看起来再正常不过了,但是实际上这是很低效的流程,我们把上面的这种情形抽象成下面的过程:

1
read(file, tmp_buf, len);write(socket, tmp_buf, len);

首先调用read将静态内容,这里假设为文件A,读取到tmp_buf, 然后调用write将tmp_buf写入到socket中,如图:

image

在这个过程中文件A的经历了4次copy的过程:

  1. 首先,调用read时,文件A拷贝到了kernel模式;
  2. 之后,CPU控制将kernel模式数据copy到user模式下;
  3. 调用write时,先将user模式下的内容copy到kernel模式下的socket的buffer中;
  4. 最后将kernel模式下的socket buffer的数据copy到网卡设备中传送;

从上面的过程可以看出,数据白白从kernel模式到user模式走了一圈,浪费了2次copy(第一次,从kernel模式拷贝到user模式;第二次从user模式再拷贝回kernel模式,即上面4次过程的第2和3步骤。)。而且上面的过程中kernel和user模式的上下文的切换也是4次。

幸运的是,你可以用一种叫做Zero-Copy的技术来去掉这些无谓的copy。应用程序用Zero-Copy来请求kernel直接把disk的data传输给socket,而不是通过应用程序传输。Zero-Copy大大提高了应用程序的性能,并且减少了kernel和user模式上下文的切换。

详述

Zero-Copy技术省去了将操作系统的read buffer拷贝到程序的buffer,以及从程序buffer拷贝到socket buffer的步骤,直接将read buffer拷贝到socket buffer. Java NIO中的FileChannal.transferTo()方法就是这样的实现,这个实现是依赖于操作系统底层的sendFile()实现的。

1
public void transferTo(long position, long count, WritableByteChannel target);

他底层的调用时系统调用sendFile()方法:

1
#include <sys/socket.h>ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

下图展示了在transferTo()之后的数据流向:

image

下图展示了在使用transferTo()之后的上下文切换:

image

使用了Zero-Copy技术之后,整个过程如下:

  1. transferTo()方法使得文件A的内容直接拷贝到一个read buffer(kernel buffer)中;
  2. 然后数据(kernel buffer)拷贝到socket buffer中。
  3. 最后将socket buffer中的数据拷贝到网卡设备(protocol engine)中传输;
    这显然是一个伟大的进步:这里把上下文的切换次数从4次减少到2次,同时也把数据copy的次数从4次降低到了3次。
    但是这是Zero-Copy么,答案是否定的。

进阶

Linux 2.1内核开始引入了sendfile函数(上一节有提到),用于将文件通过socket传送。

1
sendfile(socket, file, len);

该函数通过一次系统调用完成了文件的传送,减少了原来read/write方式的模式切换。此外更是减少了数据的copy, sendfile的详细过程如图:

image

通过sendfile传送文件只需要一次系统调用,当调用sendfile时:

  1. 首先(通过DMA)将数据从磁盘读取到kernel buffer中;
  2. 然后将kernel buffer拷贝到socket buffer中;
  3. 最后将socket buffer中的数据copy到网卡设备(protocol engine)中发送;

这个过程就是第二节(详述)中的那个步骤。

sendfile与read/write模式相比,少了一次copy。但是从上述过程中也可以发现从kernel buffer中将数据copy到socket buffer是没有必要的。

Linux2.4 内核对sendfile做了改进,如图:

image

改进后的处理过程如下:

  1. 将文件拷贝到kernel buffer中;
  2. 向socket buffer中追加当前要发生的数据在kernel buffer中的位置和偏移量;
  3. 根据socket buffer中的位置和偏移量直接将kernel buffer的数据copy到网卡设备(protocol engine)中;

经过上述过程,数据只经过了2次copy就从磁盘传送出去了。这个才是真正的Zero-Copy(这里的零拷贝是针对kernel来讲的,数据在kernel模式下是Zero-Copy)。

正是Linux2.4的内核做了改进,Java中的TransferTo()实现了Zero-Copy,如下图:

image

Zero-Copy技术的使用场景有很多,比如Kafka, 又或者是Netty等,可以大大提升程序的性能。

image

Java NIO

NIO,就是 New IO,是非阻塞 IO。支持面向缓冲区,基于通道的 IO 操作。

传统的 IO 是面向流的,并且是单向的。如果要进行双工的通信,要建立两个流,一个输入流,一个输出流。

NIO 是面向缓冲区的,是双向的,通信双方建立一个通道,然后把数据存放在缓冲区中,然后缓冲区在通道中流动进行数据的传输。打个比方,通道 —— 铁路,缓冲区 —— 火车,数据 —— 人,一堆人想要从 A 地到达 B 地,首先需要把人装上火车,然后火车在铁路上从 A 地跑到 B 地,实现将人从 A 地运到 B 地;同样的道理,一堆数据如果想要从 A 地运到 B 地,需要先把数据装入缓冲区,然后将缓冲区从 A 地跑到 B 地,从而实现将数据从 A 地运到 B 地。

缓冲区 Buffer

首先,有这些个 Buffer 种类:

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

简而言之就是,8 个基本类型,除了 reference 都有相应类型的缓冲区在。

缓冲区中的核心属性

1
2
3
4
5
// Invariants: mark <= position <= limit <= capacity
private int mark = -1; // 可以记录当前 position 的位置,然后调用 reset(),可以把 position 恢复到这个位置
private int position = 0; // 缓冲区中正在操作的数据的位置
private int limit; // 缓冲区中可以操作数据的大小,limit 后的数据是不能进行读写的
private int capacity; // 缓冲区中最大存储数据的容量,一旦声明不能改变

Note: 抽象类虽然自身不可以实例化,但是其子类覆盖了所有的抽象方法后,是可以实例化的,所以抽象类的构造函数,适用于给其子类对象进行初始化的。

所以对于 ByteBuffer.allocate() 方法,实际上是新建了一个 ByteBuffer 抽象类的子类 HeapByteBuffer 对象,HeapByteBuffer 类实现了 ByteBuffer 的所有抽象方法,所以我们可以通过调用 ByteBuffer 抽象类的构造函数来初始化 HeapByteBuffer 对象。

直接缓冲区与非直接缓冲区

非直接缓冲区 直接缓冲区
分配方法 allocate() allocateDirect()
特点 将缓冲区建立在 JVM 内存中 将缓冲区建立在物理内存 (直接内存) 中,可以提高效率

可以通过 isDirect() 来判断当前的缓冲区是不是直接缓冲区。

为什么使用直接缓冲区可以提高效率呢?

正常情况下,如果你想将一些数据写到物理磁盘上,你需要先将数据从 JVM 内存中 copy 到内核地址空间,因为内核才真正具有控制计算机硬件资源的功能,用户态运行的上层应用程序只能通过系统调用来让内核态的资源来帮助将数据写入硬盘。

image

当要将一个超大的文件写到硬盘上时,这个 copy 的操作就显得很费时了。

image

而使用直接内存,我们可以操作物理磁盘在内存中的内存映射文件,来直接将物理硬盘上的数据加载进内存,或者将内存中的写入硬盘中,而这个映射,其实是一个物理地址到逻辑地址的转换(或者逆过程)。

image

常用方法代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class BufferDemo {
private ByteBuffer buffer = ByteBuffer.allocate(1024);
private ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024);

/**
* 打印 buffer 的 4 大核心属性
*/
@Test
public void test0() {
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println("---------------------");
}

/**
* Test put, get, flip, rewind,
*/
@Test
public void test1() {
String str = "abcde";
/* put:向缓冲区中写入数据 */
buffer.put(str.getBytes());
System.out.println(buffer.position()); // 5
System.out.println(buffer.limit()); // 1024
System.out.println(buffer.capacity()); // 1024
System.out.println("---------------------");

/* flip:将缓冲区从写状态切换到读状态 */
buffer.flip();
System.out.println(buffer.position()); // 0
System.out.println(buffer.limit()); // 5
System.out.println(buffer.capacity()); // 1024
System.out.println("---------------------");

/* get:从缓冲区中读数据出来 */
byte[] tmp = new byte[2];
buffer.get(tmp);
System.out.println("get result: " + new String(tmp));
System.out.println(buffer.position()); // 2
System.out.println(buffer.limit()); // 5
System.out.println(buffer.capacity()); // 1024
System.out.println(buffer.remaining()); // 3,看看还有多少元素可读
System.out.println("---------------------");

/* mark:记录当前 position 的位置到 mark 变量 */
/* reset:令 position = mark */
buffer.mark();
buffer.get(tmp);
System.out.println("get result: " + new String(tmp));
System.out.println("before reset: position = " + buffer.position()); // 4
buffer.reset();
System.out.println("after reset: position = " + buffer.position()); // 2
System.out.println("---------------------");

/* rewind:令 position = 0,就是个倒带的操作 */
buffer.rewind();
System.out.println(buffer.position()); // 0
System.out.println(buffer.limit()); // 5
System.out.println(buffer.capacity()); // 1024
System.out.println(buffer.remaining()); // 5
System.out.println("---------------------");

/* compact:从读状态切换回写状态,可以接着上回写的地方继续往下写 */
buffer.compact();
System.out.println(buffer.position()); // 5
System.out.println(buffer.limit()); // 1024
System.out.println(buffer.capacity()); // 1024
System.out.println(buffer.remaining()); // 1019
System.out.println("---------------------");

/* clear,实际上并没有将数据真的清除,只有当新的数据把旧的数据覆盖了,旧的数据才真的被清除 */
buffer.clear();
System.out.println(buffer.position()); // 0
System.out.println(buffer.limit()); // 1024
System.out.println(buffer.capacity()); // 1024
System.out.println(buffer.remaining()); // 1024
System.out.println("---------------------");

/* isDirect:判断缓冲区是不是直接内存缓冲区 */
System.out.println(buffer.isDirect()); // false
System.out.println(directBuffer.isDirect()); // true
System.out.println("---------------------");
}
}

通道 Channel

用于连接两个节点。在 NIO 中负责缓冲区中数据的传输,它本身是不能存储数据的。

主要实现类

  • java.nio.channels.Channel 接口
    • 本地传输:
      • FileChannel
    • 网络传输:
      • SocketChannel
      • ServerSocketChannel
      • DatagramChannel

获取通道

Java 中以下类拥有 getChannel() 方法,可以通过这个方法获取对应的通道。

  • 本地 IO
    • FileInputStream / FileOutputStream
    • RandomAccessFile
  • 网络 IO
    • Socket
    • ServerSocket
    • DatagramSocket

除此之外,JDK 1.7 中的 AIO 还提供了以下两种获取通道的方式:

  • 为各个通道提供了静态方法 open()
  • Files.newByteChannel()

通道之间的数据传输

  • transferFrom()
  • transferTo()

分散读取与聚集写入

就是以前是用一个缓冲区协助通道传输数据,Scatter 和 Gather 就是用一堆缓冲区(缓冲区数组)协助通道传输数据。

分散读取 Scattering Reads:按照缓冲区的顺序,将从 Channel 中读取的数据依次将 Buffer 填满。 聚集写入 Gathering Writes:按照缓冲区的顺序,写入position 和 limit 之间的数据到 Channel 。
image image

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
public class ChannelDemo {
/**
* 利用通道完成文件的复制(非直接缓冲区)
*/
@Test
public void copyFile1() {
FileInputStream in = null;
FileOutputStream out = null;
FileChannel inChannel = null;
FileChannel outChannel = null;
try {
in = new FileInputStream("1.png");
out = new FileOutputStream("2.png");
inChannel = in.getChannel();
outChannel = out.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (inChannel.read(buffer) != -1) {
buffer.flip();
outChannel.write(buffer);
buffer.clear();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
}
if (out != null) {
out.close();
}
if (inChannel != null) {
inChannel.close();
}
if (outChannel != null) {
out.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 使用直接缓冲区完成文件的复制(内存映射文件),通过 Channel 的 map 方法获取的缓冲区就是直接缓冲区
*/
@Test
public void copyFile2() {
FileChannel inChannel = null;
FileChannel outChannel = null;
try {
inChannel = FileChannel.open(Paths.get("1.png"), StandardOpenOption.READ);
outChannel = FileChannel.open(Paths.get("2.png"), StandardOpenOption.WRITE,
StandardOpenOption.READ, StandardOpenOption.CREATE);
MappedByteBuffer inBuffer = inChannel.map(
FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outBuffer = outChannel.map(
FileChannel.MapMode.READ_WRITE, 0, inChannel.size());

byte[] dst = new byte[inBuffer.limit()];
inBuffer.get(dst);
outBuffer.put(dst);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (inChannel != null) {
try {
inChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (outChannel != null) {
try {
outChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

/**
* 通道之间的数据 transfer (直接缓冲区)
*/
@Test
public void copyFile3() {
FileChannel inChannel = null;
FileChannel outChannel = null;
try {
inChannel = FileChannel.open(Paths.get("1.png"), StandardOpenOption.READ);
outChannel = FileChannel.open(Paths.get("2.png"), StandardOpenOption.WRITE,
StandardOpenOption.READ, StandardOpenOption.CREATE);
// 以下两行效果相等
inChannel.transferTo(0, inChannel.size(), outChannel);
outChannel.transferFrom(inChannel, 0, inChannel.size());
} catch (IOException e) {
e.printStackTrace();
} finally {
if (inChannel != null) {
try {
inChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (outChannel != null) {
try {
outChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

阻塞与非阻塞

NIO 完成网络通信的三个核心

  • 通道(Channel):负责连接。
    • SocketChannel
    • ServerSocketChannel
    • DatagramChannel
    • Pipe.SinkChannel
    • Pipe.SourceChannel
  • 缓冲区(Buffer):负责数据存取。
  • 选择器(Selector):是 SelectableChannel 的多路复用器,用于监控 SelectableChannel 的 IO 状况。

DatagramChannel

管道 Pipe

Java NIO 的 Pipe 可以在两个线程之间建立单项的数据连接,管道有一个 Pipe.SinkChannel 和 一个 Pipe.SourceChannel,数据会被写到 Pipe.SinkChannel,然后从 Pipe.SourceChannel 中读取。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class NioPipeDemo {
@Test
public void test() throws IOException {
Pipe pipe = Pipe.open();
Pipe.SinkChannel sinkChannel = pipe.sink();

String str = "我是管道发来的数据";
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(str.getBytes());
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
sinkChannel.write(byteBuffer);
}

Pipe.SourceChannel sourceChannel = pipe.source();
byteBuffer.clear();
int len = sourceChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array(), 0, len));

sinkChannel.close();
sourceChannel.close();
}
}

另一篇总结

Java BIO

BIO 全称Block-IO 是一种同步且阻塞的通信模式。是一个比较传统的通信方式,模式简单,使用方便。但并发处理能力低,通信耗时,依赖网速。

Java NIO

Java NIO,全程 Non-Block IO ,是Java SE 1.4版以后,针对网络传输效能优化的新功能。是一种非阻塞同步的通信模式。
NIO 与原来的 I/O 有同样的作用和目的, 他们之间最重要的区别是数据打包和传输的方式。原来的 I/O 以流的方式处理数据,而 NIO 以块的方式处理数据。
面向流的 I/O 系统一次一个字节地处理数据。一个输入流产生一个字节的数据,一个输出流消费一个字节的数据。
面向块的 I/O 系统以块的形式处理数据。每一个操作都在一步中产生或者消费一个数据块。按块处理数据比按(流式的)字节处理数据要快得多。但是面向块的 I/O 缺少一些面向流的 I/O 所具有的优雅性和简单性。

Java AIO

Java AIO,全程 Asynchronous IO,是异步非阻塞的IO。是一种非阻塞异步的通信模式。

在NIO的基础上引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。

我们站在宏观的角度,重新画一下重点:

BIO (Blocking I/O):同步阻塞I/O模式。
NIO (New I/O):同步非阻塞模式。
AIO (Asynchronous I/O):异步非阻塞I/O模型。

那么,同步阻塞、同步非阻塞、异步非阻塞都是怎么回事呢?关于这部分内容也可以查看《漫话:如何给女朋友解释什么是IO中的阻塞、非阻塞、同步、异步?》。

  • 同步阻塞模式:这种模式下,我们的工作模式是先来到厨房,开始烧水,并坐在水壶面前一直等着水烧开。

  • 同步非阻塞模式:这种模式下,我们的工作模式是先来到厨房,开始烧水,但是我们不一直坐在水壶前面等,而是回到客厅看电视,然后每隔几分钟到厨房看一下水有没有烧开。

  • 异步非阻塞I/O模型:这种模式下,我们的工作模式是先来到厨房,开始烧水,我们不一一直坐在水壶前面等,也不隔一段时间去看一下,而是在客厅看电视,水壶上面有个开关,水烧开之后他会通知我。

  • 阻塞VS非阻塞:人是否坐在水壶前面一直等。

  • 同步VS异步:水壶是不是在水烧开之后主动通知人。

适用场景

BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。

NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。

AIO方式适用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

实现

BIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
//Initializes The Object
User1 user = new User1();
user.setName("hollis");
user.setAge(23);
System.out.println(user);

//Write Obj to File
ObjectOutputStream oos = null;
try {
oos = new ObjectOutputStream(new FileOutputStream("tempFile"));
oos.writeObject(user);
} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(oos);
}

//Read Obj from File
File file = new File("tempFile");
ObjectInputStream ois = null;
try {
ois = new ObjectInputStream(new FileInputStream(file));
User1 newUser = (User1) ois.readObject();
System.out.println(newUser);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(ois);
try {
FileUtils.forceDelete(file);
} catch (IOException e) {
e.printStackTrace();
}
}

//Initializes The Object
User1 user = new User1();
user.setName("hollis");
user.setAge(23);
System.out.println(user);

//Write Obj to File
ObjectOutputStream oos = null;
try {
oos = new ObjectOutputStream(new FileOutputStream("tempFile"));
oos.writeObject(user);
} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(oos);
}

//Read Obj from File
File file = new File("tempFile");
ObjectInputStream ois = null;
try {
ois = new ObjectInputStream(new FileInputStream(file));
User1 newUser = (User1) ois.readObject();
System.out.println(newUser);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(ois);
try {
FileUtils.forceDelete(file);
} catch (IOException e) {
e.printStackTrace();
}
}

NIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
static void readNIO() {
String pathname = "C:\\Users\\adew\\Desktop\\jd-gui.cfg";
FileInputStream fin = null;
try {
fin = new FileInputStream(new File(pathname));
FileChannel channel = fin.getChannel();

int capacity = 100;// 字节
ByteBuffer bf = ByteBuffer.allocate(capacity);
int length = -1;

while ((length = channel.read(bf)) != -1) {

bf.clear();
byte[] bytes = bf.array();
System.out.write(bytes, 0, length);
System.out.println();
}

channel.close();

} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fin != null) {
try {
fin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

static void writeNIO() {
String filename = "out.txt";
FileOutputStream fos = null;
try {

fos = new FileOutputStream(new File(filename));
FileChannel channel = fos.getChannel();
ByteBuffer src = Charset.forName("utf8").encode("你好你好你好你好你好");
int length = 0;

while ((length = channel.write(src)) != 0) {
System.out.println("写入长度:" + length);
}

} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

AIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class ReadFromFile {
public static void main(String[] args) throws Exception {
Path file = Paths.get("/usr/a.txt");
AsynchronousFileChannel channel = AsynchronousFileChannel.open(file);

ByteBuffer buffer = ByteBuffer.allocate(100_000);
Future<Integer> result = channel.read(buffer, 0);

while (!result.isDone()) {
ProfitCalculator.calculateTax();
}
Integer bytesRead = result.get();
System.out.println("Bytes read [" + bytesRead + "]");
}
}
class ProfitCalculator {
public ProfitCalculator() {
}
public static void calculateTax() {
}
}

public class WriteToFile {

public static void main(String[] args) throws Exception {
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(
Paths.get("/asynchronous.txt"), StandardOpenOption.READ,
StandardOpenOption.WRITE, StandardOpenOption.CREATE);
CompletionHandler<Integer, Object> handler = new CompletionHandler<Integer, Object>() {

@Override
public void completed(Integer result, Object attachment) {
System.out.println("Attachment: " + attachment + " " + result
+ " bytes written");
System.out.println("CompletionHandler Thread ID: "
+ Thread.currentThread().getId());
}

@Override
public void failed(Throwable e, Object attachment) {
System.err.println("Attachment: " + attachment + " failed with:");
e.printStackTrace();
}
};

System.out.println("Main Thread ID: " + Thread.currentThread().getId());
fileChannel.write(ByteBuffer.wrap("Sample".getBytes()), 0, "First Write",
handler);
fileChannel.write(ByteBuffer.wrap("Box".getBytes()), 0, "Second Write",
handler);

}
}