From 0295f7ee4619368e41108f51ee73ec1c3268650f Mon Sep 17 00:00:00 2001 From: Irene Cho Date: Wed, 2 Aug 2017 16:30:48 -0400 Subject: [PATCH 01/16] temp do not push --- .../com/addthis/hydra/query/spawndatastore/AliasBiMap.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasBiMap.java b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasBiMap.java index e0ec141c6..096fe016b 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasBiMap.java +++ b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasBiMap.java @@ -76,7 +76,8 @@ public AliasBiMap(SpawnDataStore spawnDataStore) { alias2jobs = new HashMap<>(); job2alias = new HashMap<>(); /* The interval to refresh cached alias values */ - long cacheRefresh = Parameter.longValue("alias.bimap.refresh", 10000); +// long cacheRefresh = Parameter.longValue("alias.bimap.refresh", 10000); + long cacheRefresh = Parameter.longValue("alias.bimap.refresh", 20000); /* The expiration period for cache values. Off by default, but useful for testing. */ long cacheExpire = Parameter.longValue("alias.bimap.expire", -1); /* The max size of the alias cache */ From 17230d988dc153b8ee5f1aaa69b7002578329452 Mon Sep 17 00:00:00 2001 From: Irene Cho Date: Thu, 10 Aug 2017 09:54:20 -0400 Subject: [PATCH 02/16] init not push --- .../addthis/hydra/job/alias/AliasManager.java | 4 + .../hydra/job/alias/AliasManagerImpl.java | 245 ++++++++++++++- .../com/addthis/hydra/job/spawn/Spawn.java | 3 + .../hydra/job/store/AvailableCache.java | 22 +- .../hydra/job/store/DataStoreUtil.java | 4 +- .../addthis/hydra/query/MeshQueryMaster.java | 2 +- .../query/spawndatastore/AliasBiMap.java | 289 ------------------ .../query/spawndatastore/AliasCache.java | 127 ++++++++ .../spawndatastore/SpawnDataStoreHandler.java | 27 +- .../job/web/resources/AliasResourceTest.java | 62 ++++ ...MapTest.java => AliasManagerImplTest.java} | 48 +-- .../hydra/query/MeshQueryMasterTest.java | 9 + .../query/spawndatastore/AliasCacheTest.java | 121 ++++++++ .../SpawnDataStoreHandlerTest.java | 35 +++ hydra-uber/bin/local-stack.sh | 5 +- 15 files changed, 657 insertions(+), 346 deletions(-) delete mode 100644 hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasBiMap.java create mode 100644 hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java create mode 100644 hydra-main/src/test/java/com/addthis/hydra/job/web/resources/AliasResourceTest.java rename hydra-main/src/test/java/com/addthis/hydra/query/{AliasBiMapTest.java => AliasManagerImplTest.java} (67%) create mode 100644 hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java create mode 100644 hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java index 5e1d95c88..0a139870a 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java @@ -26,4 +26,8 @@ public interface AliasManager { List aliasToJobs(String alias); + List getJobs(String alias); + + String getLikelyAlias(String jobid); + } diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java index 6375a5b17..31960a93a 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java @@ -13,35 +13,90 @@ */ package com.addthis.hydra.job.alias; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +import java.io.IOException; +import java.io.StringWriter; + +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.ReentrantLock; +import com.addthis.hydra.job.store.DataStoreUtil; import com.addthis.hydra.job.store.SpawnDataStore; -import com.addthis.hydra.query.spawndatastore.AliasBiMap; +import com.addthis.hydra.query.spawndatastore.AliasCache; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@ThreadSafe public class AliasManagerImpl implements AliasManager { private static final Logger log = LoggerFactory.getLogger(AliasManagerImpl.class); - private AliasBiMap aliasBiMap; + //private AliasBiMap aliasBiMap; + private Map aliases; + + public static final String ALIAS_PATH = "/query/alias"; + /* This SpawnDataStore must be the same type (zookeeper/priam) between Spawn and Mqmaster. This should + * be guaranteed by the implementation of DataStoreUtil. */ + private final SpawnDataStore spawnDataStore; + private final HashMap> alias2jobs; + private final HashMap job2alias; + public final ObjectMapper mapper; + private final ReentrantLock mapLock; + + @VisibleForTesting + public AliasManagerImpl() throws Exception{ + this.spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); // added + this.mapLock = new ReentrantLock(); + this.mapper = new ObjectMapper(); + this.alias2jobs = new HashMap<>(); + this.job2alias = new HashMap<>(); + } + public AliasManagerImpl(SpawnDataStore spawnDataStore) { - this.aliasBiMap = new AliasBiMap(spawnDataStore); - aliasBiMap.loadCurrentValues(); +// this.aliasBiMap = new AliasBiMap(spawnDataStore); +// aliasBiMap.loadCurrentValues(); + this.spawnDataStore = spawnDataStore; // added + this.mapLock = new ReentrantLock(); + this.mapper = new ObjectMapper(); + this.alias2jobs = new HashMap<>(); + this.job2alias = new HashMap<>(); } /** * Returns a map describing alias name => jobIds */ public Map> getAliases() { - return aliasBiMap.viewAliasMap(); + aliases = spawnDataStore.getAllChildren(ALIAS_PATH); + + Map> alias2Jobs = new HashMap<>(); + + for (Map.Entry aliasEntry : aliases.entrySet()) { + String key = aliasEntry.getKey(); + List jobs = decodeAliases(aliasEntry.getValue()); + alias2Jobs.put(key, jobs); + } + return alias2Jobs; } + // getJobs public List aliasToJobs(String alias) { - return aliasBiMap.getJobs(alias); + return getJobs(alias); } /** @@ -51,14 +106,188 @@ public List aliasToJobs(String alias) { */ public void addAlias(String alias, List jobs) { if (jobs.size() > 0) { - aliasBiMap.putAlias(alias, jobs); + putAlias(alias, jobs); } else { log.warn("Ignoring empty jobs addition for alias: {}", alias); } } + // -------------------------------------------------- + public void putAlias(String alias, List jobs) { + mapLock.lock(); + try { + alias2jobs.put(alias, jobs); + job2alias.put(jobs.get(0), alias); + StringWriter sw = new StringWriter(); + mapper.writeValue(sw, jobs); + spawnDataStore.putAsChild(ALIAS_PATH, alias, sw.toString()); + } catch (Exception e) { + log.warn("failed to put alias: {}", alias, e); + throw Throwables.propagate(e); + } finally { + mapLock.unlock(); + } + } + + /** + * Get all jobIds for a given alias + * + * @param alias The alias to check + * @return A list of jobIds, possible null + */ + public List getJobs(String alias) { + refreshAlias(alias); + mapLock.lock(); + try { + return alias2jobs.get(alias); + } finally { + mapLock.unlock(); + } + + } + + /** + * Refresh an alias based on the latest cached value + * + * @param alias The alias to refresh + */ + private void refreshAlias(String alias) { + try { + String data = this.spawnDataStore.getChild(ALIAS_PATH, alias); + updateAlias(alias, data); + } catch (ExecutionException e) { + log.warn("Failed to refresh alias: {}", alias, e); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * Load the jobIds for a particular alias from the SpawnDataStore + * + * @param alias The alias key to check + * @return String The data that was updated (so the cache can be updated) + */ + @Nullable private String updateAlias(String alias, @Nullable String data) { + if (alias == null) { + return data; + } + if ((data == null) || data.isEmpty()) { + deleteAlias(alias); + return data; + } + List jobs = decodeAliases(data); + if (jobs.isEmpty()) { + log.warn("no jobs for alias {}, ignoring {}", alias, alias); + return data; + } + mapLock.lock(); + try { + alias2jobs.put(alias, jobs); + job2alias.put(jobs.get(0), alias); + } finally { + mapLock.unlock(); + } + return data; + } + + @VisibleForTesting public List decodeAliases(@Nonnull Object data) { + try { + return mapper.readValue(data.toString(), new TypeReference>() {}); + } catch (IOException e) { + log.warn("Failed to decode data", e); + return new ArrayList<>(0); + } + } + + /** + * Delete the data for a given alias + * + * @param alias The alias to check + */ + public void deleteAlias(String alias) { - aliasBiMap.deleteAlias(alias); + mapLock.lock(); + try { + List jobs = alias2jobs.get(alias); + alias2jobs.remove(alias); + if ((jobs != null) && !jobs.isEmpty()) { + for (String job : jobs) { + String aliasVal = job2alias.get(job); + if (Objects.equals(aliasVal, alias)) { + job2alias.remove(job); + } + } + } + } finally { + mapLock.unlock(); + } + spawnDataStore.deleteChild(ALIAS_PATH, alias); + + try { + String sJobsAll = spawnDataStore.getChild(ALIAS_PATH, alias); + System.out.println("sJobs from datastore for " + alias + " = " + sJobsAll); + } catch (Exception e) { + e.printStackTrace(); + } + + // todo: delete cache since refresh is initiated when query + AliasCache ac = null; + try { + ac = new AliasCache(); + ac.deleteAlias(alias); + + List jobs = ac.getJobs(alias); + if(jobs == null ) { + System.out.println("jobs is null"); + } + + if(jobs.size() == 0) { + System.out.println("jobs size is 0"); + } + + System.out.println("jobs after remove alias " + alias + " = " + jobs); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * Get an alias for a particular jobId + * + * @param jobid The jobId to check + * @return One of the aliases for that job + */ + public String getLikelyAlias(String jobid) { + mapLock.lock(); + try { + String tmpAlias = job2alias.get(jobid); + if (tmpAlias != null) { + // Check to see if the alias has been deleted + checkAlias(jobid, tmpAlias); + } + return job2alias.get(jobid); + } finally { + mapLock.unlock(); + } } + /** + * Test a job/alias pair to see if an alias has disappeared + * + * @param job The job to test + * @param alias The alias to check + */ + private void checkAlias(String job, String alias) { + mapLock.lock(); + try { + if (!alias2jobs.containsKey(alias) && job2alias.get(job).equals(alias)) { + job2alias.remove(job); + } + } finally { + mapLock.unlock(); + } + + } } diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java index 92101fd12..31e73390b 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java @@ -1352,12 +1352,15 @@ public void updateJob(@Nullable IJob ijob, boolean reviseReplicas) throws Except } // take action on trigger changes (like # replicas) if ((oldjob != job) && reviseReplicas) { + int oldReplicaCount = oldjob.getReplicas(); int newReplicaCount = job.getReplicas(); + checkArgument((oldReplicaCount == newReplicaCount) || (job.getState() == JobState.IDLE) || (job.getState() == JobState.DEGRADED), "job must be IDLE or DEGRADED to change replicas"); checkArgument(newReplicaCount < hostManager.monitored.size(), "replication factor must be < # live hosts"); + rebalanceReplicas(job); } queueJobTaskUpdateEvent(job); diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/store/AvailableCache.java b/hydra-main/src/main/java/com/addthis/hydra/job/store/AvailableCache.java index 8ea19ceb7..d1f813ed8 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/store/AvailableCache.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/store/AvailableCache.java @@ -25,6 +25,8 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFutureTask; import com.google.common.util.concurrent.MoreExecutors; @@ -67,17 +69,27 @@ public AvailableCache(long refreshMillis, long expireMillis, int maxSize, int fe if (fetchThreads <= 0) { fetchThreads = 2; } + executor = new ThreadPoolExecutor( fetchThreads, fetchThreads, 1000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setNameFormat("avail-cache-%d").setDaemon(true).build()); //noinspection unchecked + + cacheBuilder.removalListener(new RemovalListener, Optional>() { + @Override + public void onRemoval(RemovalNotification, Optional> notification) { + System.out.println("<" + notification.getKey() + ", " + notification.getValue() + "> has been removed!"); + loadingCache.asMap().get(notification.getKey()); + } + }); + this.loadingCache = cacheBuilder.build(new CacheLoader>() { /** * If refreshAfterWrite is enabled, this method is called after returning the old value. * The new value will be inserted into the cache when the load() operation completes. */ @Override - public ListenableFuture> reload(final String key, Optional oldValue) { + public ListenableFuture> reload(final String key, Optional oldValue) throws Exception { ListenableFutureTask> task = ListenableFutureTask.create(() -> load(key)); executor.execute(task); return task; @@ -90,6 +102,10 @@ public Optional load(String key) throws Exception { }); } + public LoadingCache> getLoadingCache() { + return loadingCache; + } + /** * A possibly-lengthy operation to fetch the canonical value for a given id, such as by reading from a SpawnDataStore * @@ -118,6 +134,10 @@ public void clear() { loadingCache.invalidateAll(); } + public void cleanUp() { + loadingCache.cleanUp(); + } + @Override public void close() throws Exception { MoreExecutors.shutdownAndAwaitTermination(executor, 120, TimeUnit.SECONDS); } diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/store/DataStoreUtil.java b/hydra-main/src/main/java/com/addthis/hydra/job/store/DataStoreUtil.java index 963194d76..2efc0ec04 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/store/DataStoreUtil.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/store/DataStoreUtil.java @@ -27,7 +27,7 @@ import com.addthis.basis.util.Parameter; -import com.addthis.hydra.query.spawndatastore.AliasBiMap; +import com.addthis.hydra.job.alias.AliasManagerImpl; import com.google.common.collect.Lists; @@ -50,7 +50,7 @@ public class DataStoreUtil { /* A list of datastore paths with values that should be cutover */ private static final List pathsToImport = Arrays.asList(SPAWN_QUEUE_PATH, SPAWN_BALANCE_PARAM_PATH, SPAWN_HOST_FAIL_WORKER_PATH); /* A list of datastore parent nodes with children that should be cutover */ - private static final List parentsToImport = Arrays.asList(SPAWN_COMMON_COMMAND_PATH, SPAWN_COMMON_MACRO_PATH, SPAWN_JOB_CONFIG_PATH, AliasBiMap.ALIAS_PATH, SPAWN_COMMON_ALERT_PATH); + private static final List parentsToImport = Arrays.asList(SPAWN_COMMON_COMMAND_PATH, SPAWN_COMMON_MACRO_PATH, SPAWN_JOB_CONFIG_PATH, AliasManagerImpl.ALIAS_PATH, SPAWN_COMMON_ALERT_PATH); /* A list of nodes beneath each job node */ private static final List jobParametersToImport = Arrays.asList("config", "queryconfig", "tasks", "alerts"); /* A list of properties of certain job nodes that should be imported as flat values rather than children -- necessary for certain kafka broker info */ diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/MeshQueryMaster.java b/hydra-main/src/main/java/com/addthis/hydra/query/MeshQueryMaster.java index 6d2fed86e..68a2eed06 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/query/MeshQueryMaster.java +++ b/hydra-main/src/main/java/com/addthis/hydra/query/MeshQueryMaster.java @@ -253,7 +253,7 @@ protected void writeQuery(ChannelHandlerContext ctx, Query query, ChannelPromise } } - private List expandAlias(String jobId) { + private List expandAlias(String jobId) throws ExecutionException { if (spawnDataStoreHandler != null) { return spawnDataStoreHandler.expandAlias(jobId); } else { diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasBiMap.java b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasBiMap.java deleted file mode 100644 index 096fe016b..000000000 --- a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasBiMap.java +++ /dev/null @@ -1,289 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.addthis.hydra.query.spawndatastore; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.concurrent.ThreadSafe; - -import java.io.IOException; -import java.io.StringWriter; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.locks.ReentrantLock; - -import com.addthis.basis.util.Parameter; - -import com.addthis.hydra.job.store.AvailableCache; -import com.addthis.hydra.job.store.DataStoreUtil; -import com.addthis.hydra.job.store.SpawnDataStore; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Throwables; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -@ThreadSafe -/** - * A class for maintaining a two-way map between aliases and jobIds, used by both Spawn and Mqmaster to stay in sync - */ -public class AliasBiMap { - - private static final Logger log = LoggerFactory.getLogger(AliasBiMap.class); - - public static final String ALIAS_PATH = "/query/alias"; - - /* This SpawnDataStore must be the same type (zookeeper/priam) between Spawn and Mqmaster. This should - * be guaranteed by the implementation of DataStoreUtil. */ - private final SpawnDataStore spawnDataStore; - private final HashMap> alias2jobs; - private final HashMap job2alias; - private final ObjectMapper mapper; - private final ReentrantLock mapLock; - private final AvailableCache mapCache; - - public AliasBiMap() throws Exception { - this(DataStoreUtil.makeCanonicalSpawnDataStore()); - loadCurrentValues(); - } - - public AliasBiMap(SpawnDataStore spawnDataStore) { - this.spawnDataStore = spawnDataStore; - mapLock = new ReentrantLock(); - mapper = new ObjectMapper(); - alias2jobs = new HashMap<>(); - job2alias = new HashMap<>(); - /* The interval to refresh cached alias values */ -// long cacheRefresh = Parameter.longValue("alias.bimap.refresh", 10000); - long cacheRefresh = Parameter.longValue("alias.bimap.refresh", 20000); - /* The expiration period for cache values. Off by default, but useful for testing. */ - long cacheExpire = Parameter.longValue("alias.bimap.expire", -1); - /* The max size of the alias cache */ - int cacheSize = Parameter.intValue("alias.bimap.cache.size", 1000); - mapCache = new AvailableCache(cacheRefresh, cacheExpire, cacheSize, 2) { - @Override public String fetchValue(String id) { - try { - return updateAlias(id, AliasBiMap.this.spawnDataStore.getChild(ALIAS_PATH, id)); - } catch (Exception e) { - log.warn("Exception when fetching alias: {}", id, e); - return updateAlias(id, null); - } - } - }; - } - - @VisibleForTesting - protected List decodeAliases(@Nonnull Object data) { - try { - return mapper.readValue(data.toString(), new TypeReference>() {}); - } catch (IOException e) { - log.warn("Failed to decode data", e); - return new ArrayList<>(0); - } - } - - /** - * Read from the SpawnDataStore to determine the newest values of the alias map - */ - public void loadCurrentValues() { - mapLock.lock(); - try { - alias2jobs.clear(); - job2alias.clear(); - Map aliases = spawnDataStore.getAllChildren(ALIAS_PATH); - if ((aliases == null) || aliases.isEmpty()) { - log.warn("No aliases found, unless this is on first cluster startup something is probably wrong"); - return; - } - mapCache.clear(); - for (Map.Entry aliasEntry : aliases.entrySet()) { - mapCache.put(aliasEntry.getKey(), aliasEntry.getValue()); - updateAlias(aliasEntry.getKey(), aliasEntry.getValue()); - } - } finally { - mapLock.unlock(); - } - } - - /** - * Refresh an alias based on the latest cached value - * - * @param alias The alias to refresh - */ - private void refreshAlias(String alias) { - try { - updateAlias(alias, mapCache.get(alias)); - } catch (ExecutionException e) { - log.warn("Failed to refresh alias: {}", alias, e); - } - } - - /** - * Load the jobIds for a particular alias from the SpawnDataStore - * - * @param alias The alias key to check - * @return String The data that was updated (so the cache can be updated) - */ - @Nullable private String updateAlias(String alias, @Nullable String data) { - if (alias == null) { - return data; - } - if ((data == null) || data.isEmpty()) { - deleteAlias(alias); - return data; - } - List jobs = decodeAliases(data); - if (jobs.isEmpty()) { - log.warn("no jobs for alias {}, ignoring {}", alias, alias); - return data; - } - mapLock.lock(); - try { - alias2jobs.put(alias, jobs); - job2alias.put(jobs.get(0), alias); - } finally { - mapLock.unlock(); - } - return data; - } - - /** - * Get an unmodifiable view of the current Alias map - * - * @return A map describing alias name => jobIds - */ - public Map> viewAliasMap() { - mapLock.lock(); - try { - return Collections.unmodifiableMap(alias2jobs); - } finally { - mapLock.unlock(); - } - } - - /** - * Update the SpawnDataStore with a new alias value - * - * @param alias The alias to add/change - * @param jobs The jobs to store under that alias - */ - public void putAlias(String alias, List jobs) { - mapLock.lock(); - try { - alias2jobs.put(alias, jobs); - job2alias.put(jobs.get(0), alias); - StringWriter sw = new StringWriter(); - mapper.writeValue(sw, jobs); - spawnDataStore.putAsChild(ALIAS_PATH, alias, sw.toString()); - } catch (Exception e) { - log.warn("failed to put alias: {}", alias, e); - throw Throwables.propagate(e); - } finally { - mapLock.unlock(); - } - } - - /** - * Delete the data for a given alias - * - * @param alias The alias to check - */ - public void deleteAlias(String alias) { - mapLock.lock(); - try { - List jobs = alias2jobs.get(alias); - alias2jobs.remove(alias); - if ((jobs != null) && !jobs.isEmpty()) { - for (String job : jobs) { - String aliasVal = job2alias.get(job); - if (Objects.equals(aliasVal, alias)) { - job2alias.remove(job); - } - } - } - } finally { - mapLock.unlock(); - } - spawnDataStore.deleteChild(ALIAS_PATH, alias); - mapCache.remove(alias); - } - - /** - * Test a job/alias pair to see if an alias has disappeared - * - * @param job The job to test - * @param alias The alias to check - */ - private void checkAlias(String job, String alias) { - mapLock.lock(); - try { - if (!alias2jobs.containsKey(alias) && job2alias.get(job).equals(alias)) { - job2alias.remove(job); - } - } finally { - mapLock.unlock(); - } - - } - - /** - * Get all jobIds for a given alias - * - * @param alias The alias to check - * @return A list of jobIds, possible null - */ - public List getJobs(String alias) { - refreshAlias(alias); - mapLock.lock(); - try { - return alias2jobs.get(alias); - } finally { - mapLock.unlock(); - } - - } - - /** - * Get an alias for a particular jobId - * - * @param jobid The jobId to check - * @return One of the aliases for that job - */ - public String getLikelyAlias(String jobid) { - mapLock.lock(); - try { - String tmpAlias = job2alias.get(jobid); - if (tmpAlias != null) { - // Check to see if the alias has been deleted - checkAlias(jobid, tmpAlias); - } - return job2alias.get(jobid); - } finally { - mapLock.unlock(); - } - - } - -} diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java new file mode 100644 index 000000000..9f2a594a9 --- /dev/null +++ b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java @@ -0,0 +1,127 @@ +package com.addthis.hydra.query.spawndatastore; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import com.addthis.basis.util.Parameter; + +import com.addthis.hydra.job.store.AvailableCache; +import com.addthis.hydra.job.store.DataStoreUtil; +import com.addthis.hydra.job.store.SpawnDataStore; + +import com.google.common.base.Strings; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AliasCache { + private static final Logger log = LoggerFactory.getLogger(AliasCache.class); + + /* The interval to refresh cached alias values */ + private static long DEFAULT_REFRESH_INTERVAL = Parameter.longValue("alias.bimap.refresh", 200); + /* The expiration period for cache values. Off by default, but useful for testing. */ + private static long DEFAULT_CACHE_EXPIRE = Parameter.longValue("alias.bimap.expire", -1); + /* The max size of the alias cache */ + private static int DEFAULT_CACHE_SIZE = Parameter.intValue("alias.bimap.cache.size", 1000); + + private static final String ALIAS_PATH = "/query/alias"; + + private SpawnDataStore spawnDataStore; + private HashMap> alias2jobs; + + private final AvailableCache> mapCache; + private final List jobs = new ArrayList<>(); + + // System.out.println("sJobs = " + sJobs); // sJobs = ["job11","job12"] + public AliasCache() throws Exception { + spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); + + mapCache = new AvailableCache>(DEFAULT_REFRESH_INTERVAL, DEFAULT_CACHE_EXPIRE, DEFAULT_CACHE_SIZE, 2) { + + @Override public List fetchValue(String id) { + + String child; + try { + child = spawnDataStore.getChild(ALIAS_PATH, id); + if(child == null) { + return null; + } + } catch (Exception e) { + e.printStackTrace(); + return null; + } + + try { + String sJobs = getJobsFromDatastore(id, spawnDataStore.getChild(ALIAS_PATH, id)); + ObjectMapper mapper = new ObjectMapper(); + + if(Strings.isNullOrEmpty(sJobs.toString())) { + System.out.println("sJobs is empty !"); + return null; + } else { + System.out.println("sJobs in cache = " + sJobs); + } + + List jobs = mapper.readValue(sJobs.toString(), new TypeReference>() {}); + return jobs; + } catch (Exception e) { + log.warn("Exception when fetching alias: {}", id, e); + try { + return null; + } catch (Exception e1) { + e1.printStackTrace(); + } + } + return null; + } + }; + } + + public List getJobs(String alias) throws ExecutionException { + mapCache.cleanUp(); + + + List jobs = mapCache.get(alias); + if(jobs == null || jobs.size() == 0) { + System.out.println("not jobs for alias " + alias); + return null; + } else { + System.out.println("AliasCache getJobs = " + alias); + return jobs; + } + } + + public void deleteAlias(String alias) { + mapCache.remove(alias); + mapCache.cleanUp(); + } + + @Nullable public String getJobsFromDatastore(String alias, @Nullable String data) throws Exception { + if (alias == null) { + return data; + } + if ((data == null) || data.isEmpty()) { + mapCache.remove(alias); + return data; + } + String jobs = spawnDataStore.getChild(ALIAS_PATH, alias); + if(Strings.isNullOrEmpty(jobs)) { + System.out.println(alias + " has been deleted from datastore !!!"); + return null; + } + + return jobs; + } + + public AvailableCache> getMapCache() { + return mapCache; + } + +} \ No newline at end of file diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandler.java b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandler.java index 2476969d3..e0d24e4d1 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandler.java +++ b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandler.java @@ -30,8 +30,11 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -public class SpawnDataStoreHandler implements AutoCloseable { +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +public class SpawnDataStoreHandler implements AutoCloseable { + private static final Logger log = LoggerFactory.getLogger(SpawnDataStoreHandler.class); /** * A ZooKeeper/Priam backed data structure that keeps track of * the Hydra jobs in the cluster. We are specifically @@ -57,7 +60,7 @@ public class SpawnDataStoreHandler implements AutoCloseable { * A ZooKeeper backed data structure that maintains a * bi-directional mapping of job aliases to job IDs */ - private final AliasBiMap aliasBiMap; + private final AliasCache aliasCache; /** * a cache of job configuration data, used to reduce load placed on ZK server with high volume queries @@ -76,30 +79,38 @@ public SpawnDataStoreHandler() throws Exception { spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); this.jobConfigManager = new JobConfigManager(spawnDataStore); this.queryConfigWatcher = new QueryConfigWatcher(spawnDataStore); - this.aliasBiMap = new AliasBiMap(spawnDataStore); - this.aliasBiMap.loadCurrentValues(); + this.aliasCache = new AliasCache(); +// this.aliasCache.loadCurrentValues(); } @Override public void close() { spawnDataStore.close(); } + public AliasCache getAliasCache() { + return aliasCache; + } + public void validateJobForQuery(String job) { if (!queryConfigWatcher.safeToQuery(job)) { throw new QueryException("job is not safe to query (are queries enabled for this job in spawn?): " + job); } } - public List expandAlias(String job) { - List possibleJobs = aliasBiMap.getJobs(job); +// Query master does't need the two maps +// it can just rely on the cache which refresh the values by loading from the spawn data store. + // todo: get jobs from cache + // todo: old + public List expandAlias(String job) throws ExecutionException { + List possibleJobs = aliasCache.getJobs(job); if ((possibleJobs != null) && !possibleJobs.isEmpty()) { return possibleJobs; } return Collections.singletonList(job); } - public String resolveAlias(String job) { - List possibleJobs = aliasBiMap.getJobs(job); + public String resolveAlias(String job) throws ExecutionException { + List possibleJobs = aliasCache.getJobs(job); if ((possibleJobs != null) && !possibleJobs.isEmpty()) { return possibleJobs.get(0); } diff --git a/hydra-main/src/test/java/com/addthis/hydra/job/web/resources/AliasResourceTest.java b/hydra-main/src/test/java/com/addthis/hydra/job/web/resources/AliasResourceTest.java new file mode 100644 index 000000000..d9b644589 --- /dev/null +++ b/hydra-main/src/test/java/com/addthis/hydra/job/web/resources/AliasResourceTest.java @@ -0,0 +1,62 @@ +package com.addthis.hydra.job.web.resources; + +import java.util.List; +import java.util.stream.IntStream; + +import com.addthis.basis.kv.KVPairs; + +import com.addthis.hydra.job.alias.AliasManager; + +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; + +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +public class AliasResourceTest { + + AliasManager aliasManager; + private KVPairs kv; + + @Before + public void setUp() throws Exception { + aliasManager = mock(AliasManager.class); + kv = new KVPairs(); + } + + /* + + query Request URL:http://adq05:2222/query/call?path=%2F(25)%2B%3A%2Bcount%2C%2Bnodes%2C%2Bmem&ops=gather%3Dksaau%3Bsort%3D0%3As%3Aa&format=json&job=8939260c-395c-4355-b5a0-3ca23c4a113c&sender=spawn&nocache=1 + path=%2F(25)%2B%3A%2Bcount%2C%2Bnodes%2C%2Bmem&ops=gather%3Dksaau%3Bsort%3D0%3As%3Aa&format=json&job=8939260c-395c-4355-b5a0-3ca23c4a113c&sender=spawn&nocache=1 + + path:/(25)+:+count,+nodes,+mem + ops:gather=ksaau;sort=0:s:a + format:json + job:8939260c-395c-4355-b5a0-3ca23c4a113c + sender:spawn + nocache:1 + +http://spawn-iad:5052/alias/save + +name=test-alias1&jobs=8939260c-395c-4355-b5a0-3ca23c4a113c + */ + + @Test + public void postAlias() throws Exception { + kv.add("name", "test-alias1"); + kv.add("jobs", "8939260c-395c-4355-b5a0-3ca23c4a113c"); + + List jobs = Lists.newArrayList(Splitter.on(',').split(kv.getValue("jobs"))); + + IntStream.rangeClosed(1, 1000).forEach(i -> { + System.out.println(i); + aliasManager.addAlias(kv.getValue("name"), jobs); + }); + + + + } + +} \ No newline at end of file diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/AliasBiMapTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/AliasManagerImplTest.java similarity index 67% rename from hydra-main/src/test/java/com/addthis/hydra/query/AliasBiMapTest.java rename to hydra-main/src/test/java/com/addthis/hydra/query/AliasManagerImplTest.java index a55b9dc28..06d66a0f7 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/AliasBiMapTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/AliasManagerImplTest.java @@ -1,59 +1,41 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package com.addthis.hydra.query; -import com.addthis.basis.test.SlowTest; - -import com.addthis.hydra.query.spawndatastore.AliasBiMap; -import com.addthis.hydra.util.ZkCodecStartUtil; +import com.addthis.hydra.job.alias.AliasManager; +import com.addthis.hydra.job.alias.AliasManagerImpl; import com.google.common.collect.ImmutableList; import org.junit.Test; -import org.junit.experimental.categories.Category; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -@Category(SlowTest.class) -public class AliasBiMapTest extends ZkCodecStartUtil { - +public class AliasManagerImplTest { @Test public void testConstruction() throws Exception { - AliasBiMap abm = new AliasBiMap(); + AliasManager abm = new AliasManagerImpl(); assertNull(abm.getJobs("foo")); assertNull(abm.getLikelyAlias("foo")); } @Test public void testInit() throws Exception { - AliasBiMap abm = new AliasBiMap(); + AliasManagerImpl abm = new AliasManagerImpl(); assertNull(abm.getJobs("foo")); assertNull(abm.getLikelyAlias("foo")); } @Test public void testInit2() throws Exception { - AliasBiMap abm = new AliasBiMap(); + AliasManagerImpl abm = new com.addthis.hydra.job.alias.AliasManagerImpl(); assertNull(abm.getJobs("foo")); assertNull(abm.getLikelyAlias("foo")); } @Test public void testLocalGetPut() throws Exception { - AliasBiMap abm = new AliasBiMap(); + AliasManagerImpl abm = new com.addthis.hydra.job.alias.AliasManagerImpl(); assertNull(abm.getJobs("foo")); abm.putAlias("a1", ImmutableList.of("j1", "j2")); assertEquals(ImmutableList.of("j1", "j2"), abm.getJobs("a1")); @@ -64,14 +46,13 @@ public void testLocalGetPut() throws Exception { assertNull(abm.getLikelyAlias("j1")); } - @Test public void testPropagation() throws Exception { - System.setProperty("alias.bimap.expire", "50"); - AliasBiMap abm = new AliasBiMap(); +// System.setProperty("alias.bimap.expire", "50"); + AliasManagerImpl abm = new com.addthis.hydra.job.alias.AliasManagerImpl(); abm.putAlias("a1", ImmutableList.of("j1", "j2")); - AliasBiMap r_abm = new AliasBiMap(); + AliasManagerImpl r_abm = new com.addthis.hydra.job.alias.AliasManagerImpl(); assertEquals(ImmutableList.of("j1", "j2"), r_abm.getJobs("a1")); assertEquals("a1", r_abm.getLikelyAlias("j1")); @@ -94,9 +75,8 @@ public void testPropagation() throws Exception { @Test public void updateLoop() throws Exception { System.setProperty("alias.bimap.refresh", "500"); - AliasBiMap abm = new AliasBiMap(); - - AliasBiMap r_abm = new AliasBiMap(); + AliasManagerImpl abm = new com.addthis.hydra.job.alias.AliasManagerImpl(); + AliasManagerImpl r_abm = new com.addthis.hydra.job.alias.AliasManagerImpl(); for (int i = 0; i < 5; i++) { int retries = 10; @@ -113,6 +93,4 @@ public void updateLoop() throws Exception { assertTrue("failed to register updates after retrying", succeeded); } } - - -} \ No newline at end of file +} diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java index 741969117..66333a885 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java @@ -288,6 +288,13 @@ public void requestedTaskValidation_allowPartialOn() { verifyTaskValidationFail("all req missing", tasks(0, 2), tasks(1), true); } + // todo + @Test + public void expandAliasTest() { + + } + + private Set tasks(Integer... args) { return Sets.newHashSet(args); } @@ -309,4 +316,6 @@ private void verifyTaskValidationFail(String failMsg, Set avail, Set 0) { + if (ImmutableList.of(is).equals(ac.getJobs("a1"))) { + succeeded = true; + break; + } + Thread.sleep(500); + } + assertTrue("failed to register updates after retrying", succeeded); + } + } + + +} + diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java new file mode 100644 index 000000000..360c80105 --- /dev/null +++ b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java @@ -0,0 +1,35 @@ +package com.addthis.hydra.query.spawndatastore; + +import com.addthis.hydra.job.alias.AliasManagerImpl; + +import com.google.common.collect.ImmutableList; + +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class SpawnDataStoreHandlerTest { + private AliasCache ac; + private AliasManagerImpl abm; + + @Before + public void setUp() throws Exception { + ac = new AliasCache(); + abm = new AliasManagerImpl(); + abm.putAlias("a1", ImmutableList.of("j11", "j12")); + Thread.sleep(350); + } + + @Test + public void testExpandAlias() throws Exception { + SpawnDataStoreHandler spawnDataStoreHandler = new SpawnDataStoreHandler(); + assertEquals(ImmutableList.of("j11", "j12"), spawnDataStoreHandler.expandAlias("a1")); + } + + @Test + public void testResolveAlias() throws Exception { + SpawnDataStoreHandler spawnDataStoreHandler = new SpawnDataStoreHandler(); + assertEquals("j11", spawnDataStoreHandler.resolveAlias("a1")); + } +} \ No newline at end of file diff --git a/hydra-uber/bin/local-stack.sh b/hydra-uber/bin/local-stack.sh index 5606bd6b8..56ce60323 100755 --- a/hydra-uber/bin/local-stack.sh +++ b/hydra-uber/bin/local-stack.sh @@ -128,7 +128,8 @@ export MQ_MASTER_OPT="${LOG4J_PROPERTIES} -Xmx1284M -Deps.mem.debug=10000 -Dje.m export MQ_WORKER_OPT="${LOG4J_PROPERTIES} -Xmx1284M -Dmesh.local.handlers=com.addthis.hydra.data.query.source.MeshQuerySource \ -Dmeshy.stream.prefetch=true -Dmeshy.senders=1 -Dcom.addthis.hydra.query.MeshQueryWorker.bindAddress=5101 \ -Dcom.addthis.hydra.query.MeshQueryWorker.root=${HYDRA_LOCAL_DIR} \ --Dcom.addthis.hydra.query.MeshQueryWorker.peers=localhost:5100" +-Dcom.addthis.hydra.query.MeshQueryWorker.peers=localhost:5100 \ +-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005" export MINION_OPT="${LOG4J_PROPERTIES} -Xmx512M -Dminion.mem=512 -Dminion.localhost=localhost -Dminion.group=local \ -Dminion.web.port=0 -Dspawn.localhost=localhost -Dhttp.post.max=327680 -Dminion.sparse.updates=1 \ @@ -143,7 +144,7 @@ export SPAWN_OPT="-Xmx512M ${LOG4J_PROPERTIES} -Dspawn.localhost=localhost -Dspa -Dcom.addthis.hydra.job.web.SpawnServiceConfiguration.keyManagerPassword=$HYDRA_LOCAL_DIR/cert/keystore.password \ -Dcom.addthis.hydra.job.web.SpawnServiceConfiguration.keyStorePath=$HYDRA_LOCAL_DIR/cert/keystore.jks \ -Dcom.addthis.hydra.job.web.SpawnServiceConfiguration.defaultSSL=false \ --agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005" +-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006" export MESHY_OPT="-Xmx128M -Xms128M ${LOG4J_PROPERTIES} -Dmeshy.autoMesh=false -Dmeshy.throttleLog=true \ -Dmeshy.buffers.enable=true -Dmeshy.stream.maxopen=10000" From 11aedfa174dd0da1d953487374615854cee5da31 Mon Sep 17 00:00:00 2001 From: Irene Cho Date: Thu, 10 Aug 2017 15:20:21 -0400 Subject: [PATCH 03/16] do not push --- .../main/java/com/addthis/hydra/job/alias/AliasManager.java | 2 ++ .../java/com/addthis/hydra/job/alias/AliasManagerImpl.java | 5 ++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java index 0a139870a..3aaba252c 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java @@ -30,4 +30,6 @@ public interface AliasManager { String getLikelyAlias(String jobid); + void putAlias(String alias, List jobs); + } diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java index 31960a93a..22ced693f 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java @@ -58,7 +58,6 @@ public class AliasManagerImpl implements AliasManager { public final ObjectMapper mapper; private final ReentrantLock mapLock; - @VisibleForTesting public AliasManagerImpl() throws Exception{ this.spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); // added this.mapLock = new ReentrantLock(); @@ -67,9 +66,7 @@ public AliasManagerImpl() throws Exception{ this.job2alias = new HashMap<>(); } - public AliasManagerImpl(SpawnDataStore spawnDataStore) { -// this.aliasBiMap = new AliasBiMap(spawnDataStore); // aliasBiMap.loadCurrentValues(); this.spawnDataStore = spawnDataStore; // added this.mapLock = new ReentrantLock(); @@ -240,10 +237,12 @@ public void deleteAlias(String alias) { List jobs = ac.getJobs(alias); if(jobs == null ) { System.out.println("jobs is null"); + return; } if(jobs.size() == 0) { System.out.println("jobs size is 0"); + return; } System.out.println("jobs after remove alias " + alias + " = " + jobs); From fa367eb9e38079d24656d63be9782851a04949b9 Mon Sep 17 00:00:00 2001 From: Irene Cho Date: Thu, 10 Aug 2017 16:49:39 -0400 Subject: [PATCH 04/16] initCommit-only query master reads alias cacheref T68833 --- .../addthis/hydra/job/alias/AliasManager.java | 1 - .../hydra/job/alias/AliasManagerImpl.java | 64 ++++------ .../hydra/job/store/AvailableCache.java | 16 +-- .../query/spawndatastore/AliasCache.java | 110 ++++++++++-------- .../spawndatastore/SpawnDataStoreHandler.java | 12 +- .../hydra/query/AliasManagerImplTest.java | 96 --------------- .../query/spawndatastore/AliasCacheTest.java | 95 +++++---------- .../SpawnDataStoreHandlerTest.java | 28 ++++- 8 files changed, 144 insertions(+), 278 deletions(-) delete mode 100644 hydra-main/src/test/java/com/addthis/hydra/query/AliasManagerImplTest.java diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java index 3aaba252c..b3e2c62b1 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java @@ -31,5 +31,4 @@ public interface AliasManager { String getLikelyAlias(String jobid); void putAlias(String alias, List jobs); - } diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java index 22ced693f..4fe61bdd9 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java @@ -33,6 +33,7 @@ import com.addthis.hydra.query.spawndatastore.AliasCache; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.fasterxml.jackson.core.type.TypeReference; @@ -43,36 +44,34 @@ @ThreadSafe public class AliasManagerImpl implements AliasManager { - private static final Logger log = LoggerFactory.getLogger(AliasManagerImpl.class); - - //private AliasBiMap aliasBiMap; private Map aliases; - - public static final String ALIAS_PATH = "/query/alias"; /* This SpawnDataStore must be the same type (zookeeper/priam) between Spawn and Mqmaster. This should * be guaranteed by the implementation of DataStoreUtil. */ + public static final String ALIAS_PATH = "/query/alias"; private final SpawnDataStore spawnDataStore; private final HashMap> alias2jobs; private final HashMap job2alias; - public final ObjectMapper mapper; + private final ObjectMapper mapper; private final ReentrantLock mapLock; + private final AliasCache ac; public AliasManagerImpl() throws Exception{ - this.spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); // added + this.spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); this.mapLock = new ReentrantLock(); this.mapper = new ObjectMapper(); this.alias2jobs = new HashMap<>(); this.job2alias = new HashMap<>(); + this.ac = new AliasCache(); } - public AliasManagerImpl(SpawnDataStore spawnDataStore) { -// aliasBiMap.loadCurrentValues(); - this.spawnDataStore = spawnDataStore; // added + public AliasManagerImpl(SpawnDataStore spawnDataStore) throws Exception { + this.spawnDataStore = spawnDataStore; this.mapLock = new ReentrantLock(); this.mapper = new ObjectMapper(); this.alias2jobs = new HashMap<>(); this.job2alias = new HashMap<>(); + this.ac = new AliasCache(); } /** @@ -91,7 +90,6 @@ public Map> getAliases() { return alias2Jobs; } - // getJobs public List aliasToJobs(String alias) { return getJobs(alias); } @@ -109,7 +107,6 @@ public void addAlias(String alias, List jobs) { } } - // -------------------------------------------------- public void putAlias(String alias, List jobs) { mapLock.lock(); try { @@ -140,22 +137,20 @@ public List getJobs(String alias) { } finally { mapLock.unlock(); } - } /** - * Refresh an alias based on the latest cached value + * Refresh an alias based on datastore * * @param alias The alias to refresh */ private void refreshAlias(String alias) { try { - String data = this.spawnDataStore.getChild(ALIAS_PATH, alias); - updateAlias(alias, data); + updateAlias(alias, this.spawnDataStore.getChild(ALIAS_PATH, alias)); } catch (ExecutionException e) { log.warn("Failed to refresh alias: {}", alias, e); } catch (Exception e) { - e.printStackTrace(); + log.error("",e); } } @@ -166,10 +161,10 @@ private void refreshAlias(String alias) { * @return String The data that was updated (so the cache can be updated) */ @Nullable private String updateAlias(String alias, @Nullable String data) { - if (alias == null) { + if (Strings.isNullOrEmpty(alias)) { return data; } - if ((data == null) || data.isEmpty()) { + if (Strings.isNullOrEmpty(data)) { deleteAlias(alias); return data; } @@ -188,7 +183,8 @@ private void refreshAlias(String alias) { return data; } - @VisibleForTesting public List decodeAliases(@Nonnull Object data) { + @VisibleForTesting + protected List decodeAliases(@Nonnull Object data) { try { return mapper.readValue(data.toString(), new TypeReference>() {}); } catch (IOException e) { @@ -202,7 +198,6 @@ private void refreshAlias(String alias) { * * @param alias The alias to check */ - public void deleteAlias(String alias) { mapLock.lock(); try { @@ -222,33 +217,14 @@ public void deleteAlias(String alias) { spawnDataStore.deleteChild(ALIAS_PATH, alias); try { - String sJobsAll = spawnDataStore.getChild(ALIAS_PATH, alias); - System.out.println("sJobs from datastore for " + alias + " = " + sJobsAll); - } catch (Exception e) { - e.printStackTrace(); - } - - // todo: delete cache since refresh is initiated when query - AliasCache ac = null; - try { - ac = new AliasCache(); ac.deleteAlias(alias); - List jobs = ac.getJobs(alias); - if(jobs == null ) { - System.out.println("jobs is null"); - return; - } - - if(jobs.size() == 0) { - System.out.println("jobs size is 0"); + if(jobs == null || jobs.size() == 0 ) { + log.error("There is no jobs for alias {}", alias); return; } - - System.out.println("jobs after remove alias " + alias + " = " + jobs); - - } catch (Exception e) { - e.printStackTrace(); + } catch (ExecutionException e) { + log.error("",e); } } diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/store/AvailableCache.java b/hydra-main/src/main/java/com/addthis/hydra/job/store/AvailableCache.java index d1f813ed8..00f009058 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/store/AvailableCache.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/store/AvailableCache.java @@ -32,6 +32,8 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; + /** * A cache implementation that never blocks unless there is no data for a given ID. Stale values are refreshed asynchronously * and the old value is returned in the mean time. @@ -39,6 +41,7 @@ * @param The class that will be stored in the cache */ public abstract class AvailableCache implements AutoCloseable { + private static final Logger log = org.slf4j.LoggerFactory.getLogger(AvailableCache.class); /* A LoadingCache used to save fetched objects */ private final LoadingCache> loadingCache; @@ -69,19 +72,16 @@ public AvailableCache(long refreshMillis, long expireMillis, int maxSize, int fe if (fetchThreads <= 0) { fetchThreads = 2; } - - executor = new ThreadPoolExecutor( - fetchThreads, fetchThreads, 1000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), - new ThreadFactoryBuilder().setNameFormat("avail-cache-%d").setDaemon(true).build()); - //noinspection unchecked - cacheBuilder.removalListener(new RemovalListener, Optional>() { @Override public void onRemoval(RemovalNotification, Optional> notification) { - System.out.println("<" + notification.getKey() + ", " + notification.getValue() + "> has been removed!"); - loadingCache.asMap().get(notification.getKey()); + log.info("alias {} and its job {} removed", notification.getKey(), notification.getValue()); } }); + executor = new ThreadPoolExecutor( + fetchThreads, fetchThreads, 1000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setNameFormat("avail-cache-%d").setDaemon(true).build()); + //noinspection unchecked this.loadingCache = cacheBuilder.build(new CacheLoader>() { /** diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java index 9f2a594a9..05396efed 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java +++ b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java @@ -2,10 +2,14 @@ import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.HashMap; +import java.io.IOException; + import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import com.addthis.basis.util.Parameter; @@ -14,6 +18,8 @@ import com.addthis.hydra.job.store.SpawnDataStore; import com.google.common.base.Strings; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -25,103 +31,109 @@ public class AliasCache { private static final Logger log = LoggerFactory.getLogger(AliasCache.class); /* The interval to refresh cached alias values */ - private static long DEFAULT_REFRESH_INTERVAL = Parameter.longValue("alias.bimap.refresh", 200); + private static long DEFAULT_REFRESH_INTERVAL = Parameter.longValue("alias.bimap.refresh", 1000); /* The expiration period for cache values. Off by default, but useful for testing. */ private static long DEFAULT_CACHE_EXPIRE = Parameter.longValue("alias.bimap.expire", -1); /* The max size of the alias cache */ private static int DEFAULT_CACHE_SIZE = Parameter.intValue("alias.bimap.cache.size", 1000); + private static final long maintenanceInterval = 1000; private static final String ALIAS_PATH = "/query/alias"; + private final SpawnDataStore spawnDataStore; + private AvailableCache> mapCache; - private SpawnDataStore spawnDataStore; - private HashMap> alias2jobs; - - private final AvailableCache> mapCache; - private final List jobs = new ArrayList<>(); - - // System.out.println("sJobs = " + sJobs); // sJobs = ["job11","job12"] public AliasCache() throws Exception { spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); - mapCache = new AvailableCache>(DEFAULT_REFRESH_INTERVAL, DEFAULT_CACHE_EXPIRE, DEFAULT_CACHE_SIZE, 2) { - @Override public List fetchValue(String id) { - String child; try { child = spawnDataStore.getChild(ALIAS_PATH, id); - if(child == null) { + if(Strings.isNullOrEmpty(child)) { return null; } } catch (Exception e) { - e.printStackTrace(); + log.error("Error occurred while getting alias {} from Spawn datastore", id, e); return null; } try { - String sJobs = getJobsFromDatastore(id, spawnDataStore.getChild(ALIAS_PATH, id)); + String sJobs = getJobsFromDatastore(id, child); ObjectMapper mapper = new ObjectMapper(); - - if(Strings.isNullOrEmpty(sJobs.toString())) { - System.out.println("sJobs is empty !"); + if(Strings.isNullOrEmpty(sJobs)) { + log.error("There is no jobs for alias {}", id); return null; - } else { - System.out.println("sJobs in cache = " + sJobs); } - - List jobs = mapper.readValue(sJobs.toString(), new TypeReference>() {}); + List jobs = mapper.readValue(sJobs, new TypeReference>() {}); return jobs; } catch (Exception e) { - log.warn("Exception when fetching alias: {}", id, e); - try { - return null; - } catch (Exception e1) { - e1.printStackTrace(); - } + log.error("Error occurred while fetching alias: {}", id, e); + return null; } - return null; } }; - } + maybeInitMaintenance(); + } + + public void loadCurrentValues() throws IOException { + Map aliases = spawnDataStore.getAllChildren(ALIAS_PATH); + if ((aliases == null) || aliases.isEmpty()) { + log.warn("No aliases found, unless this is on first cluster startup something is probably wrong"); + return; + } + mapCache.clear(); + ObjectMapper mapper = new ObjectMapper(); + for (Map.Entry aliasEntry : aliases.entrySet()) { + List jobs = mapper.readValue(aliasEntry.getValue(), new TypeReference>() {}); + mapCache.put(aliasEntry.getKey(), jobs); + } + } - public List getJobs(String alias) throws ExecutionException { - mapCache.cleanUp(); + private void maybeInitMaintenance() { + if (maintenanceInterval > 0) { + aliasCacheMaintainer.scheduleAtFixedRate(() -> { + mapCache.cleanUp(); + mapCache.getLoadingCache().asMap().keySet().forEach(mapCache.getLoadingCache()::getIfPresent); + }, maintenanceInterval, maintenanceInterval, TimeUnit.MILLISECONDS); + } + } + /** + * thread pool for cache maintenance runs. Should only need one thread. + */ + private final ScheduledExecutorService aliasCacheMaintainer = MoreExecutors.getExitingScheduledExecutorService( + new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("aliasCacheMaintainer=%d").build())); - List jobs = mapCache.get(alias); + public List getJobs(String alias) throws ExecutionException { + List jobs = mapCache.get(alias); if(jobs == null || jobs.size() == 0) { - System.out.println("not jobs for alias " + alias); + log.error("There is no job(s) for alias " + alias); return null; - } else { - System.out.println("AliasCache getJobs = " + alias); - return jobs; } - } - - public void deleteAlias(String alias) { - mapCache.remove(alias); - mapCache.cleanUp(); + return jobs; } - @Nullable public String getJobsFromDatastore(String alias, @Nullable String data) throws Exception { - if (alias == null) { + @Nullable protected String getJobsFromDatastore(String alias, @Nullable String data) throws Exception { + if (Strings.isNullOrEmpty(alias)) { return data; } - if ((data == null) || data.isEmpty()) { + if (Strings.isNullOrEmpty(data)) { mapCache.remove(alias); return data; } String jobs = spawnDataStore.getChild(ALIAS_PATH, alias); if(Strings.isNullOrEmpty(jobs)) { - System.out.println(alias + " has been deleted from datastore !!!"); + log.error("There is no alias {} in datastore", alias); return null; } - return jobs; } + public void deleteAlias(String alias) { + mapCache.remove(alias); + } + public AvailableCache> getMapCache() { return mapCache; } - } \ No newline at end of file diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandler.java b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandler.java index e0d24e4d1..becf55895 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandler.java +++ b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandler.java @@ -80,16 +80,16 @@ public SpawnDataStoreHandler() throws Exception { this.jobConfigManager = new JobConfigManager(spawnDataStore); this.queryConfigWatcher = new QueryConfigWatcher(spawnDataStore); this.aliasCache = new AliasCache(); -// this.aliasCache.loadCurrentValues(); + this.aliasCache.loadCurrentValues(); } @Override public void close() { spawnDataStore.close(); } - public AliasCache getAliasCache() { - return aliasCache; - } +// public AliasCache getAliasCache() { +// return aliasCache; +// } public void validateJobForQuery(String job) { if (!queryConfigWatcher.safeToQuery(job)) { @@ -97,10 +97,6 @@ public void validateJobForQuery(String job) { } } -// Query master does't need the two maps -// it can just rely on the cache which refresh the values by loading from the spawn data store. - // todo: get jobs from cache - // todo: old public List expandAlias(String job) throws ExecutionException { List possibleJobs = aliasCache.getJobs(job); if ((possibleJobs != null) && !possibleJobs.isEmpty()) { diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/AliasManagerImplTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/AliasManagerImplTest.java deleted file mode 100644 index 06d66a0f7..000000000 --- a/hydra-main/src/test/java/com/addthis/hydra/query/AliasManagerImplTest.java +++ /dev/null @@ -1,96 +0,0 @@ -package com.addthis.hydra.query; - -import com.addthis.hydra.job.alias.AliasManager; -import com.addthis.hydra.job.alias.AliasManagerImpl; - -import com.google.common.collect.ImmutableList; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -public class AliasManagerImplTest { - @Test - public void testConstruction() throws Exception { - AliasManager abm = new AliasManagerImpl(); - assertNull(abm.getJobs("foo")); - assertNull(abm.getLikelyAlias("foo")); - } - - @Test - public void testInit() throws Exception { - AliasManagerImpl abm = new AliasManagerImpl(); - assertNull(abm.getJobs("foo")); - assertNull(abm.getLikelyAlias("foo")); - } - - @Test - public void testInit2() throws Exception { - AliasManagerImpl abm = new com.addthis.hydra.job.alias.AliasManagerImpl(); - assertNull(abm.getJobs("foo")); - assertNull(abm.getLikelyAlias("foo")); - } - - @Test - public void testLocalGetPut() throws Exception { - AliasManagerImpl abm = new com.addthis.hydra.job.alias.AliasManagerImpl(); - assertNull(abm.getJobs("foo")); - abm.putAlias("a1", ImmutableList.of("j1", "j2")); - assertEquals(ImmutableList.of("j1", "j2"), abm.getJobs("a1")); - assertEquals("a1", abm.getLikelyAlias("j1")); - - abm.deleteAlias("a1"); - assertNull(abm.getJobs("a1")); - assertNull(abm.getLikelyAlias("j1")); - } - - @Test - public void testPropagation() throws Exception { -// System.setProperty("alias.bimap.expire", "50"); - AliasManagerImpl abm = new com.addthis.hydra.job.alias.AliasManagerImpl(); - abm.putAlias("a1", ImmutableList.of("j1", "j2")); - - AliasManagerImpl r_abm = new com.addthis.hydra.job.alias.AliasManagerImpl(); - assertEquals(ImmutableList.of("j1", "j2"), r_abm.getJobs("a1")); - assertEquals("a1", r_abm.getLikelyAlias("j1")); - - abm.putAlias("a2", ImmutableList.of("j21")); - Thread.sleep(350); - assertEquals(ImmutableList.of("j21"), r_abm.getJobs("a2")); - assertEquals("a2", r_abm.getLikelyAlias("j21")); - - abm.putAlias("a1", ImmutableList.of("j7", "j8")); - Thread.sleep(350); - assertEquals(ImmutableList.of("j7", "j8"), r_abm.getJobs("a1")); - assertEquals("a1", r_abm.getLikelyAlias("j7")); - - abm.deleteAlias("a1"); - Thread.sleep(350); - assertNull(r_abm.getJobs("a1")); - assertNull(r_abm.getLikelyAlias("j1")); - } - - @Test - public void updateLoop() throws Exception { - System.setProperty("alias.bimap.refresh", "500"); - AliasManagerImpl abm = new com.addthis.hydra.job.alias.AliasManagerImpl(); - AliasManagerImpl r_abm = new com.addthis.hydra.job.alias.AliasManagerImpl(); - - for (int i = 0; i < 5; i++) { - int retries = 10; - boolean succeeded = false; - String is = Integer.toString(i); - abm.putAlias("a1", ImmutableList.of(is)); - while (retries-- > 0) { - if (ImmutableList.of(is).equals(r_abm.getJobs("a1"))) { - succeeded = true; - break; - } - Thread.sleep(500); - } - assertTrue("failed to register updates after retrying", succeeded); - } - } -} diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java index 5823a31b7..4fa0253f7 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java @@ -1,110 +1,71 @@ package com.addthis.hydra.query.spawndatastore; +import com.addthis.hydra.job.alias.AliasManager; import com.addthis.hydra.job.alias.AliasManagerImpl; -import com.addthis.hydra.job.store.DataStoreUtil; -import com.addthis.hydra.job.store.SpawnDataStore; import com.google.common.collect.ImmutableList; import org.junit.Test; +import static junit.framework.TestCase.assertNull; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class AliasCacheTest { - private static final String ALIAS_PATH = "/query/alias"; - @Test - public void testPropagationFromAliasCache() throws Exception { - // 1. AliasManagerImpl - AliasManagerImpl abm = new AliasManagerImpl(); - abm.putAlias("a1", ImmutableList.of("j1", "j2")); + public void testGetJobs() throws Exception { + AliasManager abm1 = new AliasManagerImpl(); + abm1.putAlias("a1", ImmutableList.of("j11", "j12")); - AliasManagerImpl r_abm = new AliasManagerImpl(); - assertEquals(ImmutableList.of("j1", "j2"), r_abm.getJobs("a1")); -// assertEquals("a1", r_abm.getLikelyAlias("j1")); + AliasManager abm2 = new AliasManagerImpl(); + abm2.putAlias("a2", ImmutableList.of("j21", "j22")); - // 2. AliasCache AliasCache ac = new AliasCache(); - SpawnDataStore spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); - System.out.println("udpate alis = " + ac.getJobsFromDatastore("a1", spawnDataStore.getChild(ALIAS_PATH, "a1"))); + assertEquals(ImmutableList.of("j11", "j12"), ac.getJobs("a1")); + assertEquals(ImmutableList.of("j21", "j22"), ac.getJobs("a2")); } @Test - public void testGet() throws Exception { - AliasManagerImpl abm = new AliasManagerImpl(); - abm.putAlias("test_alias1", ImmutableList.of("tJob1", "tJob2")); + public void testGetJob_Update() throws Exception { + AliasManager abm1 = new AliasManagerImpl(); + abm1.putAlias("a1", ImmutableList.of("j11", "j12")); - AliasManagerImpl r_abm = new AliasManagerImpl(); - assertEquals(ImmutableList.of("tJob1", "tJob2"), r_abm.getJobs("test_alias1")); + AliasManager abm2 = new AliasManagerImpl(); + abm2.putAlias("a2", ImmutableList.of("j21", "j22")); - abm.putAlias("test_alias1", ImmutableList.of("tJob11", "tJob12")); - r_abm.putAlias("test_alias1", ImmutableList.of("tJob31", "tJob32")); - - // 2. AliasCache AliasCache ac = new AliasCache(); - SpawnDataStore spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); - System.out.println("get jobs for test_alias1 = " + ac.getJobs("test_alias1")); + assertEquals(ImmutableList.of("j11", "j12"), ac.getJobs("a1")); + assertEquals(ImmutableList.of("j21", "j22"), ac.getJobs("a2")); + + abm1.putAlias("a1", ImmutableList.of("j110", "j120")); + Thread.sleep(3000); + assertEquals(ImmutableList.of("j110", "j120"), ac.getJobs("a1")); } @Test - public void testPropagation() throws Exception { - AliasCache ac = new AliasCache(); - SpawnDataStore spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); - - - AliasManagerImpl abm = new AliasManagerImpl(spawnDataStore); - abm.putAlias("A1", ImmutableList.of("J1", "J2")); - - AliasManagerImpl r_abm = new AliasManagerImpl(spawnDataStore); - assertEquals(ImmutableList.of("J1", "J2"), ac.getJobs("A1")); - - abm.putAlias("a2", ImmutableList.of("j21")); - Thread.sleep(350); - - assertEquals(ImmutableList.of("j21"), ac.getJobs("a2")); - - abm.putAlias("a1", ImmutableList.of("j7", "j8")); + public void testGetJob_Remove() throws Exception { + AliasManager abm = new AliasManagerImpl(); + abm.putAlias("a1", ImmutableList.of("j1", "j2")); Thread.sleep(350); - assertEquals(ImmutableList.of("j7", "j8"), ac.getJobs("a1")); - - abm.deleteAlias("a1"); -// ac.deleteAlias("a1"); - Thread.sleep(2000); - assertNull(ac.getJobs("a1")); - } - - @Test - public void testRemove() throws Exception { AliasCache ac = new AliasCache(); - - AliasManagerImpl abm = new AliasManagerImpl(); - abm.putAlias("a1", ImmutableList.of("j7", "j8")); - Thread.sleep(350); - assertEquals(ImmutableList.of("j7", "j8"), ac.getJobs("a1")); + assertEquals(ImmutableList.of("j1", "j2"), ac.getJobs("a1")); abm.deleteAlias("a1"); -// ac.deleteAlias("a1"); Thread.sleep(3000); - assertNull(ac.getJobs("a1")); } @Test - public void updateLoop() throws Exception { + public void testGetJob_Loop() throws Exception { AliasCache ac = new AliasCache(); - AliasManagerImpl abm = new AliasManagerImpl(); - AliasManagerImpl r_abm = new AliasManagerImpl(); + AliasManager abm1 = new AliasManagerImpl(); for (int i = 0; i < 5; i++) { int retries = 10; boolean succeeded = false; String is = Integer.toString(i); - - abm.putAlias("a1", ImmutableList.of(is)); - r_abm.putAlias("a1", ImmutableList.of(is)); + abm1.putAlias("a1", ImmutableList.of(is)); while (retries-- > 0) { if (ImmutableList.of(is).equals(ac.getJobs("a1"))) { succeeded = true; @@ -115,7 +76,5 @@ public void updateLoop() throws Exception { assertTrue("failed to register updates after retrying", succeeded); } } - - } diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java index 360c80105..236362287 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java @@ -11,13 +11,16 @@ public class SpawnDataStoreHandlerTest { private AliasCache ac; - private AliasManagerImpl abm; + private AliasManagerImpl abm1; + private AliasManagerImpl abm2; @Before public void setUp() throws Exception { - ac = new AliasCache(); - abm = new AliasManagerImpl(); - abm.putAlias("a1", ImmutableList.of("j11", "j12")); + abm1 = new AliasManagerImpl(); + abm2 = new AliasManagerImpl(); + + abm1.putAlias("a1", ImmutableList.of("j11", "j12")); + abm2.putAlias("a2", ImmutableList.of("j21", "j22")); Thread.sleep(350); } @@ -25,6 +28,7 @@ public void setUp() throws Exception { public void testExpandAlias() throws Exception { SpawnDataStoreHandler spawnDataStoreHandler = new SpawnDataStoreHandler(); assertEquals(ImmutableList.of("j11", "j12"), spawnDataStoreHandler.expandAlias("a1")); + assertEquals(ImmutableList.of("j21", "j22"), spawnDataStoreHandler.expandAlias("a2")); } @Test @@ -32,4 +36,20 @@ public void testResolveAlias() throws Exception { SpawnDataStoreHandler spawnDataStoreHandler = new SpawnDataStoreHandler(); assertEquals("j11", spawnDataStoreHandler.resolveAlias("a1")); } + + @Test + public void testExpandAlias_Update() throws Exception { + SpawnDataStoreHandler spawnDataStoreHandler = new SpawnDataStoreHandler(); + abm1.putAlias("a1", ImmutableList.of("j110", "j120")); + Thread.sleep(3000); + assertEquals(ImmutableList.of("j110", "j120"), spawnDataStoreHandler.expandAlias("a1")); + } + + @Test + public void testResolveAlias_Update() throws Exception { + SpawnDataStoreHandler spawnDataStoreHandler = new SpawnDataStoreHandler(); + abm1.putAlias("a1", ImmutableList.of("j110", "j120")); + Thread.sleep(3000); + assertEquals("j110", spawnDataStoreHandler.resolveAlias("a1")); + } } \ No newline at end of file From 87085e06fde7ef901a4ffbc9b34b9618c9293206 Mon Sep 17 00:00:00 2001 From: Irene Cho Date: Mon, 14 Aug 2017 11:23:44 -0400 Subject: [PATCH 05/16] aliasCache2 init work --- .../hydra/job/alias/AliasManagerImpl.java | 11 +---- .../query/spawndatastore/AliasCacheTest.java | 45 +++++++++---------- 2 files changed, 23 insertions(+), 33 deletions(-) diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java index 4fe61bdd9..91759b437 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java @@ -216,16 +216,7 @@ public void deleteAlias(String alias) { } spawnDataStore.deleteChild(ALIAS_PATH, alias); - try { - ac.deleteAlias(alias); - List jobs = ac.getJobs(alias); - if(jobs == null || jobs.size() == 0 ) { - log.error("There is no jobs for alias {}", alias); - return; - } - } catch (ExecutionException e) { - log.error("",e); - } + ac.deleteAlias(alias); } /** diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java index 4fa0253f7..d71c1aa6f 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java @@ -7,9 +7,8 @@ import org.junit.Test; -import static junit.framework.TestCase.assertNull; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; public class AliasCacheTest { @Test @@ -55,26 +54,26 @@ public void testGetJob_Remove() throws Exception { Thread.sleep(3000); assertNull(ac.getJobs("a1")); } - - @Test - public void testGetJob_Loop() throws Exception { - AliasCache ac = new AliasCache(); - AliasManager abm1 = new AliasManagerImpl(); - - for (int i = 0; i < 5; i++) { - int retries = 10; - boolean succeeded = false; - String is = Integer.toString(i); - abm1.putAlias("a1", ImmutableList.of(is)); - while (retries-- > 0) { - if (ImmutableList.of(is).equals(ac.getJobs("a1"))) { - succeeded = true; - break; - } - Thread.sleep(500); - } - assertTrue("failed to register updates after retrying", succeeded); - } - } +// +// @Test +// public void testGetJob_Loop() throws Exception { +// AliasCache ac = new AliasCache(); +// AliasManager abm1 = new AliasManagerImpl(); +// +// for (int i = 0; i < 5; i++) { +// int retries = 10; +// boolean succeeded = false; +// String is = Integer.toString(i); +// abm1.putAlias("a1", ImmutableList.of(is)); +// while (retries-- > 0) { +// if (ImmutableList.of(is).equals(ac.getJobs("a1"))) { +// succeeded = true; +// break; +// } +// Thread.sleep(500); +// } +// assertTrue("failed to register updates after retrying", succeeded); +// } +// } } From e23d236e2e8bba85fa47b861ec47c1856491949c Mon Sep 17 00:00:00 2001 From: Irene Cho Date: Mon, 14 Aug 2017 13:09:12 -0400 Subject: [PATCH 06/16] alias cache init commit --- .../query/spawndatastore/AliasCacheTest.java | 43 ++++++++++--------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java index d71c1aa6f..4fdc42a05 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java @@ -9,6 +9,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class AliasCacheTest { @Test @@ -54,26 +55,26 @@ public void testGetJob_Remove() throws Exception { Thread.sleep(3000); assertNull(ac.getJobs("a1")); } -// -// @Test -// public void testGetJob_Loop() throws Exception { -// AliasCache ac = new AliasCache(); -// AliasManager abm1 = new AliasManagerImpl(); -// -// for (int i = 0; i < 5; i++) { -// int retries = 10; -// boolean succeeded = false; -// String is = Integer.toString(i); -// abm1.putAlias("a1", ImmutableList.of(is)); -// while (retries-- > 0) { -// if (ImmutableList.of(is).equals(ac.getJobs("a1"))) { -// succeeded = true; -// break; -// } -// Thread.sleep(500); -// } -// assertTrue("failed to register updates after retrying", succeeded); -// } -// } + + @Test + public void testGetJob_Loop() throws Exception { + AliasCache ac = new AliasCache(); + AliasManager abm1 = new AliasManagerImpl(); + + for (int i = 0; i < 5; i++) { + int retries = 10; + boolean succeeded = false; + String is = Integer.toString(i); + abm1.putAlias("a1", ImmutableList.of(is)); + while (retries-- > 0) { + if (ImmutableList.of(is).equals(ac.getJobs("a1"))) { + succeeded = true; + break; + } + Thread.sleep(500); + } + assertTrue("failed to register updates after retrying", succeeded); + } + } } From 7ac2e22bf7aea3def1266cf479c966414cd2b41d Mon Sep 17 00:00:00 2001 From: Irene Cho Date: Mon, 14 Aug 2017 14:55:49 -0400 Subject: [PATCH 07/16] clean code --- .../hydra/job/alias/AliasManagerImpl.java | 4 +- .../hydra/job/store/AvailableCache.java | 2 +- .../query/spawndatastore/AliasCache.java | 54 ++++--------------- .../spawndatastore/SpawnDataStoreHandler.java | 4 -- .../query/spawndatastore/AliasCacheTest.java | 4 +- 5 files changed, 13 insertions(+), 55 deletions(-) diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java index 91759b437..a511a0735 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java @@ -56,6 +56,7 @@ public class AliasManagerImpl implements AliasManager { private final ReentrantLock mapLock; private final AliasCache ac; + @VisibleForTesting public AliasManagerImpl() throws Exception{ this.spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); this.mapLock = new ReentrantLock(); @@ -79,9 +80,7 @@ public AliasManagerImpl(SpawnDataStore spawnDataStore) throws Exception { */ public Map> getAliases() { aliases = spawnDataStore.getAllChildren(ALIAS_PATH); - Map> alias2Jobs = new HashMap<>(); - for (Map.Entry aliasEntry : aliases.entrySet()) { String key = aliasEntry.getKey(); List jobs = decodeAliases(aliasEntry.getValue()); @@ -215,7 +214,6 @@ public void deleteAlias(String alias) { mapLock.unlock(); } spawnDataStore.deleteChild(ALIAS_PATH, alias); - ac.deleteAlias(alias); } diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/store/AvailableCache.java b/hydra-main/src/main/java/com/addthis/hydra/job/store/AvailableCache.java index 00f009058..4b0ef0f61 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/store/AvailableCache.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/store/AvailableCache.java @@ -75,7 +75,7 @@ public AvailableCache(long refreshMillis, long expireMillis, int maxSize, int fe cacheBuilder.removalListener(new RemovalListener, Optional>() { @Override public void onRemoval(RemovalNotification, Optional> notification) { - log.info("alias {} and its job {} removed", notification.getKey(), notification.getValue()); + log.info("alias {} and its job {} removed from cache", notification.getKey(), notification.getValue()); } }); executor = new ThreadPoolExecutor( diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java index 05396efed..ca77b055b 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java +++ b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java @@ -1,7 +1,5 @@ package com.addthis.hydra.query.spawndatastore; -import javax.annotation.Nullable; - import java.io.IOException; import java.util.List; @@ -45,29 +43,18 @@ public class AliasCache { public AliasCache() throws Exception { spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); mapCache = new AvailableCache>(DEFAULT_REFRESH_INTERVAL, DEFAULT_CACHE_EXPIRE, DEFAULT_CACHE_SIZE, 2) { - @Override public List fetchValue(String id) { - String child; + @Override public List fetchValue(String alias) { + String jobs; try { - child = spawnDataStore.getChild(ALIAS_PATH, id); - if(Strings.isNullOrEmpty(child)) { + jobs = spawnDataStore.getChild(ALIAS_PATH, alias); + if(Strings.isNullOrEmpty(jobs)) { + log.error("There is no jobs for alias {}", alias); return null; + } else { + return new ObjectMapper().readValue(jobs, new TypeReference>() {}); } } catch (Exception e) { - log.error("Error occurred while getting alias {} from Spawn datastore", id, e); - return null; - } - - try { - String sJobs = getJobsFromDatastore(id, child); - ObjectMapper mapper = new ObjectMapper(); - if(Strings.isNullOrEmpty(sJobs)) { - log.error("There is no jobs for alias {}", id); - return null; - } - List jobs = mapper.readValue(sJobs, new TypeReference>() {}); - return jobs; - } catch (Exception e) { - log.error("Error occurred while fetching alias: {}", id, e); + log.error("Error occurred while getting alias {} from Spawn datastore", alias, e); return null; } } @@ -98,32 +85,13 @@ private void maybeInitMaintenance() { } } - /** - * thread pool for cache maintenance runs. Should only need one thread. - */ private final ScheduledExecutorService aliasCacheMaintainer = MoreExecutors.getExitingScheduledExecutorService( new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("aliasCacheMaintainer=%d").build())); public List getJobs(String alias) throws ExecutionException { List jobs = mapCache.get(alias); if(jobs == null || jobs.size() == 0) { - log.error("There is no job(s) for alias " + alias); - return null; - } - return jobs; - } - - @Nullable protected String getJobsFromDatastore(String alias, @Nullable String data) throws Exception { - if (Strings.isNullOrEmpty(alias)) { - return data; - } - if (Strings.isNullOrEmpty(data)) { - mapCache.remove(alias); - return data; - } - String jobs = spawnDataStore.getChild(ALIAS_PATH, alias); - if(Strings.isNullOrEmpty(jobs)) { - log.error("There is no alias {} in datastore", alias); + log.error("There is no job for alias {} ", alias); return null; } return jobs; @@ -132,8 +100,4 @@ public List getJobs(String alias) throws ExecutionException { public void deleteAlias(String alias) { mapCache.remove(alias); } - - public AvailableCache> getMapCache() { - return mapCache; - } } \ No newline at end of file diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandler.java b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandler.java index becf55895..41cac2661 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandler.java +++ b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandler.java @@ -87,10 +87,6 @@ public SpawnDataStoreHandler() throws Exception { spawnDataStore.close(); } -// public AliasCache getAliasCache() { -// return aliasCache; -// } - public void validateJobForQuery(String job) { if (!queryConfigWatcher.safeToQuery(job)) { throw new QueryException("job is not safe to query (are queries enabled for this job in spawn?): " + job); diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java index 4fdc42a05..9ac845f54 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java @@ -34,8 +34,6 @@ public void testGetJob_Update() throws Exception { abm2.putAlias("a2", ImmutableList.of("j21", "j22")); AliasCache ac = new AliasCache(); - assertEquals(ImmutableList.of("j11", "j12"), ac.getJobs("a1")); - assertEquals(ImmutableList.of("j21", "j22"), ac.getJobs("a2")); abm1.putAlias("a1", ImmutableList.of("j110", "j120")); Thread.sleep(3000); @@ -67,6 +65,8 @@ public void testGetJob_Loop() throws Exception { String is = Integer.toString(i); abm1.putAlias("a1", ImmutableList.of(is)); while (retries-- > 0) { + System.out.println("retries = " + retries + ", ImmutableList.of(is) = " + ImmutableList.of(is) + ", ac.getJobw(a1) = " + ac.getJobs("a1")); + if (ImmutableList.of(is).equals(ac.getJobs("a1"))) { succeeded = true; break; From 11ca37a847d5de950aa81ecffc494c394b96dba0 Mon Sep 17 00:00:00 2001 From: Irene Cho Date: Mon, 14 Aug 2017 15:17:08 -0400 Subject: [PATCH 08/16] clean code --- .../hydra/job/alias/AliasManagerImpl.java | 1 - .../com/addthis/hydra/job/spawn/Spawn.java | 3 - .../job/web/resources/AliasResourceTest.java | 62 ------------------- .../hydra/query/MeshQueryMasterTest.java | 10 --- hydra-uber/bin/local-stack.sh | 5 +- 5 files changed, 2 insertions(+), 79 deletions(-) delete mode 100644 hydra-main/src/test/java/com/addthis/hydra/job/web/resources/AliasResourceTest.java diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java index a511a0735..ae5afcc0b 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java @@ -56,7 +56,6 @@ public class AliasManagerImpl implements AliasManager { private final ReentrantLock mapLock; private final AliasCache ac; - @VisibleForTesting public AliasManagerImpl() throws Exception{ this.spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); this.mapLock = new ReentrantLock(); diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java index 31e73390b..92101fd12 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/spawn/Spawn.java @@ -1352,15 +1352,12 @@ public void updateJob(@Nullable IJob ijob, boolean reviseReplicas) throws Except } // take action on trigger changes (like # replicas) if ((oldjob != job) && reviseReplicas) { - int oldReplicaCount = oldjob.getReplicas(); int newReplicaCount = job.getReplicas(); - checkArgument((oldReplicaCount == newReplicaCount) || (job.getState() == JobState.IDLE) || (job.getState() == JobState.DEGRADED), "job must be IDLE or DEGRADED to change replicas"); checkArgument(newReplicaCount < hostManager.monitored.size(), "replication factor must be < # live hosts"); - rebalanceReplicas(job); } queueJobTaskUpdateEvent(job); diff --git a/hydra-main/src/test/java/com/addthis/hydra/job/web/resources/AliasResourceTest.java b/hydra-main/src/test/java/com/addthis/hydra/job/web/resources/AliasResourceTest.java deleted file mode 100644 index d9b644589..000000000 --- a/hydra-main/src/test/java/com/addthis/hydra/job/web/resources/AliasResourceTest.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.addthis.hydra.job.web.resources; - -import java.util.List; -import java.util.stream.IntStream; - -import com.addthis.basis.kv.KVPairs; - -import com.addthis.hydra.job.alias.AliasManager; - -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; - -import org.junit.Before; -import org.junit.Test; - -import static org.mockito.Mockito.mock; - -public class AliasResourceTest { - - AliasManager aliasManager; - private KVPairs kv; - - @Before - public void setUp() throws Exception { - aliasManager = mock(AliasManager.class); - kv = new KVPairs(); - } - - /* - - query Request URL:http://adq05:2222/query/call?path=%2F(25)%2B%3A%2Bcount%2C%2Bnodes%2C%2Bmem&ops=gather%3Dksaau%3Bsort%3D0%3As%3Aa&format=json&job=8939260c-395c-4355-b5a0-3ca23c4a113c&sender=spawn&nocache=1 - path=%2F(25)%2B%3A%2Bcount%2C%2Bnodes%2C%2Bmem&ops=gather%3Dksaau%3Bsort%3D0%3As%3Aa&format=json&job=8939260c-395c-4355-b5a0-3ca23c4a113c&sender=spawn&nocache=1 - - path:/(25)+:+count,+nodes,+mem - ops:gather=ksaau;sort=0:s:a - format:json - job:8939260c-395c-4355-b5a0-3ca23c4a113c - sender:spawn - nocache:1 - -http://spawn-iad:5052/alias/save - -name=test-alias1&jobs=8939260c-395c-4355-b5a0-3ca23c4a113c - */ - - @Test - public void postAlias() throws Exception { - kv.add("name", "test-alias1"); - kv.add("jobs", "8939260c-395c-4355-b5a0-3ca23c4a113c"); - - List jobs = Lists.newArrayList(Splitter.on(',').split(kv.getValue("jobs"))); - - IntStream.rangeClosed(1, 1000).forEach(i -> { - System.out.println(i); - aliasManager.addAlias(kv.getValue("name"), jobs); - }); - - - - } - -} \ No newline at end of file diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java index 66333a885..562178e16 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java @@ -288,13 +288,6 @@ public void requestedTaskValidation_allowPartialOn() { verifyTaskValidationFail("all req missing", tasks(0, 2), tasks(1), true); } - // todo - @Test - public void expandAliasTest() { - - } - - private Set tasks(Integer... args) { return Sets.newHashSet(args); } @@ -315,7 +308,4 @@ private void verifyTaskValidationFail(String failMsg, Set avail, Set Date: Mon, 14 Aug 2017 16:18:06 -0400 Subject: [PATCH 09/16] add cleaup in tests --- .../hydra/query/spawndatastore/AliasCacheTest.java | 8 ++++++++ .../query/spawndatastore/SpawnDataStoreHandlerTest.java | 7 +++++++ 2 files changed, 15 insertions(+) diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java index 9ac845f54..b8316b53b 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java @@ -5,6 +5,7 @@ import com.google.common.collect.ImmutableList; +import org.junit.After; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -76,5 +77,12 @@ public void testGetJob_Loop() throws Exception { assertTrue("failed to register updates after retrying", succeeded); } } + + @After + public void cleanUp() throws Exception { + AliasManager abm1 = new AliasManagerImpl(); + abm1.deleteAlias("a1"); + abm1.deleteAlias("a2"); + } } diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java index 236362287..e45cb94be 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java @@ -4,6 +4,7 @@ import com.google.common.collect.ImmutableList; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -52,4 +53,10 @@ public void testResolveAlias_Update() throws Exception { Thread.sleep(3000); assertEquals("j110", spawnDataStoreHandler.resolveAlias("a1")); } + + @After + public void cleanUp() { + abm1.deleteAlias("a1"); + abm1.deleteAlias("a2"); + } } \ No newline at end of file From cd152a928851d84c466d57a076d20999953bbe03 Mon Sep 17 00:00:00 2001 From: Irene Cho Date: Mon, 14 Aug 2017 16:21:42 -0400 Subject: [PATCH 10/16] added missing test --- .../hydra/job/alias/AliasManagerImplTest.java | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 hydra-main/src/test/java/com/addthis/hydra/job/alias/AliasManagerImplTest.java diff --git a/hydra-main/src/test/java/com/addthis/hydra/job/alias/AliasManagerImplTest.java b/hydra-main/src/test/java/com/addthis/hydra/job/alias/AliasManagerImplTest.java new file mode 100644 index 000000000..37a40404d --- /dev/null +++ b/hydra-main/src/test/java/com/addthis/hydra/job/alias/AliasManagerImplTest.java @@ -0,0 +1,92 @@ +package com.addthis.hydra.job.alias; + +import com.google.common.collect.ImmutableList; + +import org.junit.After; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class AliasManagerImplTest { + @Test + public void testConstruction() throws Exception { + AliasManager abm = new AliasManagerImpl(); + assertNull(abm.getJobs("foo")); + assertNull(abm.getLikelyAlias("foo")); + } + + @Test + public void testInit() throws Exception { + AliasManager abm = new AliasManagerImpl(); + assertNull(abm.getJobs("foo")); + assertNull(abm.getLikelyAlias("foo")); + } + + @Test + public void testLocalGetPut() throws Exception { + AliasManager abm = new AliasManagerImpl(); + assertNull(abm.getJobs("foo")); + abm.putAlias("a1", ImmutableList.of("j1", "j2")); + assertEquals(ImmutableList.of("j1", "j2"), abm.getJobs("a1")); + assertEquals("a1", abm.getLikelyAlias("j1")); + + abm.deleteAlias("a1"); + assertNull(abm.getJobs("a1")); + assertNull(abm.getLikelyAlias("j1")); + } + + @Test + public void testPropagation() throws Exception { + AliasManager abm = new AliasManagerImpl(); + abm.putAlias("a1", ImmutableList.of("j1", "j2")); + + AliasManagerImpl r_abm = new AliasManagerImpl(); + assertEquals(ImmutableList.of("j1", "j2"), r_abm.getJobs("a1")); + assertEquals("a1", r_abm.getLikelyAlias("j1")); + + abm.putAlias("a2", ImmutableList.of("j21")); + Thread.sleep(350); + assertEquals(ImmutableList.of("j21"), r_abm.getJobs("a2")); + assertEquals("a2", r_abm.getLikelyAlias("j21")); + + abm.putAlias("a1", ImmutableList.of("j7", "j8")); + Thread.sleep(350); + assertEquals(ImmutableList.of("j7", "j8"), r_abm.getJobs("a1")); + assertEquals("a1", r_abm.getLikelyAlias("j7")); + + abm.deleteAlias("a1"); + Thread.sleep(350); + assertNull(r_abm.getJobs("a1")); + assertNull(r_abm.getLikelyAlias("j1")); + } + + @Test + public void updateLoop() throws Exception { + AliasManager abm = new AliasManagerImpl(); + AliasManager r_abm = new AliasManagerImpl(); + + for (int i = 0; i < 5; i++) { + int retries = 10; + boolean succeeded = false; + String is = Integer.toString(i); + abm.putAlias("a1", ImmutableList.of(is)); + while (retries-- > 0) { + if (ImmutableList.of(is).equals(r_abm.getJobs("a1"))) { + succeeded = true; + break; + } + Thread.sleep(500); + } + assertTrue("failed to register updates after retrying", succeeded); + } + } + + @After + public void cleanUp() throws Exception { + AliasManager abm1 = new AliasManagerImpl(); + abm1.deleteAlias("a1"); + abm1.deleteAlias("a2"); + } +} From e9c40f6e3937462c224a2dacfb385100246c5955 Mon Sep 17 00:00:00 2001 From: Irene Cho Date: Mon, 14 Aug 2017 16:24:58 -0400 Subject: [PATCH 11/16] remove unrelated file --- .../test/java/com/addthis/hydra/query/MeshQueryMasterTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java index 562178e16..00554676d 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java @@ -308,4 +308,5 @@ private void verifyTaskValidationFail(String failMsg, Set avail, Set Date: Mon, 14 Aug 2017 16:36:25 -0400 Subject: [PATCH 12/16] clean up --- .../com/addthis/hydra/query/spawndatastore/AliasCache.java | 3 +-- .../test/java/com/addthis/hydra/query/MeshQueryMasterTest.java | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java index ca77b055b..b4630e86d 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java +++ b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java @@ -44,9 +44,8 @@ public AliasCache() throws Exception { spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); mapCache = new AvailableCache>(DEFAULT_REFRESH_INTERVAL, DEFAULT_CACHE_EXPIRE, DEFAULT_CACHE_SIZE, 2) { @Override public List fetchValue(String alias) { - String jobs; try { - jobs = spawnDataStore.getChild(ALIAS_PATH, alias); + String jobs = spawnDataStore.getChild(ALIAS_PATH, alias); if(Strings.isNullOrEmpty(jobs)) { log.error("There is no jobs for alias {}", alias); return null; diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java index 00554676d..562178e16 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/MeshQueryMasterTest.java @@ -308,5 +308,4 @@ private void verifyTaskValidationFail(String failMsg, Set avail, Set Date: Mon, 14 Aug 2017 16:52:56 -0400 Subject: [PATCH 13/16] clean up --- .../hydra/query/spawndatastore/AliasCache.java | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java index b4630e86d..9de989be1 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java +++ b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java @@ -3,7 +3,6 @@ import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -62,17 +61,7 @@ public AliasCache() throws Exception { } public void loadCurrentValues() throws IOException { - Map aliases = spawnDataStore.getAllChildren(ALIAS_PATH); - if ((aliases == null) || aliases.isEmpty()) { - log.warn("No aliases found, unless this is on first cluster startup something is probably wrong"); - return; - } - mapCache.clear(); - ObjectMapper mapper = new ObjectMapper(); - for (Map.Entry aliasEntry : aliases.entrySet()) { - List jobs = mapper.readValue(aliasEntry.getValue(), new TypeReference>() {}); - mapCache.put(aliasEntry.getKey(), jobs); - } + mapCache.getLoadingCache(); } private void maybeInitMaintenance() { From 8bf42505c9334ff3939ab9a797d3443f41de85da Mon Sep 17 00:00:00 2001 From: Irene Cho Date: Tue, 15 Aug 2017 15:11:56 -0400 Subject: [PATCH 14/16] modified code --- .../addthis/hydra/job/alias/AliasManager.java | 2 +- .../hydra/job/alias/AliasManagerImpl.java | 48 ++++++++++--------- .../hydra/job/alias/AliasManagerImplTest.java | 17 +++++++ .../query/spawndatastore/AliasCacheTest.java | 2 - .../SpawnDataStoreHandlerTest.java | 2 +- 5 files changed, 45 insertions(+), 26 deletions(-) diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java index b3e2c62b1..93ad7a6f9 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManager.java @@ -22,7 +22,7 @@ public interface AliasManager { void addAlias(String alias, List jobs); - void deleteAlias(String alias); + void deleteAlias(String alias) throws Exception; List aliasToJobs(String alias); diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java index ae5afcc0b..3ec16d1ae 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java @@ -50,6 +50,7 @@ public class AliasManagerImpl implements AliasManager { * be guaranteed by the implementation of DataStoreUtil. */ public static final String ALIAS_PATH = "/query/alias"; private final SpawnDataStore spawnDataStore; + private final HashMap> alias2jobs; private final HashMap job2alias; private final ObjectMapper mapper; @@ -111,8 +112,7 @@ public void putAlias(String alias, List jobs) { alias2jobs.put(alias, jobs); job2alias.put(jobs.get(0), alias); StringWriter sw = new StringWriter(); - mapper.writeValue(sw, jobs); - spawnDataStore.putAsChild(ALIAS_PATH, alias, sw.toString()); + spawnDataStore.putAsChild(ALIAS_PATH, alias, mapper.writeValueAsString(jobs)); } catch (Exception e) { log.warn("failed to put alias: {}", alias, e); throw Throwables.propagate(e); @@ -146,45 +146,45 @@ private void refreshAlias(String alias) { try { updateAlias(alias, this.spawnDataStore.getChild(ALIAS_PATH, alias)); } catch (ExecutionException e) { - log.warn("Failed to refresh alias: {}", alias, e); + log.error("Failed to refresh alias: {}", alias, e); } catch (Exception e) { - log.error("",e); + log.error("Unexpected error while refreshing alias {}", alias, e); } } /** - * Load the jobIds for a particular alias from the SpawnDataStore + * If jobs is available, update jobs in two maps based on the given alias + * Otherwise, delete the job for a given alias from two maps * * @param alias The alias key to check - * @return String The data that was updated (so the cache can be updated) + * @param jobs The jobs to be updated for alias key + * @return String The job that was updated */ - @Nullable private String updateAlias(String alias, @Nullable String data) { + @Nullable private String updateAlias(String alias, @Nullable String jobs) throws Exception { if (Strings.isNullOrEmpty(alias)) { - return data; + log.warn("Ignoring alias since alias {} is null or empty ", alias); + return jobs; } - if (Strings.isNullOrEmpty(data)) { + if (Strings.isNullOrEmpty(jobs)) { + log.warn("Ignoring alias {} since there are no jobs and delete alias from two maps", alias); deleteAlias(alias); - return data; - } - List jobs = decodeAliases(data); - if (jobs.isEmpty()) { - log.warn("no jobs for alias {}, ignoring {}", alias, alias); - return data; + return jobs; } + List jobList = decodeAliases(jobs); mapLock.lock(); try { - alias2jobs.put(alias, jobs); - job2alias.put(jobs.get(0), alias); + alias2jobs.put(alias, jobList); + job2alias.put(jobList.get(0), alias); } finally { mapLock.unlock(); } - return data; + return jobs; } @VisibleForTesting - protected List decodeAliases(@Nonnull Object data) { + protected List decodeAliases(@Nonnull String data) { try { - return mapper.readValue(data.toString(), new TypeReference>() {}); + return mapper.readValue(data, new TypeReference>() {}); } catch (IOException e) { log.warn("Failed to decode data", e); return new ArrayList<>(0); @@ -196,7 +196,12 @@ protected List decodeAliases(@Nonnull Object data) { * * @param alias The alias to check */ - public void deleteAlias(String alias) { + public void deleteAlias(String alias) throws Exception { + spawnDataStore.deleteChild(ALIAS_PATH, alias); + if( !Strings.isNullOrEmpty(spawnDataStore.getChild(ALIAS_PATH, alias))) { + log.error("Couldn't delete alias from spawn datastore for alias {}", alias); + return; + } mapLock.lock(); try { List jobs = alias2jobs.get(alias); @@ -212,7 +217,6 @@ public void deleteAlias(String alias) { } finally { mapLock.unlock(); } - spawnDataStore.deleteChild(ALIAS_PATH, alias); ac.deleteAlias(alias); } diff --git a/hydra-main/src/test/java/com/addthis/hydra/job/alias/AliasManagerImplTest.java b/hydra-main/src/test/java/com/addthis/hydra/job/alias/AliasManagerImplTest.java index 37a40404d..24b8ca5d0 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/job/alias/AliasManagerImplTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/job/alias/AliasManagerImplTest.java @@ -10,6 +10,23 @@ import static org.junit.Assert.assertTrue; public class AliasManagerImplTest { + +// @Test +// public void testTemp() throws Exception { +// String ALIAS_PATH = "/query/alias"; +// SpawnDataStore spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); +// +// AliasManager abm = new AliasManagerImpl(); +// abm.putAlias("a1", ImmutableList.of("j1", "j2")); +// +// //String sJob = spawnDataStore.getChild(ALIAS_PATH, "a1"); +// if(!Strings.isNullOrEmpty(spawnDataStore.getChild(ALIAS_PATH, "a1"))) { +// System.out.println("sJob = " + spawnDataStore.getChild(ALIAS_PATH, "a1")); +// } else { +// System.out.println("sJob = null or empty"); +// } +// } + @Test public void testConstruction() throws Exception { AliasManager abm = new AliasManagerImpl(); diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java index b8316b53b..02cebeef5 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java @@ -66,8 +66,6 @@ public void testGetJob_Loop() throws Exception { String is = Integer.toString(i); abm1.putAlias("a1", ImmutableList.of(is)); while (retries-- > 0) { - System.out.println("retries = " + retries + ", ImmutableList.of(is) = " + ImmutableList.of(is) + ", ac.getJobw(a1) = " + ac.getJobs("a1")); - if (ImmutableList.of(is).equals(ac.getJobs("a1"))) { succeeded = true; break; diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java index e45cb94be..39cb97940 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java @@ -55,7 +55,7 @@ public void testResolveAlias_Update() throws Exception { } @After - public void cleanUp() { + public void cleanUp() throws Exception { abm1.deleteAlias("a1"); abm1.deleteAlias("a2"); } From fd25ffbbe1700a5f9b1d41a8427f6b23ae85c891 Mon Sep 17 00:00:00 2001 From: Irene Cho Date: Tue, 15 Aug 2017 16:07:16 -0400 Subject: [PATCH 15/16] add licence to new files --- .../hydra/job/alias/AliasManagerImpl.java | 9 +++--- .../query/spawndatastore/AliasCache.java | 13 ++++++++ .../hydra/job/alias/AliasManagerImplTest.java | 30 ++++++++----------- .../query/spawndatastore/AliasCacheTest.java | 13 ++++++++ .../SpawnDataStoreHandlerTest.java | 13 ++++++++ 5 files changed, 56 insertions(+), 22 deletions(-) diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java index 3ec16d1ae..37cf23be4 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java @@ -50,9 +50,8 @@ public class AliasManagerImpl implements AliasManager { * be guaranteed by the implementation of DataStoreUtil. */ public static final String ALIAS_PATH = "/query/alias"; private final SpawnDataStore spawnDataStore; - - private final HashMap> alias2jobs; - private final HashMap job2alias; + private final Map> alias2jobs; + private final Map job2alias; private final ObjectMapper mapper; private final ReentrantLock mapLock; private final AliasCache ac; @@ -162,7 +161,7 @@ private void refreshAlias(String alias) { */ @Nullable private String updateAlias(String alias, @Nullable String jobs) throws Exception { if (Strings.isNullOrEmpty(alias)) { - log.warn("Ignoring alias since alias {} is null or empty ", alias); + log.warn("Ignoring alias {} since it is null or empty ", alias); return jobs; } if (Strings.isNullOrEmpty(jobs)) { @@ -199,7 +198,7 @@ protected List decodeAliases(@Nonnull String data) { public void deleteAlias(String alias) throws Exception { spawnDataStore.deleteChild(ALIAS_PATH, alias); if( !Strings.isNullOrEmpty(spawnDataStore.getChild(ALIAS_PATH, alias))) { - log.error("Couldn't delete alias from spawn datastore for alias {}", alias); + log.error("Fail to delete alias {} from spawn datastore", alias); return; } mapLock.lock(); diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java index 9de989be1..2a13d6e18 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java +++ b/hydra-main/src/main/java/com/addthis/hydra/query/spawndatastore/AliasCache.java @@ -1,3 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.addthis.hydra.query.spawndatastore; import java.io.IOException; diff --git a/hydra-main/src/test/java/com/addthis/hydra/job/alias/AliasManagerImplTest.java b/hydra-main/src/test/java/com/addthis/hydra/job/alias/AliasManagerImplTest.java index 24b8ca5d0..a3c34d9b3 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/job/alias/AliasManagerImplTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/job/alias/AliasManagerImplTest.java @@ -1,3 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.addthis.hydra.job.alias; import com.google.common.collect.ImmutableList; @@ -10,23 +23,6 @@ import static org.junit.Assert.assertTrue; public class AliasManagerImplTest { - -// @Test -// public void testTemp() throws Exception { -// String ALIAS_PATH = "/query/alias"; -// SpawnDataStore spawnDataStore = DataStoreUtil.makeCanonicalSpawnDataStore(); -// -// AliasManager abm = new AliasManagerImpl(); -// abm.putAlias("a1", ImmutableList.of("j1", "j2")); -// -// //String sJob = spawnDataStore.getChild(ALIAS_PATH, "a1"); -// if(!Strings.isNullOrEmpty(spawnDataStore.getChild(ALIAS_PATH, "a1"))) { -// System.out.println("sJob = " + spawnDataStore.getChild(ALIAS_PATH, "a1")); -// } else { -// System.out.println("sJob = null or empty"); -// } -// } - @Test public void testConstruction() throws Exception { AliasManager abm = new AliasManagerImpl(); diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java index 02cebeef5..e14f915d3 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/AliasCacheTest.java @@ -1,3 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.addthis.hydra.query.spawndatastore; import com.addthis.hydra.job.alias.AliasManager; diff --git a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java index 39cb97940..6f9ee578c 100644 --- a/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java +++ b/hydra-main/src/test/java/com/addthis/hydra/query/spawndatastore/SpawnDataStoreHandlerTest.java @@ -1,3 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.addthis.hydra.query.spawndatastore; import com.addthis.hydra.job.alias.AliasManagerImpl; From a1931a9539df2521b6971f6ec621c92b2ca3afab Mon Sep 17 00:00:00 2001 From: Irene Cho Date: Tue, 15 Aug 2017 16:37:36 -0400 Subject: [PATCH 16/16] removed unused variable --- .../main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java index 37cf23be4..8c1b20f00 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/alias/AliasManagerImpl.java @@ -18,7 +18,6 @@ import javax.annotation.concurrent.ThreadSafe; import java.io.IOException; -import java.io.StringWriter; import java.util.ArrayList; import java.util.HashMap; @@ -110,7 +109,6 @@ public void putAlias(String alias, List jobs) { try { alias2jobs.put(alias, jobs); job2alias.put(jobs.get(0), alias); - StringWriter sw = new StringWriter(); spawnDataStore.putAsChild(ALIAS_PATH, alias, mapper.writeValueAsString(jobs)); } catch (Exception e) { log.warn("failed to put alias: {}", alias, e);