From c803106422a2edb78ea55f4459aa7b6aed5047c5 Mon Sep 17 00:00:00 2001 From: siyang <2337720667@qq.com> Date: Thu, 8 Aug 2024 13:50:57 +0800 Subject: [PATCH] =?UTF-8?q?actor=E7=B3=BB=E7=BB=9F=E5=BC=95=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- application/pom.xml | 4 + common/actor/pom.xml | 27 ++ .../java/com/thing/common/actor/Actor.java | 36 +++ .../thing/common/actor/ActorBizContext.java | 56 ++++ .../com/thing/common/actor/ActorCreator.java | 12 + .../java/com/thing/common/actor/ActorId.java | 27 ++ .../java/com/thing/common/actor/ActorMsg.java | 16 ++ .../java/com/thing/common/actor/ActorRef.java | 29 ++ .../thing/common/actor/ActorSysContext.java | 90 ++++++ .../com/thing/common/actor/ActorSystem.java | 44 +++ .../actor/enumeration/ActorBizType.java | 16 ++ .../actor/enumeration/ActorMsgType.java | 19 ++ .../actor/enumeration/ActorStopReason.java | 11 + .../actor/exception/ActorException.java | 38 +++ .../ActorNotRegisteredException.java | 17 ++ .../common/actor/system/AbstractActor.java | 23 ++ .../actor/system/AbstractActorCreator.java | 18 ++ .../common/actor/system/AbstractActorId.java | 30 ++ .../common/actor/system/ActorDispatcher.java | 5 + .../common/actor/system/ActorMailbox.java | 263 ++++++++++++++++++ .../actor/system/ActorSystemSettings.java | 10 + .../actor/system/DefaultActorSystem.java | 242 ++++++++++++++++ .../actor/system/InitFailureStrategy.java | 29 ++ .../actor/system/ProcessFailureStrategy.java | 23 ++ common/pom.xml | 1 + modules/actor-biz/pom.xml | 27 ++ .../biz/actors/company/CompanyActor.java | 50 ++++ .../biz/actors/company/CompanyActorId.java | 37 +++ .../biz/actors/company/CompanyChangedMsg.java | 16 ++ .../biz/actors/company/NonCompanyActor.java | 42 +++ .../biz/actors/root/ActorSysProperties.java | 28 ++ .../actor/biz/actors/root/RootActor.java | 95 +++++++ .../actor/biz/actors/root/RootInitMsg.java | 16 ++ .../actor/biz/base/BizContextAwareActor.java | 58 ++++ .../biz/base/DefaultActorBizContext.java | 38 +++ .../actor/biz/lifecycle/ActorLifecycle.java | 100 +++++++ modules/pom.xml | 1 + pom.xml | 10 + 38 files changed, 1604 insertions(+) create mode 100644 common/actor/pom.xml create mode 100644 common/actor/src/main/java/com/thing/common/actor/Actor.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/ActorBizContext.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/ActorCreator.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/ActorId.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/ActorMsg.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/ActorRef.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/ActorSysContext.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/ActorSystem.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/enumeration/ActorBizType.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/enumeration/ActorMsgType.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/enumeration/ActorStopReason.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/exception/ActorException.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/exception/ActorNotRegisteredException.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/system/AbstractActor.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/system/AbstractActorCreator.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/system/AbstractActorId.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/system/ActorDispatcher.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/system/ActorMailbox.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/system/ActorSystemSettings.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/system/DefaultActorSystem.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/system/InitFailureStrategy.java create mode 100644 common/actor/src/main/java/com/thing/common/actor/system/ProcessFailureStrategy.java create mode 100644 modules/actor-biz/pom.xml create mode 100644 modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyActor.java create mode 100644 modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyActorId.java create mode 100644 modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyChangedMsg.java create mode 100644 modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/NonCompanyActor.java create mode 100644 modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/ActorSysProperties.java create mode 100644 modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/RootActor.java create mode 100644 modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/RootInitMsg.java create mode 100644 modules/actor-biz/src/main/java/com/thing/common/actor/biz/base/BizContextAwareActor.java create mode 100644 modules/actor-biz/src/main/java/com/thing/common/actor/biz/base/DefaultActorBizContext.java create mode 100644 modules/actor-biz/src/main/java/com/thing/common/actor/biz/lifecycle/ActorLifecycle.java diff --git a/application/pom.xml b/application/pom.xml index 0bc6af1..8d8fc4d 100644 --- a/application/pom.xml +++ b/application/pom.xml @@ -73,6 +73,10 @@ + + com.thing.modules + actor-biz + com.thing.modules alarm diff --git a/common/actor/pom.xml b/common/actor/pom.xml new file mode 100644 index 0000000..728e20d --- /dev/null +++ b/common/actor/pom.xml @@ -0,0 +1,27 @@ + + + 4.0.0 + + com.thing + common + 5.1 + + + com.thing.common + actor + jar + ThingBI Server Common Actor + + + + com.thing.common + util + + + org.projectlombok + lombok + + + \ No newline at end of file diff --git a/common/actor/src/main/java/com/thing/common/actor/Actor.java b/common/actor/src/main/java/com/thing/common/actor/Actor.java new file mode 100644 index 0000000..0562d75 --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/Actor.java @@ -0,0 +1,36 @@ +package com.thing.common.actor; + +import com.thing.common.actor.enumeration.ActorStopReason; +import com.thing.common.actor.exception.ActorException; +import com.thing.common.actor.system.InitFailureStrategy; +import com.thing.common.actor.system.ProcessFailureStrategy; + +/** + * @author siyang + * @date 2024/8/1 10:27 + * @description Actor模型定义 + */ +public interface Actor { + + /** 收到消息后的处理逻辑 */ + void process(ActorMsg msg); + + /** 获取一个Actor代理对象 */ + ActorRef getActorRef(); + + default void init(ActorSysContext actorSysContext) {} + + default void destroy(ActorStopReason stopReason, Throwable cause) throws ActorException {} + + default InitFailureStrategy onInitFailure(int attempt, Throwable t) { + return InitFailureStrategy.retryWithDelay(5000L * attempt); + } + + default ProcessFailureStrategy onProcessFailure(ActorMsg msg, Throwable t) { + if (t instanceof Error) { + return ProcessFailureStrategy.stop(); + } else { + return ProcessFailureStrategy.resume(); + } + } +} diff --git a/common/actor/src/main/java/com/thing/common/actor/ActorBizContext.java b/common/actor/src/main/java/com/thing/common/actor/ActorBizContext.java new file mode 100644 index 0000000..a64a831 --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/ActorBizContext.java @@ -0,0 +1,56 @@ +package com.thing.common.actor; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * @author siyang + * @date 2024/8/7 11:07 + * @description 业务上下文 + */ +public interface ActorBizContext { + ActorRef getRootActor(); + + ActorSystem getActorSystem(); + + default ScheduledExecutorService getScheduler() { + return getActorSystem().getScheduler(); + } + + default void tell(ActorMsg actorMsg) { + getRootActor().tell(actorMsg); + } + + default void tellImportant(ActorMsg actorMsg) { + getRootActor().tellImportant(actorMsg); + } + + /** + * 定时通知(固定时间间隔) + * + * @param actor actor代理 + * @param actorMsg actor消息 + * @param initialDelay 初始延时(ms) + * @param interval 任务间隔(ms) + */ + default void scheduleTell(ActorRef actor, ActorMsg actorMsg, long initialDelay, long interval) { + getScheduler() + .scheduleWithFixedDelay( + () -> actor.tell(actorMsg), initialDelay, interval, TimeUnit.MILLISECONDS); + } + + /** + * 延迟通知 + * + * @param actor actor代理 + * @param actorMsg actor消息 + * @param delay 延迟时间(ms) + */ + default void delayTell(ActorRef actor, ActorMsg actorMsg, long delay) { + if (delay > 0) { + getScheduler().schedule(() -> actor.tell(actorMsg), delay, TimeUnit.MILLISECONDS); + } else { + actor.tell(actorMsg); + } + } +} diff --git a/common/actor/src/main/java/com/thing/common/actor/ActorCreator.java b/common/actor/src/main/java/com/thing/common/actor/ActorCreator.java new file mode 100644 index 0000000..c9e0dba --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/ActorCreator.java @@ -0,0 +1,12 @@ +package com.thing.common.actor; + +/** + * @author siyang + * @date 2024/8/1 14:07 + * @description + */ +public interface ActorCreator { + ActorId createActorId(); + + Actor createActor(); +} diff --git a/common/actor/src/main/java/com/thing/common/actor/ActorId.java b/common/actor/src/main/java/com/thing/common/actor/ActorId.java new file mode 100644 index 0000000..cca100c --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/ActorId.java @@ -0,0 +1,27 @@ +package com.thing.common.actor; + +import com.thing.common.actor.enumeration.ActorBizType; +import com.thing.common.actor.system.AbstractActorId; + +import java.io.Serializable; + +/** + * @author siyang + * @date 2024/8/1 11:51 + * @description actor唯一标识,由id和bizType两部分组成。唯一性判定逻辑参考{@link AbstractActorId} + */ +public interface ActorId { + /** + * 每个actor的id都是唯一的,不可重复 + * + * @return 唯一的id + */ + Serializable id(); + + /** + * actor业务类型 + * + * @return 不同actor的业务类型可以相同 + */ + ActorBizType bizType(); +} diff --git a/common/actor/src/main/java/com/thing/common/actor/ActorMsg.java b/common/actor/src/main/java/com/thing/common/actor/ActorMsg.java new file mode 100644 index 0000000..1c1dcef --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/ActorMsg.java @@ -0,0 +1,16 @@ +package com.thing.common.actor; + +import com.thing.common.actor.enumeration.ActorMsgType; +import com.thing.common.actor.enumeration.ActorStopReason; + +/** + * @author siyang + * @date 2024/8/1 10:30 + * @description + */ +public interface ActorMsg { + ActorMsgType getMsgType(); + + /** 按业务场景,在合适的子类中实现,针对不同的actor歇菜原因做不同的处理 */ + default void onActorStopped(ActorStopReason reason) {} +} diff --git a/common/actor/src/main/java/com/thing/common/actor/ActorRef.java b/common/actor/src/main/java/com/thing/common/actor/ActorRef.java new file mode 100644 index 0000000..3fdbc11 --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/ActorRef.java @@ -0,0 +1,29 @@ +package com.thing.common.actor; + +/** + * @author siyang + * @date 2024/8/1 10:39 + * @description actor代理对象接口 + */ +public interface ActorRef { + /** + * 获取actor唯一标识对象 + * + * @return actor唯一标识对象 + */ + ActorId getActorId(); + + /** + * 普通的通知 + * + * @param actorMsg 消息 + */ + void tell(ActorMsg actorMsg); + + /** + * 重要通知:邮箱中同时存在"通知"和"重要通知",会优先执行重要通知的邮件 + * + * @param actorMsg 消息 + */ + void tellImportant(ActorMsg actorMsg); +} diff --git a/common/actor/src/main/java/com/thing/common/actor/ActorSysContext.java b/common/actor/src/main/java/com/thing/common/actor/ActorSysContext.java new file mode 100644 index 0000000..58ee5fe --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/ActorSysContext.java @@ -0,0 +1,90 @@ +package com.thing.common.actor; + +import com.thing.common.actor.enumeration.ActorBizType; + +import java.util.List; +import java.util.function.Predicate; +import java.util.function.Supplier; + +/** + * @author siyang + * @date 2024/8/1 14:04 + * @description 系统上下文 + */ +public interface ActorSysContext extends ActorRef { + /** 获取自己的唯一标识 */ + ActorId getSelf(); + + /** 获取父级 Actor的代理对象 */ + ActorRef getParentRef(); + + /** + * 向目标actor通知消息 + * + * @param target 目标actor的唯一标识 + * @param actorMsg 消息 + */ + void tell(ActorId target, ActorMsg actorMsg); + + /** + * 向目标actor群发消息 + * + * @param targetList actor唯一标识列表 + * @param actorMsg 消息 + */ + void tell(List targetList, ActorMsg actorMsg); + + /** + * 停止目标actor + * + * @param target actor唯一标识 + */ + void stop(ActorId target); + + /** + * 获取或者创建子Actor代理对象 + * + * @param actorId actor唯一标识 + * @param dispatcher 执行分发器 + * @param creator actor创建器 + * @param createCondition actor创建条件 + * @return ActorRef + */ + ActorRef getOrCreateChildActor( + ActorId actorId, + Supplier dispatcher, + Supplier creator, + Supplier createCondition); + + /** + * 过滤Actor + * + * @param childFilter 过滤条件 + * @return 子ActorId列表 + */ + List filterChildren(Predicate childFilter); + + /** + * 向actor广播消息 + * + * @param msg 消息 + * @param highPriority 是否高优先级 + */ + void broadcastToChildren(ActorMsg msg, boolean highPriority); + + /** + * 向actor广播消息 + * + * @param msg 消息 + * @param actorBizType actor业务类型 + */ + void broadcastToChildrenByType(ActorMsg msg, ActorBizType actorBizType); + + /** + * 向子actor广播消息 + * + * @param msg 消息 + * @param childFilter actor过滤条件 + */ + void broadcastToChildren(ActorMsg msg, Predicate childFilter); +} diff --git a/common/actor/src/main/java/com/thing/common/actor/ActorSystem.java b/common/actor/src/main/java/com/thing/common/actor/ActorSystem.java new file mode 100644 index 0000000..27e0692 --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/ActorSystem.java @@ -0,0 +1,44 @@ +package com.thing.common.actor; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Predicate; + +/** + * @author siyang + * @date 2024/8/1 14:37 + * @description actor系统能力定义 + */ +public interface ActorSystem { + + ScheduledExecutorService getScheduler(); + + void createDispatcher(String dispatcherId, ExecutorService executor); + + void destroyDispatcher(String dispatcherId); + + ActorRef getActor(ActorId actorId); + + ActorRef createRootActor(String dispatcherId, ActorCreator creator); + + ActorRef createChildActor(String dispatcherId, ActorCreator creator, ActorId parent); + + void tell(ActorId target, ActorMsg actorMsg); + + void tellImportant(ActorId target, ActorMsg actorMsg); + + void stop(ActorRef actorRef); + + void stop(ActorId actorId); + + void stop(); + + void broadcastToChildren(ActorId parent, ActorMsg msg); + + void broadcastToChildren(ActorId parent, ActorMsg msg, boolean highPriority); + + void broadcastToChildren(ActorId parent, Predicate childFilter, ActorMsg msg); + + List filterChildren(ActorId parent, Predicate childFilter); +} diff --git a/common/actor/src/main/java/com/thing/common/actor/enumeration/ActorBizType.java b/common/actor/src/main/java/com/thing/common/actor/enumeration/ActorBizType.java new file mode 100644 index 0000000..621eef4 --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/enumeration/ActorBizType.java @@ -0,0 +1,16 @@ +package com.thing.common.actor.enumeration; + +/** + * @author siyang + * @date 2024/8/1 11:52 + * @description actor业务类型 + */ +public enum ActorBizType { + COMPANY, + NON_COMPANY, + DEVICE; + + public static boolean same(ActorBizType a, ActorBizType b) { + return a.equals(b); + } +} diff --git a/common/actor/src/main/java/com/thing/common/actor/enumeration/ActorMsgType.java b/common/actor/src/main/java/com/thing/common/actor/enumeration/ActorMsgType.java new file mode 100644 index 0000000..1ab0274 --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/enumeration/ActorMsgType.java @@ -0,0 +1,19 @@ +package com.thing.common.actor.enumeration; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * @author siyang + * @date 2024/8/1 10:28 + * @description 消息类型 + */ +@Getter +@AllArgsConstructor +public enum ActorMsgType { + SYSTEM_INIT_MSG(false), + COMPANY_CHANGED_MSG(false); + + /** 表示当前消息是否依赖租户信息 */ + private final Boolean tenantBased; +} diff --git a/common/actor/src/main/java/com/thing/common/actor/enumeration/ActorStopReason.java b/common/actor/src/main/java/com/thing/common/actor/enumeration/ActorStopReason.java new file mode 100644 index 0000000..042d8eb --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/enumeration/ActorStopReason.java @@ -0,0 +1,11 @@ +package com.thing.common.actor.enumeration; + +/** + * @author siyang + * @date 2024/8/1 10:33 + * @description actor歇菜原因 + */ +public enum ActorStopReason { + INIT_FAILED, + STOPPED +} diff --git a/common/actor/src/main/java/com/thing/common/actor/exception/ActorException.java b/common/actor/src/main/java/com/thing/common/actor/exception/ActorException.java new file mode 100644 index 0000000..5663198 --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/exception/ActorException.java @@ -0,0 +1,38 @@ +package com.thing.common.actor.exception; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * @author siyang + * @date 2024/8/1 15:08 + * @description actor错误,在发生时需给出判断:是否还能抢救一下。如果无法抢救,给unrecoverable赋值为true,在后续逻辑中做相应处理 + */ +@Data +@EqualsAndHashCode(callSuper = true) +public class ActorException extends Exception { + private final boolean unrecoverable; + + public ActorException(String message, Throwable cause) { + super(message, cause); + this.unrecoverable = false; + } + + public ActorException(String message) { + this(message, false); + } + + public ActorException(String message, boolean unrecoverable) { + super(message); + this.unrecoverable = unrecoverable; + } + + public ActorException(Exception e) { + this(e, false); + } + + public ActorException(Exception e, boolean unrecoverable) { + super(e); + this.unrecoverable = unrecoverable; + } +} diff --git a/common/actor/src/main/java/com/thing/common/actor/exception/ActorNotRegisteredException.java b/common/actor/src/main/java/com/thing/common/actor/exception/ActorNotRegisteredException.java new file mode 100644 index 0000000..33297ab --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/exception/ActorNotRegisteredException.java @@ -0,0 +1,17 @@ +package com.thing.common.actor.exception; + +import com.thing.common.actor.ActorId; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class ActorNotRegisteredException extends RuntimeException { + + private ActorId target; + + public ActorNotRegisteredException(ActorId target, String message) { + super(message); + this.target = target; + } +} diff --git a/common/actor/src/main/java/com/thing/common/actor/system/AbstractActor.java b/common/actor/src/main/java/com/thing/common/actor/system/AbstractActor.java new file mode 100644 index 0000000..c6908c2 --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/system/AbstractActor.java @@ -0,0 +1,23 @@ +package com.thing.common.actor.system; + +import com.thing.common.actor.Actor; +import com.thing.common.actor.ActorRef; +import com.thing.common.actor.ActorSysContext; + +import lombok.Getter; + +@Getter +public abstract class AbstractActor implements Actor { + + protected ActorSysContext actorSysContext; + + @Override + public void init(ActorSysContext actorSysContext) { + this.actorSysContext = actorSysContext; + } + + @Override + public ActorRef getActorRef() { + return actorSysContext; + } +} diff --git a/common/actor/src/main/java/com/thing/common/actor/system/AbstractActorCreator.java b/common/actor/src/main/java/com/thing/common/actor/system/AbstractActorCreator.java new file mode 100644 index 0000000..770ec25 --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/system/AbstractActorCreator.java @@ -0,0 +1,18 @@ +package com.thing.common.actor.system; + +import com.thing.common.actor.ActorBizContext; +import com.thing.common.actor.ActorCreator; + +/** + * @author siyang + * @date 2024/8/2 10:07 + * @description 基于上下文的actor创建器 + */ +public abstract class AbstractActorCreator implements ActorCreator { + protected final transient ActorBizContext actorBizContext; + + public AbstractActorCreator(ActorBizContext actorBizContext) { + super(); + this.actorBizContext = actorBizContext; + } +} diff --git a/common/actor/src/main/java/com/thing/common/actor/system/AbstractActorId.java b/common/actor/src/main/java/com/thing/common/actor/system/AbstractActorId.java new file mode 100644 index 0000000..35f965a --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/system/AbstractActorId.java @@ -0,0 +1,30 @@ +package com.thing.common.actor.system; + + +import com.thing.common.actor.ActorId; +import com.thing.common.actor.enumeration.ActorBizType; + +/** + * @author siyang + * @date 2024/8/2 10:40 + */ +public abstract class AbstractActorId implements ActorId { + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AbstractActorId that = (AbstractActorId) o; + return id().equals(that.id()) && ActorBizType.same(bizType(), that.bizType()); + } + + @Override + public int hashCode() { + return (bizType() + "_" + id()).hashCode(); + } + + @Override + public String toString() { + return bizType() + ":" + id(); + } +} diff --git a/common/actor/src/main/java/com/thing/common/actor/system/ActorDispatcher.java b/common/actor/src/main/java/com/thing/common/actor/system/ActorDispatcher.java new file mode 100644 index 0000000..ca791c9 --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/system/ActorDispatcher.java @@ -0,0 +1,5 @@ +package com.thing.common.actor.system; + +import java.util.concurrent.ExecutorService; + +public record ActorDispatcher(String dispatcherId, ExecutorService executor) {} diff --git a/common/actor/src/main/java/com/thing/common/actor/system/ActorMailbox.java b/common/actor/src/main/java/com/thing/common/actor/system/ActorMailbox.java new file mode 100644 index 0000000..e1617da --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/system/ActorMailbox.java @@ -0,0 +1,263 @@ +package com.thing.common.actor.system; + +import com.thing.common.actor.*; +import com.thing.common.actor.enumeration.ActorBizType; +import com.thing.common.actor.enumeration.ActorStopReason; +import com.thing.common.actor.exception.ActorException; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; +import java.util.function.Supplier; + +@Slf4j +@Getter +@RequiredArgsConstructor +public final class ActorMailbox implements ActorSysContext { + private static final boolean HIGH_PRIORITY = true; + private static final boolean NORMAL_PRIORITY = false; + + private static final boolean FREE = false; + private static final boolean BUSY = true; + + private static final boolean NOT_READY = false; + private static final boolean READY = true; + + private final ActorSystem system; + private final ActorSystemSettings settings; + private final ActorId selfId; + private final ActorRef parentRef; + private final Actor actor; + private final ActorDispatcher dispatcher; + + private final ConcurrentLinkedQueue highPriorityMQ = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue normalPriorityMQ = new ConcurrentLinkedQueue<>(); + private final AtomicBoolean busy = new AtomicBoolean(FREE); + private final AtomicBoolean ready = new AtomicBoolean(NOT_READY); + private final AtomicBoolean destroyInProgress = new AtomicBoolean(); + private volatile ActorStopReason stopReason; + + public void initActor() { + dispatcher.executor().execute(() -> tryInit(1)); + } + + private void tryInit(int attempt) { + try { + log.debug("[{}] Trying to init actor, attempt: {}", selfId, attempt); + if (!destroyInProgress.get()) { + actor.init(this); + if (!destroyInProgress.get()) { + ready.set(READY); + tryProcessQueue(false); + } + } + } catch (Throwable t) { + InitFailureStrategy strategy; + int attemptIdx = attempt + 1; + if (isUnrecoverable(t)) { + strategy = InitFailureStrategy.stop(); + } else { + log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t); + strategy = actor.onInitFailure(attempt, t); + } + if (strategy.isStop() + || (settings.maxActorInitAttempts() > 0 + && attemptIdx > settings.maxActorInitAttempts())) { + log.info( + "[{}] Failed to init actor, attempt {}, going to stop attempts.", + selfId, + attempt, + t); + stopReason = ActorStopReason.INIT_FAILED; + destroy(t.getCause()); + } else if (strategy.getRetryDelay() > 0) { + log.info( + "[{}] Failed to init actor, attempt {}, going to retry in attempts in {}ms", + selfId, + attempt, + strategy.getRetryDelay()); + system.getScheduler() + .schedule( + () -> dispatcher.executor().execute(() -> tryInit(attemptIdx)), + strategy.getRetryDelay(), + TimeUnit.MILLISECONDS); + } else { + log.info( + "[{}] Failed to init actor, attempt {}, going to retry immediately", + selfId, + attempt); + dispatcher.executor().execute(() -> tryInit(attemptIdx)); + } + } + } + + private static boolean isUnrecoverable(Throwable t) { + if (t instanceof ActorException) { + if (Objects.nonNull(t.getCause())) { + return isUnrecoverable(t.getCause()); + } else { + return ((ActorException) t).isUnrecoverable(); + } + } else { + return false; + } + } + + private void enqueue(ActorMsg msg, boolean highPriority) { + if (!destroyInProgress.get()) { + if (highPriority) { + highPriorityMQ.add(msg); + } else { + normalPriorityMQ.add(msg); + } + tryProcessQueue(true); + } else { + msg.onActorStopped(stopReason); + } + } + + private void tryProcessQueue(boolean newMsg) { + // 当前邮箱还未就绪 + if (ready.get() != READY) { + log.trace("[{}] MailBox is not ready, new msg: {}", selfId, newMsg); + return; + } + // 当前消息不是新的,且邮箱为空 + if (!newMsg && highPriorityMQ.isEmpty() && normalPriorityMQ.isEmpty()) { + log.trace("[{}] MailBox is empty, new msg: {}", selfId, false); + return; + } + // 当前消息是新消息,或者邮箱非空,则通过CAS方式设置邮箱为繁忙状态。CAS竞争失败则表示当前邮箱正在处理消息 + if (busy.compareAndSet(FREE, BUSY)) { + dispatcher.executor().execute(this::processMailbox); + } else { + log.trace("[{}] MailBox is busy, new msg: {}", selfId, newMsg); + } + } + + private void processMailbox() { + boolean noMoreElements = false; + for (int i = 0; i < settings.actorThroughput(); i++) { + ActorMsg msg = highPriorityMQ.poll(); + if (Objects.isNull(msg)) { + msg = normalPriorityMQ.poll(); + } + if (Objects.nonNull(msg)) { + try { + log.debug("[{}] Going to process message: {}", selfId, msg); + actor.process(msg); + } catch (Throwable t) { + log.debug("[{}] Failed to process message: {}", selfId, msg, t); + ProcessFailureStrategy strategy = actor.onProcessFailure(msg, t); + if (strategy.isStop()) { + system.stop(selfId); + } + } + } else { + noMoreElements = true; + break; + } + } + if (noMoreElements) { + busy.set(FREE); + dispatcher.executor().execute(() -> tryProcessQueue(false)); + } else { + dispatcher.executor().execute(this::processMailbox); + } + } + + @Override + public ActorId getSelf() { + return selfId; + } + + @Override + public void tell(ActorId target, ActorMsg actorMsg) { + system.tell(target, actorMsg); + } + + @Override + public void tell(List targetList, ActorMsg actorMsg) { + targetList.forEach(target -> tell(target, actorMsg)); + } + + @Override + public void broadcastToChildren(ActorMsg msg, boolean highPriority) { + system.broadcastToChildren(selfId, msg, highPriority); + } + + @Override + public void broadcastToChildrenByType(ActorMsg msg, ActorBizType actorBizType) { + broadcastToChildren(msg, actorId -> actorBizType.equals(actorId.bizType())); + } + + @Override + public void broadcastToChildren(ActorMsg msg, Predicate childFilter) { + system.broadcastToChildren(selfId, childFilter, msg); + } + + @Override + public List filterChildren(Predicate childFilter) { + return system.filterChildren(selfId, childFilter); + } + + @Override + public void stop(ActorId target) { + system.stop(target); + } + + @Override + public ActorRef getOrCreateChildActor( + ActorId actorId, + Supplier dispatcher, + Supplier creator, + Supplier createCondition) { + ActorRef actorRef = system.getActor(actorId); + if (Objects.isNull(actorRef) && createCondition.get()) { + return system.createChildActor(dispatcher.get(), creator.get(), selfId); + } else { + return actorRef; + } + } + + public void destroy(Throwable cause) { + if (Objects.isNull(stopReason)) { + stopReason = ActorStopReason.STOPPED; + } + destroyInProgress.set(true); + dispatcher + .executor() + .execute( + () -> { + try { + ready.set(NOT_READY); + actor.destroy(stopReason, cause); + highPriorityMQ.forEach(msg -> msg.onActorStopped(stopReason)); + normalPriorityMQ.forEach(msg -> msg.onActorStopped(stopReason)); + } catch (Throwable ignore) { + } + }); + } + + @Override + public ActorId getActorId() { + return selfId; + } + + @Override + public void tell(ActorMsg actorMsg) { + enqueue(actorMsg, NORMAL_PRIORITY); + } + + @Override + public void tellImportant(ActorMsg actorMsg) { + enqueue(actorMsg, HIGH_PRIORITY); + } +} diff --git a/common/actor/src/main/java/com/thing/common/actor/system/ActorSystemSettings.java b/common/actor/src/main/java/com/thing/common/actor/system/ActorSystemSettings.java new file mode 100644 index 0000000..178574d --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/system/ActorSystemSettings.java @@ -0,0 +1,10 @@ +package com.thing.common.actor.system; + +/** + * actor系统设置 + * + * @param actorThroughput actor系统吞吐量:当一个actor处理完该数量消息后,若还有消息,则重新生成任务再执行。目的是为了避免单个actor占用太多cpu时间 + * @param schedulerPoolSize 定时调度线程池大小 + * @param maxActorInitAttempts actor初始化失败重试次数 + */ +public record ActorSystemSettings(int actorThroughput, int schedulerPoolSize, int maxActorInitAttempts) {} diff --git a/common/actor/src/main/java/com/thing/common/actor/system/DefaultActorSystem.java b/common/actor/src/main/java/com/thing/common/actor/system/DefaultActorSystem.java new file mode 100644 index 0000000..4e3c5bb --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/system/DefaultActorSystem.java @@ -0,0 +1,242 @@ +package com.thing.common.actor.system; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import com.thing.common.actor.*; +import com.thing.common.actor.exception.ActorNotRegisteredException; +import com.thing.common.util.thread.ThingThreadFactory; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +/** + * @author siyang + * @date 2024/8/1 14:50 + * @description 默认的Actor系统实现 + */ +@Data +@Slf4j +public class DefaultActorSystem implements ActorSystem { + + private final ConcurrentMap dispatchers = new ConcurrentHashMap<>(); + private final ConcurrentMap actors = new ConcurrentHashMap<>(); + private final ConcurrentMap actorCreationLocks = new ConcurrentHashMap<>(); + private final ConcurrentMap> parentChildMap = new ConcurrentHashMap<>(); + + private final ActorSystemSettings settings; + private final ScheduledExecutorService scheduler; + + public DefaultActorSystem(ActorSystemSettings settings) { + this.settings = settings; + this.scheduler = + Executors.newScheduledThreadPool( + settings.schedulerPoolSize(), + ThingThreadFactory.forName("actor-system-scheduler")); + } + + @Override + public void createDispatcher(String dispatcherId, ExecutorService executor) { + ActorDispatcher current = + dispatchers.putIfAbsent(dispatcherId, new ActorDispatcher(dispatcherId, executor)); + if (Objects.nonNull(current)) { + throw new RuntimeException( + "Dispatcher with id [" + dispatcherId + "] is already registered!"); + } + } + + @Override + public void destroyDispatcher(String dispatcherId) { + ActorDispatcher actorDispatcher = dispatchers.remove(dispatcherId); + if (Objects.nonNull(actorDispatcher)) { + actorDispatcher.executor().shutdownNow(); + } + } + + @Override + public ActorRef getActor(ActorId actorId) { + return actors.get(actorId); + } + + @Override + public ActorRef createRootActor(String dispatcherId, ActorCreator creator) { + return createActor(dispatcherId, creator, null); + } + + @Override + public ActorRef createChildActor(String dispatcherId, ActorCreator creator, ActorId parent) { + return createActor(dispatcherId, creator, parent); + } + + private ActorRef createActor(String dispatcherId, ActorCreator creator, ActorId parent) { + ActorDispatcher actorDispatcher = dispatchers.get(dispatcherId); + if (Objects.isNull(actorDispatcher)) { + log.warn("Dispatcher with id [{}] is not registered!", dispatcherId); + throw new RuntimeException( + "Dispatcher with id [" + dispatcherId + "] is not registered!"); + } + + ActorId actorId = creator.createActorId(); + ActorMailbox actorMailbox = actors.get(actorId); + if (Objects.nonNull(actorMailbox)) { + return actorMailbox; + } + + Lock actorCreationLock = + actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock()); + actorCreationLock.lock(); + try { + actorMailbox = actors.get(actorId); + if (Objects.nonNull(actorMailbox)) { + return actorMailbox; + } + + log.debug("Creating actor with id [{}]!", actorId); + Actor actor = creator.createActor(); + ActorRef parentRef = null; + if (Objects.nonNull(parent)) { + parentRef = getActor(parent); + if (Objects.isNull(parentRef)) { + throw new ActorNotRegisteredException( + parent, "Parent Actor with id [" + parent + "] is not registered!"); + } + } + ActorMailbox mailbox = + new ActorMailbox(this, settings, actorId, parentRef, actor, actorDispatcher); + actors.put(actorId, mailbox); + mailbox.initActor(); + actorMailbox = mailbox; + if (Objects.nonNull(parent)) { + parentChildMap + .computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet()) + .add(actorId); + } + } finally { + actorCreationLock.unlock(); + actorCreationLocks.remove(actorId); + } + return actorMailbox; + } + + @Override + public void tellImportant(ActorId target, ActorMsg actorMsg) { + tell(target, actorMsg, true); + } + + @Override + public void tell(ActorId target, ActorMsg actorMsg) { + tell(target, actorMsg, false); + } + + private void tell(ActorId target, ActorMsg actorMsg, boolean highPriority) { + ActorMailbox mailbox = actors.get(target); + if (Objects.isNull(mailbox)) { + throw new ActorNotRegisteredException( + target, "Actor with id [" + target + "] is not registered!"); + } + if (highPriority) { + mailbox.tellImportant(actorMsg); + } else { + mailbox.tell(actorMsg); + } + } + + @Override + public void broadcastToChildren(ActorId parent, ActorMsg msg) { + broadcastToChildren(parent, msg, false); + } + + @Override + public void broadcastToChildren(ActorId parent, ActorMsg msg, boolean highPriority) { + broadcastToChildren(parent, id -> true, msg, highPriority); + } + + @Override + public void broadcastToChildren(ActorId parent, Predicate childFilter, ActorMsg msg) { + broadcastToChildren(parent, childFilter, msg, false); + } + + private void broadcastToChildren( + ActorId parent, Predicate childFilter, ActorMsg msg, boolean highPriority) { + Set children = parentChildMap.get(parent); + if (Objects.nonNull(children)) { + children.stream() + .filter(childFilter) + .forEach( + id -> { + try { + tell(id, msg, highPriority); + } catch (ActorNotRegisteredException e) { + log.warn("Actor is missing for {}", id); + } + }); + } + } + + @Override + public List filterChildren(ActorId parent, Predicate childFilter) { + Set children = parentChildMap.get(parent); + if (Objects.nonNull(children)) { + return children.stream().filter(childFilter).collect(Collectors.toList()); + } else { + return Collections.emptyList(); + } + } + + @Override + public void stop(ActorRef actorRef) { + stop(actorRef.getActorId()); + } + + @Override + public void stop(ActorId actorId) { + Set children = parentChildMap.remove(actorId); + if (Objects.nonNull(children)) { + for (ActorId child : children) { + stop(child); + } + } + parentChildMap.values().forEach(parentChildren -> parentChildren.remove(actorId)); + + ActorMailbox mailbox = actors.remove(actorId); + if (Objects.nonNull(mailbox)) { + mailbox.destroy(null); + } + } + + @Override + public void stop() { + dispatchers + .values() + .forEach( + actorDispatcher -> { + actorDispatcher.executor().shutdown(); + try { + boolean terminated = + actorDispatcher + .executor() + .awaitTermination(3, TimeUnit.SECONDS); + if (!terminated) { + log.warn( + "[{}] Failed to stop dispatcher", + actorDispatcher.dispatcherId()); + } + } catch (InterruptedException e) { + log.warn( + "[{}] Failed to stop dispatcher", + actorDispatcher.dispatcherId(), + e); + } + }); + if (Objects.nonNull(scheduler)) { + scheduler.shutdownNow(); + } + actors.clear(); + } +} diff --git a/common/actor/src/main/java/com/thing/common/actor/system/InitFailureStrategy.java b/common/actor/src/main/java/com/thing/common/actor/system/InitFailureStrategy.java new file mode 100644 index 0000000..53b1a52 --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/system/InitFailureStrategy.java @@ -0,0 +1,29 @@ +package com.thing.common.actor.system; + +import lombok.Getter; +import lombok.ToString; + +@Getter +@ToString +public class InitFailureStrategy { + + private final boolean stop; + private final long retryDelay; + + private InitFailureStrategy(boolean stop, long retryDelay) { + this.stop = stop; + this.retryDelay = retryDelay; + } + + public static InitFailureStrategy retryImmediately() { + return retryWithDelay(0); + } + + public static InitFailureStrategy retryWithDelay(long ms) { + return new InitFailureStrategy(false, ms); + } + + public static InitFailureStrategy stop() { + return new InitFailureStrategy(true, 0); + } +} diff --git a/common/actor/src/main/java/com/thing/common/actor/system/ProcessFailureStrategy.java b/common/actor/src/main/java/com/thing/common/actor/system/ProcessFailureStrategy.java new file mode 100644 index 0000000..33c7696 --- /dev/null +++ b/common/actor/src/main/java/com/thing/common/actor/system/ProcessFailureStrategy.java @@ -0,0 +1,23 @@ +package com.thing.common.actor.system; + +import lombok.Getter; +import lombok.ToString; + +@Getter +@ToString +public class ProcessFailureStrategy { + + private final boolean stop; + + private ProcessFailureStrategy(boolean stop) { + this.stop = stop; + } + + public static ProcessFailureStrategy stop() { + return new ProcessFailureStrategy(true); + } + + public static ProcessFailureStrategy resume() { + return new ProcessFailureStrategy(false); + } +} diff --git a/common/pom.xml b/common/pom.xml index 52f33fe..c87e731 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -15,6 +15,7 @@ ${basedir}/.. + actor cache core data diff --git a/modules/actor-biz/pom.xml b/modules/actor-biz/pom.xml new file mode 100644 index 0000000..f9da0b2 --- /dev/null +++ b/modules/actor-biz/pom.xml @@ -0,0 +1,27 @@ + + + 4.0.0 + + com.thing + modules + 5.1 + + + com.thing.modules + actor-biz + jar + ThingBI Server Modules Actor Biz + + + + com.thing.common + actor + + + com.thing.modules + thing + + + \ No newline at end of file diff --git a/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyActor.java b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyActor.java new file mode 100644 index 0000000..46cb208 --- /dev/null +++ b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyActor.java @@ -0,0 +1,50 @@ +package com.thing.common.actor.biz.actors.company; + +import com.thing.common.actor.Actor; +import com.thing.common.actor.ActorBizContext; +import com.thing.common.actor.ActorId; +import com.thing.common.actor.ActorMsg; +import com.thing.common.actor.biz.base.BizContextAwareActor; +import com.thing.common.actor.system.AbstractActorCreator; + +import java.util.Optional; + +/** + * @author siyang + * @date 2024/8/8 08:46 + * @description 公司actor,处理公司范畴下的业务 + */ +public class CompanyActor extends BizContextAwareActor { + + public CompanyActor(ActorBizContext actorBizContext) { + super(actorBizContext); + } + + @Override + protected boolean doProcess(ActorMsg msg) { + return false; + } + + public static class ActorCreator extends AbstractActorCreator { + + private final Long companyId; + + private final CompanyActorId companyActorId; + + public ActorCreator(ActorBizContext actorBizContext, Long companyId) { + super(actorBizContext); + this.companyId = companyId; + this.companyActorId = new CompanyActorId(companyId); + } + + @Override + public ActorId createActorId() { + return Optional.ofNullable(companyActorId).orElse(new CompanyActorId(companyId)); + } + + @Override + public Actor createActor() { + return new CompanyActor(actorBizContext); + } + } +} diff --git a/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyActorId.java b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyActorId.java new file mode 100644 index 0000000..a6c0fe3 --- /dev/null +++ b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyActorId.java @@ -0,0 +1,37 @@ +package com.thing.common.actor.biz.actors.company; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.thing.common.actor.enumeration.ActorBizType; +import com.thing.common.actor.system.AbstractActorId; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.io.Serializable; + +/** + * @author siyang + * @date 2024/8/8 08:42 + * @description 公司actor唯一标识 + */ +@Getter +@AllArgsConstructor +public class CompanyActorId extends AbstractActorId { + /** 公司顶级actorId,也是rootActor的id */ + @JsonIgnore public static final CompanyActorId ROOT_ID = new CompanyActorId(-1L); + + /** 非公司顶级actorId */ + @JsonIgnore public static final CompanyActorId NON_COMPANY_ID = new CompanyActorId(-2L); + + private final Long id; + + @Override + public Serializable id() { + return id; + } + + @Override + public ActorBizType bizType() { + return ActorBizType.COMPANY; + } +} diff --git a/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyChangedMsg.java b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyChangedMsg.java new file mode 100644 index 0000000..0089222 --- /dev/null +++ b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/CompanyChangedMsg.java @@ -0,0 +1,16 @@ +package com.thing.common.actor.biz.actors.company; + +import com.thing.common.actor.ActorMsg; +import com.thing.common.actor.enumeration.ActorMsgType; + +/** + * @author siyang + * @date 2024/8/5 11:04 + * @description + */ +public record CompanyChangedMsg(Long id, Boolean add, Boolean remove) implements ActorMsg { + @Override + public ActorMsgType getMsgType() { + return ActorMsgType.COMPANY_CHANGED_MSG; + } +} diff --git a/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/NonCompanyActor.java b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/NonCompanyActor.java new file mode 100644 index 0000000..c23ca9d --- /dev/null +++ b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/company/NonCompanyActor.java @@ -0,0 +1,42 @@ +package com.thing.common.actor.biz.actors.company; + +import com.thing.common.actor.Actor; +import com.thing.common.actor.ActorBizContext; +import com.thing.common.actor.ActorId; +import com.thing.common.actor.ActorMsg; +import com.thing.common.actor.biz.base.BizContextAwareActor; +import com.thing.common.actor.system.AbstractActorCreator; + +/** + * @author siyang + * @date 2024/8/8 08:47 + * @description 非公司actor,业务与公司无关 + */ +public class NonCompanyActor extends BizContextAwareActor { + + public NonCompanyActor(ActorBizContext actorBizContext) { + super(actorBizContext); + } + + @Override + protected boolean doProcess(ActorMsg msg) { + return false; + } + + public static class ActorCreator extends AbstractActorCreator { + + public ActorCreator(ActorBizContext actorBizContext) { + super(actorBizContext); + } + + @Override + public ActorId createActorId() { + return CompanyActorId.NON_COMPANY_ID; + } + + @Override + public Actor createActor() { + return new NonCompanyActor(actorBizContext); + } + } +} diff --git a/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/ActorSysProperties.java b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/ActorSysProperties.java new file mode 100644 index 0000000..77a8efc --- /dev/null +++ b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/ActorSysProperties.java @@ -0,0 +1,28 @@ +package com.thing.common.actor.biz.actors.root; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * @author siyang + * @date 2024/8/2 11:36 + * @description actor系统配置 + */ +@Data +@Component +@ConfigurationProperties(prefix = "actors.system") +public class ActorSysProperties { + public static final String ROOT_DISPATCHER = "root-dispatcher"; + public static final String DEVICE_DISPATCHER = "device-dispatcher"; + public static final String COMPANY_DISPATCHER = "company-dispatcher"; + public static final String NON_COMPANY_DISPATCHER = "non-company-dispatcher"; + + private int throughput = 5; + private int maxActorInitAttempts = 10; + private int schedulerPoolSize = 1; + private int rootDispatcherSize = 1; + private int tenantDispatcherSize = 2; + private int nonTenantDispatcherSize = 2; + private int deviceDispatcherSize = 4; +} \ No newline at end of file diff --git a/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/RootActor.java b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/RootActor.java new file mode 100644 index 0000000..c31f107 --- /dev/null +++ b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/RootActor.java @@ -0,0 +1,95 @@ +package com.thing.common.actor.biz.actors.root; + +import com.thing.common.actor.*; +import com.thing.common.actor.biz.actors.company.CompanyActor; +import com.thing.common.actor.biz.actors.company.CompanyActorId; +import com.thing.common.actor.biz.actors.company.CompanyChangedMsg; +import com.thing.common.actor.biz.actors.company.NonCompanyActor; +import com.thing.common.actor.biz.base.BizContextAwareActor; +import com.thing.common.actor.system.AbstractActorCreator; +import com.thing.sys.tenant.entity.SysTenantDetailEntity; + +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +/** + * @author siyang + * @date 2024/8/1 16:59 + * @description 顶级actor + */ +@Slf4j +public class RootActor extends BizContextAwareActor { + + public RootActor(ActorBizContext actorBizContext) { + super(actorBizContext); + } + + @Override + protected boolean doProcess(ActorMsg msg) { + switch (msg.getMsgType()) { + case SYSTEM_INIT_MSG -> systemInit(); + case COMPANY_CHANGED_MSG -> changeCompany((CompanyChangedMsg) msg); + default -> { + return false; + } + } + return true; + } + + private void systemInit() { + initCompanyActors(); + initNonCompanyActors(); + } + + private void changeCompany(CompanyChangedMsg msg) { + if (msg.add()) { + getOrCreateCompanyActor(msg.id()); + } + if (msg.remove()) { + actorSysContext.stop(new CompanyActorId(msg.id())); + } + } + + private void initCompanyActors() { + List list = defaultBizCtx().getSysTenantDetailService().list(); + list.forEach(tenant -> getOrCreateCompanyActor(tenant.getId())); + } + + private void initNonCompanyActors() { + getOrCreateNonCompanyActor(); + } + + private void getOrCreateCompanyActor(Long tenantId) { + actorSysContext.getOrCreateChildActor( + new CompanyActorId(tenantId), + () -> ActorSysProperties.COMPANY_DISPATCHER, + () -> new CompanyActor.ActorCreator(actorBizContext, tenantId), + () -> true); + } + + private void getOrCreateNonCompanyActor() { + actorSysContext.getOrCreateChildActor( + CompanyActorId.NON_COMPANY_ID, + () -> ActorSysProperties.NON_COMPANY_DISPATCHER, + () -> new NonCompanyActor.ActorCreator(actorBizContext), + () -> true); + } + + public static class ActorCreator extends AbstractActorCreator { + + public ActorCreator(ActorBizContext actorBizContext) { + super(actorBizContext); + } + + @Override + public ActorId createActorId() { + return CompanyActorId.ROOT_ID; + } + + @Override + public Actor createActor() { + return new RootActor(actorBizContext); + } + } +} diff --git a/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/RootInitMsg.java b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/RootInitMsg.java new file mode 100644 index 0000000..6d80021 --- /dev/null +++ b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/actors/root/RootInitMsg.java @@ -0,0 +1,16 @@ +package com.thing.common.actor.biz.actors.root; + +import com.thing.common.actor.ActorMsg; +import com.thing.common.actor.enumeration.ActorMsgType; + +/** + * @author siyang + * @date 2024/8/1 17:01 + * @description 顶级actor初始化消息 + */ +public class RootInitMsg implements ActorMsg { + @Override + public ActorMsgType getMsgType() { + return ActorMsgType.SYSTEM_INIT_MSG; + } +} diff --git a/modules/actor-biz/src/main/java/com/thing/common/actor/biz/base/BizContextAwareActor.java b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/base/BizContextAwareActor.java new file mode 100644 index 0000000..51a0a8a --- /dev/null +++ b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/base/BizContextAwareActor.java @@ -0,0 +1,58 @@ +package com.thing.common.actor.biz.base; + +import com.thing.common.actor.ActorBizContext; +import com.thing.common.actor.ActorMsg; +import com.thing.common.actor.system.AbstractActor; +import com.thing.common.actor.system.ProcessFailureStrategy; + +import lombok.extern.slf4j.Slf4j; + +/** + * @author siyang + * @date 2024/8/1 16:48 + * @description 提供业务上下文能力的Actor,所有涉及业务处理的actor都应继承该类 + */ +@Slf4j +public abstract class BizContextAwareActor extends AbstractActor { + protected final ActorBizContext actorBizContext; + + public BizContextAwareActor(ActorBizContext actorBizContext) { + this.actorBizContext = actorBizContext; + } + + public DefaultActorBizContext defaultBizCtx() { + return (DefaultActorBizContext) actorBizContext; + } + + /** + * 消息处理核心方法,由子类实现 + * + * @param msg 消息内容 + * @return 是否处理成功 + */ + protected abstract boolean doProcess(ActorMsg msg); + + @Override + public void process(ActorMsg msg) { + if (log.isDebugEnabled()) { + log.debug("Processing msg: {}", msg); + } + if (!doProcess(msg)) { + log.warn("Unprocessed message: {}!", msg); + } + } + + @Override + public ProcessFailureStrategy onProcessFailure(ActorMsg msg, Throwable t) { + log.debug("[{}] Processing failure for msg {}", getActorRef().getActorId(), msg, t); + return doProcessFailure(t); + } + + protected ProcessFailureStrategy doProcessFailure(Throwable t) { + if (t instanceof Error) { + return ProcessFailureStrategy.stop(); + } else { + return ProcessFailureStrategy.resume(); + } + } +} diff --git a/modules/actor-biz/src/main/java/com/thing/common/actor/biz/base/DefaultActorBizContext.java b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/base/DefaultActorBizContext.java new file mode 100644 index 0000000..1e6d7b6 --- /dev/null +++ b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/base/DefaultActorBizContext.java @@ -0,0 +1,38 @@ +package com.thing.common.actor.biz.base; + +import com.thing.common.actor.ActorBizContext; +import com.thing.common.actor.ActorRef; +import com.thing.common.actor.ActorSystem; +import com.thing.sys.tenant.service.SysTenantDetailService; + +import lombok.Data; +import lombok.RequiredArgsConstructor; + +import org.springframework.stereotype.Component; + +/** + * @author siyang + * @date 2024/8/7 13:20 + * @description actor业务上下文 + */ +@Data +@Component +@RequiredArgsConstructor +public class DefaultActorBizContext implements ActorBizContext { + /* --------------------- actor系统工具 ---------------------*/ + private ActorRef rootActor; + private ActorSystem actorSystem; + + /* ---------------------- 业务service ----------------------*/ + private final SysTenantDetailService sysTenantDetailService; + + @Override + public ActorRef getRootActor() { + return rootActor; + } + + @Override + public ActorSystem getActorSystem() { + return actorSystem; + } +} diff --git a/modules/actor-biz/src/main/java/com/thing/common/actor/biz/lifecycle/ActorLifecycle.java b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/lifecycle/ActorLifecycle.java new file mode 100644 index 0000000..66c214f --- /dev/null +++ b/modules/actor-biz/src/main/java/com/thing/common/actor/biz/lifecycle/ActorLifecycle.java @@ -0,0 +1,100 @@ +package com.thing.common.actor.biz.lifecycle; + +import static com.thing.common.actor.biz.actors.root.ActorSysProperties.*; + +import com.thing.common.actor.ActorRef; +import com.thing.common.actor.ActorSystem; +import com.thing.common.actor.biz.actors.root.ActorSysProperties; +import com.thing.common.actor.biz.actors.root.RootActor; +import com.thing.common.actor.biz.actors.root.RootInitMsg; +import com.thing.common.actor.biz.base.DefaultActorBizContext; +import com.thing.common.actor.system.ActorSystemSettings; +import com.thing.common.actor.system.DefaultActorSystem; +import com.thing.common.util.thread.ThingExecutors; +import com.thing.common.util.thread.ThingThreadFactory; + +import jakarta.annotation.Nonnull; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.stereotype.Service; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * @author siyang + * @date 2024/8/2 11:28 + * @description 顶级actor服务 + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class ActorLifecycle implements ApplicationListener { + + private final ActorSysProperties actorSysProperties; + private final DefaultActorBizContext actorBizContext; + + private ActorSystem system; + private ActorRef rootActor; + + @PostConstruct + public void initActorSystem() { + log.info("Initializing actor actorSystem."); + ActorSystemSettings settings = + new ActorSystemSettings( + actorSysProperties.getThroughput(), + actorSysProperties.getSchedulerPoolSize(), + actorSysProperties.getMaxActorInitAttempts()); + system = new DefaultActorSystem(settings); + + createDispatcher(ROOT_DISPATCHER, actorSysProperties.getRootDispatcherSize()); + createDispatcher(DEVICE_DISPATCHER, actorSysProperties.getDeviceDispatcherSize()); + createDispatcher(COMPANY_DISPATCHER, actorSysProperties.getTenantDispatcherSize()); + createDispatcher(NON_COMPANY_DISPATCHER, actorSysProperties.getNonTenantDispatcherSize()); + + rootActor = + system.createRootActor( + ROOT_DISPATCHER, new RootActor.ActorCreator(actorBizContext)); + + actorBizContext.setActorSystem(system); + actorBizContext.setRootActor(rootActor); + log.info("Actor actorSystem initialized."); + } + + @PreDestroy + public void stopActorSystem() { + if (system != null) { + log.info("Stopping actor system."); + system.stop(); + log.info("Actor system stopped."); + } + } + + @Override + public void onApplicationEvent(@Nonnull ContextRefreshedEvent event) { + log.info("Application ready. Sending application init message to actor system"); + rootActor.tellImportant(new RootInitMsg()); + } + + private void createDispatcher(String dispatcherName, int poolSize) { + system.createDispatcher(dispatcherName, initDispatcherExecutor(dispatcherName, poolSize)); + } + + private ExecutorService initDispatcherExecutor(String dispatcherName, int poolSize) { + if (poolSize == 0) { + int cores = Runtime.getRuntime().availableProcessors(); + poolSize = Math.max(1, cores / 2); + } + if (poolSize == 1) { + return Executors.newSingleThreadExecutor(ThingThreadFactory.forName(dispatcherName)); + } else { + return ThingExecutors.newWorkStealingPool(poolSize, dispatcherName); + } + } +} diff --git a/modules/pom.xml b/modules/pom.xml index c5ae496..6b5f88b 100644 --- a/modules/pom.xml +++ b/modules/pom.xml @@ -11,6 +11,7 @@ pom + actor-biz dequeue thing msg diff --git a/pom.xml b/pom.xml index 507b021..e660df1 100644 --- a/pom.xml +++ b/pom.xml @@ -484,6 +484,11 @@ + + com.thing.common + actor + ${project.version} + com.thing.common cache @@ -536,6 +541,11 @@ + + com.thing.modules + actor-biz + ${project.version} + com.thing.modules alarm