`
zhuyifeng
  • 浏览: 44011 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

UDP传输的实现与检测重发机制

阅读更多

      java建立UDP连接和建立TCP/IP连接一样简单,只用到了SocketAddress(绑定主机的IP地址和应用程序端口,包括自己的和发送地)、DatagramSocket(数据套接字,接收和发送数据包都靠它,同时在new该对象时需要try…catch,而SocketAddress则无需)、DatagramPacket(数据包对象,UDP数据都是一个包一个包发送的)这三个类。然后实现起来是非常简单的。首先是发送方new两个SocketAddress对象,一个参数写自己的机子,另一个写要发送的目标主机。然后把自己的SocketAddress对象传入DatagramSocket创建其对象,假设对象名为ds。再将要发送的数据连同目标主机的SocketAddress对象一起传入DatagramPacket创建其对象,设为dp。最后调用方法ds.send(dp);就OK了。这是发送方的,接收方差不多,实现起来没多大难度。不过众所周知,UDP有一个特点却是比不上TCP/IP的,那就是建立的连接不一定非常可靠,经常会出现丢包的情况,所以需要在发送过程中添加检测机制,而这也就大大加大了实现的难度。基本检测机制是:

      发送方发送一个数据包,接收方接收后,发回一个应答包告诉发送方已收到消息,此应答包包含发送的消息的唯一序列号。关于这个序列号的唯一性设置我还不是很明白,所以在测试中就把第n条消息的n作为序列号了。若发送方在一定时间内(时间长短可自行设置)未收到应答包则重发消息。重发消息有两个原因:①接收方未收到,此时重发是应该的。②接收方收到消息但是发回的应答包丢失了,此时重发消息则重复了,所以在接收方还得添加一个机制:若收到的消息与以前发过的消息重复,则再次发送应答包。在这些机制下,能够初步的保证UDP传输的完整性。以下是实现代码:

 

发送端:

 

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;

public class UDPServer {

	public UDPServer() {
		try {
			sa = new InetSocketAddress("127.0.0.1", 9999);
			ds = new DatagramSocket(sa);
			saa = new InetSocketAddress("127.0.0.1", 1234);
			String msg = "";
			startThread();
			new Thread() {
				public void run() {
					while (true) {
						recMsg();
					}
				}
			}.start();
			for (int i = 1;; i++) {
				msg = "第" + i + "条消息";
				Msg m = new Msg(msg, i);
				sendMsg(m);
				list.add(m);
				Thread.sleep(1000);
			}
		} catch (Exception e) {
		}
	}

	public void sendMsg(Msg m) {
		DatagramPacket dp;
		try {
			dp = new DatagramPacket(m.getmsg().getBytes(), m.getmsg()
					.getBytes().length, saa);
			ds.send(dp);
			m.setsendtime(System.currentTimeMillis());
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public void recMsg() {
		byte[] buffer = new byte[20];
		DatagramPacket dpp = new DatagramPacket(buffer, buffer.length);
		try {
			ds.receive(dpp);
		} catch (IOException e) {
			e.printStackTrace();
		}
		String rec = new String(dpp.getData()).trim();
		int id = Integer.parseInt(rec.substring(4, rec.length() - 3));
		for (int i = 0; i < list.size(); i++) {
			if (id == list.get(i).getid()) {
				System.out.println("确认对方已收到第" + list.get(i).getid()
						+ "条消息,从队列中除去");
				list.remove(i);
			}
		}
	}

	public void startThread() {
		new Thread() {
			public void run() {
				while (true) {
					// 如果list里面有元素
					if (list.size() > 1) {
						for (int i = 0; i < list.size(); i++) {
							if (System.currentTimeMillis()
									- list.get(i).getsendtime() > 3000
									&& list.get(i).gettime() < 4) {
								sendMsg(list.get(i));
								System.out.println("重发第" + list.get(i).getid()
										+ "条消息第" + list.get(i).gettime() + "次");
								if (list.get(i).gettime() == 3) {
									System.out.println("重发第"
											+ list.get(i).getid()
											+ "条消息已超过4次,丢弃包");// System.exit(0);
									list.remove(i);
								} else {
									list.get(i).settime(
											list.get(i).gettime() + 1);
								}
							}
						}
					}
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}.start();
	}

	public static void main(String[] args) {
		new UDPServer();
	}

	class Msg {
		public Msg(String msg, int id) {
			this.msg = msg;
			this.id = id;
		}

		public void setid(int i) {
			id = i;
		}

		public int getid() {
			return id;
		}

		public void setmsg(String newmsg) {
			msg = newmsg;
		}

		public String getmsg() {
			return msg;
		}

		public void settime(int i) {
			time = i;
		}

		public int gettime() {
			return time;
		}

		public void setsendtime(long time) {
			lastsendtime = time;
		}

		public long getsendtime() {
			return lastsendtime;
		}

		private int id;
		private String msg;
		private int time;
		private long lastsendtime;
	}

	List<Msg> list = new ArrayList<Msg>();
	SocketAddress sa;
	DatagramSocket ds;
	SocketAddress saa;

}

 

 

接收端:

import java.awt.Dimension;
import java.awt.FlowLayout;
import java.awt.Font;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;

import javax.swing.JFrame;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.UIManager;

public class UDPClient {

	JFrame f;
	JTextArea jta, jta2;
	SocketAddress sa;
	SocketAddress saa;
	DatagramSocket ds;
	List<Msg> list = new ArrayList<Msg>();

	public UDPClient() {
		try {
			UIManager.setLookAndFeel(UIManager.getSystemLookAndFeelClassName());
		} catch (Exception e) {
		}
		Font font = new Font("Dialog", Font.PLAIN, 12);
		f = new JFrame();
		jta = new JTextArea();
		jta.setFont(font);
		jta2 = new JTextArea();
		jta2.setFont(font);
		JScrollPane jsp = new JScrollPane(jta);
		JScrollPane jsp2 = new JScrollPane(jta2);
		jsp.setPreferredSize(new Dimension(500, 400));
		jsp2.setPreferredSize(new Dimension(300, 400));
		f.setLayout(new FlowLayout());
		f.add(jsp);
		f.add(jsp2);
		f.pack();
		f.setLocationRelativeTo(null);
		f.setDefaultCloseOperation(3);
		f.setVisible(true);
		sa = new InetSocketAddress("127.0.0.1", 1234);
		saa = new InetSocketAddress("127.0.0.1", 9999);
		new Thread() {
			public void run() {
				recMsg();
			}
		}.start();
		startThread();
	}

	public void recMsg() {
		try {
			ds = new DatagramSocket(sa);
			while (true) {
				byte[] buffer = new byte[20];
				DatagramPacket dpp = new DatagramPacket(buffer, buffer.length);
				jta.append("等待数据...\n");
				ds.receive(dpp);
				String rec = new String(dpp.getData()).trim();
				if (check(rec)) {
					jta.append(rec + "\n");
					int id = Integer
							.parseInt(rec.substring(1, rec.length() - 3));
					Msg m = new Msg(rec, id);
					list.add(m);
					send(m);
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public boolean check(String msg) {
		if (list.size() > 0) {
			for (int i = 0; i < list.size(); i++) {
				if (list.get(i).getmsg().equals("msg")) {
					send(list.get(i));
					return false;
				}
			}
		}
		return true;
	}

	public void send(Msg m) {
		String msg = "已收到第" + m.getid() + "条消息";
		try {
			DatagramPacket dp = new DatagramPacket(msg.getBytes(),
					msg.getBytes().length, saa);
			ds.send(dp);
			jta2.append(msg + "\n");
			m.setrectime(System.currentTimeMillis());
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public void startThread() {
		new Thread() {
			public void run() {
				while (true) {
					if (list.size() > 0) {
						for (int i = 0; i < list.size(); i++) {
							if (System.currentTimeMillis()
									- list.get(i).getrectime() > 5000) {
								list.remove(i);
								jta2.append("确认已收到第" + list.get(i).getid()
										+ "条消息,从队列中除去\n");
							}
						}
					}
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}.start();
	}

	public static void main(String[] args) {
		new UDPClient();
	}

	class Msg {
		public Msg(String msg, int id) {
			this.msg = msg;
			this.id = id;
		}

		public void setid(int i) {
			id = i;
		}

		public int getid() {
			return id;
		}

		public void setmsg(String newmsg) {
			msg = newmsg;
		}

		public String getmsg() {
			return msg;
		}

		public void settime(int i) {
			time = i;
		}

		public int gettime() {
			return time;
		}

		public void setrectime(long time) {
			lastrectime = time;
		}

		public long getrectime() {
			return lastrectime;
		}

		private int id;
		private String msg;
		private int time;
		private long lastrectime;
	}

}

 

 

运行结果如图所示:

 

 

 

  • 大小: 95.9 KB
0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics