CPU密集型(CPU-bound)
CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。
在多重程序系统中,大部份时间用来做计算、逻辑判断等CPU动作的程序称之CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程当中绝大部份时间用在三角函数和开根号的计算,便是属于CPU bound的程序。
CPU bound的程序一般而言CPU占用率相当高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等待I/O的时间。
IO密集型(I/O bound)
IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。
I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。
CPU密集型 vs IO密集型
我们可以把任务分为计算密集型和IO密集型。
计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。
计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。
第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。
IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。
总结:IO密集型可以多线程。比如有一个任务,执行10万次循环,每次都打印hello world,然后休眠1秒,如果单线程,需要10万秒完成,如果10个线程,就只需要1万秒。
CPU密集型尽量少点线程。还是上面那个任务,不同的是取消休眠,如果是单线程,几乎一下完成,如果是多线程会慢很多,而且随着线程数越多,速度会越慢,因为线程的切换是要时间的。所以要不要多线程就看IO要不要很花时间。
我们将上述语言文字转换为代码来进一步测试。
import time
import random
import concurrent.futures
from functools import wraps
def test(data):
try:
for i in data:
# 模拟 I/O
time.sleep(0.01)
if i == data[-100]:
return i
except Exception as e:
raise e
def test2(data):
try:
for i in data:
# 输出时间复杂度为O(1) I/O时间短
print("hello word")
except Exception as e:
raise e
def run_time(func):
@wraps(func)
def wrapper(*args, **kwargs):
s_t = time.time()
f = func(*args, **kwargs)
e_t = time.time()
print(f"{func.__name__} run_time", e_t - s_t)
return f
return wrapper
# with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# executor.submit(test, False)
# worker = executor.submit(test, True)
# executor.submit(test, False)
# executor.submit(test, False)
# executor.submit(test, True)
# worker_exception = worker.exception()
# if worker_exception:
# print(worker_exception)
test_list = [random.randint(1, 1000) for i in range(1, 500000)]
@run_time
def thread_pool(func):
"""
多线程测试
:param func 方法引用地址
"""
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
worker = executor.map(func, [test_list for i in range(5)])
for w in worker:
print(w)
@run_time
def process_pool(func):
"""
多进程测试
:param func 方法引用地址
"""
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor2:
results = executor2.map(func, [test_list for i in range(5)])
for result in results:
print(result)
if __name__ == '__main__':
# I/O 较多测试
thread_pool(test)
process_pool(test)
# I/O 较少测试
time.sleep(3)
thread_pool(test2)
time.sleep(3)
process_pool(test2)
- Post link: https://yanxiang.wang/IO%E5%AF%86%E9%9B%86%E5%9E%8B%E3%80%81CPU%E5%AF%86%E9%9B%86%E5%9E%8B%E5%88%86%E5%88%AB%E8%AF%A5%E9%80%89%E6%8B%A9%E5%A4%9A%E8%BF%9B%E7%A8%8B%E8%BF%98%E6%98%AF%E5%A4%9A%E7%BA%BF%E7%A8%8B/
- Copyright Notice: All articles in this blog are licensed under unless otherwise stated.