Created
March 19, 2024 17:44
-
-
Save albal/132f678829fc3fd424f828abafa69bcc to your computer and use it in GitHub Desktop.
Messing around with Elasticsearch in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from datetime import datetime | |
| from elasticsearch import Elasticsearch | |
| from faker import Faker | |
| import sys | |
| import argparse | |
# Command line interface: every supported option is declared in one table
# and registered in order.
parser = argparse.ArgumentParser(description='Elasticsearch command line tool')
for flag, options in (
    ('--create-entries', dict(action='store_true', help='Create random entries')),
    ('--search-text', dict(type=str, help='Search for entries with matching text')),
    ('--all', dict(action='store_true', help='Show all search results')),
    ('--dump', dict(action='store_true', help='Dump everything in the index')),
):
    parser.add_argument(flag, **options)

# Parse the command line, then echo the resulting namespace.
args = parser.parse_args()
print(args)

# Called with no arguments at all: show the usage text and exit successfully.
if len(sys.argv) == 1:
    parser.print_help()
    sys.exit(0)

# Client for the Elasticsearch server on localhost.
es = Elasticsearch("http://localhost:9200")
if args.create_entries:
    # A single Faker instance serves every document; constructing it does not
    # depend on the loop iteration, so it is built once up front.
    fake = Faker()
    for _ in range(123):
        # Build a document with a random author and a random sentence.
        doc = {
            'author': fake.name(),
            'text': fake.sentence(),
            # Timezone-aware local time, so the serialized ISO string carries
            # its UTC offset; a naive timestamp would be read as UTC by
            # Elasticsearch.
            'timestamp': datetime.now().astimezone(),
        }
        # Index the document and report the outcome returned by the server.
        res = es.index(index="test-index", document=doc)
        print(f"Document indexed: {res['result']}")
    # Make the freshly indexed documents visible to any search or dump that
    # is requested later in this same run.
    es.indices.refresh(index="test-index")
if args.search_text:
    # Get the search text from the command line argument.
    search_text = args.search_text
    # Without --all only the first 10 hits are displayed, so only 10 are
    # requested; with --all up to 1200 hits are fetched.
    limit = 1200 if args.all else 10
    # Full-text match on the "text" field. The request is expressed with
    # keyword parameters, like the es.index(document=...) call in this file,
    # instead of the deprecated body= argument.
    res = es.search(
        index="test-index",
        size=limit,
        query={"match": {"text": search_text}},
    )
    total = res['hits']['total']['value']
    print("Search Results:")
    if args.all:
        print(f"Found {total} entries")
    else:
        print(f"Found {total} entries - only showing first 10, use --all to see all entries")
    # The response already holds at most `limit` hits, so one loop serves
    # both modes.
    for hit in res['hits']['hits']:
        source = hit['_source']
        print(f" - {source['author']}: {source['text']}")
if args.dump:
    # Walk the whole index with the scroll API: the initial search opens a
    # scroll context (kept alive for one minute per request) and each
    # es.scroll() call returns the next page of up to 10000 hits.
    # track_total_hits=True asks for an exact total rather than the default
    # capped count.
    res = es.search(
        index="test-index",
        size=10000,
        query={"match_all": {}},
        scroll="1m",
        track_total_hits=True,
    )
    scroll_id = res['_scroll_id']
    try:
        print("Search Results:")
        print(f"Found {res['hits']['total']['value']} entries")
        hits = res['hits']['hits']
        # An empty page signals that every document has been returned.
        while hits:
            for hit in hits:
                print(f" - {hit['_source']['author']}: {hit['_source']['text']}")
            res = es.scroll(scroll_id=scroll_id, scroll="1m")
            # The scroll id may change between pages; always continue with
            # the most recently returned one.
            scroll_id = res['_scroll_id']
            hits = res['hits']['hits']
    finally:
        # Release the server-side scroll context instead of leaving it to
        # expire on its own, even if printing or paging failed.
        es.clear_scroll(scroll_id=scroll_id)

# Refresh the index. This runs at the end of the script regardless of which
# options were given.
es.indices.refresh(index="test-index")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment