分享

springboot项目集成dolphinscheduler调度器 实现datax数据同步任务

levycui 发表于 2021-10-27 19:50:23 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 1 6742
问题导读:
1、mysql及hbase之间的数据如何同步?
2、同步代码各模块如何开发?
3、如何创建同步任务配置-mysql->mysql?
4、如何删除同步任务配置?



Datax概述
1.概述
27296696c59845ada4e27b761b15eaa2.png

2.功能清单

CRUD增删改查 、启动任务、停止任务

3.说明:本项目只支持mysql及hbase之间的数据同步
代码模块
配置文件
2021-10-27_194820.jpg

pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <parent>
  7.         <groupId>com.geespace.microservices.bd-platform</groupId>
  8.         <artifactId>all</artifactId>
  9.         <version>1.0-SNAPSHOT</version>
  10.     </parent>
  11.     <artifactId>data-sync-config</artifactId>
  12.     <version>1.0-SNAPSHOT</version>
  13.     <properties>
  14.         <java.version>1.8</java.version>
  15.         <gson.version>2.8.1</gson.version>
  16.     </properties>
  17.     <dependencies>
  18.         <dependency>
  19.             <groupId>com.github.pagehelper</groupId>
  20.             <artifactId>pagehelper-spring-boot-starter</artifactId>
  21.         </dependency>
  22.         <dependency>
  23.             <groupId>com.google.code.gson</groupId>
  24.             <artifactId>gson</artifactId>
  25.             <version>${gson.version}</version>
  26.         </dependency>
  27.         <dependency>
  28.             <groupId>org.springframework.boot</groupId>
  29.             <artifactId>spring-boot-starter-web</artifactId>
  30.         </dependency>
  31.         <!--elasticsearch-->
  32.         <dependency>
  33.             <groupId>org.elasticsearch</groupId>
  34.             <artifactId>elasticsearch</artifactId>
  35.             <version>6.8.12</version>
  36.         </dependency>
  37.         <dependency>
  38.             <groupId>org.elasticsearch.client</groupId>
  39.             <artifactId>elasticsearch-rest-client</artifactId>
  40.             <version>6.8.12</version>
  41.         </dependency>
  42.         <dependency>
  43.             <groupId>org.elasticsearch.client</groupId>
  44.             <artifactId>elasticsearch-rest-high-level-client</artifactId>
  45.             <version>6.8.12</version>
  46.         </dependency>
  47.         <dependency>
  48.             <groupId>org.springframework.boot</groupId>
  49.             <artifactId>spring-boot-devtools</artifactId>
  50.             <scope>runtime</scope>
  51.             <optional>true</optional>
  52.         </dependency>
  53.         <dependency>
  54.             <groupId>mysql</groupId>
  55.             <artifactId>mysql-connector-java</artifactId>
  56.             <scope>runtime</scope>
  57.         </dependency>
  58.         <dependency>
  59.             <groupId>org.springframework.boot</groupId>
  60.             <artifactId>spring-boot-configuration-processor</artifactId>
  61.             <optional>true</optional>
  62.         </dependency>
  63.         <dependency>
  64.             <groupId>org.projectlombok</groupId>
  65.             <artifactId>lombok</artifactId>
  66.             <optional>true</optional>
  67.         </dependency>
  68.         <dependency>
  69.             <groupId>org.springframework.boot</groupId>
  70.             <artifactId>spring-boot-starter-test</artifactId>
  71.             <scope>test</scope>
  72.             <exclusions>
  73.                 <exclusion>
  74.                     <groupId>org.junit.vintage</groupId>
  75.                     <artifactId>junit-vintage-engine</artifactId>
  76.                 </exclusion>
  77.             </exclusions>
  78.         </dependency>
  79.         <dependency>
  80.             <groupId>com.alibaba</groupId>
  81.             <artifactId>fastjson</artifactId>
  82.         </dependency>
  83.         <dependency>
  84.             <groupId>org.codehaus.jackson</groupId>
  85.             <artifactId>jackson-core-asl</artifactId>
  86.             <version>1.9.13</version>
  87.             <scope>compile</scope>
  88.         </dependency>
  89.         <dependency>
  90.             <groupId>com.geespace.microservices.bd-platform</groupId>
  91.             <artifactId>data-config</artifactId>
  92.             <version>1.0-SNAPSHOT</version>
  93.             <scope>compile</scope>
  94.         </dependency>
  95.         <!--httpclient-->
  96.         <dependency>
  97.             <groupId>commons-httpclient</groupId>
  98.             <artifactId>commons-httpclient</artifactId>
  99.             <version>3.1</version>
  100.         </dependency>
  101.     </dependencies>
  102.     <build>
  103.         <plugins>
  104.             <plugin>
  105.                 <groupId>org.apache.maven.plugins</groupId>
  106.                 <artifactId>maven-shade-plugin</artifactId>
  107.                 <version>2.2</version>
  108.                 <dependencies>
  109.                     <dependency>
  110.                         <groupId>org.springframework.boot</groupId>
  111.                         <artifactId>spring-boot-maven-plugin</artifactId>
  112.                         <version>2.1.4.RELEASE</version>
  113.                     </dependency>
  114.                 </dependencies>
  115.                 <configuration>
  116.                     <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
  117.                     <createDependencyReducedPom>true</createDependencyReducedPom>
  118.                     <filters>
  119.                         <filter>
  120.                             <artifact>*:*</artifact>
  121.                             <excludes>
  122.                                 <exclude>META-INF/*.SF</exclude>
  123.                                 <exclude>META-INF/*.DSA</exclude>
  124.                                 <exclude>META-INF/*.RSA</exclude>
  125.                             </excludes>
  126.                         </filter>
  127.                     </filters>
  128.                 </configuration>
  129.                 <executions>
  130.                     <execution>
  131.                         <phase>package</phase>
  132.                         <goals>
  133.                             <goal>shade</goal>
  134.                         </goals>
  135.                         <configuration>
  136.                             <transformers>
  137.                                 <transformer
  138.                                         implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  139.                                     <resource>META-INF/spring.handlers</resource>
  140.                                 </transformer>
  141.                                 <transformer
  142.                                         implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
  143.                                     <resource>META-INF/spring.factories</resource>
  144.                                 </transformer>
  145.                                 <transformer
  146.                                         implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  147.                                     <resource>META-INF/spring.schemas</resource>
  148.                                 </transformer>
  149.                                 <transformer
  150.                                         implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
  151.                                 <transformer
  152.                                         implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  153.                                     <mainClass>com.geespace.microservices.dispatcher.DispatchApplication</mainClass>
  154.                                 </transformer>
  155.                             </transformers>
  156.                         </configuration>
  157.                     </execution>
  158.                 </executions>
  159.             </plugin>
  160.         </plugins>
  161.     </build>
  162.    
  163. </project>
复制代码

DataxDolphinschedulerController
  1. import java.io.IOException;
  2. import java.io.UnsupportedEncodingException;
  3. import java.net.URLEncoder;
  4. import java.util.ArrayList;
  5. import java.util.List;
  6. import java.util.Map;
  7. import javax.servlet.http.HttpServletRequest;
  8. import com.alibaba.fastjson.JSONArray;
  9. import com.alibaba.fastjson.JSONObject;
  10. import com.geespace.microservices.builder.dto.ProcessDto;
  11. import com.geespace.microservices.builder.dto.SyncConfigDto;
  12. import com.geespace.microservices.builder.enums.DictionaryEnum;
  13. import com.geespace.microservices.builder.request.ConfigAddForm;
  14. import com.geespace.microservices.builder.request.ConfigSelectForm;
  15. import com.geespace.microservices.builder.request.ConfigUpdateForm;
  16. import com.geespace.microservices.builder.response.BizCode;
  17. import com.geespace.microservices.builder.response.DolphinschedulerResponse;
  18. import com.geespace.microservices.builder.response.Msg;
  19. import com.geespace.microservices.builder.response.PageResult;
  20. import com.geespace.microservices.builder.response.ReturnResult;
  21. import com.geespace.microservices.builder.service.SyncConfigService;
  22. import com.geespace.microservices.builder.tools.JsonTools;
  23. import lombok.extern.slf4j.Slf4j;
  24. import org.apache.commons.httpclient.HttpException;
  25. import org.apache.commons.httpclient.NameValuePair;
  26. import org.apache.commons.httpclient.methods.PostMethod;
  27. import org.springframework.beans.BeanUtils;
  28. import org.springframework.beans.factory.annotation.Autowired;
  29. import org.springframework.beans.factory.annotation.Value;
  30. import org.springframework.http.HttpEntity;
  31. import org.springframework.http.HttpHeaders;
  32. import org.springframework.http.HttpMethod;
  33. import org.springframework.http.ResponseEntity;
  34. import org.springframework.transaction.annotation.Transactional;
  35. import org.springframework.util.StringUtils;
  36. import org.springframework.validation.annotation.Validated;
  37. import org.springframework.web.bind.annotation.DeleteMapping;
  38. import org.springframework.web.bind.annotation.GetMapping;
  39. import org.springframework.web.bind.annotation.PathVariable;
  40. import org.springframework.web.bind.annotation.PostMapping;
  41. import org.springframework.web.bind.annotation.PutMapping;
  42. import org.springframework.web.bind.annotation.RequestBody;
  43. import org.springframework.web.bind.annotation.RequestMapping;
  44. import org.springframework.web.bind.annotation.RequestMethod;
  45. import org.springframework.web.bind.annotation.RequestParam;
  46. import org.springframework.web.bind.annotation.RestController;
  47. import org.springframework.web.client.RestTemplate;
  48. /**
  49. * 迁移dolphinscheduler调度器
  50. *
  51. * @author: liudz
  52. * @date: 2021/5/7
  53. */
  54. @Slf4j
  55. @RestController
  56. @RequestMapping("/dolphinscheduler/v1")
  57. public class DataxDolphinschedulerController {
  58.     @Autowired
  59.     private RestTemplate restTemplate;
  60.     @Value("${dolphinscheduler.token}")
  61.     String token;
  62.     @Value("${dolphinscheduler.address}")
  63.     String address;
  64.     public static final int ZERO = 0;
  65.     public static final int SUCCESS = 200;
  66.     public static final String CREATE = "create";
  67.     public static final String UPDATE = "update";
  68.     public static final String ADD = "add";
  69.     public static final String DELETE = "delete";
  70.     public static final String ONLINE = "ONLINE";
  71.     public static final String OFFLINE = "OFFLINE";
  72.     public static final int ONE_THOUSAND_AND_FIVE_HUNDRED = 1500;
  73.     public static final int SIX = 6;
  74.     public static final int EIGHTY = 80;
  75.     public static final int THREE = 3;
  76.     @Autowired
  77.     private SyncConfigService syncConfigService;
  78.     /**
  79.      * 创建任务-创建用户下唯一工作流,无则创建有则并排添加
  80.      * @param request request
  81.      * @param form 任务参数
  82.      * @author liudz
  83.      * @date 2021/5/8
  84.      * @return 执行结果
  85.      **/
  86.     @PostMapping("/project/process/datax")
  87.     @Transactional(rollbackFor = Exception.class)
  88.     public ReturnResult operatorDataxTask(HttpServletRequest request, @RequestBody @Validated ConfigAddForm form) {
  89.         Long userId = Long.valueOf(request.getUserPrincipal().getName());
  90.         form.setUserId(userId);
  91.         ReturnResult<SyncConfigDto> dataxTaskReturnResult = syncConfigService.addConfig(form);
  92.         if (dataxTaskReturnResult.getCode() != SUCCESS) {
  93.             return dataxTaskReturnResult;
  94.         }
  95.         log.info("--(1)addDataxTaskResult--success");
  96.         form.setId(dataxTaskReturnResult.getData().getId());
  97.         if (dataxTaskReturnResult.getCode() == SUCCESS) {
  98.             Boolean verifyResult = verifyProcessExist(userId + "-dataxTask", form.getProjectName());
  99.             log.info("--(2)verifyProcessExist--success:{}", verifyResult);
  100.             if (!verifyResult) {
  101.                 ProcessDto processDto = packageProcessParam(
  102.                         "create", userId + "-dataxTask", dataxTaskReturnResult.getData(), null);
  103.                 log.info("--(3)packageProcessParam--success");
  104.                 processDto.setProjectName(form.getProjectName());
  105.                 processDto.setProjectId(form.getProjectId());
  106.                 dataxTaskReturnResult =  createProcess(processDto);
  107.             } else {
  108.                 //获取用户下唯一工作流ID
  109.                 DolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName());
  110.                 JSONObject processJson = new JSONObject();
  111.                 log.info("--(3)getUserProcess--success:{}", processInfoList);
  112.                 List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
  113.                 for (Map<String, Object> map : list) {
  114.                     if (map.get("name").equals(userId + "-dataxTask")) {
  115.                         processJson.fluentPutAll(map);
  116.                     }
  117.                 }
  118.                 ProcessDto processDto = packageProcessParam(
  119.                         "add", userId + "-dataxTask", dataxTaskReturnResult.getData(), processJson);
  120.                 processDto.setId(processJson.getInteger("id"));
  121.                 log.info("--(4)packageProcessParam--success");
  122.                 if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
  123.                     releaseProcessDefinition(form.getProjectName(), userId + "-dataxTask",
  124.                             processDto.getId(), 0);
  125.                     log.info("--(5)releaseProcessDefinition--OFFLINE--success");
  126.                 }
  127.                 dataxTaskReturnResult =  updateProcess(form, processDto);
  128.             }
  129.         }
  130.         return dataxTaskReturnResult;
  131.     }
  132.     /**
  133.      * 更新任务
  134.      * @param request request
  135.      * @param form 任务参数
  136.      * @author liudz
  137.      * @date 2021/5/8
  138.      * @return 执行结果
  139.      **/
  140.     @PutMapping("/project/process/datax")
  141.     @Transactional(rollbackFor = Exception.class)
  142.     public ReturnResult updateDataxTask(HttpServletRequest request, @RequestBody @Validated ConfigUpdateForm form) {
  143.         Long userId = Long.valueOf(request.getUserPrincipal().getName());
  144.         form.setUserId(userId);
  145.         ReturnResult<SyncConfigDto> dataxTaskReturnResult = syncConfigService.updateConfig(form);
  146.         log.info("--(1)updateDataxTaskResult--mysql--success");
  147.         if (dataxTaskReturnResult.getCode() == SUCCESS) {
  148.             //获取用户下唯一工作流ID
  149.             DolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName());
  150.             JSONObject processJson = new JSONObject();
  151.             log.info("--(2)getUserProcess--success:{}", processInfoList);
  152.             List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
  153.             for (Map<String, Object> map : list) {
  154.                 if (map.get("name").equals(userId + "-dataxTask")) {
  155.                     processJson.fluentPutAll(map);
  156.                 }
  157.             }
  158.             ProcessDto processDto = packageProcessParam(
  159.                     "update", userId + "-dataxTask", dataxTaskReturnResult.getData(), processJson);
  160.             processDto.setProjectName(form.getProjectName());
  161.             processDto.setProjectId(form.getProjectId());
  162.             processDto.setId(processJson.getInteger("id"));
  163.             log.info("--(3)packageProcessParam--success");
  164.             if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
  165.                 releaseProcessDefinition(form.getProjectName(), userId + "-dataxTask",
  166.                         processDto.getId(), 0);
  167.                 log.info("--(4)releaseProcessDefinition--OFFLINE--success");
  168.             }
  169.             ConfigAddForm configAddForm = new ConfigAddForm();
  170.             BeanUtils.copyProperties(form, configAddForm);
  171.             return updateProcess(configAddForm, processDto);
  172.         }
  173.         return dataxTaskReturnResult;
  174.     }
  175.     /**
  176.      * 删除任务
  177.      * @param request request
  178.      * @param projectName 项目名称
  179.      * @param id 任务ID
  180.      * @author liudz
  181.      * @date 2021/5/8
  182.      * @return 执行结果
  183.      **/
  184.     @DeleteMapping("/project/process/datax/{projectName}/{id}")
  185.     @Transactional(rollbackFor = Exception.class)
  186.     public ReturnResult deleteDataxTask(HttpServletRequest request, @PathVariable("projectName") String projectName,
  187.                                         @PathVariable("id") Long id) {
  188.         Long userId = Long.valueOf(request.getUserPrincipal().getName());
  189.         SyncConfigDto syncConfigDto = new SyncConfigDto();
  190.         syncConfigDto.setId(id);
  191.         ConfigAddForm configAddForm = new ConfigAddForm();
  192.         configAddForm.setProjectName(projectName);
  193.         ReturnResult dataxTaskReturnResult = syncConfigService.delete(id, userId);
  194.         log.info("--(1)deleteDataxTask--mysql--success");
  195.         if (dataxTaskReturnResult.getCode() == SUCCESS) {
  196.             //获取用户下唯一工作流ID
  197.             DolphinschedulerResponse processInfoList = getUserProcess(projectName);
  198.             JSONObject processJson = new JSONObject();
  199.             log.info("--(2)getUserProcess--success:{}", processInfoList);
  200.             List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
  201.             for (Map<String, Object> map : list) {
  202.                 if (map.get("name").equals(userId + "-dataxTask")) {
  203.                     processJson.fluentPutAll(map);
  204.                 }
  205.             }
  206.             ProcessDto processDto = packageProcessParam(
  207.                     "delete", userId + "-dataxTask", syncConfigDto, processJson);
  208.             processDto.setId(processJson.getInteger("id"));
  209.             log.info("--(3)packageProcessParam--success");
  210.             if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
  211.                 releaseProcessDefinition(projectName, userId + "-dataxTask",
  212.                         processDto.getId(), 0);
  213.                 log.info("--(4)releaseProcessDefinition--OFFLINE--success");
  214.             }
  215.             if (JSONObject.parseObject(processDto.getLocations()).keySet().size() == 0) {
  216.                 //删除工作流
  217.                 deleteProcess(configAddForm, processDto);
  218.             } else {
  219.                 //更新工作流
  220.                 updateProcess(configAddForm, processDto);
  221.             }
  222.         }
  223.         return dataxTaskReturnResult;
  224.     }
  225.     /**
  226.      * 校验工作流是否存在
  227.      *
  228.      * @param processName
  229.      *            工作流名称
  230.      * @param projectName 项目名称
  231.      * @author liudz
  232.      * @date 2021/5/8
  233.      * @return boolean
  234.      **/
  235.     public Boolean verifyProcessExist(String processName, String projectName) {
  236.         HttpHeaders headers = new HttpHeaders();
  237.         headers.set("token", token);
  238.         headers.set("Content-Type", "application/json");
  239.         HttpEntity requestEntity = new HttpEntity(headers);
  240.         ResponseEntity<DolphinschedulerResponse> returnResult =
  241.             restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName
  242.                             + "/process/verify-name?name=" + processName,
  243.                 HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
  244.         if (returnResult.getBody().getCode() == ZERO) {
  245.             return false;
  246.         }
  247.         return true;
  248.     }
  249.     /**
  250.      * 创建工作流
  251.      * @param processDto processDto
  252.      * @author liudz
  253.      * @date 2021/5/7
  254.      * @return 执行结果
  255.      **/
  256.     public ReturnResult createProcess(ProcessDto processDto) {
  257.         try {
  258.             String postURL = address + "/dolphinscheduler/projects/"
  259.                    + URLEncoder.encode(processDto.getProjectName(), "utf-8") + "/process/save";
  260.             PostMethod postMethod = new PostMethod(postURL);
  261.             postMethod.setRequestHeader("Content-Type",
  262.                     "application/x-www-form-urlencoded;charset=utf-8");
  263.             postMethod.setRequestHeader("token", token);
  264.             NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()),
  265.                 new NameValuePair("name", processDto.getName()),
  266.                 new NameValuePair("locations", processDto.getLocations()),
  267.                 new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())};
  268.             postMethod.setRequestBody(data);
  269.             org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
  270.             httpClient.executeMethod(postMethod);
  271.             JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
  272.             log.info("--(5)httpCreateProcess:{}", result);
  273.             if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
  274.                 return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
  275.             }
  276.         } catch (Exception e) {
  277.             log.info("请求异常:{}", e);
  278.         }
  279.         return ReturnResult.success();
  280.     }
  281.     /**
  282.      * 更新工作流
  283.      * @param vo vo
  284.      * @param processDto processDto
  285.      * @author liudz
  286.      * @date 2021/5/7
  287.      * @return 执行结果
  288.      **/
  289.     public ReturnResult updateProcess(ConfigAddForm vo, ProcessDto processDto) {
  290.         try {
  291.             String postURL = address + "/dolphinscheduler/projects/"
  292.                    + URLEncoder.encode(vo.getProjectName(), "utf-8") + "/process/update";
  293.             PostMethod postMethod = new PostMethod(postURL);
  294.             postMethod.setRequestHeader("Content-Type",
  295.                     "application/x-www-form-urlencoded;charset=utf-8");
  296.             postMethod.setRequestHeader("token", token);
  297.             // 参数设置,需要注意的就是里边不能传NULL,要传空字符串
  298.             NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()),
  299.                 new NameValuePair("name", processDto.getName()),
  300.                 new NameValuePair("locations", processDto.getLocations()),
  301.                 new NameValuePair("id", processDto.getId().toString()),
  302.                 new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())};
  303.             postMethod.setRequestBody(data);
  304.             org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
  305.             httpClient.executeMethod(postMethod);
  306.             JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
  307.             log.info("--(5)httpUpdateProcess:{}", result);
  308.             if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
  309.                 return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
  310.             }
  311.         } catch (Exception e) {
  312.             log.info("请求异常:{}", e);
  313.         }
  314.         return ReturnResult.success();
  315.     }
  316.     /**
  317.      * 删除工作流
  318.      * @param dto dto
  319.      * @param processDto processDto
  320.      * @author liudz
  321.      * @date 2021/5/7
  322.      * @return 执行结果
  323.      **/
  324.     public DolphinschedulerResponse deleteProcess(ConfigAddForm dto, ProcessDto processDto) {
  325.             HttpHeaders headers = new HttpHeaders();
  326.             headers.set("token", token);
  327.             headers.set("Content-Type", "application/json");
  328.             HttpEntity requestEntity = new HttpEntity(headers);
  329.             ResponseEntity<DolphinschedulerResponse> returnResult =
  330.                     restTemplate.exchange(address + "/dolphinscheduler/projects/" + dto.getProjectName()
  331.                                    + "/process/delete?processDefinitionId=" + processDto.getId(),
  332.                             HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
  333.             log.info("--(5)httpDeleteProcess:{}", returnResult);
  334.         return returnResult.getBody();
  335.     }
  336.     /**
  337.      * 获取dolphinscheduler上的资源spark可拖拽jar的id
  338.      *
  339.      * @author liudz
  340.      * @date 2021/5/8
  341.      * @return id
  342.      **/
  343.     public Integer getSparkResourceJarId() {
  344.         Integer resourceId = null;
  345.         HttpHeaders headers = new HttpHeaders();
  346.         headers.set("token", token);
  347.         headers.set("Content-Type", "application/json");
  348.         HttpEntity requestEntity = new HttpEntity(headers);
  349.         ResponseEntity<DolphinschedulerResponse> returnResult =
  350.             restTemplate.exchange(address + "/dolphinscheduler/resources/authorize-resource-tree?userId=1",
  351.                 HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
  352.         List<Map<String, Object>> list = (List<Map<String, Object>>) returnResult.getBody().getData();
  353.         for (Map<String, Object> map : list) {
  354.             if (map.get("name").equals("big_data02.jar")) {
  355.                 resourceId = Integer.valueOf(map.get("id").toString());
  356.             }
  357.         }
  358.         return resourceId;
  359.     }
  360.     /**
  361.      * 获取dolphinscheduler上的某用户下唯一工作流
  362.      * @param projectName 项目名称
  363.      * @author liudz
  364.      * @date 2021/5/8
  365.      * @return id
  366.      **/
  367.     public DolphinschedulerResponse getUserProcess(String projectName) {
  368.         HttpHeaders headers = new HttpHeaders();
  369.         headers.set("token", token);
  370.         headers.set("Content-Type", "application/json");
  371.         HttpEntity requestEntity = new HttpEntity(headers);
  372.         ResponseEntity<DolphinschedulerResponse> returnResult =
  373.             restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName + "/process/list",
  374.                 HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
  375.         return returnResult.getBody();
  376.     }
  377.     /**
  378.      *  封装参数
  379.      * @param type 操作类型
  380.      * @param processName 用户工作流名称
  381.      * @param dto 任务参数
  382.      * @param processJson 工作流json
  383.      * @author liudz
  384.      * @date 2021/5/13
  385.      * @return ProcessDto
  386.      **/
  387.     public ProcessDto packageProcessParam(String type, String processName, SyncConfigDto dto, JSONObject processJson) {
  388.         ProcessDto processDto = new ProcessDto();
  389.         processDto.setConnects("[]");
  390.         processDto.setName(processName);
  391.         JSONObject locationsOne = new JSONObject();
  392.         JSONObject locationsTwo = new JSONObject();
  393.         locationsTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("targetarr", "").fluentPut("nodenumber", "0");
  394.         locationsTwo.fluentPut("x", 0).fluentPut("y", 0);
  395.         locationsOne.put("datax-" + dto.getId(), locationsTwo);
  396.         // 创建工作流
  397.         if (CREATE.equals(type)) {
  398.             processDto = packageProcessParamOfCreate(processDto, dto, locationsOne);
  399.          } else if (ADD.equals(type)) {
  400.             //工作流添加节点
  401.             processDto = packageProcessParamOfAdd(processDto, dto, processJson, locationsOne, locationsTwo);
  402.         } else if (UPDATE.equals(type)) {
  403.             //更新工作流-只更新参数processDefinitionJson的tasks参数
  404.             processDto = packageProcessParamOfUpdate(processDto, processJson, dto);
  405.         } else if (DELETE.equals(type)) {
  406.             //更新工作流或删除工作流-更新则删除参数processDefinitionJson的tasks参数
  407.             processDto = packageProcessParamOfDelete(processDto, processJson, dto);
  408.         }
  409.         return processDto;
  410.     }
  411.     /**
  412.      * packageProcessParamOfCreate
  413.      * @param processDto 工作流参数
  414.      * @param dto 任务参数
  415.      * @param locationsOne locationsOne
  416.      * @author liudz
  417.      * @date 2021/5/7
  418.      * @return ProcessDto
  419.      **/
  420.     public ProcessDto packageProcessParamOfCreate(ProcessDto processDto, SyncConfigDto dto, JSONObject locationsOne) {
  421.         processDto.setLocations(locationsOne.toString());
  422.         JSONObject processOne = new JSONObject();
  423.         processOne.fluentPut("globalParams", new ArrayList<>()).fluentPut("tenantId", THREE).fluentPut("timeout", 0);
  424.         JSONObject processTwo = new JSONObject();
  425.         processTwo.fluentPut("type", "DATAX").fluentPut("id", "datax-" + dto.getId());
  426.         processTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("description", "");
  427.         String taskJsonString = dto.getContent().toString();
  428.         processTwo.put("params", JSONObject.parseObject("{"localParams":[],"customConfig":1,"
  429.                + ""json":"" + taskJsonString.replace(""", "\\"") + ""}"));
  430.         JSONObject jsonTimeout = new JSONObject();
  431.         jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);
  432.         processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");
  433.         JSONObject processTree = new JSONObject();
  434.         processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());
  435.         JSONObject jsonconditionResult = new JSONObject();
  436.         jsonconditionResult.put("successNode", new ArrayList<>());
  437.         jsonconditionResult.put("failedNode", new ArrayList<>());
  438.         processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());
  439.         processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");
  440.         processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");
  441.         processTwo.fluentPut("preTasks", new ArrayList<>());
  442.         JSONArray processTaskArray = new JSONArray();
  443.         processTaskArray.add(processTwo);
  444.         processOne.put("tasks", processTaskArray);
  445.         processDto.setProcessDefinitionJson(processOne.toString());
  446.         return processDto;
  447.     }
  448.     /**
  449.      * packageProcessParamOfAdd
  450.      * @param processDto 工作流参数
  451.      * @param locationsOne locationsOne
  452.      * @param locationsTwo locationsTwo
  453.      * @param dto 任务参数
  454.      * @param processJson 工作流json
  455.      * @author liudz
  456.      * @date 2021/5/7
  457.      * @return ProcessDto
  458.      **/
  459.     public ProcessDto packageProcessParamOfAdd(ProcessDto processDto, SyncConfigDto dto, JSONObject processJson,
  460.                                                JSONObject locationsOne, JSONObject locationsTwo) {
  461.         String maxTaskKey = JsonTools.getJsonStringMaxKey(processJson.getString("locations"));
  462.         Integer x = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("x");
  463.         Integer y = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("y");
  464.         if (x < ONE_THOUSAND_AND_FIVE_HUNDRED) {
  465.             locationsTwo.fluentPut("x", x + EIGHTY).fluentPut("y", y);
  466.         } else if (x >= ONE_THOUSAND_AND_FIVE_HUNDRED) {
  467.             locationsTwo.fluentPut("y", y + EIGHTY).fluentPut("x", 0);
  468.         }
  469.         locationsOne = processJson.getJSONObject("locations").fluentPut("datax-" + dto.getId(), locationsTwo);
  470.         processDto.setLocations(locationsOne.toString());
  471.         processDto.setId(processJson.getInteger("id"));
  472.         JSONObject processTwo = new JSONObject();
  473.         processTwo.fluentPut("type", "DATAX").fluentPut("id", "datax-" + dto.getId());
  474.         processTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("description", "");
  475.         String taskJsonString = dto.getContent().toString().replace("}}", "} }").replace("{{", "{ {");
  476.         processTwo.put("params", JSONObject.parseObject("{"localParams":[],"customConfig":1,"
  477.                 + ""json":"" + taskJsonString.replace(""", "\\"") + ""}"));
  478.         JSONObject jsonTimeout = new JSONObject();
  479.         jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);
  480.         processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");
  481.         JSONObject processTree = new JSONObject();
  482.         processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());
  483.         JSONObject jsonconditionResult = new JSONObject();
  484.         jsonconditionResult.put("successNode", new ArrayList<>());
  485.         jsonconditionResult.put("failedNode", new ArrayList<>());
  486.         processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());
  487.         processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");
  488.         processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");
  489.         processTwo.fluentPut("preTasks", new ArrayList<>());
  490.         JSONObject jsonNew = processJson.getJSONObject("processDefinitionJson");
  491.         JSONArray jsonArray = jsonNew.getJSONArray("tasks");
  492.         jsonArray.add(processTwo);
  493.         jsonNew.put("tasks", jsonArray);
  494.         processDto.setProcessDefinitionJson(jsonNew.toString());
  495.         return processDto;
  496.     }
  497.     /**
  498.      * packageProcessParamOfUpdate
  499.      * @param processDto 工作流参数
  500.      * @param dto 任务参数
  501.      * @param processJson 工作流json
  502.      * @author liudz
  503.      * @date 2021/5/7
  504.      * @return ProcessDto
  505.      **/
  506.     public ProcessDto packageProcessParamOfUpdate(ProcessDto processDto, JSONObject processJson, SyncConfigDto dto) {
  507.         processDto.setLocations(processJson.getString("locations"));
  508.         processDto.setId(processJson.getInteger("id"));
  509.         JSONArray jsonTasksArray = processJson.getJSONObject("processDefinitionJson").getJSONArray("tasks");
  510.         JSONArray copyJsonTasksArray = new JSONArray();
  511.         copyJsonTasksArray.addAll(jsonTasksArray);
  512.         JSONObject processDefinitionJson = new JSONObject();
  513.         String taskJsonString = dto.getContent().toString();
  514.         for (Object object : jsonTasksArray) {
  515.             JSONObject jsonObject = JSONObject.parseObject(object.toString());
  516.             if (Long.valueOf(jsonObject.getString("id").substring(SIX)) == dto.getId()) {
  517.                 String json = jsonObject.getString("json");
  518.                 json = taskJsonString;
  519.                 copyJsonTasksArray.remove(jsonObject);
  520.                 jsonObject.getJSONObject("params").put("json", json);
  521.                 copyJsonTasksArray.add(jsonObject);
  522.                 processDefinitionJson = processJson.getJSONObject("processDefinitionJson");
  523.                 processDefinitionJson.put("tasks", copyJsonTasksArray);
  524.             }
  525.         }
  526.         processDto.setProcessDefinitionJson(processDefinitionJson.toString());
  527.         return processDto;
  528.     }
  529.     /**
  530.      * packageProcessParamOfDelete
  531.      * @param processDto 工作流参数
  532.      * @param dto 任务参数
  533.      * @param processJson 工作流json
  534.      * @author liudz
  535.      * @date 2021/5/7
  536.      * @return ProcessDto
  537.      **/
  538.     public ProcessDto packageProcessParamOfDelete(ProcessDto processDto, JSONObject processJson, SyncConfigDto dto) {
  539.         processDto.setId(processJson.getInteger("id"));
  540.         JSONObject locationsJson = processJson.getJSONObject("locations");
  541.         JSONObject processDefinitionJson = processJson.getJSONObject("processDefinitionJson");
  542.         JSONArray processDefinitionArray = processDefinitionJson.getJSONArray("tasks");
  543.         JSONArray copyProcessDefinitionArray = new JSONArray();
  544.         copyProcessDefinitionArray.addAll(processDefinitionArray);
  545.         if (locationsJson.containsKey(DictionaryEnum.DATAX.getFiledString() + dto.getId())) {
  546.             locationsJson.remove("datax-" + dto.getId());
  547.             for (Object object : processDefinitionArray) {
  548.                 if (JSONObject.parseObject(object.toString()).getString("id").equals("datax-" + dto.getId())) {
  549.                     copyProcessDefinitionArray.remove(object);
  550.                 }
  551.             }
  552.             processDefinitionJson.put("tasks", copyProcessDefinitionArray);
  553.         }
  554.         processDto.setLocations(locationsJson.toString());
  555.         processDto.setProcessDefinitionJson(processDefinitionJson.toString());
  556.         return processDto;
  557.     }
  558.     /**
  559.      * 工作流【上线或者下线】
  560.      * @param projectName 项目名称
  561.      * @param processName 用户工作流名称
  562.      * @param processId 工作流ID
  563.      * @param releaseState 上下线状态操作【0:下线,1:上线】
  564.      * @author liudz
  565.      * @date 2021/5/7
  566.      * @return 执行结果
  567.      **/
  568.     public ReturnResult releaseProcessDefinition(String projectName, String processName, Integer processId,
  569.                   Integer releaseState) {
  570.         try {
  571.             String postURL = address + "/dolphinscheduler/projects/"
  572.                    + URLEncoder.encode(projectName, "utf-8") + "/process/release";
  573.             PostMethod postMethod = new PostMethod(postURL);
  574.             postMethod.setRequestHeader("Content-Type",
  575.                     "application/x-www-form-urlencoded;charset=utf-8");
  576.             postMethod.setRequestHeader("token", token);
  577.             // 参数设置,需要注意的就是里边不能传NULL,要传空字符串
  578.             NameValuePair[] data = {new NameValuePair("name", processName),
  579.                     new NameValuePair("processId", processId.toString()),
  580.                     new NameValuePair("releaseState", releaseState.toString())};
  581.             postMethod.setRequestBody(data);
  582.             org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
  583.             httpClient.executeMethod(postMethod);
  584.             JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
  585.             if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
  586.                 return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));
  587.             }
  588.         } catch (Exception e) {
  589.             log.info("请求异常:{}", e);
  590.         }
  591.         return ReturnResult.success();
  592.     }
  593.     /**
  594.      * 运行流程实例
  595.      * @param projectName 项目名称
  596.      * @param request request
  597.      * @param id 数据同步任务ID
  598.      * @author liudz
  599.      * @date 2021/5/7
  600.      * @return 执行结果
  601.      **/
  602.     @GetMapping("/project/process/datax/start")
  603.     public DolphinschedulerResponse startProcessDataxTask(
  604.             @RequestParam("projectName") String projectName, @RequestParam("id") Integer id,
  605.             HttpServletRequest request) {
  606.         try {
  607.             Long userId = Long.valueOf(request.getUserPrincipal().getName());
  608.             DolphinschedulerResponse processInfoList = getUserProcess(projectName);
  609.             if (processInfoList.getCode() != ZERO) {
  610.                 return processInfoList;
  611.             }
  612.             JSONObject processJson = new JSONObject();
  613.             log.info("--(1)getUserProcess--success:{}", processInfoList);
  614.             List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
  615.             for (Map<String, Object> map : list) {
  616.                 if (map.get("name").equals(userId + "-dataxTask")) {
  617.                     processJson.fluentPutAll(map);
  618.                 }
  619.             }
  620.             if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(OFFLINE)) {
  621.                 releaseProcessDefinition(projectName, userId + "-dataxTask",
  622.                         processJson.getInteger("id"), 1);
  623.                 log.info("--(2)releaseProcessDefinition--ONLINE--success");
  624.             }
  625.             String postURL = address + "/dolphinscheduler/projects/" + URLEncoder.encode(projectName, "utf-8")
  626.                    + "/executors/start-process-instance";
  627.             PostMethod postMethod = new PostMethod(postURL);
  628.             postMethod.setRequestHeader("Content-Type",
  629.                     "application/x-www-form-urlencoded;charset=utf-8");
  630.             postMethod.setRequestHeader("token", token);
  631.             // 参数设置,需要注意的就是里边不能传NULL,要传空字符串
  632.             NameValuePair[] data = packageNameValuePair(processJson, id);
  633.             postMethod.setRequestBody(data);
  634.             org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
  635.             httpClient.executeMethod(postMethod);
  636.             JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
  637.             log.info("--(2)startProcessInstance--result:{}", result);
  638.             if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
  639.                 return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg"));
  640.             }
  641.         } catch (Exception e) {
  642.             log.info("请求异常:{}", e);
  643.         }
  644.         return DolphinschedulerResponse.success();
  645.     }
  646.     /**
  647.      *  packageNameValuePair封装参数
  648.      * @param processJson 工作流json
  649.      * @param dragSparkTaskId 任务ID
  650.      * @author liudz
  651.      * @date 2021/5/14
  652.      * @return NameValuePair
  653.      **/
  654.     public NameValuePair[] packageNameValuePair(JSONObject processJson, Integer dragSparkTaskId) {
  655.         NameValuePair[] data = {
  656.                 new NameValuePair("failureStrategy", "CONTINUE"),
  657.                 new NameValuePair("processDefinitionId", processJson.getString("id")),
  658.                 new NameValuePair("processInstancePriority", "MEDIUM"),
  659.                 new NameValuePair("warningGroupId", "0"),
  660.                 new NameValuePair("warningType", "NONE"),
  661.                 new NameValuePair("runMode", "RUN_MODE_SERIAL"),
  662.                 new NameValuePair("startNodeList", "datax-" + dragSparkTaskId),
  663.                 new NameValuePair("taskDependType", "TASK_POST"),
  664.                 new NameValuePair("workerGroup", "default")};
  665.         return data;
  666.     }
  667.     /**
  668.      * stopProcessDataxTask
  669.      * @param id id
  670.      * @param executeType executeType
  671.      * @param projectName 项目名称
  672.      * @return ReturnResult
  673.      * @author: liudz
  674.      * @author: lty update 2020/5/27
  675.      * @date: 2020/4/28 10:31
  676.      */
  677.     @GetMapping(value = "/project/process/datax/execute/{projectName}/{id}/{executeType}")
  678.     public DolphinschedulerResponse<String> stopProcessDataxTask(@PathVariable("projectName") String projectName,
  679.                                  @PathVariable("id") Long id, @PathVariable("executeType") String executeType) {
  680.         log.info("--(1)stopProcessDataxTask--begin--projectName:{},id:{},executeType:{}", projectName, id, executeType);
  681.         try {
  682.             HttpHeaders headers = new HttpHeaders();
  683.             headers.set("token", token);
  684.             headers.set("Content-Type", "application/json");
  685.             HttpEntity requestEntity = new HttpEntity(headers);
  686.             ResponseEntity<JSONObject> returnResult = restTemplate.exchange(address + "/"
  687.    + "dolphinscheduler/projects/" + projectName + "/task-instance/list-paging?"
  688.    + "pageNo=1&pageSize=100&taskName=datax-" + id, HttpMethod.GET, requestEntity, JSONObject.class);
  689.             List<Map<String, Object>> list =
  690.                     (List<Map<String, Object>>) returnResult.getBody().getJSONObject("data").get("totalList");
  691.             Integer processInstanceId = null;
  692.             for (Map<String, Object> map : list) {
  693.                 if (map.get("state").equals("RUNNING_EXEUTION")) {
  694.                     processInstanceId = Integer.valueOf(map.get("processInstanceId").toString());
  695.                 }
  696.             }
  697.             if (StringUtils.isEmpty(processInstanceId)) {
  698.                 return DolphinschedulerResponse.error(Msg.TASK_HAS_BEEN_STOPPED);
  699.             }
  700.             log.info("--(2)getProcessInstanceId--success--:{}", processInstanceId);
  701.             String postURL = address + "/dolphinscheduler/projects/"
  702.                     + URLEncoder.encode(projectName, "utf-8") + "/executors/execute";
  703.             PostMethod postMethod = new PostMethod(postURL);
  704.             postMethod.setRequestHeader("Content-Type",
  705.                     "application/x-www-form-urlencoded;charset=utf-8");
  706.             postMethod.setRequestHeader("token", token);
  707.             NameValuePair[] data = {new NameValuePair("executeType", executeType),
  708.                     new NameValuePair("processInstanceId", processInstanceId.toString())};
  709.             postMethod.setRequestBody(data);
  710.             org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
  711.             httpClient.executeMethod(postMethod);
  712.             JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
  713.             if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
  714.                 return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg"));
  715.             }
  716.             log.info("--(3)stopProcessSparkTask--success--:{}", result);
  717.         } catch (UnsupportedEncodingException e) {
  718.             log.info("UnsupportedEncodingException:{}", e);
  719.         } catch (HttpException e) {
  720.             log.info("HttpException:{}", e);
  721.         } catch (IOException e) {
  722.             log.info("IOException:{}", e);
  723.         }
  724.         return DolphinschedulerResponse.success();
  725.     }
  726.     /**
  727.      * 查询全部同步任务配置(分页)
  728.      *
  729.      * @param form
  730.      *            name
  731.      * @param request
  732.      *            含有用户id
  733.      * @return 分页结果
  734.      */
  735.     @RequestMapping(value = "/project/process/datax/list", method = RequestMethod.POST)
  736.     public ReturnResult<PageResult<SyncConfigDto>> findAll(@RequestBody @Validated ConfigSelectForm form,
  737.                                                            HttpServletRequest request) {
  738.         Long userId = Long.valueOf(request.getUserPrincipal().getName());
  739.         return syncConfigService.list(form, userId);
  740.     }
  741.     /**
  742.      * 获取同步任务配置
  743.      *
  744.      * @param id
  745.      *            配置id
  746.      * @param request
  747.      *            用户id
  748.      * @return 添加结果
  749.      */
  750.     @RequestMapping(value = "/project/process/datax", method = RequestMethod.GET)
  751.     public ReturnResult<SyncConfigDto> findById(@RequestParam Long id, HttpServletRequest request) {
  752.         Long userId = Long.valueOf(request.getUserPrincipal().getName());
  753.         return syncConfigService.findById(id, userId);
  754.     }
  755. }
复制代码


ConfigAddForm
  1. package com.geespace.microservices.builder.request;
  2. import javax.validation.constraints.NotEmpty;
  3. import javax.validation.constraints.NotNull;
  4. import com.alibaba.fastjson.JSONArray;
  5. import com.alibaba.fastjson.JSONObject;
  6. import lombok.Data;
  7. /**
  8. * @Author: zjr
  9. * @Date: 2020-05-06 09:42
  10. * @Version 1.0
  11. */
  12. @Data
  13. public class ConfigAddForm {
  14.     /**
  15.      * 配置名称
  16.      */
  17.     @NotEmpty(message = "name不能为空")
  18.     private String name;
  19.     /**
  20.      * 配置描述
  21.      */
  22.     private String description;
  23.     /**
  24.      * 实时/全量/增量
  25.      */
  26.     @NotNull(message = "同步方式不能为空")
  27.     private int syncType;
  28.     /**
  29.      * reader 选择的数据源id
  30.      */
  31.     @NotNull(message = "读取数据源id不能为空")
  32.     private Long readerConfigId;
  33.     /**
  34.      * reader
  35.      */
  36.     @NotEmpty(message = "读取参数不能为空")
  37.     private JSONObject readerParam;
  38.     /**
  39.      * writer 选择的数据源id
  40.      */
  41.     @NotNull(message = "写入数据源id不能为空")
  42.     private Long writerConfigId;
  43.     /**
  44.      * writer
  45.      */
  46.     @NotEmpty(message = "写入参数不能为空")
  47.     private JSONObject writerParam;
  48.     /**
  49.      * reader:column left,writer:column right
  50.      */
  51.     @NotEmpty(message = "字段对照表不能为空")
  52.     private JSONArray columnMap;
  53.     private Long userId;
  54.     /**
  55.      *  项目名称
  56.      **/
  57.     String projectName;
  58.     /**
  59.      *  项目id
  60.      **/
  61.     @NotNull(message = "projectId not null")
  62.     Long projectId;
  63.     Long id;
  64. }
复制代码


ConfigUpdateForm
  1. package com.geespace.microservices.builder.request;
  2. import javax.validation.constraints.NotEmpty;
  3. import javax.validation.constraints.NotNull;
  4. import com.alibaba.fastjson.JSONArray;
  5. import com.alibaba.fastjson.JSONObject;
  6. import lombok.Data;
  7. /**
  8. * @Author: zjr
  9. * @Date: 2020-05-06 09:42
  10. * @Version 1.0
  11. */
  12. @Data
  13. public class ConfigUpdateForm {
  14.     @NotNull(message = "同步配置id不能为空")
  15.     private Long id;
  16.     /**
  17.      * 配置名称
  18.      */
  19.     @NotEmpty(message = "name不能为空")
  20.     private String name;
  21.     /**
  22.      * 配置描述
  23.      */
  24.     private String description;
  25.     /**
  26.      * 实时/全量/增量
  27.      */
  28.     @NotNull(message = "同步方式不能为空")
  29.     private int syncType;
  30.     /**
  31.      * reader 选择的数据源id
  32.      */
  33.     @NotNull(message = "读取数据源id不能为空")
  34.     private Long readerConfigId;
  35.     /**
  36.      * reader
  37.      */
  38.     @NotEmpty(message = "读取参数不能为空")
  39.     private JSONObject readerParam;
  40.     /**
  41.      * writer 选择的数据源id
  42.      */
  43.     @NotNull(message = "写入数据源id不能为空")
  44.     private Long writerConfigId;
  45.     /**
  46.      * writer
  47.      */
  48.     @NotEmpty(message = "写入参数不能为空")
  49.     private JSONObject writerParam;
  50.     /**
  51.      * reader:column left,writer:column right
  52.      */
  53.     @NotEmpty(message = "字段对照表不能为空")
  54.     private JSONArray columnMap;
  55.     private Long userId;
  56.     /**
  57.      *  项目id
  58.      **/
  59.     @NotNull(message = "projectId not null")
  60.     Long projectId;
  61.     /**
  62.      *  项目名称
  63.      **/
  64.     String projectName;
  65. }
复制代码


ProcessDto
  1. package com.geespace.microservices.builder.dto;
  2. import lombok.Data;
  3. import lombok.EqualsAndHashCode;
  4. import lombok.ToString;
  5. /**
  6. * dolphinscheduler调度器中工作流参数
  7. * @Author: liudz
  8. * @Date: 2020-03-23
  9. **/
  10. @Data
  11. @EqualsAndHashCode(callSuper = false)
  12. @ToString(callSuper = true)
  13. public class ProcessDto {
  14.     /**
  15.      * 流程定义ID
  16.      **/
  17.     private Integer id;
  18.     /**
  19.      * 流程定义节点图标连接信息(json格式)
  20.      **/
  21.     private String connects;
  22.     /**
  23.      * 流程定义节点坐标位置信息(json格式)
  24.      **/
  25.     private String locations;
  26.     /**
  27.      * 流程定义名称
  28.      **/
  29.     private String name;
  30.     /**
  31.      * 流程定义详细信息(json格式)
  32.      **/
  33.     private String processDefinitionJson;
  34.     /**
  35.      *  项目名称
  36.      **/
  37.     String projectName;
  38.     /**
  39.      *  项目id
  40.      **/
  41.     Long projectId;
  42. }
复制代码


SyncConfigDto
  1. package com.geespace.microservices.builder.dto;
  2. import com.alibaba.fastjson.JSONObject;
  3. import lombok.Data;
  4. /**
  5. * @Author: zjr
  6. * @Date: 2020-05-05 17:03
  7. * @Version 1.0
  8. */
  9. @Data
  10. public class SyncConfigDto {
  11.     private Long id;
  12.     /**
  13.      * 配置名称
  14.      */
  15.     private String name;
  16.     /**
  17.      * 配置描述
  18.      */
  19.     private String description;
  20.     /**
  21.      * 实时/全量/增量
  22.      */
  23.     private int syncType;
  24.     /**
  25.      * json base64
  26.      */
  27.     private JSONObject content;
  28.     /**
  29.      *  项目名称
  30.      **/
  31.     String projectName;
  32.     /**
  33.      *  项目id
  34.      **/
  35.     Long projectId;
  36. }
复制代码


SyncConfigService
  1. package com.geespace.microservices.builder.service;
  2. import com.geespace.microservices.builder.dto.SyncConfigDto;
  3. import com.geespace.microservices.builder.request.ConfigAddForm;
  4. import com.geespace.microservices.builder.request.ConfigSelectForm;
  5. import com.geespace.microservices.builder.request.ConfigUpdateForm;
  6. import com.geespace.microservices.builder.response.PageResult;
  7. import com.geespace.microservices.builder.response.ReturnResult;
  8. /**
  9. * @Author: zjr
  10. * @Date: 2020-05-05 13:59
  11. * @Version 1.0
  12. */
  13. public interface SyncConfigService {
  14.     /**
  15.      * 添加同步任务配置
  16.      *
  17.      * @param form
  18.      *            任务配置参数
  19.      * @return 添加结果
  20.      */
  21.     ReturnResult<SyncConfigDto> addConfig(ConfigAddForm form);
  22.     /**
  23.      * 修改同步任务配置
  24.      *
  25.      * @param form
  26.      *            任务配置参数(含id)
  27.      * @return 修改结果
  28.      */
  29.     ReturnResult<SyncConfigDto> updateConfig(ConfigUpdateForm form);
  30.     /**
  31.      * 查找同步任务配置
  32.      *
  33.      * @param id
  34.      *            同步任务配置id
  35.      * @param userId
  36.      *            用户id
  37.      * @return 查询结果
  38.      */
  39.     ReturnResult<SyncConfigDto> findById(Long id, Long userId);
  40.     /**
  41.      * 删除同步任务配置
  42.      *
  43.      * @param id
  44.      *            任务配置id
  45.      * @param userId
  46.      *            用户id
  47.      * @return 删除结果
  48.      */
  49.     ReturnResult delete(Long id, Long userId);
  50.     /**
  51.      * 查询全部同步任务配置(分页)
  52.      *
  53.      * @param form
  54.      *            name
  55.      * @param userId
  56.      *            用户id
  57.      * @return 分页结果
  58.      */
  59.     ReturnResult<PageResult<SyncConfigDto>> list(ConfigSelectForm form, Long userId);
  60. }
复制代码


SyncConfigServiceImpl
  1. package com.geespace.microservices.builder.service.impl;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.stream.Collectors;
  5. import com.alibaba.fastjson.JSONArray;
  6. import com.alibaba.fastjson.JSONObject;
  7. import com.geespace.microservices.builder.biz.Contants;
  8. import com.geespace.microservices.builder.dao.SyncConfigMapper;
  9. import com.geespace.microservices.builder.dto.ColumnMap;
  10. import com.geespace.microservices.builder.dto.SyncConfigDto;
  11. import com.geespace.microservices.builder.entity.SyncConfig;
  12. import com.geespace.microservices.builder.factory.BaseParamTool;
  13. import com.geespace.microservices.builder.factory.ParamToolFactory;
  14. import com.geespace.microservices.builder.request.ConfigAddForm;
  15. import com.geespace.microservices.builder.request.ConfigSelectForm;
  16. import com.geespace.microservices.builder.request.ConfigUpdateForm;
  17. import com.geespace.microservices.builder.response.BizCode;
  18. import com.geespace.microservices.builder.response.PageResult;
  19. import com.geespace.microservices.builder.response.ReturnResult;
  20. import com.geespace.microservices.builder.service.SyncConfigService;
  21. import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;
  22. import com.geespace.microservices.datasource.response.Response;
  23. import com.geespace.microservices.datasource.service.JdbcDataSourceService;
  24. import com.github.pagehelper.PageHelper;
  25. import com.github.pagehelper.PageInfo;
  26. import lombok.extern.slf4j.Slf4j;
  27. import org.springframework.beans.BeanUtils;
  28. import org.springframework.beans.factory.annotation.Autowired;
  29. import org.springframework.stereotype.Service;
  30. import org.springframework.util.CollectionUtils;
  31. /**
  32. * @Author: zjr
  33. * @Date: 2020-05-05 13:59
  34. * @Version 1.0
  35. */
  36. @Service
  37. @Slf4j
  38. public class SyncConfigServiceImpl implements SyncConfigService {
  39.     public static final int ZERO = 0;
  40.     public static final String HBASE = "hbase";
  41.     @Autowired
  42.     private SyncConfigMapper syncConfigMapper;
  43.     @Autowired
  44.     private JdbcDataSourceService dataSourceService;
  45.     @Override
  46.     public ReturnResult<SyncConfigDto> addConfig(ConfigAddForm form) {
  47.         Integer checkResult = syncConfigMapper.checkNameUnique(form.getUserId(), form.getName(), null);
  48.         if (checkResult == ZERO) {
  49.             ColumnMap columnMap = makeColumnMap(form.getColumnMap());
  50.             if (columnMap == null) {
  51.                 return ReturnResult.error(BizCode.COLUMN_MATCHING_ERROR);
  52.             }
  53.             // 查询reader数据源 填充reader
  54.             JSONObject reader = packageReader(form.getReaderConfigId(), form.getReaderParam(), columnMap.getReader());
  55.             // 查询writer数据源 填充writer
  56.             JSONObject writer = packageWriter(form.getWriterConfigId(), form.getWriterParam(), columnMap.getWriter());
  57.             JSONArray contentArray = new JSONArray();
  58.             JSONObject content = new JSONObject();
  59.             content.put("reader", reader);
  60.             content.put("writer", writer);
  61.             contentArray.add(content);
  62.             SyncConfig syncConfig = new SyncConfig();
  63.             syncConfig.setContent(packageJob(contentArray));
  64.             syncConfig.setName(form.getName());
  65.             syncConfig.setDescription(form.getDescription());
  66.             syncConfig.setSyncType(form.getSyncType());
  67.             syncConfig.setCreatedTimestamp(System.currentTimeMillis());
  68.             syncConfig.setCreatedUser(form.getUserId());
  69.             syncConfig.setModifiedTimestamp(System.currentTimeMillis());
  70.             syncConfig.setProjectName(form.getProjectName());
  71.             syncConfig.setProjectId(form.getProjectId());
  72.             syncConfigMapper.insert(syncConfig);
  73.             return ReturnResult.success(entityToDto(syncConfig));
  74.         }
  75.         log.error("SyncConfigServiceImpl--addConfig--NAME_IS_EXIST!");
  76.         return ReturnResult.error(BizCode.NAME_IS_EXIST);
  77.     }
  78.     @Override
  79.     public ReturnResult<SyncConfigDto> updateConfig(ConfigUpdateForm form) {
  80.         SyncConfig syncConfig = syncConfigMapper.findById(form.getId());
  81.         if (syncConfig == null || syncConfig.getCreatedUser() != form.getUserId()) {
  82.             return ReturnResult.error(BizCode.UPDATE_OBJECT_NOT_EXIST);
  83.         }
  84.         Integer checkResult = syncConfigMapper.checkNameUnique(form.getUserId(), form.getName(), form.getId());
  85.         if (checkResult == ZERO) {
  86.             ColumnMap columnMap = makeColumnMap(form.getColumnMap());
  87.             JSONObject reader = packageReader(form.getReaderConfigId(), form.getReaderParam(), columnMap.getReader());
  88.             JSONObject writer = packageWriter(form.getWriterConfigId(), form.getWriterParam(), columnMap.getWriter());
  89.             JSONArray contentArray = new JSONArray();
  90.             JSONObject content = new JSONObject();
  91.             content.put("reader", reader);
  92.             content.put("writer", writer);
  93.             contentArray.add(content);
  94.             syncConfig.setContent(packageJob(contentArray));
  95.             syncConfig.setName(form.getName());
  96.             syncConfig.setDescription(form.getDescription());
  97.             syncConfig.setSyncType(form.getSyncType());
  98.             syncConfig.setModifiedTimestamp(System.currentTimeMillis());
  99.             syncConfig.setProjectName(form.getProjectName());
  100.             syncConfig.setProjectId(form.getProjectId());
  101.             syncConfigMapper.update(syncConfig);
  102.             return ReturnResult.success(entityToDto(syncConfig));
  103.         }
  104.         log.error("SyncConfigServiceImpl--updateConfig--NAME_IS_EXIST!");
  105.         return ReturnResult.error(BizCode.NAME_IS_EXIST);
  106.     }
  107.     @Override
  108.     public ReturnResult<SyncConfigDto> findById(Long id, Long userId) {
  109.         SyncConfig syncConfig = syncConfigMapper.findById(id);
  110.         if (syncConfig == null || syncConfig.getCreatedUser() != userId) {
  111.             return ReturnResult.success(new SyncConfigDto());
  112.         }
  113.         return ReturnResult.success(entityToDto(syncConfig));
  114.     }
  115.     @Override
  116.     public ReturnResult delete(Long id, Long userId) {
  117.         log.debug("****id:{},userId:{}****", id, userId);
  118.         SyncConfig syncConfig = syncConfigMapper.findById(id);
  119.         log.debug("****syncConfig:{}****", syncConfig);
  120.         log.debug("****syncConfig != null:{}", syncConfig != null);
  121.         log.debug("****syncConfig.getCreatedUser():{},userId:{},syncConfig.getCreatedUser().equals(userId):{}",
  122.                 syncConfig.getCreatedUser(), userId, syncConfig.getCreatedUser().equals(userId));
  123.         if (syncConfig != null && syncConfig.getCreatedUser().equals(userId)) {
  124.             syncConfigMapper.delete(id);
  125.             log.debug("****delete success!");
  126.         }
  127.         return ReturnResult.success();
  128.     }
  129.     @Override
  130.     public ReturnResult<PageResult<SyncConfigDto>> list(ConfigSelectForm form, Long userId) {
  131.         SyncConfig syncConfig = new SyncConfig();
  132.         syncConfig.setCreatedUser(userId);
  133.         syncConfig.setName(form.getName());
  134.         syncConfig.setProjectId(form.getProjectId());
  135.         PageHelper.startPage(form.getPageNum(), form.getPageSize());
  136.         PageInfo<SyncConfig> configPageInfo = new PageInfo<>(syncConfigMapper.list(syncConfig));
  137.         PageResult<SyncConfigDto> result = new PageResult<>();
  138.         result.setPageNum(configPageInfo.getPageNum());
  139.         result.setPageSize(configPageInfo.getPageSize());
  140.         result.setTotalCount(configPageInfo.getTotal());
  141.         result.setTotalPage(configPageInfo.getPages());
  142.         List<SyncConfigDto> dtoList =
  143.             configPageInfo.getList().stream().map(this::entityToDto).collect(Collectors.toList());
  144.         result.setList(dtoList);
  145.         return ReturnResult.success(result);
  146.     }
  147.     /**
  148.      * 将reader writer对照list查分成2个独立list(保持顺序)
  149.      *
  150.      * @param columnMap
  151.      *            [{"reader":"col l1","writer":"col r1"},{"reader":"col l2","writer":"col r2"}]
  152.      * @return object contants reader(list<String>) and writer(list<String>)
  153.      */
  154.     private ColumnMap makeColumnMap(JSONArray columnMap) {
  155.         List<String> readerColumns = new ArrayList<>();
  156.         List<String> writerColumns = new ArrayList<>();
  157.         for (int i = 0; i < columnMap.size(); i++) {
  158.             JSONObject column = columnMap.getJSONObject(i);
  159.             readerColumns.add(column.getString("reader"));
  160.             writerColumns.add(column.getString("writer"));
  161.         }
  162.         if (CollectionUtils.isEmpty(readerColumns) || CollectionUtils.isEmpty(writerColumns)) {
  163.             return null;
  164.         }
  165.         ColumnMap column = new ColumnMap();
  166.         column.setReader(readerColumns);
  167.         column.setWriter(writerColumns);
  168.         return column;
  169.     }
  170.     /**
  171.      * 封装reader json
  172.      *
  173.      * @param readerConfigId
  174.      *            数据源id
  175.      * @param readerParam
  176.      *            页面填写reader 配置属性信息(table、where...)
  177.      * @param readerColumns
  178.      *            选择的数据字段
  179.      * @return reader json
  180.      */
  181.     private JSONObject packageReader(Long readerConfigId, JSONObject readerParam, List<String> readerColumns) {
  182.         Response<JdbcDataSourceDto> descrypt = dataSourceService.findDescrypt(readerConfigId);
  183.         if (!descrypt.responseSuccess()) {
  184.             return null;
  185.         }
  186.         JdbcDataSourceDto jdbcDataSource = descrypt.getInfo();
  187.         String sourceType = jdbcDataSource.getSourceType();
  188.         BaseParamTool baseParamTool = ParamToolFactory.getByType(sourceType);
  189.         JSONObject reader = baseParamTool.makeReaderJson(jdbcDataSource, readerParam, readerColumns);
  190.         return reader;
  191.     }
  192.     /**
  193.      * 封装writer json
  194.      *
  195.      * @param writerConfigId
  196.      *            数据源id
  197.      * @param writerParam
  198.      *            页面填写writer 配置属性信息(table、where...)
  199.      * @param writerColumns
  200.      *            选择的映射字段
  201.      * @return writer json
  202.      */
  203.     private JSONObject packageWriter(Long writerConfigId, JSONObject writerParam, List<String> writerColumns) {
  204.         Response<JdbcDataSourceDto> descrypt = dataSourceService.findDescrypt(writerConfigId);
  205.         if (!descrypt.responseSuccess()) {
  206.             return null;
  207.         }
  208.         JdbcDataSourceDto jdbcDataSource = descrypt.getInfo();
  209.         String sourceType = jdbcDataSource.getSourceType();
  210.         BaseParamTool baseParamTool = ParamToolFactory.getByType(sourceType);
  211.         JSONObject writer = baseParamTool.makeWriterJson(jdbcDataSource, writerParam, writerColumns);
  212.         return writer;
  213.     }
  214.     /**
  215.      * 封装执行job json
  216.      *
  217.      * @param content
  218.      *            reader and writer
  219.      * @return job
  220.      */
  221.     private JSONObject packageJob(JSONArray content) {
  222.         JSONObject job = new JSONObject();
  223.         JSONObject setting = new JSONObject();
  224.         JSONObject speed = new JSONObject();
  225.         speed.put("channel", 1);
  226.         JSONObject errorLimit = new JSONObject();
  227.         errorLimit.put("record", 0);
  228.         errorLimit.put("percentage", Contants.PERCENTAGE);
  229.         setting.put("speed", speed);
  230.         setting.put("errorLimit", errorLimit);
  231.         job.put("setting", setting);
  232.         job.put("content", content);
  233.         JSONObject jobContent = new JSONObject();
  234.         jobContent.put("job", job);
  235.         return jobContent;
  236.     }
  237.     /**
  238.      * entity转dto
  239.      *
  240.      * @param syncConfig
  241.      *            entity
  242.      * @return dto
  243.      */
  244.     private SyncConfigDto entityToDto(SyncConfig syncConfig) {
  245.         SyncConfigDto configDto = new SyncConfigDto();
  246.         BeanUtils.copyProperties(syncConfig, configDto);
  247.         return configDto;
  248.     }
  249. }
复制代码

SyncConfigMapper
  1. package com.geespace.microservices.builder.dao;
  2. import java.util.List;
  3. import com.geespace.microservices.builder.entity.SyncConfig;
  4. import org.apache.ibatis.annotations.Delete;
  5. import org.apache.ibatis.annotations.Insert;
  6. import org.apache.ibatis.annotations.Mapper;
  7. import org.apache.ibatis.annotations.Result;
  8. import org.apache.ibatis.annotations.ResultMap;
  9. import org.apache.ibatis.annotations.Results;
  10. import org.apache.ibatis.annotations.Select;
  11. import org.apache.ibatis.annotations.SelectKey;
  12. import org.apache.ibatis.annotations.SelectProvider;
  13. import org.apache.ibatis.annotations.Update;
  14. import org.apache.ibatis.type.JdbcType;
  15. /**
  16. * @Author: zjr
  17. * @Date: 2020-05-05 10:40
  18. * @Version 1.0
  19. */
  20. @Mapper
  21. public interface SyncConfigMapper {
  22.     /**
  23.      * 插入一条数据
  24.      *
  25.      * @param syncConfig
  26.      *            插入对象
  27.      * @return 结果
  28.      */
  29.     @Insert({"insert into sync_config (name, description, content, sync_type, created_timestamp, created_user, ",
  30.         "modified_timestamp,project_id,project_name) values (#{name,jdbcType=VARCHAR},#{description,jdbcType=VARCHAR},",
  31.         "#{content,jdbcType=OTHER, typeHandler=com.geespace.microservices.builder.handler.MySqlJsonHandler}, ",
  32.         "#{syncType,jdbcType=TINYINT}, #{createdTimestamp,jdbcType=BIGINT}, #{createdUser,jdbcType=BIGINT}, ",
  33.         "#{modifiedTimestamp,jdbcType=BIGINT},#{projectId},#{projectName})"})
  34.     @SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "id", before = false, resultType = Long.class)
  35.     Long insert(SyncConfig syncConfig);
  36.     /**
  37.      * 更新数据
  38.      *
  39.      * @param syncConfig
  40.      *            插入对象
  41.      * @return 结果
  42.      */
  43.     @Update({"update sync_config set ",
  44.         "name = #{name,jdbcType=VARCHAR}, description = #{description,jdbcType=VARCHAR}, ",
  45.         "sync_type = #{syncType,jdbcType=TINYINT}, content = #{content,jdbcType=OTHER,",
  46.         "typeHandler=com.geespace.microservices.builder.handler.MySqlJsonHandler},project_id=#{projectId}, ",
  47.         "created_timestamp = #{createdTimestamp,jdbcType=BIGINT}, created_user = #{createdUser,jdbcType=BIGINT}, ",
  48.         "modified_timestamp = #{modifiedTimestamp,jdbcType=BIGINT},project_name=#{projectName},project_id=#{projectId}",
  49.         " where id = #{id,jdbcType=BIGINT}"})
  50.     int update(SyncConfig syncConfig);
  51.     /**
  52.      * 删除数据
  53.      *
  54.      * @param id
  55.      *            config id
  56.      * @return 影响行数
  57.      */
  58.     @Delete("delete from sync_config where id = #{id,jdbcType=BIGINT}")
  59.     int delete(Long id);
  60.     /**
  61.      * 查询
  62.      *
  63.      * @param syncConfig
  64.      *            name
  65.      * @return list结果
  66.      */
  67.     @SelectProvider(type = SyncConfigSqlProvider.class, method = "select")
  68.     @Results(id = "resultMap",
  69.         value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),
  70.             @Result(column = "name", property = "name", jdbcType = JdbcType.VARCHAR),
  71.             @Result(column = "description", property = "description", jdbcType = JdbcType.VARCHAR),
  72.             @Result(column = "sync_type", property = "syncType", jdbcType = JdbcType.TINYINT),
  73.             @Result(column = "content", property = "content", jdbcType = JdbcType.OTHER,
  74.                 typeHandler = com.geespace.microservices.builder.handler.MySqlJsonHandler.class),
  75.             @Result(column = "created_timestamp", property = "createdTimestamp", jdbcType = JdbcType.BIGINT),
  76.             @Result(column = "created_user", property = "createdUser", jdbcType = JdbcType.BIGINT),
  77.             @Result(column = "project_id", property = "projectId", jdbcType = JdbcType.BIGINT),
  78.             @Result(column = "project_name", property = "projectName", jdbcType = JdbcType.VARCHAR),
  79.             @Result(column = "modified_timestamp", property = "modifiedTimestamp", jdbcType = JdbcType.BIGINT)})
  80.     List<SyncConfig> list(SyncConfig syncConfig);
  81.     /**
  82.      * id 查询
  83.      *
  84.      * @param id
  85.      *            id
  86.      * @return 结果
  87.      */
  88.     @Select({"select id,project_id,project_name,name, description, sync_type, content,"
  89.             + " created_timestamp, created_user, modified_timestamp ",
  90.         "from sync_config where id = #{id,jdbcType=BIGINT}"})
  91.     @ResultMap("resultMap")
  92.     SyncConfig findById(Long id);
  93.     /**
  94.      * 校验任务名称唯一性,用于新增功能
  95.      * @author: liudz
  96.      * @param createdUser 用户ID
  97.      * @param name 任务名称
  98.      * @param id 任务ID
  99.      * @date: 2020/7/23
  100.      * @return SparkTask
  101.      */
  102.     @SelectProvider(type = SyncConfigSqlProvider.class, method = "checkNameUnique")
  103.     Integer checkNameUnique(Long createdUser, String name, Long id);
  104. }
复制代码


SyncConfigSqlProvider
  1. package com.geespace.microservices.builder.dao;
  2. import com.geespace.microservices.builder.entity.SyncConfig;
  3. import org.apache.commons.lang3.StringUtils;
  4. import org.apache.ibatis.jdbc.SQL;
  5. /**
  6. * @Author: zjr
  7. * @Date: 2020-05-22 13:35
  8. * @Version 1.0
  9. */
  10. public class SyncConfigSqlProvider {
  11.     /**
  12.      * 条件查询
  13.      *
  14.      * @param syncConfig
  15.      *            name
  16.      * @return sql
  17.      */
  18.     public String select(SyncConfig syncConfig) {
  19.         SQL sql = new SQL();
  20.         sql.SELECT("id,project_id,project_name,name, description, sync_type, content, created_timestamp,"
  21.                 + " created_user, modified_timestamp");
  22.         sql.FROM("sync_config");
  23.         sql.WHERE("created_user = #{createdUser,jdbcType=BIGINT}");
  24.         if (!org.springframework.util.StringUtils.isEmpty(syncConfig.getProjectId())) {
  25.             sql.WHERE("project_id=#{projectId}");
  26.         }
  27.         if (!StringUtils.isBlank(syncConfig.getName())) {
  28.             sql.WHERE("name like concat('%', #{name,jdbcType=VARCHAR}, '%')");
  29.         }
  30.         sql.ORDER_BY("id desc");
  31.         return sql.toString();
  32.     }
  33.     /**
  34.      * 校验任务名称唯一性,用于新增功能
  35.      *
  36.      * @author: liudz
  37.      * @date 2019/12/3
  38.      * @author: liudz
  39.      * @param createdUser 用户ID
  40.      * @param name 任务名称
  41.      * @param id 任务ID
  42.      * @return sql
  43.      */
  44.     public String checkNameUnique(Long createdUser, String name, Long id) {
  45.         SQL sql = new SQL();
  46.         sql.SELECT("COUNT(name)");
  47.         sql.FROM("sync_config");
  48.         if (!org.springframework.util.StringUtils.isEmpty(id)) {
  49.             sql.WHERE("id != #{id}");
  50.         }
  51.         sql.WHERE("created_user=#{createdUser} and name=#{name}");
  52.         return sql.toString();
  53.     }
  54. }
复制代码

JdbcDataSourceService
  1. package com.geespace.microservices.datasource.service;
  2. import java.util.List;
  3. import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;
  4. import com.geespace.microservices.datasource.form.datasource.DataSourceAddForm;
  5. import com.geespace.microservices.datasource.form.datasource.DataSourceSelectForm;
  6. import com.geespace.microservices.datasource.form.datasource.DataSourceUpdateForm;
  7. import com.geespace.microservices.datasource.response.PageResult;
  8. import com.geespace.microservices.datasource.response.Response;
  9. /**
  10. * @Author: zjr
  11. * @Date: 2020-04-07 17:44
  12. * @Version 1.0
  13. */
  14. public interface JdbcDataSourceService {
  15.     /**
  16.      * 添加数据源信息
  17.      *
  18.      * @param dataSourceAddForm
  19.      *            数据源信息
  20.      * @return 添加成功的信息
  21.      */
  22.     Response<JdbcDataSourceDto> addDataSource(DataSourceAddForm dataSourceAddForm);
  23.     /**
  24.      * 修改数据源信息
  25.      *
  26.      * @param dataSourceUpdateForm
  27.      *            数据源信息
  28.      * @return 修改后的信息
  29.      */
  30.     Response<JdbcDataSourceDto> updateDataSource(DataSourceUpdateForm dataSourceUpdateForm);
  31.     /**
  32.      * 删除数据源信息
  33.      *
  34.      * @param id
  35.      *            数据源id
  36.      * @return 删除是否成功
  37.      */
  38.     Response deleteDataSource(Long id);
  39.     /**
  40.      * 数据源列表查询-全量
  41.      *
  42.      * @param creator
  43.      *            创建者
  44.      * @return 全量列表
  45.      */
  46.     Response<List<JdbcDataSourceDto>> list(Long creator);
  47.     /**
  48.      * 数据源列表查询-按类型查询
  49.      *
  50.      * @param type
  51.      *            数据源类型
  52.      * @param creator
  53.      *            创建者
  54.      * @return 全量列表
  55.      */
  56.     Response<List<JdbcDataSourceDto>> listByType(Long creator, List<String> type);
  57.     /**
  58.      * 内部数据源列表查询-全量
  59.      *
  60.      * @return 全量列表
  61.      */
  62.     Response<List<JdbcDataSourceDto>> listMeta();
  63.     /**
  64.      * 数据源列表查询-分页
  65.      *
  66.      * @param form
  67.      *            查询条件
  68.      * @return 分页列表
  69.      */
  70.     Response<PageResult<JdbcDataSourceDto>> select(DataSourceSelectForm form);
  71.     /**
  72.      * 通过id查找数据源
  73.      *
  74.      * @param id
  75.      *            数据源id
  76.      * @return 查询结果
  77.      */
  78.     Response<JdbcDataSourceDto> find(Long id);
  79.     /**
  80.      * 通过id查找元数据源
  81.      *
  82.      * @param id
  83.      *            数据源id
  84.      * @return 查询结果
  85.      */
  86.     Response<JdbcDataSourceDto> findMetaDataSource(Long id);
  87.     /**
  88.      * 通过id查找数据源-明文
  89.      *
  90.      * @param id
  91.      *            数据源id
  92.      * @return 查询结果
  93.      */
  94.     Response<JdbcDataSourceDto> findDescrypt(Long id);
  95. }
复制代码

JdbcDataSourceServiceImpl
  1. package com.geespace.microservices.datasource.service.impl;
  2. import java.util.Date;
  3. import java.util.List;
  4. import java.util.stream.Collectors;
  5. import com.geespace.microservices.datasource.dao.DataSourceMapper;
  6. import com.geespace.microservices.datasource.dao.MetaDataSourceMapper;
  7. import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;
  8. import com.geespace.microservices.datasource.entity.JdbcDataSource;
  9. import com.geespace.microservices.datasource.enums.JdbcDataSourceStatusEnum;
  10. import com.geespace.microservices.datasource.form.datasource.DataSourceAddForm;
  11. import com.geespace.microservices.datasource.form.datasource.DataSourceSelectForm;
  12. import com.geespace.microservices.datasource.form.datasource.DataSourceUpdateForm;
  13. import com.geespace.microservices.datasource.response.Msg;
  14. import com.geespace.microservices.datasource.response.PageResult;
  15. import com.geespace.microservices.datasource.response.Response;
  16. import com.geespace.microservices.datasource.service.JdbcDataSourceService;
  17. import com.geespace.microservices.datasource.util.AesUtil;
  18. import com.geespace.microservices.datasource.util.LocalCacheUtil;
  19. import com.github.pagehelper.PageHelper;
  20. import com.github.pagehelper.PageInfo;
  21. import lombok.extern.slf4j.Slf4j;
  22. import org.apache.commons.lang.StringUtils;
  23. import org.springframework.beans.BeanUtils;
  24. import org.springframework.beans.factory.annotation.Autowired;
  25. import org.springframework.stereotype.Service;
  26. /**
  27. * @Author: zjr
  28. * @Date: 2020-04-07 17:45
  29. * @Version 1.0
  30. */
  31. @Slf4j
  32. @Service
  33. public class JdbcDataSourceServiceImpl implements JdbcDataSourceService {
  34.     @Autowired
  35.     private DataSourceMapper dataSourceMapper;
  36.     @Autowired
  37.     private MetaDataSourceMapper metaDataSourceMapper;
  38.     @Override
  39.     public Response<JdbcDataSourceDto> addDataSource(DataSourceAddForm dataSourceAddForm) {
  40.         JdbcDataSource dataSource = new JdbcDataSource();
  41.         BeanUtils.copyProperties(dataSourceAddForm, dataSource);
  42.         JdbcDataSource exist = dataSourceMapper.nameExist(dataSource);
  43.         if (exist != null) {
  44.             return Response.error(Msg.DATASOURCE_NAME_ALREAD_EXIST);
  45.         }
  46.         String userName = AesUtil.decrypt(dataSource.getUserName());
  47.         // 判断账密是否为密文
  48.         if (userName == null) {
  49.             dataSource.setUserName(AesUtil.encrypt(dataSource.getUserName()));
  50.         }
  51.         String pwd = AesUtil.decrypt(dataSource.getPassword());
  52.         if (pwd == null) {
  53.             dataSource.setPassword(AesUtil.encrypt(dataSource.getPassword()));
  54.         }
  55.         dataSource.setCreateTime(new Date());
  56.         dataSource.setUpdateTime(new Date());
  57.         dataSource.setStatus(JdbcDataSourceStatusEnum.USING.getStatus());
  58.         dataSourceMapper.insert(dataSource);
  59.         JdbcDataSourceDto dataSourceDto = new JdbcDataSourceDto();
  60.         BeanUtils.copyProperties(dataSource, dataSourceDto);
  61.         return Response.success(dataSourceDto);
  62.     }
  63.     @Override
  64.     public Response<JdbcDataSourceDto> updateDataSource(DataSourceUpdateForm dataSourceUpdateForm) {
  65.         JdbcDataSource dataSource = dataSourceMapper.find(dataSourceUpdateForm.getId());
  66.         if (dataSource == null || dataSourceUpdateForm.getCreator() != dataSource.getCreator()) {
  67.             return Response.error(Msg.DATASOURCE_NOT_EXIST);
  68.         }
  69.         String userName = AesUtil.decrypt(dataSourceUpdateForm.getUserName());
  70.         // 判断账密是否为密文
  71.         if (userName == null) {
  72.             dataSourceUpdateForm.setUserName(AesUtil.encrypt(dataSourceUpdateForm.getUserName()));
  73.         }
  74.         String pwd = AesUtil.decrypt(dataSourceUpdateForm.getPassword());
  75.         if (pwd == null) {
  76.             dataSourceUpdateForm.setPassword(AesUtil.encrypt(dataSourceUpdateForm.getPassword()));
  77.         }
  78.         String originName = dataSource.getSourceName();
  79.         // 注意copyProperties是将source中的属性全部copy到target中
  80.         BeanUtils.copyProperties(dataSourceUpdateForm, dataSource);
  81.         JdbcDataSource exist = dataSourceMapper.nameExist(dataSource);
  82.         if (exist != null && !exist.getSourceName().equals(originName)) {
  83.             return Response.error(Msg.DATASOURCE_NAME_ALREAD_EXIST);
  84.         }
  85.         dataSource.setUpdateTime(new Date());
  86.         dataSourceMapper.update(dataSource);
  87.         LocalCacheUtil.remove(dataSource.getCreator() + originName);
  88.         JdbcDataSourceDto dataSourceDto = new JdbcDataSourceDto();
  89.         BeanUtils.copyProperties(dataSource, dataSourceDto);
  90.         return Response.success(dataSourceDto);
  91.     }
  92.     @Override
  93.     public Response deleteDataSource(Long id) {
  94.         dataSourceMapper.delete(id);
  95.         return Response.success();
  96.     }
  97.     @Override
  98.     public Response<List<JdbcDataSourceDto>> list(Long creator) {
  99.         List<JdbcDataSource> list = dataSourceMapper.list(creator);
  100.         List<JdbcDataSourceDto> listDto = list.stream().map(this::getDto).collect(Collectors.toList());
  101.         return Response.success(listDto);
  102.     }
  103.     @Override
  104.     public Response<List<JdbcDataSourceDto>> listByType(Long creator, List<String> type) {
  105.         List<JdbcDataSource> list = dataSourceMapper.listByType(creator, type);
  106.         List<JdbcDataSourceDto> listDto = list.stream().map(this::getDto).collect(Collectors.toList());
  107.         return Response.success(listDto);
  108.     }
  109.     @Override
  110.     public Response<List<JdbcDataSourceDto>> listMeta() {
  111.         List<JdbcDataSource> list = metaDataSourceMapper.list();
  112.         List<JdbcDataSourceDto> listDto = list.stream().map(this::getDto).collect(Collectors.toList());
  113.         return Response.success(listDto);
  114.     }
  115.     @Override
  116.     public Response<PageResult<JdbcDataSourceDto>> select(DataSourceSelectForm form) {
  117.         JdbcDataSource select = new JdbcDataSource();
  118.         select.setSourceName(form.getSourceName());
  119.         select.setCreator(form.getCreator());
  120.         PageHelper.startPage(form.getPageNum(), form.getPageSize());
  121.         List<JdbcDataSource> list = dataSourceMapper.select(select);
  122.         PageInfo<JdbcDataSource> pageInfo = new PageInfo<>(list);
  123.         PageResult<JdbcDataSourceDto> pageResult = new PageResult<>();
  124.         pageResult.setPageNum(pageInfo.getPageNum());
  125.         pageResult.setPageSize(pageInfo.getPageSize());
  126.         pageResult.setTotalPage(pageInfo.getPages());
  127.         pageResult.setTotalCount(pageInfo.getTotal());
  128.         pageResult.setList(pageInfo.getList().stream().map(this::getDto).collect(Collectors.toList()));
  129.         return Response.success(pageResult);
  130.     }
  131.     @Override
  132.     public Response<JdbcDataSourceDto> find(Long id) {
  133.         JdbcDataSource jdbcDataSource = dataSourceMapper.find(id);
  134.         if (jdbcDataSource == null) {
  135.             return Response.error(Msg.DATASOURCE_NOT_EXIST);
  136.         }
  137.         return Response.success(getDto(jdbcDataSource));
  138.     }
  139.     @Override
  140.     public Response<JdbcDataSourceDto> findMetaDataSource(Long id) {
  141.         JdbcDataSource jdbcDataSource = this.metaDataSourceMapper.find(id);
  142.         if (jdbcDataSource == null) {
  143.             return Response.error(Msg.DATASOURCE_NOT_EXIST);
  144.         }
  145.         return Response.success(getDto(jdbcDataSource));
  146.     }
  147.     @Override
  148.     public Response<JdbcDataSourceDto> findDescrypt(Long id) {
  149.         JdbcDataSource jdbcDataSource = dataSourceMapper.find(id);
  150.         if (jdbcDataSource == null) {
  151.             return Response.error(Msg.DATASOURCE_NOT_EXIST);
  152.         }
  153.         if (!StringUtils.isBlank(jdbcDataSource.getUserName())) {
  154.             jdbcDataSource.setUserName(AesUtil.decrypt(jdbcDataSource.getUserName()));
  155.         }
  156.         if (!StringUtils.isBlank(jdbcDataSource.getPassword())) {
  157.             jdbcDataSource.setPassword(AesUtil.decrypt(jdbcDataSource.getPassword()));
  158.         }
  159.         return Response.success(getDto(jdbcDataSource));
  160.     }
  161.     /**
  162.      * 获取dto
  163.      *
  164.      * @param jdbcDataSource
  165.      *            source
  166.      * @return dto
  167.      */
  168.     private JdbcDataSourceDto getDto(JdbcDataSource jdbcDataSource) {
  169.         JdbcDataSourceDto dto = new JdbcDataSourceDto();
  170.         BeanUtils.copyProperties(jdbcDataSource, dto);
  171.         return dto;
  172.     }
  173. }
复制代码

DataSourceMapper
  1. package com.geespace.microservices.datasource.dao;
  2. import java.util.List;
  3. import com.geespace.microservices.datasource.entity.JdbcDataSource;
  4. import org.apache.ibatis.annotations.Delete;
  5. import org.apache.ibatis.annotations.Insert;
  6. import org.apache.ibatis.annotations.Mapper;
  7. import org.apache.ibatis.annotations.Options;
  8. import org.apache.ibatis.annotations.Param;
  9. import org.apache.ibatis.annotations.Result;
  10. import org.apache.ibatis.annotations.ResultMap;
  11. import org.apache.ibatis.annotations.Results;
  12. import org.apache.ibatis.annotations.Select;
  13. import org.apache.ibatis.annotations.SelectKey;
  14. import org.apache.ibatis.annotations.SelectProvider;
  15. import org.apache.ibatis.annotations.Update;
  16. import org.apache.ibatis.type.JdbcType;
  17. /**
  18. * @Author: zjr
  19. * @Date: 2020-04-07 17:05
  20. * @Version 1.0
  21. */
  22. @Mapper
  23. public interface DataSourceMapper {
  24.     /**
  25.      * 添加数据源信息
  26.      *
  27.      * @param source
  28.      *            数据源
  29.      * @return id
  30.      */
  31.     @Insert({
  32.         "insert into ge_jdbc_datasource (source_type, source_name, jdbc_url, user_name, password, zk_address, znode, ",
  33.         "database_name, jdbc_driver_class, remark, creator, create_time, update_time, status)",
  34.         "values (#{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{jdbcUrl,jdbcType=VARCHAR}, ",
  35.         "#{userName,jdbcType=VARCHAR}, #{password,jdbcType=VARCHAR}, #{zkAddress,jdbcType=VARCHAR}, ",
  36.         "#{znode,jdbcType=VARCHAR}, #{databaseName,jdbcType=VARCHAR}, #{jdbcDriverClass,jdbcType=VARCHAR}, ",
  37.         "#{remark,jdbcType=VARCHAR}, #{creator,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP}, ",
  38.         "#{updateTime,jdbcType=TIMESTAMP}, #{status,jdbcType=TINYINT})"})
  39.     @SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "id", before = false, resultType = Long.class)
  40.     int insert(JdbcDataSource source);
  41.     /**
  42.      * 修改数据源
  43.      *
  44.      * @param source
  45.      *            全量修改
  46.      * @return 影响行数
  47.      */
  48.     @Update({"update ge_jdbc_datasource",
  49.         "set source_type = #{sourceType,jdbcType=VARCHAR}, source_name = #{sourceName,jdbcType=VARCHAR}, ",
  50.         "jdbc_url = #{jdbcUrl,jdbcType=VARCHAR}, user_name = #{userName,jdbcType=VARCHAR}, ",
  51.         "password = #{password,jdbcType=VARCHAR}, zk_address = #{zkAddress,jdbcType=VARCHAR}, ",
  52.         "znode = #{znode,jdbcType=VARCHAR}, database_name = #{databaseName,jdbcType=VARCHAR}, ",
  53.         "jdbc_driver_class = #{jdbcDriverClass,jdbcType=VARCHAR}, remark = #{remark,jdbcType=VARCHAR}, ",
  54.         "update_time = #{updateTime,jdbcType=TIMESTAMP}, status = #{status,jdbcType=TINYINT}",
  55.         "where id = #{id,jdbcType=BIGINT} and status = 1"})
  56.     int update(JdbcDataSource source);
  57.     /**
  58.      * 删除数据源配置
  59.      *
  60.      * @param id
  61.      *            数据源id
  62.      * @return 影响行数
  63.      */
  64.     @Delete("delete from ge_jdbc_datasource where id = #{id,jdbcType=BIGINT}")
  65.     int delete(Long id);
  66.     /**
  67.      * 查询用户数据源配置
  68.      *
  69.      * @param creator
  70.      *            创建者id
  71.      * @return 数据源列表
  72.      */
  73.     @Select({"select * from ge_jdbc_datasource where creator = #{creator,jdbcType=BIGINT} and status = 1",
  74.         " order by id desc"})
  75.     @Results(id = "resultMap",
  76.         value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),
  77.             @Result(column = "source_type", property = "sourceType", jdbcType = JdbcType.VARCHAR),
  78.             @Result(column = "source_name", property = "sourceName", jdbcType = JdbcType.VARCHAR),
  79.             @Result(column = "jdbc_url", property = "jdbcUrl", jdbcType = JdbcType.VARCHAR),
  80.             @Result(column = "user_name", property = "userName", jdbcType = JdbcType.VARCHAR),
  81.             @Result(column = "password", property = "password", jdbcType = JdbcType.VARCHAR),
  82.             @Result(column = "zk_address", property = "zkAddress", jdbcType = JdbcType.VARCHAR),
  83.             @Result(column = "znode", property = "znode", jdbcType = JdbcType.VARCHAR),
  84.             @Result(column = "database_name", property = "databaseName", jdbcType = JdbcType.VARCHAR),
  85.             @Result(column = "jdbc_driver_class", property = "jdbcDriverClass", jdbcType = JdbcType.VARCHAR),
  86.             @Result(column = "remark", property = "remark", jdbcType = JdbcType.VARCHAR),
  87.             @Result(column = "creator", property = "creator", jdbcType = JdbcType.BIGINT),
  88.             @Result(column = "create_time", property = "createTime", jdbcType = JdbcType.TIMESTAMP),
  89.             @Result(column = "update_time", property = "updateTime", jdbcType = JdbcType.TIMESTAMP),
  90.             @Result(column = "status", property = "status", jdbcType = JdbcType.TINYINT)})
  91.     List<JdbcDataSource> list(Long creator);
  92.     /**
  93.      * 查询用户数据源配置
  94.      *
  95.      * @param creator
  96.      *            创建者id
  97.      * @param type
  98.      *            数据源类型
  99.      * @return 数据源列表
  100.      */
  101.     @Select({"<script>", "select * from ge_jdbc_datasource where creator = #{creator,jdbcType=BIGINT} ",
  102.         "and status = 1 and source_type in ",
  103.         "<foreach item='item' index='index' collection='type' open='(' separator=',' close=')'>",
  104.         "#{item,jdbcType=VARCHAR}", "</foreach>", " order by id desc", "</script>"})
  105.     @ResultMap("resultMap")
  106.     List<JdbcDataSource> listByType(@Param("creator") Long creator, @Param("type") List<String> type);
  107.     /**
  108.      * 条件查询数据源
  109.      *
  110.      * @param jdbcDataSource
  111.      *            查询条件
  112.      * @return 查询结果
  113.      */
  114.     @SelectProvider(type = DataSourceSqlProvider.class, method = "select")
  115.     @ResultMap("resultMap")
  116.     List<JdbcDataSource> select(JdbcDataSource jdbcDataSource);
  117.     /**
  118.      * id 查找
  119.      *
  120.      * @param id
  121.      *            id
  122.      * @return 数据源
  123.      */
  124.     @Options(flushCache = Options.FlushCachePolicy.TRUE)
  125.     @Select("select * from ge_jdbc_datasource where id = #{id,jdbcType=BIGINT} and status = 1")
  126.     @ResultMap("resultMap")
  127.     JdbcDataSource find(Long id);
  128.     /**
  129.      * 数据源名称是否存在
  130.      *
  131.      * @param jdbcDataSource
  132.      *            数据源名称
  133.      * @return 数据源
  134.      */
  135.     @Select({"select * from ge_jdbc_datasource ",
  136.         "where source_name = #{sourceName,jdbcType=VARCHAR} and creator = #{creator,jdbcType=BIGINT}"})
  137.     @ResultMap("resultMap")
  138.     JdbcDataSource nameExist(JdbcDataSource jdbcDataSource);
  139. }
复制代码


DataSourceSqlProvider
  1. package com.geespace.microservices.datasource.dao;
  2. import com.geespace.microservices.datasource.entity.JdbcDataSource;
  3. import org.apache.commons.lang.StringUtils;
  4. import org.apache.ibatis.jdbc.SQL;
  5. /**
  6. * @Author: zjr
  7. * @Date: 2020-04-09 14:20
  8. * @Version 1.0
  9. */
  10. public class DataSourceSqlProvider {
  11.     /**
  12.      * 条件查询sql语句生成
  13.      *
  14.      * @param jdbcDataSource
  15.      *            查询条件
  16.      * @return sql语句
  17.      */
  18.     public String select(JdbcDataSource jdbcDataSource) {
  19.         SQL sql = new SQL();
  20.         sql.SELECT("*");
  21.         sql.FROM("ge_jdbc_datasource");
  22.         sql.WHERE("status = 1");
  23.         if (jdbcDataSource.getCreator() != null) {
  24.             sql.WHERE("creator = #{creator,jdbcType=BIGINT}");
  25.         }
  26.         if (!StringUtils.isBlank(jdbcDataSource.getSourceName())) {
  27.             sql.WHERE("source_name like concat('%', #{sourceName,jdbcType=VARCHAR}, '%')");
  28.         }
  29.         sql.ORDER_BY("id desc");
  30.         return sql.toString();
  31.     }
  32. }
复制代码


MetaDataSourceMapper
  1. package com.geespace.microservices.datasource.dao;
  2. import java.util.List;
  3. import com.geespace.microservices.datasource.entity.JdbcDataSource;
  4. import org.apache.ibatis.annotations.Mapper;
  5. import org.apache.ibatis.annotations.Result;
  6. import org.apache.ibatis.annotations.ResultMap;
  7. import org.apache.ibatis.annotations.Results;
  8. import org.apache.ibatis.annotations.Select;
  9. import org.apache.ibatis.type.JdbcType;
  10. /**
  11. * 内部数据源,系统配置,和外部数据源保持一致
  12. * @Author: zjr
  13. * @Date: 2020-04-07 17:05
  14. * @Version 1.0
  15. */
  16. @Mapper
  17. public interface MetaDataSourceMapper {
  18.     /**
  19.      * 查询用户数据源配置
  20.      *
  21.      * @return 数据源列表
  22.      */
  23.     @Select("select * from ge_meta_datasource ")
  24.     @Results(id = "resultMap",
  25.         value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),
  26.             @Result(column = "source_type", property = "sourceType", jdbcType = JdbcType.VARCHAR),
  27.             @Result(column = "source_name", property = "sourceName", jdbcType = JdbcType.VARCHAR),
  28.             @Result(column = "jdbc_url", property = "jdbcUrl", jdbcType = JdbcType.VARCHAR),
  29.             @Result(column = "user_name", property = "userName", jdbcType = JdbcType.VARCHAR),
  30.             @Result(column = "password", property = "password", jdbcType = JdbcType.VARCHAR),
  31.             @Result(column = "zk_address", property = "zkAddress", jdbcType = JdbcType.VARCHAR),
  32.             @Result(column = "znode", property = "znode", jdbcType = JdbcType.VARCHAR),
  33.             @Result(column = "database_name", property = "databaseName", jdbcType = JdbcType.VARCHAR),
  34.             @Result(column = "jdbc_driver_class", property = "jdbcDriverClass", jdbcType = JdbcType.VARCHAR),
  35.             @Result(column = "remark", property = "remark", jdbcType = JdbcType.VARCHAR),
  36.             @Result(column = "creator", property = "creator", jdbcType = JdbcType.BIGINT),
  37.             @Result(column = "create_time", property = "createTime", jdbcType = JdbcType.TIMESTAMP),
  38.             @Result(column = "update_time", property = "updateTime", jdbcType = JdbcType.TIMESTAMP),
  39.             @Result(column = "status", property = "status", jdbcType = JdbcType.TINYINT)})
  40.     List<JdbcDataSource> list();
  41.     /**
  42.      * id 查找
  43.      *
  44.      * @param id
  45.      *            id
  46.      * @return 数据源
  47.      */
  48.     @Select("select * from ge_meta_datasource where id = #{id,jdbcType=BIGINT} ")
  49.     @ResultMap("resultMap")
  50.     JdbcDataSource find(Long id);
  51. }
复制代码

DataSourceSelectForm

  1. package com.geespace.microservices.datasource.form.datasource;
  2. import javax.validation.constraints.NotNull;
  3. import lombok.Data;
  4. /**
  5. * @Author: zjr
  6. * @Date: 2020-04-09 14:13
  7. * @Version 1.0
  8. */
  9. @Data
  10. public class DataSourceSelectForm {
  11.     /**
  12.      * 数据源名称模糊查询
  13.      */
  14.     private String sourceName;
  15.     /**
  16.      * 创建者
  17.      */
  18.     private Long creator;
  19.     /**
  20.      * 页码
  21.      */
  22.     @NotNull(message = "pageSize不能为空")
  23.     private int pageSize;
  24.     /**
  25.      * 每页数据量
  26.      */
  27.     @NotNull(message = "pageNum不能为空")
  28.     private int pageNum;
  29. }
复制代码



DataSourceAddForm
  1. package com.geespace.microservices.datasource.form.datasource;
  2. import javax.validation.constraints.NotBlank;
  3. import lombok.Data;
  4. /**
  5. * @Author: zjr
  6. * @Date: 2020-04-07 17:46
  7. * @Version 1.0
  8. */
  9. @Data
  10. public class DataSourceAddForm {
  11.     /**
  12.      * 数据源类型
  13.      */
  14.     @NotBlank(message = "数据源类型不能为空")
  15.     private String sourceType;
  16.     /**
  17.      * 数据源名称
  18.      */
  19.     @NotBlank(message = "数据源名称不能为空")
  20.     private String sourceName;
  21.     /**
  22.      * jdbc url
  23.      */
  24.     private String jdbcUrl;
  25.     /**
  26.      * 用户名
  27.      */
  28.     private String userName;
  29.     /**
  30.      * 密码
  31.      */
  32.     private String password;
  33.     /**
  34.      * zk地址
  35.      */
  36.     private String zkAddress;
  37.     /**
  38.      * hbase znode
  39.      */
  40.     private String znode;
  41.     /**
  42.      * 数据库名称
  43.      */
  44.     private String databaseName;
  45.     /**
  46.      * 驱动类
  47.      */
  48.     private String jdbcDriverClass;
  49.     /**
  50.      * 备注
  51.      */
  52.     private String remark;
  53.     /**
  54.      * 创建者
  55.      */
  56.     private Long creator;
  57. }
复制代码



DataSourceUpdateForm
  1. package com.geespace.microservices.datasource.form.datasource;
  2. import javax.validation.constraints.NotBlank;
  3. import javax.validation.constraints.NotNull;
  4. import lombok.Data;
  5. /**
  6. * @Author: zjr
  7. * @Date: 2020-04-07 17:46
  8. * @Version 1.0
  9. */
  10. @Data
  11. public class DataSourceUpdateForm {
  12.     /**
  13.      * id
  14.      */
  15.     @NotNull(message = "id不能为空")
  16.     private Long id;
  17.     /**
  18.      * 数据源类型
  19.      */
  20.     @NotBlank(message = "数据源类型不能为空")
  21.     private String sourceType;
  22.     /**
  23.      * 数据源名称
  24.      */
  25.     @NotBlank(message = "数据源名称不能为空")
  26.     private String sourceName;
  27.     /**
  28.      * jdbc url
  29.      */
  30.     private String jdbcUrl;
  31.     /**
  32.      * 用户名
  33.      */
  34.     private String userName;
  35.     /**
  36.      * 密码
  37.      */
  38.     private String password;
  39.     /**
  40.      * zk地址
  41.      */
  42.     private String zkAddress;
  43.     /**
  44.      * hbase znode
  45.      */
  46.     private String znode;
  47.     /**
  48.      * 数据库名称
  49.      */
  50.     private String databaseName;
  51.     /**
  52.      * 驱动类
  53.      */
  54.     private String jdbcDriverClass;
  55.     /**
  56.      * 备注
  57.      */
  58.     private String remark;
  59.     /**
  60.      * 创建者
  61.      */
  62.     private Long creator;
  63. }
复制代码

YAPI测试用例
5.1查询全部同步任务配置(分页)
2021-10-27_195455.jpg

  1. {
  2.   "pageNum": 1,
  3.   "pageSize": 10,
  4.   "projectId": 28,
  5.   "name": "测试"
  6. }
复制代码

5.2 创建同步任务配置-mysql->mysql
  1. {
  2.   "name": "测试同步任务配置-mysql-mysql-1",
  3.   "description": "测试同步任务配置-mysql-mysql-1",
  4.   "projectName": "test测试1",
  5.   "projectId": 28,
  6.   "syncType": 2,
  7.   "readerConfigId": 1,
  8.   "readerParam": {
  9.     "table": "test_test"
  10.   },
  11.   "writerConfigId": 1,
  12.   "writerParam": {
  13.     "table": "test_test_1"
  14.   },
  15.   "columnMap": [
  16.     {
  17.       "reader": "id",
  18.       "writer": "id"
  19.     },
  20.     {
  21.       "reader": "name",
  22.       "writer": "name"
  23.     }
  24.   ]
  25. }
复制代码

5.3 创建同步任务配置-hbase->hbase
  1. {
  2.   "name": "测试同步任务配置-hbase-hbase-1",
  3.   "description": "测试同步任务配置-hbase-hbase-1",
  4.   "projectName": "test测试1",
  5.   "projectId": 28,
  6.   "syncType": 2,
  7.   "readerConfigId": 130,
  8.   "readerParam": {
  9.     "table": "test_test"
  10.   },
  11.   "writerConfigId": 130,
  12.   "writerParam": {
  13.     "table": "test_test_1",
  14.     "rowkeyColumns": [
  15.       "f:id",
  16.       "f:name"
  17.     ]
  18.   },
  19.   "columnMap": [
  20.     {
  21.       "reader": "f:id",
  22.       "writer": "f:id"
  23.     },
  24.     {
  25.       "reader": "f:name",
  26.       "writer": "f:name"
  27.     }
  28.   ]
  29. }
复制代码

5.4 创建同步任务配置-mysql->hbase
  1. {
  2.   "name": "测试同步任务配置-mysql-hbase-1",
  3.   "description": "测试同步任务配置mysql-hbase-1",
  4.   "projectName": "test测试1",
  5.   "projectId": 28,
  6.   "syncType": 2,
  7.   "readerConfigId": 1,
  8.   "readerParam": {
  9.     "table": "test_test"
  10.   },
  11.   "writerConfigId": 130,
  12.   "writerParam": {
  13.     "table": "test_test_1",
  14.     "rowkeyColumns": [
  15.       "f:id",
  16.       "f:name"
  17.     ]
  18.   },
  19.   "columnMap": [
  20.     {
  21.       "reader": "id",
  22.       "writer": "f:id"
  23.     },
  24.     {
  25.       "reader": "name",
  26.       "writer": "f:name"
  27.     }
  28.   ]
  29. }
复制代码

5.5 创建同步任务配置-hbase->mysql
  1. {
  2.   "name": "测试同步任务配置-hbase-mysql-1",
  3.   "description": "测试同步任务配置-hbase-mysql-1",
  4.   "projectName": "test测试1",
  5.   "projectId": 28,
  6.   "syncType": 2,
  7.   "readerConfigId": 130,
  8.   "readerParam": {
  9.     "table": "test_test",
  10.     "rowkeyColumns": [
  11.       "f:id",
  12.       "f:name"
  13.     ]
  14.   },
  15.   "writerConfigId": 1,
  16.   "writerParam": {
  17.     "table": "test_test_1"
  18.   },
  19.   "columnMap": [
  20.     {
  21.       "reader": "f:id",
  22.       "writer": "id"
  23.     },
  24.     {
  25.       "reader": "f:name",
  26.       "writer": "name"
  27.     }
  28.   ]
  29. }
复制代码

5.6 更新同步任务配置
  1. {
  2.   "id": 82,
  3.   "name": "测试同步任务配置-mysql-3",
  4.   "description": "测试同步任务配置-mysql-3",
  5.   "projectName": "数据同步任务",
  6.   "projectId": 19,
  7.   "syncType": 2,
  8.   "readerConfigId": 1,
  9.   "readerParam": {
  10.     "table": "test_test"
  11.   },
  12.   "writerConfigId": 1,
  13.   "writerParam": {
  14.     "table": "test_test_1"
  15.   },
  16.   "columnMap": [
  17.     {
  18.       "reader": "id",
  19.       "writer": "id"
  20.     },
  21.     {
  22.       "reader": "name",
  23.       "writer": "name"
  24.     }
  25.   ]
  26. }
复制代码

5.7 删除同步任务配置
2021-10-27_195648.jpg

5.8 查询同步任务配置
2021-10-27_195720.jpg

5.9 执行数据同步任务
2021-10-27_195752.jpg

5.10 停止数据同步任务
2021-10-27_195822.jpg

作者:刘大猫.
来源:https://blog.csdn.net/a924382407/article/details/120951230


最新经典文章,欢迎关注公众号


已有(1)人评论

跳转到指定楼层
若无梦何远方 发表于 2021-10-28 14:36:40
是不是太秀了没太看懂在干嘛
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条