diff --git a/core/core-backend/src/main/java/io/dataease/job/schedule/DeXpackDataSyncTaskExecutor.java b/core/core-backend/src/main/java/io/dataease/job/schedule/DeXpackDataSyncTaskExecutor.java index fd1865c741..bda6d8d714 100644 --- a/core/core-backend/src/main/java/io/dataease/job/schedule/DeXpackDataSyncTaskExecutor.java +++ b/core/core-backend/src/main/java/io/dataease/job/schedule/DeXpackDataSyncTaskExecutor.java @@ -4,6 +4,7 @@ import io.dataease.license.config.XpackInteract; import jakarta.annotation.Resource; import org.quartz.JobDataMap; import org.quartz.JobKey; +import org.quartz.SchedulerException; import org.quartz.TriggerKey; import org.springframework.stereotype.Component; @@ -14,6 +15,7 @@ import java.util.Map; public class DeXpackDataSyncTaskExecutor { private static final String SYNC_JOB_GROUP = "SYNC_TASK"; + private static final String SYNC_SIMPLE_JOB_GROUP = "SYNC_SIMPLE_TASK"; @Resource private ScheduleManager scheduleManager; @@ -44,7 +46,6 @@ public class DeXpackDataSyncTaskExecutor { JobDataMap jobDataMap = jobData != null ? new JobDataMap(jobData) : new JobDataMap(); jobDataMap.put("taskId", taskId); // 调度任务 - jobDataMap.put("cron", cron); jobDataMap.put("startTime", startTime); jobDataMap.put("endTime", endTime); jobDataMap.put("executeOnce", Boolean.FALSE); @@ -52,12 +53,54 @@ public class DeXpackDataSyncTaskExecutor { cron, new Date(startTime), endTime != null ? new Date(endTime) : null, jobDataMap); } - public void removeSyncTask(String taskId) { - JobKey jobKey = new JobKey(taskId, SYNC_JOB_GROUP); - TriggerKey triggerKey = new TriggerKey(taskId, SYNC_JOB_GROUP); + /** + * 添加或更新简单轮询任务 + * + * @param taskId 任务ID + * @param period 间隔时间,5m 5h + * @param jobData 任务数据 + */ + public void addOrUpdateSyncSimpleJob(String taskId, String period, Long startTime, Long endTime, Map jobData) { + JobKey jobKey = new JobKey(taskId, SYNC_SIMPLE_JOB_GROUP); + TriggerKey triggerKey = new TriggerKey(taskId, SYNC_SIMPLE_JOB_GROUP); + // 准备JobDataMap + JobDataMap jobDataMap = jobData != null ? new JobDataMap(jobData) : new JobDataMap(); + jobDataMap.put("taskId", taskId); + jobDataMap.put("executeOnce", Boolean.FALSE); + try { + scheduleManager.addOrUpdateSimpleJobForCustomTime(jobKey, triggerKey, DeXpackDataSyncTaskScheduleJob.class, new Date(startTime), endTime != null ? new Date(endTime) : null, period, jobDataMap); + } catch (SchedulerException e) { + throw new RuntimeException(e); + } + + } + + public void removeSyncTask(String taskId, boolean isSimpleJob) { + String jobGroup = isSimpleJob ? SYNC_SIMPLE_JOB_GROUP : SYNC_JOB_GROUP; + JobKey jobKey = new JobKey(taskId, jobGroup); + TriggerKey triggerKey = new TriggerKey(taskId, jobGroup); if (scheduleManager.exist(jobKey)) { scheduleManager.removeJob(jobKey, triggerKey); } } + /** + * 获取间隔任务的下一次执行时间 + */ + public Long getSimpleJobNextFireTime(String taskId, Date currentTime) { + TriggerKey triggerKey = new TriggerKey(taskId, SYNC_SIMPLE_JOB_GROUP); + return scheduleManager.getNextSimpleTriggerTime(triggerKey, currentTime); + } + + public void pauseTrigger(String taskId, boolean isSimpleJob) { + String jobGroup = isSimpleJob ? SYNC_SIMPLE_JOB_GROUP : SYNC_JOB_GROUP; + TriggerKey triggerKey = new TriggerKey(taskId, jobGroup); + scheduleManager.pauseTrigger(triggerKey); + } + + public void resumeTrigger(String taskId, boolean isSimpleJob) { + String jobGroup = isSimpleJob ? SYNC_SIMPLE_JOB_GROUP : SYNC_JOB_GROUP; + TriggerKey triggerKey = new TriggerKey(taskId, jobGroup); + scheduleManager.resumeTrigger(triggerKey); + } } diff --git a/core/core-backend/src/main/java/io/dataease/job/schedule/ScheduleManager.java b/core/core-backend/src/main/java/io/dataease/job/schedule/ScheduleManager.java index e97029442a..22947c556b 100644 --- a/core/core-backend/src/main/java/io/dataease/job/schedule/ScheduleManager.java +++ b/core/core-backend/src/main/java/io/dataease/job/schedule/ScheduleManager.java @@ -445,4 +445,153 @@ public class ScheduleManager { scheduler.unscheduleJobs(new ArrayList<>(triggerKeys)); scheduler.deleteJobs(new ArrayList<>(jobKeys)); } + + /** + * 添加或修改 simpleJob,自定义开始时间和结束时间 + * + */ + public void addOrUpdateSimpleJobForCustomTime(JobKey jobKey, TriggerKey triggerKey, Class clz, Date startTime, Date endTime, + String period, JobDataMap jobDataMap) throws SchedulerException { + + if (scheduler.checkExists(triggerKey)) { + modifySimpleJobTimeForCustomTime(triggerKey, period, startTime, endTime); + } else { + addSimpleJobForCustomTime(jobKey, triggerKey, clz, period, startTime, endTime, jobDataMap); + } + + } + + /** + * 添加 simpleJob,自定义开始时间和结束时间 + * + */ + public void addSimpleJobForCustomTime(JobKey jobKey, TriggerKey triggerKey, Class cls, + String period, Date startTime, Date endTime, JobDataMap jobDataMap) + throws SchedulerException { + JobDataMap dateMap = jobDataMap != null ? jobDataMap : new JobDataMap(); + dateMap.put("period", period); + JobDetail jobDetail = JobBuilder.newJob(cls) + .withIdentity(jobKey) + .usingJobData(dateMap) + .build(); + TriggerBuilder triggerBuilder = simpleJobTriggerBuilder(triggerKey, period, startTime, endTime); + triggerBuilder.usingJobData(dateMap); + scheduler.scheduleJob(jobDetail, triggerBuilder.build()); + } + + /** + * 修改simpleTrigger触发器的触发时间,自定义开始时间和结束时间 + * + */ + public void modifySimpleJobTimeForCustomTime(TriggerKey triggerKey, String period, Date startTime, Date endTime) { + try { + LogUtil.info("modifySimpleJobTimeForCustomTime: " + triggerKey.getName() + "," + triggerKey.getGroup()); + SimpleTrigger trigger = (SimpleTrigger) scheduler.getTrigger(triggerKey); + if (trigger == null) { + return; + } + Date oldStartTime = trigger.getStartTime(); + Date oldEndTime = trigger.getEndTime(); + String oldPeriod = trigger.getJobDataMap().getString("period"); + boolean startTimeChanged = !Objects.equals(oldStartTime, startTime); + boolean endTimeChanged = !Objects.equals(oldEndTime, endTime); + boolean periodChanged = !Objects.equals(oldPeriod, period); + if (startTimeChanged || endTimeChanged || periodChanged) { + TriggerBuilder triggerBuilder = simpleJobTriggerBuilder(triggerKey, period, startTime, endTime); + triggerBuilder.usingJobData(trigger.getJobDataMap()); + scheduler.rescheduleJob(triggerKey, triggerBuilder.build()); + } + } catch (Exception e) { + LogUtil.error(e.getMessage(), e); + DEException.throwException(e); + } + } + + /** + * 构建simpleTrigger + * + */ + private TriggerBuilder simpleJobTriggerBuilder(TriggerKey triggerKey, String period, Date startTime, Date endTime) { + SimpleScheduleBuilder scheduleBuilder = SimpleScheduleBuilder.simpleSchedule(); + if (period != null && period.length() > 1) { + String number = period.substring(0, period.length() - 1); + char unit = period.charAt(period.length() - 1); + switch (unit) { + case 's': + scheduleBuilder.withIntervalInSeconds(Integer.parseInt(number)); + break; + case 'm': + scheduleBuilder.withIntervalInMinutes(Integer.parseInt(number)); + break; + case 'h': + scheduleBuilder.withIntervalInHours(Integer.parseInt(number)); + break; + case 'd': + scheduleBuilder.withIntervalInHours(Integer.parseInt(number) * 24); + break; + default: + scheduleBuilder.withIntervalInMinutes(1); + } + scheduleBuilder.repeatForever(); + } else { + scheduleBuilder.withIntervalInMinutes(1); + } + TriggerBuilder triggerBuilder = TriggerBuilder.newTrigger() + .withIdentity(triggerKey) + .withSchedule(scheduleBuilder); + if (startTime != null) { + triggerBuilder.startAt(startTime); + } else { + triggerBuilder.startNow(); + } + triggerBuilder.endAt(endTime); + return triggerBuilder; + } + + /** + * 获取间隔任务的下一次执行时间 + */ + public Long getNextSimpleTriggerTime(TriggerKey triggerKey, Date currentTime) { + try { + SimpleTrigger trigger = (SimpleTrigger) scheduler.getTrigger(triggerKey); + if (trigger == null) { + LogUtil.warn("getNextSimpleTriggerTime: " + triggerKey.getName() + "," + triggerKey.getGroup()); + return null; + } + LogUtil.debug("SimpleTriggerNextTime: " + triggerKey.getName() + "," + triggerKey.getGroup() + "," + trigger.getFireTimeAfter(currentTime)); + return trigger.getFireTimeAfter(currentTime) != null ? trigger.getFireTimeAfter(currentTime).getTime() : null; + } catch (Exception e) { + LogUtil.error(e.getMessage(), e); + DEException.throwException(e); + } + return null; + } + + public void pauseTrigger(TriggerKey triggerKey) { + try { + Trigger trigger = scheduler.getTrigger(triggerKey); + if (trigger != null) { + scheduler.pauseTrigger(triggerKey); + } else { + LogUtil.warn("pauseTrigger: " + triggerKey.getName() + "," + triggerKey.getGroup()); + } + } catch (Exception e) { + LogUtil.error(e.getMessage(), e); + DEException.throwException(e); + } + } + + public void resumeTrigger(TriggerKey triggerKey) { + try { + Trigger trigger = scheduler.getTrigger(triggerKey); + if (trigger != null) { + scheduler.resumeTrigger(triggerKey); + } else { + LogUtil.warn("resumeTrigger: " + triggerKey.getName() + "," + triggerKey.getGroup()); + } + } catch (Exception e) { + LogUtil.error(e.getMessage(), e); + DEException.throwException(e); + } + } } diff --git a/de-xpack b/de-xpack index a4a09fe8f6..0f3d7f7731 160000 --- a/de-xpack +++ b/de-xpack @@ -1 +1 @@ -Subproject commit a4a09fe8f6ed54548d538daf1d835b58908c3656 +Subproject commit 0f3d7f77316b905c4f7a3f5f89ef8211e1737411