Skip to content

Instantly share code, notes, and snippets.

@samisalkosuo
Created November 26, 2025 07:33
Show Gist options
  • Select an option

  • Save samisalkosuo/79e249f3fc6664ccb44d14ff02f572ae to your computer and use it in GitHub Desktop.

Select an option

Save samisalkosuo/79e249f3fc6664ccb44d14ff02f572ae to your computer and use it in GitHub Desktop.
from streamsets.sdk import ControlHub
import os
import sys
# Connection settings for StreamSets Control Hub, the target engine/pipeline and
# Astra DB. Each value can be overridden with an environment variable of the
# same name; the angle-bracket defaults are placeholders only.
# NOTE(review): the default pipeline name says "to Kafka" although main() sends
# records to Astra DB; left unchanged because renaming would target a different
# pipeline in Control Hub.
STREAMSETS_CREDENTIAL_ID = os.getenv("STREAMSETS_CREDENTIAL_ID", "<default-credential-id>")
STREAMSETS_TOKEN = os.getenv("STREAMSETS_TOKEN", "<default-token>")
STREAMSETS_ENGINE_ID = os.getenv("STREAMSETS_ENGINE_ID", "<default-engine-id>")
STREAMSETS_PIPELINE_NAME = os.getenv("STREAMSETS_PIPELINE_NAME", "Lab - Drone Ops - MQTT to Kafka")
ASTRADB_API_ENDPOINT = os.getenv("ASTRADB_API_ENDPOINT", "<astradb-api-endpoint>")
ASTRADB_KEYSPACE_NAME = os.getenv("ASTRADB_KEYSPACE_NAME", "default_keyspace")
ASTRADB_COLLECTION_NAME = os.getenv("ASTRADB_COLLECTION_NAME", "drone_telemetry")
ASTRADB_APPLICATION_TOKEN = os.getenv("ASTRADB_APPLICATION_TOKEN", "<AstraCS:token>")

# MQTT broker the origin subscribes to; hard-coded, not read from the environment.
MQTT_BROKER_URL = "tcp://mqtt.lab-mqtt.svc.cluster.local:1883"

# Commit message for the published pipeline version: the first command-line
# argument when one is given, otherwise "updated".
COMMIT_MESSAGE = sys.argv[1] if len(sys.argv) > 1 else "updated"
def getControlHub():
    """Open a StreamSets Control Hub session.

    Returns:
        A ``ControlHub`` instance authenticated with STREAMSETS_CREDENTIAL_ID
        and STREAMSETS_TOKEN.
    """
    return ControlHub(
        credential_id=STREAMSETS_CREDENTIAL_ID,
        token=STREAMSETS_TOKEN,
    )
def getPipeline(sch):
    """Return the lab pipeline with all of its stages removed.

    Looks up the pipeline named STREAMSETS_PIPELINE_NAME. If that lookup raises
    ValueError (treated as "no such pipeline"), an empty pipeline is built for
    engine STREAMSETS_ENGINE_ID, published with commit message 'created' and
    fetched again. In both cases every existing stage is then removed so the
    caller can rebuild the pipeline from scratch.

    Args:
        sch: an authenticated ControlHub session.

    Returns:
        The pipeline object, emptied of stages.
    """
    try:
        pipeline = sch.pipelines.get(name=STREAMSETS_PIPELINE_NAME)
    except ValueError:
        pipeline = _createPipeline(sch)
        print(f"Pipeline created: {pipeline}")
    else:
        # Only the lookup belongs in the try; report success here.
        print(f"Creating new version of pipeline: {pipeline}")
    _removeAllStages(pipeline)
    return pipeline


def _createPipeline(sch):
    """Build and publish an empty pipeline on STREAMSETS_ENGINE_ID, then re-fetch it."""
    engine = sch.engines.get(id=STREAMSETS_ENGINE_ID)
    pipeline_builder = sch.get_pipeline_builder(engine_type='data_collector', engine_id=engine.id)
    pipeline = pipeline_builder.build(STREAMSETS_PIPELINE_NAME)
    sch.publish_pipeline(pipeline, commit_message='created')
    # Re-fetch the newly published pipeline from Control Hub.
    return sch.pipelines.get(name=STREAMSETS_PIPELINE_NAME)


