mirror of
https://github.com/dataease/dataease.git
synced 2026-05-15 13:32:18 +08:00
fix(同步管理): 优化任务的启动停止逻辑
This commit is contained in:
committed by
jianneng-fit2cloud
parent
97063869fc
commit
c99387d3d6
@@ -4,6 +4,7 @@ import io.dataease.license.config.XpackInteract;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.quartz.JobDataMap;
|
||||
import org.quartz.JobKey;
|
||||
import org.quartz.SchedulerException;
|
||||
import org.quartz.TriggerKey;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@@ -14,6 +15,7 @@ import java.util.Map;
|
||||
public class DeXpackDataSyncTaskExecutor {
|
||||
|
||||
private static final String SYNC_JOB_GROUP = "SYNC_TASK";
|
||||
private static final String SYNC_SIMPLE_JOB_GROUP = "SYNC_SIMPLE_TASK";
|
||||
|
||||
@Resource
|
||||
private ScheduleManager scheduleManager;
|
||||
@@ -44,7 +46,6 @@ public class DeXpackDataSyncTaskExecutor {
|
||||
JobDataMap jobDataMap = jobData != null ? new JobDataMap(jobData) : new JobDataMap();
|
||||
jobDataMap.put("taskId", taskId);
|
||||
// 调度任务
|
||||
jobDataMap.put("cron", cron);
|
||||
jobDataMap.put("startTime", startTime);
|
||||
jobDataMap.put("endTime", endTime);
|
||||
jobDataMap.put("executeOnce", Boolean.FALSE);
|
||||
@@ -52,12 +53,54 @@ public class DeXpackDataSyncTaskExecutor {
|
||||
cron, new Date(startTime), endTime != null ? new Date(endTime) : null, jobDataMap);
|
||||
}
|
||||
|
||||
public void removeSyncTask(String taskId) {
|
||||
JobKey jobKey = new JobKey(taskId, SYNC_JOB_GROUP);
|
||||
TriggerKey triggerKey = new TriggerKey(taskId, SYNC_JOB_GROUP);
|
||||
/**
|
||||
* 添加或更新简单轮询任务
|
||||
*
|
||||
* @param taskId 任务ID
|
||||
* @param period 间隔时间,5m 5h
|
||||
* @param jobData 任务数据
|
||||
*/
|
||||
public void addOrUpdateSyncSimpleJob(String taskId, String period, Long startTime, Long endTime, Map<String, Object> jobData) {
|
||||
JobKey jobKey = new JobKey(taskId, SYNC_SIMPLE_JOB_GROUP);
|
||||
TriggerKey triggerKey = new TriggerKey(taskId, SYNC_SIMPLE_JOB_GROUP);
|
||||
// 准备JobDataMap
|
||||
JobDataMap jobDataMap = jobData != null ? new JobDataMap(jobData) : new JobDataMap();
|
||||
jobDataMap.put("taskId", taskId);
|
||||
jobDataMap.put("executeOnce", Boolean.FALSE);
|
||||
try {
|
||||
scheduleManager.addOrUpdateSimpleJobForCustomTime(jobKey, triggerKey, DeXpackDataSyncTaskScheduleJob.class, new Date(startTime), endTime != null ? new Date(endTime) : null, period, jobDataMap);
|
||||
} catch (SchedulerException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void removeSyncTask(String taskId, boolean isSimpleJob) {
|
||||
String jobGroup = isSimpleJob ? SYNC_SIMPLE_JOB_GROUP : SYNC_JOB_GROUP;
|
||||
JobKey jobKey = new JobKey(taskId, jobGroup);
|
||||
TriggerKey triggerKey = new TriggerKey(taskId, jobGroup);
|
||||
if (scheduleManager.exist(jobKey)) {
|
||||
scheduleManager.removeJob(jobKey, triggerKey);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取间隔任务的下一次执行时间
|
||||
*/
|
||||
public Long getSimpleJobNextFireTime(String taskId, Date currentTime) {
|
||||
TriggerKey triggerKey = new TriggerKey(taskId, SYNC_SIMPLE_JOB_GROUP);
|
||||
return scheduleManager.getNextSimpleTriggerTime(triggerKey, currentTime);
|
||||
}
|
||||
|
||||
public void pauseTrigger(String taskId, boolean isSimpleJob) {
|
||||
String jobGroup = isSimpleJob ? SYNC_SIMPLE_JOB_GROUP : SYNC_JOB_GROUP;
|
||||
TriggerKey triggerKey = new TriggerKey(taskId, jobGroup);
|
||||
scheduleManager.pauseTrigger(triggerKey);
|
||||
}
|
||||
|
||||
public void resumeTrigger(String taskId, boolean isSimpleJob) {
|
||||
String jobGroup = isSimpleJob ? SYNC_SIMPLE_JOB_GROUP : SYNC_JOB_GROUP;
|
||||
TriggerKey triggerKey = new TriggerKey(taskId, jobGroup);
|
||||
scheduleManager.resumeTrigger(triggerKey);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -445,4 +445,153 @@ public class ScheduleManager {
|
||||
scheduler.unscheduleJobs(new ArrayList<>(triggerKeys));
|
||||
scheduler.deleteJobs(new ArrayList<>(jobKeys));
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加或修改 simpleJob,自定义开始时间和结束时间
|
||||
*
|
||||
*/
|
||||
public void addOrUpdateSimpleJobForCustomTime(JobKey jobKey, TriggerKey triggerKey, Class clz, Date startTime, Date endTime,
|
||||
String period, JobDataMap jobDataMap) throws SchedulerException {
|
||||
|
||||
if (scheduler.checkExists(triggerKey)) {
|
||||
modifySimpleJobTimeForCustomTime(triggerKey, period, startTime, endTime);
|
||||
} else {
|
||||
addSimpleJobForCustomTime(jobKey, triggerKey, clz, period, startTime, endTime, jobDataMap);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加 simpleJob,自定义开始时间和结束时间
|
||||
*
|
||||
*/
|
||||
public void addSimpleJobForCustomTime(JobKey jobKey, TriggerKey triggerKey, Class<? extends Job> cls,
|
||||
String period, Date startTime, Date endTime, JobDataMap jobDataMap)
|
||||
throws SchedulerException {
|
||||
JobDataMap dateMap = jobDataMap != null ? jobDataMap : new JobDataMap();
|
||||
dateMap.put("period", period);
|
||||
JobDetail jobDetail = JobBuilder.newJob(cls)
|
||||
.withIdentity(jobKey)
|
||||
.usingJobData(dateMap)
|
||||
.build();
|
||||
TriggerBuilder<SimpleTrigger> triggerBuilder = simpleJobTriggerBuilder(triggerKey, period, startTime, endTime);
|
||||
triggerBuilder.usingJobData(dateMap);
|
||||
scheduler.scheduleJob(jobDetail, triggerBuilder.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* 修改simpleTrigger触发器的触发时间,自定义开始时间和结束时间
|
||||
*
|
||||
*/
|
||||
public void modifySimpleJobTimeForCustomTime(TriggerKey triggerKey, String period, Date startTime, Date endTime) {
|
||||
try {
|
||||
LogUtil.info("modifySimpleJobTimeForCustomTime: " + triggerKey.getName() + "," + triggerKey.getGroup());
|
||||
SimpleTrigger trigger = (SimpleTrigger) scheduler.getTrigger(triggerKey);
|
||||
if (trigger == null) {
|
||||
return;
|
||||
}
|
||||
Date oldStartTime = trigger.getStartTime();
|
||||
Date oldEndTime = trigger.getEndTime();
|
||||
String oldPeriod = trigger.getJobDataMap().getString("period");
|
||||
boolean startTimeChanged = !Objects.equals(oldStartTime, startTime);
|
||||
boolean endTimeChanged = !Objects.equals(oldEndTime, endTime);
|
||||
boolean periodChanged = !Objects.equals(oldPeriod, period);
|
||||
if (startTimeChanged || endTimeChanged || periodChanged) {
|
||||
TriggerBuilder<SimpleTrigger> triggerBuilder = simpleJobTriggerBuilder(triggerKey, period, startTime, endTime);
|
||||
triggerBuilder.usingJobData(trigger.getJobDataMap());
|
||||
scheduler.rescheduleJob(triggerKey, triggerBuilder.build());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LogUtil.error(e.getMessage(), e);
|
||||
DEException.throwException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建simpleTrigger
|
||||
*
|
||||
*/
|
||||
private TriggerBuilder<SimpleTrigger> simpleJobTriggerBuilder(TriggerKey triggerKey, String period, Date startTime, Date endTime) {
|
||||
SimpleScheduleBuilder scheduleBuilder = SimpleScheduleBuilder.simpleSchedule();
|
||||
if (period != null && period.length() > 1) {
|
||||
String number = period.substring(0, period.length() - 1);
|
||||
char unit = period.charAt(period.length() - 1);
|
||||
switch (unit) {
|
||||
case 's':
|
||||
scheduleBuilder.withIntervalInSeconds(Integer.parseInt(number));
|
||||
break;
|
||||
case 'm':
|
||||
scheduleBuilder.withIntervalInMinutes(Integer.parseInt(number));
|
||||
break;
|
||||
case 'h':
|
||||
scheduleBuilder.withIntervalInHours(Integer.parseInt(number));
|
||||
break;
|
||||
case 'd':
|
||||
scheduleBuilder.withIntervalInHours(Integer.parseInt(number) * 24);
|
||||
break;
|
||||
default:
|
||||
scheduleBuilder.withIntervalInMinutes(1);
|
||||
}
|
||||
scheduleBuilder.repeatForever();
|
||||
} else {
|
||||
scheduleBuilder.withIntervalInMinutes(1);
|
||||
}
|
||||
TriggerBuilder<SimpleTrigger> triggerBuilder = TriggerBuilder.newTrigger()
|
||||
.withIdentity(triggerKey)
|
||||
.withSchedule(scheduleBuilder);
|
||||
if (startTime != null) {
|
||||
triggerBuilder.startAt(startTime);
|
||||
} else {
|
||||
triggerBuilder.startNow();
|
||||
}
|
||||
triggerBuilder.endAt(endTime);
|
||||
return triggerBuilder;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取间隔任务的下一次执行时间
|
||||
*/
|
||||
public Long getNextSimpleTriggerTime(TriggerKey triggerKey, Date currentTime) {
|
||||
try {
|
||||
SimpleTrigger trigger = (SimpleTrigger) scheduler.getTrigger(triggerKey);
|
||||
if (trigger == null) {
|
||||
LogUtil.warn("getNextSimpleTriggerTime: " + triggerKey.getName() + "," + triggerKey.getGroup());
|
||||
return null;
|
||||
}
|
||||
LogUtil.debug("SimpleTriggerNextTime: " + triggerKey.getName() + "," + triggerKey.getGroup() + "," + trigger.getFireTimeAfter(currentTime));
|
||||
return trigger.getFireTimeAfter(currentTime) != null ? trigger.getFireTimeAfter(currentTime).getTime() : null;
|
||||
} catch (Exception e) {
|
||||
LogUtil.error(e.getMessage(), e);
|
||||
DEException.throwException(e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void pauseTrigger(TriggerKey triggerKey) {
|
||||
try {
|
||||
Trigger trigger = scheduler.getTrigger(triggerKey);
|
||||
if (trigger != null) {
|
||||
scheduler.pauseTrigger(triggerKey);
|
||||
} else {
|
||||
LogUtil.warn("pauseTrigger: " + triggerKey.getName() + "," + triggerKey.getGroup());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LogUtil.error(e.getMessage(), e);
|
||||
DEException.throwException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void resumeTrigger(TriggerKey triggerKey) {
|
||||
try {
|
||||
Trigger trigger = scheduler.getTrigger(triggerKey);
|
||||
if (trigger != null) {
|
||||
scheduler.resumeTrigger(triggerKey);
|
||||
} else {
|
||||
LogUtil.warn("resumeTrigger: " + triggerKey.getName() + "," + triggerKey.getGroup());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LogUtil.error(e.getMessage(), e);
|
||||
DEException.throwException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
2
de-xpack
2
de-xpack
Submodule de-xpack updated: a4a09fe8f6...0f3d7f7731
Reference in New Issue
Block a user