A time-ordered point cloud inclues samples of a curve in a $d$ dimensional space, ordered by time. The convention we will follow in this class is that each point is a row of a matrix, and the columns of the matrix each hold a different dimension
import numpy as np
import matplotlib.pyplot as plt
import librosa
import IPython.display as ipd
N = 100
t = np.linspace(0, 1, N)
Y1 = np.zeros((N, 2))
Y1[:, 0] = np.cos(2*np.pi*t)
Y1[:, 1] = np.sin(4*np.pi*t)
Y2 = np.zeros((N, 2))
Y2[:, 0] = 1.3*np.cos(2*np.pi*(t**2))
Y2[:, 1] = 1.3*np.sin(4*np.pi*(t**2))+0.1
plt.figure(figsize=(10, 5))
plt.subplot(121)
plt.scatter(Y1[:, 0], Y1[:, 1], c = np.arange(N), cmap='Reds')
plt.subplot(122)
plt.scatter(Y2[:, 0], Y2[:, 1], c = np.arange(N), cmap='Blues')
def get_csm_pythonic(X, Y):
M = X.shape[0]
N = Y.shape[0]
D = np.zeros((M, N))
for i in range(M):
for j in range(N):
D[i, j] = np.sqrt(np.sum( (X[i, :]-Y[j, :])**2 ))
return D
def get_csm_fast(Y1, Y2):
Y1Sqr = np.sum(Y1**2, 1)
Y2Sqr = np.sum(Y2**2, 1)
D = Y1Sqr[:, None] + Y2Sqr[None, :] - 2*Y1.dot(Y2.T)
D[D < 0] = 0
D = np.sqrt(D)
return D
Below is the cross-similarity matrix between the two different time-ordered point clouds for the figure 8s
C = get_csm_fast(Y1, Y2)
c1 = 'Reds'
c2 = 'Blues'
plt.figure(figsize=(10, 5))
plt.subplot(121)
plt.title('Curve Examples')
plt.scatter(Y1[:, 0], Y1[:, 1], 30, np.arange(N), cmap=c1, edgecolor = 'none')
plt.scatter(Y2[:, 0], Y2[:, 1], 30, np.arange(N), cmap=c2, edgecolor = 'none')
ax = plt.gca()
ax.set_facecolor((0.7, 0.7, 0.7))
plt.axis('equal')
plt.subplot(122)
plt.imshow(C, interpolation = 'none', cmap='magma_r')
plt.colorbar()
plt.scatter(-2*np.ones(N), np.arange(N), 20, np.arange(N), cmap=c1, edgecolor = 'none')
plt.scatter(np.arange(N), -2*np.ones(N), 20, np.arange(N), cmap=c2, edgecolor = 'none')
plt.xlabel('Time Index on Right Curve')
plt.ylabel('Time Index on Left Curve')
x1, sr = librosa.load("vivaldi1.wav")
x2, sr = librosa.load("vivaldi2.wav")
xboth = np.zeros( (max(len(x1), len(x2)), 2) )
xboth[0:len(x1), 0] = x1
xboth[0:len(x2), 1] = x2
ipd.Audio([xboth[:, 0], xboth[:, 1]], rate=sr)
def timemap_stretch(x, sr, path, hop_length=32, n_fft = 4096):
"""
Stretch audio x so that it aligns with another
audio clip, according to a warping path
Parameters
----------
x: ndarray(N)
An array of audio samples
sr: int
Sample rate
path: ndarray(K, 2)
Warping path. Indices of x are in first row
hop_length: int
Hop length to use in the phase vocoder
n_fft: int
Number of fft samples to use in the phase vocoder
"""
# Break down into regions of constant slope
xdiff = path[1::, 0] - path[0:-1, 0]
ydiff = path[1::, 1] - path[0:-1, 1]
xdiff = xdiff[1::] - xdiff[0:-1]
ydiff = ydiff[1::] - ydiff[0:-1]
diff = xdiff + ydiff
ret = np.array([])
i1 = 0
while i1 < len(diff):
i2 = i1+1
while i2 < len(diff) and diff[i2] == 0:
i2 += 1
while i2 < len(diff) and path[i2, 0] - path[i1, 0] < n_fft:
i2 += 1
if i2 >= len(diff):
break
fac = (path[i2, 1]-path[i1, 1])/(path[i2, 0]-path[i1, 0])
if fac > 0:
fac = 1/fac
xi = x[path[i1, 0]:path[i2, 0]+1]
D = librosa.stft(xi, n_fft = n_fft, hop_length=hop_length)
DNew = librosa.phase_vocoder(D, fac, hop_length=hop_length)
xifast = librosa.istft(DNew, hop_length=hop_length)
ret = np.concatenate((ret, xifast))
i1 = i2
return ret
def stretch_audio(x1, x2, sr, path, hop_length):
"""
Wrap around pyrubberband to warp one audio stream
to another, according to some warping path
Parameters
----------
x1: ndarray(M)
First audio stream
x2: ndarray(N)
Second audio stream
sr: int
Sample rate
path: ndarray(P, 2)
Warping path, in units of windows
hop_length: int
The hop length between windows
Returns
-------
x3: ndarray(N, 2)
The synchronized audio. x2 is in the right channel,
and x1 stretched to x2 is in the left channel
"""
print("Stretching...")
path_final = np.array(path)*hop_length
path_final = [(row[0], row[1]) for row in path_final if row[0] < x1.size and row[1] < x2.size]
path_final.append((x1.size, x2.size))
path_final = np.array(path_final, dtype=int)
x3 = np.zeros((x2.size, 2))
x3[:, 1] = x2
x1_stretch = timemap_stretch(x1, sr, path_final)
print("x1.shape = ", x1.shape)
print("x2.shape = ", x2.shape)
print("x1_stretch.shape = ", x1_stretch.shape)
x1_stretch = x1_stretch[0:min(x1_stretch.size, x3.shape[0])]
x3 = x3[0:min(x3.shape[0], x1_stretch.size), :]
x3[:, 0] = x1_stretch
return x3
hop_length = 512
Y1 = librosa.feature.mfcc(y=x1, sr=sr, hop_length=hop_length).T
Y2 = librosa.feature.mfcc(y=x2, sr=sr, hop_length=hop_length).T
D = get_csm_fast(Y1, Y2)
import linmdtw
path = linmdtw.linmdtw(Y1, Y2)
plt.imshow(D, cmap='magma_r', aspect='auto')
plt.scatter(path[:, 1], path[:, 0])
xsync = stretch_audio(x1, x2, sr, path, hop_length)
ipd.Audio([xsync[:, 0], xsync[:, 1]], rate=sr)