Java中的IO流三

Java的IO相关内容。

通道

Java的NIO中的通道都实现了java.nio.channels.Channel接口,Channel接口只有两个方法isOpen()和close()方法,close()方法是AutoCloseable接口中的方法。

支持读操作的Channel可以实现ReadableByteChannel接口,该接口只有一个read方法,支持写操作的Channel可以实现WritableByteChannel接口,该接口只有一个write方法,支持读写操作以外的一些移动读写的Channel可以实现SeekableByteChannel,该接口有一些移动读取的方法。

ReadableByteChannel接口的子接口ScatteringByteChannel提供了不同的read方法,使用一个ByteBuffer数组的read方法,将通道的数据读到多个ByteBuffer中。

WritableByteChannel接口的子接口GatheringByteChannel提供了不同write方法,使用多个ByteBuffer,将多个ByteBuffer的数据写入通道。

文件通道

FileChannel实现了SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel接口,而且提供了一些文件操作相关的功能。

FileChannel可以通过open方法来创建,创建的时候可以传入OpenOption参数,StandardOpenOption枚举类定义了一些操作参数。

FileChannel也可以从已有的FileInputStream、FileOutputStream和RandomAccessFile类的对象中通过getChannel方法得到。

FileChannel的transferFrom和transferTo两个方法可以实现数据传输。

内存映射文件

在操作大文件的时候可以通过内存映射文件将大文件映射到内存,这样,可以直接通过内存操作来操作文件。

FileChannel类的map方法可以把一个文件的全部或部分内容映射到内存中,所得到的是一个ByteBuffer类的子类MappedByteBuffer的对象,程序只要对这个MappedByteBuffer类的对象进行操作即可。对该对象的修改会自动同步到文件中。

在进行内存映射的时候需要指定映射模式,一共有3种映射模式,定义在FileChannel.MapMode类中。

  • READ_ONLY : 只能对映射后的MappedByteBuffer对象进行读操作
  • READ_WRITE : 可以对映射后的MappedByteBuffer对象进行读写操作
  • PRIVATE : 对映射后的MappedByteBuffer对象进行的操作不会同步到文件中

FileChannel的force方法可以强制要求把更新同步到文件中。

文件锁定

FileChannel的lock和tryLock方法可以将当前文件通道对应的文件进行加锁,加锁可以锁定全部内容,也可以锁定部分内容。

文件加锁是用来与操作系统上的其他程序进行同步使用的。

套接字通道

java.nio.channels.NetworkChannel接口表示的是一个套接字对应的通道。

相对于Socket和ServerSocket类,Java NIO也提供了SocketChannel和ServerSocketChannel类。这两个都支持阻塞和非阻塞两种模式。

阻塞

例子:

客户端套接字

1
2
3
4
5
6
7
8
9
private static void loadWebPageUseSocket() throws IOException {
try (FileChannel dest = FileChannel.open(Paths.get("pages.html"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
SocketChannel channel = SocketChannel.open(new InetSocketAddress("www.baidu.com", 80))) {
String request = "GET / HTTP/1.1\r\n\r\nHost:www.baidu.com\r\n\r\n";
ByteBuffer bb = ByteBuffer.wrap(request.getBytes(StandardCharsets.UTF_8));
channel.write(bb);
dest.transferFrom(channel, 0, Integer.MAX_VALUE);
}
}

服务端套接字

1
2
3
4
5
6
7
8
9
private static void startSimpleServer() throws IOException {
ServerSocketChannel server = ServerSocketChannel.open();
server.bind(new InetSocketAddress("localhost", 8680));
while (true) {
try (SocketChannel channel = server.accept()) {
channel.write(ByteBuffer.wrap("hello there".getBytes(StandardCharsets.UTF_8)));
}
}
}

非阻塞

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class LoadWebPageUseSelector {

public void load(Set<URL> urls) throws IOException {
Map<SocketAddress, String> mapping = urlToSocketAddress(urls);
Selector selector = Selector.open();
for (SocketAddress socketAddress : mapping.keySet()) {
register(selector, socketAddress);
}
int finished = 0;
int total = mapping.size();
ByteBuffer bb = ByteBuffer.allocate(10240);
int len = -1;
while (finished < total) {
for (SelectionKey key : selector.selectedKeys()) {
if (key.isValid() && key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress();
FileChannel fileChannel = FileChannel.open(Paths.get(address.getHostName() + ".html"), StandardOpenOption.APPEND, StandardOpenOption.CREATE);
bb.clear();
while ((len = channel.read(bb)) > 0 || bb.position() != 0) {
bb.flip();
fileChannel.write(bb);
bb.compact();
}
if (len == -1) {
finished++;
key.cancel();
}
} else if (key.isValid() && key.isConnectable()) {
SocketChannel channel = (SocketChannel) key.channel();
boolean success = channel.finishConnect();
if (!success) {
finished++;
key.cancel();
} else {
InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress();
String path = mapping.get(address);
String request = "GET " + path + " HTTP/1.1\r\n\r\nHost:" + address.getHostString() + "\r\n";
ByteBuffer header = ByteBuffer.wrap(request.getBytes(StandardCharsets.UTF_8));
channel.write(header);
}
}
}
}
}

private void register(Selector selector, SocketAddress socketAddress) throws IOException {
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(socketAddress);
channel.register(selector, SelectionKey.OP_CONNECT|SelectionKey.OP_READ);
}

private Map<SocketAddress, String> urlToSocketAddress(Set<URL> urls) {
Map<SocketAddress, String> map = new HashMap<>();
for (URL url : urls) {
int port = url.getPort() != -1 ? url.getPort() : url.getDefaultPort();
SocketAddress address = new InetSocketAddress(url.getHost(), port);
String path = url.getPath();
if (url.getQuery() != null) {
path = path + "?" + url.getQuery();
}
map.put(address, path);
}
return map;
}

}