Многопоточность в Python

Андрей К…
Последнее изменение:
12
0
0

Многопоточность в Питоне

Иногда случается так, что необходимо выполнять какие-то процедуры параллельно друг-другу и независимо друг от друга – например, получать курсы с разных бирж. Или проверять состояние заказов на разных парах. Или грабить текст с чужих сайтов. Или заливать текст на чужие сайты. Или распознавать капчи.. Да мало ли чего!

В этой статье попрактикуемся в написании таких скриптов.

Без многопоточности

Давайте начнем с бесполезной задачи – будем получать книгу ордеров разных валютных пар на одной бирже. Для демонстрации возьмем Exmo. И для начала давайте без многопоточности – сделаем всё последовательно.

Нам понадобится функция, которая будет делать всю работу, и которой мы будем передавать нужные пары. Вот так будет выглядеть нулевая версия скрипта:

import time
import requests

def get_rates(pair):
    local_start_time = time.time()
    try:
        requests.get("https://api.exmo.com/v1/order_book/?pair={pair}&limit=1000".format(pair=pair))
    except Exception as e:
        print(e)
    print("Пара {pair}, время работы функции: {t:0.4f}".format(pair=pair, t=time.time()-local_start_time))

get_rates('BTC_LTC')

Запустим его и узнаем, что получение одной пары занимает примерно половину секунды (на самом деле, когда как)

Давайте добавим несколько пар, будем прогонять их в цикле, и заодно замерим общее время работы скрипта:

 

import time
import requests

pairs = ['BTC_LTC', 'BTC_ETH', 'BTC_USD', 'BTC_EUR', 'BTC_PLN', 'BCH_BTC', 'EOS_BTC', 'EOS_USD', 'BCH_RUB', 'BCH_ETH']

def get_rates(pair):
    local_start_time = time.time()
    try:
        requests.get("https://api.exmo.com/v1/order_book/?pair={pair}&limit=1000".format(pair=pair))
    except Exception as e:
        print(e)
    print("Пара {pair}, время работы функции: {t:0.4f}".format(pair=pair, t=time.time()-local_start_time))

global_start_time = time.time()

for pair in pairs:
    get_rates(pair)

print('Общее время работы {s:0.4f}'.format(s=time.time()-global_start_time))

На получение 10 пар ушло 3.5 секунды, данные первой пары соответственно устарели на три секунды.


Реклама


Многопоточность

В питоне из коробки идет модуль threading. Именно он отвечает за (условно) параллельное исполнение кода. Почему условное – расскажу ниже.

Для того, что бы создать поток, нужно два действия

  1. Запланировать его
  2. Запустить

 

Подготовим скрипт и прогоним его:

import time
import requests
import threading

pairs = ['BTC_LTC', 'BTC_ETH', 'BTC_USD', 'BTC_EUR', 'BTC_PLN', 'BCH_BTC', 'EOS_BTC', 'EOS_USD', 'BCH_RUB', 'BCH_ETH']

def get_rates(pair):
    local_start_time = time.time()
    try:
        requests.get("https://api.exmo.com/v1/order_book/?pair={pair}&limit=1000".format(pair=pair))
    except Exception as e:
        print(e)
    print("Пара {pair}, время работы функции: {t:0.4f}".format(pair=pair, t=time.time()-local_start_time))

global_start_time = time.time()

threads = []
for pair in pairs:
    # Подготавливаем потоки, складываем их в массив
    threads.append(threading.Thread(target=get_rates, args=(pair,)))

# Запускаем каждый поток
for thread in threads:
    thread.start()

# Ждем завершения каждого потока
for thread in threads:
    thread.join()

                   
print('Общее время работы {s:0.4f}'.format(s=time.time()-global_start_time))

А теперь рассмотрим детально и сделаем выводы.

Во-первых, можно увидеть, что пары обрабатывались не в том порядке, что мы вызывали.

Во-вторых, общее время работы уменьшилось (а у отдельно взятых пар увеличилось). Так же самое устаревшее время составило 2.47 секунды. В целом результат позитивный, а теперь заглянем под капот.

Когда вы передаете что-либо модулю threading, это равнозначно тому, что вы вкладываете что-то в руку Шиве с заглавной картинки поста. В данном случае в блоке кода

threads = []
for pair in pairs:
    # Подготавливаем потоки, складываем их в массив
    threads.append(threading.Thread(target=get_rates, args=(pair,)))

Я создал, грубо говоря отложенное задание threading.Thread(target=get_rates, args=(pair,)), и поместил его в массив threads. По аналогии, я соорудил гарпун и дал его Шиве, а тот схватил его в свободную руку.

В этом блоке кода

# Запускаем каждый поток
for thread in threads:
    thread.start()

я запустил каждое задание – сказал Шиве запулить каждый гарпун в сторону каждой акулы. Шива запустил и уже начал тянуть обратно.

Я мог бы на этом закончить скрипт, тогда все запущенные потоки продолжали бы работать в фоне, но я хочу дождаться окончания каждого потока и посмотреть, что там получилось. Поэтому я добавляю третий блок кода:

# Ждем завершения каждого потока
for thread in threads:
    thread.join()

Я как бы заявил – буду стоять тут и ждать, пока ты все гарпуны не вытащишь.

В итоге каждая команда запустилась независимо от других, и они начали соперничать за сетевую карту, процессор, оперативную память и т.п. – и порядок выполнения начали определять операционная система, процессор и GIL – такая глобальная штука в питоне, которая разруливает запущенные процессы.

После этого блока кода ничего не выполнится до тех пор, пока каждый поток не будет окончательно завершен. Основной процесс будет просто ждать, регулярно проверяя состояние дочерних потоков.

Вы можете использовать эти блоки как шаблон, меняться будет только название функции и передаваемые параметры. Внимание – если передаваемый параметр один, после него все равно должна стоять запятая, как в примере.


Реклама


Получаем курсы с разных бирж

Давайте для закрепления сделаем что-то более-менее полезное – например, будем сравнивать курсы одной и той же пары на разных биржах. Для универсальности возьмем ETH_BTC – такая пара есть везде :)

Т.к. на каждой бирже своё API, то под каждую биржу создадим свою функцию. Так же нам понадобится глобальный объект (словарь, в данном случае), куда каждый поток будет складывать полученные данные.

Так же меняется принцип работы – в каждой функции свой бесконечный цикл, таким образом система получается следующая:

  1. Основной поток создает три дочерних потока, каждый из которых бесконечно получает последнюю цену со своей биржи, и складывает в глобальный словарь.
  2. Так же создается отдельный поток, который бесконечно выводит текущие содержимое глобального словаря.

Вот такой примерно результат работы:

А вот, собственно, код:

import time
import requests
import threading

c1 = 'ETH'
c2 = 'BTC'

# Глобальный словарь, куда каждый поток складывает полученную информацию
stock_rates = {'exmo':0, 'binance':0, 'bittrex': 0}

# Получить последнюю цену с Эксмо
def get_exmo_rates(pair):
    while True:
        try:
            stock_rates['exmo'] = requests.get("https://api.exmo.com/v1/ticker/".format(pair=pair)).json()[pair]['last_trade']
        except Exception as e:
            print(e)
        time.sleep(0.5)
            
# Получить последнюю цену с Binance
def get_binance_rates(pair):
    while True:
        try:
            stock_rates['binance'] = requests.get("https://api.binance.com/api/v3/ticker/price?symbol={pair}".format(pair=pair)).json()['price']
        except Exception as e:
            print(e)
        time.sleep(0.5)

# Получить последнюю цену с Bittrex
def get_bittrex_rates(pair):
    while True:
        try:
            stock_rates['bittrex'] = requests.get("https://bittrex.com/api/v1.1/public/getticker?market={pair}".format(pair=pair)).json()['result']['Last']
        except Exception as e:
            print(e)
        time.sleep(0.5)        
            
def show_results():
    while True:
        print(stock_rates)
        time.sleep(1)
    

global_start_time = time.time()

threads = []

# Подготавливаем потоки, складываем их в массив
exmo_thread = threading.Thread(target=get_exmo_rates, args=(c1+'_'+c2,))
binance_thread = threading.Thread(target=get_binance_rates, args=(c1+c2,))
bittrex_thread = threading.Thread(target=get_bittrex_rates, args=(c2+'-'+c1,))
show_results_thread = threading.Thread(target=show_results)

threads.append(exmo_thread)
threads.append(binance_thread)
threads.append(bittrex_thread)
threads.append(show_results_thread)

# Запускаем каждый поток
for thread in threads:
    thread.start()

# Ждем завершения каждого потока
for thread in threads:
    thread.join()

Я немного по-другому создал потоки, что бы было нагляднее.


Реклама


Заключение

Многие задачи можно решить и без многопоточности, например, запуская код последовательно или запуская разные экземпляры скриптов – и иногда так будет даже лучше. А иногда без многопототочности сложно.

Так же не стоить рассчитывать на неё как на панацею – если намечаются серьезные вычисления, и будет сильно задействован процессор, то многопоточность может наоборот сильно замедлить выполнение.

А вот если большей частью проходят операции ввода и вывода (запросы по сети, работа с оперативной памятью, чтение/запись с диска) то процессы могут очень хорошо организоваться и прирост производительности будет серьезный.

В общем, это еще одна фишка, которая может в какой-то момент очень круто пригодиться, об этом стоит знать, я считаю :)

Комментарии: (12)
24.05.2018 04:35
Андрей все хорошо, но как остановить программу и вывод на экран скажем по кнопке Esc?!
30.06.2018 07:33
Как результат можно было бы скажем записывать в файл excel, txt или в sqlite3 базу для последующих торговых анализов?
30.06.2018 07:48
Да, лучше в sqlite3, она может работать в многопоточном режиме, что бы писать в файл придется повозиться
Где то вверху скрипта напишите

import sqlite3
conn = sqlite3.connect(---путь к файлу бд---, check_same_thread=False)
cursor = conn.cursor()
import threading
lock = threading.Lock()

таблицу  можно создать руками, можно програмно в этом же скрипте

table_cr = """ 
CREATE TABLE IF NOT EXISTS
            rates (
                pair TEXT,
                rate REAL,
                received DEFAULT (datetime('now','localtime'))
            );
""" 
cursor.executescript(table_cr)

в данном случае создастся таблица, если такой не было, где можно накапливать курс - модифицируйте под себя, какие поля собираетесь накапливать

Ну и каждый раз при получении данных нужно туда вставлять значения, навроде такого
        try:
            lock.acquire(True)
            cursor.execute("""
                INSERT INTO rates (pair, rate) 
                VALUES ('{pair}', {rate})
            """.format(
                   pair='полученная пара',
                   rate = 'полученный курс'
            ))
            conn.commit()
        finally:
            lock.release()
05.07.2018 21:16
вставить этот try finally в каждый get_x_rates?
02.09.2018 21:34
что-то не желает работать, вставил код
02.09.2018 22:26
rate = 'полученный курс' ни как не пойму что вписать в поле "полученный курс" чтоб в базу записывал курс?
02.09.2018 17:21
Ну вроде того, еще надо добавить тогда биржу
30.06.2018 08:03
Теперь бы понять ка бы прикрутить это к скрипту выше?
03.09.2018 17:40
Вот пример того, как писать в базу

import time
import requests
import threading
import sqlite3
import threading

lock = threading.Lock()


c1 = 'ETH'
c2 = 'BTC'

"""
Создаем базу, подключаемся к ней и создаем таблицы
"""

conn = sqlite3.connect('bitbinexi.bd', check_same_thread=False)
table_cr = """ 
CREATE TABLE IF NOT EXISTS
            rates (
                pair TEXT,
                rate REAL,
                stock TEXT,
                received DEFAULT (datetime('now','localtime'))
            );
"""
cursor = conn.cursor()
cursor.executescript(table_cr)


# Глобальный словарь, куда каждый поток складывает полученную информацию
stock_rates = {'bittrex': 0}

# Получить последнюю цену с Bittrex
def get_bittrex_rates(pair):
    while True:
        try:
            bittrex_rate = requests.get("https://bittrex.com/api/v1.1/public/getticker?market={pair}".format(pair=pair)).json()['result']['Last']
            # В bittrex_rate сейчас лежит последний курс
            stock_rates['bittrex'] = bittrex_rate
            
            try:
               lock.acquire(True)
               cursor.execute("""
                    INSERT INTO rates (pair, stock, rate) 
                    VALUES ('{pair}', '{stock}', '{rate}')
                  """.format(
                    pair=pair,
                    stock='Bittrex',
                    rate = float(bittrex_rate) 
                ))
               conn.commit()
            finally:
                lock.release()
                
        except Exception as e:
            print(e)
        time.sleep(0.5)        
            
def show_results():
    while True:
        print(stock_rates)
        time.sleep(1)
        
global_start_time = time.time()

threads = []

# Подготавливаем потоки, складываем их в массив
bittrex_thread = threading.Thread(target=get_bittrex_rates, args=(c2+'-'+c1,))
show_results_thread = threading.Thread(target=show_results)

threads.append(bittrex_thread)
threads.append(show_results_thread)

# Запускаем каждый поток
for thread in threads:
    thread.start()

# Ждем завершения каждого потока
for thread in threads:
    thread.join()
02.09.2018 06:33
Доброго времени суток! Как сделать чтоб записывал в таблицу новые значения по изменению rate = 'полученный курс', а не по мере поступления с запроса, чтоб исключить одинаковые данные с предыдущим запросом в записи?
02.09.2018 17:26
Разные есть подходы
Можно создать UNIQUE индекс по связке биржа-пара-курс, ну и еще какой-то параметр времени
Можно перед вставкой удалять записи с текущим курсом, неважно есть они или нет
Можно держать в памяти скрипта последние вставленные данные и сравнивать с ними перед вставкой
В случае binance можно вообще поменять скрипт на веб-сокеты, тогда в принципе будут прилетать только измененные цены, подробнее тут https://bablofil.ru/binance-webscokets/, пример с рабочим кодом (но другими задачами) тут https://bablofil.ru/inner-arbitrage/
05.09.2018 21:46
Здравствуйте. Подскажите как сделать для Binance два не зависимых потока: user_socket и ticker_socket (речь идет о websockets) чтобы они друг другу не мешали. Заранее спасибо.
Пожалуйста, Авторизуйтесь что бы оставить свой комментарий