Warping Paths

We will now discuss the notion of a "warping path." For more theoretical details, click here to see a writeup I created for my algorithms class

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import librosa
import IPython.display as ipd
import linmdtw # pip install linmdtw

def get_csm_fast(Y1, Y2):
    Y1Sqr = np.sum(Y1**2, 1)
    Y2Sqr = np.sum(Y2**2, 1)
    D = Y1Sqr[:, None] + Y2Sqr[None, :] - 2*Y1.dot(Y2.T)
    D[D < 0] = 0
    D = np.sqrt(D)
    return D
In [2]:
N = 100
t = np.linspace(0, 1, N)
Y1 = np.zeros((N, 2))
Y1[:, 0] = np.cos(2*np.pi*t)
Y1[:, 1] = np.sin(4*np.pi*t)
Y2 = np.zeros((N, 2))
Y2[:, 0] = 1.3*np.cos(2*np.pi*(t**2))
Y2[:, 1] = 1.3*np.sin(4*np.pi*(t**2))+0.1    

Below is code to compute a warping path between Y1 and Y2, using the linmdtw library

In [3]:
# Compute the warping path
path = linmdtw.linmdtw(Y1, Y2)
path = np.array(path)

# Setup a colormap for the path
timecmap = 'afmhot'
c = plt.get_cmap(timecmap)
C = c(np.array(np.round(np.linspace(0, 255, path.shape[0])), dtype=np.int32))
C = C[:, 0:3]
/home/ctralie/anaconda3/lib/python3.7/site-packages/linmdtw-0.1.4-py3.7-linux-x86_64.egg/linmdtw/dtw.py:40: UserWarning: X is not 32-bit, so creating 32-bit version
  warnings.warn("X is not 32-bit, so creating 32-bit version")
/home/ctralie/anaconda3/lib/python3.7/site-packages/linmdtw-0.1.4-py3.7-linux-x86_64.egg/linmdtw/dtw.py:46: UserWarning: Y is not 32-bit, so creating 32-bit version
  warnings.warn("Y is not 32-bit, so creating 32-bit version")

Below is a plot of the cross-similarity matrix and the warping path between the two figure 8s

In [4]:
CSM = get_csm_fast(Y1, Y2)
c1 = 'Reds'
c2 = 'Blues'

plt.figure(figsize=(14, 5))
plt.subplot(121)
plt.title('Curve Examples')
plt.scatter(Y1[:, 0], Y1[:, 1], 30, np.arange(N), cmap=c1, edgecolor = 'none')
plt.scatter(Y2[:, 0], Y2[:, 1], 30, np.arange(N), cmap=c2, edgecolor = 'none')
for idx in range(path.shape[0]):
    i, j = path[idx, :]
    plt.plot([Y1[i, 0], Y2[j, 0]], [Y1[i, 1], Y2[j, 1]], c=C[idx, :])
ax = plt.gca()
ax.set_facecolor((0.7, 0.7, 0.7))
plt.axis('equal')


plt.subplot(122)
plt.imshow(CSM, interpolation = 'none', cmap='gray_r')
plt.colorbar()
plt.scatter(-2*np.ones(N), np.arange(N), 20, np.arange(N), cmap=c1, edgecolor = 'none')
plt.scatter(np.arange(N), -2*np.ones(N), 20, np.arange(N), cmap=c2, edgecolor = 'none')
plt.scatter(path[:, 1], path[:, 0], c=C)

plt.xlabel('Time Index on Right Curve')
plt.ylabel('Time Index on Left Curve')
Out[4]:
Text(0, 0.5, 'Time Index on Left Curve')

A warping path is a sequence of correspondences [i, j] between a point $i$ in the first point cloud and a point $j$ in the second point cloud satisfying the following properties:

  1. The first pair is [0, 0]; in other words we always match the first point in the first point cloud to the first point in the second point cloud

  2. The last pair is [M-1, N-1]; we also match the last points

  3. Let $p_k$ = [i, j] be the kth correspondence in the warping path in a sequence 1, 2, ..., k, ..., P, for P total correspondences in the warping path. There restrictions on what $p_{k+1}$ can be. It can be

    • [i+1, j] Move forward along the first point cloud but stay still on the second one
    • [i, j+1] Move forward along the second point cloud but stay still on the first one
    • [i+1, j+1] Move forward by one step along both

MFCC CSM

Just as this framework applies to time-ordered point clouds in 2D, it also applies to audio. If we summarize each window with MFCC features, for example, we have 20 numbers that summarize that window. We can think of this as tracing out a path in 20-dimensional space, and we can compute a cross-similarity matrix and a warping path just as before. The bottom line is that if there's a small distance between two windows summarized this way, then the windows summarize perceptually similar audio. If there is a large distance, then the windows captured dissimilar audio. We want to find a warping path that matches similar clips of audio in sequence, starting with audio that may not line up very well, as shown below

In [5]:
x1, sr = librosa.load("vivaldi1.wav")
x2, sr = librosa.load("vivaldi2.wav")

xboth = np.zeros( (max(len(x1), len(x2)), 2) )
xboth[0:len(x1), 0] = x1
xboth[0:len(x2), 1] = x2

ipd.Audio([xboth[:, 0], xboth[:, 1]], rate=sr)
Out[5]:
In [6]:
def timemap_stretch(x, sr, path, hop_length=32, n_fft = 4096):
    """
    Stretch audio x so that it aligns with another
    audio clip, according to a warping path

    Parameters
    ----------
    x: ndarray(N)
        An array of audio samples
    sr: int
        Sample rate
    path: ndarray(K, 2)
        Warping path.  Indices of x are in first row
    hop_length: int
        Hop length to use in the phase vocoder
    n_fft: int
        Number of fft samples to use in the phase vocoder
    """
    # Break down into regions of constant slope
    xdiff = path[1::, 0] - path[0:-1, 0]
    ydiff = path[1::, 1] - path[0:-1, 1]
    xdiff = xdiff[1::] - xdiff[0:-1]
    ydiff = ydiff[1::] - ydiff[0:-1]
    diff = xdiff + ydiff
    ret = np.array([])
    i1 = 0
    while i1 < len(diff):
        i2 = i1+1
        while i2 < len(diff) and diff[i2] == 0:
            i2 += 1
        while i2 < len(diff) and path[i2, 0] - path[i1, 0] < n_fft:
            i2 += 1
        if i2 >= len(diff):
            break
        fac = (path[i2, 1]-path[i1, 1])/(path[i2, 0]-path[i1, 0])
        if fac > 0:
            fac = 1/fac
            xi = x[path[i1, 0]:path[i2, 0]+1]
            D = librosa.stft(xi, n_fft = n_fft, hop_length=hop_length)
            DNew = librosa.phase_vocoder(D, fac, hop_length=hop_length)
            xifast = librosa.istft(DNew, hop_length=hop_length)
            ret = np.concatenate((ret, xifast))
        i1 = i2
    return ret


def stretch_audio(x1, x2, sr, path, hop_length):
    """
    Wrap around pyrubberband to warp one audio stream
    to another, according to some warping path

    Parameters
    ----------
    x1: ndarray(M)
        First audio stream
    x2: ndarray(N)
        Second audio stream
    sr: int
        Sample rate
    path: ndarray(P, 2)
        Warping path, in units of windows
    hop_length: int
        The hop length between windows
    
    Returns
    -------
    x3: ndarray(N, 2)
        The synchronized audio.  x2 is in the right channel,
        and x1 stretched to x2 is in the left channel
    """
    print("Stretching...")
    path_final = np.array(path)*hop_length
    path_final = [(row[0], row[1]) for row in path_final if row[0] < x1.size and row[1] < x2.size]
    path_final.append((x1.size, x2.size))
    path_final = np.array(path_final, dtype=int)
    x3 = np.zeros((x2.size, 2))
    x3[:, 1] = x2
    x1_stretch = timemap_stretch(x1, sr, path_final)
    print("x1.shape = ", x1.shape)
    print("x2.shape = ", x2.shape)
    print("x1_stretch.shape = ", x1_stretch.shape)
    x1_stretch = x1_stretch[0:min(x1_stretch.size, x3.shape[0])]
    x3 = x3[0:min(x3.shape[0], x1_stretch.size), :]
    x3[:, 0] = x1_stretch
    return x3

Let's listen to the results of an optimal warping path, where we allow one of the audio clips to speed up or slow down to respect the correspondences of that warping path. We hear that the audio is aligned quite nicely!

In [7]:
hop_length = 512
Y1 = librosa.feature.mfcc(y=x1, sr=sr, hop_length=hop_length).T # A "transpose" flips the rows and the columns
Y2 = librosa.feature.mfcc(y=x2, sr=sr, hop_length=hop_length).T # So that each window is along a row
D = get_csm_fast(Y1, Y2)
path = linmdtw.linmdtw(Y1, Y2)

plt.imshow(D, cmap='magma_r', aspect='auto')
plt.scatter(path[:, 1], path[:, 0])

xsync = stretch_audio(x1, x2, sr, path, hop_length)
ipd.Audio([xsync[:, 0], xsync[:, 1]], rate=sr)
/home/ctralie/anaconda3/lib/python3.7/site-packages/linmdtw-0.1.4-py3.7-linux-x86_64.egg/linmdtw/dtw.py:43: UserWarning: X is not C-contiguous; creating a copy that is C-contiguous
  warnings.warn("X is not C-contiguous; creating a copy that is C-contiguous")
/home/ctralie/anaconda3/lib/python3.7/site-packages/linmdtw-0.1.4-py3.7-linux-x86_64.egg/linmdtw/dtw.py:49: UserWarning: Y is not C-contiguous; creating a copy that is C-contiguous
  warnings.warn("Y is not C-contiguous; creating a copy that is C-contiguous")
Stretching...
x1.shape =  (176400,)
x2.shape =  (198450,)
x1_stretch.shape =  (188928,)
Out[7]:
In [ ]: