操作系统知识点总结归纳

本文主要对操作系统和Linux基础知识进行梳理、回顾,把一些需要记住的概念原理,和容易混淆,晦涩的知识点进行归纳。

操作系统

1. 并发与并行

(1) 并发

并发是指同一时间间隔内多个任务都在运行,但是并不会在用一时刻同时运行,存在交替执行的情况,因此也经常存在资源竞争德尔情况。线程是并发的,实现的库有threading。

(2) 并行

并行指的是同一时刻多个任务同时运行,进程是并行的,实现的库有multiprocessing。

(3) 并发与并行的应用场景

IO密集型操作(程序需要执行较多的读写,请求和回复任务的需要CPU对硬盘、内存进行读写,如爬虫)应使用并发更好,类似于多线程。

CPU密集型操作(程序运行需要花大量时间做逻辑判断、运算等消耗CUP资源的操作,如复杂的浮点运算)应使用并行更好,类似于多进程。

2. 同步与异步

同步和异步都是相对于多任务而言的。

同步:多个任务之间有先后顺序执行,一个执行完才能执行下一个。

异步:多个任务之间没有先后顺序,可以同时执行,有时候一个任务可能在必要的时候获取另一个同时执行的任务的结果,这叫回调。

3. 阻塞与非阻塞

阻塞非阻塞是相对于代码执行而言的。

阻塞:如果卡住了调用者,调用者不能继续往下执行,当前线程会被挂起,调用线程只有在得到结果之后才会返回,就是说调用者阻塞了,函数只有在得到结果之后才会将阻塞的线程激活。

如果不会卡住,可以继续执行就是非阻塞的,非阻塞调用指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。

4. 进程、线程以及协程间的区别

线程、进程、协程、锁的总结

(1) 进程(process)

进程是系统进行资源分配和调度的一个独立的最小单位,它是程序执行的一个实例。

程序运行时,系统就会创建一个进程,并为它分配资源,然后把该进程放入进程就绪队列,进程调度器选中它的时候,就会为它分配CPU、内存、时间片,程序开始真正运行。

进程拥有自己独立的内存空间,进程间数据不共享,因此开销大。

进程特征:

  • 动态性:进程的实质是程序在多道程序系统中的一次执行过程,进程是动态产生的,动态消亡的;
  • 并发性:任何进程都可以同其他进程一起并发执行。
  • 独立性:进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的独立单位。
  • 异步性:由于进程间的相互制约,使进程具有执行的间断性,即进程按各自独立的、不可预知的速度向前推进。

(2) 线程(thread)

线程是程序执行的最小单位,它是进程的一个实体,也是进程的一个执行流,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本的单位。

一个程序至少有一个进程,一个进程至少有一个线程。一个进程可以由多个线程组成,线程不能够独立执行,必须依存在进程中。

线程间共享进程的所有资源(内存共享、数据共享、全局变量共享等),从而极大的提高了程序的运行效率,每个线程有自己的堆栈和局部变量。

线程的划分尺度小于进程(资源比进程少,仅仅需要运行中必不可少的程序计数器、寄存器和栈),使得多线程程序的并发性高。

线程执行开销小,但不利于资源管理和保护,而进程刚好相反。

(3) 协程(coroutine)

协程是一种用户态的轻量级线程,也称微线程,协程的调度完全由用户控制。

协程拥有自己的寄存器上下文和栈,协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前先前保存的寄存器上下文和栈,直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。

在合适的实际,gevent实现的协程可以把一个协程切换到另一个协程。只要这个过程中保存或回复CPU上下文那么程序还是可以运行的。

通俗解释:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且且换的次数以及什么时候再切换到原来的函数都由开发者自己去确定。

协程是程序级别的,由程序员根据需要自己调度。我们把一个线程中的一个个函数叫做子程序,那么子程序在执行过程中可以中断去执行别的子程序;别的子程序也可以中断回来继续执行之前的子程序,这就是协程。

(4) 协程的优缺点

  • 协程的优点

    • 无需线程上下文切换的开销,协程避免了无意义的调度,由此可以提高性能(但也失去了标准线程使用多CPU的能力,需要用户自己控制调度);
    • 无需原子操作锁定及同步的开销;
    • 方便切换控制流,简化编程模型;
    • 协程很适合用于高并发处理,一个CPU支持上万的协程不是问题,高并发+高扩展性+低成本。
  • 协程的缺点

    • 无法利用多核资源,协程的本质是个单线程,它不能同时将单个CPU的多个核上,协程需要和进程配合才能运行在多个CPU上(CPU密集型应用)。
    • 进行阻塞操作时,会阻塞掉整个程序。

(5) 进程和线程的区别和优劣

  • 进程是资源分配的最小单位,线程是程序执行的最小单位;

  • 进程有自己独立的地址空间,每启动一个进程,系统就会为它分配地址空间,建立数据表来维护代码段、堆栈段和数据段,耗费资源多;而线程是共享进程间的资源,使用相同的地址空间,因此CPU 切换和创建一个线程的开销比进程小很多。

  • 线程之间的通信更方便,统一进程下的线程共享全局变量、静态变量等数据,而进程之间的通信的方式进行,不过如何处理好同步与互斥是编写多线程程序的难点。

  • 多进程程序更健壮,多线程程序只要有一个线程死掉,整个进程也死掉了,因为所有线程共享进程的内存;而一个进程死掉并不会对别的进程造成影响,因为进程有独立独立的地址空间,著名的Apache最早就是采用多进程的模式,现在好像使用的是多进程+多线程的混合模式。

(6) 协程和线程的差异

  • (1) 在实现多任务时,线程切换从系统层面远不止保存和恢复cpu上下文这么简单。
  • (2) 操作系统为了程序运行的高效性,每个线程都有自己缓存Cache等数据,操作系统会自动实现数据恢复操作,所以线程切换非常耗性能。
  • (3) 协程切换只是单纯的操作CPU上下文,所以一秒秒钟切换上百万次系统都扛得住。

5. python实现多进程

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次返回一次,但是fork()调用一次返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回进程的ID。一个父进程可以fork出多个子进程,所以父进程要记下每个子进程的ID,而子进程只主要调用getppid()就可以拿到父进程的ID。

父进程、子进程执行的顺序没有规律,完全取决于操作系统的调度算法。各个进程有独立的运行空间,不共享全局变量;有时会因父进程提前退出,子进程的父进程和开始的不一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# os模块封装了常见的系统调用

import os

print(f"Process {os.getpid()} start...") # unix/linux/mac系统中

pid = os.fork()

if pid < 0:
print("fork()调用失败")
if pid == 0:
print(f"父进程pid: {os.getppid()}, 子进程pid: {os.gerpid()}.") # 子进程在系统的pid不为0
else:
print(f"子进程pid: {os.getpid()}, 父进程pid: {pid}") # 父进程返回子进程的ID

# 运行结果
'''
Process 3362 start...
父进程pid:3362, 子进程pid:3363
子进程pid:3363, 子进程pid:3362
'''

由于Windows没有fork调用,而Python是跨平台的,所以可以调用multiprocessing模块中的Process类来实现跨平台的多进程。

Process()的语法:Process([group[, target[, name [, args[, kwargs]]]]]),group参数未使用,默认为None;参数target表示这个进程实例所调用的对象;args表示调用这个对象的位置和参数元组;kwargs表示调用对象的关键子参数字典;name子进程名称。

