Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 15 additions & 28 deletions pynumdiff/kalman_smooth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
try: import cvxpy
except ImportError: pass

from pynumdiff.utils.utility import huber_const, wrap_angle, ensure_iterable
from pynumdiff.utils.utility import huber_const


def kalman_filter(y, xhat0, P0, A, Q, C, R, B=None, u=None, save_P=True,
circular_vars=None, circular_units='rad'):
def kalman_filter(y, xhat0, P0, A, Q, C, R, B=None, u=None, save_P=True, innovation_fn=None):
"""Run the forward pass of a Kalman filter. Expects discrete-time matrices; use :func:`scipy.linalg.expm`
in the caller to convert from continuous time if needed.

Expand All @@ -25,10 +24,9 @@ def kalman_filter(y, xhat0, P0, A, Q, C, R, B=None, u=None, save_P=True,
:param np.array u: optional control inputs, stacked in the direction of axis 0
:param bool save_P: whether to save history of error covariance and a priori state estimates, used with rts
smoothing but nonstandard to compute for ordinary filtering
:param bool or None circular_vars: bool indicating whether the measurement y is a circular (angular) variable
that is wrapped. This will use a circular innovation calculation for the Kalman filter. The smoothed result
will be returned in an unwrapped form.
:param string circular_units: 'rad' or 'deg' to specify whether wrapping is in degrees or radians.
:param callable innovation_fn: optional function :code:`(y_n, pred)` returning the innovation :code:`y_n - pred`,
where :code:`pred = C @ xhat_`. When :code:`None` (default), standard subtraction is used. Use this to handle
circular quantities, e.g. :code:`lambda y, pred: (y - pred + np.pi) % (2*np.pi) - np.pi` for angular measurements in radians.

:return: - **xhat_pre** (np.array) -- a priori estimates of xhat, with axis=0 the batch dimension, so xhat[n] gets the nth step
- **xhat_post** (np.array) -- a posteriori estimates of xhat
Expand Down Expand Up @@ -62,9 +60,7 @@ def kalman_filter(y, xhat0, P0, A, Q, C, R, B=None, u=None, save_P=True,
P = P_.copy()
if not np.isnan(y[n]): # handle missing data
K = P_ @ C.T @ np.linalg.inv(C @ P_ @ C.T + R)
innovation = y[n] - C @ xhat_
if circular_vars is not None and circular_vars is not False:
innovation[0] = wrap_angle(innovation[0], circular_units)
innovation = innovation_fn(y[n], C @ xhat_) if innovation_fn is not None else y[n] - C @ xhat_
xhat += K @ innovation
P -= K @ C @ P_
# the [n]th index of pre variables holds _{n|n-1} info; the [n]th index of post variables holds _{n|n} info
Expand Down Expand Up @@ -102,8 +98,7 @@ def rts_smooth(A, xhat_pre, xhat_post, P_pre, P_post, compute_P_smooth=True):
return xhat_smooth if not compute_P_smooth else (xhat_smooth, P_smooth)


def rtsdiff(x, dt_or_t, order, log_qr_ratio, forwardbackward, axis=0,
circular_vars=None, circular_units='rad'):
def rtsdiff(x, dt_or_t, order, log_qr_ratio, forwardbackward, axis=0, circular=False):
"""Perform Rauch-Tung-Striebel smoothing with a naive constant derivative model. Makes use of :code:`kalman_filter`
and :code:`rts_smooth`, which are made public. :code:`constant_X` methods in this module call this function.

Expand All @@ -118,24 +113,16 @@ def rtsdiff(x, dt_or_t, order, log_qr_ratio, forwardbackward, axis=0,
:param bool forwardbackward: indicates whether to run smoother forwards and backwards
(usually achieves better estimate at end points)
:param int axis: data dimension along which differentiation is performed
:param list[bool] circular_vars: set list element to bool for any axes of x that are a circular (angular) variable
that is wrapped. This will use a circular innovation calculation for the Kalman filter. The smoothed result
will be returned in an unwrapped form.
:param string circular_units: 'rad' or 'deg' to specify whether wrapping is in degrees or radians.
:param bool circular: if :code:`True`, treat the measured quantity as a circular variable in radians, wrapping
the innovation to :math:`[-\\pi, \\pi]`. The input :code:`x` must be in radians; convert degrees with
:code:`np.deg2rad` first. Default :code:`False`.

:return: - **x_hat** (np.array) -- estimated (smoothed) x, same shape as input :code:`x`
- **dxdt_hat** (np.array) -- estimated derivative of x, same shape as input :code:`x`
"""
N = x.shape[axis]
if not np.isscalar(dt_or_t) and N != len(dt_or_t):
raise ValueError("If `dt_or_t` is given as array-like, must have same length as x along `axis`.")

# turn circular_vars into something with the same shape as the number of differentiated axes in x
if len(x.shape) > 1:
n = int(np.prod(x.shape[:axis] + x.shape[axis+1:]))
circular_vars = ensure_iterable(circular_vars, n)
else:
circular_vars = ensure_iterable(circular_vars, 1)

q = 10**int(log_qr_ratio/2) # even-ish split of the powers across 0
r = q/(10**log_qr_ratio)
Expand All @@ -160,24 +147,24 @@ def rtsdiff(x, dt_or_t, order, log_qr_ratio, forwardbackward, axis=0,
Q_d[n] = eM[:order+1, order+1:] @ A_d[n].T
if forwardbackward: A_d_bwd = np.linalg.inv(A_d[::-1]) # properly broadcasts, taking inv of each stacked 2D array

innovation_fn = (lambda y, pred: (y - pred + np.pi) % (2*np.pi) - np.pi) if circular else None # wrap innovation to [-pi, pi] for circular variables

x_hat = np.empty_like(x); dxdt_hat = np.empty_like(x)
if forwardbackward: w = np.linspace(0, 1, N) # weights used to combine forward and backward results

for i, vec_idx in enumerate(np.ndindex(x.shape[:axis] + x.shape[axis+1:])): # works properly for 1D case too
for vec_idx in np.ndindex(x.shape[:axis] + x.shape[axis+1:]): # works properly for 1D case too
s = vec_idx[:axis] + (slice(None),) + vec_idx[axis:] # for indexing the vector we wish to differentiate
xhat0 = np.zeros(order+1); xhat0[0] = x[s][0] if not np.isnan(x[s][0]) else 0 # The first estimate is the first seen state. See #110

xhat_pre, xhat_post, P_pre, P_post = kalman_filter(x[s], xhat0, P0, A_d, Q_d, C, R,
circular_vars=circular_vars[i], circular_units=circular_units)
xhat_pre, xhat_post, P_pre, P_post = kalman_filter(x[s], xhat0, P0, A_d, Q_d, C, R, innovation_fn=innovation_fn)
xhat_smooth = rts_smooth(A_d, xhat_pre, xhat_post, P_pre, P_post, compute_P_smooth=False)
x_hat[s] = xhat_smooth[:,0] # first dimension is time, so slice first and second states at all times
dxdt_hat[s] = xhat_smooth[:,1]

if forwardbackward:
xhat0[0] = x[s][-1] if not np.isnan(x[s][-1]) else 0
xhat_pre, xhat_post, P_pre, P_post = kalman_filter(x[s][::-1], xhat0, P0, A_d_bwd,
Q_d if Q_d.ndim == 2 else Q_d[::-1], C, R,
circular_vars=circular_vars[i], circular_units=circular_units) # Use same Q matrices as before, because noise should still grow in reverse time
Q_d if Q_d.ndim == 2 else Q_d[::-1], C, R, innovation_fn=innovation_fn) # Use same Q matrices as before, because noise should still grow in reverse time
xhat_smooth = rts_smooth(A_d_bwd, xhat_pre, xhat_post, P_pre, P_post, compute_P_smooth=False)

x_hat[s] = x_hat[s] * w + xhat_smooth[:, 0][::-1] * (1-w)
Expand Down
18 changes: 18 additions & 0 deletions pynumdiff/tests/test_diff_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,24 @@ def test_multidimensionality(multidim_method_and_params, request):
legend = ax3.legend(bbox_to_anchor=(0.7, 0.8)); legend.legend_handles[0].set_facecolor(pyplot.cm.viridis(0.6))
fig.suptitle(f'{diff_method.__name__}', fontsize=16)

def test_circular_rtsdiff():
"""Ensure rtsdiff with circular=True correctly differentiates a wrapping angle signal in radians"""
np.random.seed(42)
N = 200
dt_circ = 0.05
t_circ = np.arange(N) * dt_circ
true_dtheta = 2.0 # constant angular velocity in rad/s
theta_true = true_dtheta * t_circ # linearly increasing angle, crosses 2*pi boundaries
theta_noisy = np.angle(np.exp(1j * (theta_true + 0.1 * np.random.randn(N)))) # wrap to [-pi, pi] and add noise

_, dxdt_hat = rtsdiff(theta_noisy, dt_circ, order=1, log_qr_ratio=1, forwardbackward=True, circular=True)

# The interior of the signal (away from endpoints) should recover the true angular velocity well
interior = slice(10, N-10)
rmse = np.sqrt(np.mean((dxdt_hat[interior] - true_dtheta)**2))
assert rmse < 0.5, f"RMSE of angular velocity estimate too large: {rmse:.3f} rad/s"


# List of methods that can handle missing values
nan_methods_and_params = [
(splinediff, {'degree': 5, 's': 2}),
Expand Down
49 changes: 1 addition & 48 deletions pynumdiff/utils/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from scipy.ndimage import convolve1d



def huber_const(M):
"""Scale that makes :code:`sum(huber())` interpolate :math:`\\sqrt{2}\\|\\cdot\\|_1` and :math:`\\frac{1}{2}\\|\\cdot\\|_2^2`,
from https://jmlr.org/papers/volume14/aravkin13a/aravkin13a.pdf, with correction for missing sqrt. Here :code:`huber`
Expand Down Expand Up @@ -202,51 +203,3 @@ def peakdet(x, delta, t=None):
return np.array(maxtab), np.array(mintab)


def wrap_angle(angle, units='rad', range='symmetric'):
"""Wrap an angle to a specified range.

:param np.array angle: angular values
:param string units: either 'rad' or 'deg'
:param string range: either 'symmetric' for [-pi, pi] / [-180, 180],
or 'positive' for [0, 2pi] / [0, 360]

:return: - **angle** -- the angular values wrapped as requested
"""
if units == 'rad':
period = 2 * np.pi
if range == 'symmetric':
return (angle + np.pi) % period - np.pi
elif range == 'positive':
return angle % period
else:
raise ValueError(f"Invalid range '{range}'. Expected 'symmetric' or 'positive'.")

elif units == 'deg':
period = 360.
if range == 'symmetric':
return (angle + 180.) % period - 180.
elif range == 'positive':
return angle % period
else:
raise ValueError(f"Invalid range '{range}'. Expected 'symmetric' or 'positive'.")

else:
raise ValueError(f"Invalid units '{units}'. Expected 'rad' or 'deg'.")


def ensure_iterable(v, length):
"""Ensure v is a list of the specified length.

If v is not iterable (e.g. a scalar), it is broadcast
into a list by repeating it `length` times. If it is already iterable,
it is returned as-is.

:param v: a scalar or iterable
:param int length: desired length of the output list when broadcasting a scalar
:return: v repeated `length` times if scalar, otherwise unchanged

:return: - **v** -- list or iterable
"""
if not hasattr(v, '__iter__'):
return [v] * length
return v
Loading