::Z::Thinking::

::Simple::
文章 - 124,收藏 - , 评论 - 49, trackbacks - 0

Summary:

1,模型的不同,流模型vs Channel模型

2,在网络I/O中,selector的使用。

参考:http://www-128.ibm.com/developerworks/cn/java/j-javaio/index.html

image

image



概要:J2SE1.4加入了新的I/O库(NIO)允许在Java应用中使用高速I/O。IO使用了新的I/O模型,它与原有的I/O库使用的模型有着很大区别。这篇文章将一步一步教您使用NIO库中的select工具。select使您的服务器可以处理来自多个连接的大数据量。在对NIO库的简要介绍后,本文还讲解了select工具的原理,最后分析了一个利用select工作的服务器的源码。

Java使用了非常优雅的基于流的I/O模型。流是一种产生或消耗字节序列的对象。流可以与过滤程序连接在一起,扩展到可以处理各种不同的数据。流模型非常复杂,但是效率不高。这对于大多数应用来说还好,但当系统需要和硬件处理同样高的速度时,流模型就无法应付了。
J2SE1.4中引入了新I/O库(NIO)来解决这个问题。NIO使用面向缓冲(buffer)的模型。这就是说,NIO主要处理大块的数据。这就避免了利用流模型处理所引起的问题,在有可能的情况下,它甚至可以为了得到最大的吞吐量而使用系统级的工具。
第一次了解到NIO是如何工作的,我就将它应用到高速服务器应用中了。
NIO系统
NIO是基于通道(channels)和缓冲(buffers)这两个概念的。通道有点像流模型中的流。而缓冲在流模型中没有类似的概念。
基本流InputStream和OutputStream能够读写字节数据;它们的子类可以读写各种各样的数据。在NIO中,所有的数据都通过缓冲读写。从图1可以看到两种模型的比较。

image

图1.流模型使用Streams和Bytes;NIO模型使用Channels和Buffers

同时要注意的还有流模型使用了InputStream和OutputStream分别用来读写数据,而NIO只使用通道来完成两者的功能。
使用缓冲的好处是它可以大块的处理数据。你可以读写大块数据,缓冲的大小只受你所分配的内存数量的限制。
缓冲的另一个好处是它可以表示系统级的缓冲。多种系统采用统一的内存配置完成I/O处理,而不需要将数据从系统内存中拷贝到应用程序的内存空间。buffer对象的不同实现可以直接表示这些系统级的缓冲,这就意味着你可以用最少的拷贝次数来完成对数据的读写(参看图2)。
image

图2.系统缓冲允许你使用系统级缓冲,避免额外的复制

select工具
select提供了一种很好的方法来完成大量的数据源并行处理。它的名字来源于Unix系统中提供相同功能的C程序系统调用select()。
通常,I/O属于阻塞式系统调用。当你对输入流调用read()方法,直到数据读入完成之前方法一直被阻塞。如果你读入本地文件就不需要等待很长时间。但是如果你从文件服务器或这是socket连接读取数据的话,那么你就要等很长时间。但你在等待过程中,你读取数据的线程将不能做任何事。
当然,在Java中你很容易为多个流创建多个线程。但是线程需要消耗大量的资源。在很多实现中,每个线程需要占用一块内存,即使它什么也不做。同时太多的线程会对性能造成很大的影响。
  select采用不同的工作方式。通过selet你把输入流注册到一个Selector对象上。当某个流发生I/O活动时,selector将会通知你。以这种方式就可以只用一个线程读入多个数据源。尽管Selector不能帮你读取数据,但是它可以监听网络连接请求和越过较慢的通道进行写数据。
应用范例
  我将建立一个简单的加密服务器来进一步说明select工具。这个程序取得客户数据,进行加密,然后再将加密后的数据返回给客户。我利用select工具接受连接请求和从现有连接中读取传入的数据。
  因为本文不是在讨论加密,所以程序采用的加密方式并不重要。我只是用rot13过滤数据然后再将它返回。rot13过滤器只是简单的将每个字母循环向前13步,例如将a转变为n、b转变为o等等。当到了字母表的结尾后将回到开头继续循环,所以w就应该是j。
  我不会同服务器一样详细介绍客户程序。你可以用系统中的telnet工具连接到服务器。我提供了一个测试程序向服务器申请大量的连接,以此来显示服务器在高负载下的工作性能。
使用Selectors
让我们来看一下Selector是如何工作的。在下面的范例中,我用Selector完成两个任务:接受连接请求和接受现有连接的请求数据。
监听连接请求
  首先我要创立一个Selector对象。Selector是这个过程的核心对象;每个连接都必须对这个对象进行注册。静态方法Selector.open()可以创建Selector对象:
Selector selector = Selector.open();
因为我将要创建一个C/S系统,所以要监听ServerSocketChannel。我必须将它设置为非阻塞通道,只有这样它才可以被Selector使用:
ServerSocketChannel ssc = ...
ssc.configureBlocking(false);
ssc.register(selector, OP_ACCEPT);
SelectionKey.OP_ACCEPT参数告诉Selector我只想监听连接请求而不是普通数据。因为服务器套接字不接受普通得数据。
主循环
  现在我已经对Selector进行了注册,让我们来开动它。因为我要不断的等待新的请求,所以我使用了一个无限循环,并在其中调用Selector的select()方法。
while (true) {
//察看是否有新的请求包括连接请求和现有连接的请求数据。
int num = selector.select();

//如果没有新的请求,将继续循环等待。
if (num == 0) {
continue;
}

//获得被发现请求的关键字并一个一个的处理它们。
Set keys = selector.selectedKeys();

Iterator it = keys.iterator();

while (it.hasNext()) {
//获得单个I/O请求的关键字
  SelectionKey key = (SelectionKey)it.next();

//...处理SelectionKey...
}
 
//删去关键字,因为你已经处理过它们了。
keys.clear();
}
  注意,这个结构很像是在图形用户界面中使用的事件循环结构(event loop)。特别是和事件循环结构一样输入来自不同对象,而且我们不能预计输入发生的时间。
  在循环中,select()返回发生I/O活动的通道的个数。如果返回0循环回到顶端继续等待,否则必须处理I/O活动。
  这些I/O活动被描述为一个或多个SelectionKey对象。一个SelectionKey代表一个Selector上注册的一个通道。当Selector发现某个通道发生活动时,它返回与此通道相应的SelctionKey。
接收连接
  一个SelctionKey可以表示多种I/O活动,你必须找出活动的类型。在我的代码中,只注册了ServerSockChannel一个通道,所以只需要处理这个通道的连接请求。
  SelectionKey的readyOps()方法用来决定I/O活动的类型。这个方法返回一个bitmask(位掩码)来表示发生在这个通道上的活动的类型。可以检查它是否包含用来表示连接请求的OP_ACCEPT位:
if ((key.readyOps() & SelectionKey.OP_ACCEPT) ==
SelectionKey.OP_ACCEPT) {

//接收连接请求。
Socket s = serverSocket.accept();

// ... 处理连接请求...
}
  如果它是一个连接请求就使用阻塞式调用accept()得到一个连接。但是实际accept()并不会发生阻塞,因为Selector已经告诉我们有一个连接请求发生正在被等待处理。
  现在连接已建立,让我们用它来接收数据。
监听传入数据
在接收到连接后,需要监听来自这些连接的传入数据。像注册服务器套接字监听连接请求一样,需要注册新的连接套接字来监听传入数据。和服务器套接字一样,也要将新的套接字设置为非阻塞。
  因为至少有注册了一个连接套接字,所以我将要接收它或它们的数据。
//一定要将它设置为非阻塞,这样才可以使用Selector。
SocketChannel sc = socket.getChannel();
sc.configureBlocking( false );

//把它注册到selector,并设置为reading属性。
sc.register( selector, SelectionKey.OP_READ );
将Seletor设置为监听OP_READ活动,而不是OP_ACCEPT,这就意味着要监听输入数据,而不是连接请求。
回到循环顶端
有两种套接字注册到Selector上:服务器套接字和普通套接字。在循环顶端,再次调用select方法:
int num = selector.select();
现在任何套接字发生活动我都回收到通知,包括服务器套接字受到了连接请求和普通套接字收到数据。当连入更多的连接时,他们都会被注册到Seletor上。
传入数据
因为至少有一个普通套接字被注册,所以我将接收到其中之一的数据。当接收到数据时,套接字的readyOps()返回bitmask被设置为OP_READ,你可以获得这个SelectionKey(选择关键字)。
} else if ((key.readyOps() & SelectionKey.OP_READ) ==
SelectionKey.OP_READ) {

SocketChannel sc = (SocketChannel)key.channel();
processInput( sc );

// ...
}
数据传入后,我将套接字更确切的是SocketChannel传递给加密程序。你可以从源代码中找到加密程序,它将传入的数据加密以后返回到客户端。
最终结果
这里有很多步骤,它们适合其它的select程序。所有的输入源都被注册到Selector上。在一个无限循环中不断的调用Selector的select()方法。每次该方法返回时代表某些输入源发生了活动。处理发生活动的输入源并继续循环。图3说明了这个过程。
image

图3.完整的Select循环


完成服务器
上述代码只是一部分,完整的代码包括一个完整的主循环和proccessInput代码。从命令行启动该服务器进行测试:
java Server [端口]
然后,用telnet连接这个服务器。如果是同一台机器,你可以用localhost或127.0.0.1作为登录的主机名。
源代码中还包含一个Client.java文件,你可以用这个程序测试你的服务器。它有多个线程,每个线程都发送和读取数据。每个线程都用一个无限循环,不断的发送数据。如下方式可以用来运行它:
java Client [主机名] [端口] [线程个数]
综述
通过前面的讨论,select I/O模型是基于事件驱动(event-drivern)的。所有输入源被注册到一个Selector对象上,它等待着任何输入源的活动。这个模型不同于流模型,但它仍是一个实体模型。很大程度上,可以说select模型运行在流模型之上。从硬件角度来讲,I/O是基于事件驱动的,因为像网卡、键盘和磁盘等外设在发送数据前都不会发出通知。
流模型用缓冲掩盖了阻塞式I/O调用之后的事件驱动I/O的复杂性使得程序变得更简单。但当你需要更高的速度时,你需要越过流这一层直接使用I/O事件本身。
NIO库提供了非常优雅的基于缓冲的接口连接到Select模型。同时它也完全适合旧的流模型;实际上,旧的java.io.*包中的类现在都是基于java.nio.*的,因此它们可以很好的配合。

下载文章中的源代码>>>http://www.javaworld.com/javaworld/jw-04-2003/select/jw-0411-select.zip


SocketChannel,ServerSocketChannel完全代替了Socket和ServerSocket。

注册的操作可以是SelectionKey.OP_READ,SelectionKey.OP_WRITE,SelectionKey.OP_ACCEPT,OP_CONNECT ...

一个socket/serverSocket可以监听多个。

===============================================================

SocketChannel socket;
if (key.isAcceptable()) {
    System.out.println("Acceptable Key");
    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    socket = (SocketChannel) ssc.accept();
    socket.configureBlocking(false);
    SelectionKey another =
      socket.register(sel,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
if (key.isReadable()) {
    System.out.println("Readable Key");
    String ret = readMessage(key);
    if (ret.length() > 0) {
      writeMessage(socket,ret);
    }
   
}
if (key.isWritable()) {
    System.out.println("Writable Key");
    String ret = readMessage(key);
    socket = (SocketChannel)key.channel();  
    if (result.length() > 0 ) {
      writeMessage(socket,ret);
    }
    }


// $Id$

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

public class Server implements Runnable
{
  // The port we will listen on
  private int port;

  // A pre-allocated buffer for encrypting data
  private final ByteBuffer buffer = ByteBuffer.allocate( 16384 );

  public Server( int port ) {
    this.port = port;
    new Thread( this ).start();
  }

  public void run() {
    try {
      // Instead of creating a ServerSocket,
      // create a ServerSocketChannel
      ServerSocketChannel ssc = ServerSocketChannel.open();

      // Set it to non-blocking, so we can use select
      ssc.configureBlocking( false );

      // Get the Socket connected to this channel, and bind it
      // to the listening port
      ServerSocket ss = ssc.socket();
      InetSocketAddress isa = new InetSocketAddress( port );
      ss.bind( isa );

      // Create a new Selector for selecting
      Selector selector = Selector.open();

      // Register the ServerSocketChannel, so we can
      // listen for incoming connections
      ssc.register( selector, SelectionKey.OP_ACCEPT );
      System.out.println( "Listening on port "+port );

      while (true) {
        // See if we've had any activity -- either
        // an incoming connection, or incoming data on an
        // existing connection
        int num = selector.select();

        // If we don't have any activity, loop around and wait
        // again
        if (num == 0) {
          continue;
        }

        // Get the keys corresponding to the activity
        // that has been detected, and process them
        // one by one
        Set keys = selector.selectedKeys();
        Iterator it = keys.iterator();
        while (it.hasNext()) {
          // Get a key representing one of bits of I/O
          // activity
          SelectionKey key = (SelectionKey)it.next();

          // What kind of activity is it?
          if ((key.readyOps() & SelectionKey.OP_ACCEPT) ==
            SelectionKey.OP_ACCEPT) {

System.out.println( "acc" );
            // It's an incoming connection.
            // Register this socket with the Selector
            // so we can listen for input on it

            Socket s = ss.accept();
            System.out.println( "Got connection from "+s );

            // Make sure to make it non-blocking, so we can
            // use a selector on it.
            SocketChannel sc = s.getChannel();
            sc.configureBlocking( false );

            // Register it with the selector, for reading
            sc.register( selector, SelectionKey.OP_READ );
          } else if ((key.readyOps() & SelectionKey.OP_READ) ==
            SelectionKey.OP_READ) {

            SocketChannel sc = null;

            try {

              // It's incoming data on a connection, so
              // process it
              sc = (SocketChannel)key.channel();
              boolean ok = processInput( sc );

              // If the connection is dead, then remove it
              // from the selector and close it
              if (!ok) {
                key.cancel();

                Socket s = null;
                try {
                  s = sc.socket();
                  s.close();
                } catch( IOException ie ) {
                  System.err.println( "Error closing socket "+s+": "+ie );
                }
              }

            } catch( IOException ie ) {

              // On exception, remove this channel from the selector
              key.cancel();

              try {
                sc.close();
              } catch( IOException ie2 ) { System.out.println( ie2 ); }

              System.out.println( "Closed "+sc );
            }
          }
        }

        // We remove the selected keys, because we've dealt
        // with them.
        keys.clear();
      }
    } catch( IOException ie ) {
      System.err.println( ie );
    }
  }

  // Do some cheesy encryption on the incoming data,
  // and send it back out
  private boolean processInput( SocketChannel sc ) throws IOException {
    buffer.clear();
    sc.read( buffer );
    buffer.flip();

    // If no data, close the connection
    if (buffer.limit()==0) {
      return false;
    }

    // Simple rot-13 encryption
    for (int i=0; i<buffer.limit(); ++i) {
      byte b = buffer.get( i );

      if ((b>='a' && b<='m') || (b>='A' && b<='M')) {
        b += 13;
      } else if ((b>='n' && b<='z') || (b>='N' && b<='Z')) {
        b -= 13;
      }

      buffer.put( i, b );
    }

    sc.write( buffer );

    System.out.println( "Processed "+buffer.limit()+" from "+sc );

    return true;
  }

  static public void main( String args[] ) throws Exception {
    int port = Integer.parseInt( args[0] );

    new Server( port );
  }
}


// $Id$

import java.io.*;
import java.net.*;
import java.util.*;

public class Client implements Runnable
{
  private String host;
  private int port;

  // Bounds on how much we write per cycle
  private static final int minWriteSize = 1024;
  private static final int maxWriteSize = 65536;

  // Bounds on how long we wait between cycles
  private static final int minPause = (int)( 0.05 * 1000 );
  private static final int maxPause = (int)( 0.5 * 1000 );

  // Random number generator
  Random rand = new Random();

  public Client( String host, int port, int numThreads ) {
    this.host = host;
    this.port = port;

    for (int i=0; i<numThreads; ++i) {
      new Thread( this ).start();
    }
  }

  public void run() {

    byte buffer[] = new byte[maxWriteSize];

    try {
      Socket s = new Socket( host, port );

      InputStream in = s.getInputStream();
      OutputStream out = s.getOutputStream();

      while (true) {
        int numToWrite = minWriteSize +
          (int)(rand.nextDouble() * (maxWriteSize-minWriteSize));

        for (int i=0; i<numToWrite; ++i) {
          buffer[i] = (byte)rand.nextInt( 256 );
        }

        out.write( buffer, 0, numToWrite );
        int sofar = 0;
        while (sofar < numToWrite) {
          sofar += in.read( buffer, sofar, numToWrite-sofar );
        }

System.out.println( Thread.currentThread()+" wrote "+numToWrite );

        int pause = minPause +
          (int)(rand.nextDouble() * (maxPause-minPause));
        try { Thread.sleep( pause ); } catch( InterruptedException ie ) {}
      }
    } catch( IOException ie ) {
      ie.printStackTrace();
    }
  }

  static public void main( String args[] ) throws Exception {
    String host = args[0];
    int port = Integer.parseInt( args[1] );
    int numThreads = Integer.parseInt( args[2] );

    new Client( host, port, numThreads );
  }
}



Trackback: http://tb.donews.net/TrackBack.aspx?PostId=445807


[点击此处收藏本文]  发表于2005年06月27日 2:52 PM




正在读取评论……