Python实现多目标跟踪中的线性分配问题(Linear Assignment Problem,LAP)

多目标跟踪中用到的求解线性分配问题(Linear Assignment Problem,LAP)Python

flyfish

如果想看 C++版本的,请点这里。

线性分配问题(LAP,Linear Assignment Problem)是一个经典的优化问题,其目标是在若干任务和若干工人之间进行分配,以最小化总成本。成本可以是时间、金钱等。
LAPJV算法(Linear Assignment Problem Jonker-Volgenant algorithm)是解决线性分配问题的一种具体方法。这是一种基于Jonker和Volgenant提出的优化算法,旨在高效地求解LAP。

我们有一个公司,当前有5个任务需要完成,同时公司有5名工人可供选择。每个工人完成每个任务的成本(例如时间、金钱等)不同。公司的目标是找到一种分配方案,使得所有任务都能被一个工人完成,并且总成本最小。

具体而言,成本矩阵如下:

任务1 任务2 任务3 任务4 任务5
工人1 9 11 14 11 7
工人2 6 15 13 13 10
工人3 12 13 6 8 8
工人4 11 9 10 12 9
工人5 7 12 14 10 14

该矩阵中的每个值表示分配工人完成任务的成本。公司希望最小化这些成本的总和。
用Python代码来理解,使用lap库来验证写的对不对

依赖库

conda install -c conda-forge lap

代码实现

import numpy as np
import matplotlib.pyplot as plt
import lap

class LAPJV:
    def __init__(self, cost_matrix):
        self.cost_matrix = np.array(cost_matrix)
        self.n = self.cost_matrix.shape[0]
        self.x = np.full(self.n, -1, dtype=int)
        self.y = np.full(self.n, -1, dtype=int)
        self.v = np.zeros(self.n)
        self.free_rows = []

    def solve(self):
        self.column_reduction()
        self.augment()

    def column_reduction(self):
        for j in range(self.n):
            min_cost = np.min(self.cost_matrix[:, j])
            self.cost_matrix[:, j] -= min_cost
            self.v[j] = min_cost

        for j in range(self.n):
            i = np.argmin(self.cost_matrix[:, j])
            if self.x[i] == -1:
                self.x[i] = j
                self.y[j] = i
            else:
                self.y[j] = -1

        for i in range(self.n):
            if self.x[i] == -1:
                self.free_rows.append(i)
            elif self.y[self.x[i]] != i:
                self.x[i] = -1
                self.free_rows.append(i)

    def augment(self):
        while self.free_rows:
            self.find_augmenting_path()

    def find_augmenting_path(self):
        d = np.full(self.n, np.inf)
        pred = np.full(self.n, -1, dtype=int)
        cols = np.arange(self.n)

        i = self.free_rows.pop(0)
        for j in range(self.n):
            d[j] = self.cost_matrix[i, j] - self.v[j]
            pred[j] = i

        u = np.zeros(self.n)
        visited = np.zeros(self.n, dtype=bool)
        final_j = -1

        while final_j == -1:
            min_d = np.min(d[~visited])
            for j in cols[~visited]:
                if d[j] == min_d:
                    if self.y[j] == -1:
                        final_j = j
                        break
                    visited[j] = True
                    i = self.y[j]
                    u[j] = min_d
                    for k in cols[~visited]:
                        new_d = self.cost_matrix[i, k] - self.v[k] - u[j]
                        if new_d < d[k]:
                            d[k] = new_d
                            pred[k] = j

        while final_j != -1:
            i = pred[final_j]
            j = self.x[i]
            self.y[final_j] = i
            self.x[i] = final_j
            final_j = j

    def get_assignment(self):
        return self.x

def plot_assignment(cost_matrix, assignment):
    n = len(cost_matrix)
    fig, ax = plt.subplots()
    im = ax.imshow(cost_matrix, cmap='viridis')

    for i in range(n):
        ax.plot(assignment[i], i, 'ro')

    ax.set_xticks(np.arange(n))
    ax.set_yticks(np.arange(n))
    ax.set_xticklabels([f'Job {j+1}' for j in range(n)])
    ax.set_yticklabels([f'Worker {i+1}' for i in range(n)])

    plt.title('Job Assignment')
    plt.colorbar(im)
    plt.show()

# 更复杂的示例成本矩阵
cost_matrix = [
    [9, 11, 14, 11, 7],
    [6, 15, 13, 13, 10],
    [12, 13, 6, 8, 8],
    [11, 9, 10, 12, 9],
    [7, 12, 14, 10, 14]
]

# 创建并解决LAPJV问题
lap_solver = LAPJV(cost_matrix)
lap_solver.solve()
assignment = lap_solver.get_assignment()

# 使用lap库验证结果
cost_matrix_np = np.array(cost_matrix)
row_ind, col_ind, _ = lap.lapjv(cost_matrix_np)

# 打印分配结果
print("Custom LAPJV Assignment:")
for i, j in enumerate(assignment):
    print(f"Worker {i+1} is assigned to Job {j+1}")

print("\nLAP Library Assignment:")
for i, j in enumerate(col_ind):
    print(f"Worker {i+1} is assigned to Job {j+1}")

# 绘制分配结果
plot_assignment(cost_matrix, assignment)
Custom LAPJV Assignment:
Worker 1 is assigned to Job 5
Worker 2 is assigned to Job 1
Worker 3 is assigned to Job 3
Worker 4 is assigned to Job 2
Worker 5 is assigned to Job 4

LAP Library Assignment:
Worker 1 is assigned to Job 5
Worker 2 is assigned to Job 1
Worker 3 is assigned to Job 3
Worker 4 is assigned to Job 2
Worker 5 is assigned to Job 4

如何使用~来选择未访问的列:

import numpy as np

# 示例数据
visited = np.array([True, False, True, False])
cols = np.array([0, 1, 2, 3])

# 选择未访问的列
unvisited_cols = cols[~visited]

print("Visited:", visited)
print("Cols:", cols)
print("Unvisited Cols:", unvisited_cols)
Visited: [ True False  True False]
Cols: [0 1 2 3]
Unvisited Cols: [1 3]

作者:西笑生

物联沃分享整理
物联沃-IOTWORD物联网 » Python实现多目标跟踪中的线性分配问题(Linear Assignment Problem,LAP)

发表回复