Last active
November 16, 2022 20:07
-
-
Save Theasker/5188b3219c374f3d480ae29bffb75426 to your computer and use it in GitHub Desktop.
Telegram bot in Python for use in the terminal. You can send messages, videos, audio, images and documents.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python3 | |
| #-*- coding: utf-8 -*- | |
| # config.ini structure | |
| # | |
| # { | |
| # "token": "xxxx:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", | |
| # "channel": "-1111111111111", | |
| # "group": "-22222222222", | |
| # "botname": "name_bot", | |
| # "url": "https://api.telegram.org/bot", | |
| # "url_local": "http://172.22.0.2:8081/bot" | |
| # } | |
| import requests, json, sys, os | |
| from datetime import datetime | |
| from os.path import exists | |
# Absolute path of the JSON configuration file that lives next to this script.
# abspath() is applied first because __file__ can be a bare relative name
# (e.g. "bot.py") for the __main__ module on Python < 3.9; dirname() would
# then be "" and the old string concatenation produced "/config.ini" at the
# filesystem root.
PROPERTIES = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.ini')
class TelegramBot():
    """Small command-line client for the Telegram Bot API.

    Constructing an instance runs the whole program: the JSON configuration
    at ``PROPERTIES`` is loaded and the command found in ``sys.argv`` is
    dispatched straight away.
    """

    # CLI option -> (media type passed to send_media, success message).
    _MEDIA_OPTIONS = {
        "-i": ("photo", "Image sent!"),
        "-a": ("audio", "Audio sent!"),
        "-v": ("video", "Video sent!"),
        "-d": ("document", "Document sent!"),
    }

    # Media type -> (Bot API method, multipart field name). Any type not
    # listed here is sent as a document.
    _MEDIA_METHODS = {
        "photo": ("sendPhoto", "photo"),
        "audio": ("sendAudio", "audio"),
        "video": ("sendVideo", "video"),
    }

    def __init__(self):
        self._params = {}  # Settings read from the configuration file
        self._url = None  # Base endpoint "<api url><token>/", set by get_properties
        self.get_properties()
        self.dispatch()

    def get_properties(self):
        """Load the configuration file and build the base API URL.

        The API base is the first non-empty value among the keys
        ``url_local``, ``urllocal`` and ``url``; ``url_local`` is the key
        this method has always read, the other two are the spellings shown
        in the header comment of the script.

        Prints a message and exits with status 1 when the file cannot be
        read, is not a JSON object, or lacks the token / API URL.
        """
        try:
            with open(PROPERTIES, 'r', encoding='utf-8') as file:
                self._params = json.load(file)
        except OSError:
            print("Could not open/read file:", PROPERTIES)
            sys.exit(1)
        except ValueError as err:  # json.JSONDecodeError is a ValueError
            print("Invalid JSON in file:", PROPERTIES, "-", err)
            sys.exit(1)

        if not isinstance(self._params, dict):
            print("Configuration must be a JSON object:", PROPERTIES)
            sys.exit(1)

        base_url = next(
            (self._params[key]
             for key in ('url_local', 'urllocal', 'url')
             if self._params.get(key)),
            None,
        )
        token = self._params.get('token')
        if not base_url or not token:
            print("Missing 'token' or API url in file:", PROPERTIES)
            sys.exit(1)
        self._url = f"{base_url}{token}/"

    def get_me(self):
        """Call ``getMe`` and return the decoded JSON response."""
        method = "getme"
        # self._url already ends with "/", so the method name is appended once.
        response = requests.get(self._url + method)
        return json.loads(response.text)

    def get_updates(self):
        """Return the decoded ``getUpdates`` response, or None if the HTTP status is not 200."""
        method = "getupdates"
        response = requests.get(self._url + method)
        if response.status_code == 200:
            return json.loads(response.text)
        return None

    def send_message(self, message, chat_id):
        """Send *message* (parsed as HTML, link previews disabled) to *chat_id*.

        Returns the decoded JSON response.
        """
        method = "sendmessage"
        data = {
            'chat_id': chat_id,
            'text': message,
            'parse_mode': 'HTML',
            'disable_web_page_preview': True
        }
        response = requests.post(self._url + method, data=data)
        return json.loads(response.text)

    def send_media(self, chat_id, filename, type, caption=""):
        """Upload the file *filename* to *chat_id*.

        *type* selects the API method: ``"photo"``, ``"audio"`` or
        ``"video"``; any other value sends the file as a document. (The
        parameter shadows the builtin ``type``; the name is kept so existing
        keyword callers keep working.)

        Returns the decoded JSON response, or None after printing a message
        when the file cannot be opened or the request fails.
        """
        data = {
            'chat_id': chat_id,
            'caption': caption
        }
        if type in self._MEDIA_METHODS:
            method, field = self._MEDIA_METHODS[type]
        else:
            method, field = "sendDocument", "document"
            data['disable_content_type_detection'] = False

        try:
            # The context manager closes the file once the upload finishes.
            with open(filename, 'rb') as file:
                response = requests.post(self._url + method, data, files={field: file})
            return json.loads(response.text)
        except IOError as e:
            print("I/O error({0}): {1}".format(e.errno, e.strerror))
        except Exception:  # unlike a bare except, lets Ctrl-C / SystemExit through
            print("Unexpected error:", sys.exc_info()[0])
        return None

    # Chat Updates the last 24 hours
    def get_updates_chat(self, chat_id):
        """POST ``getUpdates`` with a ``chat_id`` field and return the decoded response.

        NOTE(review): the Bot API reference does not list ``chat_id`` among
        the getUpdates parameters, so the server presumably ignores it and
        this returns the same data as get_updates -- confirm.
        """
        method = "getupdates"
        data = {'chat_id': chat_id}
        response = requests.post(self._url + method, data)
        return json.loads(response.text)

    def send_path(self, path):
        """Send every file under *path* (recursively) to the configured channel.

        Each file is sent as a document captioned with its file name; one
        numbered line is printed per successful upload.
        """
        count = 1
        for root, _dir_names, file_names in os.walk(path):
            for file in file_names:
                filepath = os.path.join(root, file)
                response = self.send_media(self._params['channel'], filepath, "document", file)
                # send_media returns None on failure, so test before indexing.
                if self._succeeded(response):
                    print(count, "-", filepath, "✔")
                    count = count + 1

    def dispatch(self):
        """Run the command given in ``sys.argv``; print the help text on bad usage."""
        argv = sys.argv
        # Accept between one and three arguments after the program name.
        # (The previous test used "&", which binds tighter than the
        # comparisons, so the upper bound was never enforced.)
        if not 1 < len(argv) < 5:
            self.help()
            return

        option = argv[1]
        if option == "-m":
            if len(argv) < 3:  # message text is mandatory
                self.help()
                return
            response = self.send_message(argv[2], self._params['channel'])
            self._report(response, "message sent!")
        elif option in self._MEDIA_OPTIONS:
            if len(argv) < 3:  # file path is mandatory
                self.help()
                return
            media_type, success_message = self._MEDIA_OPTIONS[option]
            caption = argv[3] if len(argv) == 4 else ""
            response = self.send_media(self._params['channel'], argv[2], media_type, caption)
            self._report(response, success_message)
        elif option == "-p":
            if len(argv) > 2:
                self.send_path(argv[2])
            else:
                print("I need the path")
        elif option == "--getupdates":
            response = self.get_updates()
            print(json.dumps(response, indent=2))
        else:  # "-h" and unknown options
            self.help()

    @staticmethod
    def _succeeded(response):
        """Return True when *response* is a decoded reply whose ``ok`` field is truthy."""
        return isinstance(response, dict) and bool(response.get('ok'))

    @classmethod
    def _report(cls, response, success_message):
        """Print *success_message* on success, else Telegram's error description if any."""
        if cls._succeeded(response):
            print(success_message)
        elif isinstance(response, dict) and response.get('description'):
            print("Telegram error:", response['description'])

    def help(self):
        """Print the command-line usage summary."""
        print(f"Usage: {os.path.basename(__file__)} [OPTION] [IMAGE | FILE | AUDIO | VIDEO | DOCUMENT]")
        print("\t-m <message>\t\t\tSend message")
        print("\t-h\t\t\t\tHelp options")
        print("\t-i <image path> [caption]\tSend a image")
        print("\t-a <audio path> [caption]\tSend a audio")
        print("\t-v <video path> [caption]\tSend a video")
        print("\t-d <document path> [caption]\tSend a document")
        print("\t-p <directory path> \t\tSends all files in the given directory and subdirectories.")
        print("\t--getupdates\t\t\tGet updates telegram method")
if __name__ == "__main__":
    # Script entry point: building the bot loads the configuration and
    # executes the command passed on the command line.
    telegram_bot = TelegramBot()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment