Showing posts with label python. Show all posts

Friday, March 1, 2024

Python and random task distribution in ThreadPoolExecutor

Not a usual article for me, but I'll leave this boilerplate here for future reference.

So, the task was the following: send multiple PUBLISH messages via SIPp without overloading the SIP proxy, spreading them out in time rather than sending them all at once. Yes, I know SIPp can do this on its own, but I wanted to check the SIPp exit code after each message and retry sending in case of failure. The order of sending does not matter; the messages just need to be delivered.

Here is the boilerplate Python script I used to achieve this:

#!/usr/bin/env python3
import logging
import random
import time

from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(
    format='[%(asctime)s.%(msecs)03d] %(threadName)s %(name)s %(levelname)s: %(message)s',
    level=logging.INFO,
)

def get_timer_delay():
    '''
    Generator passed to the thread pool to stagger task start times.
    Yields a cumulative random delay, growing by 5-200 ms per step and
    resetting to 0 once it exceeds 1 second.
    '''
    num = 0.0
    while True:
        yield num
        if num < 1:
            num += float(random.randrange(5, 200, 3)) / 1000
        else:
            num = 0.0

def process_iterated_value(value, start_delay):
    '''
    Called by the thread pool once per value: sleep out the staggered
    start delay, then do the actual work.
    '''
    time.sleep(start_delay)

    # Call SIPP process here (with exit code control)

    logging.info(f"Processed value {value} with delay {start_delay}")

# ---------------- Script start ----------------

execute_timeout = get_timer_delay()
iterated_value = range(0, 30)

logging.info("Process start")

with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(process_iterated_value, iterated_value, execute_timeout)


logging.info("Process end")
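The `# Call SIPP process here` placeholder could be filled with something like the sketch below. The retry wrapper is generic; the `sipp` command line in the comment is only an illustration (the scenario file name, target host, and flags are assumptions — adjust to your setup):

```python
import subprocess
import time

def run_with_retries(cmd, attempts=3, backoff=1.0):
    """Run cmd, retrying on a non-zero exit code.
    Returns True once the command succeeds, False after all attempts fail."""
    for attempt in range(1, attempts + 1):
        proc = subprocess.run(cmd)
        if proc.returncode == 0:
            return True
        time.sleep(backoff)
    return False

# Inside process_iterated_value, after the sleep (flags are illustrative):
# ok = run_with_retries(["sipp", "-sf", "publish.xml", "-m", "1", "proxy.example.com"])
```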

The output looks like this. The timestamps show that all values are processed in a more-or-less evenly distributed way:

[00:06:43,375.375] MainThread root INFO: Process start
[00:06:43,375.375] ThreadPoolExecutor-0_0 root INFO: Processed value 0 with delay 0.0
[00:06:43,504.504] ThreadPoolExecutor-0_0 root INFO: Processed value 1 with delay 0.128
[00:06:43,620.620] ThreadPoolExecutor-0_1 root INFO: Processed value 2 with delay 0.244
[00:06:43,794.794] ThreadPoolExecutor-0_2 root INFO: Processed value 3 with delay 0.417
[00:06:43,928.928] ThreadPoolExecutor-0_3 root INFO: Processed value 4 with delay 0.5509999999999999
[00:06:44,053.053] ThreadPoolExecutor-0_4 root INFO: Processed value 5 with delay 0.6759999999999999
[00:06:44,334.334] ThreadPoolExecutor-0_0 root INFO: Processed value 6 with delay 0.828
[00:06:44,334.334] ThreadPoolExecutor-0_0 root INFO: Processed value 11 with delay 0.0
[00:06:44,475.475] ThreadPoolExecutor-0_0 root INFO: Processed value 12 with delay 0.14
[00:06:44,590.590] ThreadPoolExecutor-0_1 root INFO: Processed value 7 with delay 0.968
[00:06:44,689.689] ThreadPoolExecutor-0_0 root INFO: Processed value 13 with delay 0.21400000000000002
[00:06:44,769.769] ThreadPoolExecutor-0_2 root INFO: Processed value 8 with delay 0.973
[00:06:44,914.914] ThreadPoolExecutor-0_3 root INFO: Processed value 9 with delay 0.984
[00:06:44,941.941] ThreadPoolExecutor-0_1 root INFO: Processed value 14 with delay 0.35100000000000003
[00:06:45,185.185] ThreadPoolExecutor-0_0 root INFO: Processed value 15 with delay 0.494
[00:06:45,185.185] ThreadPoolExecutor-0_4 root INFO: Processed value 10 with delay 1.13
[00:06:45,413.413] ThreadPoolExecutor-0_2 root INFO: Processed value 16 with delay 0.643
[00:06:45,641.641] ThreadPoolExecutor-0_3 root INFO: Processed value 17 with delay 0.726
[00:06:45,686.686] ThreadPoolExecutor-0_1 root INFO: Processed value 18 with delay 0.743
[00:06:45,686.686] ThreadPoolExecutor-0_1 root INFO: Processed value 23 with delay 0.0
[00:06:45,740.740] ThreadPoolExecutor-0_1 root INFO: Processed value 24 with delay 0.053
[00:06:45,810.810] ThreadPoolExecutor-0_1 root INFO: Processed value 25 with delay 0.07
[00:06:45,886.886] ThreadPoolExecutor-0_1 root INFO: Processed value 26 with delay 0.07500000000000001
[00:06:45,952.952] ThreadPoolExecutor-0_0 root INFO: Processed value 19 with delay 0.766
[00:06:46,021.021] ThreadPoolExecutor-0_1 root INFO: Processed value 27 with delay 0.134
[00:06:46,071.071] ThreadPoolExecutor-0_4 root INFO: Processed value 20 with delay 0.885
[00:06:46,142.142] ThreadPoolExecutor-0_0 root INFO: Processed value 28 with delay 0.19
[00:06:46,354.354] ThreadPoolExecutor-0_1 root INFO: Processed value 29 with delay 0.33299999999999996
[00:06:46,376.376] ThreadPoolExecutor-0_2 root INFO: Processed value 21 with delay 0.962
[00:06:46,721.721] ThreadPoolExecutor-0_3 root INFO: Processed value 22 with delay 1.078
[00:06:46,721.721] MainThread root INFO: Process end
 

Of course, the exact values need tuning afterwards, but the idea is there.

The only thing that bothered me was how the generator value num would behave in a multithreaded environment because of the Python GIL. It turns out there is no problem here: Executor.map submits all tasks eagerly and consumes its input iterables in the calling thread, so the generator is only ever advanced from the main thread, never from the workers.
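A small self-contained check (the names delays, work, and advancing_threads are mine, not from the script above) shows that Executor.map consumes the generator entirely in the calling thread:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Record the name of every thread that advances the generator
advancing_threads = set()

def delays():
    n = 0.0
    while True:
        advancing_threads.add(threading.current_thread().name)
        yield n
        n += 0.001

def work(value, delay):
    return value

with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(work, range(10), delays()))

print(advancing_threads)  # -> {'MainThread'}
```

Since only one thread ever calls next() on the generator, the num counter needs no locking.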

Thursday, April 18, 2019

Simple Albert launcher plugin for managing VPNs

For various reasons I have now switched from a MacBook to a Linux laptop, and as an excellent replacement for the Alfred launcher I found Albert.
It also has a powerful plugin system, and simple workflows can be created really quickly.
One of the workflows I was used to in Alfred was switching VPNs on/off directly from Alfred's interface. Since it's all open source, the answer is simple: if you need something, write it.
So, as a small task while being sick, I wrote a plugin for toggling the state of VPNs managed through NetworkManager.


Looks like this:


The code is simple and published here
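For reference, the NetworkManager side of such a plugin boils down to shelling out to nmcli. A minimal sketch (function names are mine; the plugin linked above may do it differently):

```python
import subprocess

def parse_vpn_connections(nmcli_output):
    """Parse `nmcli -t -f NAME,TYPE,ACTIVE connection show` terse output
    into (name, is_active) tuples, keeping only VPN connections."""
    vpns = []
    for line in nmcli_output.splitlines():
        if not line:
            continue
        name, ctype, active = line.split(":")
        if ctype == "vpn":
            vpns.append((name, active == "yes"))
    return vpns

def list_vpn_connections():
    out = subprocess.check_output(
        ["nmcli", "-t", "-f", "NAME,TYPE,ACTIVE", "connection", "show"],
        text=True)
    return parse_vpn_connections(out)

def toggle_vpn(name, is_active):
    """Bring a VPN connection down if it is active, up otherwise."""
    action = "down" if is_active else "up"
    subprocess.run(["nmcli", "connection", action, "id", name], check=True)
```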

Wednesday, May 6, 2015

A service for determining country by IP

So, I needed to figure things out, as they say, by IP. More precisely, to determine the country so the phone country code could be pre-filled automatically. As it turns out, people charge money for this service, for example here. But I wanted it for free. And by the general rule of open source, if you need something, write it yourself. Especially since a database, even if not the most accurate one, is available from MaxMind.
There is also this thing, but I'm not really sure about its speed. Worth a try, though.
In any case, this service was born.
Completely free, supports https.
The stack is Nginx + uwsgi + flask + postgresql. Why? Because I wanted to see what Flask and Postgres are like. The code is written quite clumsily, since I'm still learning. Right now every request opens a separate connection to the database. Since this is a server after all, I'm inclined to rewrite it so that one connection serves many queries and stays open permanently. I don't know how to do that yet; let's see how the load behaves.
And, by tradition, the code. Yes, I know it's awful, so please point me in the right direction in the comments.

--
upd:

  • Tried to make it keep a single connection to the database instead of reconnecting on every request.
  • Added a method that auto-detects the requester's location: https://geoip.webcall.today/getcountry?ip=auto



from flask import Flask, request, jsonify
import socket
import psycopg2

def ip_address_is_valid(address):
    try:
        socket.inet_aton(address)
    except socket.error:
        return False
    return address.count('.') == 3

application = Flask(__name__)
con = psycopg2.connect(database='geoip', user='geoip', host='localhost', password='geoipuser')

@application.route('/')
def hello_world():
    return "<b>Geoserver welcomes you. Usage = https://geoip.webcall.today/getcountry?ip=X.X.X.X</b><br> This product includes GeoLite2 data created by MaxMind, available from<br> <a href=\"http://www.maxmind.com\">http://www.maxmind.com</a>."

@application.route('/getcountry')
def getcountry():
    global con
    ip = request.args.get('ip', '')
    if ip == 'auto':
        ip = '%s' % request.remote_addr
    if ip_address_is_valid(ip):
        try:
            cur = con.cursor()
            # Parameterised query -- never interpolate user input into SQL
            cur.execute("SELECT code, location, fullname FROM countrydata "
                        "WHERE geodata = (SELECT geodata FROM ipdata "
                        "WHERE subnet >>= %s::inet LIMIT 1) LIMIT 1", (ip,))
            result = cur.fetchone()
            if result:
                result = jsonify(code=result[0],
                                 location=result[1],
                                 name=result[2]), 200
            else:
                result = jsonify(code='none',
                                 location='None',
                                 name='None'), 404
        except psycopg2.DatabaseError as e:
            if con:
                con.rollback()
            # Reconnect so the next request gets a usable connection
            con = psycopg2.connect(database='geoip', user='geoip', host='localhost', password='geoipuser')
            result = jsonify(code='Error %s' % e,
                             location='None',
                             name='None'), 500
    else:
        result = jsonify(code='none',
                         location='None',
                         name='None'), 404
    return result

if __name__ == '__main__':
    application.run(host='127.0.0.1')


And ip_address_is_valid could be rewritten using regular expressions. Something like:

--import socket
++import re

def ip_address_is_valid(address):
    regex_ip = r"^(\d|[1-9]\d|1\d\d|2([0-4]\d|5[0-5]))\.(\d|[1-9]\d|1\d\d|2([0-4]\d|5[0-5]))\.(\d|[1-9]\d|1\d\d|2([0-4]\d|5[0-5]))\.(\d|[1-9]\d|1\d\d|2([0-4]\d|5[0-5]))$"
    return bool(re.match(regex_ip, address))
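On Python 3 the regex is unnecessary: the standard ipaddress module validates addresses directly. A sketch:

```python
import ipaddress

def ip_address_is_valid(address):
    """Return True for a valid dotted-quad IPv4 address, False otherwise."""
    try:
        ipaddress.IPv4Address(address)
    except (ipaddress.AddressValueError, ValueError):
        return False
    return True
```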