众所周知,Python是使用伪多线程运行的。这导致在多核情况下,CPU并没有被良好地利用起来。为了提高性能,我们一般会选择多进程进行工作。
进程之间的协作需要通过通信完成,考虑到管道的缓存能力和安全性,我们选择队列作为通信内容的载体。
multiprocessing模块下实现封装有很多基础类型供给多线程间共享调用,这足够满足我们的需要。
但是又一个问题出现了。进程的发送方可以根据需要向队列中加入数据,但是对于接收方不断地接收数据来保证加入队列的数据可以得到及时地处理。或许轮询式的尝试从队列中获取数据是个好选择。
那么多久循环一次呢?无限制循环?一个Linux tick时间?无论多长,在空闲的时候都会造成对CPU的性能无意义损耗。所以另一个解决方案呼之欲出,如果队列为空那么等待一个信号,直到队列中被放入数据后触发这个信号后继续该进程。这个借助multiprocessing.Event()便可轻易做到。
那么怎么对Queue进行封装呢?关于这一点参考之前写过的这篇文章: Python中轮询触发更替事件驱动的简单方法
那么现在,我们已经成功得对Queue进行了封装,那么我们是否可以对multiprocessing中manager里的其他的管理的数据内容进行封装呢?于是我给出了如下的解决方案。
import multiprocessing
#这是所有操作符相关过或者其他功能相关的特殊函数。
common_list = {"__add__","__concat__","__contains__",
"__truediv__","__floordiv__", "__and___",
"__xor__","__invert__","__or__","__pow__",
"__is__","__is_not__","__setitem__","__delitem__",
"__getitem__","__lshift__","__mod__","__mul__",
"__matmul__","__neg__","__not__","__pos__",
"__rshift__","__setitem__","__delitem__","__getitem__",
"__mod__","__sub__","__truth__","__lt__",
"__le__","__eq__","__ne__","__ge__",
"__gt__","__str__"}
#定义需要封装事件信号的类型和触发事件信号的方法
source_map = {
'Queue': {'put'},
'dict':{'__setitem__'}
}
# 封装了multiprocessing.Manager,
# 由于multiprocessing.Manager实际是一个方法,
# 所以使用了下述__getattribute__的方式。
class Manager(object):
def __init__(self):
self.manager = multiprocessing.Manager()
# 将multiprocessing.Manager的方法映射向了Manager
def __getattribute__(self, name):
# 如果该方法在source_map 内,则对该方法进行封装事件处理
if name in source_map:
# 初始化的方法,实例化被封装的实例和该实例所用的事件信号
def __init__(self_q, *args, **kwargs):
self_q.sign = self.manager.Event()
self_q.base = self.manager.__getattribute__(name)(*args, **kwargs)
#生成一个对应的函数的函数
def functionfactory(i,m,self=None):
if self:
def f(*args, **kwargs):
r = self.base.__getattribute__(m)(*args, **kwargs)
if i:
self.sign_set()
return r
else:
def f(self_q, *args, **kwargs):
r = self_q.base.__getattribute__(m)(*args, **kwargs)
if i:
self_q.sign_set()
return r
return f
# 对一些属性方法做映射
def __getattribute__(self_q, name_q):
if name_q[:5] == 'sign_':
return self_q.sign.__getattribute__(name_q[5:])
elif name_q in source_map[name] and name_q not in common_list:
return functionfactory(True, name_q, self_q)
elif name_q[:2]=='__' or name_q in ['sign', 'base'] or \
(name_q in common_list and name_q in source_map[name]):
return object.__getattribute__(self_q, name_q)
else:
return self_q.base.__getattribute__(name_q)
# __dir__ 方法的重写
def __dir__(self):
r = (dir(self.base)
+ ["sign_%s" % m for m in dir(self.sign) if m[:2]!='__']
+ ['sign', 'base'])
r.sort()
return r
# 生成动态生成类型的方法字典
method_map = {
'__init__': __init__,
'__getattribute__': __getattribute__,
'__dir__': __dir__
}
for m in common_list.intersection(
dir(type(self.manager.__getattribute__(name)()))
):
method_map[m] = functionfactory(m in source_map[name],m)
#生成并返回动态类型
return type(name, (object,), method_map)
elif name in ['manager', 'source_map']:
return object.__getattribute__(self, name)
else:
return self.manager.__getattribute__(self, name)
def __dir__(self):
r = dir(self.manager) + ['manager']
r.sort()
return r
if __name__ == "__main__":
m = Manager()
q = m.Queue()
q.sign_set()
print(q.sign_is_set()) # True
q.sign_clear()
q.put(1)
print(q.sign_wait()) # True
d = m.dict()
d[3] = 2
print(d.sign_is_set()) # True
d.sign_clear()
print(d[3]) # 2
print(d.sign_is_set()) # False
print(d) # {3:2}
其中,通过重写__getattribute__方法的方式,简单实现了对封装的外部类和对应内部类的属性进行了映射。即当在代码中调用a.b的时候,其实是通过a.__getattribute__返回的b属性。比如:
class AllMethodAllowed(object):
def __getattribute__(self, name):
if name in dir(object):
return object.__getattribute__(self, name)
else:
return 'Hello, %s !' % name
if __name__=="__main__":
ama = AllMethodAllowed()
print(ama.Emma) # Hello, Emma !
print(ama.World) # Hello, World !
但是python的基本操作是由实例直接对应的内置方法决定的而非__getattribute__方法获取的,所以需要通过定义的方式进行。