-->

java网络编程基础——TCP网络编程三

2020-01-25 09:19发布

AIO实现非阻塞通信

java7 NIO2 提供了异步Channel支持,这种异步Channel可以提供更高效的IO,这种基于异步Channel的IO被称为异步IO(Asynchronous IO)

IO操作分为两步:1、程序发出IO请求  2、完成实际的IO操作

阻塞和非阻塞IO是根据第一步划分的:

发出IO请求如果阻塞线程则是阻塞IO,如果不阻塞线程,则是非阻塞IO。

同步IO和异步IO是根据第二步划分:

如果实际的IO操作是由操作系统完成,再将结果返回给应用程序,这就是异步IO。

如果实际的IO需要应用程序本身去执行,会阻塞线程,那就是同步IO。

(java传统的IO操作和基于Channel的非阻塞IO都是同步IO)

 

 

NIO2提供了一系列以Asynchronous开头的Channel接口和类。

其中AsynchronousSocketChannel、AsynchronousServerSocketChannel是支持TCP通信的异步Channel。

 

AsynchronousServerSocketChannel:负责监听的Channel,与ServerSocketChannel相似。

AsynchronousServerSocketChannel使用需要三步:

1)调用open()静态方法创建AsynchronousServerSocketChannel实例

2)调用AsynchronousServerSocketChannel的bind()方法让他在指定IP,端口监听。

3)调用AsynchronousServerSocketChannel的accept()方法接收连接请求。

 

AsynchronousSocketChannel:与SocketChannel类似,执行具体的IO操作

AsynchronousSocketChannel的用法也可以分为三步:

1)调用Open()静态方法创建AsynchronousSocketChannel实例

2)调用AsynchronousSocketChannel的connect()方法让他在指定IP,端口服务器。

3)调用AsynchronousSocketChannel的read()、write()方法进行读写。

 

AsynchronousServerSocketChannel、AsynchronousSocketChannel都允许使用线程池管理,open()方法创建对应实例时都可以传入AsynchronousChannelGroup。AsynchronousChannelGroup创建需要传入一个线程池ExecutorService。

AsynchronousServerSocketChannel的accept()方法、AsynchronousSocketChannel的read()、write()方法都有两个版本

1)返回Future对象版本:必须等到Future的get方法返回时IO操作才完成,get方法会阻塞线程的。

2)需要传入CompletionHandler版本:通过ComplctionHandler完成相关操作。

 

        CompletionHandler是一个接口,该接口中定义了两个方法:

completed(V result,A attachment):当IO操作完成时触发该方法,第一参数表示IO操作返回的参数;第二个参数表示发起IO操作时传入的附加参数。

failed(Trowable exc,A attachment):当IO操作失败事触发该方法,第一参数表示异常信息,,第二个参数表示发起IO操作时传入的附加参数。

 

下面使用Future对象版本实现简单的AIO服务端、客户端通信:

package net;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;
 
public class SimpleAIOServer {
 
    static final int PORT = 30000;
     
    public static void main(String[] args) throws Exception {
         
        try (//创建AsynchronousServerSocketChannel实例
                AsynchronousServerSocketChannel serverSocketChannel = 
                    AsynchronousServerSocketChannel.open();)
        {
            serverSocketChannel.bind(new InetSocketAddress(PORT));
            while(true) {
                //采用循环接收客户端的连接
                Future<AsynchronousSocketChannel> future = serverSocketChannel.accept();
                //获取连接后返回AsynchronousSocketChannel
                AsynchronousSocketChannel socketChannel = future.get();
                 
                 
                //向客户端输出数据
                Future<Integer> future1 =socketChannel.write(ByteBuffer.wrap("AIO HELLO".
                getBytes("UTF-8")));
                future1.get();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package net;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.Future;
 
public class SimpleAIOClient {
 
    static final int PORT = 30000;
     
    public static void main(String[] args) throws Exception {
         
        //用户读取数据的Buffer
        ByteBuffer buff = ByteBuffer.allocate(1024);
        Charset charset = Charset.forName("UTF-8");
        try(//创建AsynchronousSocketChannel实例
            AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();)
        {
            //连接到远程服务器
            Future<Void> future = socketChannel.connect(new InetSocketAddress("127.0.0.1", PORT));
            future.get();
             
            buff.clear();
            //socketChannel中读取数据
            Future<Integer> future1 = socketChannel.read(buff);
            future1.get();
             
            buff.flip();
            String content = charset.decode(buff).toString();
            System.out.println("服务器:" + content);
         
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

结果:

服务器:AIO HELLO

AIO实现多人聊天

package net;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class AIOServer {
 
    static final int PORT = 30000;
    static List<AsynchronousSocketChannel> channelList = new ArrayList<>();
     
    public void init() throws IOException {
        //创建一个线程池
        ExecutorService executor = Executors.newFixedThreadPool(20);
        //以指定线程池创建分组管理器
        AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executor);
        //以线程池创建AsynchronousServerSocketChannel
        AsynchronousServerSocketChannel serverSocketChannel = 
                                AsynchronousServerSocketChannel.open(channelGroup);
        //绑定端口
        serverSocketChannel.bind(new InetSocketAddress(PORT));
        //使用CompletionHandler处理客户端连接请求,此处的Handler主要处理客户端连接请求
        serverSocketChannel.accept(null, new AcceptHandler(serverSocketChannel));
         
    }
     
    public static void main(String[] args) throws Exception {
        AIOServer aioServer = new AIOServer();
        aioServer.init();
        Thread.sleep(Integer.MAX_VALUE);
         //不让服务器停止
        while(true) {}
    }
     
}
package net;
 
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
 
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object>{
 
    private AsynchronousServerSocketChannel serverSocketChannel = null;
    public AcceptHandler(AsynchronousServerSocketChannel serverSocketChannel) {
        this.serverSocketChannel = serverSocketChannel ;
    }
     
    //定义一个Buffer准备读取数据
    ByteBuffer buff = ByteBuffer.allocate(1024);
    Charset charset = Charset.forName("UTF-8");
     
    //当IO操作完成时触发该方法
    @Override
    public void completed(final AsynchronousSocketChannel socketChannel, Object attachment) {
        //记录新进来的Channel
        AIOServer.channelList.add(socketChannel);
         
        //准备接收客户端的下一次连接
        serverSocketChannel.accept(null, this);
         
        //读取客户端数据,此处的Handler主要处理读取客户数据
        socketChannel.read(buff, null, new CompletionHandler<Integer, Object>() {
 
            @Override
            public void completed(Integer result, Object attachment) {
                buff.flip();
                //将Buffer中的数据转换成字符串
                String content = charset.decode(buff).toString();
                //将客户端发来的数据 发送到么每个客户端
                for(AsynchronousSocketChannel asc : AIOServer.channelList) {
                    try {
                        asc.write(ByteBuffer.wrap(content.getBytes(charset))).get();
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }
                //清空buff容器,用户读取下一次数据
                buff.clear();
            }
 
            //当IO操作失败事触发该方法
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("读取数据失败:"+ exc);
                //读取数据失败,客户端出问题,移除对应的channel
                AIOServer.channelList.remove(socketChannel);
            }
        });
         
    }
 
    @Override
    public void failed(Throwable exc, Object attachment) {
        System.out.println("连接失败:"+exc);
    }
 
}
package net;
import java.awt.BorderLayout;
import java.awt.event.ActionEvent;
import java.awt.event.InputEvent;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.swing.AbstractAction;
import javax.swing.Action;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.JTextField;
import javax.swing.KeyStroke;
 
public class AIOClient {
    static final int PORT = 30000;
    //与服务器通信的异步Channel
    AsynchronousSocketChannel socketChannel = null;
     
    JFrame mainWin = new JFrame("多人聊天");
    JTextArea jta = new JTextArea(16,48);
    JTextField jtf = new JTextField(40);
    JButton sendBtn = new JButton("发送");
     
    public void init() {
        mainWin.setLayout(new BorderLayout());
        jta.setEditable(false);
        mainWin.add(new JScrollPane(jta),BorderLayout.CENTER);
        JPanel jp = new JPanel();
        jp.add(jtf);
        jp.add(sendBtn);
         
        @SuppressWarnings("serial")
        Action sendAction  = new AbstractAction() {
            @Override
            public void actionPerformed(ActionEvent e) {
                String content = jtf.getText();
                if(content.trim().length() > 0) {
                    //将输入内容写到channel中
                    try {
                        socketChannel.write(ByteBuffer.
                        wrap(content.getBytes(StandardCharsets.UTF_8))).get();
                    } catch (InterruptedException | ExecutionException e1) {
                        e1.printStackTrace();
                    }
                }
                jtf.setText("");
            }
        };
         
        sendBtn.addActionListener(sendAction);
        jtf.getInputMap().put(KeyStroke.getKeyStroke('\n', InputEvent.CTRL_MASK), "send");
        jtf.getActionMap().put("send", sendAction);
        mainWin.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
        mainWin.add(jp, BorderLayout.SOUTH);
        mainWin.pack();
        mainWin.setVisible(true);
    }
     
    public void connect() throws Exception {
        //用于读取数据的buffer
        ByteBuffer buff = ByteBuffer.allocate(1024);
        //创建一个线程池
        ExecutorService executor = Executors.newFixedThreadPool(80);
        //以指定线程池创建分组管理器
        AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executor);
        //以分组管理器创建AsynchronousSocketChannel
        socketChannel =  AsynchronousSocketChannel.open(channelGroup);
        //连接服务器
        socketChannel.connect(new InetSocketAddress("127.0.0.1", PORT)).get();
        jta.append("***与服务器连接成功***\n");
        socketChannel.read(buff, null, new CompletionHandler<Integer, Object>() {
 
            @Override
            public void completed(Integer result, Object attachment) {
                buff.flip();
                //将Buffer转换成字符串
                String content = StandardCharsets.UTF_8.decode(buff).toString();
                //显示从服务器读取的数据
                jta.append("某人说:"+content+"\n");
                buff.clear();
                //为下一次读取数据做准备
                socketChannel.read(buff, null, this);
            }
 
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("读取数据失败"+exc);
            }
        });
    }
     
    public static void main(String[] args) throws Exception {
        AIOClient aioClient = new AIOClient();
        aioClient.init();
        aioClient.connect();
    }
}

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package net;
 
import java.io.IOException;
import 
标签: