Skip to content

Commit

Permalink
[SYSTEMDS-3355] Federated monitoring backend worker communication
Browse files: browse the repository at this point in the history
Closes apache#1608.
  • Loading branch information
MKehayov authored and mboehm7 committed May 15, 2022
1 parent 7153749 commit f33b516
Show file tree
Hide file tree
Showing 20 changed files with 999 additions and 190 deletions.
38 changes: 37 additions & 1 deletion bin/systemds
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ Worker Usage: $0 [-r] WORKER [SystemDS.jar] <portnumber> [arguments] [-help]
port : The port to open for the federated worker.
Federated Monitoring Usage: $0 [-r] FEDMONITOR [SystemDS.jar] <portnumber> [arguments] [-help]
port : The port to open for the federated monitoring tool.
Set custom launch configuration by setting/editing SYSTEMDS_STANDALONE_OPTS
and/or SYSTEMDS_DISTRIBUTED_OPTS.
Expand Down Expand Up @@ -256,6 +260,20 @@ elif echo "$1" | grep -q "WORKER"; then
printUsage
fi
shift
elif echo "$1" | grep -q "FEDMONITOR"; then
FEDMONITOR=1
shift
if echo "$1" | grep -q "jar"; then
SYSTEMDS_JAR_FILE=$1
shift
fi
PORT=$1
re='^[0-9]+$'
if ! [[ $PORT =~ $re ]] ; then
echo "error: Port is not a number"
printUsage
fi
shift
else
# handle optional '-f' before DML file (for consistency)
if echo "$1" | grep -q "\-f"; then
Expand All @@ -272,6 +290,9 @@ if [ -z "$WORKER" ] ; then
WORKER=0
fi

if [ -z "$FEDMONITOR" ] ; then
FEDMONITOR=0
fi

# find me a SystemDS jar file to run
if [ -z "$SYSTEMDS_JAR_FILE" ];then
Expand Down Expand Up @@ -409,7 +430,7 @@ print_out "# HADOOP_HOME= $HADOOP_HOME"
#build the command to run
if [ $WORKER == 1 ]; then
print_out "#"
print_out "# starting Fedederated worker on port $PORT"
print_out "# starting Federated worker on port $PORT"
print_out "###############################################################################"
CMD=" \
java $SYSTEMDS_STANDALONE_OPTS \
Expand All @@ -422,6 +443,21 @@ if [ $WORKER == 1 ]; then
print_out "Executing command: $CMD"
print_out ""

if [ $FEDMONITORING == 1 ]; then
print_out "#"
print_out "# starting Federated backend monitoring on port $PORT"
print_out "###############################################################################"
CMD=" \
java $SYSTEMDS_STANDALONE_OPTS \
-cp $CLASSPATH \
$LOG4JPROP \
org.apache.sysds.api.DMLScript \
-fedMonitor $PORT \
$CONFIG_FILE \
$*"
print_out "Executing command: $CMD"
print_out ""

elif [ $SYSDS_DISTRIBUTED == 0 ]; then
print_out "#"
print_out "# Running script $SCRIPT_FILE locally with opts: $*"
Expand Down
31 changes: 25 additions & 6 deletions src/main/java/org/apache/sysds/api/DMLOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ public class DMLOptions {
public boolean lineage_debugger = false; // whether enable lineage debugger
public boolean fedWorker = false;
public int fedWorkerPort = -1;
public int pythonPort = -1;
public boolean fedMonitoring = false;
public int fedMonitoringPort = -1;
public int pythonPort = -1;
public boolean checkPrivacy = false; // Check which privacy constraints are loaded and checked during federated execution
public boolean federatedCompilation = false; // Compile federated instructions based on input federation state and privacy constraints.
public boolean noFedRuntimeConversion = false; // If activated, no runtime conversion of CP instructions to FED instructions will be performed.
Expand All @@ -95,6 +97,7 @@ public String toString() {
", statsCount=" + statsCount +
", fedStats=" + fedStats +
", fedStatsCount=" + fedStatsCount +
", fedMonitor=" + fedMonitoring +
", memStats=" + memStats +
", explainType=" + explainType +
", execMode=" + execMode +
Expand Down Expand Up @@ -217,6 +220,7 @@ else if (lineageType.equalsIgnoreCase("debugger"))
}
}
}

dmlOptions.memStats = line.hasOption("mem");

dmlOptions.clean = line.hasOption("clean");
Expand All @@ -230,6 +234,11 @@ else if (lineageType.equalsIgnoreCase("debugger"))
dmlOptions.fedWorkerPort = Integer.parseInt(line.getOptionValue("w"));
}

if (line.hasOption("fedMonitor")) {
dmlOptions.fedMonitoring= true;
dmlOptions.fedMonitoringPort = Integer.parseInt(line.getOptionValue("fedMonitor"));
}

if (line.hasOption("f")){
dmlOptions.filePath = line.getOptionValue("f");
}
Expand Down Expand Up @@ -314,7 +323,8 @@ private static Options createCLIOptions() {
Option configOpt = OptionBuilder.withArgName("filename")
.withDescription("uses a given configuration file (can be on local/hdfs/gpfs; default values in SystemDS-config.xml")
.hasArg().create("config");
Option cleanOpt = OptionBuilder.withDescription("cleans up all SystemDS working directories (FS, DFS); all other flags are ignored in this mode.")
Option cleanOpt = OptionBuilder
.withDescription("cleans up all SystemDS working directories (FS, DFS); all other flags are ignored in this mode.")
.create("clean");
Option statsOpt = OptionBuilder.withArgName("count")
.withDescription("monitors and reports summary execution statistics; heavy hitter <count> is 10 unless overridden; default off")
Expand All @@ -335,20 +345,27 @@ private static Options createCLIOptions() {
.hasOptionalArg().create("gpu");
Option debugOpt = OptionBuilder.withDescription("runs in debug mode; default off")
.create("debug");
Option pythonOpt = OptionBuilder.withDescription("Python Context start with port argument for communication to python")
Option pythonOpt = OptionBuilder
.withDescription("Python Context start with port argument for communication to python")
.isRequired().hasArg().create("python");
Option fileOpt = OptionBuilder.withArgName("filename")
.withDescription("specifies dml/pydml file to execute; path can be local/hdfs/gpfs (prefixed with appropriate URI)")
.isRequired().hasArg().create("f");
Option scriptOpt = OptionBuilder.withArgName("script_contents")
.withDescription("specified script string to execute directly")
.isRequired().hasArg().create("s");
Option helpOpt = OptionBuilder.withDescription("shows usage message")
Option helpOpt = OptionBuilder
.withDescription("shows usage message")
.create("help");
Option lineageOpt = OptionBuilder.withDescription("computes lineage traces")
Option lineageOpt = OptionBuilder
.withDescription("computes lineage traces")
.hasOptionalArgs().create("lineage");
Option fedOpt = OptionBuilder.withDescription("starts a federated worker with the given argument as the port.")
Option fedOpt = OptionBuilder
.withDescription("starts a federated worker with the given argument as the port.")
.hasOptionalArg().create("w");
Option monitorOpt = OptionBuilder
.withDescription("Starts a federated monitoring backend with the given argument as the port.")
.hasOptionalArg().create("fedMonitor");
Option checkPrivacy = OptionBuilder
.withDescription("Check which privacy constraints are loaded and checked during federated execution")
.create("checkPrivacy");
Expand All @@ -375,6 +392,7 @@ private static Options createCLIOptions() {
options.addOption(debugOpt);
options.addOption(lineageOpt);
options.addOption(fedOpt);
options.addOption(monitorOpt);
options.addOption(checkPrivacy);
options.addOption(federatedCompilation);
options.addOption(noFedRuntimeConversion);
Expand All @@ -387,6 +405,7 @@ private static Options createCLIOptions() {
.addOption(cleanOpt)
.addOption(helpOpt)
.addOption(fedOpt)
.addOption(monitorOpt)
.addOption(pythonOpt);
fileOrScriptOpt.setRequired(true);
options.addOptionGroup(fileOrScriptOpt);
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/apache/sysds/api/DMLScript.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
import org.apache.sysds.runtime.controlprogram.federated.FederatedWorker;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.FederatedMonitoringServer;
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.controlprogram.parfor.util.IDHandler;
import org.apache.sysds.runtime.instructions.gpu.context.GPUContextPool;
Expand Down Expand Up @@ -284,6 +285,11 @@ public static boolean executeScript( Configuration conf, String[] args )
return true;
}

if(dmlOptions.fedMonitoring) {
new FederatedMonitoringServer(dmlOptions.fedMonitoringPort, dmlOptions.debug);
return true;
}

LineageCacheConfig.setConfig(LINEAGE_REUSE);
LineageCacheConfig.setCachePolicy(LINEAGE_POLICY);
LineageCacheConfig.setEstimator(LINEAGE_ESTIMATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,12 @@ public static String displayFedWorkerStats() {
}

public static String displayStatistics(int numHeavyHitters) {
StringBuilder sb = new StringBuilder();
FedStatsCollection fedStats = collectFedStats();
return displayStatistics(fedStats, numHeavyHitters);
}

public static String displayStatistics(FedStatsCollection fedStats, int numHeavyHitters) {
StringBuilder sb = new StringBuilder();
sb.append("SystemDS Federated Statistics:\n");
sb.append(displayCacheStats(fedStats.cacheStats));
sb.append(String.format("Total JIT compile time:\t\t%.3f sec.\n", fedStats.jitCompileTime));
Expand Down Expand Up @@ -499,7 +503,7 @@ public static String displayFedSerializationReuseStats(long srCount, long srByte
return "";
}

private static class FedStatsCollectFunction extends FederatedUDF {
public static class FedStatsCollectFunction extends FederatedUDF {
private static final long serialVersionUID = 1L;

public FedStatsCollectFunction() {
Expand All @@ -519,7 +523,7 @@ public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
}
}

protected static class FedStatsCollection implements Serializable {
public static class FedStatsCollection implements Serializable {
private static final long serialVersionUID = 1L;

private void collectStats() {
Expand All @@ -531,7 +535,7 @@ private void collectStats() {
heavyHitters = Statistics.getHeavyHittersHashMap();
}

private void aggregate(FedStatsCollection that) {
public void aggregate(FedStatsCollection that) {
cacheStats.aggregate(that.cacheStats);
jitCompileTime += that.jitCompileTime;
gcStats.aggregate(that.gcStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,55 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import org.apache.log4j.Logger;

public class FederatedMonitoringServer {
private final int _port;
protected static Logger log = Logger.getLogger(FederatedMonitoringServer.class);
private final int _port;

public FederatedMonitoringServer(int port) {
_port = (port == -1) ? 4201 : port;
}
private final boolean _debug;

public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
public FederatedMonitoringServer(int port, boolean debug) {
_port = (port == -1) ? 4201 : port;

try {
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline();
_debug = debug;

pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new FederatedMonitoringServerHandler());
}
});
run();
}

ChannelFuture f = server.bind(_port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public void run() {
log.info("Setting up Federated Monitoring Backend on port " + _port);
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new FederatedMonitoringServerHandler());
}
});

log.info("Starting Federated Monitoring Backend server at port: " + _port);
ChannelFuture f = server.bind(_port).sync();
log.info("Started Federated Monitoring Backend at port: " + _port);
f.channel().closeFuture().sync();
} catch(Exception e) {
log.info("Federated Monitoring Backend Interrupted");
if (_debug) {
log.error(e.getMessage());
e.printStackTrace();
}
} finally{
log.info("Federated Monitoring Backend Shutting down.");
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
Loading

0 comments on commit f33b516

Please sign in to comment.