from streamsets.sdk import ControlHub
import os
import sys

STREAMSETS_CREDENTIAL_ID = os.environ.get("STREAMSETS_CREDENTIAL_ID", "<default-credential-id>")
STREAMSETS_TOKEN = os.environ.get("STREAMSETS_TOKEN", "<default-token>")
STREAMSETS_ENGINE_ID = os.environ.get("STREAMSETS_ENGINE_ID", "<default-engine-id>")
STREAMSETS_PIPELINE_NAME = os.environ.get("STREAMSETS_PIPELINE_NAME", "Lab - Drone Ops - MQTT to Kafka")
ASTRADB_API_ENDPOINT = os.environ.get("ASTRADB_API_ENDPOINT", "<astradb-api-endpoint>")
ASTRADB_KEYSPACE_NAME = os.environ.get("ASTRADB_KEYSPACE_NAME", "default_keyspace")
ASTRADB_COLLECTION_NAME = os.environ.get("ASTRADB_COLLECTION_NAME", "drone_telemetry")
ASTRADB_APPLICATION_TOKEN = os.environ.get("ASTRADB_APPLICATION_TOKEN", "<AstraCS:token>")
MQTT_BROKER_URL = "tcp://mqtt.lab-mqtt.svc.cluster.local:1883"

# Update commit message from argument, if available
try:
    COMMIT_MESSAGE = sys.argv[1]
except IndexError:
    COMMIT_MESSAGE = "updated"
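
# Usage (the script filename here is only illustrative):
#   python build_drone_pipeline.py "optional commit message"
# Control Hub and Astra DB settings are read from the STREAMSETS_* and ASTRADB_*
# environment variables above.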


def getControlHub():
    # Connect to StreamSets Control Hub using the API credential from the environment
    sch = ControlHub(credential_id=STREAMSETS_CREDENTIAL_ID, token=STREAMSETS_TOKEN)
    return sch


def getPipeline(sch):
    try:
        # Get the existing pipeline
        pipeline = sch.pipelines.get(name=STREAMSETS_PIPELINE_NAME)
        print(f"Creating new version of pipeline: {pipeline}")
    except ValueError:
        # Pipeline does not exist yet: build and publish an empty one on the given engine
        engine = sch.engines.get(id=STREAMSETS_ENGINE_ID)
        pipeline_builder = sch.get_pipeline_builder(engine_type='data_collector', engine_id=engine.id)
        pipeline = pipeline_builder.build(STREAMSETS_PIPELINE_NAME)
        sch.publish_pipeline(pipeline, commit_message='created')
        # Get the newly created pipeline
        pipeline = sch.pipelines.get(name=STREAMSETS_PIPELINE_NAME)
        print(f"Pipeline created: {pipeline}")
    # Remove all existing stages so the pipeline is rebuilt from scratch below
    stages = pipeline.stages
    for stage in stages:
        pipeline.remove_stages(stage)
    return pipeline


def createStage(pipeline, stage_name, stage_label, type=None):
    # Add a stage to the pipeline by its library label and give it a display name
    if type is None:
        stage = pipeline.add_stage(label=stage_name)
    else:
        stage = pipeline.add_stage(label=stage_name, type=type)
    stage.label = stage_label
    return stage


def publish(sch, pipeline):
    sch.publish_pipeline(pipeline, commit_message=COMMIT_MESSAGE)
    print("Pipeline published.")


def main():
    sch = getControlHub()
    pipeline = getPipeline(sch)

    # MQTT origin: read drone telemetry as JSON from the lab broker
    origin_mqtt_stage = createStage(pipeline, stage_name='MQTT Subscriber', stage_label="Drone telemetry from MQTT", type="origin")
    origin_mqtt_stage.data_format = "JSON"
    origin_mqtt_stage.broker_url = MQTT_BROKER_URL
    origin_mqtt_stage.topic_filter = ["drone.telemetry"]
    origin_mqtt_stage.clean_session = True

    # Field Type Converter: parse the ISO-8601 timestamp string in /ts into a DATETIME
    field_type_converter_stage = createStage(pipeline, stage_name='Field Type Converter', stage_label="Convert types")
    field_type_converter_stage.conversions_by_field_name = [
        {
            "fields": ["/ts"],
            "targetType": "DATETIME",
            "dateFormat": "OTHER",
            "otherDateFormat": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
        }
    ]

    # Stream Selector: route patrol-phase records onward, everything else to the default lane
    match_patrol_predicate = "${str:matches(record:value('/phase'), \"patrol\")}"
    default_predicate = "default"
    stream_selector_stage = createStage(pipeline, stage_name='Stream Selector', stage_label="Check drone phase, discard non-patrol")
    stream_selector_stage.predicates = [
        match_patrol_predicate,
        default_predicate
    ]
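    # Note: once assigned, each predicate is stored as a dict that includes a
    # 'predicate' key and its output lane, which is why the lane lookup further
    # below matches on predicate['predicate'].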

    # Trash destination: all but patrol messages are discarded
    discard_stage = createStage(pipeline, stage_name='Trash', stage_label="Discard", type='destination')

    # HTTP Client destination: POST documents to the Astra DB Data API collection endpoint
    destination_astradb_stage = createStage(pipeline, stage_name='HTTP Client', stage_label="Drone telemetry to Astra DB", type='destination')
    destination_astradb_stage.resource_url = f"{ASTRADB_API_ENDPOINT}/api/json/v1/{ASTRADB_KEYSPACE_NAME}/{ASTRADB_COLLECTION_NAME}"
    destination_astradb_stage.http_method = "POST"
    destination_astradb_stage.headers = [
        {
            "key": "Token",
            "value": f"{ASTRADB_APPLICATION_TOKEN}"
        },
        {
            "key": "Content-Type",
            "value": "application/json"
        }
    ]

    # Dev Identity stages make it easy to inspect records on each lane in the Control Hub GUI
    dev_identity_1_stage = createStage(pipeline, stage_name='Dev Identity', stage_label="Dev Identity 1")
    dev_identity_2_stage = createStage(pipeline, stage_name='Dev Identity', stage_label="Dev Identity 2")

    # JavaScript Evaluator: wrap each telemetry record in an Astra DB insertOne document
    create_astradb_json_stage = createStage(pipeline, stage_name='JavaScript Evaluator', stage_label="Create JSON for Astra DB")
    create_astradb_json_stage.script = """
var records = sdc.records;
for (var i = 0; i < records.length; i++) {
  try {
    // Map telemetry fields to the Astra DB document fields
    var documentMap = {};
    documentMap["drone_id"] = records[i].value['drone_id'];
    documentMap["event_time"] = records[i].value['ts'];
    documentMap["altitude"] = records[i].value['alt_m'];
    documentMap["latitude"] = records[i].value['lat'];
    documentMap["longitude"] = records[i].value['lon'];
    documentMap["speed"] = records[i].value['speed_mps'];
    documentMap["heading"] = records[i].value['heading_deg'];
    // Emit a new record whose value is the insertOne command body
    var insertOneMap = {'insertOne': {'document': documentMap}};
    var newRecord = sdc.createRecord(records[i].sourceId + '-insertOne');
    newRecord.value = insertOneMap;
    sdc.output.write(newRecord);
  } catch (e) {
    // Send record to error
    sdc.error.write(records[i], e);
  }
}
"""

    # Connect the Stream Selector output lanes.
    # Find the output lane that corresponds to the patrol predicate
    predicate_index = stream_selector_stage.predicates.index(next(predicate for predicate in stream_selector_stage.predicates if predicate['predicate'] == match_patrol_predicate))
    # Patrol records go to Dev Identity 1 and on towards Astra DB
    stream_selector_stage.connect_outputs(stages=[dev_identity_1_stage], output_lane_index=predicate_index)
    # Find the output lane that corresponds to the default predicate
    predicate_index = stream_selector_stage.predicates.index(next(predicate for predicate in stream_selector_stage.predicates if predicate['predicate'] == default_predicate))
    # Everything else goes to Dev Identity 2 and is discarded
    stream_selector_stage.connect_outputs(stages=[dev_identity_2_stage], output_lane_index=predicate_index)

    # Connect the remaining stages
    origin_mqtt_stage >> stream_selector_stage
    dev_identity_2_stage >> discard_stage
    dev_identity_1_stage >> field_type_converter_stage
    field_type_converter_stage >> create_astradb_json_stage
    create_astradb_json_stage >> destination_astradb_stage

    publish(sch, pipeline)


if __name__ == "__main__":
    main()
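
For reference, a rough sketch (all values are made up and the variable names are only illustrative) of one telemetry message as the MQTT origin reads it, and the insertOne command body the JavaScript evaluator above builds from it for the Astra DB Data API endpoint:

    telemetry = {
        "drone_id": "drone-001",
        "ts": "2025-11-26T07:33:00.000000+00:00",
        "alt_m": 120.5,
        "lat": 60.17,
        "lon": 24.94,
        "speed_mps": 12.3,
        "heading_deg": 270,
        "phase": "patrol",
    }

    insert_one_body = {
        "insertOne": {
            "document": {
                "drone_id": "drone-001",
                "event_time": "2025-11-26T07:33:00.000000+00:00",
                "altitude": 120.5,
                "latitude": 60.17,
                "longitude": 24.94,
                "speed": 12.3,
                "heading": 270,
            }
        }
    }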