Name
暴雪网格-724小时交易机器人每日更新实盘效果
Author
红色的雪
Strategy Arguments
Argument | Default | Description |
---|---|---|
base_price | 3 | 基础价格-中线价格 |
sale_buy_diff | 0.03 | 买卖差价,卖出的价格- 买入的价格 |
two_distance | 0.01 | 递增、递减的时候,每个格子的距离 |
trade_amount | 2 | 每次交易的数量 |
sale_price_max | 5 | 卖出的上限价格 |
buy_price_min | true | 买入的上限价格 |
Source (python)
#!python2
# -*- coding:utf-8 -*-
'''
主要是使用发明者量化API进行网格买卖,当前只支持单品网格买卖
'''
from time import sleep
import datetime,copy
sale_price_list = [] #卖出的价格列表
buy_price_list = [] #买入的价格列表
class fmz_market():
def get_data_depth(self):
data_depth = exchange.GetDepth()
return data_depth
#检查当前是否可以进行买卖操作
def make_trade_check(self,symbol):
trade_infor = {'price':0,'trade_type':''}
#进行买卖列表判断,先更新交易记录列表
trade_price_list = self.get_trade_price_list(symbol)
sale_price_list = trade_price_list[0]
buy_price_list = trade_price_list[1]
#获取深度数据
data_depth = self.get_data_depth()
#买单列表:
data_depth_bids = data_depth.Bids[0]
#卖单列表:
data_depth_asks = data_depth.Asks[0]
#如果买入记录不为空
sale_buy_diff_now = two_distance*len(sale_price_list) if len(sale_price_list) >0 else sale_buy_diff
sale_buy_diff_sale = two_distance if len(sale_price_list) > 0 else sale_buy_diff
# sale_price_last = float(sale_price_list[len(sale_price_list)-1]) if len(sale_price_list) >0 else base_price
# buy_price_last = float(buy_price_list[len(buy_price_list)-1]) if len(buy_price_list) >0 else base_price
sale_price_last = float(sale_price_list[0]) if len(sale_price_list) > 0 and float(sale_price_list[0]) > base_price else base_price
buy_price_last = float(buy_price_list[0]) if len(buy_price_list) > 0 and float(buy_price_list[0]) < base_price else base_price
#判断当前价格是否满足 卖出的价格请求
if float(data_depth_bids.Price) - sale_price_last > sale_buy_diff_sale and float(data_depth_bids.Amount) > trade_amount * 1.5:
Log("当前卖价:",str(data_depth_bids.Price),"订单中最高卖价:",str(sale_price_last),"生成卖单")
trade_infor['price'] = float(data_depth_bids.Price)
trade_infor['trade_type'] = 'sale'
#判断当前价格是否满足 买入的价格请求
if buy_price_last - float(data_depth_asks.Price) > sale_buy_diff_now and float(data_depth_bids.Amount) > trade_amount * 1.5:
#Log("当前买价:", str(data_depth_asks.Price), "订单中最高买价:", str(sale_price_last),"生成买单")
trade_infor['price'] = float(data_depth_bids.Price)
trade_infor['trade_type'] = 'buy'
#判断当前价格是否破格,破格则置空买卖信息
if float(data_depth_bids.Price) - sale_price_max > 0 or buy_price_min - float(data_depth_asks.Price) > 0:
trade_infor['price'] = 0
trade_infor['trade_type'] = ''
timestr = (datetime.datetime.now()).strftime('%Y-%m-%d %H:%M:%S')
Log(trade_infor,"...time:",timestr)
if trade_infor['price'] != 0:
#Log(trade_infor,"...time:",timestr)
pass
return trade_infor
#根据委托信息生成买卖价格列表
def get_trade_price_list(self,symbol):
sale_list = []
buy_list = []
#获取所有的交易记录,根据不同的类型 分配到 买卖列表中
orders = exchange.GetOrders()
for i in range(len(orders)):
if orders[i].Type == 1:
sale_price = float(orders[i].Price)
sale_price_bak = copy.deepcopy(sale_price)
sale_list.append(sale_price_bak)
if orders[i].Type == 0:
buy_price = float(orders[i].Price)
buy_price_bak = copy.deepcopy(buy_price)
buy_list.append(buy_price_bak)
#判断为0的数组进处理
if len(sale_list) == 0:
for i in range(len(buy_list)):
sale_list.append(float('%.6f' % (buy_list[i] + sale_buy_diff)))
if len(buy_list) == 0:
for i in range(len(sale_list)):
buy_list.append(float('%.6f' % (sale_list[i] - sale_buy_diff)))
trade_price_list = [sale_list,buy_list]
return trade_price_list
#网格交易入口:
def grid_trade_start(self,symbol):
#进行状态获取,上涨/下跌
# trend_status = self.kline_trend_check(symbol)
#获取可否进行交易
trade_infor = self.make_trade_check(symbol)
#进行判断是否交易,判断是否可以卖出
# if trend_status == 'is_up' and trade_infor['price'] > 0 and trade_infor['trade_type'] == 'sale':
if trade_infor['price'] > 0 and trade_infor['trade_type'] == 'sale':
buy_price = float('%.6f' % (trade_infor['price'] - sale_buy_diff))
#调用下单功能,先调用卖,再调用买
order_id = exchange.Sell(trade_infor['price'], trade_amount)
#Log('order_id:',order_id)
#检查下单是否成功,不成功,则直接返回
if order_id is None:
Log("下单失败,等待600s继续.........")
sleep(600)
return 0
#检查主交易是否成功:判断卖单单是否交易成功,交易成功则进行买单下单
for i in range(100):
sale_orders = exchange.GetOrder(order_id)
#Log('sale_orders:',sale_orders)
if sale_orders.Status == 1:
#如果卖单已成交,则进行买单提交
exchange.Buy(buy_price, trade_amount)
return 0
sleep(10)
#如果循环1000次还未成交,则取消订单
exchange.CancelOrder(order_id)
# 进行判断是否交易,判断是否可以买入
if trade_infor['price'] > 0 and trade_infor['trade_type'] == 'buy':
sale_price = float('%.6f' % (trade_infor['price'] + sale_buy_diff))
# 调用下单功能,先调用买入,再调用卖出
order_id = exchange.Buy(trade_infor['price'], trade_amount)
# 检查下单是否成功,不成功,则直接返回
if order_id is None:
#Log("下单失败,等待600s继续.........")
sleep(600)
return 0
# 检查主交易是否成功:判断买单是否交易成功,交易成功则进行卖单下单
for i in range(100):
buy_orders = exchange.GetOrder(order_id)
if buy_orders.Status == 1:
# 如果买单已成交,则进行卖单提交
exchange.Sell(sale_price, trade_amount)
return 0
sleep(10)
# 如果循环1000次还未成交,则取消订单
exchange.CancelOrder(order_id)
#进行循环调用
def grid_trade_cycle(self,symbol):
cycle_num = 0
while(True):
timestr = (datetime.datetime.now()).strftime('%H%M%S')
self.grid_trade_start(symbol)
sleep(10)
cycle_num = cycle_num + 1
if cycle_num % 100 == 0:
account_infor = exchange.GetAccount()
Log("当前用户的账号信息:%s,....当前已循环检查次数:%s"%(account_infor,str(cycle_num)))
def main():
Log(exchange.GetAccount())
Log("测试")
fmz_market_instances = fmz_market()
fmz_market_instances.grid_trade_cycle(100)
#order_id = exchange.Sell(10000,1)
Detail
https://www.fmz.com/strategy/175807
Last Modified
2020-03-12 13:53:43