Last active
February 10, 2026 18:40
-
-
Save yushangdi/bdb752c8274fe02698edc247bcda9f87 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨 | |
| # This file was automatically generated from src/transformers/models/qwen3_vl/modular_qwen3_vl.py. | |
| # Do NOT edit this file manually as any edits will be overwritten by the generation of | |
| # the file from the modular. If any change should be done, please apply the change to the | |
| # modular_qwen3_vl.py file directly. One of our CI enforces this. | |
| # 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨 | |
| # coding=utf-8 | |
| # Copyright 2025 The Qwen Team and The HuggingFace Inc. team. All rights reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| from dataclasses import dataclass | |
| from typing import Any, Callable, Optional, Union | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| from ...activations import ACT2FN | |
| from ...cache_utils import Cache, DynamicCache | |
| from ...generation import GenerationMixin | |
| from ...integrations import use_kernel_forward_from_hub | |
| from ...masking_utils import create_causal_mask | |
| from ...modeling_flash_attention_utils import FlashAttentionKwargs | |
| from ...modeling_layers import GradientCheckpointingLayer | |
| from ...modeling_outputs import BaseModelOutputWithPast, ModelOutput | |
| from ...modeling_rope_utils import ROPE_INIT_FUNCTIONS, dynamic_rope_update | |
| from ...modeling_utils import ALL_ATTENTION_FUNCTIONS, PreTrainedModel | |
| from ...processing_utils import Unpack | |
| from ...utils import TransformersKwargs, auto_docstring, is_torchdynamo_compiling | |
| from ...utils.deprecation import deprecate_kwarg | |
| from ...utils.generic import check_model_inputs | |
| from .configuration_qwen3_vl import Qwen3VLConfig, Qwen3VLTextConfig, Qwen3VLVisionConfig | |
| from torch.fx.experimental.symbolic_shapes import guard_size_oblivious | |
class Qwen3VLVisionMLP(nn.Module):
    """Two-layer feed-forward network with an activation in between.

    The first linear layer maps ``config.hidden_size`` to ``config.intermediate_size``, the
    activation is looked up in ``ACT2FN`` under ``config.hidden_act``, and the second linear
    layer maps back to ``config.hidden_size``. Both linear layers carry a bias.
    """

    def __init__(self, config):
        super().__init__()
        self.hidden_size = config.hidden_size
        self.intermediate_size = config.intermediate_size
        # Creation order is kept (fc1, then fc2) so parameter registration order is unchanged.
        self.linear_fc1 = nn.Linear(self.hidden_size, self.intermediate_size, bias=True)
        self.linear_fc2 = nn.Linear(self.intermediate_size, self.hidden_size, bias=True)
        self.act_fn = ACT2FN[config.hidden_act]

    def forward(self, hidden_state):
        """Return ``linear_fc2(act_fn(linear_fc1(hidden_state)))``."""
        expanded = self.linear_fc1(hidden_state)
        activated = self.act_fn(expanded)
        return self.linear_fc2(activated)
class Qwen3VLVisionPatchEmbed(nn.Module):
    """Embed flattened spatio-temporal patches with a 3D convolution.

    The convolution uses the same value for kernel size and stride
    (``temporal_patch_size x patch_size x patch_size``), so each reshaped patch produces
    exactly one output vector of width ``config.hidden_size``.
    """

    def __init__(self, config) -> None:
        super().__init__()
        self.patch_size = config.patch_size
        self.temporal_patch_size = config.temporal_patch_size
        self.in_channels = config.in_channels
        self.embed_dim = config.hidden_size

        patch_shape = [self.temporal_patch_size, self.patch_size, self.patch_size]
        self.proj = nn.Conv3d(
            self.in_channels,
            self.embed_dim,
            kernel_size=patch_shape,
            stride=patch_shape,
            bias=True,
        )

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Reshape the input into patches, cast to the conv weight dtype and project.

        Args:
            hidden_states: Tensor that can be viewed as
                ``(-1, in_channels, temporal_patch_size, patch_size, patch_size)``.

        Returns:
            Tensor of shape ``(num_patches, embed_dim)``.
        """
        patches = hidden_states.view(
            -1, self.in_channels, self.temporal_patch_size, self.patch_size, self.patch_size
        )
        # Cast to the projection's dtype so mixed-precision inputs do not fail inside the conv.
        patches = patches.to(dtype=self.proj.weight.dtype)
        return self.proj(patches).view(-1, self.embed_dim)
class Qwen3VLVisionRotaryEmbedding(nn.Module):
    """Produce a table of rotary angles ``position * inv_freq`` for the vision tower."""

    inv_freq: torch.Tensor  # fix linting for `register_buffer`

    def __init__(self, dim: int, theta: float = 10000.0) -> None:
        super().__init__()
        # One inverse frequency per pair of channels: theta^(-2i / dim).
        exponents = torch.arange(0, dim, 2, dtype=torch.float) / dim
        # Non-persistent: the buffer is rebuilt from `dim`/`theta`, so it is kept out of the state dict.
        self.register_buffer("inv_freq", 1.0 / (theta**exponents), persistent=False)

    def forward(self, seqlen: int) -> torch.Tensor:
        """Return a ``(seqlen, len(inv_freq))`` table whose row ``p`` is ``p * inv_freq``."""
        positions = torch.arange(seqlen, device=self.inv_freq.device, dtype=self.inv_freq.dtype)
        return torch.outer(positions, self.inv_freq)
class Qwen3VLVisionPatchMerger(nn.Module):
    """Merge ``spatial_merge_size**2`` neighbouring patch vectors and project them.

    The merged width is ``config.hidden_size * spatial_merge_size**2``. LayerNorm is applied
    either before the merge (on ``config.hidden_size`` channels) or, when
    ``use_postshuffle_norm`` is true, after the merge (on the merged width).
    """

    def __init__(self, config: Qwen3VLVisionConfig, use_postshuffle_norm=False) -> None:
        super().__init__()
        self.hidden_size = config.hidden_size * (config.spatial_merge_size**2)
        self.use_postshuffle_norm = use_postshuffle_norm

        norm_width = self.hidden_size if use_postshuffle_norm else config.hidden_size
        self.norm = nn.LayerNorm(norm_width, eps=1e-6)
        self.linear_fc1 = nn.Linear(self.hidden_size, self.hidden_size)
        self.act_fn = nn.GELU()
        self.linear_fc2 = nn.Linear(self.hidden_size, config.out_hidden_size)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Normalize, group into merged rows, and apply the two-layer GELU projection."""
        if self.use_postshuffle_norm:
            # Merge first so the norm sees the full merged width.
            normed = self.norm(x.view(-1, self.hidden_size))
        else:
            normed = self.norm(x)
        merged = normed.view(-1, self.hidden_size)
        return self.linear_fc2(self.act_fn(self.linear_fc1(merged)))
def rotate_half(x):
    """Rotates half the hidden dims of the input.

    The last dimension is split at ``x.shape[-1] // 2`` into a front and a back part and
    the result is ``concat(-back, front)`` along that dimension.
    """
    half = x.shape[-1] // 2
    front, back = x[..., :half], x[..., half:]
    return torch.cat((-back, front), dim=-1)
def apply_rotary_pos_emb_vision(
    q: torch.Tensor, k: torch.Tensor, cos: torch.Tensor, sin: torch.Tensor
) -> tuple[torch.Tensor, torch.Tensor]:
    """Apply rotary position embedding to vision queries and keys.

    The rotation is computed in float32 and each result is cast back to the dtype of its
    own input. ``cos`` and ``sin`` get an extra axis inserted at position ``-2`` so they
    broadcast over that axis of ``q`` / ``k``.

    Returns:
        The rotated ``(q, k)`` pair.
    """
    q_dtype, k_dtype = q.dtype, k.dtype
    q32 = q.float()
    k32 = k.float()
    cos32 = cos.unsqueeze(-2).float()
    sin32 = sin.unsqueeze(-2).float()

    q_rotated = (q32 * cos32) + (rotate_half(q32) * sin32)
    k_rotated = (k32 * cos32) + (rotate_half(k32) * sin32)
    return q_rotated.to(q_dtype), k_rotated.to(k_dtype)
def repeat_kv(hidden_states: torch.Tensor, n_rep: int) -> torch.Tensor:
    """
    This is the equivalent of torch.repeat_interleave(x, dim=1, repeats=n_rep). The hidden states go from (batch,
    num_key_value_heads, seqlen, head_dim) to (batch, num_attention_heads, seqlen, head_dim).

    When ``n_rep == 1`` the input tensor itself is returned.
    """
    batch, num_key_value_heads, slen, head_dim = hidden_states.shape
    if n_rep != 1:
        # Insert a repeat axis after the head axis, broadcast it, then fold it into the heads.
        expanded = hidden_states.unsqueeze(2).expand(batch, num_key_value_heads, n_rep, slen, head_dim)
        hidden_states = expanded.reshape(batch, num_key_value_heads * n_rep, slen, head_dim)
    return hidden_states
def eager_attention_forward(
    module: nn.Module,
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    attention_mask: Optional[torch.Tensor],
    scaling: float,
    dropout: float = 0.0,
    **kwargs: Unpack[TransformersKwargs],
):
    """Plain matmul/softmax attention.

    Args:
        module: Calling attention module; ``num_key_value_groups`` and ``training`` are read from it.
        query, key, value: Attention inputs; ``key``/``value`` heads are repeated
            ``module.num_key_value_groups`` times via ``repeat_kv``.
        attention_mask: Optional additive mask; it is cropped on its last axis to the key length
            and added to the scaled scores.
        scaling: Factor applied to the raw scores.
        dropout: Dropout probability on the attention weights (active only when ``module.training``).
        **kwargs: Accepted for interface compatibility; not read here.

    Returns:
        ``(attn_output, attn_weights)`` where ``attn_output`` has axes 1 and 2 swapped relative
        to ``query`` and is contiguous.
    """
    groups = module.num_key_value_groups
    key_states = repeat_kv(key, groups)
    value_states = repeat_kv(value, groups)

    scores = torch.matmul(query, key_states.transpose(2, 3)) * scaling
    if attention_mask is not None:
        scores = scores + attention_mask[:, :, :, : key_states.shape[-2]]

    # Softmax in float32 for numerical stability, then return to the query dtype.
    probs = nn.functional.softmax(scores, dim=-1, dtype=torch.float32).to(query.dtype)
    probs = nn.functional.dropout(probs, p=dropout, training=module.training)

    context = torch.matmul(probs, value_states).transpose(1, 2).contiguous()
    return context, probs
