Skip to content

Commit

Permalink
添加动态增删改catalog的功能
Browse files Browse the repository at this point in the history
  • Loading branch information
yuananf committed Jun 23, 2015
1 parent 0cf7e34 commit 0fca71d
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,30 @@ private synchronized void addConnector(String catalogName, String connectorId, C
}
}

public synchronized void removeConnector(String catalogName)
{
String connectorId = getConnectorId(catalogName);
metadataManager.removeConnectorMetadata(catalogName);

splitManager.removeConnectorSplitManager(makeInformationSchemaConnectorId(connectorId));
pageSourceManager.removeConnectorPageSourceProvider(makeInformationSchemaConnectorId(connectorId));
metadataManager.removeInformationSchemaMetadata(makeInformationSchemaConnectorId(connectorId), catalogName);

splitManager.removeConnectorSplitManager(makeSystemTablesConnectorId(connectorId));
pageSourceManager.removeConnectorPageSourceProvider(makeSystemTablesConnectorId(connectorId));
metadataManager.removeSystemTablesMetadata(makeSystemTablesConnectorId(connectorId), catalogName);

splitManager.removeConnectorSplitManager(connectorId);
pageSourceManager.removeConnectorPageSourceProvider(connectorId);
handleResolver.removeHandleResolver(connectorId);
metadataManager.removeConnectorsById(connectorId);

pageSinkManager.removeConnectorPageSinkProvider(connectorId);
indexManager.removeIndexResolver(connectorId);

connectors.remove(connectorId);
}

private static String makeInformationSchemaConnectorId(String connectorId)
{
return INFORMATION_SCHEMA_CONNECTOR_PREFIX + connectorId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ public void addIndexResolver(String connectorId, ConnectorIndexResolver resolver
checkState(resolvers.putIfAbsent(connectorId, resolver) == null, "IndexResolver for connector '%s' is already registered", connectorId);
}

public void removeIndexResolver(String connectorId)
{
if (resolvers.containsKey(connectorId)) {
resolvers.remove(connectorId);
}
}

public Optional<ResolvedIndex> resolveIndex(TableHandle tableHandle, Set<ColumnHandle> indexableColumns, TupleDomain<ColumnHandle> tupleDomain)
{
ConnectorIndexResolver resolver = resolvers.get(tableHandle.getConnectorId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.metadata;

import com.facebook.presto.connector.ConnectorManager;
import com.facebook.presto.server.PrestoServer.DatasourceAction;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
Expand All @@ -23,12 +24,23 @@

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.facebook.presto.server.PrestoServer.updateDatasourcesAnnouncement;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Maps.fromProperties;
Expand Down Expand Up @@ -72,6 +84,17 @@ public void loadCatalogs()
}

catalogsLoaded.set(true);

// add catalogs automatically
new Thread(() -> {
try {
log.info("-- Catalog watcher thread start --");
startCatalogWatcher(catalogConfigurationDir);
}
catch (Exception e) {
e.printStackTrace();
}
}).start();
}

private void loadCatalog(File file)
Expand Down Expand Up @@ -111,4 +134,68 @@ private static Map<String, String> loadProperties(File file)
}
return fromProperties(properties);
}

private void startCatalogWatcher(File catalogConfigurationDir) throws IOException, InterruptedException
{
WatchService watchService = FileSystems.getDefault().newWatchService();
Paths.get(catalogConfigurationDir.getAbsolutePath()).register(
watchService, StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY);
while (true) {
WatchKey key = watchService.take();
for (WatchEvent<?> event : key.pollEvents()) {
if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
log.info("New file in catalog directory : " + event.context());
Path newCatalog = (Path) event.context();
addCatalog(newCatalog);
}
else if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
log.info("Delete file from catalog directory : " + event.context());
Path deletedCatalog = (Path) event.context();
deleteCatalog(deletedCatalog);
}
else if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
log.info("Modify file from catalog directory : " + event.context());
Path modifiedCatalog = (Path) event.context();
modifyCatalog(modifiedCatalog);
}
}
boolean valid = key.reset();
if (!valid) {
break;
}
}
}

private void addCatalog(Path catalogPath)
{
File file = new File(catalogConfigurationDir, catalogPath.getFileName().toString());
if (file.isFile() && file.getName().endsWith(".properties")) {
try {
TimeUnit.SECONDS.sleep(5);
loadCatalog(file);
updateDatasourcesAnnouncement(Files.getNameWithoutExtension(catalogPath.getFileName().toString()), DatasourceAction.ADD);
}
catch (Exception e) {
e.printStackTrace();
}
}
}

private void deleteCatalog(Path catalogPath)
{
if (catalogPath.getFileName().toString().endsWith(".properties")) {
String catalogName = Files.getNameWithoutExtension(catalogPath.getFileName().toString());
log.info("-- Removing catalog %s", catalogName);
connectorManager.removeConnector(catalogName);
updateDatasourcesAnnouncement(catalogName, DatasourceAction.DELETE);
}
}

private void modifyCatalog(Path catalogPath)
{
deleteCatalog(catalogPath);
addCatalog(catalogPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ public void addHandleResolver(String id, ConnectorHandleResolver connectorHandle
checkState(existingResolver == null, "Id %s is already assigned to resolver %s", id, existingResolver);
}

public void removeHandleResolver(String id)
{
if (handleIdResolvers.containsKey(id)) {
handleIdResolvers.remove(id);
}
}

public String getId(ConnectorTableHandle tableHandle)
{
for (Entry<String, ConnectorHandleResolver> entry : handleIdResolvers.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ public synchronized void addConnectorMetadata(String connectorId, String catalog
connectorsByCatalog.put(catalogName, new ConnectorMetadataEntry(connectorId, connectorMetadata));
}

public synchronized void removeConnectorMetadata(String catalogName)
{
if (connectorsByCatalog.containsKey(catalogName)) {
connectorsByCatalog.remove(catalogName);
}
}

public synchronized void addInformationSchemaMetadata(String connectorId, String catalogName, InformationSchemaMetadata metadata)
{
checkMetadataArguments(connectorId, catalogName, metadata);
Expand All @@ -137,6 +144,16 @@ public synchronized void addInformationSchemaMetadata(String connectorId, String
informationSchemasByCatalog.put(catalogName, new ConnectorMetadataEntry(connectorId, metadata));
}

public synchronized void removeInformationSchemaMetadata(String connectorId, String catalogName)
{
if (informationSchemasByCatalog.containsKey(catalogName)) {
informationSchemasByCatalog.remove(catalogName);
}
if (connectorsById.containsKey(connectorId)) {
connectorsById.remove(connectorId);
}
}

public synchronized void addSystemTablesMetadata(String connectorId, String catalogName, ConnectorMetadata metadata)
{
checkMetadataArguments(connectorId, catalogName, metadata);
Expand All @@ -146,6 +163,23 @@ public synchronized void addSystemTablesMetadata(String connectorId, String cata
systemTablesByCatalog.put(catalogName, new ConnectorMetadataEntry(connectorId, metadata));
}

public synchronized void removeSystemTablesMetadata(String connectorId, String catalogName)
{
if (systemTablesByCatalog.containsKey(catalogName)) {
systemTablesByCatalog.remove(catalogName);
}
if (connectorsById.containsKey(connectorId)) {
connectorsById.remove(connectorId);
}
}

public synchronized void removeConnectorsById(String connectorId)
{
if (connectorsById.containsKey(connectorId)) {
connectorsById.remove(connectorId);
}
}

private void checkMetadataArguments(String connectorId, String catalogName, ConnectorMetadata metadata)
{
checkNotNull(connectorId, "connectorId is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.airlift.tracetoken.TraceTokenModule;
import org.weakref.jmx.guice.MBeanModule;

import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -54,6 +55,12 @@
public class PrestoServer
implements Runnable
{
private static Announcer announcer;

public enum DatasourceAction {
ADD, DELETE;
}

public static void main(String[] args)
{
new PrestoServer().run();
Expand Down Expand Up @@ -106,14 +113,15 @@ public void run()

injector.getInstance(CatalogManager.class).loadCatalogs();

announcer = injector.getInstance(Announcer.class);
// TODO: remove this huge hack
updateDatasources(
injector.getInstance(Announcer.class),
announcer,
injector.getInstance(Metadata.class),
injector.getInstance(ServerConfig.class),
injector.getInstance(NodeSchedulerConfig.class));

injector.getInstance(Announcer.class).start();
announcer.start();

log.info("======== SERVER STARTED ========");
}
Expand Down Expand Up @@ -166,6 +174,29 @@ private static void updateDatasources(Announcer announcer, Metadata metadata, Se
announcer.addServiceAnnouncement(builder.build());
}

public static void updateDatasourcesAnnouncement(String connectorId, DatasourceAction action)
{
// get existing announcement
ServiceAnnouncement announcement = getPrestoAnnouncement(announcer.getServiceAnnouncements());

// update datasources property
Map<String, String> properties = new LinkedHashMap<>(announcement.getProperties());
String property = nullToEmpty(properties.get("datasources"));
Set<String> datasources = new LinkedHashSet<>(Splitter.on(',').trimResults().omitEmptyStrings().splitToList(property));
if (action == DatasourceAction.ADD) {
datasources.add(connectorId);
}
else if (action == DatasourceAction.DELETE) {
datasources.remove(connectorId);
}
properties.put("datasources", Joiner.on(',').join(datasources));

// update announcement
announcer.removeServiceAnnouncement(announcement.getId());
announcer.addServiceAnnouncement(serviceAnnouncement(announcement.getType()).addProperties(properties).build());
announcer.forceAnnounce();
}

private static ServiceAnnouncement getPrestoAnnouncement(Set<ServiceAnnouncement> announcements)
{
for (ServiceAnnouncement announcement : announcements) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ public void addConnectorPageSinkProvider(String connectorId, ConnectorPageSinkPr
pageSinkProviders.put(connectorId, connectorPageSinkProvider);
}

public void removeConnectorPageSinkProvider(String connectorId)
{
if (pageSinkProviders.containsKey(connectorId)) {
pageSinkProviders.remove(connectorId);
}
}

@Override
public ConnectorPageSink createPageSink(OutputTableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public void addConnectorPageSourceProvider(String connectorId, ConnectorPageSour
pageSourceProviders.put(connectorId, connectorPageSourceProvider);
}

public void removeConnectorPageSourceProvider(String connectorId)
{
if (pageSourceProviders.containsKey(connectorId)) {
pageSourceProviders.remove(connectorId);
}
}

@Override
public ConnectorPageSource createPageSource(Split split, List<ColumnHandle> columns)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ public void addConnectorSplitManager(String connectorId, ConnectorSplitManager c
checkState(splitManagers.putIfAbsent(connectorId, connectorSplitManager) == null, "SplitManager for connector '%s' is already registered", connectorId);
}

public void removeConnectorSplitManager(String connectorId)
{
if (splitManagers.containsKey(connectorId)) {
splitManagers.remove(connectorId);
}
}

public SplitSource getSplits(TableLayoutHandle layout)
{
String connectorId = layout.getConnectorId();
Expand Down

0 comments on commit 0fca71d

Please sign in to comment.