From 2de2796819cd6c85ca6ab5e9ef82e6c8394359a6 Mon Sep 17 00:00:00 2001 From: eson Date: Wed, 15 Jun 2022 18:31:05 +0800 Subject: [PATCH] TODO: finish --- pom.xml | 1 - .../com/yuandian/dataflow/config/Config.java | 379 ++++++++++-------- .../yuandian/dataflow/config/ConfigTest.java | 111 +++++ 3 files changed, 315 insertions(+), 176 deletions(-) create mode 100644 src/test/java/com/yuandian/dataflow/config/ConfigTest.java diff --git a/pom.xml b/pom.xml index 0bdcd54..bc60ba6 100644 --- a/pom.xml +++ b/pom.xml @@ -191,7 +191,6 @@ - diff --git a/src/main/java/com/yuandian/dataflow/config/Config.java b/src/main/java/com/yuandian/dataflow/config/Config.java index a3eeed4..177e182 100644 --- a/src/main/java/com/yuandian/dataflow/config/Config.java +++ b/src/main/java/com/yuandian/dataflow/config/Config.java @@ -9,23 +9,16 @@ package com.yuandian.dataflow.config; -import java.io.FileInputStream; import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringBufferInputStream; -import java.io.StringReader; -import java.io.StringWriter; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.Executor; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.boot.system.SystemProperties; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.PropertySource; -import org.springframework.stereotype.Component; import org.yaml.snakeyaml.Yaml; import com.alibaba.nacos.api.NacosFactory; @@ -33,14 +26,13 @@ import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.Listener; import com.alibaba.nacos.api.exception.NacosException; -import com.google.api.client.http.InputStreamContent; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; /** - * description + * description nacos配置. * * @author eson *2022年6月13日-17:08:46 @@ -50,192 +42,229 @@ import lombok.extern.slf4j.Slf4j; @Setter public class Config { + public static String DEFAULT_CONFIT_FILE = "application.properties"; + public static String DEFAULT_CONFIG_ADDR = "yuandian.dataflow.config.nacos.server.addr"; + // public static String DEFAULT_CONFIG_DATAID = "yuandian.dataflow.config.nacos.dataid"; + // public static String DEFAULT_CONFIG_GROUP = "yuandian.dataflow.config.nacos.group";+ - public Map configMap; + // 默认 + public static String DEFAULT_GROUP_DATAID = "yuandian.dataflow"; + // 所有生成的nacos客户端 + private static HashMap configDict = new HashMap<>(); - @Value("${yuandian.dataflow.config.nacos.server.addr}") - public String serverAddr ; + // 配置的所有值主类 + public Map data; + // nacos地址 + public String serverAddr ; + // nacos dataId public String dataId ; + // nacos group public String group ; + // 线程安全配置锁 + private Lock datalock; - private static Config config; + // nacos 客户端类 private ConfigService configService; - private Config() throws NacosException, IOException { - - - - Properties prop = new Properties(); - - - - serverAddr = prop.getProperty("yuandian.dataflow.config.nacos.server.addr"); - dataId = prop.getProperty("yuandian.dataflow.config.nacos.dataid") ; - group = prop.getProperty("yuandian.dataflow.config.nacos.group"); - - Properties properties = new Properties(); - properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr); - configService = NacosFactory.createConfigService(properties); - - String content = configService.getConfig(dataId, group, 5000); - Yaml yaml = new Yaml(); - configMap = yaml.load(content); - log.info(content); - - configService.addListener(dataId, group, new Listener() { - @Override - public void receiveConfigInfo(String configInfo) { - System.out.println("recieve:" + configInfo); - synchronized(configMap){ - configMap = (Map)new Yaml().load(configInfo); - System.out.println(configMap); - } - } - - @Override - public Executor getExecutor() { - return null; - } - }); - - // boolean isPublishOk = configService.publishConfig(dataId, group, Prop.toString()); - // System.out.println(isPublishOk); - - // Thread.sleep(3000); - // content = configService.getConfig(dataId, group, 5000); - // System.out.println(content); - - // boolean isRemoveOk = configService.removeConfig(dataId, group); - // System.out.println(isRemoveOk); - // Thread.sleep(3000); - - // content = configService.getConfig(dataId, group, 5000); - // System.out.println(content); - // Thread.sleep(300000); + private Config(String GroupAndDataId) throws Exception { + String[] gad = GroupAndDataId.split("\\."); + if(gad.length != 2) { + throw new Exception("Group 或者 DataId 不能存在 '.' 的命令"); + } + this.group = gad[0] + ENV_TEST; + this.dataId = gad[1] + ENV_TEST; + connect(); } + /** + * 连接nacos + * @throws IOException + * @throws NacosException + */ + private void connect() throws IOException, NacosException { + + if(configService != null) { + configService.shutDown(); + configService = null; + } + + // 获取 app + Properties prop = new Properties(); + prop.load(Config.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIT_FILE)); + serverAddr = prop.getProperty(DEFAULT_CONFIG_ADDR + ENV_TEST).trim(); + // dataId = prop.getProperty(DEFAULT_CONFIG_DATAID).trim(); + // group = prop.getProperty(DEFAULT_CONFIG_GROUP).trim(); + + Properties properties = new Properties(); + properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr); + configService = NacosFactory.createConfigService(properties); + + String content = configService.getConfig(dataId, group, 5000); + Yaml yaml = new Yaml(); + data = yaml.load(content); + log.info(content); + + datalock = new ReentrantLock(); + // 监听 配置更新事件 + configService.addListener(dataId, group, new Listener() { + @Override + public void receiveConfigInfo(String configInfo) { + log.debug("recieve:" + configInfo); + try { + datalock.lock(); + data = (Map)new Yaml().load(configInfo); + log.debug("{}",data); + } finally { + datalock.unlock(); + } + } + + @Override + public Executor getExecutor() { + return null; + } + }); + + + } + + + /** + * 根据多个key获取yaml的值 keys 路径 + * @param keys 获取的key值 + * @return + */ + public Object get(String ...keys) { + var cur = data; + for(var i = 0; i < keys.length - 1;i++ ) { + var key = keys[i]; + cur = (Map) cur.get(key); + } + return cur.get(keys[keys.length - 1]); + } + + /** + * + * 用于定位keys的路径后的操作. 创建keys后赋值. 如果存在keys, 可以直接赋值, 不存在则直接报错 + * + * @author eson + *2022年6月15日-下午12:06:37 + */ + public class Operator { + Config config; + String[] keys; + + Operator(Config config, String[] keys) { + this.config = config; + this.keys = keys; + } + + /** + * 创建seek的key + * @return + */ + public Operator createKeys() { + var cur = config.data; + for(var i = 0; i < keys.length;i++ ) { + var key = keys[i]; + var vobj = cur.get(key); + if (vobj == null) { + vobj = new LinkedHashMap<>(); + cur.put(key, vobj); + } + cur = (Map) vobj; + } + return this; + } + + /** + * 定位后赋值 + * @param value + */ + public void set(Object value) { + var cur = config.data; + for(var i = 0; i < keys.length - 1;i++ ) { + var key = keys[i]; + cur = (Map) cur.get(key); + } + cur.put(keys[keys.length - 1], value); + } + } + + /** + * 定位. eg. seek("key1", "key2") + * @param keys + * @return + */ + public Operator seek(String ...keys) { + return new Operator(this, keys); + } + + + + /** + * 删除 key的值. 类型map操作. keys是一个路径 remove("a","b") --> {"a": {"b": 1}} 删除b + * @param keys + * @return + */ + public Object remove(String ...keys) { + var cur = data; + for(var i = 0; i < keys.length - 1;i++ ) { + var key = keys[i]; + cur = (Map) cur.get(key); + } + return cur.remove(keys[keys.length - 1]); + } + /** * 更新配置 * @return 返回是否发布成功. * @throws NacosException */ public Boolean update() throws NacosException { - return configService.publishConfig(dataId, group,new Yaml().dumpAsMap(configMap)); + return configService.publishConfig(dataId, group,new Yaml().dumpAsMap(data)); } + public static String ENV_TEST = ""; - + /** + * 统一使用配置的入口函数 线程安全 + * @param GroupAndDataID 使用配置的标签 eg."group.dataId" + * @param execute 匿名函数 + * @return 函数返回的值, 如果不需要直接返回null + * @throws Exception + */ + public static Object UseConfig(String GroupAndDataID, Function execute) throws Exception { + + Config cnf; + synchronized(configDict) { + cnf = configDict.get(GroupAndDataID); + if(cnf == null) { + cnf = new Config(GroupAndDataID); + configDict.put(GroupAndDataID, cnf); + } + } - - + try { + cnf.datalock.lock(); + var res = execute.apply(cnf); + return res; + } catch (Exception e) { + throw e; + } finally { + cnf.datalock.unlock(); + } + } /** - * - * @param execute 统一使用配置的入口 + * 统一使用配置的入口函数 线程安全 + * @param execute 统一使用配置的入口匿名方法 * @return 函数返回的值, 如果不需要直接返回null - * @throws NacosException - * @throws IOException + * @throws Exception */ - public static Object UseConfig(Function execute) throws NacosException, IOException { - if(config == null) { - config = new Config(); - } - synchronized(config) { - return execute.apply(config); - } + public static Object UseConfig(Function execute) throws Exception { + return UseConfig(DEFAULT_GROUP_DATAID, execute); } - // public Properties Prop ; - - // private static Config cnf ; - // public Config(Properties prop) { - // Prop = prop; - // } - - // public static Config GetConfig() throws NacosException, IOException { - // if(cnf == null) { - // cnf = new Config(); - // } - // return cnf; - // } - - - - - - public static void main(String[] args) throws NacosException, InterruptedException, IOException { - // String serverAddr = "192.168.1.113:8848"; - // String dataId = "dataflow"; - // String group = "yuandian"; - - // Properties properties = new Properties(); - // properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr); - // ConfigService configService = NacosFactory.createConfigService(properties); - - // String content = configService.getConfig(dataId, group, 5000); - // Yaml yaml = new Yaml(); - // cnf = yaml.load(content); - // log.info(content); - // // Properties cnf = new Properties(); - // // cnf.load(new StringReader(content)); - - // System.out.println(cnf); - - // configService.addListener(dataId, group, new Listener() { - // @Override - // public void receiveConfigInfo(String configInfo) { - // System.out.println("recieve:" + configInfo); - // synchronized(cnf){ - // cnf = (Map)new Yaml().load(configInfo); - // } - - // System.out.println(cnf); - // } - - // @Override - // public Executor getExecutor() { - // return null; - // } - // }); - - - - - // cnf.put("publish", "123423112332"); - - // log.info(yaml.dumpAsMap(cnf)); - - Config.UseConfig((cnf) -> { - log.info("{}",cnf.configMap); - cnf.configMap.put("use_config", "12"); - try { - log.info("{}",cnf.update()); - } catch (NacosException e) { - e.printStackTrace(); - } - return null; - }); - - - // boolean isPublishOk = configService.publishConfig(dataId, group, yaml.dumpAsMap(cnf)); - // System.out.println(isPublishOk); - - Thread.sleep(300000); - - // boolean isPublishOk = configService.publishConfig(dataId, group, "content"); - // System.out.println(isPublishOk); - - // Thread.sleep(3000); - // content = configService.getConfig(dataId, group, 5000); - // System.out.println(content); - - // boolean isRemoveOk = configService.removeConfig(dataId, group); - // System.out.println(isRemoveOk); - // Thread.sleep(3000); - - // content = configService.getConfig(dataId, group, 5000); - // System.out.println(content); - // Thread.sleep(300000); - }; } diff --git a/src/test/java/com/yuandian/dataflow/config/ConfigTest.java b/src/test/java/com/yuandian/dataflow/config/ConfigTest.java new file mode 100644 index 0000000..f7df421 --- /dev/null +++ b/src/test/java/com/yuandian/dataflow/config/ConfigTest.java @@ -0,0 +1,111 @@ +package com.yuandian.dataflow.config; + +import java.io.IOException; +import java.time.Instant; + +import org.junit.Assert; +import org.junit.FixMethodOrder; +import org.junit.experimental.theories.suppliers.TestedOn; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import com.alibaba.nacos.api.exception.NacosException; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ConfigTest { + + + /** + * 测试配置基础用法 + * @throws Exception + */ + @Test + @Order(1) + void testUseConfig() throws Exception { + Config.ENV_TEST = "-test"; + + Config.UseConfig((cnf) -> { + log.info("{}",cnf.data); + Assert.assertEquals(cnf.get("key1", "key2"), "key_path"); + Instant now = Instant.now(); + cnf.data.put("use_config", now.toString()); + try { + log.info("{}",cnf.update()); + } catch (NacosException e) { + e.printStackTrace(); + } + return null; + }); + } + + @Test + @Order(2) + void testRemove() throws Exception { + Config.ENV_TEST = "-test"; + + Config.UseConfig((cnf) -> { + cnf.remove("create1", "create2"); + try { + log.info("{}",cnf.update()); // 所有增加删除操作要最后同步到nacos. 都需要update + } catch (NacosException e) { + e.printStackTrace(); + } + + cnf.remove("create1"); + try { + log.info("{}",cnf.update()); // 所有增加删除操作要最后同步到nacos. 都需要update + } catch (NacosException e) { + e.printStackTrace(); + } + return null; + }); + } + + + @Test + @Order(1) + void testCreateKeys() throws Exception { + Config.ENV_TEST = "-test"; + + Config.UseConfig((cnf) -> { + log.info("{}",cnf.data); + cnf.seek("create1", "create2", "create3").createKeys().set(Instant.now().toString());; + try { + log.info("{}",cnf.update()); // 所有增加删除操作要最后同步到nacos. 都需要update + } catch (NacosException e) { + e.printStackTrace(); + } + return null; + }); + } + + + + @Test + void testUpdate() throws Exception { + Config.ENV_TEST = "-test"; + + Config.UseConfig((cnf) -> { + Instant now = Instant.now(); + cnf.data.put("use_config", now.toString()); + try { + log.info("{}",cnf.update()); + } catch (NacosException e) { + e.printStackTrace(); + } + return null; + }); + } + + @Test + void testLabelConfig() throws Exception { + Config.UseConfig("org.fortest", (cnf)->{ + log.info("{}", cnf.get("test")); + Assert.assertEquals(cnf.get("test"), "groupAndDataId"); + return null; + }); + } +}