@@ -7,7 +7,10 @@ import cc.smtweb.framework.core.db.impl.DatabaseUtil; | |||
import cc.smtweb.framework.core.db.vo.ModelCatalog; | |||
import cc.smtweb.framework.core.mvc.controller.IStartListener; | |||
import cc.smtweb.framework.core.mvc.service.TreeHelper; | |||
import cc.smtweb.framework.core.systask.SysServiceFactory; | |||
import cc.smtweb.system.bpm.web.design.db.ModelCatalogTreeHelper; | |||
import cc.smtweb.system.bpm.web.sys.oneTimeService.OneTimeServiceFactory; | |||
import cc.smtweb.system.bpm.web.sys.oneTimeService.OneTimeTaskCleanService; | |||
/** | |||
* Created by Akmm at 2022/7/8 19:57 | |||
@@ -21,6 +24,8 @@ public class BpmStartedListener implements IStartListener { | |||
@Override | |||
public void init() { | |||
SwConsts.SysParam.RUN_PROJECTS = "bpm"; | |||
SysServiceFactory.getInstance().reg(new OneTimeTaskCleanService()); | |||
TreeHelper.regTreeHelper(ModelCatalog.ENTITY_NAME, ModelCatalogTreeHelper.class); | |||
} | |||
@Override | |||
@@ -29,7 +34,6 @@ public class BpmStartedListener implements IStartListener { | |||
new DatabaseUtil(true, false).checkDb(); | |||
//初始化缓存 | |||
CacheManager.getIntance().init(); | |||
TreeHelper.regTreeHelper(ModelCatalog.ENTITY_NAME, ModelCatalogTreeHelper.class); | |||
OneTimeServiceFactory.getInstance().start(); | |||
} | |||
} |
@@ -0,0 +1,33 @@ | |||
package cc.smtweb.system.bpm.web.sys.oneTimeService; | |||
import cc.smtweb.framework.core.common.R; | |||
import cc.smtweb.framework.core.common.SwMap; | |||
import cc.smtweb.framework.core.util.JsonUtil; | |||
/** | |||
* Created by Akmm at 2021/12/2 19:30 | |||
*/ | |||
public abstract class BaseOneTimeHandler { | |||
private OneTimeSysService service; | |||
//输入参数 | |||
protected SwMap params; | |||
public BaseOneTimeHandler() { | |||
} | |||
public void setService(OneTimeSysService service) { | |||
this.service = service; | |||
} | |||
public void setParams(String inParam) { | |||
this.params = JsonUtil.parse(inParam, SwMap.class); | |||
} | |||
//执行 | |||
public abstract R work() throws Exception; | |||
//回写进度,百分比 | |||
protected void setProcess(int process) throws Exception { | |||
service.setProcess(process); | |||
} | |||
} |
@@ -0,0 +1,138 @@ | |||
package cc.smtweb.system.bpm.web.sys.oneTimeService; | |||
import cc.smtweb.framework.core.common.SwConsts; | |||
import cc.smtweb.framework.core.db.DbEngine; | |||
import cc.smtweb.framework.core.exception.BizException; | |||
import cc.smtweb.framework.core.exception.SwException; | |||
import cc.smtweb.framework.core.session.UserSession; | |||
import cc.smtweb.framework.core.util.DateUtil; | |||
import cc.smtweb.framework.core.util.JsonUtil; | |||
import java.util.HashMap; | |||
import java.util.List; | |||
import java.util.Map; | |||
import java.util.concurrent.Executors; | |||
import java.util.concurrent.ScheduledExecutorService; | |||
import java.util.concurrent.TimeUnit; | |||
/** | |||
* Created by Akmm at 2018/11/27 23:40 | |||
* 后台一次性系统任务工厂,不允许用户干预的、即时执行的任务 | |||
*/ | |||
public class OneTimeServiceFactory { | |||
//允许的最大线程个数 | |||
private static final int max_thread_size = 200; | |||
private static OneTimeServiceFactory instance = null; | |||
static { | |||
instance = new OneTimeServiceFactory(); | |||
} | |||
public static OneTimeServiceFactory getInstance() { | |||
return instance; | |||
} | |||
private ScheduledExecutorService schedule; | |||
private Map<String, Class<? extends BaseOneTimeHandler>> mapType = new HashMap<>(); | |||
private OnetimeTaskHelper taskHelper = null; | |||
public OneTimeServiceFactory() { | |||
} | |||
public void regHandle(String type, Class<? extends BaseOneTimeHandler> cls) { | |||
mapType.put(type, cls); | |||
} | |||
public BaseOneTimeHandler getHandle(String type){ | |||
Class<? extends BaseOneTimeHandler> cls = mapType.get(type); | |||
if (cls == null) throw new SwException("没有注册此服务类(" + type + ")!"); | |||
try { | |||
return cls.newInstance(); | |||
} catch (Exception e) { | |||
throw new SwException(e); | |||
} | |||
} | |||
public OnetimeTaskHelper getTasksHelper() { | |||
if (taskHelper == null) { | |||
synchronized (OnetimeTaskHelper.class) { | |||
if (taskHelper == null) { | |||
taskHelper = new OnetimeTaskHelper(SwConsts.SysParam.machineId, DbEngine.getInstance().findDao(OnetimeTask.class)); | |||
} | |||
} | |||
} | |||
return taskHelper; | |||
} | |||
/** | |||
* 添加任务 | |||
* @param type 任务类别,必须先调用regHandle注册任务执行类 | |||
* @param key 任务唯一key,同一type下不允许重复 | |||
* @param name 任务名称,共查询时察看 | |||
* @param bizId 业务id,可空,方便业务查询 | |||
* @param inparams 执行Api时的输入参数,json格式字符串 | |||
* @param us 登录信息 | |||
* @throws Exception | |||
*/ | |||
public void push(String type, String key, String name, long bizId, String inparams, UserSession us) { | |||
if (getTasksHelper().isExistByKey(type, key)) { | |||
throw new BizException("该任务已经存在,不能新增!"); | |||
} | |||
OnetimeTask task = new OnetimeTask(); | |||
task.init(); | |||
task.setType(type); | |||
task.setCode(key); | |||
task.setName(name); | |||
task.setBizId(bizId); | |||
task.setInparams(inparams); | |||
task.setStatu(OneTimeStatu.WAIT.value); | |||
task.setCreateTime(DateUtil.nowDateTimeLong()); | |||
task.setCreateUserId(us.getUserId()); | |||
schedule.schedule(new OneTimeSysService(task), 1L, TimeUnit.MILLISECONDS); | |||
} | |||
public void push(String type, String key, String name, long bizId, Map<String, Object> inparams, UserSession us) { | |||
push(type, key, name, bizId, JsonUtil.encodeString(inparams), us); | |||
} | |||
public void push(String type, String key, String name, long bizId) { | |||
push(type, key, name, bizId, "", UserSession.createSys()); | |||
} | |||
/** | |||
* 重新执行指定任务 | |||
* @param type 任务类别,必须先调用regHandle注册任务执行类 | |||
* @param key 任务唯一key,同一type下不允许重复 | |||
* @throws Exception | |||
*/ | |||
public void restart(String type, String key) { | |||
OnetimeTask task = getTasksHelper().findOneByTk(type, key); | |||
if (task != null && !task.isNew()) { | |||
schedule.schedule(new OneTimeSysService(task), 1L, TimeUnit.MILLISECONDS); | |||
} | |||
} | |||
//启动任务 | |||
public void start() { | |||
if (schedule != null) { | |||
stop(); | |||
} | |||
List<OnetimeTask> listService = getTasksHelper().findWaiting(); | |||
schedule = Executors.newScheduledThreadPool(max_thread_size); | |||
for (OnetimeTask task : listService) { | |||
schedule.schedule(new OneTimeSysService(task), 10L, TimeUnit.SECONDS); | |||
} | |||
} | |||
//停止任务 | |||
public void stop() { | |||
if (schedule != null) { | |||
schedule.shutdown(); | |||
} | |||
} | |||
} |
@@ -0,0 +1,15 @@ | |||
package cc.smtweb.system.bpm.web.sys.oneTimeService; | |||
import cc.smtweb.framework.core.common.IntEnum; | |||
/** | |||
* Created by Akmm at 2021/12/2 18:46 | |||
* 任务状态,0待执行,1执行中,8执行失败, 9执行成功 | |||
*/ | |||
public class OneTimeStatu extends IntEnum { | |||
public static OneTimeStatu instance = new OneTimeStatu(); | |||
public static IntEnumBean WAIT = instance.addEnum(0, "待执行"); | |||
public static IntEnumBean RUNNING = instance.addEnum(1, "执行中"); | |||
public static IntEnumBean FAILED = instance.addEnum(8, "执行失败"); | |||
public static IntEnumBean SUCCESS = instance.addEnum(9, "执行成功"); | |||
} |
@@ -0,0 +1,72 @@ | |||
package cc.smtweb.system.bpm.web.sys.oneTimeService; | |||
import cc.smtweb.framework.core.db.DbEngine; | |||
import cc.smtweb.framework.core.db.jdbc.IDbWorker; | |||
import cc.smtweb.framework.core.util.CommUtil; | |||
import cc.smtweb.framework.core.util.DateUtil; | |||
import lombok.extern.slf4j.Slf4j; | |||
/** | |||
* Created by Akmm at 2018/11/27 23:46 | |||
* 系统任务基类 | |||
*/ | |||
@Slf4j | |||
public class OneTimeSysService implements Runnable { | |||
private OnetimeTask task; | |||
public OneTimeSysService(OnetimeTask task) { | |||
this.task = task; | |||
} | |||
@Override | |||
public void run() { | |||
long time = System.currentTimeMillis(); | |||
try { | |||
log.debug(task.getName() + ":::开始执行........."); | |||
BaseOneTimeHandler handler = OneTimeServiceFactory.getInstance().getHandle(task.getType()); | |||
handler.setParams(task.getInparams()); | |||
handler.setService(this); | |||
task.setBeginTime(DateUtil.nowDateTimeLong()); | |||
task.setStatu(OneTimeStatu.RUNNING.value); | |||
task.setInfo(null); | |||
task.setProcess(0); | |||
OneTimeServiceFactory.getInstance().getTasksHelper().updateTask(task); | |||
handler.work(); | |||
task.setEndTime(DateUtil.nowDateTimeLong()); | |||
task.setStatu(OneTimeStatu.SUCCESS.value); | |||
task.setProcess(100); | |||
OneTimeServiceFactory.getInstance().getTasksHelper().updateTask(task); | |||
log.debug(task.getName() + ":::执行完成,耗时:" + (System.currentTimeMillis() - time) + "毫秒........."); | |||
} catch (Exception e) { | |||
task.setEndTime(DateUtil.nowDateTimeLong()); | |||
task.setStatu(OneTimeStatu.FAILED.value); | |||
task.setInfo(CommUtil.getOrigMsg(e)); | |||
try { | |||
OneTimeServiceFactory.getInstance().getTasksHelper().updateTask(task); | |||
} catch (Exception exception) { | |||
log.error("更新任务状态失败", e); | |||
} | |||
log.error(task.getName() + ":::执行失败", e); | |||
} | |||
} | |||
//回写进度,百分比 | |||
protected void setProcess(int process) throws Exception { | |||
if (task.getProcess() != process) { | |||
task.setProcess(process); | |||
DbEngine.getInstance().doTransSingle(new IDbWorker() { | |||
@Override | |||
public void work() { | |||
OneTimeServiceFactory.getInstance().getTasksHelper().updatProcess(task); | |||
} | |||
}); | |||
} | |||
} | |||
} |
@@ -0,0 +1,28 @@ | |||
package cc.smtweb.system.bpm.web.sys.oneTimeService; | |||
import cc.smtweb.framework.core.db.DbEngine; | |||
import cc.smtweb.framework.core.db.EntityHelper; | |||
import cc.smtweb.framework.core.systask.BaseSysService; | |||
import cc.smtweb.framework.core.util.DateUtil; | |||
/** | |||
* Created by Akmm at 2021/12/6 19:19 | |||
* 清除当前已结束任务, 10分钟一次 | |||
*/ | |||
public class OneTimeTaskCleanService extends BaseSysService { | |||
@Override | |||
public String getTitle() { | |||
return "清理后台任务"; | |||
} | |||
@Override | |||
public int getInterval() { | |||
return 600; | |||
} | |||
@Override | |||
public void work() throws Exception { | |||
String t = DateUtil.getNowYm() + "01000000"; | |||
DbEngine.getInstance().update("delete from " + EntityHelper.getSchemaTableName(OnetimeTask.ENTITY_NAME) + " where statu=? and create_time<?", OneTimeStatu.SUCCESS.value, t); | |||
} | |||
} |
@@ -0,0 +1,227 @@ | |||
package cc.smtweb.system.bpm.web.sys.oneTimeService; | |||
import cc.smtweb.framework.core.annotation.SwTable; | |||
import cc.smtweb.framework.core.db.impl.DefaultEntity; | |||
/** | |||
* Created by 1 at 2022-09-13 09:31:42 | |||
* 实体【[系统一次性任务](SYS_ONETIME_TASK)】的Entity类 | |||
*/ | |||
@SwTable("SYS_ONETIME_TASK") | |||
public class OnetimeTask extends DefaultEntity { | |||
public static final String ENTITY_NAME = "SYS_ONETIME_TASK"; | |||
public OnetimeTask() { | |||
super(ENTITY_NAME); | |||
} | |||
/** | |||
* 主键 | |||
*/ | |||
public long getId() { | |||
return getLong("sot_id"); | |||
} | |||
/** | |||
* 主键 | |||
*/ | |||
public void setId(long sot_id) { | |||
put("sot_id", sot_id); | |||
} | |||
/** | |||
* 编码 | |||
*/ | |||
public String getCode() { | |||
return getStr("sot_code"); | |||
} | |||
/** | |||
* 编码 | |||
*/ | |||
public void setCode(String sot_code) { | |||
put("sot_code", sot_code); | |||
} | |||
/** | |||
* 名称 | |||
*/ | |||
public String getName() { | |||
return getStr("sot_name"); | |||
} | |||
/** | |||
* 名称 | |||
*/ | |||
public void setName(String sot_name) { | |||
put("sot_name", sot_name); | |||
} | |||
/** | |||
* 业务id | |||
*/ | |||
public long getBizId() { | |||
return getLong("sot_biz_id"); | |||
} | |||
/** | |||
* 业务id | |||
*/ | |||
public void setBizId(long sot_biz_id) { | |||
put("sot_biz_id", sot_biz_id); | |||
} | |||
/** | |||
* 任务类别 | |||
*/ | |||
public String getType() { | |||
return getStr("sot_type"); | |||
} | |||
/** | |||
* 任务类别 | |||
*/ | |||
public void setType(String sot_type) { | |||
put("sot_type", sot_type); | |||
} | |||
/** | |||
* 输入参数 | |||
*/ | |||
public String getInparams() { | |||
return getStr("sot_inparams"); | |||
} | |||
/** | |||
* 输入参数 | |||
*/ | |||
public void setInparams(String sot_inparams) { | |||
put("sot_inparams", sot_inparams); | |||
} | |||
/** | |||
* 任务状态 | |||
*/ | |||
public int getStatu() { | |||
return getInt("sot_statu"); | |||
} | |||
/** | |||
* 任务状态 | |||
*/ | |||
public void setStatu(int sot_statu) { | |||
put("sot_statu", sot_statu); | |||
} | |||
/** | |||
* 执行进度 | |||
*/ | |||
public int getProcess() { | |||
return getInt("sot_process"); | |||
} | |||
/** | |||
* 执行进度 | |||
*/ | |||
public void setProcess(int sot_process) { | |||
put("sot_process", sot_process); | |||
} | |||
/** | |||
* 执行返回 | |||
*/ | |||
public String getInfo() { | |||
return getStr("sot_info"); | |||
} | |||
/** | |||
* 执行返回 | |||
*/ | |||
public void setInfo(String sot_info) { | |||
put("sot_info", sot_info); | |||
} | |||
/** | |||
* 开始时间 | |||
*/ | |||
public long getBeginTime() { | |||
return getLong("sot_begin_time"); | |||
} | |||
/** | |||
* 开始时间 | |||
*/ | |||
public void setBeginTime(long sot_begin_time) { | |||
put("sot_begin_time", sot_begin_time); | |||
} | |||
/** | |||
* 结束时间 | |||
*/ | |||
public long getEndTime() { | |||
return getLong("sot_end_time"); | |||
} | |||
/** | |||
* 结束时间 | |||
*/ | |||
public void setEndTime(long sot_end_time) { | |||
put("sot_end_time", sot_end_time); | |||
} | |||
/** | |||
* 执行机器 | |||
*/ | |||
public int getRunServer() { | |||
return getInt("sot_run_server"); | |||
} | |||
/** | |||
* 执行机器 | |||
*/ | |||
public void setRunServer(int sot_run_server) { | |||
put("sot_run_server", sot_run_server); | |||
} | |||
/** | |||
* 任务发布者 | |||
*/ | |||
public long getCreateUserId() { | |||
return getLong("sot_create_user_id"); | |||
} | |||
/** | |||
* 任务发布者 | |||
*/ | |||
public void setCreateUserId(long sot_create_user_id) { | |||
put("sot_create_user_id", sot_create_user_id); | |||
} | |||
/** | |||
* 任务发布时间 | |||
*/ | |||
public long getCreateTime() { | |||
return getLong("sot_create_time"); | |||
} | |||
/** | |||
* 任务发布时间 | |||
*/ | |||
public void setCreateTime(long sot_create_time) { | |||
put("sot_create_time", sot_create_time); | |||
} | |||
/** | |||
* 备注 | |||
*/ | |||
public String getRemark() { | |||
return getStr("sot_remark"); | |||
} | |||
/** | |||
* 备注 | |||
*/ | |||
public void setRemark(String sot_remark) { | |||
put("sot_remark", sot_remark); | |||
} | |||
} |
@@ -0,0 +1,68 @@ | |||
package cc.smtweb.system.bpm.web.sys.oneTimeService; | |||
import cc.smtweb.framework.core.common.SwConsts; | |||
import cc.smtweb.framework.core.db.DbEngine; | |||
import cc.smtweb.framework.core.db.EntityDao; | |||
import cc.smtweb.framework.core.db.EntityHelper; | |||
import cc.smtweb.framework.core.db.jdbc.IDbWorker; | |||
import java.util.List; | |||
/** | |||
* Created by zhenggm at 2021-12-02 18:12:57 | |||
* 实体【[系统一次性任务](TB_SYS_ONETIME_TASKS)】的数据库读写Dao类 | |||
*/ | |||
public final class OnetimeTaskHelper { | |||
//服务器编号 | |||
private int runServer = SwConsts.SysParam.machineId; | |||
private EntityDao<OnetimeTask> dao; | |||
public OnetimeTaskHelper(int runServer, EntityDao<OnetimeTask> dao) { | |||
this.runServer = runServer; | |||
this.dao = dao; | |||
} | |||
//获取所有等待中的任务,服务启动时执行一次 | |||
public List<OnetimeTask> findWaiting() { | |||
return dao.queryWhere(" sot_run_server=? and sot_statu < ?", runServer, OneTimeStatu.FAILED.value); | |||
} | |||
public OnetimeTask findOneByTk(String type, String key) { | |||
return dao.queryEntityWhere(" sot_type=? and sot_code=?", type, key); | |||
} | |||
public boolean isExistByKey(String type, String key) { | |||
return DbEngine.getInstance().isExists("select sot_id from " + EntityHelper.getSchemaTableName(OnetimeTask.ENTITY_NAME) + " where sot_type=? and sot_code=?", type, key); | |||
} | |||
public void updateTask(OnetimeTask task) { | |||
if (task.isNew()) { | |||
task.setEntityId(DbEngine.getInstance().nextId()); | |||
task.setRunServer(runServer); | |||
DbEngine.getInstance().doTrans(new IDbWorker() { | |||
@Override | |||
public void work() { | |||
dao.insertEntity(task); | |||
} | |||
@Override | |||
public void doAfterDbRollback() { | |||
task.setEntityId(0L); | |||
} | |||
}); | |||
} else { | |||
DbEngine.getInstance().doTrans(new IDbWorker() { | |||
@Override | |||
public void work() { | |||
task.setRunServer(runServer); | |||
dao.updateEntity(task, "sot_begin_time,sot_end_time,sot_statu,sot_info,sot_process,sot_run_server"); | |||
} | |||
}); | |||
} | |||
} | |||
public void updatProcess(OnetimeTask task) { | |||
dao.updateEntity(task, "sot_process"); | |||
} | |||
} |
@@ -1,5 +1,6 @@ | |||
smtweb: | |||
machine-id: 1 | |||
enable-job: true | |||
file: | |||
local-path: /data/sw/files/ | |||
url: http://127.0.0.1:8888/sw/files/ | |||
@@ -4,7 +4,6 @@ import cc.smtweb.framework.core.common.SwConsts; | |||
import cc.smtweb.framework.core.mvc.controller.IStartListener; | |||
import cc.smtweb.framework.core.mvc.controller.scan.ApplicationScanner; | |||
import cc.smtweb.framework.core.mvc.controller.scan.BeanManager; | |||
import cc.smtweb.framework.core.systask.TaskStartEvent; | |||
import cc.smtweb.framework.core.systask.WebStartedEvent; | |||
import lombok.SneakyThrows; | |||
import org.springframework.boot.context.event.ApplicationStartedEvent; | |||
@@ -28,7 +27,6 @@ public class CoreApplicationStartedListener implements ApplicationListener<Appli | |||
System.out.println("onApplicationEvent============="); | |||
ConfigurableApplicationContext applicationContext = event.getApplicationContext(); | |||
applicationContext.publishEvent(new TaskStartEvent()); | |||
//包扫描 | |||
ApplicationScanner.scan(applicationContext); | |||
@@ -1,5 +1,6 @@ | |||
package cc.smtweb.framework.core; | |||
import cc.smtweb.framework.core.common.SwConsts; | |||
import cc.smtweb.framework.core.db.jdbc.IdGenerator; | |||
import cc.smtweb.framework.core.mvc.config.ControllerConfig; | |||
import org.springframework.beans.factory.annotation.Value; | |||
@@ -22,9 +23,13 @@ public class CoreAutoConfiguration implements WebMvcConfigurer { | |||
*/ | |||
@Value("${smtweb.machine-id}") | |||
private int machineId; | |||
@Value("${smtweb.enable-job}") | |||
private boolean enableJob; | |||
@Bean | |||
public IdGenerator idGenerator() { | |||
SwConsts.SysParam.machineId = machineId; | |||
SwConsts.SysParam.enableJob = enableJob; | |||
return new IdGenerator(machineId); | |||
} | |||
@@ -1,7 +1,6 @@ | |||
package cc.smtweb.framework.core.cache.redis; | |||
import cc.smtweb.framework.core.exception.SwException; | |||
import cc.smtweb.framework.core.systask.SysTaskManager; | |||
import cc.smtweb.framework.core.util.JsonUtil; | |||
import cc.smtweb.framework.core.util.SpringUtil; | |||
import io.lettuce.core.RedisClient; | |||
@@ -56,7 +55,7 @@ public class RedisManager implements DisposableBean { | |||
/** | |||
* 初始化Redis连接池 | |||
*/ | |||
public RedisManager(final ApplicationContext applicationContext, SysTaskManager sysTaskManager, RedisURI redisUri) { | |||
public RedisManager(final ApplicationContext applicationContext, RedisURI redisUri) { | |||
// this.applicationContext = applicationContext; | |||
redisClient = RedisClient.create(redisUri); | |||
channel = SCRIBE_SYSTEM + redisUri.getDatabase() + "_" + redisUri.getClientName(); | |||
@@ -65,12 +64,8 @@ public class RedisManager implements DisposableBean { | |||
config.setTestWhileIdle(true); | |||
pool = new GenericObjectPool<>(new RedisPooledObjectFactory(redisClient), config); | |||
// pool = ConnectionPoolSupport.createGenericObjectPool( | |||
// () -> redisClient.connect(ByteArrayCodec.INSTANCE), | |||
// new GenericObjectPoolConfig(), false); | |||
redisSysTask = new RedisSysTask(applicationContext, redisClient, SCRIBE_SYSTEM + redisUri.getDatabase() + "_*", channel); | |||
sysTaskManager.add(redisSysTask); | |||
redisSysTask.run(); | |||
} | |||
@PreDestroy | |||
@@ -1,6 +1,5 @@ | |||
package cc.smtweb.framework.core.cache.redis; | |||
import cc.smtweb.framework.core.systask.ISysTask; | |||
import cc.smtweb.framework.core.util.JsonUtil; | |||
import io.lettuce.core.RedisClient; | |||
import io.lettuce.core.pubsub.RedisPubSubAdapter; | |||
@@ -8,7 +7,7 @@ import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; | |||
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; | |||
import org.springframework.context.ApplicationContext; | |||
public class RedisSysTask implements ISysTask { | |||
public class RedisSysTask { | |||
private ApplicationContext applicationContext; | |||
private StatefulRedisPubSubConnection<String, String> connection; | |||
private RedisClient redisClient; | |||
@@ -24,7 +23,6 @@ public class RedisSysTask implements ISysTask { | |||
this.selfChannel = selfChannel; | |||
} | |||
@Override | |||
public int run() { | |||
// 非集群模式下的发布订阅 | |||
if (connection == null) { | |||
@@ -2,7 +2,6 @@ package cc.smtweb.framework.core.cache.redis.config; | |||
import cc.smtweb.framework.core.cache.redis.RedisManager; | |||
import cc.smtweb.framework.core.systask.SysTaskManager; | |||
import io.lettuce.core.RedisURI; | |||
import org.apache.commons.lang3.StringUtils; | |||
import org.springframework.beans.factory.annotation.Autowired; | |||
@@ -40,8 +39,6 @@ public class RedisConfig { | |||
@Autowired | |||
private ApplicationContext applicationContext; | |||
@Autowired | |||
private SysTaskManager sysTaskManager; | |||
@Bean | |||
public RedisManager redisManager() { | |||
@@ -59,6 +56,6 @@ public class RedisConfig { | |||
redisUri.setPassword((CharSequence) redisPassword.trim()); | |||
} | |||
return new RedisManager(applicationContext, sysTaskManager, redisUri); | |||
return new RedisManager(applicationContext, redisUri); | |||
} | |||
} |
@@ -11,6 +11,10 @@ public interface SwConsts { | |||
public static boolean SYS_STARTED = false; | |||
//运行的项目,多个用半角逗号分隔 | |||
public static String RUN_PROJECTS = ""; | |||
//启动时赋值:服务器编号 | |||
public static int machineId; | |||
//是否执行定时任务 | |||
public static boolean enableJob = false; | |||
} | |||
//错误码 | |||
@@ -46,4 +50,7 @@ public interface SwConsts { | |||
String DEF_PWD = "abc@123456"; //初始密码 | |||
String LOGIN_VERIFY_CODE = "_VERIFY_CODE"; | |||
String _LOGIN_USER_ID_IN_SESSION = "_LOGIN_USER_ID_IN_SESSION"; | |||
//定时任务key | |||
String REDIS_KEY_BASE_JOB = "jobexe"; | |||
} |
@@ -0,0 +1,46 @@ | |||
package cc.smtweb.framework.core.systask; | |||
import cc.smtweb.framework.core.common.SwConsts; | |||
import lombok.extern.slf4j.Slf4j; | |||
/** | |||
* Created by Akmm at 2018/11/27 23:46 | |||
* 系统任务基类 | |||
*/ | |||
@Slf4j | |||
public abstract class BaseSysService implements Runnable { | |||
public boolean isPrintLog() { | |||
return true; | |||
} | |||
//标题,描述性文字 | |||
public abstract String getTitle(); | |||
//间隔时间,单位秒 | |||
public abstract int getInterval(); | |||
@Override | |||
public void run() { | |||
try { | |||
final String id = this.getClass().getName(); | |||
SingleRequestHelper.singleRequest(SwConsts.REDIS_KEY_BASE_JOB, id, "任务执行中,本次忽略.........", new SingleRequestHelper.ISingleWork() { | |||
@Override | |||
public void doSingleWork() throws Exception { | |||
long time = System.currentTimeMillis(); | |||
if (isPrintLog()) { | |||
log.debug(getTitle() + ":::开始执行........."); | |||
} | |||
work(); | |||
if (isPrintLog()) { | |||
log.debug(getTitle() + ":::执行完成,耗时:" + (System.currentTimeMillis() - time) + "毫秒........."); | |||
} | |||
} | |||
}); | |||
} catch (Exception e) { | |||
log.error(getTitle() + ":::执行失败", e); | |||
} | |||
} | |||
//主要要干的活 | |||
protected abstract void work() throws Exception; | |||
} |
@@ -1,8 +0,0 @@ | |||
package cc.smtweb.framework.core.systask; | |||
/** | |||
* 任务接口 | |||
*/ | |||
public interface ISysTask { | |||
int run(); | |||
} |
@@ -0,0 +1,33 @@ | |||
package cc.smtweb.framework.core.systask; | |||
import cc.smtweb.framework.core.cache.redis.RedisManager; | |||
import cc.smtweb.framework.core.exception.SwException; | |||
import cc.smtweb.framework.core.util.DateUtil; | |||
import cc.smtweb.framework.core.util.StringUtil; | |||
public class SingleRequestHelper { | |||
public static interface ISingleWork { | |||
void doSingleWork() throws Exception; | |||
} | |||
public static void singleRequest(String region_key, String key, ISingleWork singleWork) throws Exception { | |||
singleRequest(region_key, key, "其他人正在执行该操作,请等待!", singleWork); | |||
} | |||
public static void singleRequest(String region_key, String key, String tip_msg, ISingleWork singleWork) throws Exception { | |||
boolean clearCache = false; | |||
try { | |||
String lastTime = RedisManager.getInstance().hGet(region_key, key, String.class); | |||
if (StringUtil.isNotEmpty(lastTime)) { | |||
throw new SwException(tip_msg); | |||
} | |||
RedisManager.getInstance().hSet(region_key, key, DateUtil.nowDateString()); | |||
clearCache = true; | |||
singleWork.doSingleWork(); | |||
} finally { | |||
if (clearCache) RedisManager.getInstance().hdel(region_key, key); | |||
} | |||
} | |||
} |
@@ -0,0 +1,58 @@ | |||
package cc.smtweb.framework.core.systask; | |||
import java.util.ArrayList; | |||
import java.util.List; | |||
import java.util.concurrent.Executors; | |||
import java.util.concurrent.ScheduledExecutorService; | |||
import java.util.concurrent.TimeUnit; | |||
/** | |||
* Created by Akmm at 2018/11/27 23:40 | |||
* 后台系统任务工厂,不允许用户干预的任务放在这里 | |||
*/ | |||
public class SysServiceFactory { | |||
//允许的最大线程个数 | |||
private static final int max_thread_size = 16; | |||
private static SysServiceFactory instance = null; | |||
static { | |||
instance = new SysServiceFactory(); | |||
} | |||
public static SysServiceFactory getInstance() { | |||
return instance; | |||
} | |||
private ScheduledExecutorService schedule; | |||
//待执行的任务 | |||
private List<BaseSysService> listService = new ArrayList<>(); | |||
public SysServiceFactory() { | |||
} | |||
//注册任务 | |||
public void reg(BaseSysService service) { | |||
listService.add(service); | |||
} | |||
//启动任务 | |||
protected void start() { | |||
if (schedule != null) { | |||
stop(); | |||
} | |||
int size = listService.size(); | |||
if (size > max_thread_size) size = max_thread_size; | |||
schedule = Executors.newScheduledThreadPool(size); | |||
for (BaseSysService s : listService) { | |||
schedule.scheduleWithFixedDelay(s, 60L, (long) s.getInterval(), TimeUnit.SECONDS); | |||
} | |||
} | |||
//停止任务 | |||
protected void stop() { | |||
schedule.shutdown(); | |||
} | |||
} |
@@ -1,40 +0,0 @@ | |||
package cc.smtweb.framework.core.systask; | |||
import org.springframework.beans.factory.annotation.Autowired; | |||
import org.springframework.context.ApplicationContext; | |||
import org.springframework.context.event.EventListener; | |||
import org.springframework.stereotype.Service; | |||
import java.util.ArrayList; | |||
import java.util.List; | |||
@Service | |||
public class SysTaskManager { | |||
@Autowired | |||
private ApplicationContext applicationContext; | |||
private List<ISysTask> tasks = new ArrayList<>(); | |||
public void add(ISysTask task) { | |||
tasks.add(task); | |||
} | |||
@EventListener | |||
public void onTaskStartEvent(TaskStartEvent event) { | |||
runAll(); | |||
} | |||
// 每隔60秒定时执行 | |||
// @Scheduled(fixedRate = 60000) | |||
// public void fixedRateJob() { | |||
// System.out.println("fixedRate 每隔60秒" + new java.util.Date()); | |||
// | |||
// runAll(); | |||
// } | |||
private void runAll() { | |||
for (ISysTask task : tasks) { | |||
task.run(); | |||
} | |||
} | |||
} |
@@ -0,0 +1,89 @@ | |||
package cc.smtweb.framework.core.systask; | |||
import lombok.extern.slf4j.Slf4j; | |||
import java.util.concurrent.ArrayBlockingQueue; | |||
import java.util.concurrent.ThreadFactory; | |||
import java.util.concurrent.ThreadPoolExecutor; | |||
import java.util.concurrent.TimeUnit; | |||
import java.util.concurrent.atomic.AtomicInteger; | |||
/** | |||
* Created by Akmm at 2018/11/27 23:40 | |||
* 通用线程池发放 | |||
*/ | |||
@Slf4j | |||
public class SysThreadPool { | |||
//允许的最大线程池 | |||
private static final int max_pool_size = 100; | |||
private static SysThreadPool instance = null; | |||
static { | |||
instance = new SysThreadPool(); | |||
} | |||
public static SysThreadPool getInstance() { | |||
return instance; | |||
} | |||
private ThreadPoolExecutor threadPool; | |||
static class SysThreadFactory implements ThreadFactory { | |||
private static final AtomicInteger poolNumber = new AtomicInteger(1); | |||
private final ThreadGroup group; | |||
private final AtomicInteger threadNumber = new AtomicInteger(1); | |||
private final String namePrefix; | |||
SysThreadFactory() { | |||
SecurityManager s = System.getSecurityManager(); | |||
group = (s != null) ? s.getThreadGroup() : | |||
Thread.currentThread().getThreadGroup(); | |||
namePrefix = "DfpPool-" + | |||
poolNumber.getAndIncrement() + | |||
"-Thread-"; | |||
} | |||
public Thread newThread(Runnable r) { | |||
Thread t = new Thread(group, r, | |||
namePrefix + threadNumber.getAndIncrement(), | |||
0); | |||
if (t.isDaemon()) | |||
t.setDaemon(false); | |||
if (t.getPriority() != Thread.NORM_PRIORITY) | |||
t.setPriority(Thread.NORM_PRIORITY); | |||
return t; | |||
} | |||
} | |||
public void addTask(SysThreadWorker worker) { | |||
if (threadPool == null) { | |||
threadPool = new ThreadPoolExecutor(2, max_pool_size, | |||
60L, TimeUnit.SECONDS, | |||
new ArrayBlockingQueue<Runnable>(100), new SysThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); | |||
} | |||
try { | |||
threadPool.execute(worker); | |||
log.debug("add task to DfpPool success -> [PoolSize:" + threadPool.getPoolSize() + ",ActiveCount:" + threadPool.getActiveCount() + ",CompletedTaskCount:" + threadPool.getCompletedTaskCount() + "]"); | |||
} catch (Exception e) { | |||
log.debug("add task to DfpPool pools error:", e); | |||
} | |||
} | |||
//停止任务 | |||
protected void stop() { | |||
if (threadPool != null) threadPool.shutdown(); | |||
} | |||
public static void main(String[] args) { | |||
for (int i = 1; i < 100; i++) { | |||
int j = i; | |||
SysThreadPool.getInstance().addTask(new SysThreadWorker(null, null) { | |||
@Override | |||
public void localWork() throws Exception { | |||
Thread.sleep(1000); | |||
log.debug("线程:[" + j + "]执行"); | |||
} | |||
}); | |||
} | |||
} | |||
} |
@@ -0,0 +1,39 @@ | |||
package cc.smtweb.framework.core.systask; | |||
import cc.smtweb.framework.core.util.StringUtil; | |||
import lombok.extern.slf4j.Slf4j; | |||
@Slf4j | |||
public abstract class SysThreadWorker implements Runnable { | |||
private String loginPartyId; | |||
private String loginOwnerPartyId; | |||
private String bizName; | |||
protected SysThreadWorker(String loginPartyId, String loginOwnerPartyId) { | |||
this.loginPartyId = loginPartyId; | |||
this.loginOwnerPartyId = loginOwnerPartyId; | |||
} | |||
protected SysThreadWorker(String loginPartyId, String loginOwnerPartyId, String bizName) { | |||
this.loginPartyId = loginPartyId; | |||
this.loginOwnerPartyId = loginOwnerPartyId; | |||
this.bizName = StringUtil.checkNull(bizName); | |||
} | |||
@Override | |||
public void run() { | |||
try { | |||
// SysParams.setLoginOwnerPartyId(loginOwnerPartyId); | |||
// SysParams.setLoginPartyId(loginPartyId); | |||
localWork(); | |||
} catch (Exception e) { | |||
log.error("DfpPool handle error:", e); | |||
} | |||
} | |||
public abstract void localWork() throws Exception; | |||
} |
@@ -1,4 +0,0 @@ | |||
package cc.smtweb.framework.core.systask; | |||
public class TaskStartEvent { | |||
} |
@@ -18,4 +18,5 @@ public class Consts { | |||
public final static String FIELD_LAST_TIME = "last_time";//最后更新时间,字段名 | |||
public final static String FIELD_CREATE_PARTY = "create_party_id";//创建单位,字段名 | |||
} |