Last active
January 13, 2026 15:50
-
-
Save pythonhacker/023baff2d1b9257b3b62d615e058d7da to your computer and use it in GitHub Desktop.
A Python cmd.Cmd console as a devops helper to Elasticsearch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import cmd | |
| import json | |
| import os | |
| from pathlib import Path | |
| from elasticsearch import Elasticsearch | |
| from elasticsearch.helpers import bulk | |
| import warnings | |
| warnings.filterwarnings("ignore") | |
| class QueryHelper: | |
| """ Query helper class """ | |
| def __init__(self, index): | |
| self.index = index | |
| self.index_mapper = { | |
| 'home_library_books': self.query_builder_home_library | |
| } | |
| def build_query(self, arg): | |
| """ Build query against multiple indices """ | |
| if self.index not in self.index_mapper: | |
| print(f"{self.index} - index not configured") | |
| return | |
| return self.index_mapper[self.index](arg) | |
| def query_builder_home_library(self, arg): | |
| """ Query builder for home library index """ | |
| es_query = { | |
| "query": { | |
| "multi_match": { | |
| "query": arg, | |
| "fields": [ | |
| "title^3", | |
| "authors^2", | |
| "genres", | |
| "review_notes" | |
| ], | |
| "type": "best_fields" | |
| } | |
| }, | |
| "size": 10 | |
| } | |
| return es_query | |
| class ElasticsearchShell(cmd.Cmd): | |
| intro = "Elasticsearch helper shell. Type help or ? to list commands.\n" | |
| prompt = "ES> " | |
| def __init__(self, es: Elasticsearch): | |
| super().__init__() | |
| self.es = es | |
| self.default_index = None | |
| def help_info(self): | |
| """ Help on info """ | |
| print("Return elasticsearch cluster information") | |
| def help_cat(self): | |
| """ Help on cat """ | |
| print("Print elasticsearch indices information") | |
| def help_get(self): | |
| """ Help on get """ | |
| print("Get a document by its id") | |
| def help_query(self): | |
| """ Help on query """ | |
| print("Query for elasticsearch documents by free text") | |
| def help_index(self): | |
| """ Help on index """ | |
| print("Index document(s) to elasticsearch") | |
| def help_set_index(self): | |
| """ Help on set_index """ | |
| print("Sets the default elasticsearch index") | |
| def do_info(self, arg): | |
| """info | |
| Show Elasticsearch cluster information | |
| """ | |
| info = self.es.info() | |
| print(json.dumps(info.body, indent=2)) | |
| def do_cat(self, arg): | |
| """cat | |
| List Elasticsearch indices (similar to _cat/indices) | |
| """ | |
| indices = self.es.cat.indices(format="json") | |
| print(json.dumps(indices.body, indent=4)) | |
| def do_query(self, arg): | |
| """query <search terms> | |
| Perform a text search across book fields | |
| Example: | |
| query dune | |
| query asimov robots | |
| query detective victorian | |
| """ | |
| if self.default_index is None: | |
| print("error - default index not set") | |
| return | |
| query_string = arg.strip() | |
| es_query = QueryHelper(self.default_index).build_query(query_string) | |
| if not es_query: | |
| print("error building ES query for input") | |
| return | |
| resp = self.es.search(index = self.default_index, body = es_query) | |
| hits = resp["hits"]["hits"] | |
| if not hits: | |
| print("No results found.") | |
| return | |
| print(f"Found {len(hits)} result(s)\n") | |
| for h in hits: | |
| src = h["_source"] | |
| score = h["_score"] | |
| id_ = h["_id"] | |
| print(f"Title - {src.get('title')} ({src.get('publication_year')})") | |
| print(f" authors: {', '.join(src.get('authors', []))}") | |
| print(f" genres: {', '.join(src.get('genres', []))}") | |
| print(f" rating: {src.get('rating')} | score: {score:.2f}") | |
| print(f" (id: {id_})\n") | |
| def do_set_index(self, arg): | |
| """ Set the default index """ | |
| arg = arg.strip() | |
| if not arg: | |
| print("use a non-empty string without spaces") | |
| return | |
| index = arg | |
| # Check if exists | |
| exists = self.es.indices.exists(index = index) | |
| if not exists: | |
| print(f"error - index {index} does not exist") | |
| return | |
| print("Set default index to:", index) | |
| # This should exist | |
| self.default_index = index | |
| def do_get(self, arg): | |
| """ Fetch a document by id """ | |
| arg = arg.strip() | |
| if not arg: | |
| print("use a valid document id") | |
| return | |
| if self.default_index is None: | |
| print("error - default index not set") | |
| return | |
| doc_id = arg | |
| try: | |
| resp = self.es.get(index = self.default_index, id=doc_id) | |
| except Exception as e: | |
| # Handle not found cleanly | |
| if hasattr(e, "status_code") and e.status_code == 404: | |
| print(f"Document not found: {doc_id}") | |
| return | |
| raise | |
| source = resp.get("_source") | |
| if not source: | |
| print("Document found but has no _source") | |
| return | |
| print(f"_id: {resp['_id']}") | |
| print(json.dumps(source, indent=4)) | |
| def do_index(self, arg): | |
| """index <data.json> | |
| Bulk index document(s) from a JSON file | |
| Example: | |
| index books.json | |
| JSON file can contain a single document | |
| as well | |
| """ | |
| if self.default_index is None: | |
| print("error - default index not set") | |
| return | |
| data_file = arg | |
| path = Path(data_file) | |
| if not path.exists(): | |
| print(f"Data file not found: {data_file}") | |
| return | |
| try: | |
| docs = json.loads(path.read_text()) | |
| except json.JSONDecodeError as e: | |
| print(f"Invalid JSON: {e}") | |
| return | |
| if not isinstance(docs, list): | |
| # 1 item -> list | |
| docs = [docs] | |
| actions = [ | |
| { | |
| "_index": self.default_index, | |
| "_source": doc | |
| } | |
| for doc in docs | |
| ] | |
| success, errors = bulk(self.es, actions, raise_on_error=False) | |
| print(f"Indexed {success} documents") | |
| if errors: | |
| print("Errors:") | |
| print(json.dumps(errors, indent=2)) | |
| def emptyline(self): | |
| return | |
| def do_EOF(self, arg): | |
| """Exit on Ctrl-D""" | |
| print("Goodbyte") | |
| return True | |
| def create_client() -> Elasticsearch: | |
| """ Create the ES client """ | |
| url = os.environ.get("ELASTIC_URL", "https://localhost:9200") | |
| user = os.environ.get("ELASTIC_USER", "elastic") | |
| password = os.environ.get("ELASTIC_PASSWORD") | |
| if not user or not password: | |
| raise RuntimeError("Missing env variables") | |
| return Elasticsearch( | |
| url, | |
| basic_auth=(user, password), | |
| verify_certs=False # local dev only | |
| ) | |
| if __name__ == "__main__": | |
| es = create_client() | |
| if not es.ping(): | |
| raise RuntimeError("Cannot connect to Elasticsearch") | |
| ElasticsearchShell(es).cmdloop() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment