Многопоточность в Python

Многопоточность в Питоне

Иногда случается так, что необходимо выполнять какие-то процедуры параллельно друг-другу и независимо друг от друга – например, получать курсы с разных бирж. Или проверять состояние заказов на разных парах. Или грабить текст с чужих сайтов. Или заливать текст на чужие сайты. Или распознавать капчи.. Да мало ли чего!

В этой статье попрактикуемся в написании таких скриптов.

Без многопоточности

Давайте начнем с бесполезной задачи – будем получать книгу ордеров разных валютных пар на одной бирже. Для демонстрации возьмем Exmo. И для начала давайте без многопоточности – сделаем всё последовательно.

Нам понадобится функция, которая будет делать всю работу, и которой мы будем передавать нужные пары. Вот так будет выглядеть нулевая версия скрипта:

import time
import requests

def get_rates(pair):
    local_start_time = time.time()
    try:
        requests.get("https://api.exmo.com/v1/order_book/?pair={pair}&limit=1000".format(pair=pair))
    except Exception as e:
        print(e)
    print("Пара {pair}, время работы функции: {t:0.4f}".format(pair=pair, t=time.time()-local_start_time))

get_rates('BTC_LTC')

Запустим его и узнаем, что получение одной пары занимает примерно половину секунды (на самом деле, когда как)

Давайте добавим несколько пар, будем прогонять их в цикле, и заодно замерим общее время работы скрипта:

 

import time
import requests

pairs = ['BTC_LTC', 'BTC_ETH', 'BTC_USD', 'BTC_EUR', 'BTC_PLN', 'BCH_BTC', 'EOS_BTC', 'EOS_USD', 'BCH_RUB', 'BCH_ETH']

def get_rates(pair):
    local_start_time = time.time()
    try:
        requests.get("https://api.exmo.com/v1/order_book/?pair={pair}&limit=1000".format(pair=pair))
    except Exception as e:
        print(e)

Реклама:


print("Пара {pair}, время работы функции: {t:0.4f}".format(pair=pair, t=time.time()-local_start_time)) global_start_time = time.time() for pair in pairs: get_rates(pair) print('Общее время работы {s:0.4f}'.format(s=time.time()-global_start_time))

На получение 10 пар ушло 3.5 секунды, данные первой пары соответственно устарели на три секунды.

Многопоточность

В питоне из коробки идет модуль threading. Именно он отвечает за (условно) параллельное исполнение кода. Почему условное – расскажу ниже.

Для того, что бы создать поток, нужно два действия

  1. Запланировать его
  2. Запустить

 

Подготовим скрипт и прогоним его:

import time
import requests
import threading

pairs = ['BTC_LTC', 'BTC_ETH', 'BTC_USD', 'BTC_EUR', 'BTC_PLN', 'BCH_BTC', 'EOS_BTC', 'EOS_USD', 'BCH_RUB', 'BCH_ETH']

def get_rates(pair):
    local_start_time = time.time()
    try:
        requests.get("https://api.exmo.com/v1/order_book/?pair={pair}&limit=1000".format(pair=pair))
    except Exception as e:
        print(e)
    print("Пара {pair}, время работы функции: {t:0.4f}".format(pair=pair, t=time.time()-local_start_time))

global_start_time = time.time()

threads = []
for pair in pairs:
    # Подготавливаем потоки, складываем их в массив
    threads.append(threading.Thread(target=get_rates, args=(pair,)))

# Запускаем каждый поток
for thread in threads:
    thread.start()

# Ждем завершения каждого потока
for thread in threads:
    thread.join()

                   
print('Общее время работы {s:0.4f}'.format(s=time.time()-global_start_time))

А теперь рассмотрим детально и сделаем выводы.

Во-первых, можно увидеть, что пары обрабатывались не в том порядке, что мы вызывали.

Во-вторых, общее время работы уменьшилось (а у отдельно взятых пар увеличилось). Так же самое устаревшее время составило 2.47 секунды. В целом результат позитивный, а теперь заглянем под капот.

Когда вы передаете что-либо модулю threading, это равнозначно тому, что вы вкладываете что-то в руку Шиве с заглавной картинки поста. В данном случае в блоке кода

threads = []
for pair in pairs:
    # Подготавливаем потоки, складываем их в массив
    threads.append(threading.Thread(target=get_rates, args=(pair,)))

Я создал, грубо говоря отложенное задание threading.Thread(target=get_rates, args=(pair,)), и поместил его в массив threads. По аналогии, я соорудил гарпун и дал его Шиве, а тот схватил его в свободную руку.

В этом блоке кода

# Запускаем каждый поток
for thread in threads:
    thread.start()

я запустил каждое задание – сказал Шиве запулить каждый гарпун в сторону каждой акулы. Шива запустил и уже начал тянуть обратно.

Я мог бы на этом закончить скрипт, тогда все запущенные потоки продолжали бы работать в фоне, но я хочу дождаться окончания каждого потока и посмотреть, что там получилось. Поэтому я добавляю третий блок кода:

# Ждем завершения каждого потока
for thread in threads:
    thread.join()

Я как бы заявил – буду стоять тут и ждать, пока ты все гарпуны не вытащишь.

В итоге каждая команда запустилась независимо от других, и они начали соперничать за сетевую карту, процессор, оперативную память и т.п. – и порядок выполнения начали определять операционная система, процессор и GIL – такая глобальная штука в питоне, которая разруливает запущенные процессы.

После этого блока кода ничего не выполнится до тех пор, пока каждый поток не будет окончательно завершен. Основной процесс будет просто ждать, регулярно проверяя состояние дочерних потоков.

Вы можете использовать эти блоки как шаблон, меняться будет только название функции и передаваемые параметры. Внимание – если передаваемый параметр один, после него все равно должна стоять запятая, как в примере.

Получаем курсы с разных бирж

Давайте для закрепления сделаем что-то более-менее полезное – например, будем сравнивать курсы одной и той же пары на разных биржах. Для универсальности возьмем ETH_BTC – такая пара есть везде :)

Т.к. на каждой бирже своё API, то под каждую биржу создадим свою функцию. Так же нам понадобится глобальный объект (словарь, в данном случае), куда каждый поток будет складывать полученные данные.

Так же меняется принцип работы – в каждой функции свой бесконечный цикл, таким образом система получается следующая:

  1. Основной поток создает три дочерних потока, каждый из которых бесконечно получает последнюю цену со своей биржи, и складывает в глобальный словарь.
  2. Так же создается отдельный поток, который бесконечно выводит текущие содержимое глобального словаря.

Вот такой примерно результат работы:

А вот, собственно, код:

import time
import requests
import threading

c1 = 'ETH'
c2 = 'BTC'

# Глобальный словарь, куда каждый поток складывает полученную информацию
stock_rates = {'exmo':0, 'binance':0, 'bittrex': 0}

# Получить последнюю цену с Эксмо
def get_exmo_rates(pair):
    while True:
        try:
            stock_rates['exmo'] = requests.get("https://api.exmo.com/v1/ticker/".format(pair=pair)).json()[pair]['last_trade']
        except Exception as e:
            print(e)
        time.sleep(0.5)
            
# Получить последнюю цену с Binance
def get_binance_rates(pair):
    while True:
        try:
            stock_rates['binance'] = requests.get("https://api.binance.com/api/v3/ticker/price?symbol={pair}".format(pair=pair)).json()['price']
        except Exception as e:
            print(e)
        time.sleep(0.5)

# Получить последнюю цену с Bittrex
def get_bittrex_rates(pair):
    while True:
        try:
            stock_rates['bittrex'] = requests.get("https://bittrex.com/api/v1.1/public/getticker?market={pair}".format(pair=pair)).json()['result']['Last']
        except Exception as e:
            print(e)
        time.sleep(0.5)        
            
def show_results():
    while True:
        print(stock_rates)
        time.sleep(1)
    

global_start_time = time.time()

threads = []

# Подготавливаем потоки, складываем их в массив
exmo_thread = threading.Thread(target=get_exmo_rates, args=(c1+'_'+c2,))
binance_thread = threading.Thread(target=get_binance_rates, args=(c1+c2,))
bittrex_thread = threading.Thread(target=get_bittrex_rates, args=(c2+'-'+c1,))
show_results_thread = threading.Thread(target=show_results)

threads.append(exmo_thread)
threads.append(binance_thread)
threads.append(bittrex_thread)
threads.append(show_results_thread)

# Запускаем каждый поток
for thread in threads:
    thread.start()

# Ждем завершения каждого потока
for thread in threads:
    thread.join()

Я немного по-другому создал потоки, что бы было нагляднее.

Заключение

Многие задачи можно решить и без многопоточности, например, запуская код последовательно или запуская разные экземпляры скриптов – и иногда так будет даже лучше. А иногда без многопототочности сложно.

Так же не стоить рассчитывать на неё как на панацею – если намечаются серьезные вычисления, и будет сильно задействован процессор, то многопоточность может наоборот сильно замедлить выполнение.

А вот если большей частью проходят операции ввода и вывода (запросы по сети, работа с оперативной памятью, чтение/запись с диска) то процессы могут очень хорошо организоваться и прирост производительности будет серьезный.

В общем, это еще одна фишка, которая может в какой-то момент очень круто пригодиться, об этом стоит знать, я считаю :)

Тэги: