-
-
Save oscarandreu/5564d37ac0a83e7b9cbef6e4123932c6 to your computer and use it in GitHub Desktop.
An Apache NiFi InvokeScriptedProcessor template (in Jython) for running ExecuteScript Jython scripts faster
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from org.apache.nifi.processor import Processor,Relationship | |
| from org.apache.nifi.components import PropertyDescriptor | |
| from org.apache.nifi.processor.util import StandardValidators | |
| from java.lang import Throwable | |
| class ScriptBody(): | |
| def __init__(self): | |
| pass | |
| def executeScript(self, session, context, log, REL_SUCCESS, REL_FAILURE): | |
| flowFile = session.get() | |
| if (not flowFile): | |
| return | |
| # transfer | |
| session.transfer(flowFile, REL_SUCCESS) | |
| #end class | |
| class JythonProcessor(Processor): | |
| REL_SUCCESS = (Relationship.Builder() | |
| .name("success") | |
| .description('FlowFiles that were successfully processed are routed here') | |
| .build()) | |
| REL_FAILURE = (Relationship.Builder() | |
| .name("failure") | |
| .description('FlowFiles that were not successfully processed are routed here') | |
| .build()) | |
| importFilesPath = (PropertyDescriptor.Builder() | |
| .name('importFilesPath') | |
| .description("Defines the importFilesPath.") | |
| .required(True) | |
| .defaultValue('/mnt/foo/bar') | |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) | |
| .build()) | |
| importFile = (PropertyDescriptor.Builder() | |
| .name('importFile') | |
| .description("filename to be imported") | |
| .expressionLanguageSupported(True) | |
| .required(True) | |
| .defaultValue('import.csv') | |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) | |
| .build()) | |
| log = None | |
| eb = ScriptBody() | |
| def initialize(self, context): | |
| self.log = context.logger | |
| def getRelationships(self): | |
| return set([self.REL_SUCCESS, self.REL_FAILURE]) | |
| def validate(self,context): | |
| pass | |
| def onPropertyModified(self,descriptor, oldValue, newValue): | |
| pass | |
| def getPropertyDescriptors(self): | |
| return [ | |
| self.importFile, | |
| self.importFilesPath | |
| ] | |
| def getIdentifier(self): | |
| return None | |
| def onTrigger(self,context, sessionFactory): | |
| session = sessionFactory.createSession() | |
| try: | |
| self.eb.executeScript(session, context, self.log, self.REL_SUCCESS, self.REL_FAILURE) | |
| session.commit() | |
| except Throwable, t: | |
| self.log.error('{} failed to process due to {}; rolling back session', [self, t]) | |
| session.rollback(true) | |
| raise t | |
| #end class | |
| processor = JythonProcessor() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey @karanmankar95!
I don't know if you still need have this problem, but I'm replying anyways for posterity and the sake of everyone else that may have the error.
The error on line 68 is probably the lowercase "true", it should be True. The former is not valid Python syntax.
The error on line 69 is probably due to line 66 (again, not valid Python syntax). It should be
except Throwable as t:In the end, lines 61 to 69 (i.e., the onTrigger function) should be something like this: