Created
July 20, 2016 03:28
-
-
Save pieman2201/a9ea24f4da5c88680bfd6b1ed0c9d5a2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python3 | |
| #made by pieman2201 | |
| #needs files: aliases.txt, locked.txt | |
| import requests | |
| import json | |
| import os | |
| import time | |
| import random | |
#telegram bot stuff
#endpoint template: the first %s is the bot token, the second the API method
url = "https://api.telegram.org/bot%s/%s"
token = "token goes here"
#parameters
#chat that sendMessage posts to; the main loop overwrites it for every
#incoming message
chat_id = ""
#an alias value must be shorter than this many characters
max_value = 1024
#directory that holds aliases.txt and locked.txt. abspath() matters:
#__file__ can be a bare relative filename (e.g. "python3 bot.py" before
#Python 3.9), dirname() of that is "", and path + "/aliases.txt" would then
#point at the filesystem root instead of the script's folder
path = os.path.dirname(os.path.abspath(__file__))
#requests stuff
#NOTE: this name shadows the builtin ConnectionError for the whole module
ConnectionError = requests.exceptions.ConnectionError
def getUpdates(offset):
    """Fetch the updates Telegram has queued, starting at update id *offset*.

    Returns a tuple ``(updates, next_offset)``. ``updates`` is the list found
    under the reply's "result" key and ``next_offset`` is one past the
    ``update_id`` of the last update, or *offset* unchanged when nothing was
    received. Network failures and unusable replies are reported on stdout
    and yield ``([], offset)`` so the caller simply polls again.
    """
    try:
        #the offset goes in the query string; a body on a GET request is not
        #something every proxy forwards
        r = requests.get(url % (token, "getUpdates"), params={"offset": offset}, timeout=2)
    except requests.exceptions.RequestException:
        #only request errors are handled here, so Ctrl-C still stops the bot
        print("timeout")
        return [], offset
    try:
        updates = json.loads(r.text)["result"]
    except (ValueError, KeyError, TypeError):
        #non-JSON body, or an error reply (e.g. bad token) without "result"
        print("bad getUpdates response")
        return [], offset
    if updates:
        offset = int(updates[-1]['update_id']) + 1
    return updates, offset
#moment the bot came up; /time reports the seconds elapsed since then
start = time.time()
latestOffset, oldLatestOffset = 1, 0
#walk the offset forward past everything already queued: keep polling for as
#long as a poll still moves the offset, throwing the fetched updates away
print("Updating...", end="")
while latestOffset > oldLatestOffset:
    oldLatestOffset = latestOffset
    DRAIN, latestOffset = getUpdates(oldLatestOffset)
print("\rUpdated ")
def sendMessage(message, reply_to_message_id=False, parse=True, tried_to_parse=False):
    """Send *message* to the chat currently stored in the global ``chat_id``.

    message             -- text to send
    reply_to_message_id -- when truthy, send as a reply to that message id;
                           replies are always sent without a parse mode
    parse               -- when true (and this is not a reply) the text is
                           sent with parse_mode "Markdown"
    tried_to_parse      -- set on the internal plain-text retry so that the
                           retry happens at most once

    Returns None. Request errors are retried a few times and then given up
    on; nothing is raised for them.
    """
    payload = {
        "chat_id": chat_id,
        "text": message,
        "disable_web_page_preview": True,
    }
    if reply_to_message_id:
        #do a reply if specified (never parsed)
        payload['reply_to_message_id'] = reply_to_message_id
    elif parse:
        #only add the parse mode when it is wanted, instead of adding it and
        #deleting it again (deleting it twice used to raise KeyError)
        payload["parse_mode"] = "Markdown"
    response = None
    for _attempt in range(3):
        #bounded retry: the old unbounded recursion also fell through to an
        #unbound "response" once the retry returned
        try:
            response = requests.post(url % (token, "sendMessage"), data=payload, timeout=2)
            break
        except requests.exceptions.RequestException:
            print("timeout")
    if response is None:
        #every attempt failed; drop the message rather than crash the bot
        return
    try:
        resp = json.loads(response.text)
    except ValueError:
        print("bad sendMessage response")
        return
    ok = isinstance(resp, dict) and resp.get("ok")
    if not ok and "parse_mode" in payload and tried_to_parse != True:
        #something probably went wrong with the parsing, let's try again
        #as plain text (pointless unless a parse mode was actually sent)
        sendMessage(message, reply_to_message_id=reply_to_message_id, parse=False, tried_to_parse=True)
def loadAliases():
    """Read the saved aliases from aliases.txt in the directory ``path``.

    The file is a sequence of ``<term>alias=value</term>`` records, each
    followed by a newline (the format saveAliases writes). Only the first
    "=" separates alias from value, so values may contain "=" themselves.
    Records with an empty alias or an empty value are skipped.

    Returns ``(aliases, keys)``: the alias -> value dict and the list of
    alias names in file order, each name listed once.
    Raises OSError (e.g. FileNotFoundError) when the file cannot be read.
    """
    aliases = {}
    keys = []
    #the with-block closes the file; the handle used to be left open
    with open(path + "/aliases.txt") as aliasFile:
        contents = aliasFile.read()
    terms = contents.split("</term>\n")
    closing = "</term>"
    if terms[-1].endswith(closing):
        #last record of a file without a trailing newline: the separator did
        #not match, so drop the closing tag here instead of keeping it in
        #the value
        terms[-1] = terms[-1][:-len(closing)]
    for term in terms:
        term = term.replace("<term>", "")
        alias, _, value = term.partition("=")
        if alias != "" and value != "":
            if alias not in aliases:
                keys.append(alias)
            #a later record for the same alias overrides the earlier one
            aliases[alias] = value
    return aliases, keys
def saveAliases(aliases):
    """Write the *aliases* dict to aliases.txt in the directory ``path``.

    Each entry becomes one ``<term>alias=value</term>`` record followed by a
    newline, which is the format loadAliases reads. The previous contents of
    the file are replaced.

    Raises TypeError when a key or value is not a string and OSError when
    the file cannot be written.
    """
    #build the whole text first: if an entry is bad the exception is raised
    #before the file is opened, so the existing save file is not truncated
    contents = "".join(
        "<term>" + alias + "=" + value + "</term>\n"
        for alias, value in aliases.items()
    )
    #the with-block closes the file even when the write fails
    with open(path + "/aliases.txt", "w") as aliasFile:
        aliasFile.write(contents)
aliases, aliasList = loadAliases() #load aliases on start
print("Started")
stime = 0    #seconds to sleep between polls; grows while the chat is idle
locked = []  #alias names that may not be changed, reloaded every cycle
users = {}   #sender name -> commands used since the last reset
banned = {}  #sender name -> time.time() at which the ban started
lastReset = time.time() #when the per-user counts were last cleared


def _readLocked():
    """Return the non-empty lines of locked.txt, one locked alias per line."""
    #the with-block closes the file; it used to leak one handle per cycle
    with open(path + "/locked.txt") as lfile:
        return [line for line in lfile.read().split('\n') if line != '']


def _senderName(message):
    """Return "@username" (or the first name when the sender has no
    username) for *message*, or None when it carries no "from" field."""
    user = message.get('from')
    if user is None:
        return None
    if "username" in user:
        return "@" + user['username']
    return user['first_name']


def _countUse(name):
    """Add one command to *name*'s count for the spam detection."""
    users[name] = users.get(name, 0) + 1


def _splitCommand(text):
    """Split "/cmd@botname rest" into ("/cmd", "rest").

    The command is "" when the first word of *text* is not a slash command.
    Matching the whole first word (instead of a text prefix) keeps
    "/timeout" from being read as "/time" and copes with the "@botname"
    suffix that may follow a command.
    """
    words = text.split(None, 1)
    if not words or words[0][0] != "/":
        return "", text
    content = words[1] if len(words) > 1 else ""
    return words[0].split("@")[0], content


def _handleAlias(content, name, message_id):
    """Handle "/alias name=value": validate, store, save and confirm."""
    #only the first "=" separates name and value
    alias, _, value = content.partition("=")
    terms = alias.split()
    if len(terms) != 1:
        print("alias malformed")
        sendMessage("Alias must be a single term", message_id)
        return
    #strip the surrounding whitespace BEFORE the lock check; checking the raw
    #text let " foo " overwrite a locked "foo"
    alias = terms[0]
    if alias in locked:
        print("cannot unlock alias")
        sendMessage("Alias is locked, sorry", message_id)
        return
    if len(value) >= max_value:
        print("value too big")
        sendMessage("Value is too big (" + str(max_value) + " chars)", message_id)
        return
    if alias not in aliases:
        #keep the list /random picks from in step with the dict
        aliasList.append(alias)
    aliases[alias] = value
    print("alias " + alias + "=" + value + " by " + name)
    saveAliases(aliases)
    sendMessage("Aliased " + alias + " to " + value, message_id)


def _handleUnalias(content, message_id):
    """Handle "/unalias name": delete the alias unless it is locked."""
    terms = content.split()
    if len(terms) != 1:
        print("unalias malformed")
        sendMessage("Alias must be a single term", message_id)
        return
    alias = terms[0]
    if alias in locked:
        print("cannot unlock alias")
        sendMessage("Alias is locked, sorry", message_id)
        return
    if alias in aliases:
        del aliases[alias]
        #drop it from the /random list too, or /random hits a KeyError later
        aliasList[:] = [known for known in aliasList if known != alias]
        print("del " + alias)
        saveAliases(aliases)
        sendMessage("Unaliased " + alias, message_id)


def _expandAliases(text, name):
    """Reply with the values of all aliases used as /commands in *text*,
    followed by the sender's name; send nothing when none matched."""
    response = ""
    for term in text.split():
        if term[0] != "/":
            continue
        #"/cmd@botname" counts as "/cmd"
        command = term[1:].split("@")[0]
        if command in aliases:
            #add to the user's count per alias used
            _countUse(name)
            value = aliases[command]
            response += value + " "
            print(command + " -> " + value)
    if response != "":
        response += name
        sendMessage(response) #send the values


def _banSpammers():
    """Ban (for 5 minutes) every user with 5 or more counted commands."""
    for name in users:
        if users[name] >= 5:
            users[name] = 0
            banned[name] = time.time()
            sendMessage("Banned " + name + " for 5m due to spam")
            print("banned " + name)


while True:
    try:
        #reload locked aliases on every cycle
        locked = _readLocked()
        #get updates and the newest offset
        r, latestOffset = getUpdates(latestOffset)
        #handle spam detection and unbanning. the window is measured with a
        #timestamp; testing int(time.time()) % 6 == 0 only fired when a loop
        #pass happened to land on such a second
        now = time.time()
        if now - lastReset >= 6:
            users.clear()
            lastReset = now
        for name in [who for who in banned if now > banned[who] + 300]:
            print("unbanned " + name)
            del banned[name]
        #do a dumb smart sleep thingo: back off while idle, poll fast again
        #as soon as something arrives
        if len(r) == 0:
            if stime < 5:
                stime += 0.25
                print("interval incremented: " + str(stime))
        else:
            print("interval reset")
            stime = 0
        for update in r:
            #loop through each update
            if "message" not in update:
                #unrelated update handling because telegram is dumb
                continue
            message = update['message']
            message_id = message['message_id']
            chat = message['chat']
            chat_id = chat['id'] #sendMessage posts to this chat
            name = _senderName(message)
            if name is None or name in banned:
                #no sender to attribute the command to, or sender is banned
                continue
            if "text" not in message:
                continue
            text = message['text']
            command, content = _splitCommand(text)
            if command == "/alias":
                _countUse(name)
                try:
                    _handleAlias(content, name, message_id)
                except Exception as err:
                    #a failed command must not take the bot down, but say so
                    print("alias failed: " + repr(err))
            elif command == "/unalias":
                _countUse(name)
                try:
                    _handleUnalias(content, message_id)
                except Exception as err:
                    print("unalias failed: " + repr(err))
            elif command == "/random":
                _countUse(name)
                if aliasList:
                    #send a random one
                    randomAlias = random.choice(aliasList)
                    randomAliasStr = "/%s = %s" % (randomAlias, aliases[randomAlias])
                    sendMessage(randomAliasStr)
                else:
                    print("no aliases to pick from")
            elif command == "/time":
                _countUse(name)
                #send the current time in seconds since the bot started
                sendMessage("up for " + str(time.time() - start) + "s")
            elif "/" in text:
                #if there is a different command in the message
                _expandAliases(text, name)
            #handle banning users while chat_id still points at their chat
            _banSpammers()
        #sleep for the determined amount of time
        time.sleep(stime)
    except ConnectionError:
        print("ConnectionError") #should put stuff here but whatever
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment