Skip to content

Instantly share code, notes, and snippets.

@Theasker
Last active November 16, 2022 20:07
Show Gist options
  • Select an option

  • Save Theasker/5188b3219c374f3d480ae29bffb75426 to your computer and use it in GitHub Desktop.

Select an option

Save Theasker/5188b3219c374f3d480ae29bffb75426 to your computer and use it in GitHub Desktop.
Telegram bot in Python for use from the terminal. It can send messages, videos, audio files, images and documents.
#!/usr/bin/python3
#-*- coding: utf-8 -*-
# config.ini structure (the file is parsed as JSON, despite its .ini extension)
#
# {
# "token": "xxxx:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
# "channel": "-1111111111111",
# "group": "-22222222222",
# "botname": "name_bot",
# "url": "https://api.telegram.org/bot",
# "urllocal": "http://172.22.0.2:8081/bot"
# }
import requests, json, sys, os
from datetime import datetime
from os.path import exists
# Path of the JSON configuration file, expected next to this script.
# abspath() guards against a relative __file__: with a bare script name,
# dirname() returns '' and plain concatenation would yield '/config.ini'
# (the filesystem root) instead of a file beside the script.
PROPERTIES = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.ini')
class TelegramBot:
    """Command-line client for the Telegram Bot API.

    Creating an instance loads the JSON configuration file named by the
    module-level ``PROPERTIES`` path and then immediately dispatches on
    ``sys.argv`` (see :meth:`dispatch`).
    """

    # Command-line switch -> (send_media type, message printed on success).
    _MEDIA_OPTIONS = {
        "-i": ("photo", "Image sent!"),
        "-a": ("audio", "Audio sent!"),
        "-v": ("video", "Video sent!"),
        "-d": ("document", "Document sent!"),
    }

    # send_media type -> (Bot API method, multipart field name).
    # Any type not listed here is sent as a document.
    _MEDIA_METHODS = {
        "photo": ("sendPhoto", "photo"),
        "audio": ("sendAudio", "audio"),
        "video": ("sendVideo", "video"),
    }

    def __init__(self):
        """Load the configuration and run the command given on the command line."""
        self._params = {}  # values loaded from the configuration file
        self._url = None   # base URL including the token, ends with "/"
        self.get_properties()
        self.dispatch()

    @staticmethod
    def _succeeded(response):
        """Return True only for a decoded API response whose ``ok`` flag is truthy.

        Tolerates ``None`` (returned by :meth:`send_media` on failure) and
        any non-dict payload instead of raising.
        """
        return isinstance(response, dict) and bool(response.get('ok'))

    def get_properties(self):
        """Read the JSON configuration and build the base request URL.

        The base URL is taken from the first non-empty key among
        ``url_local``, ``urllocal`` (the spelling used in the documented
        config structure) and ``url``, and is concatenated with ``token``.

        Prints a message and exits with status 1 when the file cannot be
        read, is not a JSON object, or lacks a base URL or token.
        """
        try:
            with open(PROPERTIES, 'r', encoding='utf-8') as file:
                self._params = json.load(file)
        except OSError:
            print("Could not open/read file:", PROPERTIES)
            sys.exit(1)
        except ValueError as err:  # json.JSONDecodeError is a ValueError
            print("Invalid JSON in file:", PROPERTIES, "-", err)
            sys.exit(1)

        if not isinstance(self._params, dict):
            print("Configuration must be a JSON object:", PROPERTIES)
            sys.exit(1)

        base_url = next(
            (self._params[key]
             for key in ("url_local", "urllocal", "url")
             if self._params.get(key)),
            None,
        )
        token = self._params.get('token')
        if not base_url or not token:
            print("Missing 'token' or base url in file:", PROPERTIES)
            sys.exit(1)
        self._url = f"{base_url}{token}/"

    def get_me(self):
        """Call the ``getMe`` method and return the decoded JSON response."""
        method = "getme"
        # self._url already ends with "/", so the method name is appended once.
        response = requests.get(self._url + method)
        return json.loads(response.text)

    def get_updates(self):
        """Return the bot's latest updates (groups, channels, ...).

        Returns the decoded JSON response, or ``None`` when the HTTP status
        is not 200.
        """
        method = "getupdates"
        response = requests.get(self._url + method)
        if response.status_code == 200:
            # Decode the JSON payload into Python objects.
            return json.loads(response.text)
        return None

    def send_message(self, message, chat_id):
        """Send an HTML-formatted text message to ``chat_id``.

        Returns the decoded JSON response.
        """
        method = "sendmessage"
        data = {
            'chat_id': chat_id,
            'text': message,
            'parse_mode': 'HTML',
            'disable_web_page_preview': True
        }
        response = requests.post(self._url + method, data=data)
        return json.loads(response.text)

    def send_media(self, chat_id, filename, type, caption=""):  # noqa: A002
        """Upload ``filename`` to ``chat_id`` as a photo, audio, video or document.

        ``type`` selects the API method ("photo", "audio", "video"); any
        other value sends the file as a document. ``caption`` is optional.

        Returns the decoded JSON response, or ``None`` after printing a
        diagnostic when the file cannot be opened or the request fails.
        """
        data = {
            'chat_id': chat_id,
            'caption': caption
        }
        if type in self._MEDIA_METHODS:
            method, field = self._MEDIA_METHODS[type]
        else:
            method, field = "sendDocument", "document"
            data['disable_content_type_detection'] = False

        try:
            # The context manager guarantees the file handle is closed,
            # whether the upload succeeds or raises.
            with open(filename, 'rb') as file:
                response = requests.post(self._url + method, data, files={field: file})
            return json.loads(response.text)
        except IOError as e:
            # strerror is None for errors that are not plain OS errors;
            # fall back to the exception text in that case.
            print("I/O error({0}): {1}".format(e.errno, e.strerror or e))
        except Exception:  # e.g. a non-JSON reply; keep Ctrl-C / exit working
            print("Unexpected error:", sys.exc_info()[0])
        return None

    def get_updates_chat(self, chat_id):
        """Request updates, passing ``chat_id`` along with the call.

        Returns the decoded JSON response.
        """
        # NOTE(review): `chat_id` does not appear among the documented
        # getUpdates parameters, so the server presumably ignores it and the
        # result is not filtered per chat - confirm against the Bot API docs.
        method = "getupdates"
        data = {'chat_id': chat_id}
        response = requests.post(self._url + method, data)
        return json.loads(response.text)

    def send_path(self, path):
        """Send every file under ``path`` (recursively) to the configured channel.

        Each file is sent as a document captioned with its file name. One
        numbered line is printed per successful upload; failed uploads are
        skipped without aborting the walk.
        """
        count = 1
        for root, _dir_names, file_names in os.walk(path):
            for file in file_names:
                filepath = os.path.join(root, file)
                response = self.send_media(self._params['channel'], filepath, "document", file)
                if self._succeeded(response):
                    print(count, "-", filepath, "✔")
                    count = count + 1

    def dispatch(self):
        """Run the action selected by ``sys.argv``.

        Shows the help text when the option is unknown, when a required
        argument is missing, or when the argument count is outside 1..3
        (script name excluded).
        """
        argv = sys.argv
        # Chained comparison: the original `> 1 & ... < 5` never enforced
        # the upper bound because `&` binds tighter than the comparisons.
        if not 1 < len(argv) < 5:
            self.help()
            return

        option = argv[1]
        has_argument = len(argv) > 2

        if option == "-m":
            if not has_argument:
                self.help()
                return
            response = self.send_message(argv[2], self._params['channel'])
            if self._succeeded(response):
                print("message sent!")
        elif option in self._MEDIA_OPTIONS:
            if not has_argument:
                self.help()
                return
            media_type, success_message = self._MEDIA_OPTIONS[option]
            caption = argv[3] if len(argv) == 4 else ""
            response = self.send_media(self._params['channel'], argv[2], media_type, caption)
            if self._succeeded(response):
                print(success_message)
        elif option == "-p":
            if has_argument:
                self.send_path(argv[2])
            else:
                print("I need the path")
        elif option == "--getupdates":
            response = self.get_updates()
            print(json.dumps(response, indent=2))
        else:
            # Covers "-h" as well as any unknown option.
            self.help()

    def help(self):
        """Print the command-line usage summary."""
        print(f"Usage: {os.path.basename(__file__)} [OPTION] [IMAGE | FILE | AUDIO | VIDEO | DOCUMENT]")
        print("\t-m <message>\t\t\tSend message")
        print("\t-h\t\t\t\tHelp options")
        print("\t-i <image path> [caption]\tSend a image")
        print("\t-a <audio path> [caption]\tSend a audio")
        print("\t-v <video path> [caption]\tSend a video")
        print("\t-d <document path> [caption]\tSend a document")
        print("\t-p <directory path> \t\tSends all files in the given directory and subdirectories.")
        print("\t--getupdates\t\t\tGet updates telegram method")
if __name__ == "__main__":
    # Script entry point: constructing the bot reads the configuration and
    # immediately executes the action requested on the command line.
    telegram_bot = TelegramBot()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment