# 导入QMT核心模块
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount
# 策略参数设置
TRADE_VOLUME = 1000 # 每笔交易股数
ST_LIMIT_RATE = 1.10 # 涨停幅度(10%)
VOLUME_RATIO = 2.0 # 量能倍数
MIN_SEAL_AMOUNT = 5000000 # 最小封单金额(5百万)
def calculate_limit_price(pre_close):
"""计算涨停价(精确到分)"""
return round(pre_close * ST_LIMIT_RATE + 1e-8, 2) # 处理浮点精度问题
def on_tick(context):
"""实时行情回调函数"""
for code in context.watch_list:
# 获取全量tick数据
tick = xtdata.get_full_tick(code)
if not tick: continue
# 基础数据获取
pre_close = tick['preClosePrice']
current_price = tick['lastPrice']
bid1_vol = tick['bidVolumes'][0] # 买一量
ask1_vol = tick['askVolumes'][0] # 卖一量
# 计算当日成交量
today_vol = tick['amount'] / tick['lastPrice'] if tick['lastPrice'] > 0 else 0
# 计算历史5日平均成交量
his_data = xtdata.get_market_data(stock_list=[code], period='1d',
start_time='', end_time='',
count=5)
avg_vol = his_data.getVolume().mean()
# 计算涨停价
limit_price = calculate_limit_price(pre_close)
# ---------策略条件判断---------
# 条件1:当前价格触及涨停
condition1 = abs(current_price - limit_price) < 0.01 # 考虑精度误差
# 条件2:早盘涨停(10:30前)
condition2 = xtdata.get_market_time() < 103000
# 条件3:量能突破
condition3 = today_vol > avg_vol * VOLUME_RATIO
# 条件4:大单封板(买一量远大于卖一量)
condition4 = bid1_vol > ask1_vol * 10 and bid1_vol * current_price > MIN_SEAL_AMOUNT
if all([condition1, condition2, condition3, condition4]):
# 执行买入操作
xt_trader.order_stock(context.account, code, xtconstant.STOCK_BUY,
TRADE_VOLUME, xtconstant.FIX_PRICE,
limit_price, '打板策略')
print(f"触发打板买入{code} @{current_price}")
# 主程序
if __name__ == '__main__':
# 初始化交易接口
xt_trader = XtQuantTrader('path/to/your/qmt', 1)
acc = StockAccount('账号类型', '账号ID')
# 订阅标的
context = {
'account': acc,
'watch_list': ['000001.SZ', '600000.SH'] # 监控标的列表
}
# 订阅实时行情
xtdata.subscribe_whole_quote(context.watch_list)
xtdata.run()版权声明
本文仅代表作者观点,不代表牛人量化交易网立场。
本文系作者授权牛人量化交易网发表,未经许可,不得转载。




评论列表
发表评论