|
|
@ -3,25 +3,31 @@ package com.thing.quartz.calc.task; |
|
|
import cn.hutool.core.collection.CollectionUtil; |
|
|
import cn.hutool.core.collection.CollectionUtil; |
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
import com.google.common.collect.Lists; |
|
|
import com.google.common.collect.Lists; |
|
|
|
|
|
import com.thing.calculation.dto.CalcSourceConfigDTO; |
|
|
import com.thing.calculation.dto.CalcTargetConfigDTO; |
|
|
import com.thing.calculation.dto.CalcTargetConfigDTO; |
|
|
import com.thing.calculation.dto.ExecuteCalcRequest; |
|
|
import com.thing.calculation.dto.ExecuteCalcRequest; |
|
|
import com.thing.calculation.dto.ReCalcRequest; |
|
|
|
|
|
import com.thing.calculation.handler.CalcExecuteHandler; |
|
|
|
|
|
|
|
|
import com.thing.calculation.enumeration.CalcConfigType; |
|
|
import com.thing.calculation.service.CalcTargetConfigService; |
|
|
import com.thing.calculation.service.CalcTargetConfigService; |
|
|
import com.thing.common.core.utils.DateTimeUtils; |
|
|
import com.thing.common.core.utils.DateTimeUtils; |
|
|
|
|
|
import com.thing.common.core.utils.FormulaUtil; |
|
|
|
|
|
import com.thing.common.data.event.QueueConsumerEvent; |
|
|
|
|
|
import com.thing.common.data.proto.QueueProto; |
|
|
|
|
|
import com.thing.common.data.tskv.TsKvDTO; |
|
|
|
|
|
import com.thing.common.tskv.service.TsKvService; |
|
|
import com.thing.quartz.timetask.task.ITask; |
|
|
import com.thing.quartz.timetask.task.ITask; |
|
|
|
|
|
|
|
|
|
|
|
import com.thing.queue.util.Topics; |
|
|
import lombok.RequiredArgsConstructor; |
|
|
import lombok.RequiredArgsConstructor; |
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
|
|
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
|
|
import org.jetbrains.annotations.NotNull; |
|
|
|
|
|
import org.springframework.context.ApplicationEventPublisher; |
|
|
import org.springframework.stereotype.Component; |
|
|
import org.springframework.stereotype.Component; |
|
|
|
|
|
|
|
|
|
|
|
import java.math.BigDecimal; |
|
|
|
|
|
import java.math.RoundingMode; |
|
|
import java.time.LocalDateTime; |
|
|
import java.time.LocalDateTime; |
|
|
import java.util.List; |
|
|
|
|
|
import java.util.Objects; |
|
|
|
|
|
|
|
|
|
|
|
import static com.thing.common.core.utils.DateTimeUtils.DATE_TIME_PATTERN; |
|
|
|
|
|
|
|
|
import java.util.*; |
|
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
|
/** |
|
|
/** |
|
|
* @author siyang |
|
|
* @author siyang |
|
|
@ -33,11 +39,13 @@ import static com.thing.common.core.utils.DateTimeUtils.DATE_TIME_PATTERN; |
|
|
@Component("CalcTask") |
|
|
@Component("CalcTask") |
|
|
public class CalcTask implements ITask { |
|
|
public class CalcTask implements ITask { |
|
|
|
|
|
|
|
|
private final CalcExecuteHandler calcExecuteHandler; |
|
|
|
|
|
private final CalcTargetConfigService calcTargetConfigService; |
|
|
private final CalcTargetConfigService calcTargetConfigService; |
|
|
|
|
|
private final TsKvService tsKvService; |
|
|
|
|
|
private final ApplicationEventPublisher publisher; |
|
|
|
|
|
|
|
|
@Override |
|
|
@Override |
|
|
public void run(String params) { |
|
|
public void run(String params) { |
|
|
|
|
|
log.info("CalcTask start params:{}", params); |
|
|
ExecuteCalcRequest request; |
|
|
ExecuteCalcRequest request; |
|
|
if (StringUtils.isBlank(params)) { |
|
|
if (StringUtils.isBlank(params)) { |
|
|
request = new ExecuteCalcRequest(); |
|
|
request = new ExecuteCalcRequest(); |
|
|
@ -48,25 +56,114 @@ public class CalcTask implements ITask { |
|
|
if(CollectionUtil.isEmpty(targetConfigs)){ |
|
|
if(CollectionUtil.isEmpty(targetConfigs)){ |
|
|
return; |
|
|
return; |
|
|
} |
|
|
} |
|
|
String dayBeginTime; |
|
|
|
|
|
String dayEndTime; |
|
|
|
|
|
if(!Objects.isNull(request) && !Objects.isNull(request.getStartTime())&& !Objects.isNull(request.getEndTime())){ |
|
|
|
|
|
dayBeginTime = request.getStartTime(); |
|
|
|
|
|
dayEndTime = request.getEndTime(); |
|
|
|
|
|
}else{ |
|
|
|
|
|
LocalDateTime hourStart = DateTimeUtils.getHourStart(LocalDateTime.now()); |
|
|
|
|
|
LocalDateTime hourEnd = DateTimeUtils.getHourEnd(LocalDateTime.now()); |
|
|
|
|
|
dayBeginTime = hourStart.minusHours(1).format(DATE_TIME_PATTERN); |
|
|
|
|
|
dayEndTime = hourEnd.format(DATE_TIME_PATTERN); |
|
|
|
|
|
|
|
|
Long startTime = DateTimeUtils.convertTimeToLong(request.getStartTime()); |
|
|
|
|
|
Long endTime = DateTimeUtils.convertTimeToLong(request.getEndTime()); |
|
|
|
|
|
|
|
|
|
|
|
//开始====================================================== |
|
|
|
|
|
List<QueueProto.DataProto> protoList = new ArrayList<>(); |
|
|
|
|
|
for (CalcTargetConfigDTO config : targetConfigs) { |
|
|
|
|
|
List<CalcSourceConfigDTO> sourceConfigs = config.getSourceConfigs(); |
|
|
|
|
|
//转换入参 |
|
|
|
|
|
Map<String, Collection<String>> multiMap = convertParam(sourceConfigs); |
|
|
|
|
|
//查询计算目标的最新值 |
|
|
|
|
|
TsKvDTO targetLastet = tsKvService.findLatestByCodeAndAttr(config.getTargetThingCode(), config.getTargetAttrCode()); |
|
|
|
|
|
//若是最新值是空的,查询只算七天前的时间全部的source tskv值 |
|
|
|
|
|
if(Objects.isNull(targetLastet)){ |
|
|
|
|
|
//只算七天前的时间 |
|
|
|
|
|
if(Objects.isNull(startTime) || Objects.isNull(endTime)){ |
|
|
|
|
|
startTime = DateTimeUtils.parse(LocalDateTime.now().minusDays(7)); |
|
|
|
|
|
endTime = DateTimeUtils.parse(LocalDateTime.now()); |
|
|
|
|
|
} |
|
|
|
|
|
List<TsKvDTO> tsKvByMultiMap = tsKvService.findTsKvByMultiMap(multiMap, startTime, endTime, true); |
|
|
|
|
|
if(CollectionUtil.isEmpty(tsKvByMultiMap)){ |
|
|
|
|
|
//若是source tskv值是空的,则跳过 |
|
|
|
|
|
continue; |
|
|
|
|
|
} |
|
|
|
|
|
getCalResult(config, tsKvByMultiMap, protoList, sourceConfigs); |
|
|
|
|
|
}else{ |
|
|
|
|
|
if(Objects.isNull(startTime) || Objects.isNull(endTime)){ |
|
|
|
|
|
startTime = targetLastet.getTs(); |
|
|
|
|
|
endTime = System.currentTimeMillis(); |
|
|
|
|
|
} |
|
|
|
|
|
List<TsKvDTO> tsKvByMultiMap = tsKvService.findTsKvByMultiMap(multiMap, startTime, endTime, true); |
|
|
|
|
|
if(CollectionUtil.isEmpty(tsKvByMultiMap)){ |
|
|
|
|
|
//若是source tskv值是空的,则跳过 |
|
|
|
|
|
continue; |
|
|
|
|
|
} |
|
|
|
|
|
getCalResult(config, tsKvByMultiMap, protoList, sourceConfigs); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
if(CollectionUtil.isEmpty(protoList)){ |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
|
|
|
List<QueueProto.DataProto> list = protoList.stream().sorted(Comparator.comparing(s -> s.getTskvProto().getTs())).toList(); |
|
|
|
|
|
// 批量推送 |
|
|
|
|
|
publisher.publishEvent( |
|
|
|
|
|
new QueueConsumerEvent(Topics.V1_TSKV_CALC_LOG_CAL.getValue(), list)); |
|
|
|
|
|
log.info("CalcTask end params:{}", params); |
|
|
|
|
|
//结束====================================================== |
|
|
|
|
|
|
|
|
|
|
|
// calcExecuteHandler.handleCalculate(request,null); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private static void getCalResult(CalcTargetConfigDTO config, List<TsKvDTO> tsKvByMultiMap, List<QueueProto.DataProto> protoList, List<CalcSourceConfigDTO> sourceConfigs) { |
|
|
|
|
|
//若有source tskv值,则计算结果 |
|
|
|
|
|
Map<Long, List<TsKvDTO>> tsMap = tsKvByMultiMap.stream().collect(Collectors.groupingBy(TsKvDTO::getTs)); |
|
|
|
|
|
for (Map.Entry<Long, List<TsKvDTO>> entry : tsMap.entrySet()) { |
|
|
|
|
|
if(config.getConfigType().equals(CalcConfigType.DATA_MAPPING.getCode()) || config.getConfigType().equals(CalcConfigType.AGG_MAPPING.getCode())){ |
|
|
|
|
|
//直接计算结果 |
|
|
|
|
|
BigDecimal totalVal = |
|
|
|
|
|
entry.getValue().stream() |
|
|
|
|
|
.map(s-> new BigDecimal(s.getVal())) |
|
|
|
|
|
.reduce(BigDecimal.ZERO, BigDecimal::add).setScale(4, RoundingMode.HALF_UP); |
|
|
|
|
|
QueueProto.TsKvProto tsKvProto = |
|
|
|
|
|
QueueProto.TsKvProto.newBuilder() |
|
|
|
|
|
.setThingCode(config.getTargetThingCode()) |
|
|
|
|
|
.setKey(config.getTargetAttrCode()) |
|
|
|
|
|
.setTs(entry.getKey()) |
|
|
|
|
|
.setVal(totalVal.toPlainString()) |
|
|
|
|
|
.build(); |
|
|
|
|
|
protoList.add( |
|
|
|
|
|
QueueProto.DataProto.newBuilder().setTskvProto(tsKvProto).build()); |
|
|
|
|
|
}else{ |
|
|
|
|
|
//公式计算 |
|
|
|
|
|
Map<String, Object> variableMap = new HashMap<>(); |
|
|
|
|
|
for (CalcSourceConfigDTO sourceConfig : sourceConfigs) { |
|
|
|
|
|
TsKvDTO tsKvDTO = entry.getValue().stream().filter(s -> s.getThingCode().equals(sourceConfig.getSourceThingCode()) && s.getAttrKey().equals(sourceConfig.getSourceAttrCode())).findFirst().orElse(null); |
|
|
|
|
|
if(!Objects.isNull(tsKvDTO)){ |
|
|
|
|
|
variableMap.put(sourceConfig.getSourceAttrAlias(), new BigDecimal(tsKvDTO.getVal())); |
|
|
|
|
|
}else{ |
|
|
|
|
|
variableMap.put(sourceConfig.getSourceAttrAlias(), BigDecimal.ZERO); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
Object result = FormulaUtil.executeFormula(config.getFormula(), variableMap); |
|
|
|
|
|
QueueProto.TsKvProto tsKvProto = |
|
|
|
|
|
QueueProto.TsKvProto.newBuilder() |
|
|
|
|
|
.setThingCode(config.getTargetThingCode()) |
|
|
|
|
|
.setKey(config.getTargetAttrCode()) |
|
|
|
|
|
.setTs(entry.getKey()) |
|
|
|
|
|
.setVal(result.toString()) |
|
|
|
|
|
.build(); |
|
|
|
|
|
protoList.add( |
|
|
|
|
|
QueueProto.DataProto.newBuilder().setTskvProto(tsKvProto).build()); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
for (CalcTargetConfigDTO targetConfig : targetConfigs) { |
|
|
|
|
|
ReCalcRequest request1 = new ReCalcRequest(); |
|
|
|
|
|
request1.setCalcTargetConfigIds(Lists.newArrayList(targetConfig.getId())); |
|
|
|
|
|
request1.setStartTime(dayBeginTime); |
|
|
|
|
|
request1.setEndTime(dayEndTime); |
|
|
|
|
|
calcTargetConfigService.reCalculate(request1); |
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@NotNull |
|
|
|
|
|
private static Map<String, Collection<String>> convertParam(List<CalcSourceConfigDTO> sourceConfigs) { |
|
|
|
|
|
//将sourceConfigs 转换入参 |
|
|
|
|
|
Map<String, Collection<String>> multiMap = new HashMap<>(); |
|
|
|
|
|
for (CalcSourceConfigDTO sourceConfig : sourceConfigs) { |
|
|
|
|
|
if(multiMap.containsKey(sourceConfig.getSourceThingCode())){ |
|
|
|
|
|
Collection<String> list = multiMap.get(sourceConfig.getSourceThingCode()); |
|
|
|
|
|
list.add(sourceConfig.getSourceAttrCode()); |
|
|
|
|
|
multiMap.put(sourceConfig.getSourceThingCode(), list); |
|
|
|
|
|
}else{ |
|
|
|
|
|
multiMap.put(sourceConfig.getSourceThingCode(), Lists.newArrayList(sourceConfig.getSourceAttrCode())); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
// calcExecuteHandler.handleCalculate(request,null); |
|
|
|
|
|
|
|
|
return multiMap; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} |
|
|
} |