|
|
|
@ -52,51 +52,62 @@ public class CalcExecuteHandler { |
|
|
|
*/ |
|
|
|
public void handleCalculate(ExecuteCalcRequest request) { |
|
|
|
long calcStart = System.currentTimeMillis(); |
|
|
|
// 清理过期日志:7天 |
|
|
|
calcLogService.clearExpiredCalculatedLog(); |
|
|
|
|
|
|
|
//获取启用的计算配置:若是没有启动的直接跳过(这个过程将结果对象和来源对象的结构进行了拼接) |
|
|
|
List<CalcTargetConfigDTO> targetConfigs = calcTargetConfigService.getAllEnabled(); |
|
|
|
if (CollectionUtils.isEmpty(targetConfigs)) { |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
// 按照结果对象的id分组 |
|
|
|
Map<Long, CalcTargetConfigDTO> configMap = |
|
|
|
targetConfigs.stream() |
|
|
|
.collect(Collectors.toMap(CalcTargetConfigDTO::getId, Function.identity())); |
|
|
|
|
|
|
|
// 查询待计算状态的日志 |
|
|
|
// 查询待计算状态的日志:查看未计算和计算异常(次数没有达到最大次数的异常)的配置 |
|
|
|
List<CalcLogEntity> logs = |
|
|
|
calcLogService.getPendingLogs( |
|
|
|
configMap.keySet(), request.getStartTs(), request.getEndTs()); |
|
|
|
if (CollectionUtils.isEmpty(logs)) { |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
// 算出结果后批量推送 |
|
|
|
List<QueueProto.DataProto> protoList = new ArrayList<>(); |
|
|
|
logs.forEach( |
|
|
|
log -> { |
|
|
|
CalcTargetConfigDTO config = configMap.get(log.getCalcTargetConfigId()); |
|
|
|
dispatchCalculate(log, config); |
|
|
|
long calcEnd = System.currentTimeMillis(); |
|
|
|
log.setUpdateDate(calcEnd); |
|
|
|
if (!CalcStatus.isCalculated(log.getStatus()) || Objects.isNull(log.getResult())) { |
|
|
|
return; |
|
|
|
} |
|
|
|
log.setDuration(calcEnd - calcStart); |
|
|
|
QueueProto.TsKvProto tsKvProto = |
|
|
|
QueueProto.TsKvProto.newBuilder() |
|
|
|
.setThingCode(config.getTargetThingCode()) |
|
|
|
.setKey(config.getTargetAttrCode()) |
|
|
|
.setTs(log.getTime()) |
|
|
|
.setVal(log.getResult()) |
|
|
|
.build(); |
|
|
|
protoList.add( |
|
|
|
QueueProto.DataProto.newBuilder().setTskvProto(tsKvProto).build()); |
|
|
|
}); |
|
|
|
|
|
|
|
// 遍历计算日志 |
|
|
|
for (CalcLogEntity log : logs) { |
|
|
|
try { |
|
|
|
//根据目标的id获取配置 |
|
|
|
CalcTargetConfigDTO config = configMap.get(log.getCalcTargetConfigId()); |
|
|
|
if(Objects.isNull(config)){ |
|
|
|
continue; |
|
|
|
} |
|
|
|
//根据不同的类型进行计算:求和 和 公式计算 |
|
|
|
dispatchCalculate(log, config); |
|
|
|
long calcEnd = System.currentTimeMillis(); |
|
|
|
log.setUpdateDate(calcEnd); |
|
|
|
//若是计算结果不是:已经计算 或者计算结果是空,则跳过 |
|
|
|
if (!CalcStatus.isCalculated(log.getStatus()) || Objects.isNull(log.getResult())) { |
|
|
|
return; |
|
|
|
} |
|
|
|
// 记录计算耗时 |
|
|
|
log.setDuration(calcEnd - calcStart); |
|
|
|
// 构建结果推送对象 |
|
|
|
QueueProto.TsKvProto tsKvProto = |
|
|
|
QueueProto.TsKvProto.newBuilder() |
|
|
|
.setThingCode(config.getTargetThingCode()) |
|
|
|
.setKey(config.getTargetAttrCode()) |
|
|
|
.setTs(log.getTime()) |
|
|
|
.setVal(log.getResult()) |
|
|
|
.build(); |
|
|
|
protoList.add( |
|
|
|
QueueProto.DataProto.newBuilder().setTskvProto(tsKvProto).build()); |
|
|
|
}catch (Exception e){ |
|
|
|
//计算异常 |
|
|
|
log.setStatus(CalcStatus.CALCULATE_EXCEPTION.getCode()); |
|
|
|
} |
|
|
|
} |
|
|
|
// 批量推送 |
|
|
|
publisher.publishEvent( |
|
|
|
new QueueConsumerEvent(Topics.V1_TSKV_HISTORY.getValue(), protoList)); |
|
|
|
|
|
|
|
// 批量更新状态 |
|
|
|
calcLogService.batchUpdate(logs); |
|
|
|
} |
|
|
|
@ -111,10 +122,14 @@ public class CalcExecuteHandler { |
|
|
|
if (Objects.isNull(calcLog) || Objects.isNull(config)) { |
|
|
|
return; |
|
|
|
} |
|
|
|
// 计算次数+1 |
|
|
|
calcLog.increaseCalcCount(); |
|
|
|
// 执行计算 |
|
|
|
CalcConfigType configType = CalcConfigType.getByCode(config.getConfigType()); |
|
|
|
switch (configType) { |
|
|
|
// 映射计算 和 聚合计算 |
|
|
|
case DATA_MAPPING, AGG_MAPPING -> doMapping(calcLog); |
|
|
|
// 公式计算:指标计算、复杂计算、共享计算 |
|
|
|
case IND_CALC, COMPLEX_CALC, SHARE_CALC -> doCalc(calcLog); |
|
|
|
default -> {} |
|
|
|
} |
|
|
|
|