Process属性方法

方法/属性 说明
start() 启动进程,调用进程中的run()方法。
run() 进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法。
timinate() 强制终止进程,不会进行任何清理操作。如果该进程终止前,创建了自进程,那么该子进程在其强制结束后变为僵尸进程;如果进程还保存了一个锁,那么也将不会被释放,进而导致死锁。使用时,要注意。
is_alive() 判断某进程是否存活,存活返回True,否则False。
join([timeout]) 主线程等待子线程终止。timeout为可选超时时间;p.join()只能join住start开启的进程,而不能join住run开启的进程
daemon() 默认值False, 如果设置为True,代表该进程为后台守护进程;当该进程的父进程终止时,该进程也随之终止;并且设置为True后,该进程不能创建子进程,设置该属性必须在start()之前。
exitcode 进程运行时为None,如果为-N,表示信号N结束了。
authkey 进程身份验证,默认是由os.urandom()随机生成32字符的字符串。这个键的用途是设计涉及网络链接的低层进程间的通信提供安全性,这类连接只有在具有相同身份验证才能成功。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process
import os
import time

# 子进程要执行的代码
def run_proc(name):
time.sleep(3)
print(f"子进程{name}运行中,pid={os.getpid()},父进程的pid={os.getppid()}")

if __name__ == "__main__":
print(f"父进程是{os.getpid()}.")
p = Process(target=run_proc, args=('test',))
print("子进程开始启动.")
p.start() # 启动进程
p.join() # 阻塞当前进程,直到调用join方法的那个进程执行完,在继续执行当前进程
print("子进程运行结束")

# 结果
'''
父进程是12324.
子进程开始启动.
子进程test运行中,pid=4472,父进程的pid=12324
子进程运行结束
'''

创建自己的进程类,继承于Process类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from multiprocessing import Process
import os
import time

class MyProcess(Process):
def __init__(self, interval):
super().__init__()
self.interval = interval

def run(self):
print("子进程")
start_time = time.time()
time.sleep(self.interval)
stop_time = time.time()
print(f"子进程id: {os.getpid()}, 父进程id:{os.getppid()}, 共执行了{stop_time-start_time: .2f} 秒.")

if __name__ == "__main__":
print("主进程")
startTime = time.time()
p = MyProcess(2)
p.start()
p.join()
stopTime = time.time()
print(f"子进程结束,花费了{stopTime-startTime: .2f}")

# 结果
'''
主进程
子进程
子进程id: 3008, 父进程id:13084, 共执行了 2.00 秒.
子进程结束,花费了 2.79
'''

如果需要创建很多进程,需要用到进程池Pool方法。初始化进程时,可以指定一个最大进程数(默认为CPU核数),当池中的进程数达到最大值时,该请求就会等待,直到池中有进程结束,才会创建新的进程来执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from multiprocessing import Pool
import os
import time
import random

# 进程类
def worker(msg):
start_time = time.time()
print(f"{msg}开始执行,执行进程id为{os.getpid()},父进程id为{os.getppid()}")
time.sleep(random.random()*2)
stop_time = time.time()
print(msg, f"执行完毕,耗时{stop_time-start_time: .2f}秒。")

if __name__ == "__main__":
print("----开始执行----")
start = time.time()
pool = Pool(3)
for i in range(0, 10):
pool.apply_async(worker, (i,)) # 异步非阻塞
pool.close() # 关闭进程池,关闭后不再就收新请求
pool.join() # 必须放在close语句后面
stop = time.time()
print(f"执行结束,总耗时{stop-start:.2f}")

# 结果
'''
----开始执行----
0开始执行,执行进程id为4640,父进程id为7116
1开始执行,执行进程id为8,父进程id为7116
2开始执行,执行进程id为7728,父进程id为7116
0 执行完毕,耗时 0.09秒。
3开始执行,执行进程id为4640,父进程id为7116
2 执行完毕,耗时 0.10秒。
4开始执行,执行进程id为7728,父进程id为7116
4 执行完毕,耗时 0.04秒。
5开始执行,执行进程id为7728,父进程id为7116
3 执行完毕,耗时 0.20秒。
6开始执行,执行进程id为4640,父进程id为7116
6 执行完毕,耗时 0.12秒。
7开始执行,执行进程id为4640,父进程id为7116
5 执行完毕,耗时 1.38秒。
8开始执行,执行进程id为7728,父进程id为7116
1 执行完毕,耗时 1.95秒。
9开始执行,执行进程id为8,父进程id为7116
8 执行完毕,耗时 0.48秒。
7 执行完毕,耗时 1.81秒。
9 执行完毕,耗时 1.58秒。
执行结束,总耗时4.63
'''

如果使用进程池pool创建进程的话,就需要使用Manager().Queue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from multiprocessing import Manager, Pool
import os

# queue,实现多进程间的数据传递,其实是个消息队列
def write(q):
print(f"--开始执行写进程 {os.getpid()}--")
for value in ['A', 'B', 'C']:
print(f"把 {value} 放入队列")
q.put(value)

def read(q):
print(f"--开始执行读进程 {os.getpid()}--")
for i in range(q.qsize()):
print(f"从队列读取 {q.get(True)}")

if __name__ == "__main__":
print(f"--开始执行主进程 {os.getpid()}--")
q = Manager().Queue()
p = Pool()
p.apply(write, args=(q,)) # apply 阻塞,上个进程结束才会执行下一个进程
p.apply(read, args=(q,))
p.close() # 关闭pool 不再接收请求
p.join() # 主进程阻塞,等待子进程运行结束,必须在close以后
print("--主进程结束--")

# 结果
'''
--开始执行主进程 6012--
--开始执行写进程 11524--
把 A 放入队列
把 B 放入队列
把 C 放入队列
--开始执行读进程 11524--
从队列读取 A
从队列读取 B
从队列读取 C
--主进程结束--
'''

当子进程不是自身,而是一个外部进程时,创建子进程后,还需要控制子进程的输入和输出。subprocess模块可以非常方便的启动子进程,然后可以通过communicate()方法输入。

python 中subprocess

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode("gbk"))
print('exit code:', p.returncode)

# 结果
'''
$ nslookup
默认服务器: dnspai-public-dns.dnspai.com
Address: 101.226.4.6

> > 服务器: dnspai-public-dns.dnspai.com
Address: 101.226.4.6

python.org MX preference = 50, mail exchanger = mail.python.org
>
exit code: 0
'''

6. python实现多线程

多任务可以由多进程完成,也可以由一个进程内的多线程完成。线程是操作系统直接支持的执行单元,Python的标准库提供了两个模块_thread(低级模块)和threading(高级模块),大部分情况下选择用threading模块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import time
import threading

# 新线程执行
def loop(num):
print(f"线程{threading.current_thread().name}正在运行...")
n = 0
while n < num:
n += 1
print(f"线程{threading.current_thread().name} >>> {n}")
time.sleep(1)
print(f"线程{threading.current_thread().name}结束")

print(f"线程{threading.current_thread().name}正在运行...")
t = threading.Thread(target=loop, name='LoopThread', args=(5,)) # 通过传参的方式调用局部变量
t.start()
t.join()
print(f"线程{threading.current_thread().name}运行结束.")

