2626 PENDING , RUNNING , CANCELLED , CANCELLED_AND_NOTIFIED , FINISHED , Future ,
2727 BrokenExecutor )
2828from concurrent .futures .process import BrokenProcessPool , _check_system_limits
29- from multiprocessing import get_context
3029
3130import multiprocessing .process
3231import multiprocessing .util
32+ import multiprocessing as mp
3333
3434
3535if support .check_sanitizer (address = True , memory = True ):
@@ -131,7 +131,6 @@ def setUp(self):
131131 self .executor = self .executor_type (
132132 max_workers = self .worker_count ,
133133 ** self .executor_kwargs )
134- self ._prime_executor ()
135134
136135 def tearDown (self ):
137136 self .executor .shutdown (wait = True )
@@ -145,15 +144,7 @@ def tearDown(self):
145144 super ().tearDown ()
146145
147146 def get_context (self ):
148- return get_context (self .ctx )
149-
150- def _prime_executor (self ):
151- # Make sure that the executor is ready to do work before running the
152- # tests. This should reduce the probability of timeouts in the tests.
153- futures = [self .executor .submit (time .sleep , 0.1 )
154- for _ in range (self .worker_count )]
155- for f in futures :
156- f .result ()
147+ return mp .get_context (self .ctx )
157148
158149
159150class ThreadPoolMixin (ExecutorMixin ):
@@ -276,9 +267,6 @@ def test_initializer(self):
276267 with self .assertRaises (BrokenExecutor ):
277268 self .executor .submit (get_init_status )
278269
279- def _prime_executor (self ):
280- pass
281-
282270 @contextlib .contextmanager
283271 def _assert_logged (self , msg ):
284272 if self .log_queue is not None :
@@ -365,14 +353,14 @@ def test_hang_issue12364(self):
365353 f .result ()
366354
367355 def test_cancel_futures (self ):
368- executor = self .executor_type ( max_workers = 3 )
369- fs = [executor .submit (time .sleep , .1 ) for _ in range (50 )]
370- executor .shutdown (cancel_futures = True )
356+ assert self .worker_count <= 5 , "test needs few workers"
357+ fs = [self . executor .submit (time .sleep , .1 ) for _ in range (50 )]
358+ self . executor .shutdown (cancel_futures = True )
371359 # We can't guarantee the exact number of cancellations, but we can
372- # guarantee that *some* were cancelled. With setting max_workers to 3,
373- # most of the submitted futures should have been cancelled.
360+ # guarantee that *some* were cancelled. With few workers, many of
361+ # the submitted futures should have been cancelled.
374362 cancelled = [fut for fut in fs if fut .cancelled ()]
375- self .assertTrue (len (cancelled ) >= 35 , msg = f" { len ( cancelled ) = } " )
363+ self .assertGreater (len (cancelled ), 20 )
376364
377365 # Ensure the other futures were able to finish.
378366 # Use "not fut.cancelled()" instead of "fut.done()" to include futures
@@ -385,33 +373,32 @@ def test_cancel_futures(self):
385373 # Similar to the number of cancelled futures, we can't guarantee the
386374 # exact number that completed. But, we can guarantee that at least
387375 # one finished.
388- self .assertTrue (len (others ) > 0 , msg = f" { len ( others ) = } " )
376+ self .assertGreater (len (others ), 0 )
389377
390- def test_hang_issue39205 (self ):
378+ def test_hang_gh83386 (self ):
391379 """shutdown(wait=False) doesn't hang at exit with running futures.
392380
393- See https://bugs. python.org/issue39205 .
381+ See https://github.com/ python/cpython/issues/83386 .
394382 """
395383 if self .executor_type == futures .ProcessPoolExecutor :
396384 raise unittest .SkipTest (
397- "Hangs due to https://bugs. python.org/issue39205 " )
385+ "Hangs, see https://github.com/ python/cpython/issues/83386 " )
398386
399387 rc , out , err = assert_python_ok ('-c' , """if True:
400388 from concurrent.futures import {executor_type}
401389 from test.test_concurrent_futures import sleep_and_print
402390 if __name__ == "__main__":
391+ if {context!r}: multiprocessing.set_start_method({context!r})
403392 t = {executor_type}(max_workers=3)
404393 t.submit(sleep_and_print, 1.0, "apple")
405394 t.shutdown(wait=False)
406- """ .format (executor_type = self .executor_type .__name__ ))
395+ """ .format (executor_type = self .executor_type .__name__ ,
396+ context = getattr (self , 'ctx' , None )))
407397 self .assertFalse (err )
408398 self .assertEqual (out .strip (), b"apple" )
409399
410400
411401class ThreadPoolShutdownTest (ThreadPoolMixin , ExecutorShutdownTest , BaseTestCase ):
412- def _prime_executor (self ):
413- pass
414-
415402 def test_threads_terminate (self ):
416403 def acquire_lock (lock ):
417404 lock .acquire ()
@@ -506,14 +493,11 @@ def test_cancel_futures_wait_false(self):
506493
507494
508495class ProcessPoolShutdownTest (ExecutorShutdownTest ):
509- def _prime_executor (self ):
510- pass
511-
512496 def test_processes_terminate (self ):
513497 def acquire_lock (lock ):
514498 lock .acquire ()
515499
516- mp_context = get_context ()
500+ mp_context = self . get_context ()
517501 sem = mp_context .Semaphore (0 )
518502 for _ in range (3 ):
519503 self .executor .submit (acquire_lock , sem )
@@ -527,7 +511,8 @@ def acquire_lock(lock):
527511 p .join ()
528512
529513 def test_context_manager_shutdown (self ):
530- with futures .ProcessPoolExecutor (max_workers = 5 ) as e :
514+ with futures .ProcessPoolExecutor (
515+ max_workers = 5 , mp_context = self .get_context ()) as e :
531516 processes = e ._processes
532517 self .assertEqual (list (e .map (abs , range (- 5 , 5 ))),
533518 [5 , 4 , 3 , 2 , 1 , 0 , 1 , 2 , 3 , 4 ])
@@ -536,7 +521,8 @@ def test_context_manager_shutdown(self):
536521 p .join ()
537522
538523 def test_del_shutdown (self ):
539- executor = futures .ProcessPoolExecutor (max_workers = 5 )
524+ executor = futures .ProcessPoolExecutor (
525+ max_workers = 5 , mp_context = self .get_context ())
540526 res = executor .map (abs , range (- 5 , 5 ))
541527 executor_manager_thread = executor ._executor_manager_thread
542528 processes = executor ._processes
@@ -559,7 +545,8 @@ def test_del_shutdown(self):
559545 def test_shutdown_no_wait (self ):
560546 # Ensure that the executor cleans up the processes when calling
561547 # shutdown with wait=False
562- executor = futures .ProcessPoolExecutor (max_workers = 5 )
548+ executor = futures .ProcessPoolExecutor (
549+ max_workers = 5 , mp_context = self .get_context ())
563550 res = executor .map (abs , range (- 5 , 5 ))
564551 processes = executor ._processes
565552 call_queue = executor ._call_queue
@@ -936,7 +923,7 @@ def submit(pool):
936923 pool .submit (submit , pool )
937924
938925 for _ in range (50 ):
939- with futures .ProcessPoolExecutor (1 , mp_context = get_context ('fork' )) as workers :
926+ with futures .ProcessPoolExecutor (1 , mp_context = mp . get_context ('fork' )) as workers :
940927 workers .submit (tuple )
941928
942929
@@ -1006,7 +993,7 @@ def test_traceback(self):
1006993 def test_ressources_gced_in_workers (self ):
1007994 # Ensure that argument for a job are correctly gc-ed after the job
1008995 # is finished
1009- mgr = get_context ( self .ctx ).Manager ()
996+ mgr = self .get_context ( ).Manager ()
1010997 obj = EventfulGCObj (mgr )
1011998 future = self .executor .submit (id , obj )
1012999 future .result ()
@@ -1022,36 +1009,37 @@ def test_ressources_gced_in_workers(self):
10221009 mgr .join ()
10231010
10241011 def test_saturation (self ):
1025- executor = self .executor_type ( 4 )
1026- mp_context = get_context ()
1012+ executor = self .executor
1013+ mp_context = self . get_context ()
10271014 sem = mp_context .Semaphore (0 )
10281015 job_count = 15 * executor ._max_workers
1029- try :
1030- for _ in range (job_count ):
1031- executor .submit (sem .acquire )
1032- self .assertEqual (len (executor ._processes ), executor ._max_workers )
1033- for _ in range (job_count ):
1034- sem .release ()
1035- finally :
1036- executor .shutdown ()
1016+ for _ in range (job_count ):
1017+ executor .submit (sem .acquire )
1018+ self .assertEqual (len (executor ._processes ), executor ._max_workers )
1019+ for _ in range (job_count ):
1020+ sem .release ()
10371021
10381022 def test_idle_process_reuse_one (self ):
1039- executor = self .executor_type (4 )
1023+ executor = self .executor
1024+ assert executor ._max_workers >= 4
10401025 executor .submit (mul , 21 , 2 ).result ()
10411026 executor .submit (mul , 6 , 7 ).result ()
10421027 executor .submit (mul , 3 , 14 ).result ()
10431028 self .assertEqual (len (executor ._processes ), 1 )
1044- executor .shutdown ()
10451029
10461030 def test_idle_process_reuse_multiple (self ):
1047- executor = self .executor_type (4 )
1031+ executor = self .executor
1032+ assert executor ._max_workers <= 5
10481033 executor .submit (mul , 12 , 7 ).result ()
10491034 executor .submit (mul , 33 , 25 )
10501035 executor .submit (mul , 25 , 26 ).result ()
10511036 executor .submit (mul , 18 , 29 )
1052- self .assertLessEqual (len (executor ._processes ), 2 )
1037+ executor .submit (mul , 1 , 2 ).result ()
1038+ executor .submit (mul , 0 , 9 )
1039+ self .assertLessEqual (len (executor ._processes ), 3 )
10531040 executor .shutdown ()
10541041
1042+
10551043create_executor_tests (ProcessPoolExecutorTest ,
10561044 executor_mixins = (ProcessPoolForkMixin ,
10571045 ProcessPoolForkserverMixin ,
@@ -1153,7 +1141,7 @@ def _check_crash(self, error, func, *args, ignore_stderr=False):
11531141 self .executor .shutdown (wait = True )
11541142
11551143 executor = self .executor_type (
1156- max_workers = 2 , mp_context = get_context ( self .ctx ))
1144+ max_workers = 2 , mp_context = self .get_context ( ))
11571145 res = executor .submit (func , * args )
11581146
11591147 if ignore_stderr :
@@ -1232,7 +1220,7 @@ def test_shutdown_deadlock(self):
12321220 # if a worker fails after the shutdown call.
12331221 self .executor .shutdown (wait = True )
12341222 with self .executor_type (max_workers = 2 ,
1235- mp_context = get_context ( self .ctx )) as executor :
1223+ mp_context = self .get_context ( )) as executor :
12361224 self .executor = executor # Allow clean up in fail_on_deadlock
12371225 f = executor .submit (_crash , delay = .1 )
12381226 executor .shutdown (wait = True )
@@ -1245,7 +1233,7 @@ def test_shutdown_deadlock_pickle(self):
12451233 # Reported in bpo-39104.
12461234 self .executor .shutdown (wait = True )
12471235 with self .executor_type (max_workers = 2 ,
1248- mp_context = get_context ( self .ctx )) as executor :
1236+ mp_context = self .get_context ( )) as executor :
12491237 self .executor = executor # Allow clean up in fail_on_deadlock
12501238
12511239 # Start the executor and get the executor_manager_thread to collect
0 commit comments