|
|
|
@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; |
|
|
|
import com.thing.common.cache.constants.CacheNameEnum; |
|
|
|
import com.thing.common.core.enumeration.GateWayStatus; |
|
|
|
import com.thing.common.core.enumeration.ThingStatus; |
|
|
|
import com.thing.common.core.utils.ConvertUtils; |
|
|
|
import com.thing.common.core.utils.JacksonUtil; |
|
|
|
import com.thing.common.data.tskv.TsKvDTO; |
|
|
|
import com.thing.common.tskv.service.TsKvService; |
|
|
|
@ -19,10 +18,7 @@ import org.apache.commons.collections4.CollectionUtils; |
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
|
|
|
|
import java.util.Comparator; |
|
|
|
import java.util.Date; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.*; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
import static com.thing.thing.cache.service.CacheInit.KEY; |
|
|
|
@ -63,49 +59,67 @@ public class ThingStatusTask implements ITask { |
|
|
|
} |
|
|
|
//获取最新值 |
|
|
|
List<String> codes = optionalList.get().stream().map(s -> s.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText()).toList(); |
|
|
|
List<TsKvDTO> lastTsKvList = tsKvService.findLatestByCodesAndAttrs(codes, null, true); |
|
|
|
//处理 |
|
|
|
List<IotThingModelEntity> statusList = optionalList.get().stream() |
|
|
|
.map(item -> { |
|
|
|
// IotThingModelDTO iotThingModelDTO = JacksonUtil.convertValue(item, IotThingModelDTO.class); |
|
|
|
//找到最大时间的属性 |
|
|
|
Optional<TsKvDTO> optionalMax = lastTsKvList.stream() |
|
|
|
.filter(tsKvDTO -> StringUtils.equals(tsKvDTO.getThingCode(),item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText())) |
|
|
|
.max(Comparator.comparing(TsKvDTO::getTs)); |
|
|
|
if(optionalMax.isEmpty()){ |
|
|
|
item.put(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField(),ThingStatus.OFFLINE.getCode()); |
|
|
|
}else{ |
|
|
|
TsKvDTO tsKvDTO = optionalMax.get(); |
|
|
|
//根据当前时间和获取的最新时间 若大于45min 则离线 否则为在线 |
|
|
|
boolean isOffline = System.currentTimeMillis() - tsKvDTO.getTs() > TIME_INTERVAL; |
|
|
|
item.put(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField(),isOffline ? ThingStatus.OFFLINE.getCode() : ThingStatus.ONLINE.getCode()); |
|
|
|
item.put(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField(),new Date(tsKvDTO.getTs()).getTime()); |
|
|
|
//更新实体设备状态:这里需要精确匹配 |
|
|
|
List<ObjectNode> entityNodes = cache.findMapAccurateKeys(CacheNameEnum.THING_ENTITY, item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText()); |
|
|
|
|
|
|
|
if(CollectionUtils.isNotEmpty(entityNodes)){ |
|
|
|
for (ObjectNode entityNode : entityNodes) { |
|
|
|
if(CollectionUtils.isEmpty(codes)){ |
|
|
|
return; |
|
|
|
} |
|
|
|
List<ObjectNode> updateModelList = new ArrayList<>(); |
|
|
|
for (ObjectNode objectNode : optionalList.get()) { |
|
|
|
String modelCode = objectNode.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText(); |
|
|
|
//获取最新值 |
|
|
|
List<TsKvDTO> lastTsKvList = tsKvService.findLatestByCodeAndAttrs(modelCode, null, true); |
|
|
|
if(CollectionUtils.isEmpty(lastTsKvList)){ |
|
|
|
//若是没有值 则离线 |
|
|
|
objectNode.put(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField(),ThingStatus.OFFLINE.getCode()); |
|
|
|
//更新物模型缓存 |
|
|
|
cache.updateAccurateKeyMap(CacheNameEnum.THING_MODEL, objectNode.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() |
|
|
|
+ KEY + objectNode.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(), objectNode); |
|
|
|
//更新物实体在线离线状态 |
|
|
|
List<ObjectNode> entityNodeList = cache.findMapAccurateKey(CacheNameEnum.THING_ENTITY, modelCode); |
|
|
|
if(CollectionUtils.isNotEmpty(entityNodeList)){ |
|
|
|
for (ObjectNode entityNode : entityNodeList) { |
|
|
|
//若是没有值 则离线 |
|
|
|
entityNode.put(CacheNameEnum.EntityField.THING_ENTITY_STATUS.getField(),ThingStatus.OFFLINE.getCode()); |
|
|
|
cache.updateAccurateKeyMap(CacheNameEnum.THING_ENTITY |
|
|
|
,entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText() |
|
|
|
+ KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_CODE.getField()).asText() |
|
|
|
+ KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(),entityNode); |
|
|
|
} |
|
|
|
} |
|
|
|
updateModelList.add(objectNode); |
|
|
|
}else{ |
|
|
|
//找到最大时间的属性 |
|
|
|
Optional<TsKvDTO> optionalMax = lastTsKvList.stream() |
|
|
|
.filter(tsKvDTO -> StringUtils.equals(tsKvDTO.getThingCode(),modelCode)) |
|
|
|
.max(Comparator.comparing(TsKvDTO::getTs)); |
|
|
|
//根据当前时间和获取的最新时间 若大于45min 则离线 否则为在线 |
|
|
|
boolean isOffline = System.currentTimeMillis() - optionalMax.get().getTs() > TIME_INTERVAL; |
|
|
|
objectNode.put(CacheNameEnum.ModelField.THING_MODEL_STATUS.getField(),isOffline ? ThingStatus.OFFLINE.getCode() : ThingStatus.ONLINE.getCode()); |
|
|
|
objectNode.put(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField(),new Date(optionalMax.get().getTs()).getTime()); |
|
|
|
//更新物模型缓存 |
|
|
|
cache.updateAccurateKeyMap(CacheNameEnum.THING_MODEL, objectNode.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() |
|
|
|
+ KEY + objectNode.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(), objectNode); |
|
|
|
updateModelList.add(objectNode); |
|
|
|
//更新实体设备状态:这里需要精确匹配 |
|
|
|
List<ObjectNode> entityNodes = cache.findMapAccurateKeys(CacheNameEnum.THING_ENTITY, modelCode); |
|
|
|
if(CollectionUtils.isNotEmpty(entityNodes)){ |
|
|
|
for (ObjectNode entityNode : entityNodes) { |
|
|
|
entityNode.put(CacheNameEnum.EntityField.THING_ENTITY_STATUS.getField() |
|
|
|
,isOffline ? ThingStatus.OFFLINE.getCode() : ThingStatus.ONLINE.getCode()); |
|
|
|
entityNode.put(CacheNameEnum.ModelField.THING_MODEL_STATUS_TS.getField(),new Date(optionalMax.get().getTs()).getTime()); |
|
|
|
cache.updateAccurateKeyMap(CacheNameEnum.THING_ENTITY |
|
|
|
,entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText() |
|
|
|
+ KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_CODE.getField()).asText() |
|
|
|
+ KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(),entityNode); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
entityNode.put(CacheNameEnum.EntityField.THING_ENTITY_STATUS.getField() |
|
|
|
,isOffline ? ThingStatus.OFFLINE.getCode() : ThingStatus.ONLINE.getCode()); |
|
|
|
|
|
|
|
cache.updateAccurateKeyMap(CacheNameEnum.THING_ENTITY |
|
|
|
,entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_TENANT_CODE.getField()).asText() |
|
|
|
+ KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_CODE.getField()).asText() |
|
|
|
+ KEY + entityNode.get(CacheNameEnum.EntityField.THING_ENTITY_ID.getField()).asText(),entityNode); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
//更新物模型 |
|
|
|
IotThingModelDTO iotThingModelDTO = ConvertUtils.sourceToTarget(item, IotThingModelDTO.class); |
|
|
|
cache.updateAccurateKeyEntity(CacheNameEnum.THING_MODEL |
|
|
|
, item.get(CacheNameEnum.ModelField.THING_MODEL_CODE.getField()).asText() |
|
|
|
+ KEY + item.get(CacheNameEnum.ModelField.THING_MODEL_ID.getField()).asText(), iotThingModelDTO); |
|
|
|
return JacksonUtil.convertValue(item, IotThingModelEntity.class); |
|
|
|
}).collect(Collectors.toList()); |
|
|
|
if(CollectionUtils.isNotEmpty(statusList)){ |
|
|
|
if(CollectionUtils.isNotEmpty(updateModelList)){ |
|
|
|
List<IotThingModelEntity> list = updateModelList.stream().map(item -> JacksonUtil.convertValue(item, IotThingModelEntity.class)).toList(); |
|
|
|
//修改状态 |
|
|
|
modelService.saveOrUpdateBatch(statusList); |
|
|
|
modelService.saveOrUpdateBatch(list); |
|
|
|
} |
|
|
|
log.info("设备在线离线 statusTask end"); |
|
|
|
} |
|
|
|
|