class Qwen3VLVisionAttention(nn.Module):
    """Bidirectional multi-head self-attention over a packed sequence of vision tokens.

    Tokens from several images / frames are concatenated along the sequence axis;
    ``cu_seqlens`` holds the cumulative boundaries of the individual sequences. Attention
    must stay inside each sequence:

    * with ``flash_attention_2`` the boundaries are passed to the kernel as
      ``cu_seq_lens_q`` / ``cu_seq_lens_k``;
    * with every other backend an additive block-diagonal mask is built from
      ``cu_seqlens`` and passed as ``attention_mask``.
    """

    def __init__(self, config: Qwen3VLVisionConfig) -> None:
        super().__init__()
        self.dim = config.hidden_size
        self.num_heads = config.num_heads
        self.head_dim = self.dim // self.num_heads
        self.num_key_value_groups = 1  # needed for eager attention
        self.qkv = nn.Linear(self.dim, self.dim * 3, bias=True)
        self.proj = nn.Linear(self.dim, self.dim)
        self.scaling = self.head_dim**-0.5
        self.config = config
        self.attention_dropout = 0.0
        self.is_causal = False

    @staticmethod
    def _block_diagonal_mask(
        cu_seqlens: torch.Tensor, seq_length: int, dtype: torch.dtype, device: torch.device
    ) -> torch.Tensor:
        """Build a ``(1, 1, seq_length, seq_length)`` additive mask that isolates packed sequences.

        Entries are ``0`` where query and key belong to the same sequence and
        ``torch.finfo(dtype).min`` elsewhere. Memory grows quadratically with ``seq_length``.
        """
        positions = torch.arange(seq_length, device=device)
        segment_ends = cu_seqlens[1:].to(device)
        # A token's segment id is the number of segment ends that are <= its position.
        segment_ids = (positions.unsqueeze(1) >= segment_ends.unsqueeze(0)).sum(dim=1)
        same_segment = segment_ids.unsqueeze(0) == segment_ids.unsqueeze(1)
        mask = torch.zeros((seq_length, seq_length), dtype=dtype, device=device)
        mask = mask.masked_fill(~same_segment, torch.finfo(dtype).min)
        return mask[None, None]

    def forward(
        self,
        hidden_states: torch.Tensor,
        cu_seqlens: torch.Tensor,
        rotary_pos_emb: Optional[torch.Tensor] = None,
        position_embeddings: Optional[tuple[torch.Tensor, torch.Tensor]] = None,
        **kwargs,
    ) -> torch.Tensor:
        """Run self-attention over the packed tokens.

        Args:
            hidden_states: Packed tokens of shape ``(seq_length, dim)``.
            cu_seqlens: Cumulative sequence boundaries, starting at 0.
            rotary_pos_emb: Accepted for interface compatibility; not read by this method.
            position_embeddings: ``(cos, sin)`` pair applied to queries and keys. Although the
                default is ``None``, it is unpacked unconditionally, so a value is required.
            **kwargs: Forwarded to the selected attention backend.

        Returns:
            Tensor of shape ``(seq_length, dim)``.
        """
        seq_length = hidden_states.shape[0]
        query_states, key_states, value_states = (
            self.qkv(hidden_states).reshape(seq_length, 3, self.num_heads, -1).permute(1, 0, 2, 3).unbind(0)
        )
        cos, sin = position_embeddings
        query_states, key_states = apply_rotary_pos_emb_vision(query_states, key_states, cos, sin)

        # (seq, heads, head_dim) -> (1, heads, seq, head_dim): the layout attention backends expect.
        query_states = query_states.transpose(0, 1).unsqueeze(0)
        key_states = key_states.transpose(0, 1).unsqueeze(0)
        value_states = value_states.transpose(0, 1).unsqueeze(0)

        attention_interface: Callable = eager_attention_forward
        if self.config._attn_implementation != "eager":
            attention_interface = ALL_ATTENTION_FUNCTIONS[self.config._attn_implementation]

        dropout = 0.0 if not self.training else self.attention_dropout

        if self.config._attn_implementation == "flash_attention_2":
            # Flash Attention 2: Use cu_seqlens for variable length attention
            max_seqlen = (cu_seqlens[1:] - cu_seqlens[:-1]).max()
            attn_output, _ = attention_interface(
                self,
                query_states,
                key_states,
                value_states,
                attention_mask=None,
                scaling=self.scaling,
                dropout=dropout,
                cu_seq_lens_q=cu_seqlens,
                cu_seq_lens_k=cu_seqlens,
                max_length_q=max_seqlen,
                max_length_k=max_seqlen,
                is_causal=False,
                **kwargs,
            )
        else:
            # Other backends: keep the (1, heads, seq, head_dim) layout and confine attention
            # to each packed sequence with an additive block-diagonal mask.
            attention_mask = self._block_diagonal_mask(
                cu_seqlens, seq_length, query_states.dtype, query_states.device
            )
            attn_output, _ = attention_interface(
                self,
                query_states,
                key_states,
                value_states,
                attention_mask=attention_mask,
                scaling=self.scaling,
                dropout=dropout,
                is_causal=False,
                **kwargs,
            )

        # Fold heads back into the channel axis: (1, seq, heads, head_dim) -> (seq, dim).
        attn_output = attn_output.reshape(seq_length, -1).contiguous()
        attn_output = self.proj(attn_output)
        return attn_output
class Qwen3VLVisionBlock(GradientCheckpointingLayer):
    """Pre-norm vision transformer block: attention and MLP, each wrapped in a residual connection."""

    def __init__(self, config, attn_implementation: str = "sdpa") -> None:
        # `attn_implementation` is accepted but not read here; the attention module
        # is constructed from `config` alone.
        super().__init__()
        self.norm1 = nn.LayerNorm(config.hidden_size, eps=1e-6)
        self.norm2 = nn.LayerNorm(config.hidden_size, eps=1e-6)
        self.attn = Qwen3VLVisionAttention(config=config)
        self.mlp = Qwen3VLVisionMLP(config=config)

    def forward(
        self,
        hidden_states: torch.Tensor,
        cu_seqlens: torch.Tensor,
        rotary_pos_emb: Optional[torch.Tensor] = None,
        position_embeddings: Optional[tuple[torch.Tensor, torch.Tensor]] = None,
        **kwargs,
    ) -> torch.Tensor:
        """Apply ``x + attn(norm1(x))`` followed by ``x + mlp(norm2(x))``.

        ``cu_seqlens``, ``rotary_pos_emb``, ``position_embeddings`` and ``**kwargs`` are
        forwarded unchanged to the attention module.
        """
        attn_out = self.attn(
            self.norm1(hidden_states),
            cu_seqlens=cu_seqlens,
            rotary_pos_emb=rotary_pos_emb,
            position_embeddings=position_embeddings,
            **kwargs,
        )
        hidden_states = hidden_states + attn_out

        mlp_out = self.mlp(self.norm2(hidden_states))
        return hidden_states + mlp_out
class Qwen3VLTextRotaryEmbedding(nn.Module):
    """Rotary embedding for the text model with three position streams interleaved.

    ``forward`` computes angles for three sets of position ids (indexed T, H, W in
    ``apply_interleaved_mrope``), interleaves them along the frequency axis, and returns
    the cosine and sine tables.
    """

    inv_freq: torch.Tensor  # fix linting for `register_buffer`

    def __init__(self, config: Qwen3VLTextConfig, device=None):
        super().__init__()
        # `rope_scaling` may be missing or None; fall back to an empty mapping so that both
        # lookups below use their defaults instead of failing on `None.get(...)`.
        rope_scaling = getattr(config, "rope_scaling", None) or {}
        self.rope_type = rope_scaling.get("rope_type", "default")
        self.max_seq_len_cached = config.max_position_embeddings
        self.original_max_seq_len = config.max_position_embeddings

        self.config = config
        self.rope_init_fn = ROPE_INIT_FUNCTIONS[self.rope_type]

        inv_freq, self.attention_scaling = self.rope_init_fn(self.config, device)
        self.register_buffer("inv_freq", inv_freq, persistent=False)
        self.original_inv_freq = self.inv_freq

        self.mrope_section = rope_scaling.get("mrope_section", [24, 20, 20])

    def apply_interleaved_mrope(self, freqs, mrope_section):
        """Apply interleaved MRoPE to 3D rotary embeddings.
        Reorganizes frequency layout from chunked [TTT...HHH...WWW] to
        interleaved [THTHWHTHW...TT], preserving frequency continuity.

        Note: the result is a view of ``freqs[0]`` that is written in place, so the
        caller's ``freqs`` tensor is modified.

        args:
            freqs: (3, bs, seq_len, head_dim // 2)
            mrope_section: (3,)
        returns:
            freqs_t: (bs, seq_len, head_dim // 2)
        """
        freqs_t = freqs[0]  # just overwrite the first dimension T
        for dim, offset in enumerate((1, 2), start=1):  # H, W
            # Every third slot starting at `offset`, up to 3 * section length, is taken from H (dim=1) or W (dim=2).
            length = mrope_section[dim] * 3
            idx = slice(offset, length, 3)
            freqs_t[..., idx] = freqs[dim, ..., idx]
        return freqs_t

    @torch.no_grad()
    @dynamic_rope_update  # power user: used with advanced RoPE types (e.g. dynamic rope)
    def forward(self, x, position_ids):
        """Return ``(cos, sin)`` tables cast to ``x.dtype``.

        Args:
            x: Only its device type and dtype are read.
            position_ids: Either ``(bs, positions)`` (broadcast to all three streams) or
                ``(3, bs, positions)``.
        """
        # In contrast to other models, Qwen3VL has different position ids for the grids
        # So we expand the inv_freq to shape (3, ...)
        if position_ids.ndim == 2:
            position_ids = position_ids[None, ...].expand(3, position_ids.shape[0], -1)
        inv_freq_expanded = self.inv_freq[None, None, :, None].float().expand(3, position_ids.shape[1], -1, 1)
        position_ids_expanded = position_ids[:, :, None, :].float()  # shape (3, bs, 1, positions)

        device_type = x.device.type if isinstance(x.device.type, str) and x.device.type != "mps" else "cpu"
        with torch.autocast(device_type=device_type, enabled=False):  # Force float32
            freqs = (inv_freq_expanded.float() @ position_ids_expanded.float()).transpose(2, 3)
            freqs = self.apply_interleaved_mrope(freqs, self.mrope_section)
            emb = torch.cat((freqs, freqs), dim=-1)
            cos = emb.cos() * self.attention_scaling
            sin = emb.sin() * self.attention_scaling

        return cos.to(dtype=x.dtype), sin.to(dtype=x.dtype)
@use_kernel_forward_from_hub("RMSNorm")
class Qwen3VLTextRMSNorm(nn.Module):
    """Root-mean-square layer normalization with a learned per-channel scale."""

    def __init__(self, hidden_size, eps: float = 1e-6) -> None:
        """
        Qwen3VLTextRMSNorm is equivalent to T5LayerNorm
        """
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.variance_epsilon = eps

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Normalize the last axis by its RMS in float32, cast back, then scale by ``weight``."""
        input_dtype = hidden_states.dtype
        upcast = hidden_states.to(torch.float32)
        mean_square = upcast.pow(2).mean(-1, keepdim=True)
        normalized = upcast * torch.rsqrt(mean_square + self.variance_epsilon)
        # The cast back happens before the weight multiply, as in the reference formulation.
        return self.weight * normalized.to(input_dtype)

    def extra_repr(self):
        return f"{tuple(self.weight.shape)}, eps={self.variance_epsilon}"
def apply_rotary_pos_emb(q, k, cos, sin, position_ids=None, unsqueeze_dim=1):
    """Applies Rotary Position Embedding to the query and key tensors.

    Args:
        q (`torch.Tensor`): The query tensor.
        k (`torch.Tensor`): The key tensor.
        cos (`torch.Tensor`): The cosine part of the rotary embedding.
        sin (`torch.Tensor`): The sine part of the rotary embedding.
        position_ids (`torch.Tensor`, *optional*):
            Deprecated and unused.
        unsqueeze_dim (`int`, *optional*, defaults to 1):
            Axis at which `cos` and `sin` receive an extra dimension so they broadcast against
            `q` and `k`. With `cos`/`sin` of shape [batch_size, seq_len, head_dim], use
            `unsqueeze_dim=1` when `q`/`k` are [batch_size, heads, seq_len, head_dim] and
            `unsqueeze_dim=2` when they are [batch_size, seq_len, heads, head_dim].
    Returns:
        `tuple(torch.Tensor)` comprising of the query and key tensors rotated using the Rotary Position Embedding.
    """
    cos_b = cos.unsqueeze(unsqueeze_dim)
    sin_b = sin.unsqueeze(unsqueeze_dim)

    def _rotate(states):
        return (states * cos_b) + (rotate_half(states) * sin_b)

    return _rotate(q), _rotate(k)
class Qwen3VLTextAttention(nn.Module):
    """Multi-headed attention from 'Attention Is All You Need' paper

    Queries and keys are RMS-normalized per head (over ``head_dim``) before rotary
    position embedding is applied.
    """

    def __init__(self, config: Qwen3VLTextConfig, layer_idx: int):
        super().__init__()
        self.config = config
        self.layer_idx = layer_idx
        self.head_dim = getattr(config, "head_dim", config.hidden_size // config.num_attention_heads)
        self.num_key_value_groups = config.num_attention_heads // config.num_key_value_heads
        self.scaling = self.head_dim**-0.5
        self.attention_dropout = config.attention_dropout
        self.is_causal = True

        self.q_proj = nn.Linear(
            config.hidden_size, config.num_attention_heads * self.head_dim, bias=config.attention_bias
        )
        self.k_proj = nn.Linear(
            config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias
        )
        self.v_proj = nn.Linear(
            config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias
        )
        self.o_proj = nn.Linear(
            config.num_attention_heads * self.head_dim, config.hidden_size, bias=config.attention_bias
        )
        # unlike olmo, only on the head dim! thus post q_norm does not need reshape
        self.q_norm = Qwen3VLTextRMSNorm(self.head_dim, eps=config.rms_norm_eps)
        self.k_norm = Qwen3VLTextRMSNorm(self.head_dim, eps=config.rms_norm_eps)

    @deprecate_kwarg("past_key_value", new_name="past_key_values", version="4.58")
    def forward(
        self,
        hidden_states: torch.Tensor,
        position_embeddings: tuple[torch.Tensor, torch.Tensor],
        attention_mask: Optional[torch.Tensor],
        past_key_values: Optional[Cache] = None,
        cache_position: Optional[torch.LongTensor] = None,
        **kwargs: Unpack[FlashAttentionKwargs],
    ) -> tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Project, normalize, rotate, optionally update the KV cache, attend, and project out.

        Returns:
            ``(attn_output, attn_weights)`` as produced by the selected attention backend,
            with ``attn_output`` passed through ``o_proj``.
        """
        input_shape = hidden_states.shape[:-1]
        # Split the projected channels into (..., heads, head_dim).
        hidden_shape = (*input_shape, -1, self.head_dim)

        queries = self.q_proj(hidden_states).view(hidden_shape)
        keys = self.k_proj(hidden_states).view(hidden_shape)
        values = self.v_proj(hidden_states).view(hidden_shape)

        query_states = self.q_norm(queries).transpose(1, 2)
        key_states = self.k_norm(keys).transpose(1, 2)
        value_states = values.transpose(1, 2)

        cos, sin = position_embeddings
        query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin)

        if past_key_values is not None:
            # sin and cos are specific to RoPE models; cache_position needed for the static cache
            cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position}
            key_states, value_states = past_key_values.update(key_states, value_states, self.layer_idx, cache_kwargs)

        attn_impl = self.config._attn_implementation
        attention_interface: Callable = (
            eager_attention_forward if attn_impl == "eager" else ALL_ATTENTION_FUNCTIONS[attn_impl]
        )

        attn_output, attn_weights = attention_interface(
            self,
            query_states,
            key_states,
            value_states,
            attention_mask,
            dropout=0.0 if not self.training else self.attention_dropout,
            scaling=self.scaling,
            **kwargs,
        )

        attn_output = attn_output.reshape(*input_shape, -1).contiguous()
        return self.o_proj(attn_output), attn_weights
class Qwen3VLTextMLP(nn.Module):
    """Gated feed-forward network: ``down_proj(act_fn(gate_proj(x)) * up_proj(x))``.

    All three projections are bias-free; the activation is looked up in ``ACT2FN`` under
    ``config.hidden_act``.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.hidden_size = config.hidden_size
        self.intermediate_size = config.intermediate_size
        self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)
        self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)
        self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False)
        self.act_fn = ACT2FN[config.hidden_act]

    def forward(self, x):
        """Return the gated projection of ``x``."""
        gate = self.act_fn(self.gate_proj(x))
        gated = gate * self.up_proj(x)
        return self.down_proj(gated)
class Qwen3VLTextDecoderLayer(GradientCheckpointingLayer):
    """Pre-norm decoder layer: RMSNorm -> self-attention -> residual, RMSNorm -> MLP -> residual."""

    def __init__(self, config: Qwen3VLTextConfig, layer_idx: int):
        super().__init__()
        self.hidden_size = config.hidden_size

        self.self_attn = Qwen3VLTextAttention(config=config, layer_idx=layer_idx)

        self.mlp = Qwen3VLTextMLP(config)
        self.input_layernorm = Qwen3VLTextRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
        self.post_attention_layernorm = Qwen3VLTextRMSNorm(config.hidden_size, eps=config.rms_norm_eps)

    @deprecate_kwarg("past_key_value", new_name="past_key_values", version="4.58")
    def forward(
        self,
        hidden_states: torch.Tensor,
        position_embeddings: tuple[torch.Tensor, torch.Tensor],
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[Cache] = None,
        use_cache: Optional[bool] = False,
        cache_position: Optional[torch.LongTensor] = None,
        **kwargs: Unpack[TransformersKwargs],
    ) -> torch.Tensor:
        """Run one decoder layer and return the updated hidden states.

        All arguments other than ``hidden_states`` are forwarded to the self-attention
        module; the attention weights it returns are discarded.
        """
        # Self Attention
        attn_out, _ = self.self_attn(
            hidden_states=self.input_layernorm(hidden_states),
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            use_cache=use_cache,
            cache_position=cache_position,
            position_embeddings=position_embeddings,
            **kwargs,
        )
        hidden_states = hidden_states + attn_out

        # Fully Connected
        mlp_out = self.mlp(self.post_attention_layernorm(hidden_states))
        return hidden_states + mlp_out
@dataclass
@auto_docstring(
    custom_intro="""
    Base class for Qwen3VL outputs, with hidden states and attentions.
    """
)
class Qwen3VLModelOutputWithPast(ModelOutput):
    r"""
    past_key_values (`Cache`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
        It is a [`~cache_utils.Cache`] instance. For more details, see our [kv cache guide](https://huggingface.co/docs/transformers/en/kv_cache).

        Contains pre-computed hidden-states (key and values in the self-attention blocks) that can be used (see
        `past_key_values` input) to speed up sequential decoding.
    rope_deltas (`torch.LongTensor` of shape `(batch_size, )`, *optional*):
        The rope index difference between sequence length and multimodal rope.
    """

    # Every field is optional and defaults to None.
    last_hidden_state: Optional[torch.FloatTensor] = None
    past_key_values: Optional[Cache] = None
    hidden_states: Optional[tuple[torch.FloatTensor]] = None
    attentions: Optional[tuple[torch.FloatTensor]] = None
    rope_deltas: Optional[torch.LongTensor] = None
@auto_docstring
class Qwen3VLPreTrainedModel(PreTrainedModel):
    # Declarative base class for the Qwen3VL models in this file: it defines no methods
    # and only sets class-level attributes that `PreTrainedModel` is presumably expected
    # to read (how each flag is consumed is not visible here — confirm against
    # `modeling_utils`).
    config: Qwen3VLConfig
    base_model_prefix = "model"
    supports_gradient_checkpointing = True
    # Names of the module classes listed as "no split"; both are defined in this file.
    _no_split_modules = ["Qwen3VLTextDecoderLayer", "Qwen3VLVisionBlock"]
    _skip_keys_device_placement = "past_key_values"
    # Capability flags for attention backends / compilation.
    _supports_flash_attn = True
    _supports_sdpa = True

    _can_compile_fullgraph = True
    _supports_attention_backend = True
    # Maps an output name to the module class associated with it.
    _can_record_outputs = {
        "hidden_states": Qwen3VLTextDecoderLayer,
        "attentions": Qwen3VLTextAttention,
    }
class Qwen3VLVisionModel(Qwen3VLPreTrainedModel):
    """Vision tower: patch embedding, interpolated position embedding, transformer blocks, patch merger.

    ``forward`` returns the merged final features together with a list of merged
    intermediate features taken after the blocks listed in ``config.deepstack_visual_indexes``.
    """

    config: Qwen3VLVisionConfig
    _no_split_modules = ["Qwen3VLVisionBlock"]

    def __init__(self, config, *inputs, **kwargs) -> None:
        super().__init__(config, *inputs, **kwargs)
        self.spatial_merge_size = config.spatial_merge_size
        self.patch_size = config.patch_size
        self.spatial_merge_unit = self.spatial_merge_size * self.spatial_merge_size

        self.patch_embed = Qwen3VLVisionPatchEmbed(
            config=config,
        )

        # Learned position table laid out as a square grid of `num_grid_per_side` cells per side.
        self.pos_embed = nn.Embedding(config.num_position_embeddings, config.hidden_size)
        self.num_grid_per_side = int(config.num_position_embeddings**0.5)

        head_dim = config.hidden_size // config.num_heads
        # Half of the head dim per axis: row and column angles are concatenated in `rot_pos_emb`.
        self.rotary_pos_emb = Qwen3VLVisionRotaryEmbedding(head_dim // 2)

        self.blocks = nn.ModuleList([Qwen3VLVisionBlock(config) for _ in range(config.depth)])
        self.merger = Qwen3VLVisionPatchMerger(
            config=config,
            use_postshuffle_norm=False,
        )

        self.deepstack_visual_indexes = config.deepstack_visual_indexes
        self.deepstack_merger_list = nn.ModuleList(
            [
                Qwen3VLVisionPatchMerger(
                    config=config,
                    use_postshuffle_norm=True,
                )
                for _ in range(len(config.deepstack_visual_indexes))
            ]
        )

        self.gradient_checkpointing = False

    def rot_pos_emb(self, grid_thw: torch.Tensor) -> torch.Tensor:
        """Look up rotary angles for every token, using its (row, column) position.

        Tokens are enumerated block by block (``spatial_merge_size x spatial_merge_size``
        blocks) and the per-image coordinates are repeated for each frame.

        Args:
            grid_thw: ``(num_images_or_videos, 3)`` tensor of (frames, height, width).

        Returns:
            Tensor with one row per token: the row angles followed by the column angles.
        """
        merge_size = self.spatial_merge_size

        # One table large enough for the biggest height or width in the batch.
        max_hw = grid_thw[:, 1:].amax()
        freq_table = self.rotary_pos_emb(max_hw)  # (max_hw, dim // 2)
        device = freq_table.device

        pos_ids_chunks = []
        for num_frames, height, width in grid_thw:
            merged_h, merged_w = height // merge_size, width // merge_size

            block_rows = torch.arange(merged_h, device=device)  # block row indices
            block_cols = torch.arange(merged_w, device=device)  # block col indices
            intra_row = torch.arange(merge_size, device=device)  # intra-block row offsets
            intra_col = torch.arange(merge_size, device=device)  # intra-block col offsets

            # Compute full-resolution positions
            row_idx = block_rows[:, None, None, None] * merge_size + intra_row[None, None, :, None]
            col_idx = block_cols[None, :, None, None] * merge_size + intra_col[None, None, None, :]

            row_idx = row_idx.expand(merged_h, merged_w, merge_size, merge_size).reshape(-1)
            col_idx = col_idx.expand(merged_h, merged_w, merge_size, merge_size).reshape(-1)

            coords = torch.stack((row_idx, col_idx), dim=-1)
            # Every frame of the same image/video shares the spatial coordinates.
            pos_ids_chunks.append(coords.repeat(num_frames, 1))

        pos_ids = torch.cat(pos_ids_chunks, dim=0)
        embeddings = freq_table[pos_ids]  # lookup rotary embeddings
        return embeddings.flatten(1)

    @staticmethod
    def _interpolation_coords(size: torch.Tensor, num_grid_per_side: int, device) -> torch.Tensor:
        """Return ``size`` evenly spaced float coordinates spanning ``[0, num_grid_per_side - 1]``.

        ``size`` is a 0-dim integer tensor (one entry of ``grid_thw``). A size of 1 yields the
        single coordinate 0.
        """
        steps = torch.arange(size, device=device, dtype=torch.float32)
        # Clamp so a one-element axis does not divide by zero.
        denom = (size - 1).clamp(min=1).to(device=device, dtype=torch.float32)
        # Multiply before dividing so the last coordinate lands exactly on num_grid_per_side - 1.
        return steps * (num_grid_per_side - 1) / denom

    def fast_pos_embed_interpolate(self, grid_thw):
        """Bilinearly interpolate the learned position grid to each image's (height, width).

        For every target cell the four surrounding entries of ``pos_embed`` are gathered and
        blended with bilinear weights. The result is repeated per frame and reordered so that
        tokens of the same ``spatial_merge_size x spatial_merge_size`` block are contiguous.

        Args:
            grid_thw: ``(num_images_or_videos, 3)`` tensor of (frames, height, width).

        Returns:
            Tensor of shape ``(total_tokens, hidden_size)``.
        """
        grid_ts, grid_hs, grid_ws = grid_thw[:, 0], grid_thw[:, 1], grid_thw[:, 2]
        side = self.num_grid_per_side
        device = self.pos_embed.weight.device

        idx_list = [[], [], [], []]
        weight_list = [[], [], [], []]

        for _, h, w in zip(grid_ts, grid_hs, grid_ws):
            # Fractional source coordinates; true division keeps the sub-cell offsets.
            h_idxs = self._interpolation_coords(h, side, device)
            w_idxs = self._interpolation_coords(w, side, device)

            h_idxs_floor = h_idxs.long()
            w_idxs_floor = w_idxs.long()
            h_idxs_ceil = (h_idxs_floor + 1).clip(max=side - 1)
            w_idxs_ceil = (w_idxs_floor + 1).clip(max=side - 1)

            dh = h_idxs - h_idxs_floor
            dw = w_idxs - w_idxs_floor

            base_h = h_idxs_floor * side
            base_h_ceil = h_idxs_ceil * side

            # Flat table indices of the four neighbours, in (row-major) h x w order.
            indices = [
                (base_h[:, None] + w_idxs_floor[None]).flatten(),
                (base_h[:, None] + w_idxs_ceil[None]).flatten(),
                (base_h_ceil[:, None] + w_idxs_floor[None]).flatten(),
                (base_h_ceil[:, None] + w_idxs_ceil[None]).flatten(),
            ]

            weights = [
                ((1 - dh)[:, None] * (1 - dw)[None]).flatten(),
                ((1 - dh)[:, None] * dw[None]).flatten(),
                (dh[:, None] * (1 - dw)[None]).flatten(),
                (dh[:, None] * dw[None]).flatten(),
            ]

            for i in range(4):
                idx_list[i].append(indices[i])
                weight_list[i].append(weights[i])

        idx_tensor = torch.stack([torch.cat(x, dim=0) for x in idx_list], dim=0)
        weight_tensor = torch.stack([torch.cat(x, dim=0) for x in weight_list], dim=0)
        weight_tensor = weight_tensor.to(self.pos_embed.weight.dtype)

        pos_embeds = self.pos_embed(idx_tensor) * weight_tensor[:, :, None]
        patch_pos_embeds = pos_embeds[0] + pos_embeds[1] + pos_embeds[2] + pos_embeds[3]

        patch_pos_embeds = patch_pos_embeds.split([h * w for h, w in zip(grid_hs, grid_ws)])

        patch_pos_embeds_permute = []
        merge_size = self.config.spatial_merge_size
        for pos_embed, t, h, w in zip(patch_pos_embeds, grid_ts, grid_hs, grid_ws):
            pos_embed = pos_embed.repeat(t, 1)
            # Reorder from row-major cells to merge-block-major so it matches the token order.
            pos_embed = (
                pos_embed.view(t, h // merge_size, merge_size, w // merge_size, merge_size, -1)
                .permute(0, 1, 3, 2, 4, 5)
                .flatten(0, 4)
            )
            patch_pos_embeds_permute.append(pos_embed)
        patch_pos_embeds = torch.cat(patch_pos_embeds_permute)
        return patch_pos_embeds

    def forward(
        self, hidden_states: torch.Tensor, grid_thw: torch.Tensor, **kwargs
    ) -> tuple[torch.Tensor, list[torch.Tensor]]:
        """
        Args:
            hidden_states (`torch.Tensor`):
                Flattened patch inputs; they are passed to the patch embedding, which reshapes
                them into `(-1, in_channels, temporal_patch_size, patch_size, patch_size)`.
            grid_thw (`torch.Tensor` of shape `(num_images_or_videos, 3)`):
                The temporal, height and width of feature shape of each image in LLM.
            **kwargs:
                Forwarded to every vision block.

        Returns:
            `tuple`: `(hidden_states, deepstack_feature_lists)` — the merged output of the last
            block and the list of merged features taken after each block whose index is in
            `deepstack_visual_indexes`.
        """
        hidden_states = self.patch_embed(hidden_states)

        pos_embeds = self.fast_pos_embed_interpolate(grid_thw)
        hidden_states = hidden_states + pos_embeds

        rotary_pos_emb = self.rot_pos_emb(grid_thw)

        seq_len, _ = hidden_states.size()
        hidden_states = hidden_states.reshape(seq_len, -1)
        rotary_pos_emb = rotary_pos_emb.reshape(seq_len, -1)
        emb = torch.cat((rotary_pos_emb, rotary_pos_emb), dim=-1)
        position_embeddings = (emb.cos(), emb.sin())

        # One attention sequence per frame: height * width tokens, repeated `t` times per entry.
        cu_seqlens = torch.repeat_interleave(grid_thw[:, 1] * grid_thw[:, 2], grid_thw[:, 0]).cumsum(
            dim=0,
            # Select dtype based on the following factors:
            #  - FA2 requires that cu_seqlens_q must have dtype int32
            #  - torch.onnx.export requires that cu_seqlens_q must have same dtype as grid_thw
            # See https://github.com/huggingface/transformers/pull/34852 for more information
            dtype=grid_thw.dtype if torch.jit.is_tracing() else torch.int32,
        )
        cu_seqlens = F.pad(cu_seqlens, (1, 0), value=0)

        deepstack_feature_lists = []
        for layer_num, blk in enumerate(self.blocks):
            hidden_states = blk(
                hidden_states,
                cu_seqlens=cu_seqlens,
                position_embeddings=position_embeddings,
                **kwargs,
            )
            if layer_num in self.deepstack_visual_indexes:
                deepstack_feature = self.deepstack_merger_list[self.deepstack_visual_indexes.index(layer_num)](
                    hidden_states
                )
                deepstack_feature_lists.append(deepstack_feature)

        hidden_states = self.merger(hidden_states)

        return hidden_states, deepstack_feature_lists
@auto_docstring(
    custom_intro=(
        "Text part of Qwen3VL, "
        "not a pure text-only model, as DeepStack integrates visual features into the early hidden states."
    )
)
class Qwen3VLTextModel(Qwen3VLPreTrainedModel):
    config: Qwen3VLTextConfig
    _no_split_modules = ["Qwen3VLTextDecoderLayer"]

    def __init__(self, config: Qwen3VLTextConfig):
        super().__init__(config)
        self.padding_idx = config.pad_token_id
        self.vocab_size = config.vocab_size

        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx)
        decoder_layers = [Qwen3VLTextDecoderLayer(config, idx) for idx in range(config.num_hidden_layers)]
        self.layers = nn.ModuleList(decoder_layers)
        self.norm = Qwen3VLTextRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
        self.rotary_emb = Qwen3VLTextRotaryEmbedding(config=config)
        self.gradient_checkpointing = False

        # Initialize weights and apply final processing
        self.post_init()

    @check_model_inputs
    @auto_docstring
    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[Cache] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        use_cache: Optional[bool] = None,
        cache_position: Optional[torch.LongTensor] = None,
        # args for deepstack
        visual_pos_masks: Optional[torch.Tensor] = None,
        deepstack_visual_embeds: Optional[list[torch.Tensor]] = None,
        **kwargs: Unpack[FlashAttentionKwargs],
    ) -> Union[tuple, BaseModelOutputWithPast]:
        r"""
        visual_pos_masks (`torch.Tensor` of shape `(batch_size, seqlen)`, *optional*):
            The mask of the visual positions.
        deepstack_visual_embeds (`list[torch.Tensor]`, *optional*):
            The deepstack visual embeddings. The shape is (num_layers, visual_seqlen, embed_dim).
            The feature is extracted from the different visual encoder layers, and fed to the decoder
            hidden states. It's from the paper DeepStack(https://arxiv.org/abs/2406.04334).
        """
        # Exactly one of the two input forms has to be supplied.
        if (input_ids is None) == (inputs_embeds is None):
            raise ValueError("You must specify exactly one of input_ids or inputs_embeds")

        # torch.jit.trace() doesn't support cache objects in the output
        if use_cache and past_key_values is None and not torch.jit.is_tracing():
            past_key_values = DynamicCache(config=self.config)

        if inputs_embeds is None:
            inputs_embeds = self.embed_tokens(input_ids)

        if cache_position is None:
            past_seen_tokens = 0 if past_key_values is None else past_key_values.get_seq_length()
            cache_position = torch.arange(
                past_seen_tokens, past_seen_tokens + inputs_embeds.shape[1], device=inputs_embeds.device
            )

        # the hard coded `3` is for temporal, height and width.
        if position_ids is None:
            position_ids = cache_position.view(1, 1, -1).expand(3, inputs_embeds.shape[0], -1)
        elif position_ids.ndim == 2:
            position_ids = position_ids[None, ...].expand(3, position_ids.shape[0], -1)

        # The leading slice always drives the causal mask; when four slices are packed,
        # that first one is text-only and the remaining three feed the rotary embedding.
        text_position_ids = position_ids[0]
        if position_ids.ndim == 3 and position_ids.shape[0] == 4:
            position_ids = position_ids[1:]

        attention_mask = create_causal_mask(
            config=self.config,
            input_embeds=inputs_embeds,
            attention_mask=attention_mask,
            cache_position=cache_position,
            past_key_values=past_key_values,
            position_ids=text_position_ids,
        )

        hidden_states = inputs_embeds

        # create position embeddings to be shared across the decoder layers
        position_embeddings = self.rotary_emb(hidden_states, position_ids)

        # Number of leading decoder layers that receive a DeepStack injection.
        num_deepstack = 0 if deepstack_visual_embeds is None else len(deepstack_visual_embeds)

        # decoder layers
        for layer_idx, decoder_layer in enumerate(self.layers):
            hidden_states = decoder_layer(
                hidden_states,
                attention_mask=attention_mask,
                position_ids=text_position_ids,
                past_key_values=past_key_values,
                cache_position=cache_position,
                position_embeddings=position_embeddings,
                **kwargs,
            )

            # add visual features to the hidden states of first several layers
            if layer_idx < num_deepstack:
                hidden_states = self._deepstack_process(
                    hidden_states,
                    visual_pos_masks,
                    deepstack_visual_embeds[layer_idx],
                )

        hidden_states = self.norm(hidden_states)

        return BaseModelOutputWithPast(
            last_hidden_state=hidden_states,
            past_key_values=past_key_values,
        )

    def _deepstack_process(
        self, hidden_states: torch.Tensor, visual_pos_masks: torch.Tensor, visual_embeds: torch.Tensor
    ):
        """Add `visual_embeds` to the rows of `hidden_states` selected by `visual_pos_masks` (in place)."""
        target_device = hidden_states.device
        position_mask = visual_pos_masks.to(target_device)
        injected = visual_embeds.to(target_device, hidden_states.dtype)
        # Read the selected rows into a fresh tensor before writing the sum back.
        updated_rows = hidden_states[position_mask, :].clone() + injected
        hidden_states[position_mask, :] = updated_rows
        return hidden_states
| @auto_docstring | |
| class Qwen3VLModel(Qwen3VLPreTrainedModel): | |
| base_model_prefix = "" | |
| _checkpoint_conversion_mapping = {} | |
| # Reference: fix gemma3 grad acc #37208 | |
| accepts_loss_kwargs = False | |
| config: Qwen3VLConfig | |
| _no_split_modules = ["Qwen3VLTextDecoderLayer", "Qwen3VLVisionBlock"] | |
def __init__(self, config):
    """Instantiate the vision tower and the text decoder from the sub-configs of `config`."""
    super().__init__(config)

    vision_tower = Qwen3VLVisionModel._from_config(config.vision_config)
    self.visual = vision_tower

    text_decoder = Qwen3VLTextModel._from_config(config.text_config)
    self.language_model = text_decoder

    # cache rope_deltas here
    self.rope_deltas = None

    # Initialize weights and apply final processing
    self.post_init()
def get_input_embeddings(self):
    """Return whatever `self.language_model.get_input_embeddings()` returns."""
    text_model = self.language_model
    return text_model.get_input_embeddings()
def set_input_embeddings(self, value):
    """Forward `value` to `self.language_model.set_input_embeddings`."""
    text_model = self.language_model
    text_model.set_input_embeddings(value)
def set_decoder(self, decoder):
    """Replace `self.language_model` with `decoder`."""
    setattr(self, "language_model", decoder)
def get_decoder(self):
    """Return the object stored in `self.language_model`."""
    decoder = self.language_model
    return decoder
def get_rope_index(
    self,
    input_ids: torch.LongTensor,
    image_grid_thw: Optional[torch.LongTensor] = None,
    video_grid_thw: Optional[torch.LongTensor] = None,
    attention_mask: Optional[torch.Tensor] = None,
):
    """
    Compute the 3-axis (temporal, height, width) rotary position ids and the per-sample rope deltas.

    The per-token work is expressed with tensor ops (`torch.where`, `torch.gather`, `torch.index_select`)
    rather than Python branches on token values.

    Layout produced for every sample (padding positions keep position 0):
      * text tokens, including the vision-start token itself, get the same increasing index on all 3 axes;
      * the `gh * gw` placeholder tokens that follow a vision-start token get a constant temporal index and
        row / column indices on the other two axes, offset by the next free text position
        (`gh = h // spatial_merge_size`, `gw = w // spatial_merge_size`);
      * text after a vision block resumes at `max(gh, gw)` past the block's offset.

    Args:
        input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
            Token ids; vision-start / image / video tokens are detected through the ids stored in
            `self.config`.
        image_grid_thw (`torch.LongTensor` of shape `(num_images, 3)`, *optional*):
            (t, h, w) grid of every image, consumed in order of appearance.
        video_grid_thw (`torch.LongTensor` of shape `(num_videos, 3)`, *optional*):
            (t, h, w) grid of every video. Each video is expanded into `t` single-frame grids, so every
            frame is expected to be introduced by its own vision-start token.
        attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
            Non-zero for real tokens, zero for padding. Defaults to all ones.

    Returns:
        `tuple[torch.LongTensor, torch.LongTensor]`: `position_ids` of shape `(3, batch_size, sequence_length)`
        and `rope_deltas` of shape `(batch_size, 1)` holding `max_position + 1 - num_unpadded_tokens`
        (all zeros for text-only input).
    """
    device = input_ids.device
    batch_size, seq_len = input_ids.shape
    vision_start_token_id = self.config.vision_start_token_id
    image_token_id = self.config.image_token_id
    video_token_id = self.config.video_token_id
    spatial_merge_size = self.config.vision_config.spatial_merge_size

    if attention_mask is None:
        attention_mask = torch.ones_like(input_ids, dtype=torch.long)

    # Text-only input: plain running positions, replicated on the three axes.
    if image_grid_thw is None and video_grid_thw is None:
        pos = attention_mask.cumsum(-1) - 1
        pos.masked_fill_(attention_mask == 0, 0)
        position_ids = pos.unsqueeze(0).expand(3, -1, -1).contiguous()
        rope_deltas = torch.zeros((batch_size, 1), device=device, dtype=torch.long)
        return position_ids, rope_deltas

    # Every video frame is treated as its own single-frame grid.
    if video_grid_thw is not None and video_grid_thw.numel() > 0:
        video_grid_thw = torch.repeat_interleave(video_grid_thw, video_grid_thw[:, 0], dim=0)
        video_grid_thw[:, 0] = 1

    # Normalise both grid tables once (this is loop-invariant). A missing or empty table is replaced by
    # a single all-zero row so that `index_select` below always has a valid row to read.
    if image_grid_thw is None or image_grid_thw.numel() == 0:
        image_grid_thw = torch.zeros((1, 3), device=device, dtype=torch.long)
    else:
        image_grid_thw = image_grid_thw.to(device)
    if video_grid_thw is None or video_grid_thw.numel() == 0:
        video_grid_thw = torch.zeros((1, 3), device=device, dtype=torch.long)
    else:
        video_grid_thw = video_grid_thw.to(device)
    last_image_row = image_grid_thw.shape[0] - 1
    last_video_row = video_grid_thw.shape[0] - 1

    position_ids = torch.zeros((3, batch_size, seq_len), device=device, dtype=torch.long)
    rope_deltas = torch.zeros((batch_size, 1), device=device, dtype=torch.long)
    zero = torch.zeros((), device=device, dtype=torch.long)

    for b in range(batch_size):
        row_mask = attention_mask[b].bool()
        tokens = input_ids[b]

        # Absolute indices of the non-padded tokens, and for every absolute position its index in the
        # padding-free ("compact") sequence.
        valid_indices = torch.nonzero(row_mask, as_tuple=False).squeeze(1)
        num_valid = valid_indices.numel()
        compact_pos = torch.clamp(attention_mask[b].to(torch.long).cumsum(0) - 1, min=0)
        # One extra slot keeps the gather below in range even when the row has no valid token.
        valid_indices_safe = torch.cat([valid_indices, valid_indices.new_zeros((1,))], dim=0).to(device)
        last_valid = torch.clamp(torch.tensor(num_valid, device=device, dtype=torch.long) - 1, min=0)

        chunks = []
        st = zero  # compact index of the first token not yet assigned a position
        base = zero  # next free position value
        img_ptr = zero  # next unused row of image_grid_thw
        vid_ptr = zero  # next unused row of video_grid_thw

        for pos in range(seq_len):
            pos_t = compact_pos[pos]
            is_vs = row_mask[pos] & (tokens[pos] == vision_start_token_id)

            # Look at the next non-padded token to decide which modality (if any) starts here.
            next_compact = torch.minimum(pos_t + 1, last_valid).unsqueeze(0)
            next_abs = torch.gather(valid_indices_safe, 0, next_compact)
            next_tok = torch.gather(tokens, 0, next_abs)[0]
            is_image = is_vs & (next_tok == image_token_id)
            is_video = is_vs & (next_tok == video_token_id)
            # A vision-start token that is not followed by a placeholder is ordinary text and must not
            # consume a grid row.
            is_vision = is_image | is_video

            # Text chunk: everything pending up to AND including the vision-start token, so that the
            # grid positions below line up with the placeholder tokens themselves.
            text_len = torch.where(is_vision, pos_t + 1 - st, zero)
            text_chunk = torch.arange(text_len, device=device, dtype=torch.long).view(1, -1).expand(3, -1)
            chunks.append(text_chunk + base)
            base = base + text_len

            # Clamp the pointers so the lookup stays in range even when it is not used.
            img_row = torch.clamp(img_ptr, max=last_image_row).unsqueeze(0)
            vid_row = torch.clamp(vid_ptr, max=last_video_row).unsqueeze(0)
            img_thw = torch.index_select(image_grid_thw, 0, img_row)[0]
            vid_thw = torch.index_select(video_grid_thw, 0, vid_row)[0]
            thw = torch.where(is_image, img_thw, vid_thw)

            gh = torch.where(is_vision, thw[1] // spatial_merge_size, zero)
            gw = torch.where(is_vision, thw[2] // spatial_merge_size, zero)
            vision_len = gh * gw

            # Row / column index of every placeholder token; the temporal index is constant because
            # each grid handled here is a single frame.
            gw_safe = torch.clamp(gw, min=1)  # avoid division by zero when there is no vision block
            flat = torch.arange(vision_len, device=device)
            h_idx = flat // gw_safe
            w_idx = flat % gw_safe
            t_idx = torch.zeros_like(h_idx)
            chunks.append(torch.stack([t_idx, h_idx, w_idx]) + base)

            # gh and gw are both 0 when no vision block starts here, so this is a no-op in that case.
            base = base + torch.maximum(gh, gw)
            img_ptr = img_ptr + is_image.to(torch.long)
            vid_ptr = vid_ptr + is_video.to(torch.long)
            st = torch.where(is_vision, pos_t + 1 + vision_len, st)

        # Trailing text after the last vision block (or the whole row when there is none).
        tail_len = num_valid - st
        tail_chunk = torch.arange(tail_len, device=device, dtype=torch.long).view(1, -1).expand(3, -1)
        chunks.append(tail_chunk + base)

        llm_positions = torch.cat(chunks, dim=1)
        position_ids[:, b, valid_indices] = llm_positions
        rope_deltas[b, 0] = llm_positions.max() + 1 - num_valid

    return position_ids, rope_deltas
def get_video_features(
    self, pixel_values_videos: torch.FloatTensor, video_grid_thw: Optional[torch.LongTensor] = None
):
    """
    Encodes videos into continuous embeddings that can be forwarded to the language model. The deepstack visual features are also returned.

    Args:
        pixel_values_videos (`torch.FloatTensor`):
            The tensors corresponding to the input videos.
        video_grid_thw (`torch.LongTensor` of shape `(num_videos, 3)`, *optional*):
            The temporal, height and width of feature shape of each video in LLM.
    """
    # Videos go through exactly the same path as images.
    video_outputs = self.get_image_features(pixel_values_videos, video_grid_thw)
    return video_outputs
def get_image_features(self, pixel_values: torch.FloatTensor, image_grid_thw: Optional[torch.LongTensor] = None):
    """
    Encodes images into continuous embeddings that can be forwarded to the language model. The deepstack visual features are also returned.

    Args:
        pixel_values (`torch.FloatTensor`):
            The tensors corresponding to the input images.
        image_grid_thw (`torch.LongTensor` of shape `(num_images, 3)`, *optional*):
            The temporal, height and width of feature shape of each image in LLM.

    Returns:
        A tuple `(image_embeds, deepstack_image_embeds)`: the encoder output split into one chunk per row of
        `image_grid_thw`, and the second value returned by the vision tower, passed through untouched.
    """
    vision_tower = self.visual
    # Cast the pixels to the dtype the vision tower reports before encoding.
    pixel_values = pixel_values.type(vision_tower.dtype)
    merged_embeds, deepstack_image_embeds = vision_tower(pixel_values, grid_thw=image_grid_thw)

    # Each image contributes t * h * w patches, reduced by the squared spatial merge size.
    patches_per_token = vision_tower.spatial_merge_size**2
    tokens_per_image = (image_grid_thw.prod(-1) // patches_per_token).tolist()
    per_image_embeds = torch.split(merged_embeds, tokens_per_image)
    return per_image_embeds, deepstack_image_embeds
def get_placeholder_mask(
    self,
    input_ids: torch.LongTensor,
    inputs_embeds: torch.FloatTensor,
    image_features: Optional[torch.FloatTensor] = None,
    video_features: Optional[torch.FloatTensor] = None,
):
    """
    Obtains the multimodal placeholder masks from `input_ids` or, when those are missing, from `inputs_embeds`.

    Only the video placeholder count is validated against `video_features`; if the number of masked
    elements differs from `video_features.numel()` a `ValueError` is raised. The equivalent check for
    images is intentionally not performed here, so `image_features` is accepted for interface
    compatibility but is not inspected.

    Returns:
        `tuple[torch.BoolTensor, torch.BoolTensor]`: the image mask and the video mask, both expanded to the
        shape of `inputs_embeds`.
    """
    if input_ids is None:
        # Without token ids, a position is a placeholder when its whole embedding vector equals the
        # embedding of the placeholder token.
        embed_tokens = self.get_input_embeddings()
        image_token_embed = embed_tokens(
            torch.tensor(self.config.image_token_id, dtype=torch.long, device=inputs_embeds.device)
        )
        special_image_mask = (inputs_embeds == image_token_embed).all(-1)
        video_token_embed = embed_tokens(
            torch.tensor(self.config.video_token_id, dtype=torch.long, device=inputs_embeds.device)
        )
        special_video_mask = (inputs_embeds == video_token_embed).all(-1)
    else:
        special_image_mask = input_ids == self.config.image_token_id
        special_video_mask = input_ids == self.config.video_token_id

    # NOTE(review): the image-token / image-feature size check is disabled in this file; only the
    # video check below is active.
    special_image_mask = special_image_mask.unsqueeze(-1).expand_as(inputs_embeds).to(inputs_embeds.device)

    n_video_tokens = special_video_mask.sum()
    special_video_mask = special_video_mask.unsqueeze(-1).expand_as(inputs_embeds).to(inputs_embeds.device)
    if video_features is not None and inputs_embeds[special_video_mask].numel() != video_features.numel():
        raise ValueError(
            f"Videos features and video tokens do not match: tokens: {n_video_tokens}, features {video_features.shape[0]}"
        )

    return special_image_mask, special_video_mask
@auto_docstring
@check_model_inputs
def forward(
    self,
    input_ids: Optional[torch.LongTensor] = None,
    attention_mask: Optional[torch.Tensor] = None,
    position_ids: Optional[torch.LongTensor] = None,
    past_key_values: Optional[Cache] = None,
    inputs_embeds: Optional[torch.FloatTensor] = None,
    pixel_values: Optional[torch.Tensor] = None,
    pixel_values_videos: Optional[torch.FloatTensor] = None,
    image_grid_thw: Optional[torch.LongTensor] = None,
    video_grid_thw: Optional[torch.LongTensor] = None,
    cache_position: Optional[torch.LongTensor] = None,
    **kwargs: Unpack[TransformersKwargs],
) -> Union[tuple, Qwen3VLModelOutputWithPast]:
    r"""
    image_grid_thw (`torch.LongTensor` of shape `(num_images, 3)`, *optional*):
        The temporal, height and width of feature shape of each image in LLM.
    video_grid_thw (`torch.LongTensor` of shape `(num_videos, 3)`, *optional*):
        The temporal, height and width of feature shape of each video in LLM.
    """
    # Exactly one of `input_ids` / `inputs_embeds` must be given.
    if (input_ids is None) ^ (inputs_embeds is not None):
        raise ValueError("You must specify exactly one of input_ids or inputs_embeds")

    if inputs_embeds is None:
        inputs_embeds = self.get_input_embeddings()(input_ids)

    image_mask = None
    video_mask = None

    # Encode the images and scatter their embeddings over the image placeholder positions.
    if pixel_values is not None:
        image_embeds, deepstack_image_embeds = self.get_image_features(pixel_values, image_grid_thw)
        image_embeds = torch.cat(image_embeds, dim=0).to(inputs_embeds.device, inputs_embeds.dtype)
        image_mask, _ = self.get_placeholder_mask(
            input_ids, inputs_embeds=inputs_embeds, image_features=image_embeds
        )
        inputs_embeds = inputs_embeds.masked_scatter(image_mask, image_embeds)

    # Same for the videos.
    if pixel_values_videos is not None:
        video_embeds, deepstack_video_embeds = self.get_video_features(pixel_values_videos, video_grid_thw)
        video_embeds = torch.cat(video_embeds, dim=0).to(inputs_embeds.device, inputs_embeds.dtype)
        _, video_mask = self.get_placeholder_mask(
            input_ids, inputs_embeds=inputs_embeds, video_features=video_embeds
        )
        inputs_embeds = inputs_embeds.masked_scatter(video_mask, video_embeds)

    # Build the per-position visual mask and the per-layer DeepStack embeddings handed to the
    # language model. The masks returned above are expanded over the hidden dim, hence `[..., 0]`.
    visual_pos_masks = None
    deepstack_visual_embeds = None
    if image_mask is not None and video_mask is not None:
        # aggregate visual_pos_masks and deepstack_visual_embeds
        image_mask = image_mask[..., 0]
        video_mask = video_mask[..., 0]
        visual_pos_masks = image_mask | video_mask
        deepstack_visual_embeds = []
        # Position of the image / video tokens inside the joint list of visual tokens.
        image_mask_joint = image_mask[visual_pos_masks]
        video_mask_joint = video_mask[visual_pos_masks]
        for img_embed, vid_embed in zip(deepstack_image_embeds, deepstack_video_embeds):
            embed_joint = img_embed.new_zeros(visual_pos_masks.sum(), img_embed.shape[-1]).to(img_embed.device)
            embed_joint[image_mask_joint, :] = img_embed
            embed_joint[video_mask_joint, :] = vid_embed
            deepstack_visual_embeds.append(embed_joint)
    elif image_mask is not None:
        image_mask = image_mask[..., 0]
        visual_pos_masks = image_mask
        deepstack_visual_embeds = deepstack_image_embeds
    elif video_mask is not None:
        video_mask = video_mask[..., 0]
        visual_pos_masks = video_mask
        deepstack_visual_embeds = deepstack_video_embeds

    if position_ids is None:
        # The mask may arrive as a dict of masks; the "full_attention" entry is used in that case.
        attention_mask_tensor = (
            attention_mask if not isinstance(attention_mask, dict) else attention_mask["full_attention"]
        )
        # A 4-D mask is reduced to a 2-D one by taking the diagonal of its first head.
        if attention_mask_tensor is not None and attention_mask_tensor.ndim == 4:
            attention_mask_tensor = torch.diagonal(attention_mask_tensor[:, 0], dim1=1, dim2=2)
            # Only apply conversion for floating point tensors (inverted masks)
            if attention_mask_tensor.dtype.is_floating_point:
                attention_mask_tensor = attention_mask_tensor / torch.finfo(attention_mask_tensor.dtype).min
                attention_mask_tensor = (1.0 - attention_mask_tensor).int()

        # Calculate RoPE index once per generation in the pre-fill stage only.
        # When compiling, we can't check tensor values thus we check only input length
        # It is safe to assume that `length!=1` means we're in pre-fill because compiled
        # models currently cannot do assisted decoding
        prefill_compiled_stage = is_torchdynamo_compiling() and (
            (input_ids is not None and input_ids.shape[1] != 1)
            or (inputs_embeds is not None and inputs_embeds.shape[1] != 1)
        )
        prefill_noncompiled_stage = not is_torchdynamo_compiling() and (
            (cache_position is not None and cache_position[0] == 0)
            or (past_key_values is None or past_key_values.get_seq_length() == 0)
        )
        if (prefill_compiled_stage or prefill_noncompiled_stage) or self.rope_deltas is None:
            # NOTE(review): `get_rope_index` reads `input_ids.device` / `input_ids.shape`, so this
            # branch presumably requires `input_ids` even when `inputs_embeds` is supplied - confirm.
            position_ids, rope_deltas = self.get_rope_index(
                input_ids,
                image_grid_thw,
                video_grid_thw,
                attention_mask=attention_mask_tensor,
            )
            # Cached on the module so that later decoding steps can reuse it.
            self.rope_deltas = rope_deltas
        # then use the prev pre-calculated rope-deltas to get the correct position ids
        else:
            batch_size, seq_length, _ = inputs_embeds.shape
            delta = (
                (cache_position[0] + self.rope_deltas).to(inputs_embeds.device)
                if cache_position is not None
                else 0
            )
            position_ids = torch.arange(seq_length, device=inputs_embeds.device)
            position_ids = position_ids.view(1, -1).expand(batch_size, -1)
            if cache_position is not None:  # otherwise `deltas` is an int `0`
                # Repeat the cached deltas when the current batch is a multiple of the cached one.
                delta = delta.repeat_interleave(batch_size // delta.shape[0], dim=0)
            position_ids = position_ids.add(delta)
            # The same position is used on all three rotary axes for newly decoded tokens.
            position_ids = position_ids.unsqueeze(0).expand(3, -1, -1)

    # The embeddings already contain the visual features, so `input_ids` is not forwarded.
    outputs = self.language_model(
        input_ids=None,
        position_ids=position_ids,
        attention_mask=attention_mask,
        past_key_values=past_key_values,
        inputs_embeds=inputs_embeds,
        cache_position=cache_position,
        visual_pos_masks=visual_pos_masks,
        deepstack_visual_embeds=deepstack_visual_embeds,
        **kwargs,
    )

    return Qwen3VLModelOutputWithPast(
        last_hidden_state=outputs.last_hidden_state,
        past_key_values=outputs.past_key_values,
        rope_deltas=self.rope_deltas,
    )
@dataclass
@auto_docstring(
    custom_intro="""
    Base class for Qwen3VL causal language model (or autoregressive) outputs.
    """
)
class Qwen3VLCausalLMOutputWithPast(ModelOutput):
    r"""
    loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` is provided):
        Language modeling loss (for next-token prediction).
    logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.vocab_size)`):
        Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax).
    past_key_values (`Cache`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
        It is a [`~cache_utils.Cache`] instance. For more details, see our [kv cache guide](https://huggingface.co/docs/transformers/en/kv_cache).

        Contains pre-computed hidden-states (key and values in the self-attention blocks) that can be used (see
        `past_key_values` input) to speed up sequential decoding.
    rope_deltas (`torch.LongTensor` of shape `(batch_size, 1)`, *optional*):
        The rope index difference between sequence length and multimodal rope.
    """

    loss: Optional[torch.FloatTensor] = None
    logits: Optional[torch.FloatTensor] = None
    past_key_values: Optional[Cache] = None
    hidden_states: Optional[tuple[torch.FloatTensor]] = None
    attentions: Optional[tuple[torch.FloatTensor]] = None
    rope_deltas: Optional[torch.LongTensor] = None
| class Qwen3VLForConditionalGeneration(Qwen3VLPreTrainedModel, GenerationMixin): | |
| _checkpoint_conversion_mapping = {} | |
| _tied_weights_keys = ["lm_head.weight"] | |
| # Reference: fix gemma3 grad acc #37208 | |
| accepts_loss_kwargs = False | |
| config: Qwen3VLConfig | |
def __init__(self, config):
    """Wrap a `Qwen3VLModel` and add a bias-free linear LM head sized from `config.text_config`."""
    super().__init__(config)
    self.model = Qwen3VLModel(config)

    text_config = config.text_config
    self.lm_head = nn.Linear(
        in_features=text_config.hidden_size,
        out_features=text_config.vocab_size,
        bias=False,
    )

    self.post_init()
def get_input_embeddings(self):
    """Return whatever `self.model.get_input_embeddings()` returns."""
    base_model = self.model
    return base_model.get_input_embeddings()
def set_input_embeddings(self, value):
    """Forward `value` to `self.model.set_input_embeddings`."""
    base_model = self.model
    base_model.set_input_embeddings(value)
def set_decoder(self, decoder):
    """Forward `decoder` to `self.model.set_decoder`."""
    base_model = self.model
    base_model.set_decoder(decoder)
def get_decoder(self):
    """Return whatever `self.model.get_decoder()` returns."""
    base_model = self.model
    return base_model.get_decoder()
def get_video_features(
    self, pixel_values_videos: torch.FloatTensor, video_grid_thw: Optional[torch.LongTensor] = None
):
    """Delegate to `self.model.get_video_features` with the same two arguments."""
    base_model = self.model
    return base_model.get_video_features(pixel_values_videos, video_grid_thw)
def get_image_features(self, pixel_values: torch.FloatTensor, image_grid_thw: Optional[torch.LongTensor] = None):
    """Delegate to `self.model.get_image_features` with the same two arguments."""
    base_model = self.model
    return base_model.get_image_features(pixel_values, image_grid_thw)
| # Make modules available through conditional class for BC | |
@property
def language_model(self):
    """The `language_model` attribute of the wrapped `self.model`."""
    base_model = self.model
    return base_model.language_model
@property
def visual(self):
    """The `visual` attribute of the wrapped `self.model`."""
    base_model = self.model
    return base_model.visual
@check_model_inputs
def forward(
    self,
    input_ids: torch.LongTensor = None,
    attention_mask: Optional[torch.Tensor] = None,
    position_ids: Optional[torch.LongTensor] = None,
    past_key_values: Optional[Cache] = None,
    inputs_embeds: Optional[torch.FloatTensor] = None,
    labels: Optional[torch.LongTensor] = None,
    pixel_values: Optional[torch.Tensor] = None,
    pixel_values_videos: Optional[torch.FloatTensor] = None,
    image_grid_thw: Optional[torch.LongTensor] = None,
    video_grid_thw: Optional[torch.LongTensor] = None,
    cache_position: Optional[torch.LongTensor] = None,
    logits_to_keep: Union[int, torch.Tensor] = 0,
    **kwargs: Unpack[TransformersKwargs],
) -> Union[tuple, Qwen3VLCausalLMOutputWithPast]:
    r"""
    labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
        Labels for computing the masked language modeling loss. Indices should either be in `[0, ...,
        config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored
        (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`.
    image_grid_thw (`torch.LongTensor` of shape `(num_images, 3)`, *optional*):
        The temporal, height and width of feature shape of each image in LLM.
    video_grid_thw (`torch.LongTensor` of shape `(num_videos, 3)`, *optional*):
        The temporal, height and width of feature shape of each video in LLM.

    Example:
        TODO: Add example
    """
    # Run the multimodal backbone; every input is passed through unchanged.
    outputs = self.model(
        input_ids=input_ids,
        attention_mask=attention_mask,
        position_ids=position_ids,
        past_key_values=past_key_values,
        inputs_embeds=inputs_embeds,
        pixel_values=pixel_values,
        pixel_values_videos=pixel_values_videos,
        image_grid_thw=image_grid_thw,
        video_grid_thw=video_grid_thw,
        cache_position=cache_position,
        **kwargs,
    )
    hidden_states = outputs[0]

    # Only compute necessary logits, and do not upcast them to float if we are not computing the loss.
    # An int keeps the last `logits_to_keep` positions (0 keeps all of them); anything else is used
    # directly as the index along the sequence dimension.
    if isinstance(logits_to_keep, int):
        slice_indices = slice(-logits_to_keep, None)
    else:
        slice_indices = logits_to_keep
    kept_states = hidden_states[:, slice_indices, :]
    logits = self.lm_head(kept_states)

    loss = (
        None
        if labels is None
        else self.loss_function(logits=logits, labels=labels, vocab_size=self.config.text_config.vocab_size)
    )

    return Qwen3VLCausalLMOutputWithPast(
        loss=loss,
        logits=logits,
        past_key_values=outputs.past_key_values,
        rope_deltas=outputs.rope_deltas,
    )
def prepare_inputs_for_generation(
    self,
    input_ids,
    past_key_values=None,
    attention_mask=None,
    inputs_embeds=None,
    cache_position=None,
    position_ids=None,
    use_cache=True,
    pixel_values=None,
    pixel_values_videos=None,
    image_grid_thw=None,
    video_grid_thw=None,
    **kwargs,
):
    """
    Build the model inputs for one generation step.

    Overwritten -- in specific circumstances we don't want to forward image inputs to the model.
    The parent implementation does the generic preparation; afterwards `position_ids` is always
    cleared and the pixel inputs are dropped on every step whose first cache position is not 0.

    Returns:
        The dict returned by the parent `prepare_inputs_for_generation`, with the overrides above.
    """
    model_inputs = super().prepare_inputs_for_generation(
        input_ids,
        past_key_values=past_key_values,
        attention_mask=attention_mask,
        inputs_embeds=inputs_embeds,
        cache_position=cache_position,
        position_ids=position_ids,
        pixel_values=pixel_values,
        pixel_values_videos=pixel_values_videos,
        image_grid_thw=image_grid_thw,
        video_grid_thw=video_grid_thw,
        use_cache=use_cache,
        **kwargs,
    )

    # Qwen3VL position_ids are prepared with rope_deltas in forward
    model_inputs["position_ids"] = None

    # `cache_position` defaults to None; without a cache position the pixel inputs are kept,
    # instead of failing on `None[0]`.
    if cache_position is not None and cache_position[0] != 0:
        model_inputs["pixel_values"] = None
        model_inputs["pixel_values_videos"] = None

    return model_inputs
| def _get_image_nums_and_video_nums( | |
| self, | |
| input_ids: Optional[torch.LongTensor], | |
| inputs_embeds: Optional[torch.Tensor] = None, | |
| ) -> tuple[torch.Tensor, torch.Tensor]: | |
| """ | |
| Get the number of images and videos for each sample to calculate the separation length of the sample tensor. | |
| These parameters are not passed through the processor to avoid unpredictable impacts from interface modifications. | |
| Args: | |
| input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`): | |
| Indices of input sequence tokens in the vocabulary. | |
| Returns: | |
| image_nums (`torch.LongTensor` of shape `(batch_size, num_images_sample)`) | |
| video_nums (`torch.LongTensor` of shape `(batch_size, num_videos_sample)`) | |
| """ | |
| image_token_id = self.config.image_token_id | |
| video_token_id = self.config.video_token_id | |
| vision_start_token_id = self.config.vision_start_token_id | |
| if inputs_embeds is not None: | |
| vision_start_mask = ( | |
| inputs_embeds | |
| == self.get_input_embeddings()( | |
| torch.tensor(vision_start_token_id, dtype=torch.long, device=inputs_embeds.device) | |
| ) | |
| )[..., 0] | |
| image_mask = ( | |
| inputs_embeds | |
| == self.get_input_embeddings()( | |
| torch.tensor(image_token_id, dtype=torch.long, device=inputs_embeds.device) | |
| ) | |
| )[..., 0] | |
| video_mask = ( | |
| inputs_embeds | |
| == self.get_input_embeddings()( | |
| torch.tensor(video_token_id, dtype=torch.long, device=inputs_embeds.device) | |
| ) | |
| )[..., 0] | |
| else: | |
| vision_start_mask = input_ids == vision_start_token_id | |
| image_mask = input_ids == image_token_id | |
| video_mask = input_ids == video_token_id | |
| vision_first_mask = torch.roll(vision_start_mask, shifts=1, dims=1) | |
| image_nums = torch.sum(vision_first_mask & image_mask, dim=1) | |
| video_nums = torch.sum(vision_first_mask & video_mask, dim=1) | |
| return image_nums, video_nums | |
| def _expand_inputs_for_generation( | |
| self, | |
| expand_size: int = 1, | |
| is_encoder_decoder: bool = False, | |
| input_ids: Optional[torch.LongTensor] = None, | |
| **model_kwargs, | |
| ) -> tuple[torch.LongTensor, dict[str, Any]]: | |
| # Overwritten -- Support for expanding tensors without a batch size dimension | |
| # e.g., pixel_values, image_grid_thw, pixel_values_videos, video_grid_thw, second_per_grid_t | |
| # pixel_values.shape[0] is sum(seqlen_images for samples) | |
| # image_grid_thw.shape[0] is sum(num_images for samples) | |
| if expand_size == 1: | |
| return input_ids, model_kwargs | |
| visual_keys = ["pixel_values", "image_grid_thw", "pixel_values_videos", "video_grid_thw", "second_per_grid_ts"] | |
| def _expand_dict_for_generation_visual(dict_to_expand): | |
| image_grid_thw = model_kwargs.get("image_grid_thw", None) | |
| video_grid_thw = model_kwargs.get("video_grid_thw", None) | |
| image_nums, video_nums = self._get_image_nums_and_video_nums( | |
| input_ids, inputs_embeds=model_kwargs.get("inputs_embeds", None) | |
| ) | |
| def _repeat_interleave_samples(x, lengths, repeat_times): | |
| samples = torch.split(x, lengths) | |
| repeat_args = [repeat_times] + [1] * (x.dim() - 1) | |
| result = torch.cat([sample.repeat(*repeat_args) for sample in samples], dim=0) | |
| return result | |
| for key in dict_to_expand: | |
| if key == "pixel_values": | |
| # split images into samples | |
| samples = torch.split(image_grid_thw, list(image_nums)) | |
| # compute the sequence length of images for each sample | |
| lengths = [torch.prod(sample, dim=1).sum() for sample in samples] | |
| dict_to_expand[key] = _repeat_interleave_samples( | |
| dict_to_expand[key], lengths=lengths, repeat_times=expand_size | |
| ) | |
| elif key == "image_grid_thw": | |
| # get the num of images for each sample | |
| lengths = list(image_nums) | |
| dict_to_expand[key] = _repeat_interleave_samples( | |
| dict_to_expand[key], lengths=lengths, repeat_times=expand_size | |
| ) | |
| elif key == "pixel_values_videos": | |
| samples = torch.split(video_grid_thw, list(video_nums)) | |
| lengths = [torch.prod(sample, dim=1).sum() for sample in samples] | |
| dict_to_expand[key] = _repeat_interleave_samples( | |
| dict_to_expand[key], lengths=lengths, repeat_times=expand_size | |
| ) | |
| elif key == "video_grid_thw": | |
| lengths = list(video_nums) | |
| dict_to_expand[key] = _repeat_interleave_samples( | |
| dict_to_expand[key], lengths=lengths, repeat_times=expand_size | |
| ) | |
| elif key == "second_per_grid_ts": | |
| dict_to_expand[key] = _repeat_interleave_samples( | |
| dict_to_expand[key], lengths=list(video_nums), repeat_times=expand_size | |
| ) | |
| return dict_to_expand | |
| def _expand_dict_for_generation(dict_to_expand): | |
| for key in dict_to_expand: | |
| if ( | |
| key != "cache_position" | |
| and dict_to_expand[key] is not None | |
| and isinstance(dict_to_expand[key], torch.Tensor) | |
| and key not in visual_keys | |
| ): | |
| dict_to_expand[key] = dict_to_expand[key].repeat_interleave(expand_size, dim=0) | |
| return dict_to_expand | |
| model_kwargs = _expand_dict_for_generation_visual(model_kwargs) | |
| if input_ids is not None: | |
| input_ids = input_ids.repeat_interleave(expand_size, dim=0) | |
| model_kwargs = _expand_dict_for_generation(model_kwargs) | |
| if is_encoder_decoder: | |
| if model_kwargs.get("encoder_outputs") is None: | |
| raise ValueError("If `is_encoder_decoder` is True, make sure that `encoder_outputs` is defined.") | |
| model_kwargs["encoder_outputs"] = _expand_dict_for_generation(model_kwargs["encoder_outputs"]) | |
| return input_ids, model_kwargs | |
# Names exported by a star-import of this module.
__all__ = [
    "Qwen3VLVisionModel",
    "Qwen3VLForConditionalGeneration",
    "Qwen3VLModel",
    "Qwen3VLPreTrainedModel",
    "Qwen3VLTextModel",
]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment