We will now discuss the notion of a "warping path." For more theoretical details, click here to see a writeup I created for my algorithms class
import numpy as np
import matplotlib.pyplot as plt
import librosa
import IPython.display as ipd
import linmdtw # pip install linmdtw
def get_csm_fast(Y1, Y2):
Y1Sqr = np.sum(Y1**2, 1)
Y2Sqr = np.sum(Y2**2, 1)
D = Y1Sqr[:, None] + Y2Sqr[None, :] - 2*Y1.dot(Y2.T)
D[D < 0] = 0
D = np.sqrt(D)
return D
N = 100
t = np.linspace(0, 1, N)
Y1 = np.zeros((N, 2))
Y1[:, 0] = np.cos(2*np.pi*t)
Y1[:, 1] = np.sin(4*np.pi*t)
Y2 = np.zeros((N, 2))
Y2[:, 0] = 1.3*np.cos(2*np.pi*(t**2))
Y2[:, 1] = 1.3*np.sin(4*np.pi*(t**2))+0.1
Below is code to compute a warping path between Y1 and Y2, using the linmdtw library
# Compute the warping path
path = linmdtw.linmdtw(Y1, Y2)
path = np.array(path)
# Setup a colormap for the path
timecmap = 'afmhot'
c = plt.get_cmap(timecmap)
C = c(np.array(np.round(np.linspace(0, 255, path.shape[0])), dtype=np.int32))
C = C[:, 0:3]
Below is a plot of the cross-similarity matrix and the warping path between the two figure 8s
CSM = get_csm_fast(Y1, Y2)
c1 = 'Reds'
c2 = 'Blues'
plt.figure(figsize=(14, 5))
plt.subplot(121)
plt.title('Curve Examples')
plt.scatter(Y1[:, 0], Y1[:, 1], 30, np.arange(N), cmap=c1, edgecolor = 'none')
plt.scatter(Y2[:, 0], Y2[:, 1], 30, np.arange(N), cmap=c2, edgecolor = 'none')
for idx in range(path.shape[0]):
i, j = path[idx, :]
plt.plot([Y1[i, 0], Y2[j, 0]], [Y1[i, 1], Y2[j, 1]], c=C[idx, :])
ax = plt.gca()
ax.set_facecolor((0.7, 0.7, 0.7))
plt.axis('equal')
plt.subplot(122)
plt.imshow(CSM, interpolation = 'none', cmap='gray_r')
plt.colorbar()
plt.scatter(-2*np.ones(N), np.arange(N), 20, np.arange(N), cmap=c1, edgecolor = 'none')
plt.scatter(np.arange(N), -2*np.ones(N), 20, np.arange(N), cmap=c2, edgecolor = 'none')
plt.scatter(path[:, 1], path[:, 0], c=C)
plt.xlabel('Time Index on Right Curve')
plt.ylabel('Time Index on Left Curve')
A warping path is a sequence of correspondences [i, j] between a point $i$ in the first point cloud and a point $j$ in the second point cloud satisfying the following properties:
The first pair is [0, 0]; in other words we always match the first point in the first point cloud to the first point in the second point cloud
The last pair is [M-1, N-1]; we also match the last points
Let $p_k$ = [i, j] be the kth correspondence in the warping path in a sequence 1, 2, ..., k, ..., P, for P total correspondences in the warping path. There restrictions on what $p_{k+1}$ can be. It can be
Just as this framework applies to time-ordered point clouds in 2D, it also applies to audio. If we summarize each window with MFCC features, for example, we have 20 numbers that summarize that window. We can think of this as tracing out a path in 20-dimensional space, and we can compute a cross-similarity matrix and a warping path just as before. The bottom line is that if there's a small distance between two windows summarized this way, then the windows summarize perceptually similar audio. If there is a large distance, then the windows captured dissimilar audio. We want to find a warping path that matches similar clips of audio in sequence, starting with audio that may not line up very well, as shown below
x1, sr = librosa.load("vivaldi1.wav")
x2, sr = librosa.load("vivaldi2.wav")
xboth = np.zeros( (max(len(x1), len(x2)), 2) )
xboth[0:len(x1), 0] = x1
xboth[0:len(x2), 1] = x2
ipd.Audio([xboth[:, 0], xboth[:, 1]], rate=sr)
def timemap_stretch(x, sr, path, hop_length=32, n_fft = 4096):
"""
Stretch audio x so that it aligns with another
audio clip, according to a warping path
Parameters
----------
x: ndarray(N)
An array of audio samples
sr: int
Sample rate
path: ndarray(K, 2)
Warping path. Indices of x are in first row
hop_length: int
Hop length to use in the phase vocoder
n_fft: int
Number of fft samples to use in the phase vocoder
"""
# Break down into regions of constant slope
xdiff = path[1::, 0] - path[0:-1, 0]
ydiff = path[1::, 1] - path[0:-1, 1]
xdiff = xdiff[1::] - xdiff[0:-1]
ydiff = ydiff[1::] - ydiff[0:-1]
diff = xdiff + ydiff
ret = np.array([])
i1 = 0
while i1 < len(diff):
i2 = i1+1
while i2 < len(diff) and diff[i2] == 0:
i2 += 1
while i2 < len(diff) and path[i2, 0] - path[i1, 0] < n_fft:
i2 += 1
if i2 >= len(diff):
break
fac = (path[i2, 1]-path[i1, 1])/(path[i2, 0]-path[i1, 0])
if fac > 0:
fac = 1/fac
xi = x[path[i1, 0]:path[i2, 0]+1]
D = librosa.stft(xi, n_fft = n_fft, hop_length=hop_length)
DNew = librosa.phase_vocoder(D, fac, hop_length=hop_length)
xifast = librosa.istft(DNew, hop_length=hop_length)
ret = np.concatenate((ret, xifast))
i1 = i2
return ret
def stretch_audio(x1, x2, sr, path, hop_length):
"""
Wrap around pyrubberband to warp one audio stream
to another, according to some warping path
Parameters
----------
x1: ndarray(M)
First audio stream
x2: ndarray(N)
Second audio stream
sr: int
Sample rate
path: ndarray(P, 2)
Warping path, in units of windows
hop_length: int
The hop length between windows
Returns
-------
x3: ndarray(N, 2)
The synchronized audio. x2 is in the right channel,
and x1 stretched to x2 is in the left channel
"""
print("Stretching...")
path_final = np.array(path)*hop_length
path_final = [(row[0], row[1]) for row in path_final if row[0] < x1.size and row[1] < x2.size]
path_final.append((x1.size, x2.size))
path_final = np.array(path_final, dtype=int)
x3 = np.zeros((x2.size, 2))
x3[:, 1] = x2
x1_stretch = timemap_stretch(x1, sr, path_final)
print("x1.shape = ", x1.shape)
print("x2.shape = ", x2.shape)
print("x1_stretch.shape = ", x1_stretch.shape)
x1_stretch = x1_stretch[0:min(x1_stretch.size, x3.shape[0])]
x3 = x3[0:min(x3.shape[0], x1_stretch.size), :]
x3[:, 0] = x1_stretch
return x3
Let's listen to the results of an optimal warping path, where we allow one of the audio clips to speed up or slow down to respect the correspondences of that warping path. We hear that the audio is aligned quite nicely!
hop_length = 512
Y1 = librosa.feature.mfcc(y=x1, sr=sr, hop_length=hop_length).T # A "transpose" flips the rows and the columns
Y2 = librosa.feature.mfcc(y=x2, sr=sr, hop_length=hop_length).T # So that each window is along a row
D = get_csm_fast(Y1, Y2)
path = linmdtw.linmdtw(Y1, Y2)
plt.imshow(D, cmap='magma_r', aspect='auto')
plt.scatter(path[:, 1], path[:, 0])
xsync = stretch_audio(x1, x2, sr, path, hop_length)
ipd.Audio([xsync[:, 0], xsync[:, 1]], rate=sr)