Skip to content

Commit

Permalink
optimize: optimize fileListener to decrease cpu time usage (apache#3222)
Browse files Browse the repository at this point in the history
  • Loading branch information
caohdgege committed Oct 28, 2020
1 parent ce41bce commit 80573c9
Showing 1 changed file with 31 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.net.URL;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -73,6 +74,8 @@ public class FileConfiguration extends AbstractConfiguration {

private final String name;

private final FileListener fileListener = new FileListener();

private final boolean allowDynamicRefresh;

/**
Expand Down Expand Up @@ -230,8 +233,7 @@ public void addConfigListener(String dataId, ConfigurationChangeListener listene
listenedConfigMap.put(dataId, ConfigurationFactory.getInstance().getConfig(dataId));

// Start config change listener for the dataId.
FileListener fileListener = new FileListener(dataId, listener);
fileListener.onProcessEvent(new ConfigurationChangeEvent());
fileListener.addListener(dataId, listener);
}

@Override
Expand Down Expand Up @@ -333,39 +335,48 @@ private void setFailResult(ConfigFuture configFuture) {
*/
class FileListener implements ConfigurationChangeListener {

private final String dataId;
private final ConfigurationChangeListener listener;
private final Map<String, Set<ConfigurationChangeListener>> dataIdMap = new HashMap<>();

private final ExecutorService executor = new ThreadPoolExecutor(CORE_LISTENER_THREAD, MAX_LISTENER_THREAD, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
new NamedThreadFactory("fileListener", MAX_LISTENER_THREAD));

/**
* Instantiates a new FileListener.
*
* @param dataId the data id
* @param listener the listener
*/
public FileListener(String dataId, ConfigurationChangeListener listener) {
this.dataId = dataId;
this.listener = listener;
FileListener() {}

public synchronized void addListener(String dataId, ConfigurationChangeListener listener) {
// only the first time add listener will trigger on process event
if (dataIdMap.isEmpty()) {
fileListener.onProcessEvent(new ConfigurationChangeEvent());
}

dataIdMap .computeIfAbsent(dataId, value -> new HashSet<>()).add(listener);
}

@Override
public void onChangeEvent(ConfigurationChangeEvent event) {
while (true) {
try {
String currentConfig =
ConfigurationFactory.getInstance().getLatestConfig(dataId, null, DEFAULT_CONFIG_TIMEOUT);
if (StringUtils.isNotBlank(currentConfig)) {
String oldConfig = listenedConfigMap.get(dataId);
if (ObjectUtils.notEqual(currentConfig, oldConfig)) {
listenedConfigMap.put(dataId, currentConfig);
event.setDataId(dataId).setNewValue(currentConfig).setOldValue(oldConfig);
listener.onChangeEvent(event);
for (String dataId : dataIdMap.keySet()) {
try {
String currentConfig =
ConfigurationFactory.getInstance().getLatestConfig(dataId, null, DEFAULT_CONFIG_TIMEOUT);
if (StringUtils.isNotBlank(currentConfig)) {
String oldConfig = listenedConfigMap.get(dataId);
if (ObjectUtils.notEqual(currentConfig, oldConfig)) {
listenedConfigMap.put(dataId, currentConfig);
event.setDataId(dataId).setNewValue(currentConfig).setOldValue(oldConfig);

for (ConfigurationChangeListener listener : dataIdMap.get(dataId)) {
listener.onChangeEvent(event);
}
}
}
} catch (Exception exx) {
LOGGER.error("fileListener execute error, dataId :{}", dataId, exx);
}
} catch (Exception exx) {
LOGGER.error("fileListener execute error:{}", exx.getMessage());
}
try {
Thread.sleep(LISTENER_CONFIG_INTERVAL);
Expand Down

0 comments on commit 80573c9

Please sign in to comment.