From b708a3006103597bcb7e94d31109bd9475db5e10 Mon Sep 17 00:00:00 2001 From: yaoq Date: Wed, 12 Oct 2022 09:20:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/src/main/resources/client.properties | 2 +- smtweb-framework/canal/file/pom.xml | 13 +++++++-- .../system/canal/file/CanalFileApplication.java | 17 ++++++++--- .../system/canal/file/common/AbstractFileWork.java | 34 ++++++++++++++++------ .../canal/file/common/FileDecodeService.java | 14 ++++++++- .../system/canal/file/impl/DmSqlFileWork.java | 1 - .../system/canal/file/impl/KbSqlFileWork.java | 5 ++++ .../system/canal/file/impl/MySqlFileWork.java | 5 ++++ .../src/main/resources/config/application.yaml | 16 +++++----- smtweb-framework/canal/server/pom.xml | 4 +-- .../src/main/resources/example/instance.properties | 2 +- 11 files changed, 83 insertions(+), 30 deletions(-) diff --git a/smtweb-framework/canal/client/src/main/resources/client.properties b/smtweb-framework/canal/client/src/main/resources/client.properties index 61fb744..8da493c 100644 --- a/smtweb-framework/canal/client/src/main/resources/client.properties +++ b/smtweb-framework/canal/client/src/main/resources/client.properties @@ -9,7 +9,7 @@ canal.server.port=11111 canal.server.username= canal.server.password= # 数据库匹配规则.*\\..*, scmz\\..* -canal.server.filter=pgzx\\..* +canal.server.filter=pgzx.*\\..* # 文件存放路径 E:/canalFile canal.file.path=/data/canalFile # mysql、dmsql、kbsql、oracle diff --git a/smtweb-framework/canal/file/pom.xml b/smtweb-framework/canal/file/pom.xml index 8284aa1..0f2a8c6 100644 --- a/smtweb-framework/canal/file/pom.xml +++ b/smtweb-framework/canal/file/pom.xml @@ -12,6 +12,7 @@ cc.smtweb canal.file 1.1.5 + war 1.8 @@ -28,13 +29,19 @@ test - org.springframework.boot - spring-boot-devtools - true + spring-boot-starter-tomcat + provided + + + diff --git a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/CanalFileApplication.java b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/CanalFileApplication.java index 19d91bd..24b4e7c 100644 --- a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/CanalFileApplication.java +++ b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/CanalFileApplication.java @@ -2,12 +2,21 @@ package cc.smtweb.system.canal.file; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.boot.web.servlet.ServletComponentScan; +import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; @SpringBootApplication -public class CanalFileApplication { +@ServletComponentScan +public class CanalFileApplication extends SpringBootServletInitializer { - public static void main(String[] args) { - SpringApplication.run(CanalFileApplication.class, args); - } + @Override + protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) { + return builder.sources(CanalFileApplication.class); + } + + public static void main(String[] args) { + SpringApplication.run(CanalFileApplication.class, args); + } } diff --git a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/common/AbstractFileWork.java b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/common/AbstractFileWork.java index 89967e4..4f71826 100644 --- a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/common/AbstractFileWork.java +++ b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/common/AbstractFileWork.java @@ -46,7 +46,7 @@ public abstract class AbstractFileWork { path = path + "/"; } path = path + "increment/"; - log.debug("[解析canal文件] path:" + path); + log.debug("[解析canal文件]:::path:" + path); File dir = new File(path); File[] fs = dir.listFiles(new FileFilter() { @Override @@ -57,7 +57,7 @@ public abstract class AbstractFileWork { if (fs == null) return; List files = Arrays.asList(fs); Collections.sort(files, (o1, o2) -> StringUtil.chineseCompare(o1.getName(), o2.getName())); - log.debug("[解析canal文件] size:" + files.size()); + log.debug("[解析canal文件]:::size:" + files.size()); for (File f : files) { try { //为保证数据一致性,发现错误,不能继续,卡在这 @@ -75,7 +75,7 @@ public abstract class AbstractFileWork { FileUtil.copyFile(f, path + "/bak/" + DateUtil.getNowYm() + "/" + f.getName(), true); } catch (Exception e) { //出错了跳出,避免数据遗失 - log.error("[解析canal文件] file:" + f.getName() + " 写入数据失败", e); + log.error("[解析canal文件]:::file:" + f.getName() + " 写入数据失败", e); break; } } @@ -116,7 +116,7 @@ public abstract class AbstractFileWork { deleteSize++; } } - log.debug("[解析canal文件] file:" + file.getName() + " insert size:[" + insertSize + "] update size:[" + updateSize + "] delete size:[" + deleteSize + "] create size:[" + createSize + "] alter size:[" + alterSize + "]"); + log.debug("[解析canal文件]:::file:" + file.getName() + " insert size:[" + insertSize + "] update size:[" + updateSize + "] delete size:[" + deleteSize + "] create size:[" + createSize + "] alter size:[" + alterSize + "]"); } protected abstract String getDbType(); @@ -125,12 +125,20 @@ public abstract class AbstractFileWork { protected abstract void doAlterSql(CanalVO canalVO); + protected String getSchemaName(CanalVO canalVO) { + return ""; + } + protected void doInsertSql(CanalVO canalVO) { Map data = canalVO.getData(); Set fields = data.keySet(); StringBuilder sql = new StringBuilder(); List args = new ArrayList<>(); - sql.append("insert into ").append(canalVO.getTableName()).append(" ( "); + sql.append("insert into "); + if (!StringUtil.isEmpty(getSchemaName(canalVO))) { + sql.append(getSchemaName(canalVO)).append("."); + } + sql.append(canalVO.getTableName()).append(" ( "); for (String field : fields) { sql.append(field).append(","); } @@ -155,7 +163,11 @@ public abstract class AbstractFileWork { Set fields = data.keySet(); StringBuilder sql = new StringBuilder(); List args = new ArrayList<>(); - sql.append("update ").append(canalVO.getTableName()).append(" set "); + sql.append("update "); + if (!StringUtil.isEmpty(getSchemaName(canalVO))) { + sql.append(getSchemaName(canalVO)).append("."); + } + sql.append(canalVO.getTableName()).append(" set "); for (String field : fields) { if (field.equals(canalVO.getIdField())) continue; sql.append(field).append("=").append("?").append(","); @@ -172,7 +184,11 @@ public abstract class AbstractFileWork { } protected void doDeleteSql(CanalVO canalVO) { - getDbEngine().update("delete from " + canalVO.getTableName() + " where " + canalVO.getIdField() + " = ? ", canalVO.getId()); + if (!StringUtil.isEmpty(getSchemaName(canalVO))) { + getDbEngine().update("delete from " + getSchemaName(canalVO) + "." + canalVO.getTableName() + " where " + canalVO.getIdField() + " = ? ", canalVO.getId()); + } else { + getDbEngine().update("delete from " + canalVO.getTableName() + " where " + canalVO.getIdField() + " = ? ", canalVO.getId()); + } } /** @@ -184,8 +200,8 @@ public abstract class AbstractFileWork { */ protected Map decodeCreateSql(String sql) { Map map = new HashMap<>(); - String data = sql.substring(sql.indexOf("(") + 1, sql.lastIndexOf(")")).trim().replaceAll("`", ""); - String[] rows = data.split(","); + String data = sql.substring(sql.indexOf("(") + 1, sql.lastIndexOf(")")).trim().replaceAll("`", "").replace("\t", ""); + String[] rows = data.split(",(?=(?:[^\']*\'[^\']*\')*[^\']*$)", -1); for (String row : rows) { //索引不管 if (row.contains("INDEX")) continue; diff --git a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/common/FileDecodeService.java b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/common/FileDecodeService.java index 8264ada..299e298 100644 --- a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/common/FileDecodeService.java +++ b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/common/FileDecodeService.java @@ -4,12 +4,15 @@ import cc.smtweb.framework.core.db.DbEngine; import cc.smtweb.framework.core.systask.BaseSysService; import lombok.extern.slf4j.Slf4j; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + @Slf4j public class FileDecodeService extends BaseSysService { @Override public String getTitle() { - return "解析canal文件"; + return "[解析canal文件]"; } @Override @@ -26,4 +29,13 @@ public class FileDecodeService extends BaseSysService { log.error("解析canal文件失败:", e); } } + + public static void main( String args[] ) { + String sentence = "^pgzx.*\\..*"; + Pattern pattern = Pattern.compile(sentence); + Matcher matcher = pattern.matcher("pgzx.tb_sys_job"); + Matcher matcher1 = pattern.matcher("pgzx_zx.tb_sys_job"); + System.out.println(matcher.find()); + System.out.println(matcher1.find()); + } } diff --git a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/impl/DmSqlFileWork.java b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/impl/DmSqlFileWork.java index 42b818c..05aaed2 100644 --- a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/impl/DmSqlFileWork.java +++ b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/impl/DmSqlFileWork.java @@ -17,7 +17,6 @@ public class DmSqlFileWork extends AbstractFileWork { return SwEnum.DbType.DMSQL.value; } - @Override protected void doCreateSql(CanalVO canalVO) { throw new SwException("暂不支持新增表"); diff --git a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/impl/KbSqlFileWork.java b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/impl/KbSqlFileWork.java index 7213192..7bbec9f 100644 --- a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/impl/KbSqlFileWork.java +++ b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/impl/KbSqlFileWork.java @@ -21,6 +21,11 @@ public class KbSqlFileWork extends AbstractFileWork { } @Override + protected String getSchemaName(CanalVO canalVO) { + return "public"; + } + + @Override protected void doCreateSql(CanalVO canalVO) { Map sqlMap = decodeCreateSql(canalVO.getSql()); StringBuilder sql = new StringBuilder(); diff --git a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/impl/MySqlFileWork.java b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/impl/MySqlFileWork.java index 752ea97..a55103d 100644 --- a/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/impl/MySqlFileWork.java +++ b/smtweb-framework/canal/file/src/main/java/cc/smtweb/system/canal/file/impl/MySqlFileWork.java @@ -33,6 +33,11 @@ public class MySqlFileWork extends AbstractFileWork { } @Override + protected String getSchemaName(CanalVO canalVO) { + return canalVO.getSchemaName(); + } + + @Override protected void doInsertSql(CanalVO canalVO) { Map data = canalVO.getData(); Set fields = data.keySet(); diff --git a/smtweb-framework/canal/file/src/main/resources/config/application.yaml b/smtweb-framework/canal/file/src/main/resources/config/application.yaml index 84d5ad5..80bdf84 100644 --- a/smtweb-framework/canal/file/src/main/resources/config/application.yaml +++ b/smtweb-framework/canal/file/src/main/resources/config/application.yaml @@ -17,10 +17,10 @@ smtweb: # 服务模块 devtools: restart: - enabled: true # 热部署开关 + enabled: false # 热部署开关 additional-paths: src/main/java #重启目录 server: - port: 8888 + port: 8088 servlet: context-path: logging: @@ -32,13 +32,12 @@ spring: host: 127.0.0.1 port: 6379 password: + database: 9 datasource: driver-class-name: com.kingbase8.Driver - # url: jdbc:mysql://139.9.38.43:6032/smt?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false - # url: jdbc:mysql://127.0.0.1:3306/tzrs?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false - url: jdbc:kingbase8://172.28.123.205:54321/HLJTY?useUnicode=true&characterEncoding=utf-8 - username: system - password: system + url: jdbc:kingbase8://10.176.246.198:54321/pgzx?useUnicode=true&characterEncoding=utf-8 + username: 'zhengxie' + password: '%TGB6yhn' servlet: multipart: max-file-size: 104857600000 @@ -54,7 +53,8 @@ spring: # canal配置 canal: file: - path: E:/canalFile + enable: true + path: /data/canalFile # http 规则配置 http-config: diff --git a/smtweb-framework/canal/server/pom.xml b/smtweb-framework/canal/server/pom.xml index 2d0229e..7dceb69 100644 --- a/smtweb-framework/canal/server/pom.xml +++ b/smtweb-framework/canal/server/pom.xml @@ -85,9 +85,9 @@ **/logback.xml - **/canal.properties + **/mq.yml diff --git a/smtweb-framework/canal/server/src/main/resources/example/instance.properties b/smtweb-framework/canal/server/src/main/resources/example/instance.properties index e998b74..cc59d02 100644 --- a/smtweb-framework/canal/server/src/main/resources/example/instance.properties +++ b/smtweb-framework/canal/server/src/main/resources/example/instance.properties @@ -37,7 +37,7 @@ canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex -canal.instance.filter.regex=pgzx\\..* +canal.instance.filter.regex=pgzx.*\\..* # table black regex canal.instance.filter.black.regex=mysql\\.slave_.* # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)