python高级特性
iteration迭代
对list,tuple的遍历被称为迭代。对list实现类似Java那样的下标循环怎么办?Python内置的enumerate函数可以把一个list变成索引-元素对
>>> for i, value in enumerate(['A', 'B', 'C']):
... print(i, value)
iterator迭代器
凡是可作用于for循环的对象都是Iterable类型;凡是可作用于next()
函数的对象都是Iterator类型,它们表示一个惰性计算的序列。集合数据类型如list
、dict
、str
等是Iterable但不是Iterator,不过可以通过iter()
函数获得一个Iterator对象。
>>> isinstance(iter([]), Iterator)
True
Python的for
循环本质上就是通过不断调用next()
函数实现的。循环有for in
和while
,迭代只能用for in
。
受到内存限制,列表容量肯定是有限的。generator生成器,属于iterator。生成方法:
g = (x * x for x in range(10))
函数定义中包含yield
关键字, 用next(g)
或for n in g
获取值。想要拿到返回值,必须捕获StopIteration
错误,返回值包含在StopIteration
的e.value
中。
python并发
python multiprocessing模块封装了多进程和多线程,其中multiprocessing.Process新启动进程,multiprocessing.Pool对应多进程池,multiprocessing.dummy.Pool对应多线程池。后两者用法一致,以下是多进程的用法,其中args
为元组格式,Iterable
为可迭代对象。由于进程锁存在,多线程通常无加速效果。
from multiprocessing import Process
p = Process(target=f, args=(num, arr))
p.start()
p.join()
# 获取最大进程数,可设为更小的值,如一半
import os
count = os.cpu_count()
from multiprocessing import Pool
pool=Pool(count)
results, async_results = [], []
for i in range(count):
# 同步并发,子进程会block,直到获取结果,func一直在一个子进程中执行,故无加速效果
results.append(pool.apply(func, args))
# 异步并发,子进程不会block,支持callback
async_results.append(pool.apply_async(func, args, callback))
# results已为所需结果
# 返回AsyncResult,通过get获取结果
get_results = [x.get() for x in async_results]
pool.close()
pool.join()
或
from multiprocessing import Pool
pool = Pool(count)
# func只能有1个入参
pool.map(func, Iterable) # 同步并发
pool.map_async(func, Iterable)。# 异步并发
# func只能有1个入参,lazy模式,返回类似Generator,遍历时(可能)计算
pool.imap(func, Iterable) # 有序并发
pool.imap_unordered(func, Iterable)。# 无序并发
# func可以有多个入参,Iterable元素仍为Iterable,可解包为多个入参
pool.starmap(func, Iterable) # 同步并发
pool.starmap_async(func, Iterable) # 异步并发
multiprocessing[.sharedctypes]模块可用于一维数组内存共享:
- Array:有锁版,避免写冲突。
- RawArray:无锁版,性能好。
multiprocessing.shared_memory模块可用于子进程间内存共享:
- SharedMemory:内存共享。
- ShareableList:List共享,仅支持几种元素类型。
multiprocessing.managers.SharedMemoryManager模块,支持上述两种共享内存类型的管理。
使用多进程计算非定长向量距离矩阵
import numpy as np
import os
import multiprocessing
from functools import partial
from dtaidistance import dtw, dtw_ndim
# 进程池initializer函数
def init_pool(array):
global glob_array # 共享全局变量
glob_array = array
# 子进程函数
def process_fn(ij, func=None, array_width=None):
i, j, ai, aj = ij
# 子进程读取全局变量glob_array,对齐一维glob_array与原始二维array的对应位置关系
glob_array[i * array_width + j] = func(ai, aj)
def calc_relation_mat(func, list1, list2=None, relation='dist'):
len1 = len(list1)
len2 = len1 if list2 is None else len(list2)
array = np.zeros((len1, len2))
# Pool.map仅支持一个入参,使用偏函数functools.partial,预先传入其他参数
fn_partial = partial(process_fn, func=func, array_width=array.shape[0])
# array为展平的矩阵(即multiprocessing.RawArray, 多进程不支持二维矩阵)
array_shared = multiprocessing.RawArray('d', array.ravel())
# 由于各进程改动对应矩阵位置(即内存地址)处的值,无冲突,故无需加进程锁
# 定义进程池,指定进程数量(processes),初始化函数(initializer)及其参数(initargs)
n_proc = max(os.cpu_count(), 16)
p = multiprocessing.Pool(processes=n_proc, initializer=init_pool, initargs=(array_shared,))
# 若list1==list2,先计算下三角矩阵,然后转置后复值到上三角位置,否则全部计算
it = [(i, j, list1[i], list1[j]) for i in range(len1) for j in range(i if list2 is None else len2)]
# map函数向子进程函数分配不同的参数
p.map(fn_partial, it)
p.close()
p.join()
# glob_array为子进程中的全局变量,在主进程中并未被定义,主进程中的array_shared与子进程中的glob_array指向同一内存地址
array = np.frombuffer(array_shared, np.double).reshape(array.shape)
if list2 is None:
if relation == 'dist':
# 无需 - np.diag(np.diag(dist_mat)),因为对角线为0
array = array + array.T
elif relation == 'sim':
# 对角线为1
array = array + array.T + np.eye(len(list1))
return array
if __name__ == '__main__':
list1 = [np.random.randn(x) for x in range(1, 11)]
dist_mat = calc_relation_mat(dtw.distance, list1)
多进程/线程调试,子进程/线程代码异常时,报错信息不会输出到当前父进程/线程窗口,无法使用pdb直接对多进程进行调试。有以下几种方法:
- print可以生效,但顺序随机。
- 设置pdb.set_trace()后,通过Pycharm提供的远程调试功能。
- ForkedPdb,来源于stackoverflow。
异常
unable to find vcvarsall.bat
解决办法参考:
http://www.cnblogs.com/youxin/p/3159363.html
http://blog.csdn.net/secretx/article/details/17472107
Microsoft Visual C++ Compiler for Python 2.7
http://aka.ms/vcpython27
锁
线程锁,只能在同一个进程不同线程之间加锁,无法在不同进程(如不同用户)之间加锁。如果其他线程锁定同一个Flag,会被阻塞,直到锁被释放。任务会按加锁的顺序执行。
import threading
# 创建锁
mutex = threading.Lock()
# 加锁,传递一个Flag
mutex.acquire(5)
# 执行任务,如读写文件
# 解锁
mutex.release()
文件锁,通过Linux文件在不同进行(如不同用户)之间加锁。如果其他进程/线程锁定同一个文件,会被阻塞,直到锁被释放。任务不一定会按加锁顺序执行。
在Linux下,Python的标准库有现成的文件锁模块fcntl
,提供了unix系统fcntl()和ioctl()的接口。
fcntl.flock(fd, operation)
其中:
- ` fd 表示文件描述符;
-
operation
表示锁操作,取值如下:- LOCK_SH:表示共享锁,一个文件的共享锁可以同时被多个进程拥有。
- LOCK_EX:表示排他锁,一个文件的排他锁只能同时被一个进程拥有。
- LOCK_UN:表示删除文件锁。
- LOCK_MAND:表示共享模式强制锁,与LOCK_READ或者LOCK_WRITE联合使用,表示是否允许并发的读/写操作。
import fcntl
# 方式一:
with open('/tmp/myfile.lock', 'w') as f:
# 加锁
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
# 执行任务,如读写文件
# 解锁方式一,主动解锁
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
# 解锁方式二,文件关闭后,自动解锁
# 方式二:
f = open('/tmp/myfile.lock', 'w')
# 加锁
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
# 执行任务,如读写文件
# 解锁方式一,主动解锁
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
# 解锁方式二,文件关闭后,自动解锁
f.close()
fcntl
模块在Windows上不可用,可以使用msvcrt
模块代替。一种跨平台的文件锁实现如下。
# Reference:
# - https://docs.python.org/zh-cn/3/library/fcntl.html
# - https://docs.python.org/zh-cn/3/library/msvcrt.html
# - https://docs.python.org/zh-cn/3/library/ctypes.html#ctypes.WinDLL
# - https://juejin.cn/post/6870689230440529927
# - https://zhuanlan.zhihu.com/p/354383209
import platform
if platform.system() != 'Windows':
import fcntl
is_unix = True
LOCK_FILE = '/tmp/file.lock'
else:
import msvcrt
is_unix = False
LOCK_FILE = 'C:\\file.lock'
NBYTES = 1
LOCK_EX = 2
LOCK_NB = 4
def lock(file_desc, mode=LOCK_EX):
"""同一进程内对同一文件重复加锁,不同进程对同一个文件重复加锁,会阻塞或返回False。"""
if mode == LOCK_NB:
if is_unix:
try:
fcntl.flock(file_desc, fcntl.LOCK_EX | fcntl.LOCK_NB)
except:
return False
else:
try:
msvcrt.locking(file_desc.fileno(), msvcrt.LK_NBLCK, NBYTES)
file_desc.seek(0)
except:
return False
return True
else:
if is_unix:
fcntl.flock(file_desc, fcntl.LOCK_EX)
else:
msvcrt.locking(file_desc.fileno(), msvcrt.LK_LOCK, NBYTES)
file_desc.seek(0)
return True
def unlock(file_desc):
if is_unix:
fcntl.flock(file_desc, fcntl.LOCK_UN)
else:
# file_desc.seek(0)
msvcrt.locking(file_desc.fileno(), msvcrt.LK_UNLCK, 1)
return True
拷贝
浅层与深层复制(拷贝)的区别仅与复合对象(即包含列表或类的实例等其他对象的对象)相关。参考示例
-
=
:赋值语句(即引用),不复制对象,而是创建目标和对象的绑定关系,id()
不变。 -
copy.copy(x)
:浅拷贝,不拷贝内部对象。构造一个新的复合对象,然后(在尽可能的范围内)将原始对象中找到的对象的 引用 插入其中。 -
copy.deepcopy(x[, memo])
:深拷贝,完全拷贝了对象及其内部对象。构造一个新的复合对象,然后,递归地将在原始对象里找到的对象的副本插入其中。
深度复制操作通常存在两个问题, 而浅层复制操作并不存在这些问题:
- 递归对象 (直接或间接包含对自身引用的复合对象) 可能会导致递归循环。
- 由于深层复制会复制所有内容,因此可能会过多复制(例如本应该在副本之间共享的数据)。
deepcopy()
函数用以下方式避免了这些问题:
- 保留在当前复制过程中已复制的对象的 "备忘录" (
memo
) 字典;以及 - 允许用户定义的类重载复制操作或复制的组件集合。
浅拷贝等价用法:
dict.copy()
-
numpy.copy()
,ndarray.copy()
:两者默认的存储order
不同。推荐后者,同np.array(a, copy=True)
。 -
original_list[:]
:列表的索引和切片。 - 类可以使用与控制序列化(
pickling
)操作相同的接口来控制复制操作。 - 自定义类的拷贝,copy() 和 deepcopy()
深拷贝等价用法:
-
numpy.copyto()
,ndarray+0
(ndarray运算)
numpy视图(ndarray.view()
)/切片/索引,介于引用和浅拷贝之间,创建新对象,形状等属性可以不同,但共享数据区。numpy浅拷贝,数据区值类型数据不共享,子对象共享。numpy深度拷贝请用copy.deepcopy()
。