def _removeAllStages(pipeline):
    """Remove every stage from *pipeline*, one stage at a time."""
    # Snapshot the stages first so removals cannot disturb the iteration,
    # whether or not pipeline.stages is a live view of the definition.
    for stage in list(pipeline.stages):
        pipeline.remove_stages(stage)
def createStage(pipeline, stage_name, stage_label, type=None):
    """Add a stage to *pipeline* and set its display label.

    Args:
        pipeline: pipeline to add the stage to.
        stage_name: stage library label passed to
            ``pipeline.add_stage(label=...)`` (e.g. 'Trash', 'HTTP Client').
        stage_label: label assigned to the new stage instance.
        type: optional stage type (e.g. 'origin', 'destination'), forwarded to
            ``add_stage`` only when given. The name shadows the builtin but is
            kept because callers pass it by keyword.

    Returns:
        The newly added stage.
    """
    add_kwargs = {'label': stage_name}
    if type is not None:
        add_kwargs['type'] = type
    stage = pipeline.add_stage(**add_kwargs)
    stage.label = stage_label
    return stage
def publish(sch, pipeline):
    """Publish *pipeline* to Control Hub as a new version.

    Args:
        sch: an authenticated ControlHub session.
        pipeline: the pipeline to publish.

    The version is committed with the module-level COMMIT_MESSAGE, and a
    confirmation line is printed afterwards.
    """
    sch.publish_pipeline(
        pipeline,
        commit_message=COMMIT_MESSAGE,
    )
    print("Pipeline published.")
