Last active
January 29, 2026 03:54
-
-
Save Glidias/0e1f0e98eee3e70ec1be2893a0f50407 to your computer and use it in GitHub Desktop.
Krita Script Points Editor to Sec Segmentation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from krita import Krita | |
| from PyQt5.QtWidgets import ( | |
| QDialog, QVBoxLayout, QHBoxLayout, QPushButton, | |
| QLabel, QMessageBox, QRadioButton, QFileDialog, QScrollArea, QApplication | |
| ) | |
| from PyQt5.QtGui import QPixmap, QPainter, QPen, QColor, QFont, QImage | |
| from PyQt5.QtCore import Qt, QPoint, QEvent | |
| import json | |
| import os | |
class PointsCanvas(QLabel):
    """Zoomable image label on which positive/negative prompt points are edited.

    Points live in ``self.points`` as dicts with the keys ``x`` and ``y``
    (document pixel coordinates), ``type`` (``'pos'`` or ``'neg'``) and
    ``index`` (running number among points of the same type).

    Left-button mouse handling:
      * plain click on empty space -> add a point of the dialog's current mode
      * plain click on a point     -> start dragging that point
      * Shift+click on empty space -> add a point of the opposite mode
      * Ctrl+click on a point      -> delete it and its same-index partner

    Shift+wheel and pinch gestures change the zoom factor.
    """

    # Accepted zoom range and the zoom ratio applied per wheel step.
    MIN_ZOOM = 0.1
    MAX_ZOOM = 10.0
    WHEEL_ZOOM_STEP = 1.1

    def __init__(self, parent=None):
        """Create an empty canvas; ``parent`` doubles as the owning dialog."""
        super().__init__(parent)
        self.points = []
        self.dragging_index = None
        self.setAlignment(Qt.AlignTop | Qt.AlignLeft)
        self.setStyleSheet("background-color: #111; border: 1px solid #444;")
        self.setMouseTracking(True)
        self.dialog_ref = parent
        self.zoom_factor = 1.0
        self.original_pixmap = None
        # Safe defaults so coordinate conversion and painting keep working
        # even when no document size was ever supplied (e.g. the background
        # failed to load).
        self.doc_width = 0
        self.doc_height = 0
        self.thumb_width = 0
        self.thumb_height = 0
        self.base_scale_x = 1.0
        self.base_scale_y = 1.0
        self.grabGesture(Qt.PinchGesture)  # Enable pinch gesture for macOS

    def set_document_and_thumbnail_size(self, doc_width, doc_height, thumb_width, thumb_height):
        """Record document and thumbnail sizes and derive the base scale."""
        self.doc_width = doc_width
        self.doc_height = doc_height
        self.thumb_width = thumb_width
        self.thumb_height = thumb_height
        self.base_scale_x = thumb_width / doc_width if doc_width > 0 else 1.0
        self.base_scale_y = thumb_height / doc_height if doc_height > 0 else 1.0

    def set_background_pixmap(self, pixmap):
        """Store the unscaled background and display it at the current zoom."""
        self.original_pixmap = pixmap
        self.apply_zoom()

    def apply_zoom(self):
        """Rescale the stored background to ``zoom_factor`` and resize to fit."""
        if not self.original_pixmap:
            return
        scaled = self.original_pixmap.scaled(
            int(self.original_pixmap.width() * self.zoom_factor),
            int(self.original_pixmap.height() * self.zoom_factor),
            Qt.KeepAspectRatio,
            Qt.SmoothTransformation,
        )
        self.setPixmap(scaled)
        self.resize(scaled.size())

    def _try_set_zoom(self, new_zoom):
        """Apply ``new_zoom`` if it is within the allowed range, else ignore it."""
        if self.MIN_ZOOM <= new_zoom <= self.MAX_ZOOM:
            self.zoom_factor = new_zoom
            self.apply_zoom()

    def wheelEvent(self, event):
        """Zoom on Shift+wheel; leave every other wheel event to the parent."""
        if not (event.modifiers() & Qt.ShiftModifier):
            # An overridden handler keeps the event accepted unless told
            # otherwise, which would stop the enclosing scroll area from
            # scrolling when the cursor is over the canvas.
            event.ignore()
            return
        angle = event.angleDelta()
        # Some platforms report a Shift+wheel as horizontal movement, so fall
        # back to the x component. A zero delta must not zoom at all.
        delta = angle.y() or angle.x()
        if delta > 0:
            self._try_set_zoom(self.zoom_factor * self.WHEEL_ZOOM_STEP)
        elif delta < 0:
            self._try_set_zoom(self.zoom_factor / self.WHEEL_ZOOM_STEP)
        event.accept()

    def screen_to_doc(self, screen_x, screen_y):
        """Convert widget coordinates to clamped document pixel coordinates."""
        scale_x = self.base_scale_x * self.zoom_factor
        scale_y = self.base_scale_y * self.zoom_factor
        doc_x = int(screen_x / scale_x) if scale_x > 0 else screen_x
        doc_y = int(screen_y / scale_y) if scale_y > 0 else screen_y
        # Clamp to the document; without a known size only the lower bound
        # can be enforced.
        doc_x = max(0, min(doc_x, self.doc_width - 1)) if self.doc_width > 0 else max(0, doc_x)
        doc_y = max(0, min(doc_y, self.doc_height - 1)) if self.doc_height > 0 else max(0, doc_y)
        return doc_x, doc_y

    def doc_to_screen(self, doc_x, doc_y):
        """Convert document pixel coordinates to widget coordinates."""
        scale_x = self.base_scale_x * self.zoom_factor
        scale_y = self.base_scale_y * self.zoom_factor
        return int(doc_x * scale_x), int(doc_y * scale_y)

    def add_point(self, screen_x, screen_y, point_type):
        """Append a point of ``point_type`` at the given widget position."""
        doc_x, doc_y = self.screen_to_doc(screen_x, screen_y)
        index = sum(1 for p in self.points if p['type'] == point_type)
        self.points.append({'x': doc_x, 'y': doc_y, 'type': point_type, 'index': index})
        self.update()

    def clear_points(self):
        """Remove every point and repaint."""
        self.points = []
        self.update()

    def get_points_for_export(self):
        """Return ``(positive, negative)`` lists of ``{'x': ..., 'y': ...}`` dicts."""
        pos = [{'x': p['x'], 'y': p['y']} for p in self.points if p['type'] == 'pos']
        neg = [{'x': p['x'], 'y': p['y']} for p in self.points if p['type'] == 'neg']
        return pos, neg

    def load_points_from_export(self, pos_points, neg_points):
        """Replace all points with the given exported positive/negative lists.

        The new list is built completely before it is assigned, so a malformed
        entry (missing ``x``/``y``) raises ``KeyError`` without leaving the
        canvas half-loaded.
        """
        loaded = [
            {'x': p['x'], 'y': p['y'], 'type': 'pos', 'index': i}
            for i, p in enumerate(pos_points)
        ]
        loaded.extend(
            {'x': p['x'], 'y': p['y'], 'type': 'neg', 'index': i}
            for i, p in enumerate(neg_points)
        )
        self.points = loaded
        self.update()

    def _reindex(self):
        """Renumber points per type in list order (non-'pos' counts as 'neg')."""
        counters = {'pos': 0, 'neg': 0}
        for p in self.points:
            key = 'pos' if p['type'] == 'pos' else 'neg'
            p['index'] = counters[key]
            counters[key] += 1

    def get_point_at(self, screen_x, screen_y, tol=12):
        """Return the list index of the first point within ``tol`` screen pixels.

        Returns ``None`` when no point is close enough. The tolerance is
        converted to document units using the horizontal scale only.
        """
        doc_x, doc_y = self.screen_to_doc(screen_x, screen_y)
        scale_x = self.base_scale_x * self.zoom_factor
        doc_tol = tol / scale_x if scale_x > 0 else tol
        limit_sq = doc_tol * doc_tol
        for i, p in enumerate(self.points):
            dx = p['x'] - doc_x
            dy = p['y'] - doc_y
            if dx * dx + dy * dy <= limit_sq:
                return i
        return None

    def delete_point_and_pair(self, idx):
        """Delete point ``idx`` plus the other-type point sharing its index."""
        if idx < 0 or idx >= len(self.points):
            return
        target = self.points[idx]
        doomed = [idx]
        for i, p in enumerate(self.points):
            if p['index'] == target['index'] and p['type'] != target['type']:
                doomed.append(i)
                break
        # Delete from the back so the remaining positions stay valid.
        for i in sorted(doomed, reverse=True):
            del self.points[i]
        self._reindex()
        self.update()

    def mousePressEvent(self, event):
        """Add, drag-start or delete a point depending on the modifiers."""
        if event.button() != Qt.LeftButton:
            return
        screen_x, screen_y = event.x(), event.y()
        pt_idx = self.get_point_at(screen_x, screen_y)
        modifiers = event.modifiers()
        mode = getattr(self.dialog_ref, 'current_mode', None) if self.dialog_ref else None
        if modifiers & Qt.ControlModifier:
            # DELETE: Ctrl (with or without other modifiers)
            if pt_idx is not None:
                self.delete_point_and_pair(pt_idx)
        elif modifiers == Qt.ShiftModifier:
            # OPPOSITE: Shift only
            if mode is not None and pt_idx is None:
                opposite = 'neg' if mode == 'pos' else 'pos'
                self.add_point(screen_x, screen_y, opposite)
        elif pt_idx is not None:
            # REGULAR on an existing point: begin dragging it
            self.dragging_index = pt_idx
        elif mode is not None:
            # REGULAR on empty space: add a point of the current mode
            self.add_point(screen_x, screen_y, mode)
        self.update()

    def mouseMoveEvent(self, event):
        """Move the point being dragged, if any, to the cursor position."""
        idx = self.dragging_index
        if idx is None or idx >= len(self.points):
            return
        doc_x, doc_y = self.screen_to_doc(event.x(), event.y())
        self.points[idx]['x'] = doc_x
        self.points[idx]['y'] = doc_y
        self.update()

    def mouseReleaseEvent(self, event):
        """Stop dragging when the left button is released."""
        if event.button() == Qt.LeftButton:
            self.dragging_index = None

    def event(self, event):
        """Route gesture events to ``gestureEvent``; defer the rest to Qt."""
        if event.type() == QEvent.Gesture:
            return self.gestureEvent(event)
        return super().event(event)

    def gestureEvent(self, event):
        """Zoom by the pinch gesture's scale factor; return True if handled."""
        pinch = event.gesture(Qt.PinchGesture)
        if not pinch:
            return False
        scale_factor = pinch.scaleFactor()
        # Ignore jitter: only react to changes of more than 1%.
        if abs(scale_factor - 1.0) > 0.01:
            self._try_set_zoom(self.zoom_factor * scale_factor)
        return True

    def paintEvent(self, event):
        """Draw the background, then a circle and index label per point."""
        super().paintEvent(event)
        if not self.points:
            return
        # Sizes depend only on the zoom factor, so compute them once.
        zoom = self.zoom_factor
        font_size = max(8, int(10 * zoom))
        pen_width = max(1, int(2 * zoom))
        radius = max(3, int(7 * zoom))
        label_dx = int(12 * zoom)
        label_dy = int(5 * zoom)
        outline_offsets = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]
        white = QColor(255, 255, 255)

        painter = QPainter(self)
        try:
            painter.setRenderHint(QPainter.Antialiasing)
            painter.setFont(QFont("Arial", font_size, QFont.Bold))
            for p in self.points:
                is_pos = p['type'] == 'pos'
                screen_x, screen_y = self.doc_to_screen(p['x'], p['y'])
                ring_color = QColor(0, 255, 0) if is_pos else QColor(255, 0, 0)
                painter.setPen(QPen(ring_color, pen_width))
                painter.drawEllipse(QPoint(screen_x, screen_y), radius, radius)
                text = str(p['index'])
                text_x = screen_x + label_dx
                text_y = screen_y + label_dy
                # White outline: the label drawn at the 8 neighbouring offsets
                painter.setPen(white)
                for dx, dy in outline_offsets:
                    painter.drawText(text_x + dx, text_y + dy, text)
                # Colored fill
                painter.setPen(QColor(0, 80, 0) if is_pos else QColor(80, 0, 0))
                painter.drawText(text_x, text_y, text)
        finally:
            # Always release the paint device, even if drawing raised.
            painter.end()
class PointsEditorDialog(QDialog):
    """Modal dialog for placing positive/negative points over the active document.

    After ``exec_()`` returns, ``get_result()`` yields ``(pos_points,
    neg_points)`` when OK was pressed and ``None`` otherwise. Points are
    auto-saved next to the document (or in the home directory for unsaved
    documents) and auto-loaded the next time the dialog opens.
    """

    def __init__(self, parent=None):
        """Build the UI, load the document image and restore auto-saved points."""
        super().__init__(parent)
        self.setWindowTitle("Points Editor")
        self.resize(800, 700)
        self.current_mode = 'pos'
        self.last_save_path = None
        self.result_points = None
        self.user_initiated_close = False
        self.setup_ui()
        self.load_background()
        if self.auto_load():
            print("✅ Auto-loaded previously saved points.")

    def _on_mode_toggled(self, mode, checked):
        """Switch to ``mode`` only when its radio button became checked.

        ``toggled`` fires for the button being unchecked as well; reacting
        only to ``checked=True`` keeps the result independent of the order in
        which the two buttons emit.
        """
        if checked:
            self.current_mode = mode

    def setup_ui(self):
        """Create the control row (mode, save/load/clear, OK/Close) and canvas."""
        main_layout = QVBoxLayout()
        control_layout = QHBoxLayout()

        self.radio_pos = QRadioButton("Positive (Green)")
        self.radio_neg = QRadioButton("Negative (Red)")
        self.radio_pos.setChecked(True)
        self.radio_pos.toggled.connect(lambda checked: self._on_mode_toggled('pos', checked))
        self.radio_neg.toggled.connect(lambda checked: self._on_mode_toggled('neg', checked))

        save_btn = QPushButton("Save Points...")
        load_btn = QPushButton("Load Points...")
        clear_btn = QPushButton("Clear All")
        self.ok_btn = QPushButton("OK")
        self.close_btn = QPushButton("Close")
        save_btn.clicked.connect(self.save_points)
        load_btn.clicked.connect(self.load_points)
        # The lambda defers the lookup: self.canvas is created further below.
        clear_btn.clicked.connect(lambda: self.canvas.clear_points())
        self.ok_btn.clicked.connect(self.on_ok_clicked)
        self.close_btn.clicked.connect(self.on_close_clicked)

        for widget in (self.radio_pos, self.radio_neg, save_btn, load_btn, clear_btn):
            control_layout.addWidget(widget)
        control_layout.addStretch()
        control_layout.addWidget(self.ok_btn)
        control_layout.addWidget(self.close_btn)
        main_layout.addLayout(control_layout)

        self.canvas = PointsCanvas()
        self.canvas.dialog_ref = self
        scroll_area = QScrollArea()
        scroll_area.setWidgetResizable(True)
        scroll_area.setWidget(self.canvas)
        scroll_area.setStyleSheet("QScrollArea { border: none; }")
        main_layout.addWidget(scroll_area)
        self.setLayout(main_layout)

    def load_background(self):
        """Show the active document's projection as the canvas background."""
        try:
            doc = Krita.instance().activeDocument()
            if not doc:
                QMessageBox.warning(self, "No Document", "Open an image first.")
                return
            orig_w, orig_h = doc.width(), doc.height()
            thumb_qimage = doc.projection(0, 0, orig_w, orig_h)
            if thumb_qimage.isNull():
                raise Exception("Thumbnail is null")
            thumb_w = thumb_qimage.width()
            thumb_h = thumb_qimage.height()
            thumb_qimage = thumb_qimage.convertToFormat(QImage.Format_RGBA8888)
            self.canvas.set_background_pixmap(QPixmap.fromImage(thumb_qimage))
            self.canvas.set_document_and_thumbnail_size(orig_w, orig_h, thumb_w, thumb_h)
        except Exception as e:
            QMessageBox.critical(self, "Error", f"Background load failed:\n{str(e)}")

    def get_autosave_path(self):
        """Return ``<document>.points.json`` or a home-directory fallback path."""
        fallback = os.path.expanduser("~/krita_points_autosave.json")
        try:
            doc = Krita.instance().activeDocument()
            if not doc or not doc.fileName():
                return fallback
            return doc.fileName() + ".points.json"
        except Exception:
            # Best effort: any failure querying Krita falls back to the home path.
            return fallback

    def _discard_stale_autosave(self, path):
        """Delete ``path`` if it is a file this dialog auto-saved earlier.

        Only files carrying the ``auto_saved`` marker are removed, so a file
        the user saved manually under the same name is left alone.
        """
        if not os.path.exists(path):
            return
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if isinstance(data, dict) and data.get("auto_saved"):
            os.remove(path)

    def auto_save(self):
        """Persist the current points to the autosave file (best effort).

        With no points left, a previous autosave is removed instead, so that
        points the user cleared do not reappear on the next auto-load.
        Failures are reported on stdout and never raised.
        """
        try:
            pos_pts, neg_pts = self.canvas.get_points_for_export()
            autosave_path = self.get_autosave_path()
            if not pos_pts and not neg_pts:
                self._discard_stale_autosave(autosave_path)
                return
            doc = Krita.instance().activeDocument()
            data = {
                "coordinates_positive": pos_pts,
                "coordinates_negative": neg_pts,
                "auto_saved": True,
                "document": doc.fileName() if doc else "unsaved",
            }
            with open(autosave_path, 'w', encoding='utf-8') as f:
                json.dump(data, f)
        except Exception as e:
            print(f"Points auto-save skipped: {e}")

    def auto_load(self):
        """Load points from the autosave file; return True if any were applied."""
        try:
            autosave_path = self.get_autosave_path()
            if not os.path.exists(autosave_path):
                return False
            with open(autosave_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if data.get("auto_saved"):
                self.canvas.load_points_from_export(
                    data.get("coordinates_positive", []),
                    data.get("coordinates_negative", []),
                )
                return True
        except Exception as e:
            # Best effort: an unreadable or malformed autosave is ignored.
            print(f"Points auto-load skipped: {e}")
        return False

    def on_ok_clicked(self):
        """Store the points as the dialog result, auto-save and accept."""
        self.user_initiated_close = True
        self.result_points = self.canvas.get_points_for_export()
        self.auto_save()
        self.accept()

    def on_close_clicked(self):
        """Auto-save but return no points, then reject the dialog."""
        self.user_initiated_close = True
        self.result_points = None
        self.auto_save()
        self.reject()

    def closeEvent(self, event):
        """Drop any result when the window is closed without OK/Close."""
        if not self.user_initiated_close:
            self.result_points = None
        super().closeEvent(event)

    def get_result(self):
        """Return ``(pos_points, neg_points)`` after OK, otherwise ``None``."""
        return self.result_points

    def save_points(self):
        """Ask for a file name and write the current points to it as JSON."""
        pos_pts, neg_pts = self.canvas.get_points_for_export()
        data = {
            "coordinates_positive": pos_pts,
            "coordinates_negative": neg_pts,
        }
        filename, _ = QFileDialog.getSaveFileName(
            self, "Save Points", self.last_save_path or "", "JSON Files (*.json)"
        )
        if not filename:
            return
        if not filename.endswith(".json"):
            filename += ".json"
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f)
            self.last_save_path = filename
            self.auto_save()
        except Exception as e:
            QMessageBox.critical(self, "Error", f"Failed to save:\n{str(e)}")

    def load_points(self):
        """Ask for a JSON file and replace the current points with its content."""
        filename, _ = QFileDialog.getOpenFileName(
            self, "Load Points", self.last_save_path or "", "JSON Files (*.json)"
        )
        if not filename:
            return
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.canvas.load_points_from_export(
                data.get("coordinates_positive", []),
                data.get("coordinates_negative", []),
            )
            self.last_save_path = filename
            self.auto_save()
        except Exception as e:
            QMessageBox.critical(self, "Error", f"Failed to load:\n{str(e)}")
# === RUN IN SCRIPTER ===
app = Krita.instance()
# Only open the demo dialog when this file is executed as a script. When it is
# imported as the module `krita_pointseditor` (to reuse PointsEditorDialog), a
# modal dialog popping up at import time would be an unwanted side effect.
if __name__.rpartition(".")[2] != "krita_pointseditor":
    if not app.activeWindow():
        QMessageBox.critical(None, "Error", "No Krita window!")
    else:
        dialog = PointsEditorDialog(app.activeWindow().qwindow())
        result = dialog.exec_()
        points = dialog.get_result()
        if points is not None:
            pos_points, neg_points = points
            print(f"✅ OK clicked! Positive points: {pos_points}")
            print(f"✅ OK clicked! Negative points: {neg_points}")
        elif result == QDialog.Rejected:
            print("CloseOperation: Close button clicked (auto-saved, no points returned)")
        else:
            print("CloseOperation: X button clicked (no auto-save, no points returned)")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import os | |
| import shutil | |
| import tempfile | |
| from krita import Krita, Selection | |
| from PyQt5.QtGui import QImage | |
| from PyQt5.QtCore import QTimer, QThread, pyqtSignal, QEventLoop, Qt, QByteArray | |
| from PyQt5.QtWidgets import QMessageBox, QWidget, QProgressDialog, QApplication | |
| from krita_pointseditor import PointsEditorDialog | |
| import json, urllib.request | |
| import uuid | |
| # assumes you have websocket-client lib manually installed in Krita application directory | |
| import websocket as websocket_client | |
| # Krita api doc | |
| # https://apidoc.krita.maou-maou.fr/ | |
| # This script uses PointsEditorDialog to collect positive and negative points from user interface in Krita to help run segmentation on current playback-range frames in Krita document | |
| # and immediately replacing the frames in the currently selected transparency mask layer with the segmentation results from a ComfyUI SecNode video segmentation's api node workflow. | |
| # For animation, the currently selected transparency mask layer keyframe is used to determine which frame to use, and any drawn mask pixels can also be used to supplement the selection points with a reference bounding box/mask to influence the results. | |
| # The current document Selection (if any), can also be used to determine either reference bounding box/mask instead of the drawn mask pixels | |
| # | |
| # See `# Save meta.json` comment line for customisable options (currently hardcoded) | |
# Random identifier generated once per script run; it is sent with the prompt
# request and used in the websocket URL.
_client_uuid = uuid.uuid4()
CLIENT_ID = str(_client_uuid)
def save_input_frames_with_progress(tmpdir, src_layer, active_layer, canvas_dims, start_frame, end_frame, current_frame):
    """Save input frames with progress dialog (main thread only).

    Writes into ``tmpdir``:
      * ``input_mask/mask.png``           - the mask layer content at
        ``current_frame``, only when it is not entirely white (255)
      * ``input_mask/selection_mask.png`` - the document selection, if any
      * ``input_src/frame_NNNN.png``      - one image per frame of
        ``start_frame..end_frame`` taken from ``src_layer``

    Reads the module-level names ``is_animated_masklayer``,
    ``is_animated_srclayer`` and ``doc``, which must be set before the call.

    Returns:
        True when every frame was saved, False when the user pressed Cancel.

    Raises:
        Exception: when an image cannot be written or the mask data is
            shorter than one byte per canvas pixel.
    """
    width, height = canvas_dims[0], canvas_dims[1]
    total_frames = end_frame - start_frame + 1

    # Create progress dialog
    progress = QProgressDialog("Saving input frames before ComfyUI process...", "Cancel", 0, total_frames)
    progress.setWindowModality(Qt.WindowModal)
    progress.setWindowTitle("Input Processing")
    progress.setAutoClose(True)
    progress.setAutoReset(True)
    progress.show()

    def grayscale_image(raw_bytes):
        # Mask data is treated as tightly packed 8-bit grayscale, one byte per
        # pixel (as in the original code; confirm for non-8-bit documents).
        # QImage otherwise assumes 32-bit aligned scanlines, so the stride
        # must be passed explicitly for widths that are not a multiple of 4.
        if len(raw_bytes) < width * height:
            raise Exception("Unexpected mask data size")
        return QImage(raw_bytes, width, height, width, QImage.Format_Grayscale8)

    try:
        # Save any optional mask images if needed
        QApplication.processEvents()
        if is_animated_masklayer:
            mask_data = active_layer.pixelDataAtTime(0, 0, width, height, current_frame)
        else:
            mask_data = active_layer.pixelData(0, 0, width, height)
        mask_bytes = mask_data.data()  # keep alive while the QImage uses it

        # A fully white (255) mask is Krita's empty, no-effect mask and is not
        # exported. Checking the raw bytes avoids a per-pixel Python loop.
        if mask_bytes.count(b"\xff") != len(mask_bytes):
            # Drawn mask: potential bbox (use_bbox_or_mask=0|1) or mask (=2)
            mask_path = os.path.join(tmpdir, "input_mask", "mask.png")
            if not grayscale_image(mask_bytes).save(mask_path, "PNG"):
                raise Exception("Failed to save mask")

        # Selection mask: potential bbox (use_bbox_or_mask=0|1) or mask (=2)
        selection = doc.selection()
        if selection is not None:
            # full canvas selection mask
            selection_bytes = selection.pixelData(0, 0, width, height).data()
            img_path = os.path.join(tmpdir, "input_mask", "selection_mask.png")
            if not grayscale_image(selection_bytes).save(img_path, "PNG"):
                raise Exception("Failed to save canvas selection mask")

        # Save source frames
        for i, frame in enumerate(range(start_frame, end_frame + 1)):
            progress.setValue(i)
            QApplication.processEvents()  # Keep UI responsive
            if progress.wasCanceled():
                return False
            if is_animated_srclayer:
                pixel_data = src_layer.pixelDataAtTime(0, 0, width, height, frame)
            else:
                pixel_data = src_layer.pixelData(0, 0, width, height)
            # 4 bytes per pixel: scanlines are always 32-bit aligned here.
            qimg = QImage(pixel_data, width, height, QImage.Format_ARGB32)
            img_path = os.path.join(tmpdir, "input_src", f"frame_{frame:04d}.png")
            if not qimg.save(img_path, "PNG"):
                raise Exception(f"Failed to save source frame {frame}")
        progress.setValue(total_frames)
        return True
    finally:
        progress.close()
class ComfyWorker(QThread):
    """Background thread that queues the workflow in ComfyUI and waits for it.

    Emits ``finished(tmpdir)`` when ComfyUI reports that execution of the
    queued prompt is done, or ``error(message)`` on any failure. Reads the
    module-level ``workflow`` and ``CLIENT_ID``.

    NOTE(review): the ``finished`` attribute has the same name as QThread's
    built-in ``finished`` signal; kept as-is because callers connect to it.
    """

    finished = pyqtSignal(str)  # emits tmpdir
    error = pyqtSignal(str)

    def __init__(self, tmpdir):
        super().__init__()
        self.tmpdir = tmpdir

    def run(self):
        """Queue the prompt and block until ComfyUI finishes or fails."""
        try:
            client_id = CLIENT_ID
            # Connect the websocket BEFORE queueing the prompt. Otherwise a
            # fast (e.g. cached) execution could complete before we listen,
            # its messages would be missed and the loop below would wait
            # forever.
            ws_url = f"ws://127.0.0.1:8188/ws?clientId={client_id}"
            websocket = websocket_client.create_connection(ws_url)
            try:
                # Submit prompt
                payload = json.dumps({"prompt": workflow, "client_id": client_id}).encode("utf-8")
                req = urllib.request.Request(
                    "http://127.0.0.1:8188/prompt",
                    data=payload,
                    headers={"Content-Type": "application/json"},
                )
                with urllib.request.urlopen(req) as response:
                    result = json.loads(response.read())
                prompt_id = result["prompt_id"]
                print(f"ComfyWorker started for prompt_id: {prompt_id} and client_d: {client_id}")

                while True:
                    raw = websocket.recv()
                    if not isinstance(raw, str):
                        continue  # non-text frames are not status messages
                    msg = json.loads(raw)
                    data = msg.get("data")
                    # Only react to messages about our own prompt; this also
                    # skips the initial status message.
                    if not isinstance(data, dict) or data.get("prompt_id") != prompt_id:
                        continue
                    msg_type = msg.get("type")
                    if msg_type == "execution_error":
                        error_msg = data.get("exception_message", "Unknown error")
                        raise Exception(f"ComfyUI error: {error_msg}")
                    if msg_type == "executing" and data["node"] is None:
                        # node == None marks the end of execution
                        self.finished.emit(self.tmpdir)
                        return
            finally:
                websocket.close()
        except Exception as e:
            print(f"Error in ComfyWorker: {str(e)}")
            self.error.emit(str(e))
