diff --git a/smtweb-framework/canal/client/canal.example.iml b/smtweb-framework/canal/client/canal.example.iml
index c02bc28..4ee90e0 100644
--- a/smtweb-framework/canal/client/canal.example.iml
+++ b/smtweb-framework/canal/client/canal.example.iml
@@ -92,8 +92,7 @@
-
-
+
@@ -102,7 +101,11 @@
+
+
+
+
@@ -112,7 +115,7 @@
-
+
@@ -131,32 +134,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/smtweb-framework/canal/client/pom.xml b/smtweb-framework/canal/client/pom.xml
index 2d374e9..b4af862 100644
--- a/smtweb-framework/canal/client/pom.xml
+++ b/smtweb-framework/canal/client/pom.xml
@@ -30,7 +30,7 @@
cc.smtweb
- sw-system-bpm
+ sw-framework-core
3.1.0-SNAPSHOT
diff --git a/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientInstance.java b/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientInstance.java
index cc19818..e2fccd4 100644
--- a/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientInstance.java
+++ b/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientInstance.java
@@ -1,5 +1,7 @@
package cc.smtweb.system.canal.example;
+import cc.smtweb.framework.core.common.CanalVO;
+import cc.smtweb.framework.core.common.SwEnum;
import cc.smtweb.framework.core.util.CommUtil;
import cc.smtweb.framework.core.util.DateUtil;
import cc.smtweb.framework.core.util.JsonUtil;
@@ -17,8 +19,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.util.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/**
* @Author yaoq
@@ -184,18 +184,22 @@ public class ClientInstance {
* @param entrys
*/
private void dataHandle(List entrys) throws Exception {
- List data = new ArrayList<>();
+ List data = new ArrayList<>();
for (CanalEntry.Entry entry : entrys) {
if (CanalEntry.EntryType.ROWDATA != entry.getEntryType()) continue;
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
- ClientVO row = null;
- if (eventType == CanalEntry.EventType.DELETE) {
- row = saveDeleteSql(entry);
+ CanalVO row = null;
+ if (eventType == CanalEntry.EventType.CREATE) {
+ row = saveCreateSql(entry, rowChange);
+ } else if (eventType == CanalEntry.EventType.ALTER) {
+ row = saveAlterSql(entry, rowChange);
+ } else if (eventType == CanalEntry.EventType.DELETE) {
+ row = saveDeleteSql(entry, rowChange);
} else if (eventType == CanalEntry.EventType.UPDATE) {
- row = saveUpdateSql(entry);
+ row = saveUpdateSql(entry, rowChange);
} else if (eventType == CanalEntry.EventType.INSERT) {
- row = saveInsertSql(entry);
+ row = saveInsertSql(entry, rowChange);
}
if (row == null) continue;
data.add(row);
@@ -206,49 +210,30 @@ public class ClientInstance {
saveToFile(data);
}
+
/**
- * 保存更新语句
+ * 保存数据表新增语句
*
* @param entry
*/
- private ClientVO saveUpdateSql(CanalEntry.Entry entry) {
+ private CanalVO saveCreateSql(CanalEntry.Entry entry, CanalEntry.RowChange rowChange) {
try {
- CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- List rowDatasList = rowChange.getRowDatasList();
- for (CanalEntry.RowData rowData : rowDatasList) {
- List newColumnList = rowData.getAfterColumnsList();
- Map data = new HashMap<>();
- newColumnList.stream().forEach(item -> {
- if (StringUtils.isNotEmpty(item.getValue())) {
- data.put(lineToHump(item.getName()), item.getValue());
- }
- });
- List oldColumnList = rowData.getBeforeColumnsList();
- ClientVO update = ClientVO.ok(entry.getHeader().getTableName(), "UPDATE", data, oldColumnList.get(0).getValue());
- System.out.println("更新返回 : " + JsonUtil.encodeString(update));
- return update;
- }
+ return CanalVO.ok(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), SwEnum.DbEventType.CREATE.value, null, rowChange.getSql(), null, null);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
+
/**
- * 保存删除语句
+ * 保存结构变更语句
*
* @param entry
*/
- private ClientVO saveDeleteSql(CanalEntry.Entry entry) {
+ private CanalVO saveAlterSql(CanalEntry.Entry entry, CanalEntry.RowChange rowChange) {
try {
- CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- List rowDatasList = rowChange.getRowDatasList();
- for (CanalEntry.RowData rowData : rowDatasList) {
- List oldColumnList = rowData.getBeforeColumnsList();
- ClientVO delete = ClientVO.ok(entry.getHeader().getTableName(), "DELETE", null, oldColumnList.get(0).getValue());
- System.out.println("删除返回 : " + JsonUtil.encodeString(delete));
- return delete;
- }
+ return CanalVO.ok(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), SwEnum.DbEventType.ALTER.value, null, rowChange.getSql(), null, null);
} catch (Exception e) {
e.printStackTrace();
}
@@ -260,21 +245,16 @@ public class ClientInstance {
*
* @param entry
*/
- private ClientVO saveInsertSql(CanalEntry.Entry entry) {
+ private CanalVO saveInsertSql(CanalEntry.Entry entry, CanalEntry.RowChange rowChange) {
try {
- CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List columnList = rowData.getAfterColumnsList();
Map data = new HashMap<>();
columnList.stream().forEach(item -> {
- if (StringUtils.isNotEmpty(item.getValue())) {
- data.put(lineToHump(item.getName()), item.getValue());
- }
+ data.put(item.getName(), item.getValue());
});
- ClientVO insert = ClientVO.ok(entry.getHeader().getTableName(), "INSERT", data, null);
- System.out.println("插入返回 : " + JsonUtil.encodeString(insert));
- return insert;
+ return CanalVO.ok(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), SwEnum.DbEventType.INSERT.value, data, null, null, null);
}
} catch (Exception e) {
e.printStackTrace();
@@ -283,36 +263,58 @@ public class ClientInstance {
}
/**
- * 下划线转驼峰
+ * 保存更新语句
*
- * @param str
+ * @param entry
*/
- private static Pattern linePattern = Pattern.compile("_(\\w)");
-
- public static String lineToHump(String str) {
- str = str.toLowerCase();
- Matcher matcher = linePattern.matcher(str);
- StringBuffer sb = new StringBuffer();
- while (matcher.find()) {
- matcher.appendReplacement(sb, matcher.group(1).toUpperCase());
+ private CanalVO saveUpdateSql(CanalEntry.Entry entry, CanalEntry.RowChange rowChange) {
+ try {
+ List rowDatasList = rowChange.getRowDatasList();
+ for (CanalEntry.RowData rowData : rowDatasList) {
+ List newColumnList = rowData.getAfterColumnsList();
+ Map data = new HashMap<>();
+ newColumnList.stream().forEach(item -> {
+ data.put(item.getName(), item.getValue());
+ });
+ List oldColumnList = rowData.getBeforeColumnsList();
+ return CanalVO.ok(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), SwEnum.DbEventType.UPDATE.value, data, null, oldColumnList.get(0).getName(), oldColumnList.get(0).getValue());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
}
- matcher.appendTail(sb);
- return sb.toString();
+ return null;
}
+ /**
+ * 保存删除语句
+ *
+ * @param entry
+ */
+ private CanalVO saveDeleteSql(CanalEntry.Entry entry, CanalEntry.RowChange rowChange) {
+ try {
+ List rowDatasList = rowChange.getRowDatasList();
+ for (CanalEntry.RowData rowData : rowDatasList) {
+ List oldColumnList = rowData.getBeforeColumnsList();
+ return CanalVO.ok(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), SwEnum.DbEventType.DELETE.value, null, null, oldColumnList.get(0).getName(), oldColumnList.get(0).getValue());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
private static synchronized File createFile(String basePath) throws Exception {
String path = basePath;
if (!path.endsWith("/")) {
path = path + "/";
}
- File file = new File(path + DateUtil.getNowYm() + "/" + DateUtil.nowDateLong() + "/" + DateUtil.toLongDateTimeString(null) + ".cf");
+ File file = new File(path + "/" + DateUtil.toLongDateTimeString(null) + ".cf");
if (!file.getParentFile().exists()) { // 如果父目录不存在,创建父目录
file.getParentFile().mkdirs();
}
while (file.exists()) {
Thread.sleep(1);
- file = new File(path + DateUtil.getNowYm() + "/" + DateUtil.nowDateLong() + "/" + DateUtil.toLongDateTimeString(null) + ".cf");
+ file = new File(path + "/" + DateUtil.toLongDateTimeString(null) + ".cf");
/*if (file.exists()) {
return file;
}*/
@@ -322,7 +324,7 @@ public class ClientInstance {
return file;
}
- private void saveToFile(List rows) throws Exception {
+ private void saveToFile(List rows) throws Exception {
// 将格式化后的字符串写入文件
FileUtils.writeStringToFile(createFile(ClientConsts.getFilepath(properties)), JsonUtil.encodeString(rows), "UTF-8");
}
diff --git a/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientVO.java b/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientVO.java
deleted file mode 100644
index d55abec..0000000
--- a/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientVO.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package cc.smtweb.system.canal.example;
-
-import java.util.Map;
-
-/**
- * @Author yaoq
- * @Date 2022年09月13日 18:20
- * @Description
- */
-public class ClientVO {
- private String tableName; // 表名
- private String type; // 类型(更新、删除、插入)
- private Map data; // 数据JSON 自己转对应表格实体类
- private String id; // 更新或删除都是根据ID来
-
- public static ClientVO ok(String tableName, String type, Map data, String id){
- ClientVO canalVO=new ClientVO();
- canalVO.setId(id);
- canalVO.setTableName(tableName);
- canalVO.setType(type);
- canalVO.setData(data);
- return canalVO;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public Map getData() {
- return data;
- }
-
- public void setData(Map data) {
- this.data = data;
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-}
diff --git a/smtweb-framework/canal/client/src/main/resources/client.properties b/smtweb-framework/canal/client/src/main/resources/client.properties
index d49f6c9..fc57cf1 100644
--- a/smtweb-framework/canal/client/src/main/resources/client.properties
+++ b/smtweb-framework/canal/client/src/main/resources/client.properties
@@ -9,14 +9,9 @@ canal.server.port=11111
canal.server.username=
canal.server.password=
# 数据库匹配规则.*\\..*, scmz\\..*
-canal.server.filter=scmz\\..*
+canal.server.filter=tzrs\\..*
# 文件存放路径 E:/canalFile
-canal.file.path=
+canal.file.path=E:/canalFile
# mysql、dmsql、kbsql、oracle
-canal.db.type=kbsql
-canal.db.driverName=com.kingbase8.Driver
-canal.db.jdbcUrl=jdbc:kingbase8://172.28.123.205:54321/HLJTY?useUnicode=true&characterEncoding=utf-8
-canal.db.username=system
-canal.db.password=system
diff --git a/smtweb-framework/canal/file/pom.xml b/smtweb-framework/canal/file/pom.xml
index cc63eac..be23a69 100644
--- a/smtweb-framework/canal/file/pom.xml
+++ b/smtweb-framework/canal/file/pom.xml
@@ -39,12 +39,6 @@
cc.smtweb
- sw-system-bpm
- 3.1.0-SNAPSHOT
-
-
-
- cc.smtweb
sw-framework-core
3.1.0-SNAPSHOT
@@ -73,13 +67,6 @@
dmsql
1.0.0
-
- cc.smtweb
- canal.deployer
- 1.1.5
- compile
-
-
diff --git a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/FileApplication.java b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/CanalFileApplication.java
similarity index 70%
rename from smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/FileApplication.java
rename to smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/CanalFileApplication.java
index b93cdea..19d91bd 100644
--- a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/FileApplication.java
+++ b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/CanalFileApplication.java
@@ -4,10 +4,10 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
-public class FileApplication {
+public class CanalFileApplication {
public static void main(String[] args) {
- SpringApplication.run(FileApplication.class, args);
+ SpringApplication.run(CanalFileApplication.class, args);
}
}
diff --git a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/FileConfiguration.java b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/CanalFileConfiguration.java
similarity index 84%
rename from smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/FileConfiguration.java
rename to smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/CanalFileConfiguration.java
index 5cf4db8..7c3fdfb 100644
--- a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/FileConfiguration.java
+++ b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/CanalFileConfiguration.java
@@ -10,12 +10,12 @@ import org.springframework.context.annotation.Configuration;
*/
@Configuration
@ComponentScan
-public class FileConfiguration {
+public class CanalFileConfiguration {
/**
* 配置自定义service扫描路径 {module}/{service}/{method}
*/
@Bean
- public ControllerConfig canalConfiguration() {
+ public ControllerConfig CanalFileConfiguration() {
return new ControllerConfig("canalFile", "cc.smtweb.system.canal.file", null);
}
}
diff --git a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/FileStartedListener.java b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/CanalFileStartedListener.java
similarity index 52%
rename from smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/FileStartedListener.java
rename to smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/CanalFileStartedListener.java
index 1410876..80c52bc 100644
--- a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/FileStartedListener.java
+++ b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/CanalFileStartedListener.java
@@ -3,9 +3,8 @@ package cc.smtweb.system.canal.file;
import cc.smtweb.framework.core.annotation.SwStartListener;
import cc.smtweb.framework.core.common.SwConsts;
import cc.smtweb.framework.core.mvc.controller.IStartListener;
-import cc.smtweb.framework.core.systask.SysThreadPool;
-import cc.smtweb.framework.core.systask.SysThreadWorker;
-import cc.smtweb.system.canal.deployer.CanalLauncher;
+import cc.smtweb.framework.core.systask.SysServiceFactory;
+import cc.smtweb.system.canal.file.common.FileDecodeService;
import lombok.extern.slf4j.Slf4j;
/**
@@ -15,7 +14,7 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
@SwStartListener
-public class FileStartedListener implements IStartListener {
+public class CanalFileStartedListener implements IStartListener {
@Override
public int order() {
@@ -26,16 +25,12 @@ public class FileStartedListener implements IStartListener {
@Override
public void init() {
SwConsts.SysParam.enableCanal = true;
+ SwConsts.SysParam.RUN_PROJECTS = "";
+ SysServiceFactory.getInstance().reg(new FileDecodeService());
}
@Override
public void run() {
- if (!SwConsts.SysParam.enableCanal) return;
- SysThreadPool.getInstance().addTask(new SysThreadWorker("canal server") {
- @Override
- public void localWork() throws Exception {
- CanalLauncher.startServer();
- }
- });
+
}
}
diff --git a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/common/AbstractFileWork.java b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/common/AbstractFileWork.java
new file mode 100644
index 0000000..35fbe83
--- /dev/null
+++ b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/common/AbstractFileWork.java
@@ -0,0 +1,173 @@
+package cc.smtweb.system.canal.file.common;
+
+import cc.smtweb.framework.core.common.CanalVO;
+import cc.smtweb.framework.core.db.DbEngine;
+import cc.smtweb.framework.core.db.jdbc.IDbWorker;
+import cc.smtweb.framework.core.exception.SwException;
+import cc.smtweb.framework.core.util.*;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.stereotype.Component;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.util.*;
+
+@Slf4j
+@Component
+public abstract class AbstractFileWork {
+ private final static String suffix = ".cf";
+
+ protected FileDecodeConfig fileConfig;
+
+ protected DbEngine getDbEngine() {
+ return DbEngine.getInstance();
+ }
+
+
+ public AbstractFileWork() {
+ fileConfig = SpringUtil.getBean(FileDecodeConfig.class);
+ }
+
+ /**
+ * @Author yaoq
+ * @Date 2022/9/20 9:25
+ * @Params
+ * @Return
+ * @Description 解析文件
+ */
+ public void doWork() {
+ String path = fileConfig.getPath();
+ if (!path.endsWith("/")) {
+ path = path + "/";
+ }
+ log.debug("[解析canal文件] path:" + path);
+ File dir = new File(path);
+ File[] fs = dir.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return !file.isDirectory() && file.getName().endsWith(suffix);
+ }
+ });
+ if (fs == null) return;
+ List files = Arrays.asList(fs);
+ Collections.sort(files, (o1, o2) -> StringUtil.chineseCompare(o1.getName(), o2.getName()));
+ log.debug("[解析canal文件] size:" + files.size());
+ for (File f : files) {
+ try {
+ //为保证数据一致性,发现错误,不能继续,卡在这
+ getDbEngine().doTrans(new IDbWorker() {
+ @Override
+ public void work() {
+ try {
+ decodeFile(f);
+ } catch (Exception e) {
+ throw new SwException(e);
+ }
+ }
+ });
+ //备份文件
+ FileUtil.copyFile(f, path + "/bak/" + DateUtil.getNowYm() + "/" + f.getName(), true);
+ } catch (Exception e) {
+ //出错了跳出,避免数据遗失
+ log.error("[解析canal文件] file:" + f.getName() + " 写入数据失败", e);
+ break;
+ }
+ }
+ }
+
+ private void decodeFile(File file) throws Exception {
+ String fileStr = FileUtils.readFileToString(file, "UTF-8");
+ if (StringUtil.isEmpty(fileStr)) return;
+ List list = JsonUtil.parseList(fileStr, CanalVO.class);
+ int insertSize = 0;
+ int updateSize = 0;
+ int deleteSize = 0;
+ int createSize = 0;
+ int alterSize = 0;
+ for (CanalVO canal : list) {
+ if (canal.createIs()) {
+ doCreateSql(canal);
+ createSize++;
+ continue;
+ }
+ if (canal.alterIs()) {
+ doAlterSql(canal);
+ alterSize++;
+ continue;
+ }
+ if (canal.insertIs()) {
+ doInsertSql(canal);
+ insertSize++;
+ continue;
+ }
+ if (canal.updateIs()) {
+ doUpdateSql(canal);
+ updateSize++;
+ continue;
+ }
+ if (canal.deleteIs()) {
+ doDeleteSql(canal);
+ deleteSize++;
+ }
+ }
+ log.debug("[解析canal文件] file:" + file.getName() + " insert size:[" + insertSize + "] update size:[" + updateSize + "] delete size:[" + deleteSize + "] create size:[" + createSize + "] alter size:[" + alterSize + "]");
+ }
+
+ protected abstract String getDbType();
+
+ protected abstract void doCreateSql(CanalVO canalVO);
+
+ protected abstract void doAlterSql(CanalVO canalVO);
+
+ protected void doInsertSql(CanalVO canalVO) {
+ Map data = canalVO.getData();
+ Set fields = data.keySet();
+ StringBuilder sql = new StringBuilder();
+ List