Skip to content

Commit

Permalink
Major logical completion
Browse files Browse the repository at this point in the history
  • Loading branch information
springliao committed Jul 26, 2020
1 parent 3b5365c commit e90ac7e
Show file tree
Hide file tree
Showing 12 changed files with 386 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ public interface Serializer {

Map<String, Class<?>> CLASS_CACHE = new ConcurrentHashMap<>(8);

/**
* Deserialize the data.
*
* @param data byte[]
* @param <T> class type
* @return target object instance
*/
<T> T deserialize(byte[] data);

/**
* Deserialize the data.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ public class MetadataKey {

public static final String RAFT_GROUP_MEMBER = "raftGroupMember";

public static final String ERR_MSG = "errMsg";

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,24 @@ public class HessianSerializer implements Serializer {
public HessianSerializer() {
}

@Override
public <T> T deserialize(byte[] data) {
if (ByteUtils.isEmpty(data)) {
return null;
}

Hessian2Input input = new Hessian2Input(new ByteArrayInputStream(data));
input.setSerializerFactory(serializerFactory);
Object resultObject;
try {
resultObject = input.readObject();
input.close();
} catch (IOException e) {
throw new RuntimeException("IOException occurred when Hessian serializer decode!", e);
}
return (T) resultObject;
}

@Override
public <T> T deserialize(byte[] data, Class cls) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,14 @@ public void onEvent(RaftEvent event) {
final String leader = event.getLeader();
final Long term = event.getTerm();
final List<String> raftClusterInfo = event.getRaftClusterInfo();
final String errMsg = event.getErrMsg();

// Leader information needs to be selectively updated. If it is valid data,
// the information in the protocol metadata is updated.
MapUtils.putIfValNoEmpty(properties, MetadataKey.LEADER_META_DATA, leader);
MapUtils.putIfValNoNull(properties, MetadataKey.TERM_META_DATA, term);
MapUtils.putIfValNoEmpty(properties, MetadataKey.RAFT_GROUP_MEMBER, raftClusterInfo);
MapUtils.putIfValNoEmpty(properties, MetadataKey.ERR_MSG, errMsg);

value.put(groupId, properties);
metaData.load(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ public void onConfigurationCommitted(Configuration conf) {
public void onError(RaftException e) {
super.onError(e);
processor.onError(e);
NotifyCenter.publishEvent(
RaftEvent.builder().groupId(groupId).leader(leaderIp).term(term).raftClusterInfo(allPeers())
.errMsg(e.toString())
.build());
}

public boolean isLeader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class RaftEvent extends SlowEvent {

private Long term = null;

private String errMsg = "";

private List<String> raftClusterInfo = Collections.emptyList();

public static RaftEventBuilder builder() {
Expand Down Expand Up @@ -75,6 +77,14 @@ public void setRaftClusterInfo(List<String> raftClusterInfo) {
this.raftClusterInfo = raftClusterInfo;
}

public String getErrMsg() {
return errMsg;
}

public void setErrMsg(String errMsg) {
this.errMsg = errMsg;
}

@Override
public String toString() {
return "RaftEvent{" + "groupId='" + groupId + '\'' + ", leader='" + leader + '\'' + ", term=" + term
Expand All @@ -91,6 +101,8 @@ public static final class RaftEventBuilder {

private List<String> raftClusterInfo = Collections.emptyList();

private String errMsg = "";

private RaftEventBuilder() {
}

Expand All @@ -113,13 +125,19 @@ public RaftEventBuilder raftClusterInfo(List<String> raftClusterInfo) {
this.raftClusterInfo = raftClusterInfo;
return this;
}

public RaftEventBuilder errMsg(String errMsg) {
this.errMsg = errMsg;
return this;
}

public RaftEvent build() {
RaftEvent raftEvent = new RaftEvent();
raftEvent.setGroupId(groupId);
raftEvent.setLeader(leader);
raftEvent.setTerm(term);
raftEvent.setRaftClusterInfo(raftClusterInfo);
raftEvent.setErrMsg(errMsg);
return raftEvent;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.persistent;

import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.pojo.Record;
import org.jboss.netty.util.internal.ConcurrentHashMap;

import java.util.Map;
import java.util.function.Function;

/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public final class PersistentNotifier extends Subscriber<ValueChangeEvent> {

private final Map<String, ConcurrentHashSet<RecordListener>> listenerMap = new ConcurrentHashMap<>();

private final Function<String, Record> find;

public PersistentNotifier(Function<String, Record> find) {
this.find = find;
}

public void registerListener(final String key, final RecordListener listener) {
listenerMap.computeIfAbsent(key, s -> new ConcurrentHashSet<>());
listenerMap.get(key).add(listener);
}

public void deregisterListener(final String key, final RecordListener listener) {
if (!listenerMap.containsKey(key)) {
return;
}
listenerMap.get(key).remove(listener);
}

public void addTask(final String key, final ApplyAction action) {

}

public int tasksSize() {
return 0;
}

public <T extends Record> void notify(final String key, final ApplyAction action, final T value) {
if (listenerMap.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {

if (KeyBuilder.matchServiceMetaKey(key) && !KeyBuilder.matchSwitchKey(key)) {

for (RecordListener listener : listenerMap.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
try {
if (action == ApplyAction.CHANGE) {
listener.onChange(key, value);
}

if (action == ApplyAction.DELETE) {
listener.onDelete(key);
}
} catch (Throwable e) {
Loggers.RAFT
.error("[NACOS-RAFT] error while notifying listener of key: {}", key,
e);
}
}
}
}

if (!listenerMap.containsKey(key)) {
return;
}

for (RecordListener listener : listenerMap.get(key)) {
try {
if (action == ApplyAction.CHANGE) {
listener.onChange(key, value);
continue;
}

if (action == ApplyAction.DELETE) {
listener.onDelete(key);
}
} catch (Throwable e) {
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", key, e);
}
}
}

@Override
public void onEvent(ValueChangeEvent event) {
notify(event.getKey(), event.getAction(), find.apply(event.getKey()));
}

@Override
public Class<? extends Event> subscribeType() {
return ValueChangeEvent.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.persistent;

import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.pojo.Record;

/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class ValueChangeEvent extends Event {

private final String key;

private final Record value;

private final ApplyAction action;

public ValueChangeEvent(String key, Record value, ApplyAction action) {
this.key = key;
this.value = value;
this.action = action;
}

public String getKey() {
return key;
}

public Record getValue() {
return value;
}

public ApplyAction getAction() {
return action;
}

public static final class ValueChangeEventBuilder {

private String key;

private Record value;

private ApplyAction action;

private ValueChangeEventBuilder() {
}

public static ValueChangeEventBuilder aValueChangeEvent() {
return new ValueChangeEventBuilder();
}

public ValueChangeEventBuilder key(String key) {
this.key = key;
return this;
}

public ValueChangeEventBuilder value(Record value) {
this.value = value;
return this;
}

public ValueChangeEventBuilder action(ApplyAction action) {
this.action = action;
return this;
}

public ValueChangeEvent build() {
return new ValueChangeEvent(key, value, action);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@

package com.alibaba.nacos.naming.consistency.persistent.impl;

import java.util.ArrayList;
import java.util.List;

/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public class BatchWriteRequest {

private List< byte[]> keys;
private List<byte[]> values;
private List< byte[]> keys = new ArrayList<>();
private List<byte[]> values = new ArrayList<>();

public List<byte[]> getKeys() {
return keys;
Expand All @@ -41,4 +42,9 @@ public List<byte[]> getValues() {
public void setValues(List<byte[]> values) {
this.values = values;
}

public void append(byte[] key, byte[] value) {
keys.add(key);
values.add(value);
}
}
Loading

0 comments on commit e90ac7e

Please sign in to comment.