vllm.v1.core.sched.request_queue

FCFSRequestQueue

Bases: deque[Request], RequestQueue

A first-come-first-served queue that supports deque operations.

Source code in vllm/v1/core/sched/request_queue.py
class FCFSRequestQueue(deque[Request], RequestQueue):
    """A first-come-first-served queue that supports deque operations."""

    def add_request(self, request: Request) -> None:
        """Add a request to the queue according to FCFS policy."""
        self.append(request)

    def pop_request(self) -> Request:
        """Pop a request from the queue according to FCFS policy."""
        return self.popleft()

    def peek_request(self) -> Request:
        """Peek at the next request in the queue without removing it."""
        if not self:
            raise IndexError("peek from an empty queue")
        return self[0]

    def prepend_request(self, request: Request) -> None:
        """Prepend a request to the front of the queue."""
        self.appendleft(request)

    def prepend_requests(self, requests: RequestQueue) -> None:
        """Prepend all requests from another queue to the front of this
        queue."""
        self.extendleft(reversed(requests))

    def remove_request(self, request: Request) -> None:
        """Remove a specific request from the queue."""
        self.remove(request)

    def remove_requests(self, requests: Iterable[Request]) -> None:
        """Remove multiple specific requests from the queue."""
        requests_to_remove = set(requests)
        filtered_requests = [req for req in self if req not in requests_to_remove]
        # deque does not support in-place filtering, so we need to clear
        # and extend
        self.clear()
        self.extend(filtered_requests)

    def __bool__(self) -> bool:
        """Check if queue has any requests."""
        return len(self) > 0

    def __len__(self) -> int:
        """Get number of requests in queue."""
        return super().__len__()

    def __iter__(self) -> Iterator[Request]:
        """Iterate over the queue according to FCFS policy."""
        return super().__iter__()

    def __reversed__(self) -> Iterator[Request]:
        """Iterate over the queue in reverse order."""
        return super().__reversed__()
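
A brief usage sketch (not part of the module source): the queue is imported from vllm.v1.core.sched.request_queue, while the request objects below are hypothetical stand-ins for vllm's Request, modeling only the attributes this sketch touches.

from dataclasses import dataclass

from vllm.v1.core.sched.request_queue import FCFSRequestQueue


@dataclass
class FakeRequest:
    # Hypothetical stand-in for vllm's Request; only the attributes needed
    # for this sketch are modeled.
    request_id: str
    priority: int = 0
    arrival_time: float = 0.0


queue = FCFSRequestQueue()
for i in range(3):
    queue.add_request(FakeRequest(request_id=f"req-{i}"))

assert len(queue) == 3
first = queue.peek_request()          # oldest request, not removed
assert first.request_id == "req-0"
assert queue.pop_request() is first   # popped in arrival (FCFS) order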

__bool__

__bool__() -> bool

Check if queue has any requests.

Source code in vllm/v1/core/sched/request_queue.py
def __bool__(self) -> bool:
    """Check if queue has any requests."""
    return len(self) > 0

__iter__

__iter__() -> Iterator[Request]

Iterate over the queue according to FCFS policy.

Source code in vllm/v1/core/sched/request_queue.py
def __iter__(self) -> Iterator[Request]:
    """Iterate over the queue according to FCFS policy."""
    return super().__iter__()

__len__

__len__() -> int

Get number of requests in queue.

Source code in vllm/v1/core/sched/request_queue.py
def __len__(self) -> int:
    """Get number of requests in queue."""
    return super().__len__()

__reversed__

__reversed__() -> Iterator[Request]

Iterate over the queue in reverse order.

Source code in vllm/v1/core/sched/request_queue.py
def __reversed__(self) -> Iterator[Request]:
    """Iterate over the queue in reverse order."""
    return super().__reversed__()

add_request

add_request(request: Request) -> None

Add a request to the queue according to FCFS policy.

Source code in vllm/v1/core/sched/request_queue.py
def add_request(self, request: Request) -> None:
    """Add a request to the queue according to FCFS policy."""
    self.append(request)

peek_request

peek_request() -> Request

Peek at the next request in the queue without removing it.

Source code in vllm/v1/core/sched/request_queue.py
def peek_request(self) -> Request:
    """Peek at the next request in the queue without removing it."""
    if not self:
        raise IndexError("peek from an empty queue")
    return self[0]

pop_request

pop_request() -> Request

Pop a request from the queue according to FCFS policy.

Source code in vllm/v1/core/sched/request_queue.py
def pop_request(self) -> Request:
    """Pop a request from the queue according to FCFS policy."""
    return self.popleft()

prepend_request

prepend_request(request: Request) -> None

Prepend a request to the front of the queue.

Source code in vllm/v1/core/sched/request_queue.py
def prepend_request(self, request: Request) -> None:
    """Prepend a request to the front of the queue."""
    self.appendleft(request)

prepend_requests

prepend_requests(requests: RequestQueue) -> None

Prepend all requests from another queue to the front of this queue.

Source code in vllm/v1/core/sched/request_queue.py
def prepend_requests(self, requests: RequestQueue) -> None:
    """Prepend all requests from another queue to the front of this
    queue."""
    self.extendleft(reversed(requests))
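
Note that deque.extendleft reverses the iterable it consumes, which is why the implementation reverses the incoming queue first; the net effect is that the prepended requests keep their relative order ahead of the existing ones. A minimal sketch of the idiom, with plain strings standing in for requests:

from collections import deque

existing = deque(["c", "d"])
to_prepend = deque(["a", "b"])

# extendleft(reversed(...)) preserves the relative order of the prepended items.
existing.extendleft(reversed(to_prepend))
assert list(existing) == ["a", "b", "c", "d"]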

remove_request

remove_request(request: Request) -> None

Remove a specific request from the queue.

Source code in vllm/v1/core/sched/request_queue.py
def remove_request(self, request: Request) -> None:
    """Remove a specific request from the queue."""
    self.remove(request)

remove_requests

remove_requests(requests: Iterable[Request]) -> None

Remove multiple specific requests from the queue.

Source code in vllm/v1/core/sched/request_queue.py
def remove_requests(self, requests: Iterable[Request]) -> None:
    """Remove multiple specific requests from the queue."""
    requests_to_remove = set(requests)
    filtered_requests = [req for req in self if req not in requests_to_remove]
    # deque does not support in-place filtering, so we need to clear
    # and extend
    self.clear()
    self.extend(filtered_requests)
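
Removing entries one by one with deque.remove would cost O(n) per call; filtering once against a set keeps the whole operation linear in the queue length. The same idiom with plain values:

from collections import deque

queue = deque(["a", "b", "c", "d"])
to_remove = {"b", "d"}

# One pass over the deque, with O(1) membership checks against the set.
kept = [item for item in queue if item not in to_remove]
queue.clear()
queue.extend(kept)
assert list(queue) == ["a", "c"]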

PriorityRequestQueue

Bases: RequestQueue

A priority queue that supports heap operations.

Respects the ordering defined in the Request class: requests with a smaller value of priority are processed first, and ties are broken by earlier arrival_time and then by request_id. When constructed with use_sla=True, the request's SLA tier rank takes precedence over priority.

Source code in vllm/v1/core/sched/request_queue.py
class PriorityRequestQueue(RequestQueue):
    """
    A priority queue that supports heap operations.

    Respects the ordering defined in the Request class: requests with a
    smaller value of `priority` are processed first, and ties are broken by
    earlier `arrival_time` and then by `request_id`. When constructed with
    `use_sla=True`, the request's SLA tier rank takes precedence over
    `priority`.
    """

    def __init__(self, use_sla: bool = False) -> None:
        self._heap: list[tuple[tuple, Request]] = []
        self.use_sla = use_sla

    def add_request(self, request: Request) -> None:
        """Add a request to the queue according to priority policy."""
        heapq.heappush(self._heap, (self._make_key(request), request))

    def pop_request(self) -> Request:
        """Pop a request from the queue according to priority policy."""
        if not self._heap:
            raise IndexError("pop from empty heap")
        return heapq.heappop(self._heap)[1]

    def peek_request(self) -> Request:
        """Peek at the next request in the queue without removing it."""
        if not self._heap:
            raise IndexError("peek from empty heap")
        return self._heap[0][1]

    def prepend_request(self, request: Request) -> None:
        """Add a request to the queue according to priority policy.

        Note: In a priority queue, there is no concept of prepending to the
        front. Requests are ordered by (priority, arrival_time)."""
        self.add_request(request)

    def prepend_requests(self, requests: RequestQueue) -> None:
        """Add all requests from another queue according to priority policy.

        Note: In a priority queue, there is no concept of prepending to the
        front. Requests are ordered by (priority, arrival_time)."""
        for request in requests:
            self.add_request(request)

    def remove_request(self, request: Request) -> None:
        """Remove a specific request from the queue."""
        for idx, (_, req) in enumerate(self._heap):
            if req is request:
                self._heap.pop(idx)
                break
        heapq.heapify(self._heap)

    def remove_requests(self, requests: Iterable[Request]) -> None:
        """Remove multiple specific requests from the queue."""
        requests_to_remove = requests if isinstance(requests, set) else set(requests)
        self._heap = [item for item in self._heap if item[1] not in requests_to_remove]
        heapq.heapify(self._heap)

    def __bool__(self) -> bool:
        """Check if queue has any requests."""
        return bool(self._heap)

    def __len__(self) -> int:
        """Get number of requests in queue."""
        return len(self._heap)

    def __iter__(self) -> Iterator[Request]:
        """Iterate over the queue according to priority policy."""
        heap_copy = self._heap[:]
        while heap_copy:
            yield heapq.heappop(heap_copy)[1]

    def __reversed__(self) -> Iterator[Request]:
        """Iterate over the queue in reverse priority order."""
        return reversed(list(self))

    def _make_key(self, request: Request) -> tuple:
        if self.use_sla:
            return (
                _sla_rank(getattr(request, "sla_tier", None)),
                request.priority,
                request.arrival_time,
                request.request_id,
            )
        return (request.priority, request.arrival_time, request.request_id)
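
A brief ordering sketch (not part of the module source): the queue is imported from this module, and the request objects are hypothetical stand-ins exposing only the attributes _make_key reads.

from dataclasses import dataclass

from vllm.v1.core.sched.request_queue import PriorityRequestQueue


@dataclass
class FakeRequest:
    # Hypothetical stand-in for vllm's Request; only the key attributes
    # used by PriorityRequestQueue._make_key are modeled.
    request_id: str
    priority: int
    arrival_time: float
    sla_tier: str | None = None


queue = PriorityRequestQueue()
queue.add_request(FakeRequest("r1", priority=1, arrival_time=0.0))
queue.add_request(FakeRequest("r2", priority=0, arrival_time=2.0))
queue.add_request(FakeRequest("r3", priority=0, arrival_time=1.0))

# Smaller priority wins; ties fall back to earlier arrival_time.
assert [r.request_id for r in queue] == ["r3", "r2", "r1"]
assert queue.pop_request().request_id == "r3"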

_heap instance-attribute

_heap: list[tuple[tuple, Request]] = []

use_sla instance-attribute

use_sla = use_sla

__bool__

__bool__() -> bool

Check if queue has any requests.

Source code in vllm/v1/core/sched/request_queue.py
def __bool__(self) -> bool:
    """Check if queue has any requests."""
    return bool(self._heap)

__init__

__init__(use_sla: bool = False) -> None

Create an empty priority queue. When use_sla is true, the ordering key includes the request's SLA tier rank ahead of priority.

Source code in vllm/v1/core/sched/request_queue.py
def __init__(self, use_sla: bool = False) -> None:
    self._heap: list[tuple[tuple, Request]] = []
    self.use_sla = use_sla

__iter__

__iter__() -> Iterator[Request]

Iterate over the queue according to priority policy.

Source code in vllm/v1/core/sched/request_queue.py
def __iter__(self) -> Iterator[Request]:
    """Iterate over the queue according to priority policy."""
    heap_copy = self._heap[:]
    while heap_copy:
        yield heapq.heappop(heap_copy)[1]
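
Iteration pops from a shallow copy of the internal heap, so requests are yielded in exact priority order without mutating the queue, at an O(n log n) cost per full traversal. The same idiom on a plain heap of integers:

import heapq

heap = [3, 1, 2]
heapq.heapify(heap)

# Pop from a copy so the original heap is left untouched.
copy = heap[:]
in_order = []
while copy:
    in_order.append(heapq.heappop(copy))

assert in_order == [1, 2, 3]
assert len(heap) == 3  # original heap unchanged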

__len__

__len__() -> int

Get number of requests in queue.

Source code in vllm/v1/core/sched/request_queue.py
def __len__(self) -> int:
    """Get number of requests in queue."""
    return len(self._heap)

__reversed__

__reversed__() -> Iterator[Request]

Iterate over the queue in reverse priority order.

Source code in vllm/v1/core/sched/request_queue.py
def __reversed__(self) -> Iterator[Request]:
    """Iterate over the queue in reverse priority order."""
    return reversed(list(self))

_make_key

_make_key(request: Request) -> tuple

Build the heap ordering key for a request: (priority, arrival_time, request_id) by default, with the SLA tier rank prepended when use_sla is enabled.

Source code in vllm/v1/core/sched/request_queue.py
def _make_key(self, request: Request) -> tuple:
    if self.use_sla:
        return (
            _sla_rank(getattr(request, "sla_tier", None)),
            request.priority,
            request.arrival_time,
            request.request_id,
        )
    return (request.priority, request.arrival_time, request.request_id)
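
The heap stores (key, request) pairs, so request objects themselves are never compared; including request_id in the key makes ties deterministic. A small sketch of the two key shapes, using a hypothetical request object that exposes only the attributes _make_key reads:

from types import SimpleNamespace

from vllm.v1.core.sched.request_queue import PriorityRequestQueue

# Hypothetical request exposing only the attributes _make_key reads.
req = SimpleNamespace(
    request_id="r1", priority=0, arrival_time=12.5, sla_tier="interactive"
)

# Default key shape: (priority, arrival_time, request_id).
assert PriorityRequestQueue()._make_key(req) == (0, 12.5, "r1")

# SLA-aware key shape: (sla_rank, priority, arrival_time, request_id);
# "interactive" maps to rank 0, so it sorts ahead of "batch" and "background".
assert PriorityRequestQueue(use_sla=True)._make_key(req) == (0, 0, 12.5, "r1")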

add_request

add_request(request: Request) -> None

Add a request to the queue according to priority policy.

Source code in vllm/v1/core/sched/request_queue.py
def add_request(self, request: Request) -> None:
    """Add a request to the queue according to priority policy."""
    heapq.heappush(self._heap, (self._make_key(request), request))

peek_request

peek_request() -> Request

Peek at the next request in the queue without removing it.

Source code in vllm/v1/core/sched/request_queue.py
def peek_request(self) -> Request:
    """Peek at the next request in the queue without removing it."""
    if not self._heap:
        raise IndexError("peek from empty heap")
    return self._heap[0][1]

pop_request

pop_request() -> Request

Pop a request from the queue according to priority policy.

Source code in vllm/v1/core/sched/request_queue.py
def pop_request(self) -> Request:
    """Pop a request from the queue according to priority policy."""
    if not self._heap:
        raise IndexError("pop from empty heap")
    return heapq.heappop(self._heap)[1]

prepend_request

prepend_request(request: Request) -> None

Add a request to the queue according to priority policy.

Note: In a priority queue, there is no concept of prepending to the front. Requests are ordered by (priority, arrival_time).

Source code in vllm/v1/core/sched/request_queue.py
def prepend_request(self, request: Request) -> None:
    """Add a request to the queue according to priority policy.

    Note: In a priority queue, there is no concept of prepending to the
    front. Requests are ordered by (priority, arrival_time)."""
    self.add_request(request)

prepend_requests

prepend_requests(requests: RequestQueue) -> None

Add all requests from another queue according to priority policy.

Note: In a priority queue, there is no concept of prepending to the front. Requests are ordered by (priority, arrival_time).

Source code in vllm/v1/core/sched/request_queue.py
def prepend_requests(self, requests: RequestQueue) -> None:
    """Add all requests from another queue according to priority policy.

    Note: In a priority queue, there is no concept of prepending to the
    front. Requests are ordered by (priority, arrival_time)."""
    for request in requests:
        self.add_request(request)

remove_request

remove_request(request: Request) -> None

Remove a specific request from the queue.

Source code in vllm/v1/core/sched/request_queue.py
def remove_request(self, request: Request) -> None:
    """Remove a specific request from the queue."""
    for idx, (_, req) in enumerate(self._heap):
        if req is request:
            self._heap.pop(idx)
            break
    heapq.heapify(self._heap)

remove_requests

remove_requests(requests: Iterable[Request]) -> None

Remove multiple specific requests from the queue.

Source code in vllm/v1/core/sched/request_queue.py
def remove_requests(self, requests: Iterable[Request]) -> None:
    """Remove multiple specific requests from the queue."""
    requests_to_remove = requests if isinstance(requests, set) else set(requests)
    self._heap = [item for item in self._heap if item[1] not in requests_to_remove]
    heapq.heapify(self._heap)

RequestQueue

Bases: ABC

Abstract base class for request queues.

Source code in vllm/v1/core/sched/request_queue.py
class RequestQueue(ABC):
    """Abstract base class for request queues."""

    @abstractmethod
    def add_request(self, request: Request) -> None:
        """Add a request to the queue according to the policy."""
        pass

    @abstractmethod
    def pop_request(self) -> Request:
        """Pop a request from the queue according to the policy."""
        pass

    @abstractmethod
    def peek_request(self) -> Request:
        """Peek at the request at the front of the queue without removing it."""
        pass

    @abstractmethod
    def prepend_request(self, request: Request) -> None:
        """Prepend a request to the front of the queue."""
        pass

    @abstractmethod
    def prepend_requests(self, requests: "RequestQueue") -> None:
        """Prepend all requests from another queue to the front of this
        queue."""
        pass

    @abstractmethod
    def remove_request(self, request: Request) -> None:
        """Remove a specific request from the queue."""
        pass

    @abstractmethod
    def remove_requests(self, requests: Iterable[Request]) -> None:
        """Remove multiple specific requests from the queue."""
        pass

    @abstractmethod
    def __bool__(self) -> bool:
        """Check if queue has any requests."""
        pass

    @abstractmethod
    def __len__(self) -> int:
        """Get number of requests in queue."""
        pass

    @abstractmethod
    def __iter__(self) -> Iterator[Request]:
        """Iterate over the queue according to the policy."""
        pass

    @abstractmethod
    def __reversed__(self) -> Iterator[Request]:
        """Iterate over the queue in reverse order."""
        pass
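
For reference, a minimal hypothetical subclass implementing a last-in-first-out policy. It is not part of the module and is shown only to illustrate the surface a new policy must provide; the import path vllm.v1.request for Request is an assumption here.

from collections import deque
from collections.abc import Iterable, Iterator

from vllm.v1.core.sched.request_queue import RequestQueue
from vllm.v1.request import Request  # assumed import path


class LIFORequestQueue(RequestQueue):
    """Hypothetical stack-like queue: the newest request is scheduled first."""

    def __init__(self) -> None:
        self._stack: deque[Request] = deque()

    def add_request(self, request: Request) -> None:
        self._stack.append(request)

    def pop_request(self) -> Request:
        return self._stack.pop()

    def peek_request(self) -> Request:
        if not self._stack:
            raise IndexError("peek from an empty queue")
        return self._stack[-1]

    def prepend_request(self, request: Request) -> None:
        # The "front" of a LIFO queue is the top of the stack.
        self._stack.append(request)

    def prepend_requests(self, requests: RequestQueue) -> None:
        self._stack.extend(requests)

    def remove_request(self, request: Request) -> None:
        self._stack.remove(request)

    def remove_requests(self, requests: Iterable[Request]) -> None:
        to_remove = set(requests)
        kept = [r for r in self._stack if r not in to_remove]
        self._stack.clear()
        self._stack.extend(kept)

    def __bool__(self) -> bool:
        return bool(self._stack)

    def __len__(self) -> int:
        return len(self._stack)

    def __iter__(self) -> Iterator[Request]:
        # LIFO policy: iterate newest-first.
        return reversed(self._stack)

    def __reversed__(self) -> Iterator[Request]:
        return iter(self._stack)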

__bool__ abstractmethod

__bool__() -> bool

Check if queue has any requests.

Source code in vllm/v1/core/sched/request_queue.py
@abstractmethod
def __bool__(self) -> bool:
    """Check if queue has any requests."""
    pass

__iter__ abstractmethod

__iter__() -> Iterator[Request]

Iterate over the queue according to the policy.

Source code in vllm/v1/core/sched/request_queue.py
@abstractmethod
def __iter__(self) -> Iterator[Request]:
    """Iterate over the queue according to the policy."""
    pass

__len__ abstractmethod

__len__() -> int

Get number of requests in queue.

Source code in vllm/v1/core/sched/request_queue.py
@abstractmethod
def __len__(self) -> int:
    """Get number of requests in queue."""
    pass

__reversed__ abstractmethod

__reversed__() -> Iterator[Request]

Iterate over the queue in reverse order.

Source code in vllm/v1/core/sched/request_queue.py
@abstractmethod
def __reversed__(self) -> Iterator[Request]:
    """Iterate over the queue in reverse order."""
    pass

add_request abstractmethod

add_request(request: Request) -> None

Add a request to the queue according to the policy.

Source code in vllm/v1/core/sched/request_queue.py
@abstractmethod
def add_request(self, request: Request) -> None:
    """Add a request to the queue according to the policy."""
    pass

peek_request abstractmethod

peek_request() -> Request

Peek at the request at the front of the queue without removing it.

Source code in vllm/v1/core/sched/request_queue.py
@abstractmethod
def peek_request(self) -> Request:
    """Peek at the request at the front of the queue without removing it."""
    pass

pop_request abstractmethod

pop_request() -> Request

Pop a request from the queue according to the policy.

Source code in vllm/v1/core/sched/request_queue.py
@abstractmethod
def pop_request(self) -> Request:
    """Pop a request from the queue according to the policy."""
    pass

prepend_request abstractmethod

prepend_request(request: Request) -> None

Prepend a request to the front of the queue.

Source code in vllm/v1/core/sched/request_queue.py
@abstractmethod
def prepend_request(self, request: Request) -> None:
    """Prepend a request to the front of the queue."""
    pass

prepend_requests abstractmethod

prepend_requests(requests: RequestQueue) -> None

Prepend all requests from another queue to the front of this queue.

Source code in vllm/v1/core/sched/request_queue.py
@abstractmethod
def prepend_requests(self, requests: "RequestQueue") -> None:
    """Prepend all requests from another queue to the front of this
    queue."""
    pass

remove_request abstractmethod

remove_request(request: Request) -> None

Remove a specific request from the queue.

Source code in vllm/v1/core/sched/request_queue.py
@abstractmethod
def remove_request(self, request: Request) -> None:
    """Remove a specific request from the queue."""
    pass

remove_requests abstractmethod

remove_requests(requests: Iterable[Request]) -> None

Remove multiple specific requests from the queue.

Source code in vllm/v1/core/sched/request_queue.py
@abstractmethod
def remove_requests(self, requests: Iterable[Request]) -> None:
    """Remove multiple specific requests from the queue."""
    pass

SchedulingPolicy

Bases: Enum

Enum for scheduling policies.

Source code in vllm/v1/core/sched/request_queue.py
class SchedulingPolicy(Enum):
    """Enum for scheduling policies."""

    FCFS = "fcfs"
    PRIORITY = "priority"

FCFS class-attribute instance-attribute

FCFS = 'fcfs'

PRIORITY class-attribute instance-attribute

PRIORITY = 'priority'

_sla_rank

_sla_rank(tier: str | None) -> int

Map an SLA tier name to an integer rank; lower ranks are scheduled first. Unknown or missing tiers receive the same rank as "batch".

Source code in vllm/v1/core/sched/request_queue.py
def _sla_rank(tier: str | None) -> int:
    # Lower rank = higher priority for scheduling.
    if tier == "interactive":
        return 0
    if tier == "batch":
        return 1
    if tier == "background":
        return 2
    return 1
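
A brief sketch of the mapping; unknown or missing tiers fall back to the same rank as "batch":

from vllm.v1.core.sched.request_queue import _sla_rank

assert _sla_rank("interactive") == 0   # scheduled first
assert _sla_rank("batch") == 1
assert _sla_rank("background") == 2    # scheduled last
assert _sla_rank(None) == 1            # missing tier treated like "batch"
assert _sla_rank("unknown-tier") == 1  # unrecognized tiers too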

create_request_queue

create_request_queue(
    policy: SchedulingPolicy, *, use_sla: bool = False
) -> RequestQueue

Create request queue based on scheduling policy.

Source code in vllm/v1/core/sched/request_queue.py
def create_request_queue(
    policy: SchedulingPolicy, *, use_sla: bool = False
) -> RequestQueue:
    """Create request queue based on scheduling policy."""
    if policy == SchedulingPolicy.PRIORITY:
        return PriorityRequestQueue(use_sla=use_sla)
    elif policy == SchedulingPolicy.FCFS:
        return FCFSRequestQueue()
    else:
        raise ValueError(f"Unknown scheduling policy: {policy}")
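
A brief usage sketch of the factory:

from vllm.v1.core.sched.request_queue import (
    FCFSRequestQueue,
    PriorityRequestQueue,
    SchedulingPolicy,
    create_request_queue,
)

fcfs = create_request_queue(SchedulingPolicy.FCFS)
assert isinstance(fcfs, FCFSRequestQueue)

# use_sla only affects the priority policy's ordering key.
prio = create_request_queue(SchedulingPolicy.PRIORITY, use_sla=True)
assert isinstance(prio, PriorityRequestQueue)
assert prio.use_sla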