Created
November 26, 2025 07:04
-
-
Save samisalkosuo/1d160aba4c491570fb87cca871d0d677 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from streamsets.sdk import ControlHub
import os
import sys
import json  # NOTE(review): appears unused in this script — verify before removing

# --- Configuration ---
# Every setting is read from an environment variable, falling back to the
# placeholder / lab default shown here.
STREAMSETS_CREDENTIAL_ID = os.environ.get("STREAMSETS_CREDENTIAL_ID", "<default-credential-id>")
STREAMSETS_TOKEN = os.environ.get("STREAMSETS_TOKEN", "<default-token>")
STREAMSETS_ENGINE_ID = os.environ.get("STREAMSETS_ENGINE_ID", "<default-engine-id>")
STREAMSETS_PIPELINE_NAME = os.environ.get("STREAMSETS_PIPELINE_NAME", "Lab - Drone Ops - MQTT to Kafka")
# Astra DB Data API connection details used by the HTTP Client destination below.
ASTRADB_API_ENDPOINT = os.environ.get("ASTRADB_API_ENDPOINT", "<astradb-api-endpoint>")
ASTRADB_KEYSPACE_NAME = os.environ.get("ASTRADB_KEYSPACE_NAME", "default_keyspace")
ASTRADB_COLLECTION_NAME = os.environ.get("ASTRADB_COLLECTION_NAME", "drone_telemetry")
ASTRADB_APPLICATION_TOKEN = os.environ.get("ASTRADB_APPLICATION_TOKEN", "<AstraCS:token>")
# In-cluster MQTT broker URL (fixed; not read from the environment).
MQTT_BROKER_URL = "tcp://mqtt.lab-mqtt.svc.cluster.local:1883"
# Commit message for the published pipeline version: the first command-line
# argument when one is supplied, otherwise a generic default.
COMMIT_MESSAGE = sys.argv[1] if len(sys.argv) > 1 else "updated"
# Connect to StreamSets Control Hub using the Platform SDK credentials.
sch = ControlHub(credential_id=STREAMSETS_CREDENTIAL_ID, token=STREAMSETS_TOKEN)
#print(sch)

# Get-or-create the pipeline. `sch.pipelines.get` is expected to raise
# ValueError when no pipeline with this name exists (caught below), in which
# case an empty pipeline is built against the configured Data Collector
# engine, published once, and then fetched back.
try:
    #get pipeline
    pipeline = sch.pipelines.get(name=STREAMSETS_PIPELINE_NAME)
    print(f"Creating new version of pipeline: {pipeline}")
except ValueError:
    #create pipeline
    engine = sch.engines.get(id=STREAMSETS_ENGINE_ID)
    pipeline_builder = sch.get_pipeline_builder(engine_type='data_collector', engine_id=engine.id)
    pipeline = pipeline_builder.build(STREAMSETS_PIPELINE_NAME)
    sch.publish_pipeline(pipeline, commit_message='created')
    #get newly created pipeline so subsequent edits apply to the stored version
    pipeline = sch.pipelines.get(name=STREAMSETS_PIPELINE_NAME)
    print(f"Pipeline created: {pipeline}")
#print(pipeline)
#stages
# Remove every pre-existing stage so the pipeline graph is rebuilt from
# scratch below. Snapshot the stage collection into a plain list first:
# calling remove_stages() while iterating the live `pipeline.stages` view
# risks skipping elements (mutating a collection during iteration).
stages = list(pipeline.stages)
#print(stages)
#remove all stages
for stage in stages:
    pipeline.remove_stages(stage)
# --- Origin: MQTT Subscriber reading raw drone telemetry as JSON ---
origin_mqtt_stage = pipeline.add_stage(label = 'MQTT Subscriber', type='origin')
#origin_mqtt_stage.description("Drone telemetry raw data")
#print(origin_mqtt_stage.configuration)
origin_mqtt_stage.data_format = "JSON"
origin_mqtt_stage.broker_url = MQTT_BROKER_URL
origin_mqtt_stage.topic_filter = ["drone.telemetry"]
# MQTT clean-session flag (non-persistent subscription).
origin_mqtt_stage.clean_session = True
# Display name shown in the Control Hub pipeline canvas.
origin_mqtt_stage.label = "Drone telemetry from MQTT"
#help(origin_mqtt_stage)
#print(origin_mqtt_stage.client_id)
# --- Processor: drop telemetry fields that are not needed downstream ---
field_remover_stage = pipeline.add_stage(label = 'Field Remover')
field_remover_stage.label = "Remove unnecessary fields"
#field_remover_stage.action = "Remove Listed Fields"
# Field paths to strip from each record (sensor sub-objects and raw metrics).
field_remover_stage.fields = ["/power",
                              "/camera",
                              "/imu",
                              "/magnetometer_uT",
                              "/gnss",
                              "/env",
                              "/motors",
                              "/cpu_temp_c",
                              "/link_quality_pct",
                              "/barometer_hpa",
                              "/airspeed_mps",
                              "/battery_pct"
                              ]
#help(field_remover_stage)

# --- Processor: rename the remaining fields to friendlier names ---
field_renamer_stage = pipeline.add_stage(label = 'Field Renamer')
field_renamer_stage.label = "Rename fields"
#help(field_renamer_stage)
field_renamer_stage.fields_to_rename = [
    {"fromFieldExpression": "/ts", "toFieldExpression": "/event_time"},
    {"fromFieldExpression": "/alt_m", "toFieldExpression": "/altitude"},
    {"fromFieldExpression": "/lat", "toFieldExpression": "/latitude"},
    {"fromFieldExpression": "/lon", "toFieldExpression": "/longitude"},
    {"fromFieldExpression": "/speed_mps", "toFieldExpression": "/speed"},
    {"fromFieldExpression": "/heading_deg", "toFieldExpression": "/heading"}
]
# --- Processor: convert the event timestamp string into a DATETIME ---
field_type_converter_stage = pipeline.add_stage(label = 'Field Type Converter')
field_type_converter_stage.label = "Convert types"
field_type_converter_stage.conversions_by_field_name = [
    {
        "fields": [
            "/event_time"
        ],
        "targetType": "DATETIME",
        "dateFormat": "OTHER",
        # ISO-8601 style timestamp with fractional seconds and zone offset.
        "otherDateFormat": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
    }
]

# --- Processor: Stream Selector routing records by drone phase ---
# NOTE: the predicate inspects /insertOne/document/phase because this stage is
# wired downstream of the JavaScript evaluator that wraps each record in an
# insertOne document (see the stage connections at the bottom of the script).
match_patrol_predicate = "${str:matches(record:value('/insertOne/document/phase'), \"patrol\")}"
default_predicate = "default"
stream_selector_stage = pipeline.add_stage(label = 'Stream Selector')
stream_selector_stage.label = "Check drone phase, discard non-patrol"
# One output lane per predicate; "default" catches everything else.
stream_selector_stage.predicates = [
    match_patrol_predicate,
    default_predicate
]
# --- Destination: Trash stage for records that are discarded ---
discard_stage = pipeline.add_stage(label = 'Trash', type='destination')
discard_stage.label = "Discard"
#trash
#trash_stage = pipeline.add_stage(label = 'Trash', type='destination')

# --- Destination: HTTP Client POSTing telemetry to the Astra DB Data API ---
destination_astradb_stage = pipeline.add_stage(label = 'HTTP Client', type='destination')
destination_astradb_stage.label = "Drone telemetry to Astra DB"
# Data API collection endpoint: <endpoint>/api/json/v1/<keyspace>/<collection>
destination_astradb_stage.resource_url = f"{ASTRADB_API_ENDPOINT}/api/json/v1/{ASTRADB_KEYSPACE_NAME}/{ASTRADB_COLLECTION_NAME}"
# Astra DB authenticates via a "Token" request header.
destination_astradb_stage.headers = [
    {
        "key": "Token",
        "value": f"{ASTRADB_APPLICATION_TOKEN}"
    },
    {
        "key": "Content-Type",
        "value": "application/json"
    }
]
#help(destination_astradb_stage)
#stream selector stage (discard all records not in "patrol" phase)
#${str:matches(record:value('/transaction_type'), "deposit") || str:matches(record:value('/transaction_type'), "withdrawal")}
#help(field_type_converter_stage)
# --- Dev Identity pass-through stages, useful for inspecting records in the
# Control Hub GUI; they sit between the selector lanes and the destinations.
dev_identity_1_stage = pipeline.add_stage(label = 'Dev Identity')
dev_identity_1_stage.label = "Dev Identity 1"
dev_identity_2_stage = pipeline.add_stage(label = 'Dev Identity')
dev_identity_2_stage.label = "Dev Identity 2"

# Wire the Stream Selector's output lanes. After assignment the SDK exposes
# each predicate as a mapping with a 'predicate' key, so look up the lane
# index of the patrol predicate and connect that lane to Dev Identity 1.
predicate_index = stream_selector_stage.predicates.index(next(predicate for predicate in stream_selector_stage.predicates if predicate['predicate'] == match_patrol_predicate))
stream_selector_stage.connect_outputs(stages=[dev_identity_1_stage], output_lane_index=predicate_index)

# The default lane (records not in "patrol" phase) goes to Dev Identity 2,
# which feeds the Trash destination (see connections at the bottom).
predicate_index = stream_selector_stage.predicates.index(next(predicate for predicate in stream_selector_stage.predicates if predicate['predicate'] == default_predicate))
stream_selector_stage.connect_outputs(stages=[dev_identity_2_stage], output_lane_index=predicate_index)
#help(stream_selector_stage)
| #help(stream_selector_stage) | |
| #javascript stage | |
| create_astradb_json_stage = pipeline.add_stage(label = 'JavaScript Evaluator') | |
| create_astradb_json_stage.label = "Create JSON for Astra DB" | |
| create_astradb_json_stage.script = """ | |
| var records = sdc.records; | |
| for(var i = 0; i < records.length; i++) { | |
| try { | |
| var documentMap = {}; | |
| documentMap["drone_id"] = records[i].value['drone_id']; | |
| documentMap["event_time"] = records[i].value['event_time']; | |
| documentMap["phase"] = records[i].value['phase']; | |
| documentMap["altitude"] = records[i].value['altitude']; | |
| documentMap["latitude"] = records[i].value['latitude']; | |
| documentMap["longitude"] = records[i].value['longitude']; | |
| documentMap["speed"] = records[i].value['speed']; | |
| documentMap["heading"] = records[i].value['heading']; | |
| var insertOneMap = {'insertOne': {'document':documentMap}} | |
| records[i].value['insertOne'] = insertOneMap; | |
| var newRecord = sdc.createRecord(records[i].sourceId + '-insertOne'); | |
| var newRecordFields = sdc.createMap(false); | |
| newRecord.value = insertOneMap; | |
| sdc.output.write(newRecord); | |
| } catch (e) { | |
| // Send record to error | |
| sdc.error.write(records[i], e); | |
| } | |
| } | |
| """ | |
| #help(create_astradb_json_stage) | |
# --- Wire the pipeline graph ---
# MQTT -> remove fields -> rename -> convert types -> build insertOne JSON
#   -> stream selector -> (patrol lane)  Dev Identity 1 -> Astra DB
#                      -> (default lane) Dev Identity 2 -> Trash
# (The selector -> Dev Identity links were made above via connect_outputs.)
origin_mqtt_stage >> field_remover_stage
field_remover_stage >> field_renamer_stage
field_renamer_stage >> field_type_converter_stage
field_type_converter_stage >> create_astradb_json_stage
create_astradb_json_stage >> stream_selector_stage
dev_identity_1_stage >> destination_astradb_stage
dev_identity_2_stage >> discard_stage
#stream_selector_stage >> trash_stage

# Publish the assembled pipeline as a new version in Control Hub.
sch.publish_pipeline(pipeline, commit_message = COMMIT_MESSAGE)
print("Pipeline published.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment