mirror of
https://github.com/dataease/dataease.git
synced 2026-06-17 21:08:31 +08:00
feat: 任务管理
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
package io.dataease.commons.constants;
|
||||
|
||||
public enum TaskStatus {
|
||||
Underway, Stopped
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package io.dataease.commons.constants;
|
||||
|
||||
public enum TriggerType {
|
||||
Cron, Custom
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package io.dataease.dto.dataset;
|
||||
|
||||
import io.dataease.base.domain.DatasetTableTask;
|
||||
import io.dataease.base.domain.DatasetTableTaskLog;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
/**
|
||||
* @Author gin
|
||||
* @Date 2021/3/9 3:19 下午
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
public class DataSetTaskDTO extends DatasetTableTask {
|
||||
private String datasetName;
|
||||
private Long nextExecTime;
|
||||
private String taskStatus;
|
||||
}
|
||||
@@ -57,7 +57,7 @@ public class DataSetTableTaskService {
|
||||
public DatasetTableTask save(DataSetTaskRequest dataSetTaskRequest) throws Exception {
|
||||
checkName(dataSetTaskRequest);
|
||||
DatasetTableTask datasetTableTask = dataSetTaskRequest.getDatasetTableTask();
|
||||
if(!datasetTableTask.getType().equalsIgnoreCase("all_scope")){
|
||||
if(!datasetTableTask.getType().equalsIgnoreCase("add_scope")){
|
||||
dataSetTableService.saveIncrementalConfig(dataSetTaskRequest.getDatasetTableIncrementalConfig());
|
||||
}
|
||||
// check
|
||||
@@ -82,14 +82,24 @@ public class DataSetTableTaskService {
|
||||
datasetTableTask.setId(UUID.randomUUID().toString());
|
||||
datasetTableTask.setCreateTime(System.currentTimeMillis());
|
||||
datasetTableTask.setStatus(TaskStatus.Underway.name());
|
||||
if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) { // SIMPLE 类型,提前占位
|
||||
execNow(datasetTableTask);
|
||||
}
|
||||
datasetTableTaskMapper.insert(datasetTableTask);
|
||||
} else {
|
||||
datasetTableTaskMapper.updateByPrimaryKeySelective(datasetTableTask);
|
||||
}
|
||||
scheduleService.addSchedule(datasetTableTask);
|
||||
if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString()) && datasetTableTask.getStatus().equalsIgnoreCase(TaskStatus.Underway.name())) { // SIMPLE 类型,提前占位
|
||||
execNow(datasetTableTask);
|
||||
datasetTableTask.setLastExecStatus(JobStatus.Underway.name());
|
||||
datasetTableTask.setLastExecTime(System.currentTimeMillis());
|
||||
update(datasetTableTask);
|
||||
}
|
||||
if(!datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.name())){
|
||||
scheduleService.addSchedule(datasetTableTask);
|
||||
}else {
|
||||
if(datasetTableTask.getStatus().equalsIgnoreCase(JobStatus.Underway.name())){
|
||||
scheduleService.addSchedule(datasetTableTask);
|
||||
}
|
||||
}
|
||||
|
||||
return datasetTableTask;
|
||||
}
|
||||
|
||||
@@ -100,17 +110,17 @@ public class DataSetTableTaskService {
|
||||
DataEaseException.throwException(Translator.get("i18n_not_exec_add_sync"));
|
||||
}
|
||||
}
|
||||
if (extractDataService.updateSyncStatusIsNone(dataSetTableService.get(datasetTableTask.getTableId()))) {
|
||||
if (extractDataService.existSyncTask(dataSetTableService.get(datasetTableTask.getTableId()), null)) {
|
||||
DataEaseException.throwException(Translator.get("i18n_sync_job_exists"));
|
||||
} else { //write log
|
||||
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
|
||||
datasetTableTaskLog.setTableId(datasetTableTask.getTableId());
|
||||
datasetTableTaskLog.setTaskId(datasetTableTask.getId());
|
||||
datasetTableTaskLog.setStatus(JobStatus.Underway.name());
|
||||
datasetTableTaskLog.setStartTime(System.currentTimeMillis());
|
||||
datasetTableTaskLog.setTriggerType(TriggerType.Custom.name());
|
||||
dataSetTableTaskLogService.save(datasetTableTaskLog);
|
||||
}
|
||||
//write log
|
||||
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
|
||||
datasetTableTaskLog.setTableId(datasetTableTask.getTableId());
|
||||
datasetTableTaskLog.setTaskId(datasetTableTask.getId());
|
||||
datasetTableTaskLog.setStatus(JobStatus.Underway.name());
|
||||
datasetTableTaskLog.setStartTime(System.currentTimeMillis());
|
||||
datasetTableTaskLog.setTriggerType(TriggerType.Custom.name());
|
||||
dataSetTableTaskLogService.save(datasetTableTaskLog);
|
||||
}
|
||||
|
||||
public void delete(String id) {
|
||||
@@ -195,14 +205,16 @@ public class DataSetTableTaskService {
|
||||
}
|
||||
|
||||
public void execTask(DatasetTableTask datasetTableTask) throws Exception{
|
||||
execNow(datasetTableTask);
|
||||
// datasetTableTask.setStatus(TaskStatus.Underway.name());
|
||||
datasetTableTask.setLastExecStatus(JobStatus.Underway.name());
|
||||
datasetTableTask.setLastExecTime(System.currentTimeMillis());
|
||||
update(datasetTableTask);
|
||||
|
||||
if(datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString())){
|
||||
scheduleService.fireNow(datasetTableTask);
|
||||
}
|
||||
if(datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())){
|
||||
execNow(datasetTableTask);
|
||||
datasetTableTask.setStatus(TaskStatus.Underway.name());
|
||||
datasetTableTask.setLastExecStatus(JobStatus.Underway.name());
|
||||
update(datasetTableTask);
|
||||
scheduleService.addSchedule(datasetTableTask);
|
||||
}
|
||||
|
||||
|
||||
@@ -131,24 +131,42 @@ public class ExtractDataService {
|
||||
"fi\n" +
|
||||
"rm -rf %s\n";
|
||||
|
||||
public synchronized boolean updateSyncStatusIsNone(DatasetTable datasetTable ){
|
||||
public synchronized boolean existSyncTask(DatasetTable datasetTable, DatasetTableTask datasetTableTask){
|
||||
datasetTable.setSyncStatus(JobStatus.Underway.name());
|
||||
|
||||
DatasetTableExample example = new DatasetTableExample();
|
||||
example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusNotEqualTo(JobStatus.Underway.name());
|
||||
example.or(example.createCriteria().andIdEqualTo(datasetTable.getId()).andSyncStatusIsNull());
|
||||
return datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0;
|
||||
Boolean existSyncTask = datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0;
|
||||
if(existSyncTask){
|
||||
if(datasetTableTask != null){
|
||||
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
|
||||
datasetTableTaskLog.setTaskId(datasetTableTask.getId());
|
||||
datasetTableTaskLog.setTableId(datasetTable.getId());
|
||||
datasetTableTaskLog.setStatus(JobStatus.Underway.name());
|
||||
List<DatasetTableTaskLog> datasetTableTaskLogs = dataSetTableTaskLogService.select(datasetTableTaskLog);
|
||||
if(CollectionUtils.isNotEmpty(datasetTableTaskLogs) && datasetTableTaskLogs.get(0).getTriggerType().equalsIgnoreCase(TriggerType.Custom.name())){
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public void extractData(String datasetTableId, String taskId, String type, JobExecutionContext context) {
|
||||
DatasetTable datasetTable = getDatasetTable(datasetTableId);
|
||||
if(datasetTable == null){
|
||||
LogUtil.error("Can not find DatasetTable: " + datasetTableId);
|
||||
return;
|
||||
}
|
||||
DatasetTableTask datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(taskId);
|
||||
boolean isCronJob = (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.CRON.toString()));
|
||||
if(updateSyncStatusIsNone(datasetTable) && isCronJob){
|
||||
LogUtil.info("Skip synchronization task for table : " + datasetTableId);
|
||||
if(datasetTableTask.getStatus().equalsIgnoreCase(TaskStatus.Stopped.name()) && !datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.name())){
|
||||
LogUtil.info("Skip synchronization task, task ID : " + datasetTableTask.getId());
|
||||
return;
|
||||
}
|
||||
if(existSyncTask(datasetTable, datasetTableTask)){
|
||||
LogUtil.info("Skip synchronization task for dataset, dataset ID : " + datasetTableId);
|
||||
return;
|
||||
}
|
||||
datasetTableTask.setLastExecTime(System.currentTimeMillis());
|
||||
@@ -210,21 +228,25 @@ public class ExtractDataService {
|
||||
|
||||
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
|
||||
|
||||
// if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
||||
// datasetTableTask.setStatus(TaskStatus.Stopped.name());
|
||||
// }
|
||||
datasetTableTask.setLastExecStatus(JobStatus.Completed.name());
|
||||
dataSetTableTaskService.update(datasetTableTask);
|
||||
|
||||
}catch (Exception e){
|
||||
saveErrorLog(datasetTableId, taskId, e);
|
||||
datasetTableTask.setLastExecStatus(JobStatus.Error.name());
|
||||
// if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
||||
// datasetTableTask.setStatus(TaskStatus.Stopped.name());
|
||||
// }
|
||||
dataSetTableTaskService.update(datasetTableTask);
|
||||
|
||||
sendWebMsg(datasetTable, taskId,false);
|
||||
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
|
||||
dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)));
|
||||
deleteFile("all_scope", datasetTableId);
|
||||
}finally {
|
||||
if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
||||
datasetTableTask.setStatus(TaskStatus.Stopped.name());
|
||||
dataSetTableTaskService.update(datasetTableTask);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -283,6 +305,9 @@ public class ExtractDataService {
|
||||
|
||||
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
|
||||
datasetTableTask.setLastExecStatus(JobStatus.Completed.name());
|
||||
// if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
||||
// datasetTableTask.setStatus(TaskStatus.Stopped.name());
|
||||
// }
|
||||
dataSetTableTaskService.update(datasetTableTask);
|
||||
}
|
||||
}catch (Exception e){
|
||||
@@ -290,14 +315,13 @@ public class ExtractDataService {
|
||||
sendWebMsg(datasetTable, taskId,false);
|
||||
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
|
||||
datasetTableTask.setLastExecStatus(JobStatus.Error.name());
|
||||
// if (datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
||||
// datasetTableTask.setStatus(TaskStatus.Stopped.name());
|
||||
// }
|
||||
dataSetTableTaskService.update(datasetTableTask);
|
||||
deleteFile("incremental_add", datasetTableId);
|
||||
deleteFile("incremental_delete", datasetTableId);
|
||||
}finally {
|
||||
if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
|
||||
datasetTableTask.setStatus(TaskStatus.Stopped.name());
|
||||
dataSetTableTaskService.update(datasetTableTask);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user