Browse Source

1、分批次执行线程;

prod_202403
mh 7 months ago
parent
commit
d8abc75341
  1. 5
      user-service/src/main/java/com/mh/user/job/DealDataJob.java
  2. 122
      user-service/src/test/java/com/mh/user/TestJwtUtils.java

5
user-service/src/main/java/com/mh/user/job/DealDataJob.java

@ -93,18 +93,21 @@ public class DealDataJob {
// 分组data_com // 分组data_com
Map<String, List<DeviceCodeParamEntity>> dataComMap = deviceParamsByType.stream().collect(Collectors.groupingBy(DeviceCodeParamEntity::getDataCom)); Map<String, List<DeviceCodeParamEntity>> dataComMap = deviceParamsByType.stream().collect(Collectors.groupingBy(DeviceCodeParamEntity::getDataCom));
int batchSize = 10; // 定义一个批次大小 int batchSize = 10; // 定义一个批次大小
int index = 0;
for (int k = 0; k < dataComMap.size(); ) { for (int k = 0; k < dataComMap.size(); ) {
CountDownLatch countDownLatch = new CountDownLatch(Math.min(batchSize, dataComMap.size() - k)); CountDownLatch countDownLatch = new CountDownLatch(Math.min(batchSize, dataComMap.size() - k));
index = k;
for (int j = 0; j < Math.min(batchSize, dataComMap.size() - k); j++) { for (int j = 0; j < Math.min(batchSize, dataComMap.size() - k); j++) {
if (Constant.WEB_FLAG) { if (Constant.WEB_FLAG) {
break; break;
} }
// 获取data_com口 // 获取data_com口
String dataCom = dataComMap.keySet().toArray(new String[0])[j]; String dataCom = dataComMap.keySet().toArray(new String[0])[index];
SerialPortThread myThread = new SerialPortThread(); SerialPortThread myThread = new SerialPortThread();
myThread.setName(threadName, dataCom.toLowerCase().replace("com", ""), countDownLatch); myThread.setName(threadName, dataCom.toLowerCase().replace("com", ""), countDownLatch);
Thread thread = new Thread(myThread); Thread thread = new Thread(myThread);
comThreadPool.execute(thread); comThreadPool.execute(thread);
index++;
} }
// 等待执行完成 // 等待执行完成
countDownLatch.await(); countDownLatch.await();

122
user-service/src/test/java/com/mh/user/TestJwtUtils.java

@ -1,10 +1,14 @@
package com.mh.user; package com.mh.user;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.mh.user.constants.Constant;
import com.mh.user.model.SysUser; import com.mh.user.model.SysUser;
import com.mh.user.serialport.SerialPortThread;
import com.mh.user.utils.ComThreadPoolService;
import io.jsonwebtoken.Claims; import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwts; import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm; import io.jsonwebtoken.SignatureAlgorithm;
import org.checkerframework.checker.units.qual.K;
import sun.security.provider.MD5; import sun.security.provider.MD5;
import java.math.BigInteger; import java.math.BigInteger;
@ -13,6 +17,8 @@ import java.security.NoSuchAlgorithmException;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
/** /**
* @author ljf * @author ljf
@ -131,6 +137,7 @@ public class TestJwtUtils {
// //
// } // }
static ThreadPoolExecutor comThreadPool = ComThreadPoolService.getInstance();
/** /**
* eyJhbGciOiJIUzI1NiJ9. * eyJhbGciOiJIUzI1NiJ9.
@ -139,41 +146,90 @@ public class TestJwtUtils {
* *
* @param args * @param args
*/ */
public static void main(String[] args) throws InterruptedException {
public static void main(String[] args) { // JSONObject jsonObject = new JSONObject();
JSONObject jsonObject = new JSONObject(); // jsonObject.put("no", "abc");
jsonObject.put("no", "abc"); // jsonObject.put("model", 1);
jsonObject.put("model", 1); // jsonObject.put("scheme", 202);
jsonObject.put("scheme", 202); // String inputString = "85b5cebb6132a189570d51bf5e34d93c"+jsonObject.toJSONString();
String inputString = "85b5cebb6132a189570d51bf5e34d93c"+jsonObject.toJSONString(); //
// try {
try { // // 获取MessageDigest实例用于MD5算法
// 获取MessageDigest实例用于MD5算法 // MessageDigest md = MessageDigest.getInstance("MD5");
MessageDigest md = MessageDigest.getInstance("MD5"); //
// // 将输入字符串转换为字节数组并进行更新
// 将输入字符串转换为字节数组并进行更新 // byte[] messageBytes = inputString.getBytes();
byte[] messageBytes = inputString.getBytes(); // md.update(messageBytes);
md.update(messageBytes); //
// // 完成哈希计算并获取结果字节数组
// 完成哈希计算并获取结果字节数组 // byte[] digestBytes = md.digest();
byte[] digestBytes = md.digest(); //
// // 将字节数组转换为大写的十六进制字符串表示形式
// 将字节数组转换为大写的十六进制字符串表示形式 // BigInteger no = new BigInteger(1, digestBytes);
BigInteger no = new BigInteger(1, digestBytes); // String hashtext = no.toString(16);
String hashtext = no.toString(16); //
// // 如果生成的十六进制字符串长度小于32,则前面补0以达到32位
// 如果生成的十六进制字符串长度小于32,则前面补0以达到32位 // while (hashtext.length() < 32) {
while (hashtext.length() < 32) { // hashtext = "0" + hashtext;
hashtext = "0" + hashtext; // }
//
// System.out.println("MD5 Hash: " + hashtext);
//
// } catch (NoSuchAlgorithmException e) {
// // 处理MD5算法不可用的情况
// e.printStackTrace();
// }
for (int a = 0; a < 100; a++) {
if (comThreadPool.isShutdown()) {
comThreadPool = ComThreadPoolService.getInstance();
} }
Map<String, Object> dataComMap = new HashMap<>();
System.out.println("MD5 Hash: " + hashtext); dataComMap.put("1", "1");
dataComMap.put("2", "1");
} catch (NoSuchAlgorithmException e) { dataComMap.put("3", "1");
// 处理MD5算法不可用的情况 dataComMap.put("4", "1");
e.printStackTrace(); dataComMap.put("5", "1");
dataComMap.put("6", "1");
dataComMap.put("7", "1");
dataComMap.put("8", "1");
dataComMap.put("9", "1");
dataComMap.put("10", "1");
dataComMap.put("11", "1");
dataComMap.put("12", "1");
int batchSize = 5; // 定义一个批次大小
int index = 0;
for (int k = 0; k < dataComMap.size(); ) {
System.out.println("第几轮==>" + k);
CountDownLatch countDownLatch = new CountDownLatch(Math.min(batchSize, dataComMap.size() - k));
index = k;
for (int j = 0; j < Math.min(batchSize, dataComMap.size() - k); j++) {
if (Constant.WEB_FLAG) {
break;
}
// 获取data_com口
String dataCom = dataComMap.keySet().toArray(new String[0])[index];
index++;
CountDownLatch finalCountDownLatch = countDownLatch;
Thread thread = new Thread(() -> {
finalCountDownLatch.countDown();
System.out.println(Thread.currentThread() + ": " + dataCom);
});
comThreadPool.execute(thread);
}
// 等待执行完成
countDownLatch.await();
// 释放资源
countDownLatch = null;
// 检查是否需要继续创建新的线程(未达到数据总数量且WEB_FLAG为false)
if (k + batchSize < dataComMap.size() && !Constant.WEB_FLAG) {
k += batchSize;
} else {
break;
}
}
System.out.println("结束==》"+a);
comThreadPool.shutdown();
} }
} }
} }

Loading…
Cancel
Save