Optimal Warping Path Costs via Dynamic Programing

aka "Dynamic Time Warping"

For more information on the theory behind algorithms like this, check out this module and this module from my algorithms class

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import librosa
import IPython.display as ipd

def get_csm_fast(Y1, Y2):
    Y1Sqr = np.sum(Y1**2, 1)
    Y2Sqr = np.sum(Y2**2, 1)
    D = Y1Sqr[:, None] + Y2Sqr[None, :] - 2*Y1.dot(Y2.T)
    D[D < 0] = 0
    D = np.sqrt(D)
    return D

N = 100
t = np.linspace(0, 1, N)
Y1 = np.zeros((N, 2))
Y1[:, 0] = np.cos(2*np.pi*t)
Y1[:, 1] = np.sin(4*np.pi*t)
Y2 = np.zeros((N, 2))
Y2[:, 0] = 1.3*np.cos(2*np.pi*(t**2))
Y2[:, 1] = 1.3*np.sin(4*np.pi*(t**2))+0.1    

Lemma: Dynamic Programming Subproblem Cost

Let's consider a cross-similarity matrix $C$. Let's consider the minimum cost warping path that starts at $[0, 0]$ and ends at $[i, j]$. We'll call the optimal cost over all such warping paths $S_{ij}$. Then the following recurrence relation can be used to compute $S_{ij}$

$S_{ij} = \min \left\{ \begin{array}{c} S_{i-1, j} \\ S_{i, j-1} \\ S_{i-1, j-1} \end{array} \right\} + C_{ij} $

What I'm looking for in the end is $S_{M-1, N-1}$, which will give me the optimal cost of aligning the entire first point cloud to entire second point cloud. To do this, I have to fill in the entire $S$ table row by row, and then I can look at the lower right element of $S$

In [6]:
C = get_csm_fast(Y1, Y2)
M = C.shape[0] # Number of rows
N = C.shape[1] # Number of columns
S = np.zeros_like(C)
S[0, 0] = C[0, 0] # Base case
# First row; all warping paths that only match a single element of the first point cloud
# to some number of elements in the second point cloud
"""
# Slower pure python code to do the cumulative sum would look like this
for j in range(1, N):
    S[0, j] = S[0, j-1] + C[0, j]
"""
S[0, :] = np.cumsum(C[0, :])

# First column; all warping paths that only match a single element of the second point cloud
# to some number of elements in the first point cloud
S[:, 0] = np.cumsum(C[:, 0])

# For the rest of the elements, we can apply the recurrence relation above
for i in range(1, M):
    for j in range(1, N):
        S[i, j] = min(S[i-1, j], S[i, j-1], S[i-1, j-1]) + C[i, j]
In [4]:
plt.figure(figsize=(12, 6))
plt.subplot(121)
plt.imshow(C, cmap='magma_r')
plt.colorbar()
plt.title("Cross-Similarity Matrix")
plt.subplot(122)
plt.imshow(S, cmap='magma_r')
plt.title("Cumulative Sum Matrix")
plt.colorbar()
print(S[-1, -1])
30.489092600824907

Just to show you that this gives the correct answer, let's compare it to the cost we get from the warping path that linmdtw gives us

In [5]:
import linmdtw
path = linmdtw.linmdtw(Y1, Y2)
cost = 0
for i, j in path:
    cost += C[i, j]
print(cost)
30.489092600824907
/home/ctralie/anaconda3/lib/python3.7/site-packages/linmdtw-0.1.4-py3.7-linux-x86_64.egg/linmdtw/dtw.py:40: UserWarning: X is not 32-bit, so creating 32-bit version
  warnings.warn("X is not 32-bit, so creating 32-bit version")
/home/ctralie/anaconda3/lib/python3.7/site-packages/linmdtw-0.1.4-py3.7-linux-x86_64.egg/linmdtw/dtw.py:46: UserWarning: Y is not 32-bit, so creating 32-bit version
  warnings.warn("Y is not 32-bit, so creating 32-bit version")
In [ ]: