diff --git a/smtweb-framework/canal/client/canal.example.iml b/smtweb-framework/canal/client/canal.example.iml index 4dce082..e51cbbd 100644 --- a/smtweb-framework/canal/client/canal.example.iml +++ b/smtweb-framework/canal/client/canal.example.iml @@ -27,7 +27,7 @@ - + @@ -68,6 +68,7 @@ + @@ -86,10 +87,9 @@ - - - - + + + diff --git a/smtweb-framework/canal/client/pom.xml b/smtweb-framework/canal/client/pom.xml index 47b1bb1..2d374e9 100644 --- a/smtweb-framework/canal/client/pom.xml +++ b/smtweb-framework/canal/client/pom.xml @@ -9,23 +9,23 @@ cc.smtweb canal.example - 1.1.6 + 1.1.5 com.alibaba.otter canal.client - 1.1.6 + 1.1.5 com.alibaba.otter canal.protocol - 1.1.6 + 1.1.5 com.alibaba druid - 1.2.11 + 1.2.6 @@ -118,9 +118,9 @@ - org.apache.maven.plugins + org.apache.maven.plugins maven-jar-plugin - 2.5 + 2.5 true @@ -198,7 +198,7 @@ ${basedir}/src/main/assembly/release.xml - ${project.artifactId}-1.1.6 + ${project.artifactId}-1.1.5 ${project.parent.build.directory} diff --git a/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientConsts.java b/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientConsts.java index fdda311..752902e 100644 --- a/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientConsts.java +++ b/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientConsts.java @@ -12,7 +12,7 @@ import java.util.Properties; */ public class ClientConsts { - public static final String ROOT = "canal.client"; + public static final String ROOT = "canal.server"; public static final String SIZE = ROOT + "." + "size"; public static final String INSTANCE = ROOT + "." + "instance"; public static final String IP = ROOT + "." + "ip"; diff --git a/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientInstance.java b/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientInstance.java index 4c87a73..967dc14 100644 --- a/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientInstance.java +++ b/smtweb-framework/canal/client/src/main/java/cc/smtweb/system/canal/example/ClientInstance.java @@ -165,9 +165,9 @@ public class ClientInstance { logger.error("canal服务连接异常!", "重新连接canal服务端已失败【" + count + "】次,请检查canal服务是否启动,请及时启动canal服务端,或联系研发人员!"); } logger.debug("canal服务连接中断,第【" + count + "】次重连失败:" + CommUtil.getOrigMsg(e2)); - logger.debug("canal服务连接中断," + (3 * rate) + "分钟后进行第【" + (count + 1) + "】连接测试!"); + logger.debug("canal服务连接中断," + (1 * rate) + "分钟后进行第【" + (count + 1) + "】连接测试!"); try { - Thread.sleep(180 * rate * 1000L); + Thread.sleep(60 * rate * 1000L); } catch (InterruptedException e1) { // ignore } diff --git a/smtweb-framework/canal/client/src/main/resources/client.properties b/smtweb-framework/canal/client/src/main/resources/client.properties index 990225f..2069226 100644 --- a/smtweb-framework/canal/client/src/main/resources/client.properties +++ b/smtweb-framework/canal/client/src/main/resources/client.properties @@ -9,6 +9,6 @@ canal.server.port=11111 canal.server.username= canal.server.password= # 数据库匹配规则.*\\..*, scmz\\..* -canal.server.filter=.*\\..* +canal.server.filter=scmz\\..* diff --git a/smtweb-framework/canal/server/canal.deployer.iml b/smtweb-framework/canal/server/canal.deployer.iml index 70b374f..451cf65 100644 --- a/smtweb-framework/canal/server/canal.deployer.iml +++ b/smtweb-framework/canal/server/canal.deployer.iml @@ -27,9 +27,9 @@ - - - + + + @@ -60,7 +60,7 @@ - + @@ -68,33 +68,34 @@ + - - + + - - - - - - - + + + + + + + - - - + + + - + @@ -106,24 +107,20 @@ - - - - - - - - - - - + + + + + + + @@ -135,7 +132,9 @@ + + diff --git a/smtweb-framework/canal/server/pom.xml b/smtweb-framework/canal/server/pom.xml index c2f91c4..1410b5e 100644 --- a/smtweb-framework/canal/server/pom.xml +++ b/smtweb-framework/canal/server/pom.xml @@ -11,27 +11,27 @@ cc.smtweb canal.deployer - 1.1.6 + 1.1.5 - com.alibaba.otter canal.server - 1.1.6 + 1.1.5 + com.alibaba.otter canal.prometheus - 1.1.6 + 1.1.5 runtime com.alibaba.otter connector.kafka - 1.1.6 + 1.1.5 * @@ -44,7 +44,7 @@ com.alibaba.otter connector.rocketmq - 1.1.6 + 1.1.5 * @@ -57,20 +57,7 @@ com.alibaba.otter connector.rabbitmq - 1.1.6 - - - * - * - - - jar-with-dependencies - provided - - - com.alibaba.otter - connector.pulsarmq - 1.1.6 + 1.1.5 * @@ -82,11 +69,6 @@ - org.springframework.boot - spring-boot-autoconfigure - - - cc.smtweb sw-framework-core 3.1.0-SNAPSHOT @@ -97,16 +79,11 @@ sw-system-bpm 3.1.0-SNAPSHOT + - - - maven-jar-plugin @@ -210,7 +187,7 @@ ${basedir}/src/main/assembly/release.xml - ${project.artifactId}-1.1.6 + ${project.artifactId}-1.1.5 ${project.parent.build.directory} diff --git a/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/CanalController.java b/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/CanalController.java index fde52aa..2716fea 100644 --- a/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/CanalController.java +++ b/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/CanalController.java @@ -1,22 +1,9 @@ package cc.smtweb.system.canal.deployer; -import java.util.Map; -import java.util.Properties; - import cc.smtweb.system.canal.deployer.monitor.InstanceAction; import cc.smtweb.system.canal.deployer.monitor.InstanceConfigMonitor; import cc.smtweb.system.canal.deployer.monitor.ManagerInstanceConfigMonitor; import cc.smtweb.system.canal.deployer.monitor.SpringInstanceConfigMonitor; -import org.I0Itec.zkclient.IZkStateListener; -import org.I0Itec.zkclient.exception.ZkNoNodeException; -import org.I0Itec.zkclient.exception.ZkNodeExistsException; -import org.apache.commons.lang.BooleanUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; - import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.common.zookeeper.ZkClientx; import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils; @@ -24,7 +11,6 @@ import com.alibaba.otter.canal.common.zookeeper.running.ServerRunningData; import com.alibaba.otter.canal.common.zookeeper.running.ServerRunningListener; import com.alibaba.otter.canal.common.zookeeper.running.ServerRunningMonitor; import com.alibaba.otter.canal.common.zookeeper.running.ServerRunningMonitors; -import cc.smtweb.system.canal.deployer.InstanceConfig.InstanceMode; import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator; import com.alibaba.otter.canal.instance.manager.PlainCanalInstanceGenerator; import com.alibaba.otter.canal.instance.manager.plain.PlainCanalConfigClient; @@ -36,6 +22,18 @@ import com.alibaba.otter.canal.server.netty.CanalServerWithNetty; import com.google.common.base.Function; import com.google.common.collect.MapMaker; import com.google.common.collect.MigrateMap; +import org.I0Itec.zkclient.IZkStateListener; +import org.I0Itec.zkclient.exception.ZkNoNodeException; +import org.I0Itec.zkclient.exception.ZkNodeExistsException; +import org.apache.commons.lang.BooleanUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +import java.util.Map; +import java.util.Properties; /** * canal调度控制器 @@ -57,7 +55,7 @@ public class CanalController { // 监听instance config的变化 private boolean autoScan = true; private InstanceAction defaultAction; - private Map instanceConfigMonitors; + private Map instanceConfigMonitors; private CanalServerWithEmbedded embededCanalServer; private CanalServerWithNetty canalServer; @@ -336,9 +334,9 @@ public class CanalController { String modeStr = getProperty(properties, CanalConstants.getInstanceModeKey(CanalConstants.GLOBAL_NAME)); if (StringUtils.isNotEmpty(adminManagerAddress)) { // 如果指定了manager地址,则强制适用manager - globalConfig.setMode(InstanceMode.MANAGER); + globalConfig.setMode(InstanceConfig.InstanceMode.MANAGER); } else if (StringUtils.isNotEmpty(modeStr)) { - globalConfig.setMode(InstanceMode.valueOf(StringUtils.upperCase(modeStr))); + globalConfig.setMode(InstanceConfig.InstanceMode.valueOf(StringUtils.upperCase(modeStr))); } String lazyStr = getProperty(properties, CanalConstants.getInstancLazyKey(CanalConstants.GLOBAL_NAME)); @@ -409,9 +407,9 @@ public class CanalController { String modeStr = getProperty(properties, CanalConstants.getInstanceModeKey(destination)); if (StringUtils.isNotEmpty(adminManagerAddress)) { // 如果指定了manager地址,则强制适用manager - config.setMode(InstanceMode.MANAGER); + config.setMode(InstanceConfig.InstanceMode.MANAGER); } else if (StringUtils.isNotEmpty(modeStr)) { - config.setMode(InstanceMode.valueOf(StringUtils.upperCase(modeStr))); + config.setMode(InstanceConfig.InstanceMode.valueOf(StringUtils.upperCase(modeStr))); } String lazyStr = getProperty(properties, CanalConstants.getInstancLazyKey(destination)); @@ -595,7 +593,7 @@ public class CanalController { this.canalMQStarter = canalMQStarter; } - public Map getInstanceConfigMonitors() { + public Map getInstanceConfigMonitors() { return instanceConfigMonitors; } diff --git a/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/CanalLauncher.java b/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/CanalLauncher.java index 0cba780..f676517 100644 --- a/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/CanalLauncher.java +++ b/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/CanalLauncher.java @@ -1,14 +1,5 @@ package cc.smtweb.system.canal.deployer; -import com.alibaba.otter.canal.common.utils.AddressUtils; -import com.alibaba.otter.canal.common.utils.NamedThreadFactory; -import com.alibaba.otter.canal.instance.manager.plain.PlainCanal; -import com.alibaba.otter.canal.instance.manager.plain.PlainCanalConfigClient; -import org.apache.commons.lang.BooleanUtils; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.FileInputStream; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -16,6 +7,16 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.BooleanUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.otter.canal.common.utils.AddressUtils; +import com.alibaba.otter.canal.common.utils.NamedThreadFactory; +import com.alibaba.otter.canal.instance.manager.plain.PlainCanal; +import com.alibaba.otter.canal.instance.manager.plain.PlainCanalConfigClient; + /** * canal独立版本启动的入口类 * @@ -24,25 +25,21 @@ import java.util.concurrent.TimeUnit; */ public class CanalLauncher { - private static final String CLASSPATH_URL_PREFIX = "classpath:"; - private static final Logger logger = LoggerFactory.getLogger(CanalLauncher.class); - public static final CountDownLatch runningLatch = new CountDownLatch(1); - private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, - new NamedThreadFactory("canal-server-scan")); + private static final String CLASSPATH_URL_PREFIX = "classpath:"; + private static final Logger logger = LoggerFactory.getLogger(CanalLauncher.class); + public static final CountDownLatch runningLatch = new CountDownLatch(1); + private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, + new NamedThreadFactory("canal-server-scan")); public static void main(String[] args) { startServer(); } - public static void startServer() { + public static void startServer(){ try { - logger.info("## canal sever start begin"); logger.info("## set default uncaught exception handler"); setGlobalUncaughtExceptionHandler(); - // 支持rocketmq client 配置日志路径 - System.setProperty("rocketmq.client.logUseSlf4j", "true"); - logger.info("## load canal configurations"); String conf = System.getProperty("canal.conf", "classpath:canal.properties"); Properties properties = new Properties(); @@ -63,10 +60,6 @@ public class CanalLauncher { CanalConstants.CANAL_ADMIN_AUTO_REGISTER)); String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER); String name = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_REGISTER_NAME); - if (StringUtils.isEmpty(name)) { - name = AddressUtils.getHostName(); - } - String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP); if (StringUtils.isEmpty(registerIp)) { registerIp = AddressUtils.getHostIp(); @@ -126,7 +119,6 @@ public class CanalLauncher { } canalStater.start(); - logger.info("## canal server start success"); runningLatch.await(); executor.shutdownNow(); } catch (Throwable e) { diff --git a/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/CanalStarter.java b/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/CanalStarter.java index f6dfbb0..fb13bc7 100644 --- a/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/CanalStarter.java +++ b/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/CanalStarter.java @@ -1,17 +1,16 @@ package cc.smtweb.system.canal.deployer; -import java.util.Properties; - +import cc.smtweb.system.canal.deployer.admin.CanalAdminController; +import com.alibaba.otter.canal.admin.netty.CanalAdminWithNetty; import com.alibaba.otter.canal.connector.core.config.MQProperties; +import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer; +import com.alibaba.otter.canal.connector.core.spi.ExtensionLoader; +import com.alibaba.otter.canal.server.CanalMQStarter; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alibaba.otter.canal.admin.netty.CanalAdminWithNetty; -import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer; -import com.alibaba.otter.canal.connector.core.spi.ExtensionLoader; -import cc.smtweb.system.canal.deployer.admin.CanalAdminController; -import com.alibaba.otter.canal.server.CanalMQStarter; +import java.util.Properties; /** * Canal server 启动类 diff --git a/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/admin/CanalAdminController.java b/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/admin/CanalAdminController.java index cc58950..6b6f4e9 100644 --- a/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/admin/CanalAdminController.java +++ b/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/admin/CanalAdminController.java @@ -1,30 +1,29 @@ package cc.smtweb.system.canal.deployer.admin; -import java.io.File; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - import cc.smtweb.system.canal.deployer.CanalStarter; +import cc.smtweb.system.canal.deployer.InstanceConfig; +import cc.smtweb.system.canal.deployer.monitor.InstanceAction; import cc.smtweb.system.canal.deployer.monitor.InstanceConfigMonitor; import cc.smtweb.system.canal.deployer.monitor.ManagerInstanceConfigMonitor; import cc.smtweb.system.canal.deployer.monitor.SpringInstanceConfigMonitor; -import cc.smtweb.system.canal.deployer.monitor.InstanceAction; -import org.apache.commons.io.filefilter.TrueFileFilter; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.alibaba.otter.canal.admin.CanalAdmin; import com.alibaba.otter.canal.common.utils.FileUtils; -import cc.smtweb.system.canal.deployer.InstanceConfig; import com.alibaba.otter.canal.instance.core.CanalInstance; import com.alibaba.otter.canal.protocol.SecurityUtil; import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded; import com.google.common.base.Joiner; +import org.apache.commons.io.filefilter.TrueFileFilter; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; /** * 提供canal admin的管理操作 @@ -37,7 +36,7 @@ public class CanalAdminController implements CanalAdmin { private static final Logger logger = LoggerFactory.getLogger(CanalAdminController.class); private String user; private String passwd; - private CanalStarter canalStater; + private CanalStarter canalStater; public CanalAdminController(CanalStarter canalStater){ this.canalStater = canalStater; diff --git a/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/monitor/ManagerInstanceConfigMonitor.java b/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/monitor/ManagerInstanceConfigMonitor.java index ca413fa..2c6aded 100644 --- a/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/monitor/ManagerInstanceConfigMonitor.java +++ b/smtweb-framework/canal/server/src/main/java/cc/smtweb/system/canal/deployer/monitor/ManagerInstanceConfigMonitor.java @@ -1,6 +1,5 @@ package cc.smtweb.system.canal.deployer.monitor; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; @@ -78,9 +77,9 @@ public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle impleme } final List is = Lists.newArrayList(StringUtils.split(instances, ',')); - List start = new ArrayList<>(); - List stop = new ArrayList<>(); - List restart = new ArrayList<>(); + List start = Lists.newArrayList(); + List stop = Lists.newArrayList(); + List restart = Lists.newArrayList(); for (String instance : is) { if (!configs.containsKey(instance)) { PlainCanal newPlainCanal = configClient.findInstance(instance, null); diff --git a/smtweb-framework/canal/server/src/main/resources/logback.xml.bak b/smtweb-framework/canal/server/src/main/resources/logback.xml similarity index 94% rename from smtweb-framework/canal/server/src/main/resources/logback.xml.bak rename to smtweb-framework/canal/server/src/main/resources/logback.xml index 682e80f..309acdd 100644 --- a/smtweb-framework/canal/server/src/main/resources/logback.xml.bak +++ b/smtweb-framework/canal/server/src/main/resources/logback.xml @@ -79,39 +79,39 @@ - + - + - + - + - + - + - + - + - - + + diff --git a/smtweb-framework/canal/server/src/main/resources/spring/default-instance.xml b/smtweb-framework/canal/server/src/main/resources/spring/default-instance.xml index 9907ae8..add43b7 100644 --- a/smtweb-framework/canal/server/src/main/resources/spring/default-instance.xml +++ b/smtweb-framework/canal/server/src/main/resources/spring/default-instance.xml @@ -206,6 +206,5 @@ - diff --git a/smtweb-framework/canal/server/src/main/resources/spring/file-instance.xml b/smtweb-framework/canal/server/src/main/resources/spring/file-instance.xml index 800f981..65fab92 100644 --- a/smtweb-framework/canal/server/src/main/resources/spring/file-instance.xml +++ b/smtweb-framework/canal/server/src/main/resources/spring/file-instance.xml @@ -192,6 +192,5 @@ - diff --git a/smtweb-framework/canal/server/src/main/resources/spring/group-instance.xml b/smtweb-framework/canal/server/src/main/resources/spring/group-instance.xml index b0b887f..bd808b2 100644 --- a/smtweb-framework/canal/server/src/main/resources/spring/group-instance.xml +++ b/smtweb-framework/canal/server/src/main/resources/spring/group-instance.xml @@ -287,6 +287,5 @@ - diff --git a/smtweb-framework/canal/server/src/main/resources/spring/memory-instance.xml b/smtweb-framework/canal/server/src/main/resources/spring/memory-instance.xml index a7dc634..5ec293e 100644 --- a/smtweb-framework/canal/server/src/main/resources/spring/memory-instance.xml +++ b/smtweb-framework/canal/server/src/main/resources/spring/memory-instance.xml @@ -180,6 +180,5 @@ -