diff --git a/mh-admin/src/main/java/com/mh/web/controller/energy/SysEnergyAnalyzeController.java b/mh-admin/src/main/java/com/mh/web/controller/energy/SysEnergyAnalyzeController.java index cfafe8c..8720649 100644 --- a/mh-admin/src/main/java/com/mh/web/controller/energy/SysEnergyAnalyzeController.java +++ b/mh-admin/src/main/java/com/mh/web/controller/energy/SysEnergyAnalyzeController.java @@ -8,10 +8,13 @@ import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.mh.common.core.controller.BaseController; import com.mh.common.core.domain.AjaxResult; +import com.mh.common.core.domain.ColumnData; import com.mh.common.core.domain.ColumnFilter; +import com.mh.common.core.domain.entity.CollectionParamsManage; import com.mh.common.core.domain.entity.SysDictData; import com.mh.common.core.domain.vo.EnergyQueryVO; import com.mh.common.core.page.TableDataInfo; +import com.mh.system.service.device.ICollectionParamsManageService; import com.mh.system.service.energy.EnergyAnalyzeService; import jakarta.annotation.Resource; import jakarta.servlet.http.HttpServletResponse; @@ -27,6 +30,7 @@ import java.net.URLEncoder; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; /** * @author LJF @@ -44,7 +48,7 @@ public class SysEnergyAnalyzeController extends BaseController { private EnergyAnalyzeService energyAnalyzeService; @Resource - private DeviceManageService deviceManageService; + private ICollectionParamsManageService collectionParamsManageService; /** * 整体机房设备组图形,表格数据查询(公用一个接口) @@ -158,8 +162,8 @@ public class SysEnergyAnalyzeController extends BaseController { * @return */ @PostMapping("/sys/analyze/device") - public HttpResult deviceAnalyze(@RequestBody EnergyQueryVO vo) { - return energyAnalyzeService.deviceAnalyze(vo); + public AjaxResult deviceAnalyze(@RequestBody EnergyQueryVO vo) { + return AjaxResult.success(energyAnalyzeService.deviceAnalyze(vo)); } /** @@ -173,8 +177,7 @@ public class SysEnergyAnalyzeController extends BaseController { try { String fileName = "设备能耗分析.xlsx"; // 从数据库获取数据 - PageResult data = (PageResult)energyAnalyzeService.deviceAnalyze(vo).getData(); - List> dataList = (List>) data.getContent(); + List> dataList = energyAnalyzeService.deviceAnalyze(vo); if (dataList != null) { // 设置响应格式 response.setContentType("application/vdn.ms-excel;charset=utf-8"); @@ -182,7 +185,7 @@ public class SysEnergyAnalyzeController extends BaseController { response.setCharacterEncoding("UTF-8"); // 根据deviceNum获取有多少列数据 - List deviceManageEntities = deviceManageService.queryAllDevice(); + List deviceManageEntities = collectionParamsManageService.queryAllDevice(); List> head = ListUtils.newArrayList(); List titleArr = new ArrayList<>(); @@ -195,10 +198,10 @@ public class SysEnergyAnalyzeController extends BaseController { head.add(head0); titleArr = (List) map.getValue(); for (String columnValue : titleArr) { - Optional device = deviceManageEntities.stream().filter(val -> val.getDeviceNum().equals(columnValue)).findFirst(); + Optional device = deviceManageEntities.stream().filter(val -> val.getMtNum().equals(columnValue)).findFirst(); String columnName = columnValue; if (device.isPresent()) { - DeviceManageEntity deviceManageEntity = device.get(); + CollectionParamsManage deviceManageEntity = device.get(); columnName = deviceManageEntity.getRemark(); } List head1 = ListUtils.newArrayList(); diff --git a/mh-admin/src/main/java/com/mh/web/controller/energy/SysEnergyConsumptionController.java b/mh-admin/src/main/java/com/mh/web/controller/energy/SysEnergyConsumptionController.java new file mode 100644 index 0000000..463ed63 --- /dev/null +++ b/mh-admin/src/main/java/com/mh/web/controller/energy/SysEnergyConsumptionController.java @@ -0,0 +1,83 @@ +package com.mh.web.controller.energy; + +import com.mh.common.core.controller.BaseController; +import com.mh.common.core.domain.AjaxResult; +import com.mh.common.core.domain.vo.EnergyConsumptionVO; +import com.mh.common.utils.DateUtils; +import com.mh.system.service.energy.IEnergyService; +import jakarta.annotation.Resource; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author LJF + * @version 1.0 + * @project NewZhujiang_Server + * @description 系统能耗分析 + * @date 2023/7/18 14:00:09 + */ +@RestController +@RequestMapping("/energy") +public class SysEnergyConsumptionController extends BaseController { + + @Resource + private IEnergyService energyService; + + /** + * 获取系统能耗分析 + * @param vo + * @return + */ + @PostMapping("/sys") + public AjaxResult sys(@RequestBody EnergyConsumptionVO vo) { + DateUtils.energyDateChange(vo); + return energyService.sys(vo); + } + + /** + * 获取能耗同比分析 + * @param vo + * @return + */ + @PostMapping("/yoy") + public AjaxResult yoy(@RequestBody EnergyConsumptionVO vo) { + DateUtils.energyDateChange(vo); + return energyService.yoy(vo); + } + + /** + * 获取能耗环比分析 + * @param vo + * @return + */ + @PostMapping("/mom") + public AjaxResult mom(@RequestBody EnergyConsumptionVO vo) { + DateUtils.energyDateChange(vo); + return energyService.mom(vo); + } + + /** + * 获取能耗同比环比分析 + * @param vo + * @return + */ + @PostMapping("/yoyMom") + public AjaxResult yoyMom(@RequestBody EnergyConsumptionVO vo) { + DateUtils.energyDateChange(vo); + return energyService.yoyMom(vo); + } + + /** + * 获取机房各个设备能耗 + * @param vo + * @return + */ + @PostMapping("/device") + public AjaxResult device(@RequestBody EnergyConsumptionVO vo) { + DateUtils.energyDateChange(vo); + return energyService.device(vo); + } + +} diff --git a/mh-admin/src/main/java/com/mh/web/controller/energy/SysEnergyQueryController.java b/mh-admin/src/main/java/com/mh/web/controller/energy/SysEnergyQueryController.java new file mode 100644 index 0000000..ad317c4 --- /dev/null +++ b/mh-admin/src/main/java/com/mh/web/controller/energy/SysEnergyQueryController.java @@ -0,0 +1,128 @@ +package com.mh.web.controller.energy; + +import com.alibaba.excel.EasyExcel; +import com.alibaba.excel.util.ListUtils; +import com.mh.common.core.domain.AjaxResult; +import com.mh.common.core.domain.vo.EnergyQueryVO; +import com.mh.system.service.energy.IEnergyQueryService; +import jakarta.annotation.Resource; +import jakarta.servlet.http.HttpServletResponse; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * @author LJF + * @version 1.0 + * @project NewZhujiang_Server + * @description 系统能源查询(中港项目) + * @date 2023-12-13 09:39:02 + */ +@Slf4j +@RestController +@RequestMapping("/energy") +public class SysEnergyQueryController { + + @Resource + private IEnergyQueryService energyQueryService; + + /** + * 整体机房图形,表格数据查询(公用一个接口) + * @param page + * @return + */ + @PostMapping("/sys/query") + public AjaxResult sysQuery(@RequestBody EnergyQueryVO page) { + return energyQueryService.sysQuery(page); + } + + /** + * 整体机房表格数据查询 + * @param vo + * @param response + */ + @PostMapping("/sys/query/export") + public void exportSysTable(@RequestBody EnergyQueryVO vo, HttpServletResponse response) { + // 文件名 + try { + String fileName = "机房整体能耗表.xlsx"; + // 从数据库获取数据 + List> dataList = (List>) energyQueryService.sysQuery(vo).get("data"); + if (dataList != null) { + // 设置响应格式 + response.setContentType("application/vdn.ms-excel;charset=utf-8"); + response.setHeader("Content-Disposition", "attachment; filename=\"" + URLEncoder.encode(fileName, "UTF-8") + "\""); + response.setCharacterEncoding("UTF-8"); + + List> head = ListUtils.newArrayList(); + List titleArr = new ArrayList<>(); + List timeStrArr = new ArrayList<>(); + List cold = new ArrayList<>(); + List meter = new ArrayList<>(); + List cop = new ArrayList<>(); + for (Map map : dataList) { + if (map.containsKey("titleArr")) { + titleArr = Arrays.asList((String[]) map.get("titleArr")); + List head0 = ListUtils.newArrayList(); + head0.add("日期"); + List head1 = ListUtils.newArrayList(); + head1.add("制冷量"); + List head2 = ListUtils.newArrayList(); + head2.add("耗电量"); + List head3= ListUtils.newArrayList(); + head3.add("COP"); + head.add(head0); + head.add(head1); + head.add(head2); + head.add(head3); + } + if (map.containsKey("timeStrArr")) { + timeStrArr = Arrays.asList((String[])map.get("timeStrArr")); + } + if (map.containsKey("cold")) { + cold = Arrays.asList((String[])map.get("cold")); + } + if (map.containsKey("meter")) { + meter = Arrays.asList((String[])map.get("meter")); + } + if (map.containsKey("cop")) { + cop = Arrays.asList((String[])map.get("cop")); + } + } + List> excelDataList = ListUtils.newArrayList(); + for (int i = 0; i < timeStrArr.size(); i++) { + List list1 = ListUtils.newArrayList(); + list1.add(timeStrArr.get(i)); + list1.add(cold.get(i)); + list1.add(meter.get(i)); + list1.add(cop.get(i)); + excelDataList.add(list1); + } + // 内容格式 + EasyExcel.write(response.getOutputStream()).head(head).sheet("机房整体能耗表").doWrite(excelDataList); + } + } catch (Exception e) { + log.error("下载报表异常", e); + throw new RuntimeException("下载报表异常"); + } + } + + /** + * 各个设备类型图形,表格数据查询(公用一个接口) + * @param page + * @return + */ + @PostMapping("/sys/deviceType/query") + public AjaxResult deviceTypeQuery(@RequestBody EnergyQueryVO page) { + return energyQueryService.deviceTypeQuery(page); + } + +} diff --git a/mh-admin/src/main/resources/application-dev.yml b/mh-admin/src/main/resources/application-dev.yml index 924115d..0b7c6f7 100644 --- a/mh-admin/src/main/resources/application-dev.yml +++ b/mh-admin/src/main/resources/application-dev.yml @@ -78,6 +78,18 @@ spring: max-active: 8 # #连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: -1ms + # rabbitmq配置 + rabbitmq: + host: 127.0.0.1 + port: 5672 + username: eemcs + password: mh@803 + virtual-host: /eemcs + listener: + direct: + prefetch: 2 + simple: + prefetch: 2 # 数据源配置 datasource: type: com.alibaba.druid.pool.DruidDataSource diff --git a/mh-admin/src/main/resources/application-prod.yml b/mh-admin/src/main/resources/application-prod.yml index b6b4c74..0b7c6f7 100644 --- a/mh-admin/src/main/resources/application-prod.yml +++ b/mh-admin/src/main/resources/application-prod.yml @@ -43,8 +43,6 @@ spring: messages: # 国际化资源文件路径 basename: i18n/messages - profiles: - active: druid # 文件上传 servlet: multipart: @@ -80,6 +78,79 @@ spring: max-active: 8 # #连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: -1ms + # rabbitmq配置 + rabbitmq: + host: 127.0.0.1 + port: 5672 + username: eemcs + password: mh@803 + virtual-host: /eemcs + listener: + direct: + prefetch: 2 + simple: + prefetch: 2 + # 数据源配置 + datasource: + type: com.alibaba.druid.pool.DruidDataSource + driverClassName: org.postgresql.Driver + druid: + # 主库数据源 + master: + #添加allowMultiQueries=true 在批量更新时才不会出错 + url: jdbc:postgresql://127.0.0.1:5432/eemcs + username: postgres + password: mh@803 + # 从库数据源 + slave: + # 从数据源开关/默认关闭 + enabled: false + url: + username: + password: + # 初始连接数 + initialSize: 5 + # 最小连接池数量 + minIdle: 10 + # 最大连接池数量 + maxActive: 20 + # 配置获取连接等待超时的时间 + maxWait: 60000 + # 配置连接超时时间 + connectTimeout: 30000 + # 配置网络超时时间 + socketTimeout: 60000 + # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 + timeBetweenEvictionRunsMillis: 60000 + # 配置一个连接在池中最小生存的时间,单位是毫秒 + minEvictableIdleTimeMillis: 300000 + # 配置一个连接在池中最大生存的时间,单位是毫秒 + maxEvictableIdleTimeMillis: 900000 + # 配置检测连接是否有效 + validationQuery: SELECT 1 + testWhileIdle: true + testOnBorrow: false + testOnReturn: false + webStatFilter: + enabled: true + statViewServlet: + enabled: true + # 设置白名单,不填则允许所有访问 + allow: + url-pattern: /druid/* + # 控制台管理用户名和密码 + login-username: mh + login-password: 123456 + filter: + stat: + enabled: true + # 慢SQL记录 + log-slow-sql: true + slow-sql-millis: 1000 + merge-sql: true + wall: + config: + multi-statement-allow: true # MyBatis配置 mybatis-plus: @@ -123,7 +194,7 @@ mqttSpring: port: 2883 username: mh password: mhtech@803 - client-id: mqtt_mz_producer_prod + client-id: mqtt_mz_producer_dev # If the protocol is ws/wss, this value is required. path: # Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",". @@ -134,4 +205,4 @@ mqttSpring: protocol: WS host: 127.0.0.1 port: 8083 - path: /mqtt \ No newline at end of file + path: /mqtt diff --git a/mh-admin/src/main/resources/application-test.yml b/mh-admin/src/main/resources/application-test.yml index e5683a1..0b7c6f7 100644 --- a/mh-admin/src/main/resources/application-test.yml +++ b/mh-admin/src/main/resources/application-test.yml @@ -43,8 +43,6 @@ spring: messages: # 国际化资源文件路径 basename: i18n/messages - profiles: - active: druid # 文件上传 servlet: multipart: @@ -80,6 +78,79 @@ spring: max-active: 8 # #连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: -1ms + # rabbitmq配置 + rabbitmq: + host: 127.0.0.1 + port: 5672 + username: eemcs + password: mh@803 + virtual-host: /eemcs + listener: + direct: + prefetch: 2 + simple: + prefetch: 2 + # 数据源配置 + datasource: + type: com.alibaba.druid.pool.DruidDataSource + driverClassName: org.postgresql.Driver + druid: + # 主库数据源 + master: + #添加allowMultiQueries=true 在批量更新时才不会出错 + url: jdbc:postgresql://127.0.0.1:5432/eemcs + username: postgres + password: mh@803 + # 从库数据源 + slave: + # 从数据源开关/默认关闭 + enabled: false + url: + username: + password: + # 初始连接数 + initialSize: 5 + # 最小连接池数量 + minIdle: 10 + # 最大连接池数量 + maxActive: 20 + # 配置获取连接等待超时的时间 + maxWait: 60000 + # 配置连接超时时间 + connectTimeout: 30000 + # 配置网络超时时间 + socketTimeout: 60000 + # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 + timeBetweenEvictionRunsMillis: 60000 + # 配置一个连接在池中最小生存的时间,单位是毫秒 + minEvictableIdleTimeMillis: 300000 + # 配置一个连接在池中最大生存的时间,单位是毫秒 + maxEvictableIdleTimeMillis: 900000 + # 配置检测连接是否有效 + validationQuery: SELECT 1 + testWhileIdle: true + testOnBorrow: false + testOnReturn: false + webStatFilter: + enabled: true + statViewServlet: + enabled: true + # 设置白名单,不填则允许所有访问 + allow: + url-pattern: /druid/* + # 控制台管理用户名和密码 + login-username: mh + login-password: 123456 + filter: + stat: + enabled: true + # 慢SQL记录 + log-slow-sql: true + slow-sql-millis: 1000 + merge-sql: true + wall: + config: + multi-statement-allow: true # MyBatis配置 mybatis-plus: @@ -123,7 +194,7 @@ mqttSpring: port: 2883 username: mh password: mhtech@803 - client-id: mqtt_mz_test_dev + client-id: mqtt_mz_producer_dev # If the protocol is ws/wss, this value is required. path: # Topics that need to be subscribed when initially connecting to mqtt, multiple topics are divided by ",". @@ -134,4 +205,4 @@ mqttSpring: protocol: WS host: 127.0.0.1 port: 8083 - path: /mqtt \ No newline at end of file + path: /mqtt diff --git a/mh-common/src/main/java/com/mh/common/constant/Constants.java b/mh-common/src/main/java/com/mh/common/constant/Constants.java index 56c52c7..2246c12 100644 --- a/mh-common/src/main/java/com/mh/common/constant/Constants.java +++ b/mh-common/src/main/java/com/mh/common/constant/Constants.java @@ -182,6 +182,11 @@ public class Constants { public static final String COOL_TOWER_TYPE = "tower"; // 冷却塔 public static final String CLOSE_HOST = "close_host_device_id"; // 关闭主机的设备id public static final String OPEN_VALVE = "open_valve_device_id"; // 开启蝶阀的设备id + public static final CharSequence CHILLERS = "chillers"; + public static final CharSequence OTHER = "other"; + public static final CharSequence DEVICE = "devices"; + public static final String CHILLERS_TYPE = "0"; // 主机类型设备 + public static final String OTHER_TYPE = "1"; // 其他设备 public static boolean CONTROL_WEB_FLAG = false; public static boolean SEND_STATUS = false; // 指令发送状态 public static boolean FLAG = false; diff --git a/mh-common/src/main/java/com/mh/common/constant/TopicEnum.java b/mh-common/src/main/java/com/mh/common/constant/TopicEnum.java index 8569098..022de21 100644 --- a/mh-common/src/main/java/com/mh/common/constant/TopicEnum.java +++ b/mh-common/src/main/java/com/mh/common/constant/TopicEnum.java @@ -15,7 +15,7 @@ import static com.mh.common.constant.TopicConst.*; public enum TopicEnum { /** - * 客户端主动上报数据 + * 冷水机组客户端主动上报数据 */ CLIENT_UPLOAD_DATA(Pattern.compile("^" + MH_UPLOAD + EVENTS_UPLOAD + REGEX_SN + "$"), ChannelName.EVENTS_UPLOAD_INBOUND), diff --git a/mh-common/src/main/java/com/mh/common/core/domain/ColumnData.java b/mh-common/src/main/java/com/mh/common/core/domain/ColumnData.java new file mode 100644 index 0000000..e9b0ebd --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/core/domain/ColumnData.java @@ -0,0 +1,42 @@ +package com.mh.common.core.domain; + +import java.util.Arrays; + +/** + * @author LJF + * @version 1.0 + * @project NewZhujiang_Server + * @description 各个设备的数据 + * @date 2023-12-18 09:33:30 + */ +public class ColumnData { + + private String name; + + private String[] value; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String[] getValue() { + return value; + } + + public void setValue(String[] value) { + this.value = value; + } + + @Override + public String toString() { + return "ColumnData{" + + "name='" + name + '\'' + + ", value=" + Arrays.toString(value) + + '}'; + } + +} diff --git a/mh-common/src/main/java/com/mh/common/core/domain/dto/EnergyConsumptionDTO.java b/mh-common/src/main/java/com/mh/common/core/domain/dto/EnergyConsumptionDTO.java new file mode 100644 index 0000000..b467f61 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/core/domain/dto/EnergyConsumptionDTO.java @@ -0,0 +1,68 @@ +package com.mh.common.core.domain.dto; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * @author LJF + * @version 1.0 + * @project NewZhujiang_Server + * @description 返回能耗分析的前端值 + * @date 2023/7/18 15:07:55 + */ +public class EnergyConsumptionDTO implements Serializable { + + private static final long serialVersionUID = 10L; + + private String[] titles; + + private String[] times; + + private String[] lineTimes; + + private List> data; + + public String[] getTitles() { + return titles; + } + + public void setTitles(String[] titles) { + this.titles = titles; + } + + public String[] getTimes() { + return times; + } + + public void setTimes(String[] times) { + this.times = times; + } + + public String[] getLineTimes() { + return lineTimes; + } + + public void setLineTimes(String[] lineTimes) { + this.lineTimes = lineTimes; + } + + public List> getData() { + return data; + } + + public void setData(List> data) { + this.data = data; + } + + @Override + public String toString() { + return "EnergyConsumptionDTO{" + + "titles=" + Arrays.toString(titles) + + ", times=" + Arrays.toString(times) + + ", data=" + data + + '}'; + } + +} diff --git a/mh-common/src/main/java/com/mh/common/core/domain/entity/ChillersEntity.java b/mh-common/src/main/java/com/mh/common/core/domain/entity/ChillersEntity.java new file mode 100644 index 0000000..f9e2153 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/core/domain/entity/ChillersEntity.java @@ -0,0 +1,55 @@ +package com.mh.common.core.domain.entity; + +import lombok.Data; +import org.springframework.format.annotation.DateTimeFormat; + +import java.io.Serializable; +import java.util.Date; + +/** + * @author LJF + * @title : + * @description :冷水机组设备实体类 + * @updateTime 2020-05-20 + * @throws : + */ +@Data +public class ChillersEntity implements Serializable { + + static final long serialVersionUID = 42L; + + private Long id; + + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date updateTime; + + private int register_id; + private String deviceCode; // 设备代码 + private String deviceNum; // 设备码 + private String collectionNum; // 冷水机组采集地址 + private String registerAddress; // 寄存器地址 + private String registerName; // 寄存器名称 + private String funCode; // 功能码 + private int digit; // 保留位数 + private String otherName; // 别名 + private int grade; // 标志 1:冷水机组设备 2:ddc设备采集参数 3:控制指令类型 4:故障类型 + private String lastValue; // 最新采集数据 +// @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private String lastTime; // 最新采集时间 +// @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") +// private Date localTime; // 本地采集时间 + private String dataCom; // 采集端口号 + private String ddcAddr; // DDC地址 + private String deviceType; + + // update by ljf on 2020-05-26 添加网络管理器的IP和采集的端口号 + private String IP; // IP地址 + private int port; // 端口号 + + private Integer paramType; // 参数类型 + + +} diff --git a/mh-common/src/main/java/com/mh/common/core/domain/entity/CollectionParamsManage.java b/mh-common/src/main/java/com/mh/common/core/domain/entity/CollectionParamsManage.java index 728c8e9..dbc42f0 100644 --- a/mh-common/src/main/java/com/mh/common/core/domain/entity/CollectionParamsManage.java +++ b/mh-common/src/main/java/com/mh/common/core/domain/entity/CollectionParamsManage.java @@ -1,11 +1,13 @@ package com.mh.common.core.domain.entity; +import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableName; import com.mh.common.core.domain.BaseEntity; import org.apache.commons.lang3.builder.ToStringBuilder; import java.math.BigDecimal; import java.util.Date; +import java.util.Map; /** * @author LJF @@ -55,7 +57,7 @@ public class CollectionParamsManage extends BaseEntity { /** * 仪表标识码 */ - private String identityCode; + private String identifyCode; /** * 仪表标定脉冲 @@ -65,7 +67,7 @@ public class CollectionParamsManage extends BaseEntity { /** * 仪表范围 */ - private String mtRange; + private BigDecimal mtRange; /** * 仪表比率 @@ -153,6 +155,32 @@ public class CollectionParamsManage extends BaseEntity { */ private Integer grade; + @TableField(exist = false) + private String searchValue; + + @TableField(exist = false) + private Map params; + + @Override + public Map getParams() { + return params; + } + + @Override + public void setParams(Map params) { + this.params = params; + } + + @Override + public String getSearchValue() { + return searchValue; + } + + @Override + public void setSearchValue(String searchValue) { + this.searchValue = searchValue; + } + public Integer getGrade() { return grade; } @@ -225,12 +253,12 @@ public class CollectionParamsManage extends BaseEntity { this.funcCode = funcCode; } - public String getIdentityCode() { - return identityCode; + public String getIdentifyCode() { + return identifyCode; } - public void setIdentityCode(String identityCode) { - this.identityCode = identityCode; + public void setIdentifyCode(String identifyCode) { + this.identifyCode = identifyCode; } public String getMtCaliberPulse() { @@ -241,11 +269,11 @@ public class CollectionParamsManage extends BaseEntity { this.mtCaliberPulse = mtCaliberPulse; } - public String getMtRange() { + public BigDecimal getMtRange() { return mtRange; } - public void setMtRange(String mtRange) { + public void setMtRange(BigDecimal mtRange) { this.mtRange = mtRange; } @@ -379,7 +407,7 @@ public class CollectionParamsManage extends BaseEntity { .append("mtCode", mtCode) .append("registerAddr", registerAddr) .append("funcCode", funcCode) - .append("identityCode", identityCode) + .append("identifyCode", identifyCode) .append("mtCaliberPulse", mtCaliberPulse) .append("mtRange", mtRange) .append("mtRatio", mtRatio) @@ -399,7 +427,8 @@ public class CollectionParamsManage extends BaseEntity { .append("isUse", isUse) .append("otherName", otherName) .append("grade", grade) + .append("searchValue", searchValue) + .append("params", params) .toString(); } - } diff --git a/mh-common/src/main/java/com/mh/common/core/domain/entity/ConsumptionAnalyzeEntity.java b/mh-common/src/main/java/com/mh/common/core/domain/entity/ConsumptionAnalyzeEntity.java new file mode 100644 index 0000000..e2acc7d --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/core/domain/entity/ConsumptionAnalyzeEntity.java @@ -0,0 +1,106 @@ +package com.mh.common.core.domain.entity; + +import java.io.Serializable; + +/** + * @author LJF + * @version 1.0 + * @project NewZhujiang_Server + * @description 能耗分析 + * @date 2023/7/18 17:16:51 + */ +public class ConsumptionAnalyzeEntity implements Serializable { + + private static final long serialVersionUID = 10L; + + /** + * 当前值 + */ + private String curValue; + + /** + * 上一个值 + */ + private String lastValue; + + /** + * 设备对象 + */ + private String deviceType; + + /** + * 时间 + */ + private String timeStr; + + /** + * 同比 + */ + private String yoy; + + /** + * 环比 + */ + private String mom; + + public String getCurValue() { + return curValue; + } + + public void setCurValue(String curValue) { + this.curValue = curValue; + } + + public String getLastValue() { + return lastValue; + } + + public void setLastValue(String lastValue) { + this.lastValue = lastValue; + } + + public String getDeviceType() { + return deviceType; + } + + public void setDeviceType(String deviceType) { + this.deviceType = deviceType; + } + + public String getTimeStr() { + return timeStr; + } + + public void setTimeStr(String timeStr) { + this.timeStr = timeStr; + } + + public String getYoy() { + return yoy; + } + + public void setYoy(String yoy) { + this.yoy = yoy; + } + + public String getMom() { + return mom; + } + + public void setMom(String mom) { + this.mom = mom; + } + + @Override + public String toString() { + return "ConsumptionAnalyzeEntity{" + + "curValue='" + curValue + '\'' + + ", lastValue='" + lastValue + '\'' + + ", deviceType='" + deviceType + '\'' + + ", timeStr='" + timeStr + '\'' + + ", yoy='" + yoy + '\'' + + ", mom='" + mom + '\'' + + '}'; + } + +} diff --git a/mh-common/src/main/java/com/mh/common/core/domain/entity/DeviceReportEntity.java b/mh-common/src/main/java/com/mh/common/core/domain/entity/DeviceReportEntity.java new file mode 100644 index 0000000..7562e16 --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/core/domain/entity/DeviceReportEntity.java @@ -0,0 +1,43 @@ +package com.mh.common.core.domain.entity; + +import lombok.Data; +import org.springframework.format.annotation.DateTimeFormat; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.util.Date; + +/** + * @Author : Rainbow + * @date : 2023/6/16 + */ +@Data +public class DeviceReportEntity implements Serializable { + + static final long serialVersionUID = 42L; + + private Long id; + + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date updateTime; + private String deviceNum; + private String deviceCode; + private String deviceType; + private String lastValue; + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date lastTime; + private String curValue; + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date curTime; + private LocalDateTime localDateTime; + private String usedValue; + /** + * 倍率 + */ + private int ratio; + private String calcValue; + private int grade; +} diff --git a/mh-common/src/main/java/com/mh/common/core/redis/RedisCache.java b/mh-common/src/main/java/com/mh/common/core/redis/RedisCache.java index 4f6f455..cf24b91 100644 --- a/mh-common/src/main/java/com/mh/common/core/redis/RedisCache.java +++ b/mh-common/src/main/java/com/mh/common/core/redis/RedisCache.java @@ -1,11 +1,10 @@ package com.mh.common.core.redis; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.lang.reflect.Type; +import java.util.*; import java.util.concurrent.TimeUnit; + +import com.alibaba.fastjson2.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.BoundSetOperations; import org.springframework.data.redis.core.HashOperations; @@ -153,6 +152,24 @@ public class RedisCache return redisTemplate.opsForList().range(key, 0, -1); } + public List getCacheList(final String key, Class clazz) { + // 从 Redis 中获取数据 + List objects = redisTemplate.opsForList().range(key, 0, -1); + List list = new ArrayList<>(); + + if (objects != null && !objects.isEmpty()) { + for (Object obj : objects) { + // 将对象转换为字符串 + String jsonString = JSONObject.toJSONString(obj); + // 解析 JSON 字符串为目标类型 + T object = JSONObject.parseObject(jsonString, clazz); + list.add(object); + } + } + return list; + } + + /** * 缓存Set * diff --git a/mh-common/src/main/java/com/mh/common/utils/EnergyThreadPoolService.java b/mh-common/src/main/java/com/mh/common/utils/EnergyThreadPoolService.java new file mode 100644 index 0000000..6da2aae --- /dev/null +++ b/mh-common/src/main/java/com/mh/common/utils/EnergyThreadPoolService.java @@ -0,0 +1,48 @@ +package com.mh.common.utils; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * @author LJF + * @title : 单例的线程池 + * @description 使用静态内部类进行创建,专门解析接收到的报文数据 + * @updateTime 2020-12-09 + * @throws : + */ +public class EnergyThreadPoolService { + + /** 线程池保持ALIVE状态线程数 */ + public static final int CORE_POOL_SIZE = 10; + + /** 线程池最大线程数 */ + public static final int MAX_POOL_SIZE = 50; + + /** 空闲线程回收时间 */ + public static final int KEEP_ALIVE_TIME = 30000; + + /** 线程池等待队列 */ + public static final int BLOCKING_QUEUE_SIZE = 1000; + + // 私有化构造器 + private EnergyThreadPoolService(){} + + // 对外访问的公共方法 + public static ThreadPoolExecutor getInstance() { + return ThreadPoolServiceHolder.instance; + } + + //写一个静态内部类,里面实例化外部类 + private static class ThreadPoolServiceHolder { + private static final ThreadPoolExecutor instance = new ThreadPoolExecutor( + CORE_POOL_SIZE, // 线程池保持存活的线程数 + MAX_POOL_SIZE, // 最大线程数 + KEEP_ALIVE_TIME, // 空闲线程回收时间 + TimeUnit.MICROSECONDS, // 单位 + new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE), // 线程队列 + new ThreadPoolExecutor.AbortPolicy() // 线程池对拒绝任务的处理策略 + ); + } + +} diff --git a/mh-framework/pom.xml b/mh-framework/pom.xml index abe19bc..0527bf9 100644 --- a/mh-framework/pom.xml +++ b/mh-framework/pom.xml @@ -59,6 +59,11 @@ mh-system + + org.springframework.boot + spring-boot-starter-amqp + + \ No newline at end of file diff --git a/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java b/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java new file mode 100644 index 0000000..b6bdfac --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/dealdata/DataProcessService.java @@ -0,0 +1,109 @@ +package com.mh.framework.dealdata; + +import com.mh.common.core.domain.entity.ChillersEntity; +import com.mh.common.core.domain.entity.DeviceReportEntity; +import com.mh.common.model.request.AdvantechReceiver; + +import java.util.List; +import java.util.Map; + +/** + * 数据解析,处理,入库操作 + * @Author : Rainbow + * @date : 2023/5/26 + */ +public interface DataProcessService { + /** + * 主机数据解析入库 + * @param data 未解析数据 + */ + void insertChillerData(AdvantechReceiver data); + + /** + * 其他数据解析入库 + * @param data 未解析数据 + */ + void insertOtherData(AdvantechReceiver data); + + /** + * 电表等其他设备数据入库 + * @param data + */ + void insertDeviceData(AdvantechReceiver data); + + /** + * 查询初始值 + * @param deviceNum + * @return + */ + String queryInitValue(String deviceNum); + + /** + * 查询上一次采集数据、时间等参数 + * @param deviceNum + * @param type min hour day month year + * @return + */ + DeviceReportEntity queryLastValue(String deviceNum,String type); + + /** + * 批量插入data_min表中 + * @param dataMinList + */ + void insertDatabase(List dataMinList); + + /** + * 查询data_ + type + year 表中未处理的数据 + * @param type min hour day month year + * @return + */ + Map queryUntreatedData(String type); + + /** + * 批量插入报表中 + * @param hourList 报表数据 + * @param type 报表类型 + */ + void batchInsertTable(List hourList, String type); + + + /** + * 根据Id批量修改grade = 1 + * 需要根据IdMap来判断修改哪个表的id + * @param idMap 主键ID集合 + * @param type 报表类型 + */ + void batchUpdateGrade(Map> idMap, String type); + + /** + * 批量插入或者更新 + * @param dataList 数据集 + * @param tableType + */ + void batchInsertOrUpdate(List dataList, String tableType); + + /** + * 查询倍率 + * @param deviceNum + * @return + */ + Integer queryRatio(String deviceNum); + + /** + * 插入主机历史流水表 + * @param cacheList + */ + void insertChillerReport(List cacheList); + + /** + * 批量更新运行时长 + * @param runTimeList + */ + void batchUpdateRunTime(List> runTimeList); + + /** + * 计算COP + * @param time + */ + void calculateCopByTime(String time); +} diff --git a/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java new file mode 100644 index 0000000..1678581 --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/dealdata/impl/DataProcessServiceImpl.java @@ -0,0 +1,206 @@ +package com.mh.framework.dealdata.impl; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.TypeReference; +import com.mh.common.constant.Constants; +import com.mh.common.core.domain.entity.ChillersEntity; +import com.mh.common.core.domain.entity.CollectionParamsManage; +import com.mh.common.core.domain.entity.DeviceReportEntity; +import com.mh.common.core.redis.RedisCache; +import com.mh.common.model.request.AdvantechDatas; +import com.mh.common.model.request.AdvantechReceiver; +import com.mh.common.utils.DateUtils; +import com.mh.common.utils.EnergyThreadPoolService; +import com.mh.common.utils.StringUtils; +import com.mh.framework.dealdata.DataProcessService; +import com.mh.system.mapper.device.DatabaseMapper; +import com.mh.system.service.device.ICollectionParamsManageService; +import jakarta.annotation.Resource; +import lombok.Synchronized; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.util.*; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.stream.Collectors; + +/** + * 数据解析处理入库操作 + * + * @Author : Rainbow + * @date : 2023/5/26 + */ +@Service +@Slf4j +public class DataProcessServiceImpl implements DataProcessService { + + /** + * 主机 + */ + private static final String CHILLERS = "CHILLERS"; + + /** + * 计量设备 + */ + private static final String DEVICES = "DEVICES"; + + @Autowired + DatabaseMapper databaseMapper; + + @Autowired + RedisCache redisCache; + + @Autowired + private ICollectionParamsManageService collectionParamsManageService; + + ThreadPoolExecutor threadPoolService = EnergyThreadPoolService.getInstance(); + + @Override + public void insertChillerData(AdvantechReceiver data) { + insertData(data, "CHILLERS_REGISTER", CHILLERS); + } + + @Override + public void insertDeviceData(AdvantechReceiver data) { + insertData(data, "DEVICES_REGISTER", DEVICES); + } + + private void insertData(AdvantechReceiver data, String registerKey, String cacheKey) { + log.info("{}数据解析入库:{}", registerKey.equals("CHILLERS_REGISTER") ? "冷水机组" : "计量设备", data); + if (registerKey.equals("CHILLERS_REGISTER")) { + databaseMapper.createChillerTable(); + } else { + databaseMapper.createDataTable(); + } + ArrayList entities = new ArrayList<>(); + + List registers = redisCache.getCacheList(registerKey, CollectionParamsManage.class); + if (null == registers || registers.isEmpty()) { + if (registerKey.equals("CHILLERS_REGISTER")) { + registers = collectionParamsManageService.queryCollectionParamsByMtType(Constants.CHILLERS_TYPE); + } else { + registers = collectionParamsManageService.queryCollectionParamsByMtType(Constants.OTHER_TYPE); + } + redisCache.setCacheList(registerKey, registers); + } + + String timeString = data.getTs(); + OffsetDateTime offsetDateTime; + try { + offsetDateTime = OffsetDateTime.parse(timeString, DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ")); + } catch (DateTimeParseException e) { + log.error("时间格式解析异常", e); + return; + } + LocalDateTime localDateTime = offsetDateTime.toLocalDateTime(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + String formattedTime = localDateTime.format(formatter); + + // 假设 data 是一个包含 JSON 数据的对象 + List list = JSON.parseObject(data.getD().toString(), new TypeReference>() {}); + for (AdvantechDatas advantechDatas : list) { + String tag = advantechDatas.getTag(); + String value = advantechDatas.getValue(); + log.info("时间: {},tag标签: {},value值: {}", formattedTime, tag, value); + try { + if (StringUtils.isBlank(tag)) { + continue; + } + CollectionParamsManage collectionParamsManage = new CollectionParamsManage(); + + for (CollectionParamsManage entity : registers) { + if (tag.equals(String.valueOf(entity.getOtherName()))) { + collectionParamsManage = entity; + try { + collectionParamsManage.setCurValue(new BigDecimal(value)); + } catch (NumberFormatException e) { + log.error("数值格式解析异常", e); + continue; + } + Date date = Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant()); + collectionParamsManage.setCurTime(date); + break; + } + } + entities.add(collectionParamsManage); + } catch (Exception e) { + log.error("解析异常", e); + } + } + + redisCache.setCacheList(cacheKey, entities); + + threadPoolService.execute(() -> { + collectionParamsManageService.updateCollectionParamsManages(entities); + }); + } + + @Override + public void insertOtherData(AdvantechReceiver data) { + log.info("其他设备数据解析入库:{}", data); + } + + @Override + public String queryInitValue(String deviceNum) { + return ""; + } + + @Override + public DeviceReportEntity queryLastValue(String deviceNum, String type) { + return null; + } + + @Override + public void insertDatabase(List dataMinList) { + + } + + @Override + public Map queryUntreatedData(String type) { + return Map.of(); + } + + @Override + public void batchInsertTable(List hourList, String type) { + + } + + @Override + public void batchUpdateGrade(Map> idMap, String type) { + + } + + @Override + public void batchInsertOrUpdate(List dataList, String tableType) { + + } + + @Override + public Integer queryRatio(String deviceNum) { + return 0; + } + + @Override + public void insertChillerReport(List cacheList) { + + } + + @Override + public void batchUpdateRunTime(List> runTimeList) { + + } + + @Override + public void calculateCopByTime(String time) { + + } +} diff --git a/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttMessageChannel.java b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttMessageChannel.java index 7ceda6c..e8ba111 100644 --- a/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttMessageChannel.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/config/MqttMessageChannel.java @@ -19,11 +19,20 @@ import org.springframework.messaging.MessageChannel; @Configuration public class MqttMessageChannel { + @Bean(name = ChannelName.OUTBOUND) + public MessageChannel outboundChannel() { + return new DirectChannel(); + } + @Bean(name = ChannelName.INBOUND) public MessageChannel inboundChannel() { return new DirectChannel(); } + /** + * 事件主动上报通道 + * @return + */ @Bean(name = ChannelName.EVENTS_UPLOAD_INBOUND) public MessageChannel eventsUploadInbound() { return new DirectChannel(); diff --git a/mh-framework/src/main/java/com/mh/framework/mqtt/handler/InboundMessageRouter.java b/mh-framework/src/main/java/com/mh/framework/mqtt/handler/InboundMessageRouter.java index 99fb838..48b3e37 100644 --- a/mh-framework/src/main/java/com/mh/framework/mqtt/handler/InboundMessageRouter.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/handler/InboundMessageRouter.java @@ -37,8 +37,8 @@ public class InboundMessageRouter extends AbstractMessageRouter { protected Collection determineTargetChannels(Message message) { MessageHeaders headers = message.getHeaders(); String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString(); - byte[] payload = (byte[]) message.getPayload(); - log.info("从当前主题 topic: {}, 接收到的消息:{}", topic, new String(payload)); +// byte[] payload = (byte[]) message.getPayload(); +// log.info("从当前主题 topic: {}, 接收到的消息:{}", topic, new String(payload)); // 找到对应的主题消息通道 TopicEnum topicEnum = TopicEnum.find(topic); MessageChannel bean = (MessageChannel) SpringUtils.getBean(topicEnum.getBeanName()); diff --git a/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttGatewayService.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttGatewayService.java index e2cdf50..55fbf7a 100644 --- a/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttGatewayService.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/IMqttGatewayService.java @@ -14,9 +14,9 @@ import org.springframework.stereotype.Component; * @description 消息网关 * @date 2024-10-30 14:43:55 */ -@Component -@Configuration -@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND) +//@Component +//@Configuration +//@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND) public interface IMqttGatewayService { /** @@ -24,14 +24,14 @@ public interface IMqttGatewayService { * @param topic * @param payload */ - void publish(@Header(MqttHeaders.TOPIC) String topic, String payload, @Header(MqttHeaders.QOS) int qos); + void publish(String topic, String payload, int qos); /** * 发送消息 * @param topic * @param payload */ - void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload); + void publish(String topic, byte[] payload); /** * 发送消息并带上qos @@ -39,5 +39,5 @@ public interface IMqttGatewayService { * @param payload * @param qos */ - void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos); + void publish(String topic, byte[] payload, int qos); } diff --git a/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java index adc35fc..81cd0b9 100644 --- a/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/EventsServiceImpl.java @@ -1,9 +1,13 @@ package com.mh.framework.mqtt.service.impl; +import com.alibaba.fastjson2.JSONObject; import com.fasterxml.jackson.databind.ObjectMapper; import com.mh.common.constant.ChannelName; +import com.mh.common.constant.Constants; import com.mh.common.model.request.AdvantechDatas; +import com.mh.common.model.request.AdvantechReceiver; import com.mh.framework.mqtt.service.IEventsService; +import com.mh.framework.rabbitmq.producer.SendMsgByTopic; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -12,6 +16,7 @@ import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Service; import java.io.IOException; +import java.util.Objects; /** * @author LJF @@ -27,22 +32,31 @@ public class EventsServiceImpl implements IEventsService { @Autowired private ObjectMapper mapper; + @Autowired + private SendMsgByTopic sendMsgByTopic; + @ServiceActivator(inputChannel = ChannelName.EVENTS_UPLOAD_INBOUND) @Override public void handleInboundUpload(byte[] receiver, MessageHeaders headers) { - handleInboundData(receiver, "主动上报数据"); + // 获取当前的主题 + String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); + handleInboundData(receiver, topic, "主动上报数据"); } @ServiceActivator(inputChannel = ChannelName.EVENTS_COLLECTION_INBOUND) @Override public void handleInboundCollection(byte[] receiver, MessageHeaders headers) { - handleInboundData(receiver, "主动下发采集数据"); + // 获取当前的主题 + String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); + handleInboundData(receiver, topic, "主动下发采集数据"); } @ServiceActivator(inputChannel = ChannelName.EVENTS_CONTROL_INBOUND) @Override public void handleInboundControl(byte[] receiver, MessageHeaders headers) { - handleInboundData(receiver, "控制指令下发"); + // 获取当前的主题 + String topic = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString(); + handleInboundData(receiver, topic, "控制指令下发"); } @ServiceActivator(inputChannel = ChannelName.EVENTS_SEND_INBOUND) @@ -52,10 +66,22 @@ public class EventsServiceImpl implements IEventsService { log.info("接收到控制指令下发=>{}", sendStr); } - private void handleInboundData(byte[] receiver, String logMessage) { + private void handleInboundData(byte[] receiver,String topic, String logMessage) { try { - AdvantechDatas commonTopicReceiver = mapper.readValue(receiver, AdvantechDatas.class); + AdvantechReceiver commonTopicReceiver = mapper.readValue(receiver, AdvantechReceiver.class); log.info("{}: {}", logMessage, commonTopicReceiver); + // 接入消息队列,利用消息对接进行数据处理 + // 判断当前主题属于哪种主动上报数据 + if (topic.contains(Constants.CHILLERS)) { + sendMsgByTopic.sendToChillerMQ(JSONObject.toJSONString(commonTopicReceiver)); + } else if (topic.contains(Constants.OTHER)) { + sendMsgByTopic.sendToOtherMQ(JSONObject.toJSONString(commonTopicReceiver)); + } else if (topic.contains(Constants.DEVICE)) { + sendMsgByTopic.sendToDeviceMQ(JSONObject.toJSONString(commonTopicReceiver)); + } else { + // 非本地主题处理 + log.info("非本地主题处理: {}", topic); + } } catch (IOException e) { log.error("处理数据时发生错误: ", e); } diff --git a/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttGatewayServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttGatewayServiceImpl.java new file mode 100644 index 0000000..9ab6fb2 --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttGatewayServiceImpl.java @@ -0,0 +1,54 @@ +package com.mh.framework.mqtt.service.impl; + +import com.mh.framework.mqtt.service.IMqttGatewayService; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.MessageChannel; +import org.springframework.stereotype.Service; + +/** + * @author LJF + * @version 1.0 + * @project EEMCS + * @description 网关实现类 + * @date 2025-02-07 08:44:55 + */ +@Service +public class MqttGatewayServiceImpl implements IMqttGatewayService { + + private final MessageChannel outboundChannel; + + public MqttGatewayServiceImpl(@Qualifier("outbound") MessageChannel outboundChannel) { + this.outboundChannel = outboundChannel; + } + + @Override + public void publish(String topic, String payload, int qos) { + outboundChannel.send( + MessageBuilder + .withPayload(payload) + .setHeader(MqttHeaders.TOPIC, topic) + .setPriority(qos) + .build()); + } + + @Override + public void publish(String topic, byte[] payload) { + outboundChannel.send( + MessageBuilder + .withPayload(payload) + .setHeader(MqttHeaders.TOPIC, topic) + .build()); + } + + @Override + public void publish(String topic, byte[] payload, int qos) { + outboundChannel.send( + MessageBuilder + .withPayload(payload) + .setHeader(MqttHeaders.TOPIC, topic) + .setPriority(qos) + .build()); + } +} diff --git a/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttMsgSenderServiceImpl.java b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttMsgSenderServiceImpl.java index f4bddd5..855be91 100644 --- a/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttMsgSenderServiceImpl.java +++ b/mh-framework/src/main/java/com/mh/framework/mqtt/service/impl/MqttMsgSenderServiceImpl.java @@ -6,9 +6,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.mh.common.constant.TopicConst; import com.mh.common.model.response.CommonTopicResponse; import com.mh.common.model.response.ServiceReply; -import com.mh.common.utils.spring.SpringUtils; import com.mh.framework.mqtt.service.IMqttGatewayService; import com.mh.framework.mqtt.service.IMqttMsgSenderService; +import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -28,16 +28,13 @@ import java.util.UUID; @Service public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService { - private final IMqttGatewayService iMqttGatewayService; - @Autowired - public MqttMsgSenderServiceImpl(IMqttGatewayService myGateway) { - this.iMqttGatewayService = myGateway; - } + private IMqttGatewayService mqttGatewayService; @Autowired private ObjectMapper mapper; + /** * 发布,默认qos为0,非持久化 * @@ -48,7 +45,7 @@ public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService { public void publish(String topic, String pushMessage) { synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充 try { - iMqttGatewayService.publish(topic, pushMessage, 0); + mqttGatewayService.publish(topic, pushMessage, 0); log.info("发送主题:{},消息:{}", topic, pushMessage); } catch (Exception e) { log.error("发送主题异常:{},消息:{}", topic, pushMessage, e); @@ -66,7 +63,7 @@ public class MqttMsgSenderServiceImpl implements IMqttMsgSenderService { public void publish(String topic, int qos, CommonTopicResponse response) { try { log.info("发送主题:{},消息:{}", topic, response.toString()); - iMqttGatewayService.publish(topic, mapper.writeValueAsBytes(response), qos); + mqttGatewayService.publish(topic, mapper.writeValueAsBytes(response), qos); } catch (JsonProcessingException e) { log.error("发送主题:{},消息:{}", topic, response.toString(), e); throw new RuntimeException(e); diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java new file mode 100644 index 0000000..fa5cc38 --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/RabbitMqConfig.java @@ -0,0 +1,104 @@ +package com.mh.framework.rabbitmq; + +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @Author : Rainbow + * @date : 2023/5/26 + */ +@Configuration +public class RabbitMqConfig { + /**交换机*/ + public static final String EXCHANGE_NAME = "exchange_eemcs"; + /**主机队列*/ + public static final String QUEUE_CHILLER = "queue_chiller"; + /**主机routing-key*/ + public static final String ROUTING_KEY_CHILLER = "topic.chillers.eemcs.#"; + /**电表、冷量计等设备队列*/ + public static final String QUEUE_DEVICES = "queue_devices"; + /**电表冷量计routing-key*/ + public static final String ROUTING_KEY_DEVICES = "topic.devices.eemcs.#"; + /**其他设备队列*/ + public static final String QUEUE_OTHER = "queue_other"; + /**DDC routing-key*/ + public static final String ROUTING_KEY_OTHER = "topic.other.eemcs.#"; + + /**durable参数表示交换机是否持久化,值为true表示持久化,值为false表示不持久化。 + * 在RabbitMQ中,持久化交换机会被存储在磁盘上以便在服务器重启后恢复, + * 而非持久化交换机则只存在于内存中,服务器重启后会丢失*/ + /**声明交换机*/ + @Bean(EXCHANGE_NAME) + public Exchange exchange(){ + return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); + } + + /**主机队列*/ + @Bean(QUEUE_CHILLER) + public Queue chillerQueue(){ + return new Queue(QUEUE_CHILLER); + } + + /**主机队列绑定交换机*/ + @Bean(ROUTING_KEY_CHILLER) + public Binding chillerBinding(@Qualifier(QUEUE_CHILLER) Queue queue, + @Qualifier(EXCHANGE_NAME) Exchange exchange){ + return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_CHILLER).noargs(); + } + + /**计量设备队列*/ + @Bean(QUEUE_DEVICES) + public Queue devicesQueue(){ + return new Queue(QUEUE_DEVICES); + } + + /**计量设备绑定交换机*/ + @Bean(ROUTING_KEY_DEVICES) + public Binding deviceBinding(@Qualifier(QUEUE_DEVICES) Queue queue, + @Qualifier(EXCHANGE_NAME) Exchange exchange){ + return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_DEVICES).noargs(); + } + + /**其他设备队列*/ + @Bean(QUEUE_OTHER) + public Queue otherQueue(){ + return new Queue(QUEUE_OTHER); + } + + /**其他队列绑定交换机*/ + @Bean(ROUTING_KEY_OTHER) + public Binding otherBinding(@Qualifier(QUEUE_OTHER) Queue queue, + @Qualifier(EXCHANGE_NAME) Exchange exchange){ + return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_OTHER).noargs(); + } + + /** + * 默认消费者数量1 + * setConcurrentConsumers(10); + * 最大消费者数量,默认1 + * maxConcurrentConsumers + * prefetchCount 每个消费者可以拉取的消息,默认250 + * defaultRequeueRejected 未处理消息是否回归队列,true不重新加入,直接丢弃。false重新加入,默认false + * acknowledgeMode 确认模式 默认为自动确认 + * @param connectionFactory + * @return + */ + @Bean + @ConditionalOnClass + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory); + factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); + factory.setPrefetchCount(5); + factory.setConcurrentConsumers(5); + factory.setMaxConcurrentConsumers(10); + factory.setDefaultRequeueRejected(true); + return factory; + } + +} diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java new file mode 100644 index 0000000..43bde35 --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/consumer/ReceiveHandler.java @@ -0,0 +1,136 @@ +package com.mh.framework.rabbitmq.consumer; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.mh.common.model.request.AdvantechReceiver; +import com.mh.framework.dealdata.DataProcessService; +import com.mh.framework.rabbitmq.RabbitMqConfig; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.apache.poi.ss.formula.functions.T; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * @Author : ljf + * @date : 2025-02-07 + */ +@Component +@Slf4j +public class ReceiveHandler { + + @Autowired + DataProcessService dataProcessService; + + /** + * 监听主机参数 + * queues:指定监听的队列名,可以接收单个队列,也可以接收多个队列的数组或列表。 + * containerFactory:指定使用的监听器容器工厂,通常为SimpleRabbitListenerContainerFactory或其子类的实例,也可以使用自定义的工厂类。 + * concurrency:并发处理消息的数量,即开启几个线程来处理消息,默认为1个线程。 与配置SimpleRabbitListenerContainerFactory的setConcurrentConsumers一致 + * priority:消息的消费者优先级,默认为0,可以在MQ的 x-max-priority 参数打开的情况下使用,来指定消息的优先级。 + * exclusive:是否为独占消费,即限制该队列只能由一个消费者进行消费,设置为true时表示是独占消费,默认为false。 + * bindings:指定队列的绑定关系,即交换机和路由键,可以传入多个Binding对象,也可以使用@QueueBinding注解并传入多个@QueueBinding注解实例。 + * id:指定这个监听器容器的唯一标识,必须显式地设置它以启用有状态的 RabbitListenerContainerFactory 实现,以进行更高效的重启尝试,默认为空。 + */ + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = RabbitMqConfig.QUEUE_CHILLER, durable = "true"), + exchange = @Exchange( + value = RabbitMqConfig.EXCHANGE_NAME, + ignoreDeclarationExceptions = "true", + type = ExchangeTypes.TOPIC + ), + key = {RabbitMqConfig.ROUTING_KEY_CHILLER})) + public void receiveChillerData(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException { + + try { + //TODO 开启多线程处理主机数据,如果不通过线程池开启线程来处理, + // 设置SimpleRabbitListenerContainerFactory中的setConcurrentConsumers(10)数量也可以实现多线程处理 + log.info("MQ消费者:主机数据采集:{}", msg); + //TODO 数据解析入库操作 msg转成实体类,入库 + AdvantechReceiver chillerData = JSONObject.parseObject(msg, AdvantechReceiver.class); + dataProcessService.insertChillerData(chillerData); + channel.basicAck(tag, false); + } catch (Exception e) { + //false不进入队列,丢弃,true则回到队列 + log.error("data:{},chillersException:{}", msg, e); + Thread.sleep(100); + channel.basicReject(tag, false); + } + + } + + /** + * 处理电表冷量计等数据 + * + * @param msg + * @param channel + * @param tag + */ + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = RabbitMqConfig.QUEUE_DEVICES, durable = "true"), + exchange = @Exchange( + value = RabbitMqConfig.EXCHANGE_NAME, + ignoreDeclarationExceptions = "true", + type = ExchangeTypes.TOPIC + ), + key = {RabbitMqConfig.ROUTING_KEY_DEVICES} + )) + public void receiveDeviceData(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException { + try { + //TODO 处理电表等数据 + log.info("MQ消费者:计量设备数据采集:{}", msg); + //TODO 数据解析入库操作 msg转成实体类,入库 + AdvantechReceiver deviceData = JSONObject.parseObject(msg, AdvantechReceiver.class); + dataProcessService.insertDeviceData(deviceData); + // 正常执行,手动确认ack + channel.basicAck(tag, false); + } catch (Exception e) { + log.error("data:{},deviceException:{}", msg, e); + Thread.sleep(100); + channel.basicAck(tag, false); + + } + } + + /** + * 处理其他设备数据 + * + * @param msg + * @param channel + * @param tag + */ + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = RabbitMqConfig.QUEUE_OTHER, durable = "true"), + exchange = @Exchange( + value = RabbitMqConfig.EXCHANGE_NAME, + ignoreDeclarationExceptions = "true", + type = ExchangeTypes.TOPIC + ), + key = {RabbitMqConfig.ROUTING_KEY_OTHER} + )) + public void receiveOtherData(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException { + try { + log.info("MQ消费者:其他设备采集:{}", msg); + //TODO 数据解析入库操作 msg转成实体类,入库 + AdvantechReceiver OtherData = JSONObject.parseObject(msg, AdvantechReceiver.class); + dataProcessService.insertOtherData(OtherData); + // 正常执行,手动确认ack + channel.basicAck(tag, false); + } catch (Exception e) { + log.error("data:{},ddcException:{}", msg, e); + Thread.sleep(100); + channel.basicAck(tag, false); + } + } + +} diff --git a/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java b/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java new file mode 100644 index 0000000..5cc2535 --- /dev/null +++ b/mh-framework/src/main/java/com/mh/framework/rabbitmq/producer/SendMsgByTopic.java @@ -0,0 +1,37 @@ +package com.mh.framework.rabbitmq.producer; + +import com.mh.common.model.request.AdvantechReceiver; +import com.mh.framework.rabbitmq.RabbitMqConfig; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @Author : ljf + * @date : 2025-02-07 + */ +@Component +@Slf4j +public class SendMsgByTopic { + @Autowired + RabbitTemplate rabbitTemplate; + + /**主机数据报文注入rabbitmq*/ + public String sendToChillerMQ(String data){ + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"topic.chillers.eemcs.data",data); + return "success"; + } + + /**电表等数据注入rabbitmq*/ + public String sendToDeviceMQ(String data){ + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"topic.devices.eemcs.data",data); + return "success"; + } + + /**ddc等数据注入rabbitmq*/ + public String sendToOtherMQ(String data) { + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"topic.other.eemcs.data",data); + return "success"; + } +} diff --git a/mh-system/src/main/java/com/mh/system/mapper/device/CollectionParamsManageMapper.java b/mh-system/src/main/java/com/mh/system/mapper/device/CollectionParamsManageMapper.java index fe523b5..eabaeca 100644 --- a/mh-system/src/main/java/com/mh/system/mapper/device/CollectionParamsManageMapper.java +++ b/mh-system/src/main/java/com/mh/system/mapper/device/CollectionParamsManageMapper.java @@ -46,4 +46,8 @@ public interface CollectionParamsManageMapper extends BaseMapper" + "") List getDeviceByOther(@Param("deviceNum") String deviceNum); + + @Select("select count(1) from collection_params_manage where grade >= #{grade} and grade < #{grade}+20 and mt_type like concat('%',#{mtType},'%') ") + int selectSummary(@Param("grade") int grade, + @Param("mtType") String deviceType); } diff --git a/mh-system/src/main/java/com/mh/system/mapper/device/DatabaseMapper.java b/mh-system/src/main/java/com/mh/system/mapper/device/DatabaseMapper.java new file mode 100644 index 0000000..b98ca20 --- /dev/null +++ b/mh-system/src/main/java/com/mh/system/mapper/device/DatabaseMapper.java @@ -0,0 +1,29 @@ +package com.mh.system.mapper.device; + +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Options; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.mapping.StatementType; + +/** + * 数据库操作相关API + * + * @Author : Rainbow + * @date : 2023/6/14 + */ +@Mapper +public interface DatabaseMapper { + + /** + * 查询是否存在设备日月年表,不存在则创建表 + */ + @Select("call public.pro_create_table();") + void createDataTable(); + + /** + * 判断是否存在主机日月年表,不存在则创建表 + */ + @Select("call public.pro_create_chillers_table();") + void createChillerTable(); + +} diff --git a/mh-system/src/main/java/com/mh/system/mapper/device/DeviceLedgerMapper.java b/mh-system/src/main/java/com/mh/system/mapper/device/DeviceLedgerMapper.java index 18d2c61..e226e14 100644 --- a/mh-system/src/main/java/com/mh/system/mapper/device/DeviceLedgerMapper.java +++ b/mh-system/src/main/java/com/mh/system/mapper/device/DeviceLedgerMapper.java @@ -3,6 +3,8 @@ package com.mh.system.mapper.device; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.mh.common.core.domain.entity.DeviceLedger; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Update; /** * @author LJF @@ -13,4 +15,7 @@ import org.apache.ibatis.annotations.Mapper; */ @Mapper public interface DeviceLedgerMapper extends BaseMapper { + + @Update("update device_ledger set status = 0 where id = #{id}") + void updateOnlineStatus(@Param("id") String deviceLedgerId); } diff --git a/mh-system/src/main/java/com/mh/system/mapper/device/GatewayManageMapper.java b/mh-system/src/main/java/com/mh/system/mapper/device/GatewayManageMapper.java index 8aa6063..d1cb6d6 100644 --- a/mh-system/src/main/java/com/mh/system/mapper/device/GatewayManageMapper.java +++ b/mh-system/src/main/java/com/mh/system/mapper/device/GatewayManageMapper.java @@ -3,6 +3,8 @@ package com.mh.system.mapper.device; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.mh.common.core.domain.entity.GatewayManage; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Update; /** * @author LJF @@ -13,4 +15,7 @@ import org.apache.ibatis.annotations.Mapper; */ @Mapper public interface GatewayManageMapper extends BaseMapper { + + @Update("update gateway_manage set status = 0 where id = #{gatewayId}") + void updateOnlineStatus(@Param("gatewayId") String gatewayId); } diff --git a/mh-system/src/main/java/com/mh/system/mapper/energy/EnergyAnalyzeMapper.java b/mh-system/src/main/java/com/mh/system/mapper/energy/EnergyAnalyzeMapper.java index 68104fc..eaec735 100644 --- a/mh-system/src/main/java/com/mh/system/mapper/energy/EnergyAnalyzeMapper.java +++ b/mh-system/src/main/java/com/mh/system/mapper/energy/EnergyAnalyzeMapper.java @@ -7,6 +7,7 @@ import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import java.util.List; +import java.util.Map; /** * @author LJF @@ -128,7 +129,7 @@ public interface EnergyAnalyzeMapper { " and dh.cur_time <= #{endTime} " + " and dh.device_num in " + "" + - " #{item.value} " + + " #{item} " + "" + "group by " + " convert(varchar(${len}), " + @@ -142,7 +143,7 @@ public interface EnergyAnalyzeMapper { @Param("lastTableName") String lastTableName, @Param("curTableName") String curTableName, @Param("len") String dateLen, - @Param("params") List params); + @Param("params") List params); @Select("") + List queryManyTable(@Param("startTime") String startTime, + @Param("endTime") String endTime, + @Param("lastTableName") String lastTableName, + @Param("curTableName") String curTableName, + @Param("len") String dateLen, + @Param("paramType") String paramType); + + /** + * 单表查询操作 + * @param startTime 开始时间 + * @param endTime 结束时间 + * @param lastTableName 上一个表名 + * @param curTableName 当前表名 + * @return + */ + @Select("") + List queryOneTable(@Param("startTime") String startTime, + @Param("endTime") String endTime, + @Param("lastTableName") String lastTableName, + @Param("curTableName") String curTableName, + @Param("len") String dateLen, + @Param("paramType") String paramType); + + /** + * 查询能耗分析同比 + * @param startTime + * @param endTime + * @param lastTableName + * @param curTableName + * @param dateLen + * @param paramType + * @return + */ + List queryManyTableYoy(@Param("startTime") String startTime, + @Param("endTime") String endTime, + @Param("lastTableName") String lastTableName, + @Param("curTableName") String curTableName, + @Param("yoyStartTime") String yoyStartTime, + @Param("yoyEndTime") String yoyEndTime, + @Param("yoyLastTableName") String yoyLastTableName, + @Param("yoyCurTableName") String yoyCurTableName, + @Param("len") String dateLen, + @Param("paramType") String paramType, + @Param("timeType") String timeType); + + + /** + * 查询能耗分析同比 + * @param startTime + * @param endTime + * @param lastTableName + * @param curTableName + * @param yoyStartTime + * @param yoyEndTime + * @param dateLen + * @param paramType + * @param timeType + * @return + */ + List queryOneTableYoy(@Param("startTime") String startTime, + @Param("endTime") String endTime, + @Param("lastTableName") String lastTableName, + @Param("curTableName") String curTableName, + @Param("yoyStartTime") String yoyStartTime, + @Param("yoyEndTime") String yoyEndTime, + @Param("len") String dateLen, + @Param("paramType") String paramType, + @Param("timeType") String timeType); + + /** + * 查询能耗分析环比(查询类型是month,year) + * @param startTime + * @param endTime + * @param yoyStartTime + * @param yoyEndTime + * @param tableName + * @param dateLen + * @param paramType + * @param timeType + * @return + */ + List queryMonthAndYearMom(@Param("startTime") String startTime, + @Param("endTime") String endTime, + @Param("momStartTime") String yoyStartTime, + @Param("momEndTime") String yoyEndTime, + @Param("tableName") String tableName, + @Param("len") String dateLen, + @Param("paramType") String paramType, + @Param("timeType") String timeType); + + /** + * 单表查询能耗分析环比(查询类型是hour,day) + * @param startTime + * @param endTime + * @param yoyStartTime + * @param yoyEndTime + * @param tableName + * @param dateLen + * @param paramType + * @param timeType + * @return + */ + List queryOneTableMom(@Param("startTime") String startTime, + @Param("endTime") String endTime, + @Param("momStartTime") String yoyStartTime, + @Param("momEndTime") String yoyEndTime, + @Param("tableName") String tableName, + @Param("len") String dateLen, + @Param("paramType") String paramType, + @Param("timeType") String timeType); + + /** + * 多表查询能耗分析环比(查询类型是hour,day) + * @param startTime + * @param endTime + * @param yoyStartTime + * @param yoyEndTime + * @param tableName1 上一年的表 + * @param tableName2 今年的表 + * @param dateLen + * @param paramType + * @param timeType + * @return + */ + List queryManyTableMom(@Param("startTime") String startTime, + @Param("endTime") String endTime, + @Param("momStartTime") String yoyStartTime, + @Param("momEndTime") String yoyEndTime, + @Param("tableName1") String tableName1, + @Param("tableName2") String tableName2, + @Param("len") String dateLen, + @Param("paramType") String paramType, + @Param("timeType") String timeType); + + /** + * 查询各个机房设备的用电量(查询月,年) + * @param startTime + * @param endTime + * @param lastTableName + * @param curTableName + * @param dateLen + * @param deviceType + * @return + */ + List queryDeviceOneTable(@Param("startTime") String startTime, + @Param("endTime") String endTime, + @Param("lastTableName") String lastTableName, + @Param("curTableName") String curTableName, + @Param("len") String dateLen, + @Param("deviceType") String deviceType); + + /** + * 查询各个机房设备的用电量(查询小时,天) + * @param startTime + * @param endTime + * @param lastTableName + * @param curTableName + * @param dateLen + * @param deviceType + * @return + */ + List queryDeviceManyTable(@Param("startTime") String startTime, + @Param("endTime") String endTime, + @Param("lastTableName") String lastTableName, + @Param("curTableName") String curTableName, + @Param("len") String dateLen, + @Param("deviceType") String deviceType); + + /** + * 查询系统分析折线图 + * @param curTable + * @param curCopTable + * @param startTime + * @param endTime + * @return + */ + List queryLineDataSysByOne(@Param("curTable") String curTable, + @Param("curCopTable") String curCopTable, + @Param("startTime") String startTime, + @Param("endTime") String endTime); + + /** + * 查询系统分析折线图 + * @param lastTable + * @param curTable + * @param lastCopTable + * @param curCopTable + * @param startTime + * @param endTime + * @return + */ + List queryLineDataSysByMany(@Param("lastTable") String lastTable, + @Param("curTable") String curTable, + @Param("lastCopTable") String lastCopTable, + @Param("curCopTable") String curCopTable, + @Param("startTime") String startTime, + @Param("endTime") String endTime); + + /** + * 查询各个设备的折线图数据 + * @param startTime + * @param endTime + * @param lastTableName + * @param curTableName + * @param deviceType + * @return + */ + List queryDeviceLineManyTable(@Param("startTime") String startTime, + @Param("endTime") String endTime, + @Param("lastTableName") String lastTableName, + @Param("curTableName") String curTableName, + @Param("deviceType") String deviceType); + + @Select("select isnull(sum(calc_value),0) from ${tableName} " + + " where device_type = 'cloud' " + + " and device_code in (select device_code from device_manage where grade >=0 and grade < 10) " + + " and cur_time >= concat(#{timeStr},':00:00') and cur_time <= concat(#{timeStr},':59:59') ") + String queryCoolingData(@Param("tableName") String tableName, @Param("timeStr") String timeString); +} diff --git a/mh-system/src/main/java/com/mh/system/mapper/energy/EnergyQueryMapper.java b/mh-system/src/main/java/com/mh/system/mapper/energy/EnergyQueryMapper.java new file mode 100644 index 0000000..0970a72 --- /dev/null +++ b/mh-system/src/main/java/com/mh/system/mapper/energy/EnergyQueryMapper.java @@ -0,0 +1,167 @@ +package com.mh.system.mapper.energy; + +import com.mh.common.core.domain.entity.ConsumptionAnalyzeEntity; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; + +import java.util.List; + +/** + * @author LJF + * @version 1.0 + * @project NewZhujiang_Server + * @description 能耗查询 + * @date 2023-12-13 16:00:01 + */ +@Mapper +public interface EnergyQueryMapper { + + /** + * 跨表查询操作 + * @param startTime 开始时间 + * @param endTime 结束时间 + * @param lastTableName 上一个表名 + * @param curTableName 当前表名 + * @return + */ + @Select("") + List queryManyTable(@Param("startTime") String startTime, + @Param("endTime") String endTime, + @Param("lastTableName") String lastTableName, + @Param("curTableName") String curTableName, + @Param("len") String dateLen, + @Param("paramType") String paramType, + @Param("haveMeter") boolean haveMeter, + @Param("haveCloud") boolean haveCloud); + + /** + * 单表查询操作 + * @param startTime 开始时间 + * @param endTime 结束时间 + * @param lastTableName 上一个表名 + * @param curTableName 当前表名 + * @return + */ + @Select("") + List queryOneTable(@Param("startTime") String startTime, + @Param("endTime") String endTime, + @Param("lastTableName") String lastTableName, + @Param("curTableName") String curTableName, + @Param("len") String dateLen, + @Param("paramType") String paramType, + @Param("haveMeter") boolean haveMeter, + @Param("haveCloud") boolean haveCloud); + +} diff --git a/mh-system/src/main/java/com/mh/system/service/device/ICollectionParamsManageService.java b/mh-system/src/main/java/com/mh/system/service/device/ICollectionParamsManageService.java index dfd54b1..230db69 100644 --- a/mh-system/src/main/java/com/mh/system/service/device/ICollectionParamsManageService.java +++ b/mh-system/src/main/java/com/mh/system/service/device/ICollectionParamsManageService.java @@ -2,6 +2,7 @@ package com.mh.system.service.device; import com.mh.common.core.domain.entity.CollectionParamsManage; +import java.util.ArrayList; import java.util.List; /** @@ -23,4 +24,9 @@ public interface ICollectionParamsManageService { int deleteCommunicationByIds(String[] cpmIds); + List queryAllDevice(); + + List queryCollectionParamsByMtType(String mtType); + + void updateCollectionParamsManages(ArrayList chillersEntities); } diff --git a/mh-system/src/main/java/com/mh/system/service/device/impl/CollectionParamsManageServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/device/impl/CollectionParamsManageServiceImpl.java index 4c83cf0..4ded95b 100644 --- a/mh-system/src/main/java/com/mh/system/service/device/impl/CollectionParamsManageServiceImpl.java +++ b/mh-system/src/main/java/com/mh/system/service/device/impl/CollectionParamsManageServiceImpl.java @@ -3,10 +3,13 @@ package com.mh.system.service.device.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.mh.common.core.domain.entity.CollectionParamsManage; import com.mh.system.mapper.device.CollectionParamsManageMapper; +import com.mh.system.mapper.device.DeviceLedgerMapper; +import com.mh.system.mapper.device.GatewayManageMapper; import com.mh.system.service.device.ICollectionParamsManageService; import jakarta.annotation.Resource; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; /** @@ -22,6 +25,12 @@ public class CollectionParamsManageServiceImpl implements ICollectionParamsManag @Resource private CollectionParamsManageMapper collectionParamsManageMapper; + @Resource + private DeviceLedgerMapper deviceLedgerMapper; + + @Resource + private GatewayManageMapper gatewayManageMapper; + @Override public List selectCollectionParamsManageList(CollectionParamsManage communicationParams) { if (communicationParams == null) { @@ -75,4 +84,39 @@ public class CollectionParamsManageServiceImpl implements ICollectionParamsManag } return 0; } + + @Override + public List queryAllDevice() { + return collectionParamsManageMapper.selectList(null); + } + + @Override + public List queryCollectionParamsByMtType(String mtType) { + QueryWrapper queryWrapper = new QueryWrapper<>(); + if ("0".equals(mtType)) { + queryWrapper.eq("mt_type", mtType); + } else { + queryWrapper.ne("mt_type", mtType); + } + return collectionParamsManageMapper.selectList(queryWrapper); + } + + @Override + public void updateCollectionParamsManages(ArrayList chillersEntities) { + // 批量更新 + chillersEntities.forEach(chillerEntity -> { + collectionParamsManageMapper.updateById(chillerEntity); + }); + // 根据台账id更新台账在线情况 + // 根据chillerEntity使用stream流获取台账id分组数据 + chillersEntities.stream().map(CollectionParamsManage::getDeviceLedgerId).distinct().forEach(deviceLedgerId -> { + // 根据台账id 更新 device_ledger 在线情况,0:在线,1:离线 + deviceLedgerMapper.updateOnlineStatus(deviceLedgerId); + }); + // 根据网关id更新网关在线情况 + chillersEntities.stream().map(CollectionParamsManage::getGatewayId).distinct().forEach(gatewayId -> { + // 根据网关id 更新 gateway_manage 在线情况,0:在线,1:离线 + gatewayManageMapper.updateOnlineStatus(gatewayId); + }); + } } diff --git a/mh-system/src/main/java/com/mh/system/service/energy/EnergyAnalyzeService.java b/mh-system/src/main/java/com/mh/system/service/energy/EnergyAnalyzeService.java index ad263b1..70151ae 100644 --- a/mh-system/src/main/java/com/mh/system/service/energy/EnergyAnalyzeService.java +++ b/mh-system/src/main/java/com/mh/system/service/energy/EnergyAnalyzeService.java @@ -2,10 +2,10 @@ package com.mh.system.service.energy; import com.alibaba.fastjson2.JSONObject; import com.mh.common.core.domain.AjaxResult; -import com.mh.common.core.domain.ColumnFilter; import com.mh.common.core.domain.vo.EnergyQueryVO; import java.util.List; +import java.util.Map; /** * @author LJF @@ -23,4 +23,5 @@ public interface EnergyAnalyzeService { */ JSONObject sysAnalyze(EnergyQueryVO vo); + List> deviceAnalyze(EnergyQueryVO vo); } diff --git a/mh-system/src/main/java/com/mh/system/service/energy/IEnergyQueryService.java b/mh-system/src/main/java/com/mh/system/service/energy/IEnergyQueryService.java new file mode 100644 index 0000000..31a8dbb --- /dev/null +++ b/mh-system/src/main/java/com/mh/system/service/energy/IEnergyQueryService.java @@ -0,0 +1,23 @@ +package com.mh.system.service.energy; + +import com.mh.common.core.domain.AjaxResult; +import com.mh.common.core.domain.vo.EnergyQueryVO; + +/** + * @author LJF + * @version 1.0 + * @project NewZhujiang_Server + * @description 能源查询服务 + * @date 2023-12-13 15:50:36 + */ +public interface IEnergyQueryService { + + /** + * 能源查询,整个机房耗电量,制冷量,COP值 + * @param page + * @return + */ + AjaxResult sysQuery(EnergyQueryVO page); + + AjaxResult deviceTypeQuery(EnergyQueryVO page); +} diff --git a/mh-system/src/main/java/com/mh/system/service/energy/IEnergyService.java b/mh-system/src/main/java/com/mh/system/service/energy/IEnergyService.java new file mode 100644 index 0000000..02780e4 --- /dev/null +++ b/mh-system/src/main/java/com/mh/system/service/energy/IEnergyService.java @@ -0,0 +1,26 @@ +package com.mh.system.service.energy; + +import com.mh.common.core.domain.AjaxResult; +import com.mh.common.core.domain.vo.EnergyConsumptionVO; + +/** + * @author LJF + * @version 1.0 + * @project NewZhujiang_Server + * @description 系统能耗分析 + * @date 2023/7/18 14:05:40 + */ +public interface IEnergyService { + + AjaxResult yoy(EnergyConsumptionVO vo); + + AjaxResult mom(EnergyConsumptionVO vo); + + AjaxResult device(EnergyConsumptionVO vo); + + String queryCoolingData(String tableName, String timeString); + + AjaxResult yoyMom(EnergyConsumptionVO vo); + + AjaxResult sys(EnergyConsumptionVO vo); +} diff --git a/mh-system/src/main/java/com/mh/system/service/energy/impl/EnergyAnalyzeServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/energy/impl/EnergyAnalyzeServiceImpl.java index 381bdcd..d0538dd 100644 --- a/mh-system/src/main/java/com/mh/system/service/energy/impl/EnergyAnalyzeServiceImpl.java +++ b/mh-system/src/main/java/com/mh/system/service/energy/impl/EnergyAnalyzeServiceImpl.java @@ -2,11 +2,14 @@ package com.mh.system.service.energy.impl; import com.alibaba.fastjson2.JSONObject; import com.mh.common.constant.Constants; +import com.mh.common.core.domain.AjaxResult; +import com.mh.common.core.domain.ColumnData; import com.mh.common.core.domain.ColumnFilter; import com.mh.common.core.domain.entity.CollectionParamsManage; import com.mh.common.core.domain.entity.DeviceTypeEnergyEntity; import com.mh.common.core.domain.vo.EnergyQueryVO; import com.mh.common.utils.DateUtils; +import com.mh.system.domain.SysOperLog; import com.mh.system.mapper.device.CollectionParamsManageMapper; import com.mh.system.mapper.energy.EnergyAnalyzeMapper; import com.mh.system.service.energy.EnergyAnalyzeService; @@ -142,6 +145,105 @@ public class EnergyAnalyzeServiceImpl implements EnergyAnalyzeService { return resultJson; } + @Override + public List> deviceAnalyze(EnergyQueryVO vo) { + DateUtils.sysEnergyDateChange(vo); + // 获取参数 + Map paramsMap = vo.getParams(); + List params = new ArrayList<>(paramsMap.values()); + if (params.isEmpty()) { + return List.of(); + } + AtomicReference lastTableName = new AtomicReference<>("data_" + vo.getTimeType()); + AtomicReference curTableName = new AtomicReference<>("data_" + vo.getTimeType()); + String timeType = vo.getTimeType(); + List deviceTypeEnergyEntities = null; + // 表格数据 + if ("month".equalsIgnoreCase(timeType) || "year".equalsIgnoreCase(timeType)) { + // 单表 + deviceTypeEnergyEntities = energyAnalyzeMapper.queryDeviceOneTable(vo.getStartTime(), vo.getEndTime(), lastTableName.get(), curTableName.get(), DateUtils.getTimeLen(vo.getTimeType()), params); + } else { + lastTableName.set(lastTableName + vo.getStartTime().substring(0, 4)); + curTableName.set(curTableName + vo.getEndTime().substring(0, 4)); + if (lastTableName.get().equalsIgnoreCase(curTableName.get())) { + // 单表 + deviceTypeEnergyEntities = energyAnalyzeMapper.queryDeviceOneTable(vo.getStartTime(), vo.getEndTime(), lastTableName.get(), curTableName.get(), DateUtils.getTimeLen(vo.getTimeType()), params); + } else { + // 多表 + deviceTypeEnergyEntities = energyAnalyzeMapper.queryDeviceManyTable(vo.getStartTime(), vo.getEndTime(), lastTableName.get(), curTableName.get(), DateUtils.getTimeLen(vo.getTimeType()), params); + } + } + if (null == deviceTypeEnergyEntities || deviceTypeEnergyEntities.isEmpty()) { + return List.of(); + } + // 分组拿出各个设备 + Map> deviceNumMap = deviceTypeEnergyEntities.stream() + .sorted(Comparator.comparing(DeviceTypeEnergyEntity::getTimeStr)) + .collect(Collectors.groupingBy(DeviceTypeEnergyEntity::getDeviceNum)); + + List title = new ArrayList<>(); + List timeStr = new ArrayList<>(); + List columnData = new ArrayList<>(); + // 组装对应需要查询的数据 + int i = 0; + for (Map.Entry> stringListEntry : deviceNumMap.entrySet()) { + title.add(stringListEntry.getKey()); + List value = stringListEntry.getValue(); + ColumnData column = new ColumnData(); + column.setName(stringListEntry.getKey()); + for (DeviceTypeEnergyEntity deviceTypeEnergyEntity : value) { + if (i == 0) { + timeStr.add(deviceTypeEnergyEntity.getTimeStr()); + } else { + break; + } + } + int pageNum = Integer.parseInt(String.valueOf(vo.getParams().get("pageNum"))); + int pageSize = Integer.parseInt(String.valueOf(vo.getParams().get("pageSize"))); + if (pageNum != 0) { + int startIndex = (pageNum-1)*pageSize; + int endIndex = Math.min(pageNum * pageSize, timeStr.size()); + value = value.subList(startIndex, endIndex); + column.setValue(getArr(value, timeStr.subList(startIndex, endIndex).toArray(new String[0]))); + } else { + column.setValue(getArr(value, timeStr.toArray(new String[0]))); + } + columnData.add(column); + i++; + } + int pageNum = Integer.parseInt(String.valueOf(vo.getParams().get("pageNum"))); + int pageSize = Integer.parseInt(String.valueOf(vo.getParams().get("pageSize"))); + Map map = new JSONObject(); + map.put("title", title); + if (pageNum == 0) { + map.put("timeStr", timeStr); + map.put("dataList", columnData); + } else { + int startIndex = (pageNum-1) * pageSize; + int endIndex = Math.min(pageNum * pageSize, timeStr.size()); + if (startIndex > endIndex) { + return List.of(); + } + map.put("timeStr", timeStr.subList(startIndex, endIndex)); + map.put("dataList", columnData); + } + return new ArrayList<>(map.entrySet()); + } + + private static String[] getArr(List copLineData, String[] lineTimeStrArr) { + String[] lineCopArr = new String[lineTimeStrArr.length]; + for (int i = 0; i < lineTimeStrArr.length; i++) { + int j = i; + Optional first = copLineData.stream().filter(s -> lineTimeStrArr[j].equalsIgnoreCase(s.getTimeStr())).findFirst(); + if (first.isPresent()) { + lineCopArr[i] = first.get().getCalcValue(); + } else { + lineCopArr[i] = "0.00"; + } + } + return lineCopArr; + } + private static List getResultData(List coolTower) { return coolTower.stream() .sorted(Comparator.comparing(DeviceTypeEnergyEntity::getGrade)) diff --git a/mh-system/src/main/java/com/mh/system/service/energy/impl/EnergyQueryServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/energy/impl/EnergyQueryServiceImpl.java new file mode 100644 index 0000000..1893e09 --- /dev/null +++ b/mh-system/src/main/java/com/mh/system/service/energy/impl/EnergyQueryServiceImpl.java @@ -0,0 +1,162 @@ +package com.mh.system.service.energy.impl; + +import com.mh.common.core.domain.AjaxResult; +import com.mh.common.core.domain.entity.ConsumptionAnalyzeEntity; +import com.mh.common.core.domain.vo.EnergyQueryVO; +import com.mh.common.utils.DateUtils; +import com.mh.system.mapper.device.CollectionParamsManageMapper; +import com.mh.system.mapper.energy.EnergyQueryMapper; +import com.mh.system.service.energy.IEnergyQueryService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * @author LJF + * @version 1.0 + * @project NewZhujiang_Server + * @description 能源查询服务实现类 + * @date 2023-12-13 15:54:39 + */ +@Slf4j +@Service +public class EnergyQueryServiceImpl implements IEnergyQueryService { + + @Resource + private EnergyQueryMapper energyQueryMapper; + + @Resource + private CollectionParamsManageMapper collectionParamsManageMapper; + + @Override + public AjaxResult sysQuery(EnergyQueryVO vo) { + DateUtils.sysEnergyDateChange(vo); + // 获取参数 + AtomicReference lastTableName = new AtomicReference<>("data_" + vo.getTimeType()); + AtomicReference curTableName = new AtomicReference<>("data_" + vo.getTimeType()); + String timeType = vo.getTimeType(); + // 判断是否有总表 + boolean haveMeter = collectionParamsManageMapper.selectSummary(40, "meter") != 0; + boolean haveCloud = collectionParamsManageMapper.selectSummary(40, "cloud") != 0; + List consumptionAnalyzeEntities = null; + // 表格数据 + if ("month".equalsIgnoreCase(timeType) || "year".equalsIgnoreCase(timeType)) { + // 单表 + consumptionAnalyzeEntities = energyQueryMapper.queryOneTable(vo.getStartTime(), vo.getEndTime(), lastTableName.get(), curTableName.get(), DateUtils.getTimeLen(vo.getTimeType()), null, haveMeter, haveCloud); + } else { + lastTableName.set(lastTableName + vo.getStartTime().substring(0, 4)); + curTableName.set(curTableName + vo.getEndTime().substring(0, 4)); + if (lastTableName.get().equalsIgnoreCase(curTableName.get())) { + // 单表 + consumptionAnalyzeEntities = energyQueryMapper.queryOneTable(vo.getStartTime(), vo.getEndTime(), lastTableName.get(), curTableName.get(), DateUtils.getTimeLen(vo.getTimeType()), null, haveMeter, haveCloud); + } else { + // 多表 + consumptionAnalyzeEntities = energyQueryMapper.queryManyTable(vo.getStartTime(), vo.getEndTime(), lastTableName.get(), curTableName.get(), DateUtils.getTimeLen(vo.getTimeType()), null, haveMeter, haveCloud); + } + } + if (null == consumptionAnalyzeEntities || consumptionAnalyzeEntities.size() == 0) { + return AjaxResult.success(); + } + // 分组并按时间排序操作,拿到冷量记和电表数据 + Map> collect = consumptionAnalyzeEntities.stream() + .parallel() + .collect(Collectors.groupingBy(ConsumptionAnalyzeEntity::getDeviceType, HashMap::new, Collectors + .collectingAndThen(Collectors.toList(), + list -> list.stream().sorted(Comparator.comparing(ConsumptionAnalyzeEntity::getTimeStr)).collect(Collectors.toList())))); + List meterData = new ArrayList<>(); + List coldData = new ArrayList<>(); + for (Map.Entry> nmap : collect.entrySet()) { + // 获取电表的值 + if (nmap.getKey().equalsIgnoreCase("meter")) { + meterData = nmap.getValue(); + } + // 获取冷量计的值 + if (nmap.getKey().equalsIgnoreCase("cloud")) { + coldData = nmap.getValue(); + } + } + String[] copArr = new String[meterData.size()]; + String[] timeStrArr = meterData.stream() + .map(ConsumptionAnalyzeEntity::getTimeStr) + .toArray(String[]::new); + String[] meterArr = meterData.stream() + .map(ConsumptionAnalyzeEntity::getCurValue) + .toArray(String[]::new); + String[] coldArr = getArr(coldData, timeStrArr); + // 计算COP=制冷量/耗电量 + for (int i = 0; i < meterData.size(); i++) { + try { + double cold = Math.round(Double.parseDouble(coldArr[i]) * 100) / 100.0; + double meter = Math.round(Double.parseDouble(meterArr[i]) * 100) / 100.0; + double cop = Math.round((meter==0 ? 0.00 : cold/ meter) * 100) / 100.0; + copArr[i] = String.valueOf(cop); + } catch (NumberFormatException e) { + log.error("处理累计能耗异常==>",e); + throw new RuntimeException(e); + } + } + // 表格数据 + Map map = new HashMap<>(); + int pageNum = Integer.parseInt(String.valueOf(vo.getParams().get("pageNum"))); + int pageSize = Integer.parseInt(String.valueOf(vo.getParams().get("pageSize"))); + if (pageNum == 0) { + map.put("coldArr", coldArr); + map.put("meterArr", meterArr); + map.put("copArr", copArr); + map.put("timeStrArr", timeStrArr); + } else { + int startIndex = (pageNum-1)*pageSize; + int endIndex = Math.min(pageNum * pageSize, coldArr.length); + if (startIndex > endIndex) { + return AjaxResult.success(); + } + map.put("coldArr", Arrays.copyOfRange(coldArr, startIndex , endIndex)); + map.put("meterArr", Arrays.copyOfRange(meterArr, startIndex , endIndex)); + map.put("copArr", Arrays.copyOfRange(copArr, startIndex , endIndex)); + map.put("timeStrArr", Arrays.copyOfRange(timeStrArr, startIndex , endIndex)); + } + + // 组装赋值 + List> listData = new ArrayList<>(); + Map cold = new HashMap<>(); + cold.put("cold", map.get("coldArr")); + listData.add(cold); + Map meter = new HashMap<>(); + meter.put("meter", map.get("meterArr")); + listData.add(meter); + Map cop = new HashMap<>(); + cop.put("cop", map.get("copArr")); + listData.add(cop); + String[] titleArr = new String[]{"cold", "meter", "cop"}; + Map titles = new HashMap<>(); + titles.put("titleArr", titleArr); + listData.add(titles); + Map timeStr = new HashMap<>(); + timeStr.put("timeStrArr", map.get("timeStrArr")); + listData.add(timeStr); + return AjaxResult.success(listData); + } + + private static String[] getArr(List copLineData, String[] lineTimeStrArr) { + String[] lineCopArr = new String[lineTimeStrArr.length]; + for (int i = 0; i < lineTimeStrArr.length; i++) { + int j = i; + Optional first = copLineData.stream().filter(s -> lineTimeStrArr[j].equalsIgnoreCase(s.getTimeStr())).findFirst(); + if (first.isPresent()) { + lineCopArr[i] = first.get().getCurValue(); + } else { + lineCopArr[i] = "0.00"; + } + } + return lineCopArr; + } + + @Override + public AjaxResult deviceTypeQuery(EnergyQueryVO page) { + return null; + } +} diff --git a/mh-system/src/main/java/com/mh/system/service/energy/impl/EnergyServiceImpl.java b/mh-system/src/main/java/com/mh/system/service/energy/impl/EnergyServiceImpl.java new file mode 100644 index 0000000..ac2702b --- /dev/null +++ b/mh-system/src/main/java/com/mh/system/service/energy/impl/EnergyServiceImpl.java @@ -0,0 +1,590 @@ +package com.mh.system.service.energy.impl; + +import com.mh.common.core.domain.AjaxResult; +import com.mh.common.core.domain.dto.EnergyConsumptionDTO; +import com.mh.common.core.domain.entity.ConsumptionAnalyzeEntity; +import com.mh.common.core.domain.vo.EnergyConsumptionVO; +import com.mh.common.utils.DateUtils; +import com.mh.common.utils.EnergyThreadPoolService; +import com.mh.common.utils.StringUtils; +import com.mh.system.mapper.energy.EnergyMapper; +import com.mh.system.service.energy.IEnergyService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * @author LJF + * @version 1.0 + * @project NewZhujiang_Server + * @description 系统能耗分析 + * @date 2023/7/18 14:08:10 + */ +@Slf4j +@Service +public class EnergyServiceImpl implements IEnergyService { + + @Resource + private EnergyMapper energyMapper; + + @Override + public AjaxResult device(EnergyConsumptionVO vo) { + AtomicReference lastTableName = new AtomicReference<>("data_" + vo.getTimeType()); + AtomicReference curTableName = new AtomicReference<>("data_" + vo.getTimeType()); + String timeType = vo.getTimeType(); + ThreadPoolExecutor executor = EnergyThreadPoolService.getInstance(); + CountDownLatch latch = new CountDownLatch(2); + List>> futures = new ArrayList<>(); + futures.add(executor.submit(() -> { + List consumptionAnalyzeEntities; + if ("month".equalsIgnoreCase(timeType) || "year".equalsIgnoreCase(timeType)) { + // 单表 + consumptionAnalyzeEntities = energyMapper.queryDeviceOneTable(vo.getStartTime(), vo.getEndTime(), lastTableName.get(), curTableName.get(), getTimeLen(vo.getTimeType()), vo.getDeviceType()); + } else { + // 多表 + lastTableName.set(lastTableName + vo.getStartTime().substring(0, 4)); + curTableName.set(curTableName + vo.getEndTime().substring(0, 4)); + consumptionAnalyzeEntities = energyMapper.queryDeviceManyTable(vo.getStartTime(), vo.getEndTime(), lastTableName.get(), curTableName.get(), getTimeLen(vo.getTimeType()), vo.getDeviceType()); + } + if (null == consumptionAnalyzeEntities || consumptionAnalyzeEntities.size() == 0) { + latch.countDown(); + return null; + } + // 组装赋值 + String[] timeStrArr = consumptionAnalyzeEntities.stream() + .map(ConsumptionAnalyzeEntity::getTimeStr) + .toArray(String[]::new); + String[] meterArr = consumptionAnalyzeEntities.stream() + .map(ConsumptionAnalyzeEntity::getCurValue) + .toArray(String[]::new); + Map map = new HashMap<>(); + map.put("timeStrArr", timeStrArr); + map.put("meterArr", meterArr); + latch.countDown(); + return map; + })); + futures.add(executor.submit(() -> { + // 折线图,都查询min表 + // 多表 + String lastTable = "data_min" + vo.getStartTime().substring(0,4); + String curTable = "data_min" + vo.getEndTime().substring(0,4); + List consumptionAnalyzeEntities = energyMapper.queryDeviceLineManyTable(vo.getStartTime(), vo.getEndTime(), lastTable, curTable, vo.getDeviceType()); + if (null == consumptionAnalyzeEntities || consumptionAnalyzeEntities.size() == 0) { + latch.countDown(); + return null; + } + // 组装赋值 + String[] timeStrArr = consumptionAnalyzeEntities.stream() + .map(ConsumptionAnalyzeEntity::getTimeStr) + .toArray(String[]::new); + String[] meterArr = consumptionAnalyzeEntities.stream() + .map(ConsumptionAnalyzeEntity::getCurValue) + .toArray(String[]::new); + Map map = new HashMap<>(); + map.put("timeStrLineArr", timeStrArr); + map.put("meterLineArr", meterArr); + latch.countDown(); + return map; + })); + + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + Map allData = new HashMap<>(); + // 获取数据 + for (Future> future : futures) { + try { + Map map = future.get(); + if (null != map) { + allData.putAll(map); + } + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + EnergyConsumptionDTO energyConsumptionDTO = new EnergyConsumptionDTO(); + String[] titleArr = new String[]{"meter", "lineMeter"}; + energyConsumptionDTO.setTitles(titleArr); + energyConsumptionDTO.setTimes((String[]) allData.get("timeStrArr")); + energyConsumptionDTO.setLineTimes((String[]) allData.get("timeStrLineArr")); + List> listData = new ArrayList<>(); + Map meter = new HashMap<>(); + meter.put("meter",allData.get("meterArr")); + listData.add(meter); + Map lineMeter = new HashMap<>(); + lineMeter.put("lineMeter",allData.get("meterLineArr")); + listData.add(lineMeter); + energyConsumptionDTO.setData(listData); + return AjaxResult.success(energyConsumptionDTO); + } + + @Override + public AjaxResult sys(EnergyConsumptionVO vo) { + AtomicReference lastTableName = new AtomicReference<>("data_" + vo.getTimeType()); + AtomicReference curTableName = new AtomicReference<>("data_" + vo.getTimeType()); + String timeType = vo.getTimeType(); + EnergyConsumptionDTO energyConsumptionDTO = new EnergyConsumptionDTO(); + String[] titleArr = new String[]{"cold", "meter", "cop", "lineCop", "lineInstantaneousCold", "lineInstantaneousMeter"}; + ThreadPoolExecutor executor = EnergyThreadPoolService.getInstance(); + CountDownLatch latch = new CountDownLatch(2); + List>> futures = new ArrayList<>(); + futures.add(executor.submit(() -> { + List consumptionAnalyzeEntities = null; + // 表格数据 + if ("month".equalsIgnoreCase(timeType) || "year".equalsIgnoreCase(timeType)) { + // 单表 + consumptionAnalyzeEntities = energyMapper.queryOneTable(vo.getStartTime(), vo.getEndTime(), lastTableName.get(), curTableName.get(), getTimeLen(vo.getTimeType()), null); + } else { + lastTableName.set(lastTableName + vo.getStartTime().substring(0, 4)); + curTableName.set(curTableName + vo.getEndTime().substring(0, 4)); + if (lastTableName.get().equalsIgnoreCase(curTableName.get())) { + // 单表 + consumptionAnalyzeEntities = energyMapper.queryOneTable(vo.getStartTime(), vo.getEndTime(), lastTableName.get(), curTableName.get(), getTimeLen(vo.getTimeType()), null); + } else { + // 多表 + consumptionAnalyzeEntities = energyMapper.queryManyTable(vo.getStartTime(), vo.getEndTime(), lastTableName.get(), curTableName.get(), getTimeLen(vo.getTimeType()), null); + } + } + if (null == consumptionAnalyzeEntities || consumptionAnalyzeEntities.size() == 0) { + latch.countDown(); + return null; + } + // 分组并按时间排序操作,拿到冷量记和电表数据 + Map> collect = consumptionAnalyzeEntities.stream() + .parallel() + .collect(Collectors.groupingBy(ConsumptionAnalyzeEntity::getDeviceType, HashMap::new, Collectors + .collectingAndThen(Collectors.toList(), + list -> list.stream().sorted(Comparator.comparing(ConsumptionAnalyzeEntity::getTimeStr)).collect(Collectors.toList())))); + List meterData = new ArrayList<>(); + List coldData = new ArrayList<>(); + for (Map.Entry> nmap : collect.entrySet()) { + // 获取电表的值 + if (nmap.getKey().equalsIgnoreCase("meter")) { + meterData = nmap.getValue(); + } + // 获取冷量计的值 + if (nmap.getKey().equalsIgnoreCase("cloud")) { + coldData = nmap.getValue(); + } + } + String[] copArr = new String[meterData.size()]; + String[] timeStrArr = meterData.stream() + .map(ConsumptionAnalyzeEntity::getTimeStr) + .toArray(String[]::new); + String[] meterArr = meterData.stream() + .map(ConsumptionAnalyzeEntity::getCurValue) + .toArray(String[]::new); + String[] coldArr = getArr(coldData, timeStrArr); + // 计算COP=制冷量/耗电量 + for (int i = 0; i < meterData.size(); i++) { + try { + double cold = Math.round(Double.parseDouble(coldArr[i]) * 100) / 100.0; + double meter = Math.round(Double.parseDouble(meterArr[i]) * 100) / 100.0; + double cop = Math.round((meter==0 ? 0.00 : cold/ meter) * 100) / 100.0; + copArr[i] = String.valueOf(cop); + } catch (NumberFormatException e) { + log.error("处理累计能耗异常==>",e); + throw new RuntimeException(e); + } + } + // 表格数据 + Map map = new HashMap<>(); + map.put("coldArr", coldArr); + map.put("meterArr", meterArr); + map.put("copArr", copArr); + map.put("timeStrArr", timeStrArr); + latch.countDown(); + return map; + })); + futures.add(executor.submit(() -> { + // 折线图数据,都是查询的分钟表,最低颗粒度 + List lineData; + String lastTable = "data_min" + vo.getStartTime().substring(0, 4); + String curTable = "data_min" + vo.getEndTime().substring(0, 4); + String lastCopTable = "data_min_cop"; + String curCopTable = "data_min_cop"; + if (lastTable.equalsIgnoreCase(curTable)) { + // 查询单表 + lineData = energyMapper.queryLineDataSysByOne(curTable, curCopTable, vo.getStartTime(), vo.getEndTime()); + } else { + // 查询多表 + lineData = energyMapper.queryLineDataSysByMany(lastTable, curTable, lastCopTable, curCopTable, vo.getStartTime(), vo.getEndTime()); + } + if (null == lineData) { + latch.countDown(); + return null; + } + // 分组并按时间排序操作,拿到冷量记,电表,cop数据 + Map> collectLine = lineData.stream() + .parallel() + .collect(Collectors.groupingBy(ConsumptionAnalyzeEntity::getDeviceType, HashMap::new, Collectors + .collectingAndThen(Collectors.toList(), + list -> list.stream().sorted(Comparator.comparing(ConsumptionAnalyzeEntity::getTimeStr)).collect(Collectors.toList())))); +// List meterLineData = new ArrayList<>(); +// List coldLineData = new ArrayList<>(); +// List copLineData = new ArrayList<>(); + List efrColdLineData = new ArrayList<>(); + List efrMeterLineData = new ArrayList<>(); + for (Map.Entry> nmap : collectLine.entrySet()) { +// // 获取电表的值 +// if (nmap.getKey().equalsIgnoreCase("meter")) { +// meterLineData = nmap.getValue(); +// } +// // 获取冷量计的值 +// if (nmap.getKey().equalsIgnoreCase("cloud")) { +// coldLineData = nmap.getValue(); +// } +// // 获取cop的值 +// if (nmap.getKey().equalsIgnoreCase("cop")) { +// copLineData = nmap.getValue(); +// } + // 获取瞬时冷量计的值 + if (nmap.getKey().equalsIgnoreCase("efrCloud")) { + efrColdLineData = nmap.getValue(); + } + // 获取瞬时功率的值 + if (nmap.getKey().equalsIgnoreCase("efrMeter")) { + efrMeterLineData = nmap.getValue(); + } + } + String[] lineTimeStrArr = efrMeterLineData.stream() + .map(ConsumptionAnalyzeEntity::getTimeStr) + .toArray(String[]::new); +// String[] lineMeterArr = meterLineData.stream() +// .map(ConsumptionAnalyzeEntity::getCurValue) +// .toArray(String[]::new); + String[] lineInstantaneousMeterArr = getArr(efrMeterLineData, lineTimeStrArr); +// String[] lineColdArr = getArr(coldLineData, lineTimeStrArr); +// String[] lineCopArr = getArr(copLineData, lineTimeStrArr); + String[] lineEfrColdArr = getArr(efrColdLineData, lineTimeStrArr); + String[] lineCopArr = new String[efrMeterLineData.size()]; + // 计算瞬时COP=制冷量/耗电量 + for (int i = 0; i < efrMeterLineData.size(); i++) { + try { + double cold = Math.round(Double.parseDouble(lineEfrColdArr[i]) * 100) / 100.0; + double meter = Math.round(Double.parseDouble(lineInstantaneousMeterArr[i]) * 100) / 100.0; + double cop = Math.round((meter==0 ? 0.00 : cold/ meter) * 100) / 100.0; + lineCopArr[i] = String.valueOf(cop); + } catch (NumberFormatException e) { + log.error("处理累计能耗异常==>",e); + throw new RuntimeException(e); + } + } + // 折现图数据 + Map map = new HashMap<>(); + map.put("lineTimeStrArr", lineTimeStrArr); +// map.put("lineMeterArr", lineMeterArr); +// map.put("lineColdArr", lineColdArr); + map.put("lineCopArr", lineCopArr); + map.put("lineInstantaneousColdArr", lineEfrColdArr); + map.put("lineInstantaneousMeterArr", lineInstantaneousMeterArr); + latch.countDown(); + return map; + })); + + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + Map allData = new HashMap<>(); + // 获取数据 + for (Future> future : futures) { + try { + Map map = future.get(); + if (null != map) { + allData.putAll(map); + } + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + // 组装赋值 + energyConsumptionDTO.setTitles(titleArr); + energyConsumptionDTO.setTimes((String[]) allData.get("timeStrArr")); + energyConsumptionDTO.setLineTimes((String[]) allData.get("lineTimeStrArr")); + List> listData = new ArrayList<>(); + Map cold = new HashMap<>(); + cold.put("cold", allData.get("coldArr")); + listData.add(cold); + Map meter = new HashMap<>(); + meter.put("meter", allData.get("meterArr")); + listData.add(meter); + Map cop = new HashMap<>(); + cop.put("cop", allData.get("copArr")); + listData.add(cop); + Map coldLine = new HashMap<>(); + coldLine.put("lineCold", allData.get("lineColdArr")); + listData.add(coldLine); + Map meterLine = new HashMap<>(); + meterLine.put("lineMeter", allData.get("lineMeterArr")); + listData.add(meterLine); + Map copLine = new HashMap<>(); + copLine.put("lineCop",allData.get("lineCopArr")); + listData.add(copLine); + Map instantaneousColdLine = new HashMap<>(); + instantaneousColdLine.put("lineInstantaneousCold",allData.get("lineInstantaneousColdArr")); + listData.add(instantaneousColdLine); + Map instantaneousMeterLine = new HashMap<>(); + instantaneousMeterLine.put("lineInstantaneousMeter",allData.get("lineInstantaneousMeterArr")); + listData.add(instantaneousMeterLine); + energyConsumptionDTO.setData(listData); + return AjaxResult.success(energyConsumptionDTO); + } + + private static String[] getArr(List copLineData, String[] lineTimeStrArr) { + String[] lineCopArr = new String[lineTimeStrArr.length]; + for (int i = 0; i < lineTimeStrArr.length; i++) { + int j = i; + Optional first = copLineData.stream().filter(s -> lineTimeStrArr[j].equalsIgnoreCase(s.getTimeStr())).findFirst(); + if (first.isPresent()) { + lineCopArr[i] = first.get().getCurValue(); + } else { + lineCopArr[i] = "0.00"; + } + } + return lineCopArr; + } + + @Override + public AjaxResult yoy(EnergyConsumptionVO vo) { + // 获取今年的值 + List consumptionAnalyzeEntities; + String lastTableName = "data_"+vo.getTimeType(); + String curTableName = "data_"+vo.getTimeType(); + String startTime = vo.getStartTime(); + String endTime = vo.getEndTime(); + String deviceType = vo.getParamType(); + if ("cop".equalsIgnoreCase(vo.getParamType())) { + lastTableName = "data_min_cop"; + curTableName = "data_min_cop"; + deviceType = "system"; + } else { + lastTableName = lastTableName+startTime.substring(0,4); + curTableName = curTableName+endTime.substring(0,4); + } + String timeType = vo.getTimeType(); + if (!(lastTableName.equalsIgnoreCase(curTableName)) && ("hour".equalsIgnoreCase(timeType) || "day".equalsIgnoreCase(timeType))) { + // 多表 + // 获取上一期的时间 + startTime = DateUtils.yoyDate(startTime); + endTime = DateUtils.yoyDate(endTime); + String yoyLastTableName = lastTableName.substring(0, lastTableName.length() - 4) + startTime.substring(0,4); + String yoyCurTableName = curTableName.substring(0, lastTableName.length() - 4) + endTime.substring(0,4); + consumptionAnalyzeEntities = energyMapper.queryManyTableYoy(vo.getStartTime(), vo.getEndTime(), lastTableName, curTableName, + startTime, endTime, yoyLastTableName, yoyCurTableName, getTimeLen(vo.getTimeType()), deviceType, vo.getTimeType()); + } else { + // 单表 + startTime = DateUtils.yoyDate(startTime); + endTime = DateUtils.yoyDate(endTime); + if (!"cop".equalsIgnoreCase(vo.getParamType())) { + String lastTable = lastTableName.substring(0, lastTableName.length() - 4); + if (("month".equalsIgnoreCase(timeType) || "year".equalsIgnoreCase(timeType))) { + lastTableName = lastTable; + curTableName = curTableName.substring(0, curTableName.length() - 4); + } else { + lastTableName = lastTable + startTime.substring(0, 4); + } + } else { + deviceType = "system"; + } + consumptionAnalyzeEntities = energyMapper.queryOneTableYoy(vo.getStartTime(), vo.getEndTime(), lastTableName, curTableName, + startTime, endTime, getTimeLen(vo.getTimeType()), deviceType, vo.getTimeType()); + } + return getConsumptionDTOHttpResult(consumptionAnalyzeEntities, "yoy"); + } + + @Override + public AjaxResult mom(EnergyConsumptionVO vo) { + List consumptionAnalyzeEntities = null; + // 开始时间和结束时间都往前-1月或者年 + String startTime = DateUtils.momDate(vo.getStartTime(), vo.getTimeType(), "start"); + String endTime = DateUtils.momDate(vo.getEndTime(), vo.getTimeType(), "end"); + String deviceType = vo.getParamType(); + // 判断查询时间类型是month,year + if ("month".equalsIgnoreCase(vo.getTimeType()) || "year".equalsIgnoreCase(vo.getTimeType())) { + String tableName = "data_"+vo.getTimeType(); + // 计算COP值(后期有一个表存储) + if (!StringUtils.isBlank(vo.getParamType()) && vo.getParamType().equalsIgnoreCase("cop")) { + tableName = "data_min_cop"; + deviceType = "system"; + } + consumptionAnalyzeEntities = energyMapper.queryMonthAndYearMom(vo.getStartTime(), vo.getEndTime(), startTime, endTime, tableName, getTimeLen(vo.getTimeType()), deviceType, vo.getTimeType()); + } else { + // 判斷是否需要進行連表查詢 + String tableName1 = "data_"+vo.getTimeType()+startTime.substring(0, 4); + String tableName2 = "data_"+vo.getTimeType()+vo.getEndTime().substring(0, 4); + // 计算COP值(后期有一个表存储) + if (!StringUtils.isBlank(vo.getParamType()) && vo.getParamType().equalsIgnoreCase("cop")) { + tableName1 = "data_min_cop"; + tableName2 = "data_min_cop"; + deviceType = "system"; + } + if (tableName1.equalsIgnoreCase(tableName2)) { + // 单表查询 + consumptionAnalyzeEntities = energyMapper.queryOneTableMom(vo.getStartTime(), vo.getEndTime(), startTime, endTime, tableName1, getTimeLen(vo.getTimeType()), deviceType, vo.getTimeType()); + } else { + // 多表查询 + consumptionAnalyzeEntities = energyMapper.queryManyTableMom(vo.getStartTime(), vo.getEndTime(), startTime, endTime, tableName1, tableName2, getTimeLen(vo.getTimeType()), deviceType, vo.getTimeType()); + } + } + return getConsumptionDTOHttpResult(consumptionAnalyzeEntities, "mom"); + } + + @Override + public AjaxResult yoyMom(EnergyConsumptionVO vo) { + // 多线程分别求出yoy和mom + ThreadPoolExecutor executor = EnergyThreadPoolService.getInstance(); + CountDownLatch latch = new CountDownLatch(2); + List>> futures = new ArrayList<>(); + futures.add(executor.submit(() -> { + Map map = new HashMap<>(); + map.put("yoy", yoy(vo)); + latch.countDown(); + return map; + })); + futures.add(executor.submit(() -> { + Map map = new HashMap<>(); + map.put("mom", mom(vo)); + latch.countDown(); + return map; + })); + + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Map allData = new HashMap<>(); + // 获取数据 + for (Future> future : futures) { + try { + Map map = future.get(); + if (null != map) { + allData.putAll(map); + } + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + // 封装数据 + if (allData.containsKey("yoy") && allData.containsKey("mom")) { + AjaxResult yoy = (AjaxResult) allData.get("yoy"); + AjaxResult mom = (AjaxResult) allData.get("mom"); + if (yoy.isSuccess() && mom.isSuccess()) { + EnergyConsumptionDTO yoyData = (EnergyConsumptionDTO) yoy.get("data"); + EnergyConsumptionDTO momData = (EnergyConsumptionDTO) mom.get("data"); + EnergyConsumptionDTO result = new EnergyConsumptionDTO(); + result.setTitles(new String[]{"curValue", "lastValue", "yoy", "mom"}); + result.setTimes(yoyData.getTimes()); + result.setLineTimes(yoyData.getLineTimes()); + List> data = new ArrayList<>(); + // 同比数据 + Map lastValueMap = new HashMap<>(); + Map yoyMap = new HashMap<>(); + for (int i = 0; i < yoyData.getData().size(); i++) { + if (yoyData.getData().get(i).containsKey("lastValue")) { + lastValueMap.put("lastValue", yoyData.getData().get(i).get("lastValue")); + data.add(lastValueMap); + } + if (yoyData.getData().get(i).containsKey("yoy")) { + yoyMap.put("yoy", yoyData.getData().get(i).get("yoy")); + data.add(yoyMap); + } + } + // 环比数据 + Map curValMap = new HashMap<>(); + Map momMap = new HashMap<>(); + for (int i = 0; i < momData.getData().size(); i++) { + if (momData.getData().get(i).containsKey("curValue")) { + curValMap.put("curValue", momData.getData().get(i).get("curValue")); + data.add(curValMap); + } + if (momData.getData().get(i).containsKey("mom")) { + momMap.put("mom", momData.getData().get(i).get("mom")); + data.add(momMap); + } + } + result.setData(data); + return AjaxResult.success(result); + } + } + return AjaxResult.success(null); + } + + private static AjaxResult getConsumptionDTOHttpResult(List consumptionAnalyzeEntities, String compareType) { + EnergyConsumptionDTO energyConsumptionDTO = new EnergyConsumptionDTO(); + if (null == consumptionAnalyzeEntities || consumptionAnalyzeEntities.size() == 0) { + return AjaxResult.success(); + } + String[] lastValue = new String[consumptionAnalyzeEntities.size()]; + String[] curValue = new String[consumptionAnalyzeEntities.size()]; + String[] yoyValue = new String[consumptionAnalyzeEntities.size()]; + String[] timeStrArr = new String[consumptionAnalyzeEntities.size()]; + String[] titleArr = new String[]{"curValue", "lastValue", compareType}; + for (int i = 0; i < consumptionAnalyzeEntities.size(); i++) { + ConsumptionAnalyzeEntity entity = consumptionAnalyzeEntities.get(i); + lastValue[i] = entity.getLastValue(); + curValue[i] = entity.getCurValue(); + if ("yoy".equalsIgnoreCase(compareType)) { + yoyValue[i] = entity.getYoy(); + } + if ("mom".equalsIgnoreCase(compareType)) { + yoyValue[i] = entity.getMom(); + } + timeStrArr[i] = entity.getTimeStr(); + } + energyConsumptionDTO.setTitles(titleArr); + energyConsumptionDTO.setTimes(timeStrArr); + List> listData = new ArrayList<>(); + Map cur = new HashMap<>(); + cur.put("curValue",curValue); + listData.add(cur); + Map last = new HashMap<>(); + last.put("lastValue",lastValue); + listData.add(last); + Map yoy = new HashMap<>(); + yoy.put(compareType,yoyValue); + listData.add(yoy); + energyConsumptionDTO.setData(listData); + return AjaxResult.success(energyConsumptionDTO); + } + + private static String getTimeLen(String timeType) { + String timeLen = ""; + switch (timeType) { + case "hour": + timeLen = "13"; + break; + case "day": + timeLen = "10"; + break; + case "month": + timeLen = "7"; + break; + case "year": + timeLen = "4"; + break; + } + return timeLen; + } + + @Override + public String queryCoolingData(String tableName, String timeString) { + return energyMapper.queryCoolingData(tableName, timeString); + } +} diff --git a/mh-system/src/main/resources/mapper/system/EnergyMapper.xml b/mh-system/src/main/resources/mapper/system/EnergyMapper.xml new file mode 100644 index 0000000..33d5b4b --- /dev/null +++ b/mh-system/src/main/resources/mapper/system/EnergyMapper.xml @@ -0,0 +1,803 @@ + + + + + + + ,month(cur_time) + + + ,month(cur_time),day(cur_time) + + + ,month(cur_time),day(cur_time),datepart(hour, cur_time) + + + + + + ,month(cur_time) as monthStr + + + ,month(cur_time) as monthStr,day(cur_time) as dayStr + + + ,month(cur_time) as monthStr,day(cur_time) as dayStr,datepart(hour, cur_time) as hourStr + + + + + + ,month(cur_time) + + + ,month(dateadd(day,1,cur_time)),day(dateadd(day,1,cur_time)) + + + ,month(dateadd(hour,1,cur_time)),day(dateadd(hour,1,cur_time)),datepart(hour, dateadd(hour,1,cur_time)) + + + + + ,month(cur_time) as monthStr + + + ,month(dateadd(day,1,cur_time)) as monthStr,day(dateadd(day,1,cur_time)) as dayStr + + + ,month(dateadd(hour,1,cur_time)) as monthStr,day(dateadd(hour,1,cur_time)) as dayStr,datepart(hour, dateadd(hour,1,cur_time)) as hourStr + + + + + + and a.monthStr = b.monthStr + + + and a.monthStr = b.monthStr and a.dayStr = b.dayStr + + + and a.monthStr = b.monthStr and a.dayStr = b.dayStr and a.hourStr = b.hourStr + + order by timeStr + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index b34d4b5..30a5645 100644 --- a/pom.xml +++ b/pom.xml @@ -36,10 +36,12 @@ 6.0.0 2.6.0 3.5.3 - 6.4.1 + 6.3.4 + 3.4.2 1.18.36 6.2.1 3.2.1 + 3.4.2 @@ -246,6 +248,11 @@ spring-integration-mqtt ${mqtt.version} + + org.springframework.boot + spring-boot-starter-integration + ${integration.version} + @@ -268,6 +275,13 @@ ${easyexcel.version} + + + org.springframework.boot + spring-boot-starter-amqp + ${amqp.version} + + diff --git a/sql/创建冷水机组参数表存储过程.sql b/sql/创建冷水机组参数表存储过程.sql new file mode 100644 index 0000000..01ff98d --- /dev/null +++ b/sql/创建冷水机组参数表存储过程.sql @@ -0,0 +1,153 @@ +CREATE OR REPLACE FUNCTION pro_create_chillers_table() +RETURNS VOID AS $$ +DECLARE +year VARCHAR(4); + sqlStr TEXT; + table_name VARCHAR(100); + is_exists INTEGER; +BEGIN + -- 获取当前年份 +year := TO_CHAR(CURRENT_DATE, 'YYYY'); + + -- 判断是否存在表 chillers_data_min + year + table_name := 'chillers_data_min' || year; +SELECT COUNT(*) INTO is_exists FROM pg_tables WHERE tablename = table_name; +IF is_exists = 0 THEN + sqlStr := 'CREATE TABLE ' || table_name || ' ( + id BIGSERIAL PRIMARY KEY, + device_code VARCHAR(20), + device_num VARCHAR(20), + collection_num VARCHAR(20), + register_address VARCHAR(50), + register_name VARCHAR(50), + fun_code VARCHAR(10), + last_value NUMERIC(24,2), + last_time TIMESTAMP, + cur_value NUMERIC(24,2), + cur_time TIMESTAMP, + ratio INTEGER, + calc_value NUMERIC(24,2), + local_time TIMESTAMP, + grade INTEGER, + ddc_addr VARCHAR(50), + register_id INTEGER + ); + CREATE INDEX cls_' || table_name || ' ON ' || table_name || ' (cur_time DESC)'; + RAISE NOTICE '%', sqlStr; +EXECUTE sqlStr; +END IF; + + -- 判断是否存在表 chillers_data_hour + year + table_name := 'chillers_data_hour' || year; +SELECT COUNT(*) INTO is_exists FROM pg_tables WHERE tablename = table_name; +IF is_exists = 0 THEN + sqlStr := 'CREATE TABLE ' || table_name || ' ( + id BIGSERIAL PRIMARY KEY, + device_code VARCHAR(20), + device_num VARCHAR(20), + collection_num VARCHAR(20), + register_address VARCHAR(50), + register_name VARCHAR(50), + fun_code VARCHAR(10), + last_value NUMERIC(24,2), + last_time VARCHAR(13), + cur_value NUMERIC(24,2), + cur_time VARCHAR(13), + ratio INTEGER, + calc_value NUMERIC(24,2), + local_time TIMESTAMP, + grade INTEGER, + ddc_addr VARCHAR(50), + register_id INTEGER + ); + CREATE INDEX cls_' || table_name || ' ON ' || table_name || ' (cur_time DESC)'; + RAISE NOTICE '%', sqlStr; +EXECUTE sqlStr; +END IF; + + -- 判断是否存在表 chillers_data_day + year + table_name := 'chillers_data_day' || year; +SELECT COUNT(*) INTO is_exists FROM pg_tables WHERE tablename = table_name; +IF is_exists = 0 THEN + sqlStr := 'CREATE TABLE ' || table_name || ' ( + id BIGSERIAL PRIMARY KEY, + device_code VARCHAR(20), + device_num VARCHAR(20), + collection_num VARCHAR(20), + register_address VARCHAR(50), + register_name VARCHAR(50), + fun_code VARCHAR(10), + last_value NUMERIC(24,2), + last_time VARCHAR(10), + cur_value NUMERIC(24,2), + cur_time VARCHAR(10), + ratio INTEGER, + calc_value NUMERIC(24,2), + local_time TIMESTAMP, + grade INTEGER, + ddc_addr VARCHAR(50), + register_id INTEGER + ); + CREATE INDEX cls_' || table_name || ' ON ' || table_name || ' (cur_time DESC)'; + RAISE NOTICE '%', sqlStr; +EXECUTE sqlStr; +END IF; + +-- 判断是否存在表 chillers_data_month + table_name := 'chillers_data_month'; +SELECT COUNT(*) INTO is_exists FROM pg_tables WHERE tablename = table_name; +IF is_exists = 0 THEN + sqlStr := 'CREATE TABLE ' || table_name || ' ( + id BIGSERIAL PRIMARY KEY, + device_code VARCHAR(20), + device_num VARCHAR(20), + collection_num VARCHAR(20), + register_address VARCHAR(50), + register_name VARCHAR(50), + fun_code VARCHAR(10), + last_value NUMERIC(24,2), + last_time VARCHAR(10), + cur_value NUMERIC(24,2), + cur_time VARCHAR(10), + ratio INTEGER, + calc_value NUMERIC(24,2), + local_time TIMESTAMP, + grade INTEGER, + ddc_addr VARCHAR(50), + register_id INTEGER + ); + CREATE INDEX cls_' || table_name || ' ON ' || table_name || ' (cur_time DESC)'; + RAISE NOTICE '%', sqlStr; +EXECUTE sqlStr; +END IF; + +-- 判断是否存在表 chillers_data_year + table_name := 'chillers_data_year'; +SELECT COUNT(*) INTO is_exists FROM pg_tables WHERE tablename = table_name; +IF is_exists = 0 THEN + sqlStr := 'CREATE TABLE ' || table_name || ' ( + id BIGSERIAL PRIMARY KEY, + device_code VARCHAR(20), + device_num VARCHAR(20), + collection_num VARCHAR(20), + register_address VARCHAR(50), + register_name VARCHAR(50), + fun_code VARCHAR(10), + last_value NUMERIC(24,2), + last_time VARCHAR(10), + cur_value NUMERIC(24,2), + cur_time VARCHAR(10), + ratio INTEGER, + calc_value NUMERIC(24,2), + local_time TIMESTAMP, + grade INTEGER, + ddc_addr VARCHAR(50), + register_id INTEGER + ); + CREATE INDEX cls_' || table_name || ' ON ' || table_name || ' (cur_time DESC)'; + RAISE NOTICE '%', sqlStr; +EXECUTE sqlStr; +END IF; + +END; +$$ LANGUAGE plpgsql; diff --git a/sql/创建计量设备表存储过程.sql b/sql/创建计量设备表存储过程.sql new file mode 100644 index 0000000..53c4c9b --- /dev/null +++ b/sql/创建计量设备表存储过程.sql @@ -0,0 +1,133 @@ +CREATE OR REPLACE FUNCTION pro_create_table() +RETURNS VOID AS $$ +DECLARE +year VARCHAR(4); + sqlStr TEXT; + table_name VARCHAR(100); + is_exists INTEGER; +BEGIN + -- 获取当前年份 +year := TO_CHAR(CURRENT_DATE, 'YYYY'); + + -- 判断是否存在 data_min + 当前年份 的表 + table_name := 'data_min' || year; +SELECT COUNT(*) INTO is_exists FROM pg_tables WHERE tablename = table_name; +IF is_exists = 0 THEN + sqlStr := 'CREATE TABLE ' || table_name || ' ( + id BIGSERIAL PRIMARY KEY, + device_num VARCHAR(20), + device_code VARCHAR(20), + device_type VARCHAR(100), + last_value NUMERIC(24,2), + last_time TIMESTAMP, + cur_value NUMERIC(24,2), + cur_time TIMESTAMP, + used_value NUMERIC(24,2), + ratio INTEGER, + calc_value NUMERIC(24,2), + grade INTEGER, + UNIQUE (device_num, cur_time) + ); + CREATE INDEX cls_' || table_name || ' ON ' || table_name || ' (cur_time DESC)'; + RAISE NOTICE '%', sqlStr; +EXECUTE sqlStr; +END IF; + + -- 判断是否存在 data_hour + 当前年份 的表 + table_name := 'data_hour' || year; +SELECT COUNT(*) INTO is_exists FROM pg_tables WHERE tablename = table_name; +IF is_exists = 0 THEN + sqlStr := 'CREATE TABLE ' || table_name || ' ( + id BIGSERIAL PRIMARY KEY, + device_num VARCHAR(20), + device_code VARCHAR(20), + device_type VARCHAR(100), + last_value NUMERIC(24,2), + last_time TIMESTAMP, + cur_value NUMERIC(24,2), + cur_time TIMESTAMP, + used_value NUMERIC(24,2), + ratio INTEGER, + calc_value NUMERIC(24,2), + grade INTEGER, + UNIQUE (device_num, cur_time) + ); + CREATE INDEX cls_' || table_name || ' ON ' || table_name || ' (cur_time DESC)'; + RAISE NOTICE '%', sqlStr; +EXECUTE sqlStr; +END IF; + + -- 判断是否存在 data_day + 当前年份 的表 + table_name := 'data_day' || year; +SELECT COUNT(*) INTO is_exists FROM pg_tables WHERE tablename = table_name; +IF is_exists = 0 THEN + sqlStr := 'CREATE TABLE ' || table_name || ' ( + id BIGSERIAL PRIMARY KEY, + device_num VARCHAR(20), + device_code VARCHAR(20), + device_type VARCHAR(100), + last_value NUMERIC(24,2), + last_time TIMESTAMP, + cur_value NUMERIC(24,2), + cur_time TIMESTAMP, + used_value NUMERIC(24,2), + ratio INTEGER, + calc_value NUMERIC(24,2), + grade INTEGER, + UNIQUE (device_num, cur_time) + ); + CREATE INDEX cls_' || table_name || ' ON ' || table_name || ' (cur_time DESC)'; + RAISE NOTICE '%', sqlStr; +EXECUTE sqlStr; +END IF; + + -- 判断是否存在 data_month 的表 + table_name := 'data_month'; +SELECT COUNT(*) INTO is_exists FROM pg_tables WHERE tablename = table_name; +IF is_exists = 0 THEN + sqlStr := 'CREATE TABLE ' || table_name || ' ( + id BIGSERIAL PRIMARY KEY, + device_num VARCHAR(20), + device_code VARCHAR(20), + device_type VARCHAR(100), + last_value NUMERIC(24,2), + last_time TIMESTAMP, + cur_value NUMERIC(24,2), + cur_time TIMESTAMP, + used_value NUMERIC(24,2), + ratio INTEGER, + calc_value NUMERIC(24,2), + grade INTEGER, + UNIQUE (device_num, cur_time) + ); + CREATE INDEX cls_' || table_name || ' ON ' || table_name || ' (cur_time DESC)'; + RAISE NOTICE '%', sqlStr; +EXECUTE sqlStr; +END IF; + + -- 判断是否存在 data_year 的表 + table_name := 'data_year'; +SELECT COUNT(*) INTO is_exists FROM pg_tables WHERE tablename = table_name; +IF is_exists = 0 THEN + sqlStr := 'CREATE TABLE ' || table_name || ' ( + id BIGSERIAL PRIMARY KEY, + device_num VARCHAR(20), + device_code VARCHAR(20), + device_type VARCHAR(100), + last_value NUMERIC(24,2), + last_time TIMESTAMP, + cur_value NUMERIC(24,2), + cur_time TIMESTAMP, + used_value NUMERIC(24,2), + ratio INTEGER, + calc_value NUMERIC(24,2), + grade INTEGER, + UNIQUE (device_num, cur_time) + ); + CREATE INDEX cls_' || table_name || ' ON ' || table_name || ' (cur_time DESC)'; + RAISE NOTICE '%', sqlStr; +EXECUTE sqlStr; +END IF; + +END; +$$ LANGUAGE plpgsql;