Skip to content

Commit

Permalink
Invalidate TLS caches when a Tracker is created
Browse files Browse the repository at this point in the history
In the child process after a fork, we needed to tell the thread-local
Python stack tracker that it was now using a new Tracker, so anything it
previously wrote is not in the current output file.

Fork isn't the only case where that's required. If a long running thread
survives one Tracker being destroyed and a new one being created, that
thread must invalidate its cached information about what was already
written, since nothing has been written to the new tracker.

Make the class that manages the TLS caches detect this case, by keeping
a counter of how many trackers have been created in the process, and
remembering which tracker its cached information refers to. Before using
any cached information it checks if the tracker has been replaced, and
invalidates the cache if so.
  • Loading branch information
godlygeek committed Apr 4, 2022
1 parent c05872c commit 3d94fed
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 18 deletions.
40 changes: 22 additions & 18 deletions src/bloomberg/pensieve/_pensieve/tracking_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ starts_with(const std::string& haystack, const std::string_view& needle)
return haystack.compare(0, needle.size(), needle) == 0;
}

// Process-wide count of how many Tracker objects have been constructed.
// Thread-local caches store the value they last observed; a mismatch tells
// them the Tracker has been replaced and their cached state is stale.
// Initialized explicitly: before C++20 the default constructor of std::atomic
// performs no initialization of its own.
std::atomic<unsigned int> g_tracker_generation{0};

} // namespace

namespace pensieve::tracking_api {
Expand Down Expand Up @@ -114,10 +117,10 @@ class PythonStackTracker
void setMostRecentFrameLineNumber(int lineno);
int pushPythonFrame(PyFrameObject* frame);
void popPythonFrame();
void resetInChildProcess() noexcept;

private:
uint32_t d_num_pending_pops{};
uint32_t d_tracker_generation{};
std::vector<LazilyEmittedFrame>* d_stack{};
};

Expand All @@ -141,8 +144,22 @@ PythonStackTracker::reset(PyFrameObject* current_frame)
// Flush the frame pops accumulated in d_num_pending_pops to the active Tracker.
//
// Before touching any cached state, compare the generation this object last
// observed against the process-wide g_tracker_generation. If they differ, the
// Tracker has been replaced since the cache was filled, so the cached pops and
// the per-frame `emitted` flags describe a different Tracker and are discarded
// instead of being forwarded.
inline void
PythonStackTracker::emitPendingPops()
{
    // Load the atomic exactly once so the comparison and the value we store
    // afterwards are guaranteed to be the same generation.
    const unsigned int current_generation = g_tracker_generation.load();

    if (d_tracker_generation != current_generation) {
        // The Tracker has changed underneath us (either by an after-fork
        // handler, or by another thread destroying the Tracker we were using
        // and installing a new one). Either way, update our state to reflect
        // that nothing has been emitted to the (new) output file.
        d_tracker_generation = current_generation;
        d_num_pending_pops = 0;
        if (d_stack) {
            for (auto& frame : *d_stack) {
                frame.emitted = false;
            }
        }
        return;
    }

    // Same Tracker as last time: the pending pops are still meaningful.
    Tracker::getTracker()->popFrames(d_num_pending_pops);
    d_num_pending_pops = 0;
}

void
Expand Down Expand Up @@ -244,18 +261,6 @@ PythonStackTracker::popPythonFrame()
}
}

// Forget everything this object believes has already been written out:
// clear the pending-pop counter and mark every frame on the stack (if a
// stack exists) as not yet emitted.
void
PythonStackTracker::resetInChildProcess() noexcept
{
    // Nothing has been emitted to the output file in this child process yet.
    d_num_pending_pops = 0;

    if (!d_stack) {
        return;
    }
    for (auto& frame : *d_stack) {
        frame.emitted = false;
    }
}

std::atomic<bool> Tracker::d_active = false;
std::unique_ptr<Tracker> Tracker::d_instance_owner;
std::atomic<Tracker*> Tracker::d_instance = nullptr;
Expand All @@ -271,6 +276,8 @@ Tracker::Tracker(
, d_memory_interval(memory_interval)
, d_follow_fork(follow_fork)
{
g_tracker_generation++;

// Note: this must be set before the hooks are installed.
d_instance = this;

Expand Down Expand Up @@ -416,9 +423,6 @@ Tracker::parentFork()
void
Tracker::childFork()
{
// Reset thread-local state.
t_python_stack_tracker.resetInChildProcess();

// Intentionally leak any old tracker. Its destructor cannot be called,
// because it would try to destroy mutexes that might be locked by threads
// that no longer exist, and to join a background thread that no longer
Expand Down
47 changes: 47 additions & 0 deletions tests/integration/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,53 @@ def foo():
assert second_alloc1.stack_id != second_alloc2.stack_id


def test_thread_surviving_multiple_trackers(tmp_path):
    """Check a thread that keeps running across two consecutive Trackers.

    The background thread allocates once while the first Tracker is active
    and once while the second is active; each output file must contain
    exactly one matching valloc record, and the two stack traces must differ.
    """
    # GIVEN
    first_output = tmp_path / "test.bin.1"
    second_output = tmp_path / "test.bin.2"
    allocator = MemoryAllocator()
    orig_tracker_used = threading.Event()
    new_tracker_installed = threading.Event()

    def deeper_function():
        # First allocation: made while the first Tracker is active.
        allocator.valloc(1234)
        allocator.free()
        orig_tracker_used.set()
        # Block until the main thread has installed the second Tracker.
        new_tracker_installed.wait()
        # Second allocation: same thread, different Tracker.
        allocator.valloc(1234)
        allocator.free()

    def tracking_function():
        deeper_function()

    def matching_vallocs(path):
        # Keep only the 1234-byte valloc records from the given capture file.
        return [
            record
            for record in FileReader(path).get_allocation_records()
            if record.size == 1234 and record.allocator == AllocatorType.VALLOC
        ]

    # WHEN
    with Tracker(first_output):
        worker = threading.Thread(target=tracking_function)
        worker.start()
        orig_tracker_used.wait()

    with Tracker(second_output):
        new_tracker_installed.set()
        worker.join()

    # THEN
    first_vallocs = matching_vallocs(first_output)
    second_vallocs = matching_vallocs(second_output)

    assert (len(first_vallocs), len(second_vallocs)) == (1, 1)
    assert first_vallocs[0].stack_trace() != second_vallocs[0].stack_trace()


class TestMmap:
@classmethod
def allocating_function(cls):
Expand Down

0 comments on commit 3d94fed

Please sign in to comment.