@llvtt · Created November 7, 2014
Customized Solr DocManager
# Copyright 2013-2014 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Receives documents from the oplog worker threads and indexes them
into the backend.
This file is a document manager for the Solr search engine, but the intent
is that this file can be used as an example to add on different backends.
To extend this to other systems, simply implement the exact same class and
replace the method definitions with API calls for the desired backend.
"""
import itertools
import json
import re

import pymongo
from pysolr import Solr, SolrError

from mongo_connector import errors
from mongo_connector.constants import (DEFAULT_COMMIT_INTERVAL,
                                       DEFAULT_MAX_BULK)
from mongo_connector.util import retry_until_ok
from mongo_connector.doc_managers import DocManagerBase, exception_wrapper
from mongo_connector.doc_managers.formatters import DocumentFlattener

# pysolr has only one exception type: SolrError.
wrap_exceptions = exception_wrapper({
    SolrError: errors.OperationFailed})
ADMIN_URL = 'admin/luke?show=schema&wt=json'
decoder = json.JSONDecoder()
# Customize these as necessary.
MONGO_HOST = "localhost"
MONGO_PORT = 27017

class DocumentFlattenerUnordered(DocumentFlattener):
    """Formatter that completely flattens documents but does not unwind arrays.

    The order of elements within arrays is not recorded.

    An example:
        {"a": 2,
         "b": {
             "c": {
                 "d": 5
             }
         },
         "e": [6, 7, 8]
        }
    becomes:
        {"a": 2, "b.c.d": 5, "e": [6, 7, 8]}
    """

    def transform_element(self, key, value):
        if isinstance(value, list):
            # Flatten each array element under the same key, merging the
            # resulting values for any given key into a single list.
            result = {}
            for lv in value:
                for inner_k, inner_v in self.transform_element(key, lv):
                    inner_l = result.get(inner_k, [])
                    if isinstance(inner_v, list):
                        inner_l.extend(inner_v)
                    else:
                        inner_l.append(inner_v)
                    result[inner_k] = inner_l
            for k in result:
                yield k, result[k]
        else:
            # Defer to the base DocumentFlattener for non-array values.
            parent = super(DocumentFlattenerUnordered, self)
            for k, v in parent.transform_element(key, value):
                yield k, v
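
# A minimal usage sketch (assuming, as the code below does, that
# format_document on the base DocumentFlattener delegates to
# transform_element): flattening a nested document with this formatter
# keeps arrays intact.
#
#   flattener = DocumentFlattenerUnordered()
#   flattener.format_document({"a": 2, "b": {"c": {"d": 5}}, "e": [6, 7, 8]})
#   # => {"a": 2, "b.c.d": 5, "e": [6, 7, 8]}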

class DocManager(DocManagerBase):
    """The DocManager class creates a connection to the backend engine and
    adds/removes documents, and in the case of rollback, searches for them.

    The reason for storing id/doc pairs, as opposed to docs alone, is so that
    multiple updates to the same doc reflect the most up-to-date version,
    rather than multiple, slightly different versions of a doc.
    """

    def __init__(self, url, auto_commit_interval=DEFAULT_COMMIT_INTERVAL,
                 unique_key='_id', chunk_size=DEFAULT_MAX_BULK, **kwargs):
        """Verify the Solr URL and establish a connection."""
        self.solr = Solr(url)
        self.mongo_client = pymongo.MongoClient(MONGO_HOST, MONGO_PORT)
        self.unique_key = unique_key
        # pysolr expresses commit intervals in milliseconds.
        if auto_commit_interval is not None:
            self.auto_commit_interval = auto_commit_interval * 1000
        else:
            self.auto_commit_interval = None
        self.chunk_size = chunk_size
        self.field_list = []
        self._build_fields()
        self._formatter = DocumentFlattenerUnordered()

    def _parse_fields(self, result, field_name):
        """Parse the schema response and return the list of field names
        declared under the given field type ('fields' or 'dynamicFields').
        """
        field_list = []
        for key, value in result.get('schema', {}).get(field_name, {}).items():
            if key not in field_list:
                field_list.append(key)
        return field_list
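
    # For illustration (an assumed, abridged response shape): the Luke
    # handler at ADMIN_URL returns JSON along the lines of
    #   {"schema": {"fields": {"title": {...}, "_ts": {...}},
    #               "dynamicFields": {"*_s": {...}}}}
    # so _parse_fields(result, 'fields') would return ['title', '_ts'].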

    @wrap_exceptions
    def _build_fields(self):
        """Build the list of valid fields from the Solr schema."""
        declared_fields = self.solr._send_request('get', ADMIN_URL)
        result = decoder.decode(declared_fields)
        self.field_list = self._parse_fields(result, 'fields')

        # Build regular expressions to match dynamic fields.
        # Dynamic field names may have exactly one wildcard, either at
        # the beginning or the end of the name.
        self._dynamic_field_regexes = []
        for wc_pattern in self._parse_fields(result, 'dynamicFields'):
            if wc_pattern[0] == "*":
                self._dynamic_field_regexes.append(
                    re.compile(r".*%s\Z" % wc_pattern[1:]))
            elif wc_pattern[-1] == "*":
                self._dynamic_field_regexes.append(
                    re.compile(r"\A%s.*" % wc_pattern[:-1]))

    def _clean_doc(self, doc):
        """Reformat the given document before insertion into Solr.

        This method reformats the document in the following ways:
          - removes extraneous fields that aren't defined in schema.xml
          - flattens the document so that there are no sub-documents, and
            every value is associated with its dot-separated path of keys
            (arrays are left intact, since this manager uses
            DocumentFlattenerUnordered)

        An example:
            {"a": 2,
             "b": {
                 "c": {
                     "d": 5
                 }
             },
             "e": [6, 7, 8]
            }
        becomes:
            {"a": 2, "b.c.d": 5, "e": [6, 7, 8]}
        """
        # Translate the _id field to whatever unique key we're using.
        # _id may not exist in the doc, if we retrieved it from Solr
        # as part of an update.
        if '_id' in doc:
            doc[self.unique_key] = doc.pop("_id")

        # Solr cannot index fields within sub-documents, so flatten documents
        # with the dot-separated path to each value as the respective key.
        flat_doc = self._formatter.format_document(doc)

        # Only include fields that are explicitly provided in the
        # schema or match one of the dynamic field patterns, if
        # we were able to retrieve the schema.
        if len(self.field_list) + len(self._dynamic_field_regexes) > 0:
            def include_field(field):
                return field in self.field_list or any(
                    regex.match(field)
                    for regex in self._dynamic_field_regexes)
            return dict((k, v) for k, v in flat_doc.items()
                        if include_field(k))
        return flat_doc

    def stop(self):
        """Stops the instance."""
        pass

    @wrap_exceptions
    def update(self, doc, update_spec):
        """Apply updates given in update_spec to the document whose id
        matches that of doc.
        """
        # Grab the latest version of the document from MongoDB, instead of
        # applying the update operation.
        document_id = doc['_id']
        db, coll = doc['ns'].split('.', 1)
        latest_document = self.mongo_client[db][coll].find_one(document_id)
        # A _version_ of 0 will always apply the update.
        latest_document['_version_'] = 0
        self.upsert(latest_document)
        return latest_document
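
    # Sketch of a call (argument shapes assumed from the code above, not
    # from mongo-connector documentation): `doc` carries the source
    # namespace, and `update_spec` is ignored in favor of re-reading the
    # full document from MongoDB:
    #
    #   dm.update({'_id': some_id, 'ns': 'mydb.mycoll'},
    #             {'$set': {'title': 'new title'}})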

    @wrap_exceptions
    def upsert(self, doc):
        """Update or insert a document into Solr.

        This method should call whatever add/insert/update method exists for
        the backend engine and add the document there. The input will always
        be one mongo document, represented as a Python dictionary.
        """
        if self.auto_commit_interval is not None:
            self.solr.add([self._clean_doc(doc)],
                          commit=(self.auto_commit_interval == 0),
                          commitWithin=str(self.auto_commit_interval))
        else:
            self.solr.add([self._clean_doc(doc)], commit=False)
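
    # Note on commit semantics (standard Solr update behavior; verify
    # against your Solr version): commit=True forces an immediate hard
    # commit, while commitWithin asks Solr to make the document visible
    # within the given number of milliseconds.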

    @wrap_exceptions
    def bulk_upsert(self, docs):
        """Update or insert multiple documents into Solr.

        docs may be any iterable.
        """
        if self.auto_commit_interval is not None:
            add_kwargs = {
                "commit": (self.auto_commit_interval == 0),
                "commitWithin": str(self.auto_commit_interval)
            }
        else:
            add_kwargs = {"commit": False}

        cleaned = (self._clean_doc(d) for d in docs)
        if self.chunk_size > 0:
            # Consume the stream in batches of at most chunk_size documents.
            batch = list(itertools.islice(cleaned, self.chunk_size))
            while batch:
                self.solr.add(batch, **add_kwargs)
                batch = list(itertools.islice(cleaned, self.chunk_size))
        else:
            self.solr.add(cleaned, **add_kwargs)

    @wrap_exceptions
    def remove(self, doc):
        """Remove a document from Solr.

        The input is a python dictionary that represents a mongo document.
        """
        self.solr.delete(id=str(doc["_id"]),
                         commit=(self.auto_commit_interval == 0))

    @wrap_exceptions
    def _remove(self):
        """Removes everything."""
        self.solr.delete(q='*:*', commit=(self.auto_commit_interval == 0))

    @wrap_exceptions
    def _stream_search(self, query):
        """Helper method for iterating over Solr search results."""
        for doc in self.solr.search(query, rows=100000000):
            if self.unique_key != "_id":
                doc["_id"] = doc.pop(self.unique_key)
            yield doc

    @wrap_exceptions
    def search(self, start_ts, end_ts):
        """Called to query Solr for documents in a time range."""
        query = '_ts: [%s TO %s]' % (start_ts, end_ts)
        return self._stream_search(query)
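
    # Illustrative only: with start_ts=100 and end_ts=200 the query string
    # is '_ts: [100 TO 200]', standard Lucene range syntax over the _ts
    # field that accompanies each replicated document.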

    @wrap_exceptions
    def _search(self, query):
        """For test purposes only. Performs a search on Solr with the given
        query. Does not have to be implemented.
        """
        return self._stream_search(query)

    def commit(self):
        """Force a commit, retrying until it succeeds."""
        retry_until_ok(self.solr.commit)

    @wrap_exceptions
    def get_last_doc(self):
        """Returns the last document stored in the Solr engine."""
        # Search everything, sort by descending timestamp, return one row.
        try:
            result = self.solr.search('*:*', sort='_ts desc', rows=1)
        except ValueError:
            return None
        for r in result:
            r['_id'] = r.pop(self.unique_key)
            return r
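
# A minimal end-to-end sketch (assumptions: a Solr core at
# http://localhost:8983/solr whose schema exposes the Luke admin handler,
# MongoDB reachable at MONGO_HOST:MONGO_PORT, and hypothetical field names):
#
#   dm = DocManager('http://localhost:8983/solr', auto_commit_interval=0)
#   dm.upsert({'_id': 1, '_ts': 1, 'title': 'hello',
#              'meta': {'author': 'llvtt'}})
#   dm.commit()
#   print(dm.get_last_doc())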