로그파일에 저장된 거래기록 정리하기

civimanwp

Updated on:

그리드매매로 자동매매를 돌리면 매분마다 거래기록이 로그파일로 저장된다.
하루에 하나의 파일이 생성되며, 그 파일에는 매분마다 자동매매프로그램 작동에 따른 출력이 기록된다. 현재가격, 매수예정가격, 매도예정가격, 종목별 현재 보유 주수 등 아주 많은 종류의 기록이 저장된다. 하루치 기록의 크기는 1,100-1,200kb로 로그파일로는 작지 않다. 이중에서 내가 궁금한 것은 하루에 몇번 매수하고 매도했는지다. 이것을 알아보는 코드를 챗지피티의 도움을 받아 작성했다.

프롬프트는 순차적으로 작성을 했다.
로그파일이 들어있는 디렉토리에서 파일 읽어오기.
파일 중에서 파일이름이 특정하게 시작하는 파일만 읽기.
파일이름중에서 뒷쪽 숫자 포멧을 날짜로 인식하기.
매수/매도를 의미하는 텍스트열 찾기.
찾은 텍스트열이 매수인지 매도인지 구분하기.
여러 종목의 매수/매도를 구분하여 저장하기.
특정 종목의 자료만 추출하기.
JSON파일로 저장하기 – 기록용.
html파일로 저장하기 – 포스팅용.
엑셀파일로 저장하기 – 분석용.

그 결과물은 아래와 같다.

import os
import re
import pandas as pd
import json
from datetime import datetime

def read_log_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            log_lines = file.readlines()
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='cp949') as file:
            log_lines = file.readlines()
    return log_lines

def process_log_lines(log_lines):
    # Initialize a dictionary to count buy and sell transactions per stock
    transaction_counts = {}

    # Regular expression to match the relevant lines and extract necessary parts
    pattern = re.compile(r"(\w+ \w+) : (\d+) / (\d+) => (\d+)")

    # Process each line to count buy and sell transactions per stock
    for line in log_lines:
        match = pattern.search(line)
        if match:
            stock_name = match.group(1)
            current_price = int(match.group(2))
            previous_tier = int(match.group(3))
            current_tier = int(match.group(4))

            if stock_name not in transaction_counts:
                transaction_counts[stock_name] = {'buy': 0, 'sell': 0}

            if previous_tier > current_tier:
                transaction_counts[stock_name]['sell'] += 1
            elif previous_tier < current_tier:
                transaction_counts[stock_name]['buy'] += 1

    return transaction_counts

def aggregate_transactions_by_date_with_prefix(directory, prefix="AVT_KR_", extension=".log"):
    aggregated_data = {}

    # Ensure the directory exists
    if not os.path.exists(directory):
        raise FileNotFoundError(f"The directory {directory} does not exist.")

    # Process each log file in the directory
    for file_name in os.listdir(directory):
        if file_name.startswith(prefix) and file_name.endswith(extension):
            date_str = file_name.split('_')[-1].split('.')[0]
            log_date = datetime.strptime(date_str, '%Y-%m-%d').date()
            
            file_path = os.path.join(directory, file_name)
            log_lines = read_log_file(file_path)
            transaction_counts = process_log_lines(log_lines)

            for stock_name, counts in transaction_counts.items():
                if stock_name not in aggregated_data:
                    aggregated_data[stock_name] = {}
                if log_date not in aggregated_data[stock_name]:
                    aggregated_data[stock_name][log_date] = {'buy': 0, 'sell': 0}
                
                aggregated_data[stock_name][log_date]['buy'] += counts['buy']
                aggregated_data[stock_name][log_date]['sell'] += counts['sell']

    return aggregated_data

# Specify the directory where the log files are stored
log_directory = '/log'  # Adjust the path to your directory

# Aggregate transactions by date
aggregated_transactions = aggregate_transactions_by_date_with_prefix(log_directory)

# Convert the dictionary to a DataFrame for better readability
aggregated_df = pd.DataFrame.from_dict({(i,j): aggregated_transactions[i][j] 
                                        for i in aggregated_transactions.keys() 
                                        for j in aggregated_transactions[i].keys()},
                                       orient='index')

# Reset the index to separate the stock names and dates
aggregated_df.reset_index(inplace=True)
aggregated_df.columns = ['Stock', 'Date', 'Buy', 'Sell']

# Filter the DataFrame for 'KODEX 코스닥150레버리지'
kodex_kosdaq150_df = aggregated_df[aggregated_df['Stock'] == 'KODEX 코스닥150레버리지']

# Set the 'Date' as index
kodex_kosdaq150_df.set_index('Date', inplace=True)

# Select relevant columns
kodex_kosdaq150_df = kodex_kosdaq150_df[['Buy', 'Sell']]

# # Convert the DataFrame to a dictionary
# kodex_kosdaq150_dict = kodex_kosdaq150_df.to_dict(orient='index')

# # Save the dictionary to a JSON file
# json_file_path = 'kodex_kosdaq150_buy_sell_count.json'
# with open(json_file_path, 'w', encoding='utf-8') as json_file:
#     json.dump(kodex_kosdaq150_dict, json_file, ensure_ascii=False, indent=4)

# print(f"JSON file has been saved to {json_file_path}")

# Convert the DataFrame to an HTML table
html_table = kodex_kosdaq150_df.to_html()

# Save the HTML table to a file
html_file_path = 'kodex_kosdaq150_buy_sell_count.html'
with open(html_file_path, 'w', encoding='utf-8') as html_file:
    html_file.write(html_table)

print(f"HTML file has been saved to {html_file_path}")

excel_file_path = 'kodex_kosdaq150_buy_sell_count.xlsx'
kodex_kosdaq150_df.to_excel(excel_file_path, index=True)

print(f"Excel file has been saved to {excel_file_path}")

드물게 한번의 질문으로 나온 답변이 제대로 작동하는 코드가 되었다. 나의 질문이 늘은건지, 챗지피티가 똑똑해진건지.

다른 라이브러리는 사용해본 것들이었지만, re 라이브러리는 처음이었다. 문자열 관련 라이브러리인거 같은데, 공부할 기회가 되면 포스팅하겠다. 처음에는 쉽지 않은 과정일거라 생각했는데, 한번에 성공해서 아주 기분이 좋다.