import torch from torch import nn class ConvNeXtBlock(nn.Module): """ConvNeXt Block adapted from https://github.com/facebookresearch/ConvNeXt to 1D audio signal. Args: dim (int): Number of input channels. intermediate_dim (int): Dimensionality of the intermediate layer. layer_scale_init_value (float, optional): Initial value for the layer scale. None means no scaling. Defaults to None. adanorm_num_embeddings (int, optional): Number of embeddings for AdaLayerNorm. None means non-conditional LayerNorm. Defaults to None. """ def __init__( self, dim: int, intermediate_dim: int | None = None, layer_scale_init_value: float = 0.0, elementwise_affine_ln: bool = True, is_causal: bool = False, ): super().__init__() intermediate_dim = intermediate_dim if intermediate_dim is not None else dim * 3 self.dwconv = nn.Conv1d( dim, dim, kernel_size=7, padding=0 if is_causal else 3, groups=dim ) # depthwise conv self.norm = nn.LayerNorm( dim, eps=1e-6, elementwise_affine=elementwise_affine_ln ) self.pwconv1 = nn.Linear( dim, intermediate_dim ) # pointwise/1x1 convs, implemented with linear layers self.act = nn.GELU() self.pwconv2 = nn.Linear(intermediate_dim, dim) self.gamma = ( nn.Parameter(layer_scale_init_value * torch.ones(dim), requires_grad=True) if layer_scale_init_value > 0 else None ) self.is_causal = is_causal def forward( self, x: torch.Tensor, scale_shift: tuple[torch.Tensor, torch.Tensor] | None = None, gate: torch.Tensor | None = None, ) -> torch.Tensor: residual = x if self.is_causal: x = torch.nn.functional.pad(x, (6, 0)) x = self.dwconv(x) x = x.transpose(1, 2) # (B, C, T) -> (B, T, C) x = self.norm(x) if scale_shift is not None: scale, shift = scale_shift x = x * scale[:, None] + shift[:, None] x = self.pwconv1(x) x = self.act(x) x = self.pwconv2(x) if self.gamma is not None: x = self.gamma * x if gate is not None: x = gate[:, None] * x x = x.transpose(1, 2) # (B, T, C) -> (B, C, T) x = residual + x return x class ConvNextNet(nn.Module): def __init__(self, n_layers, dim, intermediate_dim: int | None = None): super().__init__() self.net = nn.Sequential( *[ ConvNeXtBlock( dim, intermediate_dim, ) for _ in range(n_layers) ] ) def forward(self, x): return self.net(x) class ConvNextPatchEncoder(nn.Module): def __init__( self, patch_sizes: list[int], n_layers_per_patch: int, patch_expansion_factor: float = 1.5, is_decoder: bool = False, ): super().__init__() patch_to_dim = [] convnext = [] for i, patch_size in enumerate(patch_sizes): in_dim = int((patch_expansion_factor if i > 0 else 1.0) * patch_size) out_dim = int(patch_expansion_factor * patch_size) if is_decoder: in_dim, out_dim = out_dim, in_dim patch_to_dim.append( nn.Linear( in_dim, out_dim, ) ) convnext += [ nn.Sequential( *[ ConvNeXtBlock(int(patch_size * patch_expansion_factor)) for _ in range(n_layers_per_patch) ] ) ] self.is_decoder = is_decoder self.patch_sizes = patch_sizes self.patch_expansion_factor = patch_expansion_factor self.patch_to_dim = nn.ModuleList(patch_to_dim) self.convnext = nn.ModuleList(convnext) def forward(self, x): if self.is_decoder: for i, patch_size in reversed(list(enumerate(self.patch_sizes))): B, P, N = x.shape patch_expansion_factor_maybe = ( self.patch_expansion_factor if i > 0 else 1.0 ) x = x.reshape(B, int(patch_size * self.patch_expansion_factor), -1) x = self.convnext[i](x) x = self.patch_to_dim[i](x.transpose(1, 2)).transpose(1, 2) else: for i, patch_size in enumerate(self.patch_sizes): B, P, N = x.shape patch_expansion_factor_maybe = ( self.patch_expansion_factor if i > 0 else 1.0 ) x = x.reshape(B, int(patch_size * patch_expansion_factor_maybe), -1) x = self.patch_to_dim[i](x.transpose(1, 2)).transpose(1, 2) x = self.convnext[i](x) return x class ConvNextEncoder(nn.Module): def __init__( self, in_dim: int, dim: int, n_layers: int, intermediate_dim: int | None = None, stride: int = 1, ): super().__init__() self.in_proj = nn.Linear(in_dim, dim) if stride > 1: self.stride = nn.Conv1d( in_channels=dim, out_channels=dim, kernel_size=(stride * 2) + 1, stride=stride, padding=stride // 2, ) else: self.stride = nn.Identity() self.net = ConvNextNet(n_layers, dim, intermediate_dim) def forward(self, x): x = self.in_proj(x.transpose(1, 2)).transpose(1, 2) x = self.stride(x) return self.net(x) class ConvNextDecoder(nn.Module): def __init__( self, out_dim: int, dim: int, n_layers: int, intermediate_dim: int | None = None, stride: int = 1, stride_position: str = "before", ): super().__init__() self.out_proj = nn.Linear(dim, out_dim) if stride > 1: self.stride = nn.ConvTranspose1d( in_channels=dim, out_channels=dim, kernel_size=(stride * 2) + 1, stride=stride, padding=stride // 2, output_padding=stride // 2, ) else: self.stride = nn.Identity() self.stride_position = stride_position self.net = ConvNextNet(n_layers, dim, intermediate_dim) def forward(self, x): if self.stride_position == "before": x = self.stride(x) x = self.net(x) if self.stride_position == "after": x = self.stride(x) return self.out_proj(x.transpose(1, 2)).transpose(1, 2) class SwiGLU(nn.Module): def __init__(self, d_model: int, ffn_expansion_factor: int = 4): super().__init__() self.p_in = nn.Linear(d_model, (d_model * ffn_expansion_factor // 3) * 2) self.p_out = nn.Linear(d_model * ffn_expansion_factor // 3, d_model) def forward(self, x): gate, x = self.p_in(x).chunk(2, dim=-1) return self.p_out(nn.functional.silu(gate) * x)