Cross-Similarity Matrices

Time-ordered point clouds

A time-ordered point cloud inclues samples of a curve in a $d$ dimensional space, ordered by time. The convention we will follow in this class is that each point is a row of a matrix, and the columns of the matrix each hold a different dimension

Ex) Figure 8 Curve (Special case of a "Lissajous Curve")

Let's consider two different time-ordered point clouds sampling the following parameterized curve

$x = \cos(2 \pi t), y = \sin(4 \pi t)$

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import librosa
import IPython.display as ipd

N = 100
t = np.linspace(0, 1, N)
Y1 = np.zeros((N, 2))
Y1[:, 0] = np.cos(2*np.pi*t)
Y1[:, 1] = np.sin(4*np.pi*t)
Y2 = np.zeros((N, 2))
Y2[:, 0] = 1.3*np.cos(2*np.pi*(t**2))
Y2[:, 1] = 1.3*np.sin(4*np.pi*(t**2))+0.1    

plt.figure(figsize=(10, 5))
plt.subplot(121)
plt.scatter(Y1[:, 0], Y1[:, 1], c = np.arange(N), cmap='Reds')
plt.subplot(122)
plt.scatter(Y2[:, 0], Y2[:, 1], c = np.arange(N), cmap='Blues')
Out[2]:
<matplotlib.collections.PathCollection at 0x7fe81d14d250>

Cross-Similarity Matrices

A cross-similarity matrix $C$ between a time-ordered point cloud $X$ with $M$ points and a time-ordered point cloud $Y$ with $N$ points is a $M \times N$ matrix $D$ so that

$D_{ij} = || X_i - Y_j ||_2$

In particular

$D_{ij} = \sqrt{ \sum_{k = 1}^d (X_{ik} - Y_{jk})^2}$

Way 1: Mixed Python/Numpy CSM Computation

In [4]:
def get_csm_pythonic(X, Y):
    M = X.shape[0]
    N = Y.shape[0]
    D = np.zeros((M, N))
    for i in range(M):
        for j in range(N):
            D[i, j] = np.sqrt(np.sum( (X[i, :]-Y[j, :])**2 ))
    return D

Way 2: Super Fast Numpy Only CSM Computation with Broadcasting

In [5]:
def get_csm_fast(Y1, Y2):
    Y1Sqr = np.sum(Y1**2, 1)
    Y2Sqr = np.sum(Y2**2, 1)
    D = Y1Sqr[:, None] + Y2Sqr[None, :] - 2*Y1.dot(Y2.T)
    D[D < 0] = 0
    D = np.sqrt(D)
    return D

Below is the cross-similarity matrix between the two different time-ordered point clouds for the figure 8s

In [13]:
C = get_csm_fast(Y1, Y2)

c1 = 'Reds'
c2 = 'Blues'

plt.figure(figsize=(10, 5))

plt.subplot(121)
plt.title('Curve Examples')
plt.scatter(Y1[:, 0], Y1[:, 1], 30, np.arange(N), cmap=c1, edgecolor = 'none')
plt.scatter(Y2[:, 0], Y2[:, 1], 30, np.arange(N), cmap=c2, edgecolor = 'none')
ax = plt.gca()
ax.set_facecolor((0.7, 0.7, 0.7))
plt.axis('equal')


plt.subplot(122)
plt.imshow(C, interpolation = 'none', cmap='magma_r')
plt.colorbar()
plt.scatter(-2*np.ones(N), np.arange(N), 20, np.arange(N), cmap=c1, edgecolor = 'none')
plt.scatter(np.arange(N), -2*np.ones(N), 20, np.arange(N), cmap=c2, edgecolor = 'none')

plt.xlabel('Time Index on Right Curve')
plt.ylabel('Time Index on Left Curve')
Out[13]:
Text(0, 0.5, 'Time Index on Left Curve')

MFCC CSM

In [ ]:
x1, sr = librosa.load("vivaldi1.wav")
x2, sr = librosa.load("vivaldi2.wav")

