Skip to content

Commit

Permalink
Move MemoryPool JMX export out of LocalMemoryManager
Browse files Browse the repository at this point in the history
  • Loading branch information
cberner committed Jun 4, 2015
1 parent 30c7a57 commit 80b481e
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,10 @@
*/
package com.facebook.presto.memory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import org.weakref.jmx.JmxException;
import org.weakref.jmx.MBeanExporter;
import org.weakref.jmx.ObjectNames;

import javax.annotation.PreDestroy;
import javax.inject.Inject;

import java.util.List;
Expand All @@ -36,26 +30,23 @@ public final class LocalMemoryManager
{
public static final MemoryPoolId GENERAL_POOL = new MemoryPoolId("general");
public static final MemoryPoolId RESERVED_POOL = new MemoryPoolId("reserved");
private static final Logger log = Logger.get(LocalMemoryManager.class);

private final DataSize maxMemory;
private final MBeanExporter exporter;
private final Map<MemoryPoolId, MemoryPool> pools;

@Inject
public LocalMemoryManager(MemoryManagerConfig config, ReservedSystemMemoryConfig systemMemoryConfig, MBeanExporter exporter)
public LocalMemoryManager(MemoryManagerConfig config, ReservedSystemMemoryConfig systemMemoryConfig)
{
this.exporter = requireNonNull(exporter, "exporter is null");
requireNonNull(config, "config is null");
requireNonNull(systemMemoryConfig, "systemMemoryConfig is null");
long maxHeap = Runtime.getRuntime().maxMemory();
checkArgument(systemMemoryConfig.getReservedSystemMemory().toBytes() < maxHeap, "Reserved memory %s is greater than available heap %s", systemMemoryConfig.getReservedSystemMemory(), new DataSize(maxHeap, BYTE));
maxMemory = new DataSize(maxHeap - systemMemoryConfig.getReservedSystemMemory().toBytes(), BYTE);

ImmutableMap.Builder<MemoryPoolId, MemoryPool> builder = ImmutableMap.builder();
builder.put(RESERVED_POOL, createPool(RESERVED_POOL, config.getMaxQueryMemoryPerNode(), exporter, config.isClusterMemoryManagerEnabled()));
builder.put(RESERVED_POOL, new MemoryPool(RESERVED_POOL, config.getMaxQueryMemoryPerNode(), config.isClusterMemoryManagerEnabled()));
DataSize generalPoolSize = new DataSize(Math.max(0, maxMemory.toBytes() - config.getMaxQueryMemoryPerNode().toBytes()), BYTE);
builder.put(GENERAL_POOL, createPool(GENERAL_POOL, generalPoolSize, exporter, config.isClusterMemoryManagerEnabled()));
builder.put(GENERAL_POOL, new MemoryPool(GENERAL_POOL, generalPoolSize, config.isClusterMemoryManagerEnabled()));
this.pools = builder.build();
}

Expand All @@ -68,7 +59,6 @@ public MemoryInfo getInfo()
return new MemoryInfo(maxMemory, builder.build());
}

@VisibleForTesting
public List<MemoryPool> getPools()
{
return ImmutableList.copyOf(pools.values());
Expand All @@ -78,31 +68,4 @@ public MemoryPool getPool(MemoryPoolId id)
{
return pools.get(id);
}

private static MemoryPool createPool(MemoryPoolId id, DataSize size, MBeanExporter exporter, boolean enableBlocking)
{
MemoryPool pool = new MemoryPool(id, size, enableBlocking);
try {
String objectName = ObjectNames.builder(MemoryPool.class, pool.getId().toString()).build();
exporter.export(objectName, pool);
}
catch (JmxException e) {
log.warn(e, "Unable to export memory pool %s", id);
}
return pool;
}

@PreDestroy
public void destroy()
{
for (MemoryPool pool : pools.values()) {
String objectName = ObjectNames.builder(MemoryPool.class, pool.getId().toString()).build();
try {
exporter.unexport(objectName);
}
catch (JmxException e) {
log.warn(e, "Unable to unexport memory pool %s", pool.getId());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.memory;

import org.weakref.jmx.JmxException;
import org.weakref.jmx.MBeanExporter;
import org.weakref.jmx.ObjectNames;

import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;

import java.util.ArrayList;
import java.util.List;

import static java.util.Objects.requireNonNull;

public final class LocalMemoryManagerExporter
{
private final MBeanExporter exporter;
@GuardedBy("this")
private final List<MemoryPool> pools = new ArrayList<>();

@Inject
public LocalMemoryManagerExporter(LocalMemoryManager memoryManager, MBeanExporter exporter)
{
this.exporter = requireNonNull(exporter, "exporter is null");
for (MemoryPool pool : memoryManager.getPools()) {
addPool(pool);
}
}

private synchronized void addPool(MemoryPool pool)
{
try {
String objectName = ObjectNames.builder(MemoryPool.class, pool.getId().toString()).build();
exporter.export(objectName, pool);
pools.add(pool);
}
catch (JmxException e) {
// ignored
}
}

@PreDestroy
public synchronized void destroy()
{
for (MemoryPool pool : pools) {
String objectName = ObjectNames.builder(MemoryPool.class, pool.getId().toString()).build();
try {
exporter.unexport(objectName);
}
catch (JmxException e) {
// ignored
}
}
pools.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.facebook.presto.memory.ClusterMemoryManager;
import com.facebook.presto.memory.ForMemoryManager;
import com.facebook.presto.memory.LocalMemoryManager;
import com.facebook.presto.memory.LocalMemoryManagerExporter;
import com.facebook.presto.memory.MemoryInfo;
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.memory.MemoryPoolAssignmentsRequest;
Expand Down Expand Up @@ -154,7 +155,7 @@ protected void setup(Binder binder)
newExporter(binder).export(ClusterMemoryManager.class).withGeneratedName();
binder.bind(ClusterMemoryManager.class).in(Scopes.SINGLETON);
binder.bind(LocalMemoryManager.class).in(Scopes.SINGLETON);
newExporter(binder).export(LocalMemoryManager.class).withGeneratedName();
binder.bind(LocalMemoryManagerExporter.class).in(Scopes.SINGLETON);
newExporter(binder).export(TaskManager.class).withGeneratedName();
binder.bind(TaskExecutor.class).in(Scopes.SINGLETON);
newExporter(binder).export(TaskExecutor.class).withGeneratedName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import io.airlift.units.Duration;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import org.weakref.jmx.MBeanExporter;
import org.weakref.jmx.testing.TestingMBeanServer;

import java.net.URI;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -62,7 +60,7 @@ public class TestSqlTaskManager

public TestSqlTaskManager()
{
localMemoryManager = new LocalMemoryManager(new MemoryManagerConfig(), new ReservedSystemMemoryConfig(), new MBeanExporter(new TestingMBeanServer()));
localMemoryManager = new LocalMemoryManager(new MemoryManagerConfig(), new ReservedSystemMemoryConfig());
taskExecutor = new TaskExecutor(8, 16);
taskExecutor.start();
}
Expand All @@ -72,7 +70,6 @@ public void tearDown()
throws Exception
{
taskExecutor.stop();
localMemoryManager.destroy();
}

@Test
Expand Down

0 comments on commit 80b481e

Please sign in to comment.