我们可以把任务分为计算密集型和IO密集型,那如何在选择对应的处理方式,来更加快速的提高代码的性能。

CPU密集型(CPU-bound)

CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。

在多重程序系统中,大部份时间用来做计算、逻辑判断等CPU动作的程序称之CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程当中绝大部份时间用在三角函数和开根号的计算,便是属于CPU bound的程序。

CPU bound的程序一般而言CPU占用率相当高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等待I/O的时间。

IO密集型(I/O bound)

IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。

I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。

CPU密集型 vs IO密集型

我们可以把任务分为计算密集型和IO密集型。

计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。

第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。

IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。

总结:IO密集型可以多线程。比如有一个任务,执行10万次循环,每次都打印hello world,然后休眠1秒,如果单线程,需要10万秒完成,如果10个线程,就只需要1万秒。

CPU密集型尽量少点线程。还是上面那个任务,不同的是取消休眠,如果是单线程,几乎一下完成,如果是多线程会慢很多,而且随着线程数越多,速度会越慢,因为线程的切换是要时间的。所以要不要多线程就看IO要不要很花时间。

我们将上述语言文字转换为代码来进一步测试。

import time
import random

import concurrent.futures

from functools import wraps


def test(data):
    try:
        for i in data:
            # 模拟 I/O
            time.sleep(0.01)
            if i == data[-100]:
                return i
    except Exception as e:
        raise e


def test2(data):
    try:
        for i in data:
            # 输出时间复杂度为O(1) I/O时间短
            print("hello word")
    except Exception as e:
        raise e


def run_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        s_t = time.time()
        f = func(*args, **kwargs)
        e_t = time.time()
        print(f"{func.__name__} run_time", e_t - s_t)
        return f

    return wrapper


# with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
#     executor.submit(test, False)
#     worker = executor.submit(test, True)
#     executor.submit(test, False)
#     executor.submit(test, False)
#     executor.submit(test, True)
#     worker_exception = worker.exception()
#     if worker_exception:
#         print(worker_exception)

test_list = [random.randint(1, 1000) for i in range(1, 500000)]


@run_time
def thread_pool(func):
    """
    多线程测试
    
    :param func 方法引用地址
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        worker = executor.map(func, [test_list for i in range(5)])
        for w in worker:
            print(w)


@run_time
def process_pool(func):
    """
    多进程测试
    
    :param func 方法引用地址
    """
    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor2:
        results = executor2.map(func, [test_list for i in range(5)])

        for result in results:
            print(result)


if __name__ == '__main__':
    # I/O 较多测试
    thread_pool(test)
    process_pool(test)
    # I/O 较少测试
    time.sleep(3)
    thread_pool(test2)
    time.sleep(3)
    process_pool(test2)