Skip to content

Instantly share code, notes, and snippets.

@sushmithapopuri
Last active June 2, 2024 11:41
Show Gist options
  • Select an option

  • Save sushmithapopuri/e40a99e4b39516e459ee46c374f0fb83 to your computer and use it in GitHub Desktop.

Select an option

Save sushmithapopuri/e40a99e4b39516e459ee46c374f0fb83 to your computer and use it in GitHub Desktop.
import boto3
def get_session(env_name):
    """Return a boto3 session for the named environment.

    Args:
        env_name: One of ``'prd'``, ``'qa'`` or ``'dev'``.

    Returns:
        A ``boto3.Session``. For ``'prd'`` and ``'qa'`` it is constructed
        with the explicit access-key pair written below; for ``'dev'`` it
        is constructed with no arguments.

    Raises:
        ValueError: If ``env_name`` is not one of the environments above.
    """
    # NOTE(review): the key pairs are blank in this file -- presumably
    # redacted placeholders; confirm before use, and avoid committing real
    # keys to source.
    if env_name == 'prd':
        return boto3.Session(aws_access_key_id='', aws_secret_access_key='')
    if env_name == 'qa':
        return boto3.Session(aws_access_key_id='', aws_secret_access_key='')
    if env_name == 'dev':
        return boto3.Session()
    # Fail here with a clear message instead of falling off the end and
    # returning None, which would only surface later as an AttributeError
    # when the caller asks the "session" for a client.
    raise ValueError(f"unknown environment: {env_name!r}")
def send_msg(message, i):
    """Send one message to the SQS queue and print the resulting message ID.

    Uses the module-level ``sqs_client``.

    Args:
        message: Text passed as the SQS ``MessageBody``.
        i: Value passed as ``DelaySeconds`` for this message.
    """
    # NOTE(review): this URL carries no account ID or queue name --
    # presumably redacted; it must be completed before the call can succeed.
    queue_url = 'https://sqs.eu-west-1.amazonaws.com/'
    response = sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=message,
        DelaySeconds=i,
    )
    print(response['MessageId'])
# Build the SQS client from the 'dev' session; send_msg() reads it as a
# module-level global.
boto_session = get_session('dev')
sqs_client = boto_session.client('sqs')
# Send 49 'Hello' messages, using the loop index (1..49) as each message's
# DelaySeconds.
for i in range(1, 50):
    send_msg('Hello', i)
# Create SQS client
sqs = boto3.client('sqs')
# send_message() takes the queue *URL* in QueueUrl, not the queue ARN. This is
# the URL form of arn:aws:sqs:eu-west-1:702142730744:gms-obr-mcr-test-lz-in-dev1
# (same region, account and queue name).
queue_url = 'https://sqs.eu-west-1.amazonaws.com/702142730744/gms-obr-mcr-test-lz-in-dev1'
# Send message to SQS queue
def snd_msg():
    """Send a fixed test message to the queue and return the response.

    Uses the module-level ``sqs`` client and ``queue_url``. The message is
    sent with ``DelaySeconds=10`` and a single string message attribute,
    ``Title`` = ``'Test'``.

    Returns:
        Whatever ``sqs.send_message`` returns for the request.
    """
    return sqs.send_message(
        QueueUrl=queue_url,
        DelaySeconds=10,
        MessageAttributes={
            'Title': {
                'DataType': 'String',
                'StringValue': 'Test',
            },
        },
        MessageBody='Test Message for Autoscaling ',
    )
# NOTE(review): indentation was lost in this copy, so it is unclear whether
# this print originally sat (unreachably) after the return inside snd_msg();
# it is kept here as a top-level statement -- confirm against the author's
# original.
print('Hi')
# Send the test message ten times, printing each send_message response.
for i in range(10):
    print(snd_msg())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment