从定义一个(包含错误的)自定义进程类开始:
import multiprocessing as mp
import random


class MyProc(mp.Process):
    """Deliberately broken custom process used to demonstrate 'spawn' pitfalls.

    Two defects are kept on purpose:

    * ``self.unpickle`` is a lambda, which cannot be pickled, so the
      instance cannot be handed to a spawned child process.
    * ``run`` reads the global ``g``, which is only assigned under the
      ``if __name__ == '__main__'`` guard.
    """

    def __init__(self) -> None:
        super().__init__()
        print('proc init')
        # INTENTIONAL BUG: lambdas are not picklable.
        self.unpickle = lambda x: x+2

    def run(self):
        """Body intended to execute in the child process."""
        print('subproc')
        print(self.unpickle)
        # INTENTIONAL BUG: ``g`` is assigned only inside the __main__ guard.
        print(g)


# Executed on every import of this module, not only when run as a script.
print('outside')
if __name__ == '__main__':
    print('main')
    g = random.random()
    p = MyProc()
    p.start()
MyProc()
这里执行BaseProcess的构造函数,做一些变量赋值之类的操作,目前还是在主进程里。start函数真正启动一个子进程。
Process.start() 做了什么
部分源码如下:
# Partial excerpt: only the ``start`` method of the class is shown.
class BaseProcess(object):

    def start(self):
        '''
        Start child process

        Raises AssertionError if the process was already started, if it is
        started from a process other than the one that created it, or if
        the current process is daemonic.
        '''
        self._check_closed()
        assert self._popen is None, 'cannot start a process twice'
        assert self._parent_pid == os.getpid(), \
               'can only start a process object created by current process'
        assert not _current_process._config.get('daemon'), \
               'daemonic processes are not allowed to have children'
        _cleanup()
        # The key step: _Popen is supplied by the concrete Process subclass
        # and is what actually launches the child.
        self._popen = self._Popen(self)
        self._sentinel = self._popen.sentinel
        # Avoid a refcycle if the target function holds an indirect
        # reference to the process object (see bpo-30775)
        del self._target, self._args, self._kwargs
        _children.add(self)
class Process(process.BaseProcess):
    """Process class whose ``_Popen`` defers to the default context."""

    _start_method = None

    @staticmethod
    def _Popen(process_obj):
        """Launch *process_obj* via the default context's Process class."""
        return _default_context.get_context().Process._Popen(process_obj)
关键是这一句
self._popen = self._Popen(self),Process的子类重写(override)并实现了_Popen方法。在Windows平台上,context只有spawn一种,即通过WinAPI CreateProcess启动一个新进程。我们来看看具体是在哪儿、如何调用的。
在往下看之前,先明确几个变量是干什么的:
_default_context.get_context()
这里先获得了一个SpawnContext对象,它只有一个比较重要的属性即Process。该属性指向SpawnProcess类(BaseProcess的子类),其_Popen方法会构造一个Popen实例,并在该实例的构造函数里完成对CreateProcess的调用。
参数process_obj即来自self._popen = self._Popen(self)中的self,是自定义进程类的一个实例。
涉及到的部分代码如下:
class SpawnProcess(process.BaseProcess):
    """Process class for the 'spawn' start method."""

    _start_method = 'spawn'

    @staticmethod
    def _Popen(process_obj):
        """Return a win32 spawn ``Popen`` for *process_obj*."""
        # Imported inside the method, so the win32-specific module is only
        # loaded when a process is actually started.
        from .popen_spawn_win32 import Popen
        return Popen(process_obj)
class SpawnContext(BaseContext):
    """Context for the 'spawn' start method; exposes ``SpawnProcess``."""

    _name = 'spawn'
    Process = SpawnProcess
# Maps a start-method name to its context instance; only 'spawn' is
# registered in this excerpt.
_concrete_contexts = {
'spawn': SpawnContext(),
}
# The default context is built from the 'spawn' context registered above.
_default_context = DefaultContext(_concrete_contexts['spawn'])
来看下Popen对象的构造函数:
class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    method = 'spawn'

    def __init__(self, process_obj):
        """Launch a new interpreter and send it the pickled *process_obj*.

        The child is created with ``_winapi.CreateProcess``; the preparation
        data and then the process object are written to it through a pipe.
        """
        prep_data = spawn.get_preparation_data(process_obj._name)

        # read end of pipe will be "stolen" by the child process
        # -- see spawn_main() in spawn.py.
        # Open a pipe and obtain its read handle and write handle.
        rhandle, whandle = _winapi.CreatePipe(None, 0)
        wfd = msvcrt.open_osfhandle(whandle, 0)
        # Build the launch command line. After quoting it looks roughly like:
        #   "python" "-c" "from multiprocessing.spawn import spawn_main;
        #   spawn_main(parent_pid=..., pipe_handle=...)" "--multiprocessing-fork"
        cmd = spawn.get_command_line(parent_pid=os.getpid(),
                                     pipe_handle=rhandle)
        cmd = ' '.join('"%s"' % x for x in cmd)

        python_exe = spawn.get_executable()

        # bpo-35797: When running in a venv, we bypass the redirect
        # executor and launch our base Python.
        if WINENV and _path_eq(python_exe, sys.executable):
            python_exe = sys._base_executable
            env = os.environ.copy()
            env["__PYVENV_LAUNCHER__"] = sys.executable
        else:
            env = None

        with open(wfd, 'wb', closefd=True) as to_child:
            # start process
            try:
                # Launch the new process with the command line built above;
                # the call returns a (process handle, thread handle,
                # process id, thread id) 4-tuple.
                hp, ht, pid, tid = _winapi.CreateProcess(
                    python_exe, cmd,
                    None, None, False, 0, env, None, None)
                _winapi.CloseHandle(ht)
            except:
                # Cleanup-and-reraise: close the read handle on any failure.
                _winapi.CloseHandle(rhandle)
                raise

            # set attributes of self
            self.pid = pid
            self.returncode = None
            self._handle = hp
            self.sentinel = int(hp)
            self.finalizer = util.Finalize(self, _winapi.CloseHandle,
                                           (self.sentinel,))

            # send information to child
            set_spawning_popen(self)
            try:
                reduction.dump(prep_data, to_child)
                # Pickle the custom process object (with its attributes) and
                # send it to the child through the write end of the pipe.
                reduction.dump(process_obj, to_child)
            finally:
                set_spawning_popen(None)
如何让新进程启动后去执行自定义类的
run()
注意到,新进程的启动命令是一行python代码,它会执行
spawn_main。然后查看spawn_main的源码可以发现它从管道中反序列化原进程对象,执行一些必要的初始化工作之后,最终通过Process._bootstrap调用Process.run。至此,一个新进程就启动起来了。
一个正确的自定义进程
可以看到,spawn方式不像fork那样复制父进程,而是启动一个全新的Python进程。这个新进程会重新导入主模块,但不会执行if __name__ == '__main__'包围下的代码,因此全局变量g在子进程中没有定义(它也不会通过管道发送),run()中的print(g)会失败;同时,MyProc.unpickle属性是一个lambda表达式,它不能被pickle序列化,因此进程对象无法被发送到子进程。程序会执行失败。修改后的代码如下:
import multiprocessing as mp
import random


class MyProc(mp.Process):
    """Custom process that is safe to start with the 'spawn' method.

    It holds no unpicklable attributes, and ``run`` does not depend on
    globals that are assigned only under the ``__main__`` guard.
    """

    def __init__(self) -> None:
        super().__init__()
        print('proc init')

    def run(self):
        """Body intended to execute in the child process."""
        print('subproc')


# Executed on every import of this module, not only when run as a script.
print('outside')
if __name__ == '__main__':
    print('main')
    # Assigned only when the file is run as a script; ``run`` never reads it.
    g = random.random()
    p = MyProc()
    p.start()