Skip to content

Instantly share code, notes, and snippets.

@eros18123
Created January 9, 2026 19:43
Show Gist options
  • Select an option

  • Save eros18123/a557197eb69c36c6581001694e6bac70 to your computer and use it in GitHub Desktop.

Select an option

Save eros18123/a557197eb69c36c6581001694e6bac70 to your computer and use it in GitHub Desktop.
chat nova versao 2
import sys
import os
import uuid
import tempfile
import threading
import json
import time
import re
import base64
import mimetypes
import requests
from aqt import mw
from aqt.qt import (
QAction, QDialog, QWidget, QVBoxLayout,
QLineEdit, QPushButton, QLabel, pyqtSignal, QColor,
QDialogButtonBox, Qt, QMenu, QInputDialog, QHBoxLayout, QFormLayout,
QTableWidget, QTableWidgetItem, QHeaderView, QTextEdit, QSplitter,
QListWidget, QListWidgetItem, QTabWidget, QCheckBox
)
from aqt.utils import tooltip
from aqt import gui_hooks
from anki.sound import AVTag
from datetime import datetime, timedelta
try:
from aqt.qt import QWebEngineView
except ImportError:
QWebEngineView = None
class FirebaseAPI:
def __init__(self, base_url, api_key):
if not base_url.endswith('/'): base_url += '/'
self.base_url, self.api_key = base_url, api_key
self.auth_url_signup = f"https://identitytoolkit.googleapis.com/v1/accounts:signUp?key={self.api_key}"
self.auth_url_signin = f"https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key={self.api_key}"
self.auth_url_refresh = f"https://securetoken.googleapis.com/v1/token?key={self.api_key}"
self.auth_url_change = f"https://identitytoolkit.googleapis.com/v1/accounts:update?key={self.api_key}"
def _send_request(self, url, payload):
try:
r = requests.post(url, data=json.dumps(payload), timeout=10)
r.raise_for_status()
return r.json(), None
except requests.exceptions.RequestException as e:
try: return None, e.response.json().get("error", {}).get("message", "UNKNOWN_ERROR")
except: return None, str(e)
except Exception as e: return None, str(e)
def signup_user(self, e, p): return self._send_request(self.auth_url_signup, {"email": e, "password": p, "returnSecureToken": True})
def signin_user(self, e, p): return self._send_request(self.auth_url_signin, {"email": e, "password": p, "returnSecureToken": True})
def refresh_token(self, rt): return self._send_request(self.auth_url_refresh, {"grant_type": "refresh_token", "refresh_token": rt})
def change_password(self, it, np): return self._send_request(self.auth_url_change, {"idToken": it, "password": np, "returnSecureToken": False})
def get_data(self, path="", id_token=None, params=""):
try:
url = f"{self.base_url}{path}.json?auth={id_token}{'&' if params else ''}{params}"
r = requests.get(url, timeout=10)
r.raise_for_status()
return r.json()
except: return None
def put_data(self, path, data, id_token=None):
try:
url = f"{self.base_url}{path}.json?auth={id_token}"
requests.put(url, data=json.dumps(data), timeout=10).raise_for_status()
except Exception as e: print(f"AnkiChat: Erro PUT em {path}: {e}")
def patch_data(self, path, data, id_token=None):
try:
url = f"{self.base_url}{path}.json?auth={id_token}"
requests.patch(url, data=json.dumps(data), timeout=10).raise_for_status()
except Exception as e: print(f"AnkiChat: Erro PATCH em {path}: {e}")
def post_data(self, path, data, id_token=None, return_name=False):
try:
url = f"{self.base_url}{path}.json?auth={id_token}"
response = requests.post(url, data=json.dumps(data), timeout=10)
response.raise_for_status()
if return_name: return response.json()
except Exception as e:
if return_name: raise e
print(f"AnkiChat: Erro POST em {path}: {e}")
def delete_data(self, path, id_token=None):
try:
url = f"{self.base_url}{path}.json?auth={id_token}"
requests.delete(url, timeout=10).raise_for_status()
except: pass
class BackgroundUpdater:
def __init__(self):
self.is_connected = False
self.nickname = None
self.id_token = None
self.uid = None
self.email = None
self.refresh_token = None
self.firebase = None
self._heartbeat_thread = None
self.stop_heartbeat = threading.Event()
def initialize(self, firebase_api):
self.firebase = firebase_api
def update_state(self, email, uid, id_token, refresh_token, expires_in):
self.email, self.uid, self.id_token, self.refresh_token = email, uid, id_token, refresh_token
self.nickname = email.split('@')[0]
self.is_connected = True
self.start_heartbeat()
def clear_state(self):
self.is_connected = False
self.stop_heartbeat.set()
self.nickname, self.id_token, self.uid, self.email, self.refresh_token = None, None, None, None, None
def start_heartbeat(self):
if self._heartbeat_thread and self._heartbeat_thread.is_alive(): return
self.stop_heartbeat.clear()
self._heartbeat_thread = threading.Thread(target=self._presence_loop, daemon=True)
self._heartbeat_thread.start()
def _get_anki_stats_safe(self):
if not mw or not mw.col: return {}
result, evt = {}, threading.Event()
def query_on_main():
try:
start_of_today_ms = (mw.col.sched.day_cutoff - 86400) * 1000
total_reviews = mw.col.db.scalar("SELECT count() FROM revlog WHERE id > ?", start_of_today_ms) or 0
total_time_ms = mw.col.db.scalar("SELECT sum(time) FROM revlog WHERE id > ?", start_of_today_ms) or 0
new_done = mw.col.db.scalar("SELECT count() FROM revlog WHERE id > ? AND type = 0 AND lastIvl = 0", start_of_today_ms) or 0
learn_done = mw.col.db.scalar("SELECT count() FROM revlog WHERE id > ? AND type IN (0, 2) AND lastIvl > 0", start_of_today_ms) or 0
review_done = mw.col.db.scalar("SELECT count() FROM revlog WHERE id > ? AND type = 1", start_of_today_ms) or 0
again_count = mw.col.db.scalar("SELECT count() FROM revlog WHERE id > ? AND ease = 1", start_of_today_ms) or 0
hard_count = mw.col.db.scalar("SELECT count() FROM revlog WHERE id > ? AND ease = 2", start_of_today_ms) or 0
good_count = mw.col.db.scalar("SELECT count() FROM revlog WHERE id > ? AND ease = 3", start_of_today_ms) or 0
easy_count = mw.col.db.scalar("SELECT count() FROM revlog WHERE id > ? AND ease = 4", start_of_today_ms) or 0
mature_reviews_count = mw.col.db.scalar("SELECT count() FROM revlog WHERE id > ? AND type = 1", start_of_today_ms) or 0
mature_passes_count = mw.col.db.scalar("SELECT count() FROM revlog WHERE id > ? AND type = 1 AND ease > 1", start_of_today_ms) or 0
retention = (mature_passes_count / mature_reviews_count) if mature_reviews_count > 0 else 0
return {"reviews_today": total_reviews, "new_done": new_done, "learn_done": learn_done, "review_done": review_done, "total_time_ms": total_time_ms, "again_count": again_count, "hard_count": hard_count, "good_count": good_count, "easy_count": easy_count, "retention": retention}
except Exception as e:
print(f"AnkiChat: Erro ao executar query SQL: {e}")
return {}
def wrapper():
nonlocal result
try: result = query_on_main()
finally: evt.set()
mw.taskman.run_on_main(wrapper)
evt.wait(timeout=5.0)
return result
def _update_streak_if_needed(self):
today_str, yesterday_str = datetime.now().strftime('%Y-%m-%d'), (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
user_data = self.firebase.get_data(f"users/{self.uid}", self.id_token) or {}
last_review_date, current_streak = user_data.get("last_review_date"), user_data.get("streak", 0)
if last_review_date == today_str: return
new_streak = current_streak + 1 if last_review_date == yesterday_str else 1
self.firebase.patch_data(f"users/{self.uid}", {"streak": new_streak, "last_review_date": today_str}, self.id_token)
def update_presence_on_firebase(self):
if not self.is_connected: return
stats = self._get_anki_stats_safe()
if stats.get("reviews_today", 0) > 0: self._update_streak_if_needed()
presence_data = {"uid": self.uid, "nickname": self.nickname, "state": "online", "last_seen": int(time.time()), **stats}
self.firebase.put_data(f"online/{self.uid}", presence_data, self.id_token)
def trigger_immediate_update(self):
threading.Thread(target=self.update_presence_on_firebase, daemon=True).start()
def _presence_loop(self):
while not self.stop_heartbeat.is_set():
self.update_presence_on_firebase()
time.sleep(30)
background_updater = BackgroundUpdater()
class AuthManager:
def __init__(self, chat_window=None):
self.cw = chat_window
self.firebase = background_updater.firebase
self.addon_path = os.path.dirname(os.path.abspath(__file__))
self.history_file = os.path.join(self.addon_path, 'login_history.json')
self.autologin_file = os.path.join(self.addon_path, 'autologin.json')
def _obfuscate(self, data: str) -> str: return base64.b64encode(data.encode('utf-8')).decode('utf-8')
def _deobfuscate(self, data: str) -> str:
try: return base64.b64decode(data.encode('utf-8')).decode('utf-8')
except: return ""
def load_login_history(self):
if not os.path.exists(self.history_file): return {}
try:
with open(self.history_file, 'r', encoding='utf-8') as f: return json.load(f)
except: return {}
def save_login_history(self, history):
try:
with open(self.history_file, 'w', encoding='utf-8') as f: json.dump(history, f, indent=4)
except: pass
def start_login_thread(self, email, password): threading.Thread(target=self._attempt_login, args=(email, password), daemon=True).start()
def _attempt_login(self, email, password):
response, error = self.firebase.signin_user(email, password)
if response:
uid = response.get('localId')
history = self.load_login_history()
history[email] = self._obfuscate(password)
self.save_login_history(history)
background_updater.update_state(email, uid, response.get('idToken'), response.get('refreshToken'), response.get('expiresIn', '3600'))
if self.cw: self.cw.connection_succeeded.emit(email, uid, response.get('idToken'), password, response.get('refreshToken'), response.get('expiresIn', '3600'))
self.save_autologin_info(email, uid, response.get('refreshToken'))
else:
if self.cw: self.cw.connection_failed.emit(f"Falha no login: {error}")
def save_autologin_info(self, email, uid, refresh_token):
try:
with open(self.autologin_file, 'w', encoding='utf-8') as f: json.dump({'email': email, 'uid': uid, 'refreshToken': refresh_token}, f)
except: pass
def show_register_dialog(self):
dialog = QDialog(self.cw)
dialog.setWindowTitle("Registrar")
layout = QFormLayout(dialog)
email_entry, pass_entry = QLineEdit(), QLineEdit()
pass_entry.setEchoMode(QLineEdit.EchoMode.Password)
layout.addRow("E-mail:", email_entry)
layout.addRow("Senha:", pass_entry)
buttons = QDialogButtonBox(QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel)
buttons.accepted.connect(dialog.accept)
buttons.rejected.connect(dialog.reject)
layout.addWidget(buttons)
if dialog.exec():
email, password = email_entry.text().strip(), pass_entry.text()
if not email or not password: return
threading.Thread(target=self._attempt_register, args=(email, password), daemon=True).start()
def _attempt_register(self, email, password):
signup_response, signup_error = self.firebase.signup_user(email, password)
if signup_response:
new_nick, uid, id_token = signup_response.get('email').split('@')[0], signup_response.get('localId'), signup_response.get('idToken')
self.firebase.put_data(f"users/{uid}", {"nickname": new_nick}, id_token)
self.firebase.put_data(f"nick_to_uid/{new_nick}", uid, id_token)
background_updater.update_state(email, uid, id_token, signup_response.get('refreshToken'), signup_response.get('expiresIn', '3600'))
self.save_autologin_info(email, uid, signup_response.get('refreshToken'))
background_updater.trigger_immediate_update()
time.sleep(0.5)
if self.cw: self.cw.connection_succeeded.emit(email, uid, id_token, password, signup_response.get('refreshToken'), signup_response.get('expiresIn', '3600'))
else:
if self.cw: self.cw.connection_failed.emit(f"Erro no registro: {signup_error}")
def show_change_password_dialog(self): pass
class LiveViewWindow(QDialog):
def __init__(self, parent, target_uid, target_nick, request_id):
super().__init__(parent)
self.target_uid, self.target_nick, self.request_id = target_uid, target_nick, request_id
self.firebase, self.id_token, self.is_listening, self.last_html_content = background_updater.firebase, background_updater.id_token, True, ""
self.setWindowTitle(f"Visualizando: {self.target_nick}")
self.setMinimumSize(400, 300)
self.layout = QVBoxLayout(self)
if QWebEngineView:
self.webview = QWebEngineView()
self.layout.addWidget(self.webview)
self.webview.hide()
else: self.webview = QLabel("Módulo WebEngine não disponível."), self.layout.addWidget(self.webview)
self.status_label = QLabel("Aguardando o usuário iniciar a revisão...")
self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.layout.addWidget(self.status_label)
threading.Thread(target=self._listen_for_updates, daemon=True).start()
def _listen_for_updates(self):
path = f"live_view/{self.target_uid}"
while self.is_listening:
try:
card_data = self.firebase.get_data(path, self.id_token)
if card_data: mw.taskman.run_on_main(lambda d=card_data: self.update_card_view(d))
else:
mw.taskman.run_on_main(self.show_stream_ended_message)
break
except Exception as e: print(f"LiveView: Erro ao buscar dados: {e}")
time.sleep(0.5)
def update_card_view(self, data):
if not self.is_listening or not QWebEngineView: return
state = data.get("state", "question")
if state == "waiting":
self.show_waiting_message()
return
self.status_label.hide()
self.webview.show()
q_html, a_html = data.get("q", ""), data.get("a", "")
full_html = f"<html><body><div id='qa'>{q_html}</div>"
if state == "answer": full_html += f"<hr id=answer>{a_html}"
full_html += "</body></html>"
if full_html != self.last_html_content:
self.last_html_content = full_html
self.webview.setHtml(full_html)
def show_waiting_message(self):
if QWebEngineView: self.webview.hide()
self.status_label.setText(f"Aguardando {self.target_nick} iniciar a revisão...")
self.status_label.show()
def show_stream_ended_message(self):
if QWebEngineView: self.webview.hide()
self.status_label.setText(f"{self.target_nick} encerrou a transmissão.")
self.status_label.show()
self.is_listening = False
def closeEvent(self, event):
self.is_listening = False
self.firebase.put_data(f"view_requests/{self.target_uid}/{self.request_id}/status", "closed", self.id_token)
super().closeEvent(event)
class SimpleChatWidget(QWidget):
new_message_received = pyqtSignal()
def __init__(self, parent=None):
super().__init__(parent)
self.displayed_message_ids = set()
self.setup_ui()
def setup_ui(self):
layout = QVBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
self.chat_display = QTextEdit()
self.chat_display.setReadOnly(True)
input_layout = QHBoxLayout()
self.chat_input, self.send_button = QLineEdit(), QPushButton("Enviar")
self.chat_input.setPlaceholderText("Digite sua mensagem...")
input_layout.addWidget(self.chat_input)
input_layout.addWidget(self.send_button)
layout.addWidget(self.chat_display)
layout.addLayout(input_layout)
self.send_button.clicked.connect(self.handle_send_message)
self.chat_input.returnPressed.connect(self.handle_send_message)
def handle_send_message(self):
if not background_updater.is_connected: return
text = self.chat_input.text().strip()
if not text: return
self.chat_input.clear()
message_data = {"uid": background_updater.uid, "nick": background_updater.nickname, "text": text, "timestamp": int(time.time() * 1000)}
threading.Thread(target=background_updater.firebase.post_data, args=("messages", message_data, background_updater.id_token)).start()
def update_messages(self, messages):
sorted_messages = sorted(messages.items(), key=lambda item: item[1].get('timestamp', 0))
new_message_arrived = False
for msg_id, msg_data in sorted_messages:
if msg_id not in self.displayed_message_ids:
if msg_data.get("quiz_event"): continue
if msg_data.get('uid') != background_updater.uid: new_message_arrived = True
self._append_message_to_chat(msg_data.get('nick', '...'), msg_data.get('text', ''), msg_data.get('uid') == background_updater.uid)
self.displayed_message_ids.add(msg_id)
if new_message_arrived: self.new_message_received.emit()
self.chat_display.verticalScrollBar().setValue(self.chat_display.verticalScrollBar().maximum())
def _append_message_to_chat(self, nick, text, is_own_message):
color = "green" if is_own_message else "blue"
self.chat_display.append(f"<b><font color='{color}'>{nick}:</font></b> {text}")
def clear_chat(self):
self.chat_display.clear()
self.displayed_message_ids.clear()
class NumericTableWidgetItem(QTableWidgetItem):
def __lt__(self, other):
try: return float(self.data(Qt.ItemDataRole.UserRole)) < float(other.data(Qt.ItemDataRole.UserRole))
except (ValueError, TypeError): return super().__lt__(other)
class ChatWindow(QDialog):
connection_succeeded = pyqtSignal(str, str, str, str, str, str)
connection_failed = pyqtSignal(str)
user_update_received = pyqtSignal(dict, dict)
message_update_received = pyqtSignal(dict)
show_tooltip_signal = pyqtSignal(str)
deck_invite_received = pyqtSignal(str, dict)
sent_deck_invite_accepted = pyqtSignal(str, str, dict)
deck_ready_for_download = pyqtSignal(str, dict)
view_request_received = pyqtSignal(str, dict)
view_request_accepted = pyqtSignal(str, str, dict)
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowFlags(self.windowFlags() | Qt.WindowType.WindowMinimizeButtonHint | Qt.WindowType.WindowMaximizeButtonHint)
self.RELAY_SERVER_URL = "https://eros18123.pythonanywhere.com"
self.pending_deck_transfers, self.is_being_watched, self.active_live_view_window = {}, False, None
self.pending_view_requests, self.viewers, self.blocked_users = {}, {}, set()
self.last_master_list, self.last_online_stats = {}, {}
self.addon_path = os.path.dirname(os.path.abspath(__file__))
self.settings_file = os.path.join(self.addon_path, 'ankichat_settings.json')
self.auth_manager = AuthManager(self)
self.setup_ui()
self._load_settings()
self.connection_succeeded.connect(self.on_connection_success)
self.connection_failed.connect(self.on_connection_failure)
self.user_update_received.connect(self.update_user_table)
self.message_update_received.connect(self.simple_chat_widget.update_messages)
self.show_tooltip_signal.connect(tooltip)
self.deck_invite_received.connect(self.handle_deck_invite)
self.sent_deck_invite_accepted.connect(self.handle_accepted_invite)
self.deck_ready_for_download.connect(self.handle_ready_for_download)
self.view_request_received.connect(self.handle_view_request)
self.view_request_accepted.connect(self.handle_accepted_view_request)
self.simple_chat_widget.new_message_received.connect(self.handle_new_chat_message_notification)
if background_updater.is_connected: self.on_connection_success()
else:
self.login_widget.show()
self.status_label.hide()
self.main_chat_widget.hide()
def setup_ui(self):
self.setWindowTitle("AnkiChat")
self.resize(1100, 600)
main_layout = QVBoxLayout(self)
self.login_widget = QWidget()
login_layout = QFormLayout(self.login_widget)
self.email_entry, self.password_entry = QLineEdit(), QLineEdit()
self.password_entry.setEchoMode(QLineEdit.EchoMode.Password)
login_layout.addRow("E-mail:", self.email_entry)
login_layout.addRow("Senha:", self.password_entry)
buttons_widget = QWidget()
buttons_layout = QHBoxLayout(buttons_widget)
self.login_button, self.register_button = QPushButton("Login"), QPushButton("Registrar")
buttons_layout.addWidget(self.login_button)
buttons_layout.addWidget(self.register_button)
login_layout.addRow(buttons_widget)
self.status_label = QLabel("Logando...")
self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.main_chat_widget = QWidget()
chat_layout = QVBoxLayout(self.main_chat_widget)
top_bar_layout = QHBoxLayout()
self.logout_button = QPushButton("Logout")
self.viewers_label = QLabel("Assistindo você: Ninguém")
self.viewers_label.hide()
top_bar_layout.addWidget(self.logout_button)
top_bar_layout.addStretch()
top_bar_layout.addWidget(self.viewers_label)
chat_layout.addLayout(top_bar_layout)
self.tabs = QTabWidget()
users_widget = QWidget()
users_layout = QVBoxLayout(users_widget)
users_layout.setContentsMargins(0,0,0,0)
filter_layout = QHBoxLayout()
self.show_active_today_checkbox = QCheckBox("Mostrar somente quem estudou hoje")
self.show_online_only_checkbox = QCheckBox("Mostrar somente online")
filter_layout.addWidget(self.show_active_today_checkbox)
filter_layout.addWidget(self.show_online_only_checkbox)
filter_layout.addStretch()
self.user_table = QTableWidget()
self.user_table.setColumnCount(13)
self.user_table.setHorizontalHeaderLabels([
"Usuário", "Streak 🔥", "Total Rev.", "Novos", "Aprend.", "A Revisar",
"De Novo", "Difícil", "Bom", "Fácil", "Retenção %",
"Tempo Total", "Tempo Médio"
])
self.user_table.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu)
self.user_table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)
self.user_table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows)
self.user_table.verticalHeader().setVisible(False)
self.user_table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Stretch)
self.user_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeMode.Interactive)
self.user_table.setSortingEnabled(True)
users_layout.addLayout(filter_layout)
users_layout.addWidget(self.user_table)
self.simple_chat_widget = SimpleChatWidget(self)
self.tabs.addTab(users_widget, "Usuários")
self.tabs.addTab(self.simple_chat_widget, "Chat Geral")
self.chat_tab_index = 1
self.tabs.currentChanged.connect(self.on_tab_switched)
chat_layout.addWidget(self.tabs)
main_layout.addWidget(self.login_widget)
main_layout.addWidget(self.status_label)
main_layout.addWidget(self.main_chat_widget)
self.login_button.clicked.connect(self.handle_login)
self.register_button.clicked.connect(self.auth_manager.show_register_dialog)
self.logout_button.clicked.connect(self.logout)
self.user_table.customContextMenuRequested.connect(self.show_user_context_menu)
self.show_active_today_checkbox.stateChanged.connect(self._filters_changed)
self.show_online_only_checkbox.stateChanged.connect(self._filters_changed)
def handle_login(self):
email = self.email_entry.text().strip()
password = self.password_entry.text()
if not email or not password: return
self.login_widget.hide()
self.status_label.setText("Logando...")
self.status_label.show()
self.auth_manager.start_login_thread(email, password)
def _load_settings(self):
try:
if os.path.exists(self.settings_file):
with open(self.settings_file, 'r', encoding='utf-8') as f:
settings = json.load(f)
self.show_active_today_checkbox.setChecked(settings.get('show_active_today', False))
self.show_online_only_checkbox.setChecked(settings.get('show_online_only', False))
except Exception as e: print(f"AnkiChat: Erro ao carregar configurações: {e}")
def _save_settings(self):
settings = {'show_active_today': self.show_active_today_checkbox.isChecked(), 'show_online_only': self.show_online_only_checkbox.isChecked()}
try:
with open(self.settings_file, 'w', encoding='utf-8') as f: json.dump(settings, f, indent=4)
except Exception as e: print(f"AnkiChat: Erro ao salvar configurações: {e}")
def _filters_changed(self):
self._save_settings()
if self.last_master_list: self.update_user_table(self.last_master_list, self.last_online_stats)
def on_connection_success(self, *args):
self.login_widget.hide()
self.status_label.hide()
self.main_chat_widget.show()
self.setWindowTitle(f"AnkiChat - {background_updater.nickname}")
self.load_blocked_users()
self.simple_chat_widget.clear_chat()
if not hasattr(self, '_polling_thread') or not self._polling_thread.is_alive():
self._polling_thread = threading.Thread(target=self.poll_for_updates, daemon=True)
self._polling_thread.start()
def poll_for_updates(self):
while background_updater.is_connected:
try:
uid, token = background_updater.uid, background_updater.id_token
master_user_list, online_stats = background_updater.firebase.get_data("users", token) or {}, background_updater.firebase.get_data("online", token) or {}
self.user_update_received.emit(master_user_list, online_stats)
messages = background_updater.firebase.get_data("messages", token) or {}
self.message_update_received.emit(messages)
self._poll_deck_invites(uid, token)
self._poll_view_requests(uid, token)
self._check_viewer_status(uid, token)
except Exception as e: print(f"AnkiChat poll error: {e}")
time.sleep(2.5)
def update_user_table(self, master_user_list, online_stats):
self.last_master_list, self.last_online_stats = master_user_list, online_stats
show_active_today, show_online_only = self.show_active_today_checkbox.isChecked(), self.show_online_only_checkbox.isChecked()
self.user_table.setSortingEnabled(False)
self.user_table.setRowCount(0)
today_str, yesterday_str = datetime.now().strftime('%Y-%m-%d'), (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
for uid, user_data in master_user_list.items():
nick = user_data.get('nickname')
if not nick: continue
stats_data = online_stats.get(uid, {})
state, reviews_today = stats_data.get('state', 'offline'), stats_data.get('reviews_today', 0)
if show_online_only and state != 'online': continue
if show_active_today and reviews_today == 0: continue
row = self.user_table.rowCount()
self.user_table.insertRow(row)
nick_item = QTableWidgetItem(nick)
nick_item.setData(Qt.ItemDataRole.UserRole, uid)
if uid == background_updater.uid: nick_item.setForeground(QColor("green"))
elif state == 'offline': nick_item.setForeground(QColor("gray"))
if uid in self.blocked_users: nick_item.setForeground(QColor("red"))
self.user_table.setItem(row, 0, nick_item)
last_review_date, streak = user_data.get("last_review_date"), user_data.get("streak", 0)
display_streak = streak if last_review_date in [today_str, yesterday_str] else 0
new_done, learn_done, review_done = stats_data.get('new_done', 0), stats_data.get('learn_done', 0), stats_data.get('review_done', 0)
total_time_ms = stats_data.get('total_time_ms', 0)
avg_time_s = self._format_avg_time(total_time_ms, reviews_today)
again_count, hard_count, good_count, easy_count = stats_data.get('again_count', 0), stats_data.get('hard_count', 0), stats_data.get('good_count', 0), stats_data.get('easy_count', 0)
retention = stats_data.get('retention', 0)
def create_numeric_item(value, display_text):
item = NumericTableWidgetItem(display_text)
item.setData(Qt.ItemDataRole.UserRole, value)
return item
self.user_table.setItem(row, 1, create_numeric_item(display_streak, str(display_streak)))
self.user_table.setItem(row, 2, create_numeric_item(reviews_today, str(reviews_today)))
self.user_table.setItem(row, 3, create_numeric_item(new_done, str(new_done)))
self.user_table.setItem(row, 4, create_numeric_item(learn_done, str(learn_done)))
self.user_table.setItem(row, 5, create_numeric_item(review_done, str(review_done)))
self.user_table.setItem(row, 6, create_numeric_item(again_count, str(again_count)))
self.user_table.setItem(row, 7, create_numeric_item(hard_count, str(hard_count)))
self.user_table.setItem(row, 8, create_numeric_item(good_count, str(good_count)))
self.user_table.setItem(row, 9, create_numeric_item(easy_count, str(easy_count)))
retention_str = f"{retention * 100:.1f}%" if reviews_today > 0 else "-"
self.user_table.setItem(row, 10, create_numeric_item(retention, retention_str))
self.user_table.setItem(row, 11, create_numeric_item(total_time_ms, self._format_time(total_time_ms)))
self.user_table.setItem(row, 12, create_numeric_item(avg_time_s, f"{avg_time_s:.1f}s/card"))
self.user_table.setSortingEnabled(True)
def on_connection_failure(self, err_msg):
tooltip(err_msg)
self.status_label.hide()
self.login_widget.show()
def logout(self):
if background_updater.is_connected: background_updater.firebase.patch_data(f"online/{background_updater.uid}", {"state": "offline"}, background_updater.id_token)
background_updater.clear_state()
autologin_file = os.path.join(self.addon_path, 'autologin.json')
if os.path.exists(autologin_file): os.remove(autologin_file)
self.simple_chat_widget.clear_chat()
self.main_chat_widget.hide()
self.login_widget.show()
self.setWindowTitle("AnkiChat")
def handle_new_chat_message_notification(self):
if self.tabs.currentIndex() != self.chat_tab_index: self.tabs.tabBar().setTabTextColor(self.chat_tab_index, QColor("orange"))
def on_tab_switched(self, index):
if index == self.chat_tab_index: self.tabs.tabBar().setTabTextColor(self.chat_tab_index, QColor())
def _format_time(self, ms):
if ms is None: return "-"
seconds = ms / 1000
if seconds < 60: return f"{seconds:.1f}s"
return f"{seconds / 60:.1f} min"
def _format_avg_time(self, total_ms, count):
if not count or not total_ms: return 0
return (total_ms / count) / 1000
def show_user_context_menu(self, pos):
item = self.user_table.itemAt(pos)
if not item: return
nick, uid = self.user_table.item(item.row(), 0).text(), self.user_table.item(item.row(), 0).data(Qt.ItemDataRole.UserRole)
menu = QMenu()
if nick == background_updater.nickname:
menu.addAction("Mudar Senha").triggered.connect(self.auth_manager.show_change_password_dialog)
menu.addAction("Desbloquear Usuário...").triggered.connect(self.unblock_user_dialog)
if self.is_being_watched: menu.addAction("Parar Transmissão").triggered.connect(self._stop_broadcast)
else:
menu.addAction("Enviar Deck").triggered.connect(lambda: self.send_deck_invite(nick))
menu.addAction("Assistir Revisão").triggered.connect(lambda: self.send_view_request(nick))
menu.addSeparator()
if uid in self.blocked_users: menu.addAction("Desbloquear Usuário").triggered.connect(lambda: self.unblock_user(uid, nick))
else: menu.addAction("Bloquear Usuário").triggered.connect(lambda: self.block_user(uid, nick))
menu.exec(self.user_table.mapToGlobal(pos))
def send_deck_invite(self, nick):
decks = sorted(mw.col.decks.all_names())
if not decks:
tooltip("Você não tem decks para enviar.")
return
deck_name, ok = QInputDialog.getItem(self, "Enviar Deck", f"Escolha o deck para enviar para {nick}:", decks, 0, False)
if ok and deck_name: threading.Thread(target=self._send_deck_invitation, args=(nick, deck_name), daemon=True).start()
def _send_deck_invitation(self, target_nick, deck_name):
target_uid = background_updater.firebase.get_data(f"nick_to_uid/{target_nick}", background_updater.id_token)
if not target_uid:
self.show_tooltip_signal.emit(f"Usuário {target_nick} não encontrado.")
return
if background_updater.firebase.get_data(f"users/{target_uid}/blocked/{background_updater.uid}", background_updater.id_token):
self.show_tooltip_signal.emit(f"Você não pode interagir com {target_nick}.")
return
invite_data = {"sender_uid": background_updater.uid, "sender_nick": background_updater.nickname, "deck_name": deck_name, "status": "pending"}
res = background_updater.firebase.post_data(f"deck_invites/{target_uid}", invite_data, background_updater.id_token, return_name=True)
if res:
invite_id = res['name']
background_updater.firebase.put_data(f"sent_invites/{background_updater.uid}/{invite_id}", {"recipient_uid": target_uid}, background_updater.id_token)
self.show_tooltip_signal.emit(f"Convite para o deck '{deck_name}' enviado para {target_nick}.")
def handle_deck_invite(self, invite_id, invite_data):
sender, deck = invite_data.get("sender_nick", "Alguém"), invite_data.get("deck_name", "um deck")
dialog = QDialog(self)
dialog.setWindowTitle("Convite de Deck")
layout = QVBoxLayout(dialog)
layout.addWidget(QLabel(f"<b>{sender}</b> quer te enviar o deck <b>'{deck}'</b>.<br>Aceitar?"))
buttons = QDialogButtonBox(QDialogButtonBox.StandardButton.Yes | QDialogButtonBox.StandardButton.No)
buttons.accepted.connect(dialog.accept)
buttons.rejected.connect(dialog.reject)
layout.addWidget(buttons)
if dialog.exec():
background_updater.firebase.patch_data(f"deck_invites/{background_updater.uid}/{invite_id}", {"status": "accepted"}, background_updater.id_token)
self.show_tooltip_signal.emit("Convite aceito. Aguardando transferência...")
else:
background_updater.firebase.delete_data(f"deck_invites/{background_updater.uid}/{invite_id}", background_updater.id_token)
self.show_tooltip_signal.emit("Convite recusado.")
def handle_accepted_invite(self, recipient_uid, invite_id, invite_data):
self.show_tooltip_signal.emit(f"{invite_data.get('sender_nick')} aceitou. Iniciando envio...")
threading.Thread(target=self._process_deck_upload_p2p, args=(recipient_uid, invite_id, invite_data), daemon=True).start()
def _process_deck_upload_p2p(self, recipient_uid, invite_id, invite_data):
try:
transfer_id = str(uuid.uuid4())
background_updater.firebase.patch_data(f"deck_invites/{recipient_uid}/{invite_id}", {"transfer_id": transfer_id, "status": "transferring"}, background_updater.id_token)
deck_id = mw.col.decks.id(invite_data['deck_name'])
with tempfile.NamedTemporaryFile(suffix=".apkg", delete=False) as tmp: export_path = tmp.name
from anki.exporting import AnkiPackageExporter
exporter = AnkiPackageExporter(mw.col)
exporter.did = deck_id
exporter.exportInto(export_path)
with open(export_path, 'rb') as f:
while True:
chunk = f.read(1024 * 256)
if not chunk: break
requests.post(f"{self.RELAY_SERVER_URL}/upload/{transfer_id}", data=chunk, timeout=20)
requests.post(f"{self.RELAY_SERVER_URL}/end/{transfer_id}", timeout=10)
self.show_tooltip_signal.emit("Deck enviado com sucesso.")
except Exception as e: self.show_tooltip_signal.emit(f"Erro no envio: {e}")
finally:
if 'export_path' in locals() and os.path.exists(export_path): os.remove(export_path)
def handle_ready_for_download(self, invite_id, invite_data):
self.show_tooltip_signal.emit("Deck pronto para baixar. Iniciando download...")
threading.Thread(target=self._process_deck_download_p2p, args=(invite_id, invite_data), daemon=True).start()
def _process_deck_download_p2p(self, invite_id, invite_data):
try:
transfer_id = invite_data['transfer_id']
with tempfile.NamedTemporaryFile(suffix=".apkg", delete=False) as tmp: save_path = tmp.name
with open(save_path, "wb") as f:
while True:
response = requests.get(f"{self.RELAY_SERVER_URL}/download/{transfer_id}", timeout=20)
if response.status_code == 200:
if response.content == b'--EOF--': break
f.write(response.content)
elif response.status_code == 204: time.sleep(1)
else: raise Exception(f"Erro no servidor: {response.status_code}")
from aqt.importing import importFile
mw.taskman.run_on_main(lambda: importFile(mw, save_path))
self.show_tooltip_signal.emit(f"Deck '{invite_data['deck_name']}' importado!")
background_updater.firebase.delete_data(f"deck_invites/{background_updater.uid}/{invite_id}", background_updater.id_token)
except Exception as e: self.show_tooltip_signal.emit(f"Erro no download: {e}")
def send_view_request(self, nick): threading.Thread(target=self._send_view_request_thread, args=(nick,), daemon=True).start()
def _send_view_request_thread(self, target_nick):
target_uid = background_updater.firebase.get_data(f"nick_to_uid/{target_nick}", background_updater.id_token)
if not target_uid:
self.show_tooltip_signal.emit(f"Usuário {target_nick} não encontrado.")
return
if background_updater.firebase.get_data(f"users/{target_uid}/blocked/{background_updater.uid}", background_updater.id_token):
self.show_tooltip_signal.emit(f"Você não pode interagir com {target_nick}.")
return
req_data = {"requester_uid": background_updater.uid, "requester_nick": background_updater.nickname, "status": "pending"}
res = background_updater.firebase.post_data(f"view_requests/{target_uid}", req_data, background_updater.id_token, return_name=True)
if res:
req_id = res['name']
background_updater.firebase.put_data(f"sent_view_requests/{background_updater.uid}/{req_id}", {"target_uid": target_uid}, background_updater.id_token)
self.show_tooltip_signal.emit(f"Pedido de visualização enviado para {target_nick}.")
def handle_view_request(self, request_id, request_data):
requester_nick, requester_uid = request_data.get("requester_nick", "Alguém"), request_data.get("requester_uid")
dialog = QDialog(self)
dialog.setWindowTitle("Pedido de Visualização")
layout = QVBoxLayout(dialog)
layout.addWidget(QLabel(f"<b>{requester_nick}</b> quer assistir à sua revisão.<br>Aceitar?"))
buttons = QDialogButtonBox(QDialogButtonBox.StandardButton.Yes | QDialogButtonBox.StandardButton.No)
buttons.accepted.connect(dialog.accept)
buttons.rejected.connect(dialog.reject)
layout.addWidget(buttons)
if dialog.exec():
background_updater.firebase.patch_data(f"view_requests/{background_updater.uid}/{request_id}", {"status": "accepted"}, background_updater.id_token)
self.viewers[requester_uid] = requester_nick
self._update_viewers_display()
self._start_broadcast()
else:
background_updater.firebase.delete_data(f"view_requests/{background_updater.uid}/{request_id}", background_updater.id_token)
self.show_tooltip_signal.emit("Pedido de visualização recusado.")
def handle_accepted_view_request(self, target_uid, request_id, request_data):
target_nick = request_data.get("requester_nick", "Usuário")
if self.active_live_view_window and self.active_live_view_window.isVisible(): return
self.active_live_view_window = LiveViewWindow(self, target_uid, target_nick, request_id)
self.active_live_view_window.show()
def _start_broadcast(self):
if self.is_being_watched: return
self.is_being_watched = True
self.viewers_label.show()
gui_hooks.reviewer_did_show_question.append(self._broadcast_card_state)
gui_hooks.reviewer_did_show_answer.append(self._broadcast_card_state)
self.show_tooltip_signal.emit("Você está transmitindo sua revisão!")
def _stop_broadcast(self):
if not self.is_being_watched: return
# --- CORREÇÃO APLICADA AQUI ---
self.is_being_watched = False
self.viewers_label.hide()
self.viewers = {}
try:
gui_hooks.reviewer_did_show_question.remove(self._broadcast_card_state)
gui_hooks.reviewer_did_show_answer.remove(self._broadcast_card_state)
except ValueError: pass
background_updater.firebase.delete_data(f"live_view/{background_updater.uid}", background_updater.id_token)
self.show_tooltip_signal.emit("Transmissão encerrada.")
def _prepare_html_for_viewing(self, html: str, card, question_side: bool) -> str:
if not mw or not mw.col or not mw.col.media: return html
processed_html, media_dir = html, mw.col.media.dir()
for img_filename in re.findall(r'<img src="([^"]+)"[^>]*>', processed_html):
img_path = os.path.join(media_dir, img_filename)
if os.path.exists(img_path):
try:
mime_type, _ = mimetypes.guess_type(img_path)
if not mime_type: mime_type = "application/octet-stream"
with open(img_path, "rb") as image_file: encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
data_uri = f"data:{mime_type};base64,{encoded_string}"
processed_html = processed_html.replace(f'src="{img_filename}"', f'src="{data_uri}"', 1)
except Exception as e: print(f"AnkiChat: Não foi possível embutir a imagem {img_filename}: {e}")
av_tags = card.question_av_tags() if question_side else card.answer_av_tags()
for i, av_tag in enumerate(av_tags):
av_path = os.path.join(media_dir, av_tag.filename)
if os.path.exists(av_path):
try:
mime_type, _ = mimetypes.guess_type(av_path)
if not mime_type: mime_type = "application/octet-stream"
with open(av_path, "rb") as audio_file: encoded_string = base64.b64encode(audio_file.read()).decode('utf-8')
data_uri, anki_play_tag = f"data:{mime_type};base64,{encoded_string}", f"[anki:play:{'q' if question_side else 'a'}:{i}]"
audio_html_tag = f'<audio controls src="{data_uri}"></audio>'
processed_html = processed_html.replace(anki_play_tag, audio_html_tag)
except Exception as e: print(f"AnkiChat: Não foi possível embutir o áudio {av_tag.filename}: {e}")
return processed_html
def _broadcast_card_state(self, card):
if not self.is_being_watched: return
is_answer = mw.state == "review" and mw.reviewer.state == "answer"
q_html = self._prepare_html_for_viewing(card.q(), card, question_side=True)
a_html = self._prepare_html_for_viewing(card.a(), card, question_side=False) if is_answer else ""
data = {"q": q_html, "a": a_html, "state": "answer" if is_answer else "question"}
background_updater.firebase.put_data(f"live_view/{background_updater.uid}", data, background_updater.id_token)
def load_blocked_users(self):
blocked = background_updater.firebase.get_data(f"users/{background_updater.uid}/blocked", background_updater.id_token)
if blocked: self.blocked_users = set(blocked.keys())
def block_user(self, target_uid, target_nick):
background_updater.firebase.put_data(f"users/{background_updater.uid}/blocked/{target_uid}", True, background_updater.id_token)
self.blocked_users.add(target_uid)
self.show_tooltip_signal.emit(f"{target_nick} foi bloqueado.")
self._filters_changed()
def unblock_user(self, target_uid, target_nick):
background_updater.firebase.delete_data(f"users/{background_updater.uid}/blocked/{target_uid}", background_updater.id_token)
self.blocked_users.discard(target_uid)
self.show_tooltip_signal.emit(f"{target_nick} foi desbloqueado.")
self._filters_changed()
def unblock_user_dialog(self):
nick, ok = QInputDialog.getText(self, "Desbloquear Usuário", "Digite o nick do usuário a ser desbloqueado:")
if ok and nick:
target_uid = background_updater.firebase.get_data(f"nick_to_uid/{nick}", background_updater.id_token)
if target_uid and target_uid in self.blocked_users: self.unblock_user(target_uid, nick)
else: self.show_tooltip_signal.emit(f"Usuário '{nick}' não encontrado ou não está bloqueado.")
def _update_viewers_display(self):
if not self.viewers: self.viewers_label.setText("Assistindo você: Ninguém")
else: self.viewers_label.setText(f"Assistindo você: {', '.join(self.viewers.values())}")
def _check_viewer_status(self, uid, token):
if not self.is_being_watched: return
viewers_to_remove = []
for viewer_uid in self.viewers:
requests = background_updater.firebase.get_data(f"view_requests/{uid}", token, params=f'orderBy="requester_uid"&equalTo="{viewer_uid}"')
if not requests or not any(req_data.get("status") == "accepted" for req_data in requests.values()):
viewers_to_remove.append(viewer_uid)
if viewers_to_remove:
for viewer_uid in viewers_to_remove: del self.viewers[viewer_uid]
self._update_viewers_display()
def closeEvent(self, event):
self.hide()
event.ignore()
window_instance = None
def launch_window():
global window_instance
if window_instance is None: window_instance = ChatWindow(mw)
window_instance.show()
window_instance.activateWindow()
def on_review_for_background_update(reviewer, card, ease):
if background_updater.is_connected: background_updater.trigger_immediate_update()
def attempt_background_autologin():
addon_path = os.path.dirname(os.path.abspath(__file__))
autologin_file = os.path.join(addon_path, 'autologin.json')
if not os.path.exists(autologin_file): return
try:
with open(autologin_file, 'r') as f: data = json.load(f)
def _bg_login():
res, err = background_updater.firebase.refresh_token(data['refreshToken'])
if res:
background_updater.update_state(data['email'], data['uid'], res['id_token'], res['refresh_token'], res.get('expires_in', '3600'))
print(f"AnkiChat: Login automático em segundo plano bem-sucedido para {data['email']}")
else: print(f"AnkiChat: Falha no login automático em segundo plano: {err}")
threading.Thread(target=_bg_login, daemon=True).start()
except Exception as e: print(f"AnkiChat: Erro ao ler arquivo de autologin: {e}")
def init_addon():
FIREBASE_URL = "https://ankichatapp-default-rtdb.firebaseio.com/"
API_KEY = "AIzaSyDNSI7R6GX9B5PCGIwvAoKM_5uen7BU_C0"
firebase_api = FirebaseAPI(FIREBASE_URL, API_KEY)
background_updater.initialize(firebase_api)
action = QAction("AnkiChat", mw)
action.triggered.connect(launch_window)
mw.form.menubar.addAction(action)
attempt_background_autologin()
gui_hooks.reviewer_did_answer_card.append(on_review_for_background_update)
def cleanup_on_exit():
if background_updater.is_connected:
background_updater.firebase.patch_data(f"online/{background_updater.uid}", {"state": "offline"}, background_updater.id_token)
gui_hooks.main_window_did_init.append(init_addon)
gui_hooks.profile_will_close.append(cleanup_on_exit)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment