在 V5 中可以正常使用的通知脚本 notify.py,升级到 V6 之后不能用了。
脚本如下,请问需要修改脚本中的哪些内容,或者需要调整哪些配置?
cat /opt/n9e/etc/script/notify.py
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import sys,getopt
import json
import requests
class Sender(object):
    """Per-channel notification senders.

    ``main()`` looks methods up by name as ``send_<channel>`` and calls them
    with the decoded payload, so every public sender takes exactly one
    argument (the payload dict) and returns ``None``. Helper methods are
    prefixed with ``_`` so they can never be picked up as a channel.
    """

    # WeCom application credentials. They are placeholders here; fill them in
    # for the target corporation and keep the real secret out of version
    # control.
    WECOM_AGENT_ID = 1000008
    WECOM_CORP_SECRET = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
    WECOM_CORP_ID = 'xxxxxxxxxxxxxxxxxxxxxx'
    WECOM_TOKEN_URL = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
    WECOM_SEND_URL = 'https://qyapi.weixin.qq.com/cgi-bin/message/send'

    # Connection retries and per-request timeout (seconds) for WeCom calls.
    HTTP_RETRIES = 5
    HTTP_TIMEOUT = 10

    @staticmethod
    def _notify_users(payload):
        """Return the ``event.notify_users_obj`` list, or ``[]`` if absent."""
        event = payload.get('event') or {}
        return event.get("notify_users_obj") or []

    @classmethod
    def _unique_phones(cls, payload):
        """Return the non-empty phone numbers of the notified users.

        Duplicates are dropped and first-seen order is kept.
        """
        phones = []
        for user in cls._notify_users(payload):
            phone = user.get("phone")
            if phone and phone not in phones:
                phones.append(phone)
        return phones

    @classmethod
    def _wecom_user_ids(cls, payload):
        """Return the distinct, non-empty ``contacts.wecom_user_id`` values."""
        user_ids = []
        for user in cls._notify_users(payload):
            # A user without any contact information has no "contacts" dict.
            contacts = user.get("contacts") or {}
            user_id = contacts.get("wecom_user_id", "")
            if user_id and user_id not in user_ids:
                user_ids.append(user_id)
        return user_ids

    @classmethod
    def _wecom_access_token(cls, session):
        """Fetch a WeCom access token; print the reason and return None on failure."""
        query = {'corpid': cls.WECOM_CORP_ID, 'corpsecret': cls.WECOM_CORP_SECRET}
        try:
            resp = session.get(cls.WECOM_TOKEN_URL, params=query,
                               timeout=cls.HTTP_TIMEOUT)
            data = resp.json()
        except (requests.RequestException, ValueError) as err:
            print("wecom gettoken failed: {}".format(err))
            return None

        token = data.get('access_token')
        if not token:
            # The API explains the failure (errcode/errmsg) in the JSON body.
            print("wecom gettoken failed: {}".format(data))
        return token

    @classmethod
    def send_email(cls, payload):
        """No-op: email is already sent by the Go code."""
        pass

    @classmethod
    def send_wecom(cls, payload):
        """Send the rendered WeCom template as a text message to each user.

        Recipients are the distinct ``contacts.wecom_user_id`` values of the
        users in ``payload['event']['notify_users_obj']``. Network errors are
        printed rather than raised so one failing recipient does not stop the
        remaining ones.
        """
        recipients = cls._wecom_user_ids(payload)
        if not recipients:
            # Nobody to notify: do not spend a token request.
            return

        # NOTE(review): the template key is kept as "wecom.tpl" exactly as in
        # the original script; confirm that the Nightingale version in use
        # still delivers the template under this key.
        content = (payload.get('tpls') or {}).get("wecom.tpl", "wecom.tpl not found")

        session = requests.Session()
        # Retries only take effect when configured on a mounted adapter;
        # assigning requests.adapters.DEFAULT_RETRIES afterwards changes nothing.
        session.mount('https://',
                      requests.adapters.HTTPAdapter(max_retries=cls.HTTP_RETRIES))
        try:
            token = cls._wecom_access_token(session)
            if not token:
                return

            for user_id in recipients:
                body = {
                    "msgtype": "text",
                    "touser": "{}".format(user_id),
                    "agentid": cls.WECOM_AGENT_ID,
                    "text": {
                        "content": content
                    }
                }
                try:
                    resp = session.post(cls.WECOM_SEND_URL,
                                        params={'access_token': token},
                                        json=body,
                                        timeout=cls.HTTP_TIMEOUT)
                except requests.RequestException as err:
                    print(err)
                    continue
                print(resp)
                # WeCom reports delivery errors as errcode/errmsg in the body,
                # so the status line alone is not enough to diagnose failures.
                print(resp.text)
        finally:
            # Release pooled connections; this is what the original
            # `s.keep_alive = False` was trying to achieve.
            session.close()

    @classmethod
    def send_dingtalk(cls, payload):
        """No-op: DingTalk is already sent by the Go code."""
        pass

    @classmethod
    def send_feishu(cls, payload):
        """No-op: Feishu is already sent by the Go code."""
        pass

    @classmethod
    def send_sms(cls, payload):
        """Placeholder: print the phone numbers that would receive an SMS."""
        phones = cls._unique_phones(payload)
        if phones:
            print("send_sms not implemented, phones: {}".format(phones))

    @classmethod
    def send_voice(cls, payload):
        """Placeholder: print the phone numbers that would receive a voice call."""
        phones = cls._unique_phones(payload)
        if phones:
            print("send_voice not implemented, phones: {}".format(phones))
def main():
    """Read the alert payload from stdin and dispatch it to each channel.

    The payload is a JSON document. For every name listed in
    ``payload['event']['notify_channels']`` the matching
    ``Sender.send_<channel>`` method is called with the whole payload;
    channels without a matching method are reported and skipped.
    """
    payload = json.load(sys.stdin)

    # Dump the received payload into the current working directory so it can
    # be inspected when a notification does not arrive.
    with open(".payload", 'w') as f:
        f.write(json.dumps(payload, indent=4))

    # A payload without an event or without channels means nothing to send.
    channels = (payload.get('event') or {}).get('notify_channels') or []
    for ch in channels:
        send_func_name = "send_{}".format(ch.strip())
        send_func = getattr(Sender, send_func_name, None)
        if send_func is None:
            print("function: {} not found".format(send_func_name))
            continue
        send_func(payload)
def hello():
    """Print a fixed greeting; run via the ``hello`` command-line argument."""
    print("hello nightingale")
if __name__ == "__main__":
    # No arguments: process an alert payload from stdin.
    # A single "hello" argument: print a greeting instead.
    if len(sys.argv) == 1:
        main()
    elif sys.argv[1] == "hello":
        hello()
    else:
        print("I am confused")