Skip to content

Commit

Permalink
[Core] Fix tracking of model forward time in case of PP>1 (vllm-proje…
Browse files Browse the repository at this point in the history
…ct#7440)

[Core] Fix tracking of model forward time to the span traces in case of PP>1 (vllm-project#7440)
  • Loading branch information
sfc-gh-mkeralapura authored and kylesayrs committed Aug 17, 2024
1 parent 3e5731f commit 9a707fd
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 11 deletions.
8 changes: 5 additions & 3 deletions .buildkite/test-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,9 @@ steps:
- pytest -v -s distributed/test_pynccl.py
- pytest -v -s spec_decode/e2e/test_integration_dist_tp4.py

##### fast check tests #####
##### 1 GPU test #####

- label: Metrics, Tracing Test # 10min
num_gpus: 2
fast_check: true
source_file_dependencies:
- vllm/
- tests/metrics
Expand All @@ -118,6 +117,9 @@ steps:
opentelemetry-semantic-conventions-ai"
- pytest -v -s tracing

##### fast check tests #####
##### 1 GPU test #####

- label: Regression Test # 5min
mirror_hardwares: [amd]
source_file_dependencies:
Expand Down
66 changes: 66 additions & 0 deletions tests/tracing/test_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,71 @@ def test_traces(trace_service):
SpanAttributes.LLM_LATENCY_TIME_TO_FIRST_TOKEN) == ttft
e2e_time = metrics.finished_time - metrics.arrival_time
assert attributes.get(SpanAttributes.LLM_LATENCY_E2E) == e2e_time
assert metrics.scheduler_time > 0
assert attributes.get(
SpanAttributes.LLM_LATENCY_TIME_IN_SCHEDULER) == metrics.scheduler_time
# Model forward and model execute should be none, since detailed traces is
# not enabled.
assert metrics.model_forward_time is None
assert metrics.model_execute_time is None


def test_traces_with_detailed_steps(trace_service):
os.environ[OTEL_EXPORTER_OTLP_TRACES_INSECURE] = "true"

sampling_params = SamplingParams(temperature=0.01,
top_p=0.1,
max_tokens=256)
model = "facebook/opt-125m"
llm = LLM(
model=model,
otlp_traces_endpoint=FAKE_TRACE_SERVER_ADDRESS,
collect_detailed_traces="all",
)
prompts = ["This is a short prompt"]
outputs = llm.generate(prompts, sampling_params=sampling_params)

timeout = 5
if not trace_service.evt.wait(timeout):
raise TimeoutError(
f"The fake trace service didn't receive a trace within "
f"the {timeout} seconds timeout")

attributes = decode_attributes(trace_service.request.resource_spans[0].
scope_spans[0].spans[0].attributes)
assert attributes.get(SpanAttributes.LLM_RESPONSE_MODEL) == model
assert attributes.get(
SpanAttributes.LLM_REQUEST_ID) == outputs[0].request_id
assert attributes.get(
SpanAttributes.LLM_REQUEST_TEMPERATURE) == sampling_params.temperature
assert attributes.get(
SpanAttributes.LLM_REQUEST_TOP_P) == sampling_params.top_p
assert attributes.get(
SpanAttributes.LLM_REQUEST_MAX_TOKENS) == sampling_params.max_tokens
assert attributes.get(
SpanAttributes.LLM_REQUEST_BEST_OF) == sampling_params.best_of
assert attributes.get(SpanAttributes.LLM_REQUEST_N) == sampling_params.n
assert attributes.get(SpanAttributes.LLM_USAGE_PROMPT_TOKENS) == len(
outputs[0].prompt_token_ids)
completion_tokens = sum(len(o.token_ids) for o in outputs[0].outputs)
assert attributes.get(
SpanAttributes.LLM_USAGE_COMPLETION_TOKENS) == completion_tokens
metrics = outputs[0].metrics
assert attributes.get(
SpanAttributes.LLM_LATENCY_TIME_IN_QUEUE) == metrics.time_in_queue
ttft = metrics.first_token_time - metrics.arrival_time
assert attributes.get(
SpanAttributes.LLM_LATENCY_TIME_TO_FIRST_TOKEN) == ttft
e2e_time = metrics.finished_time - metrics.arrival_time
assert attributes.get(SpanAttributes.LLM_LATENCY_E2E) == e2e_time
assert metrics.scheduler_time > 0
assert attributes.get(
SpanAttributes.LLM_LATENCY_TIME_IN_SCHEDULER) == metrics.scheduler_time
assert metrics.model_forward_time > 0
assert attributes.get(
SpanAttributes.LLM_LATENCY_TIME_IN_MODEL_FORWARD) == pytest.approx(
metrics.model_forward_time / 1000)
assert metrics.model_execute_time > 0
assert attributes.get(SpanAttributes.LLM_LATENCY_TIME_IN_MODEL_EXECUTE
) == metrics.model_execute_time
assert metrics.model_forward_time < 1000 * metrics.model_execute_time
2 changes: 1 addition & 1 deletion vllm/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1033,9 +1033,9 @@ def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
# Schedule sequence groups.
# This function call changes the internal states of the scheduler
# such as self.running, self.swapped, and self.waiting.
scheduler_start_time = time.perf_counter()
scheduler_outputs = self._schedule()
now = time.time()
scheduler_start_time = time.perf_counter()

if not self.cache_config.enable_prefix_caching:
common_computed_block_nums = []
Expand Down
5 changes: 0 additions & 5 deletions vllm/engine/arg_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,11 +948,6 @@ def create_engine_config(self, ) -> EngineConfig:
raise ValueError(
f"Invalid module {m} in collect_detailed_traces. "
f"Valid modules are {ALLOWED_DETAILED_TRACE_MODULES}")
if (m == "model"
or m == "all") and self.pipeline_parallel_size > 1:
raise ValueError(
"Collection of detailed traces for the 'model' module is "
"not yet supported with pipeline parallelism.")
observability_config = ObservabilityConfig(
otlp_traces_endpoint=self.otlp_traces_endpoint,
collect_model_forward_time="model" in detailed_trace_modules
Expand Down
24 changes: 22 additions & 2 deletions vllm/worker/model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1554,6 +1554,21 @@ def execute_model(

# Compute the logits in the last pipeline stage.
if not get_pp_group().is_last_rank:
if (self.is_driver_worker
and hidden_or_intermediate_states is not None
and isinstance(hidden_or_intermediate_states,
IntermediateTensors)
and self.observability_config is not None
and self.observability_config.collect_model_forward_time):
model_forward_end.synchronize()
model_forward_time = model_forward_start.elapsed_time(
model_forward_end)
orig_model_forward_time = 0.0
if intermediate_tensors is not None:
orig_model_forward_time = intermediate_tensors.tensors.get(
"model_forward_time", torch.tensor(0.0)).item()
hidden_or_intermediate_states.tensors["model_forward_time"] = (
torch.tensor(model_forward_time + orig_model_forward_time))
return hidden_or_intermediate_states

logits = self.model.compute_logits(hidden_or_intermediate_states,
Expand All @@ -1573,11 +1588,16 @@ def execute_model(
model_forward_end.synchronize()
model_forward_time = model_forward_start.elapsed_time(
model_forward_end)
orig_model_forward_time = 0.0
if intermediate_tensors is not None:
orig_model_forward_time = intermediate_tensors.tensors.get(
"model_forward_time", torch.tensor(0.0)).item()
# If there are multiple workers, we are still tracking the latency
# from the start time of the driver worker to the end time of the
# driver worker. The model forward time will then end up covering
# the communication time as well.
output.model_forward_time = model_forward_time
output.model_forward_time = (orig_model_forward_time +
model_forward_time)

if self.return_hidden_states:
# we only need to pass hidden states of most recent token
Expand Down Expand Up @@ -1735,7 +1755,7 @@ def forward(
**kwargs)
if intermediate_tensors is not None:
for key in intermediate_tensors.tensors:
if key != "model_execute_time":
if key != "model_execute_time" and key != "model_forward_time":
self.input_buffers[key].copy_(intermediate_tensors[key],
non_blocking=True)
# Run the graph.
Expand Down

0 comments on commit 9a707fd

Please sign in to comment.