Skip to content

Commit

Permalink
Merge branch 'mwoznisk/invalidate_tls_caches_on_tracker_creation'
Browse files Browse the repository at this point in the history
Invalidate TLS caches when a Tracker is created.

Authored-by: Matt Wozniski <mwozniski@bloomberg.net>
Reviewed-by: Pablo Galindo <pgalindo3@bloomberg.net>
Signed-off-by: Matt Wozniski <mwozniski@bloomberg.net>
  • Loading branch information
medusa authored and GitHub Enterprise committed Apr 5, 2022
2 parents d7dd91e + 3d94fed commit 0513460
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 18 deletions.
40 changes: 22 additions & 18 deletions src/bloomberg/pensieve/_pensieve/tracking_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ starts_with(const std::string& haystack, const std::string_view& needle)
return haystack.compare(0, needle.size(), needle) == 0;
}

// Track how many times a new Tracker has been created
std::atomic<unsigned int> g_tracker_generation;

} // namespace

namespace pensieve::tracking_api {
Expand Down Expand Up @@ -114,10 +117,10 @@ class PythonStackTracker
void setMostRecentFrameLineNumber(int lineno);
int pushPythonFrame(PyFrameObject* frame);
void popPythonFrame();
void resetInChildProcess() noexcept;

private:
uint32_t d_num_pending_pops{};
uint32_t d_tracker_generation{};
std::vector<LazilyEmittedFrame>* d_stack{};
};

Expand All @@ -141,8 +144,22 @@ PythonStackTracker::reset(PyFrameObject* current_frame)
inline void
PythonStackTracker::emitPendingPops()
{
Tracker::getTracker()->popFrames(d_num_pending_pops);
d_num_pending_pops = 0;
if (d_tracker_generation != g_tracker_generation) {
// The Tracker has changed underneath us (either by an after-fork
// handler, or by another thread destroying the Tracker we were using
// and installing a new one). Either way, update our state to reflect
// that nothing has been emitted to the (new) output file.
d_tracker_generation = g_tracker_generation;
d_num_pending_pops = 0;
if (d_stack) {
for (auto it = d_stack->begin(); it != d_stack->end(); it++) {
it->emitted = false;
}
}
} else {
Tracker::getTracker()->popFrames(d_num_pending_pops);
d_num_pending_pops = 0;
}
}

void
Expand Down Expand Up @@ -244,18 +261,6 @@ PythonStackTracker::popPythonFrame()
}
}

void
PythonStackTracker::resetInChildProcess() noexcept
{
// Nothing has been emitted to the output file in this child process yet.
d_num_pending_pops = 0;
if (d_stack) {
for (auto it = d_stack->begin(); it != d_stack->end(); it++) {
it->emitted = false;
}
}
}

std::atomic<bool> Tracker::d_active = false;
std::unique_ptr<Tracker> Tracker::d_instance_owner;
std::atomic<Tracker*> Tracker::d_instance = nullptr;
Expand All @@ -271,6 +276,8 @@ Tracker::Tracker(
, d_memory_interval(memory_interval)
, d_follow_fork(follow_fork)
{
g_tracker_generation++;

// Note: this must be set before the hooks are installed.
d_instance = this;

Expand Down Expand Up @@ -416,9 +423,6 @@ Tracker::parentFork()
void
Tracker::childFork()
{
// Reset thread-local state.
t_python_stack_tracker.resetInChildProcess();

// Intentionally leak any old tracker. Its destructor cannot be called,
// because it would try to destroy mutexes that might be locked by threads
// that no longer exist, and to join a background thread that no longer
Expand Down
47 changes: 47 additions & 0 deletions tests/integration/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,53 @@ def foo():
assert second_alloc1.stack_id != second_alloc2.stack_id


def test_thread_surviving_multiple_trackers(tmp_path):
# GIVEN
orig_tracker_used = threading.Event()
new_tracker_installed = threading.Event()
allocator = MemoryAllocator()
output1 = tmp_path / "test.bin.1"
output2 = tmp_path / "test.bin.2"

def deeper_function():
allocator.valloc(1234)
allocator.free()
orig_tracker_used.set()
new_tracker_installed.wait()
allocator.valloc(1234)
allocator.free()

def tracking_function():
deeper_function()

# WHEN
with Tracker(output1):
bg_thread = threading.Thread(target=tracking_function)
bg_thread.start()
orig_tracker_used.wait()

with Tracker(output2):
new_tracker_installed.set()
bg_thread.join()

# THEN
tracker1_allocations = list(FileReader(output1).get_allocation_records())
tracker2_allocations = list(FileReader(output2).get_allocation_records())

tracker1_vallocs = [
event
for event in tracker1_allocations
if event.size == 1234 and event.allocator == AllocatorType.VALLOC
]
tracker2_vallocs = [
event
for event in tracker2_allocations
if event.size == 1234 and event.allocator == AllocatorType.VALLOC
]
assert len(tracker1_vallocs) == len(tracker2_vallocs) == 1
assert tracker1_vallocs[0].stack_trace() != tracker2_vallocs[0].stack_trace()


class TestMmap:
@classmethod
def allocating_function(cls):
Expand Down

0 comments on commit 0513460

Please sign in to comment.