def main():
    """Rebuild the drone-telemetry pipeline in Control Hub and publish it.

    Resulting layout::

        MQTT Subscriber -> Stream Selector
            phase matches "patrol" -> Dev Identity 1 -> Field Type Converter
                                   -> JavaScript Evaluator -> HTTP Client (Astra DB)
            default                -> Dev Identity 2 -> Trash
    """
    sch = getControlHub()
    pipeline = getPipeline(sch)

    # Origin: JSON telemetry messages from the MQTT broker.
    origin_mqtt_stage = createStage(pipeline, stage_name='MQTT Subscriber', stage_label="Drone telemetry from MQTT", type="origin")
    origin_mqtt_stage.data_format = "JSON"
    origin_mqtt_stage.broker_url = MQTT_BROKER_URL
    origin_mqtt_stage.topic_filter = ["drone.telemetry"]
    origin_mqtt_stage.clean_session = True

    # Convert the /ts string field to DATETIME using a custom date pattern.
    # NOTE(review): if this converter parses with java.text.SimpleDateFormat,
    # "SSSSSS" is read as a millisecond count, so a 6-digit microsecond fraction
    # would shift timestamps by up to ~16 minutes; confirm with sample data.
    field_type_converter_stage = createStage(pipeline, stage_name='Field Type Converter', stage_label="Convert types")
    field_type_converter_stage.conversions_by_field_name = [
        {
            "fields": ["/ts"],
            "targetType": "DATETIME",
            "dateFormat": "OTHER",
            "otherDateFormat": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
        }
    ]

    # Route by drone phase: "patrol" records continue to Astra DB, all other
    # records take the default lane and are discarded.
    match_patrol_predicate = "${str:matches(record:value('/phase'), \"patrol\")}"
    default_predicate = "default"
    stream_selector_stage = createStage(pipeline, stage_name='Stream Selector', stage_label="Check drone phase, discard non-patrol")
    stream_selector_stage.predicates = [match_patrol_predicate, default_predicate]

    discard_stage = createStage(pipeline, stage_name='Trash', stage_label="Discard", type='destination')

    # Destination: POST to the Astra DB Data API collection endpoint.
    destination_astradb_stage = createStage(pipeline, stage_name='HTTP Client', stage_label="Drone telemetry to Astra DB", type='destination')
    # Strip a trailing '/' so the URL does not end up with "//api".
    astradb_endpoint = ASTRADB_API_ENDPOINT.rstrip("/")
    destination_astradb_stage.resource_url = f"{astradb_endpoint}/api/json/v1/{ASTRADB_KEYSPACE_NAME}/{ASTRADB_COLLECTION_NAME}"
    destination_astradb_stage.http_method = "POST"
    # NOTE(review): the application token is written in plain text into the
    # published pipeline definition; consider sourcing it from a credential store.
    destination_astradb_stage.headers = [
        {"key": "Token", "value": ASTRADB_APPLICATION_TOKEN},
        {"key": "Content-Type", "value": "application/json"},
    ]

    # Pass-through stages so records can be inspected in the Control Hub GUI.
    dev_identity_1_stage = createStage(pipeline, stage_name='Dev Identity', stage_label="Dev Identity 1")
    dev_identity_2_stage = createStage(pipeline, stage_name='Dev Identity', stage_label="Dev Identity 2")

    # For each input record, emit a new record holding
    # {'insertOne': {'document': {...}}} built from the telemetry fields;
    # records that throw are sent to error.
    # NOTE(review): in the script, newRecordFields is unused and the
    # 'insertOne' key written onto the input record is never emitted.
    create_astradb_json_stage = createStage(pipeline, stage_name='JavaScript Evaluator', stage_label="Create JSON for Astra DB")
    create_astradb_json_stage.script = """
var records = sdc.records;
for(var i = 0; i < records.length; i++) {
try {
var documentMap = {};
documentMap["drone_id"] = records[i].value['drone_id'];
documentMap["event_time"] = records[i].value['ts'];
documentMap["altitude"] = records[i].value['alt_m'];
documentMap["latitude"] = records[i].value['lat'];
documentMap["longitude"] = records[i].value['lon'];
documentMap["speed"] = records[i].value['speed_mps'];
documentMap["heading"] = records[i].value['heading_deg'];
var insertOneMap = {'insertOne': {'document':documentMap}}
records[i].value['insertOne'] = insertOneMap;
var newRecord = sdc.createRecord(records[i].sourceId + '-insertOne');
var newRecordFields = sdc.createMap(false);
newRecord.value = insertOneMap;
sdc.output.write(newRecord);
} catch (e) {
// Send record to error
sdc.error.write(records[i], e);
}
}
"""

    # Wire the Stream Selector lanes: patrol -> Dev Identity 1, default -> Dev Identity 2.
    stream_selector_stage.connect_outputs(
        stages=[dev_identity_1_stage],
        output_lane_index=_predicateLaneIndex(stream_selector_stage, match_patrol_predicate),
    )
    stream_selector_stage.connect_outputs(
        stages=[dev_identity_2_stage],
        output_lane_index=_predicateLaneIndex(stream_selector_stage, default_predicate),
    )

    # Remaining linear connections.
    origin_mqtt_stage >> stream_selector_stage
    dev_identity_2_stage >> discard_stage
    dev_identity_1_stage >> field_type_converter_stage
    field_type_converter_stage >> create_astradb_json_stage
    create_astradb_json_stage >> destination_astradb_stage

    publish(sch, pipeline)


def _predicateLaneIndex(stream_selector_stage, predicate):
    """Return the output-lane index of *predicate* on a Stream Selector stage.

    Entries of ``stream_selector_stage.predicates`` are read as dicts holding
    the expression under the ``'predicate'`` key; the lane index is the
    position of the first matching entry.

    Raises:
        ValueError: if no entry carries that predicate expression.
    """
    for index, entry in enumerate(stream_selector_stage.predicates):
        if entry['predicate'] == predicate:
            return index
    raise ValueError(f"Predicate not found on Stream Selector: {predicate!r}")
if __name__ == "__main__":
    # Build and publish the pipeline only when run as a script, not on import.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment