Week 9: DTW Backtracing for Audio Alignment
Chris Tralie
import numpy as np
import matplotlib.pyplot as plt
import librosa
import IPython.display as ipd


def get_csm_fast(Y1, Y2):
    """
    Compute the Euclidean cross-similarity matrix between two point clouds
    using the matrix-multiplication identity
    ||yi - yj||^2 = ||yi||^2 + ||yj||^2 - 2<yi, yj>

    Parameters
    ----------
    Y1: ndarray(M, d)
        First point cloud, one point per row
    Y2: ndarray(N, d)
        Second point cloud, one point per row

    Returns
    -------
    D: ndarray(M, N)
        Pairwise Euclidean distances between rows of Y1 and rows of Y2
    """
    Y1Sqr = np.sum(Y1**2, 1)
    Y2Sqr = np.sum(Y2**2, 1)
    D = Y1Sqr[:, None] + Y2Sqr[None, :] - 2 * Y1.dot(Y2.T)
    # Clamp tiny negative values that arise from floating point roundoff
    # before taking the square root
    D[D < 0] = 0
    D = np.sqrt(D)
    return D


def timemap_stretch(x, sr, path, hop_length=32, n_fft=4096):
    """
    Stretch audio x so that it aligns with another audio clip,
    according to a warping path

    Parameters
    ----------
    x: ndarray(N)
        An array of audio samples
    sr: int
        Sample rate
    path: ndarray(K, 2)
        Warping path.  Indices of x are in first column
    hop_length: int
        Hop length to use in the phase vocoder
    n_fft: int
        Number of fft samples to use in the phase vocoder

    Returns
    -------
    ndarray
        The stretched audio samples
    """
    # Break the path down into regions of constant slope by looking at
    # second differences of the path coordinates
    xdiff = path[1::, 0] - path[0:-1, 0]
    ydiff = path[1::, 1] - path[0:-1, 1]
    xdiff = xdiff[1::] - xdiff[0:-1]
    ydiff = ydiff[1::] - ydiff[0:-1]
    diff = xdiff + ydiff
    ret = np.array([])
    i1 = 0
    while i1 < len(diff):
        i2 = i1 + 1
        # Extend the region while the slope stays constant
        while i2 < len(diff) and diff[i2] == 0:
            i2 += 1
        # Make sure the region spans at least one fft window of samples
        while i2 < len(diff) and path[i2, 0] - path[i1, 0] < n_fft:
            i2 += 1
        if i2 >= len(diff):
            break
        # The vocoder rate is the inverse of the local slope dy/dx
        fac = (path[i2, 1] - path[i1, 1]) / (path[i2, 0] - path[i1, 0])
        if fac > 0:
            fac = 1 / fac
        xi = x[path[i1, 0]:path[i2, 0] + 1]
        D = librosa.stft(xi, n_fft=n_fft, hop_length=hop_length)
        DNew = librosa.phase_vocoder(D, fac, hop_length=hop_length)
        xifast = librosa.istft(DNew, hop_length=hop_length)
        ret = np.concatenate((ret, xifast))
        i1 = i2
    return ret


def stretch_audio(x1, x2, sr, path, hop_length):
    """
    Use a phase vocoder to warp one audio stream
    to another, according to some warping path

    Parameters
    ----------
    x1: ndarray(M)
        First audio stream
    x2: ndarray(N)
        Second audio stream
    sr: int
        Sample rate
    path: ndarray(P, 2)
        Warping path, in units of windows
    hop_length: int
        The hop length between windows

    Returns
    -------
    x3: ndarray(N, 2)
        The synchronized audio.  x2 is in the right channel,
        and x1 stretched to x2 is in the left channel
    """
    print("Stretching...")
    # Convert the path from window indices to sample indices, and clip
    # any entries that run past the end of either clip
    path_final = np.array(path) * hop_length
    path_final = [(row[0], row[1]) for row in path_final
                  if row[0] < x1.size and row[1] < x2.size]
    path_final.append((x1.size, x2.size))
    path_final = np.array(path_final, dtype=int)
    x3 = np.zeros((x2.size, 2))
    x3[:, 1] = x2
    x1_stretch = timemap_stretch(x1, sr, path_final)
    print("x1.shape = ", x1.shape)
    print("x2.shape = ", x2.shape)
    print("x1_stretch.shape = ", x1_stretch.shape)
    # Trim both channels to a common length before mixing them together
    x1_stretch = x1_stretch[0:min(x1_stretch.size, x3.shape[0])]
    x3 = x3[0:min(x3.shape[0], x1_stretch.size), :]
    x3[:, 0] = x1_stretch
    return x3


def dtw(C):
    """
    Perform dynamic time warping on a cost matrix, with backtracing

    Parameters
    ----------
    C: ndarray(M, N)
        Cross-similarity (cost) matrix between two time series

    Returns
    -------
    path: list of [i, j]
        An optimal warping path from [0, 0] to [M-1, N-1]
    """
    M = C.shape[0]  # Number of rows
    N = C.shape[1]  # Number of columns
    S = np.zeros_like(C)
    S[0, 0] = C[0, 0]  # Base case
    # First row; all warping paths that only match a single element of the
    # first point cloud to some number of elements in the second point cloud
    """
    # Slower pure python code to do the cumulative sum would look like this
    for j in range(1, N):
        S[0, j] = S[0, j-1] + C[0, j]
    """
    S[0, :] = np.cumsum(C[0, :])
    # First column; all warping paths that only match a single element of the
    # second point cloud to some number of elements in the first point cloud
    S[:, 0] = np.cumsum(C[:, 0])
    # For the rest of the elements, we can apply the recurrence relation above
    for i in range(1, M):
        for j in range(1, N):
            S[i, j] = min(S[i-1, j], S[i, j-1], S[i-1, j-1]) + C[i, j]
    # Backtrace from the lower right corner to [0, 0], at each step moving to
    # whichever of the three allowed predecessors has the smallest cumulative cost
    path = [[M-1, N-1]]
    # BUGFIX: the loop must continue until BOTH indices reach 0 ("or", not
    # "and"); with "and" the trace would stop as soon as it hit the first
    # row or first column
    while path[-1][0] != 0 or path[-1][1] != 0:
        i, j = path[-1]
        if i == 0:
            # Pinned to the first row; can only move left
            path.append([i, j-1])
        elif j == 0:
            # Pinned to the first column; can only move up
            path.append([i-1, j])
        else:
            choices = [[i-1, j-1], [i-1, j], [i, j-1]]
            costs = [S[i-1, j-1], S[i-1, j], S[i, j-1]]
            path.append(choices[int(np.argmin(costs))])
    path.reverse()
    return path


# Load the two performances, compute MFCC features on each, align them
# with DTW, and synchronize the audio with a phase vocoder
x1, sr = librosa.load("vivaldi1.wav")
x2, sr = librosa.load("vivaldi2.wav")
hop_length = 512
Y1 = librosa.feature.mfcc(y=x1, sr=sr, hop_length=hop_length).T  # A "transpose" flips the rows and the columns
Y2 = librosa.feature.mfcc(y=x2, sr=sr, hop_length=hop_length).T  # So that each window is along a row
C = get_csm_fast(Y1, Y2)
path = dtw(C)
xsync = stretch_audio(x1, x2, sr, path, hop_length)
ipd.Audio([xsync[:, 0], xsync[:, 1]], rate=sr)