diff --git a/common/tskv/src/main/java/com/thing/common/tskv/event/TsKvEventServiceImpl.java b/common/tskv/src/main/java/com/thing/common/tskv/event/TsKvEventServiceImpl.java index beb00eb..afe63d3 100644 --- a/common/tskv/src/main/java/com/thing/common/tskv/event/TsKvEventServiceImpl.java +++ b/common/tskv/src/main/java/com/thing/common/tskv/event/TsKvEventServiceImpl.java @@ -67,7 +67,7 @@ public class TsKvEventServiceImpl implements TsKvEventService { protoList.forEach(dataProto -> { String val = dataProto.getTskvProto().getVal(); //递增属性过滤 并且数值大于0的将统计计算 - if (setKeys.contains(dataProto.getTskvProto().getKey()) && NumberUtils.isCreatable(val) && new BigDecimal(val).compareTo(BigDecimal.ZERO) > 0) { + if (setKeys.contains(dataProto.getTskvProto().getKey()) && NumberUtils.isCreatable(val) /*&& new BigDecimal(val).compareTo(BigDecimal.ZERO) > 0*/) { dataProtos.add(dataProto); // am 属性过滤 } else if (amKeys.contains(dataProto.getTskvProto().getKey())) { diff --git a/modules/quartz/src/main/java/com/thing/quartz/calc/task/CalcTask.java b/modules/quartz/src/main/java/com/thing/quartz/calc/task/CalcTask.java index 46c76e8..501003b 100644 --- a/modules/quartz/src/main/java/com/thing/quartz/calc/task/CalcTask.java +++ b/modules/quartz/src/main/java/com/thing/quartz/calc/task/CalcTask.java @@ -3,25 +3,31 @@ package com.thing.quartz.calc.task; import cn.hutool.core.collection.CollectionUtil; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Lists; +import com.thing.calculation.dto.CalcSourceConfigDTO; import com.thing.calculation.dto.CalcTargetConfigDTO; import com.thing.calculation.dto.ExecuteCalcRequest; -import com.thing.calculation.dto.ReCalcRequest; -import com.thing.calculation.handler.CalcExecuteHandler; +import com.thing.calculation.enumeration.CalcConfigType; import com.thing.calculation.service.CalcTargetConfigService; import com.thing.common.core.utils.DateTimeUtils; +import com.thing.common.core.utils.FormulaUtil; +import com.thing.common.data.event.QueueConsumerEvent; +import com.thing.common.data.proto.QueueProto; +import com.thing.common.data.tskv.TsKvDTO; +import com.thing.common.tskv.service.TsKvService; import com.thing.quartz.timetask.task.ITask; - +import com.thing.queue.util.Topics; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; - import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.NotNull; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.time.LocalDateTime; -import java.util.List; -import java.util.Objects; - -import static com.thing.common.core.utils.DateTimeUtils.DATE_TIME_PATTERN; +import java.util.*; +import java.util.stream.Collectors; /** * @author siyang @@ -33,11 +39,13 @@ import static com.thing.common.core.utils.DateTimeUtils.DATE_TIME_PATTERN; @Component("CalcTask") public class CalcTask implements ITask { - private final CalcExecuteHandler calcExecuteHandler; private final CalcTargetConfigService calcTargetConfigService; + private final TsKvService tsKvService; + private final ApplicationEventPublisher publisher; @Override public void run(String params) { + log.info("CalcTask start params:{}", params); ExecuteCalcRequest request; if (StringUtils.isBlank(params)) { request = new ExecuteCalcRequest(); @@ -48,25 +56,114 @@ public class CalcTask implements ITask { if(CollectionUtil.isEmpty(targetConfigs)){ return; } - String dayBeginTime; - String dayEndTime; - if(!Objects.isNull(request) && !Objects.isNull(request.getStartTime())&& !Objects.isNull(request.getEndTime())){ - dayBeginTime = request.getStartTime(); - dayEndTime = request.getEndTime(); - }else{ - LocalDateTime hourStart = DateTimeUtils.getHourStart(LocalDateTime.now()); - LocalDateTime hourEnd = DateTimeUtils.getHourEnd(LocalDateTime.now()); - dayBeginTime = hourStart.minusHours(1).format(DATE_TIME_PATTERN); - dayEndTime = hourEnd.format(DATE_TIME_PATTERN); + Long startTime = DateTimeUtils.convertTimeToLong(request.getStartTime()); + Long endTime = DateTimeUtils.convertTimeToLong(request.getEndTime()); + + //开始====================================================== + List protoList = new ArrayList<>(); + for (CalcTargetConfigDTO config : targetConfigs) { + List sourceConfigs = config.getSourceConfigs(); + //转换入参 + Map> multiMap = convertParam(sourceConfigs); + //查询计算目标的最新值 + TsKvDTO targetLastet = tsKvService.findLatestByCodeAndAttr(config.getTargetThingCode(), config.getTargetAttrCode()); + //若是最新值是空的,查询只算七天前的时间全部的source tskv值 + if(Objects.isNull(targetLastet)){ + //只算七天前的时间 + if(Objects.isNull(startTime) || Objects.isNull(endTime)){ + startTime = DateTimeUtils.parse(LocalDateTime.now().minusDays(7)); + endTime = DateTimeUtils.parse(LocalDateTime.now()); + } + List tsKvByMultiMap = tsKvService.findTsKvByMultiMap(multiMap, startTime, endTime, true); + if(CollectionUtil.isEmpty(tsKvByMultiMap)){ + //若是source tskv值是空的,则跳过 + continue; + } + getCalResult(config, tsKvByMultiMap, protoList, sourceConfigs); + }else{ + if(Objects.isNull(startTime) || Objects.isNull(endTime)){ + startTime = targetLastet.getTs(); + endTime = System.currentTimeMillis(); + } + List tsKvByMultiMap = tsKvService.findTsKvByMultiMap(multiMap, startTime, endTime, true); + if(CollectionUtil.isEmpty(tsKvByMultiMap)){ + //若是source tskv值是空的,则跳过 + continue; + } + getCalResult(config, tsKvByMultiMap, protoList, sourceConfigs); + } + } + if(CollectionUtil.isEmpty(protoList)){ + return; + } + List list = protoList.stream().sorted(Comparator.comparing(s -> s.getTskvProto().getTs())).toList(); + // 批量推送 + publisher.publishEvent( + new QueueConsumerEvent(Topics.V1_TSKV_CALC_LOG_CAL.getValue(), list)); + log.info("CalcTask end params:{}", params); + //结束====================================================== + + // calcExecuteHandler.handleCalculate(request,null); + } + + private static void getCalResult(CalcTargetConfigDTO config, List tsKvByMultiMap, List protoList, List sourceConfigs) { + //若有source tskv值,则计算结果 + Map> tsMap = tsKvByMultiMap.stream().collect(Collectors.groupingBy(TsKvDTO::getTs)); + for (Map.Entry> entry : tsMap.entrySet()) { + if(config.getConfigType().equals(CalcConfigType.DATA_MAPPING.getCode()) || config.getConfigType().equals(CalcConfigType.AGG_MAPPING.getCode())){ + //直接计算结果 + BigDecimal totalVal = + entry.getValue().stream() + .map(s-> new BigDecimal(s.getVal())) + .reduce(BigDecimal.ZERO, BigDecimal::add).setScale(4, RoundingMode.HALF_UP); + QueueProto.TsKvProto tsKvProto = + QueueProto.TsKvProto.newBuilder() + .setThingCode(config.getTargetThingCode()) + .setKey(config.getTargetAttrCode()) + .setTs(entry.getKey()) + .setVal(totalVal.toPlainString()) + .build(); + protoList.add( + QueueProto.DataProto.newBuilder().setTskvProto(tsKvProto).build()); + }else{ + //公式计算 + Map variableMap = new HashMap<>(); + for (CalcSourceConfigDTO sourceConfig : sourceConfigs) { + TsKvDTO tsKvDTO = entry.getValue().stream().filter(s -> s.getThingCode().equals(sourceConfig.getSourceThingCode()) && s.getAttrKey().equals(sourceConfig.getSourceAttrCode())).findFirst().orElse(null); + if(!Objects.isNull(tsKvDTO)){ + variableMap.put(sourceConfig.getSourceAttrAlias(), new BigDecimal(tsKvDTO.getVal())); + }else{ + variableMap.put(sourceConfig.getSourceAttrAlias(), BigDecimal.ZERO); + } + } + Object result = FormulaUtil.executeFormula(config.getFormula(), variableMap); + QueueProto.TsKvProto tsKvProto = + QueueProto.TsKvProto.newBuilder() + .setThingCode(config.getTargetThingCode()) + .setKey(config.getTargetAttrCode()) + .setTs(entry.getKey()) + .setVal(result.toString()) + .build(); + protoList.add( + QueueProto.DataProto.newBuilder().setTskvProto(tsKvProto).build()); + } } - for (CalcTargetConfigDTO targetConfig : targetConfigs) { - ReCalcRequest request1 = new ReCalcRequest(); - request1.setCalcTargetConfigIds(Lists.newArrayList(targetConfig.getId())); - request1.setStartTime(dayBeginTime); - request1.setEndTime(dayEndTime); - calcTargetConfigService.reCalculate(request1); + } + + @NotNull + private static Map> convertParam(List sourceConfigs) { + //将sourceConfigs 转换入参 + Map> multiMap = new HashMap<>(); + for (CalcSourceConfigDTO sourceConfig : sourceConfigs) { + if(multiMap.containsKey(sourceConfig.getSourceThingCode())){ + Collection list = multiMap.get(sourceConfig.getSourceThingCode()); + list.add(sourceConfig.getSourceAttrCode()); + multiMap.put(sourceConfig.getSourceThingCode(), list); + }else{ + multiMap.put(sourceConfig.getSourceThingCode(), Lists.newArrayList(sourceConfig.getSourceAttrCode())); + } } - // calcExecuteHandler.handleCalculate(request,null); + return multiMap; } }