http://nipunbatra.github.io/2014/07/dtw/
So you just recorded yourself saying a word and want to match it against another recording of the same word. The signals look similar, but they have different lengths and different activations for different features. So how do you decide on their similarity? Dynamic time warping (DTW) is probably something which can come to your rescue. Quoting Wikipedia:
"In time series analysis, dynamic time warping (DTW) is an algorithm for measuring similarity between two temporal sequences which may vary in time or speed. For instance, similarities in walking patterns could be detected using DTW, even if one person was walking faster than the other, or if there were accelerations and decelerations during the course of an observation."
In this post I will try and put forward a naive implementation of DTW and also explain the different pieces of the problem.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
x = np.array([1, 1, 2, 3, 2, 0])
y = np.array([0, 1, 1, 2, 3, 2, 1])
plt.plot(x, 'r', label='x')
plt.plot(y, 'g', label='y')
plt.legend();
So, it appears that both signals show similar behaviour: each has a peak, and around the peak they slope downwards. They vary in their speed and total duration. So, all set for DTW.
Our aim is to find a mapping between all points of x and y. For instance, x(3) may be mapped to y(4) and so on.
In this initial step, we will find the distance between every pair of points in the two signals. A smaller distance implies that the corresponding points are better candidates to be matched together.
distances = np.zeros((len(y), len(x)))
distances
We will use the squared difference as the distance between pairs of points.
for i in range(len(y)):
    for j in range(len(x)):
        distances[i, j] = (x[j] - y[i])**2
distances
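As an aside, the same matrix can be computed without explicit loops using NumPy broadcasting. This is an equivalent alternative to the double loop above, not part of the original walkthrough:

```python
import numpy as np

x = np.array([1, 1, 2, 3, 2, 0])
y = np.array([0, 1, 1, 2, 3, 2, 1])

# y as a column vector against x as a row vector broadcasts to the
# full (len(y), len(x)) matrix of squared differences in one step.
distances_vec = (x[np.newaxis, :] - y[:, np.newaxis]) ** 2
```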
We will write a small function to visualize the distance matrix we just created.
def distance_cost_plot(distances):
    im = plt.imshow(distances, interpolation='nearest', cmap='Reds')
    plt.gca().invert_yaxis()
    plt.xlabel("X")
    plt.ylabel("Y")
    plt.grid()
    plt.colorbar();
distance_cost_plot(distances)
From the plot above, it seems like the diagonal entries have low distances, which means that the distance between similar index points in x and y is low.
In order to create a mapping between the two signals, we need to trace a path through the plot above. The path should start at (0, 0) and reach (M, N), where M and N are the lengths of the two signals. Our aim is to find the path of minimum total distance. To do this, we build a matrix of the same shape as the distances matrix; it will contain, for each point, the minimum accumulated distance of any path reaching that point from (0, 0). We impose some restrictions on the paths which we would explore:

- Boundary: the path must start at (0, 0) and end at (M, N).
- Monotonicity: the path can never move backwards; the indices only ever increase.
- Continuity: each step moves to an adjacent cell, i.e. one step right, one step up, or one step diagonally.

These restrictions prevent a combinatorial explosion and convert the problem to a Dynamic Programming problem which can be solved in O(MN) time.
accumulated_cost = np.zeros((len(y), len(x)))
accumulated_cost[0,0] = distances[0,0]
Let's see how the accumulated cost looks at this point.
distance_cost_plot(accumulated_cost)
for i in range(1, len(x)):
    accumulated_cost[0, i] = distances[0, i] + accumulated_cost[0, i-1]
distance_cost_plot(accumulated_cost)
for i in range(1, len(y)):
    accumulated_cost[i, 0] = distances[i, 0] + accumulated_cost[i-1, 0]
distance_cost_plot(accumulated_cost)
for i in range(1, len(y)):
    for j in range(1, len(x)):
        accumulated_cost[i, j] = min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]) + distances[i, j]
distance_cost_plot(accumulated_cost)
So, now we have obtained a matrix in which each entry contains the minimum cost of reaching that point from (0, 0). We now need to recover the path achieving this minimum, which we do by backtracking.
The backtracking procedure is fairly simple: starting from the last point (M, N), we repeatedly find which of the three possible predecessors we would have come from (the one minimizing the accumulated cost), until we reach (0, 0).
path = [[len(x)-1, len(y)-1]]
i = len(y)-1
j = len(x)-1
while i > 0 or j > 0:
    if i == 0:
        j = j - 1
    elif j == 0:
        i = i - 1
    else:
        if accumulated_cost[i-1, j] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
            i = i - 1
        elif accumulated_cost[i, j-1] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
            j = j - 1
        else:
            i = i - 1
            j = j - 1
    path.append([j, i])
path
path_x = [point[0] for point in path]
path_y = [point[1] for point in path]
distance_cost_plot(accumulated_cost)
plt.plot(path_x, path_y);
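The backtracking can also be written more compactly with np.argmin picking the predecessor cell. This is a sketch of my own (the function name `backtrack` is not from the original code); on ties it may choose a different but equally optimal path:

```python
import numpy as np

def backtrack(accumulated_cost):
    """Recover an optimal warping path from the accumulated cost matrix."""
    i = accumulated_cost.shape[0] - 1
    j = accumulated_cost.shape[1] - 1
    path = [[j, i]]
    while i > 0 or j > 0:
        if i == 0:
            j -= 1
        elif j == 0:
            i -= 1
        else:
            # 0: diagonal predecessor, 1: step back in y, 2: step back in x
            step = np.argmin([accumulated_cost[i - 1, j - 1],
                              accumulated_cost[i - 1, j],
                              accumulated_cost[i, j - 1]])
            if step == 0:
                i, j = i - 1, j - 1
            elif step == 1:
                i -= 1
            else:
                j -= 1
        path.append([j, i])
    return path[::-1]
```

Called as `backtrack(accumulated_cost)`, it returns the path ordered from (0, 0) onwards.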
The above plot shows the optimal warping path, which minimizes the sum of distances (the DTW distance) along the path. Let us wrap this up in a function that also returns the DTW distance between the two signals.
def path_cost(x, y, accumulated_cost, distances):
    path = [[len(x)-1, len(y)-1]]
    cost = 0
    i = len(y)-1
    j = len(x)-1
    while i > 0 or j > 0:
        if i == 0:
            j = j - 1
        elif j == 0:
            i = i - 1
        else:
            if accumulated_cost[i-1, j] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
                i = i - 1
            elif accumulated_cost[i, j-1] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
                j = j - 1
            else:
                i = i - 1
                j = j - 1
        path.append([j, i])
    for [map_x, map_y] in path:
        cost = cost + distances[map_y, map_x]
    return path, cost
path, cost = path_cost(x, y, accumulated_cost, distances)
print(path)
print(cost)
Let us compare our naive implementation with that of mlpy, which also provides a DTW implementation.
import mlpy
dist, cost, path = mlpy.dtw_std(x, y, dist_only=False)
import matplotlib.cm as cm
fig = plt.figure(1)
ax = fig.add_subplot(111)
plot1 = plt.imshow(cost.T, origin='lower', cmap=cm.gray, interpolation='nearest')
plot2 = plt.plot(path[0], path[1], 'w')
xlim = ax.set_xlim((-0.5, cost.shape[0]-0.5))
ylim = ax.set_ylim((-0.5, cost.shape[1]-0.5))
The path looks almost identical to the one we got. The slight difference is due to the fact that the path chosen by our implementation and the one chosen by mlpy have the same total distance, so either could be chosen.
dist
Not bad! Our implementation gets the same distance between x and y.
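Note that if we only need the distance and not the path, backtracking is unnecessary: the DTW distance is simply the bottom-right entry of the accumulated cost matrix. A quick self-contained check on the same x and y:

```python
import numpy as np

x = np.array([1, 1, 2, 3, 2, 0])
y = np.array([0, 1, 1, 2, 3, 2, 1])

# Pairwise squared distances via broadcasting
D = (x[np.newaxis, :] - y[:, np.newaxis]) ** 2

# Fill the accumulated cost matrix with the DTW recurrence
acc = np.zeros_like(D, dtype=float)
acc[0, 0] = D[0, 0]
for j in range(1, len(x)):
    acc[0, j] = D[0, j] + acc[0, j - 1]
for i in range(1, len(y)):
    acc[i, 0] = D[i, 0] + acc[i - 1, 0]
for i in range(1, len(y)):
    for j in range(1, len(x)):
        acc[i, j] = D[i, j] + min(acc[i - 1, j - 1], acc[i - 1, j], acc[i, j - 1])

print(acc[-1, -1])  # the DTW distance: 2.0
```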
Let us look at another interesting way to visualize the warp. We will place the two signals on the same axes and draw a line between each pair of matched points.
plt.plot(x, 'bo-', label='x')
plt.plot(y, 'g^-', label='y')
plt.legend();
paths = path_cost(x, y, accumulated_cost, distances)[0]
for [map_x, map_y] in paths:
    print(map_x, x[map_x], ":", map_y, y[map_y])
    plt.plot([map_x, map_y], [x[map_x], y[map_y]], 'r')
The above plot shows the mapping between the two signals: the red lines connect the matched points given by the DTW algorithm. Looks neat, doesn't it? Now, let us try this on some known signals. This example is inspired by the example used in R's dtw implementation. We will look at the DTW path between a sine and a cosine over the same angles.
idx = np.linspace(0, 6.28, 100)
x = np.sin(idx)
y = np.cos(idx)
distances = np.zeros((len(y), len(x)))
for i in range(len(y)):
    for j in range(len(x)):
        distances[i, j] = (x[j] - y[i])**2
distance_cost_plot(distances)
accumulated_cost = np.zeros((len(y), len(x)))
accumulated_cost[0, 0] = distances[0, 0]
for i in range(1, len(y)):
    accumulated_cost[i, 0] = distances[i, 0] + accumulated_cost[i-1, 0]
for i in range(1, len(x)):
    accumulated_cost[0, i] = distances[0, i] + accumulated_cost[0, i-1]
for i in range(1, len(y)):
    for j in range(1, len(x)):
        accumulated_cost[i, j] = min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]) + distances[i, j]
plt.plot(x, 'bo-', label='x')
plt.plot(y, 'g^-', label='y')
plt.legend();
paths = path_cost(x, y, accumulated_cost, distances)[0]
for [map_x, map_y] in paths:
    #print(map_x, x[map_x], ":", map_y, y[map_y])
    plt.plot([map_x, map_y], [x[map_x], y[map_y]], 'r')
Ok, this does look nice. I am impressed!
We worked out a naive DTW implementation pretty much from scratch. It seems to do reasonably well on artificial data.
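For reference, the pieces developed above can be collected into one self-contained function. This is a sketch in the spirit of the post; the name `dtw` and its signature are my own, not a library API:

```python
import numpy as np

def dtw(x, y):
    """Return the DTW distance and warping path between 1-D sequences x and y."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    # Pairwise squared distances
    D = (x[np.newaxis, :] - y[:, np.newaxis]) ** 2
    # Accumulated cost matrix, filled with the DTW recurrence
    acc = np.zeros_like(D)
    acc[0, 0] = D[0, 0]
    for j in range(1, len(x)):
        acc[0, j] = D[0, j] + acc[0, j - 1]
    for i in range(1, len(y)):
        acc[i, 0] = D[i, 0] + acc[i - 1, 0]
    for i in range(1, len(y)):
        for j in range(1, len(x)):
            acc[i, j] = D[i, j] + min(acc[i - 1, j - 1], acc[i - 1, j], acc[i, j - 1])
    # Backtrack from the last cell to (0, 0)
    i, j = len(y) - 1, len(x) - 1
    path = [(j, i)]
    while i > 0 or j > 0:
        if i == 0:
            j -= 1
        elif j == 0:
            i -= 1
        else:
            step = np.argmin([acc[i - 1, j - 1], acc[i - 1, j], acc[i, j - 1]])
            if step == 0:
                i, j = i - 1, j - 1
            elif step == 1:
                i -= 1
            else:
                j -= 1
        path.append((j, i))
    return acc[-1, -1], path[::-1]

dist, path = dtw([1, 1, 2, 3, 2, 0], [0, 1, 1, 2, 3, 2, 1])
print(dist)  # 2.0
```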
Feel free to comment!