diff --git a/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/BpmStartedListener.java b/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/BpmStartedListener.java index 60c8c9b..3c830c4 100644 --- a/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/BpmStartedListener.java +++ b/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/BpmStartedListener.java @@ -7,7 +7,10 @@ import cc.smtweb.framework.core.db.impl.DatabaseUtil; import cc.smtweb.framework.core.db.vo.ModelCatalog; import cc.smtweb.framework.core.mvc.controller.IStartListener; import cc.smtweb.framework.core.mvc.service.TreeHelper; +import cc.smtweb.framework.core.systask.SysServiceFactory; import cc.smtweb.system.bpm.web.design.db.ModelCatalogTreeHelper; +import cc.smtweb.system.bpm.web.sys.oneTimeService.OneTimeServiceFactory; +import cc.smtweb.system.bpm.web.sys.oneTimeService.OneTimeTaskCleanService; /** * Created by Akmm at 2022/7/8 19:57 @@ -21,6 +24,8 @@ public class BpmStartedListener implements IStartListener { @Override public void init() { SwConsts.SysParam.RUN_PROJECTS = "bpm"; + SysServiceFactory.getInstance().reg(new OneTimeTaskCleanService()); + TreeHelper.regTreeHelper(ModelCatalog.ENTITY_NAME, ModelCatalogTreeHelper.class); } @Override @@ -29,7 +34,6 @@ public class BpmStartedListener implements IStartListener { new DatabaseUtil(true, false).checkDb(); //初始化缓存 CacheManager.getIntance().init(); - - TreeHelper.regTreeHelper(ModelCatalog.ENTITY_NAME, ModelCatalogTreeHelper.class); + OneTimeServiceFactory.getInstance().start(); } } diff --git a/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/BaseOneTimeHandler.java b/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/BaseOneTimeHandler.java new file mode 100644 index 0000000..fa7729d --- /dev/null +++ b/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/BaseOneTimeHandler.java @@ -0,0 +1,33 @@ +package cc.smtweb.system.bpm.web.sys.oneTimeService; + +import cc.smtweb.framework.core.common.R; +import cc.smtweb.framework.core.common.SwMap; +import cc.smtweb.framework.core.util.JsonUtil; + +/** + * Created by Akmm at 2021/12/2 19:30 + */ +public abstract class BaseOneTimeHandler { + private OneTimeSysService service; + //输入参数 + protected SwMap params; + + public BaseOneTimeHandler() { + } + + public void setService(OneTimeSysService service) { + this.service = service; + } + + public void setParams(String inParam) { + this.params = JsonUtil.parse(inParam, SwMap.class); + } + + //执行 + public abstract R work() throws Exception; + + //回写进度,百分比 + protected void setProcess(int process) throws Exception { + service.setProcess(process); + } +} diff --git a/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/OneTimeServiceFactory.java b/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/OneTimeServiceFactory.java new file mode 100644 index 0000000..1ce3b32 --- /dev/null +++ b/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/OneTimeServiceFactory.java @@ -0,0 +1,138 @@ +package cc.smtweb.system.bpm.web.sys.oneTimeService; + +import cc.smtweb.framework.core.common.SwConsts; +import cc.smtweb.framework.core.db.DbEngine; +import cc.smtweb.framework.core.exception.BizException; +import cc.smtweb.framework.core.exception.SwException; +import cc.smtweb.framework.core.session.UserSession; +import cc.smtweb.framework.core.util.DateUtil; +import cc.smtweb.framework.core.util.JsonUtil; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Created by Akmm at 2018/11/27 23:40 + * 后台一次性系统任务工厂,不允许用户干预的、即时执行的任务 + */ +public class OneTimeServiceFactory { + //允许的最大线程个数 + private static final int max_thread_size = 200; + private static OneTimeServiceFactory instance = null; + + static { + instance = new OneTimeServiceFactory(); + } + + public static OneTimeServiceFactory getInstance() { + return instance; + } + + + private ScheduledExecutorService schedule; + + private Map> mapType = new HashMap<>(); + + private OnetimeTaskHelper taskHelper = null; + + public OneTimeServiceFactory() { + } + + public void regHandle(String type, Class cls) { + mapType.put(type, cls); + } + + public BaseOneTimeHandler getHandle(String type){ + Class cls = mapType.get(type); + if (cls == null) throw new SwException("没有注册此服务类(" + type + ")!"); + + try { + return cls.newInstance(); + } catch (Exception e) { + throw new SwException(e); + } + } + + public OnetimeTaskHelper getTasksHelper() { + if (taskHelper == null) { + synchronized (OnetimeTaskHelper.class) { + if (taskHelper == null) { + taskHelper = new OnetimeTaskHelper(SwConsts.SysParam.machineId, DbEngine.getInstance().findDao(OnetimeTask.class)); + } + } + } + return taskHelper; + } + + /** + * 添加任务 + * @param type 任务类别,必须先调用regHandle注册任务执行类 + * @param key 任务唯一key,同一type下不允许重复 + * @param name 任务名称,共查询时察看 + * @param bizId 业务id,可空,方便业务查询 + * @param inparams 执行Api时的输入参数,json格式字符串 + * @param us 登录信息 + * @throws Exception + */ + public void push(String type, String key, String name, long bizId, String inparams, UserSession us) { + if (getTasksHelper().isExistByKey(type, key)) { + throw new BizException("该任务已经存在,不能新增!"); + } + OnetimeTask task = new OnetimeTask(); + task.init(); + task.setType(type); + task.setCode(key); + task.setName(name); + task.setBizId(bizId); + task.setInparams(inparams); + task.setStatu(OneTimeStatu.WAIT.value); + task.setCreateTime(DateUtil.nowDateTimeLong()); + task.setCreateUserId(us.getUserId()); + + schedule.schedule(new OneTimeSysService(task), 1L, TimeUnit.MILLISECONDS); + } + + public void push(String type, String key, String name, long bizId, Map inparams, UserSession us) { + push(type, key, name, bizId, JsonUtil.encodeString(inparams), us); + } + + public void push(String type, String key, String name, long bizId) { + push(type, key, name, bizId, "", UserSession.createSys()); + } + + /** + * 重新执行指定任务 + * @param type 任务类别,必须先调用regHandle注册任务执行类 + * @param key 任务唯一key,同一type下不允许重复 + * @throws Exception + */ + public void restart(String type, String key) { + OnetimeTask task = getTasksHelper().findOneByTk(type, key); + if (task != null && !task.isNew()) { + schedule.schedule(new OneTimeSysService(task), 1L, TimeUnit.MILLISECONDS); + } + } + + //启动任务 + public void start() { + if (schedule != null) { + stop(); + } + List listService = getTasksHelper().findWaiting(); + schedule = Executors.newScheduledThreadPool(max_thread_size); + for (OnetimeTask task : listService) { + schedule.schedule(new OneTimeSysService(task), 10L, TimeUnit.SECONDS); + } + } + + //停止任务 + public void stop() { + if (schedule != null) { + schedule.shutdown(); + } + } +} diff --git a/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/OneTimeStatu.java b/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/OneTimeStatu.java new file mode 100644 index 0000000..51590e1 --- /dev/null +++ b/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/OneTimeStatu.java @@ -0,0 +1,15 @@ +package cc.smtweb.system.bpm.web.sys.oneTimeService; + +import cc.smtweb.framework.core.common.IntEnum; + +/** + * Created by Akmm at 2021/12/2 18:46 + * 任务状态,0待执行,1执行中,8执行失败, 9执行成功 + */ +public class OneTimeStatu extends IntEnum { + public static OneTimeStatu instance = new OneTimeStatu(); + public static IntEnumBean WAIT = instance.addEnum(0, "待执行"); + public static IntEnumBean RUNNING = instance.addEnum(1, "执行中"); + public static IntEnumBean FAILED = instance.addEnum(8, "执行失败"); + public static IntEnumBean SUCCESS = instance.addEnum(9, "执行成功"); +} diff --git a/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/OneTimeSysService.java b/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/OneTimeSysService.java new file mode 100644 index 0000000..ef0ad58 --- /dev/null +++ b/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/OneTimeSysService.java @@ -0,0 +1,72 @@ +package cc.smtweb.system.bpm.web.sys.oneTimeService; + +import cc.smtweb.framework.core.db.DbEngine; +import cc.smtweb.framework.core.db.jdbc.IDbWorker; +import cc.smtweb.framework.core.util.CommUtil; +import cc.smtweb.framework.core.util.DateUtil; +import lombok.extern.slf4j.Slf4j; + +/** + * Created by Akmm at 2018/11/27 23:46 + * 系统任务基类 + */ +@Slf4j +public class OneTimeSysService implements Runnable { + private OnetimeTask task; + + public OneTimeSysService(OnetimeTask task) { + this.task = task; + } + + + @Override + public void run() { + long time = System.currentTimeMillis(); + try { + log.debug(task.getName() + ":::开始执行........."); + BaseOneTimeHandler handler = OneTimeServiceFactory.getInstance().getHandle(task.getType()); + handler.setParams(task.getInparams()); + handler.setService(this); + + task.setBeginTime(DateUtil.nowDateTimeLong()); + task.setStatu(OneTimeStatu.RUNNING.value); + task.setInfo(null); + task.setProcess(0); + OneTimeServiceFactory.getInstance().getTasksHelper().updateTask(task); + + handler.work(); + + task.setEndTime(DateUtil.nowDateTimeLong()); + task.setStatu(OneTimeStatu.SUCCESS.value); + task.setProcess(100); + OneTimeServiceFactory.getInstance().getTasksHelper().updateTask(task); + + log.debug(task.getName() + ":::执行完成,耗时:" + (System.currentTimeMillis() - time) + "毫秒........."); + } catch (Exception e) { + task.setEndTime(DateUtil.nowDateTimeLong()); + task.setStatu(OneTimeStatu.FAILED.value); + task.setInfo(CommUtil.getOrigMsg(e)); + + try { + OneTimeServiceFactory.getInstance().getTasksHelper().updateTask(task); + } catch (Exception exception) { + log.error("更新任务状态失败", e); + } + log.error(task.getName() + ":::执行失败", e); + } + } + + //回写进度,百分比 + protected void setProcess(int process) throws Exception { + if (task.getProcess() != process) { + task.setProcess(process); + DbEngine.getInstance().doTransSingle(new IDbWorker() { + @Override + public void work() { + OneTimeServiceFactory.getInstance().getTasksHelper().updatProcess(task); + } + }); + + } + } +} diff --git a/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/OneTimeTaskCleanService.java b/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/OneTimeTaskCleanService.java new file mode 100644 index 0000000..a7c9c7e --- /dev/null +++ b/smtweb-framework/bpm/src/main/java/cc/smtweb/system/bpm/web/sys/oneTimeService/OneTimeTaskCleanService.java @@ -0,0 +1,28 @@ +package cc.smtweb.system.bpm.web.sys.oneTimeService; + +import cc.smtweb.framework.core.db.DbEngine; +import cc.smtweb.framework.core.db.EntityHelper; +import cc.smtweb.framework.core.systask.BaseSysService; +import cc.smtweb.framework.core.util.DateUtil; + +/** + * Created by Akmm at 2021/12/6 19:19 + * 清除当前已结束任务, 10分钟一次 + */ +public class OneTimeTaskCleanService extends BaseSysService { + @Override + public String getTitle() { + return "清理后台任务"; + } + + @Override + public int getInterval() { + return 600; + } + + @Override + public void work() throws Exception { + String t = DateUtil.getNowYm() + "01000000"; + DbEngine.getInstance().update("delete from " + EntityHelper.getSchemaTableName(OnetimeTask.ENTITY_NAME) + " where statu=? and create_time dao; + + public OnetimeTaskHelper(int runServer, EntityDao dao) { + this.runServer = runServer; + this.dao = dao; + } + + //获取所有等待中的任务,服务启动时执行一次 + public List findWaiting() { + return dao.queryWhere(" sot_run_server=? and sot_statu < ?", runServer, OneTimeStatu.FAILED.value); + } + + public OnetimeTask findOneByTk(String type, String key) { + return dao.queryEntityWhere(" sot_type=? and sot_code=?", type, key); + } + + public boolean isExistByKey(String type, String key) { + return DbEngine.getInstance().isExists("select sot_id from " + EntityHelper.getSchemaTableName(OnetimeTask.ENTITY_NAME) + " where sot_type=? and sot_code=?", type, key); + } + + public void updateTask(OnetimeTask task) { + if (task.isNew()) { + task.setEntityId(DbEngine.getInstance().nextId()); + task.setRunServer(runServer); + DbEngine.getInstance().doTrans(new IDbWorker() { + @Override + public void work() { + dao.insertEntity(task); + } + + @Override + public void doAfterDbRollback() { + task.setEntityId(0L); + } + }); + } else { + DbEngine.getInstance().doTrans(new IDbWorker() { + @Override + public void work() { + task.setRunServer(runServer); + dao.updateEntity(task, "sot_begin_time,sot_end_time,sot_statu,sot_info,sot_process,sot_run_server"); + } + }); + } + } + + public void updatProcess(OnetimeTask task) { + dao.updateEntity(task, "sot_process"); + } +} \ No newline at end of file diff --git a/smtweb-framework/bpm/src/main/resources/config/application.yaml b/smtweb-framework/bpm/src/main/resources/config/application.yaml index f2f87e9..16677bf 100644 --- a/smtweb-framework/bpm/src/main/resources/config/application.yaml +++ b/smtweb-framework/bpm/src/main/resources/config/application.yaml @@ -1,5 +1,6 @@ smtweb: machine-id: 1 + enable-job: true file: local-path: /data/sw/files/ url: http://127.0.0.1:8888/sw/files/ diff --git a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/CoreApplicationStartedListener.java b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/CoreApplicationStartedListener.java index cbe83ad..629b789 100644 --- a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/CoreApplicationStartedListener.java +++ b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/CoreApplicationStartedListener.java @@ -4,7 +4,6 @@ import cc.smtweb.framework.core.common.SwConsts; import cc.smtweb.framework.core.mvc.controller.IStartListener; import cc.smtweb.framework.core.mvc.controller.scan.ApplicationScanner; import cc.smtweb.framework.core.mvc.controller.scan.BeanManager; -import cc.smtweb.framework.core.systask.TaskStartEvent; import cc.smtweb.framework.core.systask.WebStartedEvent; import lombok.SneakyThrows; import org.springframework.boot.context.event.ApplicationStartedEvent; @@ -28,7 +27,6 @@ public class CoreApplicationStartedListener implements ApplicationListener(new RedisPooledObjectFactory(redisClient), config); -// pool = ConnectionPoolSupport.createGenericObjectPool( -// () -> redisClient.connect(ByteArrayCodec.INSTANCE), -// new GenericObjectPoolConfig(), false); - redisSysTask = new RedisSysTask(applicationContext, redisClient, SCRIBE_SYSTEM + redisUri.getDatabase() + "_*", channel); - sysTaskManager.add(redisSysTask); + redisSysTask.run(); } @PreDestroy diff --git a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/cache/redis/RedisSysTask.java b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/cache/redis/RedisSysTask.java index 5042292..148bb50 100644 --- a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/cache/redis/RedisSysTask.java +++ b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/cache/redis/RedisSysTask.java @@ -1,6 +1,5 @@ package cc.smtweb.framework.core.cache.redis; -import cc.smtweb.framework.core.systask.ISysTask; import cc.smtweb.framework.core.util.JsonUtil; import io.lettuce.core.RedisClient; import io.lettuce.core.pubsub.RedisPubSubAdapter; @@ -8,7 +7,7 @@ import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; import org.springframework.context.ApplicationContext; -public class RedisSysTask implements ISysTask { +public class RedisSysTask { private ApplicationContext applicationContext; private StatefulRedisPubSubConnection connection; private RedisClient redisClient; @@ -24,7 +23,6 @@ public class RedisSysTask implements ISysTask { this.selfChannel = selfChannel; } - @Override public int run() { // 非集群模式下的发布订阅 if (connection == null) { diff --git a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/cache/redis/config/RedisConfig.java b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/cache/redis/config/RedisConfig.java index 3a768e1..796f90c 100644 --- a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/cache/redis/config/RedisConfig.java +++ b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/cache/redis/config/RedisConfig.java @@ -2,7 +2,6 @@ package cc.smtweb.framework.core.cache.redis.config; import cc.smtweb.framework.core.cache.redis.RedisManager; -import cc.smtweb.framework.core.systask.SysTaskManager; import io.lettuce.core.RedisURI; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -40,8 +39,6 @@ public class RedisConfig { @Autowired private ApplicationContext applicationContext; - @Autowired - private SysTaskManager sysTaskManager; @Bean public RedisManager redisManager() { @@ -59,6 +56,6 @@ public class RedisConfig { redisUri.setPassword((CharSequence) redisPassword.trim()); } - return new RedisManager(applicationContext, sysTaskManager, redisUri); + return new RedisManager(applicationContext, redisUri); } } diff --git a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/common/SwConsts.java b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/common/SwConsts.java index a4149f5..6eeb901 100644 --- a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/common/SwConsts.java +++ b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/common/SwConsts.java @@ -11,6 +11,10 @@ public interface SwConsts { public static boolean SYS_STARTED = false; //运行的项目,多个用半角逗号分隔 public static String RUN_PROJECTS = ""; + //启动时赋值:服务器编号 + public static int machineId; + //是否执行定时任务 + public static boolean enableJob = false; } //错误码 @@ -46,4 +50,7 @@ public interface SwConsts { String DEF_PWD = "abc@123456"; //初始密码 String LOGIN_VERIFY_CODE = "_VERIFY_CODE"; String _LOGIN_USER_ID_IN_SESSION = "_LOGIN_USER_ID_IN_SESSION"; + + //定时任务key + String REDIS_KEY_BASE_JOB = "jobexe"; } diff --git a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/BaseSysService.java b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/BaseSysService.java new file mode 100644 index 0000000..66e1a35 --- /dev/null +++ b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/BaseSysService.java @@ -0,0 +1,46 @@ +package cc.smtweb.framework.core.systask; + +import cc.smtweb.framework.core.common.SwConsts; +import lombok.extern.slf4j.Slf4j; + +/** + * Created by Akmm at 2018/11/27 23:46 + * 系统任务基类 + */ +@Slf4j +public abstract class BaseSysService implements Runnable { + public boolean isPrintLog() { + return true; + } + + //标题,描述性文字 + public abstract String getTitle(); + + //间隔时间,单位秒 + public abstract int getInterval(); + + @Override + public void run() { + try { + final String id = this.getClass().getName(); + SingleRequestHelper.singleRequest(SwConsts.REDIS_KEY_BASE_JOB, id, "任务执行中,本次忽略.........", new SingleRequestHelper.ISingleWork() { + @Override + public void doSingleWork() throws Exception { + long time = System.currentTimeMillis(); + if (isPrintLog()) { + log.debug(getTitle() + ":::开始执行........."); + } + work(); + if (isPrintLog()) { + log.debug(getTitle() + ":::执行完成,耗时:" + (System.currentTimeMillis() - time) + "毫秒........."); + } + } + }); + } catch (Exception e) { + log.error(getTitle() + ":::执行失败", e); + } + } + + //主要要干的活 + protected abstract void work() throws Exception; +} diff --git a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/ISysTask.java b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/ISysTask.java deleted file mode 100644 index fb96bcc..0000000 --- a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/ISysTask.java +++ /dev/null @@ -1,8 +0,0 @@ -package cc.smtweb.framework.core.systask; - -/** - * 任务接口 - */ -public interface ISysTask { - int run(); -} diff --git a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SingleRequestHelper.java b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SingleRequestHelper.java new file mode 100644 index 0000000..5997a52 --- /dev/null +++ b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SingleRequestHelper.java @@ -0,0 +1,33 @@ +package cc.smtweb.framework.core.systask; + +import cc.smtweb.framework.core.cache.redis.RedisManager; +import cc.smtweb.framework.core.exception.SwException; +import cc.smtweb.framework.core.util.DateUtil; +import cc.smtweb.framework.core.util.StringUtil; + +public class SingleRequestHelper { + + public static interface ISingleWork { + void doSingleWork() throws Exception; + + } + + public static void singleRequest(String region_key, String key, ISingleWork singleWork) throws Exception { + singleRequest(region_key, key, "其他人正在执行该操作,请等待!", singleWork); + } + + public static void singleRequest(String region_key, String key, String tip_msg, ISingleWork singleWork) throws Exception { + boolean clearCache = false; + try { + String lastTime = RedisManager.getInstance().hGet(region_key, key, String.class); + if (StringUtil.isNotEmpty(lastTime)) { + throw new SwException(tip_msg); + } + RedisManager.getInstance().hSet(region_key, key, DateUtil.nowDateString()); + clearCache = true; + singleWork.doSingleWork(); + } finally { + if (clearCache) RedisManager.getInstance().hdel(region_key, key); + } + } +} diff --git a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SysServiceFactory.java b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SysServiceFactory.java new file mode 100644 index 0000000..bfa82d5 --- /dev/null +++ b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SysServiceFactory.java @@ -0,0 +1,58 @@ +package cc.smtweb.framework.core.systask; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Created by Akmm at 2018/11/27 23:40 + * 后台系统任务工厂,不允许用户干预的任务放在这里 + */ +public class SysServiceFactory { + //允许的最大线程个数 + private static final int max_thread_size = 16; + private static SysServiceFactory instance = null; + + static { + instance = new SysServiceFactory(); + } + + public static SysServiceFactory getInstance() { + return instance; + } + + + private ScheduledExecutorService schedule; + //待执行的任务 + private List listService = new ArrayList<>(); + + public SysServiceFactory() { + + } + + //注册任务 + public void reg(BaseSysService service) { + listService.add(service); + } + + + //启动任务 + protected void start() { + if (schedule != null) { + stop(); + } + int size = listService.size(); + if (size > max_thread_size) size = max_thread_size; + schedule = Executors.newScheduledThreadPool(size); + for (BaseSysService s : listService) { + schedule.scheduleWithFixedDelay(s, 60L, (long) s.getInterval(), TimeUnit.SECONDS); + } + } + + //停止任务 + protected void stop() { + schedule.shutdown(); + } +} diff --git a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SysTaskManager.java b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SysTaskManager.java deleted file mode 100644 index f32ff36..0000000 --- a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SysTaskManager.java +++ /dev/null @@ -1,40 +0,0 @@ -package cc.smtweb.framework.core.systask; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; -import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Service; - -import java.util.ArrayList; -import java.util.List; - -@Service -public class SysTaskManager { - @Autowired - private ApplicationContext applicationContext; - - private List tasks = new ArrayList<>(); - - public void add(ISysTask task) { - tasks.add(task); - } - - @EventListener - public void onTaskStartEvent(TaskStartEvent event) { - runAll(); - } - - // 每隔60秒定时执行 -// @Scheduled(fixedRate = 60000) -// public void fixedRateJob() { -// System.out.println("fixedRate 每隔60秒" + new java.util.Date()); -// -// runAll(); -// } - - private void runAll() { - for (ISysTask task : tasks) { - task.run(); - } - } -} diff --git a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SysThreadPool.java b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SysThreadPool.java new file mode 100644 index 0000000..605d9d0 --- /dev/null +++ b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SysThreadPool.java @@ -0,0 +1,89 @@ +package cc.smtweb.framework.core.systask; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Created by Akmm at 2018/11/27 23:40 + * 通用线程池发放 + */ +@Slf4j +public class SysThreadPool { + //允许的最大线程池 + private static final int max_pool_size = 100; + private static SysThreadPool instance = null; + + static { + instance = new SysThreadPool(); + } + + public static SysThreadPool getInstance() { + return instance; + } + + private ThreadPoolExecutor threadPool; + + static class SysThreadFactory implements ThreadFactory { + private static final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + SysThreadFactory() { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + namePrefix = "DfpPool-" + + poolNumber.getAndIncrement() + + "-Thread-"; + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + threadNumber.getAndIncrement(), + 0); + if (t.isDaemon()) + t.setDaemon(false); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } + } + + public void addTask(SysThreadWorker worker) { + if (threadPool == null) { + threadPool = new ThreadPoolExecutor(2, max_pool_size, + 60L, TimeUnit.SECONDS, + new ArrayBlockingQueue(100), new SysThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); + } + try { + threadPool.execute(worker); + log.debug("add task to DfpPool success -> [PoolSize:" + threadPool.getPoolSize() + ",ActiveCount:" + threadPool.getActiveCount() + ",CompletedTaskCount:" + threadPool.getCompletedTaskCount() + "]"); + } catch (Exception e) { + log.debug("add task to DfpPool pools error:", e); + } + } + + //停止任务 + protected void stop() { + if (threadPool != null) threadPool.shutdown(); + } + + public static void main(String[] args) { + for (int i = 1; i < 100; i++) { + int j = i; + SysThreadPool.getInstance().addTask(new SysThreadWorker(null, null) { + @Override + public void localWork() throws Exception { + Thread.sleep(1000); + log.debug("线程:[" + j + "]执行"); + } + }); + } + } +} diff --git a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SysThreadWorker.java b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SysThreadWorker.java new file mode 100644 index 0000000..bf79f7e --- /dev/null +++ b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/SysThreadWorker.java @@ -0,0 +1,39 @@ +package cc.smtweb.framework.core.systask; + + +import cc.smtweb.framework.core.util.StringUtil; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class SysThreadWorker implements Runnable { + private String loginPartyId; + + private String loginOwnerPartyId; + + private String bizName; + + protected SysThreadWorker(String loginPartyId, String loginOwnerPartyId) { + this.loginPartyId = loginPartyId; + this.loginOwnerPartyId = loginOwnerPartyId; + } + + protected SysThreadWorker(String loginPartyId, String loginOwnerPartyId, String bizName) { + this.loginPartyId = loginPartyId; + this.loginOwnerPartyId = loginOwnerPartyId; + this.bizName = StringUtil.checkNull(bizName); + } + + + @Override + public void run() { + try { +// SysParams.setLoginOwnerPartyId(loginOwnerPartyId); +// SysParams.setLoginPartyId(loginPartyId); + localWork(); + } catch (Exception e) { + log.error("DfpPool handle error:", e); + } + } + + public abstract void localWork() throws Exception; +} diff --git a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/TaskStartEvent.java b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/TaskStartEvent.java deleted file mode 100644 index 35b8bf0..0000000 --- a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/systask/TaskStartEvent.java +++ /dev/null @@ -1,4 +0,0 @@ -package cc.smtweb.framework.core.systask; - -public class TaskStartEvent { -} diff --git a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/util/Consts.java b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/util/Consts.java index aeb9e8f..68c5881 100644 --- a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/util/Consts.java +++ b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/util/Consts.java @@ -18,4 +18,5 @@ public class Consts { public final static String FIELD_LAST_TIME = "last_time";//最后更新时间,字段名 public final static String FIELD_CREATE_PARTY = "create_party_id";//创建单位,字段名 + }