1. 비동기 요청의 필요성

업비트 거래소에서 원화로 거래할 수 있는 코인을 대상으로 데이터를 받아서

각종 지표를 추가하고, 머신러닝을 진행한 후 예측 데이터들 중 적절하다고 판단되는 것들을 선별하여 투자를 진행한다.

이 계산과정이 약 26 ~ 30초 가량 소요되는데, 실제 환경이라면 이 정도 시간은 기다릴 수 있었다.

하지만, 모델 별 성과 측정을 위한 테스트 환경이라면 여러 일을 계산해야 하기 때문에 상당한 시간이 소요된다.

 

비동기 요청이라고 해서 요청 값을 더 빨리 받을 수 있는건 아니다.

비동기 요청의 강점은 요청 건수가 다수일 때, 앞의 요청의 응답을 기다리지 않고 그 다음 요청을 진행하며

앞의 요청에 대한 응답이 돌아오면, 이후 작업을 진행하는 방식이다.

 

2. 비동기 요청 예시

동기적 요청 : 요청 1 -> 요청 1 응답 대기 ->  요청 1 응답 처리 -> 요청 2 -> 요청 2 응답 대기 -> 요청 2 응답 처리

비동기 요청: 요청 1 -> 요청 2 -> (요청 1 응답 시) 처리 -> (요청 2 응답 시 ) 처리

 

3. 기존  코드 - 동기적 요청

def check_bull_market(target_date, invest_number):
    from_date = target_date - datetime.timedelta(days=180)

    ticker_list = pyupbit.get_tickers("KRW")
    bull_market = []
    for ticker in ticker_list:
    	ma_flow_info = pyupbit.get_ohlcv_from(ticker=ticker, fromDatetime=start, to=end)

위 코드는 업비트 거래소에서 원화로 거래 가능한 모든 코인의 과거 180일 데이터를 가져오는 코드이다.

해당 요청에 대한 응답 (ma_flow_info)를 출력해보면, 간혹 None라는 데이터가 보이는데

이는 한 번에 들어오는 요청이 과다해서 None으로 반환된 것이다.

이 경우, 모든 코인에 대한 정보를 알 수 없으므로 모의 투자를 하거나 실제 투자를 할 때 전체 데이터를 기준으로 측정할 수 없다.

 

4. 기존 코드 수정 - 동기적 요청

@retry(stop_max_attempt_number=5, wait_random_min=1000, wait_random_max=2000)
def call_ticker_chart(ticker, start, end):
    chart = pyupbit.get_ohlcv_from(ticker=ticker, fromDatetime=start, to=end)
    if chart is None:
        raise

    return chart

def safe_call_ticker_chart(ticker, start, end):
    try:
        return call_ticker_chart(ticker, start, end)
    
    except:
        return None
    
def check_bull_market(target_date, invest_number): # 16 seconds
    from_date = target_date - datetime.timedelta(days=180)

    ticker_list = pyupbit.get_tickers("KRW")
    bull_market = []
    for ticker in ticker_list:
        if re.match(KRW_CHECKER, ticker):
            ma_flow_info = safe_call_ticker_chart(ticker, from_date, target_date)

동기적 요청은 유지한 채 코드를 일부 수정했다.

safe_call_ticker_chart 함수로 call_ticker_chart 를 호출한다. 

call_ticker_chart를 통해 코인의 데이터를 불러오되, None 값이 반환되는 경우

오류를 발생시키고, 1 ~ 2 초 간 랜덤으로 대기한 후 재요청을 최대 5회 진행한다.

5회째 오류가 발생하는 경우라면 None 값이 반환되지만, 아직 까지 5회를 넘어서 None 값이 반환된 경험은 없었다.

하지만 이 방법은 위에서 설명한 것 처럼 데이터 요청에만 26 ~ 32초 가량 소요되고,

모의투자 180일을 진행하면 몇 시간이 걸리는 문제가 발생한다.

이를 해결하기 위해 요청 방식을 비동기로 수정했다.

 

5. 비동기 요청 - 초안

pip install aiohttp

 

 

import aiohttp, asyncio

async def fetch_prices(tickers, end_date):
    async with aiohttp.ClientSession() as session:
        tasks = []
        dictionary_info = {}

        for ticker in tickers:
            tasks.append(fetch_price(session, ticker, end_date))

        prices = await asyncio.gather(*tasks)
        for price in prices:
            dictionary_info[price[0]] = price[1]

        return dictionary_info

 

비동기 요청을 위해 함수 앞에 async를 붙여준다.

request를 비동기적으로 처리하기 위해 aiohttp.ClientSession() 라인처럼 선언하고

tasks 함수에 비동기적으로 진행할 데이터를 모아준다.

작성 글 기준으로 여러 코인에 대한 요청을 진행할 것이므로, 리스트를 순회하며 fetch_price 함수 (아래 나옴)에

session, 코인이름, 조회 일자 (~ 까지 데이터)를 추가하고,  asyncio.gather(*tasks) 로 요청을 비동기로 처리하고

prcies 변수에 담는다. 응답된 데이터는 리스트 형식으로 반환된다.

 

6. 비동기 요청 - 요청 코드

async def fetch_price(session, ticker, date):
    url = f"https://api.upbit.com/v1/candles/days"
    params = {
        'market': ticker,
        'to': date.strftime("%Y-%m-%dT%H:%M:%S"),
        'count': 200
    }

    async with session.get(url, params=params) as response:
        data = await response.json()

        if response.status == 429:
            await asyncio.sleep(1)
            return await fetch_price(session, ticker, date)
        
        # print(data)
        data_frame = data_format(data)
        return [ticker, data_frame]

async.gather 를 통해 실행되는 fetch_price 함수는 요청 url에 요청 내용이 담긴 params 를 담아

비동기적으로 요청한다. upbit에 비동기적으로 요청한다고 해서 응답을 더 빨리 해주거나 하는건 아니기 때문에

예외 처리가 없다면 429 too_manay_request가 반환된다. 때문에, 이 경우에는 1초 대기한 후 재요청을 보내도록 처리를 한다.

요청에 대한 응답이 온다면, 이 데이터 형식은 리스트에 담긴 json 형식이다.

 

이후 처리를 위해 data_format 함수로 응답 데이터를 DataFrame 형식으로 변환하고

코인 이름, DataFrame 데이터의 리스트 형식으로 반환한다.

 

어떤 코인은 에러를 통해 대기 후 다시 받은 것일 것이고, 어떤 코인은 에러 없이 받았을 수 있기 때문에

요청 순서대로 반환했다고 볼 수 없어서 코인 이름을 함께 명시했다.

 

7. 요청 데이터 DataFrame으로 변환

def data_format(response_data):
    ticker_data = {"date": [], "open": [], "close": [], "low": [], "high": [], "volume": []}

    for data in response_data[::-1]:
        ticker_data["date"] += [data["candle_date_time_kst"]]
        ticker_data["open"] += [data["opening_price"]]
        ticker_data["close"] += [data["trade_price"]]
        ticker_data["low"] += [data["low_price"]]
        ticker_data["high"] += [data["high_price"]]
        ticker_data["volume"] += [data["candle_acc_trade_volume"]]

    df = pandas.DataFrame(ticker_data)
    df['date'] = pandas.to_datetime(df['date'])
    df.set_index('date', inplace=True)
    return df

리스트를 순회하며 데이터를 추가한다. 데이터는 가장 앞 데이터가 최신 데이터이므로 

뒤에서부터 순회하며 추가한다.

 

8. 완성 코드

import aiohttp, asyncio
import pandas, pyupbit
import datetime

def data_format(response_data):
    ticker_data = {"date": [], "open": [], "close": [], "low": [], "high": [], "volume": []}

    for data in response_data[::-1]:
        ticker_data["date"] += [data["candle_date_time_kst"]]
        ticker_data["open"] += [data["opening_price"]]
        ticker_data["close"] += [data["trade_price"]]
        ticker_data["low"] += [data["low_price"]]
        ticker_data["high"] += [data["high_price"]]
        ticker_data["volume"] += [data["candle_acc_trade_volume"]]

    df = pandas.DataFrame(ticker_data)
    df['date'] = pandas.to_datetime(df['date'])
    df.set_index('date', inplace=True)
    return df


async def fetch_price(session, ticker, date):
    url = f"https://api.upbit.com/v1/candles/days"
    params = {
        'market': ticker,
        'to': date.strftime("%Y-%m-%dT%H:%M:%S"),
        'count': 200
    }

    async with session.get(url, params=params) as response:
        data = await response.json()

        if response.status == 429:
            await asyncio.sleep(1)
            return await fetch_price(session, ticker, date)
        
        # print(data)
        data_frame = data_format(data)
        return [ticker, data_frame]
    

async def fetch_prices(tickers, end_date):
    async with aiohttp.ClientSession() as session:
        tasks = []
        dictionary_info = {}

        for ticker in tickers:
            tasks.append(fetch_price(session, ticker, end_date))

        prices = await asyncio.gather(*tasks)
        for price in prices:
            dictionary_info[price[0]] = price[1]

        return dictionary_info

def get_ticekr_info(target_date):
    tickers = pyupbit.get_tickers("KRW")
    tickers_info = asyncio.run(fetch_prices(tickers, target_date))
    return tickers_info

if __name__ == "__main__":
    start = datetime.datetime.now()
    tickers_info = get_ticekr_info(datetime.datetime(2024, 6, 19))
    for ticker in tickers_info:
        print(ticker, tickers_info[ticker].iloc[-1])
    end = datetime.datetime.now()

    print(f"running time: {end - start}")

응답 시간을 측정한 결과, 10초 내외로 소요되는 점을 확인했다.

기존 대비 최대 1/3 수준으로 줄인 수준이고, 모의 테스트를 좀 더 빠르게 진행할 수 있게 되었다.