Browse Source

websocketbug修复

2024年12月23日15:40:45
thing_master
lishuai 1 year ago
parent
commit
827a84de87
  1. 73
      modules/thing/src/main/java/com/thing/websocket/QueueSocketEventListener.java
  2. 45
      modules/thing/src/main/java/com/thing/websocket/WebSocketServer.java

73
modules/thing/src/main/java/com/thing/websocket/QueueSocketEventListener.java

@ -2,11 +2,10 @@ package com.thing.websocket;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.map.MapUtil;
import com.google.common.collect.Lists;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import com.thing.common.core.event.QueueSocketEvent;
import com.thing.common.core.message.MessageData;
@ -15,10 +14,7 @@ import com.thing.thing.tskv.dto.ThingAttrDTO;
import com.thing.thing.tskv.dto.TsKvDTO;
import com.thing.thing.tskv.service.TskvService;
import com.thing.websocket.data.SocketDataCache;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@ -27,7 +23,6 @@ import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
@ -54,10 +49,13 @@ public class QueueSocketEventListener {
//超级API
if (MapUtil.isNotEmpty(webSocketServer.API_SOCKET_MAP)) {
webSocketServer.API_SOCKET_MAP.forEach((sessionId, thingList) -> {
JsonObject jsonObject = getApiMap(lastAttrValMap, thingList);
if (jsonObject != null && !jsonObject.isJsonNull() && !jsonObject.entrySet().isEmpty()) {
JSONObject jsonObject = getApiMap(lastAttrValMap, thingList);
if(null != jsonObject){
JSONObject result = jsonObject.getJSONObject("result"); // 获取 result
if(result.containsKey("values")){
webSocketServer.sendMessage(sessionId, new MessageData<>().data(jsonObject));
}
}
});
}
//看板
@ -120,33 +118,44 @@ public class QueueSocketEventListener {
private final Gson gson = new Gson();
private final Type listType = new TypeToken<List<QueueMsgDTO>>() {}.getType();
private JsonObject getApiMap(Map<String, List<QueueMsgDTO>> lastAttrValMap, JsonObject jsonObject) {
// 使用 Gson 将第一个 JsonObject 转换为 JsonElement
JsonElement jsonElement = gson.toJsonTree(jsonObject);
// JsonElement 转换为第二个 JsonObject
JsonObject dataJson = jsonElement.getAsJsonObject();
JsonObject result = dataJson.get("result").getAsJsonObject();
JsonObject info = result.get("info").getAsJsonObject();
List<QueueMsgDTO> queueMsgDTOList = Lists.newArrayList();
for (String thingCode : info.keySet()) {
JsonObject object = info.get(thingCode).getAsJsonObject();
Set<String> thingAttrList = object.get("attrs").getAsJsonObject().keySet();
private JSONObject getApiMap(Map<String, List<QueueMsgDTO>> lastAttrValMap, JSONObject jsonObject) {
// 获取 "result" 对象
JSONObject result = jsonObject.getJSONObject("result"); // 获取 result
if (result == null) {
return null; // 如果 "result" null返回空 JSON 对象
}
JSONObject info = result.getJSONObject("info"); // 获取 info
if (info == null) {
return null; // 如果 "info" null返回空 JSON 对象
}
List<QueueMsgDTO> queueMsgDTOList = new ArrayList<>();
info.forEach((k,v)->{
ObjectMapper objectMapper = new ObjectMapper();
ObjectNode objectNode = objectMapper.createObjectNode();
String string = info.getString(k);
JSONObject jsonObject1 = JSONObject.parseObject(string);
String thingCode = jsonObject1.getString("entityCode");
boolean b = lastAttrValMap.containsKey(thingCode);
if(b){
JSONObject attrs = jsonObject1.getJSONObject("attrs");
List<QueueMsgDTO> queueMsgDTOS = lastAttrValMap.get(thingCode);
if(CollectionUtil.isNotEmpty(thingAttrList) && CollectionUtil.isNotEmpty(queueMsgDTOS)){
if (CollectionUtil.isNotEmpty(queueMsgDTOS)) {
List<QueueMsgDTO> collect = queueMsgDTOS.stream()
.filter(s -> thingAttrList.contains(s.getAttrKey()) && StringUtils.equals(thingCode, s.getThingCode()))
.collect(Collectors.toList());
.filter(s -> attrs.keySet().contains(s.getAttrKey()) && s.getThingCode().equals(thingCode))
.toList();
queueMsgDTOList.addAll(collect);
}
}
if(CollectionUtil.isEmpty(queueMsgDTOList)){
return new JsonObject();
});
if (queueMsgDTOList.isEmpty()) {
result.remove("values");
jsonObject.put("result",result);
return null; // 如果没有符合条件的 QueueMsgDTO返回空 JSON 对象
}
// 使用 TypeToken 获取 List<QueueMsgDTO> 的类型
// Type listType = new TypeToken<List<QueueMsgDTO>>() {}.getType();
// List<QueueMsgDTO> 转换为 JsonElement
JsonElement jsonElement1 = gson.toJsonTree(queueMsgDTOList, listType);
result.add("values",jsonElement1);
return dataJson;
result.put("values", queueMsgDTOList); // values 放入 result
result.put("info", info); // values 放入 result
jsonObject.put("result",result);
return jsonObject; // 返回修改后的 dataJson
}
}

45
modules/thing/src/main/java/com/thing/websocket/WebSocketServer.java

@ -2,14 +2,11 @@ package com.thing.websocket;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.thing.common.core.constants.Constant;
import com.thing.common.core.message.MessageData;
import com.thing.common.core.utils.SpringContextUtils;
@ -34,7 +31,6 @@ import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -52,13 +48,13 @@ public class WebSocketServer {
/**
* 客户端连接信息
*/
private static Map<String, WebSocketData> servers = new ConcurrentHashMap<>();
private static final Map<String, WebSocketData> servers = new ConcurrentHashMap<>();
public static final Map<String, List<Long>> TRANSPORT_EXTEND_MAP = new ConcurrentHashMap<>();
public static final Map<String, List<SocketDataCache>> DASHBOARD_MAP = new ConcurrentHashMap<>();
public static final Map<String, JsonObject> API_SOCKET_MAP = new ConcurrentHashMap<>();
public static final Map<String, JSONObject> API_SOCKET_MAP = new ConcurrentHashMap<>();
private static final ByteBuffer PING_MSG = ByteBuffer.wrap(new byte[]{});
@ -94,7 +90,6 @@ public class WebSocketServer {
iotThingApiService = SpringContextUtils.getBean(IotThingApiService.class);
//加载节点配置
loadApiList(session, apiId);
}
//看板全局变量存储
@ -252,25 +247,15 @@ public class WebSocketServer {
if (MapUtil.isNotEmpty(apiMap)) {
try {
String jsonString = objectMapper.writeValueAsString(apiMap);
JsonNode jsonNode = objectMapper.readTree(jsonString);
convertValuesToString(jsonNode);
JsonObject jsonObject = JsonParser.parseString(jsonNode.toString()).getAsJsonObject();
JSONObject jsonObject1 = JSONObject.parseObject(jsonString);
//推送一次最新数据
sendMessage(session,new MessageData<>().data(jsonObject));
sendMessage(session,new MessageData<>().data(jsonObject1));
//存储一个超级API需要哪些物和相应的属性
API_SOCKET_MAP.put(session.getId(),jsonObject1);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
Map result = MapUtil.get(apiMap, "result", Map.class);
result.remove("values");
apiMap.put("result",result);
// 创建一个 Gson 对象
Gson gson = new Gson();
// Map<String, Object> 转换为 JsonObject
JsonObject jsonObject = gson.toJsonTree(apiMap).getAsJsonObject();
//存储一个超级API需要哪些物和相应的属性
API_SOCKET_MAP.put(session.getId(),jsonObject);
}
private void loadTransportExtendList(String sessionId, String transportExtendId) {
@ -279,20 +264,4 @@ public class WebSocketServer {
TRANSPORT_EXTEND_MAP.put(sessionId, collect);
}
public static void convertValuesToString(JsonNode jsonNode) {
if (jsonNode.isObject()) {
ObjectNode objectNode = (ObjectNode) jsonNode;
Iterator<Map.Entry<String, JsonNode>> iterator = objectNode.fields();
while (iterator.hasNext()) {
Map.Entry<String, JsonNode> entry = iterator.next();
convertValuesToString(entry.getValue());
if (entry.getValue().isValueNode()) {
objectNode.put(entry.getKey(), entry.getValue().asText());
}
}
} else if (jsonNode.isArray()) {
jsonNode.forEach(element -> convertValuesToString(element));
}
}
}
Loading…
Cancel
Save