博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
JAVA并发处理经验(四)并行模式与算法6:NIO网络编程
阅读量:6289 次
发布时间:2019-06-22

本文共 8776 字,大约阅读时间需要 29 分钟。

一、前言

首先我们必须了解NIO的一些基本概念

channel:是NIO中的一个通道,类似我们说的流。---管道

Buffer:理解为byte数组。与channel交流。----水流

Selector:有一个SelectableChancel实现,用线程管理------选择器

二、NIO编程

2.1 NIO服务端

package pattern.nio;import java.io.IOException;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.net.Socket;import java.nio.ByteBuffer;import java.nio.channels.*;import java.nio.channels.spi.SelectorProvider;import java.util.*;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * Created by ycy on 16/1/19. */public class NioServer {    //选择器    private Selector selector;    //线程池    private ExecutorService tp = Executors.newCachedThreadPool();    //给定大小的map    public static Map
time_stat = new HashMap
(10240); public void startServer() throws Exception { //1'由selectorPrivider返回一个创建者,并打开一个选择器 selector = SelectorProvider.provider().openSelector(); //2'打开套接字通道 ServerSocketChannel ssc = ServerSocketChannel.open(); //block - 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式 ssc.configureBlocking(false); InetSocketAddress isa = new InetSocketAddress(65500); // InetSocketAddress isa=new InetSocketAddress(8000); //获取与此通道关联的服务器套接字。将 ServerSocket 绑定到特定地址(IP 地址和端口号) ssc.socket().bind(isa); //将通道注册到选择器, SelectionKey accpetKey = ssc.register(selector, SelectionKey.OP_ACCEPT); try { for (; ; ) { selector.select();//消费阻塞 Set readykeys = selector.selectedKeys();//获取已经准备好的keys Iterator i = readykeys.iterator();//迭代 long e = 0; while (i.hasNext()) { SelectionKey sk = (SelectionKey) i.next(); i.remove();//必须消除,防止重复消费 if (sk.isAcceptable()) { doAccept(sk);//如果为接受状态,接受 } else if (sk.isValid() && sk.isReadable()) {//如果是可读 if (!time_stat.containsKey(((SocketChannel) sk.channel()).socket())) { //将socket方法如map time_stat.put(((SocketChannel) sk.channel()).socket(), System.currentTimeMillis());//增加一个时间戳 //读取 doRead(sk); } } else if (sk.isValid() && sk.isWritable()) { //写 doWrite(sk); e = System.currentTimeMillis(); long b = time_stat.remove(((SocketChannel) sk.channel()).socket()); System.out.println("spend:" + (e - b) + "ms");//输入处理写入耗时 } } } } catch (ClosedSelectorException e) { System.out.println("外面捕捉不做事"); } } /* 与客户端建立连接 */ private void doAccept(SelectionKey sk) { try { ServerSocketChannel server = (ServerSocketChannel) sk.channel(); SocketChannel clientChannel; clientChannel = server.accept();//生成一个channel表示与客户端通信 clientChannel.configureBlocking(false);//非阻塞模式 //Register this channel for reading SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ); //Allocate an Echoclient instance adn attach it to this selction key EchoClient echoClient = new EchoClient();//回复给客服端口的全部信息 clientKey.attach(echoClient);//附加实例,整个连接共享实例 InetAddress clientAddress = clientChannel.socket().getInetAddress(); System.out.println("Acceprted connection form " + clientAddress.getHostAddress() + ".") ; } catch (Exception e) { e.printStackTrace(); } } //执行读得操作 private void doRead(SelectionKey sk) { SocketChannel channel = (SocketChannel) sk.channel(); ByteBuffer bb = ByteBuffer.allocate(8192); int len; try { len = channel.read(bb); if (len < 0) { disconnect(sk); return; } } catch (Exception e) { System.out.println("faild to read from client"); e.printStackTrace(); disconnect(sk); return; } bb.flip(); tp.execute(new HanldeMsg(sk, bb)); } private void disconnect(SelectionKey sk) { try { SocketChannel channel = (SocketChannel) sk.channel(); channel.close(); //sk.cancel(); sk.selector().close(); } catch (IOException e) { e.printStackTrace(); } } //执行写操作 private void doWrite(SelectionKey sk) { SocketChannel channel = (SocketChannel) sk.channel(); EchoClient echoClient = (EchoClient) sk.attachment(); LinkedList
outq = echoClient.getOutputQuquq(); ByteBuffer bb = outq.getLast(); try { int len = channel.write(bb); if (len == -1) { disconnect(sk); return; } if (bb.remaining() == 0) { //已经完成 outq.removeLast(); } } catch (Exception e) { System.out.println("Faild to write to client"); e.printStackTrace(); disconnect(sk); } if (outq.size() == 0) {//很重要 sk.interestOps(SelectionKey.OP_READ); } } /内部匿名类/ class EchoClient { private LinkedList
outq; EchoClient() { outq = new LinkedList
(); } public LinkedList
getOutputQuquq() { return outq; } public void enqueue(ByteBuffer bb) { outq.addFirst(bb); } } //将接受的数据压入EchClient,需要处理业务在这u处理,处理完成之后重新注册事件op_write class HanldeMsg implements Runnable { SelectionKey sk; ByteBuffer bb; public HanldeMsg(SelectionKey sk, ByteBuffer bb) { this.sk = sk; this.bb = bb; } public void run() { EchoClient echoClient = (EchoClient) sk.attachment(); echoClient.enqueue(bb); sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); //强迫selector立即返回 selector.wakeup(); } }}

2.2NIO main方法

package pattern.nio;/** * Created by ycy on 16/1/20. */public class NioMain {    public static void main(String[] args) throws Exception {        NioServer nioServer=new NioServer();        nioServer.startServer();    }}

2.3 NIO客户端

package pattern.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.nio.channels.spi.SelectorProvider;import java.util.Iterator;/** * Created by ycy on 16/1/20. */public class NioClient {    private Selector selector;    public void init(String ip,int port) throws IOException {        SocketChannel channel=SocketChannel.open();        channel.configureBlocking(false);        this.selector= SelectorProvider.provider().openSelector();        channel.connect(new InetSocketAddress(ip,port));        channel.register(selector,SelectionKey.OP_CONNECT);    }    public void working() throws IOException {        while (true){            if (!selector.isOpen()){                break;            }                selector.select();                Iterator
ite=this.selector.selectedKeys().iterator(); while (ite.hasNext()){ SelectionKey key=ite.next(); ite.remove(); //连接事件 if (key.isConnectable()){ connect(key); }else if(key.isReadable()){ read(key); } } } } /* 连接 */ public void connect(SelectionKey key) throws IOException { SocketChannel channel=(SocketChannel)key.channel(); //如果正在连接,则连接完成 if(channel.isConnectionPending()){ channel.finishConnect(); } channel.configureBlocking(false); channel.write(ByteBuffer.wrap(new String("HELLO" ).getBytes())); channel.register(this.selector,SelectionKey.OP_READ); } public void read(SelectionKey key) throws IOException { SocketChannel channel=(SocketChannel)key.channel(); //读取缓冲区 ByteBuffer bb=ByteBuffer.allocate(1000); channel.read(bb); byte[] data=bb.array(); String msg=new String(data).trim(); System.out.println("客户端收到信息:"+msg); channel.close(); key.selector().close(); } public static void main(String[] args) throws IOException { NioClient nioClient=new NioClient(); nioClient.init("127.0.0.1",65500); nioClient.working(); }}

转载地址:http://hjuta.baihongyu.com/

你可能感兴趣的文章
java PO、BO
查看>>
docker拉取镜像报错:net/http: TLS handshake timeout.
查看>>
sublime text笔记
查看>>
为CommonMark.Net增加Table解析
查看>>
multi_index_container 多索引容器
查看>>
【学习Android NDK开发】Android.mk文件
查看>>
iOS-关于iOS应用支持IP6
查看>>
企业USB权限控制心得^
查看>>
Linux shell编程学习笔记-----第十七章
查看>>
Spring-MVC
查看>>
Vue+Element+computed实现购物车
查看>>
python库参考学习网址
查看>>
css3创建动画
查看>>
CentOS6.2安装memcache
查看>>
iOS向后台申请一段时间
查看>>
魅情景
查看>>
javascript 坑
查看>>
基于VUE的九宫格抽奖功能
查看>>
Linux中修改环境变量及生效方法
查看>>
2017/10/10 jar包错误
查看>>