[TOC]
**转载地址: https://blog.csdn.net/weixin_42089175/article/details/89045904 **
说明:《Socket网络编程进阶与实战》系列视频
概述
实现群发的效果, 客服端和客户端直接通过服务器中转消息实现通信处理。
客户端
ServerInfo
public class ServerInfo {
private int port;
private String address;
public ServerInfo(int port, String ip) {
this.port = port;
this.address = ip;
}
/**
* @return int return the port
*/
public int getPort() {
return port;
}
/**
* @param port the port to set
*/
public void setPort(int port) {
this.port = port;
}
/**
* @return String return the address
*/
public String getAddress() {
return address;
}
/**
* @param address the address to set
*/
public void setAddress(String address) {
this.address = address;
}
@Override
public String toString() {
return super.toString();
}
}
TCPClient
发送消息和接受服务器转发消息
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Scanner;
import com.chatroom.client.bean.ServerInfo;
import com.chatroom.clink.utils.CloseUtils;
public class TCPClient {
private static final String QQ_NAME = "小画: ";
public static void linkWith(ServerInfo info) throws IOException {
Socket socket = new Socket();
// 超时时间
socket.setSoTimeout(3000);
// 连接本地,端口2000;超时时间3000ms
socket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()), info.getPort()), 3000);
System.out.println("已发起服务器连接,并进入后续流程~");
System.out.println("客户端信息:" + socket.getLocalAddress() + " P:" + socket.getLocalPort());
System.out.println("服务器信息:" + socket.getInetAddress() + " P:" + socket.getPort());
try {
ReadHandler readHandler = new ReadHandler(socket.getInputStream());
readHandler.start();
// 发送接收数据
write(socket);
// 退出操作
readHandler.exit();
} catch (Exception e) {
System.out.println("异常关闭");
}
// 释放资源
socket.close();
System.out.println("客户端已退出~");
}
private static void write(Socket client) throws IOException {
// 构建键盘输入流
Scanner scanner = new Scanner(System.in);
// 得到Socket输出流,并转换为打印流
OutputStream outputStream = client.getOutputStream();
PrintStream socketPrintStream = new PrintStream(outputStream);
System.out.print(QQ_NAME);
while (!scanner.hasNext("bye")) {
System.out.print(QQ_NAME);
String str = scanner.nextLine();
str = QQ_NAME + str;
socketPrintStream.println(str);
}
// 资源释放
socketPrintStream.close();
}
static class ReadHandler extends Thread {
private boolean done = false;
private final InputStream inputStream;
ReadHandler(InputStream inputStream) {
this.inputStream = inputStream;
}
@Override
public void run() {
super.run();
try {
// 得到输入流,用于接收数据
BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
do {
String str;
try {
// 客户端拿到一条数据
str = socketInput.readLine();
} catch (SocketTimeoutException e) {
continue;
}
if (str == null) {
System.out.println("连接已关闭,无法读取数据!");
break;
}
// 打印到屏幕
System.out.println(str);
if (!str.contains(":")) {
System.out.print(QQ_NAME);
}
} while (!done);
} catch (Exception e) {
if (!done) {
System.out.println("连接异常断开:" + e.getMessage());
}
} finally {
// 连接关闭
CloseUtils.close(inputStream);
}
}
void exit() {
done = true;
CloseUtils.close(inputStream);
}
}
}
CloseUtils
public class CloseUtils {
public static void close(Closeable... closeables) {
if (closeables == null) {
return;
}
for (Closeable closeable : closeables) {
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Client
public class Client {
public static void main(String[] args) {
ServerInfo info = new ServerInfo(30401,"127.0.0.1");
if (info != null) {
try {
TCPClient.linkWith(info);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
服务端
Server
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class Server {
public static void main(String[] args) throws IOException {
TCPServer tcpServer = new TCPServer(30401);
boolean isSucceed = tcpServer.start();
if (!isSucceed) {
System.out.println("Start TCP server failed!");
return;
}
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
String str;
do {
str = bufferedReader.readLine();
tcpServer.broadcast(str);
} while (!"00bye00".equalsIgnoreCase(str));
tcpServer.stop();
}
}
TCPServer
为每一个进来的客户端,重新创建一条线程。
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.chatroom.server.handle.ClientHandler;
public class TCPServer implements ClientHandler.ClientHandlerCallback {
private final int port;
private ClientListener mListener;
private List<ClientHandler> clientHandlerList = new ArrayList<>();
private final ExecutorService forwardingThreadPoolExecutor;
public TCPServer(int port) {
this.port = port;
// 转发线程池
this.forwardingThreadPoolExecutor = Executors.newSingleThreadExecutor();
}
public boolean start() {
try {
ClientListener listener = new ClientListener(port);
mListener = listener;
listener.start();
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
public void stop() {
if (mListener != null) {
mListener.exit();
}
synchronized (TCPServer.this) {
for (ClientHandler clientHandler : clientHandlerList) {
clientHandler.exit();
}
clientHandlerList.clear();
}
// 停止线程池
forwardingThreadPoolExecutor.shutdownNow();
}
public synchronized void broadcast(String str) {
for (ClientHandler clientHandler : clientHandlerList) {
clientHandler.send(str);
}
}
@Override
public synchronized void onSelfClosed(ClientHandler handler) {
clientHandlerList.remove(handler);
}
@Override
public void onNewMessageArrived(final ClientHandler handler, final String msg) {
// 打印到屏幕
System.out.println("Received-" + handler.getClientInfo() + ":" + msg);
// 异步提交转发任务
forwardingThreadPoolExecutor.execute(() -> {
synchronized (TCPServer.this) {
for (ClientHandler clientHandler : clientHandlerList) {
if (clientHandler.equals(handler)) {
// 跳过自己
continue;
}
// 对其他客户端发送消息
clientHandler.send(msg);
}
}
});
}
private class ClientListener extends Thread {
private ServerSocket server;
private boolean done = false;
private ClientListener(int port) throws IOException {
server = new ServerSocket(port);
System.out.println("服务器信息:" + server.getInetAddress() + " P:" + server.getLocalPort());
}
@Override
public void run() {
super.run();
System.out.println("服务器准备就绪~");
// 等待客户端连接
do {
// 得到客户端
Socket client;
try {
client = server.accept();
} catch (IOException e) {
continue;
}
try {
// 客户端构建异步线程
ClientHandler clientHandler = new ClientHandler(client, TCPServer.this);
// 读取数据并打印
clientHandler.readToPrint();
// 添加同步处理
synchronized (TCPServer.this) {
clientHandlerList.add(clientHandler);
}
} catch (IOException e) {
e.printStackTrace();
System.out.println("客户端连接异常:" + e.getMessage());
}
} while (!done);
System.out.println("服务器已关闭!");
}
void exit() {
done = true;
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
ClientHandler
读和写采用不同的线程
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.chatroom.clink.utils.CloseUtils;
public class ClientHandler {
private final Socket socket;
private final ClientReadHandler readHandler;
private final ClientWriteHandler writeHandler;
private final ClientHandlerCallback clientHandlerCallback;
private final String clientInfo;
public ClientHandler(Socket socket, ClientHandlerCallback clientHandlerCallback) throws IOException {
this.socket = socket;
this.readHandler = new ClientReadHandler(socket.getInputStream());
this.writeHandler = new ClientWriteHandler(socket.getOutputStream());
this.clientHandlerCallback = clientHandlerCallback;
this.clientInfo = "A[" + socket.getInetAddress().getHostAddress()
+ "] P[" + socket.getPort() + "]";
System.out.println("新客户端连接:" + clientInfo);
}
public String getClientInfo() {
return clientInfo;
}
public void exit() {
readHandler.exit();
writeHandler.exit();
CloseUtils.close(socket);
System.out.println("客户端已退出:" + socket.getInetAddress() +
" P:" + socket.getPort());
}
public void send(String str) {
writeHandler.send(str);
}
public void readToPrint() {
readHandler.start();
}
private void exitBySelf() {
exit();
clientHandlerCallback.onSelfClosed(this);
}
public interface ClientHandlerCallback {
// 自身关闭通知
void onSelfClosed(ClientHandler handler);
// 收到消息时通知
void onNewMessageArrived(ClientHandler handler, String msg);
}
class ClientReadHandler extends Thread {
private boolean done = false;
private final InputStream inputStream;
ClientReadHandler(InputStream inputStream) {
this.inputStream = inputStream;
}
@Override
public void run() {
super.run();
try {
// 得到输入流,用于接收数据
BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
do {
// 客户端拿到一条数据
String str = socketInput.readLine();
if (str == null) {
System.out.println("客户端已无法读取数据!");
// 退出当前客户端
ClientHandler.this.exitBySelf();
break;
}
// 通知到TCPServer
clientHandlerCallback.onNewMessageArrived(ClientHandler.this, str);
} while (!done);
} catch (Exception e) {
if (!done) {
System.out.println("连接异常断开");
ClientHandler.this.exitBySelf();
}
} finally {
// 连接关闭
CloseUtils.close(inputStream);
}
}
void exit() {
done = true;
CloseUtils.close(inputStream);
}
}
class ClientWriteHandler {
private boolean done = false;
private final PrintStream printStream;
private final ExecutorService executorService;
ClientWriteHandler(OutputStream outputStream) {
this.printStream = new PrintStream(outputStream);
this.executorService = Executors.newSingleThreadExecutor();
}
void exit() {
done = true;
CloseUtils.close(printStream);
executorService.shutdownNow();
}
void send(String str) {
if(done){
return;
}
executorService.execute(new WriteRunnable(str));
}
class WriteRunnable implements Runnable {
private final String msg;
WriteRunnable(String msg) {
this.msg = msg;
}
@Override
public void run() {
if (ClientWriteHandler.this.done) {
return;
}
try {
ClientWriteHandler.this.printStream.println(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
效果如下
注意
从上面的代码可以看到, 我们系统中使用了非常多的线程,其中客户端来一个就开启一个线程监听,同时我们的每一个客户端的读和写都是单独的线程处理。 所以性能并不理想。后期改进处理。
「真诚赞赏,手留余香」
真诚赞赏,手留余香
使用微信扫描二维码完成支付