mirror of
https://github.com/dataease/dataease.git
synced 2026-05-23 22:08:34 +08:00
feat: 支持集群(kettle)
This commit is contained in:
@@ -33,7 +33,7 @@ import java.util.*;
|
||||
public class ExcelXlsxReader extends DefaultHandler {
|
||||
|
||||
/**
|
||||
* 自定义获取表格某些信息
|
||||
* 自定义获取表格某些信
|
||||
*/
|
||||
public Map map = new TreeMap<String,String>();
|
||||
/**
|
||||
|
||||
@@ -10,6 +10,7 @@ import io.dataease.controller.request.dataset.DataSetGroupRequest;
|
||||
import io.dataease.dto.dataset.DataSetGroupDTO;
|
||||
import io.dataease.service.dataset.DataSetGroupService;
|
||||
import io.dataease.service.dataset.ExtractDataService;
|
||||
import io.dataease.service.kettle.KettleService;
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import org.apache.shiro.authz.annotation.Logical;
|
||||
@@ -32,6 +33,8 @@ public class DataSetGroupController {
|
||||
private DataSetGroupService dataSetGroupService;
|
||||
@Resource
|
||||
private ExtractDataService extractDataService;
|
||||
@Resource
|
||||
private KettleService kettleService;
|
||||
|
||||
@DePermissions(value = {
|
||||
@DePermission(type = DePermissionType.DATASET, value = "id"),
|
||||
@@ -71,6 +74,6 @@ public class DataSetGroupController {
|
||||
@ApiIgnore
|
||||
@PostMapping("/isKettleRunning")
|
||||
public boolean isKettleRunning() {
|
||||
return extractDataService.isKettleRunning();
|
||||
return kettleService.isKettleRunning();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,4 +42,6 @@ public class EngineController {
|
||||
return engineService.save(engine);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
package io.dataease.controller.engine;
|
||||
|
||||
|
||||
import com.github.pagehelper.Page;
|
||||
import com.github.pagehelper.PageHelper;
|
||||
import io.dataease.auth.annotation.DePermission;
|
||||
import io.dataease.base.domain.DeEngine;
|
||||
import io.dataease.commons.constants.DePermissionType;
|
||||
import io.dataease.commons.constants.ResourceAuthLevel;
|
||||
import io.dataease.commons.utils.PageUtils;
|
||||
import io.dataease.commons.utils.Pager;
|
||||
import io.dataease.controller.ResultHolder;
|
||||
import io.dataease.dto.KettleDTO;
|
||||
import io.dataease.plugins.common.entity.XpackConditionEntity;
|
||||
import io.dataease.plugins.common.entity.XpackGridRequest;
|
||||
import io.dataease.plugins.config.SpringContextUtil;
|
||||
import io.dataease.plugins.xpack.auth.dto.request.DataSetColumnPermissionsDTO;
|
||||
import io.dataease.plugins.xpack.auth.service.ColumnPermissionService;
|
||||
import io.dataease.service.kettle.KettleService;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import springfox.documentation.annotations.ApiIgnore;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@ApiIgnore
|
||||
@RequestMapping("kettle")
|
||||
@RestController
|
||||
public class KettleController {
|
||||
|
||||
@Resource
|
||||
private KettleService kettleService;
|
||||
|
||||
@ApiIgnore
|
||||
@PostMapping("save")
|
||||
public ResultHolder save(@RequestBody DeEngine engine) throws Exception{
|
||||
return kettleService.save(engine);
|
||||
}
|
||||
|
||||
|
||||
@ApiIgnore
|
||||
@PostMapping("validate")
|
||||
public void validate(@RequestBody KettleDTO kettleDTO) throws Exception{
|
||||
kettleService.validate(kettleDTO);
|
||||
}
|
||||
|
||||
@ApiIgnore
|
||||
@PostMapping("validate/{id}")
|
||||
public ResultHolder validate(@PathVariable String id) throws Exception{
|
||||
return kettleService.validate(id);
|
||||
}
|
||||
|
||||
@PostMapping("/pageList/{goPage}/{pageSize}")
|
||||
public Pager<List<DeEngine>> pageList( @PathVariable int goPage, @PathVariable int pageSize) {
|
||||
Page<Object> page = PageHelper.startPage(goPage, pageSize, true);
|
||||
return PageUtils.setPageInfo(page, kettleService.pageList());
|
||||
}
|
||||
|
||||
@ApiIgnore
|
||||
@DeleteMapping("delete/{id}")
|
||||
public void delete(@PathVariable String id) throws Exception{
|
||||
kettleService.delete(id);
|
||||
}
|
||||
}
|
||||
11
backend/src/main/java/io/dataease/dto/KettleDTO.java
Normal file
11
backend/src/main/java/io/dataease/dto/KettleDTO.java
Normal file
@@ -0,0 +1,11 @@
|
||||
package io.dataease.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class KettleDTO {
|
||||
private String carte;
|
||||
private String port;
|
||||
private String user;
|
||||
private String passwd;
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package io.dataease.job.sechedule;
|
||||
import com.fit2cloud.quartz.anno.QuartzScheduled;
|
||||
import io.dataease.service.datasource.DatasourceService;
|
||||
import io.dataease.service.dataset.DataSetTableService;
|
||||
import io.dataease.service.kettle.KettleService;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
@@ -13,6 +14,8 @@ public class Schedular {
|
||||
private DataSetTableService dataSetTableService;
|
||||
@Resource
|
||||
private DatasourceService datasourceService;
|
||||
@Resource
|
||||
private KettleService kettleService;
|
||||
|
||||
@QuartzScheduled(cron = "0 0/3 * * * ?")
|
||||
public void updateDatasetTableStatus() {
|
||||
@@ -24,4 +27,9 @@ public class Schedular {
|
||||
datasourceService.updateDatasourceStatus();
|
||||
}
|
||||
|
||||
@QuartzScheduled(cron = "0 0/30 * * * ?")
|
||||
public void updateKettleStatus() {
|
||||
kettleService.updateKettleStatus();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ import io.dataease.exception.DataEaseException;
|
||||
import io.dataease.listener.util.CacheUtils;
|
||||
import io.dataease.provider.QueryProvider;
|
||||
import io.dataease.service.engine.EngineService;
|
||||
import io.dataease.service.kettle.KettleService;
|
||||
import io.dataease.service.message.DeMsgutil;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
@@ -98,6 +99,8 @@ public class ExtractDataService {
|
||||
private ExtChartViewMapper extChartViewMapper;
|
||||
@Resource
|
||||
private EngineService engineService;
|
||||
@Resource
|
||||
private KettleService kettleService;
|
||||
|
||||
private static final String lastUpdateTime = "${__last_update_time__}";
|
||||
private static final String currentUpdateTime = "${__current_update_time__}";
|
||||
@@ -107,14 +110,6 @@ public class ExtractDataService {
|
||||
|
||||
@Value("${kettle.files.keep:false}")
|
||||
private boolean kettleFilesKeep;
|
||||
@Value("${carte.host:127.0.0.1}")
|
||||
private String carte;
|
||||
@Value("${carte.port:8080}")
|
||||
private String port;
|
||||
@Value("${carte.user:cluster}")
|
||||
private String user;
|
||||
@Value("${carte.passwd:cluster}")
|
||||
private String passwd;
|
||||
|
||||
private static final String shellScript = "result=`curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load`\n" +
|
||||
"if [ $? -eq 0 ] ; then\n" +
|
||||
@@ -730,7 +725,7 @@ public class ExtractDataService {
|
||||
break;
|
||||
}
|
||||
|
||||
SlaveServer remoteSlaveServer = getSlaveServer();
|
||||
SlaveServer remoteSlaveServer = kettleService.getSlaveServer();
|
||||
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
|
||||
jobExecutionConfiguration.setRemoteServer(remoteSlaveServer);
|
||||
jobExecutionConfiguration.setRepository(repository);
|
||||
@@ -738,7 +733,6 @@ public class ExtractDataService {
|
||||
TransExecutionConfiguration transExecutionConfiguration = new TransExecutionConfiguration();
|
||||
transExecutionConfiguration.setRepository(repository);
|
||||
transExecutionConfiguration.setRemoteServer(remoteSlaveServer);
|
||||
|
||||
String lastTranceId = Trans.sendToSlaveServer(transMeta, transExecutionConfiguration, repository, null);
|
||||
SlaveServerTransStatus transStatus = null;
|
||||
boolean executing = true;
|
||||
@@ -772,15 +766,6 @@ public class ExtractDataService {
|
||||
}
|
||||
}
|
||||
|
||||
private SlaveServer getSlaveServer() {
|
||||
SlaveServer remoteSlaveServer = new SlaveServer();
|
||||
remoteSlaveServer.setHostname(carte);// 设置远程IP
|
||||
remoteSlaveServer.setPort(port);// 端口
|
||||
remoteSlaveServer.setUsername(user);
|
||||
remoteSlaveServer.setPassword(passwd);
|
||||
return remoteSlaveServer;
|
||||
}
|
||||
|
||||
private void generateJobFile(String extractType, DatasetTable datasetTable, String columnFields) throws Exception {
|
||||
if (engineService.isSimpleMode()) {
|
||||
return;
|
||||
@@ -1251,33 +1236,6 @@ public class ExtractDataService {
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isKettleRunning() {
|
||||
try {
|
||||
if (!InetAddress.getByName(carte).isReachable(1000)) {
|
||||
return false;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
HttpGet getMethod = new HttpGet("http://" + carte + ":" + port);
|
||||
HttpClientManager.HttpClientBuilderFacade clientBuilder = HttpClientManager.getInstance().createBuilder();
|
||||
clientBuilder.setConnectionTimeout(1);
|
||||
clientBuilder.setCredentials(user, passwd);
|
||||
try {
|
||||
CloseableHttpClient httpClient = clientBuilder.build();
|
||||
HttpResponse httpResponse = httpClient.execute(getMethod);
|
||||
int statusCode = httpResponse.getStatusLine().getStatusCode();
|
||||
if (statusCode != -1 && statusCode < 400) {
|
||||
httpResponse.getEntity().getContent().close();
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private final static String handleBinaryType = " \t\tif(\"FIELD\".equalsIgnoreCase(filed)){\n" +
|
||||
" get(Fields.Out, filed).setValue(r, \"\");\n" +
|
||||
" get(Fields.Out, filed).getValueMeta().setType(2);\n" +
|
||||
|
||||
@@ -0,0 +1,150 @@
|
||||
package io.dataease.service.kettle;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import io.dataease.base.domain.DeEngine;
|
||||
import io.dataease.base.domain.DeEngineExample;
|
||||
import io.dataease.base.mapper.DeEngineMapper;
|
||||
import io.dataease.commons.utils.HttpClientConfig;
|
||||
import io.dataease.commons.utils.HttpClientUtil;
|
||||
import io.dataease.controller.ResultHolder;
|
||||
import io.dataease.dto.KettleDTO;
|
||||
import io.dataease.service.engine.EngineService;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.checkerframework.checker.units.qual.K;
|
||||
import org.pentaho.di.cluster.SlaveServer;
|
||||
import org.pentaho.di.core.util.HttpClientManager;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.net.InetAddress;
|
||||
import java.util.Base64;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
public class KettleService {
|
||||
|
||||
@Resource
|
||||
private Environment env;
|
||||
@Resource
|
||||
private DeEngineMapper deEngineMapper;
|
||||
@Resource
|
||||
private EngineService engineService;
|
||||
|
||||
public ResultHolder save(DeEngine kettle) throws Exception {
|
||||
try {
|
||||
validate(new Gson().fromJson(kettle.getConfiguration(), KettleDTO.class));
|
||||
kettle.setStatus("Success");
|
||||
}catch (Exception e){
|
||||
kettle.setStatus("Error");
|
||||
}
|
||||
|
||||
if (StringUtils.isEmpty(kettle.getId())) {
|
||||
kettle.setId(UUID.randomUUID().toString());
|
||||
kettle.setType("kettle");
|
||||
deEngineMapper.insert(kettle);
|
||||
} else {
|
||||
deEngineMapper.updateByPrimaryKeyWithBLOBs(kettle);
|
||||
}
|
||||
return ResultHolder.success(kettle);
|
||||
}
|
||||
|
||||
public void delete(String id){
|
||||
deEngineMapper.deleteByPrimaryKey(id);
|
||||
}
|
||||
|
||||
public void validate(KettleDTO kettleDTO) throws Exception {
|
||||
HttpClientConfig httpClientConfig = new HttpClientConfig();
|
||||
String authValue = "Basic " + Base64.getUrlEncoder().encodeToString((kettleDTO.getUser()
|
||||
+ ":" + kettleDTO.getPasswd()).getBytes());
|
||||
httpClientConfig.addHeader("Authorization", authValue);
|
||||
String response = HttpClientUtil.get("http://" + kettleDTO.getCarte() + ":" + kettleDTO.getPort() + "/kettle/status/", httpClientConfig);
|
||||
}
|
||||
|
||||
public ResultHolder validate(String id) {
|
||||
DeEngine kettle = deEngineMapper.selectByPrimaryKey(id);
|
||||
try {
|
||||
validate(new Gson().fromJson(kettle.getConfiguration(), KettleDTO.class));
|
||||
kettle.setStatus("Success");
|
||||
deEngineMapper.updateByPrimaryKeyWithBLOBs(kettle);
|
||||
return ResultHolder.success(kettle);
|
||||
}catch (Exception e){
|
||||
kettle.setStatus("Error");
|
||||
deEngineMapper.updateByPrimaryKeyWithBLOBs(kettle);
|
||||
return ResultHolder.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public List<DeEngine> pageList(){
|
||||
DeEngineExample deEngineExample = new DeEngineExample();
|
||||
deEngineExample.createCriteria().andTypeEqualTo("kettle");
|
||||
return deEngineMapper.selectByExampleWithBLOBs(deEngineExample);
|
||||
}
|
||||
|
||||
public void updateKettleStatus(){
|
||||
if(!engineService.isClusterMode()){
|
||||
return;
|
||||
}
|
||||
List<DeEngine>kettles = pageList();
|
||||
kettles.forEach(kettle -> {
|
||||
validate(kettle.getId());
|
||||
});
|
||||
}
|
||||
|
||||
public SlaveServer getSlaveServer() throws Exception{
|
||||
SlaveServer remoteSlaveServer = new SlaveServer();
|
||||
if(engineService.isLocalMode()){
|
||||
remoteSlaveServer.setHostname(env.getProperty("carte.host", "127.0.0.1"));
|
||||
remoteSlaveServer.setPort(env.getProperty("carte.port", "8080"));
|
||||
remoteSlaveServer.setUsername(env.getProperty("carte.user", "cluster"));
|
||||
remoteSlaveServer.setPassword(env.getProperty("carte.passwd", "cluster"));
|
||||
}else {
|
||||
List<DeEngine> kettles = pageList().stream().filter(kettle -> kettle.getStatus() != null && kettle.getStatus().equalsIgnoreCase("Success"))
|
||||
.collect(Collectors.toList());
|
||||
if(CollectionUtils.isEmpty(kettles)){
|
||||
throw new Exception("No valid kettle service.");
|
||||
}
|
||||
DeEngine kettle = kettles.get(new Random().nextInt(kettles.size()));
|
||||
KettleDTO kettleDTO = new Gson().fromJson(kettle.getConfiguration(), KettleDTO.class);
|
||||
remoteSlaveServer.setHostname(kettleDTO.getCarte());
|
||||
remoteSlaveServer.setPort(kettleDTO.getPort());
|
||||
remoteSlaveServer.setUsername(kettleDTO.getUser());
|
||||
remoteSlaveServer.setPort(kettleDTO.getPasswd());
|
||||
}
|
||||
return remoteSlaveServer;
|
||||
}
|
||||
|
||||
public boolean isKettleRunning() {
|
||||
if(engineService.isLocalMode()){
|
||||
try {
|
||||
KettleDTO kettleDTO = new KettleDTO();
|
||||
kettleDTO.setCarte(env.getProperty("carte.host", "127.0.0.1"));
|
||||
kettleDTO.setPort(env.getProperty("carte.port", "8080"));
|
||||
kettleDTO.setUser(env.getProperty("carte.user", "cluster"));
|
||||
kettleDTO.setPasswd(env.getProperty("carte.passwd", "cluster"));
|
||||
validate(kettleDTO);
|
||||
return true;
|
||||
}catch (Exception e){
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if(engineService.isClusterMode()){
|
||||
List<DeEngine> kettles = pageList().stream().filter(kettle -> kettle.getStatus() != null && kettle.getStatus().equalsIgnoreCase("Success"))
|
||||
.collect(Collectors.toList());
|
||||
if(CollectionUtils.isEmpty(kettles)){
|
||||
return false;
|
||||
}else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user