@@ -92,8 +92,7 @@ | |||||
<orderEntry type="library" name="Maven: com.alibaba.otter:canal.protocol:1.1.5" level="project" /> | <orderEntry type="library" name="Maven: com.alibaba.otter:canal.protocol:1.1.5" level="project" /> | ||||
<orderEntry type="library" name="Maven: com.alibaba.otter:canal.common:1.1.5" level="project" /> | <orderEntry type="library" name="Maven: com.alibaba.otter:canal.common:1.1.5" level="project" /> | ||||
<orderEntry type="library" name="Maven: com.alibaba:druid:1.2.6" level="project" /> | <orderEntry type="library" name="Maven: com.alibaba:druid:1.2.6" level="project" /> | ||||
<orderEntry type="module" module-name="sw-system-bpm" /> | |||||
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-web:2.6.9" level="project" /> | |||||
<orderEntry type="module" module-name="sw-framework-core" /> | |||||
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter:2.6.9" level="project" /> | <orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter:2.6.9" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot:2.6.9" level="project" /> | <orderEntry type="library" name="Maven: org.springframework.boot:spring-boot:2.6.9" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-autoconfigure:2.6.9" level="project" /> | <orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-autoconfigure:2.6.9" level="project" /> | ||||
@@ -102,7 +101,11 @@ | |||||
<orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-api:2.17.2" level="project" /> | <orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-api:2.17.2" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.slf4j:jul-to-slf4j:1.7.36" level="project" /> | <orderEntry type="library" name="Maven: org.slf4j:jul-to-slf4j:1.7.36" level="project" /> | ||||
<orderEntry type="library" name="Maven: jakarta.annotation:jakarta.annotation-api:1.3.5" level="project" /> | <orderEntry type="library" name="Maven: jakarta.annotation:jakarta.annotation-api:1.3.5" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.yaml:snakeyaml:1.29" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-web:2.6.9" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-json:2.6.9" level="project" /> | <orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-json:2.6.9" level="project" /> | ||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.13.3" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.13.3" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.13.3" level="project" /> | <orderEntry type="library" name="Maven: com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.13.3" level="project" /> | ||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.3" level="project" /> | <orderEntry type="library" name="Maven: com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.3" level="project" /> | ||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-parameter-names:2.13.3" level="project" /> | <orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-parameter-names:2.13.3" level="project" /> | ||||
@@ -112,7 +115,7 @@ | |||||
<orderEntry type="library" name="Maven: org.apache.tomcat.embed:tomcat-embed-websocket:9.0.64" level="project" /> | <orderEntry type="library" name="Maven: org.apache.tomcat.embed:tomcat-embed-websocket:9.0.64" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.springframework:spring-web:5.3.21" level="project" /> | <orderEntry type="library" name="Maven: org.springframework:spring-web:5.3.21" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.springframework:spring-webmvc:5.3.21" level="project" /> | <orderEntry type="library" name="Maven: org.springframework:spring-webmvc:5.3.21" level="project" /> | ||||
<orderEntry type="module" module-name="sw-framework-core" /> | |||||
<orderEntry type="library" name="Maven: org.apache.httpcomponents:httpclient:4.5.13" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.apache.httpcomponents:httpcore:4.4.15" level="project" /> | <orderEntry type="library" name="Maven: org.apache.httpcomponents:httpcore:4.4.15" level="project" /> | ||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.13.3" level="project" /> | <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.13.3" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-jdbc:2.6.9" level="project" /> | <orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-jdbc:2.6.9" level="project" /> | ||||
@@ -131,32 +134,6 @@ | |||||
<orderEntry type="library" name="Maven: org.apache.tika:tika-core:2.1.0" level="project" /> | <orderEntry type="library" name="Maven: org.apache.tika:tika-core:2.1.0" level="project" /> | ||||
<orderEntry type="library" name="Maven: com.github.ben-manes.caffeine:caffeine:2.9.3" level="project" /> | <orderEntry type="library" name="Maven: com.github.ben-manes.caffeine:caffeine:2.9.3" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.checkerframework:checker-qual:3.19.0" level="project" /> | <orderEntry type="library" name="Maven: org.checkerframework:checker-qual:3.19.0" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-freemarker:2.6.9" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.freemarker:freemarker:2.3.31" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.springframework:spring-context-support:5.3.21" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.quartz-scheduler:quartz:2.3.2" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.mchange:mchange-commons-java:0.2.15" level="project" /> | |||||
<orderEntry type="library" name="Maven: net.coobird:thumbnailator:0.4.17" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.jclarion:image4j:0.7" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.3" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.13.3" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.yaml:snakeyaml:1.29" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.13.3" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.13.3" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.codehaus.woodstox:stax2-api:4.2.1" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.fasterxml.woodstox:woodstox-core:6.2.7" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.apache.velocity:velocity-engine-core:2.3" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.aliyun:aliyun-java-sdk-core:4.0.6" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.google.code.gson:gson:2.8.9" level="project" /> | |||||
<orderEntry type="library" name="Maven: javax.xml.bind:jaxb-api:2.3.1" level="project" /> | |||||
<orderEntry type="library" name="Maven: javax.activation:javax.activation-api:1.2.0" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-core:2.1.14" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-impl:2.1" level="project" /> | |||||
<orderEntry type="library" name="Maven: javax.activation:activation:1.1.1" level="project" /> | |||||
<orderEntry type="library" name="Maven: commons-net:commons-net:3.6" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.jcraft:jsch:0.1.53" level="project" /> | |||||
<orderEntry type="library" name="Maven: commons-fileupload:commons-fileupload:1.4" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.apache.httpcomponents:httpclient:4.5.13" level="project" /> | |||||
<orderEntry type="library" name="Maven: mysql:mysql-connector-java:8.0.29" level="project" /> | <orderEntry type="library" name="Maven: mysql:mysql-connector-java:8.0.29" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.apache.ddlutils:ddlutils:1.0" level="project" /> | <orderEntry type="library" name="Maven: org.apache.ddlutils:ddlutils:1.0" level="project" /> | ||||
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.0.4" level="project" /> | <orderEntry type="library" name="Maven: commons-logging:commons-logging:1.0.4" level="project" /> | ||||
@@ -30,7 +30,7 @@ | |||||
<dependency> | <dependency> | ||||
<groupId>cc.smtweb</groupId> | <groupId>cc.smtweb</groupId> | ||||
<artifactId>sw-system-bpm</artifactId> | |||||
<artifactId>sw-framework-core</artifactId> | |||||
<version>3.1.0-SNAPSHOT</version> | <version>3.1.0-SNAPSHOT</version> | ||||
</dependency> | </dependency> | ||||
@@ -1,5 +1,7 @@ | |||||
package cc.smtweb.system.canal.example; | package cc.smtweb.system.canal.example; | ||||
import cc.smtweb.framework.core.common.CanalVO; | |||||
import cc.smtweb.framework.core.common.SwEnum; | |||||
import cc.smtweb.framework.core.util.CommUtil; | import cc.smtweb.framework.core.util.CommUtil; | ||||
import cc.smtweb.framework.core.util.DateUtil; | import cc.smtweb.framework.core.util.DateUtil; | ||||
import cc.smtweb.framework.core.util.JsonUtil; | import cc.smtweb.framework.core.util.JsonUtil; | ||||
@@ -17,8 +19,6 @@ import java.io.File; | |||||
import java.io.FileInputStream; | import java.io.FileInputStream; | ||||
import java.net.InetSocketAddress; | import java.net.InetSocketAddress; | ||||
import java.util.*; | import java.util.*; | ||||
import java.util.regex.Matcher; | |||||
import java.util.regex.Pattern; | |||||
/** | /** | ||||
* @Author yaoq | * @Author yaoq | ||||
@@ -184,18 +184,22 @@ public class ClientInstance { | |||||
* @param entrys | * @param entrys | ||||
*/ | */ | ||||
private void dataHandle(List<CanalEntry.Entry> entrys) throws Exception { | private void dataHandle(List<CanalEntry.Entry> entrys) throws Exception { | ||||
List<ClientVO> data = new ArrayList<>(); | |||||
List<CanalVO> data = new ArrayList<>(); | |||||
for (CanalEntry.Entry entry : entrys) { | for (CanalEntry.Entry entry : entrys) { | ||||
if (CanalEntry.EntryType.ROWDATA != entry.getEntryType()) continue; | if (CanalEntry.EntryType.ROWDATA != entry.getEntryType()) continue; | ||||
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); | CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); | ||||
CanalEntry.EventType eventType = rowChange.getEventType(); | CanalEntry.EventType eventType = rowChange.getEventType(); | ||||
ClientVO row = null; | |||||
if (eventType == CanalEntry.EventType.DELETE) { | |||||
row = saveDeleteSql(entry); | |||||
CanalVO row = null; | |||||
if (eventType == CanalEntry.EventType.CREATE) { | |||||
row = saveCreateSql(entry, rowChange); | |||||
} else if (eventType == CanalEntry.EventType.ALTER) { | |||||
row = saveAlterSql(entry, rowChange); | |||||
} else if (eventType == CanalEntry.EventType.DELETE) { | |||||
row = saveDeleteSql(entry, rowChange); | |||||
} else if (eventType == CanalEntry.EventType.UPDATE) { | } else if (eventType == CanalEntry.EventType.UPDATE) { | ||||
row = saveUpdateSql(entry); | |||||
row = saveUpdateSql(entry, rowChange); | |||||
} else if (eventType == CanalEntry.EventType.INSERT) { | } else if (eventType == CanalEntry.EventType.INSERT) { | ||||
row = saveInsertSql(entry); | |||||
row = saveInsertSql(entry, rowChange); | |||||
} | } | ||||
if (row == null) continue; | if (row == null) continue; | ||||
data.add(row); | data.add(row); | ||||
@@ -206,49 +210,30 @@ public class ClientInstance { | |||||
saveToFile(data); | saveToFile(data); | ||||
} | } | ||||
/** | /** | ||||
* 保存更新语句 | |||||
* 保存数据表新增语句 | |||||
* | * | ||||
* @param entry | * @param entry | ||||
*/ | */ | ||||
private ClientVO saveUpdateSql(CanalEntry.Entry entry) { | |||||
private CanalVO saveCreateSql(CanalEntry.Entry entry, CanalEntry.RowChange rowChange) { | |||||
try { | try { | ||||
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); | |||||
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); | |||||
for (CanalEntry.RowData rowData : rowDatasList) { | |||||
List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList(); | |||||
Map<String, String> data = new HashMap<>(); | |||||
newColumnList.stream().forEach(item -> { | |||||
if (StringUtils.isNotEmpty(item.getValue())) { | |||||
data.put(lineToHump(item.getName()), item.getValue()); | |||||
} | |||||
}); | |||||
List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList(); | |||||
ClientVO update = ClientVO.ok(entry.getHeader().getTableName(), "UPDATE", data, oldColumnList.get(0).getValue()); | |||||
System.out.println("更新返回 : " + JsonUtil.encodeString(update)); | |||||
return update; | |||||
} | |||||
return CanalVO.ok(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), SwEnum.DbEventType.CREATE.value, null, rowChange.getSql(), null, null); | |||||
} catch (Exception e) { | } catch (Exception e) { | ||||
e.printStackTrace(); | e.printStackTrace(); | ||||
} | } | ||||
return null; | return null; | ||||
} | } | ||||
/** | /** | ||||
* 保存删除语句 | |||||
* 保存结构变更语句 | |||||
* | * | ||||
* @param entry | * @param entry | ||||
*/ | */ | ||||
private ClientVO saveDeleteSql(CanalEntry.Entry entry) { | |||||
private CanalVO saveAlterSql(CanalEntry.Entry entry, CanalEntry.RowChange rowChange) { | |||||
try { | try { | ||||
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); | |||||
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); | |||||
for (CanalEntry.RowData rowData : rowDatasList) { | |||||
List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList(); | |||||
ClientVO delete = ClientVO.ok(entry.getHeader().getTableName(), "DELETE", null, oldColumnList.get(0).getValue()); | |||||
System.out.println("删除返回 : " + JsonUtil.encodeString(delete)); | |||||
return delete; | |||||
} | |||||
return CanalVO.ok(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), SwEnum.DbEventType.ALTER.value, null, rowChange.getSql(), null, null); | |||||
} catch (Exception e) { | } catch (Exception e) { | ||||
e.printStackTrace(); | e.printStackTrace(); | ||||
} | } | ||||
@@ -260,21 +245,16 @@ public class ClientInstance { | |||||
* | * | ||||
* @param entry | * @param entry | ||||
*/ | */ | ||||
private ClientVO saveInsertSql(CanalEntry.Entry entry) { | |||||
private CanalVO saveInsertSql(CanalEntry.Entry entry, CanalEntry.RowChange rowChange) { | |||||
try { | try { | ||||
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); | |||||
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); | List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); | ||||
for (CanalEntry.RowData rowData : rowDatasList) { | for (CanalEntry.RowData rowData : rowDatasList) { | ||||
List<CanalEntry.Column> columnList = rowData.getAfterColumnsList(); | List<CanalEntry.Column> columnList = rowData.getAfterColumnsList(); | ||||
Map<String, String> data = new HashMap<>(); | Map<String, String> data = new HashMap<>(); | ||||
columnList.stream().forEach(item -> { | columnList.stream().forEach(item -> { | ||||
if (StringUtils.isNotEmpty(item.getValue())) { | |||||
data.put(lineToHump(item.getName()), item.getValue()); | |||||
} | |||||
data.put(item.getName(), item.getValue()); | |||||
}); | }); | ||||
ClientVO insert = ClientVO.ok(entry.getHeader().getTableName(), "INSERT", data, null); | |||||
System.out.println("插入返回 : " + JsonUtil.encodeString(insert)); | |||||
return insert; | |||||
return CanalVO.ok(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), SwEnum.DbEventType.INSERT.value, data, null, null, null); | |||||
} | } | ||||
} catch (Exception e) { | } catch (Exception e) { | ||||
e.printStackTrace(); | e.printStackTrace(); | ||||
@@ -283,36 +263,58 @@ public class ClientInstance { | |||||
} | } | ||||
/** | /** | ||||
* 下划线转驼峰 | |||||
* 保存更新语句 | |||||
* | * | ||||
* @param str | |||||
* @param entry | |||||
*/ | */ | ||||
private static Pattern linePattern = Pattern.compile("_(\\w)"); | |||||
public static String lineToHump(String str) { | |||||
str = str.toLowerCase(); | |||||
Matcher matcher = linePattern.matcher(str); | |||||
StringBuffer sb = new StringBuffer(); | |||||
while (matcher.find()) { | |||||
matcher.appendReplacement(sb, matcher.group(1).toUpperCase()); | |||||
private CanalVO saveUpdateSql(CanalEntry.Entry entry, CanalEntry.RowChange rowChange) { | |||||
try { | |||||
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); | |||||
for (CanalEntry.RowData rowData : rowDatasList) { | |||||
List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList(); | |||||
Map<String, String> data = new HashMap<>(); | |||||
newColumnList.stream().forEach(item -> { | |||||
data.put(item.getName(), item.getValue()); | |||||
}); | |||||
List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList(); | |||||
return CanalVO.ok(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), SwEnum.DbEventType.UPDATE.value, data, null, oldColumnList.get(0).getName(), oldColumnList.get(0).getValue()); | |||||
} | |||||
} catch (Exception e) { | |||||
e.printStackTrace(); | |||||
} | } | ||||
matcher.appendTail(sb); | |||||
return sb.toString(); | |||||
return null; | |||||
} | } | ||||
/** | |||||
* 保存删除语句 | |||||
* | |||||
* @param entry | |||||
*/ | |||||
private CanalVO saveDeleteSql(CanalEntry.Entry entry, CanalEntry.RowChange rowChange) { | |||||
try { | |||||
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); | |||||
for (CanalEntry.RowData rowData : rowDatasList) { | |||||
List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList(); | |||||
return CanalVO.ok(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), SwEnum.DbEventType.DELETE.value, null, null, oldColumnList.get(0).getName(), oldColumnList.get(0).getValue()); | |||||
} | |||||
} catch (Exception e) { | |||||
e.printStackTrace(); | |||||
} | |||||
return null; | |||||
} | |||||
private static synchronized File createFile(String basePath) throws Exception { | private static synchronized File createFile(String basePath) throws Exception { | ||||
String path = basePath; | String path = basePath; | ||||
if (!path.endsWith("/")) { | if (!path.endsWith("/")) { | ||||
path = path + "/"; | path = path + "/"; | ||||
} | } | ||||
File file = new File(path + DateUtil.getNowYm() + "/" + DateUtil.nowDateLong() + "/" + DateUtil.toLongDateTimeString(null) + ".cf"); | |||||
File file = new File(path + "/" + DateUtil.toLongDateTimeString(null) + ".cf"); | |||||
if (!file.getParentFile().exists()) { // 如果父目录不存在,创建父目录 | if (!file.getParentFile().exists()) { // 如果父目录不存在,创建父目录 | ||||
file.getParentFile().mkdirs(); | file.getParentFile().mkdirs(); | ||||
} | } | ||||
while (file.exists()) { | while (file.exists()) { | ||||
Thread.sleep(1); | Thread.sleep(1); | ||||
file = new File(path + DateUtil.getNowYm() + "/" + DateUtil.nowDateLong() + "/" + DateUtil.toLongDateTimeString(null) + ".cf"); | |||||
file = new File(path + "/" + DateUtil.toLongDateTimeString(null) + ".cf"); | |||||
/*if (file.exists()) { | /*if (file.exists()) { | ||||
return file; | return file; | ||||
}*/ | }*/ | ||||
@@ -322,7 +324,7 @@ public class ClientInstance { | |||||
return file; | return file; | ||||
} | } | ||||
private void saveToFile(List<ClientVO> rows) throws Exception { | |||||
private void saveToFile(List<CanalVO> rows) throws Exception { | |||||
// 将格式化后的字符串写入文件 | // 将格式化后的字符串写入文件 | ||||
FileUtils.writeStringToFile(createFile(ClientConsts.getFilepath(properties)), JsonUtil.encodeString(rows), "UTF-8"); | FileUtils.writeStringToFile(createFile(ClientConsts.getFilepath(properties)), JsonUtil.encodeString(rows), "UTF-8"); | ||||
} | } | ||||
@@ -1,56 +0,0 @@ | |||||
package cc.smtweb.system.canal.example; | |||||
import java.util.Map; | |||||
/** | |||||
* @Author yaoq | |||||
* @Date 2022年09月13日 18:20 | |||||
* @Description | |||||
*/ | |||||
public class ClientVO { | |||||
private String tableName; // 表名 | |||||
private String type; // 类型(更新、删除、插入) | |||||
private Map<String,String> data; // 数据JSON 自己转对应表格实体类 | |||||
private String id; // 更新或删除都是根据ID来 | |||||
public static ClientVO ok(String tableName, String type, Map<String,String> data, String id){ | |||||
ClientVO canalVO=new ClientVO(); | |||||
canalVO.setId(id); | |||||
canalVO.setTableName(tableName); | |||||
canalVO.setType(type); | |||||
canalVO.setData(data); | |||||
return canalVO; | |||||
} | |||||
public String getTableName() { | |||||
return tableName; | |||||
} | |||||
public void setTableName(String tableName) { | |||||
this.tableName = tableName; | |||||
} | |||||
public String getType() { | |||||
return type; | |||||
} | |||||
public void setType(String type) { | |||||
this.type = type; | |||||
} | |||||
public Map<String, String> getData() { | |||||
return data; | |||||
} | |||||
public void setData(Map<String, String> data) { | |||||
this.data = data; | |||||
} | |||||
public String getId() { | |||||
return id; | |||||
} | |||||
public void setId(String id) { | |||||
this.id = id; | |||||
} | |||||
} |
@@ -9,14 +9,9 @@ canal.server.port=11111 | |||||
canal.server.username= | canal.server.username= | ||||
canal.server.password= | canal.server.password= | ||||
# 数据库匹配规则.*\\..*, scmz\\..* | # 数据库匹配规则.*\\..*, scmz\\..* | ||||
canal.server.filter=scmz\\..* | |||||
canal.server.filter=tzrs\\..* | |||||
# 文件存放路径 E:/canalFile | # 文件存放路径 E:/canalFile | ||||
canal.file.path= | |||||
canal.file.path=E:/canalFile | |||||
# mysql、dmsql、kbsql、oracle | # mysql、dmsql、kbsql、oracle | ||||
canal.db.type=kbsql | |||||
canal.db.driverName=com.kingbase8.Driver | |||||
canal.db.jdbcUrl=jdbc:kingbase8://172.28.123.205:54321/HLJTY?useUnicode=true&characterEncoding=utf-8 | |||||
canal.db.username=system | |||||
canal.db.password=system | |||||
@@ -39,12 +39,6 @@ | |||||
<!--平台--> | <!--平台--> | ||||
<dependency> | <dependency> | ||||
<groupId>cc.smtweb</groupId> | <groupId>cc.smtweb</groupId> | ||||
<artifactId>sw-system-bpm</artifactId> | |||||
<version>3.1.0-SNAPSHOT</version> | |||||
</dependency> | |||||
<dependency> | |||||
<groupId>cc.smtweb</groupId> | |||||
<artifactId>sw-framework-core</artifactId> | <artifactId>sw-framework-core</artifactId> | ||||
<version>3.1.0-SNAPSHOT</version> | <version>3.1.0-SNAPSHOT</version> | ||||
</dependency> | </dependency> | ||||
@@ -73,13 +67,6 @@ | |||||
<artifactId>dmsql</artifactId> | <artifactId>dmsql</artifactId> | ||||
<version>1.0.0</version> | <version>1.0.0</version> | ||||
</dependency> | </dependency> | ||||
<dependency> | |||||
<groupId>cc.smtweb</groupId> | |||||
<artifactId>canal.deployer</artifactId> | |||||
<version>1.1.5</version> | |||||
<scope>compile</scope> | |||||
</dependency> | |||||
</dependencies> | </dependencies> | ||||
<repositories> | <repositories> | ||||
@@ -4,10 +4,10 @@ import org.springframework.boot.SpringApplication; | |||||
import org.springframework.boot.autoconfigure.SpringBootApplication; | import org.springframework.boot.autoconfigure.SpringBootApplication; | ||||
@SpringBootApplication | @SpringBootApplication | ||||
public class FileApplication { | |||||
public class CanalFileApplication { | |||||
public static void main(String[] args) { | public static void main(String[] args) { | ||||
SpringApplication.run(FileApplication.class, args); | |||||
SpringApplication.run(CanalFileApplication.class, args); | |||||
} | } | ||||
} | } |
@@ -10,12 +10,12 @@ import org.springframework.context.annotation.Configuration; | |||||
*/ | */ | ||||
@Configuration | @Configuration | ||||
@ComponentScan | @ComponentScan | ||||
public class FileConfiguration { | |||||
public class CanalFileConfiguration { | |||||
/** | /** | ||||
* 配置自定义service扫描路径 {module}/{service}/{method} | * 配置自定义service扫描路径 {module}/{service}/{method} | ||||
*/ | */ | ||||
@Bean | @Bean | ||||
public ControllerConfig canalConfiguration() { | |||||
public ControllerConfig CanalFileConfiguration() { | |||||
return new ControllerConfig("canalFile", "cc.smtweb.system.canal.file", null); | return new ControllerConfig("canalFile", "cc.smtweb.system.canal.file", null); | ||||
} | } | ||||
} | } |
@@ -3,9 +3,8 @@ package cc.smtweb.system.canal.file; | |||||
import cc.smtweb.framework.core.annotation.SwStartListener; | import cc.smtweb.framework.core.annotation.SwStartListener; | ||||
import cc.smtweb.framework.core.common.SwConsts; | import cc.smtweb.framework.core.common.SwConsts; | ||||
import cc.smtweb.framework.core.mvc.controller.IStartListener; | import cc.smtweb.framework.core.mvc.controller.IStartListener; | ||||
import cc.smtweb.framework.core.systask.SysThreadPool; | |||||
import cc.smtweb.framework.core.systask.SysThreadWorker; | |||||
import cc.smtweb.system.canal.deployer.CanalLauncher; | |||||
import cc.smtweb.framework.core.systask.SysServiceFactory; | |||||
import cc.smtweb.system.canal.file.common.FileDecodeService; | |||||
import lombok.extern.slf4j.Slf4j; | import lombok.extern.slf4j.Slf4j; | ||||
/** | /** | ||||
@@ -15,7 +14,7 @@ import lombok.extern.slf4j.Slf4j; | |||||
*/ | */ | ||||
@Slf4j | @Slf4j | ||||
@SwStartListener | @SwStartListener | ||||
public class FileStartedListener implements IStartListener { | |||||
public class CanalFileStartedListener implements IStartListener { | |||||
@Override | @Override | ||||
public int order() { | public int order() { | ||||
@@ -26,16 +25,12 @@ public class FileStartedListener implements IStartListener { | |||||
@Override | @Override | ||||
public void init() { | public void init() { | ||||
SwConsts.SysParam.enableCanal = true; | SwConsts.SysParam.enableCanal = true; | ||||
SwConsts.SysParam.RUN_PROJECTS = ""; | |||||
SysServiceFactory.getInstance().reg(new FileDecodeService()); | |||||
} | } | ||||
@Override | @Override | ||||
public void run() { | public void run() { | ||||
if (!SwConsts.SysParam.enableCanal) return; | |||||
SysThreadPool.getInstance().addTask(new SysThreadWorker("canal server") { | |||||
@Override | |||||
public void localWork() throws Exception { | |||||
CanalLauncher.startServer(); | |||||
} | |||||
}); | |||||
} | } | ||||
} | } |
@@ -0,0 +1,173 @@ | |||||
package cc.smtweb.system.canal.file.common; | |||||
import cc.smtweb.framework.core.common.CanalVO; | |||||
import cc.smtweb.framework.core.db.DbEngine; | |||||
import cc.smtweb.framework.core.db.jdbc.IDbWorker; | |||||
import cc.smtweb.framework.core.exception.SwException; | |||||
import cc.smtweb.framework.core.util.*; | |||||
import lombok.extern.slf4j.Slf4j; | |||||
import org.apache.commons.io.FileUtils; | |||||
import org.apache.commons.lang.StringUtils; | |||||
import org.springframework.stereotype.Component; | |||||
import java.io.File; | |||||
import java.io.FileFilter; | |||||
import java.util.*; | |||||
@Slf4j | |||||
@Component | |||||
public abstract class AbstractFileWork { | |||||
private final static String suffix = ".cf"; | |||||
protected FileDecodeConfig fileConfig; | |||||
protected DbEngine getDbEngine() { | |||||
return DbEngine.getInstance(); | |||||
} | |||||
public AbstractFileWork() { | |||||
fileConfig = SpringUtil.getBean(FileDecodeConfig.class); | |||||
} | |||||
/** | |||||
* @Author yaoq | |||||
* @Date 2022/9/20 9:25 | |||||
* @Params | |||||
* @Return | |||||
* @Description 解析文件 | |||||
*/ | |||||
public void doWork() { | |||||
String path = fileConfig.getPath(); | |||||
if (!path.endsWith("/")) { | |||||
path = path + "/"; | |||||
} | |||||
log.debug("[解析canal文件] path:" + path); | |||||
File dir = new File(path); | |||||
File[] fs = dir.listFiles(new FileFilter() { | |||||
@Override | |||||
public boolean accept(File file) { | |||||
return !file.isDirectory() && file.getName().endsWith(suffix); | |||||
} | |||||
}); | |||||
if (fs == null) return; | |||||
List<File> files = Arrays.asList(fs); | |||||
Collections.sort(files, (o1, o2) -> StringUtil.chineseCompare(o1.getName(), o2.getName())); | |||||
log.debug("[解析canal文件] size:" + files.size()); | |||||
for (File f : files) { | |||||
try { | |||||
//为保证数据一致性,发现错误,不能继续,卡在这 | |||||
getDbEngine().doTrans(new IDbWorker() { | |||||
@Override | |||||
public void work() { | |||||
try { | |||||
decodeFile(f); | |||||
} catch (Exception e) { | |||||
throw new SwException(e); | |||||
} | |||||
} | |||||
}); | |||||
//备份文件 | |||||
FileUtil.copyFile(f, path + "/bak/" + DateUtil.getNowYm() + "/" + f.getName(), true); | |||||
} catch (Exception e) { | |||||
//出错了跳出,避免数据遗失 | |||||
log.error("[解析canal文件] file:" + f.getName() + " 写入数据失败", e); | |||||
break; | |||||
} | |||||
} | |||||
} | |||||
private void decodeFile(File file) throws Exception { | |||||
String fileStr = FileUtils.readFileToString(file, "UTF-8"); | |||||
if (StringUtil.isEmpty(fileStr)) return; | |||||
List<CanalVO> list = JsonUtil.parseList(fileStr, CanalVO.class); | |||||
int insertSize = 0; | |||||
int updateSize = 0; | |||||
int deleteSize = 0; | |||||
int createSize = 0; | |||||
int alterSize = 0; | |||||
for (CanalVO canal : list) { | |||||
if (canal.createIs()) { | |||||
doCreateSql(canal); | |||||
createSize++; | |||||
continue; | |||||
} | |||||
if (canal.alterIs()) { | |||||
doAlterSql(canal); | |||||
alterSize++; | |||||
continue; | |||||
} | |||||
if (canal.insertIs()) { | |||||
doInsertSql(canal); | |||||
insertSize++; | |||||
continue; | |||||
} | |||||
if (canal.updateIs()) { | |||||
doUpdateSql(canal); | |||||
updateSize++; | |||||
continue; | |||||
} | |||||
if (canal.deleteIs()) { | |||||
doDeleteSql(canal); | |||||
deleteSize++; | |||||
} | |||||
} | |||||
log.debug("[解析canal文件] file:" + file.getName() + " insert size:[" + insertSize + "] update size:[" + updateSize + "] delete size:[" + deleteSize + "] create size:[" + createSize + "] alter size:[" + alterSize + "]"); | |||||
} | |||||
protected abstract String getDbType(); | |||||
protected abstract void doCreateSql(CanalVO canalVO); | |||||
protected abstract void doAlterSql(CanalVO canalVO); | |||||
protected void doInsertSql(CanalVO canalVO) { | |||||
Map<String, String> data = canalVO.getData(); | |||||
Set<String> fields = data.keySet(); | |||||
StringBuilder sql = new StringBuilder(); | |||||
List<Object> args = new ArrayList<>(); | |||||
sql.append("insert into ").append(canalVO.getTableName()).append(" ( "); | |||||
for (String field : fields) { | |||||
sql.append(field).append(","); | |||||
} | |||||
sql.deleteCharAt(sql.length() - 1); | |||||
sql.append(")").append("values").append(" ( "); | |||||
for (String field : fields) { | |||||
sql.append("?").append(","); | |||||
String value = data.get(field); | |||||
if (StringUtils.isEmpty(value)) { | |||||
value = null; | |||||
} else { | |||||
args.add(value); | |||||
} | |||||
} | |||||
sql.deleteCharAt(sql.length() - 1); | |||||
sql.append(")"); | |||||
getDbEngine().update(sql.toString(), args.toArray()); | |||||
} | |||||
protected void doUpdateSql(CanalVO canalVO) { | |||||
Map<String, String> data = canalVO.getData(); | |||||
Set<String> fields = data.keySet(); | |||||
StringBuilder sql = new StringBuilder(); | |||||
List<Object> args = new ArrayList<>(); | |||||
sql.append("update ").append(canalVO.getTableName()).append(" set "); | |||||
for (String field : fields) { | |||||
if (field.equals(canalVO.getIdField())) continue; | |||||
sql.append(field).append("=").append("?").append(","); | |||||
String value = data.get(field); | |||||
if (StringUtils.isEmpty(value)) { | |||||
args.add(null); | |||||
} else { | |||||
args.add(value); | |||||
} | |||||
} | |||||
sql.deleteCharAt(sql.length() - 1); | |||||
sql.append(" where ").append(canalVO.getIdField()).append("=").append(canalVO.getId()); | |||||
getDbEngine().update(sql.toString(), args.toArray()); | |||||
} | |||||
protected void doDeleteSql(CanalVO canalVO) { | |||||
getDbEngine().update("delete from " + canalVO.getTableName() + " where " + canalVO.getIdField() + " = ? ", canalVO.getId()); | |||||
} | |||||
} |
@@ -0,0 +1,22 @@ | |||||
package cc.smtweb.system.canal.file.common; | |||||
import org.springframework.beans.factory.annotation.Value; | |||||
import org.springframework.context.annotation.Configuration; | |||||
/** | |||||
* @Author yaoq | |||||
* @Date 2022年07月20日 10:38 | |||||
* @Description | |||||
*/ | |||||
@Configuration | |||||
public class FileDecodeConfig { | |||||
@Value("${canal.file.path}") | |||||
private String path; | |||||
public String getPath() { | |||||
return path; | |||||
} | |||||
} | |||||
@@ -0,0 +1,47 @@ | |||||
package cc.smtweb.system.canal.file.common; | |||||
import cc.smtweb.framework.core.common.SwEnum; | |||||
import cc.smtweb.framework.core.exception.SwException; | |||||
import cc.smtweb.system.canal.file.impl.DmSqlFileWork; | |||||
import cc.smtweb.system.canal.file.impl.KbSqlFileWork; | |||||
import cc.smtweb.system.canal.file.impl.MySqlFileWork; | |||||
import cc.smtweb.system.canal.file.impl.OracleFileWork; | |||||
import java.util.HashMap; | |||||
import java.util.Map; | |||||
/** | |||||
* @Author yaoq | |||||
* @Date 2022年09月20日 9:26 | |||||
* @Description | |||||
*/ | |||||
public class FileDecodeFactory { | |||||
private volatile static FileDecodeFactory instance = null; | |||||
public static FileDecodeFactory getInstance() { | |||||
if (instance == null) { | |||||
synchronized (FileDecodeFactory.class) { | |||||
if (instance == null) instance = new FileDecodeFactory(); | |||||
} | |||||
} | |||||
return instance; | |||||
} | |||||
private Map<String, Class<? extends AbstractFileWork>> mapWorks; | |||||
private FileDecodeFactory() { | |||||
mapWorks = new HashMap<>(); | |||||
mapWorks.put(SwEnum.DbType.DMSQL.value.toLowerCase(), DmSqlFileWork.class); | |||||
mapWorks.put(SwEnum.DbType.KBSQL.value.toLowerCase(), KbSqlFileWork.class); | |||||
mapWorks.put(SwEnum.DbType.MYSQL.value.toLowerCase(), MySqlFileWork.class); | |||||
mapWorks.put(SwEnum.DbType.ORACLE.value.toLowerCase(), OracleFileWork.class); | |||||
} | |||||
public AbstractFileWork getFileWork(String key) throws Exception { | |||||
Class<? extends AbstractFileWork> cls = mapWorks.get(key.toLowerCase()); | |||||
if (cls == null) throw new SwException("暂不支持[" + key + "]类型数据库文件解析!"); | |||||
return cls.newInstance(); | |||||
} | |||||
} |
@@ -0,0 +1,32 @@ | |||||
package cc.smtweb.system.canal.file.common; | |||||
import cc.smtweb.framework.core.db.DbEngine; | |||||
import cc.smtweb.framework.core.systask.BaseSysService; | |||||
import lombok.extern.slf4j.Slf4j; | |||||
/** | |||||
* Created by Akmm at 2021/12/6 19:19 | |||||
* 清除当前已结束任务, 10分钟一次 | |||||
*/ | |||||
@Slf4j | |||||
public class FileDecodeService extends BaseSysService { | |||||
@Override | |||||
public String getTitle() { | |||||
return "解析canal文件"; | |||||
} | |||||
@Override | |||||
public int getInterval() { | |||||
return 10; | |||||
} | |||||
@Override | |||||
public void work() { | |||||
try { | |||||
AbstractFileWork fileWork = FileDecodeFactory.getInstance().getFileWork(DbEngine.getInstance().getDbType()); | |||||
fileWork.doWork(); | |||||
} catch (Exception e) { | |||||
log.error("解析canal文件失败:", e); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,29 @@ | |||||
package cc.smtweb.system.canal.file.impl; | |||||
import cc.smtweb.framework.core.common.CanalVO; | |||||
import cc.smtweb.framework.core.common.SwEnum; | |||||
import cc.smtweb.system.canal.file.common.AbstractFileWork; | |||||
/** | |||||
* @Author yaoq | |||||
* @Date 2022年09月20日 10:29 | |||||
* @Description | |||||
*/ | |||||
public class DmSqlFileWork extends AbstractFileWork { | |||||
@Override | |||||
protected String getDbType() { | |||||
return SwEnum.DbType.DMSQL.value; | |||||
} | |||||
@Override | |||||
protected void doCreateSql(CanalVO canalVO) { | |||||
} | |||||
@Override | |||||
protected void doAlterSql(CanalVO canalVO) { | |||||
} | |||||
} |
@@ -0,0 +1,28 @@ | |||||
package cc.smtweb.system.canal.file.impl; | |||||
import cc.smtweb.framework.core.common.CanalVO; | |||||
import cc.smtweb.framework.core.common.SwEnum; | |||||
import cc.smtweb.system.canal.file.common.AbstractFileWork; | |||||
/** | |||||
* @Author yaoq | |||||
* @Date 2022年09月20日 10:29 | |||||
* @Description | |||||
*/ | |||||
public class KbSqlFileWork extends AbstractFileWork { | |||||
@Override | |||||
protected String getDbType() { | |||||
return SwEnum.DbType.KBSQL.value; | |||||
} | |||||
@Override | |||||
protected void doCreateSql(CanalVO canalVO) { | |||||
} | |||||
@Override | |||||
protected void doAlterSql(CanalVO canalVO) { | |||||
} | |||||
} |
@@ -0,0 +1,87 @@ | |||||
package cc.smtweb.system.canal.file.impl; | |||||
import cc.smtweb.framework.core.common.CanalVO; | |||||
import cc.smtweb.framework.core.common.SwEnum; | |||||
import cc.smtweb.system.canal.file.common.AbstractFileWork; | |||||
import org.apache.commons.lang.StringUtils; | |||||
import java.util.ArrayList; | |||||
import java.util.List; | |||||
import java.util.Map; | |||||
import java.util.Set; | |||||
/** | |||||
* @Author yaoq | |||||
* @Date 2022年09月20日 10:29 | |||||
* @Description | |||||
*/ | |||||
public class MySqlFileWork extends AbstractFileWork { | |||||
@Override | |||||
protected String getDbType() { | |||||
return SwEnum.DbType.MYSQL.value; | |||||
} | |||||
@Override | |||||
protected void doCreateSql(CanalVO canalVO) { | |||||
getDbEngine().update(canalVO.getSql()); | |||||
} | |||||
@Override | |||||
protected void doAlterSql(CanalVO canalVO) { | |||||
getDbEngine().update(canalVO.getSql()); | |||||
} | |||||
@Override | |||||
protected void doInsertSql(CanalVO canalVO) { | |||||
Map<String, String> data = canalVO.getData(); | |||||
Set<String> fields = data.keySet(); | |||||
StringBuilder sql = new StringBuilder(); | |||||
List<Object> args = new ArrayList<>(); | |||||
sql.append("insert into ").append(canalVO.getSchemaName()).append(".").append(canalVO.getTableName()).append(" ( "); | |||||
for (String field : fields) { | |||||
sql.append(field).append(","); | |||||
} | |||||
sql.deleteCharAt(sql.length() - 1); | |||||
sql.append(")").append("values").append(" ( "); | |||||
for (String field : fields) { | |||||
sql.append("?").append(","); | |||||
String value = data.get(field); | |||||
if (StringUtils.isEmpty(value)) { | |||||
args.add(null); | |||||
} else { | |||||
args.add(value); | |||||
} | |||||
} | |||||
sql.deleteCharAt(sql.length() - 1); | |||||
sql.append(")"); | |||||
getDbEngine().update(sql.toString(), args.toArray()); | |||||
} | |||||
@Override | |||||
protected void doUpdateSql(CanalVO canalVO) { | |||||
Map<String, String> data = canalVO.getData(); | |||||
Set<String> fields = data.keySet(); | |||||
StringBuilder sql = new StringBuilder(); | |||||
List<Object> args = new ArrayList<>(); | |||||
sql.append("update ").append(canalVO.getSchemaName()).append(".").append(canalVO.getTableName()).append(" set "); | |||||
for (String field : fields) { | |||||
if (field.equals(canalVO.getIdField())) continue; | |||||
sql.append(field).append("=").append("?").append(","); | |||||
String value = data.get(field); | |||||
if (StringUtils.isEmpty(value)) { | |||||
args.add(null); | |||||
} else { | |||||
args.add(value); | |||||
} | |||||
} | |||||
sql.deleteCharAt(sql.length() - 1); | |||||
sql.append(" where ").append(canalVO.getIdField()).append("=").append(canalVO.getId()); | |||||
getDbEngine().update(sql.toString(), args.toArray()); | |||||
} | |||||
@Override | |||||
protected void doDeleteSql(CanalVO canalVO) { | |||||
getDbEngine().update("delete from " + canalVO.getSchemaName() + "." + canalVO.getTableName() + " where " + canalVO.getIdField() + " = ? ", canalVO.getId()); | |||||
} | |||||
} |
@@ -0,0 +1,29 @@ | |||||
package cc.smtweb.system.canal.file.impl; | |||||
import cc.smtweb.framework.core.common.CanalVO; | |||||
import cc.smtweb.framework.core.common.SwEnum; | |||||
import cc.smtweb.system.canal.file.common.AbstractFileWork; | |||||
/** | |||||
* @Author yaoq | |||||
* @Date 2022年09月20日 10:29 | |||||
* @Description | |||||
*/ | |||||
public class OracleFileWork extends AbstractFileWork { | |||||
@Override | |||||
protected String getDbType() { | |||||
return SwEnum.DbType.ORACLE.value; | |||||
} | |||||
@Override | |||||
protected void doCreateSql(CanalVO canalVO) { | |||||
} | |||||
@Override | |||||
protected void doAlterSql(CanalVO canalVO) { | |||||
} | |||||
} |
@@ -1,2 +1,2 @@ | |||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ | org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ | ||||
cc.smtweb.system.canal.file.FileConfiguration | |||||
cc.smtweb.system.canal.file.CanalFileConfiguration |
@@ -1,6 +1,6 @@ | |||||
smtweb: | smtweb: | ||||
machine-id: 1 | machine-id: 1 | ||||
enable-job: false | |||||
enable-job: true | |||||
bpm: | bpm: | ||||
mode: 1 | mode: 1 | ||||
debug: true | debug: true | ||||
@@ -11,6 +11,9 @@ smtweb: | |||||
rule: | rule: | ||||
prefix: _smt_ | prefix: _smt_ | ||||
replace: smt_ | replace: smt_ | ||||
file: | |||||
local-path: | |||||
url: | |||||
# 服务模块 | # 服务模块 | ||||
devtools: | devtools: | ||||
restart: | restart: | ||||
@@ -32,9 +35,9 @@ spring: | |||||
datasource: | datasource: | ||||
driver-class-name: com.mysql.cj.jdbc.Driver | driver-class-name: com.mysql.cj.jdbc.Driver | ||||
# url: jdbc:mysql://139.9.38.43:6032/smt?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false | # url: jdbc:mysql://139.9.38.43:6032/smt?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false | ||||
url: jdbc:mysql://172.28.123.145:4418/smt?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false | |||||
url: jdbc:mysql://127.0.0.1:3306/tzrs?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false | |||||
username: root | username: root | ||||
password: Ncmz@2022_jjkj | |||||
password: root | |||||
servlet: | servlet: | ||||
multipart: | multipart: | ||||
max-file-size: 104857600000 | max-file-size: 104857600000 | ||||
@@ -50,6 +53,7 @@ spring: | |||||
# canal配置 | # canal配置 | ||||
canal: | canal: | ||||
file: | file: | ||||
path: E:/canalFile | |||||
# http 规则配置 | # http 规则配置 | ||||
http-config: | http-config: | ||||
@@ -107,8 +107,8 @@ | |||||
<orderEntry type="library" scope="RUNTIME" name="Maven: io.prometheus:simpleclient_httpserver:0.12.0" level="project" /> | <orderEntry type="library" scope="RUNTIME" name="Maven: io.prometheus:simpleclient_httpserver:0.12.0" level="project" /> | ||||
<orderEntry type="library" scope="RUNTIME" name="Maven: io.prometheus:simpleclient_common:0.12.0" level="project" /> | <orderEntry type="library" scope="RUNTIME" name="Maven: io.prometheus:simpleclient_common:0.12.0" level="project" /> | ||||
<orderEntry type="library" scope="RUNTIME" name="Maven: io.prometheus:simpleclient_pushgateway:0.12.0" level="project" /> | <orderEntry type="library" scope="RUNTIME" name="Maven: io.prometheus:simpleclient_pushgateway:0.12.0" level="project" /> | ||||
<orderEntry type="library" name="Maven: javax.xml.bind:jaxb-api:2.3.1" level="project" /> | |||||
<orderEntry type="library" name="Maven: javax.activation:javax.activation-api:1.2.0" level="project" /> | |||||
<orderEntry type="library" scope="RUNTIME" name="Maven: javax.xml.bind:jaxb-api:2.3.1" level="project" /> | |||||
<orderEntry type="library" scope="RUNTIME" name="Maven: javax.activation:javax.activation-api:1.2.0" level="project" /> | |||||
<orderEntry type="library" scope="PROVIDED" name="Maven: com.alibaba.otter:connector.kafka:jar-with-dependencies:1.1.5" level="project" /> | <orderEntry type="library" scope="PROVIDED" name="Maven: com.alibaba.otter:connector.kafka:jar-with-dependencies:1.1.5" level="project" /> | ||||
<orderEntry type="library" scope="PROVIDED" name="Maven: com.alibaba.otter:connector.rocketmq:jar-with-dependencies:1.1.5" level="project" /> | <orderEntry type="library" scope="PROVIDED" name="Maven: com.alibaba.otter:connector.rocketmq:jar-with-dependencies:1.1.5" level="project" /> | ||||
<orderEntry type="library" scope="PROVIDED" name="Maven: com.alibaba.otter:connector.rabbitmq:jar-with-dependencies:1.1.5" level="project" /> | <orderEntry type="library" scope="PROVIDED" name="Maven: com.alibaba.otter:connector.rabbitmq:jar-with-dependencies:1.1.5" level="project" /> | ||||
@@ -126,6 +126,8 @@ | |||||
<orderEntry type="library" name="Maven: org.yaml:snakeyaml:1.29" level="project" /> | <orderEntry type="library" name="Maven: org.yaml:snakeyaml:1.29" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-web:2.6.9" level="project" /> | <orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-web:2.6.9" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-json:2.6.9" level="project" /> | <orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-json:2.6.9" level="project" /> | ||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.13.3" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.13.3" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.13.3" level="project" /> | <orderEntry type="library" name="Maven: com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.13.3" level="project" /> | ||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.3" level="project" /> | <orderEntry type="library" name="Maven: com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.3" level="project" /> | ||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-parameter-names:2.13.3" level="project" /> | <orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-parameter-names:2.13.3" level="project" /> | ||||
@@ -167,34 +169,11 @@ | |||||
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" /> | <orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.apache.tika:tika-core:2.1.0" level="project" /> | <orderEntry type="library" name="Maven: org.apache.tika:tika-core:2.1.0" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.36" level="project" /> | <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.36" level="project" /> | ||||
<orderEntry type="library" name="Maven: commons-io:commons-io:2.11.0" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.github.ben-manes.caffeine:caffeine:2.9.3" level="project" /> | <orderEntry type="library" name="Maven: com.github.ben-manes.caffeine:caffeine:2.9.3" level="project" /> | ||||
<orderEntry type="library" name="Maven: org.checkerframework:checker-qual:3.19.0" level="project" /> | <orderEntry type="library" name="Maven: org.checkerframework:checker-qual:3.19.0" level="project" /> | ||||
<orderEntry type="library" name="Maven: com.google.errorprone:error_prone_annotations:2.10.0" level="project" /> | <orderEntry type="library" name="Maven: com.google.errorprone:error_prone_annotations:2.10.0" level="project" /> | ||||
<orderEntry type="library" name="Maven: mysql:mysql-connector-java:8.0.29" level="project" /> | <orderEntry type="library" name="Maven: mysql:mysql-connector-java:8.0.29" level="project" /> | ||||
<orderEntry type="library" name="Maven: commons-collections:commons-collections:3.2.2" level="project" /> | <orderEntry type="library" name="Maven: commons-collections:commons-collections:3.2.2" level="project" /> | ||||
<orderEntry type="module" module-name="sw-system-bpm" /> | |||||
<orderEntry type="library" name="Maven: org.springframework.boot:spring-boot-starter-freemarker:2.6.9" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.freemarker:freemarker:2.3.31" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.springframework:spring-context-support:5.3.21" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.quartz-scheduler:quartz:2.3.2" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.mchange:mchange-commons-java:0.2.15" level="project" /> | |||||
<orderEntry type="library" name="Maven: net.coobird:thumbnailator:0.4.17" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.jclarion:image4j:0.7" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.3" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.13.3" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.13.3" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.13.3" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.codehaus.woodstox:stax2-api:4.2.1" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.fasterxml.woodstox:woodstox-core:6.2.7" level="project" /> | |||||
<orderEntry type="library" name="Maven: org.apache.velocity:velocity-engine-core:2.3" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.aliyun:aliyun-java-sdk-core:4.0.6" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.google.code.gson:gson:2.8.9" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-core:2.1.14" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-impl:2.1" level="project" /> | |||||
<orderEntry type="library" name="Maven: javax.activation:activation:1.1.1" level="project" /> | |||||
<orderEntry type="library" name="Maven: commons-net:commons-net:3.6" level="project" /> | |||||
<orderEntry type="library" name="Maven: com.jcraft:jsch:0.1.53" level="project" /> | |||||
<orderEntry type="library" name="Maven: commons-fileupload:commons-fileupload:1.4" level="project" /> | |||||
<orderEntry type="library" name="Maven: commons-io:commons-io:2.11.0" level="project" /> | |||||
</component> | </component> | ||||
</module> | </module> |
@@ -73,13 +73,6 @@ | |||||
<artifactId>sw-framework-core</artifactId> | <artifactId>sw-framework-core</artifactId> | ||||
<version>3.1.0-SNAPSHOT</version> | <version>3.1.0-SNAPSHOT</version> | ||||
</dependency> | </dependency> | ||||
<dependency> | |||||
<groupId>cc.smtweb</groupId> | |||||
<artifactId>sw-system-bpm</artifactId> | |||||
<version>3.1.0-SNAPSHOT</version> | |||||
</dependency> | |||||
</dependencies> | </dependencies> | ||||
<build> | <build> | ||||
@@ -37,7 +37,7 @@ canal.instance.enableDruid=false | |||||
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== | #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== | ||||
# table regex | # table regex | ||||
canal.instance.filter.regex=scmz\\..* | |||||
canal.instance.filter.regex=tzrs\\..* | |||||
# table black regex | # table black regex | ||||
canal.instance.filter.black.regex=mysql\\.slave_.* | canal.instance.filter.black.regex=mysql\\.slave_.* | ||||
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) | # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) | ||||
@@ -0,0 +1,106 @@ | |||||
package cc.smtweb.framework.core.common; | |||||
import java.util.Map; | |||||
/** | |||||
* @Author yaoq | |||||
* @Date 2022年09月13日 18:20 | |||||
* @Description | |||||
*/ | |||||
public class CanalVO { | |||||
private String tableName; // 表名 | |||||
private String schemaName; // 库名 | |||||
private String type; // 类型(更新、删除、插入) | |||||
private Map<String, String> data; // 数据JSON 自己转对应表格实体类 | |||||
private String idField; // 更新或删除都是根据ID来 | |||||
private String id; // 更新或删除都是根据ID来 | |||||
private String sql; // 执行的sql | |||||
public static CanalVO ok(String schemaName, String tableName, String type, Map<String, String> data, String sql, String idField, String id) { | |||||
CanalVO canalVO = new CanalVO(); | |||||
canalVO.setId(id); | |||||
canalVO.setSchemaName(schemaName); | |||||
canalVO.setTableName(tableName); | |||||
canalVO.setType(type); | |||||
canalVO.setIdField(idField); | |||||
canalVO.setData(data); | |||||
canalVO.setSql(sql); | |||||
return canalVO; | |||||
} | |||||
public String getTableName() { | |||||
return tableName; | |||||
} | |||||
public void setTableName(String tableName) { | |||||
this.tableName = tableName; | |||||
} | |||||
public String getSchemaName() { | |||||
return schemaName; | |||||
} | |||||
public void setSchemaName(String schemaName) { | |||||
this.schemaName = schemaName; | |||||
} | |||||
public String getType() { | |||||
return type; | |||||
} | |||||
public void setType(String type) { | |||||
this.type = type; | |||||
} | |||||
public Map<String, String> getData() { | |||||
return data; | |||||
} | |||||
public void setData(Map<String, String> data) { | |||||
this.data = data; | |||||
} | |||||
public String getIdField() { | |||||
return idField; | |||||
} | |||||
public void setIdField(String idField) { | |||||
this.idField = idField; | |||||
} | |||||
public String getId() { | |||||
return id; | |||||
} | |||||
public void setId(String id) { | |||||
this.id = id; | |||||
} | |||||
public String getSql() { | |||||
return sql; | |||||
} | |||||
public void setSql(String sql) { | |||||
this.sql = sql; | |||||
} | |||||
public boolean createIs() { | |||||
return SwEnum.DbEventType.CREATE.value.equals(type); | |||||
} | |||||
public boolean alterIs() { | |||||
return SwEnum.DbEventType.ALTER.value.equals(type); | |||||
} | |||||
public boolean insertIs() { | |||||
return SwEnum.DbEventType.INSERT.value.equals(type); | |||||
} | |||||
public boolean updateIs() { | |||||
return SwEnum.DbEventType.UPDATE.value.equals(type); | |||||
} | |||||
public boolean deleteIs() { | |||||
return SwEnum.DbEventType.DELETE.value.equals(type); | |||||
} | |||||
} |
@@ -384,6 +384,7 @@ public interface SwEnum { | |||||
public static IntEnumBean LOCK = instance.addEnum(2, "已锁定"); | public static IntEnumBean LOCK = instance.addEnum(2, "已锁定"); | ||||
public static IntEnumBean STOP = instance.addEnum(9, "已停用"); | public static IntEnumBean STOP = instance.addEnum(9, "已停用"); | ||||
} | } | ||||
// 用户等级 | // 用户等级 | ||||
class UserLevel extends IntEnum { | class UserLevel extends IntEnum { | ||||
public static UserLevel instance = new UserLevel(); | public static UserLevel instance = new UserLevel(); | ||||
@@ -391,4 +392,25 @@ public interface SwEnum { | |||||
public static IntEnumBean outer = instance.addEnum(102101, "外部用户"); | public static IntEnumBean outer = instance.addEnum(102101, "外部用户"); | ||||
public static IntEnumBean pub = instance.addEnum(102102, "公众用户"); | public static IntEnumBean pub = instance.addEnum(102102, "公众用户"); | ||||
} | } | ||||
// 数据库操作类型 | |||||
class DbEventType extends StrEnum { | |||||
public static DbEventType instance = new DbEventType(); | |||||
public static StrEnumBean CREATE = instance.addEnum("CREATE", "创建表"); | |||||
public static StrEnumBean ALTER = instance.addEnum("ALTER", "修改表"); | |||||
public static StrEnumBean INSERT = instance.addEnum("INSERT", "新增"); | |||||
public static StrEnumBean UPDATE = instance.addEnum("UPDATE", "修改"); | |||||
public static StrEnumBean DELETE = instance.addEnum("DELETE", "删除"); | |||||
} | |||||
/** | |||||
* 字段编辑类型 | |||||
*/ | |||||
class DbType extends StrEnum { | |||||
public static DbType instance = new DbType(); | |||||
public static StrEnumBean MYSQL = instance.addEnum("mysql", "MYSQL"); | |||||
public static StrEnumBean ORACLE = instance.addEnum("oracle", "ORACLE"); | |||||
public static StrEnumBean KBSQL = instance.addEnum("kbsql", "人大金仓V8"); | |||||
public static StrEnumBean DMSQL = instance.addEnum("dmsql", "达梦数据库"); | |||||
} | |||||
} | } |
@@ -1,5 +1,6 @@ | |||||
package cc.smtweb.framework.core.db.jdbc; | package cc.smtweb.framework.core.db.jdbc; | ||||
import cc.smtweb.framework.core.common.SwEnum; | |||||
import cc.smtweb.framework.core.common.SwMap; | import cc.smtweb.framework.core.common.SwMap; | ||||
import cc.smtweb.framework.core.db.impl.BaseBean; | import cc.smtweb.framework.core.db.impl.BaseBean; | ||||
import cc.smtweb.framework.core.exception.DbException; | import cc.smtweb.framework.core.exception.DbException; | ||||
@@ -25,7 +26,6 @@ import java.util.function.Function; | |||||
* JDBC访问类,包装了spring jdbcTemplate对象 | * JDBC访问类,包装了spring jdbcTemplate对象 | ||||
*/ | */ | ||||
public class JdbcEngine { | public class JdbcEngine { | ||||
private final static String DB_TYPE_MYSQL = "mysql"; | |||||
private Map<Thread, JdbcTrans> mapThreadTrans = new ConcurrentHashMap<>(); | private Map<Thread, JdbcTrans> mapThreadTrans = new ConcurrentHashMap<>(); | ||||
private JdbcTemplate jdbcTemplate; | private JdbcTemplate jdbcTemplate; | ||||
@@ -44,7 +44,11 @@ public class JdbcEngine { | |||||
} | } | ||||
public boolean isMysql() { | public boolean isMysql() { | ||||
return DB_TYPE_MYSQL.equalsIgnoreCase(type); | |||||
return SwEnum.DbType.MYSQL.value.equalsIgnoreCase(type); | |||||
} | |||||
public String getDbType() { | |||||
return type; | |||||
} | } | ||||
/** | /** | ||||
@@ -97,4 +97,23 @@ public class FileUtil { | |||||
png_base64 = png_base64.replaceAll("\n", "").replaceAll("\r", "");//删除 \r\n | png_base64 = png_base64.replaceAll("\n", "").replaceAll("\r", "");//删除 \r\n | ||||
return "data:image/png;base64," + png_base64; | return "data:image/png;base64," + png_base64; | ||||
} | } | ||||
public static boolean copyFile(File oldfile, String newPath, boolean isDelOld) { | |||||
if (!oldfile.exists()) return true; | |||||
int byteread; | |||||
try { | |||||
File f = new File(newPath); | |||||
f.getParentFile().mkdirs(); | |||||
try (FileOutputStream fs = new FileOutputStream(newPath); InputStream inStream = new FileInputStream(oldfile)) { | |||||
byte[] buffer = new byte[1444]; | |||||
while ((byteread = inStream.read(buffer)) != -1) { | |||||
fs.write(buffer, 0, byteread); | |||||
} | |||||
} | |||||
if (isDelOld) oldfile.delete(); | |||||
return true; | |||||
} catch (Exception e) { | |||||
return false; | |||||
} | |||||
} | |||||
} | } |