# coding:utf-8
"""
Split a big file into chunks.
big_file.txt is a 500 MB file of 5-digit numbers in random order.
Multithreading did not improve the speed here.
"""
import threading
import time

txtfile = ''
def txtmap(txtup):
    # Read one character at a time until 100000 comma-separated numbers have been consumed.
    with open('big_file.txt', 'r') as f:
        i = 0
        while i < 100000:
            txt = f.read(1)
            i += 1 if txt == ',' else 0
            txtup += txt
# txtmap(txtfile)
start = time.time()
for i in range(100):
    # target must be the function object itself; the arguments go in args
    txti = threading.Thread(target=txtmap, args=(txtfile,))
    txti.start()
    txti.join()   # joining inside the loop means the threads never overlap
print(time.time() - start)
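# For comparison, a sketch (reusing the same txtmap and txtfile from above) that starts
# all the threads before joining any of them, so the threads actually overlap:
start = time.time()
threads = [threading.Thread(target=txtmap, args=(txtfile,)) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('threads started before joining:', time.time() - start)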
def txtmap2(txtup):
    with open('big_file.txt', 'r') as f:
        i = 0
        while i < 1000000:
            txt = f.read(1)
            i += 1 if txt == ',' else 0
            txtup += txt

start = time.time()
for i in range(10):
    txtmap2(txtfile)
print(time.time() - start)
First try at multiprocessing with a Queue
import time
from multiprocessing import Process
from multiprocessing import Queue

num = 0
qnum = Queue()
qnum.put(num)

def testnum(num):
    num += 1
    qnum.put(num)

for i in range(10):
    p = Process(target=testnum, args=(qnum.get(),))
    p.start()
    p.join()
# testnum(num)
print(qnum.get(), qnum.empty())
Here qnum is an instantiated object that is only used, never rebound, so there is no need to declare it global inside the function.
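A minimal illustration of that scoping rule (the names below are made up for the example): a plain module-level value can only be rebound with a global declaration, while an object such as a Queue or a list is merely used, not rebound.

x = 0
box = []

def bump_counter():
    # rebinding a module-level name requires the global declaration,
    # otherwise Python raises UnboundLocalError here
    global x
    x += 1

def fill_box():
    # calling a method on an existing object rebinds nothing, so no global is needed
    box.append(1)

bump_counter()
fill_box()
print(x, box)   # 1 [1]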
After several tests I found that with multiprocessing plus a queue you must also put the file pointer position into the queue; otherwise the next read starts from a jumbled position.
txtfile3 = ''
q = Queue()
with open('big_file.txt', 'r') as f:
    q.put((txtfile3, f.tell()))

    def txtmap(qget):
        txtup = qget[0]
        i = 0
        f.seek(qget[1])   # jump back to the offset handed over through the queue
        while i < 10:
            txt = f.read(1)
            i += 1 if txt == ',' else 0
            txtup += txt
        q.put((txtup, f.tell()))   # pass the text and the new offset on to the next worker

    start = time.time()
    for i in range(10):
        txtp2i = Process(target=txtmap, args=(q.get(),))
        txtp2i.start()
        txtp2i.join()
    print('multiprocessing + queue', time.time() - start, '\n', q.get())
Also, this args assignment is annoying: q.get() already returns a tuple, yet args still has to be given a tuple of positional arguments, so the value has to be wrapped again as args=(q.get(),).
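A minimal sketch of what args actually expects (show and show_unpacked are made-up demo functions): args is a tuple of positional arguments, so the tuple coming out of q.get() is unpacked into separate arguments unless it is wrapped in a one-element tuple first.

from multiprocessing import Process, Queue

def show(pair):
    # receives the whole tuple as one positional argument
    print('one argument:', pair)

def show_unpacked(text, offset):
    # receives the tuple's items as two positional arguments
    print('two arguments:', text, offset)

if __name__ == '__main__':
    q = Queue()
    q.put(('abc', 42))
    item = q.get()
    p1 = Process(target=show, args=(item,))        # wrapped: the tuple stays whole
    p2 = Process(target=show_unpacked, args=item)  # not wrapped: the tuple is unpacked
    p1.start()
    p2.start()
    p1.join()
    p2.join()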
# coding:utf-8
"""
Split a big file into chunks.
big_file.txt is a 500 MB file of 5-digit numbers in random order.
Measured speed, slowest to fastest: multiprocessing + queue < multiprocessing + global
variable (which does not actually achieve the design goal) < multithreading + queue
< multithreading + global variable < plain global variable.
With multiprocessing, inter-process communication has to go through a Queue or a Pipe;
a child process cannot change the parent's global variables.
"""
import os
import threading
import time
from multiprocessing import Process
from multiprocessing import Queue

txtfile = ''
txtfile2 = ''
txtfile3 = ''
txtfile4 = ''
q = Queue()
# My laptop has 4 cores, so at most 4 processes can truly run in parallel; any extra ones
# just wait to be scheduled. (The GIL only limits the threads inside a single process,
# not separate processes.)
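# The core count claimed above can be checked directly (os.cpu_count is standard library;
# this line is just a quick check, not part of the measurements below):
print('cores reported by the OS:', os.cpu_count())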
with open('big_file.txt', 'r') as f:
    def txtmap():
        global txtfile
        i = 0
        while i < 25:
            txt = f.read(1)
            i += 1 if txt == ',' else 0
            txtfile += txt
            print(txt)
        print(os.getpid())
        print(txtfile)   # each child only sees its own copy of txtfile

    start = time.time()
    for i in range(4):
        txtpi = Process(target=txtmap)
        txtpi.start()
        txtpi.join()
    print('multiprocessing + global variable', time.time() - start, '\n', txtfile)
    # txtfile is still empty in the parent: the children only modified their own copies
    if txtfile:
        print(True)
    else:
        print(False)
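# The children's changes to txtfile never reach the parent; the data has to come back
# through IPC. Besides the Queue used below, a Pipe also works. A minimal sketch
# (pipe_worker is a made-up name for this example):
from multiprocessing import Pipe

def pipe_worker(conn):
    # the child sends its result back through its end of the pipe
    conn.send('text read by the child')
    conn.close()

parent_conn, child_conn = Pipe()
pp = Process(target=pipe_worker, args=(child_conn,))
pp.start()
print(parent_conn.recv())
pp.join()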
with open('big_file.txt', 'r') as f:
    def txtmap():
        global txtfile2
        i = 0
        while i < 10:
            txt = f.read(1)
            i += 1 if txt == ',' else 0
            txtfile2 += txt

    start = time.time()
    for i in range(10):
        txtti = threading.Thread(target=txtmap)
        txtti.start()
        txtti.join()
    print('multithreading + global variable', time.time() - start, '\n', txtfile2)
with open('big_file.txt', 'r') as f:
    q.put((txtfile3, f.tell()))

    def txtmap(qget):
        txtup = qget[0]
        i = 0
        f.seek(qget[1])   # restore the offset passed through the queue
        while i < 25:
            txt = f.read(1)
            i += 1 if txt == ',' else 0
            txtup += txt
        print(os.getpid())
        q.put((txtup, f.tell()))   # hand the text and the new offset to the next process

    start = time.time()
    for i in range(4):
        txtp2i = Process(target=txtmap, args=(q.get(),))
        txtp2i.start()
        txtp2i.join()
    print('multiprocessing + queue', time.time() - start, '\n', q.get()[0])
# The queue has been drained by the q.get() calls above, so put something back in,
# otherwise the next q.get() would block forever waiting for a message.
q.put(txtfile3)
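# q.get() blocks forever on an empty queue; a sketch of the non-blocking alternative
# (multiprocessing's Queue raises queue.Empty when the timeout expires):
import queue

try:
    peek = q.get(timeout=1)   # give up after one second instead of hanging
    q.put(peek)               # put it straight back so the next experiment still finds it
except queue.Empty:
    peek = None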
with open('big_file.txt', 'r') as f:
    def txtmap(txtup):
        i = 0
        while i < 10:
            txt = f.read(1)
            i += 1 if txt == ',' else 0
            txtup += txt
        q.put(txtup)

    start = time.time()
    for i in range(10):
        txtt2i = threading.Thread(target=txtmap, args=(q.get(),))
        txtt2i.start()
        txtt2i.join()
    print('multithreading + queue', time.time() - start, '\n', q.get())
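# For threads alone, the standard library's queue.Queue does the same hand-off without
# the pickling and pipe overhead of multiprocessing.Queue. A minimal sketch (names made up):
import queue

tq = queue.Queue()
tq.put('')

def append_marker():
    # take the shared string, extend it, put it back for the next thread
    tq.put(tq.get() + 'x')

workers = [threading.Thread(target=append_marker) for _ in range(10)]
for t in workers:
    t.start()
for t in workers:
    t.join()
print(tq.get())   # 'xxxxxxxxxx'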
with open('big_file.txt', 'r') as f:
    def txtmap2():
        global txtfile4
        i = 0
        while i < 10:
            txt = f.read(1)
            i += 1 if txt == ',' else 0
            txtfile4 += txt

    start = time.time()
    for i in range(10):
        txtmap2()
    print('plain global variable', time.time() - start, '\n', txtfile4)
# os.path.getsize returns the file size, which equals the final seek position
print(os.path.getsize('big_file.txt'))
with open('big_file.txt', 'r') as f:
    print(f.seek(0, 2))   # seek to the end and print the resulting offset
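# Building on that, the file size can be turned into seek offsets for the workers up front
# (a sketch; the chunk count of 4 matches the core count used above):
size = os.path.getsize('big_file.txt')
n_chunks = 4
chunk_size = size // n_chunks
offsets = [k * chunk_size for k in range(n_chunks)]
print('chunk start offsets:', offsets)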
# Finally saw several processes genuinely running at the same time, very satisfying!
# And in this test the reads did not clash over the file pointer.
from multiprocessing import Process
import time

with open('test.txt', 'r') as f:
    def readseek(num):
        f.seek(num)
        print(time.time(), f.read(10))
        time.sleep(10)   # keep each child alive so the overlap is visible in the timestamps

    p1 = Process(target=readseek, args=(20,))
    p2 = Process(target=readseek, args=(60,))
    p3 = Process(target=readseek, args=(120,))
    p4 = Process(target=readseek, args=(160,))
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    print(time.time())