Summary:
1,模型的不同,流模型vs Channel模型
2,在网络I/O中,selector的使用。
参考:http://www-128.ibm.com/developerworks/cn/java/j-javaio/index.html


概要:J2SE1.4加入了新的I/O库(NIO)允许在Java应用中使用高速I/O。IO使用了新的I/O模型,它与原有的I/O库使用的模型有着很大区别。这篇文章将一步一步教您使用NIO库中的select工具。select使您的服务器可以处理来自多个连接的大数据量。在对NIO库的简要介绍后,本文还讲解了select工具的原理,最后分析了一个利用select工作的服务器的源码。
Java使用了非常优雅的基于流的I/O模型。流是一种产生或消耗字节序列的对象。流可以与过滤程序连接在一起,扩展到可以处理各种不同的数据。流模型非常复杂,但是效率不高。这对于大多数应用来说还好,但当系统需要和硬件处理同样高的速度时,流模型就无法应付了。
J2SE1.4中引入了新I/O库(NIO)来解决这个问题。NIO使用面向缓冲(buffer)的模型。这就是说,NIO主要处理大块的数据。这就避免了利用流模型处理所引起的问题,在有可能的情况下,它甚至可以为了得到最大的吞吐量而使用系统级的工具。
第一次了解到NIO是如何工作的,我就将它应用到高速服务器应用中了。
NIO系统
NIO是基于通道(channels)和缓冲(buffers)这两个概念的。通道有点像流模型中的流。而缓冲在流模型中没有类似的概念。
基本流InputStream和OutputStream能够读写字节数据;它们的子类可以读写各种各样的数据。在NIO中,所有的数据都通过缓冲读写。从图1可以看到两种模型的比较。
图1.流模型使用Streams和Bytes;NIO模型使用Channels和Buffers同时要注意的还有流模型使用了InputStream和OutputStream分别用来读写数据,而NIO只使用通道来完成两者的功能。
使用缓冲的好处是它可以大块的处理数据。你可以读写大块数据,缓冲的大小只受你所分配的内存数量的限制。
缓冲的另一个好处是它可以表示系统级的缓冲。多种系统采用统一的内存配置完成I/O处理,而不需要将数据从系统内存中拷贝到应用程序的内存空间。buffer对象的不同实现可以直接表示这些系统级的缓冲,这就意味着你可以用最少的拷贝次数来完成对数据的读写(参看图2)。
图2.系统缓冲允许你使用系统级缓冲,避免额外的复制select工具select提供了一种很好的方法来完成大量的数据源并行处理。它的名字来源于Unix系统中提供相同功能的C程序系统调用select()。
通常,I/O属于阻塞式系统调用。当你对输入流调用read()方法,直到数据读入完成之前方法一直被阻塞。如果你读入本地文件就不需要等待很长时间。但是如果你从文件服务器或这是socket连接读取数据的话,那么你就要等很长时间。但你在等待过程中,你读取数据的线程将不能做任何事。
当然,在Java中你很容易为多个流创建多个线程。但是线程需要消耗大量的资源。在很多实现中,每个线程需要占用一块内存,即使它什么也不做。同时太多的线程会对性能造成很大的影响。
select采用不同的工作方式。通过selet你把输入流注册到一个Selector对象上。当某个流发生I/O活动时,selector将会通知你。以这种方式就可以只用一个线程读入多个数据源。尽管Selector不能帮你读取数据,但是它可以监听网络连接请求和越过较慢的通道进行写数据。
应用范例 我将建立一个简单的加密服务器来进一步说明select工具。这个程序取得客户数据,进行加密,然后再将加密后的数据返回给客户。我利用select工具接受连接请求和从现有连接中读取传入的数据。
因为本文不是在讨论加密,所以程序采用的加密方式并不重要。我只是用rot13过滤数据然后再将它返回。rot13过滤器只是简单的将每个字母循环向前13步,例如将a转变为n、b转变为o等等。当到了字母表的结尾后将回到开头继续循环,所以w就应该是j。
我不会同服务器一样详细介绍客户程序。你可以用系统中的telnet工具连接到服务器。我提供了一个测试程序向服务器申请大量的连接,以此来显示服务器在高负载下的工作性能。
使用Selectors让我们来看一下Selector是如何工作的。在下面的范例中,我用Selector完成两个任务:接受连接请求和接受现有连接的请求数据。
监听连接请求 首先我要创立一个Selector对象。Selector是这个过程的核心对象;每个连接都必须对这个对象进行注册。静态方法Selector.open()可以创建Selector对象:
Selector selector = Selector.open();
因为我将要创建一个C/S系统,所以要监听ServerSocketChannel。我必须将它设置为非阻塞通道,只有这样它才可以被Selector使用:
ServerSocketChannel ssc = ...
ssc.configureBlocking(false);
ssc.register(selector, OP_ACCEPT);
SelectionKey.OP_ACCEPT参数告诉Selector我只想监听连接请求而不是普通数据。因为服务器套接字不接受普通得数据。
主循环 现在我已经对Selector进行了注册,让我们来开动它。因为我要不断的等待新的请求,所以我使用了一个无限循环,并在其中调用Selector的select()方法。
while (true) {
//察看是否有新的请求包括连接请求和现有连接的请求数据。
int num = selector.select();
//如果没有新的请求,将继续循环等待。
if (num == 0) {
continue;
}
//获得被发现请求的关键字并一个一个的处理它们。
Set keys = selector.selectedKeys();
Iterator it = keys.iterator();
while (it.hasNext()) {
//获得单个I/O请求的关键字
SelectionKey key = (SelectionKey)it.next();
//...处理SelectionKey...
}
//删去关键字,因为你已经处理过它们了。
keys.clear();
}
注意,这个结构很像是在图形用户界面中使用的事件循环结构(event loop)。特别是和事件循环结构一样输入来自不同对象,而且我们不能预计输入发生的时间。
在循环中,select()返回发生I/O活动的通道的个数。如果返回0循环回到顶端继续等待,否则必须处理I/O活动。
这些I/O活动被描述为一个或多个SelectionKey对象。一个SelectionKey代表一个Selector上注册的一个通道。当Selector发现某个通道发生活动时,它返回与此通道相应的SelctionKey。
接收连接 一个SelctionKey可以表示多种I/O活动,你必须找出活动的类型。在我的代码中,只注册了ServerSockChannel一个通道,所以只需要处理这个通道的连接请求。
SelectionKey的readyOps()方法用来决定I/O活动的类型。这个方法返回一个bitmask(位掩码)来表示发生在这个通道上的活动的类型。可以检查它是否包含用来表示连接请求的OP_ACCEPT位:
if ((key.readyOps() & SelectionKey.OP_ACCEPT) ==
SelectionKey.OP_ACCEPT) {
//接收连接请求。
Socket s = serverSocket.accept();
// ... 处理连接请求...
}
如果它是一个连接请求就使用阻塞式调用accept()得到一个连接。但是实际accept()并不会发生阻塞,因为Selector已经告诉我们有一个连接请求发生正在被等待处理。
现在连接已建立,让我们用它来接收数据。
监听传入数据在接收到连接后,需要监听来自这些连接的传入数据。像注册服务器套接字监听连接请求一样,需要注册新的连接套接字来监听传入数据。和服务器套接字一样,也要将新的套接字设置为非阻塞。
因为至少有注册了一个连接套接字,所以我将要接收它或它们的数据。
//一定要将它设置为非阻塞,这样才可以使用Selector。
SocketChannel sc = socket.getChannel();
sc.configureBlocking( false );
//把它注册到selector,并设置为reading属性。
sc.register( selector, SelectionKey.OP_READ );
将Seletor设置为监听OP_READ活动,而不是OP_ACCEPT,这就意味着要监听输入数据,而不是连接请求。
回到循环顶端有两种套接字注册到Selector上:服务器套接字和普通套接字。在循环顶端,再次调用select方法:
int num = selector.select();
现在任何套接字发生活动我都回收到通知,包括服务器套接字受到了连接请求和普通套接字收到数据。当连入更多的连接时,他们都会被注册到Seletor上。
传入数据因为至少有一个普通套接字被注册,所以我将接收到其中之一的数据。当接收到数据时,套接字的readyOps()返回bitmask被设置为OP_READ,你可以获得这个SelectionKey(选择关键字)。
} else if ((key.readyOps() & SelectionKey.OP_READ) ==
SelectionKey.OP_READ) {
SocketChannel sc = (SocketChannel)key.channel();
processInput( sc );
// ...
}
数据传入后,我将套接字更确切的是SocketChannel传递给加密程序。你可以从源代码中找到加密程序,它将传入的数据加密以后返回到客户端。
最终结果这里有很多步骤,它们适合其它的select程序。所有的输入源都被注册到Selector上。在一个无限循环中不断的调用Selector的select()方法。每次该方法返回时代表某些输入源发生了活动。处理发生活动的输入源并继续循环。图3说明了这个过程。
图3.完整的Select循环
完成服务器
上述代码只是一部分,完整的代码包括一个完整的主循环和proccessInput代码。从命令行启动该服务器进行测试:
java Server [端口]
然后,用telnet连接这个服务器。如果是同一台机器,你可以用localhost或127.0.0.1作为登录的主机名。
源代码中还包含一个Client.java文件,你可以用这个程序测试你的服务器。它有多个线程,每个线程都发送和读取数据。每个线程都用一个无限循环,不断的发送数据。如下方式可以用来运行它:
java Client [主机名] [端口] [线程个数]
综述
通过前面的讨论,select I/O模型是基于事件驱动(event-drivern)的。所有输入源被注册到一个Selector对象上,它等待着任何输入源的活动。这个模型不同于流模型,但它仍是一个实体模型。很大程度上,可以说select模型运行在流模型之上。从硬件角度来讲,I/O是基于事件驱动的,因为像网卡、键盘和磁盘等外设在发送数据前都不会发出通知。
流模型用缓冲掩盖了阻塞式I/O调用之后的事件驱动I/O的复杂性使得程序变得更简单。但当你需要更高的速度时,你需要越过流这一层直接使用I/O事件本身。
NIO库提供了非常优雅的基于缓冲的接口连接到Select模型。同时它也完全适合旧的流模型;实际上,旧的java.io.*包中的类现在都是基于java.nio.*的,因此它们可以很好的配合。
下载文章中的源代码>>>http://www.javaworld.com/javaworld/jw-04-2003/select/jw-0411-select.zip
SocketChannel,ServerSocketChannel完全代替了Socket和ServerSocket。
注册的操作可以是SelectionKey.OP_READ,SelectionKey.OP_WRITE,SelectionKey.OP_ACCEPT,OP_CONNECT ...
一个socket/serverSocket可以监听多个。
===============================================================
SocketChannel socket;
if (key.isAcceptable()) {
System.out.println("Acceptable Key");
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
socket = (SocketChannel) ssc.accept();
socket.configureBlocking(false);
SelectionKey another =
socket.register(sel,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
if (key.isReadable()) {
System.out.println("Readable Key");
String ret = readMessage(key);
if (ret.length() > 0) {
writeMessage(socket,ret);
}
}
if (key.isWritable()) {
System.out.println("Writable Key");
String ret = readMessage(key);
socket = (SocketChannel)key.channel();
if (result.length() > 0 ) {
writeMessage(socket,ret);
}
}
// $Id$
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;
public class Server implements Runnable
{
// The port we will listen on
private int port;
// A pre-allocated buffer for encrypting data
private final ByteBuffer buffer = ByteBuffer.allocate( 16384 );
public Server( int port ) {
this.port = port;
new Thread( this ).start();
}
public void run() {
try {
// Instead of creating a ServerSocket,
// create a ServerSocketChannel
ServerSocketChannel ssc = ServerSocketChannel.open();
// Set it to non-blocking, so we can use select
ssc.configureBlocking( false );
// Get the Socket connected to this channel, and bind it
// to the listening port
ServerSocket ss = ssc.socket();
InetSocketAddress isa = new InetSocketAddress( port );
ss.bind( isa );
// Create a new Selector for selecting
Selector selector = Selector.open();
// Register the ServerSocketChannel, so we can
// listen for incoming connections
ssc.register( selector, SelectionKey.OP_ACCEPT );
System.out.println( "Listening on port "+port );
while (true) {
// See if we've had any activity -- either
// an incoming connection, or incoming data on an
// existing connection
int num = selector.select();
// If we don't have any activity, loop around and wait
// again
if (num == 0) {
continue;
}
// Get the keys corresponding to the activity
// that has been detected, and process them
// one by one
Set keys = selector.selectedKeys();
Iterator it = keys.iterator();
while (it.hasNext()) {
// Get a key representing one of bits of I/O
// activity
SelectionKey key = (SelectionKey)it.next();
// What kind of activity is it?
if ((key.readyOps() & SelectionKey.OP_ACCEPT) ==
SelectionKey.OP_ACCEPT) {
System.out.println( "acc" );
// It's an incoming connection.
// Register this socket with the Selector
// so we can listen for input on it
Socket s = ss.accept();
System.out.println( "Got connection from "+s );
// Make sure to make it non-blocking, so we can
// use a selector on it.
SocketChannel sc = s.getChannel();
sc.configureBlocking( false );
// Register it with the selector, for reading
sc.register( selector, SelectionKey.OP_READ );
} else if ((key.readyOps() & SelectionKey.OP_READ) ==
SelectionKey.OP_READ) {
SocketChannel sc = null;
try {
// It's incoming data on a connection, so
// process it
sc = (SocketChannel)key.channel();
boolean ok = processInput( sc );
// If the connection is dead, then remove it
// from the selector and close it
if (!ok) {
key.cancel();
Socket s = null;
try {
s = sc.socket();
s.close();
} catch( IOException ie ) {
System.err.println( "Error closing socket "+s+": "+ie );
}
}
} catch( IOException ie ) {
// On exception, remove this channel from the selector
key.cancel();
try {
sc.close();
} catch( IOException ie2 ) { System.out.println( ie2 ); }
System.out.println( "Closed "+sc );
}
}
}
// We remove the selected keys, because we've dealt
// with them.
keys.clear();
}
} catch( IOException ie ) {
System.err.println( ie );
}
}
// Do some cheesy encryption on the incoming data,
// and send it back out
private boolean processInput( SocketChannel sc ) throws IOException {
buffer.clear();
sc.read( buffer );
buffer.flip();
// If no data, close the connection
if (buffer.limit()==0) {
return false;
}
// Simple rot-13 encryption
for (int i=0; i<buffer.limit(); ++i) {
byte b = buffer.get( i );
if ((b>='a' && b<='m') || (b>='A' && b<='M')) {
b += 13;
} else if ((b>='n' && b<='z') || (b>='N' && b<='Z')) {
b -= 13;
}
buffer.put( i, b );
}
sc.write( buffer );
System.out.println( "Processed "+buffer.limit()+" from "+sc );
return true;
}
static public void main( String args[] ) throws Exception {
int port = Integer.parseInt( args[0] );
new Server( port );
}
}
// $Id$
import java.io.*;
import java.net.*;
import java.util.*;
public class Client implements Runnable
{
private String host;
private int port;
// Bounds on how much we write per cycle
private static final int minWriteSize = 1024;
private static final int maxWriteSize = 65536;
// Bounds on how long we wait between cycles
private static final int minPause = (int)( 0.05 * 1000 );
private static final int maxPause = (int)( 0.5 * 1000 );
// Random number generator
Random rand = new Random();
public Client( String host, int port, int numThreads ) {
this.host = host;
this.port = port;
for (int i=0; i<numThreads; ++i) {
new Thread( this ).start();
}
}
public void run() {
byte buffer[] = new byte[maxWriteSize];
try {
Socket s = new Socket( host, port );
InputStream in = s.getInputStream();
OutputStream out = s.getOutputStream();
while (true) {
int numToWrite = minWriteSize +
(int)(rand.nextDouble() * (maxWriteSize-minWriteSize));
for (int i=0; i<numToWrite; ++i) {
buffer[i] = (byte)rand.nextInt( 256 );
}
out.write( buffer, 0, numToWrite );
int sofar = 0;
while (sofar < numToWrite) {
sofar += in.read( buffer, sofar, numToWrite-sofar );
}
System.out.println( Thread.currentThread()+" wrote "+numToWrite );
int pause = minPause +
(int)(rand.nextDouble() * (maxPause-minPause));
try { Thread.sleep( pause ); } catch( InterruptedException ie ) {}
}
} catch( IOException ie ) {
ie.printStackTrace();
}
}
static public void main( String args[] ) throws Exception {
String host = args[0];
int port = Integer.parseInt( args[1] );
int numThreads = Integer.parseInt( args[2] );
new Client( host, port, numThreads );
}
}
Trackback: http://tb.donews.net/TrackBack.aspx?PostId=445807