Browse Source

actor系统引入

thing_master
siyang 1 year ago
parent
commit
c803106422
  1. 4
      application/pom.xml
  2. 27
      common/actor/pom.xml
  3. 36
      common/actor/src/main/java/com/thing/common/actor/Actor.java
  4. 56
      common/actor/src/main/java/com/thing/common/actor/ActorBizContext.java
  5. 12
      common/actor/src/main/java/com/thing/common/actor/ActorCreator.java
  6. 27
      common/actor/src/main/java/com/thing/common/actor/ActorId.java
  7. 16
      common/actor/src/main/java/com/thing/common/actor/ActorMsg.java
  8. 29
      common/actor/src/main/java/com/thing/common/actor/ActorRef.java
  9. 90
      common/actor/src/main/java/com/thing/common/actor/ActorSysContext.java
  10. 44
      common/actor/src/main/java/com/thing/common/actor/ActorSystem.java
  11. 16
      common/actor/src/main/java/com/thing/common/actor/enumeration/ActorBizType.java
  12. 19
      common/actor/src/main/java/com/thing/common/actor/enumeration/ActorMsgType.java
  13. 11
      common/actor/src/main/java/com/thing/common/actor/enumeration/ActorStopReason.java
  14. 38
      common/actor/src/main/java/com/thing/common/actor/exception/ActorException.java
  15. 17
      common/actor/src/main/java/com/thing/common/actor/exception/ActorNotRegisteredException.java
  16. 23
      common/actor/src/main/java/com/thing/common/actor/system/AbstractActor.java
  17. 18
      common/actor/src/main/java/com/thing/common/actor/system/AbstractActorCreator.java
  18. 30
      common/actor/src/main/java/com/thing/common/actor/system/AbstractActorId.java
  19. 5
      common/actor/src/main/java/com/thing/common/actor/system/ActorDispatcher.java
  20. 263
      common/actor/src/main/java/com/thing/common/actor/system/ActorMailbox.java
  21. 10
      common/actor/src/main/java/com/thing/common/actor/system/ActorSystemSettings.java
  22. 242
      common/actor/src/main/java/com/thing/common/actor/system/DefaultActorSystem.java
  23. 29
      common/actor/src/main/java/com/thing/common/actor/system/InitFailureStrategy.java
  24. 23
      common/actor/src/main/java/com/thing/common/actor/system/ProcessFailureStrategy.java
  25. 1
      common/pom.xml
  26. 27
      modules/actor-biz/pom.xml
  27. 50
      modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyActor.java
  28. 37
      modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyActorId.java
  29. 16
      modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyChangedMsg.java
  30. 42
      modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/NonCompanyActor.java
  31. 28
      modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/ActorSysProperties.java
  32. 95
      modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/RootActor.java
  33. 16
      modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/RootInitMsg.java
  34. 58
      modules/actor-biz/src/main/java/com/thing/common/actor/biz/base/BizContextAwareActor.java
  35. 38
      modules/actor-biz/src/main/java/com/thing/common/actor/biz/base/DefaultActorBizContext.java
  36. 100
      modules/actor-biz/src/main/java/com/thing/common/actor/biz/lifecycle/ActorLifecycle.java
  37. 1
      modules/pom.xml
  38. 10
      pom.xml

4
application/pom.xml

@ -73,6 +73,10 @@
</dependency> </dependency>
<!--modules--> <!--modules-->
<dependency>
<groupId>com.thing.modules</groupId>
<artifactId>actor-biz</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.thing.modules</groupId> <groupId>com.thing.modules</groupId>
<artifactId>alarm</artifactId> <artifactId>alarm</artifactId>

27
common/actor/pom.xml

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.thing</groupId>
<artifactId>common</artifactId>
<version>5.1</version>
</parent>
<groupId>com.thing.common</groupId>
<artifactId>actor</artifactId>
<packaging>jar</packaging>
<name>ThingBI Server Common Actor</name>
<dependencies>
<dependency>
<groupId>com.thing.common</groupId>
<artifactId>util</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>

36
common/actor/src/main/java/com/thing/common/actor/Actor.java

@ -0,0 +1,36 @@
package com.thing.common.actor;
import com.thing.common.actor.enumeration.ActorStopReason;
import com.thing.common.actor.exception.ActorException;
import com.thing.common.actor.system.InitFailureStrategy;
import com.thing.common.actor.system.ProcessFailureStrategy;
/**
* @author siyang
* @date 2024/8/1 10:27
* @description Actor模型定义
*/
public interface Actor {
/** 收到消息后的处理逻辑 */
void process(ActorMsg msg);
/** 获取一个Actor代理对象 */
ActorRef getActorRef();
default void init(ActorSysContext actorSysContext) {}
default void destroy(ActorStopReason stopReason, Throwable cause) throws ActorException {}
default InitFailureStrategy onInitFailure(int attempt, Throwable t) {
return InitFailureStrategy.retryWithDelay(5000L * attempt);
}
default ProcessFailureStrategy onProcessFailure(ActorMsg msg, Throwable t) {
if (t instanceof Error) {
return ProcessFailureStrategy.stop();
} else {
return ProcessFailureStrategy.resume();
}
}
}

56
common/actor/src/main/java/com/thing/common/actor/ActorBizContext.java

@ -0,0 +1,56 @@
package com.thing.common.actor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author siyang
* @date 2024/8/7 11:07
* @description 业务上下文
*/
public interface ActorBizContext {
ActorRef getRootActor();
ActorSystem getActorSystem();
default ScheduledExecutorService getScheduler() {
return getActorSystem().getScheduler();
}
default void tell(ActorMsg actorMsg) {
getRootActor().tell(actorMsg);
}
default void tellImportant(ActorMsg actorMsg) {
getRootActor().tellImportant(actorMsg);
}
/**
* 定时通知固定时间间隔
*
* @param actor actor代理
* @param actorMsg actor消息
* @param initialDelay 初始延时(ms)
* @param interval 任务间隔(ms)
*/
default void scheduleTell(ActorRef actor, ActorMsg actorMsg, long initialDelay, long interval) {
getScheduler()
.scheduleWithFixedDelay(
() -> actor.tell(actorMsg), initialDelay, interval, TimeUnit.MILLISECONDS);
}
/**
* 延迟通知
*
* @param actor actor代理
* @param actorMsg actor消息
* @param delay 延迟时间(ms)
*/
default void delayTell(ActorRef actor, ActorMsg actorMsg, long delay) {
if (delay > 0) {
getScheduler().schedule(() -> actor.tell(actorMsg), delay, TimeUnit.MILLISECONDS);
} else {
actor.tell(actorMsg);
}
}
}

