Spaces:
Paused
Paused
| # Adapted from https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/unet_2d_condition.py | |
| import os, json | |
| from dataclasses import dataclass | |
| from typing import List, Optional, Tuple, Union | |
| import torch | |
| import torch.nn as nn | |
| import torch.utils.checkpoint | |
| from diffusers.configuration_utils import ConfigMixin, register_to_config | |
| from diffusers.models.modeling_utils import ModelMixin, _get_model_file | |
| from diffusers.utils import BaseOutput, logging | |
| from diffusers.models.unet_2d_condition import UNet2DConditionModel | |
| from diffusers.models.activations import get_activation | |
| from diffusers.models.embeddings import ( | |
| TimestepEmbedding, | |
| Timesteps, | |
| GaussianFourierProjection, | |
| ) | |
| from .unet_2d_blocks import ( | |
| CrossAttnDownBlock2D, | |
| CrossAttnUpBlock2D, | |
| DownBlock2D, | |
| UNetMidBlock2DCrossAttn, | |
| UpBlock2D, | |
| get_down_block, | |
| get_up_block, | |
| ) | |
| from .resnet_2d import InflatedConv3d | |
| logger = logging.get_logger(__name__) # pylint: disable=invalid-name | |
@dataclass
class UNet2DConditionOutput(BaseOutput):
    """
    Output container used as the annotated return type of the UNet `forward`.

    The class is declared as a dataclass so that `sample` becomes a real
    constructor field, which is how `BaseOutput` subclasses are declared in
    the upstream diffusers module this file is adapted from.

    Args:
        sample (`torch.FloatTensor` of shape `(batch_size, num_channels, height, width)`):
            Hidden states conditioned on `encoder_hidden_states` input. Output of last layer of model.
    """

    sample: torch.FloatTensor
| class CustomUNet2DConditionModel(UNet2DConditionModel): | |
| r""" | |
| A custom conditional 2D UNet that takes external features as key and values in the attention layer and return | |
| an output with the same size of input latent. | |
| This model inherits from [`ModelMixin`]. Check the superclass documentation for its generic methods implemented | |
| for all models (such as downloading or saving). | |
| Parameters: | |
| sample_size (`int` or `Tuple[int, int]`, *optional*, defaults to `None`): | |
| Height and width of input/output sample. | |
| in_channels (`int`, *optional*, defaults to 4): Number of channels in the input sample. | |
| out_channels (`int`, *optional*, defaults to 4): Number of channels in the output. | |
| center_input_sample (`bool`, *optional*, defaults to `False`): Whether to center the input sample. | |
| flip_sin_to_cos (`bool`, *optional*, defaults to `True`): | |
| Whether to flip the sin to cos in the time embedding. | |
| freq_shift (`int`, *optional*, defaults to 0): The frequency shift to apply to the time embedding. | |
| down_block_types (`Tuple[str]`, *optional*, defaults to `("CrossAttnDownBlock2D", "CrossAttnDownBlock2D", "CrossAttnDownBlock2D", "DownBlock2D")`): | |
| The tuple of downsample blocks to use. | |
| mid_block_type (`str`, *optional*, defaults to `"UNetMidBlock2DCrossAttn"`): | |
| Block type for middle of UNet, it can be one of `UNetMidBlock2DCrossAttn`, `UNetMidBlock2D`, or | |
| `UNetMidBlock2DSimpleCrossAttn`. If `None`, the mid block layer is skipped. | |
| up_block_types (`Tuple[str]`, *optional*, defaults to `("UpBlock2D", "CrossAttnUpBlock2D", "CrossAttnUpBlock2D", "CrossAttnUpBlock2D")`): | |
| The tuple of upsample blocks to use. | |
| only_cross_attention(`bool` or `Tuple[bool]`, *optional*, default to `False`): | |
| Whether to include self-attention in the basic transformer blocks, see | |
| [`~models.attention.BasicTransformerBlock`]. | |
| block_out_channels (`Tuple[int]`, *optional*, defaults to `(320, 640, 1280, 1280)`): | |
| The tuple of output channels for each block. | |
| layers_per_block (`int`, *optional*, defaults to 2): The number of layers per block. | |
| downsample_padding (`int`, *optional*, defaults to 1): The padding to use for the downsampling convolution. | |
| mid_block_scale_factor (`float`, *optional*, defaults to 1.0): The scale factor to use for the mid block. | |
| dropout (`float`, *optional*, defaults to 0.0): The dropout probability to use. | |
| act_fn (`str`, *optional*, defaults to `"silu"`): The activation function to use. | |
| norm_num_groups (`int`, *optional*, defaults to 32): The number of groups to use for the normalization. | |
| If `None`, normalization and activation layers is skipped in post-processing. | |
| norm_eps (`float`, *optional*, defaults to 1e-5): The epsilon to use for the normalization. | |
| cross_attention_dim (`int` or `Tuple[int]`, *optional*, defaults to 1280): | |
| The dimension of the cross attention features. | |
| transformer_layers_per_block (`int`, `Tuple[int]`, or `Tuple[Tuple]` , *optional*, defaults to 1): | |
| The number of transformer blocks of type [`~models.attention.BasicTransformerBlock`]. Only relevant for | |
| [`~models.unet_2d_blocks.CrossAttnDownBlock2D`], [`~models.unet_2d_blocks.CrossAttnUpBlock2D`], | |
| [`~models.unet_2d_blocks.UNetMidBlock2DCrossAttn`]. | |
| reverse_transformer_layers_per_block : (`Tuple[Tuple]`, *optional*, defaults to None): | |
| The number of transformer blocks of type [`~models.attention.BasicTransformerBlock`], in the upsampling | |
| blocks of the U-Net. Only relevant if `transformer_layers_per_block` is of type `Tuple[Tuple]` and for | |
| [`~models.unet_2d_blocks.CrossAttnDownBlock2D`], [`~models.unet_2d_blocks.CrossAttnUpBlock2D`], | |
| [`~models.unet_2d_blocks.UNetMidBlock2DCrossAttn`]. | |
| encoder_hid_dim (`int`, *optional*, defaults to None): | |
| If `encoder_hid_dim_type` is defined, `encoder_hidden_states` will be projected from `encoder_hid_dim` | |
| dimension to `cross_attention_dim`. | |
| encoder_hid_dim_type (`str`, *optional*, defaults to `None`): | |
| If given, the `encoder_hidden_states` and potentially other embeddings are down-projected to text | |
| embeddings of dimension `cross_attention` according to `encoder_hid_dim_type`. | |
| attention_head_dim (`int`, *optional*, defaults to 8): The dimension of the attention heads. | |
| num_attention_heads (`int`, *optional*): | |
| The number of attention heads. If not defined, defaults to `attention_head_dim` | |
| resnet_time_scale_shift (`str`, *optional*, defaults to `"default"`): Time scale shift config | |
| for ResNet blocks (see [`~models.resnet.ResnetBlock2D`]). Choose from `default` or `scale_shift`. | |
| class_embed_type (`str`, *optional*, defaults to `None`): | |
| The type of class embedding to use which is ultimately summed with the time embeddings. Choose from `None`, | |
| `"timestep"`, `"identity"`, `"projection"`, or `"simple_projection"`. | |
| addition_embed_type (`str`, *optional*, defaults to `None`): | |
| Configures an optional embedding which will be summed with the time embeddings. Choose from `None` or | |
| "text". "text" will use the `TextTimeEmbedding` layer. | |
| addition_time_embed_dim: (`int`, *optional*, defaults to `None`): | |
| Dimension for the timestep embeddings. | |
| num_class_embeds (`int`, *optional*, defaults to `None`): | |
| Input dimension of the learnable embedding matrix to be projected to `time_embed_dim`, when performing | |
| class conditioning with `class_embed_type` equal to `None`. | |
| time_embedding_type (`str`, *optional*, defaults to `positional`): | |
| The type of position embedding to use for timesteps. Choose from `positional` or `fourier`. | |
| time_embedding_dim (`int`, *optional*, defaults to `None`): | |
| An optional override for the dimension of the projected time embedding. | |
| time_embedding_act_fn (`str`, *optional*, defaults to `None`): | |
| Optional activation function to use only once on the time embeddings before they are passed to the rest of | |
| the UNet. Choose from `silu`, `mish`, `gelu`, and `swish`. | |
| timestep_post_act (`str`, *optional*, defaults to `None`): | |
| The second activation function to use in timestep embedding. Choose from `silu`, `mish` and `gelu`. | |
| time_cond_proj_dim (`int`, *optional*, defaults to `None`): | |
| The dimension of `cond_proj` layer in the timestep embedding. | |
| conv_in_kernel (`int`, *optional*, default to `3`): The kernel size of `conv_in` layer. | |
| conv_out_kernel (`int`, *optional*, default to `3`): The kernel size of `conv_out` layer. | |
| projection_class_embeddings_input_dim (`int`, *optional*): The dimension of the `class_labels` input when | |
| `class_embed_type="projection"`. Required when `class_embed_type="projection"`. | |
| class_embeddings_concat (`bool`, *optional*, defaults to `False`): Whether to concatenate the time | |
| embeddings with the class embeddings. | |
| mid_block_only_cross_attention (`bool`, *optional*, defaults to `None`): | |
| Whether to use cross attention with the mid block when using the `UNetMidBlock2DSimpleCrossAttn`. If | |
| `only_cross_attention` is given as a single boolean and `mid_block_only_cross_attention` is `None`, the | |
| `only_cross_attention` value is used as the value for `mid_block_only_cross_attention`. Default to `False` | |
| otherwise. | |
| """ | |
| _supports_gradient_checkpointing = True | |
def __init__(
    self,
    sample_size: Optional[int] = None,
    in_channels: int = 4,
    out_channels: int = 4,
    center_input_sample: bool = False,
    flip_sin_to_cos: bool = True,
    freq_shift: int = 0,
    down_block_types: Tuple[str] = (
        "CrossAttnDownBlock2D",
        "CrossAttnDownBlock2D",
        "CrossAttnDownBlock2D",
        "DownBlock2D",
    ),
    mid_block_type: Optional[str] = "UNetMidBlock2DCrossAttn",
    up_block_types: Tuple[str] = (
        "UpBlock2D",
        "CrossAttnUpBlock2D",
        "CrossAttnUpBlock2D",
        "CrossAttnUpBlock2D",
    ),
    only_cross_attention: Union[bool, Tuple[bool]] = False,
    block_out_channels: Tuple[int] = (320, 640, 1280, 1280),
    layers_per_block: Union[int, Tuple[int]] = 2,
    downsample_padding: int = 1,
    mid_block_scale_factor: float = 1,
    # dropout: float = 0.0,
    act_fn: str = "silu",
    norm_num_groups: Optional[int] = 32,
    norm_eps: float = 1e-5,
    cross_attention_dim: Union[int, Tuple[int]] = 1280,
    transformer_layers_per_block: Union[int, Tuple[int], Tuple[Tuple]] = 1,
    reverse_transformer_layers_per_block: Optional[Tuple[Tuple[int]]] = None,
    encoder_hid_dim: Optional[int] = None,
    encoder_hid_dim_type: Optional[str] = None,
    attention_head_dim: Union[int, Tuple[int]] = 8,
    num_attention_heads: Optional[Union[int, Tuple[int]]] = None,
    dual_cross_attention: bool = False,
    use_linear_projection: bool = False,
    class_embed_type: Optional[str] = None,
    addition_embed_type: Optional[str] = None,
    addition_time_embed_dim: Optional[int] = None,
    num_class_embeds: Optional[int] = None,
    upcast_attention: bool = False,
    resnet_time_scale_shift: str = "default",
    resnet_skip_time_act: bool = False,
    resnet_out_scale_factor: float = 1.0,
    time_embedding_type: str = "positional",
    time_embedding_dim: Optional[int] = None,
    time_embedding_act_fn: Optional[str] = None,
    timestep_post_act: Optional[str] = None,
    time_cond_proj_dim: Optional[int] = None,
    conv_in_kernel: int = 3,
    conv_out_kernel: int = 3,
    projection_class_embeddings_input_dim: Optional[int] = None,
    attention_type: str = "default",
    class_embeddings_concat: bool = False,
    mid_block_only_cross_attention: Optional[bool] = None,
    cross_attention_norm: Optional[str] = None,
    addition_embed_type_num_heads: int = 64,
    # adapt old version
    use_sc_attn: bool = False,
    use_st_attn: bool = False,
    st_attn_idx: Optional[int] = None,
):
    """
    Build the custom UNet.

    The parent `UNet2DConditionModel` constructor is run first with the
    standard arguments; afterwards `conv_in`, the time/class/additional
    embeddings, the down/mid/up blocks and the output layers are re-created
    from this package's own block implementations.

    The standard parameters are described in the class docstring. Parameters
    specific to this class:

        use_sc_attn (`bool`): forwarded unchanged to every down/mid/up block.
        use_st_attn (`bool`): forwarded to the mid block as-is; for down/up
            blocks it is enabled only on the block selected by `st_attn_idx`.
        st_attn_idx (`int`, *optional*): index of the down block (and of the
            up block at position `st_attn_idx + 1`) that receives
            `use_st_attn=True`.

    NOTE(review): `dropout`, `reverse_transformer_layers_per_block` and
    `attention_type` are deliberately not forwarded to the parent constructor
    (see the "adapt old version" marker) -- confirm against the pinned
    diffusers version before re-enabling them.
    NOTE(review): `conv_in_kernel` / `conv_out_kernel` are forwarded to the
    parent but the `InflatedConv3d` layers below always use `kernel_size=3`.

    Raises:
        ValueError: if `mid_block_type` is not `"UNetMidBlock2DCrossAttn"`.
    """
    super().__init__(
        sample_size=sample_size,
        in_channels=in_channels,
        out_channels=out_channels,
        center_input_sample=center_input_sample,
        flip_sin_to_cos=flip_sin_to_cos,
        freq_shift=freq_shift,
        down_block_types=down_block_types,
        mid_block_type=mid_block_type,
        up_block_types=up_block_types,
        only_cross_attention=only_cross_attention,
        block_out_channels=block_out_channels,
        layers_per_block=layers_per_block,
        downsample_padding=downsample_padding,
        mid_block_scale_factor=mid_block_scale_factor,
        # dropout=dropout,
        act_fn=act_fn,
        norm_num_groups=norm_num_groups,
        norm_eps=norm_eps,
        cross_attention_dim=cross_attention_dim,
        transformer_layers_per_block=transformer_layers_per_block,
        # reverse_transformer_layers_per_block=reverse_transformer_layers_per_block,
        encoder_hid_dim=encoder_hid_dim,
        encoder_hid_dim_type=encoder_hid_dim_type,
        attention_head_dim=attention_head_dim,
        num_attention_heads=num_attention_heads,
        dual_cross_attention=dual_cross_attention,
        use_linear_projection=use_linear_projection,
        class_embed_type=class_embed_type,
        addition_embed_type=addition_embed_type,
        addition_time_embed_dim=addition_time_embed_dim,
        num_class_embeds=num_class_embeds,
        upcast_attention=upcast_attention,
        resnet_time_scale_shift=resnet_time_scale_shift,
        resnet_skip_time_act=resnet_skip_time_act,
        resnet_out_scale_factor=resnet_out_scale_factor,
        time_embedding_type=time_embedding_type,
        time_embedding_dim=time_embedding_dim,
        time_embedding_act_fn=time_embedding_act_fn,
        timestep_post_act=timestep_post_act,
        time_cond_proj_dim=time_cond_proj_dim,
        conv_in_kernel=conv_in_kernel,
        conv_out_kernel=conv_out_kernel,
        projection_class_embeddings_input_dim=projection_class_embeddings_input_dim,
        # attention_type=attention_type,
        class_embeddings_concat=class_embeddings_concat,
        mid_block_only_cross_attention=mid_block_only_cross_attention,
        cross_attention_norm=cross_attention_norm,
        addition_embed_type_num_heads=addition_embed_type_num_heads,
    )

    self.in_channels = in_channels
    self.sample_size = sample_size

    # input
    self.conv_in = InflatedConv3d(
        in_channels, block_out_channels[0], kernel_size=3, padding=(1, 1)
    )

    # time
    time_embed_dim, timestep_input_dim = self._set_time_proj(
        time_embedding_type,
        block_out_channels=block_out_channels,
        flip_sin_to_cos=flip_sin_to_cos,
        freq_shift=freq_shift,
        time_embedding_dim=time_embedding_dim,
    )

    self.time_embedding = TimestepEmbedding(
        timestep_input_dim,
        time_embed_dim,
        act_fn=act_fn,
        post_act_fn=timestep_post_act,
        cond_proj_dim=time_cond_proj_dim,
    )

    self._set_encoder_hid_proj(
        encoder_hid_dim_type,
        cross_attention_dim=cross_attention_dim,
        encoder_hid_dim=encoder_hid_dim,
    )

    # class embedding
    self._set_class_embedding(
        class_embed_type,
        act_fn=act_fn,
        num_class_embeds=num_class_embeds,
        projection_class_embeddings_input_dim=projection_class_embeddings_input_dim,
        time_embed_dim=time_embed_dim,
        timestep_input_dim=timestep_input_dim,
    )

    self._set_add_embedding(
        addition_embed_type,
        addition_embed_type_num_heads=addition_embed_type_num_heads,
        addition_time_embed_dim=addition_time_embed_dim,
        cross_attention_dim=cross_attention_dim,
        encoder_hid_dim=encoder_hid_dim,
        flip_sin_to_cos=flip_sin_to_cos,
        freq_shift=freq_shift,
        projection_class_embeddings_input_dim=projection_class_embeddings_input_dim,
        time_embed_dim=time_embed_dim,
    )

    if time_embedding_act_fn is None:
        self.time_embed_act = None
    else:
        self.time_embed_act = get_activation(time_embedding_act_fn)

    self.down_blocks = nn.ModuleList([])
    self.mid_block = None
    self.up_blocks = nn.ModuleList([])

    # Broadcast scalar settings so that each block can index its own value.
    if isinstance(only_cross_attention, bool):
        only_cross_attention = [only_cross_attention] * len(down_block_types)

    if isinstance(attention_head_dim, int):
        attention_head_dim = (attention_head_dim,) * len(down_block_types)

    if isinstance(cross_attention_dim, int):
        cross_attention_dim = (cross_attention_dim,) * len(down_block_types)

    # `layers_per_block` may be given per block; a plain int applies to all.
    if isinstance(layers_per_block, int):
        layers_per_block = [layers_per_block] * len(down_block_types)

    if class_embeddings_concat:
        # The time embeddings are concatenated with the class embeddings. The dimension of the
        # time embeddings passed to the down, middle, and up blocks is twice the dimension of the
        # regular time embeddings
        blocks_time_embed_dim = time_embed_dim * 2
    else:
        blocks_time_embed_dim = time_embed_dim

    # down
    output_channel = block_out_channels[0]
    for i, down_block_type in enumerate(down_block_types):
        input_channel = output_channel
        output_channel = block_out_channels[i]
        is_final_block = i == len(block_out_channels) - 1

        down_block = get_down_block(
            down_block_type,
            num_layers=layers_per_block[i],
            in_channels=input_channel,
            out_channels=output_channel,
            temb_channels=blocks_time_embed_dim,
            add_downsample=not is_final_block,
            resnet_eps=norm_eps,
            resnet_act_fn=act_fn,
            resnet_groups=norm_num_groups,
            cross_attention_dim=cross_attention_dim[i],
            attn_num_head_channels=attention_head_dim[i],
            downsample_padding=downsample_padding,
            dual_cross_attention=dual_cross_attention,
            use_linear_projection=use_linear_projection,
            only_cross_attention=only_cross_attention[i],
            upcast_attention=upcast_attention,
            resnet_time_scale_shift=resnet_time_scale_shift,
            use_sc_attn=use_sc_attn,
            # idx range from 0 to 2, i.e., ['CrossAttnDownBlock2D', 'CrossAttnDownBlock2D', 'CrossAttnDownBlock2D', 'DownBlock2D']
            use_st_attn=bool(use_st_attn and i == st_attn_idx),
            layer_id=i,
        )
        down_block.resolution_idx = i
        self.down_blocks.append(down_block)

    # mid
    if mid_block_type == "UNetMidBlock2DCrossAttn":
        self.mid_block = UNetMidBlock2DCrossAttn(
            in_channels=block_out_channels[-1],
            temb_channels=blocks_time_embed_dim,
            resnet_eps=norm_eps,
            resnet_act_fn=act_fn,
            output_scale_factor=mid_block_scale_factor,
            resnet_time_scale_shift=resnet_time_scale_shift,
            cross_attention_dim=cross_attention_dim[-1],
            attn_num_head_channels=attention_head_dim[-1],
            resnet_groups=norm_num_groups,
            dual_cross_attention=dual_cross_attention,
            use_linear_projection=use_linear_projection,
            upcast_attention=upcast_attention,
            use_sc_attn=use_sc_attn,
            use_st_attn=use_st_attn,
        )
    else:
        raise ValueError(f"unknown mid_block_type : {mid_block_type}")

    # count how many layers upsample the videos
    self.num_upsamplers = 0

    # up: walk the per-block settings in reverse order
    reversed_block_out_channels = list(reversed(block_out_channels))
    reversed_attention_head_dim = list(reversed(attention_head_dim))
    reversed_cross_attention_dim = list(reversed(cross_attention_dim))
    reversed_layers_per_block = list(reversed(layers_per_block))
    only_cross_attention = list(reversed(only_cross_attention))

    output_channel = reversed_block_out_channels[0]
    for i, up_block_type in enumerate(up_block_types):
        is_final_block = i == len(block_out_channels) - 1

        prev_output_channel = output_channel
        output_channel = reversed_block_out_channels[i]
        input_channel = reversed_block_out_channels[
            min(i + 1, len(block_out_channels) - 1)
        ]

        # add upsample block for all BUT final layer
        if not is_final_block:
            add_upsample = True
            self.num_upsamplers += 1
        else:
            add_upsample = False

        up_block = get_up_block(
            up_block_type,
            # up blocks carry one more layer than the mirrored down block
            num_layers=reversed_layers_per_block[i] + 1,
            in_channels=input_channel,
            out_channels=output_channel,
            prev_output_channel=prev_output_channel,
            temb_channels=blocks_time_embed_dim,
            add_upsample=add_upsample,
            resnet_eps=norm_eps,
            resnet_act_fn=act_fn,
            resnet_groups=norm_num_groups,
            cross_attention_dim=reversed_cross_attention_dim[i],
            attn_num_head_channels=reversed_attention_head_dim[i],
            dual_cross_attention=dual_cross_attention,
            use_linear_projection=use_linear_projection,
            only_cross_attention=only_cross_attention[i],
            upcast_attention=upcast_attention,
            resnet_time_scale_shift=resnet_time_scale_shift,
            use_sc_attn=use_sc_attn,
            # idx range from 0 to 2, i.e., ['UpBlock2D', 'CrossAttnUpBlock2D', 'CrossAttnUpBlock2D', 'CrossAttnUpBlock2D']
            use_st_attn=bool(use_st_attn and i - 1 == st_attn_idx),
            layer_id=i,
        )
        up_block.resolution_idx = i
        self.up_blocks.append(up_block)

    # out
    self.conv_norm_out = nn.GroupNorm(
        num_channels=block_out_channels[0], num_groups=norm_num_groups, eps=norm_eps
    )
    self.conv_act = nn.SiLU()
    self.conv_out = InflatedConv3d(
        block_out_channels[0], out_channels, kernel_size=3, padding=1
    )
def set_attention_slice(self, slice_size):
    r"""
    Enable sliced attention computation.

    When this option is enabled, the attention module will split the input tensor in slices, to compute attention
    in several steps. This is useful to save some memory in exchange for a small speed decrease.

    Args:
        slice_size (`str` or `int` or `list(int)`, *optional*, defaults to `"auto"`):
            When `"auto"`, halves the input to the attention heads, so attention will be computed in two steps. If
            `"max"`, maximum amount of memory will be saved by running only one slice at a time. If a number is
            provided, uses as many slices as `attention_head_dim // slice_size`. In this case, `attention_head_dim`
            must be a multiple of `slice_size`.

    Raises:
        ValueError: if a list of the wrong length is given, or if any slice
            size exceeds the corresponding layer's `sliceable_head_dim`.
    """

    def iter_sliceable(module: torch.nn.Module):
        # Pre-order walk: a module exposing `set_attention_slice` is yielded
        # before its children are visited.
        if hasattr(module, "set_attention_slice"):
            yield module
        for sub_module in module.children():
            yield from iter_sliceable(sub_module)

    def iter_all_sliceable():
        for top_level in self.children():
            yield from iter_sliceable(top_level)

    # retrieve the head dim of every sliceable attention layer
    sliceable_head_dims = [layer.sliceable_head_dim for layer in iter_all_sliceable()]
    layer_count = len(sliceable_head_dims)

    if slice_size == "auto":
        # half the attention head size is usually a good trade-off between
        # speed and memory
        per_layer_sizes = [dim // 2 for dim in sliceable_head_dims]
    elif slice_size == "max":
        # make smallest slice possible
        per_layer_sizes = [1] * layer_count
    elif isinstance(slice_size, list):
        per_layer_sizes = slice_size
    else:
        # a single value is broadcast to every layer
        per_layer_sizes = [slice_size] * layer_count

    if len(per_layer_sizes) != layer_count:
        raise ValueError(
            f"You have provided {len(per_layer_sizes)}, but {self.config} has {len(sliceable_head_dims)} different"
            f" attention layers. Make sure to match `len(slice_size)` to be {len(sliceable_head_dims)}."
        )

    for size, dim in zip(per_layer_sizes, sliceable_head_dims):
        if size is not None and size > dim:
            raise ValueError(f"size {size} has to be smaller or equal to {dim}.")

    # Walk the children again in the same order and hand each sliceable
    # module its size; popping from a reversed copy consumes sizes front to
    # back without mutating the caller's list.
    pending_sizes = per_layer_sizes[::-1]
    for layer in iter_all_sliceable():
        layer.set_attention_slice(pending_sizes.pop())
def _set_gradient_checkpointing(self, module, value=False):
    """
    Set `gradient_checkpointing` on `module` when it is one of the UNet
    down/up block types; any other module is left untouched.
    """
    checkpointable_types = (
        CrossAttnDownBlock2D,
        DownBlock2D,
        CrossAttnUpBlock2D,
        UpBlock2D,
    )
    if not isinstance(module, checkpointable_types):
        return
    module.gradient_checkpointing = value
| def _set_time_proj( | |
| self, | |
| time_embedding_type: str, | |
| block_out_channels: int, | |
| flip_sin_to_cos: bool, | |
| freq_shift: float, | |
| time_embedding_dim: int, | |
| ) -> Tuple[int, int]: | |
| if time_embedding_type == "fourier": | |
| time_embed_dim = time_embedding_dim or block_out_channels[0] * 2 | |
| if time_embed_dim % 2 != 0: | |
| raise ValueError( | |
| f"`time_embed_dim` should be divisible by 2, but is {time_embed_dim}." | |
| ) | |
| self.time_proj = GaussianFourierProjection( | |
| time_embed_dim // 2, | |
| set_W_to_weight=False, | |
| log=False, | |
| flip_sin_to_cos=flip_sin_to_cos, | |
| ) | |
| timestep_input_dim = time_embed_dim | |
| elif time_embedding_type == "positional": | |
| time_embed_dim = time_embedding_dim or block_out_channels[0] * 4 | |
| self.time_proj = Timesteps( | |
| block_out_channels[0], flip_sin_to_cos, freq_shift | |
| ) | |
| timestep_input_dim = block_out_channels[0] | |
| else: | |
| raise ValueError( | |
| f"{time_embedding_type} does not exist. Please make sure to use one of `fourier` or `positional`." | |
| ) | |
| return time_embed_dim, timestep_input_dim | |
| def _set_encoder_hid_proj( | |
| self, | |
| encoder_hid_dim_type: Optional[str], | |
| cross_attention_dim: Union[int, Tuple[int]], | |
| encoder_hid_dim: Optional[int], | |
| ): | |
| if encoder_hid_dim_type is None and encoder_hid_dim is not None: | |
| encoder_hid_dim_type = "text_proj" | |
| self.register_to_config(encoder_hid_dim_type=encoder_hid_dim_type) | |
| logger.info( | |
| "encoder_hid_dim_type defaults to 'text_proj' as `encoder_hid_dim` is defined." | |
| ) | |
| if encoder_hid_dim is None and encoder_hid_dim_type is not None: | |
| raise ValueError( | |
| f"`encoder_hid_dim` has to be defined when `encoder_hid_dim_type` is set to {encoder_hid_dim_type}." | |
| ) | |
| if encoder_hid_dim_type == "text_proj": | |
| self.encoder_hid_proj = nn.Linear(encoder_hid_dim, cross_attention_dim) | |
| elif encoder_hid_dim_type == "text_image_proj": | |
| # image_embed_dim DOESN'T have to be `cross_attention_dim`. To not clutter the __init__ too much | |
| # they are set to `cross_attention_dim` here as this is exactly the required dimension for the currently only use | |
| # case when `addition_embed_type == "text_image_proj"` (Kandinsky 2.1)` | |
| self.encoder_hid_proj = TextImageProjection( | |
| text_embed_dim=encoder_hid_dim, | |
| image_embed_dim=cross_attention_dim, | |
| cross_attention_dim=cross_attention_dim, | |
| ) | |
| elif encoder_hid_dim_type == "image_proj": | |
| # Kandinsky 2.2 | |
| self.encoder_hid_proj = ImageProjection( | |
| image_embed_dim=encoder_hid_dim, | |
| cross_attention_dim=cross_attention_dim, | |
| ) | |
| elif encoder_hid_dim_type is not None: | |
| raise ValueError( | |
| f"encoder_hid_dim_type: {encoder_hid_dim_type} must be None, 'text_proj' or 'text_image_proj'." | |
| ) | |
| else: | |
| self.encoder_hid_proj = None | |
| def _set_class_embedding( | |
| self, | |
| class_embed_type: Optional[str], | |
| act_fn: str, | |
| num_class_embeds: Optional[int], | |
| projection_class_embeddings_input_dim: Optional[int], | |
| time_embed_dim: int, | |
| timestep_input_dim: int, | |
| ): | |
| if class_embed_type is None and num_class_embeds is not None: | |
| self.class_embedding = nn.Embedding(num_class_embeds, time_embed_dim) | |
| elif class_embed_type == "timestep": | |
| self.class_embedding = TimestepEmbedding( | |
| timestep_input_dim, time_embed_dim, act_fn=act_fn | |
| ) | |
| elif class_embed_type == "identity": | |
| self.class_embedding = nn.Identity(time_embed_dim, time_embed_dim) | |
| elif class_embed_type == "projection": | |
| if projection_class_embeddings_input_dim is None: | |
| raise ValueError( | |
| "`class_embed_type`: 'projection' requires `projection_class_embeddings_input_dim` be set" | |
| ) | |
| # The projection `class_embed_type` is the same as the timestep `class_embed_type` except | |
| # 1. the `class_labels` inputs are not first converted to sinusoidal embeddings | |
| # 2. it projects from an arbitrary input dimension. | |
| # | |
| # Note that `TimestepEmbedding` is quite general, being mainly linear layers and activations. | |
| # When used for embedding actual timesteps, the timesteps are first converted to sinusoidal embeddings. | |
| # As a result, `TimestepEmbedding` can be passed arbitrary vectors. | |
| self.class_embedding = TimestepEmbedding( | |
| projection_class_embeddings_input_dim, time_embed_dim | |
| ) | |
| elif class_embed_type == "simple_projection": | |
| if projection_class_embeddings_input_dim is None: | |
| raise ValueError( | |
| "`class_embed_type`: 'simple_projection' requires `projection_class_embeddings_input_dim` be set" | |
| ) | |
| self.class_embedding = nn.Linear( | |
| projection_class_embeddings_input_dim, time_embed_dim | |
| ) | |
| else: | |
| self.class_embedding = None | |
| def _set_add_embedding( | |
| self, | |
| addition_embed_type: str, | |
| addition_embed_type_num_heads: int, | |
| addition_time_embed_dim: Optional[int], | |
| flip_sin_to_cos: bool, | |
| freq_shift: float, | |
| cross_attention_dim: Optional[int], | |
| encoder_hid_dim: Optional[int], | |
| projection_class_embeddings_input_dim: Optional[int], | |
| time_embed_dim: int, | |
| ): | |
| if addition_embed_type == "text": | |
| if encoder_hid_dim is not None: | |
| text_time_embedding_from_dim = encoder_hid_dim | |
| else: | |
| text_time_embedding_from_dim = cross_attention_dim | |
| self.add_embedding = TextTimeEmbedding( | |
| text_time_embedding_from_dim, | |
| time_embed_dim, | |
| num_heads=addition_embed_type_num_heads, | |
| ) | |
| elif addition_embed_type == "text_image": | |
| # text_embed_dim and image_embed_dim DON'T have to be `cross_attention_dim`. To not clutter the __init__ too much | |
| # they are set to `cross_attention_dim` here as this is exactly the required dimension for the currently only use | |
| # case when `addition_embed_type == "text_image"` (Kandinsky 2.1)` | |
| self.add_embedding = TextImageTimeEmbedding( | |
| text_embed_dim=cross_attention_dim, | |
| image_embed_dim=cross_attention_dim, | |
| time_embed_dim=time_embed_dim, | |
| ) | |
| elif addition_embed_type == "text_time": | |
| self.add_time_proj = Timesteps( | |
| addition_time_embed_dim, flip_sin_to_cos, freq_shift | |
| ) | |
| self.add_embedding = TimestepEmbedding( | |
| projection_class_embeddings_input_dim, time_embed_dim | |
| ) | |
| elif addition_embed_type == "image": | |
| # Kandinsky 2.2 | |
| self.add_embedding = ImageTimeEmbedding( | |
| image_embed_dim=encoder_hid_dim, time_embed_dim=time_embed_dim | |
| ) | |
| elif addition_embed_type == "image_hint": | |
| # Kandinsky 2.2 ControlNet | |
| self.add_embedding = ImageHintTimeEmbedding( | |
| image_embed_dim=encoder_hid_dim, time_embed_dim=time_embed_dim | |
| ) | |
| elif addition_embed_type is not None: | |
| raise ValueError( | |
| f"addition_embed_type: {addition_embed_type} must be None, 'text' or 'text_image'." | |
| ) | |
def forward(
    self,
    sample: torch.FloatTensor,
    timestep: Union[torch.Tensor, float, int],
    encoder_hidden_states: torch.Tensor,
    class_labels: Optional[torch.Tensor] = None,
    attention_mask: Optional[torch.Tensor] = None,
    encoder_attention_mask: Optional[torch.Tensor] = None,
    return_dict: bool = True,
    iter_cur=0,
    save_kv=True,
    mode="drag",
    mask=None,
) -> Union[UNet2DConditionOutput, Tuple]:
    r"""
    Run the UNet: embed the timestep (and optional class labels), then pass `sample` through the down blocks,
    the mid block, the up blocks (with skip connections) and the output convolution.

    Args:
        sample (`torch.FloatTensor`): (batch, channel, height, width) noisy inputs tensor
        timestep (`torch.FloatTensor` or `float` or `int`): (batch) timesteps
        encoder_hidden_states (`torch.FloatTensor`): (batch, sequence_length, feature_dim) encoder hidden states
        class_labels (`torch.Tensor`, *optional*):
            Class labels; required whenever the model has a class embedding.
        attention_mask (`torch.Tensor`, *optional*):
            Mask that is converted to an additive bias (`(1 - mask) * -10000.0`) and passed to the
            cross-attention blocks.
        encoder_attention_mask (`torch.Tensor`, *optional*):
            Converted to an additive bias the same way as `attention_mask`.
        return_dict (`bool`, *optional*, defaults to `True`):
            Whether or not to return a [`models.unet_2d_condition.UNet2DConditionOutput`] instead of a plain tuple.
        iter_cur, save_kv, mode, mask:
            Forwarded unchanged to every cross-attention down/up block and to the mid block. Their meaning is
            defined by those blocks, not by this method.

    Returns:
        [`~models.unet_2d_condition.UNet2DConditionOutput`] or `tuple`:
        [`~models.unet_2d_condition.UNet2DConditionOutput`] if `return_dict` is True, otherwise a `tuple`. When
        returning a tuple, the first element is the sample tensor.

    Raises:
        ValueError: If the model has a class embedding but `class_labels` is `None`.
    """
    # By default samples have to be at least a multiple of the overall upsampling factor.
    # The overall upsampling factor is equal to 2 ** (number of upsampling layers).
    # However, the upsampling interpolation output size can be forced to fit any upsampling size
    # on the fly if necessary.
    default_overall_up_factor = 2**self.num_upsamplers

    # upsample size should be forwarded when sample is not a multiple of `default_overall_up_factor`
    forward_upsample_size = False
    upsample_size = None
    if any(s % default_overall_up_factor != 0 for s in sample.shape[-2:]):
        logger.info("Forward upsample size to force interpolation output size.")
        forward_upsample_size = True

    # Turn the masks into additive attention biases with a broadcastable dim at index 1.
    if attention_mask is not None:
        attention_mask = (1 - attention_mask.to(sample.dtype)) * -10000.0
        attention_mask = attention_mask.unsqueeze(1)
    if encoder_attention_mask is not None:
        encoder_attention_mask = (
            1 - encoder_attention_mask.to(sample.dtype)
        ) * -10000.0
        encoder_attention_mask = encoder_attention_mask.unsqueeze(1)

    # center input if necessary
    if self.config.center_input_sample:
        sample = 2 * sample - 1.0

    # time
    timesteps = timestep
    if not torch.is_tensor(timesteps):
        # TODO: this requires sync between CPU and GPU. So try to pass timesteps as tensors if you can
        # 32-bit dtypes are selected on "mps" devices, 64-bit everywhere else.
        is_mps = sample.device.type == "mps"
        if isinstance(timestep, float):
            dtype = torch.float32 if is_mps else torch.float64
        else:
            dtype = torch.int32 if is_mps else torch.int64
        timesteps = torch.tensor([timesteps], dtype=dtype, device=sample.device)
    elif len(timesteps.shape) == 0:
        timesteps = timesteps[None].to(sample.device)

    # broadcast to batch dimension in a way that's compatible with ONNX/Core ML
    timesteps = timesteps.expand(sample.shape[0])

    # `time_proj` does not contain any weights and will always return f32 tensors,
    # but `time_embedding` might actually be running in fp16, so we need to cast here.
    t_emb = self.time_proj(timesteps).to(dtype=self.dtype)
    emb = self.time_embedding(t_emb)

    if self.class_embedding is not None:
        if class_labels is None:
            raise ValueError(
                "class_labels should be provided when num_class_embeds > 0"
            )
        if self.config.class_embed_type == "timestep":
            class_labels = self.time_proj(class_labels)
            # Same f32 issue as for `t_emb` above: cast the projection to the model dtype
            # before it reaches `class_embedding`, otherwise half-precision models hit a
            # dtype mismatch.
            class_labels = class_labels.to(dtype=self.dtype)
        class_emb = self.class_embedding(class_labels).to(dtype=self.dtype)
        if self.config.class_embeddings_concat:
            emb = torch.cat([emb, class_emb], dim=-1)
        else:
            emb = emb + class_emb

    # Keyword arguments shared by every block that performs cross-attention.
    cross_attn_kwargs = {
        "encoder_hidden_states": encoder_hidden_states,
        "attention_mask": attention_mask,
        "encoder_attention_mask": encoder_attention_mask,
        "iter_cur": iter_cur,
        "save_kv": save_kv,
        "mode": mode,
        "mask": mask,
    }

    # pre-process
    sample = self.conv_in(sample)

    # down: collect every intermediate result so the up path can consume them as skip connections
    down_block_res_samples = (sample,)
    for downsample_block in self.down_blocks:
        if getattr(downsample_block, "has_cross_attention", False):
            sample, res_samples = downsample_block(
                hidden_states=sample, temb=emb, **cross_attn_kwargs
            )
        else:
            sample, res_samples = downsample_block(hidden_states=sample, temb=emb)
        down_block_res_samples += res_samples

    # mid
    sample = self.mid_block(sample, emb, **cross_attn_kwargs)

    # up
    last_block_idx = len(self.up_blocks) - 1
    for i, upsample_block in enumerate(self.up_blocks):
        is_final_block = i == last_block_idx

        # Pop as many skip connections off the end as this block has resnets.
        num_resnets = len(upsample_block.resnets)
        res_samples = down_block_res_samples[-num_resnets:]
        down_block_res_samples = down_block_res_samples[:-num_resnets]

        # if we have not reached the final block and need to forward the
        # upsample size, we do it here
        if not is_final_block and forward_upsample_size:
            upsample_size = down_block_res_samples[-1].shape[2:]

        if getattr(upsample_block, "has_cross_attention", False):
            sample = upsample_block(
                hidden_states=sample,
                temb=emb,
                res_hidden_states_tuple=res_samples,
                upsample_size=upsample_size,
                **cross_attn_kwargs,
            )
        else:
            sample = upsample_block(
                hidden_states=sample,
                temb=emb,
                res_hidden_states_tuple=res_samples,
                upsample_size=upsample_size,
            )

    # post-process
    sample = self.conv_norm_out(sample)
    sample = self.conv_act(sample)
    sample = self.conv_out(sample)

    if not return_dict:
        return (sample,)
    return UNet2DConditionOutput(sample=sample)
@classmethod
def from_pretrained(
    cls, pretrained_model_name_or_path: Optional[Union[str, os.PathLike]], **kwargs
):
    r"""
    Build the model from a pretrained checkpoint and load its weights (for gradio demo).

    The weights file is resolved with `_get_model_file`: a safetensors file is tried first when
    `safetensors` is available, otherwise (or if that attempt fails) the regular weights file is used.
    The config is loaded with `cls.load_config` and extended with the custom attention options before
    the model is instantiated.

    Args:
        pretrained_model_name_or_path (`str` or `os.PathLike`):
            Passed to `_get_model_file` and `cls.load_config` to locate weights and config.
        **kwargs:
            Recognised keys (all removed from `kwargs` before the rest is forwarded to
            `cls.load_config`): `cache_dir`, `ignore_mismatched_sizes`, `force_download`,
            `resume_download`, `proxies`, `output_loading_info`, `local_files_only`,
            `use_auth_token`, `revision`, `torch_dtype`, `subfolder`, `device_map`,
            `low_cpu_mem_usage`, plus the custom config overrides `use_sc_attn` (default `True`),
            `use_st_attn` (default `False`) and `st_attn_idx` (default `0`).

    Returns:
        The model in eval mode, or a `(model, loading_info)` tuple when `output_loading_info=True`.

    Raises:
        NotImplementedError: If `device_map` is given without `accelerate`, or if `device_map` /
            `low_cpu_mem_usage=True` is requested with torch < 1.9.0.
        ValueError: If `low_cpu_mem_usage=False` is combined with a `device_map`, if the checkpoint
            mixes dtypes and none of them is `torch.float32`, or if `torch_dtype` is not a `torch.dtype`.
    """
    import diffusers
    from diffusers.models.modeling_utils import load_state_dict
    from diffusers.utils import (
        DIFFUSERS_CACHE,
        SAFETENSORS_WEIGHTS_NAME,
        WEIGHTS_NAME,
        is_accelerate_available,
        is_safetensors_available,
        is_torch_version,
    )

    if is_accelerate_available():
        import accelerate
        from accelerate.utils import set_module_tensor_to_device

    # Low-memory initialisation and dispatching both need torch >= 1.9.0.
    torch_supports_low_mem = is_torch_version(">=", "1.9.0")

    cache_dir = kwargs.pop("cache_dir", DIFFUSERS_CACHE)
    ignore_mismatched_sizes = kwargs.pop("ignore_mismatched_sizes", False)
    force_download = kwargs.pop("force_download", False)
    resume_download = kwargs.pop("resume_download", False)
    proxies = kwargs.pop("proxies", None)
    output_loading_info = kwargs.pop("output_loading_info", False)
    local_files_only = kwargs.pop("local_files_only", False)
    use_auth_token = kwargs.pop("use_auth_token", None)
    revision = kwargs.pop("revision", None)
    torch_dtype = kwargs.pop("torch_dtype", None)
    subfolder = kwargs.pop("subfolder", None)
    device_map = kwargs.pop("device_map", None)
    low_cpu_mem_usage = kwargs.pop("low_cpu_mem_usage", torch_supports_low_mem)

    # custom args, written into the config before the model is instantiated
    use_sc_attn = kwargs.pop("use_sc_attn", True)
    use_st_attn = kwargs.pop("use_st_attn", False)
    st_attn_idx = kwargs.pop("st_attn_idx", 0)

    if low_cpu_mem_usage and not is_accelerate_available():
        low_cpu_mem_usage = False
        logger.warning(
            "Cannot initialize model with low cpu memory usage because `accelerate` was not found in the"
            " environment. Defaulting to `low_cpu_mem_usage=False`. It is strongly recommended to install"
            " `accelerate` for faster and less memory-intense model loading. You can do so with: \n```\npip"
            " install accelerate\n```\n."
        )

    if device_map is not None and not is_accelerate_available():
        raise NotImplementedError(
            "Loading and dispatching requires `accelerate`. Please make sure to install accelerate or set"
            " `device_map=None`. You can install accelerate with `pip install accelerate`."
        )

    # Check if we can handle device_map and dispatching the weights
    if device_map is not None and not torch_supports_low_mem:
        raise NotImplementedError(
            "Loading and dispatching requires torch >= 1.9.0. Please either update your PyTorch version or set"
            " `device_map=None`."
        )

    if low_cpu_mem_usage is True and not torch_supports_low_mem:
        raise NotImplementedError(
            "Low memory initialization requires torch >= 1.9.0. Please either update your PyTorch version or set"
            " `low_cpu_mem_usage=False`."
        )

    if low_cpu_mem_usage is False and device_map is not None:
        raise ValueError(
            f"You cannot set `low_cpu_mem_usage` to `False` while using device_map={device_map} for loading and"
            " dispatching. Please make sure to set `low_cpu_mem_usage=True`."
        )

    user_agent = {
        "diffusers": diffusers.__version__,
        "file_type": "model",
        "framework": "pytorch",
    }

    # Arguments shared by the weight-file lookups and the config lookup.
    hub_kwargs = {
        "cache_dir": cache_dir,
        "force_download": force_download,
        "resume_download": resume_download,
        "proxies": proxies,
        "local_files_only": local_files_only,
        "use_auth_token": use_auth_token,
        "revision": revision,
        "subfolder": subfolder,
    }

    # Resolve the weights file: prefer safetensors, fall back to the regular weights file.
    model_file = None
    if is_safetensors_available():
        try:
            model_file = _get_model_file(
                pretrained_model_name_or_path,
                weights_name=SAFETENSORS_WEIGHTS_NAME,
                user_agent=user_agent,
                **hub_kwargs,
            )
        except Exception as err:
            # Best effort only: any failure here means we try the regular weights file below.
            # `Exception` (not a bare `except`) lets KeyboardInterrupt/SystemExit propagate.
            logger.info(
                "Could not load %s for %s (%s); falling back to %s.",
                SAFETENSORS_WEIGHTS_NAME,
                pretrained_model_name_or_path,
                err,
                WEIGHTS_NAME,
            )
    if model_file is None:
        model_file = _get_model_file(
            pretrained_model_name_or_path,
            weights_name=WEIGHTS_NAME,
            user_agent=user_agent,
            **hub_kwargs,
        )

    # Load the config once (both loading strategies need the same one) and inject the custom args.
    config, unused_kwargs = cls.load_config(
        pretrained_model_name_or_path,
        return_unused_kwargs=True,
        device_map=device_map,
        **hub_kwargs,
        **kwargs,
    )
    config["use_sc_attn"] = use_sc_attn
    config["use_st_attn"] = use_st_attn
    config["st_attn_idx"] = st_attn_idx

    if low_cpu_mem_usage:
        # Instantiate model with empty weights
        with accelerate.init_empty_weights():
            model = cls.from_config(config, **unused_kwargs)

        if device_map is None:
            # Load the state dict and move the params from the meta device to the cpu.
            # NOTE(review): parameters absent from the checkpoint are not touched here and
            # presumably stay on the meta device - confirm the checkpoint covers the model.
            state_dict = load_state_dict(model_file)
            for param_name, param in state_dict.items():
                set_module_tensor_to_device(model, param_name, "cpu", value=param)
        else:
            # Let accelerate load the weights and dispatch them according to the device_map.
            accelerate.load_checkpoint_and_dispatch(model, model_file, device_map)

        # This path does not track key mismatches, so the report is always empty.
        loading_info = {
            "missing_keys": [],
            "unexpected_keys": [],
            "mismatched_keys": [],
            "error_msgs": [],
        }
    else:
        model = cls.from_config(config, **unused_kwargs)
        state_dict = load_state_dict(model_file)

        # Pick the dtype to load in: the checkpoint's single dtype, or float32 if it is mixed with others.
        dtypes = {v.dtype for v in state_dict.values()}
        if len(dtypes) > 1 and torch.float32 not in dtypes:
            raise ValueError(
                f"The weights of the model file {model_file} have a mixture of incompatible dtypes {dtypes}. Please"
                f" make sure that {model_file} weights have only one dtype."
            )
        elif len(dtypes) > 1 and torch.float32 in dtypes:
            dtype = torch.float32
        else:
            dtype = dtypes.pop()

        # move model to correct dtype
        model = model.to(dtype)

        model, missing_keys, unexpected_keys, mismatched_keys, error_msgs = (
            cls._load_pretrained_model(
                model,
                state_dict,
                model_file,
                pretrained_model_name_or_path,
                ignore_mismatched_sizes=ignore_mismatched_sizes,
            )
        )
        loading_info = {
            "missing_keys": missing_keys,
            "unexpected_keys": unexpected_keys,
            "mismatched_keys": mismatched_keys,
            "error_msgs": error_msgs,
        }

    if torch_dtype is not None and not isinstance(torch_dtype, torch.dtype):
        raise ValueError(
            f"{torch_dtype} needs to be of type `torch.dtype`, e.g. `torch.float16`, but is {type(torch_dtype)}."
        )
    elif torch_dtype is not None:
        model = model.to(torch_dtype)

    model.register_to_config(_name_or_path=pretrained_model_name_or_path)

    # Set model in evaluation mode to deactivate DropOut modules by default
    model.eval()

    if output_loading_info:
        return model, loading_info
    return model