BFS(广度优先搜索)是图论中的一个基础搜索算法,对于下图,BFS将按照节点的数字大小逐一遍历。
单线程中的实现
借用队列的先进先出特性实现,伪代码实现如下,代码清晰且易于理解。
1 Breadth-First-Search(Graph, root):
2
3 for each node n in Graph:
4 n.distance = INFINITY
5 n.parent = NIL
6
7 create empty queue Q
8
9 root.distance = 0
10 Q.enqueue(root)
11
12 while Q is not empty:
13
14 current = Q.dequeue()
15
16 for each node n that is adjacent to current:
17 if n.distance == INFINITY:
18 n.distance = current.distance + 1
19 n.parent = current
20 Q.enqueue(n)
21
多线程中的实现
很多搜索场景下,比如网络爬虫算法,单线程的BFS无法充分利用现有的多核多线程的优势。
算法导论第三版新增了一章多线程算法的内容,主要描述了在多线程环境下如何实现某种算法。主要思想可以通过递归计算斐波那契数列(Fibonacci)来描述,众所周知的fib递归算法如下:
def fib(n):
if n < 2:
return n
a = fib(n - 1)
b = fib(n - 2)
return a + b
通过增加sync和spawn原语,可以将其转为多线程算法。spawn的含义代表在新线程中运行,sync的含义代表所有线程的汇聚,即所有线程都运行至此才继续执行下一行代码。
将上述fib转为多线程算法是这样的:
def fib(n):
if n < 2:
return n
a = spawn fib(n - 1)
b = fib(n - 2)
sync
return a + b
基于这个理论,需要首先找到哪些可并发,哪些需要汇聚,考虑下面这种情况:
B和C,D、E和E是分别满足并发条件的,但B和C并发后的节点E同时属于二者的后继,如果多个线程同时访问时,会将E两次推送至造成错误,并有可能使得多个线程同时取得对E的访问。
在既保证遍历是按照广度优先,而又不至于发生多次访问的错误的情况下,可以采用一种基于有限约束的并发算法,即只允许当前节点的所有子节点并发。这种约束反馈在代码实现上:
1 Breadth-First-Search(Graph, root):
2-11 ... ...
12 while Q is not empty:
13
14 current = Q.dequeue()
15
16 for spaw each node n that is adjacent to current:
17 if n.distance == INFINITY:
18 n.distance = current.distance + 1
19 n.parent = current
20 Q.enqueue(n)
21 sync
注意上面for循环中的spaw,这里指的是对每个节点的访问都将在单独的线程中进行,仅当所有节点完成遍历后进行sync。这样,所有的并发既不会乱序,也不会带来访问冲突。
Actor模型实现spawn和sync
什么是Actor
七周七并发一书中详细介绍了Actor模型。首先它是一个通用并发编程模型,也被Erlang借鉴。其次,它封装了状态并通过消息与其他actor通信,可以适应共享内存架构和分布式内存架构,而且有很好的容错性。简单来讲,Actor就是一个可以接收和发送消息的异步执行体。
使用python来模拟Actor
鉴于Actor是一个可收发消息的执行体,我们需要2个python的概念与其对应:thread 和 queue。 thread是一个执行体,而Queue恰好是一个消息存储体。
class Actor(Thread):
def __init__(s):
s.queue = Queue()
Thread.__init__(s)
def send(s, obj):
s.queue.put(obj)
def recv(s):
return s.queue.get()
def work_done(s):
s.queue.task_done()
def sync(s):
s.queue.join()
def has_more_work(s):
return not s.queue.empty()
def work():
pass
def run(s):
s.work()
Python内置类Queue的join方法说明:
Queue.join()
Blocks until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
配合task_done(),恰好可以模拟前文提及的sync方法,比如这样同步所有工作线程:
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = True
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are done
多线程BFS实现
有了可以sync的Actor,那么多线程的BFS就可以如下方式实现了。
首先将遍历节点的动作封装在Actor中,比如称之为Vistor,它是一个Actor:
class Visitor(Actor):
def __init__(s, monitor, vid = 0):
s.monitor = monitor
s.sid = sid
Actor.__init__(s)
def work(s):
while True:
v = s.recv()
nexts = visit(v)
s.send(nexts)
s.work_done()
对于发起BFS的线程,我们称之为monitor线程,当然,它也是一个Actor:
class Monitor(Actor):
def __init__(s, n_visitors, start_vertex = [], depth = 8):
Actor.__init__(s)
s.n_visitors = n_visitors
s.depth = depth
s.choice = 0
s.visit_history = set()
for v in start_vertex:
s.send(v)
s.visitors = [Visitor(s, x) for x in range(n_visitors)]
for sp in s.n_visitors:
sp.start()
def wait_all(s):
for sp in s.n_visitors:
sp.sync()
def dispatch(s,v):
if v not in s.visit_history:
s.n_visitors[s.choice % s.n_visitors].send(v)
s.choice += 1
s.visit_history.add(v)
def work(s):
while s.has_more_work():
for v in chain.from_iterable(s.recv_all()):
if has_visit_depth == s.depth:
break
s.dispatch(v)
s.wait_all()
s.done_work()
至此,我们就实现了这种基于有限约束的BFS了。