Source code for torchid.dynonet.module.lti

import torch
import numpy as np  # temporary
from torch.nn.parameter import Parameter
from torchid.dynonet.functional.lti import MimoLinearDynamicalOperatorFun


[docs]class MimoLinearDynamicalOperator(torch.nn.Module): r"""Applies a multi-input-multi-output linear dynamical filtering operation. Args: in_channels (int): Number of input channels out_channels (int): Number of output channels n_b (int): Number of learnable coefficients of the transfer function numerator n_a (int): Number of learnable coefficients of the transfer function denominator n_k (int, optional): Number of input delays in the numerator. Default: 0 Shape: - Input: (batch_size, seq_len, in_channels) - Output: (batch_size, seq_len, out_channels) Attributes: b_coeff (Tensor): The learnable coefficients of the transfer function numerator a_coeff (Tensor): The learnable coefficients of the transfer function denominator Examples:: >>> in_channels, out_channels = 2, 4 >>> n_b, n_a, n_k = 2, 2, 1 >>> G = MimoLinearDynamicalOperator(in_channels, out_channels, n_b, n_a, n_k) >>> batch_size, seq_len = 32, 100 >>> u_in = torch.ones((batch_size, seq_len, in_channels)) >>> y_out = G(u_in, y_0, u_0) # shape: (batch_size, seq_len, out_channels) """ def __init__(self, in_channels, out_channels, n_b, n_a, n_k=0): super(MimoLinearDynamicalOperator, self).__init__() self.b_coeff = Parameter(torch.zeros(out_channels, in_channels, n_b)) self.a_coeff = Parameter(torch.zeros(out_channels, in_channels, n_a)) self.out_channels = out_channels self.in_channels = in_channels self.n_a = n_a self.n_b = n_b self.n_k = n_k with torch.no_grad(): init_range = 0.01 self.a_coeff[:] = (torch.rand(self.a_coeff.shape) - 0.5) * 2 * init_range self.b_coeff[:] = (torch.rand(self.b_coeff.shape) - 0.5) * 2 * init_range def forward(self, u_in, y_0=None, u_0=None): if self.n_k != 0: #u_d = u_in.roll(self.n_k, dims=-2) # roll on the time axis #u_d[..., 0:self.n_k, :] = 0.0 # input sequence with delay u_d = torch.empty_like(u_in) u_d[..., self.n_k:, :] = u_in[:, :-self.n_k, :] u_d[..., 0:self.n_k, :] = 0.0 else: u_d = u_in return MimoLinearDynamicalOperatorFun.apply(self.b_coeff, self.a_coeff, u_d, y_0, u_0)
[docs] def get_filtdata(self): r"""Returns the numerator and denominator coefficients of the transfer function :math:`q^{-1}`-polynomials. The polynomials are function of the variable :math:`q^{-1}`. The polynomial coefficients b and a have length m and n, respectively and are sorted in descending power order. For a certain input channel :math:`i` and output channel :math:`o`, the corresponding transfer function :math:`G_{i\rightarrow o}(z)` is: .. math:: G_{i\rightarrow o}(z) = q^{-n_k}\frac{b[o, i, 0] + b[o, i, 1]q^{-1} + \dots + b[o, i, n]q^{-m+1}} {a[o, i, 0] + a[o, i, 1]q^{-1} + \dots + a[o, i, n]q^{-n+1}} Returns: np.array(in_channels, out_channels, m), np.array(in_channels, out_channels, n): numerator :math:`\beta` and denominator :math:`\alpha` polynomial coefficients of the transfer function. Examples:: >>> num, den = G.get_tfdata() >>> G_tf = control.TransferFunction(G2_num, G2_den, ts=1.0) """ return self.__get_filtdata__()
[docs] def get_tfdata(self): r"""Returns the numerator and denominator coefficients of the transfer function :math:`z`-polynomials. The polynomials are function of the variable Z-transform variable :math:`z`. The polynomial coefficients :math::`\beta` and :math:`\alpha` have equal length p and are sorted in descending power order. For a certain input channel :math:`i` and output channel :math:`o`, the corresponding transfer function :math:`G_{i\rightarrow o}(z)` is: .. math:: G_{i\rightarrow o}(z) = \frac{\beta[o, i, 0]z^{n-1} + \beta[o, i, 1]z^{n-1} + \dots + \beta[o, i, p]}{\alpha[o, i, 0]z^{n-1} + \alpha[o, i, 1]z^{n-2} + \dots + \alpha[o, i, p]} Returns: np.array(in_channels, out_channels, p), np.array(in_channels, out_channels, p): numerator :math:`\beta` and denominator :math:`\alpha` polynomial coefficients of the transfer function. Examples:: >>> num, den = G.get_tfdata() >>> G_tf = control.TransferFunction(G2_num, G2_den, ts=1.0) """ return self.__get_tfdata__()
def __get_filtdata__(self): # returns the coefficients of the polynomials b and a as function of q^{-1} b_coeff_np, a_coeff_np = self.__get_ba_coeff__() b_seq = np.zeros_like(b_coeff_np, shape=(self.out_channels, self.in_channels, self.n_b + self.n_k)) #b_coeff_np b_seq[:, :, self.n_k:] = b_coeff_np[:, :, :] a_seq = np.empty_like(a_coeff_np, shape=(self.out_channels, self.in_channels, self.n_a + 1)) a_seq[:, :, 0] = 1 a_seq[:, :, 1:] = a_coeff_np[:, :, :] return b_seq, a_seq def __get_tfdata__(self): b_seq, a_seq = self.__get_filtdata__() M = self.n_b + self.n_k # number of numerator coefficients of the q^{-1} polynomial N = self.n_a + 1 # number of denominator coefficients of the q^{-1} polynomial if M > N: num = b_seq den = np.c_[a_seq, np.zeros((self.out_channels, self.in_channels, M - N))] elif N > M: num = np.c_[b_seq, np.zeros((self.out_channels, self.in_channels, N - M))] den = a_seq else: # N == M num = b_seq den = a_seq return num, den def __get_ba_coeff__(self): return self.b_coeff.detach().numpy(), self.a_coeff.detach().numpy()
# SISO is implemented as a sub-case of MIMO
[docs]class SisoLinearDynamicalOperator(MimoLinearDynamicalOperator): r"""Applies a single-input-single-output linear dynamical filtering operation. Args: n_b (int): Number of learnable coefficients of the transfer function numerator n_a (int): Number of learnable coefficients of the transfer function denominator n_k (int, optional): Number of input delays in the numerator. Default: 0 Shape: - Input: (batch_size, seq_len, 1) - Output: (batch_size, seq_len, 1) Attributes: b_coeff (Tensor): the learnable coefficients of the transfer function numerator a_coeff (Tensor): the learnable coefficients of the transfer function denominator Examples:: >>> n_b, n_a = 2, 2 >>> G = SisoLinearDynamicalOperator(b_coeff, a_coeff) >>> batch_size, seq_len = 32, 100 >>> u_in = torch.ones((batch_size, seq_len)) >>> y_out = G(u_in, y_0, u_0) # shape: (batch_size, seq_len, 1) """ def __init__(self, n_b, n_a, n_k=0): super(SisoLinearDynamicalOperator, self).__init__(1, 1, n_b=n_b, n_a=n_a, n_k=n_k)
[docs] def get_filtdata(self): b_seq, a_seq = super(SisoLinearDynamicalOperator, self).__get_filtdata__() # MIMO numden return b_seq[0, 0, :], a_seq[0, 0, :]
[docs] def get_tfdata(self): num, den = super(SisoLinearDynamicalOperator, self).__get_tfdata__() # MIMO numden return num[0, 0, :], den[0, 0, :]
[docs]class MimoFirLinearDynamicalOperator(torch.nn.Module): r"""Applies a FIR linear multi-input-multi-output filtering operation. Args: in_channels (int): Number of input channels out_channels (int): Number of output channels n_b (int): Number of learnable FIR coefficients Shape: - Input: (batch_size, seq_len, in_channels) - Output: (batch_size, seq_len, out_channels) Attributes: G (torch.nn.Conv1d): The underlying Conv1D object used to implement the convolution Examples:: >>> in_channels, out_channels = 2, 4 >>> n_b = 128 >>> G = MimoLinearDynamicalOperator(in_channels, out_channels, n_b) >>> batch_size, seq_len = 32, 100 >>> u_in = torch.ones((batch_size, seq_len, in_channels)) >>> y_out = G(u_in, y_0, u_0) # shape: (batch_size, seq_len, out_channels) """ def __init__(self, in_channels, out_channels, n_b, channels_last=True): super(MimoFirLinearDynamicalOperator, self).__init__() self.G = torch.nn.Conv1d(in_channels, out_channels, kernel_size=n_b, bias=False, padding=n_b-1) #self.b_coeff = self.G.weight self.n_a = 0 self.n_b = n_b self.out_channels = out_channels self.in_channels = in_channels self.channels_last = channels_last def forward(self, u_in): # PyTorch 1.4 does not have a channels_last option for Conv1d, which is more convenient # in our block-oriented modeling framework. # Then, let us transpose the last and second last channels manually before and after applying torch.nn.Conv1d if self.channels_last: u_torch = u_in.transpose(-2, -1) y_out = self.G(u_torch) y_out = y_out[..., 0:-self.n_b+1] if self.channels_last: y_out = y_out.transpose(-2, -1) return y_out def get_filtdata(self): return self.__get_filtdata__() def get_tfdata(self): return self.__get_tfdata__() def __get_filtdata__(self): b_coeff, a_coeff = self.__get_ba_coeff__() b_seq = b_coeff a_seq = np.empty_like(a_coeff, shape=(self.out_channels, self.in_channels, self.n_a + 1)) a_seq[:, :, 0] = 1 a_seq[:, :, 1:] = a_coeff[:, :, :] return b_seq, a_seq def __get_tfdata__(self): b_seq, a_seq = self.__get_filtdata__() M = self.n_b # numerator coefficients N = self.n_a + 1 # denominator coefficients if M > N: num = b_seq den = np.c_[a_seq, np.zeros((self.out_channels, self.in_channels, M - N))] elif N > M: num = np.c_[self.b_poly, np.zeros((self.out_channels, self.in_channels, N - M))] den = a_seq else: # N == M num = b_seq den = a_seq return num, den def __get_ba_coeff__(self): b_coeff_np = self.G.weight.detach().numpy() b_coeff_np = b_coeff_np[:, :, ::-1] a_coeff_np = np.zeros_like(b_coeff_np, shape=(self.out_channels, self.in_channels, 0)) return b_coeff_np, a_coeff_np
[docs]class SisoFirLinearDynamicalOperator(MimoFirLinearDynamicalOperator): r"""Applies a FIR linear single-input-single-output filtering operation. Args: n_b (int): Number of learnable FIR coefficients Shape: - Input: (batch_size, seq_len, 1) - Output: (batch_size, seq_len, 1) Attributes: G (torch.nn.Conv1d): The underlying Conv1D object used to implement the convolution Examples:: >>> n_b = 128 >>> G = SisoFirLinearDynamicalOperator(n_b) >>> batch_size, seq_len = 32, 100 >>> u_in = torch.ones((batch_size, seq_len, 1)) >>> y_out = G(u_in, y_0, u_0) # shape: (batch_size, seq_len, 1) """ def __init__(self, n_b, channels_last=True): super(SisoFirLinearDynamicalOperator, self).__init__(1, 1, n_b, channels_last=channels_last) def get_filtdata(self): b_seq, a_seq = super(SisoFirLinearDynamicalOperator, self).__get_filtdata__() # call to MIMO ba return b_seq[0, 0, :], a_seq[0, 0, :] def get_tfdata(self): num, den = super(SisoFirLinearDynamicalOperator, self).__get_tfdata__() # call to MIMO numden return num[0, 0, :], den[0, 0, :]
[docs]class MimoSecondOrderDynamicOperator(torch.nn.Module): r"""Applies a stable second-order linear multi-input-multi-output filtering operation. The denominator of the transfer function is parametrized in terms of two complex conjugate poles with magnitude :math:: `r, 0 < r < 1` and phase :math:: `\beta, < 0 \beta < \pi`. In turn, :math:: `r` and :math:: `\beta` are parametrized in terms of unconstrained variables :math:: `\rho` and :math:: `\psi` Args: in_channels (int): Number of input channels out_channels (int): Number of output channels Shape: - Input: (batch_size, seq_len, 1) - Output: (batch_size, seq_len, 1) Attributes: rho (Tensor): the learnable :math:: `\rho` coefficients of the transfer function denominator psi (Tensor): the learnable :math:: `\psi` coefficients of the transfer function denominator b_coeff (Tensor): the learnable numerator coefficients Examples:: >>> n_b = 128 >>> G = SisoFirLinearDynamicalOperator(n_b) >>> batch_size, seq_len = 32, 100 >>> u_in = torch.ones((batch_size, seq_len, 1)) >>> y_out = G(u_in, y_0, u_0) # shape: (batch_size, seq_len, 1) """ def __init__(self, in_channels, out_channels): super(MimoSecondOrderDynamicOperator, self).__init__() self.b_coeff = Parameter(torch.zeros(out_channels, in_channels, 2)) self.rho = Parameter(torch.zeros(out_channels, in_channels, 1)) self.psi = Parameter(torch.zeros((out_channels, in_channels, 1))) with torch.no_grad(): self.rho[:] = torch.randn(self.rho.shape) * 0.1 self.psi[:] = torch.randn(self.rho.shape) * 0.1 self.b_coeff[:] = torch.randn(self.b_coeff.shape) * 0.01 def forward(self, u_in, y_0=None, u_0=None): r = torch.sigmoid(self.rho) beta = np.pi * torch.sigmoid(self.psi) a_1 = -2 * r * torch.cos(beta) a_2 = r ** 2 a_coeff = torch.cat((a_1, a_2), dim=-1) return MimoLinearDynamicalOperatorFun.apply(self.b_coeff, a_coeff, u_in, y_0, u_0)
[docs]class SisoSecondOrderDynamicOperator(MimoSecondOrderDynamicOperator): r"""Applies a stable second-order linear single-input-single-output filtering operation. The denominator of the transfer function is parametrized in terms of two complex conjugate poles with magnitude :math:: `r, 0 < r < 1` and phase :math:: `\beta, < 0 \beta < \pi`. In turn, :math:: `r` and :math:: `\beta` are parametrized in terms of unconstrained variables :math:: `\rho` and :math:: `\psi` """ def __init__(self): super(SisoSecondOrderDynamicOperator, self).__init__(1, 1) # in_channels, out_channels, n_b, n_a