From b2ce3a35c7e2d54ad767b3418b71f894cb92ad61 Mon Sep 17 00:00:00 2001 From: lishuai Date: Sun, 29 Sep 2024 13:28:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9C=A8=E7=BA=BF=E7=A6=BB=E7=BA=BF=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E4=BB=BB=E5=8A=A1=202024=E5=B9=B49=E6=9C=8829?= =?UTF-8?q?=E6=97=A513:28:19?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../quartz/timetask/task/ThingStatusTask.java | 104 ++++++++++-------- 1 file changed, 59 insertions(+), 45 deletions(-) diff --git a/modules/quartz/src/main/java/com/thing/quartz/timetask/task/ThingStatusTask.java b/modules/quartz/src/main/java/com/thing/quartz/timetask/task/ThingStatusTask.java index bc8e54a..ee5b515 100644 --- a/modules/quartz/src/main/java/com/thing/quartz/timetask/task/ThingStatusTask.java +++ b/modules/quartz/src/main/java/com/thing/quartz/timetask/task/ThingStatusTask.java @@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.thing.common.cache.constants.CacheNameEnum; import com.thing.common.core.enumeration.GateWayStatus; import com.thing.common.core.enumeration.ThingStatus; -import com.thing.common.core.utils.ConvertUtils; import com.thing.common.core.utils.JacksonUtil; import com.thing.common.data.tskv.TsKvDTO; import com.thing.common.tskv.service.TsKvService; @@ -19,10 +18,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; -import java.util.Comparator; -import java.util.Date; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.stream.Collectors; import static com.thing.thing.cache.service.CacheInit.KEY; @@ -63,49 +59,67 @@ public class ThingStatusTask implements ITask { } //获取最新值 List codes = optionalList.get().stream().map(s -> s.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText()).toList(); - List lastTsKvList = tsKvService.findLatestByCodesAndAttrs(codes, null, true); - //处理 - List statusList = optionalList.get().stream() - .map(item -> { - // IotThingModelDTO iotThingModelDTO = JacksonUtil.convertValue(item, IotThingModelDTO.class); - //找到最大时间的属性 - Optional optionalMax = lastTsKvList.stream() - .filter(tsKvDTO -> StringUtils.equals(tsKvDTO.getThingCode(),item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText())) - .max(Comparator.comparing(TsKvDTO::getTs)); - if(optionalMax.isEmpty()){ - item.put(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField(),ThingStatus.OFFLINE.getCode()); - }else{ - TsKvDTO tsKvDTO = optionalMax.get(); - //根据当前时间和获取的最新时间 若大于45min 则离线 否则为在线 - boolean isOffline = System.currentTimeMillis() - tsKvDTO.getTs() > TIME_INTERVAL; - item.put(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField(),isOffline ? ThingStatus.OFFLINE.getCode() : ThingStatus.ONLINE.getCode()); - item.put(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField(),new Date(tsKvDTO.getTs()).getTime()); - //更新实体设备状态:这里需要精确匹配 - List entityNodes = cache.findMapAccurateKeys(CacheNameEnum.THING_ENTITY, item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText()); - - if(CollectionUtils.isNotEmpty(entityNodes)){ - for (ObjectNode entityNode : entityNodes) { + if(CollectionUtils.isEmpty(codes)){ + return; + } + List updateModelList = new ArrayList<>(); + for (ObjectNode objectNode : optionalList.get()) { + String modelCode = objectNode.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText(); + //获取最新值 + List lastTsKvList = tsKvService.findLatestByCodeAndAttrs(modelCode, null, true); + if(CollectionUtils.isEmpty(lastTsKvList)){ + //若是没有值 则离线 + objectNode.put(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField(),ThingStatus.OFFLINE.getCode()); + //更新物模型缓存 + cache.updateAccurateKeyMap(CacheNameEnum.THING_MODEL, objectNode.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() + + KEY + objectNode.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(), objectNode); + //更新物实体在线离线状态 + List entityNodeList = cache.findMapAccurateKey(CacheNameEnum.THING_ENTITY, modelCode); + if(CollectionUtils.isNotEmpty(entityNodeList)){ + for (ObjectNode entityNode : entityNodeList) { + //若是没有值 则离线 + entityNode.put(CacheNameEnum.EntityField.THING_ENTITY_STATUS.getField(),ThingStatus.OFFLINE.getCode()); + cache.updateAccurateKeyMap(CacheNameEnum.THING_ENTITY + ,entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText() + + KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_CODE.getField()).asText() + + KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(),entityNode); + } + } + updateModelList.add(objectNode); + }else{ + //找到最大时间的属性 + Optional optionalMax = lastTsKvList.stream() + .filter(tsKvDTO -> StringUtils.equals(tsKvDTO.getThingCode(),modelCode)) + .max(Comparator.comparing(TsKvDTO::getTs)); + //根据当前时间和获取的最新时间 若大于45min 则离线 否则为在线 + boolean isOffline = System.currentTimeMillis() - optionalMax.get().getTs() > TIME_INTERVAL; + objectNode.put(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField(),isOffline ? ThingStatus.OFFLINE.getCode() : ThingStatus.ONLINE.getCode()); + objectNode.put(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField(),new Date(optionalMax.get().getTs()).getTime()); + //更新物模型缓存 + cache.updateAccurateKeyMap(CacheNameEnum.THING_MODEL, objectNode.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() + + KEY + objectNode.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(), objectNode); + updateModelList.add(objectNode); + //更新实体设备状态:这里需要精确匹配 + List entityNodes = cache.findMapAccurateKeys(CacheNameEnum.THING_ENTITY, modelCode); + if(CollectionUtils.isNotEmpty(entityNodes)){ + for (ObjectNode entityNode : entityNodes) { + entityNode.put(CacheNameEnum.EntityField.THING_ENTITY_STATUS.getField() + ,isOffline ? ThingStatus.OFFLINE.getCode() : ThingStatus.ONLINE.getCode()); + entityNode.put(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField(),new Date(optionalMax.get().getTs()).getTime()); + cache.updateAccurateKeyMap(CacheNameEnum.THING_ENTITY + ,entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText() + + KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_CODE.getField()).asText() + + KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(),entityNode); + } + } + } + } - entityNode.put(CacheNameEnum.EntityField.THING_ENTITY_STATUS.getField() - ,isOffline ? ThingStatus.OFFLINE.getCode() : ThingStatus.ONLINE.getCode()); - cache.updateAccurateKeyMap(CacheNameEnum.THING_ENTITY - ,entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText() - + KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_CODE.getField()).asText() - + KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(),entityNode); - } - } - } - //更新物模型 - IotThingModelDTO iotThingModelDTO = ConvertUtils.sourceToTarget(item, IotThingModelDTO.class); - cache.updateAccurateKeyEntity(CacheNameEnum.THING_MODEL - , item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() - + KEY + item.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(), iotThingModelDTO); - return JacksonUtil.convertValue(item, IotThingModelEntity.class); - }).collect(Collectors.toList()); - if(CollectionUtils.isNotEmpty(statusList)){ + if(CollectionUtils.isNotEmpty(updateModelList)){ + List list = updateModelList.stream().map(item -> JacksonUtil.convertValue(item, IotThingModelEntity.class)).toList(); //修改状态 - modelService.saveOrUpdateBatch(statusList); + modelService.saveOrUpdateBatch(list); } log.info("设备在线离线 statusTask end"); }