Skip to content

Instantly share code, notes, and snippets.

@aliafshar
Created June 24, 2014 20:47
Show Gist options
  • Select an option

  • Save aliafshar/883b675ad8822eff6298 to your computer and use it in GitHub Desktop.

Select an option

Save aliafshar/883b675ad8822eff6298 to your computer and use it in GitHub Desktop.
Some non-running code on async operations in compute
class Connection(object):
#...
def get_operation(self, operation, zone=None):
resp = self.svc.zoneOperations().get(**self._default_args(operation=operation,
zone=zone)).execute()
return Operation(raw=resp, connection=self)
def await_operation(self, operation, zone=None):
if isinstance(operation, Operation):
operation_arg = operation.name
zone_arg = zone or name_from_uri(operation.zone)
else:
operation_arg = operation
zone_arg = zone
while True:
op = self.get_operation(operation_arg, zone_arg)
if op.status == Operation.DONE:
break
time.sleep(1) # TODO decide a polling interval and make it configurable.
if operation.targetLink:
return get_object_from_url(self.http, operation.targetLink)
else:
return operation
def get_model_from_json(raw):
if hasattr(raw, 'get'):
name = raw.get('kind')
if name:
t = type_map.get(name)
if not t:
raise TypeError(name)
return t(raw)
return raw
def get_object_from_url(http, url):
resp, content = http.request(url)
raw = json.loads(content)
return get_model_from_json(raw)
class Operation(Model):
api_type = 'compute#operation'
name = ApiProperty('name', str)
zone = ApiProperty('zone', str)
selfLink = ApiProperty('selfLink', str)
link = ApiProperty('selfLink', str)
targetLink = ApiProperty('targetLink', str)
target = ApiProperty('targetLink', str)
status = ApiProperty('status', str)
target_type = None
DONE = 'DONE'
RUNNING = 'RUNNING'
PENDING = 'PENDING'
def poll(self, connection=None):
connection = connection or self.connection
def wait(self, status=None, connection=None):
status = status or Operation.DONE
connection = connection or self.connection
return connection.await_operation(self)
def fetch_target(self):
if self.targetLink:
return get_object_from_url(self.targetLink)
else:
return self
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment