Source code for parakeet.models.waveflow

# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import math
from typing import List, Union, Tuple

import numpy as np
import paddle
from paddle import nn
from paddle.nn import functional as F
from paddle.nn import initializer as I

from parakeet.utils import checkpoint
from parakeet.modules import geometry as geo

__all__ = ["WaveFlow", "ConditionalWaveFlow", "WaveFlowLoss"]


def fold(x, n_group):
    r"""Fold audio or spectrogram's temporal dimension in to groups.

    Parameters
    ----------
    x : Tensor [shape=(\*, time_steps)
        The input tensor.

    n_group : int
        The size of a group.

    Returns
    ---------
    Tensor : [shape=(\*, time_steps // n_group, group)]
        Folded tensor.
    """
    *spatial_shape, time_steps = x.shape
    new_shape = spatial_shape + [time_steps // n_group, n_group]
    return paddle.reshape(x, new_shape)


class UpsampleNet(nn.LayerList):
    """Layer to upsample mel spectrogram to the same temporal resolution with
    the corresponding waveform.

    It consists of several conv2dtranspose layers which perform deconvolution
    on mel and time dimension.

    Parameters
    ----------
    upscale_factors : List[int], optional
        Time upsampling factors for each Conv2DTranspose Layer.

        The ``UpsampleNet`` contains ``len(upscale_factor)`` Conv2DTranspose
        Layers. Each upscale_factor is used as the ``stride`` for the
        corresponding Conv2DTranspose. Defaults to [16, 16], this the default
        upsampling factor is 256.

    Notes
    ------
    ``np.prod(upscale_factors)`` should equals the ``hop_length`` of the stft
    transformation used to extract spectrogram features from audio.

    For example, ``16 * 16 = 256``, then the spectrogram extracted with a stft
    transformation whose ``hop_length`` equals 256 is suitable.

    See Also
    ---------
    ``librosa.core.stft``
    """

    def __init__(self, upsample_factors):
        super().__init__()
        for factor in upsample_factors:
            std = math.sqrt(1 / (3 * 2 * factor))
            init = I.Uniform(-std, std)
            self.append(
                nn.utils.weight_norm(
                    nn.Conv2DTranspose(
                        1,
                        1, (3, 2 * factor),
                        padding=(1, factor // 2),
                        stride=(1, factor),
                        weight_attr=init,
                        bias_attr=init)))

        # upsample factors
        self.upsample_factor = np.prod(upsample_factors)
        self.upsample_factors = upsample_factors

    def forward(self, x, trim_conv_artifact=False):
        r"""Forward pass of the ``UpsampleNet``.

        Parameters
        -----------
        x : Tensor [shape=(batch_size, input_channels, time_steps)]
            The input spectrogram.

        trim_conv_artifact : bool, optional
            Trim deconvolution artifact at each layer. Defaults to False.

        Returns
        --------
        Tensor: [shape=(batch_size, input_channels, time_steps \* upsample_factor)]
            The upsampled spectrogram.

        Notes
        --------
        If trim_conv_artifact is ``True``, the output time steps is less
        than ``time_steps \* upsample_factors``.
        """
        x = paddle.unsqueeze(x, 1)  #(B, C, T) -> (B, 1, C, T)
        for layer in self:
            x = layer(x)
            if trim_conv_artifact:
                time_cutoff = layer._kernel_size[1] - layer._stride[1]
                x = x[:, :, :, :-time_cutoff]
            x = F.leaky_relu(x, 0.4)
        x = paddle.squeeze(x, 1)  # back to (B, C, T)
        return x


class ResidualBlock(nn.Layer):
    """ResidualBlock, the basic unit of ResidualNet used in WaveFlow.

    It has a conv2d layer, which has causal padding in height dimension and
    same paddign in width dimension. It also has projection for the condition
    and output.

    Parameters
    ----------
    channels : int
        Feature size of the input.

    cond_channels : int
        Featuer size of the condition.

    kernel_size : Tuple[int]
        Kernel size of the Convolution2d applied to the input.

    dilations : int
        Dilations of the Convolution2d applied to the input.
    """

    def __init__(self, channels, cond_channels, kernel_size, dilations):
        super().__init__()
        # input conv
        std = math.sqrt(1 / channels * np.prod(kernel_size))
        init = I.Uniform(-std, std)
        receptive_field = [
            1 + (k - 1) * d for (k, d) in zip(kernel_size, dilations)
        ]
        rh, rw = receptive_field
        paddings = [rh - 1, 0, rw // 2, (rw - 1) // 2]  # causal & same
        conv = nn.Conv2D(
            channels,
            2 * channels,
            kernel_size,
            padding=paddings,
            dilation=dilations,
            weight_attr=init,
            bias_attr=init)
        self.conv = nn.utils.weight_norm(conv)
        self.rh = rh
        self.rw = rw
        self.dilations = dilations

        # condition projection
        std = math.sqrt(1 / cond_channels)
        init = I.Uniform(-std, std)
        condition_proj = nn.Conv2D(
            cond_channels,
            2 * channels, (1, 1),
            weight_attr=init,
            bias_attr=init)
        self.condition_proj = nn.utils.weight_norm(condition_proj)

        # parametric residual & skip connection
        std = math.sqrt(1 / channels)
        init = I.Uniform(-std, std)
        out_proj = nn.Conv2D(
            channels, 2 * channels, (1, 1), weight_attr=init, bias_attr=init)
        self.out_proj = nn.utils.weight_norm(out_proj)

    def forward(self, x, condition):
        """Compute output for a whole folded sequence.

        Parameters
        ----------
        x : Tensor [shape=(batch_size, channel, height, width)]
            The input.

        condition : Tensor [shape=(batch_size, condition_channel, height, width)]
            The local condition.

        Returns
        -------
        res : Tensor [shape=(batch_size, channel, height, width)]
            The residual output.

        skip : Tensor [shape=(batch_size, channel, height, width)]
            The skip output.
        """
        x_in = x
        x = self.conv(x)
        x += self.condition_proj(condition)

        content, gate = paddle.chunk(x, 2, axis=1)
        x = paddle.tanh(content) * F.sigmoid(gate)

        x = self.out_proj(x)
        res, skip = paddle.chunk(x, 2, axis=1)
        res = x_in + res
        return res, skip

    def start_sequence(self):
        """Prepare the layer for incremental computation of causal
        convolution. Reset the buffer for causal convolution.

        Raises:
            ValueError: If not in evaluation mode.
        """
        if self.training:
            raise ValueError("Only use start sequence at evaluation mode.")
        self._conv_buffer = None

        # NOTE: call self.conv's weight norm hook expliccitly since
        # its weight will be visited directly in `add_input` without
        # calling its `__call__` method. If we do not trigger the weight
        # norm hook, the weight may be outdated. e.g. after loading from
        # a saved checkpoint
        # see also: https://github.com/pytorch/pytorch/issues/47588
        for hook in self.conv._forward_pre_hooks.values():
            hook(self.conv, None)

    def add_input(self, x_row, condition_row):
        """Compute the output for a row and update the buffer.

        Parameters
        ----------
        x_row : Tensor [shape=(batch_size, channel, 1, width)]
            A row of the input.

        condition_row : Tensor [shape=(batch_size, condition_channel, 1, width)]
            A row of the condition.

        Returns
        -------
        res : Tensor [shape=(batch_size, channel, 1, width)]
            A row of the the residual output.

        skip : Tensor [shape=(batch_size, channel, 1, width)]
            A row of the skip output.
        """
        x_row_in = x_row
        if self._conv_buffer is None:
            self._init_buffer(x_row)
        self._update_buffer(x_row)

        rw = self.rw
        x_row = F.conv2d(
            self._conv_buffer,
            self.conv.weight,
            self.conv.bias,
            padding=[0, 0, rw // 2, (rw - 1) // 2],
            dilation=self.dilations)
        x_row += self.condition_proj(condition_row)

        content, gate = paddle.chunk(x_row, 2, axis=1)
        x_row = paddle.tanh(content) * F.sigmoid(gate)

        x_row = self.out_proj(x_row)
        res, skip = paddle.chunk(x_row, 2, axis=1)
        res = x_row_in + res
        return res, skip

    def _init_buffer(self, input):
        batch_size, channels, _, width = input.shape
        self._conv_buffer = paddle.zeros(
            [batch_size, channels, self.rh, width], dtype=input.dtype)

    def _update_buffer(self, input):
        self._conv_buffer = paddle.concat(
            [self._conv_buffer[:, :, 1:, :], input], axis=2)


class ResidualNet(nn.LayerList):
    """A stack of several ResidualBlocks. It merges condition at each layer.

    Parameters
    ----------
    n_layer : int
        Number of ResidualBlocks in the ResidualNet.

    residual_channels : int
        Feature size of each ResidualBlocks.

    condition_channels : int
        Feature size of the condition.

    kernel_size : Tuple[int]
        Kernel size of each ResidualBlock.

    dilations_h : List[int]
        Dilation in height dimension of every ResidualBlock.

    Raises
    ------
    ValueError
        If the length of dilations_h does not equals n_layers.
    """

    def __init__(self,
                 n_layer: int,
                 residual_channels: int,
                 condition_channels: int,
                 kernel_size: Tuple[int],
                 dilations_h: List[int]):
        if len(dilations_h) != n_layer:
            raise ValueError(
                "number of dilations_h should equals num of layers")
        super().__init__()
        for i in range(n_layer):
            dilation = (dilations_h[i], 2**i)
            layer = ResidualBlock(residual_channels, condition_channels,
                                  kernel_size, dilation)
            self.append(layer)

    def forward(self, x, condition):
        """Comput the output of given the input and the condition.

        Parameters
        -----------
        x : Tensor [shape=(batch_size, channel, height, width)]
            The input.

        condition : Tensor [shape=(batch_size, condition_channel, height, width)]
            The local condition.

        Returns
        --------
        Tensor : [shape=(batch_size, channel, height, width)]
            The output, which is an aggregation of all the skip outputs.
        """
        skip_connections = []
        for layer in self:
            x, skip = layer(x, condition)
            skip_connections.append(skip)
        out = paddle.sum(paddle.stack(skip_connections, 0), 0)
        return out

    def start_sequence(self):
        """Prepare the layer for incremental computation.
        """
        for layer in self:
            layer.start_sequence()

    def add_input(self, x_row, condition_row):
        """Compute the output for a row and update the buffers.

        Parameters
        ----------
        x_row : Tensor [shape=(batch_size, channel, 1, width)]
            A row of the input.

        condition_row : Tensor [shape=(batch_size, condition_channel, 1, width)]
            A row of the condition.

        Returns
        -------
        res : Tensor [shape=(batch_size, channel, 1, width)]
            A row of the the residual output.

        skip : Tensor [shape=(batch_size, channel, 1, width)]
            A row of the skip output.
        """
        skip_connections = []
        for layer in self:
            x_row, skip = layer.add_input(x_row, condition_row)
            skip_connections.append(skip)
        out = paddle.sum(paddle.stack(skip_connections, 0), 0)
        return out


class Flow(nn.Layer):
    """A bijection (Reversable layer) that transform a density of latent
    variables p(Z) into a complex data distribution p(X).

    It's an auto regressive flow. The ``forward`` method implements the
    probability density estimation. The ``inverse`` method implements the
    sampling.

    Parameters
    ----------
    n_layers : int
        Number of ResidualBlocks in the Flow.

    channels : int
        Feature size of the ResidualBlocks.

    mel_bands : int
        Feature size of the mel spectrogram (mel bands).

    kernel_size : Tuple[int]
        Kernel size of each ResisualBlocks in the Flow.

    n_group : int
        Number of timesteps to the folded into a group.
    """
    dilations_dict = {
        8: [1, 1, 1, 1, 1, 1, 1, 1],
        16: [1, 1, 1, 1, 1, 1, 1, 1],
        32: [1, 2, 4, 1, 2, 4, 1, 2],
        64: [1, 2, 4, 8, 16, 1, 2, 4],
        128: [1, 2, 4, 8, 16, 32, 64, 1]
    }

    def __init__(self, n_layers, channels, mel_bands, kernel_size, n_group):
        super().__init__()
        # input projection
        self.input_proj = nn.utils.weight_norm(
            nn.Conv2D(
                1,
                channels, (1, 1),
                weight_attr=I.Uniform(-1., 1.),
                bias_attr=I.Uniform(-1., 1.)))

        # residual net
        self.resnet = ResidualNet(n_layers, channels, mel_bands, kernel_size,
                                  self.dilations_dict[n_group])

        # output projection
        self.output_proj = nn.Conv2D(
            channels,
            2, (1, 1),
            weight_attr=I.Constant(0.),
            bias_attr=I.Constant(0.))

        # specs
        self.n_group = n_group

    def _predict_parameters(self, x, condition):
        x = self.input_proj(x)
        x = self.resnet(x, condition)
        bijection_params = self.output_proj(x)
        logs, b = paddle.chunk(bijection_params, 2, axis=1)
        return logs, b

    def _transform(self, x, logs, b):
        z_0 = x[:, :, :1, :]  # the first row, just copy it
        z_out = x[:, :, 1:, :] * paddle.exp(logs) + b
        z_out = paddle.concat([z_0, z_out], axis=2)
        return z_out

    def forward(self, x, condition):
        """Probability density estimation. It is done by inversely transform
        a sample from p(X) into a sample from p(Z).

        Parameters
        -----------
        x : Tensor [shape=(batch, 1, height, width)]
            A input sample of the distribution p(X).

        condition : Tensor [shape=(batch, condition_channel, height, width)]
            The local condition.

        Returns
        --------
        z (Tensor): shape(batch, 1, height, width), the transformed sample.

        Tuple[Tensor, Tensor]
            The parameter of the transformation.

            logs (Tensor): shape(batch, 1, height - 1, width), the log scale
            of the transformation from x to z.

            b (Tensor): shape(batch, 1, height - 1, width), the shift of the
            transformation from x to z.
        """
        # (B, C, H-1, W)
        logs, b = self._predict_parameters(x[:, :, :-1, :],
                                           condition[:, :, 1:, :])
        z = self._transform(x, logs, b)
        return z, (logs, b)

    def _predict_row_parameters(self, x_row, condition_row):
        x_row = self.input_proj(x_row)
        x_row = self.resnet.add_input(x_row, condition_row)
        bijection_params = self.output_proj(x_row)
        logs, b = paddle.chunk(bijection_params, 2, axis=1)
        return logs, b

    def _inverse_transform_row(self, z_row, logs, b):
        x_row = (z_row - b) * paddle.exp(-logs)
        return x_row

    def _inverse_row(self, z_row, x_row, condition_row):
        logs, b = self._predict_row_parameters(x_row, condition_row)
        x_next_row = self._inverse_transform_row(z_row, logs, b)
        return x_next_row, (logs, b)

    def _start_sequence(self):
        self.resnet.start_sequence()

    def inverse(self, z, condition):
        """Sampling from the the distrition p(X). It is done by sample form
        p(Z) and transform the sample. It is a auto regressive transformation.

        Parameters
        -----------
        z : Tensor [shape=(batch, 1, height, width)]
            A sample of the distribution p(Z).

        condition : Tensor [shape=(batch, condition_channel, height, width)]
            The local condition.

        Returns
        ---------
        x : Tensor [shape=(batch, 1, height, width)]
            The transformed sample.

        Tuple[Tensor, Tensor]
            The parameter of the transformation.

            logs (Tensor): shape(batch, 1, height - 1, width), the log scale
            of the transformation from x to z.

            b (Tensor): shape(batch, 1, height - 1, width), the shift of the
            transformation from x to z.
        """
        z_0 = z[:, :, :1, :]
        x = []
        logs_list = []
        b_list = []
        x.append(z_0)

        self._start_sequence()
        for i in range(1, self.n_group):
            x_row = x[-1]  # actuallt i-1:i
            z_row = z[:, :, i:i + 1, :]
            condition_row = condition[:, :, i:i + 1, :]

            x_next_row, (logs, b) = self._inverse_row(z_row, x_row,
                                                      condition_row)
            x.append(x_next_row)
            logs_list.append(logs)
            b_list.append(b)

        x = paddle.concat(x, 2)
        logs = paddle.concat(logs_list, 2)
        b = paddle.concat(b_list, 2)
        return x, (logs, b)


[docs]class WaveFlow(nn.LayerList): """An Deep Reversible layer that is composed of severel auto regressive flows. Parameters ----------- n_flows : int Number of flows in the WaveFlow model. n_layers : int Number of ResidualBlocks in each Flow. n_group : int Number of timesteps to fold as a group. channels : int Feature size of each ResidualBlock. mel_bands : int Feature size of mel spectrogram (mel bands). kernel_size : Union[int, List[int]] Kernel size of the convolution layer in each ResidualBlock. """ def __init__(self, n_flows, n_layers, n_group, channels, mel_bands, kernel_size): if n_group % 2 or n_flows % 2: raise ValueError( "number of flows and number of group must be even " "since a permutation along group among flows is used.") super().__init__() for _ in range(n_flows): self.append( Flow(n_layers, channels, mel_bands, kernel_size, n_group)) # permutations in h self.perms = self._create_perm(n_group, n_flows) # specs self.n_group = n_group self.n_flows = n_flows def _create_perm(self, n_group, n_flows): indices = list(range(n_group)) half = n_group // 2 perms = [] for i in range(n_flows): if i < n_flows // 2: perms.append(indices[::-1]) else: perm = list(reversed(indices[:half])) + list( reversed(indices[half:])) perms.append(perm) return perms def _trim(self, x, condition): assert condition.shape[-1] >= x.shape[-1] pruned_len = int(x.shape[-1] // self.n_group * self.n_group) if x.shape[-1] > pruned_len: x = x[:, :pruned_len] if condition.shape[-1] > pruned_len: condition = condition[:, :, :pruned_len] return x, condition
[docs] def forward(self, x, condition): """Probability density estimation of random variable x given the condition. Parameters ----------- x : Tensor [shape=(batch_size, time_steps)] The audio. condition : Tensor [shape=(batch_size, condition channel, time_steps)] The local condition (mel spectrogram here). Returns -------- z : Tensor [shape=(batch_size, time_steps)] The transformed random variable. log_det_jacobian: Tensor [shape=(1,)] The log determinant of the jacobian of the transformation from x to z. """ # x: (B, T) # condition: (B, C, T) upsampled condition x, condition = self._trim(x, condition) # to (B, C, h, T//h) layout x = paddle.unsqueeze( paddle.transpose(fold(x, self.n_group), [0, 2, 1]), 1) condition = paddle.transpose( fold(condition, self.n_group), [0, 1, 3, 2]) # flows logs_list = [] for i, layer in enumerate(self): x, (logs, b) = layer(x, condition) logs_list.append(logs) # permute paddle has no shuffle dim x = geo.shuffle_dim(x, 2, perm=self.perms[i]) condition = geo.shuffle_dim(condition, 2, perm=self.perms[i]) z = paddle.squeeze(x, 1) # (B, H, W) batch_size = z.shape[0] z = paddle.reshape(paddle.transpose(z, [0, 2, 1]), [batch_size, -1]) log_det_jacobian = paddle.sum(paddle.stack(logs_list)) return z, log_det_jacobian
[docs] def inverse(self, z, condition): """Sampling from the the distrition p(X). It is done by sample a ``z`` form p(Z) and transform it into ``x``. Each Flow transform .. math:: `z_{i-1}` to .. math:: `z_{i}` in an autoregressive manner. Parameters ---------- z : Tensor [shape=(batch, 1, time_steps] A sample of the distribution p(Z). condition : Tensor [shape=(batch, condition_channel, time_steps)] The local condition. Returns -------- x : Tensor [shape=(batch_size, time_steps)] The transformed sample (audio here). """ z, condition = self._trim(z, condition) # to (B, C, h, T//h) layout z = paddle.unsqueeze( paddle.transpose(fold(z, self.n_group), [0, 2, 1]), 1) condition = paddle.transpose( fold(condition, self.n_group), [0, 1, 3, 2]) # reverse it flow by flow for i in reversed(range(self.n_flows)): z = geo.shuffle_dim(z, 2, perm=self.perms[i]) condition = geo.shuffle_dim(condition, 2, perm=self.perms[i]) z, (logs, b) = self[i].inverse(z, condition) x = paddle.squeeze(z, 1) # (B, H, W) batch_size = x.shape[0] x = paddle.reshape(paddle.transpose(x, [0, 2, 1]), [batch_size, -1]) return x
[docs]class ConditionalWaveFlow(nn.LayerList): """ConditionalWaveFlow, a UpsampleNet with a WaveFlow model. Parameters ---------- upsample_factors : List[int] Upsample factors for the upsample net. n_flows : int Number of flows in the WaveFlow model. n_layers : int Number of ResidualBlocks in each Flow. n_group : int Number of timesteps to fold as a group. channels : int Feature size of each ResidualBlock. n_mels : int Feature size of mel spectrogram (mel bands). kernel_size : Union[int, List[int]] Kernel size of the convolution layer in each ResidualBlock. """ def __init__(self, upsample_factors: List[int], n_flows: int, n_layers: int, n_group: int, channels: int, n_mels: int, kernel_size: Union[int, List[int]]): super().__init__() self.encoder = UpsampleNet(upsample_factors) self.decoder = WaveFlow( n_flows=n_flows, n_layers=n_layers, n_group=n_group, channels=channels, mel_bands=n_mels, kernel_size=kernel_size)
[docs] def forward(self, audio, mel): """Compute the transformed random variable z (x to z) and the log of the determinant of the jacobian of the transformation from x to z. Parameters ---------- audio : Tensor [shape=(B, T)] The audio. mel : Tensor [shape=(B, C_mel, T_mel)] The mel spectrogram. Returns ------- z : Tensor [shape=(B, T)] The inversely transformed random variable z (x to z) log_det_jacobian: Tensor [shape=(1,)] the log of the determinant of the jacobian of the transformation from x to z. """ condition = self.encoder(mel) z, log_det_jacobian = self.decoder(audio, condition) return z, log_det_jacobian
[docs] @paddle.no_grad() def infer(self, mel): r"""Generate raw audio given mel spectrogram. Parameters ---------- mel : Tensor [shape=(B, C_mel, T_mel)] Mel spectrogram (in log-magnitude). Returns ------- Tensor : [shape=(B, T)] The synthesized audio, where``T <= T_mel \* upsample_factors``. """ start = time.time() condition = self.encoder(mel, trim_conv_artifact=True) #(B, C, T) batch_size, _, time_steps = condition.shape z = paddle.randn([batch_size, time_steps], dtype=mel.dtype) x = self.decoder.inverse(z, condition) end = time.time() print("time: {}s".format(end - start)) return x
[docs] @paddle.no_grad() def predict(self, mel): """Generate raw audio given mel spectrogram. Parameters ---------- mel : np.ndarray [shape=(C_mel, T_mel)] Mel spectrogram of an utterance(in log-magnitude). Returns ------- np.ndarray [shape=(T,)] The synthesized audio. """ mel = paddle.to_tensor(mel) mel = paddle.unsqueeze(mel, 0) audio = self.infer(mel) audio = audio[0].numpy() return audio
[docs] @classmethod def from_pretrained(cls, config, checkpoint_path): """Build a ConditionalWaveFlow model from a pretrained model. Parameters ---------- config: yacs.config.CfgNode model configs checkpoint_path: Path or str the path of pretrained model checkpoint, without extension name Returns ------- ConditionalWaveFlow The model built from pretrained result. """ model = cls(upsample_factors=config.model.upsample_factors, n_flows=config.model.n_flows, n_layers=config.model.n_layers, n_group=config.model.n_group, channels=config.model.channels, n_mels=config.data.n_mels, kernel_size=config.model.kernel_size) checkpoint.load_parameters(model, checkpoint_path=checkpoint_path) return model
[docs]class WaveFlowLoss(nn.Layer): """Criterion of a WaveFlow model. Parameters ---------- sigma : float The standard deviation of the gaussian noise used in WaveFlow, by default 1.0. """ def __init__(self, sigma=1.0): super().__init__() self.sigma = sigma self.const = 0.5 * np.log(2 * np.pi) + np.log(self.sigma)
[docs] def forward(self, z, log_det_jacobian): """Compute the loss given the transformed random variable z and the log_det_jacobian of transformation from x to z. Parameters ---------- z : Tensor [shape=(B, T)] The transformed random variable (x to z). log_det_jacobian : Tensor [shape=(1,)] The log of the determinant of the jacobian matrix of the transformation from x to z. Returns ------- Tensor [shape=(1,)] The loss. """ loss = paddle.sum(z * z) / (2 * self.sigma * self.sigma ) - log_det_jacobian loss = loss / np.prod(z.shape) return loss + self.const