# __file__ may be undefined in some environments; fall back to a fixed
# install location in that case.
try:
    __file__
except NameError:
    __file__ = os.path.abspath('C:/ComfyUI/custom_nodes/mask2sam/script_krita_points2sec.py')
# Folder holding the API workflow templates, next to this script.
API_BASE_DIR = os.path.join(os.path.dirname(__file__), "api_workflows")
def find_appropriate_parent_and_target(doc, selected_node, has_masks):
    """Pick the parent node and optional target layer for a selected node.

    Returns a ``(parent, target)`` tuple. ``target`` is only set when
    ``has_masks`` is true and the selection is a paint layer (the layer
    itself) or a transparency mask with a parent (that parent layer).
    Wherever a parent node is missing, the document root node is used.
    """
    if not selected_node:
        return doc.rootNode(), None

    node_kind = selected_node.type()

    if not has_masks:
        # Without masks: groups contain the result, anything else uses its parent.
        if node_kind == "grouplayer":
            return selected_node, None
        return selected_node.parentNode() or doc.rootNode(), None

    if node_kind == "paintlayer":
        return selected_node.parentNode() or doc.rootNode(), selected_node

    if node_kind == "transparencymask":
        owner = selected_node.parentNode()
        if owner:
            return owner.parentNode() or doc.rootNode(), owner

    container = selected_node.parentNode()
    if container and container.type() == "grouplayer":
        return container, None
    if node_kind == "grouplayer":
        return selected_node, None
    return selected_node.parentNode() or doc.rootNode(), None
def qimage_to_rgba8_bytes(qimg):
    """Return the image's pixels as a QByteArray in RGBA8888 layout.

    The image is converted to ``Format_RGBA8888`` first if necessary. A null
    image (e.g. a file that failed to load) yields an empty QByteArray
    instead of failing on the missing pixel buffer.
    """
    if qimg.format() != QImage.Format_RGBA8888:
        qimg = qimg.convertToFormat(QImage.Format_RGBA8888)
    ptr = qimg.bits()
    if ptr is None:
        return QByteArray()
    # 4 bytes per pixel: scanlines carry no alignment padding in this format.
    return QByteArray(ptr.asstring(qimg.byteCount()))
def qimage_to_grayscale_bytes(qimg):
    """Return the image's pixels as tightly packed 8-bit grayscale bytes.

    The image is converted to ``Format_Grayscale8`` first if necessary. The
    result holds exactly ``width * height`` bytes: QImage pads every scanline
    to a multiple of 4 bytes, and that padding is stripped here so the data
    lines up for widths that are not a multiple of 4. A null image yields an
    empty QByteArray.
    """
    if qimg.format() != QImage.Format_Grayscale8:
        qimg = qimg.convertToFormat(QImage.Format_Grayscale8)
    ptr = qimg.bits()
    if ptr is None:
        return QByteArray()
    width, height = qimg.width(), qimg.height()
    stride = qimg.bytesPerLine()
    raw = ptr.asstring(stride * height)
    if stride == width:
        return QByteArray(raw)
    # Drop the per-row padding bytes.
    packed = b"".join(raw[row * stride:row * stride + width] for row in range(height))
    return QByteArray(packed)
def logErrMessage(msg, title="Script Error"):
    """Show ``msg`` in an information message box titled ``title``."""
    anchor_widget = QWidget()
    QMessageBox.information(anchor_widget, title, msg)
def logAndRaiseErrMessage(msg, title="Script Error"):
    """Show ``msg`` in a message box, then abort by raising ``Exception(msg)``."""
    logErrMessage(msg, title=title)
    raise Exception(msg)
# Get active document
app = Krita.instance()
doc = app.activeDocument()
if not doc:
    logAndRaiseErrMessage("❌ No active document")
canvas_dimensions = [doc.width(), doc.height()]
start_frame = doc.playBackStartTime()
current_frame = doc.currentTime()
end_frame = doc.playBackEndTime()
if current_frame < start_frame or current_frame > end_frame:
    logAndRaiseErrMessage("❌ Currently selected frame is outside of playback range")

# The script operates on a transparency mask attached to a paint layer.
active_layer = doc.activeNode()
if not active_layer:
    logAndRaiseErrMessage("❌ Please selecct a layer to run this script")
if active_layer.type() != "transparencymask":
    logAndRaiseErrMessage("Other layer types besides mask not ot supported atm!: " + active_layer.type())
src_layer = active_layer.parentNode()
parent_layer = src_layer
if not src_layer or src_layer.type() != "paintlayer":
    logAndRaiseErrMessage("Please select a mask layer that is attached to a paint layer")

# determine intent from krita layer frame selections by user
if not active_layer.hasKeyframeAtTime(current_frame):
    # assume current frame sample exist at start of playback range
    if active_layer.hasKeyframeAtTime(start_frame):
        current_frame = start_frame
else:
    # use keyframe at current_frame as reference between start and end range.
    # allow for single frame processing workaround in animation if current
    # playback frame range is only exactly 2 but there is an additional mask
    # layer (dummy) keyframe at the other frame outside current_frame
    # NOTE(review): `end_frame - start_frame == 2` is a span of three frames,
    # not the two the comment above describes - confirm the intended range.
    other_frame = start_frame if current_frame == end_frame else end_frame
    if end_frame - start_frame == 2 and active_layer.hasKeyframeAtTime(other_frame):
        start_frame = end_frame = current_frame
if start_frame > end_frame:
    logAndRaiseErrMessage("❌ Invalid selected frame range in document")

# Pop up points editor input dialog
points_dialog = PointsEditorDialog(app.activeWindow().qwindow())
result = points_dialog.exec_()
pos_points, neg_points = [], []
points = points_dialog.get_result()
if points is not None:
    pos_points, neg_points = points
elif result == points_dialog.Rejected:
    raise Exception("User closed script")
else:
    raise Exception("User closed script [x]")

# Setup files
user_canceled = False
is_animated_masklayer = active_layer.animated()
is_animated_srclayer = src_layer.animated()
if not is_animated_srclayer:
    end_frame = start_frame = current_frame = 0  # force single frame if src not animated
if is_animated_masklayer and not active_layer.hasKeyframeAtTime(current_frame):
    logAndRaiseErrMessage("❌ Please make sure the selected mask layer has a keyframe at the designated frame of the selected range.")

# Create the temp dir only after all validation above has passed, so that an
# aborted run does not leave an empty directory behind.
tmpdir = tempfile.mkdtemp(prefix="krita_comfy_")
try:
    tmpdir_unix = tmpdir.replace("\\", "/")
    os.makedirs(os.path.join(tmpdir, "input_src"), exist_ok=True)
    os.makedirs(os.path.join(tmpdir, "input_mask"), exist_ok=True)
    os.makedirs(os.path.join(tmpdir, "output"), exist_ok=True)

    # Save frames with progress dialog
    user_canceled = not save_input_frames_with_progress(
        tmpdir, src_layer, active_layer,
        canvas_dimensions, start_frame, end_frame, current_frame
    )

    # Save meta.json
    meta = {
        # Customisable options
        # https://github.com/9nate-drake/Comfyui-SecNodes?tab=readme-ov-file#2-sec-video-segmentation
        "use_bbox_or_mask": 1,  # 0=points only, 1=bbox(from mask layer keyframe), 2=mask
        # if mask layer keyframe is blank, the bbox/mask will be ignored and will treat as `0=points only`
        # https://www.runcomfy.com/comfyui-nodes/ComfyUI-KJNodes/GrowMaskWithBlur
        "expand": 5,
        "tapered": False,
        # needed for workflow
        "annotation_frame_idx": current_frame - start_frame,
        "coordinates_positive": json.dumps(pos_points),
        "coordinates_negative": json.dumps(neg_points),
    }
    with open(os.path.join(tmpdir, "meta.json"), "w") as f:
        json.dump(meta, f)

    # Load workflow, pointing the template at the temp working folder
    with open(os.path.join(API_BASE_DIR, "api_tmpl_folder_sec.json"), "r", encoding="utf-8") as f:
        workflow_str = f.read().replace("WORKING_FOLDER_LOCATION", tmpdir_unix)
    workflow = json.loads(workflow_str)
    print(f"✅set up temp dir: with files saved {tmpdir}", "ss")
except Exception as e:
    print("clearing up tempdir due to error before Worker:" + str(e))
    shutil.rmtree(tmpdir, ignore_errors=True)
    logAndRaiseErrMessage(f"Setup failed: {str(e)}")
| # response handlers | |
def on_success(tmpdir):
    """Write ComfyUI's output images into the active mask layer.

    Called with the working directory once the worker reports completion.
    For an animated mask layer one keyframe per output image is created,
    starting at ``start_frame`` (deferred to the event loop through a
    zero-delay timer); otherwise the first output image replaces the layer
    content. The working directory is removed afterwards. Problems are
    reported to the user instead of being raised, since this runs as a Qt
    slot.

    Uses the module-level ``doc``, ``app``, ``active_layer``,
    ``parent_layer`` and ``start_frame``.
    """
    output_folder = os.path.join(tmpdir, "output")
    output_images = sorted(
        name for name in os.listdir(output_folder)
        if name.lower().endswith(('.png', '.webp'))
    )
    if not output_images:
        shutil.rmtree(tmpdir, ignore_errors=True)
        logErrMessage("No output images found")
        return

    doc_width = doc.width()
    doc_height = doc.height()
    # The results replace the content of the existing mask layer; no new
    # transparency mask is created.
    doc.setActiveNode(parent_layer)
    doc.setActiveNode(active_layer)

    def load_output(img_name):
        """Return (bytes, width, height) for an output image, or None if unreadable."""
        qimg = QImage(os.path.join(output_folder, img_name))
        if qimg.isNull():
            print(f"Skipping unreadable output image: {img_name}")
            return None
        return qimage_to_grayscale_bytes(qimg), qimg.width(), qimg.height()

    # hack workaround to set base state of animated mask layer to white
    def reset_mask_base_to_white(mask_layer, width, height):
        # One 0xFF byte per pixel: fully white 8-bit grayscale data.
        white_data = QByteArray(b"\xff" * (width * height))
        original_time = doc.currentTime()
        # Try to find a frame without keyframe
        free_frame = next(
            (frame for frame in range(0, 1000) if not mask_layer.hasKeyframeAtTime(frame)),
            None,
        )
        if free_frame is not None:
            # Found a frame without keyframe - use it for base state
            doc.setCurrentTime(free_frame)
            mask_layer.setPixelData(white_data, 0, 0, width, height)
        elif mask_layer.animated():
            # Frames 0..999 all carry keyframes, so the first keyframe is 0.
            first_keyframe = 0
            first_data = mask_layer.pixelDataAtTime(0, 0, width, height, first_keyframe)
            # Set the base state on a frame far outside the normal range,
            # then restore the first keyframe's content. (The original code
            # also called mask_layer.setCurrentTime() at this point, which
            # does not appear to be a method of Krita's Node class; the time
            # is set on the document instead.)
            doc.setCurrentTime(9999)
            mask_layer.setPixelData(white_data, 0, 0, width, height)
            doc.setCurrentTime(first_keyframe)
            mask_layer.setPixelData(first_data, 0, 0, width, height)
        doc.setCurrentTime(original_time)

    def create_keyframes():
        try:
            add_blank_frame = app.action('add_blank_frame')
            target_layer = active_layer
            # confirm the target is a transparency mask
            if target_layer.type() != "transparencymask":
                active_window = app.activeWindow()
                active_view = active_window.activeView() if active_window else None
                if active_view:
                    active_view.showFloatingMessage("Failed to create new transparency mask layer:" + target_layer.type(), app.icon("16_light_info"), 4000, 1)
                return
            original_time = doc.currentTime()
            reset_mask_base_to_white(target_layer, doc_width, doc_height)
            # First pass: create a blank keyframe for every output frame.
            for i, _ in enumerate(output_images):
                doc.setCurrentTime(start_frame + i)
                add_blank_frame.trigger()
            # Second pass: fill each keyframe with its segmentation result.
            for i, img_name in enumerate(output_images):
                doc.setCurrentTime(start_frame + i)
                loaded = load_output(img_name)
                if loaded is None:
                    continue
                pixel_data, img_w, img_h = loaded
                target_layer.setPixelData(pixel_data, 0, 0, img_w, img_h)
            doc.setCurrentTime(original_time)
            doc.refreshProjection()
        finally:
            # The output files are no longer needed once applied.
            shutil.rmtree(tmpdir, ignore_errors=True)

    if active_layer.animated():
        QTimer.singleShot(0, create_keyframes)
    else:
        try:
            loaded = load_output(output_images[0])
            if loaded is not None:
                pixel_data, img_w, img_h = loaded
                active_layer.setPixelData(pixel_data, 0, 0, img_w, img_h)
                doc.refreshProjection()
        finally:
            shutil.rmtree(tmpdir, ignore_errors=True)
def on_error(msg):
    """Handle a worker failure: remove the working directory and show ``msg``.

    Uses the module-level ``tmpdir``.
    """
    print("clearing up tempdir due to error in Worker")
    shutil.rmtree(tmpdir, ignore_errors=True)
    user_text = f"ComfyUI Error: {msg}"
    logErrMessage(user_text)
# Start worker ?
try:
    active_window = app.activeWindow()
    active_view = active_window.activeView() if active_window else None
    if not user_canceled:
        worker = ComfyWorker(tmpdir)
        loop = QEventLoop()
        # Connect every handler before starting the thread so that no
        # completion signal can be emitted ahead of its connection.
        worker.finished.connect(on_success)
        worker.error.connect(on_error)
        worker.finished.connect(loop.quit)
        worker.error.connect(loop.quit)
        worker.start()  # Non-blocking!
        if active_view:
            active_view.showFloatingMessage("ComfyUI process has started...", app.icon("16_light_info"), 4000, 1)
        # Keep the script alive (with a responsive UI) until the worker reports back.
        loop.exec_()
        # Let the thread finish before the script, and with it the worker
        # object, goes away.
        worker.wait()
    else:
        shutil.rmtree(tmpdir, ignore_errors=True)
        if active_view:
            active_view.showFloatingMessage("Canceled script", app.icon("16_light_info"), 1000, 1)
except Exception as e:
    print("clearing up tempdir due to error starting Worker:" + str(e))
    shutil.rmtree(tmpdir, ignore_errors=True)
Author
Author
For the version that uses mask painting layers (and marquee selection) only without the Points Editor dialog appearing https://gist.github.com/Glidias/9c9fd652223224ab99d05d1c7954e134
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This could be improved by letting the points editor widget accept an optional custom image parameter (for the background image reference), since the `current_frame` reference index intended for Sec Segmentation might not be the frame currently shown by the document projection in the Krita session. https://github.com/Glidias/krita-ai-diffusion/wiki/Feature:-Points-Editor-Widget
As a workaround when using it with the script, if a playback range is selected on the timeline, also move the playback head back to the appropriate keyframe before triggering the script, so that the right background image projection is captured and the points editor shows the correct image reference.