xboth = np.zeros( (max(len(x1), len(x2)), 2) )
xboth[0:len(x1), 0] = x1
xboth[0:len(x2), 1] = x2

ipd.Audio([xboth[:, 0], xboth[:, 1]], rate=sr)
In [ ]:
def timemap_stretch(x, sr, path, hop_length=32, n_fft = 4096):
    """
    Stretch audio x so that it aligns with another
    audio clip, according to a warping path

    Parameters
    ----------
    x: ndarray(N)
        An array of audio samples
    sr: int
        Sample rate
    path: ndarray(K, 2)
        Warping path.  Indices of x are in first row
    hop_length: int
        Hop length to use in the phase vocoder
    n_fft: int
        Number of fft samples to use in the phase vocoder
    """
    # Break down into regions of constant slope
    xdiff = path[1::, 0] - path[0:-1, 0]
    ydiff = path[1::, 1] - path[0:-1, 1]
    xdiff = xdiff[1::] - xdiff[0:-1]
    ydiff = ydiff[1::] - ydiff[0:-1]
    diff = xdiff + ydiff
    ret = np.array([])
    i1 = 0
    while i1 < len(diff):
        i2 = i1+1
        while i2 < len(diff) and diff[i2] == 0:
            i2 += 1
        while i2 < len(diff) and path[i2, 0] - path[i1, 0] < n_fft:
            i2 += 1
        if i2 >= len(diff):
            break
        fac = (path[i2, 1]-path[i1, 1])/(path[i2, 0]-path[i1, 0])
        if fac > 0:
            fac = 1/fac
            xi = x[path[i1, 0]:path[i2, 0]+1]
            D = librosa.stft(xi, n_fft = n_fft, hop_length=hop_length)
            DNew = librosa.phase_vocoder(D, fac, hop_length=hop_length)
            xifast = librosa.istft(DNew, hop_length=hop_length)
            ret = np.concatenate((ret, xifast))
        i1 = i2
    return ret


def stretch_audio(x1, x2, sr, path, hop_length):
    """
    Wrap around pyrubberband to warp one audio stream
    to another, according to some warping path

    Parameters
    ----------
    x1: ndarray(M)
        First audio stream
    x2: ndarray(N)
        Second audio stream
    sr: int
        Sample rate
    path: ndarray(P, 2)
        Warping path, in units of windows
    hop_length: int
        The hop length between windows
    
    Returns
    -------
    x3: ndarray(N, 2)
        The synchronized audio.  x2 is in the right channel,
        and x1 stretched to x2 is in the left channel
    """
    print("Stretching...")
    path_final = np.array(path)*hop_length
    path_final = [(row[0], row[1]) for row in path_final if row[0] < x1.size and row[1] < x2.size]
    path_final.append((x1.size, x2.size))
    path_final = np.array(path_final, dtype=int)
    x3 = np.zeros((x2.size, 2))
    x3[:, 1] = x2
    x1_stretch = timemap_stretch(x1, sr, path_final)
    print("x1.shape = ", x1.shape)
    print("x2.shape = ", x2.shape)
    print("x1_stretch.shape = ", x1_stretch.shape)
    x1_stretch = x1_stretch[0:min(x1_stretch.size, x3.shape[0])]
    x3 = x3[0:min(x3.shape[0], x1_stretch.size), :]
    x3[:, 0] = x1_stretch
    return x3
In [ ]:
hop_length = 512
Y1 = librosa.feature.mfcc(y=x1, sr=sr, hop_length=hop_length).T
Y2 = librosa.feature.mfcc(y=x2, sr=sr, hop_length=hop_length).T
D = get_csm_fast(Y1, Y2)

import linmdtw
path = linmdtw.linmdtw(Y1, Y2)

plt.imshow(D, cmap='magma_r', aspect='auto')
plt.scatter(path[:, 1], path[:, 0])

xsync = stretch_audio(x1, x2, sr, path, hop_length)
ipd.Audio([xsync[:, 0], xsync[:, 1]], rate=sr)
In [ ]: