#!/usr/bin/env python
# encoding: utf-8
"""
replication.py - export data to external systems

Created by Maximillian Dornseif on 2011-05-01.
Copyright (c) 2011 HUDORA. All rights reserved.
"""

from __future__ import with_statement
import config
config.imported = True

from ablage.models import Akte, Dokument, DokumentFile
from boto.s3.connection import Location
from gaetk import webapp2
from gaetk.handler import BasicHandler
from google.appengine.api import taskqueue
from google.appengine.ext import db
from huTools import hujson

import StringIO
import boto.exception
import boto.s3
import datetime
import logging
import tarfile
import time
## Disaster Recovery on AppEngine and off-site Replication

# In the following paragraphs we consider several disaster scenarios and how
# to guard against them. Here we are only considering safety (availability)
# issues, not security (confidentiality) issues. Our data is hosted on Google
# AppEngine servers which seem to be controlled exclusively by Google Inc. and
# hosted exclusively in the United States. This factors into some of the
# disaster recovery scenarios.
#
# 1. Due to some programming or administration error on our side data is wiped
#    out.
# 2. Due to some programming or administration error on Google's side data is
#    wiped out. Data may or may not be restored by Google after some time
#    (see "[unapplied writes][1]" in 2010 or the AWS EBS outage in 2011).
# 3. Due to some third-party software or hardware involvement data is wiped out.
#    Think of student or coordinated physical attacks on datacenters.
# 4. Due to some contractual problems (e.g. we don't pay) data is deliberately
#    wiped out by Google.
# 5. The US government or US court system decides to deny us access to our data.
# 6. A disgruntled admin decides to delete our data.
#
# In addition there are some desirable properties the replication should have:
#
# 1. One copy of the data must be stored within the EU. This is a requirement
#    for tax record keeping.
# 2. One copy of the data should be stored within Germany. This makes tax
#    record keeping easier.
# 3. One copy of the data should be stored on site. This would ensure
#    availability even if our company couldn't pay any 3rd parties for storage
#    for some time.
# 4. The replication should involve only minimal administrative resources.
#    We always keep this image in mind when designing that stuff:
#    http://static.23.nu/md/Pictures/ZZ573344DB.png
#    In particular we want to avoid cronjobs on Unix machines which need
#    additional monitoring, patching, upgrading, disks, backups, etc.
#    If possible everything should run on AppEngine.
#
# One thing the replicas don't need to provide is immediate access. As long as
# the data and metadata are replicated somewhere and can be loaded into an
# application (possibly written on demand) within a reasonable timeframe,
# we are fine. Several of the scenarios above imply that we would not have
# access to AppEngine infrastructure and must rewrite our software anyhow.
# So direct restore from the replicas is not needed.
#
# We decided not to use the [bulkloader][3] for backups. While the bulkloader
# is a fine piece of software, it seems that it can't be used for
# incremental backups. Also we are reluctant to enable the `remote_api` because
# technically this would enable every developer with admin permissions on the
# AppEngine application to download our complete dataset "for testing". And then
# a laptop gets lost/stolen ...
# I would also argue that an application with `remote_api` enabled can't
# comply with any serious audit/bookkeeping standards. So we don't use it.
#
# Currently we have no objects bigger than 1 MB. This will change when we use
# the blobstore. Replicating entities bigger than 1 MB will be challenging
# since the `urlfetch` API only allows 1 MB per upload.
#
# Options for storage we considered:

### Amazon S3 (Simple Storage Service)

# This was our first choice. Provides storage in the EU, is well known and
# regarded, and comes with a rich ecosystem. With [S3 multipart upload][4] it
# would be possible to generate big files, but unfortunately the part size must
# be 5 MB or more while with the urlfetch API we can write only 1 MB or less. So
# this doesn't work. But for objects < 1 MB Amazon S3 is a fine choice.
### GS (Google Storage) and the AppEngine blobstore

# Both services don't guard against most of the disaster scenarios described
# above but still have some interesting properties which might make them
# desirable as an intermediate step towards generating replicas.
# With [resumable uploads][5] Google Storage provides the ability to generate
# very large files while still being bound to the 1 MB upload limit of the
# urlfetch API. In theory I should also be able to use the new
# [file-like blobstore access][6] to write large files to the blobstore. But
# there is a [30 second time limit][7] on keeping the file open. One can get
# around it by reopening the file in append mode.
# For now we don't use Google Storage and the AppEngine blobstore because they
# don't guard against most of our disaster scenarios.
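# A minimal sketch of that append-mode workaround (not used in this module;
# the Files API calls are real, but `chunks` and the function itself are
# purely illustrative):
def _write_large_blob(chunks):
    """Write an iterable of strings to the blobstore, reopening per chunk."""
    from google.appengine.api import files
    file_name = files.blobstore.create(mime_type='application/octet-stream')
    for chunk in chunks:
        # Reopen in append mode for every chunk so no single open file
        # handle hits the 30 second limit.
        with files.open(file_name, 'a') as fp:
            fp.write(chunk)
    files.finalize(file_name)
    return files.blobstore.get_blob_key(file_name)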
### Dropbox and box.net

# We use Dropbox extensively and have plenty of storage with a "Team" account.
# We already use Dropbox for file input and output to AppEngine applications.
# Dropbox provides not only online storage but also syncing to local machines.
# Installing a Dropbox client on a local machine would provide us with on-site
# storage with minimal administrative hassle. Off-site storage within the EU
# would be more work.
# Unfortunately the [public Dropbox API][8] does not provide an append-to-file
# operation or anything else to create files bigger than 1 MB from AppEngine.
# The ecosystem of Dropbox Python libraries seems somewhat immature.
# I haven't looked too closely into box.net, but [the box.net upload API][9]
# seems to have more or less the same limitations as Dropbox.
# I also took a quick look into Windows Azure Storage but I didn't understand
# if and how I can use only the storage service.
### Rackspace Cloudfiles

# Cloudfiles is a storage offering provided by Rackspace in the United States
# but also [in the United Kingdom by rackspace.co.uk][10]. And the United
# Kingdom is (mostly) in the EU. Rackspace is pushing the Cloudfiles API with
# the "OpenStack" initiative but there still seems to be no extensive
# ecosystem around the API. What is strange is the fact that Rackspace US and
# Rackspace UK seem to have no unified billing and the like.
# The API would allow creating large files via "[Large Object Creation /
# segmented upload][11]". To my understanding the `PUT` method together with
# byte ranges would provide a way to append to a file, and the Python library
# for Cloudfiles already [provides generator based upload][12], but it seems to
# me the current implementation would not work this way on AppEngine.
### WebDAV (self hosted)

# WebDAV would allow us to use `PUT` with byte ranges and therefore would allow
# us to generate arbitrarily large output files. But we found no ready-to-use
# Python library supporting that and no hosted WebDAV provider offering cloud
# scale and cloud pricing. We want to avoid self-hosted servers.
## Choice of technology

# Currently the only services we felt comfortable with based on technology and
# pricing were Amazon S3 and Rackspace Cloudfiles. Cloudfiles has the better
# API for our requirements in that it would allow us to create very large files.
# Amazon S3 has a much richer environment of desktop and browser based
# utilities, FUSE filesystems etc. Therefore we decided for now to focus on
# output smaller than 1 MB and to start with Amazon S3. We would use one of
# the many desktop utilities to regularly sync data from Amazon S3 to local
# on-site storage. (one Unix cronjob to monitor `:-(` )
# This approach will guard against all the disaster scenarios described above.
# It should also guard against most data corruption scenarios because most of
# our data structures are designed to be immutable: data is never rewritten,
# instead a new version of the object is created in the datastore. The old
# version is kept for audit purposes.
# [1]: http://groups.google.com/group/google-appengine-downtime-notify/msg/e9414ee6493da6fb
# [3]: http://code.google.com/appengine/docs/python/tools/uploadingdata.html#Downloading_and_Uploading_All_Data
# [4]: http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?uploadobjusingmpu.html
# [5]: http://code.google.com/apis/storage/docs/developer-guide.html#resumable
# [6]: http://code.google.com/appengine/docs/python/blobstore/overview.html#Writing_Files_to_the_Blobstore
# [7]: http://groups.google.com/group/google-appengine-python/browse_thread/thread/7c52e9397fb88ac7
# [8]: https://www.dropbox.com/developers
# [9]: http://developers.box.net/w/page/12923951/ApiFunction_Upload-and-Download
# [10]: http://www.rackspace.co.uk/cloud-hosting/cloud-files/
# [11]: http://docs.rackspacecloud.com/files/api/v1/cf-devguide-20110420.pdf
# [12]: https://github.com/rackspace/python-cloudfiles/blob/master/cloudfiles/storage_object.py#L429
## Implementation

# We want incremental replication running at fixed intervals (e.g. every 15
# minutes). All data changed since the last replication should be written to
# the external storage. All of our datastore entities have an `updated_at`
# property that is set on entity update and creation. We use this to select
# entities for replication.
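# A minimal sketch of what such an entity could look like. The real models
# live in `ablage.models`; `ExampleDokument` and its exact properties are
# purely illustrative. `auto_now=True` makes the datastore refresh the
# timestamp on every `put()`.
class ExampleDokument(db.Model):
    tenant = db.StringProperty()
    designator = db.StringProperty()
    updated_at = db.DateTimeProperty(auto_now=True)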
# Data is stored as PDF and JSON to Amazon S3.
# It is packaged into tar archives conforming to _IEEE Std 1003.1-2001_.

# The `boto` library has the tendency to flood the AppEngine log with
# mostly uninteresting messages, therefore we suppress its logging.
logging.getLogger('boto').setLevel(logging.CRITICAL)
# To remember which entity was the last one replicated we use a separate
# simple model storing only a timestamp.
class DateTimeConfig(db.Model):
    """Store Configuration as DateTime."""
    data = db.DateTimeProperty()


# We also use a Model for storing AWS access credentials.
# You need to set the Credentials before first use like this:
#
#     StrConfig.get_or_insert('aws_key_%s' % tenant, data='*key*')
#     StrConfig.get_or_insert('aws_secret_%s' % tenant, data='*secret*')
class StrConfig(db.Model):
    """Store Configuration as String."""
    data = db.StringProperty(default='')
# The replication works incrementally and is expected to be called at
# regular intervals. Here we are only replicating Dokument entities and
# related PDFs for a single tenant.
# The replication stops after 500 seconds by default. This is to make sure
# that there are no race conditions between a long running replication job
# and the next job started by cron. Such a race condition should not result
# in data loss but is a waste of resources.
# We start off at the updated_at timestamp of the last entity successfully
# replicated, taken from `DateTimeConfig`. Alternatively the caller can
# provide a start timestamp. During the first run we start by default at
# 2010-11-01.
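# For example, to (re)replicate everything for a tenant since a given date,
# one could call (tenant name and date are made up):
#
#     replicate_documents_s3('example.com', startfrom=datetime.datetime(2011, 4, 1))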
def replicate_documents_s3(tenant, timelimit=500, startfrom=None):
    """Replication of Dokument Entities."""
    if not startfrom:
        startfrom = DateTimeConfig.get_or_insert('export_latest_document_%s' % tenant,
                                                 data=datetime.datetime(2010, 11, 1)).data
    starttime = time.time()
    maxdate = startfrom

    # The connection to S3 is set up based on credentials found in the
    # datastore. If not set we use default values which can be updated via
    # the datastore admin.
    # We assume that when connecting from GAE to AWS, using SSL/TLS
    # is of little use.
    s3connection = boto.connect_s3(
        StrConfig.get_or_insert('aws_key_%s' % tenant, data='*key*').data,
        StrConfig.get_or_insert('aws_secret_%s' % tenant, data='*secret*').data,
        is_secure=False)

    # We create one bucket per month per tenant for replication. While there
    # seems to be no practical limit on how many keys can be stored in an S3
    # bucket, most frontend tools get very slow with more than a few thousand
    # keys per bucket. We currently have less than 50,000 entities per month.
    # If we had more entities we would probably be better off creating
    # an S3 bucket per day.
    bucket_name = 'XXX.%s-%s' % (tenant, startfrom.strftime('%Y-%m'))
    try:
        s3connection.create_bucket(bucket_name, location=Location.EU)
    except boto.exception.S3CreateError:
        logging.info("S3 bucket %s already exists", bucket_name)
    # We start replicating from the `updated_at` timestamp of the last entity
    # replicated. This results in the last entity being replicated twice.
    # While this is a certain waste of resources, it ensures that the system
    # reliably replicates even if two entities have exactly the same
    # timestamp (which should never happen due to the sub-millisecond
    # resolution of the timestamp, but you never know).
    logging.info("archiving starting from %s to %s" % (startfrom, bucket_name))
    docs = Dokument.all().filter('tenant =', tenant).filter(
        'updated_at >=', startfrom).order('updated_at')

    # The JSON data will be encapsulated in a tar-gz stream to reduce the
    # number of small files in S3. `Tarstream` offers a sane interface for
    # generating tarfiles.
    tarf = Tarstream()

    # The first version of this code used iterator access to loop over the
    # documents. Unfortunately we saw timeouts and strange errors after about
    # 80 seconds. Using `fetch()` removed that issue and also nicely limits
    # the number of documents archived per call.
    # This approach would fail if we had 300 or more entities with exactly
    # the same timestamp. We just hope this will not happen.
    docs = docs.fetch(300)
    if docs:
        logging.debug("processing %d documents" % len(docs))
    for doc in docs:
        # Prepare filenames.
        # We want no slashes and no colons in filenames. Unfortunately
        # we can't use `strftime()` because this would lose microseconds.
        # Therefore we use `isoformat()` and `replace()`.
        # Finally we ensure the filename is not Unicode.
        # Since we use the updated_at property in the filename, rewritten
        # versions of the entity will have a different filename.
        akte_name = '-'.join(doc.akte.key().id_or_name().split('-')[1:])
        updated_at = doc.updated_at.isoformat().replace('-', '').replace(':', '')
        fname = "d-%s-%s-%s" % (updated_at, akte_name, doc.designator)
        fname = fname.encode('ascii', 'replace')

        # Serialize the entity as JSON. We put a separate file per entity
        # into the tar file.
        jsondata = hujson.dumps(doc.as_dict())
        tarf.writestr(fname + '.json', jsondata)

        # For every entity we have a separate entity containing a PDF. Retrieve
        # that and store it to S3.
        # When we reused the same s3bucket instance that was used for writing
        # the JSON, files got mixed up on the server. Creating a separate
        # bucket instance solved that issue.
        # Since we use the designator in the filename and the designator is
        # in fact the SHA-1 of the PDF, rewritten PDFs will have a different
        # filename.
        pdf = DokumentFile.get_by_key_name("%s-%s" % (tenant, doc.designator)).data
        logging.debug("%s is %d kb", doc.designator, len(pdf) / 1024)
        s3bucket = s3connection.get_bucket(bucket_name)
        s3bucket.new_key(fname + '.pdf').set_contents_from_string(pdf)
        # Remember the date of the newest updated_at value.
        maxdate = max([maxdate, doc.updated_at])

        # If we have been running for more than `timelimit` seconds, stop
        # replication.
        if time.time() - starttime > timelimit:
            break
        # If the tar file is getting too big, stop processing.
        if len(tarf) > 2000000:
            break

    logging.debug("done writing files, writing tgz")
    tarname = 'dokumente_%s_%s.tgz' % (str(startfrom).replace(' ', 'T'), str(maxdate).replace(' ', 'T'))
    s3bucket = s3connection.get_bucket(bucket_name)
    s3bucket.new_key(tarname).set_contents_from_string(tarf.getvalue())
    logging.info("written %s", tarname)

    # Finally store `maxdate` into the datastore so we know where we should
    # continue next time we are called.
    DateTimeConfig(key_name='export_latest_document_%s' % tenant, data=maxdate).put()
    return maxdate
# HTTP request handler to be called via cron or via a taskqueue.
# Being called via a regular request would impose the 30 second request
# runtime limit, which is undesirable. Running from a Task Queue handler
# or from cron gives us 10 minutes runtime.
# Currently we use a hardcoded tenant and call this handler every 10 minutes
# via cron.
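# The corresponding cron.yaml entry could look roughly like this (the
# description and schedule are assumptions; the URL matches the handler
# mapping in `main()` below):
#
#     cron:
#     - description: replicate Dokument entities to S3
#       url: /automation/replication/cron/dokumente
#       schedule: every 10 minutes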
class ReplicateDokumenteHandler(BasicHandler):
    """Handler for Replication of Dokument Entities."""

    def get(self):
        """No parameters are expected."""
        tenant = 'hudora.de'
        maxdate = replicate_documents_s3(tenant)
        logging.info("processed up to %s", maxdate)
        self.response.out.write(str(maxdate) + '\n')

    post = get
# HTTP request handler to trigger replication via taskqueue. For testing purposes.
class TestReplicateDokumenteHandler(BasicHandler):
    """Initiate Replication of Dokument Entities via TaskQueue."""

    def get(self):
        """No parameters are expected."""
        taskqueue.add(url='/automation/replication/cron/dokumente', method='GET')
        self.response.out.write('ok\n')
# The replication of Akte entities follows the replication of Dokument
# entities. But since we only handle relatively small JSON data bits,
# this function can tar around 2500 entities in about 30 seconds.
# The datastore iterator/query interface starts acting up when
# requesting much more data, so we limit ourselves to 30 seconds of processing.
def replicate_akten_s3(tenant, timelimit=30, startfrom=None):
    """Replication of Akte Entities."""
    if not startfrom:
        startfrom = DateTimeConfig.get_or_insert('export_latest_akte_%s' % tenant,
                                                 data=datetime.datetime(2010, 11, 1)).data
    starttime = time.time()
    maxdate = startfrom
    s3connection = boto.connect_s3(
        StrConfig.get_or_insert('aws_key_%s' % tenant, data='*key*').data,
        StrConfig.get_or_insert('aws_secret_%s' % tenant, data='*secret*').data,
        is_secure=False)
    bucket_name = 'XXX.%s-%s' % (tenant, startfrom.strftime('%Y-%m'))
    try:
        s3connection.create_bucket(bucket_name, location=Location.EU)
    except boto.exception.S3CreateError:
        logging.info("S3 bucket %s already exists", bucket_name)
| logging.info("archiving starting from %s to %s" % (startfrom, bucket_name)) | |
| akten = Akte.all().filter('tenant =', tenant).filter( | |
| 'updated_at >=', startfrom).order('updated_at') | |
| tarf = Tarstream() | |
| for akte in akten: | |
| akte_name = '-'.join(akte.key().id_or_name().split('-')[1:]) | |
| updated_at = akte.updated_at.isoformat().replace('-', '').replace(':', '') | |
| fname = "%s-a-%s" % (akte_name, str(updated_at).replace(' ', 'T')) | |
| fname = fname.encode('ascii', 'replace') | |
| jsondata = hujson.dumps(akte.as_dict()) | |
| tarf.writestr(fname + '.json', jsondata) | |
| maxdate = max([maxdate, akte.updated_at]) | |
| if time.time() - starttime > timelimit: | |
| break | |
| if len(tarf) > 2000000: | |
| # `len(tarf)` is the uncompressed Data size. If we assume 4 bits/byte comprssion ratio, | |
| # with 2 MB raw date we would still be under the 1 MB upload limit. | |
| # In reality we observe compression ratios of about 0.35 bits/byte | |
| break | |
    if len(tarf):
        s3bucket = s3connection.get_bucket(bucket_name)
        tarname = 'akten_%s_%s.tgz' % (str(startfrom).replace(' ', 'T'), str(maxdate).replace(' ', 'T'))
        s3bucket.new_key(tarname).set_contents_from_string(tarf.getvalue())
        logging.info("written %s", tarname)
    DateTimeConfig(key_name='export_latest_akte_%s' % tenant, data=maxdate).put()
    return maxdate
# HTTP request handler to be called via cron or via a taskqueue.
class ReplicateAktenHandler(BasicHandler):
    """Handler for Replication of Akte Entities."""

    def get(self):
        """No parameters are expected."""
        tenant = 'hudora.de'
        maxdate = replicate_akten_s3(tenant)
        logging.info("processed up to %s", maxdate)
        self.response.out.write(str(maxdate) + '\n')
# HTTP request handler to trigger replication via taskqueue.
class TestReplicateAktenHandler(BasicHandler):
    """Initiate Replication of Akte Entities via TaskQueue."""

    def get(self):
        """No parameters are expected."""
        taskqueue.add(url='/automation/replication/cron/akten', method='GET')
        self.response.out.write('ok\n')
## Evaluation

# We assume that the ReplicateDokumenteHandler is called every 10 minutes.
# We also assume that no more than 250 documents are added to the application
# in 10 minutes.
# The setup will ensure that, as long as Google AppEngine and Amazon S3 are
# available, all data older than 20 minutes will exist on the S3 replica
# within the EU. Should Google AppEngine or Amazon S3 become unavailable
# for some period of time and then become available again, replication will
# automatically "catch up". No data is ever deleted by the replication
# process, so data loss in the primary system will not result in data
# loss in the replica. Amazon guarantees 99.99999999% data durability.
# Regular mirroring of the S3 data to on-site local storage will ensure
# a third replica within Germany.
# From this third replica, copies are created at regular intervals to removable
# storage media. This media is stored at a different office inside a safe.
# This strategy will protect against all threat scenarios outlined above.
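# The on-site mirror could be as simple as a single crontab entry on a local
# machine; `s3cmd` is just one of many possible tools, and the bucket name and
# target path below are made up following the naming scheme above:
#
#     0 3 * * *  s3cmd sync s3://XXX.hudora.de-2011-05/ /var/backups/s3-replica/2011-05/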
## Helper functionality

class Tarstream(object):
    """Class to allow in-memory generation of tar files - similar to zipfile."""

    def __init__(self):
        self.tf = StringIO.StringIO()
        # The `|` symbol in the mode string instructs tarfile to switch
        # to streaming mode.
        self.tar = tarfile.open(fileobj=self.tf, mode="w|gz")

    def __len__(self):
        return self.tf.tell()

    def writestr(self, name, data):
        """Create a tar file entry `name` and write `data` into it."""
        ti = tarfile.TarInfo(name)
        ti.size = len(data)
        ti.mtime = time.time()
        self.tar.addfile(ti, StringIO.StringIO(data))

    def getvalue(self):
        """Close tarfile and return the bytestream of the tar file."""
        self.tar.close()
        return self.tf.getvalue()
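# A minimal usage sketch of Tarstream (file name and payload are made up).
# Note that `getvalue()` closes the archive, so an instance is single-use.
def _tarstream_example():
    tarf = Tarstream()
    tarf.writestr('example.json', '{"answer": 42}')
    return tarf.getvalue()  # gzip-compressed tar stream, ready for upload to S3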
def main():
    application = webapp2.WSGIApplication(
        [
            ('/automation/replication/test/akten', TestReplicateAktenHandler),
            ('/automation/replication/cron/akten', ReplicateAktenHandler),
            ('/automation/replication/test/dokumente', TestReplicateDokumenteHandler),
            ('/automation/replication/cron/dokumente', ReplicateDokumenteHandler),
        ],
        debug=False)
    application.run()


if __name__ == '__main__':
    main()
import zipfile
import zlib
import binascii
import struct
class blobstoreZipFile(zipfile.ZipFile):
    """Class to create a PKZIP file.

    This is based on Python's zipfile module but is stripped down to work
    with file-like objects which only have a write() and possibly no
    other methods. It is tested with the AppEngine blobstore API.
    """

    def __init__(self, file):
        """Wrap `file`, a write-only file-like object, for ZIP output."""
        self.filelist = []  # List of ZipInfo instances for archive
        self.fp = file
        self.pos = 0

    def write(self, data):
        self.pos += len(data)
        return self.fp.write(data)

    def tell(self):
        return self.pos

    def writestr(self, filename, bytes, date_time, comment=''):
        """Write a file into the archive. The contents is the string
        'bytes', 'filename' is the name of the file in the archive and
        'date_time' is a 6-tuple (year, month, day, hour, minute, second)."""
        zinfo = zipfile.ZipInfo(filename=filename, date_time=date_time)
        # Uncompressed size
        zinfo.file_size = len(bytes)
        zinfo.compress_type = zipfile.ZIP_DEFLATED
        zinfo.comment = comment
        zinfo.CRC = binascii.crc32(bytes)
        co = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        bytes = co.compress(bytes) + co.flush()
        zinfo.compress_size = len(bytes)   # Compressed size
        zinfo.header_offset = self.tell()  # Start of header bytes
        self.write(zinfo.FileHeader())
        self.write(bytes)
        if zinfo.flag_bits & 0x08:
            # Write CRC and file sizes after the file data
            self.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size,
                                   zinfo.file_size))
        self.filelist.append(zinfo)
    def flush(self):
        """write the ending records."""
        count = 0
        pos1 = self.tell()
        records = []
        # write central directory
        for zinfo in self.filelist:
            count = count + 1
            dt = zinfo.date_time
            dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
            dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
            file_size = zinfo.file_size
            compress_size = zinfo.compress_size
            header_offset = zinfo.header_offset
            extra_data = zinfo.extra
            extract_version = zinfo.extract_version
            create_version = zinfo.create_version
            centdir = struct.pack(zipfile.structCentralDir,
                zipfile.stringCentralDir, create_version,
                zinfo.create_system, extract_version, zinfo.reserved,
                zinfo.flag_bits, zinfo.compress_type, dostime, dosdate,
                zinfo.CRC, compress_size, file_size,
                len(zinfo.filename), len(extra_data), len(zinfo.comment),
                0, zinfo.internal_attr, zinfo.external_attr,
                header_offset)
            records.append(''.join([centdir, zinfo.filename, extra_data, zinfo.comment]))
        # Write end-of-zip-archive record
        pos2 = self.tell()
        endrec = struct.pack(zipfile.structEndArchive, zipfile.stringEndArchive,
                             0, 0, count, count, pos2 - pos1, pos1, 0)
        records.append(endrec)
        self.write(''.join(records))
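# A minimal usage sketch with an in-memory buffer; any object with a write()
# method (such as a file opened via the blobstore Files API) works the same
# way. The file name, payload and timestamp below are made up.
def _blobstore_zipfile_example():
    import StringIO
    buf = StringIO.StringIO()
    z = blobstoreZipFile(buf)
    z.writestr('hello.txt', 'Hello, World!', (2011, 5, 1, 12, 0, 0))
    # flush() writes the central directory; without it the archive is unreadable.
    z.flush()
    return buf.getvalue()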
Thanks Maximillian. It works with fixed data, e.g.:

z.writestr(fn.encode('utf-8'), "testytesty", (2000, 6, 21, 8, 8, 8))

(the timestamp is a required parameter)
Thank you for the great work.
I have trouble generating zip files with this code.
I can generate zip files, but when I download them I cannot unzip them.
Probably the files are broken.
My Python code is below.
from google.appengine.api import files

file_name = files.blobstore.create(mime_type='application/zip',
                                   _blobinfo_uploaded_filename='test.zip')
with files.open(file_name, 'a') as f:
    z = blobstoreZipFile(f)
    url = 'http://hoge.com/hoge.xml'
    result = urlfetch.fetch(url)
    fname = 'hoge.xml'
    fname = fname.encode('utf-8')
    import datetime
    z.writestr(fname, result.content, date_time=datetime.datetime.now().timetuple()[:6])
    # Finalize ZIP file and write directory
    z.flush()
files.finalize(file_name)
blob_key = files.blobstore.get_blob_key(file_name)
result_file = Zipmodel(key_name='testzip', title='testzip', blob_key=blob_key)
result_file.put()
return Response('ok')
When I unzip on Mac OS X Lion, I get these messages:
$ unzip test.zip
Archive: test.zip
warning [test.zip]: 61 extra bytes at beginning or within zipfile
(attempting to process anyway)
error [test.zip]: reported length of central directory is
-61 bytes too long (Atari STZip zipfile? J.H.Holm ZIPSPLIT 1.1
zipfile?). Compensating...
inflating: hoge.xml
Why does this happen?
ArneS, interesting. I would try to ensure that you get a plain string/buffer object from the blobstore, e.g. str(fn.read()).
yosukesuzuki, the first thing to check is whether the zipfile looks like a zipfile should. Compare the output of:
hexdump -C test.zip
hexdump -C some_working_zipfile.zip
Hi, mdornseif. Thank you for your reply.
The last 3 lines of the hexdump output are below.
00000e60 32 32 2e 67 69 66 50 4b 05 06 00 00 00 00 01 00 |22.gifPK........|
00000e70 01 00 00 00 00 00 1d 0e 00 00 00 00 |............|
00000e7c
I compared the output with that of a zip file generated by 7zip.
Is the second-to-last line missing anything?
@ArneS - did you find a solution? I too am trying to add blobs from the blobstore to a zip, and I get the following error in the log:
'ascii' codec can't decode byte 0x83 in position 11: ordinal not in range(128)
Traceback (most recent call last):
File "/base/python_runtime/python_lib/versions/1/google/appengine/ext/webapp/_webapp25.py", line 703, in call
handler.post(*groups)
File "/base/data/home/apps/ourphotoevent/1.355028382237794762/handler.py", line 1104, in post
z.writestr(thefilename, value, date_time=createat.timetuple()[:6] )
File "/base/data/home/apps/ourphotoevent/1.355028382237794762/handler.py", line 1018, in writestr
self.write(zinfo.FileHeader())
File "/base/python_runtime/python_dist/lib/python2.5/zipfile.py", line 260, in FileHeader
return header + self.filename + extra
UnicodeDecodeError: 'ascii' codec can't decode byte 0x83 in position 11: ordinal not in range(128)
Any ideas??
@mdornseif
Thanks for your suggestion. I have tried amending the code above by wrapping the result from the blob reader in str(), but I still get the same error.

for df in DocumentFile.all():
    fn = df.blob.filename
    data = str(blobstore.BlobReader(df.blob).read())
    z.writestr(fn.encode('utf-8'), data, (2007, 6, 21, 8, 8, 8))
I've faced the same issues and fixed them here (if you still need this :): link
Arne, have you tried it with fixed data and without the timestamp?