Skip to content

Commit

Permalink
Remove ZK candidate dep (linkerd#2361)
Browse files Browse the repository at this point in the history
The artifact for com.twitter.common.zookeeper" % "candidate" % "0.0.84" on maven.twttr.com is not present anymore. Therefore we need to cpy the bits that we use from it and have it in our sourcecode.

Fixes linkerd#2360
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
  • Loading branch information
zaharidichev authored and adleong committed Jan 7, 2020
1 parent e038d9e commit 5651f35
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// =================================================================================================
// Copyright 2011 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.zookeeper;

import com.google.common.base.Optional;
import com.google.common.base.Supplier;

import com.twitter.finagle.common.zookeeper.Group.JoinException;
import com.twitter.finagle.common.zookeeper.Group.WatchException;
import com.twitter.finagle.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;

import org.apache.zookeeper.KeeperException;


/**
* Interface definition for becoming or querying for a ZooKeeper-based group leader.
*/
public interface Candidate {

/**
* Returns the current group leader by querying ZooKeeper synchronously.
*
* @return the current group leader's identifying data or {@link Optional#absent()} if there is
* no leader
* @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
* @throws KeeperException if there was a problem reading the leader information
* @throws InterruptedException if this thread is interrupted getting the leader
*/
public Optional<byte[]> getLeaderData()
throws ZooKeeperConnectionException, KeeperException, InterruptedException;

/**
* Encapsulates a leader that can be elected and subsequently defeated.
*/
interface Leader {

/**
* Called when this leader has been elected.
*
* @param abdicate a command that can be used to abdicate leadership and force a new election
*/
void onElected(ExceptionalCommand<JoinException> abdicate);

/**
* Called when the leader has been ousted. Can occur either if the leader abdicates or if an
* external event causes the leader to lose its leadership role (session expiration).
*/
void onDefeated();
}

/**
* Offers this candidate in leadership elections for as long as the current jvm process is alive.
* Upon election, the {@code onElected} callback will be executed and a command that can be used
* to abdicate leadership will be passed in. If the elected leader jvm process dies or the
* elected leader successfully abdicates then a new leader will be elected. Leaders that
* successfully abdicate are removed from the group and will not be eligible for leadership
* election unless {@link #offerLeadership(Leader)} is called again.
*
* @param leader the leader to notify of election and defeat events
* @throws JoinException if there was a problem joining the group
* @throws WatchException if there is a problem generating the 1st group membership list
* @throws InterruptedException if interrupted waiting to join the group and determine initial
* election results
* @return a supplier that can be queried to find out if this leader is currently elected
*/
public Supplier<Boolean> offerLeadership(Leader leader)
throws JoinException, WatchException, InterruptedException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// =================================================================================================
// Copyright 2011 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.zookeeper;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.Nullable;

import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import org.apache.zookeeper.KeeperException;
import com.twitter.finagle.common.zookeeper.Group;
import com.twitter.finagle.common.zookeeper.Group.JoinException;
import com.twitter.finagle.common.zookeeper.Group.GroupChangeListener;
import com.twitter.finagle.common.zookeeper.Group.Membership;
import com.twitter.finagle.common.zookeeper.Group.WatchException;
import com.twitter.finagle.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;

/**
* Implements leader election for small groups of candidates. This implementation is subject to the
* <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection">
* herd effect</a> for a given group and should only be used for small (~10 member) candidate pools.
*/
public class CandidateImpl implements Candidate {
private static final Logger LOG = Logger.getLogger(CandidateImpl.class.getName());

private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8);

private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = new Supplier<byte[]>() {
@Override public byte[] get() {
try {
return InetAddress.getLocalHost().getHostAddress().getBytes();
} catch (UnknownHostException e) {
LOG.log(Level.WARNING, "Failed to determine local address!", e);
return UNKNOWN_CANDIDATE_DATA;
}
}
};

private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE =
new Function<Iterable<String>, String>() {
@Override public String apply(Iterable<String> candidates) {
return Ordering.natural().min(candidates);
}
};

private final Group group;
private final Function<Iterable<String>, String> judge;
private final Supplier<byte[]> dataSupplier;

/**
* Equivalent to {@link #CandidateImpl(Group, com.google.common.base.Function, Supplier)} using a
* judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest or
* 1st candidate and a default supplier that provides the ip address of this host according to
* {@link java.net.InetAddress#getLocalHost()} as the leader identifying data.
*/
public CandidateImpl(Group group) {
this(group, MOST_RECENT_JUDGE, IP_ADDRESS_DATA_SUPPLIER);
}

/**
* Creates a candidate that can be used to offer leadership for the given {@code group} using
* a judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest
* or 1st. The dataSupplier should produce bytes that identify this process as leader. These bytes
* will become available to all participants via the {@link Candidate#getLeaderData()} method.
*/
public CandidateImpl(Group group, Supplier<byte[]> dataSupplier) {
this(group, MOST_RECENT_JUDGE, dataSupplier);
}

/**
* Creates a candidate that can be used to offer leadership for the given {@code group}. The
* {@code judge} is used to pick the current leader from all group members whenever the group
* membership changes. To form a well-behaved election group with one leader, all candidates
* should use the same judge. The dataSupplier should produce bytes that identify this process
* as leader. These bytes will become available to all participants via the
* {@link Candidate#getLeaderData()} method.
*/
public CandidateImpl(
Group group,
Function<Iterable<String>, String> judge,
Supplier<byte[]> dataSupplier) {
this.group = Preconditions.checkNotNull(group);
this.judge = Preconditions.checkNotNull(judge);
this.dataSupplier = Preconditions.checkNotNull(dataSupplier);
}

@Override
public Optional<byte[]> getLeaderData()
throws ZooKeeperConnectionException, KeeperException, InterruptedException {

String leaderId = getLeader(group.getMemberIds());
return leaderId == null
? Optional.<byte[]>absent()
: Optional.of(group.getMemberData(leaderId));
}

@Override
public Supplier<Boolean> offerLeadership(final Leader leader)
throws JoinException, WatchException, InterruptedException {

final Membership membership = group.join(dataSupplier, () -> leader.onDefeated());

final AtomicBoolean elected = new AtomicBoolean(false);
final AtomicBoolean abdicated = new AtomicBoolean(false);
group.watch(new GroupChangeListener() {
@Override public void onGroupChange(Iterable<String> memberIds) {
boolean noCandidates = Iterables.isEmpty(memberIds);
String memberId = membership.getMemberId();

if (noCandidates) {
LOG.warning("All candidates have temporarily left the group: " + group);
} else if (!Iterables.contains(memberIds, memberId)) {
LOG.severe(String.format(
"Current member ID %s is not a candidate for leader, current voting: %s",
memberId, memberIds));
} else {
boolean electedLeader = memberId.equals(getLeader(memberIds));
boolean previouslyElected = elected.getAndSet(electedLeader);

if (!previouslyElected && electedLeader) {
LOG.info(String.format("Candidate %s is now leader of group: %s",
membership.getMemberPath(), memberIds));

leader.onElected(new ExceptionalCommand<JoinException>() {
@Override public void execute() throws JoinException {
membership.cancel();
abdicated.set(true);
}
});
} else if (!electedLeader) {
if (previouslyElected) {
leader.onDefeated();
}
LOG.info(String.format(
"Candidate %s waiting for the next leader election, current voting: %s",
membership.getMemberPath(), memberIds));
}
}
}
});

return new Supplier<Boolean>() {
@Override public Boolean get() {
return !abdicated.get() && elected.get();
}
};
}

@Nullable
private String getLeader(Iterable<String> memberIds) {
return Iterables.isEmpty(memberIds) ? null : judge.apply(memberIds);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// =================================================================================================
// Copyright 2011 Twitter, Inc.
// -------------------------------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this work except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file, or at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =================================================================================================

package com.twitter.common.zookeeper;

/**
* An interface that captures a unit of work.
*
* @param <E> The type of exception that the command throws.
*
* @author John Sirois
*/
public interface ExceptionalCommand<E extends Exception> {

/**
* Performs a unit of work, possibly throwing {@code E} in the process.
*
* @throws E if there was a problem performing the work
*/
void execute() throws E;
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package io.buoyant.namer.zk

import com.twitter.common.zookeeper.Group.GroupChangeListener
import com.twitter.common.zookeeper._
import com.twitter.finagle.util.InetSocketAddressUtil
import com.twitter.finagle.{Group => _, _}
import com.twitter.util._
import io.buoyant.config.types.HostAndPort
import java.net.InetSocketAddress

import com.twitter.common.zookeeper.{Candidate, CandidateImpl}
import com.twitter.finagle.common.zookeeper.Group.GroupChangeListener
import com.twitter.finagle.common.zookeeper.{Group, ZooKeeperClient, ZooKeeperUtils}
import org.apache.zookeeper.KeeperException.NoNodeException
import org.apache.zookeeper.data.ACL

import scala.collection.JavaConverters._

/**
Expand Down Expand Up @@ -51,7 +54,7 @@ case class ZkLeaderNamer(
)

Closable.make { _ =>
stop.execute()
stop.run()
Future.Unit
}
}
Expand Down
5 changes: 1 addition & 4 deletions project/Deps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ object Deps {

def twitterUtil(mod: String) =
"com.twitter" %% s"util-$mod" % "19.5.1"

// networking
def finagle(mod: String) =
"com.twitter" %% s"finagle-$mod" % "19.5.1"
Expand All @@ -22,10 +23,6 @@ object Deps {

val boringssl = "io.netty" % "netty-tcnative-boringssl-static" % "2.0.19.Final"

def zkCandidate =
("com.twitter.common.zookeeper" % "candidate" % "0.0.84")
.exclude("com.twitter.common", "util")

// Jackson (parsing)
val jacksonVersion = "2.9.6"
val jacksonCore =
Expand Down
2 changes: 1 addition & 1 deletion project/LinkerdBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ object LinkerdBuild extends Base {

val zkLeader = projectDir("namer/zk-leader")
.dependsOn(core)
.withLib(Deps.zkCandidate)
.withTwitterLib(Deps.finagle("serversets").exclude("org.slf4j", "slf4j-jdk14"))
.withTests()

val rancher = projectDir("namer/rancher")
Expand Down

0 comments on commit 5651f35

Please sign in to comment.