Python分布式异步任务队列库-Celery学习-01

最近在项目上遇到了单机性能瓶颈,故趁此机会准备让项目中耗时的部分升级为分布式处理。考虑如果用C++来进行开发的话周期太长,而且很容易出现问题,于是乎初步选择了 Python 和Celery库来解决问题。

注:本文将以Celery 4.X版本为基础。

1、Celery 简介

它是一个异步任务调度工具,用户使用 Celery 产生任务,借用中间人来传递任务,任务执行单元从中间人那里消费任务。任务执行单元可以单机部署,也可以分布式部署,因此 Celery 是一个高可用的生产者消费者模型的异步任务队列。你可以将你的任务交给 Celery 处理,也可以让 Celery 自动按 crontab 那样去自动调度任务,然后去做其他事情,你可以随时查看任务执行的状态,也可以让 Celery 执行完成后自动把执行结果告诉你。

2、Celery 基本概念

任务生产者 :调用Celery提供的API,函数,装饰器而产生任务并交给任务队列的都是任务生产者。

任务执行单元(Worker):Worker是Celery提供的任务执行的单元,Worker并发的运行在分布式的系统节点中

中间人(Broker):Celery 中通常使用中间人(Broker)在客户端和工作者(Worker)之间传递。在任务生产者向队列添加消息后Broker把消息传递给 Worker。

后端(Backend):Celery 中用于储存Worker执行完毕后的结果。

官方给出的实现Broker的工具包括:

NameStatusMonitoringRemote Control
RabbitMQStableYesYes
RedisStableYesYes
Amazon SQSStableNoNo
ZookeeperExperimentalNoNo

更多信息可参阅:http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html

3、Celery 安装

正常来说安装是很简单的只需要pip install一下就可以了。

pip install celery[librabbitmq,redis,auth,msgpack]

#注:在 Python3.7 上 celery 4.2.0 之前的版本async模块和关键字async起了冲突,所以可能会提示错误。
#from . import async, base 
#SyntaxError: invalid syntax
#详情参见 : https://github.com/celery/celery/issues/4500

#解决方案:使用github上的最新版本
pip install --upgrade https://github.com/celery/celery/archive/master.zip

4、第一个Celery应用程序

这里我们选择使用 Redis 作为中间人和后端。

由于Redis使用的非常广泛,介绍也很多,这里不单独讨论了。

4.1、Worker

作为分布式的程序当然需要有具体干活的代码,Worker就是这个干活的代码。

这里直接贴出代码,可直接保存为add_tasks.py。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

from celery import Celery
import time
import os
import platform


def CeleryPlatformSet():
    #使用platform来判断操作系统类型
    sysstr = platform.system()
    if(sysstr =="Windows"):
        os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
    elif(sysstr == "Linux"):
        pass
    else:
        pass

CeleryPlatformSet()

#'redis://:[email protected]:port/db'
app = Celery('tasks', broker='redis://:[email protected]:6379/0',backend ='redis://:[email protected]:6379/0' )

@app.task
def add(x, y):
    #模拟耗时操作
    time.sleep(30)
    s = x + y
    print("sum is " + str(s))
    return s

if __name__ == '__main__':
    app.worker_main()

这个例子里添加了一个最基本的工作函数add,用来计算两个参数的和。

唯一需要注意的一点是,由于Windows下没有fork函数,所以如果Celery是在Windows下运行需要设置环境变量”FORKED_BY_MULTIPROCESSING”为1.

cmd下可以使用命令进行设置 (关闭后失效)
set FORKED_BY_MULTIPROCESSING=1

这里例子中直接使用了os包自动根据操作系统来设置。

启动worker可以使用命令celery -A add_tasks worker -l info

或者:python add_tasks .py worker

如果有疑问可以使用 celery –help 命令来查看celery命令的帮助文档。

4.2、生产者

有了完成任务的工作者当然也还需要一个负责添加任务的生产者。

下面是生产者的代码。命名为start_tasks.py。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

#导入任务函数add
from add_tasks import add 
import time


def main():
    #delay是异步调用,不会阻塞,程序会立即往下运行
    result = add.delay(12,12) 
    # 循环检查任务是否执行完毕
    while not result.ready():
        print(time.strftime('%H:%M:%S'))
        #每秒只检查一次
        time.sleep(1)
    #判断任务是否成功执行
    print(result.successful())
    #获取任务的返回结果
    print(result.get())

    
if __name__ == '__main__':
    main()

代码很简单,注释也写的比较清楚,这里不做过多的解释。保存为文件后可直接运行,通过输出我们可以观察到执行的过程和结果。
如果需要更多的工作者,可以把worker拷贝到更多计算机并行运行,需要注意的是IP是否正确,redis这样的中间人是否能够被其他计算机正确访问。

友情提示:redis默认无密码,直接允许公网访问有极大风险,可能会直接导致服务器被入侵。请务必设置一个强大难以破解的密码并设置防火墙阻止他人访问。

是时候该抛弃GSM网络了-新型伪基站诈骗法的防范

近日 江宁公安在线 在微博上发布了一条引发激烈舆论的微博( https://weibo.com/1113218211/GsVOZ1hwZ ),文中描述了一种新型的伪基站诈骗法,能够嗅探用户的短信内容。其中写到“此类劫持和嗅探并不仅限于 GSM 手机,包括 LTE,CDMA 类的 4G 手机也可能会受到相应威胁。”以及“一个坏消息是,此技术目前基本上没有办法防范。”

攻击原理:
首先我们得需要了解,在中国大陆境内主要的电信运营商有3家,分别是中国电信,中国联通,中国移动。其中中国电信的2G网络制式为CDMA,其余两家的2G网络制式都为GSM,本文讨论的攻击方式仅针对GSM,所以中国电信的用户可以忽略本文。
GSM网络比较古老,采用了单向鉴权的用户认证方式以及弱加密方式(甚至于无加密)存在比较大的安全问题。在网络上随手一搜即可搜到很多关于GSM嗅探的文章,并且攻击成本十分的低廉。
具体的内容大家有兴趣可以自行搜索,这里我只讲一个大概。
当用户手机以GSM网络连接到信号基站的时候,如果有短信需要发送给基站上的某个用户,基站会把短信以广播的方式通知在使用此基站的所有终端,也就是说如果有任意攻击者连接到基站,就可以收到所有通过此基站传送的短信。
虽然现代手机大多数都会默认使用3G或者4G网络进行通信,但是在遇到信号不好或者其他情况的时候往往手机会为了保障通信自动降级为GSM网络进行通信,所以就产生了很严重的安全问题。

解决方案:
针对部分Android手机用户可以在操作系统中选择拒绝使用GSM网络来预防攻击。
以LineageOS为例,联通用户可以在移动网络设置中选择仅使用WCDMA和LTE网络。
移动用户可以在移动网络设置中选择仅使用TD-SCDMA和LTE网络。

注:部分手机的操作系统可能不提供SIM卡的移动网络设置功能。可以尝试在拨号界面输入*#*#4636#*#会弹出手机测试的界面,找到手机信息菜单,设置首选网络类型(移动用户可以选TD-SCDMA/LTE,联通用户可以选LTE/WCDMA)这样即可屏蔽GSM网络。

PC上通过Proxy使用Line

最近非常的不太平,而微信的安全性和私密性始终无法让人满意,所以逼不得已只能转而去使用各种由非大陆提供的聊天软件。
本人也试用过一些聊天软件,目前来看Line还是比较让人满意的一种选择。
唯独遗憾的是Line的PC客户端在Windows 8或者Windows 10上工作时软件提供的代理(Proxy)功能运行并不是很正常甚至因为DNS污染的原因无法正常登陆使用,经过本人尝试在这里提出几个解决方法。

方法1、全平台适用,无论是Linux用户还是Windows又或者Mac OS都可以使用Google Chrome版本的Line来登陆。
这个方法非常的简单,安装Google Chrome以后设置好代理,在访问Chrome商店安装Line提供的插件即可。
对于主要使用Chrome浏览器的人来说,如果设置全局代理不太方便,可以通过安装SwitchyOmega等插件的方式来管理代理。

方法2、通过Proxifier来代理Line客户端
由于Chrome版本提供的功能比较简单,还是需要使用完整版的Line客户端,这里也提供一种解决方法。
Proxifier是一款功能非常强大的代理软件,可以让不支持通过代理服务器工作的网络程序能通过HTTPS或SOCKS代理访问网络。
首先下载Line客户端,这里不细说了。
官网地址:https://line.me/zh-hans/download
接着需要安装Proxifier(Proxifier为付费软件,建议支持正版)
官网地址:http://www.proxifier.com/
安装完成之后
需要打开主界面找到Profile菜单找到Proxy Server设置代理服务器。
如图所示:


我本人使用的是V2ray,所以选择了SOCKS5,大家需要根据自己使用的代理填写。
接着需要设置使用代理的程序
需要打开主界面找到Profile菜单找到Proxification Rules

单击后会弹出如下的对话框

如上图所示,这里我们设置了V2ray等软件不通过代理直接连接,不匹配任何规则的直接连接,Line客户端通过设置的代理连接。
接着有一点需要注意,一般的软件到这里就可以通过代理上网了,但是由于Line所使用的域名被DNS污染了无法得到正确的结果,所以还需要设置DNS全部走代理。
需要打开主界面找到Profile菜单找到Name Resolution

弹出的对话框中,勾选通过代理解析主机名

确定后即可愉快的使用Line了。

注意,Proxifier退出后则不会继续走代理,所以每次使用Line前需要先启动Proxifier。
如果嫌弃配置太过麻烦,可以直接使用我提供的配置文件,导入Proxifier后只需要修改一下代理服务器的地址和端口即可。
导入方法file菜单选择import即可

PPX文件下载地址https://www.exvs.org/wp-content/uploads/2018/07/Line_2D6C1EB6.ppx

傻瓜策略-PE_PB优化指数投资法

策略基准:沪深300指数

测试时间:2013-04-01至2018-03-01

策略内容:每隔25个交易日进行一次选股,在全部A股中选出符合如下条件的股票,按照等权比例买入30只股票(如果不满30只则按照实际数量买入,始终保持仓位为100%)。

选股条件:
排除ST,停牌,即将退市股票

市净率小于2

动态和静态市盈率均低于30

净资产收益率(ROE)大于3%

回测结果:

策略收益 146.59%
平均年化收益 20.74%
基准收益 62.28%
最大回撤 27.795%

策略代码:
(策略代码基于生成器生成)

from kuanke.wizard import *
from jqdata import *
import numpy as np
import pandas as pd
import talib
import datetime

## 初始化函数,设定要操作的股票、基准等等
def initialize(context):
    # 设定基准
    set_benchmark('000300.XSHG')
    # 设定滑点
    set_slippage(FixedSlippage(0.02))
    # True为开启动态复权模式,使用真实价格交易
    set_option('use_real_price', True) 
    # 设定成交量比例
    set_option('order_volume_ratio', 1)
    # 股票类交易手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
    set_order_cost(OrderCost(open_tax=0, close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    # 个股最大持仓比重
    g.security_max_proportion = 1
    # 选股频率
    g.check_stocks_refresh_rate = 25
    # 买入频率
    g.buy_refresh_rate = 1
    # 卖出频率
    g.sell_refresh_rate = 1
    # 最大建仓数量
    g.max_hold_stocknum = 30

    # 选股频率计数器
    g.check_stocks_days = 0 
    # 买卖交易频率计数器
    g.buy_trade_days=0
    g.sell_trade_days=0 
    # 获取未卖出的股票
    g.open_sell_securities = [] 
    # 卖出股票的dict
    g.selled_security_list={}
    
    # 股票筛选初始化函数
    check_stocks_initialize()
    # 股票筛选排序初始化函数
    check_stocks_sort_initialize()
    # 出场初始化函数
    sell_initialize()
    # 入场初始化函数
    buy_initialize()
    # 风控初始化函数
    risk_management_initialize()

    # 关闭提示
    log.set_level('order', 'error')

    # 运行函数
    run_daily(sell_every_day,'open') #卖出未卖出成功的股票
    run_daily(risk_management, 'every_bar') #风险控制
    run_daily(check_stocks, 'open') #选股
    run_daily(trade, 'open') #交易  
    run_daily(selled_security_list_count, 'after_close') #卖出股票日期计数 
      

## 股票筛选初始化函数
def check_stocks_initialize():
    # 是否过滤停盘
    g.filter_paused = True
    # 是否过滤退市  
    g.filter_delisted = True
    # 是否只有ST
    g.only_st = False
    # 是否过滤ST
    g.filter_st = True
    # 股票池
    g.security_universe_index = ["all_a_securities"]
    g.security_universe_user_securities = []
    # 行业列表
    g.industry_list = []
    # 概念列表
    g.concept_list = []

## 股票筛选排序初始化函数
def check_stocks_sort_initialize():
    # 总排序准则: desc-降序、asc-升序
    g.check_out_lists_ascending = 'desc'

## 出场初始化函数
def sell_initialize():
    # 设定是否卖出buy_lists中的股票
    g.sell_will_buy = False

    # 固定出仓的数量或者百分比
    g.sell_by_amount = None
    g.sell_by_percent = None

## 入场初始化函数
def buy_initialize():
    # 是否可重复买入
    g.filter_holded = False

    # 委托类型
    g.order_style_str = 'by_cap_mean'
    g.order_style_value = 100

## 风控初始化函数
def risk_management_initialize():
    # 策略风控信号
    g.risk_management_signal = True

    # 策略当日触发风控清仓信号
    g.daily_risk_management = True

    # 单只最大买入股数或金额
    g.max_buy_value = None
    g.max_buy_amount = None


## 卖出未卖出成功的股票
def sell_every_day(context):
    g.open_sell_securities = list(set(g.open_sell_securities))
    open_sell_securities = [s for s in context.portfolio.positions.keys() if s in g.open_sell_securities]
    if len(open_sell_securities)>0:
        for stock in open_sell_securities:
            order_target_value(stock, 0)
    g.open_sell_securities = [s for s in g.open_sell_securities if s in context.portfolio.positions.keys()]
    return

## 风控
def risk_management(context):
    ### _风控函数筛选-开始 ###
    ### _风控函数筛选-结束 ###
    return

## 股票筛选
def check_stocks(context):
    if g.check_stocks_days%g.check_stocks_refresh_rate != 0:
        # 计数器加一
        g.check_stocks_days += 1
        return
    # 股票池赋值
    g.check_out_lists = get_security_universe(context, g.security_universe_index, g.security_universe_user_securities)
    # 行业过滤
    g.check_out_lists = industry_filter(context, g.check_out_lists, g.industry_list)
    # 概念过滤
    g.check_out_lists = concept_filter(context, g.check_out_lists, g.concept_list)
    # 过滤ST股票
    g.check_out_lists = st_filter(context, g.check_out_lists)
    # 过滤退市股票
    g.check_out_lists = delisted_filter(context, g.check_out_lists)
    # 财务筛选
    g.check_out_lists = financial_statements_filter(context, g.check_out_lists)
    # 行情筛选
    g.check_out_lists = situation_filter(context, g.check_out_lists)
    # 技术指标筛选
    g.check_out_lists = technical_indicators_filter(context, g.check_out_lists)
    # 形态指标筛选函数
    g.check_out_lists = pattern_recognition_filter(context, g.check_out_lists)
    # 其他筛选函数
    g.check_out_lists = other_func_filter(context, g.check_out_lists)

    # 排序
    input_dict = get_check_stocks_sort_input_dict()
    g.check_out_lists = check_stocks_sort(context,g.check_out_lists,input_dict,g.check_out_lists_ascending)

    # 计数器归一
    g.check_stocks_days = 1
    return

## 交易函数
def trade(context):
   # 初始化买入列表
    buy_lists = []

    # 买入股票筛选
    if g.buy_trade_days%g.buy_refresh_rate == 0:
        # 获取 buy_lists 列表
        buy_lists = g.check_out_lists
        # 过滤ST股票
        buy_lists = st_filter(context, buy_lists)
        # 过滤停牌股票
        buy_lists = paused_filter(context, buy_lists)
        # 过滤退市股票
        buy_lists = delisted_filter(context, buy_lists)
        # 过滤涨停股票
        buy_lists = high_limit_filter(context, buy_lists)

        ### _入场函数筛选-开始 ###
        ### _入场函数筛选-结束 ###

    # 卖出操作
    if g.sell_trade_days%g.sell_refresh_rate != 0:
        # 计数器加一
        g.sell_trade_days += 1
    else:
        # 卖出股票
        sell(context, buy_lists)
        # 计数器归一
        g.sell_trade_days = 1


    # 买入操作
    if g.buy_trade_days%g.buy_refresh_rate != 0:
        # 计数器加一
        g.buy_trade_days += 1
    else:
        # 卖出股票
        buy(context, buy_lists)
        # 计数器归一
        g.buy_trade_days = 1

## 卖出股票日期计数
def selled_security_list_count(context):
    g.daily_risk_management = True
    if len(g.selled_security_list)>0:
        for stock in g.selled_security_list.keys():
            g.selled_security_list[stock] += 1

##################################  选股函数群 ##################################

## 财务指标筛选函数
def financial_statements_filter(context, security_list):
    ### _财务指标筛选函数-开始 ###
    security_list = financial_data_filter_xiaoyu(security_list, valuation.pb_ratio, 2)
    security_list = financial_data_filter_xiaoyu(security_list, valuation.pe_ratio, 30)
    security_list = financial_data_filter_xiaoyu(security_list, valuation.pe_ratio_lyr, 30)
    security_list = financial_data_filter_dayu(security_list, indicator.roe, 3)
    ### _财务指标筛选函数-结束 ###

    # 返回列表
    return security_list

## 行情筛选函数
def situation_filter(context, security_list):
    ### _行情筛选函数-开始 ###
    ### _行情筛选函数-结束 ###

    # 返回列表
    return security_list

## 技术指标筛选函数
def technical_indicators_filter(context, security_list):
    ### _技术指标筛选函数-开始 ###
    ### _技术指标筛选函数-结束 ###

    # 返回列表
    return security_list

## 形态指标筛选函数
def pattern_recognition_filter(context, security_list):
    ### _形态指标筛选函数-开始 ###
    ### _形态指标筛选函数-结束 ###

    # 返回列表
    return security_list

## 其他方式筛选函数
def other_func_filter(context, security_list):
    ### _其他方式筛选函数-开始 ###
    ### _其他方式筛选函数-结束 ###

    # 返回列表
    return security_list

# 获取选股排序的 input_dict
def get_check_stocks_sort_input_dict():
    input_dict = {
        }
    # 返回结果
    return input_dict

##################################  交易函数群 ##################################
# 交易函数 - 出场
def sell(context, buy_lists):
    # 获取 sell_lists 列表
    init_sl = context.portfolio.positions.keys()
    sell_lists = context.portfolio.positions.keys()

    # 判断是否卖出buy_lists中的股票
    if not g.sell_will_buy:
        sell_lists = [security for security in sell_lists if security not in buy_lists]
    
    ### _出场函数筛选-开始 ###
    ### _出场函数筛选-结束 ###
    
    # 卖出股票
    if len(sell_lists)>0:
        for stock in sell_lists:
            sell_by_amount_or_percent_or_none(context,stock, g.sell_by_amount, g.sell_by_percent, g.open_sell_securities)
    
    # 获取卖出的股票, 并加入到 g.selled_security_list中
    selled_security_list_dict(context,init_sl)
    
    return

# 交易函数 - 入场
def buy(context, buy_lists):
    # 风控信号判断
    if not g.risk_management_signal:
        return
    
    # 判断当日是否触发风控清仓止损
    if not g.daily_risk_management:
        return
    # 判断是否可重复买入
    buy_lists = holded_filter(context,buy_lists)
    
    # 获取最终的 buy_lists 列表
    Num = g.max_hold_stocknum - len(context.portfolio.positions)
    buy_lists = buy_lists[:Num]

    # 买入股票
    if len(buy_lists)>0:
        # 分配资金
        result = order_style(context,buy_lists,g.max_hold_stocknum, g.order_style_str, g.order_style_value)
        for stock in buy_lists:
            if len(context.portfolio.positions) < g.max_hold_stocknum:
                # 获取资金
                Cash = result[stock]
                # 判断个股最大持仓比重
                value = judge_security_max_proportion(context,stock,Cash,g.security_max_proportion)
                # 判断单只最大买入股数或金额
                amount = max_buy_value_or_amount(stock,value,g.max_buy_value,g.max_buy_amount)
                # 下单
                order(stock, amount, MarketOrderStyle())
    return

###################################  公用函数群 ##################################
## 排序
def check_stocks_sort(context,security_list,input_dict,ascending='desc'):
    if (len(security_list) == 0) or (len(input_dict) == 0):
        return security_list
    else:
        # 生成 key 的 list
        idk = list(input_dict.keys())
        # 生成矩阵
        a = pd.DataFrame()
        for i in idk:
            b = get_sort_dataframe(security_list, i, input_dict[i])
            a = pd.concat([a,b],axis = 1)
        # 生成 score 列
        a['score'] = a.sum(1,False)
        # 根据 score 排序
        if ascending == 'asc':# 升序
            a = a.sort(['score'],ascending = True)
        elif ascending == 'desc':# 降序
            a = a.sort(['score'],ascending = False)
        # 返回结果
        return list(a.index)

## 过滤同一标的继上次卖出N天不再买入
def filter_n_tradeday_not_buy(security, n=0):
    try:
        if (security in g.selled_security_list.keys()) and (g.selled_security_list[security]0:
        for stock in selled_sl:
            g.selled_security_list[stock] = 0

## 过滤停牌股票
def paused_filter(context, security_list):
    if g.filter_paused:
        current_data = get_current_data()
        security_list = [stock for stock in security_list if not current_data[stock].paused]
    # 返回结果
    return security_list

## 过滤退市股票
def delisted_filter(context, security_list):
    if g.filter_delisted:
        current_data = get_current_data()
        security_list = [stock for stock in security_list if not (('退' in current_data[stock].name) or ('*' in current_data[stock].name))]
    # 返回结果
    return security_list


## 过滤ST股票
def st_filter(context, security_list):
    if g.only_st:
        current_data = get_current_data()
        security_list = [stock for stock in security_list if current_data[stock].is_st]
    else:
        if g.filter_st:
            current_data = get_current_data()
            security_list = [stock for stock in security_list if not current_data[stock].is_st]
    # 返回结果
    return security_list

# 过滤涨停股票
def high_limit_filter(context, security_list):
    current_data = get_current_data()
    security_list = [stock for stock in security_list if not (current_data[stock].day_open >= current_data[stock].high_limit)]
    # 返回结果
    return security_list

# 获取股票股票池
def get_security_universe(context, security_universe_index, security_universe_user_securities):
    temp_index = []
    for s in security_universe_index:
        if s == 'all_a_securities':
            temp_index += list(get_all_securities(['stock'], context.current_dt.date()).index)
        else:
            temp_index += get_index_stocks(s)
    for x in security_universe_user_securities:
        temp_index += x
    return  sorted(list(set(temp_index)))

# 行业过滤
def industry_filter(context, security_list, industry_list):
    if len(industry_list) == 0:
        # 返回股票列表
        return security_list
    else:
        securities = []
        for s in industry_list:
            temp_securities = get_industry_stocks(s)
            securities += temp_securities
        security_list = [stock for stock in security_list if stock in securities]
        # 返回股票列表
        return security_list

# 概念过滤
def concept_filter(context, security_list, concept_list):
    if len(concept_list) == 0:
        return security_list
    else:
        securities = []
        for s in concept_list:
            temp_securities = get_concept_stocks(s)
            securities += temp_securities
        security_list = [stock for stock in security_list if stock in securities]
        # 返回股票列表
        return security_list

#自定义函数

傻瓜策略-PE中位数ETF投资法

回测平台为聚宽 https://www.joinquant.com/

基准指数选择为沪深300指数,交易标的为510310(沪深300ETF)
策略内容:
每个月5日计算沪深300指数的成分股PE,如果PE中位数低于25则满仓买入沪深300ETF,如果PE中位数超过40,则清仓。
测试时间:2013-04-01至2018-03-05
(由于沪深300ETF上市日期为2013年3月29日,固此策略无法追溯的更早)

回测结果:

基准收益 61.04%
策略收益 110.70%
策略年化收益 16.81%
最大回撤 18.637%
Alpha 0.104
Beta 0.373

策略代码:

def initialize(context):
    g.__mycash=1000000
    g.__mysecurity = '510310.XSHG'
    #获取沪深300成分股
    g.security = get_index_stocks('000300.XSHG')
    set_universe(g.security)
    #设置基准收益
    set_benchmark('000300.XSHG')
    set_order_cost(OrderCost(open_tax=0,
    close_tax=0.001, open_commission=0.0003,
    close_commission=0.0003, close_today_commission=0,
    min_commission=5), type='stock')
    run_monthly(Deal,5)
    
    
def Deal(context):
    security=g.security
    #g.__mycash+=5000
    PE = get_fundamentals(query(
            valuation.pe_ratio_lyr
        ).filter(
            valuation.code.in_(g.security)
        ))
    PE_list=PE.pe_ratio_lyr.tolist()
    templist=[]
    for x in PE_list:
        if x >= 0:
            templist.append(x)
    PE_list=templist[:]
    PE_list.sort()
    pe_mean = int(PE_list[len(PE_list)/2])
    #"""
    log.info("PE %s" % (pe_mean))
    if (pe_mean <= 25):
        order_target_value(g.__mysecurity, g.__mycash)
    elif (pe_mean > 25 and pe_mean <= 35):
        pass
    else:
        order_target_value(g.__mysecurity,0)
    #elif (pe_mean > 35 and pe_mean <= 35):
    #    order_target_value(g.__mysecurity, g.__mycash*0.85) 

    
def handle_data(context,data):
    pass