From 8fd4bead9738a88ee971e8e37d308793b9d948c9 Mon Sep 17 00:00:00 2001 From: robot Date: Wed, 6 Aug 2025 01:44:32 +0800 Subject: [PATCH 1/3] feature: cmd type 'stock'. --- GEMINI.md | 83 +++++++++++++++ config/stock_factors.ini | 19 ++++ request.txt | 94 +++++++++++++++++ requirements.txt | 4 +- src/command.py | 25 +++-- src/command_manager.py | 112 +++++++------------- src/fzf.py | 7 +- src/quickcmd.py | 2 +- src/stock_analyzer.py | 220 +++++++++++++++++++++++++++++++++++++++ 9 files changed, 481 insertions(+), 85 deletions(-) create mode 100644 GEMINI.md create mode 100644 config/stock_factors.ini create mode 100644 request.txt create mode 100644 src/stock_analyzer.py diff --git a/GEMINI.md b/GEMINI.md new file mode 100644 index 0000000..dbe908b --- /dev/null +++ b/GEMINI.md @@ -0,0 +1,83 @@ +# Gemini Agent Instructions for quickcmd + +This document provides instructions for the Gemini agent on how to interact with, develop, and manage the `quickcmd` project. + +## Project Overview + +`quickcmd` is a command-line utility designed to simplify the management and execution of custom commands. It uses `fzf` for fuzzy-finding through commands defined in `.ini` files, allowing for quick, interactive execution. The core logic is written in Python, with a `bash` script acting as the main entry point and wrapper. + +## Core Technologies + +- **Language**: Python 2/3, Bash +- **Key Libraries**: `requests`, `urllib3` +- **Core Tools**: `fzf` (a command-line fuzzy finder) +- **Configuration**: `.ini` files for command definitions. +- **Testing**: Standard Python `unittest` module. + +## Project Structure + +- `quickcmd.sh`: The main entry point script that the user sources and calls via the `qc` function. It handles locating the python interpreter and executing the main python script. +- `src/`: Contains all Python source code. + - `quickcmd.py`: The main Python script that parses arguments and orchestrates the application flow. + - `command_manager.py`: Handles loading, adding, deleting, and modifying commands from `.ini` files. + - `command.py`: Represents a single command object. + - `fzf.py`: A wrapper for interacting with the `fzf` command-line tool. + - `iniparser.py`: Custom parser for the `.ini` configuration files. +- `commands/`: The default directory where user-defined command `.ini` files are stored. +- `test/`: Contains unit tests. +- `install.sh`: Script for installing `quickcmd` and its dependencies (`fzf`, python packages). +- `requirements.txt`: Python dependencies. + +## Development Workflow + +### 1. Install Dependencies + +The project requires Python and the packages listed in `requirements.txt`. `fzf` is also a critical dependency. + +To install Python dependencies: +```bash +pip install -r requirements.txt +``` + +### 2. Running the Application + +The application is intended to be run via the `qc` function defined in `quickcmd.sh`. For development and testing, you can execute the main Python script directly: + +```bash +python src/quickcmd.py +``` + +To list all commands without using `fzf`: +```bash +python src/quickcmd.py --list +``` + +### 3. Running Tests + +Tests are located in the `test/` directory and can be run using Python's `unittest` module. + +```bash +python -m unittest discover test +``` + +### 4. Managing Commands + +- **Location**: Commands are stored in `.ini` files within the `commands/` directory. +- **Adding a Command**: New commands can be added by creating a new `.ini` file or adding a new section to an existing one in the `commands/` directory. The interactive way is to run `python src/quickcmd.py --addcmd`. +- **Structure of a command in `.ini`**: + ```ini + [command_name] + command = echo "Hello, World!" + workdir = /tmp + tip = A simple example command. + ``` + +## Goals for Gemini + +Your primary tasks for this project will be: + +1. **Bug Fixes**: Identify and fix bugs in the Python source code (`src/`) or the `quickcmd.sh` script. +2. **Feature Development**: Add new features, such as new command types, enhanced configuration options, or improved output formatting. +3. **Refactoring**: Improve code quality, such as refactoring the `iniparser.py` or improving the command execution logic in `command_manager.py`. +4. **Writing Tests**: Add new unit tests in the `test/` directory to cover new or existing functionality. +5. **Managing Commands**: Assist with creating, modifying, or deleting commands in the `commands/` directory as requested. diff --git a/config/stock_factors.ini b/config/stock_factors.ini new file mode 100644 index 0000000..cad810e --- /dev/null +++ b/config/stock_factors.ini @@ -0,0 +1,19 @@ +[default] +# Default investment factor if not specified by company or industry +factor = 0.85 +# Default dividend payout ratio +payout_ratio = 0.50 + +[company] +# Supports company name or stock code +# Example for a specific stock +五粮液 = 0.85 +000858 = 0.85 +HK01113 = 0.90 + +[industry] +# Investment factors by industry +服装 = 0.5 +房地产 = 0.6 +银行 = 0.7 +白酒 = 0.8 diff --git a/request.txt b/request.txt new file mode 100644 index 0000000..df0762d --- /dev/null +++ b/request.txt @@ -0,0 +1,94 @@ +理解以下文档中的内容,帮我基于 akshare 在 quickcmd 中实现以下功能: + +- 根据关键字搜索股票信息,支持深市 A 股、B 股、港股 +- 根据名称或股票代码,通过 akshare 获取股票信息 +- 然后通过下面的公式,对股票进行穿透回报率计算,以及计算对应值得投资的股价。 + - 计算公式中,有个投资因子(悲观因子),可以通过读取指定数据文件(或项目默认路径的的数据文件中)获取,例如五粮液的投资因子是 0.85,要在人为预期估值的 85% 作为买入点,以此提供 15% 的安全边际。 + - 投资因子可以根据行业或者公司名称在配置文件中指定 +- 投资因子的数据文件中,要采用高信息密度的格式,并且要可读性可修改性强,方便观看和修改。 + 例如格式如果使用 ini 可以是: + +``` +[default] +factor = 0.85 + +[company] +-- 支持代码或公司名称 +HK01113 = 0.85 +五粮液 = 0.85 + + +[industry] +服装 = 0.5 +房地产 = 0.6 +``` + +- 先从公司中找,没有找到找行业,没有找到找默认,如果配置文件没有默认配置,那么就让用户输入。 +- 同样还是 qc 启动,添加 stock 类型的 cmd_type,专门用于此功能。例如可以通过以下配置 (commands/stock.ini 中的内容) 来触发对应的功能: + +``` +[Analyze Stock] +type = "stock" +``` + +以下是文档: + +# 第 2 期 选股因子之穿透回报率 + +背景:一部分有能力或者精力的朋友,想要优化获得现金流的标的,并不满足于被动投资红利 ETF + +目的:通过一系列的限制和筛选,找到一套寻觅股息穿透稳定的企业 + +解决的问题:以什么价格去买。 + +穿透的意思:市场 <--> 公司 <--> 股东,也就是股东能从市场(穿透了公司)获取到的收益/回报。 + +投资者关注点:不关心公司本身挣了多少钱,而是关心有多少钱能穿透到投资者身上。 + +## 穿透回报率应该包含的内容 + +- 分钱:直接分现金 +- 注销型回购:公司购买股票,然后注销,相当于投资者手上的股份更多了。 + - 反对:股权激励型的回购,相当于拿投资人的钱买股票,发给高管(国内绝大多数都是发给高管)。 + +## 穿透回报率计算方式 + +必须掌握: +- 净利润 x 股息支付率 = 公司公告最低股息支付率 x 你个人预测的保守未来净利润(保守估计,因此最好再乘以 0.85) +- 真实可支配现金流 + 现金储备 + +可供参考: + +- 管理层承诺 +- 历史线外推:根据公司长期(5-10 年)不变的派息金额或者股息支付率来计算 +- 商业模式影响:是否具备投资、花销的空间 + - 下下策,只能靠猜 + - 轻资产:10-12 倍估值 + - 重资产:6-9 倍估值 + +### 计算示例 + +五粮液给出:24 - 26 年股息支付率不得低于 70%,且不得低于 200 亿, +那么,就将五粮液的股息支付率设置成 70%(0.7),考虑到五粮液利润极度稳定,在悲观(保守)的情况下,把去年的利润打 8 折,再乘以 70%,得到今年股息;再除以市值,得到股息率。 + + +## 是否可持续 + +需要了解:真实可支配现金流(真钱、不是假钱应付账款) + 现金储备 + +- 真实可支配现金流:经营性现金流净流入 - 资本开支 - 财务费用 + - 所谓的真实可支配的现金流,就是看公司到底有多少钱可以用于派息)有些公司虽然有不少利润,但是他的投资花销远为于经营得到的现金,他的股息本质上是**贷款**在派息,这是**不可靠的**,不能够长期的持续。 + - 在有选择的情况下,我们会优先选择`经营得到的现金流 - 资本开支流出的现金流 - 财务费用消耗的现金流,即真实剔除投资开支 + 银行利息后赚到的现金`能够覆盖派息的。 + - 这一种派息才是可持续的,否则本质上就是**贷款分红**或者**融资分红**。 + +- 现金储备:现金扣除有息负债后的储备 + - 而现金储备扣除了银行的贷款,尤其是**短期借款**,是一种粗略的衡量公司能否稳定派息的一个保障。更加保守的计算口径,是`总现金 - 流动负债`。 + + +## 理想中的穿透回报率条件 + +- 高度可计算的回报率,首先搜查有无公开文件规定了最低的股息支付率或者管理层直接公开承诺了金额 +- 如没有,则可以根据有无历史稳定的派息规律进行线性外推 +- 如果还是没有,那么回报的结果就是高度不确定的,我们只能根据商业模式结合历史数据悲观猜测 +- 同时我们要求安全的现金流 + 现金储备 +- 穿透回报率的诉求,至少为 3%,最好 5% 以上,且要同时满足大于货币无风险利率 1% diff --git a/requirements.txt b/requirements.txt index 73ed6b4..27fd032 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ requests -urllib3<2.0 \ No newline at end of file +urllib3<2.0 +akshare +pandas diff --git a/src/command.py b/src/command.py index 23bb8c1..692188d 100644 --- a/src/command.py +++ b/src/command.py @@ -17,32 +17,36 @@ def __init__(self, file="", name="", items=[]): self.tip = infos.get("tip", "") self.api_key = infos.get("api_key", "") self.multi_line_question = infos.get("multi_line_question", False) + self.type = infos.get("type", "") self.file = file - if self.command: + if self.type == "stock": + self.cmd_type = "stock" + prefix = "[STOCK] " + elif self.command: prefix = "[CMD] " self.cmd_type = "cmd" - elif self.godir: self.cmd_type = "cd" prefix = "[GOTO] " - elif self.tip: self.cmd_type = "tip" prefix = "[TIP] " - elif self.api_key: self.cmd_type = "chatgpt" prefix = "[ChatGPT] " - else: - sys.exit("Unknown command") + sys.exit(f"Unknown command: {self.type}") + self.name = prefix + name self.name = self.name.replace(" ", "-") self.qcc = QuickCmdColor() + def get_type(self): + return self.cmd_type + def abs_path(self, path): if path and path.startswith("~"): # os.path.join(os.path.expanduser("~"), path[1:]) @@ -108,6 +112,7 @@ def complete(self): for variable in variables: m = re.match(r'^\${(\w+)}$', variable) + # m = re.match(r'^\${(\w+)}', variable) name = m.group(1) try: @@ -186,6 +191,9 @@ def tostring(self): elif self.cmd_type == "tip": s = "%s\n[+] Tip = %s" % (s, self.tip) + elif self.cmd_type == "stock": + s = "%s\n[+] Type = Stock Analysis" % (s) + if self.file: s = "%s\n[+] In file = %s" %(s, self.file) @@ -199,7 +207,10 @@ def fzf_str(self, index): res = "{}; cd {}".format(res, self.godir) elif self.tip: res = "{}; tip: {}".format(res, self.tip) + elif self.cmd_type == "stock": + res = "{}: {}".format(res, "Perform stock analysis") + - res = "{}\r\n".format(res) + res = "{}\n".format(res) return res diff --git a/src/command_manager.py b/src/command_manager.py index 972ffef..dae73ac 100644 --- a/src/command_manager.py +++ b/src/command_manager.py @@ -10,8 +10,7 @@ from quickcmd_color import QuickCmdColor import platform import iniparser -# import ConfigParser - +from stock_analyzer import run_stock_analysis_workflow class CommandManager(object): def __init__(self, cmddir): @@ -22,7 +21,6 @@ def __init__(self, cmddir): self.cp = None def to_cmds(self, inifile, configs): - #self.commands = [] for config in configs: section, options = config cmd = Command(inifile, section, options) @@ -32,21 +30,15 @@ def load_cmds(self): if not os.path.exists(self.cmddir): return None - #all_config = [] for path, _, files in os.walk(self.cmddir): for filename in files: - # 跳过非 ini 文件 if not filename.endswith(".ini"): continue inifile = os.path.join(path, filename) parser = iniparser.IniParser(inifile) configs = parser.all() - #all_config += configs self.to_cmds(inifile, configs) - #self.to_cmds(inifile, all_config) - # return self.commands - def get_cmds(self): return self.commands @@ -61,16 +53,14 @@ def print_cmds(self): self.qcc.light_green_print(cmd.tostring()) def get_cmd(self, index): - if self.commands and index is not None and index >= 0: + if self.commands and index is not None and index >= 0 and index < len(self.commands): return self.commands[index] return None def gen_config(self, name, config_dict): configs = list() - for key, value in config_dict.items(): configs.append((key, value)) - return name, configs def add_cmd(self): @@ -84,79 +74,63 @@ def add_cmd(self): self.qcc.red_print("invalid command name!") continue - cmd_type = self.qcc.green_input("\n1.command\n2.change directory\n3.tip\n4.ChatGPT\nCommand type: ") - if int(cmd_type) == 1: + cmd_type_prompt = "\n1.command\n2.change directory\n3.tip\n4.ChatGPT\n5.Stock Analysis\nCommand type: " + cmd_type = self.qcc.green_input(cmd_type_prompt) + + if not cmd_type.isdigit(): + self.qcc.red_print("Invalid selection.") + continue + + cmd_type = int(cmd_type) + + if cmd_type == 1: cmd = self.qcc.green_input("Command: ") if not cmd: self.qcc.red_print("Bad command") continue - config_dict["command"] = cmd - workdir = self.qcc.green_input("Work Directory: ") config_dict["workdir"] = workdir - - elif int(cmd_type) == 2: + elif cmd_type == 2: godir = self.qcc.green_input("Directory: ") - if not godir: - self.qcc.red_print("Bad directory " + godir) + if not godir or not os.path.exists(godir): + self.qcc.red_print("Bad or non-existent directory: " + godir) continue - - if not os.path.exists(godir): - self.qcc.red_print("Directory not found: " + godir) - continue - config_dict["godir"] = godir - - elif int(cmd_type) == 3: + elif cmd_type == 3: tip = self.qcc.lines_input("Tip: ") if not tip: self.qcc.red_print("Bad Tip") continue - config_dict["tip"] = tip - - elif int(cmd_type) == 4: + elif cmd_type == 4: api_key = self.qcc.green_input("API Key: ") if not api_key: self.qcc.red_print("Bad API Key") continue - config_dict["api_key"] = api_key - multi_line = self.qcc.green_input("Do you want to enter multiple lines? [Y/N]: ") - if multi_line and (multi_line == "Yes" or multi_line == "YES" or multi_line == "Y" or multi_line == "y"): - multi_line = True - - else: - multi_line = False - - config_dict["multi_line_question"] = multi_line - - # TODO try to ask a question to check if the API key is valid + config_dict["multi_line_question"] = multi_line.lower() in ['y', 'yes'] + elif cmd_type == 5: + config_dict["type"] = "stock" + else: + self.qcc.red_print("Invalid command type!") + continue break - # print cmd file - print(name) - files = os.listdir(self.cmddir) - cmdfiles = [] - for file in files: - if file.endswith(".ini"): - cmdfiles.append(file) - + files = [f for f in os.listdir(self.cmddir) if f.endswith(".ini")] cmdfile = None while True: i = 1 - for file in cmdfiles: + for file in files: self.qcc.blue_print("{}: {}".format(i, file)) i += 1 self.qcc.red_print("{}: new file".format(i)) - select = self.qcc.green_input("please select: ") # 直接回车就是默认文件 + select = self.qcc.green_input("please select: ") select = select.strip() - # 生成命令 + if select == "": - # default file cmdfile = self.def_qc_file break @@ -166,26 +140,14 @@ def add_cmd(self): select_num = int(select) if select_num == i: - # new file - while True: - newfn = None - while True: - newfn = self.qcc.green_input("input new filename: ") - if newfn: - break - cmdfile = "{}/{}".format(self.cmddir, newfn) - if not newfn.endswith(".ini"): - cmdfile = "{}.ini".format(cmdfile) - if not os.path.exists(cmdfile): - break - break - elif select_num > 0 and select_num < i: - # select file - cmdfile = "{}/{}".format(self.cmddir, - cmdfiles[select_num - 1]) + newfn = self.qcc.green_input("input new filename (without .ini): ") + if newfn: + cmdfile = "{}/{}.ini".format(self.cmddir, newfn) + break + elif 0 < select_num < i: + cmdfile = os.path.join(self.cmddir, files[select_num - 1]) break else: - # invalid select self.qcc.red_print("invalid input") section, configs = self.gen_config(name, config_dict) @@ -196,12 +158,11 @@ def add_cmd(self): def del_cmd(self, cmd): cmdfile = cmd.get_file() - section = cmd.get_name() + section = cmd.get_name().split(']', 1)[1].replace('-', ' ') # Get original name self.qcc.red_print("delete {} from {}".format(section, cmdfile)) parser = iniparser.IniParser(cmdfile) parser.delete(section) parser.save_or_remove() - pass def mod_cmd(self, cmd_obj): cmdfile = cmd_obj.get_file() @@ -246,7 +207,10 @@ def set_action_mod(self): def do_action(self, cmd): if self.action == "run": - if cmd.complete(): + cmd_type = cmd.get_type() + if cmd_type == "stock": + run_stock_analysis_workflow() + elif cmd.complete(): return cmd.execute() elif self.action == "del": return self.del_cmd(cmd) diff --git a/src/fzf.py b/src/fzf.py index dceb91d..140897a 100644 --- a/src/fzf.py +++ b/src/fzf.py @@ -29,7 +29,10 @@ def prepare_files(self, cmds): with codecs.open(self.fzf_input, 'w', encoding='utf-8') as fp: i = 1 for cmd in cmds: - fp.write(cmd.fzf_str(i)) + if isinstance(cmd, str): + fp.write("{:0>3}: {}\n".format(i, cmd)) + else: + fp.write(cmd.fzf_str(i)) i = i + 1 def run(self): @@ -53,4 +56,4 @@ def parse_output(self): output = output[:pos].rstrip('\r\n\t ') if not output: return None - return (int(output) - 1) \ No newline at end of file + return (int(output) - 1) diff --git a/src/quickcmd.py b/src/quickcmd.py index 8e9867d..290601e 100755 --- a/src/quickcmd.py +++ b/src/quickcmd.py @@ -146,4 +146,4 @@ def main(args=None): if __name__ == "__main__": - exit(main()) + exit(main()) \ No newline at end of file diff --git a/src/stock_analyzer.py b/src/stock_analyzer.py new file mode 100644 index 0000000..087cc6b --- /dev/null +++ b/src/stock_analyzer.py @@ -0,0 +1,220 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- + +import akshare as ak +import pandas as pd +import os +from iniparser import IniParser +from quickcmd_color import QuickCmdColor +from fzf import FuzzyFinder + +class FactorManager: + def __init__(self, config_path, stock_code, stock_name, industry): + self.config_path = config_path + self.stock_code = stock_code + self.stock_name = stock_name + self.industry = industry + self.parser = IniParser(self.config_path) if os.path.exists(self.config_path) else None + self.qcc = QuickCmdColor() + + def _get_value(self, section, key): + if not self.parser: + return None + # Use get_config which is the method in the provided iniparser.py + config = self.parser.get_config(section, key) + return config[1] if config else None + + def get_factor(self): + # 1. Check company by code + factor = self._get_value('company', self.stock_code) + if factor: + return float(factor) + + # 2. Check company by name + factor = self._get_value('company', self.stock_name) + if factor: + return float(factor) + + # 3. Check industry + factor = self._get_value('industry', self.industry) + if factor: + return float(factor) + + # 4. Check default + factor = self._get_value('default', 'factor') + if factor: + return float(factor) + + # 5. Prompt user + while True: + try: + factor_input = self.qcc.green_input(f"No factor found for {self.stock_name}. Please enter a factor (e.g., 0.85): ") + return float(factor_input) + except (ValueError, TypeError): + self.qcc.red_print("Invalid input. Please enter a number.") + + def get_payout_ratio(self): + # This logic needs to be adapted to how you want to store payout ratios in the ini. + # For now, let's assume a simple lookup similar to factor. + ratio = self._get_value('company', f"{self.stock_code}_payout_ratio") or \ + self._get_value('company', f"{self.stock_name}_payout_ratio") or \ + self._get_value('industry', f"{self.industry}_payout_ratio") or \ + self._get_value('default', 'payout_ratio') + + return float(ratio) if ratio else 0.5 # Default to 0.5 if not found + + +class StockAnalyzer: + def __init__(self, stock_code, stock_name): + self.stock_code = stock_code + self.stock_name = stock_name + self.qcc = QuickCmdColor() + self.data = {} + + def _format_value(self, value): + if isinstance(value, (int, float)): + return f"{value / 100_000_000:.2f} 亿" + return "N/A" + + def analyze(self): + try: + self.qcc.blue_print(f"Fetching data for {self.stock_name} ({self.stock_code})...") + + # Fetch all necessary data + self.data['profile'] = ak.stock_individual_info_em(symbol=self.stock_code) + self.data['quote'] = ak.stock_zh_a_spot_em(symbol=self.stock_code) if self.stock_code.isdigit() else ak.stock_hk_spot_em(symbol=self.stock_code) + self.data['cash_flow'] = ak.stock_cash_flow_sheet_by_report_em(symbol=self.stock_code) + self.data['balance_sheet'] = ak.stock_balance_sheet_by_report_em(symbol=self.stock_code) + self.data['income_statement'] = ak.stock_financial_analysis_indicator(symbol=self.stock_code) + + # Extract key values + industry = self.data['profile'].loc[self.data['profile']['item'] == '行业'].iloc[0, 1] + market_cap = self.data['quote']['总市值'].iloc[0] + + # Get latest annual data + latest_annual_report_date = self.data['income_statement']['报告期'].iloc[0] + + net_profit = self.data['income_statement'][self.data['income_statement']['报告期'] == latest_annual_report_date]['净利润 (元)'].iloc[0] + + cash_flow_latest = self.data['cash_flow'][self.data['cash_flow']['REPORT_DATE'].str.contains('12-31')].iloc[0] + op_cash_flow = cash_flow_latest['NET_CASH_FLOWS_OPER_ACT'] + capex = cash_flow_latest['NET_CASH_FLOWS_INV_ACT'] + + balance_sheet_latest = self.data['balance_sheet'][self.data['balance_sheet']['REPORT_DATE'].str.contains('12-31')].iloc[0] + financial_expenses = balance_sheet_latest.get('FIN_EXP', 0) # Use .get for safety + total_cash = balance_sheet_latest['MONETARY_CAPITAL'] + current_liabilities = balance_sheet_latest['TOTAL_CURRENT_LIAB'] + + # Initialize FactorManager + config_path = os.path.join(os.path.dirname(__file__), '..', 'config', 'stock_factors.ini') + factor_mgr = FactorManager(config_path, self.stock_code, self.stock_name, industry) + investment_factor = factor_mgr.get_factor() + payout_ratio = factor_mgr.get_payout_ratio() + + # Calculations + conservative_net_profit = net_profit * investment_factor + estimated_dividend = conservative_net_profit * payout_ratio + penetration_return_rate = (estimated_dividend / market_cap) * 100 if market_cap else 0 + + real_controllable_cash_flow = op_cash_flow + capex - (financial_expenses or 0) + cash_reserve = total_cash - current_liabilities + + # Target price for 5% return + target_market_cap_5_percent = estimated_dividend / 0.05 + current_price = self.data['quote']['最新价'].iloc[0] + current_total_shares = market_cap / current_price if current_price else 0 + target_price_5_percent = target_market_cap_5_percent / current_total_shares if current_total_shares else 0 + + # Print report + self.qcc.light_green_print("\n--- Stock Analysis Report ---") + self.qcc.white_print(f" {self.stock_name} ({self.stock_code}) - {industry}") + self.qcc.light_green_print("-----------------------------") + + self.qcc.white_print(f" Market Cap: {self._format_value(market_cap)}") + self.qcc.white_print(f" Last Year Net Profit: {self._format_value(net_profit)}") + + self.qcc.light_green_print("\n--- Penetration Return Rate ---") + self.qcc.white_print(f" Investment Factor: {investment_factor:.2f}") + self.qcc.white_print(f" Assumed Payout Ratio: {payout_ratio:.2f}") + self.qcc.cyan_print(f" Conservative Est. Dividend: {self._format_value(estimated_dividend)}") + self.qcc.yellow_print(f" Estimated Penetration Return Rate: {penetration_return_rate:.2f}%") + + self.qcc.light_green_print("\n--- Sustainability Check ---") + self.qcc.white_print(f" Real Controllable Cash Flow: {self._format_value(real_controllable_cash_flow)}") + self.qcc.white_print(f" Cash Reserve (Cash - Current Liab.): {self._format_value(cash_reserve)}") + if real_controllable_cash_flow > estimated_dividend: + self.qcc.green_print(" Cash flow can cover estimated dividend.") + else: + self.qcc.red_print(" Warning: Cash flow may NOT cover estimated dividend.") + + self.qcc.light_green_print("\n--- Investment Valuation ---") + self.qcc.white_print(f" Target Price for 5% Return: {target_price_5_percent:.2f} CNY") + self.qcc.light_green_print("-----------------------------\n") + + except Exception as e: + self.qcc.red_print(f"An error occurred during analysis: {e}") + self.qcc.red_print("This might be due to data availability for the selected stock (e.g., non-A-shares, new listings).") + + +def search_stocks(keyword): + """ + Search for stocks across different markets. + """ + qcc = QuickCmdColor() + qcc.blue_print(f"Searching for '{keyword}'...") + try: + all_results = [] + + # A-shares + stock_a_df = ak.stock_info_a_code_name() + result_a = stock_a_df[stock_a_df.apply(lambda row: keyword.lower() in str(row['code']).lower() or keyword.lower() in str(row['name']).lower(), axis=1)].copy() + if not result_a.empty: + result_a['market'] = 'A-Share' + all_results.append(result_a[['code', 'name', 'market']]) + + # TODO HK-shares + # stock_hk_df = ak.stock_info_hk_name() + # result_hk = stock_hk_df[stock_hk_df.apply(lambda row: keyword.lower() in str(row['代码']).lower() or keyword.lower() in str(row['名称']).lower(), axis=1)].copy() + # if not result_hk.empty: + # result_hk = result_hk.rename(columns={'代码': 'code', '名称': 'name'}) + # result_hk['market'] = 'HK-Share' + # all_results.append(result_hk[['code', 'name', 'market']]) + + if not all_results: + return [] + + combined_results = pd.concat(all_results, ignore_index=True) + + return [f"{row['code']} - {row['name']} - {row['market']}" for index, row in combined_results.iterrows()] + + except Exception as e: + qcc.red_print(f"An error occurred during stock search: {e}") + return [] + +def run_stock_analysis_workflow(): + qcc = QuickCmdColor() + keyword = qcc.green_input("Enter stock keyword (name or code) to search: ") + if not keyword: + qcc.red_print("Search keyword cannot be empty.") + return + + results = search_stocks(keyword) + if not results: + qcc.red_print(f"No stock found for keyword: {keyword}") + return + + fzf = FuzzyFinder() + fzf.prepare_files(results) + fzf.run() + index = fzf.parse_output() + + print(index) + + if index is not None and index < len(results): + selected = results[index] + print(selected) + parts = selected.split(' - ') + stock_code = parts[0] + stock_name = parts[1] + analyzer = StockAnalyzer(stock_code, stock_name) + analyzer.analyze() From 32a945fa5db3abb8ac42237a771b8b203588f021 Mon Sep 17 00:00:00 2001 From: robot Date: Wed, 6 Aug 2025 01:46:02 +0800 Subject: [PATCH 2/3] feature: cmd type 'stock'. --- .gitignore | 2 ++ request.txt | 94 ----------------------------------------------------- 2 files changed, 2 insertions(+), 94 deletions(-) delete mode 100644 request.txt diff --git a/.gitignore b/.gitignore index ecb348b..d41a4e3 100644 --- a/.gitignore +++ b/.gitignore @@ -132,3 +132,5 @@ dmypy.json .DS_Store commands/ .qc.cd.path +request.txt + diff --git a/request.txt b/request.txt deleted file mode 100644 index df0762d..0000000 --- a/request.txt +++ /dev/null @@ -1,94 +0,0 @@ -理解以下文档中的内容,帮我基于 akshare 在 quickcmd 中实现以下功能: - -- 根据关键字搜索股票信息,支持深市 A 股、B 股、港股 -- 根据名称或股票代码,通过 akshare 获取股票信息 -- 然后通过下面的公式,对股票进行穿透回报率计算,以及计算对应值得投资的股价。 - - 计算公式中,有个投资因子(悲观因子),可以通过读取指定数据文件(或项目默认路径的的数据文件中)获取,例如五粮液的投资因子是 0.85,要在人为预期估值的 85% 作为买入点,以此提供 15% 的安全边际。 - - 投资因子可以根据行业或者公司名称在配置文件中指定 -- 投资因子的数据文件中,要采用高信息密度的格式,并且要可读性可修改性强,方便观看和修改。 - 例如格式如果使用 ini 可以是: - -``` -[default] -factor = 0.85 - -[company] --- 支持代码或公司名称 -HK01113 = 0.85 -五粮液 = 0.85 - - -[industry] -服装 = 0.5 -房地产 = 0.6 -``` - -- 先从公司中找,没有找到找行业,没有找到找默认,如果配置文件没有默认配置,那么就让用户输入。 -- 同样还是 qc 启动,添加 stock 类型的 cmd_type,专门用于此功能。例如可以通过以下配置 (commands/stock.ini 中的内容) 来触发对应的功能: - -``` -[Analyze Stock] -type = "stock" -``` - -以下是文档: - -# 第 2 期 选股因子之穿透回报率 - -背景:一部分有能力或者精力的朋友,想要优化获得现金流的标的,并不满足于被动投资红利 ETF - -目的:通过一系列的限制和筛选,找到一套寻觅股息穿透稳定的企业 - -解决的问题:以什么价格去买。 - -穿透的意思:市场 <--> 公司 <--> 股东,也就是股东能从市场(穿透了公司)获取到的收益/回报。 - -投资者关注点:不关心公司本身挣了多少钱,而是关心有多少钱能穿透到投资者身上。 - -## 穿透回报率应该包含的内容 - -- 分钱:直接分现金 -- 注销型回购:公司购买股票,然后注销,相当于投资者手上的股份更多了。 - - 反对:股权激励型的回购,相当于拿投资人的钱买股票,发给高管(国内绝大多数都是发给高管)。 - -## 穿透回报率计算方式 - -必须掌握: -- 净利润 x 股息支付率 = 公司公告最低股息支付率 x 你个人预测的保守未来净利润(保守估计,因此最好再乘以 0.85) -- 真实可支配现金流 + 现金储备 - -可供参考: - -- 管理层承诺 -- 历史线外推:根据公司长期(5-10 年)不变的派息金额或者股息支付率来计算 -- 商业模式影响:是否具备投资、花销的空间 - - 下下策,只能靠猜 - - 轻资产:10-12 倍估值 - - 重资产:6-9 倍估值 - -### 计算示例 - -五粮液给出:24 - 26 年股息支付率不得低于 70%,且不得低于 200 亿, -那么,就将五粮液的股息支付率设置成 70%(0.7),考虑到五粮液利润极度稳定,在悲观(保守)的情况下,把去年的利润打 8 折,再乘以 70%,得到今年股息;再除以市值,得到股息率。 - - -## 是否可持续 - -需要了解:真实可支配现金流(真钱、不是假钱应付账款) + 现金储备 - -- 真实可支配现金流:经营性现金流净流入 - 资本开支 - 财务费用 - - 所谓的真实可支配的现金流,就是看公司到底有多少钱可以用于派息)有些公司虽然有不少利润,但是他的投资花销远为于经营得到的现金,他的股息本质上是**贷款**在派息,这是**不可靠的**,不能够长期的持续。 - - 在有选择的情况下,我们会优先选择`经营得到的现金流 - 资本开支流出的现金流 - 财务费用消耗的现金流,即真实剔除投资开支 + 银行利息后赚到的现金`能够覆盖派息的。 - - 这一种派息才是可持续的,否则本质上就是**贷款分红**或者**融资分红**。 - -- 现金储备:现金扣除有息负债后的储备 - - 而现金储备扣除了银行的贷款,尤其是**短期借款**,是一种粗略的衡量公司能否稳定派息的一个保障。更加保守的计算口径,是`总现金 - 流动负债`。 - - -## 理想中的穿透回报率条件 - -- 高度可计算的回报率,首先搜查有无公开文件规定了最低的股息支付率或者管理层直接公开承诺了金额 -- 如没有,则可以根据有无历史稳定的派息规律进行线性外推 -- 如果还是没有,那么回报的结果就是高度不确定的,我们只能根据商业模式结合历史数据悲观猜测 -- 同时我们要求安全的现金流 + 现金储备 -- 穿透回报率的诉求,至少为 3%,最好 5% 以上,且要同时满足大于货币无风险利率 1% From be6478f54d0dd80ddd3cf55f820c79a14cc694ae Mon Sep 17 00:00:00 2001 From: robot Date: Mon, 11 Aug 2025 00:55:22 +0800 Subject: [PATCH 3/3] change: update. --- .gitignore | 1 + src/stock_analyzer.py | 79 ++++++-- src/stock_info.py | 432 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 492 insertions(+), 20 deletions(-) create mode 100644 src/stock_info.py diff --git a/.gitignore b/.gitignore index d41a4e3..3598673 100644 --- a/.gitignore +++ b/.gitignore @@ -133,4 +133,5 @@ dmypy.json commands/ .qc.cd.path request.txt +*_cache.json diff --git a/src/stock_analyzer.py b/src/stock_analyzer.py index 087cc6b..9b32a32 100644 --- a/src/stock_analyzer.py +++ b/src/stock_analyzer.py @@ -1,12 +1,19 @@ #! /usr/bin/env python3 # -*- coding: utf-8 -*- +import os +import json import akshare as ak import pandas as pd -import os from iniparser import IniParser from quickcmd_color import QuickCmdColor from fzf import FuzzyFinder +from datetime import datetime, timedelta +from stock_info import get_stock_info_cn_cached, \ + get_stock_info_hk_cached, \ + get_stock_info_us_cached, \ + get_stock_individual_info, \ + get_stock_quote class FactorManager: def __init__(self, config_path, stock_code, stock_name, industry): @@ -65,9 +72,10 @@ def get_payout_ratio(self): class StockAnalyzer: - def __init__(self, stock_code, stock_name): + def __init__(self, stock_code, stock_name, stock_share): self.stock_code = stock_code self.stock_name = stock_name + self.stock_share = stock_share self.qcc = QuickCmdColor() self.data = {} @@ -77,12 +85,18 @@ def _format_value(self, value): return "N/A" def analyze(self): + if self.stock_share not in ["A-Share", "HK-Stock"]: + self.qcc.red_print(f"Unsupported stock share: {self.stock_share}") + return None + try: - self.qcc.blue_print(f"Fetching data for {self.stock_name} ({self.stock_code})...") + self.qcc.blue_print(f"Fetching data for {self.stock_name} ({self.stock_code}, {self.stock_share})...") # Fetch all necessary data - self.data['profile'] = ak.stock_individual_info_em(symbol=self.stock_code) - self.data['quote'] = ak.stock_zh_a_spot_em(symbol=self.stock_code) if self.stock_code.isdigit() else ak.stock_hk_spot_em(symbol=self.stock_code) + self.data['profile'] = get_stock_individual_info(stock_code=self.stock_code, stock_share=self.stock_share) + self.data['quote'] =get_stock_quote(stock_code=self.stock_code, stock_share=self.stock_share) + print(f"---isshe---: profile: {self.data['profile']}, quote: {self.data['quote']}") + # self.data['cash_flow'] = ak.stock_cash_flow_sheet_by_report_em(symbol=self.stock_code) self.data['balance_sheet'] = ak.stock_balance_sheet_by_report_em(symbol=self.stock_code) self.data['income_statement'] = ak.stock_financial_analysis_indicator(symbol=self.stock_code) @@ -156,7 +170,7 @@ def analyze(self): self.qcc.red_print("This might be due to data availability for the selected stock (e.g., non-A-shares, new listings).") -def search_stocks(keyword): +def search_stocks(keyword, us=False): """ Search for stocks across different markets. """ @@ -164,21 +178,45 @@ def search_stocks(keyword): qcc.blue_print(f"Searching for '{keyword}'...") try: all_results = [] + keyword_lower = keyword.lower() # A-shares - stock_a_df = ak.stock_info_a_code_name() - result_a = stock_a_df[stock_a_df.apply(lambda row: keyword.lower() in str(row['code']).lower() or keyword.lower() in str(row['name']).lower(), axis=1)].copy() - if not result_a.empty: - result_a['market'] = 'A-Share' - all_results.append(result_a[['code', 'name', 'market']]) - - # TODO HK-shares - # stock_hk_df = ak.stock_info_hk_name() - # result_hk = stock_hk_df[stock_hk_df.apply(lambda row: keyword.lower() in str(row['代码']).lower() or keyword.lower() in str(row['名称']).lower(), axis=1)].copy() - # if not result_hk.empty: - # result_hk = result_hk.rename(columns={'代码': 'code', '名称': 'name'}) - # result_hk['market'] = 'HK-Share' - # all_results.append(result_hk[['code', 'name', 'market']]) + stock_a_list = get_stock_info_cn_cached() + result_a = [ + stock for stock in stock_a_list + if keyword_lower in str(stock['code']).lower() or + keyword_lower in str(stock['name']).lower() + ] + if result_a: + # list is not empty + result_df = pd.DataFrame(result_a) + result_df['market'] = 'A-Share' + all_results.append(result_df[['code', 'name', 'market']]) + + # HK-Stock + stock_hk_list = get_stock_info_hk_cached() + result_hk = [ + stock for stock in stock_hk_list + if keyword_lower in str(stock['code']).lower() or + keyword_lower in str(stock['name']).lower() + ] + if result_hk: + result_df = pd.DataFrame(result_hk) + result_df['market'] = 'HK-Stock' + all_results.append(result_df[['code', 'name', 'market']]) + + if us: + # US-Stock + stock_us_list = get_stock_info_us_cached() + result_us = [ + stock for stock in stock_us_list + if keyword_lower in str(stock['code']).lower() or + keyword_lower in str(stock['name']).lower() + ] + if result_us: + result_df = pd.DataFrame(result_us) + result_df['market'] = 'US-Stock' + all_results.append(result_df[['code', 'name', 'market']]) if not all_results: return [] @@ -216,5 +254,6 @@ def run_stock_analysis_workflow(): parts = selected.split(' - ') stock_code = parts[0] stock_name = parts[1] - analyzer = StockAnalyzer(stock_code, stock_name) + stock_share = parts[2] + analyzer = StockAnalyzer(stock_code, stock_name, stock_share) analyzer.analyze() diff --git a/src/stock_info.py b/src/stock_info.py new file mode 100644 index 0000000..4e9f297 --- /dev/null +++ b/src/stock_info.py @@ -0,0 +1,432 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import json +import akshare as ak +import pandas as pd +from datetime import datetime, timedelta + +default_cache_hours = 168 + +def get_stock_info_cn_cached(cache_file: str = 'stock_info_cache.json', cache_hours: int = default_cache_hours): + """ + Get A-share stock basic information with local JSON cache support. + + Args: + cache_file (str): Path to the cache file. Defaults to 'stock_info_cache.json'. + cache_hours (int): Cache expiration time in hours. Defaults to default_cache_hours. + + Returns: List of stock information + [ + { + 'code': str, # Stock code + 'name': str # Stock name + }, + ... + ] + + Raises: + Exception: If failed to fetch data and no valid cache exists. + """ + + # Check if cache file exists + if os.path.exists(cache_file): + try: + # Load cache file + with open(cache_file, 'r', encoding='utf-8') as f: + cache_data = json.load(f) + + # Check if cache has required fields + if 'timestamp' in cache_data and 'data' in cache_data: + # Parse cache timestamp + cache_time = datetime.fromisoformat(cache_data['timestamp']) + current_time = datetime.now() + + # Check if cache is still valid + if current_time - cache_time < timedelta(hours=cache_hours): + print(f"Using cached data from {cache_data['timestamp']}") + return cache_data['data'] + else: + print(f"Cache expired (created at {cache_data['timestamp']})") + else: + print("Invalid cache format, will refresh data") + + except (json.JSONDecodeError, ValueError, KeyError) as e: + print(f"Error reading cache file: {e}, will refresh data") + else: + print("Cache file not found, will fetch new data") + + # Fetch new data from akshare + try: + print("Fetching latest stock information...") + stock_df = ak.stock_info_a_code_name() + + # Convert DataFrame to list of dictionaries + stock_list = stock_df.to_dict('records') + + # Prepare cache data structure + cache_data = { + 'timestamp': datetime.now().isoformat(), + 'data': stock_list + } + + # Save to cache file + with open(cache_file, 'w', encoding='utf-8') as f: + json.dump(cache_data, f, ensure_ascii=False, indent=2) + + print(f"Data cached successfully to {cache_file}") + print(f"Total stocks: {len(stock_list)}") + + return stock_list + + except Exception as e: + print(f"Failed to fetch new data: {e}") + + # Try to use old cache if exists + if os.path.exists(cache_file): + try: + with open(cache_file, 'r', encoding='utf-8') as f: + old_cache = json.load(f) + print("Using old cache data due to fetch failure") + return old_cache['data'] + except Exception as cache_error: + print(f"Failed to load old cache: {cache_error}") + + # Re-raise the original exception if no fallback available + raise e + +def get_stock_info_hk_cached(cache_file: str = 'stock_info_hk_cache.json', cache_hours: int = default_cache_hours): + """ + Get Hong Kong stock basic information with local JSON cache support. + + Args: + cache_file (str): Path to the cache file. Defaults to 'stock_info_hk_cache.json'. + cache_hours (int): Cache expiration time in hours. Defaults to default_cache_hours. + + Returns: List of HK stock information [{'code': str, 'name': str}, ...] + + Raises: + Exception: If failed to fetch data and no valid cache exists. + """ + + # Check if cache file exists + if os.path.exists(cache_file): + try: + # Load cache file + with open(cache_file, 'r', encoding='utf-8') as f: + cache_data = json.load(f) + + # Check if cache has required fields + if 'timestamp' in cache_data and 'data' in cache_data: + # Parse cache timestamp + cache_time = datetime.fromisoformat(cache_data['timestamp']) + current_time = datetime.now() + + # Check if cache is still valid + if current_time - cache_time < timedelta(hours=cache_hours): + print(f"Using cached HK stock data from {cache_data['timestamp']}") + return cache_data['data'] + else: + print(f"HK stock cache expired (created at {cache_data['timestamp']})") + else: + print("Invalid HK stock cache format, will refresh data") + + except (json.JSONDecodeError, ValueError, KeyError) as e: + print(f"Error reading HK stock cache file: {e}, will refresh data") + else: + print("HK stock cache file not found, will fetch new data") + + # Fetch new data from akshare + try: + print("Fetching latest HK stock information...") + stock_df = ak.stock_hk_spot_em() + + # Extract and rename columns to maintain consistency + if '代码' in stock_df.columns and '名称' in stock_df.columns: + stock_basic_df = stock_df[['代码', '名称']].copy() + stock_basic_df.columns = ['code', 'name'] + else: + # Fallback: try different column names + possible_code_cols = ['代码', 'code', 'symbol', '股票代码'] + possible_name_cols = ['名称', 'name', '股票名称', '公司名称'] + + code_col = None + name_col = None + + for col in possible_code_cols: + if col in stock_df.columns: + code_col = col + break + + for col in possible_name_cols: + if col in stock_df.columns: + name_col = col + break + + if code_col and name_col: + stock_basic_df = stock_df[[code_col, name_col]].copy() + stock_basic_df.columns = ['code', 'name'] + else: + raise ValueError(f"Cannot find code/name columns in HK stock data. Available columns: {list(stock_df.columns)}") + + # Convert DataFrame to list of dictionaries + stock_list = stock_basic_df.to_dict('records') + + # Clean and format data + cleaned_stock_list = [] + for stock in stock_list: + code = str(stock.get('code', '')).strip() + name = str(stock.get('name', '')).strip() + + # Skip empty records + if code and name and code != 'nan' and name != 'nan': + cleaned_stock_list.append({ + 'code': code, + 'name': name + }) + + # Prepare cache data structure + cache_data = { + 'timestamp': datetime.now().isoformat(), + 'market': 'HK', + 'data': cleaned_stock_list + } + + # Save to cache file + with open(cache_file, 'w', encoding='utf-8') as f: + json.dump(cache_data, f, ensure_ascii=False, indent=2) + + print(f"HK stock data cached successfully to {cache_file}") + print(f"Total HK stocks: {len(cleaned_stock_list)}") + + return cleaned_stock_list + + except Exception as e: + print(f"Failed to fetch new HK stock data: {e}") + + # Try to use old cache if exists + if os.path.exists(cache_file): + try: + with open(cache_file, 'r', encoding='utf-8') as f: + old_cache = json.load(f) + print("Using old HK stock cache data due to fetch failure") + return old_cache['data'] + except Exception as cache_error: + print(f"Failed to load old HK stock cache: {cache_error}") + + # Re-raise the original exception if no fallback available + raise e + +def get_stock_info_us_cached(cache_file: str = 'stock_info_us_cache.json', cache_hours: int = default_cache_hours): + """ + Get US stock basic information with local JSON cache support. + + Args: + cache_file (str): Path to the cache file. Defaults to 'stock_info_us_cache.json'. + cache_hours (int): Cache expiration time in hours. Defaults to default_cache_hours. + + Returns: List of US stock information [{'code': str, 'name': str}, ...] + + Raises: + Exception: If failed to fetch data and no valid cache exists. + """ + + # Check if cache file exists + if os.path.exists(cache_file): + try: + # Load cache file + with open(cache_file, 'r', encoding='utf-8') as f: + cache_data = json.load(f) + + # Check if cache has required fields + if 'timestamp' in cache_data and 'data' in cache_data: + # Parse cache timestamp + cache_time = datetime.fromisoformat(cache_data['timestamp']) + current_time = datetime.now() + + # Check if cache is still valid + if current_time - cache_time < timedelta(hours=cache_hours): + print(f"Using cached US stock data from {cache_data['timestamp']}") + return cache_data['data'] + else: + print(f"US stock cache expired (created at {cache_data['timestamp']})") + else: + print("Invalid US stock cache format, will refresh data") + + except (json.JSONDecodeError, ValueError, KeyError) as e: + print(f"Error reading US stock cache file: {e}, will refresh data") + else: + print("US stock cache file not found, will fetch new data") + + # Fetch new data from akshare + try: + print("Fetching latest US stock information...") + stock_df = ak.stock_us_spot_em() + + # Extract and rename columns to maintain consistency + if '代码' in stock_df.columns and '名称' in stock_df.columns: + stock_basic_df = stock_df[['代码', '名称']].copy() + stock_basic_df.columns = ['code', 'name'] + else: + # Fallback: try different column names + possible_code_cols = ['代码', 'code', 'symbol', '股票代码', 'Symbol'] + possible_name_cols = ['名称', 'name', '股票名称', '公司名称', 'Name'] + + code_col = None + name_col = None + + for col in possible_code_cols: + if col in stock_df.columns: + code_col = col + break + + for col in possible_name_cols: + if col in stock_df.columns: + name_col = col + break + + if code_col and name_col: + stock_basic_df = stock_df[[code_col, name_col]].copy() + stock_basic_df.columns = ['code', 'name'] + else: + raise ValueError(f"Cannot find code/name columns in US stock data. Available columns: {list(stock_df.columns)}") + + # Convert DataFrame to list of dictionaries + stock_list = stock_basic_df.to_dict('records') + + # Clean and format data + cleaned_stock_list = [] + for stock in stock_list: + code = str(stock.get('code', '')).strip() + name = str(stock.get('name', '')).strip() + + # Skip empty records + if code and name and code != 'nan' and name != 'nan': + cleaned_stock_list.append({ + 'code': code, + 'name': name + }) + + # Prepare cache data structure + cache_data = { + 'timestamp': datetime.now().isoformat(), + 'market': 'US', + 'data': cleaned_stock_list + } + + # Save to cache file + with open(cache_file, 'w', encoding='utf-8') as f: + json.dump(cache_data, f, ensure_ascii=False, indent=2) + + print(f"US stock data cached successfully to {cache_file}") + print(f"Total US stocks: {len(cleaned_stock_list)}") + + return cleaned_stock_list + + except Exception as e: + print(f"Failed to fetch new US stock data: {e}") + + # Try to use old cache if exists + if os.path.exists(cache_file): + try: + with open(cache_file, 'r', encoding='utf-8') as f: + old_cache = json.load(f) + print("Using old US stock cache data due to fetch failure") + return old_cache['data'] + except Exception as cache_error: + print(f"Failed to load old US stock cache: {cache_error}") + + # Re-raise the original exception if no fallback available + raise e + + +def get_stock_individual_info(stock_code: str, stock_share: str): + """ + Get individual stock information based on stock market type. + + Args: + stock_code (str): Stock code/symbol + stock_share (str): Stock market type - 'A-Share', 'HK-Stock', 'US-Stock' + + Returns: Stock individual information DataFrame, None if failed + + Raises: + ValueError: If unsupported stock_share type is provided + Exception: If API call fails + """ + + try: + if stock_share == 'A-Share': + # A 股个股信息查询 - 东方财富 + print(f"Fetching A-Share info for {stock_code}") + return ak.stock_individual_info_em(symbol=stock_code) + + elif stock_share == 'HK-Stock': + # 港股个股信息查询 - 雪球 + print(f"Fetching HK stock info for {stock_code}") + return ak.stock_individual_basic_info_hk_xq(symbol=stock_code) + + elif stock_share == 'US-Stock': + # 美股个股信息查询 - 雪球 + print(f"Fetching US stock info for {stock_code}") + return ak.stock_individual_basic_info_us_xq(symbol=stock_code) + + else: + raise ValueError(f"Unsupported stock_share type: {stock_share}. " + f"Supported types: 'A-Share', 'HK-Stock'") + + except Exception as e: + print(f"Error fetching stock info for {stock_code} ({stock_share}): {e}") + return None + + +def get_stock_quote(stock_code: str, stock_share: str): + """ + Get stock real-time quote based on stock market type. + + Args: + stock_code (str): Stock code/symbol + stock_share (str): Stock market type - 'A-Share' or 'HK-Stock' + + Returns: + Optional[pd.Series]: Stock quote information as Series, None if failed + + Raises: + ValueError: If unsupported stock_share type is provided + """ + + try: + if stock_share == 'A-Share': + # A 股实时行情 + print(f"Fetching A-Share quote for {stock_code}") + spot_df = ak.stock_zh_a_spot_em() + + # Filter by stock code + quote_data = spot_df[spot_df['代码'] == stock_code] + if not quote_data.empty: + return quote_data.iloc[0] # Return first match as Series + else: + print(f"No A-Share data found for code: {stock_code}") + return None + + elif stock_share == 'HK-Stock': + # 港股实时行情 + print(f"Fetching HK stock quote for {stock_code}") + spot_df = ak.stock_hk_spot_em() + + # Filter by stock code + quote_data = spot_df[spot_df['代码'] == stock_code] + if not quote_data.empty: + return quote_data.iloc[0] # Return first match as Series + else: + print(f"No HK stock data found for code: {stock_code}") + return None + + else: + raise ValueError(f"Unsupported stock_share type: {stock_share}. " + f"Supported types: 'A-Share', 'HK-Stock'") + + except Exception as e: + print(f"Error fetching quote for {stock_code} ({stock_share}): {e}") + return None