import asyncio
from typing import List
from copy import deepcopy
from transformers import AutoTokenizer
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams
from vllm.v1.engine.core_client import DPLBAsyncMPClienttry:
model_id = "ByteDance/Ouro-2.6B-Thinking"
dp_size = 8
def get_engine_args():
global model_id
global dp_size
engine_args = AsyncEngineArgs(
model=model_id,
tokenizer=model_id,
tensor_parallel_size=1,
data_parallel_size=dp_size,
dtype="bfloat16",
trust_remote_code=True,
max_model_len=32768,
enforce_eager = False,
model_impl = 'auto'
)
return engine_args
def create_vllm_async_engine():
engine_args = get_engine_args()
llm = AsyncLLMEngine.from_engine_args(engine_args)
return llm
async def _shutdown_engine(engine: AsyncLLMEngine | None):
if engine is None:
return
try:
engine.shutdown()
print("vLLM AsyncEngine shut down gracefully.")
except Exception as e:
print(f"Error when closing engind: {e}")
def shutdown_vllm_async_engine(llm):
asyncio.run(_shutdown_engine(llm))
async def _fetch_answer(llm: AsyncLLMEngine,
tokens: List[int],
dp_queue_size: list[int],
prompt_idx: int):
## <--- Load Balancer --->
dp_idx = 0
best_queue = float('inf')
for i, v in enumerate(dp_queue_size):
if v < best_queue:
best_queue = v
dp_idx = i
## <--- Load Balancer --->
sampling_params = SamplingParams(n = 1,
temperature = 1.0,
top_p = 0.95,
max_tokens=30000
)
response = "<LLM Error>"
try:
dp_queue_size[dp_idx] += 1
request_id = f"request-{prompt_idx}"
# Get a prompt idx from the queue
generator = llm.generate(tokens,
sampling_params,
request_id=request_id,
data_parallel_rank = dp_idx)
final_result = None
async for result in generator:
final_result = result
outs = [x.text for x in final_result.outputs]
response = outs[0]
except Exception as e:
raise e
finally:
dp_queue_size[dp_idx] -= 1
return response
async def main(llm: AsyncLLMEngine):
global model_id
global dp_size
problem = [{"content": "Think before answering the question. Provide very detailed explanation of your solution with examples when possible. You are an expert C# programmer. You will be given a question (problem specification) and will generate a correct C# program that matches the specification and passes all tests.\",\"role\":\"system\"},{\"content\":\"### Question:\\nYou are given a string caption of length n. A good caption is a string where every character appears in groups of at least 3 consecutive occurrences.\\nFor example:\\n\\n\\\"aaabbb\\\" and \\\"aaaaccc\\\" are good captions.\\n\\\"aabbb\\\" and \\\"ccccd\\\" are not good captions.\\n\\nYou can perform the following operation any number of times:\\nChoose an index i (where 0 <= i < n) and change the character at that index to either:\\n\\nThe character immediately before it in the alphabet (if caption[i] != 'a').\\nThe character immediately after it in the alphabet (if caption[i] != 'z').\\n\\nYour task is to convert the given caption into a good caption using the minimum number of operations, and return it. If there are multiple possible good captions, return the lexicographically smallest one among them. If it is impossible to create a good caption, return an empty string \\\"\\\".\\n \\nExample 1:\\n\\nInput: caption = \\\"cdcd\\\"\\nOutput: \\\"cccc\\\"\\nExplanation:\\nIt can be shown that the given caption cannot be transformed into a good caption with fewer than 2 operations. The possible good captions that can be created using exactly 2 operations are:\\n\\n\\\"dddd\\\": Change caption[0] and caption[2] to their next character 'd'.\\n\\\"cccc\\\": Change caption[1] and caption[3] to their previous character 'c'.\\n\\nSince \\\"cccc\\\" is lexicographically smaller than \\\"dddd\\\", return \\\"cccc\\\".\\n\\nExample 2:\\n\\nInput: caption = \\\"aca\\\"\\nOutput: \\\"aaa\\\"\\nExplanation:\\nIt can be proven that the given caption requires at least 2 operations to be transformed into a good caption. The only good caption that can be obtained with exactly 2 operations is as follows:\\n\\nOperation 1: Change caption[1] to 'b'. caption = \\\"aba\\\".\\nOperation 2: Change caption[1] to 'a'. caption = \\\"aaa\\\".\\n\\nThus, return \\\"aaa\\\".\\n\\nExample 3:\\n\\nInput: caption = \\\"bc\\\"\\nOutput: \\\"\\\"\\nExplanation:\\nIt can be shown that the given caption cannot be converted to a good caption by using any number of operations.\\n\\n \\nConstraints:\\n\\n1 <= caption.length <= 5 * 10^4\\ncaption consists only of lowercase English letters.\\n\\n\\n\\n### Format: Read the inputs from stdin solve the problem and write the answer to stdout (do not directly test on the sample inputs). Enclose your code within delimiters as follows. Ensure that when the csharp program runs, it reads the inputs, runs the algorithm and writes output to STDOUT.\\n\\n\\n For 2D arrays, the first line indicates the number of rows, followed by newline-separated rows.\\n\\nSample Input 1:\\n\\ncdcd\\n\\nSample Output 1:\\n\\ncccc\\n\\n\\nSample Input 2:\\n\\naca\\n\\nSample Output 2:\\n\\naaa\\n\\n\\nSample Input 3:\\n\\nbc\\n\\nSample Output 3:\\n\\n\\n\\n\\n```csharp\\n// YOUR CODE HERE\\n```\\n\\n### Answer: (use the provided format with backticks)\\n\\n",
"role":"user"}]
tokenizer = AutoTokenizer.from_pretrained(model_id)
tasks = set()
queue_size = 40
prompt_idx = 0
dp_queue_size = [0] * dp_size
while True:
while len(tasks)<queue_size:
chat = deepcopy(problem)
content = chat[0]['content']
prompt_idx += 1
# randomization in the prompt prevents using same kv cache
chat[0]['content'] = f"Message id={prompt_idx}\n\n" + content
tokens = tokenizer.apply_chat_template(
chat,
tokenize=True,
add_generation_prompt=True)
task = asyncio.create_task(_fetch_answer(llm, tokens, dp_queue_size, prompt_idx))
tasks.add(task)
done, tasks = await asyncio.wait(
tasks, timeout=1, return_when=asyncio.FIRST_COMPLETED
)
for task in done:
if task.cancelled():
raise NotImplementedError
ex = task.exception()
if ex is not None:
raise ex
else:
resp = task.result()
print(resp)
if __name__ == "__main__":
llm = None
try:
llm = create_vllm_async_engine()
asyncio.run(main(llm))
finally:
shutdown_vllm_async_engine(llm)
Your current environment
torch==2.10.0
triton==3.6.0
cuda-nvcc==12.8.61
vLLM Version==0.19.0
🐛 Describe the bug
Vllm load balancer fail to distribute samples across data parallel axis. This happens after 48-72 hours of running models with long responses.
I'm running vllm using `vllm serve` and connect from python openai async client (aprx 50 async queries). If you have long running requests (>=10 min per generation) than at some point you can see that most requests stuck on single or several cards and other GPUs are staying without utilization. See screenshot for example.
In the attached image I'm showing my GPU utilization distribution for one of the models...
This behaviour can be manually patched if you know number of data shards in vllm server and manually balance requests between shards (see second code example below). However this is possible only with `AsyncLLMEngine`, as far as I know `AsyncOpenAI` don't accept `data_parallel_rank` parameter.
vllm serve ByteDance/Ouro-2.6B-Thinking \ --tensor-parallel-size 1 \ -dp 8 \ --dtype bfloat16 \ --trust-remote-code \ --port 5000 \ --max-model-len 32768🐛 Manual Load balancing using
AsyncLLMEngineclassWorkaround with manual load balancing using `AsyncLLMEngine` class can't be done in `AsyncOpenAI`
Unfortunately, as far as I know
AsyncOpenAIdoesn't acceptdata_parallel_rankin params thus you can't manually redistribute prompts between data shards.In the code below search for
## <--- Load Balancer --->anddata_parallel_rank = dp_idxvllm serve ByteDance/Ouro-2.6B-Thinking \ --tensor-parallel-size 1 \ -dp 8 \ --dtype bfloat16 \ --trust-remote-code \ --port 5000 \ --max-model-len 32768Before submitting a new issue...