1. 비동기 요청의 필요성
업비트 거래소에서 원화로 거래할 수 있는 코인을 대상으로 데이터를 받아서
각종 지표를 추가하고, 머신러닝을 진행한 후 예측 데이터들 중 적절하다고 판단되는 것들을 선별하여 투자를 진행한다.
이 계산과정이 약 26 ~ 30초 가량 소요되는데, 실제 환경이라면 이 정도 시간은 기다릴 수 있었다.
하지만, 모델 별 성과 측정을 위한 테스트 환경이라면 여러 일을 계산해야 하기 때문에 상당한 시간이 소요된다.
비동기 요청이라고 해서 요청 값을 더 빨리 받을 수 있는건 아니다.
비동기 요청의 강점은 요청 건수가 다수일 때, 앞의 요청의 응답을 기다리지 않고 그 다음 요청을 진행하며
앞의 요청에 대한 응답이 돌아오면, 이후 작업을 진행하는 방식이다.
2. 비동기 요청 예시
동기적 요청 : 요청 1 -> 요청 1 응답 대기 -> 요청 1 응답 처리 -> 요청 2 -> 요청 2 응답 대기 -> 요청 2 응답 처리
비동기 요청: 요청 1 -> 요청 2 -> (요청 1 응답 시) 처리 -> (요청 2 응답 시 ) 처리
3. 기존 코드 - 동기적 요청
def check_bull_market(target_date, invest_number):
from_date = target_date - datetime.timedelta(days=180)
ticker_list = pyupbit.get_tickers("KRW")
bull_market = []
for ticker in ticker_list:
ma_flow_info = pyupbit.get_ohlcv_from(ticker=ticker, fromDatetime=start, to=end)
위 코드는 업비트 거래소에서 원화로 거래 가능한 모든 코인의 과거 180일 데이터를 가져오는 코드이다.
해당 요청에 대한 응답 (ma_flow_info)를 출력해보면, 간혹 None라는 데이터가 보이는데
이는 한 번에 들어오는 요청이 과다해서 None으로 반환된 것이다.
이 경우, 모든 코인에 대한 정보를 알 수 없으므로 모의 투자를 하거나 실제 투자를 할 때 전체 데이터를 기준으로 측정할 수 없다.
4. 기존 코드 수정 - 동기적 요청
@retry(stop_max_attempt_number=5, wait_random_min=1000, wait_random_max=2000)
def call_ticker_chart(ticker, start, end):
chart = pyupbit.get_ohlcv_from(ticker=ticker, fromDatetime=start, to=end)
if chart is None:
raise
return chart
def safe_call_ticker_chart(ticker, start, end):
try:
return call_ticker_chart(ticker, start, end)
except:
return None
def check_bull_market(target_date, invest_number): # 16 seconds
from_date = target_date - datetime.timedelta(days=180)
ticker_list = pyupbit.get_tickers("KRW")
bull_market = []
for ticker in ticker_list:
if re.match(KRW_CHECKER, ticker):
ma_flow_info = safe_call_ticker_chart(ticker, from_date, target_date)
동기적 요청은 유지한 채 코드를 일부 수정했다.
safe_call_ticker_chart 함수로 call_ticker_chart 를 호출한다.
call_ticker_chart를 통해 코인의 데이터를 불러오되, None 값이 반환되는 경우
오류를 발생시키고, 1 ~ 2 초 간 랜덤으로 대기한 후 재요청을 최대 5회 진행한다.
5회째 오류가 발생하는 경우라면 None 값이 반환되지만, 아직 까지 5회를 넘어서 None 값이 반환된 경험은 없었다.
하지만 이 방법은 위에서 설명한 것 처럼 데이터 요청에만 26 ~ 32초 가량 소요되고,
모의투자 180일을 진행하면 몇 시간이 걸리는 문제가 발생한다.
이를 해결하기 위해 요청 방식을 비동기로 수정했다.
5. 비동기 요청 - 초안
pip install aiohttp
import aiohttp, asyncio
async def fetch_prices(tickers, end_date):
async with aiohttp.ClientSession() as session:
tasks = []
dictionary_info = {}
for ticker in tickers:
tasks.append(fetch_price(session, ticker, end_date))
prices = await asyncio.gather(*tasks)
for price in prices:
dictionary_info[price[0]] = price[1]
return dictionary_info
비동기 요청을 위해 함수 앞에 async를 붙여준다.
request를 비동기적으로 처리하기 위해 aiohttp.ClientSession() 라인처럼 선언하고
tasks 함수에 비동기적으로 진행할 데이터를 모아준다.
작성 글 기준으로 여러 코인에 대한 요청을 진행할 것이므로, 리스트를 순회하며 fetch_price 함수 (아래 나옴)에
session, 코인이름, 조회 일자 (~ 까지 데이터)를 추가하고, asyncio.gather(*tasks) 로 요청을 비동기로 처리하고
prcies 변수에 담는다. 응답된 데이터는 리스트 형식으로 반환된다.
6. 비동기 요청 - 요청 코드
async def fetch_price(session, ticker, date):
url = f"https://api.upbit.com/v1/candles/days"
params = {
'market': ticker,
'to': date.strftime("%Y-%m-%dT%H:%M:%S"),
'count': 200
}
async with session.get(url, params=params) as response:
data = await response.json()
if response.status == 429:
await asyncio.sleep(1)
return await fetch_price(session, ticker, date)
# print(data)
data_frame = data_format(data)
return [ticker, data_frame]
async.gather 를 통해 실행되는 fetch_price 함수는 요청 url에 요청 내용이 담긴 params 를 담아
비동기적으로 요청한다. upbit에 비동기적으로 요청한다고 해서 응답을 더 빨리 해주거나 하는건 아니기 때문에
예외 처리가 없다면 429 too_manay_request가 반환된다. 때문에, 이 경우에는 1초 대기한 후 재요청을 보내도록 처리를 한다.
요청에 대한 응답이 온다면, 이 데이터 형식은 리스트에 담긴 json 형식이다.
이후 처리를 위해 data_format 함수로 응답 데이터를 DataFrame 형식으로 변환하고
코인 이름, DataFrame 데이터의 리스트 형식으로 반환한다.
어떤 코인은 에러를 통해 대기 후 다시 받은 것일 것이고, 어떤 코인은 에러 없이 받았을 수 있기 때문에
요청 순서대로 반환했다고 볼 수 없어서 코인 이름을 함께 명시했다.
7. 요청 데이터 DataFrame으로 변환
def data_format(response_data):
ticker_data = {"date": [], "open": [], "close": [], "low": [], "high": [], "volume": []}
for data in response_data[::-1]:
ticker_data["date"] += [data["candle_date_time_kst"]]
ticker_data["open"] += [data["opening_price"]]
ticker_data["close"] += [data["trade_price"]]
ticker_data["low"] += [data["low_price"]]
ticker_data["high"] += [data["high_price"]]
ticker_data["volume"] += [data["candle_acc_trade_volume"]]
df = pandas.DataFrame(ticker_data)
df['date'] = pandas.to_datetime(df['date'])
df.set_index('date', inplace=True)
return df
리스트를 순회하며 데이터를 추가한다. 데이터는 가장 앞 데이터가 최신 데이터이므로
뒤에서부터 순회하며 추가한다.
8. 완성 코드
import aiohttp, asyncio
import pandas, pyupbit
import datetime
def data_format(response_data):
ticker_data = {"date": [], "open": [], "close": [], "low": [], "high": [], "volume": []}
for data in response_data[::-1]:
ticker_data["date"] += [data["candle_date_time_kst"]]
ticker_data["open"] += [data["opening_price"]]
ticker_data["close"] += [data["trade_price"]]
ticker_data["low"] += [data["low_price"]]
ticker_data["high"] += [data["high_price"]]
ticker_data["volume"] += [data["candle_acc_trade_volume"]]
df = pandas.DataFrame(ticker_data)
df['date'] = pandas.to_datetime(df['date'])
df.set_index('date', inplace=True)
return df
async def fetch_price(session, ticker, date):
url = f"https://api.upbit.com/v1/candles/days"
params = {
'market': ticker,
'to': date.strftime("%Y-%m-%dT%H:%M:%S"),
'count': 200
}
async with session.get(url, params=params) as response:
data = await response.json()
if response.status == 429:
await asyncio.sleep(1)
return await fetch_price(session, ticker, date)
# print(data)
data_frame = data_format(data)
return [ticker, data_frame]
async def fetch_prices(tickers, end_date):
async with aiohttp.ClientSession() as session:
tasks = []
dictionary_info = {}
for ticker in tickers:
tasks.append(fetch_price(session, ticker, end_date))
prices = await asyncio.gather(*tasks)
for price in prices:
dictionary_info[price[0]] = price[1]
return dictionary_info
def get_ticekr_info(target_date):
tickers = pyupbit.get_tickers("KRW")
tickers_info = asyncio.run(fetch_prices(tickers, target_date))
return tickers_info
if __name__ == "__main__":
start = datetime.datetime.now()
tickers_info = get_ticekr_info(datetime.datetime(2024, 6, 19))
for ticker in tickers_info:
print(ticker, tickers_info[ticker].iloc[-1])
end = datetime.datetime.now()
print(f"running time: {end - start}")
응답 시간을 측정한 결과, 10초 내외로 소요되는 점을 확인했다.
기존 대비 최대 1/3 수준으로 줄인 수준이고, 모의 테스트를 좀 더 빠르게 진행할 수 있게 되었다.
'Backend > Python' 카테고리의 다른 글
[Python] Supabase 연결 오류 (0) | 2024.12.31 |
---|---|
[Python] 날짜 시간대 변경하기 (1) | 2024.12.09 |
[FastAPI] SQL Alchemy 연결 오류 해결 (0) | 2024.06.05 |
[Python] 비트코인 자동매매 - 백테스팅 (0) | 2024.05.15 |
[Python] 비트코인 자동매매 - 종목 선정 (0) | 2024.05.09 |