Friday, March 1, 2024

Python and random task distribution in ThreadPoolExecutor

Not a usual article for me, but will leave this boilerplate here for myself.

So, the task was the following: send multiple PUBLISH messages over SIPP, but not overload the SIP Proxy, and distribute them in time, like not sending all at once. Yes, I do know SIPP can do this, but I wanted to have SIPP exit code after sending each message and try to re-send it in case of issues. Order of sending is whatever. Just need to be delivered.

Here is the boilerplate Python function that I've used to achieve this

#!/usr/bin/python
import logging
import time
import random

from concurrent.futures import ThreadPoolExecutor

logging.getLogger('paramiko').setLevel(logging.INFO)

logging.basicConfig(
    format = '[%(asctime)s.%(msecs)03d] %(threadName)s %(name)s %(levelname)s: %(message)s',
    level=logging.INFO,
)

def get_timer_delay():
    '''
    Generator to be passed in ThreadPool to have delays in multiple iterated value processes.
    random delay between 5 and 200 ms. When reaching 1 sec - resets back to 0
    '''
    num = 0.0
    while True:
        yield num
        if num < 1:
            num += float(random.randrange(5, 200, 3)) / 1000
        else:
            num = 0.0

def process_iterated_value(value, start_delay):
    '''
    This function is called in thread-wise way to have multiple values processed at the moment
    '''
    time.sleep(start_delay)

    # Call SIPP process here (with exit code control)

    logging.info(f"Processed value {value} with delay {start_delay}")

#### ---------------- Script start ---------------- ########

execute_timeout = get_timer_delay()
iterated_value = range(0, 30)

logging.info("Process start")

with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(process_iterated_value, iterated_value, execute_timeout)


logging.info("Process end")

Output is looks like this. All values are processed in a more-or-less distributed way that can be seen by timestamps

[00:06:43,375.375] MainThread root INFO: Process start
[00:06:43,375.375] ThreadPoolExecutor-0_0 root INFO: Processed value 0 with delay 0.0
[00:06:43,504.504] ThreadPoolExecutor-0_0 root INFO: Processed value 1 with delay 0.128
[00:06:43,620.620] ThreadPoolExecutor-0_1 root INFO: Processed value 2 with delay 0.244
[00:06:43,794.794] ThreadPoolExecutor-0_2 root INFO: Processed value 3 with delay 0.417
[00:06:43,928.928] ThreadPoolExecutor-0_3 root INFO: Processed value 4 with delay 0.5509999999999999
[00:06:44,053.053] ThreadPoolExecutor-0_4 root INFO: Processed value 5 with delay 0.6759999999999999
[00:06:44,334.334] ThreadPoolExecutor-0_0 root INFO: Processed value 6 with delay 0.828
[00:06:44,334.334] ThreadPoolExecutor-0_0 root INFO: Processed value 11 with delay 0.0
[00:06:44,475.475] ThreadPoolExecutor-0_0 root INFO: Processed value 12 with delay 0.14
[00:06:44,590.590] ThreadPoolExecutor-0_1 root INFO: Processed value 7 with delay 0.968
[00:06:44,689.689] ThreadPoolExecutor-0_0 root INFO: Processed value 13 with delay 0.21400000000000002
[00:06:44,769.769] ThreadPoolExecutor-0_2 root INFO: Processed value 8 with delay 0.973
[00:06:44,914.914] ThreadPoolExecutor-0_3 root INFO: Processed value 9 with delay 0.984
[00:06:44,941.941] ThreadPoolExecutor-0_1 root INFO: Processed value 14 with delay 0.35100000000000003
[00:06:45,185.185] ThreadPoolExecutor-0_0 root INFO: Processed value 15 with delay 0.494
[00:06:45,185.185] ThreadPoolExecutor-0_4 root INFO: Processed value 10 with delay 1.13
[00:06:45,413.413] ThreadPoolExecutor-0_2 root INFO: Processed value 16 with delay 0.643
[00:06:45,641.641] ThreadPoolExecutor-0_3 root INFO: Processed value 17 with delay 0.726
[00:06:45,686.686] ThreadPoolExecutor-0_1 root INFO: Processed value 18 with delay 0.743
[00:06:45,686.686] ThreadPoolExecutor-0_1 root INFO: Processed value 23 with delay 0.0
[00:06:45,740.740] ThreadPoolExecutor-0_1 root INFO: Processed value 24 with delay 0.053
[00:06:45,810.810] ThreadPoolExecutor-0_1 root INFO: Processed value 25 with delay 0.07
[00:06:45,886.886] ThreadPoolExecutor-0_1 root INFO: Processed value 26 with delay 0.07500000000000001
[00:06:45,952.952] ThreadPoolExecutor-0_0 root INFO: Processed value 19 with delay 0.766
[00:06:46,021.021] ThreadPoolExecutor-0_1 root INFO: Processed value 27 with delay 0.134
[00:06:46,071.071] ThreadPoolExecutor-0_4 root INFO: Processed value 20 with delay 0.885
[00:06:46,142.142] ThreadPoolExecutor-0_0 root INFO: Processed value 28 with delay 0.19
[00:06:46,354.354] ThreadPoolExecutor-0_1 root INFO: Processed value 29 with delay 0.33299999999999996
[00:06:46,376.376] ThreadPoolExecutor-0_2 root INFO: Processed value 21 with delay 0.962
[00:06:46,721.721] ThreadPoolExecutor-0_3 root INFO: Processed value 22 with delay 1.078
[00:06:46,721.721] MainThread root INFO: Process end
 

For sure, all values need to be adjusted after, but the idea is there.

The only thing that bothers, I'm not 100% sure how generator value num would behave in a multithread environment due to Python GIL. But here I'm asking for someone who has a Python knowledge to comment.