From 2b4931f8f49b086d85d94fce70c975feb72d6685 Mon Sep 17 00:00:00 2001 From: yaoq Date: Thu, 15 Sep 2022 19:34:00 +0800 Subject: [PATCH] =?UTF-8?q?canal=E9=9B=86=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smtweb/system/canal/example/ClientConsts.java | 9 +++ .../system/canal/example/ClientInstance.java | 80 +++++++++++++++++----- .../client/src/main/resources/client.properties | 2 + .../server/src/main/resources/canal.properties | 6 -- .../cc/smtweb/framework/core/util/CommUtil.java | 9 +++ 5 files changed, 81 insertions(+), 25 deletions(-) diff --git a/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientConsts.java b/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientConsts.java index 752902e..e4cdd70 100644 --- a/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientConsts.java +++ b/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientConsts.java @@ -1,5 +1,6 @@ package cc.smtweb.system.canal.example; +import cc.smtweb.framework.core.util.CommUtil; import org.apache.commons.lang.StringUtils; import java.util.Properties; @@ -20,6 +21,7 @@ public class ClientConsts { public static final String USERNAME = ROOT + "." + "username"; public static final String PASSWORD = ROOT + "." + "password"; public static final String FILTER = ROOT + "." + "filter"; + public static final String FILEPATH = "canal.file.path"; public static String getIp(Properties properties){ @@ -50,6 +52,13 @@ public class ClientConsts { return getProperty(properties,FILTER,".*\\..*"); } + public static String getFilepath(Properties properties){ + if(CommUtil.isLinux()){ + return getProperty(properties,FILEPATH,"/jujia/canal/canalFile"); + } + return getProperty(properties,FILEPATH,"D:/jujia/canal/canalFile"); + } + public static String getProperty(Properties properties, String key, String defaultValue) { String value = getProperty(properties, key); if (StringUtils.isEmpty(value)) { diff --git a/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientInstance.java b/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientInstance.java index 967dc14..6f13f7b 100644 --- a/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientInstance.java +++ b/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientInstance.java @@ -1,22 +1,22 @@ package cc.smtweb.system.canal.example; import cc.smtweb.framework.core.util.CommUtil; +import cc.smtweb.framework.core.util.DateUtil; import cc.smtweb.framework.core.util.JsonUtil; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; +import java.io.File; import java.io.FileInputStream; import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; +import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -112,6 +112,7 @@ public class ClientInstance { } protected void process() { + final int batchSize = ClientConsts.getBatchSize(properties); while (running) { try { connector.disconnect(); @@ -119,11 +120,11 @@ public class ClientInstance { connector.unsubscribe(); connector.subscribe(ClientConsts.getFilter(properties)); while (running) { - Message message = connector.getWithoutAck(ClientConsts.getBatchSize(properties) * 1024); // 获取指定数量的数据 + Message message = connector.getWithoutAck(batchSize * 1024); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { - Thread.sleep(3000L);//无数据时,睡眠。 + Thread.sleep(10 * 1000L);//无数据时,睡眠10秒。 continue; } else { dataHandle(message.getEntries()); @@ -183,19 +184,26 @@ public class ClientInstance { * @param entrys */ private void dataHandle(List entrys) throws Exception { + List data = new ArrayList<>(); for (CanalEntry.Entry entry : entrys) { - if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) { - CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); - CanalEntry.EventType eventType = rowChange.getEventType(); - if (eventType == CanalEntry.EventType.DELETE) { - saveDeleteSql(entry); - } else if (eventType == CanalEntry.EventType.UPDATE) { - saveUpdateSql(entry); - } else if (eventType == CanalEntry.EventType.INSERT) { - saveInsertSql(entry); - } + if (CanalEntry.EntryType.ROWDATA != entry.getEntryType()) continue; + CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); + CanalEntry.EventType eventType = rowChange.getEventType(); + ClientVO row = null; + if (eventType == CanalEntry.EventType.DELETE) { + row = saveDeleteSql(entry); + } else if (eventType == CanalEntry.EventType.UPDATE) { + row = saveUpdateSql(entry); + } else if (eventType == CanalEntry.EventType.INSERT) { + row = saveInsertSql(entry); } + if (row == null) continue; + data.add(row); } + CanalEntry.Entry entry = entrys.get(entrys.size() - 1); + logger.debug("sync data size:" + data.size() + ",canal position:filename:" + entry.getHeader().getLogfileName() + ",offset:" + entry.getHeader().getLogfileOffset()); + if (data.size() == 0) return; + saveToFile(data); } /** @@ -203,7 +211,7 @@ public class ClientInstance { * * @param entry */ - private void saveUpdateSql(CanalEntry.Entry entry) { + private ClientVO saveUpdateSql(CanalEntry.Entry entry) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List rowDatasList = rowChange.getRowDatasList(); @@ -218,10 +226,12 @@ public class ClientInstance { List oldColumnList = rowData.getBeforeColumnsList(); ClientVO update = ClientVO.ok(entry.getHeader().getTableName(), "UPDATE", data, oldColumnList.get(0).getValue()); System.out.println("更新返回 : " + JsonUtil.encodeString(update)); + return update; } } catch (Exception e) { e.printStackTrace(); } + return null; } /** @@ -229,7 +239,7 @@ public class ClientInstance { * * @param entry */ - private void saveDeleteSql(CanalEntry.Entry entry) { + private ClientVO saveDeleteSql(CanalEntry.Entry entry) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List rowDatasList = rowChange.getRowDatasList(); @@ -237,10 +247,12 @@ public class ClientInstance { List oldColumnList = rowData.getBeforeColumnsList(); ClientVO delete = ClientVO.ok(entry.getHeader().getTableName(), "DELETE", null, oldColumnList.get(0).getValue()); System.out.println("删除返回 : " + JsonUtil.encodeString(delete)); + return delete; } } catch (Exception e) { e.printStackTrace(); } + return null; } /** @@ -248,7 +260,7 @@ public class ClientInstance { * * @param entry */ - private void saveInsertSql(CanalEntry.Entry entry) { + private ClientVO saveInsertSql(CanalEntry.Entry entry) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List rowDatasList = rowChange.getRowDatasList(); @@ -262,10 +274,12 @@ public class ClientInstance { }); ClientVO insert = ClientVO.ok(entry.getHeader().getTableName(), "INSERT", data, null); System.out.println("插入返回 : " + JsonUtil.encodeString(insert)); + return insert; } } catch (Exception e) { e.printStackTrace(); } + return null; } /** @@ -285,6 +299,34 @@ public class ClientInstance { matcher.appendTail(sb); return sb.toString(); } + + + private static synchronized File createFile(String basePath) throws Exception { + String path = basePath; + if (!path.endsWith("/")) { + path = path + "/"; + } + File file = new File(path + DateUtil.getNowYm() + "/" + DateUtil.nowDateLong() + "/" + DateUtil.toLongDateTimeString(null) + ".cf"); + if (!file.getParentFile().exists()) { // 如果父目录不存在,创建父目录 + file.getParentFile().mkdirs(); + } + while (file.exists()) { + Thread.sleep(1); + file = new File(path + DateUtil.getNowYm() + "/" + DateUtil.nowDateLong() + "/" + DateUtil.toLongDateTimeString(null) + ".cf"); + /*if (file.exists()) { + return file; + }*/ + } + file.createNewFile(); + if (file.exists()) logger.debug("canal文件生成成功:" + file.getName()); + return file; + } + + private void saveToFile(List rows) throws Exception { + // 将格式化后的字符串写入文件 + FileUtils.writeStringToFile(createFile(ClientConsts.getFilepath(properties)), JsonUtil.encodeString(rows), "UTF-8"); + } + } diff --git a/smtweb-framework/canal/client/src/main/resources/client.properties b/smtweb-framework/canal/client/src/main/resources/client.properties index 2069226..faee6e7 100644 --- a/smtweb-framework/canal/client/src/main/resources/client.properties +++ b/smtweb-framework/canal/client/src/main/resources/client.properties @@ -10,5 +10,7 @@ canal.server.username= canal.server.password= # 数据库匹配规则.*\\..*, scmz\\..* canal.server.filter=scmz\\..* +# 文件存放路径 +canal.file.path=E:/canalFile diff --git a/smtweb-framework/canal/server/src/main/resources/canal.properties b/smtweb-framework/canal/server/src/main/resources/canal.properties index 5994072..289afe5 100644 --- a/smtweb-framework/canal/server/src/main/resources/canal.properties +++ b/smtweb-framework/canal/server/src/main/resources/canal.properties @@ -1,6 +1,3 @@ -################################################# -######### common argument ############# -################################################# # 服务端IP 可以为空 canal.ip = # 服务端口 @@ -9,8 +6,6 @@ canal.metrics.pull.port = 11112 # 服务用户名,密码,可以为空 # canal.user = canal # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 - - #######后面的配置不用管####### canal.register.ip = #canal admin config @@ -22,7 +17,6 @@ canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 #canal.admin.register.auto = true #canal.admin.register.cluster = #canal.admin.register.name = - canal.zkServers = # flush data to zk canal.zookeeper.flush.period = 1000 diff --git a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/util/CommUtil.java b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/util/CommUtil.java index 3e0cc71..c0f1820 100644 --- a/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/util/CommUtil.java +++ b/smtweb-framework/core/src/main/java/cc/smtweb/framework/core/util/CommUtil.java @@ -157,4 +157,13 @@ public class CommUtil { } return s; } + + + public static boolean isLinux() { + return System.getProperty("os.name").toLowerCase().contains("linux"); + } + + public static boolean isWindows() { + return System.getProperty("os.name").toLowerCase().contains("windows"); + } }