Java知识分享
热爱技术,分享技术

多线程设计模式之Thread Per Message设计模式

1. 什么是Thread Per Message模式

Thread Per Message的意思是为每一个消息的处理开辟一个线程使得消息能够以并发的方式进行处理,从而提高系统整体的吞吐能力。这就好比电话接线员一样,收到的每一个电话投诉或者业务的处理请求,都会提交对应的工单,然后交由对应的工作人员来处理。

多线程设计模式之Thread Per Message设计模式插图
多线程设计模式

2. 每个任务一个线程

在本节中,我们首先实现-一个简单的Thread-Per-Message,但是在开发中不建议采用这种方式,在后文中会对图此进行详细解释。Request 的代码如下

public class Request {
private final String business;

public Request(String business) {
this.business = business;
}

@Override
public String toString() {
return business;
}
}

客户提交的任何业务请求都会被封装成Request对象。

public class Operator {
public void call(String business) {
TaskHandler taskHandler = new TaskHandler(new Request(business));
new Thread(taskHandler).start();
}
}

Operator代表了接线员,当有电话打进来时,话务员会将客户的请求封装成-一工单Request,然后开辟-一个线程(工作人员)去处理。截至目前,我们完成了关于Thread-Per-Message的设计,但是这种设计方式存在着很严重的问题,我们知道每一个JVM中可创建的线程数量是有限的,针对每一个任务都创建一个新的线程,假如每–个线程执行的时间比较长,那么在某个时刻JVM会由于无法再创建新的线程而导致栈内存的溢出;再假如每–个任务的执行时间都比较短,频繁地创建销毁线程对系统性能的开销也一个不小的影响。

这种处理方式虽然有很多问题,但不代表其就一无是处了,其实它也有自己的使用场景,比如在基于Event的编程模型中,当系统初始化事件发生时,需要进行若干资源的后台加载,由于系统初始化时的任务数量并不多,可以考虑使用该模式响应初始化Event,或者系统在关闭时,进行资源回收也可以考虑将销毁事件触发的动作交给该模式。

我们可以将call方法中的创建新线程的方式交给线程池去处理,这样可以避免线程频繁创建和销毁带来的系统开销,还能将线程数量控制在-一个可控的范围之内。

下面使用线程池重构Operator

public class Operator {
private final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());

public void call(String business) {
TaskHandler taskHandler = new TaskHandler(new Request(business));
poolExecutor.execute(taskHandler);
}
}

3. 多用户的网络聊天

Thread-Per-Message模式在网络通信中的使用也是非常广泛的,比如在本节中介绍的网络聊天程序,在服务端每一个连接到服务端的连接都将创建一个独立的线程进行处理,当客户端的连接数超过了服务端的最大受理能力时,客户端将被存放至排队队列中。

3.1 服务端程序

下面编写服务端程序ChatServer用于接收来自客户端的链接,并且与之进行TCP通信交互,当服务器端接收到了每一次的客户端连接后便会给线程池提交-一个任务用于与客户端进行交互,进而提高并发响应能力。

/**
* 聊天服务端
* by hackcloud.cn
*/
public class ChatServer {

//服务端接口
private final int port;
//定义线程池
private ThreadPoolExecutor threadPool;
//服务端Socket
private ServerSocket serverSocket;


public ChatServer(int port) {
this.port = port;
}

//默认端口
public ChatServer() {
this(13312);
}

public void startServer() throws Exception {
//创建线程池,核心线程数量为2,最大线程数为4,队列中最大可加入500个任务
this.threadPool = new ThreadPoolExecutor(2, 4,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(500));
this.serverSocket = new ServerSocket(port);
this.serverSocket.setReuseAddress(true);
this.listen();
}

private void listen() throws IOException {
for (; ; ) {
final Socket client = serverSocket.accept();
this.threadPool.execute(new ClientHandler(client));
}
}
}

在上面的程序中,当接收到了新的客户端连接时,会为每一个客户端连接创建一个线程ClientHandler与客户端进行交互,当客户端的连接个数超过线程池的最大数量时,客户端虽然可以成功接人服务端,但是会进人阻塞队列

3.2 响应客户端连接的Handler

待服务端接收到客户端的连接之后,便会创建一个新的ChatHandler任务提交给线程池ChatHandler任务是Runnable接口的实现,主要负责和客户端进行消息通信。

package com.ibfbc.concurrency.per.chat;

import java.io.*;
import java.net.Socket;
/**
* 客户端
* Created by laijun.vip
*/
public class ClientHandler implements Runnable {
    private final Socket socket;
    private final String clientIdentify;

    public ClientHandler(final Socket socket) {
        this.socket = socket;
        this.clientIdentify = socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
    }

    @Override
    public void run() {
        try {
            this.chat();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void chat() throws IOException {
        BufferedReader bufferedReader = wrap2Reader(this.socket.
                getInputStream());
        PrintStream printStream = wrap2Print(this.socket.getOutputStream());
        String received;
        while ((received = bufferedReader.readLine()) != null) {
            //将客户端发送的消息输出到控制台
            System.out.printf("cl ient: 8s-message :8s\n", clientIdentify, received);
            if (received.equals("quit")) {
                //1如果客户端发送了quit指令,则断开与客户端的连接
                write2Client(printStream, "client will close");
                socket.close();
                break;
            }
            //向客户端发送消息
            write2Client(printStream, "Server:" + received);
        }

    }

    //将输入字节流封装成BufferedReader缓冲字符流
    private BufferedReader wrap2Reader(InputStream inputStream) {
        return new BufferedReader(new InputStreamReader(inputStream));
    }
    //将输出字节流封装成PrintStream

    private PrintStream wrap2Print(OutputStream outputStream) {
        return new PrintStream(outputStream);
    }

    //该方法主要用于向客户端发送消息
    private void write2Client(PrintStream print, String message) {
        print.println(message);
        print.flush();

    }
}

上面的通信方式是一种典型的一问一答的聊天方式,客户端连接后发送消息给服务端,服务端回复消息给客户端,每–个线程负责处理一个来自客户端的连接。

3.3 聊天程序测试

public static void main(String[] args) throws Exception {
new ChatServer().startServer();
}
多线程设计模式之Thread Per Message设计模式插图(1)

4. 总结

Thread-Per-Message设计模式在日常的开发中非常常见,但是也要灵活使用,比如为了避免频繁创建线程带来的系统开销,可以用线程池来代替

打赏
本站所有资源均来源于网络,仅供学习使用,请支持正版!Java技术开源 » 多线程设计模式之Thread Per Message设计模式

评论 抢沙发

评论前必须登录!

 

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续提供更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫

微信扫一扫

登录

找回密码

注册