Skip to content

Instantly share code, notes, and snippets.

@rdebroiz
Last active April 15, 2018 06:12
Show Gist options
  • Select an option

  • Save rdebroiz/4dcc59217bc5b5a88f17116c6df05261 to your computer and use it in GitHub Desktop.

Select an option

Save rdebroiz/4dcc59217bc5b5a88f17116c6df05261 to your computer and use it in GitHub Desktop.
import datetime
import threading
import queue
import time
import sys
import pydicom
import pynetdicom3
SCP_PORT = 12345
SCP_HOST = "localhost"
SCP_AET = "POUET_SCP"
SCU_AET = "POUET_SCU"
COUNT = 0
def start_scp(number_of_thread):
application = pynetdicom3.AE(
port=SCP_PORT,
ae_title=SCP_AET,
scp_sop_class=pynetdicom3.StorageSOPClassList,
)
application.maximum_associations = number_of_thread
def on_c_store(dataset):
global COUNT
COUNT += 1
answer = pydicom.Dataset()
answer.add_new(pydicom.tag.Tag(0x0000, 0x0900), 'US', 0x0000)
print('pouet: received dataset. SOPInstance: ', dataset.SOPInstanceUID,
", count: ", COUNT)
return answer
application.on_c_store = on_c_store
application.start()
def start_scu(file, number_of_thread, number_of_cstore):
startup_time = datetime.datetime.now()
application = pynetdicom3.AE(
port=SCP_PORT,
ae_title=SCU_AET,
scu_sop_class=pynetdicom3.StorageSOPClassList,
)
application.maximum_associations = number_of_thread
dataset_list = [pydicom.dcmread(file)] * number_of_cstore
consumer_queue = queue.Queue()
def consume_assoc():
print(threading.current_thread().getName(), "start consuming queue")
while True:
datasets = consumer_queue.get()
assoc = application.associate(SCP_HOST, SCP_PORT, ae_title=SCP_AET)
while not assoc.is_established:
time.sleep(0.001)
assoc = application.associate(SCP_HOST, SCP_PORT, ae_title=SCP_AET)
try:
for ds in datasets:
answer = assoc.send_c_store(ds)
if answer.Status:
raise ValueError(str(answer))
except:
raise
finally:
assoc.release()
total_time = datetime.datetime.now() - startup_time
print(threading.current_thread().getName(), "send ",
len(dataset_list), "datasets",
"elapsed time:", total_time.total_seconds(), "seconds")
consumer_queue.task_done()
for thread_id in range(0, number_of_thread):
name = "consumer_{}".format(thread_id +1)
thread = threading.Thread(target=consume_assoc, name=name)
thread.setDaemon(True)
thread.start()
for thread_id in range(0, number_of_thread):
consumer_queue.put(dataset_list)
# let some time to the queue to be consumed
time.sleep(1)
if __name__ == '__main__':
usage = (
"USAGE: python pynetdicom_thread.py "
"( scp <max_association> | scu <file> <nb_thread> <nb_cstore>)"
)
try:
if sys.argv[1] == 'scp':
start_scp(int(sys.argv[2]))
elif sys.argv[1] == 'scu':
start_scu(sys.argv[2], int(sys.argv[3]), int(sys.argv[4]))
else:
print(usage)
except Exception:
print(usage)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment