Parcourir la source

canal集成

4.0
yaoq il y a 2 ans
Parent
révision
2b4931f8f4
5 fichiers modifiés avec 81 ajouts et 25 suppressions
  1. +9
    -0
      smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientConsts.java
  2. +61
    -19
      smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientInstance.java
  3. +2
    -0
      smtweb-framework/canal/client/src/main/resources/client.properties
  4. +0
    -6
      smtweb-framework/canal/server/src/main/resources/canal.properties
  5. +9
    -0
      smtweb-framework/core/src/main/java/cc/smtweb/framework/core/util/CommUtil.java

+ 9
- 0
smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientConsts.java Voir le fichier

@@ -1,5 +1,6 @@
package cc.smtweb.system.canal.example;

import cc.smtweb.framework.core.util.CommUtil;
import org.apache.commons.lang.StringUtils;

import java.util.Properties;
@@ -20,6 +21,7 @@ public class ClientConsts {
public static final String USERNAME = ROOT + "." + "username";
public static final String PASSWORD = ROOT + "." + "password";
public static final String FILTER = ROOT + "." + "filter";
public static final String FILEPATH = "canal.file.path";


public static String getIp(Properties properties){
@@ -50,6 +52,13 @@ public class ClientConsts {
return getProperty(properties,FILTER,".*\\..*");
}

public static String getFilepath(Properties properties){
if(CommUtil.isLinux()){
return getProperty(properties,FILEPATH,"/jujia/canal/canalFile");
}
return getProperty(properties,FILEPATH,"D:/jujia/canal/canalFile");
}

public static String getProperty(Properties properties, String key, String defaultValue) {
String value = getProperty(properties, key);
if (StringUtils.isEmpty(value)) {


+ 61
- 19
smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientInstance.java Voir le fichier

@@ -1,22 +1,22 @@
package cc.smtweb.system.canal.example;

import cc.smtweb.framework.core.util.CommUtil;
import cc.smtweb.framework.core.util.DateUtil;
import cc.smtweb.framework.core.util.JsonUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

import java.io.File;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -112,6 +112,7 @@ public class ClientInstance {
}

protected void process() {
final int batchSize = ClientConsts.getBatchSize(properties);
while (running) {
try {
connector.disconnect();
@@ -119,11 +120,11 @@ public class ClientInstance {
connector.unsubscribe();
connector.subscribe(ClientConsts.getFilter(properties));
while (running) {
Message message = connector.getWithoutAck(ClientConsts.getBatchSize(properties) * 1024); // 获取指定数量的数据
Message message = connector.getWithoutAck(batchSize * 1024); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(3000L);//无数据时,睡眠
Thread.sleep(10 * 1000L);//无数据时,睡眠10秒
continue;
} else {
dataHandle(message.getEntries());
@@ -183,19 +184,26 @@ public class ClientInstance {
* @param entrys
*/
private void dataHandle(List<CanalEntry.Entry> entrys) throws Exception {
List<ClientVO> data = new ArrayList<>();
for (CanalEntry.Entry entry : entrys) {
if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
if (eventType == CanalEntry.EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == CanalEntry.EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == CanalEntry.EventType.INSERT) {
saveInsertSql(entry);
}
if (CanalEntry.EntryType.ROWDATA != entry.getEntryType()) continue;
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
ClientVO row = null;
if (eventType == CanalEntry.EventType.DELETE) {
row = saveDeleteSql(entry);
} else if (eventType == CanalEntry.EventType.UPDATE) {
row = saveUpdateSql(entry);
} else if (eventType == CanalEntry.EventType.INSERT) {
row = saveInsertSql(entry);
}
if (row == null) continue;
data.add(row);
}
CanalEntry.Entry entry = entrys.get(entrys.size() - 1);
logger.debug("sync data size:" + data.size() + ",canal position:filename:" + entry.getHeader().getLogfileName() + ",offset:" + entry.getHeader().getLogfileOffset());
if (data.size() == 0) return;
saveToFile(data);
}

/**
@@ -203,7 +211,7 @@ public class ClientInstance {
*
* @param entry
*/
private void saveUpdateSql(CanalEntry.Entry entry) {
private ClientVO saveUpdateSql(CanalEntry.Entry entry) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
@@ -218,10 +226,12 @@ public class ClientInstance {
List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList();
ClientVO update = ClientVO.ok(entry.getHeader().getTableName(), "UPDATE", data, oldColumnList.get(0).getValue());
System.out.println("更新返回 : " + JsonUtil.encodeString(update));
return update;
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

/**
@@ -229,7 +239,7 @@ public class ClientInstance {
*
* @param entry
*/
private void saveDeleteSql(CanalEntry.Entry entry) {
private ClientVO saveDeleteSql(CanalEntry.Entry entry) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
@@ -237,10 +247,12 @@ public class ClientInstance {
List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList();
ClientVO delete = ClientVO.ok(entry.getHeader().getTableName(), "DELETE", null, oldColumnList.get(0).getValue());
System.out.println("删除返回 : " + JsonUtil.encodeString(delete));
return delete;
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

/**
@@ -248,7 +260,7 @@ public class ClientInstance {
*
* @param entry
*/
private void saveInsertSql(CanalEntry.Entry entry) {
private ClientVO saveInsertSql(CanalEntry.Entry entry) {
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
@@ -262,10 +274,12 @@ public class ClientInstance {
});
ClientVO insert = ClientVO.ok(entry.getHeader().getTableName(), "INSERT", data, null);
System.out.println("插入返回 : " + JsonUtil.encodeString(insert));
return insert;
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

/**
@@ -285,6 +299,34 @@ public class ClientInstance {
matcher.appendTail(sb);
return sb.toString();
}


private static synchronized File createFile(String basePath) throws Exception {
String path = basePath;
if (!path.endsWith("/")) {
path = path + "/";
}
File file = new File(path + DateUtil.getNowYm() + "/" + DateUtil.nowDateLong() + "/" + DateUtil.toLongDateTimeString(null) + ".cf");
if (!file.getParentFile().exists()) { // 如果父目录不存在,创建父目录
file.getParentFile().mkdirs();
}
while (file.exists()) {
Thread.sleep(1);
file = new File(path + DateUtil.getNowYm() + "/" + DateUtil.nowDateLong() + "/" + DateUtil.toLongDateTimeString(null) + ".cf");
/*if (file.exists()) {
return file;
}*/
}
file.createNewFile();
if (file.exists()) logger.debug("canal文件生成成功:" + file.getName());
return file;
}

private void saveToFile(List<ClientVO> rows) throws Exception {
// 将格式化后的字符串写入文件
FileUtils.writeStringToFile(createFile(ClientConsts.getFilepath(properties)), JsonUtil.encodeString(rows), "UTF-8");
}

}



+ 2
- 0
smtweb-framework/canal/client/src/main/resources/client.properties Voir le fichier

@@ -10,5 +10,7 @@ canal.server.username=
canal.server.password=
# 数据库匹配规则.*\\..*, scmz\\..*
canal.server.filter=scmz\\..*
# 文件存放路径
canal.file.path=E:/canalFile



+ 0
- 6
smtweb-framework/canal/server/src/main/resources/canal.properties Voir le fichier

@@ -1,6 +1,3 @@
#################################################
######### common argument #############
#################################################
# 服务端IP 可以为空
canal.ip =
# 服务端口
@@ -9,8 +6,6 @@ canal.metrics.pull.port = 11112
# 服务用户名,密码,可以为空
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458


#######后面的配置不用管#######
canal.register.ip =
#canal admin config
@@ -22,7 +17,6 @@ canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000


+ 9
- 0
smtweb-framework/core/src/main/java/cc/smtweb/framework/core/util/CommUtil.java Voir le fichier

@@ -157,4 +157,13 @@ public class CommUtil {
}
return s;
}


public static boolean isLinux() {
return System.getProperty("os.name").toLowerCase().contains("linux");
}

public static boolean isWindows() {
return System.getProperty("os.name").toLowerCase().contains("windows");
}
}

Chargement…
Annuler
Enregistrer