Week 9: DTW Backtracing for Audio Alignment

Chris Tralie

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import numpy as np
import matplotlib.pyplot as plt
import librosa
import IPython.display as ipd
 
def get_csm_fast(Y1, Y2):
    """
    Compute the Euclidean cross-similarity matrix between two
    point clouds, using the quadratic expansion
    ||a-b||^2 = ||a||^2 + ||b||^2 - 2<a, b>
    so the whole matrix comes out of one matrix multiply

    Parameters
    ----------
    Y1: ndarray(M, d)
        First point cloud, one point per row
    Y2: ndarray(N, d)
        Second point cloud, one point per row

    Returns
    -------
    ndarray(M, N)
        Pairwise Euclidean distances between rows of Y1 and rows of Y2
    """
    norms1 = np.sum(Y1*Y1, axis=1)[:, None]
    norms2 = np.sum(Y2*Y2, axis=1)[None, :]
    sqr_dists = norms1 + norms2 - 2*np.dot(Y1, Y2.T)
    # Clamp tiny negative values caused by floating point roundoff
    # before taking the square root
    np.maximum(sqr_dists, 0, out=sqr_dists)
    return np.sqrt(sqr_dists)
 
def timemap_stretch(x, sr, path, hop_length=32, n_fft = 4096):
    """
    Stretch audio x so that it aligns with another
    audio clip, according to a warping path

    Parameters
    ----------
    x: ndarray(N)
        An array of audio samples
    sr: int
        Sample rate
    path: ndarray(K, 2)
        Warping path.  Indices of x are in the first column
    hop_length: int
        Hop length to use in the phase vocoder
    n_fft: int
        Number of fft samples to use in the phase vocoder

    Returns
    -------
    ndarray
        The stretched audio, the concatenation of every
        phase-vocoded constant-slope segment of the path
    """
    # Break the path down into regions of constant slope by taking
    # second differences; a zero in diff means the local slope of the
    # path has not changed between consecutive steps
    xdiff = path[1::, 0] - path[0:-1, 0]
    ydiff = path[1::, 1] - path[0:-1, 1]
    xdiff = xdiff[1::] - xdiff[0:-1]
    ydiff = ydiff[1::] - ydiff[0:-1]
    diff = xdiff + ydiff
    ret = np.array([])
    i1 = 0
    while i1 < len(diff):
        # Advance i2 to the end of the current constant-slope region
        i2 = i1+1
        while i2 < len(diff) and diff[i2] == 0:
            i2 += 1
        # Keep extending until the region covers at least n_fft samples
        # of x, so the STFT below has enough data for one full window
        while i2 < len(diff) and path[i2, 0] - path[i1, 0] < n_fft:
            i2 += 1
        if i2 >= len(diff):
            # Remaining tail is too short to process; stop here
            break
        # Slope of this region (y samples per x sample); its reciprocal
        # is the rate factor handed to the phase vocoder
        fac = (path[i2, 1]-path[i1, 1])/(path[i2, 0]-path[i1, 0])
        if fac > 0:
            fac = 1/fac
            xi = x[path[i1, 0]:path[i2, 0]+1]
            # Time-stretch this segment by fac via STFT -> phase vocoder -> inverse STFT
            D = librosa.stft(xi, n_fft = n_fft, hop_length=hop_length)
            DNew = librosa.phase_vocoder(D, fac, hop_length=hop_length)
            xifast = librosa.istft(DNew, hop_length=hop_length)
            ret = np.concatenate((ret, xifast))
        i1 = i2
    return ret
 
