复制或下载notification.py到任意目录,这里假设为/tools/notification.py
给文件添加可执行权限
chmod +x /tools/notification.py
在crontab中添加定时任务
crontab -e
0 23 * * * /usr/bin/python /tools/notification.py -i room_id -n room_name -t telegram_bot_token -c telegram_chat_id
| #!/usr/bin/env python | |
| import itertools | |
| from typing import List | |
| import requests | |
| import re | |
| import datetime | |
| import argparse | |
def telegram_notifaction(token: str, chat_id: str, message: str):
    """Send ``message`` to a Telegram chat through the Bot API.

    Args:
        token: Bot token, interpolated into the API path.
        chat_id: Target chat identifier, passed in the query string.
        message: Text sent as the ``text`` field of the JSON body.

    The HTTP response is not inspected; the request gives up after
    10 seconds.
    """
    endpoint_template = 'https://api.telegram.org/bot%s/sendMessage?chat_id=%s'
    endpoint = endpoint_template % (token, chat_id)
    payload = {'text': message}
    # Fire-and-forget: the response object is intentionally discarded.
    requests.post(endpoint, json=payload, timeout=10)
class Data:
    """One parsed row of the electricity usage table.

    The fields are filled positionally from ``data``:

    * ``time``   <- ``data[0]`` (the date string of the row)
    * ``left``   <- ``data[1]`` (reported by the script as remaining power)
    * ``all``    <- ``data[2]`` (the script diffs consecutive values to get
      the daily usage)
    * ``allBuy`` <- ``data[3]`` (presumably the total purchased amount;
      not used elsewhere in this file -- confirm against the data source)

    Raises:
        IndexError: If ``data`` has fewer than four elements.
    """

    def __init__(self, data=('', 0, 0, 0)) -> None:
        # The default is an immutable tuple rather than a list so that it can
        # never be shared and mutated across instances.
        self.time = data[0]
        self.left = data[1]
        self.all = data[2]
        self.allBuy = data[3]

    def __repr__(self) -> str:
        return 'Data(time=%r, left=%r, all=%r, allBuy=%r)' % (
            self.time, self.left, self.all, self.allBuy)
| # Modified from https://github.com/ceynri/szu-electricity-reporter/blob/master/crawler.py | |
def get_datas(client: str, room_name: str, room_id: str, interval: int = 7,
              timeout: float = 10) -> List[Data]:
    """Fetch and parse the electricity records of one room.

    A form is POSTed to the query endpoint for the date range
    ``[today - interval days, today]`` and the returned HTML is scraped
    with regular expressions.

    Args:
        client: Value sent as the ``client`` form field.
        room_name: Value sent as the ``roomName`` form field.
        room_id: Value sent as the ``roomId`` form field.
        interval: Number of days before today at which the range starts.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would leave a
            scheduled run hanging if the server stalls.

    Returns:
        One ``Data`` per table row, in the order the rows appear in the page.

    Raises:
        IndexError: If the page has fewer date cells than data rows, or the
            last row is incomplete.
        ValueError: If a numeric cell cannot be parsed as a float.
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    url = 'http://192.168.84.3:9090/cgcSims/selectList.do'
    end_date = datetime.date.today()
    begin_date = end_date - datetime.timedelta(days=interval)
    params = {
        'hiddenType': '',
        'isHost': '0',
        'beginTime': str(begin_date),
        'endTime': str(end_date),
        'type': '2',
        'client': client,
        'roomId': room_id,
        'roomName': room_name,
        'building': '',
    }
    response = requests.post(url, data=params, timeout=timeout)
    html = response.text

    # Cells with width 13% hold the per-row values (five cells per row);
    # cells with width 22% hold one date per row.
    raw_e_data = re.findall(
        r'<td width="13%" align="center">(.*?)</td>', html, re.S)
    raw_date_data = re.findall(
        r'<td width="22%" align="center">(.*?)</td>', html, re.S)

    rows = []
    for index, cell in enumerate(raw_e_data):
        column = index % 5
        if column == 0:
            # The first cell of each row is dropped; the row starts with the
            # date (first 10 characters) taken from the matching 22% cell.
            rows.append([raw_date_data[len(rows)].strip()[:10]])
        elif column != 1:
            # The second cell is skipped; the remaining three are numeric.
            rows[-1].append(float(cell.strip()))
    return [Data(row) for row in rows]
def pairwise(arr):
    """Return an iterator over consecutive overlapping pairs of ``arr``.

    ``[s0, s1, s2]`` yields ``(s0, s1), (s1, s2)``. Inputs with fewer than
    two items yield nothing.
    """
    current_items, next_items = itertools.tee(arr, 2)
    # Shift the look-ahead copy by one; the default keeps an empty input
    # from raising StopIteration.
    next(next_items, None)
    return zip(current_items, next_items)
# Command-line options: room identification, Telegram credentials and the
# number of days to report.
parser = argparse.ArgumentParser()
parser.add_argument('--client', type=str, default='192.168.84.1')
parser.add_argument('-i', '--room-id', type=str, required=True)
parser.add_argument('-n', '--room-name', type=str, required=True)
parser.add_argument('-t', '--token', type=str, required=True)
parser.add_argument('-c', '--chat-id', type=str, required=True)
parser.add_argument('-d', '--days', type=int, default=7)
args = parser.parse_args()

messageFormat = '时间: {}, 剩余电量: {},昨天使用电量: {}'
message = '房间号:{}, 用电查询({}天)\n'.format(args.room_name, args.days)

# One extra day is requested so that every reported day has a predecessor
# to diff against.
datas = get_datas(args.client, args.room_name, args.room_id, interval=args.days + 1)

# Each report line shows a record together with the difference of its `all`
# value from the previous record.
message += ''.join(
    messageFormat.format(
        current.time, current.left, '%.2f' % (current.all - previous.all)
    ) + '\n'
    for previous, current in pairwise(datas)
)

telegram_notifaction(args.token, args.chat_id, message)
复制或下载notification.py到任意目录,这里假设为/tools/notification.py
给文件添加可执行权限
chmod +x /tools/notification.py
在crontab中添加定时任务
crontab -e
0 23 * * * /usr/bin/python /tools/notification.py -i room_id -n room_name -t telegram_bot_token -c telegram_chat_id