|
|
@@ -46,7 +46,7 @@ public abstract class AbstractFileWork { |
|
|
|
path = path + "/"; |
|
|
|
} |
|
|
|
path = path + "increment/"; |
|
|
|
log.debug("[解析canal文件] path:" + path); |
|
|
|
log.debug("[解析canal文件]:::path:" + path); |
|
|
|
File dir = new File(path); |
|
|
|
File[] fs = dir.listFiles(new FileFilter() { |
|
|
|
@Override |
|
|
@@ -57,7 +57,7 @@ public abstract class AbstractFileWork { |
|
|
|
if (fs == null) return; |
|
|
|
List<File> files = Arrays.asList(fs); |
|
|
|
Collections.sort(files, (o1, o2) -> StringUtil.chineseCompare(o1.getName(), o2.getName())); |
|
|
|
log.debug("[解析canal文件] size:" + files.size()); |
|
|
|
log.debug("[解析canal文件]:::size:" + files.size()); |
|
|
|
for (File f : files) { |
|
|
|
try { |
|
|
|
//为保证数据一致性,发现错误,不能继续,卡在这 |
|
|
@@ -75,7 +75,7 @@ public abstract class AbstractFileWork { |
|
|
|
FileUtil.copyFile(f, path + "/bak/" + DateUtil.getNowYm() + "/" + f.getName(), true); |
|
|
|
} catch (Exception e) { |
|
|
|
//出错了跳出,避免数据遗失 |
|
|
|
log.error("[解析canal文件] file:" + f.getName() + " 写入数据失败", e); |
|
|
|
log.error("[解析canal文件]:::file:" + f.getName() + " 写入数据失败", e); |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
@@ -116,7 +116,7 @@ public abstract class AbstractFileWork { |
|
|
|
deleteSize++; |
|
|
|
} |
|
|
|
} |
|
|
|
log.debug("[解析canal文件] file:" + file.getName() + " insert size:[" + insertSize + "] update size:[" + updateSize + "] delete size:[" + deleteSize + "] create size:[" + createSize + "] alter size:[" + alterSize + "]"); |
|
|
|
log.debug("[解析canal文件]:::file:" + file.getName() + " insert size:[" + insertSize + "] update size:[" + updateSize + "] delete size:[" + deleteSize + "] create size:[" + createSize + "] alter size:[" + alterSize + "]"); |
|
|
|
} |
|
|
|
|
|
|
|
protected abstract String getDbType(); |
|
|
@@ -125,12 +125,20 @@ public abstract class AbstractFileWork { |
|
|
|
|
|
|
|
protected abstract void doAlterSql(CanalVO canalVO); |
|
|
|
|
|
|
|
protected String getSchemaName(CanalVO canalVO) { |
|
|
|
return ""; |
|
|
|
} |
|
|
|
|
|
|
|
protected void doInsertSql(CanalVO canalVO) { |
|
|
|
Map<String, String> data = canalVO.getData(); |
|
|
|
Set<String> fields = data.keySet(); |
|
|
|
StringBuilder sql = new StringBuilder(); |
|
|
|
List<Object> args = new ArrayList<>(); |
|
|
|
sql.append("insert into ").append(canalVO.getTableName()).append(" ( "); |
|
|
|
sql.append("insert into "); |
|
|
|
if (!StringUtil.isEmpty(getSchemaName(canalVO))) { |
|
|
|
sql.append(getSchemaName(canalVO)).append("."); |
|
|
|
} |
|
|
|
sql.append(canalVO.getTableName()).append(" ( "); |
|
|
|
for (String field : fields) { |
|
|
|
sql.append(field).append(","); |
|
|
|
} |
|
|
@@ -155,7 +163,11 @@ public abstract class AbstractFileWork { |
|
|
|
Set<String> fields = data.keySet(); |
|
|
|
StringBuilder sql = new StringBuilder(); |
|
|
|
List<Object> args = new ArrayList<>(); |
|
|
|
sql.append("update ").append(canalVO.getTableName()).append(" set "); |
|
|
|
sql.append("update "); |
|
|
|
if (!StringUtil.isEmpty(getSchemaName(canalVO))) { |
|
|
|
sql.append(getSchemaName(canalVO)).append("."); |
|
|
|
} |
|
|
|
sql.append(canalVO.getTableName()).append(" set "); |
|
|
|
for (String field : fields) { |
|
|
|
if (field.equals(canalVO.getIdField())) continue; |
|
|
|
sql.append(field).append("=").append("?").append(","); |
|
|
@@ -172,7 +184,11 @@ public abstract class AbstractFileWork { |
|
|
|
} |
|
|
|
|
|
|
|
protected void doDeleteSql(CanalVO canalVO) { |
|
|
|
getDbEngine().update("delete from " + canalVO.getTableName() + " where " + canalVO.getIdField() + " = ? ", canalVO.getId()); |
|
|
|
if (!StringUtil.isEmpty(getSchemaName(canalVO))) { |
|
|
|
getDbEngine().update("delete from " + getSchemaName(canalVO) + "." + canalVO.getTableName() + " where " + canalVO.getIdField() + " = ? ", canalVO.getId()); |
|
|
|
} else { |
|
|
|
getDbEngine().update("delete from " + canalVO.getTableName() + " where " + canalVO.getIdField() + " = ? ", canalVO.getId()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
@@ -184,8 +200,8 @@ public abstract class AbstractFileWork { |
|
|
|
*/ |
|
|
|
protected Map<String, String> decodeCreateSql(String sql) { |
|
|
|
Map<String, String> map = new HashMap<>(); |
|
|
|
String data = sql.substring(sql.indexOf("(") + 1, sql.lastIndexOf(")")).trim().replaceAll("`", ""); |
|
|
|
String[] rows = data.split(","); |
|
|
|
String data = sql.substring(sql.indexOf("(") + 1, sql.lastIndexOf(")")).trim().replaceAll("`", "").replace("\t", ""); |
|
|
|
String[] rows = data.split(",(?=(?:[^\']*\'[^\']*\')*[^\']*$)", -1); |
|
|
|
for (String row : rows) { |
|
|
|
//索引不管 |
|
|
|
if (row.contains("INDEX")) continue; |
|
|
|