diff --git a/example/src/test/java/io/mycat/assemble/UserTest.java b/example/src/test/java/io/mycat/assemble/UserTest.java index 136846121..b0c9a923a 100644 --- a/example/src/test/java/io/mycat/assemble/UserTest.java +++ b/example/src/test/java/io/mycat/assemble/UserTest.java @@ -2,7 +2,14 @@ import com.alibaba.druid.util.JdbcUtils; import com.mysql.cj.jdbc.MysqlDataSource; +import io.mycat.config.ShardingBackEndTableInfoConfig; +import io.mycat.config.ShardingFunction; +import io.mycat.hint.CreateDataSourceHint; +import io.mycat.hint.CreateTableHint; +import io.mycat.router.custom.HttpCustomRuleFunction; +import io.mycat.router.mycat1xfunction.PartitionByRangeMod; import io.mycat.util.NameMap; +import org.apache.groovy.util.Maps; import org.junit.Assert; import org.junit.Test; @@ -62,4 +69,51 @@ public void testKill() throws Exception { Assert.assertTrue(latch.getCount() == 0); } } + + @Test + public void testHttpFunction() throws Exception { + try (Connection mycat = getMySQLConnection(DB_MYCAT); + Connection prototypeMysql = getMySQLConnection(DB1);) { + execute(mycat, RESET_CONFIG); + String db = "testSchema"; + String tableName = "sharding"; + execute(mycat, "drop database if EXISTS " + db); + execute(mycat, "create database " + db); + execute(mycat, "use " + db); + + execute(mycat, CreateDataSourceHint + .create("dw0", DB1)); + execute(mycat, CreateDataSourceHint + .create("dw1", DB2)); + + execute(prototypeMysql, "use mysql"); + + String shardingConfig = CreateTableHint + .createSharding(db, tableName, + "create table " + tableName + "(\n" + + "id int(11) NOT NULL AUTO_INCREMENT,\n" + + "user_id int(11) ,\n" + + "user_name varchar(128), \n" + + "PRIMARY KEY (`id`), \n" + + " GLOBAL INDEX `g_i_user_id`(`user_id`) COVERING (`user_name`) dbpartition by btree(`user_id`) \n" + + ")ENGINE=InnoDB DEFAULT CHARSET=utf8 ", + ShardingBackEndTableInfoConfig.builder().build(), + ShardingFunction.builder() + .clazz(HttpCustomRuleFunction.class.getCanonicalName()) + .properties(Maps.of( + "name", "test", + "shardingDbKeys", "", + "shardingTableKeys", "id", + "shardingTargetKeys", "", + "allScanPartitionTimeout", 5, + "fetchTimeout", 60000, + "routerServiceAddress", "http://127.0.0.1:9066/router_service_address")) + .build()); + execute( + mycat,shardingConfig + ); + hasData(mycat,db,tableName); + System.out.println(shardingConfig); + } + } } diff --git a/mycat2/src/main/java/io/mycat/monitor/MycatSQLLogMonitorImpl.java b/mycat2/src/main/java/io/mycat/monitor/MycatSQLLogMonitorImpl.java index f0be3c377..d7d88d715 100644 --- a/mycat2/src/main/java/io/mycat/monitor/MycatSQLLogMonitorImpl.java +++ b/mycat2/src/main/java/io/mycat/monitor/MycatSQLLogMonitorImpl.java @@ -1,12 +1,10 @@ package io.mycat.monitor; -import io.mycat.ExecutorUtil; -import io.mycat.IOExecutor; -import io.mycat.MetaClusterCurrent; -import io.mycat.NameableExecutor; +import io.mycat.*; import io.mycat.config.MonitorConfig; import io.mycat.config.SqlLogConfig; import io.mycat.config.TimerConfig; +import io.mycat.router.function.IndexDataNode; import io.vertx.core.Handler; import io.vertx.core.MultiMap; import io.vertx.core.Promise; @@ -19,6 +17,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -40,7 +39,7 @@ public class MycatSQLLogMonitorImpl extends MycatSQLLogMonitor { public static final String SHOW_RW_MONITOR_URL = "/ShowRwMonitor"; public static final String QUERY_SQL_LOG = "/QuerySqlLog"; public static final String LOCKSERIVCE_URL = "/lockserivce"; - + public static final String ROUTER_SERVICE_URL = "/router_service_address"; Map lockMap = new ConcurrentHashMap<>(); NameableExecutor lockThread = ExecutorUtil.create("lock_server", 1); @@ -72,6 +71,9 @@ public void handle(HttpServerRequest request) { if (request.uri().startsWith(LOCKSERIVCE_URL.toLowerCase())) { request.setExpectMultipart(true); } + if (request.uri().startsWith(ROUTER_SERVICE_URL.toLowerCase())) { + request.setExpectMultipart(true); + } request.endHandler(v -> { lockThread.execute(new Runnable() { @@ -129,6 +131,17 @@ public void run() { LOGGER.error("", throwable); res = throwable.getLocalizedMessage(); } + } else if (uri.startsWith(ROUTER_SERVICE_URL.toLowerCase())) { + try { + + String tableName = formAttributes.get("tableName"); + String schemaName = formAttributes.get("schemaName"); + IndexDataNode indexDataNode = new IndexDataNode(MetadataManager.getPrototype(), schemaName, tableName, 0, 1, 1); + res = Collections.singletonList(indexDataNode); + } catch (Throwable throwable) { + LOGGER.error("", throwable); + res = throwable.getLocalizedMessage(); + } } request.response().end(Json.encode(res)); } catch (Throwable throwable) { diff --git a/router/src/main/java/io/mycat/router/custom/HttpCustomRuleFunction.java b/router/src/main/java/io/mycat/router/custom/HttpCustomRuleFunction.java index 3131250e7..a86476277 100644 --- a/router/src/main/java/io/mycat/router/custom/HttpCustomRuleFunction.java +++ b/router/src/main/java/io/mycat/router/custom/HttpCustomRuleFunction.java @@ -36,12 +36,13 @@ public class HttpCustomRuleFunction extends CustomRuleFunction { @Override protected void init(ShardingTableHandler tableHandler, Map properties, Map ranges) { + this.properties = properties; this.name = (String) properties.get("name"); - this.shardingDbKeys = new HashSet<>(Arrays.asList(Objects.toString(properties.get("shardingDbKeys")).split(","))); - this.shardingTableKeys = new HashSet<>(Arrays.asList(Objects.toString(properties.get("shardingTableKeys")).split(","))); - this.shardingTargetKeys = new HashSet<>(Arrays.asList(Objects.toString(properties.get("shardingTargetKeys")).split(","))); + this.shardingDbKeys = new HashSet<>(Arrays.asList(Objects.toString(properties.getOrDefault("shardingDbKeys","")).split(","))); + this.shardingTableKeys = new HashSet<>(Arrays.asList(Objects.toString(properties.getOrDefault("shardingTableKeys","")).split(","))); + this.shardingTargetKeys = new HashSet<>(Arrays.asList(Objects.toString(properties.getOrDefault("shardingTargetKeys","")).split(","))); this.erUniqueID = properties.toString(); - this.allScanPartitions = fetchPartitions(Collections.emptyMap()); + this.shardingTableType = ShardingTableType.computeByName(this.allScanPartitions); this.requireShardingKeys = (Set) ImmutableSet.builder() .addAll(this.shardingDbKeys) @@ -49,7 +50,7 @@ protected void init(ShardingTableHandler tableHandler, Map prope .addAll(this.shardingTargetKeys) .build(); this.requireShardingKeyCount = this.requireShardingKeys.size(); - this.properties = properties; + Number allScanPartitionTimeout = (Number) properties.getOrDefault("allScanPartitionTimeout", 5); @@ -92,8 +93,8 @@ public List calculate(Map values) { private synchronized List fetchPartitions(Map values) { try { ShardingTableHandler tableHandler = getTable(); - String lock_server_url = (String) properties.getOrDefault("router_service_address", "http://localhost:9066/routerserivce"); - long timeout = (Long) properties.getOrDefault("properties", 30L); + String router_service_address = (String) properties.getOrDefault("routerServiceAddress", "http://localhost:9066/routerserivce"); + long timeout = Long.parseLong(properties.getOrDefault("fetchTimeout", 30L).toString()); OkHttpClient.Builder builder = new OkHttpClient.Builder(); if (timeout > 0) { builder.connectTimeout(timeout, TimeUnit.MILLISECONDS); @@ -109,7 +110,7 @@ private synchronized List fetchPartitions(Map .build(); final Request request = new Request.Builder() - .url(lock_server_url) + .url(router_service_address) .post(body) .build(); Call call = okHttpClient.newCall(request);