应用场景:
在RPC框架中,使用Netty作为高性能的网络通信框架时,每一次服务调用,都需要与Netty服务端建立连接的话,很容易导致Netty服务器资源耗尽。所以,想到连接池技术,将与同一个Netty服务器地址建立的连接放入池中维护,同一个地址的连接确保只建立一次。这样,凡是连接同一个Netty服务器的客户端,拿到的都是同一个连接,不需要新建连接,就可以大大减少连接的个数,从而大幅度提升服务器性能。
用一张图说明一下设计思路:
解释一下,途中主要定义了两个类,ConnectClient是负责管理连接的,是一个抽象类,init和send是抽象方法,而NettyClient是负责与Netty服务器通信的,它继承了ConnectClient,并实现了它的几个抽象方法,init方法中会与Netty服务器建立连接,send方法会向Netty服务器发送消息。
值得注意的是,ConnectClient类中有一个非抽象方法,就是asyncSend(),它里面调用了自己的send()抽象方法。抽象方法不能直接调用,必须拿到NettyClient这个具体实现类的实例对象才能调。这个asyncSend()方法有一个关键的入参,即Class<? extends ConnectClient> netImpl,它是一个Class类型,是从外部传进来的,所以这就比较有灵活性了,好处就是这个ConnectClient类不需要依赖任何具体的实现,只要传进一个自己的子类的Class即可,它就可以用这个Class通过反射的方式创建出具体的实现类的实例对象,然后调用其send方法。可以理解成ConnectClient用asyncSend方法包装了NettyClient的send方法,目的是让外部不要直接调用NettyClient中的send方法,而是调用自己的asyncSend方法,然后在这个asyncSend方法中,会先获取连接,再调用NettyClient中的send方法发送消息。
模拟代码:
下面就通过几段代码来模拟Tcp客户端和Tcp服务器端建立连接并发送消息的场景。 这里并没有真的使用Netty框架,因为本文不是讲怎么使用Netty框架,而是分享如何管理连接。
首先,模拟一个Tcp服务端程序(就当做是Netty的服务器):
1 /** 2 * 模拟TCP服务端 3 * 4 * @author syj 5 */ 6 public class NetChannel { 7 8 /** 9 * 建立连接10 *11 * @param host12 * @param port13 */14 public void connect(String host, int port) {15 System.out.println("模拟连接TCP服务器成功: host=" + host + ",port=" + port);16 }17 18 /**19 * 发送消息20 *21 * @param msg22 */23 public void send(String msg) {24 System.out.println("模拟向TCP服务器发送消息成功:" + msg);25 }26 }
定义一个Tcp客户端,负责与Netty服务器通信:
1 /** 2 * 模拟TCP客户端 3 * 4 * @author syj 5 */ 6 public class NetClient extends ConnectClient { 7 8 // 模拟TCP服务器 9 private NetChannel channel;10 11 /**12 * 建立连接13 *14 * @param address 格式 host:port, 例如 192.168.1.103:999915 * @throws Exception16 */17 @Override18 public void init(String address) throws Exception {19 if (address == null || address.trim().length() == 0) {20 throw new RuntimeException(">>>> address error");21 }22 String[] split = address.split(":");23 if (split.length != 2) {24 throw new RuntimeException(">>>> address error");25 }26 String host = split[0];27 int port = Integer.valueOf(split[1]);28 channel = new NetChannel();29 channel.connect(host, port);30 }31 32 /**33 * 发送消息34 *35 * @param msg36 * @throws Exception37 */38 @Override39 public void send(String msg) throws Exception {40 channel.send(msg);41 }42 }
连接管理类:
该类使用一个ConcurrentHashMap作为连接池,来保存与TCP服务器建立的连接,key是TCP服务器的地址,value是连接对象。
由于是多线程环境,为保证线程安全问题,使用synchronized加锁,避免一个连接被创建多次。
由于可能会有很多针对同一个TCP服务器的连接请求,使用lockClientMap来管理锁,同一个TCP服务器的请求使用同一把锁,保证同一个TCP服务器的连接只创建一次。
这样既保证了线程安全,又能降低性能消耗。
1 import java.util.concurrent.ConcurrentHashMap; 2 3 /** 4 * TCP连接管理 5 * 6 * @author syj 7 */ 8 public abstract class ConnectClient { 9 10 /**11 * 建立连接12 *13 * @param address14 * @throws Exception15 */16 public abstract void init(String address) throws Exception;17 18 /**19 * 发送消息20 *21 * @param msg22 * @throws Exception23 */24 public abstract void send(String msg) throws Exception;25 26 /**27 * 发送消息28 *29 * @param address30 * @param msg31 * @param netImpl32 * @throws Exception33 */34 public static void asyncSend(String address, String msg, Class netImpl) throws Exception {35 ConnectClient connect = ConnectClient.getConnect(address, netImpl);36 connect.send(msg);37 }38 39 // 连接池40 private static volatile ConcurrentHashMapconnectClientMap;41 // 锁42 private static volatile ConcurrentHashMap lockClientMap = new ConcurrentHashMap<>();43 44 /**45 * 获取连接46 * 确保同一个TCP服务器地址对应的连接只建立一次47 *48 * @param netImpl49 * @return50 * @throws Exception51 */52 public static ConnectClient getConnect(String address, Class netImpl) throws Exception {53 // 创建连接池54 if (connectClientMap == null) {55 synchronized (ConnectClient.class) {56 if (connectClientMap == null) {57 connectClientMap = new ConcurrentHashMap<>();58 }59 }60 }61 62 // 获取连接63 ConnectClient connectClient = connectClientMap.get(address);64 if (connectClient != null) {65 return connectClient;66 }67 68 // 获取锁,同一个地址使用同一把锁69 Object lock = lockClientMap.get(address);70 if (lock == null) {71 lockClientMap.putIfAbsent(address, new Object());72 lock = lockClientMap.get(address);73 }74 synchronized (lock) {75 connectClient = connectClientMap.get(address);76 if (connectClient != null) {77 return connectClient;78 }79 80 // 新建连接81 ConnectClient client = netImpl.newInstance();82 client.init(address);83 // 放入连接池84 connectClientMap.put(address, client);85 }86 connectClient = connectClientMap.get(address);87 return connectClient;88 }89 }
任务类用于并发连接测试:
1 import java.util.UUID; 2 3 /** 4 * 任务 5 * 6 * @author syj 7 */ 8 public class Task implements Runnable { 9 10 private Class netType;// 客户端类型11 private String address;12 private long count;13 14 public Task(String address, long count, Class netType) {15 this.address = address;16 this.count = count;17 this.netType = netType;18 }19 20 @Override21 public void run() {22 try {23 String uuid = UUID.randomUUID().toString().replace("-", "");24 String msg = String.format("%s \t %s \t %s \t %s", Thread.currentThread().getName(), count, address, uuid);25 ConnectClient.asyncSend(address, msg, netType);26 // 模拟业务耗时27 Thread.sleep((long) (Math.random() * 1000));28 } catch (Exception e) {29 e.printStackTrace();30 }31 }32 }
测试类(模拟了10个TCP服务器的地址和端口):
通过一个死循环来模拟测试高并发场景下,连接的线程安全和性能表现。
1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 4 /** 5 * 模拟TCP客户端并发获取连接发送消息 6 * 7 * @author syj 8 */ 9 public class App {10 11 // TCP服务器通信地址和端口12 public static final String[] NET_ADDRESS_ARR = {13 "192.168.1.101:9999",14 "192.168.1.102:9999",15 "192.168.1.103:9999",16 "192.168.1.104:9999",17 "192.168.1.105:9999",18 "192.168.1.106:9999",19 "192.168.1.107:9999",20 "192.168.1.108:9999",21 "192.168.1.109:9999",22 "192.168.1.110:9999"23 };24 25 public static ExecutorService executorService = Executors.newCachedThreadPool();26 public static volatile long count;// 统计任务执行总数27 28 public static void main(String[] args) {29 while (true) {30 try {31 Thread.sleep(5);// 防止 CPU 100%32 } catch (InterruptedException e) {33 e.printStackTrace();34 }35 executorService.execute(new Task(NET_ADDRESS_ARR[(int) (Math.random() * 10)], ++count, NetClient.class));36 executorService.execute(new Task(NET_ADDRESS_ARR[(int) (Math.random() * 10)], ++count, NetClient.class));37 }38 }39 }
测试结果:
1 模拟连接TCP服务器成功: host=192.168.1.107,port=9999 2 模拟向TCP服务器发送消息成功:pool-1-thread-14 14 192.168.1.107:9999 3f31022b959e4962b00b0719fa206416 3 模拟连接TCP服务器成功: host=192.168.1.108,port=9999 4 模拟向TCP服务器发送消息成功:pool-1-thread-37 37 192.168.1.108:9999 2e4e4c6db63145f190f76d1dbe59f1c4 5 模拟连接TCP服务器成功: host=192.168.1.106,port=9999 6 模拟向TCP服务器发送消息成功:pool-1-thread-49 49 192.168.1.106:9999 e50ea4937c1c4425b647e4606ced7a1f 7 模拟连接TCP服务器成功: host=192.168.1.103,port=9999 8 模拟向TCP服务器发送消息成功:pool-1-thread-17 17 192.168.1.103:9999 21cfcd3665aa4688aea0ac90b68e5a22 9 模拟连接TCP服务器成功: host=192.168.1.102,port=999910 模拟向TCP服务器发送消息成功:pool-1-thread-25 25 192.168.1.102:9999 bbdbde3e28ab4ac0901c1447ac3ddd3f11 模拟连接TCP服务器成功: host=192.168.1.101,port=999912 模拟向TCP服务器发送消息成功:pool-1-thread-10 10 192.168.1.101:9999 08cc445cc06a44f5823a8487d05e3e3013 模拟连接TCP服务器成功: host=192.168.1.105,port=999914 模拟向TCP服务器发送消息成功:pool-1-thread-45 45 192.168.1.105:9999 3e925cf96b874ba09c59e63613e6066215 模拟连接TCP服务器成功: host=192.168.1.104,port=999916 模拟向TCP服务器发送消息成功:pool-1-thread-53 53 192.168.1.104:9999 2408dab5c0ca480b8c2593311f3ec7d517 模拟向TCP服务器发送消息成功:pool-1-thread-13 13 192.168.1.105:9999 5a3c0f86046f4cb99986d0281e567e3118 模拟向TCP服务器发送消息成功:pool-1-thread-36 36 192.168.1.107:9999 b85d9d79461d4345a2da8f8dd00a572a19 模拟向TCP服务器发送消息成功:pool-1-thread-9 9 192.168.1.102:9999 c2895f68a33745d7a4370034b647446120 模拟向TCP服务器发送消息成功:pool-1-thread-41 41 192.168.1.102:9999 a303193a58204e7fadaf64cec8eaa86d21 模拟向TCP服务器发送消息成功:pool-1-thread-59 59 192.168.1.101:9999 08785c0acfc14c618cf3762d35055e9b22 模拟向TCP服务器发送消息成功:pool-1-thread-54 54 192.168.1.107:9999 6fa8e3939a904271b03b78204d4a146a23 模拟向TCP服务器发送消息成功:pool-1-thread-15 15 192.168.1.102:9999 229989d1405b49cdb31052b081a3386924 模拟向TCP服务器发送消息成功:pool-1-thread-7 7 192.168.1.107:9999 8e3c8d1007a34a01b166101fae30449c25 模拟连接TCP服务器成功: host=192.168.1.109,port=999926 模拟向TCP服务器发送消息成功:pool-1-thread-8 8 192.168.1.109:9999 ca63dd93685641d19c875e4809e9a8dc27 模拟向TCP服务器发送消息成功:pool-1-thread-1 1 192.168.1.106:9999 cd9f473797de46ef8361f3b8b0a6d57528 模拟向TCP服务器发送消息成功:pool-1-thread-27 27 192.168.1.102:9999 872d825fd64e409d8b992e12e0372daa29 模拟向TCP服务器发送消息成功:pool-1-thread-3 3 192.168.1.103:9999 baace7f8f06242f68cac0c43337e49cf30 模拟向TCP服务器发送消息成功:pool-1-thread-39 39 192.168.1.108:9999 bc0d70348f574cbba449496b3142e51831 模拟向TCP服务器发送消息成功:pool-1-thread-55 55 192.168.1.106:9999 95ba7c57a1d84c18a6ab328eb01e85f132 模拟向TCP服务器发送消息成功:pool-1-thread-38 38 192.168.1.108:9999 a571001c573c4851a4bb1e0dcb9a204a33 模拟向TCP服务器发送消息成功:pool-1-thread-4 4 192.168.1.104:9999 dcdd6093afc345e39453883cf049fa2134 模拟向TCP服务器发送消息成功:pool-1-thread-28 28 192.168.1.106:9999 0ba4362898f84335bb336d17780855fc35 模拟向TCP服务器发送消息成功:pool-1-thread-47 47 192.168.1.108:9999 db993121a9934558942a09a9d9a8e03f36 模拟向TCP服务器发送消息成功:pool-1-thread-30 30 192.168.1.102:9999 a0e50592deca471b9c5982c83d00f30337 模拟连接TCP服务器成功: host=192.168.1.110,port=999938 模拟向TCP服务器发送消息成功:pool-1-thread-51 51 192.168.1.110:9999 41703aba37ca47148d23d6826264a05a39 模拟向TCP服务器发送消息成功:pool-1-thread-5 5 192.168.1.102:9999 15f453cc0a7743f79dc105963f39f94640 模拟向TCP服务器发送消息成功:pool-1-thread-52 52 192.168.1.105:9999 9ca521963bf84c418335e7702e471fa941 模拟向TCP服务器发送消息成功:pool-1-thread-40 40 192.168.1.101:9999 bec1d265b7dc46f5afebc42fea10a31342 模拟向TCP服务器发送消息成功:pool-1-thread-26 26 192.168.1.104:9999 a44662dc498045e78eb531b6ee6fc27b43 模拟向TCP服务器发送消息成功:pool-1-thread-11 11 192.168.1.109:9999 6104c4fd2dab4d44af86f0cd1e3e272d44 模拟向TCP服务器发送消息成功:pool-1-thread-24 24 192.168.1.105:9999 344025da2a6c4190a87403c5d96b321e45 模拟向TCP服务器发送消息成功:pool-1-thread-22 22 192.168.1.110:9999 aa0b4c48527446738d28e99eef4957f546 模拟向TCP服务器发送消息成功:pool-1-thread-23 23 192.168.1.107:9999 79f5fc4278164cd68ac1a260322e6f6847 模拟向TCP服务器发送消息成功:pool-1-thread-56 56 192.168.1.109:9999 39c38939ced140058f25fe903a3b1f4f48 模拟向TCP服务器发送消息成功:pool-1-thread-18 18 192.168.1.109:9999 c29ea09b5f264b488f3e15e91c5f2bd5
可见,与每个TCP服务器的连接只会建立一次,连接得到复用。