Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pynumdiff/tests/test_diff_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ..finite_difference import finitediff, first_order, second_order, fourth_order
from ..polynomial_fit import polydiff, savgoldiff, splinediff
from ..basis_fit import spectraldiff, rbfdiff
from ..total_variation_regularization import velocity, acceleration, jerk, iterative_velocity, smooth_acceleration
from ..total_variation_regularization import velocity, acceleration, jerk, iterative_velocity, smooth_acceleration, tvrdiff
from ..kalman_smooth import rtsdiff, constant_velocity, constant_acceleration, constant_jerk, robustdiff
from ..linear_model import lineardiff
# Function aliases for testing cases where parameters change the behavior in a big way, so error limits can be indexed in dict
Expand Down Expand Up @@ -331,7 +331,8 @@ def test_diff_method(diff_method_and_params, test_func_and_deriv, request): # re
(spectraldiff, {'high_freq_cutoff': 0.25, 'pad_to_zero_dxdt': False}),
(rbfdiff, {'sigma': 0.5, 'lmbd': 1e-6}),
(splinediff, {'degree': 9, 's': 1e-6}),
(robustdiff, {'order':2, 'log_q':7, 'log_r':2})
(robustdiff, {'order':2, 'log_q':7, 'log_r':2}),
(tvrdiff, {'order': 3, 'gamma': 1e-4})
]

# Similar to the error_bounds table, index by method first. But then we test against only one 2D function,
Expand All @@ -348,7 +349,8 @@ def test_diff_method(diff_method_and_params, test_func_and_deriv, request): # re
spectraldiff: [(2, 1), (3, 2)], # lot of Gibbs ringing in 2nd order derivatives along t1 with t_1^2 sin(3 pi t_2 / 2)
rbfdiff: [(0, -1), (1, 0)],
splinediff: [(0, -1), (1, 0)],
robustdiff: [(-2, -3), (0, -1)]
robustdiff: [(-2, -3), (0, -1)],
tvrdiff: [(0, -1), (1, 0)]
}

@mark.parametrize("multidim_method_and_params", multidim_methods_and_params)
Expand Down
90 changes: 50 additions & 40 deletions pynumdiff/total_variation_regularization.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def iterative_velocity(x, dt, params=None, options=None, num_iterations=None, ga
return x_hat, dxdt_hat


def tvrdiff(x, dt, order, gamma, huberM=float('inf'), solver=None, axis=0):
    """Generalized total variation regularized derivatives. Use convex optimization (cvxpy) to solve for a
    total variation regularized derivative. Other convex-solver-based methods in this module call this function.

    :param np.array x: data to differentiate; may be multidimensional
    :param float dt: time step between samples
    :param int order: order of the derivative whose total variation is penalized
    :param float gamma: regularization strength; larger values give smoother results
    :param float huberM: threshold of the Huber loss applied to the fidelity term. The default
        :math:`M = \\infty` reduces to :math:`\\ell_2` loss, while
        :math:`M = 0` reduces to :math:`\\ell_1` loss, which seeks sparse residuals.
    :param str solver: Solver to use. Solver options include: 'MOSEK', 'CVXOPT', 'CLARABEL', 'ECOS'.
        If not given, fall back to CVXPY's default.
    :param int axis: data dimension along which differentiation is performed; negative values
        count from the last dimension, as elsewhere in numpy

    :return: - **x_hat** (np.array) -- estimated (smoothed) x
             - **dxdt_hat** (np.array) -- estimated derivative of x
    """
    x = np.asarray(x)  # tolerate list/tuple input, as the 1D implementation did via numpy broadcasting
    axis = axis % x.ndim  # normalize negative axis values so the shape/slice arithmetic below is correct

    # Allocate float outputs explicitly: empty_like(x) would inherit x.dtype and silently
    # truncate the continuous-valued results for integer-dtype input.
    x_hat = np.empty(x.shape, dtype=float)
    dxdt_hat = np.empty(x.shape, dtype=float)

    # Each vector along `axis` is an independent 1D problem; iterate over all the other dimensions.
    for vec_idx in np.ndindex(x.shape[:axis] + x.shape[axis+1:]):
        s = vec_idx[:axis] + (slice(None),) + vec_idx[axis:] # for indexing this iteration's vector in the overall array
        x_v = x[s]

        # Normalize for numerical consistency with convex solver
        mu = np.mean(x_v)
        sigma = median_abs_deviation(x_v, scale='normal') # robust alternative to std()
        if sigma == 0: sigma = 1 # safety guard against constant data (avoid divide-by-zero)
        y = (x_v-mu)/sigma

        # Define the variables for the highest order derivative and the integration constants
        deriv_values = cvxpy.Variable(len(y)) # values of the order^th derivative, in which we're penalizing variation
        integration_constants = cvxpy.Variable(order) # constants of integration that help get us back to x

        # Recursively integrate the highest order derivative to get back to the position. This is a first-
        # order scheme, but it's very fast and tends to do not markedly worse than 2nd order. See #116
        # I also tried a trapezoidal integration rule here, and it works no better. See #116 too.
        hx = deriv_values # variables are integrated to produce the signal estimate variables, \hat{x} in the math
        for i in range(order):
            hx = cvxpy.cumsum(hx) + integration_constants[i] # cumsum is like integration assuming dt = 1

        # Compare the recursively integrated position to the noisy position. \ell_2 doesn't get scaled by 1/2 here,
        # so cvxpy's doubled Huber is already the right scale, and \ell_1 should be scaled by 2\sqrt{2} to match.
        fidelity_cost = cvxpy.sum_squares(y - hx) if huberM == float('inf') \
            else np.sqrt(8)*cvxpy.norm(y - hx, 1) if huberM == 0 \
            else utility.huber_const(huberM)*cvxpy.sum(cvxpy.huber(y - hx, huberM)) # data is already scaled, so M rather than M*sigma
        # Set up and solve the optimization problem
        prob = cvxpy.Problem(cvxpy.Minimize(fidelity_cost + gamma*cvxpy.sum(cvxpy.tv(deriv_values)) ))
        prob.solve(solver=solver)

        # Recursively integrate the final derivative values to get back to the function and derivative values
        v = deriv_values.value
        for i in range(order-1): # stop one short to get the first derivative
            v = np.cumsum(v) + integration_constants.value[i]
        dxdt_hat_v = v/dt # v only holds the dx values; to get deriv scale by dt
        x_hat_v = np.cumsum(v) + integration_constants.value[order-1] # smoothed data

        # Due to the first-order nature of the derivative, it has a slight lag. Average together every two values
        # to better center the answer. But this leaves us one-short, so devise a good last value.
        dxdt_hat_v = (dxdt_hat_v[:-1] + dxdt_hat_v[1:])/2
        dxdt_hat_v = np.hstack((dxdt_hat_v, 2*dxdt_hat_v[-1] - dxdt_hat_v[-2])) # last value = penultimate value [-1] + diff between [-1] and [-2]

        x_hat[s] = x_hat_v*sigma+mu
        dxdt_hat[s] = dxdt_hat_v*sigma # derivative is linear, so scale derivative by scatter

    return x_hat, dxdt_hat


def velocity(x, dt, params=None, options=None, gamma=None, solver=None):
Expand Down
Loading