本文大纲如下:
- 1、写作背景
- 2、基本的UDP包收发用法
- 3、采用NIO方式处理UDP
一、背景
本篇内容主要来源于对公司代码的一次重构。公司的一个项目采用UDP方式通信,并在UDP不可靠传输的基础上封装出可靠的通信协议,其本质是“UDP+协议”的方式。因为今天的重点是UDP通信,所以只讲解UDP模块。由于APP有N个通信对象,之前的代码中也就有N个线程监听接收消息、N个线程发送消息。这样会占用大量线程,而且监听线程一直处于阻塞状态,效率低下。在这种情况下,就有必要对此模块进行重构了。
二、基本的UDP包收发用法
这也是公司之前的用法,比较简单粗暴,好处是开发成本低,但后期业务增加时,性能会有所下降。
对于收发UDP包,需要localIp + localPort + remoteIp + remotePort,属于端对端的通信
1)、UDP发送数据
public static void Send(byte[] data, int offset, int length, int localPort, InetAddress remoteAddress, int remotePort) throws Exception {if (remoteAddress == null || remotePort <= 0) {throw new Exception("Null remote address !!!");}if (data == null || offset < 0 || length <= 0) {throw new Exception("null send data !!!");}// 会分配一个可用的本地端口DatagramSocket socket = new DatagramSocket(null);// 多个UDP socket绑定相同的端口socket.setReuseAddress(true);// 绑定本地端口socket.bind(new InetSocketAddress(localPort));// 封装成PacketDatagramPacket packet = new DatagramPacket(data, offset, length, remoteAddress, remotePort);socket.send(packet);socket.close();}
发送UDP包流程:
- 构建DatagramSocket
- 绑定本地发送端口
- 构建发送的UDP数据包
- 发送
- 关闭Socket
2)、UDP接收数据
// Create an unbound socket, enable address reuse, then bind it to the listening port.
DatagramSocket socket = new MulticastSocket(null);
socket.setReuseAddress(true);
socket.bind(new InetSocketAddress(listenPort));

/**
 * Blocking receive loop: repeatedly waits for a datagram on {@code socket}
 * while {@code listenRunning} is true.
 *
 * <p>The loop ends when {@code listenRunning} becomes false or when the socket
 * is missing/closed. Previously a closed socket left the loop spinning without
 * ever blocking, burning a full CPU core.
 */
protected Runnable listenLoop = new Runnable() {
    @Override
    public void run() {
        byte[] receiveBuffer = new byte[1024];
        DatagramPacket packet = new DatagramPacket(receiveBuffer, receiveBuffer.length);
        while (listenRunning) {
            if (socket == null || socket.isClosed()) {
                // Nothing left to receive from; leave instead of busy-waiting.
                break;
            }
            try {
                // Blocks until a datagram arrives (or the socket is closed).
                socket.receive(packet);
            } catch (IOException e) {
                if (!listenRunning || socket.isClosed()) {
                    // Expected when the listener is being shut down.
                    break;
                }
                e.printStackTrace();
            }
        }
    }
};
接收UDP包流程:
- 构建DatagramSocket
- 绑定本地监听端口
- 构建接收的UDP数据包
- socket.receive(packet);
- 关闭Socket
三、NIO重构UDP收发模块
1)、思路
NIO是同步非阻塞方式:先将DatagramChannel注册到Selector选择器上,再用一个Thread轮询Selector,当通道上的数据就绪(可读或可写)时,Selector就会通知用户开始处理发送或接收事件。总之,任何数据在发送和接收之前,都必须先到Selector注册,得到Selector的“允许”后,才能处理后续的工作。
2)、核心代码
// 发送接口
/**
 * Outbound side of a UDP endpoint.
 */
public interface Sender extends Closeable {

    /**
     * Triggers an asynchronous send request.
     *
     * @return the outcome reported by the implementation
     * @throws IOException if the request cannot be issued
     */
    boolean postSendAsync() throws IOException;

    /**
     * Submits a text message for delivery to the given remote endpoint.
     *
     * @param message       text to send
     * @param remoteAddress destination address and port
     */
    void send(String message, InetSocketAddress remoteAddress);
}
// 接收接口
/**
 * Inbound side of a UDP endpoint.
 */
public interface Receiver extends Closeable {

    /**
     * Triggers an asynchronous receive request.
     *
     * @return the outcome reported by the implementation
     * @throws IOException if the request cannot be issued
     */
    boolean postReceiveAsync() throws IOException;

    /** Starts listening for incoming data. */
    void start();
}
// 用于Channel向Selector注册
/**
 * Registers {@link DatagramChannel}s with a selector and associates each
 * registration with a callback.
 */
public interface IoProvider extends Closeable {

    /** Registers {@code channel} for input together with its callback. */
    boolean registerInput(DatagramChannel channel, HandleProviderCallback callback);

    /** Registers {@code channel} for output together with its callback. */
    boolean registerOutput(DatagramChannel channel, HandleProviderCallback callback);

    /** Removes the input registration of {@code channel}. */
    void unRegisterInput(DatagramChannel channel);

    /** Removes the output registration of {@code channel}. */
    void unRegisterOutput(DatagramChannel channel);

    /**
     * Base class for registration callbacks. {@link #run()} is final and simply
     * forwards to {@link #onProviderIo()}, so subclasses implement only the hook.
     */
    abstract class HandleProviderCallback implements Runnable {

        @Override
        public final void run() {
            onProviderIo();
        }

        /** Callback invoked when receiving or sending can proceed. */
        protected abstract void onProviderIo();
    }
}
// The adapter class below implements both Sender and Receiver.
class DatagramChannelAdapter implements Sender,Receiver,Closeable {private final AtomicBoolean isClosed = new AtomicBoolean(false);private final AtomicBoolean isSending = new AtomicBoolean();private final DatagramChannel channel;private final IoProvider ioProvider;private final UdpDataDispatcher dispatcher;private final Queue<UDPSendSnapshot> queue = new ConcurrentLinkedQueue<>();private final ReceiveUdpListener receiverUdpListener;DatagramChannelAdapter(DatagramChannel channel, IoProvider ioProvider, ReceiveUdpListener receiverUdpListener) throws IOException {this.channel = channel;this.ioProvider = ioProvider;this.receiverUdpListener = receiverUdpListener;dispatcher = new UdpDataDispatcher(channel);// 非阻塞模式下操作channel.configureBlocking(false);}@Overridepublic boolean postReceiveAsync() throws IOException {if (isClosed.get()) {throw new IOException("Current channel is closed!");}// 注册能不能输入return ioProvider.registerInput(channel, inputCallback);}@Overridepublic void start() {try {postReceiveAsync();} catch (IOException e) {e.printStackTrace();}}@Overridepublic boolean postSendAsync() throws IOException {if (isClosed.get()) {throw new IOException("Current channel is closed!");}// 当前发送的数据附加到回调中return ioProvider.registerOutput(channel, outputCallback);}@Overridepublic void send(String message,InetSocketAddress remoteAddress) {queue.offer(new UDPSendSnapshot(message,remoteAddress));requestSend();}private void requestSend() {if (isSending.compareAndSet(false,true) ) {if (queue.size() <= 0){isSending.set(false);return;}try {if (!postSendAsync()) {isSending.set(false);}} catch (IOException e) {e.printStackTrace();CloseUtils.close(this);}}}@Overridepublic void close() throws IOException {if (isClosed.compareAndSet(false, true)) {// 解除注册回调ioProvider.unRegisterInput(channel);ioProvider.unRegisterOutput(channel);// 关闭CloseUtils.close(channel);}}// 输入的数据操作private final IoProvider.HandleProviderCallback inputCallback = new IoProvider.HandleProviderCallback() {@Overrideprotected 
void onProviderIo() {if (isClosed.get()) {return;}System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!inputCallback");ReceiveUdpData receiveUdp = dispatcher.receive();try {if (receiveUdp == null) {throw new IOException();}postReceiveAsync();receiverUdpListener.onReceiveUdpListener(receiveUdp.getBytes(),receiveUdp.getTotal(),receiveUdp.getAddress(),receiveUdp.getPort());} catch (IOException e) {CloseUtils.close(DatagramChannelAdapter.this);}}};// 输出的数据操作private final IoProvider.HandleProviderCallback outputCallback = new IoProvider.HandleProviderCallback() {@Overrideprotected void onProviderIo() {if (isClosed.get() || queue.size() == 0) {return;}System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!outputCallback");synchronized (isSending) {UDPSendSnapshot snapshot = queue.poll();dispatcher.sendMessage(snapshot.getMessage(),snapshot.getRemoteAddress());isSending.set(false);}}};/*** 收到监听UDP消息之后的回调*/interface ReceiveUdpListener {void onReceiveUdpListener(byte[] data, int length, InetSocketAddress address, int port);}
}
public class IoSelectorProvider implements IoProvider {private final AtomicBoolean isClosed = new AtomicBoolean(false);// 是否处于某个过程private final AtomicBoolean inRegInput = new AtomicBoolean(false);private final AtomicBoolean inRegOutput = new AtomicBoolean(false);// 读和写的数据选择器private final Selector readSelector;private final Selector writeSelector;private final ExecutorService dataHandlePool;private final HashMap<SelectionKey, Runnable> inputCallbackMap = new HashMap<>();private final HashMap<SelectionKey, Runnable> outputCallbackMap = new HashMap<>();public IoSelectorProvider() throws IOException {readSelector = Selector.open();writeSelector = Selector.open();dataHandlePool = Executors.newFixedThreadPool(4,new Factory.NameableThreadFactory("IoProvider-Thread-"));// 开始输出输入的监听startRead();startWrite();}private void startRead() {Runnable runnable = new Runnable() {@Overridepublic void run() {while (!isClosed.get()) {try {if (readSelector.select() == 0) {waitSelection(inRegInput);continue;} else if (inRegInput.get()) {waitSelection(inRegInput);}Set<SelectionKey> selectionKeys = readSelector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();if (selectionKey.isValid()) {// 对应着下面的两种形式 可读System.out.println("可读的回调");handleSelection(selectionKey,SelectionKey.OP_READ, inputCallbackMap, dataHandlePool, inRegInput);}iterator.remove();}} catch (IOException e) {e.printStackTrace();} catch (ClosedSelectorException ignored) {break;}}}};// 启动线程new Thread(runnable).start();}private void startWrite() {Runnable runnable = new Runnable() {@Overridepublic void run() {while (!isClosed.get()) {try {if (writeSelector.select() == 0) {waitSelection(inRegOutput);continue;} else if (inRegOutput.get()) {waitSelection(inRegOutput);}Set<SelectionKey> selectionKeys = writeSelector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey 
= iterator.next();if (selectionKey.isValid()) {// 可写if (selectionKey.isWritable()) {System.out.println("可写的回调");handleSelection(selectionKey,SelectionKey.OP_WRITE, outputCallbackMap, dataHandlePool, inRegOutput);}}iterator.remove();}} catch (IOException e) {e.printStackTrace();} catch (ClosedSelectorException ignored) {break;}}}};// 启动线程new Thread(runnable).start();}private static void handleSelection(SelectionKey key, int keyOps,HashMap<SelectionKey, Runnable> map,ExecutorService pool, AtomicBoolean locker) {//noinspection SynchronizationOnLocalVariableOrMethodParametersynchronized (locker) {try {// 重点// 取消继续对keyOps的监听key.interestOps(key.readyOps() & ~keyOps);} catch (CancelledKeyException e) {return;}}Runnable runnable = null;try {runnable = map.get(key);} catch (Exception ignored) {}if (runnable != null && !pool.isShutdown()) {// 异步调度pool.execute(runnable);}}@Overridepublic boolean registerInput(DatagramChannel channel, HandleProviderCallback callback) {return registerSelection(channel, readSelector, SelectionKey.OP_READ, inRegInput,inputCallbackMap, callback) != null;}@Overridepublic boolean registerOutput(DatagramChannel channel, HandleProviderCallback callback) {return registerSelection(channel, writeSelector, SelectionKey.OP_WRITE, inRegOutput,outputCallbackMap, callback) != null;}@Overridepublic void unRegisterInput(DatagramChannel channel) {unRegisterSelection(channel, readSelector, inputCallbackMap, inRegInput);}@Overridepublic void unRegisterOutput(DatagramChannel channel) {unRegisterSelection(channel, writeSelector, outputCallbackMap, inRegOutput);}private static SelectionKey registerSelection(DatagramChannel channel, Selector selector,int registerOps, AtomicBoolean locker,HashMap<SelectionKey, Runnable> map,Runnable runnable) {//noinspection SynchronizationOnLocalVariableOrMethodParametersynchronized (locker) {// 设置锁定状态locker.set(true);try {// 唤醒当前的selector,让selector不处于select()状态selector.wakeup();SelectionKey key = null;if (channel.isRegistered()) {// 
查询是否已经注册过key = channel.keyFor(selector);}if (key != null) {key.interestOps(key.readyOps() | registerOps);}if (key == null) {// 注册selector得到Keykey = channel.register(selector, registerOps);// 注册回调map.put(key, runnable);}return key;} catch (ClosedChannelException| CancelledKeyException| ClosedSelectorException e) {e.printStackTrace();return null;} finally {// 解除锁定状态locker.set(false);try {// 通知locker.notify();} catch (Exception ignored) {}}}}private static void unRegisterSelection(DatagramChannel channel, Selector selector,Map<SelectionKey, Runnable> map,AtomicBoolean locker) {//noinspection SynchronizationOnLocalVariableOrMethodParametersynchronized (locker) {locker.set(true);selector.wakeup();try {if (channel.isRegistered()) {SelectionKey key = channel.keyFor(selector);if (key != null) {// 取消监听的方法key.cancel();map.remove(key);}}} finally {locker.set(false);try {locker.notifyAll();} catch (Exception ignored) {}}}}private static void waitSelection(final AtomicBoolean locker) {//noinspection SynchronizationOnLocalVariableOrMethodParametersynchronized (locker) {if (locker.get()) {try {locker.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}@Overridepublic void close() throws IOException {if (isClosed.compareAndSet(false, true)) {dataHandlePool.shutdown();inputCallbackMap.clear();outputCallbackMap.clear();CloseUtils.close(readSelector);}}
}