# 结果
'''
线程MainThread正在运行...
线程LoopThread正在运行...
线程LoopThread >>> 1
线程LoopThread >>> 2
线程LoopThread >>> 3
线程LoopThread >>> 4
线程LoopThread >>> 5
线程LoopThread结束.
线程MainThread运行结束.
'''

一个进程中的线程共享相同的内存单元、内存地址空间,可以访问相同的变量和对象,而且他们从同一堆中分配对象,通信、数据交换、同步操作,缺点线程是对全局变量随意修改,可能造成多线程之间对全局变量的混乱(不安全),可以通过传参的方式调用全局变量(不适用可变类型),但是修改次数过多,会有多次线程切换,产生紊乱。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 给线程加锁,实现线程按顺序执行
from threading import Thread, Lock
import time


class Task1(Thread):
def run(self):
while True:
if lock1.acquire(): # 获取锁对象
print("--Task1--")
time.sleep(1)
lock2.release() # 释放锁


class Task2(Thread):
def run(self):
while True:
if lock2.acquire():
print("--Task2--")
time.sleep(1)
lock3.release()


class Task3(Thread):
def run(self):
while True:
if lock3.acquire():
print("--Task3--")
time.sleep(1)
lock1.release()


lock1 = Lock()
lock2 = Lock()
lock2.acquire()
lock3 = Lock()
lock3.acquire()

t1 = Task1()
t2 = Task2()
t3 = Task3()
t1.start()
t2.start()
t3.start()

# 结果
'''
--Task1--
--Task2--
--Task3--
--Task1--
--Task2--
--Task3--
.
.
.
'''

Thread对象的属性和方法

属性 描述
\***Thread对象数据属性*****
name 线程名
ident 线程的标识符
daemon 布尔标志,表示这个线程是否是守护线程
\***Thread对象方法*****
__init__(group=None, target=None, name=None, args=(), kwargs={}, verbose=None, daemon=None) 实例化一个线程对象,需要有一个可调用的target,以及其参数args或者kwargs。还可以传递name或者group参数,不过后者还未实现。此外,verbose标志也是可选的,而daemon的值将会设定thread.daemon属性/标志。
start() 开始执行线程
run() 定义线程功能的方法(通常在子类中被应用开发者重写)
join(timeout=None) 直至启动线程终止之前一直挂起,除非给出了timeout,否则会一直阻塞
getName() 返回线程名
setName(name) 设定线程名
isAlivel/is_alive() 布尔标志,表示这个进程是否还存活
isDaemon() 如果是守护线程,则返回True,否则返回False
setDaemon(daemonic) 把线程的守护标志设定为布尔值daemonic(必须在线程start()之前调用

7. python实现协程

(1) yield + send实现

通过“生产者-消费者”模型来看下协程的应用,生产者产生消息后,直接通过yield跳转到消费者开始执行,带消费者执行完毕后,切换回生产者继续生产。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 利用生成器实现协程
# 来源:https://juejin.im/post/5d888151f265da03dd3db0f5
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print(f"[CONSUMER] Consuming {n} ...")
r = '200 OK'

def producer(c):
# 启动生成器
c.send(None)
n = 0
while n < 5:
n += 1
print(f'[PRODUCER] Producing {n} ...')
r = c.send(n)
print(f'[PRODUCER] Consumer return: {r}')
c.close()


if __name__ == "__main__":
c = consumer()
producer(c)

# 结果
'''
[PRODUCER] Producing 1 ...
[CONSUMER] Consuming 1 ...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2 ...
[CONSUMER] Consuming 2 ...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3 ...
[CONSUMER] Consuming 3 ...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4 ...
[CONSUMER] Consuming 4 ...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5 ...
[CONSUMER] Consuming 5 ...
[PRODUCER] Consumer return: 200 OK
'''

send(msg)next()的区别在于send可以传递参数给yield表达式,这时传递的参数会作为yield表达式的值,而yield的参数是返回给调用者的值。换句话说,就是send可以强行修改上一个yield表达式的值。比如函数中有一个yield赋值a = yield 5,第一次迭代到这里会返回5,a还没有赋值。第二次迭代时,使用send(10),那么就是强行修改yield 5表达式的值为10,本来是5的,结果a = 10send(msg)next()都有返回值,它们的返回值是当前迭代遇到yield时,yield后面表达式的值,其实就是当前迭代中yield后面的参数。第一次调用send时必须是send(None),否则会报错,之所以为None是因为这时候还没有一个yield表达式可以用来赋值。

(2) asyncio + yield from实现

asyncio是Python3.4版本引入的标准库,直接内置了对IO的支持。asyncio的异步操作,需要在coroutine中通过yield from 完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio

@asyncio.coroutine
def test(i):
print("test_1", i)
r = yield from asyncio.sleep(1)
print('test_2', i)

if __name__ == "__main__":
loop = asyncio.get_event_loop()
tasks = [test(i) for i in range(3)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

# 结果
'''
test_1 2
test_1 1
test_1 0
test_2 2
test_2 1
test_2 0
'''

@asyncio.coroutine 把一个generator标记为coroutine类型,然后就把这个coroutine扔到EventLoop中执行。test()会首先打印出test_1,然后yield from语法可以让我们方便地调用另一个generator。由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。把asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行。

(3) asyncio + asnyc/await实现

为了简化并耕海的标识异步IO,从Python3.5开始,引入了新的语法asyncawait,可以让coroutine的代码更简洁易读。实际上,就是async替换@asyncio.coroutine,await替换yield from

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio

async def test(i):
print('test_1', i)
await asyncio.sleep(1)
print('test_2', i)

if __name__ == '__main__':
loop = asyncio.get_event_loop()
tasks = [test(i) for i in range(3)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

# 结果
'''
test_1 2
test_1 1
test_1 0
test_2 2
test_2 1
test_2 0
'''

(4) greenlet 实现

Gevent是一个基于Greenlet实现的网络库,通过greenlet实现协程。基本思想是一个greenlet就认为是一个协程,当一个greenlet遇到IO操作的时候,比如访问网络,就会自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import gevent

def test(n):
for i in range(n):
print(gevent.getcurrent(), i)
# gevent.sleep(1) # 把3个greenlet依次运行改为交替运行

if __name__ == '__main__':
g1 = gevent.spawn(test, 3)
g2 = gevent.spawn(test, 3)
g3 = gevent.spawn(test, 3)

g1.join()
g2.join()
g3.join()

# 结果
'''
<Greenlet at 0x1ee82d8c268: test(3)> 0
<Greenlet at 0x1ee82d8c268: test(3)> 1
<Greenlet at 0x1ee82d8c268: test(3)> 2
<Greenlet at 0x1ee82d8c378: test(3)> 0
<Greenlet at 0x1ee82d8c378: test(3)> 1
<Greenlet at 0x1ee82d8c378: test(3)> 2
<Greenlet at 0x1ee82d8c488: test(3)> 0
<Greenlet at 0x1ee82d8c488: test(3)> 1
<Greenlet at 0x1ee82d8c488: test(3)> 2
'''

当然在实际的代码里,我们不会用gevent.sleep()去切换协程,而是在执行到IO操作时gevent会自动完成,所以gevent需要将Python自带的一些标准库的运行方式由阻塞式调用变为协作式运行。这一过程在启动时通过monkey patch完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from gevent import monkey; monkey.patch_all()
from urllib import request
import gevent

def test(url):
print(f"Get: {url}")
response = request.urlopen(url)
content = response.read().decode("utf-8")
print(f"{len(content)} bytes received from {url}.")

if __name__ == "__main__":
gevent.joinall([
gevent.spawn(test, 'http://httpbin.org/ip'),
gevent.spawn(test, 'http://httpbin.org/uuid'),
gevent.spawn(test, 'http://httpbin.org/user-agent')
])

# 结果,3个网络操作是并发操作的,而且结束的顺序不同,但只有一个线程
'''
Get: http://httpbin.org/ip
Get: http://httpbin.org/uuid
Get: http://httpbin.org/user-agent
46 bytes received from http://httpbin.org/ip.
53 bytes received from http://httpbin.org/uuid.
40 bytes received from http://httpbin.org/user-agent.
'''

8. 进程间通信

Process之间肯定需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue/Pipes等多种方式来交换数据。

(1) 消息队列Queue

简单的理解Queue实现进程间通信的方式,就是使用了操作系统给开辟的一个队列空间,各进程可以把数据放到该队列中,当然也可以从队列中把自己需要的信息取走。

Queue/queue模块常用属性

属性 描述
\***Queue/queue模块的类*****
Queue(maxsize=0) 创建一个先入先出队列。如果给定最大值,这在队列没有空间阻塞;否则为无限队列
LifoQueue(maxsize=0) 创建一个后入先出队列。如果给定最大值,这在队列没有空间阻塞;否则为无限队列
PriorityQueue(maxsize=0) 创建一个优先级队列。如果给定最大值,这在队列没有空间阻塞;否则为无限队列
\***Queue/queue异常*****
Empty 当对空队列调用get*()方法时抛出异常
Full 当对已满的队列调用put*()方法时抛出异常
\***Queue/queue对象方法*****
qsize() 返回队列大小(由于返回时队列大小可能被其他线程修改,所以该值为近似值)
empty() 如果队列为空,则返回True;否则返回False
full() 如果队列已满,则返回True;否则返回False
put(obj[, block=True[, timeout=None]]) 将obj放入队列,如果block参数为True时,一旦队列被写满,则代码就会被阻塞,知道有进程取走数据并腾出空间供obj使用;timeout参数用来设置阻塞的时间,即程序最多在阻塞timeout秒之后,如果还是没有空闲的空间,程序就会抛出queue.Full异常
put_nowait(obj) 该方法等价于put(obj, False)
get([block=True[, timeout=None]]) 从队列中取数据并返回,当block参数为True且timeout为None时,该方法会阻塞当前进程,知道队列中有可用的数据。如果block为False,则进程会直接做取数据的操作,如果取数据失败,则抛出queue.Empty异常(这种情形下timeout参数将不起作用)。如果设置timeout秒数,则当前进程最多被阻塞timeout秒,如果到时依旧没有可用的数据取出,则会抛出queue.Empty异常。
get_nowait() 该方法等价于get(False)
task_done() 用于表示队列中的某个元素已执行完成,该放方法会被下面的join()使用
join() 在队列中所有元素执行完毕并调用上面的task_done()信号钱,保持阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 进程中消息队列读写数据
from multiprocessing import Process, Queue
import os
import time
import random

# 写数据
def write(q):
print(f"写进程:{os.getpid()}")
for i in 'ABC':
print(f'正在往消息队列写入{i}')
q.put(i)
time.sleep(random.random())

# 读数据
def reader(q):
while True:
if not q.empty():
i = q.get()
print(f"从消息队列中读出{i}")
time.sleep(random.random())
else:
break

if __name__ == "__main__":
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start() # 启动子进程pw,写入
pr.start() # 启动子进程pr,读取
pw.join() # 等待pw结束
pr.terminate() # pr进程里是死循环,无法等待期结束,只能强行终止

# 结果
'''
写进程:3808
正在往消息队列写入A
从消息队列中读出A
正在往消息队列写入B
从消息队列中读出B
正在往消息队列写入C
'''

(2) 管道Pipe

通常情况下,管道有两个口,而Pipe也常常用来实现两个进程间的通信,这两个进程分别位于管道的两端,一端用来发送数据,一段用来接收数据。使用Pipe实现进程通信,首先调用multiprocessing.Pipe()函数来创建一个管道。

该函数的语法格式如下:conn1, conn2 = multiprocessing.Pipe([duplex=True]),其中conn1和conn2分别用来接收Pipe函数返回的两个端口;duplex参数默认为True,表示该管道是双向的,即位于两个端口的进程既可以发送数据也可以接收数据,若duplex=False,则表示管道是单通道,conn1只能用来接收数据,而conn2只能用来发送数据。

Pipe对象可调用的方法

属性 描述
send(obj) 发送一个obj给管道的另一端,另一端使用recv()方法就收。该obj必须是可序列化的对象,如果该对象序列化后超过32MB,则很可能会引发ValueError异常。
recv() 接受另一端通过send()方法发送过来的数据。
close() 关闭连接。
poll([timeout]) 返回连接中是否还有数据可以读取。
send_bytes(buffer[, offset[, size]]) 发送直接数据,如果 没有指定offset、size参数,则默认发送buffer字节串的全部数据;如果指定offset和size参数,则发送buffer字节串中从offset开始、长度为size的直接数据;通过该方法发送的数据,应该使用recv_bytes()recv_bytes_info()方法接收。
recv_bytes([maxlength]) 接收通过send_bytes()发送的数据,maxlength指定最多接收的字节数,该方法返回接收到的直接数据。
recv_bytes_info(buffer[, offset]) 功能类似于recv_bytes()方法类似,只是该方法将接收到的数据放在buffer中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 使用Pipe管道实现2个进程之间通信
import multiprocessing

def processFun(conn, name):
print(multiprocessing.current_process().pid, "进程发送数据:", name)
conn.send(name)

if __name__ == '__main__':
# 创建管道
conn1, conn2 = multiprocessing.Pipe()
# 创建子进程
p = multiprocessing.Process(target=processFun, args=(conn1, "http://www.baidu.com"))
# 启动子进程
p.start()
p.join()

print(multiprocessing.current_process().pid, "接收数据:")
print(conn2.recv())

# 结果
'''
5760 进程发送数据: http://www.baidu.com
7760 接收数据:
http://www.baidu.com
'''

(3) 共享内存

共享内存是一种常用的,高效的进程之间的通信方式,为了保证共享内存的有序访问,需要对进程采取额外的同步措施。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from multiprocessing import Process
import mmap
import contextlib
import time

def write():
with contextlib.closing(mmap.mmap(-1, 1024, tagname='cnblogs', access=mmap.ACCESS_WRITE)) as mem:
for share_data in ("Hello", "Alpha_Panda"):
mem.seek(0)
print('Write data:== %s == to share memory!' % share_data)
mem.write(str.encode(share_data))
mem.flush()
time.sleep(0.5)

def read():
while True:
invalid_byte, empty_byte = str.encode('\x00'), str.encode('')
with contextlib.closing(mmap.mmap(-1, 1024, tagname='cnblogs', access=mmap.ACCESS_READ)) as mem:
share_data = mem.read(1024).replace(invalid_byte, empty_byte)
if not share_data:
# 当共享内存没有有效数据时结束read
break
print("Get data:== %s == from share memory!" % share_data.decode())
time.sleep(0.5)

if __name__ == '__main__':
pr = Process(target=read, args=())
pw = Process(target=write, args=())
pw.start()
pr.start()
pw.join()
pr.join()

# 结果
'''
Write data:== Hello == to share memory!
Get data:== Hello == from share memory!
Write data:== Alpha_Panda == to share memory!
Get data:== Alpha_Panda == from share memory!
'''

(4) 信号量

通信原理:给定一个数量对多个进程可见,多个进程都可以操作该数量增减,并根据数量值决定自己的行为。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 实现方法:
"""
from multiprocessing import Semaphore

sem = Semaphore(num)
功能 : 创建信号量对象
参数 : 信号量的初始值
返回值 : 信号量对象

sem.acquire() 将信号量减1 当信号量为0时阻塞
sem.release() 将信号量加1
sem.get_value() 获取信号量数量
"""

from multiprocessing import Process, Semaphore
import os
import time

# 创建信号量
sem = Semaphore(3) # 3表示服务程序最多允许三个进程同时执行事件

def foo():
print(f"进程 {os.getpid()} 想执行事件")
# 获取信号量
sem.acquire() # 减少信号量
print(f"进程 {os.getpid()} 开始执行操作")
start = time.time()
time.sleep(3)
end = time.time()
print(f"睡眠了{end - start: .4f}秒")
print(f"进程 {os.getpid()} 操作执行完毕")
sem.release() # 增加信号量

if __name__ == "__main__":
p = Process(target=foo, args='')
p.start()
p.join()

# 结果
'''
进程 4856 想执行事件
进程 4856 开始执行操作
睡眠了 3.0010秒
进程 4856 操作执行完毕
'''

(5) socket套接字

套接字无疑是通信使用最为广泛的方式了,socket不仅可以跨主机进行通信,甚至有时候可以使用socket在同一主机的不同进程间进行通信。

socket的语法:socket = socket.socket(family, type[, protocal]),family代表地址家族,一般为AF_UNIX,AF_INET和AF_INET6;AF_UNIX用于同一台机器上的进程通信,AF_INET用于IPV4协议的TCP/UDP,AF_INET6用于IPV6协议的TCP/UDP。type表示套接字类型,一般为SOCK_STREAM,SOCK_DGRAM和SOCK_RAM;SOCK_STREAM为流式套接字,用于TCP通信,SOCK_DGRAM为数据报式套接字,用于UDP通信,SOCK_RAM为原始套接字,可以用于处理ICMP/IGMP等网络报文,这是普通套接字无法处理的。protocal代表协议编号,默认为0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 服务端
import socket

# 1、创建服务端的socket对象
sk = socket.socket()

# 2、绑定一个ip和端口
sk.bind(("127.0.0.1", 8888))

# 3、服务器端一直监听是否有客户端进行连接
sk.listen(5)

while True:
# 4、如果有客户端进行连接、则接受客户端的连接
conn, addr = sk.accept() # 返回客户端socket通信对象和客户端的ip

# 5、客户端与服务端进行通信
rev_data = conn.recv(1024)
print('服务端收到客户端发来的消息:%s' % (rev_data.decode('GB2312')))

# 6、服务端给客户端回消息
conn.send(b"HTTP/1.1 200 OK \r\n\r\n") # http协议
show_str = "<h1> 这短短的一生,我们最终都会失去,你不妨大胆一些。爱一个人,攀一座山,追一个梦,加油 !!!</h1>"
conn.send(show_str.encode('GB2312'))

# 7、关闭socket对象
conn.close()

客户端可以自己写,也可以直接通过浏览器访问:http://127.0.0.1:8888

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 客户端
import socket

# 1、创建socket通信对象
clientSocket = socket.socket()

# 2、使用正确的ip和端口去链接服务器
clientSocket.connect(("127.0.0.1", 8888))

# 3、客户端与服务器进行通信
# 给socket服务器发送信息
send_data = "你拼命赚钱的样子虽然有些狼狈。但是自己靠自己的样子真的很美!加油"
clientSocket.send(send_data.encode("GB2312"))

# 接收服务器的响应(服务器回复的消息)
recvData = clientSocket.recv(1024).decode("GB2312")
print("客户端收到服务器恢复的消息:%s" % (recvData))

# 4、关闭socket对象
clientSocket.close()

(6) 信号signal

参考链接

信号是在软件层次上对中断机制的一种模拟(是由系统内核 发出,由于错误内存冲突等原因引起产生的),在原理上,一个进程收到一个信号与处理器收到一个中断请求可以说是一样的。信号是异步的,一个进程不必通过任何操作来等待信号的到达,事实上,进程也不知道信号到底什么时候到达。信号是进程间通信机制中唯一的异步通信机制,可以看作是异步通知,通知接收信号的进程有哪些事情发生了。信号机制经过POSIX(一个针对Unix操作系统的标准化协议)实时扩展后,功能更加强大,除了基本通知功能外,还可以传递附加信息。信号事件的发生有两个来源:硬件来源(比如我们按下了键盘或者其它硬件故障);软件来源。

信号分为可靠信号和不可靠信号,实时信号和非实时信号。

进程有三种方式响应信号: 忽略信号、捕捉信号、执行默认操作。

9. 线程间通信

(1) 互斥锁

互斥锁为资源引入一个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
"""
#创建锁
mutex = threading.Lock()
#锁定
mutex.acquire([timeout])
#释放
mutex.release()
"""
import threading

money = 0

def order(n):
global money
money = money + n
money = money - n

class MyThread(threading.Thread):
def __init__(self, thread_name):
threading.Thread.__init__(self, name='线程' + thread_name)
self.thread_name = int(thread_name)

def run(self):
for i in range(1000000):
lock.acquire() # 加锁
order(self.thread_name)
lock.release() # 释放

if __name__ == '__main__':
lock = threading.Lock()
t1 = MyThread('1')
t2 = MyThread('10')
t1.start()
t2.start()
t1.join()
t2.join()
print(money)

# 结果
'''
0
'''

为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。acquire() 和 release() 必须成对出现,也就是说加了几把锁就得释放几把锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading
import time

num = 0
mutex = threading.RLock()

class MyThread(threading.Thread):
def run(self):
global num
time.sleep(1)

if mutex.acquire():
num = num + 1
msg = self.name + 'set num to' + str(num)
print(msg)
mutex.acquire()
mutex.release()
mutex.release()

if __name__ == '__main__':
for i in range(5):
t = MyThread()
t.start()

# 结果
'''
Thread-1set num to1
Thread-4set num to2
Thread-3set num to3
Thread-2set num to4
Thread-5set num to5
'''

(2) 信号量

在多线程编程中,为了防止不同的线程同时对一个公用的资源(比如全部变量)进行修改,需要进行同时访问的数量(通常是1)的限制。信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from threading import Semaphore, Thread
import time

# 创建信号量
sema = Semaphore(3) # 限制同时访问资源的数量为3


def foo(tid):
with sema:
print(f"{tid} acquire sema")
time.sleep(1)
print(f"{tid} release sema")


threads = []

for i in range(5):
t = Thread(target=foo, args=(i,))
threads.append(t)
t.start()

for t in threads:
t.join()

# 结果
'''
0 acquire sema
1 acquire sema
2 acquire sema
1 release sema3 acquire sema
0 release sema

4 acquire sema
2 release sema
3 release sema
4 release sema

'''

(3) 条件变量

条件变量可以认为是更高级的锁,比Lock和Rlock的用法更高级,能处理一些复杂的线程同步问题。threading.Condition() 创建一把资源锁(默认是Rlock),提供 acquire() 和 release() 方法,用法和 Rlock 一致。此外 Condition 还提供 wait()、Notify() 和 NotifyAll() 方法。

wait():线程挂起,直到收到一个 Notify() 通知或者超时(可选参数),wait() 必须在线程得到 Rlock 后才能使用。

Notify() :在线程挂起的时候,发送一个通知,让 wait() 等待线程继续运行,Notify() 也必须在线程得到 Rlock 后才能使用。 Notify(n=1),最多唤醒 n 个线程。

NotifyAll() :在线程挂起的时候,发送通知,让所有 wait() 阻塞的线程都继续运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import threading
import time


def test_a():
condition.acquire()
print("李白:中路团一波,推塔")
condition.wait()
print("李白:好的")
condition.notify()
condition.release()


def test_b():
time.sleep(2)
condition.acquire()
print("武则天:等我大冷却")
condition.notify()
condition.wait()
print("武则天:我的大好了,干干干")


if __name__ == '__main__':
condition = threading.Condition()
ta = threading.Thread(target=test_a)
tb = threading.Thread(target=test_b)
ta.start()
tb.start()
ta.join()
tb.join()

# 结果
'''
李白:中路团一波,推塔
武则天:等我大冷却
李白:好的
武则天:我的大好了,干干干
'''

(4) 事件

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用 threading 库中的 Event 对象。Event 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,event 对象中的信号标志被设置假。如果有线程等待一个 event 对象,而这个 event 对象的标志为假,那么这个线程将会被一直阻塞,直至该标志为真。一个线程如果将一个 event 对象的信号标志设置为真,它将唤醒所有等待这个 event 对象的线程。如果一个线程等待一个已经被设置为真的 event 对象,那么它将忽略这个事件,继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
"""
event.wait(): 阻塞线程,知道flag值变为True
event.set(): 设置flag值为True
event.clear(): 修改flag值为False
event.isSet():仅当flag值为True时返回
"""
# 主线程启动子线程后sleep两秒,子线程因为event.wait()被阻塞,当主线程醒来后执行event.set(),子线程才继续运行,两者输出时间差2s

import threading
import time
import datetime

class thread(threading.Thread):
def __init__(self, thread_name):
threading.Thread.__init__(self, name='线程' + thread_name)
self.thread_name = int(thread_name)

def run(self):
event.wait()
print(f"子线程运行时间: {datetime.datetime.now()}")

if __name__ == '__main__':
event = threading.Event()
t1 = thread('0')
t1.start() # 启动子线程
print(f"主线程运行时间:{datetime.datetime.now()}")
time.sleep(2)
event.set() # flag设置为True
t1.join()

# 结果
'''
主线程运行时间:2020-06-15 21:44:54.221804
子线程运行时间: 2020-06-15 21:44:56.222719
'''

(5) 消息队列

使用线程队列有一个要注意的问题是,向队列中添加数据项时并不会复制此数据项,线程间通信实际上是在线程间传递对象引用。如果你担心对象的共享状态,那你最好只传递不可修改的数据结构(如:整型、字符串或者元组)或者一个对象的深拷贝。

在一个进程中,不同子线程负责不同的任务,t1子线程负责获取到数据,t2子线程负责把数据保存的本地,那么他们之间的通信使用Queue来完成。因为在一个进程中,数据变量是共享的,即多个子线程可以对同一个全局变量进行操作修改,Queue是加了锁的安全消息队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 假设需要爬取博客网站的所有文章详情,先要通过文章列表页爬取所有文章的url,再根据文章的url,爬取文章的具体内容
import threading
from queue import Queue
import time

# 爬取文章详情页
def get_detail_html(detail_url_list, id):
while True:
url = detail_url_list.get()
time.sleep(2) # 延时2s,模拟网络请求
print("thread {id}: get {url} detail finished ".format(id=id,url=url))

# 爬取文章列表页
def get_detail_url(queue):
for i in range(100):
time.sleep(1) # 延时1s,模拟比爬取文章详情要快
queue.put("http://projectedu.com/{id}".format(id=i))
print("get detail url {id} end".format(id=i))

if __name__ == "__main__":
detail_url_queue = Queue(maxsize=1000)
# 先创造两个线程
thread = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
html_thread= []
for i in range(3):
thread2 = threading.Thread(target=get_detail_html, args=(detail_url_queue,i))
html_thread.append(thread2)
start_time = time.time()
# 启动两个线程
thread.start()
for i in range(3):
html_thread[i].start()
# 等待所有线程结束
thread.join()
for i in range(3):
html_thread[i].join()

print("last time: {} s".format(time.time()-start_time))

# 结果
'''
get detail url 0 end
get detail url 1 end
thread 0: get http://projectedu.com/0 detail finished
get detail url 2 end
thread 1: get http://projectedu.com/1 detail finished
get detail url 3 end
thread 2: get http://projectedu.com/2 detail finished
get detail url 4 end
thread 0: get http://projectedu.com/3 detail finished
get detail url 5 end
thread 1: get http://projectedu.com/4 detail finished
get detail url 6 end
thread 2: get http://projectedu.com/5 detail finished
get detail url 7 end
'''

10. 孤儿进程、僵尸进程、守护进程

孤儿进程:父进程退出,子进程还在运行这些子进程都是孤儿进程,孤儿进程将被init进程(进程号为1)说收养,并由init进程对他们完成状态收集工作。

僵尸进程:进程使用fork创建子进程,如果进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的描述符仍然保存在系统中,这些进程就是僵尸进程。避免僵尸进程的方法:

  • fork两次用孙子进程去完成子进程的任务;
  • 用wait()函数使父进程阻塞;
  • 使用信号量,在signal handler中调用waitpid,这样父进程不用阻塞。

守护进程:就是会随着主进程结束而结束的进程,主进程代码执行结束后就终止,而且守护进程内无法开启子进程,否则抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import os
import time
from multiprocessing import Process

class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name

def run(self):
print(f"进程为{os.getpid()},父进程为{os.getppid()}")
print(f"我的名字是{self.name}")

if __name__ == "__main__":
p1 = MyProcess("张三")
p2 = MyProcess("李四")
p2.daemon = True # 默认为False,必须在start()之前上设置
p1.start()
p2.start()
time.sleep(2)
print("主进程结束")

# 结果
'''
进程为436,父进程为11972
我的名字是张三
进程为12140,父进程为11972
我的名字是李四
主进程结束
'''

11. 进程池

Pool进程池模块

Multiprocessing.Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行它。Pool类用于需要执行的目标很多,而手动限制进程数量又太繁琐时,如果目标少且不用控制进程数量则可以用Process类。

进程池Pool语法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

  • processes: 进程数,由CPU个数决定,不能操作CPU总数;如果进程是None,那么返回的数字os.cpu_count()
  • initializer: 如果是None,那么每一个工作进程在开始会调用initialize(*initargs)
  • maxtasksperchild: 工作进程退出前可以完成的任务数,完成后用一个新的工作进程来代替原进程,让闲置下来的资源被释放。默认值为None,表示只要Pool存在工作进程就回一直存活。
  • context: 用在只当工作进程启动时的上下文,一般使用multiprocessing.Pool()或者一个context()对象的Pool()方法来创建一个池。

multiprocessing 模块下的Pool类下的方法:

方法 描述
apply(func[, args=()[, kwargs={}]]) 该函数用于传递不定参数,和python中的apply函数一致,主进程会被阻塞知道函数执行结束(不建议使用,python3已经移除)。
apply_async(func[, args=()[, kwargs={}[, callback[, error_callback]]]]) 与apply的用法一致,但是它是非阻塞的且支持结果返回后进行回调,调用失败,将调用error_callback
map(func, iterable[, chunksize=None]) Pool类中的map方法,与内置的map函数用法基本一致,它会使进程阻塞知道结果返回。(注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。)
map_async(func, iterable[, chunksize[, callback[, error_callback]]]) 与map用法一致,但是它是非阻塞的。
imap(func, iterable[, chunksize]) 返回迭代器,next()调用返回的迭代器的方法得到结果,imap()方法有一个可选的超时参数:next(timeout)将引发multiprocessing.TimeoutError异常
close() 防止任何更多的任务被提交到进程池中,一旦完成所有任务,工作进程将退出。
terminate() 立即停止工作进程未完成的工作,当进程池对象被垃圾回收时,terminate()将立即调用。
join() 等待工作进程退出,必须在close()terminate()使用之前join()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from multiprocessing import Pool
import time

def run(fn): # fn函数参数是数据列表的一个元素
time.sleep(1)
print(fn * fn)

if __name__ == "__main__":
s = [1, 2, 3, 4, 5, 6]
print("执行顺序:")
start = time.time()
for fn in s:
run(fn)
end = time.time()
print(f"顺序执行时间:{end-start:.2f}")
print("创建多个进程:")
pool = Pool(10) # 创建拥有10个进程数量的进程池
pool.map(run, s)
pool.close() # 关闭进程池,不在接收新的进程
pool.join() # 主进程阻塞等待子进程退出
t = time.time()
print(f"并行执行时间:{t-end:.2f}")

# 结果
'''
执行顺序:
1
4
9
16
25
36
顺序执行时间:6.00
创建多个进程:
1
9
16
25
4
36
并行执行时间:3.31
'''

12. 守护线程

守护线程:当主线程结束后,子线程也会随之结束,所以当主线程结束后,整个程序就退出了。所谓“线程守护”,就是主线程不管该线程的执行情况,只要是其他子线程结束且主线程执行完毕,主线程都会关闭。也就是说:主线程不等待该守护线程的执行完再去关闭。

守护线程的作用:是为其他线程提供便利服务,守护线程最典型的应用就是GC。

守护线程的特点:

  • 只要当前主线程中尚存在任何一个非守护线程没有结束,守护线程就继续工作;
  • 只有当最后一个非守护线程结束时,守护线程才随着主线程一同结束工作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#  线程不是守护线程
import time
import threading


def fun():
print("start fun")
time.sleep(2)
print("end fun")

def main():
print("main thread")
t1 = threading.Thread(target=fun,args=())
t1.setDaemon(False) # 非守护线程
t1.start()
time.sleep(1)
print("main thread end")

if __name__ == '__main__':
main()

# 结果,主程序在等待子线程结束才退出
'''
main thread
start fun
main thread end
end fun
'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 线程是守护线程
import time
import threading


def fun():
print("start fun")
time.sleep(2)
print("end fun")

def main():
print("main thread")
t1 = threading.Thread(target=fun,args=())
t1.setDaemon(True) # 守护线程
t1.start()
time.sleep(1)
print("main thread end")

if __name__ == '__main__':
main()

# 结果,程序在主程序结束后,直接退出。没有等待子线程运行完
'''
main thread
start fun
main thread end
'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 多线程情况下
import time
import threading


def fun():
print("start fun")
time.sleep(2)
print("end fun")

def foo():
print("start foo")
time.sleep(1)
print("end foo")

if __name__ == '__main__':
print("main thread")
t1 = threading.Thread(target=fun,args=())
t2 = threading.Thread(target=foo,args=())
t1.setDaemon(True) # 守护线程
t1.start()
t2.start()
time.sleep(1)
print("main thread end")

# 结果,程序在主程序结束后,等待非守护线程的foo执行完,才退出。而没有等待守护线程func运行完
'''
main thread
start fun
start foo
main thread end
end foo
'''

13. 线程池

PYTHON线程池及其原理和使用

系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。这种情形下,使用线程池可以很好地性能,尤其是当程序中需要大量存活期很短暂的线程时,更应该考虑使用线程池。

线程池在系统启动时创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。

此外,使用线程池可以有效控制系统中并发线程的数量。当西戎中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致Python解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。

线程池的使用

线程池的基类是concurrent.futures模块中的Executor,包含两个子类:ThreadPoolExecutor(用于创建线程池),ProcessPoolExecutor用于创建进程池。

Executor中的常用方法:

方法 描述
submit(func, *args, **kwargs) 将函数func提交给线程池,*args表示传给func函数的参数,**kwargs表示以关键字形式给func传参
map(func, *iterables, timeout=None, chunksize=1) 该函数类似于python中的map()函数的用法,只是个该函数将会启动多个线程,以异步的方式立即对可迭代对象iterables进行map处理
shutdown(wait=True) 关闭线程池,不再接受性任务,会将以前所有提交的任务执行完成

程序task函数提交(submit)给线程池后,submit方法会返回一个Future对象,Future类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步的方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以Python使用Future来代表。

Future中的常用方法:

方法 描述
cancel() 取消该Future代表的线程任务,若该任务正在执行,不可取消,返回False;否则,程序会取消该任务,并返回True
cancelled() 返回Future代表的线程任务是否被成功取消
running() 若该Future代表的线程任务正在执行,不可取消,该方法返回True
done() 若该Future代表的线程任务被成功取消或者执行完成,该方法返回True
result(timeout=None) 获取该Future代表的线程任务最后的返回的结果,若任务还未完成,该方法将会阻塞当前线程,其中timeout参数指定阻塞持续时间
exception(timeout=None) 获取该Future代表的线程任务说引发的异常,若任务成功完成,且没有异常,该方法返回None
add_done_callback(func) 为该Future代表的线程任务注册一个”回调函数”,当该任务成功完成时,程序会自动触发func函数 ,并将对应的 Future 对象作为参数传给该回调函数

使用线程池来执行线程任务的步骤:

  • 调用ThreadPoolExecutor类的构造器创建一个线程池;
  • 定义一个普通函数作为线程任务;
  • 调用ThreadPoolExecutor对象的submit()方法来提交线程任务;
  • 当不想提交任何任务时,调用ThreadPoolExecutor对象的shutdown()方法来关闭线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from concurrent.futures import ThreadPoolExecutor
import threading
import time

# 定义一个线程任务函数
def func(x, y):
print(f"{threading.current_thread().name} is running, x={x}, y={y}.")
# time.sleep(2)
return x * y

def get_result(future):
"""
如果程序不希望直接调用 result() 方法阻塞线程,则可通过 Future 的 add_done_callback() 方法来添加回调函数,该回调函数形如 fn(future)。当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数
"""
print(future.result())

def process():
# 创建一个包含2个子线程的线程池,且线程名为以“thread_”开头
pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread_")
# 向线程池提交一个task
for i in range(5):
future = pool.submit(func, i, i+1)
print(future.done())
future.add_done_callback(get_result)
pool.shutdown(wait=True)
print('process finished')

if __name__ == "__main__":
process()

# 结果
'''
thread__0 is running, x=0, y=1.
False
0
thread__0 is running, x=1, y=2.
True
2
False
thread__0 is running, x=2, y=3.
6
False
thread__1 is running, x=3, y=4.
12
False
thread__0 is running, x=4, y=5.
20
process finished
'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
"""
使用 with 语句来管理线程池,这样即可避免手动关闭线程池,通过Exectuor里的map()方法,为 iterables 的每个元素启动一个线程,以并发方式来执行 action 函数,并收集线程任务的返回值
"""
from concurrent.futures import ThreadPoolExecutor
import threading
import time

# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含4条线程的线程池
with ThreadPoolExecutor(max_workers=4) as pool:
# 使用线程执行map计算
# 后面元组有3个元素,因此程序启动3条线程来执行action函数
results = pool.map(action, (3, 5, 7))
print('--------------')
for r in results:
print(r)

# 结果
'''
ThreadPoolExecutor-0_0 0
ThreadPoolExecutor-0_0 1
ThreadPoolExecutor-0_0 2
ThreadPoolExecutor-0_0 0
ThreadPoolExecutor-0_0 1
ThreadPoolExecutor-0_0 2
ThreadPoolExecutor-0_0 3
ThreadPoolExecutor-0_0 4
ThreadPoolExecutor-0_0 0
ThreadPoolExecutor-0_0 1
ThreadPoolExecutor-0_0 2
ThreadPoolExecutor-0_0 3
ThreadPoolExecutor-0_0 4
ThreadPoolExecutor-0_0 5
ThreadPoolExecutor-0_0 6
--------------

3
10
21
'''

线程池的优点:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

14. 线程安全

线程安全就是多线程访问时,采用了加锁机制,当一个线程访问该类的某个数据时,进行保护,其他线程不能进行访问知道该线程读完,其他线程才能使用,这样就不会出现数据不一致或数据污染。

当多个线程访问某个方法时,不管通过怎样的调用方式或说这些线程如何交替执行,在主程序中不需要去做任何的同步,这个类的结果行为都是我们设想的正切行为,那么我们就可以说这个类是线程安全的。

15. 多线程资源竞争,怎么解决

线程是独立的,同一个进程里线程的数据共享的,当各个线程访问数据资源时会出现竞争状态,即数据几乎同时被多个线程占用,造成数据混乱,所谓的线程的不安全。

添加锁,确保了某段关键代码(共享数据资源)只能由一个线程从头到尾完整地执行,则能够解决多线程资源进程下原子操作的问题;但是添加锁阻止了多线程并发操作,包含锁的某段代码实际上只能以单线程模式执行,效率就大大下降;其次,由于可以存在多个锁,不同的线程持有不同的说,并试图获取对方持有的锁时,可能造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。

16. 有几种锁

  • 互斥锁

在编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为” 互斥锁” 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。同一个进行中的多线程是共享系统资源的,,多个线程同时对一个对象进行操作,一个线程操作尚未结束,另一个线程已经对其进行操作,导致最终结果出现错误,此时需要对被操作对象添加互斥锁,保证每个线程对该对象的操作都得到正确的结果。

  • 可重入锁

可重入锁,也叫做递归锁,指的是在同一线程内,外层函数获得锁之后,内层递归函数仍然可以获取到该锁。换一种说法,同一个线程再次进入同步代码时,可以使用自己已获取到的锁,防止在同一线程中多次获取锁而导致死锁发生。

  • 死锁

若干个线程资源竞争时,都在等待对方对某部分资源解除占用状态,结果谁也不愿意先解锁,互相干等着,程序无法执行下去,这就是死锁。

17. 调度算法

常见的调度算法总结

(1) 先来先服务(FCFS, First Come First Service)

先来先服务(FCFS)调度算法是一种最简单的调度算法,该算法既可用于作业调度, 也可用于进程调度。FCFS算法比较有利于长作业(进程),而不利于短作业(进程)。由此可知,本算法适合于CPU繁忙型作业, 而不利于I/O繁忙型的作业(进程)。

(2) 短作业优先(SJF, Shortest Job First)

短作业(进程)优先调度算法(SJ/PF)是指对短作业或短进程优先调度的算法,该算法既可用于作业调度, 也可用于进程调度。但其对长作业不利;不能保证紧迫性作业(进程)被及时处理;作业的长短只是被估算出来的。

(3) 最高优先权调度(Priority Scheduling)

为了照顾紧迫性作业,使之进入系统后便获得优先处理,引入了最高优先权优先(FPF)调度算法。 此算法常被用在批处理系统中,作为作业调度算法,也作为多种操作系统中的进程调度,还可以用于实时系统中。当其用于作业调度, 将后备队列中若干个优先权最高的作业装入内存。当其用于进程调度时,把处理机分配给就绪队列中优先权最高的进程,此时, 又可以进一步把该算法分成以下两种:

  • 非抢占式优先权算法
  • 抢占式优先权算法(高性能计算机系统)

高响应比优先调度算法为了弥补短作业优先算法的不足,我们引入动态优先权,使作业的优先等级随着等待时间的增加而以速率a提高。 该优先权变化规律可描述为:优先权=(等待时间+要求服务时间)/要求服务时间 =(响应时间)/要求服务时间

(4) 时间片轮转(RR, Round Robin)

时间片轮转法一般用于进程调度,每次调度,把CPU分配队首进程,并令其执行一个时间片。 当执行的时间片用完时,由一个记时器发出一个时钟中断请求,该进程被停止,并被送往就绪队列末尾;依次循环。

(5) 多级反馈队列调度(Multilevel Feedback Queue Scheduling)

多级反馈队列调度算法多级反馈队列调度算法,不必事先知道各种进程所需要执行的时间,它是目前被公认的一种较好的进程调度算法。其实施过程如下:

  • 设置多个就绪队列,并为各个队列赋予不同的优先级,在优先权越高的队列中,为每个进程所规定的的执行时间片就越小;
  • 当一个新进程进入内存后,首先放入第一队列的末尾,按FCFC原则排队等候调度。如果它能在一个时间片中完成,便可撤离;如果未完成,就转入第二队列的末尾,在同样等待调度…如此下去,当一个长作业(进程)从第一队列一次将到第n队列(最后队列后),变按第n队列时间片轮转运行;
  • 仅当第一队列空闲时,调度程序才调度第二队列中的进程运行;仅当第一到第(i-1)队列空时,才会调度第i队列中的进程运行,并执行相应的时间片轮转。
  • 如果处理机正在处理第i队列中某进程,又有新进程进入优先级较高的队列,则此新队列抢占正在运行的处理机,并把正在运行的进行放在第i队列的队尾。

18. TreadLocal

19. nginx如何处理连接

20. nginx如何做性能优化

21. nginx与apache的区别

22. nginx超时怎么办

Linux

1. linux常用命令

2. Linux内存管理机制

3.使用什么命令查看cpu和内存情况

4. 如何查看进程的线程情况

top -H -p

5. 如何查看系统性能、性能指标

6. 查看系统负载

7. linux文件类型、文件管理

8. cup负载过高怎么解决

  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.
  • Copyrights © 2019-2020 holysll
  • Visitors: | Views: