Skip to content

Commit

Permalink
Expose global A/AAAA and SRV DNS ServiceDiscoverer instances (#2844)
Browse files Browse the repository at this point in the history
Motivation:

If users need to wrap a `ServiceDiscoverer` to apply a filtering logic,
currently they have to create their own instance of DNS
`ServiceDiscoverer`. As a result, their use-cases uses separate DNS
cache, while other use-cases in the classpath that do not require
wrapping use global instance and share DNS cache.

Modifications:

- Add 2 new static methods in `DnsServiceDiscoverers`:
`globalARecordsDnsServiceDiscoverer()`, and
`globalSrvRecordsDnsServiceDiscoverer()`;
- Remove similar instances from `GlobalDnsServiceDiscoverer`, rename it
to `InternalServiceDiscoverers`;

Result:

Users can grab a global DNS SD instance and apply wrapping/filtering
if necessary.
  • Loading branch information
idelpivnitskiy committed Feb 21, 2024
1 parent 12190dd commit 0af167a
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ private Executors() {
* by calling {@link Runnable#run()} on the calling thread. {@link Executor#schedule(Runnable, long, TimeUnit)} will
* use a global scheduler.
* <p>
* The lifecycle of this instance shouldn't need to be managed by the user. Don't attempt to close the returned
* instance of {@link Executor}.
* The lifecycle of this instance shouldn't need to be managed by the user. The returned instance of
* {@link Executor} must not be closed.
*
* @return An {@link Executor} that executes all tasks submitted via {@link Executor#execute(Runnable)}
* immediately on the calling thread.
Expand All @@ -60,8 +60,8 @@ public static Executor immediate() {
* It creates as many threads as required but reuses threads when possible. It is therefore 'safe to block' when
* using it.
* <p>
* The lifecycle of this instance shouldn't need to be managed by the user. Don't attempt to close the returned
* instance of {@link Executor}.
* The lifecycle of this instance shouldn't need to be managed by the user. The returned instance of
* {@link Executor} must not be closed.
*
* @return An {@link Executor} which serves as a global mechanism for executing concurrent operations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
package io.servicetalk.dns.discovery.netty;

import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.transport.api.HostAndPort;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.List;

import static io.servicetalk.utils.internal.ServiceLoaderUtils.loadProviders;
Expand All @@ -30,8 +33,9 @@
public final class DnsServiceDiscoverers {

private static final Logger LOGGER = LoggerFactory.getLogger(DnsServiceDiscoverers.class);

private static final List<DnsServiceDiscovererBuilderProvider> PROVIDERS;
private static final String GLOBAL_A_ID = "global-a";
private static final String GLOBAL_SRV_ID = "global-srv";

static {
final ClassLoader classLoader = DnsServiceDiscoverers.class.getClassLoader();
Expand Down Expand Up @@ -59,6 +63,70 @@ private static DnsServiceDiscovererBuilder applyProviders(String id, DnsServiceD
*/
@SuppressWarnings("deprecation")
public static DnsServiceDiscovererBuilder builder(final String id) {
LOGGER.debug("Created a new {} for id={}", DnsServiceDiscovererBuilder.class.getSimpleName(), id);
return applyProviders(id, new DefaultDnsServiceDiscovererBuilder(id));
}

/**
* Get the {@link ServiceDiscoverer} that executes <a href="https://tools.ietf.org/html/rfc1035">DNS</a> lookups for
* <a href="https://datatracker.ietf.org/doc/html/rfc1035#autoid-34">A</a>/
* <a href="https://datatracker.ietf.org/doc/html/rfc3596">AAAA</a> records for provided
* {@link HostAndPort#hostName()} with a fixed {@link HostAndPort#port()}.
* <p>
* The returned instance can be customized using {@link DnsServiceDiscovererBuilderProvider} by targeting
* {@value GLOBAL_A_ID} identity.
* <p>
* The lifecycle of this instance shouldn't need to be managed by the user. The returned instance of
* {@link ServiceDiscoverer} must not be closed.
*
* @return the singleton instance
*/
public static ServiceDiscoverer<HostAndPort, InetSocketAddress,
ServiceDiscovererEvent<InetSocketAddress>> globalARecordsDnsServiceDiscoverer() {
return ARecordsDnsServiceDiscoverer.INSTANCE;
}

/**
* Get the {@link ServiceDiscoverer} that executes <a href="https://tools.ietf.org/html/rfc1035">DNS</a> lookups for
* <a href="https://datatracker.ietf.org/doc/html/rfc2782">SRV</a> records for provided service name as a
* {@link String}.
* <p>
* The returned instance can be customized using {@link DnsServiceDiscovererBuilderProvider} by targeting
* {@value GLOBAL_SRV_ID} identity.
* <p>
* The lifecycle of this instance shouldn't need to be managed by the user. The returned instance of
* {@link ServiceDiscoverer} must not be closed.
*
* @return the singleton instance
*/
public static ServiceDiscoverer<String, InetSocketAddress,
ServiceDiscovererEvent<InetSocketAddress>> globalSrvRecordsDnsServiceDiscoverer() {
return SrvRecordsDnsServiceDiscoverer.INSTANCE;
}

private static final class ARecordsDnsServiceDiscoverer {
static final ServiceDiscoverer<HostAndPort, InetSocketAddress, ServiceDiscovererEvent<InetSocketAddress>>
INSTANCE = DnsServiceDiscoverers.builder(GLOBAL_A_ID).buildARecordDiscoverer();

static {
LOGGER.debug("Initialized {}: {}", ARecordsDnsServiceDiscoverer.class.getSimpleName(), INSTANCE);
}

private ARecordsDnsServiceDiscoverer() {
// Singleton
}
}

private static final class SrvRecordsDnsServiceDiscoverer {
static final ServiceDiscoverer<String, InetSocketAddress, ServiceDiscovererEvent<InetSocketAddress>>
INSTANCE = DnsServiceDiscoverers.builder(GLOBAL_SRV_ID).buildSrvDiscoverer();

static {
LOGGER.debug("Initialized {}: {}", SrvRecordsDnsServiceDiscoverer.class.getSimpleName(), INSTANCE);
}

private SrvRecordsDnsServiceDiscoverer() {
// Singleton
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@

import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
import static io.servicetalk.concurrent.api.Publisher.failed;
import static io.servicetalk.http.netty.GlobalDnsServiceDiscoverer.globalDnsServiceDiscoverer;
import static io.servicetalk.http.netty.GlobalDnsServiceDiscoverer.globalSrvDnsServiceDiscoverer;
import static io.servicetalk.http.netty.GlobalDnsServiceDiscoverer.mappingServiceDiscoverer;
import static io.servicetalk.http.netty.GlobalDnsServiceDiscoverer.resolvedServiceDiscoverer;
import static io.servicetalk.dns.discovery.netty.DnsServiceDiscoverers.globalARecordsDnsServiceDiscoverer;
import static io.servicetalk.dns.discovery.netty.DnsServiceDiscoverers.globalSrvRecordsDnsServiceDiscoverer;
import static io.servicetalk.http.netty.InternalServiceDiscoverers.mappingServiceDiscoverer;
import static io.servicetalk.http.netty.InternalServiceDiscoverers.resolvedServiceDiscoverer;
import static io.servicetalk.utils.internal.ServiceLoaderUtils.loadProviders;
import static java.util.function.Function.identity;

Expand Down Expand Up @@ -301,9 +301,9 @@ public static SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> for
*/
public static SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> forSingleAddress(
final HostAndPort address, final DiscoveryStrategy discoveryStrategy) {
return forSingleAddress(globalDnsServiceDiscoverer(), address, discoveryStrategy,
GlobalDnsServiceDiscoverer::unresolvedServiceDiscoverer,
ResolvingConnectionFactoryFilter::withGlobalDnsServiceDiscoverer);
return forSingleAddress(globalARecordsDnsServiceDiscoverer(), address, discoveryStrategy,
InternalServiceDiscoverers::unresolvedServiceDiscoverer,
ResolvingConnectionFactoryFilter::withGlobalARecordsDnsServiceDiscoverer);
}

/**
Expand All @@ -321,7 +321,7 @@ public static SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> for
public static SingleAddressHttpClientBuilder<String, InetSocketAddress> forServiceAddress(
final String serviceName) {
final ServiceDiscoverer<String, InetSocketAddress, ServiceDiscovererEvent<InetSocketAddress>> sd =
globalSrvDnsServiceDiscoverer();
globalSrvRecordsDnsServiceDiscoverer();
return applyProviders(serviceName,
new DefaultSingleAddressHttpClientBuilder<>(serviceName, sd))
// We need to pass SD into constructor to align types, but providers won't see that.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.dns.discovery.netty.DnsServiceDiscoverers;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.HostAndPort;
import io.servicetalk.transport.netty.internal.BuilderUtils;

Expand All @@ -41,38 +39,17 @@
import static java.util.Objects.requireNonNull;

/**
* ServiceTalk's shared DNS {@link ServiceDiscoverer} with reasonable defaults for APIs when a user doesn't provide one.
* ServiceTalk's internal {@link ServiceDiscoverer}s to serve specific use-cases.
* <p>
* A lazily initialized singleton DNS {@link ServiceDiscoverer} using a default {@link ExecutionContext}, the lifecycle
* of this instance shouldn't need to be managed by the user. Don't attempt to close the {@link ServiceDiscoverer}.
* The lifecycle of these instances shouldn't need to be managed, don't attempt to close them.
*/
final class GlobalDnsServiceDiscoverer {
private static final Logger LOGGER = LoggerFactory.getLogger(GlobalDnsServiceDiscoverer.class);
final class InternalServiceDiscoverers {
private static final Logger LOGGER = LoggerFactory.getLogger(InternalServiceDiscoverers.class);

private GlobalDnsServiceDiscoverer() {
private InternalServiceDiscoverers() {
// No instances
}

/**
* Get the {@link ServiceDiscoverer} targeting fixed ports.
*
* @return the singleton instance
*/
static ServiceDiscoverer<HostAndPort, InetSocketAddress,
ServiceDiscovererEvent<InetSocketAddress>> globalDnsServiceDiscoverer() {
return HostAndPortClientInitializer.HOST_PORT_SD;
}

/**
* Get the {@link ServiceDiscoverer} targeting SRV records.
*
* @return the singleton instance
*/
static ServiceDiscoverer<String, InetSocketAddress,
ServiceDiscovererEvent<InetSocketAddress>> globalSrvDnsServiceDiscoverer() {
return SrvClientInitializer.SRV_SD;
}

/**
* Get the {@link ServiceDiscoverer} transforming a {@link HostAndPort} into a resolved {@link InetSocketAddress}.
*
Expand Down Expand Up @@ -109,32 +86,6 @@ static <U, R> ServiceDiscoverer<U, R, ServiceDiscovererEvent<R>> mappingServiceD
return new MappingServiceDiscoverer<>(toResolvedAddressMapper, description);
}

private static final class HostAndPortClientInitializer {
static final ServiceDiscoverer<HostAndPort, InetSocketAddress, ServiceDiscovererEvent<InetSocketAddress>>
HOST_PORT_SD = DnsServiceDiscoverers.builder("global-a").buildARecordDiscoverer();

static {
LOGGER.debug("Initialized {}", HostAndPortClientInitializer.class);
}

private HostAndPortClientInitializer() {
// Singleton
}
}

private static final class SrvClientInitializer {
static final ServiceDiscoverer<String, InetSocketAddress, ServiceDiscovererEvent<InetSocketAddress>> SRV_SD =
DnsServiceDiscoverers.builder("global-srv").buildSrvDiscoverer();

static {
LOGGER.debug("Initialized {}", SrvClientInitializer.class);
}

private SrvClientInitializer() {
// Singleton
}
}

private static final class ResolvedServiceDiscovererInitializer {

static final ServiceDiscoverer<HostAndPort, InetSocketAddress,
Expand All @@ -144,7 +95,8 @@ private static final class ResolvedServiceDiscovererInitializer {
InetSocketAddress.class.getSimpleName());

static {
LOGGER.debug("Initialized {}", ResolvedServiceDiscovererInitializer.class);
LOGGER.debug("Initialized {}: {}",
ResolvedServiceDiscovererInitializer.class.getSimpleName(), RESOLVED_SD);
}

private ResolvedServiceDiscovererInitializer() {
Expand All @@ -160,7 +112,8 @@ private static final class UnresolvedServiceDiscovererInitializer {
HostAndPort.class.getSimpleName() + " to an unresolved " + InetSocketAddress.class.getSimpleName());

static {
LOGGER.debug("Initialized {}", UnresolvedServiceDiscovererInitializer.class);
LOGGER.debug("Initialized {}: {}",
UnresolvedServiceDiscovererInitializer.class.getSimpleName(), UNRESOLVED_SD);
}

private UnresolvedServiceDiscovererInitializer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@
import javax.annotation.Nullable;

import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.AVAILABLE;
import static io.servicetalk.http.netty.GlobalDnsServiceDiscoverer.globalDnsServiceDiscoverer;
import static io.servicetalk.dns.discovery.netty.DnsServiceDiscoverers.globalARecordsDnsServiceDiscoverer;
import static java.util.Objects.requireNonNull;

/**
* A {@link ConnectionFactoryFilter} that will resolve the passed unresolved {@link InetSocketAddress} on each attempt
* to create a {@link ConnectionFactory#newConnection(Object, ContextMap, TransportObserver) new connection} using
* {@link GlobalDnsServiceDiscoverer#globalDnsServiceDiscoverer()}.
* A {@link ConnectionFactoryFilter} that will resolve the passed {@link U unresolved address} on each attempt
* to create a {@link ConnectionFactory#newConnection(Object, ContextMap, TransportObserver) new connection} using the
* passed {@link ServiceDiscoverer}.
*
* @param <U> the type of address before resolution (unresolved address)
* @param <R> the type of address after resolution (resolved address)
Expand Down Expand Up @@ -123,14 +123,14 @@ public String toString() {
'}';
}

static ResolvingConnectionFactoryFilter<HostAndPort, InetSocketAddress> withGlobalDnsServiceDiscoverer() {
static ResolvingConnectionFactoryFilter<HostAndPort, InetSocketAddress> withGlobalARecordsDnsServiceDiscoverer() {
return DefaultResolvingConnectionFactoryFilterInitializer.INSTANCE;
}

private static final class DefaultResolvingConnectionFactoryFilterInitializer {

static final ResolvingConnectionFactoryFilter<HostAndPort, InetSocketAddress> INSTANCE =
new ResolvingConnectionFactoryFilter<>(HostAndPort::of, globalDnsServiceDiscoverer());
new ResolvingConnectionFactoryFilter<>(HostAndPort::of, globalARecordsDnsServiceDiscoverer());

private DefaultResolvingConnectionFactoryFilterInitializer() {
// Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import javax.annotation.Nullable;
import javax.net.ssl.SSLSession;

import static io.servicetalk.http.netty.GlobalDnsServiceDiscoverer.mappingServiceDiscoverer;
import static io.servicetalk.http.netty.InternalServiceDiscoverers.mappingServiceDiscoverer;
import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.startsWith;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@

import java.net.InetSocketAddress;

import static io.servicetalk.dns.discovery.netty.DnsServiceDiscoverers.globalARecordsDnsServiceDiscoverer;
import static io.servicetalk.http.api.HttpResponseStatus.OK;
import static io.servicetalk.http.netty.GlobalDnsServiceDiscoverer.globalDnsServiceDiscoverer;
import static io.servicetalk.http.netty.GlobalDnsServiceDiscoverer.mappingServiceDiscoverer;
import static io.servicetalk.http.netty.InternalServiceDiscoverers.mappingServiceDiscoverer;
import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort;
import static io.servicetalk.transport.netty.internal.BuilderUtils.toResolvedInetSocketAddress;
Expand Down Expand Up @@ -76,7 +76,7 @@ void inetSocketAddress(HttpProtocol protocol) throws Exception {
@Test
void hostAndPortThrowIfSdChanges() {
ServiceDiscoverer<HostAndPort, InetSocketAddress, ServiceDiscovererEvent<InetSocketAddress>> otherSd =
globalDnsServiceDiscoverer();
globalARecordsDnsServiceDiscoverer();
HostAndPort address = HostAndPort.of("127.0.0.1", 8080);
IllegalArgumentException e = assertThrows(IllegalArgumentException.class,
() -> HttpClients.forResolvedAddress(address).serviceDiscoverer(otherSd));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
import static io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable;
import static io.servicetalk.concurrent.api.Publisher.never;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.dns.discovery.netty.DnsServiceDiscoverers.globalARecordsDnsServiceDiscoverer;
import static io.servicetalk.http.api.HttpResponseStatus.OK;
import static io.servicetalk.http.netty.GlobalDnsServiceDiscoverer.globalDnsServiceDiscoverer;
import static io.servicetalk.http.netty.HttpClients.DiscoveryStrategy.ON_NEW_CONNECTION;
import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort;
Expand Down Expand Up @@ -95,8 +95,7 @@ void forMultiAddressUrlWithCustomServiceDiscoverer() throws Exception {
BlockingHttpClient client = HttpClients.forMultiAddressUrl(getClass().getSimpleName(),
// Wrap to pretend this is a custom SD:
new DelegatingServiceDiscoverer<HostAndPort, InetSocketAddress,
ServiceDiscovererEvent<InetSocketAddress>>(
GlobalDnsServiceDiscoverer.globalDnsServiceDiscoverer()) {
ServiceDiscovererEvent<InetSocketAddress>>(globalARecordsDnsServiceDiscoverer()) {
@Override
public Publisher<Collection<ServiceDiscovererEvent<InetSocketAddress>>> discover(
HostAndPort hostAndPort) {
Expand Down Expand Up @@ -175,7 +174,7 @@ void withCustomServiceDiscoverer(HttpProtocol protocol) throws Exception {
@Test
void attemptToOverrideServiceDiscovererThrows() {
ServiceDiscoverer<HostAndPort, InetSocketAddress, ServiceDiscovererEvent<InetSocketAddress>> otherSd =
globalDnsServiceDiscoverer();
globalARecordsDnsServiceDiscoverer();
IllegalArgumentException e = assertThrows(IllegalArgumentException.class,
() -> HttpClients.forSingleAddress("servicetalk.io", 80, ON_NEW_CONNECTION).serviceDiscoverer(otherSd));
assertThat(e.getMessage(), allOf(containsString(ON_NEW_CONNECTION.name()), containsString(otherSd.toString())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static io.servicetalk.concurrent.api.Completable.completed;
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.dns.discovery.netty.DnsServiceDiscoverers.globalARecordsDnsServiceDiscoverer;
import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_LENGTH;
import static io.servicetalk.http.api.HttpHeaderNames.HOST;
import static io.servicetalk.http.api.HttpHeaderNames.LOCATION;
Expand All @@ -67,7 +68,6 @@
import static io.servicetalk.http.api.HttpResponseStatus.PERMANENT_REDIRECT;
import static io.servicetalk.http.api.HttpResponseStatus.SEE_OTHER;
import static io.servicetalk.http.api.HttpResponseStatus.UNAUTHORIZED;
import static io.servicetalk.http.netty.GlobalDnsServiceDiscoverer.globalDnsServiceDiscoverer;
import static io.servicetalk.transport.netty.internal.AddressUtils.hostHeader;
import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort;
Expand Down Expand Up @@ -151,7 +151,7 @@ public Publisher<Collection<ServiceDiscovererEvent<InetSocketAddress>>> discover
return Publisher.failed(new UnknownHostException(
"Special domain name \"" + INVALID_HOSTNAME + "\" always returns NXDOMAIN"));
}
return globalDnsServiceDiscoverer().discover(hostAndPort);
return globalARecordsDnsServiceDiscoverer().discover(hostAndPort);
}

@Override
Expand Down

0 comments on commit 0af167a

Please sign in to comment.