package com.hivekion.room.bean; import com.hivekion.Global; import com.hivekion.room.func.TaskAction; import com.hivekion.scenario.bean.ScenarioWsParam; import com.hivekion.scenario.entity.ScenarioTask; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.springframework.web.reactive.function.client.WebClient; /** * [类的简要说明] *
* [详细描述,可选] *
* * @author LiDongYU * @since 2025/7/22 */ public abstract class AbtParentTask implements TaskAction { protected final AtomicLong duringTime = new AtomicLong(0); protected final ScheduledExecutorService schedule = Executors.newScheduledThreadPool( 1); protected ScheduledFuture> scheduledFuture; //任务数据 protected final ScenarioTask scenarioTask; protected final String roomId; protected WebClient webClient = WebClient.create(); ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, // 核心线程数 10, // 最大线程数 60L, // 空闲线程存活时间 TimeUnit.SECONDS, // 时间单位 new LinkedBlockingQueue<>(100), // 任务队列 new CustomThreadFactory("MyPool"), // 线程工厂 new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略 ); public AbtParentTask(ScenarioTask scenarioTask, String roomId) { this.scenarioTask = scenarioTask; this.roomId = roomId; } protected void start() { scheduledFuture = schedule.scheduleAtFixedRate(() -> { ScenarioWsParam scenarioWsParam = Global.roomParamMap.get( scenarioTask.getScenarioId() + "_" + roomId); if (scenarioWsParam == null) { duringTime.getAndSet(1); } else { duringTime.getAndSet(scenarioWsParam.getMag()); } business(); }, 0, 1, TimeUnit.SECONDS); } protected abstract void finished(); protected abstract void setMag(int mag); @Override public void doSomeThing() { } @Override public String getId() { return scenarioTask.getId(); } @Override public String getType() { return scenarioTask.getTaskType(); } public void cancelAllTask() { if (scheduledFuture != null) { scheduledFuture.cancel(true); } } protected abstract void business(); } // 自定义线程工厂 class CustomThreadFactory implements ThreadFactory { private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; public CustomThreadFactory(String namePrefix) { this.namePrefix = namePrefix + "-thread-"; } @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement()); thread.setDaemon(false); // 设置为非守护线程 thread.setPriority(Thread.NORM_PRIORITY); return thread; } }