def stretch_audio(x1, x2, sr, path, hop_length):
    """
    Use a phase vocoder to warp one audio stream
    to another, according to some warping path

    Parameters
    ----------
    x1: ndarray(M)
        First audio stream
    x2: ndarray(N)
        Second audio stream
    sr: int
        Sample rate
    path: ndarray(P, 2)
        Warping path, in units of windows
    hop_length: int
        The hop length between windows

    Returns
    -------
    x3: ndarray(N, 2)
        The synchronized audio.  x2 is in the right channel,
        and x1 stretched to x2 is in the left channel
    """
    print("Stretching...")
    # Convert the path from window indices to sample indices,
    # dropping entries that run past the end of either clip
    scaled = np.array(path)*hop_length
    sample_path = []
    for row in scaled:
        if row[0] < x1.size and row[1] < x2.size:
            sample_path.append((row[0], row[1]))
    # Pin the final correspondence to the exact ends of both clips
    sample_path.append((x1.size, x2.size))
    sample_path = np.array(sample_path, dtype=int)
    # Right channel carries x2 untouched
    x3 = np.zeros((x2.size, 2))
    x3[:, 1] = x2
    x1_stretch = timemap_stretch(x1, sr, sample_path)
    print("x1.shape = ", x1.shape)
    print("x2.shape = ", x2.shape)
    print("x1_stretch.shape = ", x1_stretch.shape)
    # Trim both to a common length before filling the left channel
    n = min(x1_stretch.size, x3.shape[0])
    x1_stretch = x1_stretch[0:n]
    x3 = x3[0:n, :]
    x3[:, 0] = x1_stretch
    return x3
 
 
def dtw(C):
    """
    Dynamic time warping over a cross-similarity matrix, including
    backtracing to recover an optimal warping path

    Parameters
    ----------
    C: ndarray(M, N)
        Cross-similarity matrix between two point clouds

    Returns
    -------
    path: list of [i, j]
        An optimal warping path, from [0, 0] to [M-1, N-1]
    """
    M = C.shape[0] # Number of rows
    N = C.shape[1] # Number of columns
    S = np.zeros_like(C)
    S[0, 0] = C[0, 0] # Base case
    # First row; all warping paths that only match a single element of
    # the first point cloud to some number of elements in the second
    S[0, :] = np.cumsum(C[0, :])
    # First column; the symmetric case for the second point cloud
    S[:, 0] = np.cumsum(C[:, 0])
    # For the rest of the elements, apply the recurrence relation
    for i in range(1, M):
        for j in range(1, N):
            S[i, j] = min(S[i-1, j], S[i, j-1], S[i-1, j-1]) + C[i, j]

    # Backtrace from [M-1, N-1] to [0, 0].  (The original stub used
    # "and" and left a pass, which both stopped at the wrong cell and
    # looped forever; the loop must run until BOTH indices reach 0.)
    path = [[M-1, N-1]]
    while path[-1][0] != 0 or path[-1][1] != 0:
        i, j = path[-1]
        if i == 0:
            # Pinned to the first row; can only have come from the left
            path.append([0, j-1])
        elif j == 0:
            # Pinned to the first column; can only have come from above
            path.append([i-1, 0])
        else:
            # Step to whichever predecessor achieved the minimum in the
            # recurrence; ties prefer the diagonal
            choices = [[i-1, j-1], [i-1, j], [i, j-1]]
            costs = [S[i-1, j-1], S[i-1, j], S[i, j-1]]
            path.append(choices[int(np.argmin(costs))])
    path.reverse()
    return path
 
# Load the two performances to be synchronized
x1, sr = librosa.load("vivaldi1.wav")
x2, sr = librosa.load("vivaldi2.wav")
hop_length = 512
# MFCC features for each clip; after the transpose, each row is one window
Y1 = librosa.feature.mfcc(y=x1, sr=sr, hop_length=hop_length).T # A "transpose" flips the rows and the columns
Y2 = librosa.feature.mfcc(y=x2, sr=sr, hop_length=hop_length).T # So that each window is along a row
# Cross-similarity matrix between the two feature sequences
C = get_csm_fast(Y1, Y2)
 
# Compute an optimal warping path between the two clips
path = dtw(C)
 
# Warp x1 onto x2's timeline and play the two clips as stereo channels
xsync = stretch_audio(x1, x2, sr, path, hop_length)
ipd.Audio([xsync[:, 0], xsync[:, 1]], rate=sr)