12
common/actor/src/main/java/com/thing/common/actor/ActorCreator.java

@ -0,0 +1,12 @@
package com.thing.common.actor;
/**
* @author siyang
* @date 2024/8/1 14:07
* @description
*/
public interface ActorCreator {
ActorId createActorId();
Actor createActor();
}

27
common/actor/src/main/java/com/thing/common/actor/ActorId.java

@ -0,0 +1,27 @@
package com.thing.common.actor;
import com.thing.common.actor.enumeration.ActorBizType;
import com.thing.common.actor.system.AbstractActorId;
import java.io.Serializable;
/**
* @author siyang
* @date 2024/8/1 11:51
* @description actor唯一标识由id和bizType两部分组成唯一性判定逻辑参考{@link AbstractActorId}
*/
public interface ActorId {
/**
* 每个actor的id都是唯一的不可重复
*
* @return 唯一的id
*/
Serializable id();
/**
* actor业务类型
*
* @return 不同actor的业务类型可以相同
*/
ActorBizType bizType();
}

16
common/actor/src/main/java/com/thing/common/actor/ActorMsg.java

@ -0,0 +1,16 @@
package com.thing.common.actor;
import com.thing.common.actor.enumeration.ActorMsgType;
import com.thing.common.actor.enumeration.ActorStopReason;
/**
* @author siyang
* @date 2024/8/1 10:30
* @description
*/
public interface ActorMsg {
ActorMsgType getMsgType();
/** 按业务场景,在合适的子类中实现,针对不同的actor歇菜原因做不同的处理 */
default void onActorStopped(ActorStopReason reason) {}
}

29
common/actor/src/main/java/com/thing/common/actor/ActorRef.java

@ -0,0 +1,29 @@
package com.thing.common.actor;
/**
* @author siyang
* @date 2024/8/1 10:39
* @description actor代理对象接口
*/
public interface ActorRef {
/**
* 获取actor唯一标识对象
*
* @return actor唯一标识对象
*/
ActorId getActorId();
/**
* 普通的通知
*
* @param actorMsg 消息
*/
void tell(ActorMsg actorMsg);
/**
* 重要通知邮箱中同时存在"通知""重要通知"会优先执行重要通知的邮件
*
* @param actorMsg 消息
*/
void tellImportant(ActorMsg actorMsg);
}

90
common/actor/src/main/java/com/thing/common/actor/ActorSysContext.java

@ -0,0 +1,90 @@
package com.thing.common.actor;
import com.thing.common.actor.enumeration.ActorBizType;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
/**
* @author siyang
* @date 2024/8/1 14:04
* @description 系统上下文
*/
public interface ActorSysContext extends ActorRef {
/** 获取自己的唯一标识 */
ActorId getSelf();
/** 获取父级 Actor的代理对象 */
ActorRef getParentRef();
/**
* 向目标actor通知消息
*
* @param target 目标actor的唯一标识
* @param actorMsg 消息
*/
void tell(ActorId target, ActorMsg actorMsg);
/**
* 向目标actor群发消息
*
* @param targetList actor唯一标识列表
* @param actorMsg 消息
*/
void tell(List<ActorId> targetList, ActorMsg actorMsg);
/**
* 停止目标actor
*
* @param target actor唯一标识
*/
void stop(ActorId target);
/**
* 获取或者创建子Actor代理对象
*
* @param actorId actor唯一标识
* @param dispatcher 执行分发器
* @param creator actor创建器
* @param createCondition actor创建条件
* @return ActorRef
*/
ActorRef getOrCreateChildActor(
ActorId actorId,
Supplier<String> dispatcher,
Supplier<ActorCreator> creator,
Supplier<Boolean> createCondition);
/**
* 过滤Actor
*
* @param childFilter 过滤条件
* @return 子ActorId列表
*/
List<ActorId> filterChildren(Predicate<ActorId> childFilter);
/**
* 向actor广播消息
*
* @param msg 消息
* @param highPriority 是否高优先级
*/
void broadcastToChildren(ActorMsg msg, boolean highPriority);
/**
* 向actor广播消息
*
* @param msg 消息
* @param actorBizType actor业务类型
*/
void broadcastToChildrenByType(ActorMsg msg, ActorBizType actorBizType);
/**
* 向子actor广播消息
*
* @param msg 消息
* @param childFilter actor过滤条件
*/
void broadcastToChildren(ActorMsg msg, Predicate<ActorId> childFilter);
}

44
common/actor/src/main/java/com/thing/common/actor/ActorSystem.java

@ -0,0 +1,44 @@
package com.thing.common.actor;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Predicate;
/**
* @author siyang
* @date 2024/8/1 14:37
* @description actor系统能力定义
*/
public interface ActorSystem {
ScheduledExecutorService getScheduler();
void createDispatcher(String dispatcherId, ExecutorService executor);
void destroyDispatcher(String dispatcherId);
ActorRef getActor(ActorId actorId);
ActorRef createRootActor(String dispatcherId, ActorCreator creator);
ActorRef createChildActor(String dispatcherId, ActorCreator creator, ActorId parent);
void tell(ActorId target, ActorMsg actorMsg);
void tellImportant(ActorId target, ActorMsg actorMsg);
void stop(ActorRef actorRef);
void stop(ActorId actorId);
void stop();
void broadcastToChildren(ActorId parent, ActorMsg msg);
void broadcastToChildren(ActorId parent, ActorMsg msg, boolean highPriority);
void broadcastToChildren(ActorId parent, Predicate<ActorId> childFilter, ActorMsg msg);
List<ActorId> filterChildren(ActorId parent, Predicate<ActorId> childFilter);
}

16
common/actor/src/main/java/com/thing/common/actor/enumeration/ActorBizType.java

@ -0,0 +1,16 @@
package com.thing.common.actor.enumeration;
/**
* @author siyang
* @date 2024/8/1 11:52
* @description actor业务类型
*/
public enum ActorBizType {
COMPANY,
NON_COMPANY,
DEVICE;
public static boolean same(ActorBizType a, ActorBizType b) {
return a.equals(b);
}
}

19
common/actor/src/main/java/com/thing/common/actor/enumeration/ActorMsgType.java

@ -0,0 +1,19 @@
package com.thing.common.actor.enumeration;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author siyang
* @date 2024/8/1 10:28
* @description 消息类型
*/
@Getter
@AllArgsConstructor
public enum ActorMsgType {
SYSTEM_INIT_MSG(false),
COMPANY_CHANGED_MSG(false);
/** 表示当前消息是否依赖租户信息 */
private final Boolean tenantBased;
}

11
common/actor/src/main/java/com/thing/common/actor/enumeration/ActorStopReason.java

@ -0,0 +1,11 @@
package com.thing.common.actor.enumeration;
/**
* @author siyang
* @date 2024/8/1 10:33
* @description actor歇菜原因
*/
public enum ActorStopReason {
INIT_FAILED,
STOPPED
}

38
common/actor/src/main/java/com/thing/common/actor/exception/ActorException.java

@ -0,0 +1,38 @@
package com.thing.common.actor.exception;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author siyang
* @date 2024/8/1 15:08
* @description actor错误在发生时需给出判断是否还能抢救一下如果无法抢救给unrecoverable赋值为true在后续逻辑中做相应处理
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class ActorException extends Exception {
private final boolean unrecoverable;
public ActorException(String message, Throwable cause) {
super(message, cause);
this.unrecoverable = false;
}
public ActorException(String message) {
this(message, false);
}
public ActorException(String message, boolean unrecoverable) {
super(message);
this.unrecoverable = unrecoverable;
}
public ActorException(Exception e) {
this(e, false);
}
public ActorException(Exception e, boolean unrecoverable) {
super(e);
this.unrecoverable = unrecoverable;
}
}

17
common/actor/src/main/java/com/thing/common/actor/exception/ActorNotRegisteredException.java

@ -0,0 +1,17 @@
package com.thing.common.actor.exception;
import com.thing.common.actor.ActorId;
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = true)
public class ActorNotRegisteredException extends RuntimeException {
private ActorId target;
public ActorNotRegisteredException(ActorId target, String message) {
super(message);
this.target = target;
}
}

23
common/actor/src/main/java/com/thing/common/actor/system/AbstractActor.java

@ -0,0 +1,23 @@
package com.thing.common.actor.system;
import com.thing.common.actor.Actor;
import com.thing.common.actor.ActorRef;
import com.thing.common.actor.ActorSysContext;
import lombok.Getter;
@Getter
public abstract class AbstractActor implements Actor {
protected ActorSysContext actorSysContext;
@Override
public void init(ActorSysContext actorSysContext) {
this.actorSysContext = actorSysContext;
}
@Override
public ActorRef getActorRef() {
return actorSysContext;
}
}

18
common/actor/src/main/java/com/thing/common/actor/system/AbstractActorCreator.java

@ -0,0 +1,18 @@
package com.thing.common.actor.system;
import com.thing.common.actor.ActorBizContext;
import com.thing.common.actor.ActorCreator;
/**
* @author siyang
* @date 2024/8/2 10:07
* @description 基于上下文的actor创建器
*/
public abstract class AbstractActorCreator implements ActorCreator {
protected final transient ActorBizContext actorBizContext;
public AbstractActorCreator(ActorBizContext actorBizContext) {
super();
this.actorBizContext = actorBizContext;
}
}

30
common/actor/src/main/java/com/thing/common/actor/system/AbstractActorId.java

@ -0,0 +1,30 @@
package com.thing.common.actor.system;
import com.thing.common.actor.ActorId;
import com.thing.common.actor.enumeration.ActorBizType;
/**
* @author siyang
* @date 2024/8/2 10:40
*/
public abstract class AbstractActorId implements ActorId {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AbstractActorId that = (AbstractActorId) o;
return id().equals(that.id()) && ActorBizType.same(bizType(), that.bizType());
}
@Override
public int hashCode() {
return (bizType() + "_" + id()).hashCode();
}
@Override
public String toString() {
return bizType() + ":" + id();
}
}

5
common/actor/src/main/java/com/thing/common/actor/system/ActorDispatcher.java

@ -0,0 +1,5 @@
package com.thing.common.actor.system;
import java.util.concurrent.ExecutorService;
public record ActorDispatcher(String dispatcherId, ExecutorService executor) {}

263
common/actor/src/main/java/com/thing/common/actor/system/ActorMailbox.java

@ -0,0 +1,263 @@
package com.thing.common.actor.system;
import com.thing.common.actor.*;
import com.thing.common.actor.enumeration.ActorBizType;
import com.thing.common.actor.enumeration.ActorStopReason;
import com.thing.common.actor.exception.ActorException;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;
@Slf4j
@Getter
@RequiredArgsConstructor
public final class ActorMailbox implements ActorSysContext {
private static final boolean HIGH_PRIORITY = true;
private static final boolean NORMAL_PRIORITY = false;
private static final boolean FREE = false;
private static final boolean BUSY = true;
private static final boolean NOT_READY = false;
private static final boolean READY = true;
private final ActorSystem system;
private final ActorSystemSettings settings;
private final ActorId selfId;
private final ActorRef parentRef;
private final Actor actor;
private final ActorDispatcher dispatcher;
private final ConcurrentLinkedQueue<ActorMsg> highPriorityMQ = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<ActorMsg> normalPriorityMQ = new ConcurrentLinkedQueue<>();
private final AtomicBoolean busy = new AtomicBoolean(FREE);
private final AtomicBoolean ready = new AtomicBoolean(NOT_READY);
private final AtomicBoolean destroyInProgress = new AtomicBoolean();
private volatile ActorStopReason stopReason;
public void initActor() {
dispatcher.executor().execute(() -> tryInit(1));
}
private void tryInit(int attempt) {
try {
log.debug("[{}] Trying to init actor, attempt: {}", selfId, attempt);
if (!destroyInProgress.get()) {
actor.init(this);
if (!destroyInProgress.get()) {
ready.set(READY);
tryProcessQueue(false);
}
}
} catch (Throwable t) {
InitFailureStrategy strategy;
int attemptIdx = attempt + 1;
if (isUnrecoverable(t)) {
strategy = InitFailureStrategy.stop();
} else {
log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t);
strategy = actor.onInitFailure(attempt, t);
}
if (strategy.isStop()
|| (settings.maxActorInitAttempts() > 0
&& attemptIdx > settings.maxActorInitAttempts())) {
log.info(
"[{}] Failed to init actor, attempt {}, going to stop attempts.",
selfId,
attempt,
t);
stopReason = ActorStopReason.INIT_FAILED;
destroy(t.getCause());
} else if (strategy.getRetryDelay() > 0) {
log.info(
"[{}] Failed to init actor, attempt {}, going to retry in attempts in {}ms",
selfId,
attempt,
strategy.getRetryDelay());
system.getScheduler()
.schedule(
() -> dispatcher.executor().execute(() -> tryInit(attemptIdx)),
strategy.getRetryDelay(),
TimeUnit.MILLISECONDS);
} else {
log.info(
"[{}] Failed to init actor, attempt {}, going to retry immediately",
selfId,
attempt);
dispatcher.executor().execute(() -> tryInit(attemptIdx));
}
}
}
private static boolean isUnrecoverable(Throwable t) {
if (t instanceof ActorException) {
if (Objects.nonNull(t.getCause())) {
return isUnrecoverable(t.getCause());
} else {
return ((ActorException) t).isUnrecoverable();
}
} else {
return false;
}
}
private void enqueue(ActorMsg msg, boolean highPriority) {
if (!destroyInProgress.get()) {
if (highPriority) {
highPriorityMQ.add(msg);
} else {
normalPriorityMQ.add(msg);
}
tryProcessQueue(true);
} else {
msg.onActorStopped(stopReason);
}
}
private void tryProcessQueue(boolean newMsg) {
// 当前邮箱还未就绪
if (ready.get() != READY) {
log.trace("[{}] MailBox is not ready, new msg: {}", selfId, newMsg);
return;
}
// 当前消息不是新的且邮箱为空
if (!newMsg && highPriorityMQ.isEmpty() && normalPriorityMQ.isEmpty()) {
log.trace("[{}] MailBox is empty, new msg: {}", selfId, false);
return;
}
// 当前消息是新消息或者邮箱非空则通过CAS方式设置邮箱为繁忙状态CAS竞争失败则表示当前邮箱正在处理消息
if (busy.compareAndSet(FREE, BUSY)) {
dispatcher.executor().execute(this::processMailbox);
} else {
log.trace("[{}] MailBox is busy, new msg: {}", selfId, newMsg);
}
}
private void processMailbox() {
boolean noMoreElements = false;
for (int i = 0; i < settings.actorThroughput(); i++) {
ActorMsg msg = highPriorityMQ.poll();
if (Objects.isNull(msg)) {
msg = normalPriorityMQ.poll();
}
if (Objects.nonNull(msg)) {
try {
log.debug("[{}] Going to process message: {}", selfId, msg);
actor.process(msg);
} catch (Throwable t) {
log.debug("[{}] Failed to process message: {}", selfId, msg, t);
ProcessFailureStrategy strategy = actor.onProcessFailure(msg, t);
if (strategy.isStop()) {
system.stop(selfId);
}
}
} else {
noMoreElements = true;
break;
}
}
if (noMoreElements) {
busy.set(FREE);
dispatcher.executor().execute(() -> tryProcessQueue(false));
} else {
dispatcher.executor().execute(this::processMailbox);
}
}
@Override
public ActorId getSelf() {
return selfId;
}
@Override
public void tell(ActorId target, ActorMsg actorMsg) {
system.tell(target, actorMsg);
}
@Override
public void tell(List<ActorId> targetList, ActorMsg actorMsg) {
targetList.forEach(target -> tell(target, actorMsg));
}
@Override
public void broadcastToChildren(ActorMsg msg, boolean highPriority) {
system.broadcastToChildren(selfId, msg, highPriority);
}
@Override
public void broadcastToChildrenByType(ActorMsg msg, ActorBizType actorBizType) {
broadcastToChildren(msg, actorId -> actorBizType.equals(actorId.bizType()));
}
@Override
public void broadcastToChildren(ActorMsg msg, Predicate<ActorId> childFilter) {
system.broadcastToChildren(selfId, childFilter, msg);
}
@Override
public List<ActorId> filterChildren(Predicate<ActorId> childFilter) {
return system.filterChildren(selfId, childFilter);
}
@Override
public void stop(ActorId target) {
system.stop(target);
}
@Override
public ActorRef getOrCreateChildActor(
ActorId actorId,
Supplier<String> dispatcher,
Supplier<ActorCreator> creator,
Supplier<Boolean> createCondition) {
ActorRef actorRef = system.getActor(actorId);
if (Objects.isNull(actorRef) && createCondition.get()) {
return system.createChildActor(dispatcher.get(), creator.get(), selfId);
} else {
return actorRef;
}
}
public void destroy(Throwable cause) {
if (Objects.isNull(stopReason)) {
stopReason = ActorStopReason.STOPPED;
}
destroyInProgress.set(true);
dispatcher
.executor()
.execute(
() -> {
try {
ready.set(NOT_READY);
actor.destroy(stopReason, cause);
highPriorityMQ.forEach(msg -> msg.onActorStopped(stopReason));
normalPriorityMQ.forEach(msg -> msg.onActorStopped(stopReason));
} catch (Throwable ignore) {
}
});
}
@Override
public ActorId getActorId() {
return selfId;
}
@Override
public void tell(ActorMsg actorMsg) {
enqueue(actorMsg, NORMAL_PRIORITY);
}
@Override
public void tellImportant(ActorMsg actorMsg) {
enqueue(actorMsg, HIGH_PRIORITY);
}
}

10
common/actor/src/main/java/com/thing/common/actor/system/ActorSystemSettings.java

@ -0,0 +1,10 @@
package com.thing.common.actor.system;
/**
* actor系统设置
*
* @param actorThroughput actor系统吞吐量当一个actor处理完该数量消息后若还有消息则重新生成任务再执行目的是为了避免单个actor占用太多cpu时间
* @param schedulerPoolSize 定时调度线程池大小
* @param maxActorInitAttempts actor初始化失败重试次数
*/
public record ActorSystemSettings(int actorThroughput, int schedulerPoolSize, int maxActorInitAttempts) {}

242
common/actor/src/main/java/com/thing/common/actor/system/DefaultActorSystem.java

@ -0,0 +1,242 @@
package com.thing.common.actor.system;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import com.thing.common.actor.*;
import com.thing.common.actor.exception.ActorNotRegisteredException;
import com.thing.common.util.thread.ThingThreadFactory;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
* @author siyang
* @date 2024/8/1 14:50
* @description 默认的Actor系统实现
*/
@Data
@Slf4j
public class DefaultActorSystem implements ActorSystem {
private final ConcurrentMap<String, ActorDispatcher> dispatchers = new ConcurrentHashMap<>();
private final ConcurrentMap<ActorId, ActorMailbox> actors = new ConcurrentHashMap<>();
private final ConcurrentMap<ActorId, ReentrantLock> actorCreationLocks = new ConcurrentHashMap<>();
private final ConcurrentMap<ActorId, Set<ActorId>> parentChildMap = new ConcurrentHashMap<>();
private final ActorSystemSettings settings;
private final ScheduledExecutorService scheduler;
public DefaultActorSystem(ActorSystemSettings settings) {
this.settings = settings;
this.scheduler =
Executors.newScheduledThreadPool(
settings.schedulerPoolSize(),
ThingThreadFactory.forName("actor-system-scheduler"));
}
@Override
public void createDispatcher(String dispatcherId, ExecutorService executor) {
ActorDispatcher current =
dispatchers.putIfAbsent(dispatcherId, new ActorDispatcher(dispatcherId, executor));
if (Objects.nonNull(current)) {
throw new RuntimeException(
"Dispatcher with id [" + dispatcherId + "] is already registered!");
}
}
@Override
public void destroyDispatcher(String dispatcherId) {
ActorDispatcher actorDispatcher = dispatchers.remove(dispatcherId);
if (Objects.nonNull(actorDispatcher)) {
actorDispatcher.executor().shutdownNow();
}
}
@Override
public ActorRef getActor(ActorId actorId) {
return actors.get(actorId);
}
@Override
public ActorRef createRootActor(String dispatcherId, ActorCreator creator) {
return createActor(dispatcherId, creator, null);
}
@Override
public ActorRef createChildActor(String dispatcherId, ActorCreator creator, ActorId parent) {
return createActor(dispatcherId, creator, parent);
}
private ActorRef createActor(String dispatcherId, ActorCreator creator, ActorId parent) {
ActorDispatcher actorDispatcher = dispatchers.get(dispatcherId);
if (Objects.isNull(actorDispatcher)) {
log.warn("Dispatcher with id [{}] is not registered!", dispatcherId);
throw new RuntimeException(
"Dispatcher with id [" + dispatcherId + "] is not registered!");
}
ActorId actorId = creator.createActorId();
ActorMailbox actorMailbox = actors.get(actorId);
if (Objects.nonNull(actorMailbox)) {
return actorMailbox;
}
Lock actorCreationLock =
actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock());
actorCreationLock.lock();
try {
actorMailbox = actors.get(actorId);
if (Objects.nonNull(actorMailbox)) {
return actorMailbox;
}
log.debug("Creating actor with id [{}]!", actorId);
Actor actor = creator.createActor();
ActorRef parentRef = null;
if (Objects.nonNull(parent)) {
parentRef = getActor(parent);
if (Objects.isNull(parentRef)) {
throw new ActorNotRegisteredException(
parent, "Parent Actor with id [" + parent + "] is not registered!");
}
}
ActorMailbox mailbox =
new ActorMailbox(this, settings, actorId, parentRef, actor, actorDispatcher);
actors.put(actorId, mailbox);
mailbox.initActor();
actorMailbox = mailbox;
if (Objects.nonNull(parent)) {
parentChildMap
.computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet())
.add(actorId);
}
} finally {
actorCreationLock.unlock();
actorCreationLocks.remove(actorId);
}
return actorMailbox;
}
@Override
public void tellImportant(ActorId target, ActorMsg actorMsg) {
tell(target, actorMsg, true);
}
@Override
public void tell(ActorId target, ActorMsg actorMsg) {
tell(target, actorMsg, false);
}
private void tell(ActorId target, ActorMsg actorMsg, boolean highPriority) {
ActorMailbox mailbox = actors.get(target);
if (Objects.isNull(mailbox)) {
throw new ActorNotRegisteredException(
target, "Actor with id [" + target + "] is not registered!");
}
if (highPriority) {
mailbox.tellImportant(actorMsg);
} else {
mailbox.tell(actorMsg);
}
}
@Override
public void broadcastToChildren(ActorId parent, ActorMsg msg) {
broadcastToChildren(parent, msg, false);
}
@Override
public void broadcastToChildren(ActorId parent, ActorMsg msg, boolean highPriority) {
broadcastToChildren(parent, id -> true, msg, highPriority);
}
@Override
public void broadcastToChildren(ActorId parent, Predicate<ActorId> childFilter, ActorMsg msg) {
broadcastToChildren(parent, childFilter, msg, false);
}
private void broadcastToChildren(
ActorId parent, Predicate<ActorId> childFilter, ActorMsg msg, boolean highPriority) {
Set<ActorId> children = parentChildMap.get(parent);
if (Objects.nonNull(children)) {
children.stream()
.filter(childFilter)
.forEach(
id -> {
try {
tell(id, msg, highPriority);
} catch (ActorNotRegisteredException e) {
log.warn("Actor is missing for {}", id);
}
});
}
}
@Override
public List<ActorId> filterChildren(ActorId parent, Predicate<ActorId> childFilter) {
Set<ActorId> children = parentChildMap.get(parent);
if (Objects.nonNull(children)) {
return children.stream().filter(childFilter).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
@Override
public void stop(ActorRef actorRef) {
stop(actorRef.getActorId());
}
@Override
public void stop(ActorId actorId) {
Set<ActorId> children = parentChildMap.remove(actorId);
if (Objects.nonNull(children)) {
for (ActorId child : children) {
stop(child);
}
}
parentChildMap.values().forEach(parentChildren -> parentChildren.remove(actorId));
ActorMailbox mailbox = actors.remove(actorId);
if (Objects.nonNull(mailbox)) {
mailbox.destroy(null);
}
}
@Override
public void stop() {
dispatchers
.values()
.forEach(
actorDispatcher -> {
actorDispatcher.executor().shutdown();
try {
boolean terminated =
actorDispatcher
.executor()
.awaitTermination(3, TimeUnit.SECONDS);
if (!terminated) {
log.warn(
"[{}] Failed to stop dispatcher",
actorDispatcher.dispatcherId());
}
} catch (InterruptedException e) {
log.warn(
"[{}] Failed to stop dispatcher",
actorDispatcher.dispatcherId(),
e);
}
});
if (Objects.nonNull(scheduler)) {
scheduler.shutdownNow();
}
actors.clear();
}
}

29
common/actor/src/main/java/com/thing/common/actor/system/InitFailureStrategy.java

@ -0,0 +1,29 @@
package com.thing.common.actor.system;
import lombok.Getter;
import lombok.ToString;
@Getter
@ToString
public class InitFailureStrategy {
private final boolean stop;
private final long retryDelay;
private InitFailureStrategy(boolean stop, long retryDelay) {
this.stop = stop;
this.retryDelay = retryDelay;
}
public static InitFailureStrategy retryImmediately() {
return retryWithDelay(0);
}
public static InitFailureStrategy retryWithDelay(long ms) {
return new InitFailureStrategy(false, ms);
}
public static InitFailureStrategy stop() {
return new InitFailureStrategy(true, 0);
}
}

23
common/actor/src/main/java/com/thing/common/actor/system/ProcessFailureStrategy.java

@ -0,0 +1,23 @@
package com.thing.common.actor.system;
import lombok.Getter;
import lombok.ToString;
@Getter
@ToString
public class ProcessFailureStrategy {
private final boolean stop;
private ProcessFailureStrategy(boolean stop) {
this.stop = stop;
}
public static ProcessFailureStrategy stop() {
return new ProcessFailureStrategy(true);
}
public static ProcessFailureStrategy resume() {
return new ProcessFailureStrategy(false);
}
}

1
common/pom.xml

@ -15,6 +15,7 @@
<main.dir>${basedir}/..</main.dir> <main.dir>${basedir}/..</main.dir>
</properties> </properties>
<modules> <modules>
<module>actor</module>
<module>cache</module> <module>cache</module>
<module>core</module> <module>core</module>
<module>data</module> <module>data</module>

27
modules/actor-biz/pom.xml

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.thing</groupId>
<artifactId>modules</artifactId>
<version>5.1</version>
</parent>
<groupId>com.thing.modules</groupId>
<artifactId>actor-biz</artifactId>
<packaging>jar</packaging>
<name>ThingBI Server Modules Actor Biz</name>
<dependencies>
<dependency>
<groupId>com.thing.common</groupId>
<artifactId>actor</artifactId>
</dependency>
<dependency>
<groupId>com.thing.modules</groupId>
<artifactId>thing</artifactId>
</dependency>
</dependencies>
</project>

50
modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyActor.java

@ -0,0 +1,50 @@
package com.thing.common.actor.biz.actors.company;
import com.thing.common.actor.Actor;
import com.thing.common.actor.ActorBizContext;
import com.thing.common.actor.ActorId;
import com.thing.common.actor.ActorMsg;
import com.thing.common.actor.biz.base.BizContextAwareActor;
import com.thing.common.actor.system.AbstractActorCreator;
import java.util.Optional;
/**
* @author siyang
* @date 2024/8/8 08:46
* @description 公司actor处理公司范畴下的业务
*/
public class CompanyActor extends BizContextAwareActor {
public CompanyActor(ActorBizContext actorBizContext) {
super(actorBizContext);
}
@Override
protected boolean doProcess(ActorMsg msg) {
return false;
}
public static class ActorCreator extends AbstractActorCreator {
private final Long companyId;
private final CompanyActorId companyActorId;
public ActorCreator(ActorBizContext actorBizContext, Long companyId) {
super(actorBizContext);
this.companyId = companyId;
this.companyActorId = new CompanyActorId(companyId);
}
@Override
public ActorId createActorId() {
return Optional.ofNullable(companyActorId).orElse(new CompanyActorId(companyId));
}
@Override
public Actor createActor() {
return new CompanyActor(actorBizContext);
}
}
}

37
modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyActorId.java

@ -0,0 +1,37 @@
package com.thing.common.actor.biz.actors.company;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.thing.common.actor.enumeration.ActorBizType;
import com.thing.common.actor.system.AbstractActorId;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.io.Serializable;
/**
* @author siyang
* @date 2024/8/8 08:42
* @description 公司actor唯一标识
*/
@Getter
@AllArgsConstructor
public class CompanyActorId extends AbstractActorId {
/** 公司顶级actorId,也是rootActor的id */
@JsonIgnore public static final CompanyActorId ROOT_ID = new CompanyActorId(-1L);
/** 非公司顶级actorId */
@JsonIgnore public static final CompanyActorId NON_COMPANY_ID = new CompanyActorId(-2L);
private final Long id;
@Override
public Serializable id() {
return id;
}
@Override
public ActorBizType bizType() {
return ActorBizType.COMPANY;
}
}

16
modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyChangedMsg.java

@ -0,0 +1,16 @@
package com.thing.common.actor.biz.actors.company;
import com.thing.common.actor.ActorMsg;
import com.thing.common.actor.enumeration.ActorMsgType;
/**
* @author siyang
* @date 2024/8/5 11:04
* @description
*/
public record CompanyChangedMsg(Long id, Boolean add, Boolean remove) implements ActorMsg {
@Override
public ActorMsgType getMsgType() {
return ActorMsgType.COMPANY_CHANGED_MSG;
}
}

42
modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/NonCompanyActor.java

@ -0,0 +1,42 @@
package com.thing.common.actor.biz.actors.company;
import com.thing.common.actor.Actor;
import com.thing.common.actor.ActorBizContext;
import com.thing.common.actor.ActorId;
import com.thing.common.actor.ActorMsg;
import com.thing.common.actor.biz.base.BizContextAwareActor;
import com.thing.common.actor.system.AbstractActorCreator;
/**
* @author siyang
* @date 2024/8/8 08:47
* @description 非公司actor业务与公司无关
*/
public class NonCompanyActor extends BizContextAwareActor {
public NonCompanyActor(ActorBizContext actorBizContext) {
super(actorBizContext);
}
@Override
protected boolean doProcess(ActorMsg msg) {
return false;
}
public static class ActorCreator extends AbstractActorCreator {
public ActorCreator(ActorBizContext actorBizContext) {
super(actorBizContext);
}
@Override
public ActorId createActorId() {
return CompanyActorId.NON_COMPANY_ID;
}
@Override
public Actor createActor() {
return new NonCompanyActor(actorBizContext);
}
}
}

28
modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/ActorSysProperties.java

@ -0,0 +1,28 @@
package com.thing.common.actor.biz.actors.root;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @author siyang
* @date 2024/8/2 11:36
* @description actor系统配置
*/
@Data
@Component
@ConfigurationProperties(prefix = "actors.system")
public class ActorSysProperties {
public static final String ROOT_DISPATCHER = "root-dispatcher";
public static final String DEVICE_DISPATCHER = "device-dispatcher";
public static final String COMPANY_DISPATCHER = "company-dispatcher";
public static final String NON_COMPANY_DISPATCHER = "non-company-dispatcher";
private int throughput = 5;
private int maxActorInitAttempts = 10;
private int schedulerPoolSize = 1;
private int rootDispatcherSize = 1;
private int tenantDispatcherSize = 2;
private int nonTenantDispatcherSize = 2;
private int deviceDispatcherSize = 4;
}

95
modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/RootActor.java

@ -0,0 +1,95 @@
package com.thing.common.actor.biz.actors.root;
import com.thing.common.actor.*;
import com.thing.common.actor.biz.actors.company.CompanyActor;
import com.thing.common.actor.biz.actors.company.CompanyActorId;
import com.thing.common.actor.biz.actors.company.CompanyChangedMsg;
import com.thing.common.actor.biz.actors.company.NonCompanyActor;
import com.thing.common.actor.biz.base.BizContextAwareActor;
import com.thing.common.actor.system.AbstractActorCreator;
import com.thing.sys.tenant.entity.SysTenantDetailEntity;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* @author siyang
* @date 2024/8/1 16:59
* @description 顶级actor
*/
@Slf4j
public class RootActor extends BizContextAwareActor {
public RootActor(ActorBizContext actorBizContext) {
super(actorBizContext);
}
@Override
protected boolean doProcess(ActorMsg msg) {
switch (msg.getMsgType()) {
case SYSTEM_INIT_MSG -> systemInit();
case COMPANY_CHANGED_MSG -> changeCompany((CompanyChangedMsg) msg);
default -> {
return false;
}
}
return true;
}
private void systemInit() {
initCompanyActors();
initNonCompanyActors();
}
private void changeCompany(CompanyChangedMsg msg) {
if (msg.add()) {
getOrCreateCompanyActor(msg.id());
}
if (msg.remove()) {
actorSysContext.stop(new CompanyActorId(msg.id()));
}
}
private void initCompanyActors() {
List<SysTenantDetailEntity> list = defaultBizCtx().getSysTenantDetailService().list();
list.forEach(tenant -> getOrCreateCompanyActor(tenant.getId()));
}
private void initNonCompanyActors() {
getOrCreateNonCompanyActor();
}
private void getOrCreateCompanyActor(Long tenantId) {
actorSysContext.getOrCreateChildActor(
new CompanyActorId(tenantId),
() -> ActorSysProperties.COMPANY_DISPATCHER,
() -> new CompanyActor.ActorCreator(actorBizContext, tenantId),
() -> true);
}
private void getOrCreateNonCompanyActor() {
actorSysContext.getOrCreateChildActor(
CompanyActorId.NON_COMPANY_ID,
() -> ActorSysProperties.NON_COMPANY_DISPATCHER,
() -> new NonCompanyActor.ActorCreator(actorBizContext),
() -> true);
}
public static class ActorCreator extends AbstractActorCreator {
public ActorCreator(ActorBizContext actorBizContext) {
super(actorBizContext);
}
@Override
public ActorId createActorId() {
return CompanyActorId.ROOT_ID;
}
@Override
public Actor createActor() {
return new RootActor(actorBizContext);
}
}
}

16
modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/RootInitMsg.java

@ -0,0 +1,16 @@
package com.thing.common.actor.biz.actors.root;
import com.thing.common.actor.ActorMsg;
import com.thing.common.actor.enumeration.ActorMsgType;
/**
* @author siyang
* @date 2024/8/1 17:01
* @description 顶级actor初始化消息
*/
public class RootInitMsg implements ActorMsg {
@Override
public ActorMsgType getMsgType() {
return ActorMsgType.SYSTEM_INIT_MSG;
}
}

58
modules/actor-biz/src/main/java/com/thing/common/actor/biz/base/BizContextAwareActor.java

@ -0,0 +1,58 @@
package com.thing.common.actor.biz.base;
import com.thing.common.actor.ActorBizContext;
import com.thing.common.actor.ActorMsg;
import com.thing.common.actor.system.AbstractActor;
import com.thing.common.actor.system.ProcessFailureStrategy;
import lombok.extern.slf4j.Slf4j;
/**
* @author siyang
* @date 2024/8/1 16:48
* @description 提供业务上下文能力的Actor所有涉及业务处理的actor都应继承该类
*/
@Slf4j
public abstract class BizContextAwareActor extends AbstractActor {
protected final ActorBizContext actorBizContext;
public BizContextAwareActor(ActorBizContext actorBizContext) {
this.actorBizContext = actorBizContext;
}
public DefaultActorBizContext defaultBizCtx() {
return (DefaultActorBizContext) actorBizContext;
}
/**
* 消息处理核心方法由子类实现
*
* @param msg 消息内容
* @return 是否处理成功
*/
protected abstract boolean doProcess(ActorMsg msg);
@Override
public void process(ActorMsg msg) {
if (log.isDebugEnabled()) {
log.debug("Processing msg: {}", msg);
}
if (!doProcess(msg)) {
log.warn("Unprocessed message: {}!", msg);
}
}
@Override
public ProcessFailureStrategy onProcessFailure(ActorMsg msg, Throwable t) {
log.debug("[{}] Processing failure for msg {}", getActorRef().getActorId(), msg, t);
return doProcessFailure(t);
}
protected ProcessFailureStrategy doProcessFailure(Throwable t) {
if (t instanceof Error) {
return ProcessFailureStrategy.stop();
} else {
return ProcessFailureStrategy.resume();
}
}
}

38
modules/actor-biz/src/main/java/com/thing/common/actor/biz/base/DefaultActorBizContext.java

@ -0,0 +1,38 @@
package com.thing.common.actor.biz.base;
import com.thing.common.actor.ActorBizContext;
import com.thing.common.actor.ActorRef;
import com.thing.common.actor.ActorSystem;
import com.thing.sys.tenant.service.SysTenantDetailService;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* @author siyang
* @date 2024/8/7 13:20
* @description actor业务上下文
*/
@Data
@Component
@RequiredArgsConstructor
public class DefaultActorBizContext implements ActorBizContext {
/* --------------------- actor系统工具 ---------------------*/
private ActorRef rootActor;
private ActorSystem actorSystem;
/* ---------------------- 业务service ----------------------*/
private final SysTenantDetailService sysTenantDetailService;
@Override
public ActorRef getRootActor() {
return rootActor;
}
@Override
public ActorSystem getActorSystem() {
return actorSystem;
}
}

100
modules/actor-biz/src/main/java/com/thing/common/actor/biz/lifecycle/ActorLifecycle.java

@ -0,0 +1,100 @@
package com.thing.common.actor.biz.lifecycle;
import static com.thing.common.actor.biz.actors.root.ActorSysProperties.*;
import com.thing.common.actor.ActorRef;
import com.thing.common.actor.ActorSystem;
import com.thing.common.actor.biz.actors.root.ActorSysProperties;
import com.thing.common.actor.biz.actors.root.RootActor;
import com.thing.common.actor.biz.actors.root.RootInitMsg;
import com.thing.common.actor.biz.base.DefaultActorBizContext;
import com.thing.common.actor.system.ActorSystemSettings;
import com.thing.common.actor.system.DefaultActorSystem;
import com.thing.common.util.thread.ThingExecutors;
import com.thing.common.util.thread.ThingThreadFactory;
import jakarta.annotation.Nonnull;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author siyang
* @date 2024/8/2 11:28
* @description 顶级actor服务
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class ActorLifecycle implements ApplicationListener<ContextRefreshedEvent> {
private final ActorSysProperties actorSysProperties;
private final DefaultActorBizContext actorBizContext;
private ActorSystem system;
private ActorRef rootActor;
@PostConstruct
public void initActorSystem() {
log.info("Initializing actor actorSystem.");
ActorSystemSettings settings =
new ActorSystemSettings(
actorSysProperties.getThroughput(),
actorSysProperties.getSchedulerPoolSize(),
actorSysProperties.getMaxActorInitAttempts());
system = new DefaultActorSystem(settings);
createDispatcher(ROOT_DISPATCHER, actorSysProperties.getRootDispatcherSize());
createDispatcher(DEVICE_DISPATCHER, actorSysProperties.getDeviceDispatcherSize());
createDispatcher(COMPANY_DISPATCHER, actorSysProperties.getTenantDispatcherSize());
createDispatcher(NON_COMPANY_DISPATCHER, actorSysProperties.getNonTenantDispatcherSize());
rootActor =
system.createRootActor(
ROOT_DISPATCHER, new RootActor.ActorCreator(actorBizContext));
actorBizContext.setActorSystem(system);
actorBizContext.setRootActor(rootActor);
log.info("Actor actorSystem initialized.");
}
@PreDestroy
public void stopActorSystem() {
if (system != null) {
log.info("Stopping actor system.");
system.stop();
log.info("Actor system stopped.");
}
}
@Override
public void onApplicationEvent(@Nonnull ContextRefreshedEvent event) {
log.info("Application ready. Sending application init message to actor system");
rootActor.tellImportant(new RootInitMsg());
}
private void createDispatcher(String dispatcherName, int poolSize) {
system.createDispatcher(dispatcherName, initDispatcherExecutor(dispatcherName, poolSize));
}
private ExecutorService initDispatcherExecutor(String dispatcherName, int poolSize) {
if (poolSize == 0) {
int cores = Runtime.getRuntime().availableProcessors();
poolSize = Math.max(1, cores / 2);
}
if (poolSize == 1) {
return Executors.newSingleThreadExecutor(ThingThreadFactory.forName(dispatcherName));
} else {
return ThingExecutors.newWorkStealingPool(poolSize, dispatcherName);
}
}
}

1
modules/pom.xml

@ -11,6 +11,7 @@
<packaging>pom</packaging> <packaging>pom</packaging>
<modules> <modules>
<module>actor-biz</module>
<module>dequeue</module> <module>dequeue</module>
<module>thing</module> <module>thing</module>
<module>msg</module> <module>msg</module>

10
pom.xml

@ -484,6 +484,11 @@
</dependency> </dependency>
<!-- =================================== 内部依赖: common模块 ======================================= --> <!-- =================================== 内部依赖: common模块 ======================================= -->
<dependency>
<groupId>com.thing.common</groupId>
<artifactId>actor</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.thing.common</groupId> <groupId>com.thing.common</groupId>
<artifactId>cache</artifactId> <artifactId>cache</artifactId>
@ -536,6 +541,11 @@
</dependency> </dependency>
<!-- =================================== 内部依赖: modules模块 ======================================= --> <!-- =================================== 内部依赖: modules模块 ======================================= -->
<dependency>
<groupId>com.thing.modules</groupId>
<artifactId>actor-biz</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.thing.modules</groupId> <groupId>com.thing.modules</groupId>
<artifactId>alarm</artifactId> <artifactId>alarm</artifactId>

Loading…
Cancel
Save