#!/usr/bin/env python
# Dump a whisper file's metadata and datapoints, and load the datapoints
# into a Cassandra (cyanite) 'metric' table over CQL.
import os
import mmap
import struct
import signal
import optparse
import cql

try:
  import whisper
except ImportError:
  raise SystemExit('[ERROR] Please make sure whisper is installed properly')

# Ignore SIGPIPE
signal.signal(signal.SIGPIPE, signal.SIG_DFL)

option_parser = optparse.OptionParser(usage='''%prog path''')
(options, args) = option_parser.parse_args()

if len(args) != 1:
  option_parser.error("require one input file name")
else:
  path = args[0]


def mmap_file(filename):
  fd = os.open(filename, os.O_RDONLY)
  map = mmap.mmap(fd, os.fstat(fd).st_size, prot=mmap.PROT_READ)
  os.close(fd)
  return map


def read_header(map):
  try:
    (aggregationType, maxRetention, xFilesFactor, archiveCount) = \
      struct.unpack(whisper.metadataFormat, map[:whisper.metadataSize])
  except struct.error:
    raise whisper.CorruptWhisperFile("Unable to unpack header")

  archives = []
  archiveOffset = whisper.metadataSize

  for i in xrange(archiveCount):
    try:
      (offset, secondsPerPoint, points) = \
        struct.unpack(whisper.archiveInfoFormat,
                      map[archiveOffset:archiveOffset + whisper.archiveInfoSize])
    except struct.error:
      raise whisper.CorruptWhisperFile("Unable to read archive %d metadata" % i)

    archiveInfo = {
      'offset' : offset,
      'secondsPerPoint' : secondsPerPoint,
      'points' : points,
      'retention' : secondsPerPoint * points,
      'size' : points * whisper.pointSize,
    }
    archives.append(archiveInfo)
    archiveOffset += whisper.archiveInfoSize

  header = {
    'aggregationMethod' : whisper.aggregationTypeToMethod.get(aggregationType, 'average'),
    'maxRetention' : maxRetention,
    'xFilesFactor' : xFilesFactor,
    'archives' : archives,
  }
  return header


def dump_header(header):
  print 'Meta data:'
  print ' aggregation method: %s' % header['aggregationMethod']
  print ' max retention: %d' % header['maxRetention']
  print ' xFilesFactor: %g' % header['xFilesFactor']


def dump_archive_headers(archives):
  for i, archive in enumerate(archives):
    print 'Archive %d info:' % i
    print ' offset: %d' % archive['offset']
    print ' seconds per point: %d' % archive['secondsPerPoint']
    print ' points: %d' % archive['points']
    print ' retention: %d' % archive['retention']
    print ' size: %d' % archive['size']


def dump_archives(archives):
  # Derive the dotted metric name from the whisper file path.
  name = path.replace('/opt/graphite/storage/whisper/', '', 1)
  name = name.replace('.wsp', '', 1)
  name = name.replace('/', '.')
  con = cql.connect('127.0.0.1', 9160, 'metric', cql_version='3.0.0')
  print "Connected to Cassandra!"
  cursor = con.cursor()
  for i, archive in enumerate(archives):
    print 'Archive %d data:' % i
    offset = archive['offset']
    for point in xrange(archive['points']):
      # 'map' is the module-level mmap of the whisper file.
      (timestamp, value) = struct.unpack(whisper.pointFormat, map[offset:offset + whisper.pointSize])
      print '%d: %d, %10.35g' % (point, timestamp, value)
      offset += whisper.pointSize
      period = archive['retention']
      rollup = archive['secondsPerPoint']
      ttl = period
      #CQLString = "UPDATE metric USING TTL ? SET data = data + ? WHERE tenant = '' AND rollup = ? AND period = ? AND path = ? AND time = ?;"
      #cursor.execute(CQLString, [ttl, value, rollup, period, name, timestamp])
      #print CQLString, [ttl, value, rollup, period, name, timestamp]
      if timestamp > 0:  # skip empty (never-written) slots
        # Note: %d truncates the float value; use %f if fractional values must survive.
        CQLString = "UPDATE metric USING TTL %d SET data = data + [ %d ] WHERE tenant = '' AND rollup = %d AND period = %d AND path = '%s' AND time = %d;" % (ttl, value, rollup, period, name, timestamp)
        #print CQLString
        cursor.execute(CQLString)


if not os.path.exists(path):
  raise SystemExit('[ERROR] File "%s" does not exist!' % path)

map = mmap_file(path)
header = read_header(map)
dump_header(header)
dump_archive_headers(header['archives'])
dump_archives(header['archives'])
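For reference, the script takes exactly one whisper file path per invocation, so running it looks like this (the filename whisper-to-cyanite.py is illustrative; the gist does not name the file):

python whisper-to-cyanite.py /opt/graphite/storage/whisper/some/metric.wsp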
Hi @punnie,
I never tried to migrate our main storage, though, because of space concerns. But even assuming linear scalability: if migrating 1281 files took ~1281 seconds (a very loose approximation) with 16 threads, then migrating a full store of 160K metrics will take 160000 sec ≈ 44 hours.
I do not think you will be able to squeeze more speed out of the Python version, and I'm not fluent in Clojure - maybe @pir will be able to provide a Clojure version of this script with built-in concurrency.
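That said, if anyone wants to stay in Python, a multiprocessing driver is one way to get concurrency without Clojure. This is only a sketch under assumptions: it shells out once per file to the gist script under the illustrative name whisper-to-cyanite.py, and the pool size of 16 just mirrors the thread count mentioned above.

#!/usr/bin/env python
# Hypothetical parallel driver: walk the whisper tree and migrate
# files concurrently by invoking the gist script once per file.
import os
import subprocess
from multiprocessing import Pool

def find_wsp(root):
  # Collect every .wsp file under the whisper storage root.
  for dirpath, dirnames, filenames in os.walk(root):
    for filename in filenames:
      if filename.endswith('.wsp'):
        yield os.path.join(dirpath, filename)

def migrate_file(wsp_path):
  # 'whisper-to-cyanite.py' is an illustrative name for the script above.
  subprocess.call(['python', 'whisper-to-cyanite.py', wsp_path])

if __name__ == '__main__':
  pool = Pool(processes=16)
  pool.map(migrate_file, list(find_wsp('/opt/graphite/storage/whisper')))
  pool.close()
  pool.join()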
@punnie
But by the way - raw speed is not required for migration. You can split the metrics stream in two with a relay: one stream goes to your old installation, the other to Cyanite. After that you can copy the .wsp files and migrate them; even if it takes a week, you will end up with all the data in Cyanite and can then remove the old installation.
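For the split itself, carbon-relay with rule-based relaying can duplicate every datapoint to two destinations. A minimal sketch - hosts and ports are placeholders, assuming the old carbon-cache listens on 2004 and cyanite's carbon listener on 2003:

# carbon.conf, [relay] section
RELAY_METHOD = rules
DESTINATIONS = 127.0.0.1:2004, 10.0.0.2:2003

# relay-rules.conf: the default rule fans every metric out to both
[default]
default = true
destinations = 127.0.0.1:2004, 10.0.0.2:2003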
Sorry for digging up this old thing, but I've got to ask @deniszh: how long did this script take to run for your TB of whisper data? I've been running it for our measly 300GB of data for about 12 hours now, using 8 parallel jobs, and it seems like it will take literally forever.
I know I'm just being lazy now, but is there a more updated version of this? ;)