Skip to content

Commit

Permalink
test HttpCustomRuleFunction
Browse files Browse the repository at this point in the history
  • Loading branch information
junwen12221 committed May 27, 2022
1 parent 87644b4 commit 0a3e20f
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 13 deletions.
54 changes: 54 additions & 0 deletions example/src/test/java/io/mycat/assemble/UserTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@

import com.alibaba.druid.util.JdbcUtils;
import com.mysql.cj.jdbc.MysqlDataSource;
import io.mycat.config.ShardingBackEndTableInfoConfig;
import io.mycat.config.ShardingFunction;
import io.mycat.hint.CreateDataSourceHint;
import io.mycat.hint.CreateTableHint;
import io.mycat.router.custom.HttpCustomRuleFunction;
import io.mycat.router.mycat1xfunction.PartitionByRangeMod;
import io.mycat.util.NameMap;
import org.apache.groovy.util.Maps;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -62,4 +69,51 @@ public void testKill() throws Exception {
Assert.assertTrue(latch.getCount() == 0);
}
}

@Test
public void testHttpFunction() throws Exception {
try (Connection mycat = getMySQLConnection(DB_MYCAT);
Connection prototypeMysql = getMySQLConnection(DB1);) {
execute(mycat, RESET_CONFIG);
String db = "testSchema";
String tableName = "sharding";
execute(mycat, "drop database if EXISTS " + db);
execute(mycat, "create database " + db);
execute(mycat, "use " + db);

execute(mycat, CreateDataSourceHint
.create("dw0", DB1));
execute(mycat, CreateDataSourceHint
.create("dw1", DB2));

execute(prototypeMysql, "use mysql");

String shardingConfig = CreateTableHint
.createSharding(db, tableName,
"create table " + tableName + "(\n" +
"id int(11) NOT NULL AUTO_INCREMENT,\n" +
"user_id int(11) ,\n" +
"user_name varchar(128), \n" +
"PRIMARY KEY (`id`), \n" +
" GLOBAL INDEX `g_i_user_id`(`user_id`) COVERING (`user_name`) dbpartition by btree(`user_id`) \n" +
")ENGINE=InnoDB DEFAULT CHARSET=utf8 ",
ShardingBackEndTableInfoConfig.builder().build(),
ShardingFunction.builder()
.clazz(HttpCustomRuleFunction.class.getCanonicalName())
.properties(Maps.of(
"name", "test",
"shardingDbKeys", "",
"shardingTableKeys", "id",
"shardingTargetKeys", "",
"allScanPartitionTimeout", 5,
"fetchTimeout", 60000,
"routerServiceAddress", "http://127.0.0.1:9066/router_service_address"))
.build());
execute(
mycat,shardingConfig
);
hasData(mycat,db,tableName);
System.out.println(shardingConfig);
}
}
}
23 changes: 18 additions & 5 deletions mycat2/src/main/java/io/mycat/monitor/MycatSQLLogMonitorImpl.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package io.mycat.monitor;

import io.mycat.ExecutorUtil;
import io.mycat.IOExecutor;
import io.mycat.MetaClusterCurrent;
import io.mycat.NameableExecutor;
import io.mycat.*;
import io.mycat.config.MonitorConfig;
import io.mycat.config.SqlLogConfig;
import io.mycat.config.TimerConfig;
import io.mycat.router.function.IndexDataNode;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
Expand All @@ -19,6 +17,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -40,7 +39,7 @@ public class MycatSQLLogMonitorImpl extends MycatSQLLogMonitor {
public static final String SHOW_RW_MONITOR_URL = "/ShowRwMonitor";
public static final String QUERY_SQL_LOG = "/QuerySqlLog";
public static final String LOCKSERIVCE_URL = "/lockserivce";

public static final String ROUTER_SERVICE_URL = "/router_service_address";
Map<String, LockContext> lockMap = new ConcurrentHashMap<>();
NameableExecutor lockThread = ExecutorUtil.create("lock_server", 1);

Expand Down Expand Up @@ -72,6 +71,9 @@ public void handle(HttpServerRequest request) {
if (request.uri().startsWith(LOCKSERIVCE_URL.toLowerCase())) {
request.setExpectMultipart(true);
}
if (request.uri().startsWith(ROUTER_SERVICE_URL.toLowerCase())) {
request.setExpectMultipart(true);
}
request.endHandler(v -> {

lockThread.execute(new Runnable() {
Expand Down Expand Up @@ -129,6 +131,17 @@ public void run() {
LOGGER.error("", throwable);
res = throwable.getLocalizedMessage();
}
} else if (uri.startsWith(ROUTER_SERVICE_URL.toLowerCase())) {
try {

String tableName = formAttributes.get("tableName");
String schemaName = formAttributes.get("schemaName");
IndexDataNode indexDataNode = new IndexDataNode(MetadataManager.getPrototype(), schemaName, tableName, 0, 1, 1);
res = Collections.singletonList(indexDataNode);
} catch (Throwable throwable) {
LOGGER.error("", throwable);
res = throwable.getLocalizedMessage();
}
}
request.response().end(Json.encode(res));
} catch (Throwable throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,21 @@ public class HttpCustomRuleFunction extends CustomRuleFunction {

@Override
protected void init(ShardingTableHandler tableHandler, Map<String, Object> properties, Map<String, Object> ranges) {
this.properties = properties;
this.name = (String) properties.get("name");
this.shardingDbKeys = new HashSet<>(Arrays.asList(Objects.toString(properties.get("shardingDbKeys")).split(",")));
this.shardingTableKeys = new HashSet<>(Arrays.asList(Objects.toString(properties.get("shardingTableKeys")).split(",")));
this.shardingTargetKeys = new HashSet<>(Arrays.asList(Objects.toString(properties.get("shardingTargetKeys")).split(",")));
this.shardingDbKeys = new HashSet<>(Arrays.asList(Objects.toString(properties.getOrDefault("shardingDbKeys","")).split(",")));
this.shardingTableKeys = new HashSet<>(Arrays.asList(Objects.toString(properties.getOrDefault("shardingTableKeys","")).split(",")));
this.shardingTargetKeys = new HashSet<>(Arrays.asList(Objects.toString(properties.getOrDefault("shardingTargetKeys","")).split(",")));
this.erUniqueID = properties.toString();
this.allScanPartitions = fetchPartitions(Collections.emptyMap());

this.shardingTableType = ShardingTableType.computeByName(this.allScanPartitions);
this.requireShardingKeys = (Set) ImmutableSet.builder()
.addAll(this.shardingDbKeys)
.addAll(this.shardingTableKeys)
.addAll(this.shardingTargetKeys)
.build();
this.requireShardingKeyCount = this.requireShardingKeys.size();
this.properties = properties;



Number allScanPartitionTimeout = (Number) properties.getOrDefault("allScanPartitionTimeout", 5);
Expand Down Expand Up @@ -92,8 +93,8 @@ public List<Partition> calculate(Map<String, RangeVariable> values) {
private synchronized List<Partition> fetchPartitions(Map<String, RangeVariable> values) {
try {
ShardingTableHandler tableHandler = getTable();
String lock_server_url = (String) properties.getOrDefault("router_service_address", "http://localhost:9066/routerserivce");
long timeout = (Long) properties.getOrDefault("properties", 30L);
String router_service_address = (String) properties.getOrDefault("routerServiceAddress", "http://localhost:9066/routerserivce");
long timeout = Long.parseLong(properties.getOrDefault("fetchTimeout", 30L).toString());
OkHttpClient.Builder builder = new OkHttpClient.Builder();
if (timeout > 0) {
builder.connectTimeout(timeout, TimeUnit.MILLISECONDS);
Expand All @@ -109,7 +110,7 @@ private synchronized List<Partition> fetchPartitions(Map<String, RangeVariable>
.build();

final Request request = new Request.Builder()
.url(lock_server_url)
.url(router_service_address)
.post(body)
.build();
Call call = okHttpClient.newCall(request);
Expand Down

0 comments on commit 0a3e20f

Please sign in to comment.