From 2507e4432bb790530b89985da9cd2bf922477cf3 Mon Sep 17 00:00:00 2001
From: "mangini@chromium.org"
Date: Wed, 12 Feb 2014 09:34:33 +0000
Subject: [PATCH] Add Google Cloud Storage support to the documentation
 AppEngine server using the official Cloud Storage Python client library.

With this CL, URL paths can be directly mapped to Cloud Storage buckets
(http://developer.chrome.com/path/... -> gs://bucket/...) using the
content_providers.json configuration file.

open-source-third-party-reviews@ team: this CL adds a third-party library to
the Chromium repo. The library is used only by the documentation server, and
it is the official Google library for accessing Google Cloud Storage from a
Google AppEngine application (the docs server).

BUG=338007

Review URL: https://codereview.chromium.org/139303023

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@250663 0039d316-1c4b-4281-b951-d872f2087c98
---
 .gitignore | 1 +
 .../common/extensions/docs/server2/.gitignore | 1 +
 chrome/common/extensions/docs/server2/README | 27 +
 .../common/extensions/docs/server2/app.yaml | 2 +-
 .../docs/server2/app_engine_handler.py | 2 +-
 .../extensions/docs/server2/build_server.py | 3 +
 .../docs/server2/content_providers.py | 48 +-
 .../docs/server2/content_providers_test.py | 9 +-
 .../common/extensions/docs/server2/cron.yaml | 2 +-
 .../extensions/docs/server2/cron_servlet.py | 9 +-
 .../docs/server2/cron_servlet_test.py | 4 +
 .../docs/server2/extensions_paths.py | 4 +
 .../docs/server2/gcs_file_system.py | 120 +++
 .../docs/server2/gcs_file_system_provider.py | 93 ++
 .../docs/server2/instance_servlet.py | 4 +-
 .../extensions/docs/server2/patch_servlet.py | 2 +
 .../docs/server2/server_instance.py | 12 +-
 .../templates/json/content_providers.json | 48 +-
 .../google_appengine_cloudstorage/OWNERS | 3 +
 .../google_appengine_cloudstorage/README | 7 +
 .../README.chromium | 17 +
 .../cloudstorage/__init__.py | 29 +
 .../cloudstorage/api_utils.py | 315 +++++++
 .../cloudstorage/cloudstorage_api.py | 448 +++++++++
 .../cloudstorage/common.py | 409 +++++++++
 .../cloudstorage/errors.py | 140 +++
 .../cloudstorage/rest_api.py | 246 +++++
 .../cloudstorage/storage_api.py | 865 ++++++++++++++++++
 .../cloudstorage/test_utils.py | 25 +
 29 files changed, 2853 insertions(+), 42 deletions(-)
 create mode 100644 chrome/common/extensions/docs/server2/gcs_file_system.py
 create mode 100644 chrome/common/extensions/docs/server2/gcs_file_system_provider.py
 create mode 100644 third_party/google_appengine_cloudstorage/OWNERS
 create mode 100644 third_party/google_appengine_cloudstorage/README
 create mode 100644 third_party/google_appengine_cloudstorage/README.chromium
 create mode 100644 third_party/google_appengine_cloudstorage/cloudstorage/__init__.py
 create mode 100644 third_party/google_appengine_cloudstorage/cloudstorage/api_utils.py
 create mode 100644 third_party/google_appengine_cloudstorage/cloudstorage/cloudstorage_api.py
 create mode 100644 third_party/google_appengine_cloudstorage/cloudstorage/common.py
 create mode 100644 third_party/google_appengine_cloudstorage/cloudstorage/errors.py
 create mode 100644 third_party/google_appengine_cloudstorage/cloudstorage/rest_api.py
 create mode 100644 third_party/google_appengine_cloudstorage/cloudstorage/storage_api.py
 create mode 100644 third_party/google_appengine_cloudstorage/cloudstorage/test_utils.py

diff --git a/.gitignore b/.gitignore
index 32467e93821a25..35f8562678804e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -220,6 +220,7 @@ v8.log
 /third_party/gles2_conform
 /third_party/gnu_binutils/
 /third_party/gold
+/third_party/google_appengine_cloudstorage
 /third_party/google_toolbox_for_mac/src
 /third_party/googlemac
 /third_party/gperf
diff --git a/chrome/common/extensions/docs/server2/.gitignore b/chrome/common/extensions/docs/server2/.gitignore
index 76b510c710f091..ab971ca447bc4b 100644
--- a/chrome/common/extensions/docs/server2/.gitignore
+++ b/chrome/common/extensions/docs/server2/.gitignore
@@ -1 +1,2 @@
 third_party/
+local_debug/
diff --git a/chrome/common/extensions/docs/server2/README b/chrome/common/extensions/docs/server2/README
index 839070317a8be1..92689044753d46 100644
--- a/chrome/common/extensions/docs/server2/README
+++ b/chrome/common/extensions/docs/server2/README
@@ -21,6 +21,33 @@ be sufficient. If for some reason you want to test against the app engine SDK:
 3. View docs at http://localhost:8080/(apps|extensions)/
+--------------------------------------------
+Using Google Cloud Storage content providers
+
+With preview.py:
+
+  1. Create a directory "[...]/server2/local_debug/gcs/" for every
+     GCS bucket referenced in content_providers.json
+
+  2. Copy files to the respective local bucket directories. preview.py has
+     no access to the real Google Cloud Storage.
+
+With start_dev_server.py:
+
+  1. Install gsutil from https://developers.google.com/storage/docs/gsutil
+
+  2. Configure gsutil according to the official instructions.
+
+  3. Make sure you have access to the GCS buckets specified in
+     content_providers.json by running "gsutil ls gs://bucketname"
+
+  4. Get an OAuth token (see the instructions in the comments of
+     gcs_file_system_provider.py) and save it to the file
+     "[...]/server2/local_debug/gcs_debug.conf"
+
+Remember that step 4 needs to be repeated every 10 minutes or so,
+because the OAuth access token expires quickly.
+
 --------------------
 Deploying the Server
diff --git a/chrome/common/extensions/docs/server2/app.yaml b/chrome/common/extensions/docs/server2/app.yaml
index c691abcb92ab2c..3385ea1c7e98f9 100644
--- a/chrome/common/extensions/docs/server2/app.yaml
+++ b/chrome/common/extensions/docs/server2/app.yaml
@@ -1,5 +1,5 @@
 application: chrome-apps-doc
-version: 3-6-0
+version: 3-7-0
 runtime: python27
 api_version: 1
 threadsafe: false
diff --git a/chrome/common/extensions/docs/server2/app_engine_handler.py b/chrome/common/extensions/docs/server2/app_engine_handler.py
index 68d57ece25117c..8d6ebad51d14f4 100644
--- a/chrome/common/extensions/docs/server2/app_engine_handler.py
+++ b/chrome/common/extensions/docs/server2/app_engine_handler.py
@@ -43,4 +43,4 @@ def get(self):
       self.response.status = response.status
     else:
       self.response.out.write('Internal server error')
-      self.response.status = 500
\ No newline at end of file
+      self.response.status = 500
diff --git a/chrome/common/extensions/docs/server2/build_server.py b/chrome/common/extensions/docs/server2/build_server.py
index 0b3d3155ad0a38..83d06513572849 100755
--- a/chrome/common/extensions/docs/server2/build_server.py
+++ b/chrome/common/extensions/docs/server2/build_server.py
@@ -75,6 +75,9 @@ def main():
                  make_init=False)
   MakeInit(LOCAL_THIRD_PARTY_DIR)
 
+  CopyThirdParty(os.path.join(THIRD_PARTY_DIR, 'google_appengine_cloudstorage',
+                              'cloudstorage'), 'cloudstorage')
+
   # To be able to use the Handlebar class we need this import in __init__.py.
   with open(os.path.join(LOCAL_THIRD_PARTY_DIR, 'handlebar',
diff --git a/chrome/common/extensions/docs/server2/content_providers.py b/chrome/common/extensions/docs/server2/content_providers.py
index 41105a8ab7d1ac..d1d95f835139de 100644
--- a/chrome/common/extensions/docs/server2/content_providers.py
+++ b/chrome/common/extensions/docs/server2/content_providers.py
@@ -3,12 +3,15 @@
 # found in the LICENSE file.
 
 import logging
+import os
 import traceback
 
 from chroot_file_system import ChrootFileSystem
 from content_provider import ContentProvider
-from extensions_paths import CONTENT_PROVIDERS
+import environment
+from extensions_paths import CONTENT_PROVIDERS, LOCAL_DEBUG_DIR
 from future import Gettable, Future
+from local_file_system import LocalFileSystem
 from third_party.json_schema_compiler.memoize import memoize
 
@@ -39,11 +42,32 @@ class ContentProviders(object):
   def __init__(self,
                compiled_fs_factory,
                host_file_system,
-               github_file_system_provider):
+               github_file_system_provider,
+               gcs_file_system_provider):
     self._compiled_fs_factory = compiled_fs_factory
     self._host_file_system = host_file_system
     self._github_file_system_provider = github_file_system_provider
-    self._cache = compiled_fs_factory.ForJson(host_file_system)
+    self._gcs_file_system_provider = gcs_file_system_provider
+    self._cache = None
+
+    # If running the dev server and LOCAL_DEBUG_DIR exists, read the
+    # content_providers.json configuration from there instead of fetching it
+    # from SVN trunk or a patch.
+    if environment.IsDevServer() and os.path.exists(LOCAL_DEBUG_DIR):
+      local_fs = LocalFileSystem(LOCAL_DEBUG_DIR)
+      conf_stat = None
+      try:
+        conf_stat = local_fs.Stat(CONTENT_PROVIDERS)
+      except Exception:
+        pass
+
+      if conf_stat:
+        logging.warn(("Using local debug folder (%s) for "
+                      "content_providers.json configuration") % LOCAL_DEBUG_DIR)
+        self._cache = compiled_fs_factory.ForJson(local_fs)
+
+    if not self._cache:
+      self._cache = compiled_fs_factory.ForJson(host_file_system)
 
   @memoize
   def GetByName(self, name):
@@ -94,6 +118,20 @@ def _CreateContentProvider(self, name, config):
         return None
       file_system = ChrootFileSystem(self._host_file_system,
                                      chromium_config['dir'])
+    elif 'gcs' in config:
+      gcs_config = config['gcs']
+      if 'bucket' not in gcs_config:
+        logging.error('%s: "gcs" must have a "bucket" property' % name)
+        return None
+      bucket = gcs_config['bucket']
+      if not bucket.startswith('gs://'):
+        logging.error('%s: bucket %s should start with gs://' % (name, bucket))
+        return None
+      bucket = bucket[len('gs://'):]
+      file_system = self._gcs_file_system_provider.Create(bucket)
+      if 'dir' in gcs_config:
+        file_system = ChrootFileSystem(file_system, gcs_config['dir'])
     elif 'github' in config:
       github_config = config['github']
       if 'owner' not in github_config or 'repo' not in github_config:
@@ -103,9 +141,9 @@ def _CreateContentProvider(self, name, config):
                      github_config['owner'], github_config['repo'])
       if 'dir' in github_config:
         file_system = ChrootFileSystem(file_system, github_config['dir'])
+
     else:
-      logging.error(
-          '%s: content provider type "%s" not supported' % (name, type_))
+      logging.error('%s: content provider type not supported' % name)
       return None
 
     return ContentProvider(name,
diff --git a/chrome/common/extensions/docs/server2/content_providers_test.py b/chrome/common/extensions/docs/server2/content_providers_test.py
index 7ced6da6885f3a..1dac2f03932a8d 100755
--- a/chrome/common/extensions/docs/server2/content_providers_test.py
+++ b/chrome/common/extensions/docs/server2/content_providers_test.py
@@ -9,6 +9,7 @@
from compiled_file_system import CompiledFileSystem from content_providers import ContentProviders from extensions_paths import EXTENSIONS +from gcs_file_system_provider import CloudStorageFileSystemProvider from object_store_creator import ObjectStoreCreator from test_file_system import TestFileSystem from test_util import DisableLogging @@ -102,10 +103,14 @@ class ContentProvidersTest(unittest.TestCase): def setUp(self): test_file_system = TestFileSystem(_FILE_SYSTEM_DATA, relative_to=EXTENSIONS) self._github_fs_provider = _MockGithubFileSystemProvider(test_file_system) + object_store_creator = ObjectStoreCreator.ForTest() + # TODO(mangini): create tests for GCS + self._gcs_fs_provider = CloudStorageFileSystemProvider(object_store_creator) self._content_providers = ContentProviders( - CompiledFileSystem.Factory(ObjectStoreCreator.ForTest()), + CompiledFileSystem.Factory(object_store_creator), test_file_system, - self._github_fs_provider) + self._github_fs_provider, + self._gcs_fs_provider) def testSimpleRootPath(self): provider = self._content_providers.GetByName('apples') diff --git a/chrome/common/extensions/docs/server2/cron.yaml b/chrome/common/extensions/docs/server2/cron.yaml index bfa1594fb20f32..00aa788f007fd6 100644 --- a/chrome/common/extensions/docs/server2/cron.yaml +++ b/chrome/common/extensions/docs/server2/cron.yaml @@ -2,4 +2,4 @@ cron: - description: Repopulates all cached data. url: /_cron schedule: every 5 minutes - target: 3-6-0 + target: 3-7-0 diff --git a/chrome/common/extensions/docs/server2/cron_servlet.py b/chrome/common/extensions/docs/server2/cron_servlet.py index efdab71fa6c7af..99b93b7a859a4e 100644 --- a/chrome/common/extensions/docs/server2/cron_servlet.py +++ b/chrome/common/extensions/docs/server2/cron_servlet.py @@ -15,6 +15,7 @@ from extensions_paths import EXAMPLES, PUBLIC_TEMPLATES, STATIC_DOCS from file_system_util import CreateURLsFromPaths from future import Gettable, Future +from gcs_file_system_provider import CloudStorageFileSystemProvider from github_file_system_provider import GithubFileSystemProvider from host_file_system_provider import HostFileSystemProvider from object_store_creator import ObjectStoreCreator @@ -101,6 +102,9 @@ def CreateHostFileSystemProvider(self, def CreateGithubFileSystemProvider(self, object_store_creator): return GithubFileSystemProvider(object_store_creator) + def CreateGCSFileSystemProvider(self, object_store_creator): + return CloudStorageFileSystemProvider(object_store_creator) + def GetAppVersion(self): return GetAppVersion() @@ -287,8 +291,11 @@ def _CreateServerInstance(self, revision): object_store_creator, max_trunk_revision=revision) github_file_system_provider = self._delegate.CreateGithubFileSystemProvider( object_store_creator) + gcs_file_system_provider = self._delegate.CreateGCSFileSystemProvider( + object_store_creator) return ServerInstance(object_store_creator, CompiledFileSystem.Factory(object_store_creator), branch_utility, host_file_system_provider, - github_file_system_provider) + github_file_system_provider, + gcs_file_system_provider) diff --git a/chrome/common/extensions/docs/server2/cron_servlet_test.py b/chrome/common/extensions/docs/server2/cron_servlet_test.py index 2dcc34ca666b18..d54848456f64da 100755 --- a/chrome/common/extensions/docs/server2/cron_servlet_test.py +++ b/chrome/common/extensions/docs/server2/cron_servlet_test.py @@ -13,6 +13,7 @@ from extensions_paths import ( APP_YAML, CONTENT_PROVIDERS, EXTENSIONS, PUBLIC_TEMPLATES, SERVER2, STATIC_DOCS) +from gcs_file_system_provider 
import CloudStorageFileSystemProvider from github_file_system_provider import GithubFileSystemProvider from host_file_system_provider import HostFileSystemProvider from local_file_system import LocalFileSystem @@ -50,6 +51,9 @@ def constructor(branch=None, revision=None): def CreateGithubFileSystemProvider(self, object_store_creator): return GithubFileSystemProvider.ForEmpty() + def CreateGCSFileSystemProvider(self, object_store_creator): + return CloudStorageFileSystemProvider.ForEmpty() + def GetAppVersion(self): return self._app_version diff --git a/chrome/common/extensions/docs/server2/extensions_paths.py b/chrome/common/extensions/docs/server2/extensions_paths.py index 001f131fecacaf..c6969d5240453c 100644 --- a/chrome/common/extensions/docs/server2/extensions_paths.py +++ b/chrome/common/extensions/docs/server2/extensions_paths.py @@ -30,3 +30,7 @@ PUBLIC_TEMPLATES = join(TEMPLATES, 'public/') CONTENT_PROVIDERS = join(JSON_TEMPLATES, 'content_providers.json') + +LOCAL_DEBUG_DIR = join(SERVER2, 'local_debug/') +LOCAL_GCS_DIR = join(LOCAL_DEBUG_DIR, 'gcs/') +LOCAL_GCS_DEBUG_CONF = join(LOCAL_DEBUG_DIR, 'gcs_debug.conf') diff --git a/chrome/common/extensions/docs/server2/gcs_file_system.py b/chrome/common/extensions/docs/server2/gcs_file_system.py new file mode 100644 index 00000000000000..d1c3820fe7226f --- /dev/null +++ b/chrome/common/extensions/docs/server2/gcs_file_system.py @@ -0,0 +1,120 @@ +# Copyright 2014 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +from third_party.cloudstorage import cloudstorage_api +from third_party.cloudstorage import common +from third_party.cloudstorage import errors + +from docs_server_utils import StringIdentity +from file_system import FileSystem, FileNotFoundError, StatInfo +from future import Gettable, Future + +import logging +import traceback + +'''See gcs_file_system_provider.py for documentation on using Google Cloud +Storage as a filesystem. +''' +def _ReadFile(filename): + try: + with cloudstorage_api.open(filename, 'r') as f: + return f.read() + except errors.Error: + raise FileNotFoundError('Read failed for %s: %s' % (filename, + traceback.format_exc())) + +def _ListDir(dir_name): + try: + files = cloudstorage_api.listbucket(dir_name) + return [os_path.filename for os_path in files] + except errors.Error: + raise FileNotFoundError('cloudstorage.listbucket failed for %s: %s' % + (dir_name, traceback.format_exc())) + +def _CreateStatInfo(bucket, path): + bucket = '/%s' % bucket + full_path = '/'.join( (bucket, path.lstrip('/')) ) + try: + if full_path.endswith('/'): + child_versions = dict() + version = 0 + # Fetching stats for all files under full_path, recursively. The + # listbucket method uses a prefix approach to simulate hierarchy, + # but calling it without the "delimiter" argument searches for prefix, + # which means, for directories, everything beneath it. 
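+      # e.g. listing '/bucket/docs/' this way would also yield a (hypothetical)
+      # 'docs/js/app.js', not just direct children, since every object sharing
+      # the 'docs/' prefix matches.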
+      for _file in cloudstorage_api.listbucket(full_path):
+        if not _file.is_dir:
+          # GCS doesn't have metadata for dirs
+          child_stat = cloudstorage_api.stat('%s' % _file.filename).st_ctime
+          filename = _file.filename[len(bucket)+1:]
+          child_versions[filename] = child_stat
+          version = max(version, child_stat)
+    else:
+      child_versions = None
+      version = cloudstorage_api.stat(full_path).st_ctime
+    return StatInfo(version, child_versions)
+  except (TypeError, errors.Error):
+    raise FileNotFoundError('cloudstorage.stat failed for %s: %s' % (path,
+                            traceback.format_exc()))
+
+
+class CloudStorageFileSystem(FileSystem):
+  '''FileSystem implementation which fetches resources from Google Cloud
+  Storage.
+  '''
+  def __init__(self, bucket, debug_access_token=None, debug_bucket_prefix=None):
+    self._bucket = bucket
+    if debug_access_token:
+      logging.debug('gcs: using debug access token: %s' % debug_access_token)
+      common.set_access_token(debug_access_token)
+    if debug_bucket_prefix:
+      logging.debug('gcs: prefixing all bucket names with %s' %
+                    debug_bucket_prefix)
+      self._bucket = debug_bucket_prefix + self._bucket
+
+  def Read(self, paths):
+    def resolve():
+      try:
+        result = {}
+        for path in paths:
+          full_path = '/%s/%s' % (self._bucket, path.lstrip('/'))
+          logging.debug('gcs: requested path %s, reading %s' %
+                        (path, full_path))
+          if path == '' or path.endswith('/'):
+            result[path] = _ListDir(full_path)
+          else:
+            result[path] = _ReadFile(full_path)
+        return result
+      except errors.AuthorizationError:
+        self._warnAboutAuthError()
+        raise
+
+    return Future(delegate=Gettable(resolve))
+
+  def Refresh(self):
+    return Future(value=())
+
+  def Stat(self, path):
+    try:
+      return _CreateStatInfo(self._bucket, path)
+    except errors.AuthorizationError:
+      self._warnAboutAuthError()
+      raise
+
+  def GetIdentity(self):
+    return '@'.join((self.__class__.__name__, StringIdentity(self._bucket)))
+
+  def __repr__(self):
+    return 'CloudStorageFileSystem(%s)' % self._bucket
+
+  def _warnAboutAuthError(self):
+    logging.warn(('Authentication error on Cloud Storage. Check if your'
+                  ' appengine project has permissions to Read the GCS'
+                  ' buckets. If you are running a local appengine server,'
+                  ' you need to set an access_token in'
+                  ' local_debug/gcs_debug.conf.'
+                  ' Remember that this token expires in less than 10'
+                  ' minutes, so keep it updated. See'
+                  ' gcs_file_system_provider.py for instructions.'))
+    logging.debug(traceback.format_exc())
diff --git a/chrome/common/extensions/docs/server2/gcs_file_system_provider.py b/chrome/common/extensions/docs/server2/gcs_file_system_provider.py
new file mode 100644
index 00000000000000..3ea535d1299f5d
--- /dev/null
+++ b/chrome/common/extensions/docs/server2/gcs_file_system_provider.py
@@ -0,0 +1,93 @@
+# Copyright 2014 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import os
+import environment
+
+from empty_dir_file_system import EmptyDirFileSystem
+from extensions_paths import LOCAL_GCS_DIR, LOCAL_GCS_DEBUG_CONF
+from local_file_system import LocalFileSystem
+from path_util import IsDirectory
+
+class CloudStorageFileSystemProvider(object):
+  '''Provides CloudStorageFileSystem bound to a GCS bucket.
+  '''
+  def __init__(self, object_store_creator):
+    self._object_store_creator = object_store_creator
+
+  def Create(self, bucket):
+    '''Creates a file system for |bucket|.
+
+    |bucket| is the name of the GCS bucket, e.g. devtools-docs.
+    It is expected that this bucket has Read permission for this app in
+    its ACLs.
+
+    Optional configuration can be set in a local_debug/gcs_debug.conf file:
+      use_local_fs=True|False
+      access_token=
+      remote_bucket_prefix=
+
+    If running in Preview mode or in Development mode with use_local_fs set
+    to True, buckets and files are looked up inside the local_debug folder
+    instead of on the real GCS server. The preview server does not support
+    direct GCS access, so it is always forced to use a LocalFileSystem.
+
+    For real GCS access in Development mode (dev_appserver.py), the
+    access_token and remote_bucket_prefix options can be used to change the
+    way GCS files are accessed. Both are ignored in a real appengine
+    instance.
+
+    "access_token" is always REQUIRED on dev_appserver, otherwise you will
+    get 404 (auth) errors. You can get an access_token valid for a few
+    minutes by typing:
+      gsutil -d ls 2>&1 | grep "Bearer" |
+          sed "s/.*Bearer \(.*\)\r\nUser-Agent.*/access_token=\1/"
+
+    A sample output would be:
+      access_token=ya29.1.AADtN_VW5ibbfLHV5cMIK5ss4bHtVzBXpa4byjd
+
+    Now add this line to the local_debug/gcs_debug.conf file and restart the
+    appengine development server.
+
+    Remember that you will need a new access_token every ten minutes or so.
+    If you see 404 errors in the log, update it. The access token is not
+    used for a deployed appengine app, only if you use dev_appserver.py.
+
+    remote_bucket_prefix is useful if you want to test on your own GCS
+    buckets before using the real GCS buckets.
+    '''
+    if not environment.IsReleaseServer() and not environment.IsDevServer():
+      bucket_local_path = os.path.join(LOCAL_GCS_DIR, bucket)
+      if IsDirectory(bucket_local_path):
+        return LocalFileSystem(bucket_local_path)
+      else:
+        return EmptyDirFileSystem()
+
+    debug_access_token = None
+    debug_bucket_prefix = None
+    use_local_fs = False
+
+    if environment.IsDevServer() and os.path.exists(LOCAL_GCS_DEBUG_CONF):
+      with open(LOCAL_GCS_DEBUG_CONF, "r") as token_file:
+        properties = dict(line.strip().split('=', 1) for line in token_file)
+      use_local_fs = properties.get('use_local_fs', 'False') == 'True'
+      debug_access_token = properties.get('access_token', None)
+      debug_bucket_prefix = properties.get('remote_bucket_prefix', None)
+
+    if environment.IsDevServer() and use_local_fs:
+      return LocalFileSystem(os.path.join(LOCAL_GCS_DIR, bucket))
+
+    # gcs_file_system has strong dependencies on runtime appengine APIs, so
+    # only import it when we are sure we are not on preview.py or in tests.
+    from gcs_file_system import CloudStorageFileSystem
+    return CloudStorageFileSystem(
+        bucket, debug_access_token, debug_bucket_prefix)
+
+
+  @staticmethod
+  def ForEmpty():
+    class EmptyImpl(object):
+      def Create(self, bucket):
+        return EmptyDirFileSystem()
+    return EmptyImpl()
diff --git a/chrome/common/extensions/docs/server2/instance_servlet.py b/chrome/common/extensions/docs/server2/instance_servlet.py
index 058ee5cd2eefc0..5b3f7e8739c844 100644
--- a/chrome/common/extensions/docs/server2/instance_servlet.py
+++ b/chrome/common/extensions/docs/server2/instance_servlet.py
@@ -11,6 +11,7 @@
 from render_servlet import RenderServlet
 from object_store_creator import ObjectStoreCreator
 from server_instance import ServerInstance
+from gcs_file_system_provider import CloudStorageFileSystemProvider
 
 class InstanceServletRenderServletDelegate(RenderServlet.Delegate):
   '''AppEngine instances should never need to call out to SVN.
That should only @@ -45,7 +46,8 @@ def CreateServerInstance(self): CompiledFileSystem.Factory(object_store_creator), branch_utility, host_file_system_provider, - github_file_system_provider) + github_file_system_provider, + CloudStorageFileSystemProvider(object_store_creator)) class InstanceServlet(object): '''Servlet for running on normal AppEngine instances. diff --git a/chrome/common/extensions/docs/server2/patch_servlet.py b/chrome/common/extensions/docs/server2/patch_servlet.py index 9f55d978f0a820..f003c4552b8db8 100644 --- a/chrome/common/extensions/docs/server2/patch_servlet.py +++ b/chrome/common/extensions/docs/server2/patch_servlet.py @@ -19,6 +19,7 @@ from server_instance import ServerInstance from servlet import Request, Response, Servlet import url_constants +from gcs_file_system_provider import CloudStorageFileSystemProvider class _PatchServletDelegate(RenderServlet.Delegate): @@ -63,6 +64,7 @@ def CreateServerInstance(self): branch_utility, patched_host_file_system_provider, self._delegate.CreateGithubFileSystemProvider(object_store_creator), + CloudStorageFileSystemProvider(object_store_creator), base_path='/_patch/%s/' % self._issue) # HACK: if content_providers.json changes in this patch then the cron needs diff --git a/chrome/common/extensions/docs/server2/server_instance.py b/chrome/common/extensions/docs/server2/server_instance.py index dca70ba91b9615..61ee914c9705ec 100644 --- a/chrome/common/extensions/docs/server2/server_instance.py +++ b/chrome/common/extensions/docs/server2/server_instance.py @@ -13,9 +13,10 @@ from empty_dir_file_system import EmptyDirFileSystem from environment import IsDevServer from features_bundle import FeaturesBundle +from gcs_file_system_provider import CloudStorageFileSystemProvider from github_file_system_provider import GithubFileSystemProvider -from host_file_system_provider import HostFileSystemProvider from host_file_system_iterator import HostFileSystemIterator +from host_file_system_provider import HostFileSystemProvider from object_store_creator import ObjectStoreCreator from reference_resolver import ReferenceResolver from samples_data_source import SamplesDataSource @@ -33,6 +34,7 @@ def __init__(self, branch_utility, host_file_system_provider, github_file_system_provider, + gcs_file_system_provider, base_path='/'): ''' |object_store_creator| @@ -61,6 +63,7 @@ def __init__(self, host_fs_at_trunk = host_file_system_provider.GetTrunk() self.github_file_system_provider = github_file_system_provider + self.gcs_file_system_provider = gcs_file_system_provider assert base_path.startswith('/') and base_path.endswith('/') self.base_path = base_path @@ -136,7 +139,8 @@ def __init__(self, self.content_providers = ContentProviders( self.compiled_fs_factory, host_fs_at_trunk, - self.github_file_system_provider) + self.github_file_system_provider, + self.gcs_file_system_provider) # TODO(kalman): Move all the remaining DataSources into DataSourceRegistry, # then factor out the DataSource creation into a factory method, so that @@ -170,6 +174,7 @@ def ForTest(file_system=None, file_system_provider=None, base_path='/'): TestBranchUtility.CreateWithCannedData(), file_system_provider, GithubFileSystemProvider.ForEmpty(), + CloudStorageFileSystemProvider(object_store_creator), base_path=base_path) @staticmethod @@ -183,4 +188,5 @@ def ForLocal(): CompiledFileSystem.Factory(object_store_creator), TestBranchUtility.CreateWithCannedData(), host_file_system_provider, - GithubFileSystemProvider.ForEmpty()) + GithubFileSystemProvider.ForEmpty(), + 
CloudStorageFileSystemProvider(object_store_creator)) diff --git a/chrome/common/extensions/docs/templates/json/content_providers.json b/chrome/common/extensions/docs/templates/json/content_providers.json index afdedc5ee231d2..c1a97dc88753a0 100644 --- a/chrome/common/extensions/docs/templates/json/content_providers.json +++ b/chrome/common/extensions/docs/templates/json/content_providers.json @@ -74,32 +74,26 @@ }, "serveFrom": "native-client", "supportsTemplates": true + }, + "devtools-docs": { + "gcs": { + "bucket": "gs://chromedocs-devtools" + }, + "serveFrom": "devtools", + "supportsTemplates": true + }, + "multidevice-docs": { + "gcs": { + "bucket": "gs://chromedocs-multidevice" + }, + "serveFrom": "multidevice", + "supportsTemplates": true + }, + "webstore-docs": { + "gcs": { + "bucket": "gs://chromedocs-webstore" + }, + "serveFrom": "webstore", + "supportsTemplates": true } - - // GitHub is not working at the moment. Disable entirely. - // - //"devtools-docs": { - // "github": { - // "owner": "GoogleChrome", - // "repo": "devtools-docs-migration" - // }, - // "serveFrom": "devtools", - // "supportsTemplates": true - //}, - //"multidevice-docs": { - // "github": { - // "owner": "GoogleChrome", - // "repo": "multi-device" - // }, - // "serveFrom": "multidevice", - // "supportsTemplates": true - //}, - //"webstore-docs": { - // "github": { - // "owner": "GoogleChrome", - // "repo": "webstore-docs" - // }, - // "serveFrom": "webstore", - // "supportsTemplates": true - //} } diff --git a/third_party/google_appengine_cloudstorage/OWNERS b/third_party/google_appengine_cloudstorage/OWNERS new file mode 100644 index 00000000000000..c0546aa24a7c13 --- /dev/null +++ b/third_party/google_appengine_cloudstorage/OWNERS @@ -0,0 +1,3 @@ +kalman@chromium.org +jyasskin@chromium.org +mangini@chromium.org diff --git a/third_party/google_appengine_cloudstorage/README b/third_party/google_appengine_cloudstorage/README new file mode 100644 index 00000000000000..bcfa9f69fddec1 --- /dev/null +++ b/third_party/google_appengine_cloudstorage/README @@ -0,0 +1,7 @@ +AppEngine Google Cloud Storage clients +====================================== + +Official site: http://code.google.com/p/appengine-gcs-client/ + +Check the site for up to date status, latest version, getting started & user +guides. diff --git a/third_party/google_appengine_cloudstorage/README.chromium b/third_party/google_appengine_cloudstorage/README.chromium new file mode 100644 index 00000000000000..073f575519a3f3 --- /dev/null +++ b/third_party/google_appengine_cloudstorage/README.chromium @@ -0,0 +1,17 @@ +Name: Google AppEngine Cloud Storage Python client +Short Name: appengine-gcs-client +URL: https://code.google.com/p/appengine-gcs-client/source +Date: 2014-Feb-14 +Revision: trunk +License: Apache 2.0 +License File: NOT_SHIPPED +Security Critical: no + +Description: +Python libraries to access Google Cloud Storage buckets from Google AppEngine +server, used in the documentation server to access parts of docs that are +hosted in Google Cloud Storage. + +Local Modifications: +- removed test and demo directories +- removed MANIFEST.in, setup.py and distribute_setup.py diff --git a/third_party/google_appengine_cloudstorage/cloudstorage/__init__.py b/third_party/google_appengine_cloudstorage/cloudstorage/__init__.py new file mode 100644 index 00000000000000..349a021a55783e --- /dev/null +++ b/third_party/google_appengine_cloudstorage/cloudstorage/__init__.py @@ -0,0 +1,29 @@ +# Copyright 2014 Google Inc. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Client Library for Google Cloud Storage.""" + + + + +from .api_utils import RetryParams +from .api_utils import set_default_retry_params +from cloudstorage_api import * +from .common import CSFileStat +from .common import GCSFileStat +from .common import validate_bucket_name +from .common import validate_bucket_path +from .common import validate_file_path +from errors import * +from storage_api import * diff --git a/third_party/google_appengine_cloudstorage/cloudstorage/api_utils.py b/third_party/google_appengine_cloudstorage/cloudstorage/api_utils.py new file mode 100644 index 00000000000000..49092d089f40b3 --- /dev/null +++ b/third_party/google_appengine_cloudstorage/cloudstorage/api_utils.py @@ -0,0 +1,315 @@ +# Copyright 2013 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Util functions and classes for cloudstorage_api.""" + + + +__all__ = ['set_default_retry_params', + 'RetryParams', + ] + +import copy +import httplib +import logging +import math +import os +import threading +import time +import urllib + + +try: + from google.appengine.api import urlfetch + from google.appengine.datastore import datastore_rpc + from google.appengine.ext.ndb import eventloop + from google.appengine.ext.ndb import utils + from google.appengine import runtime + from google.appengine.runtime import apiproxy_errors +except ImportError: + from google.appengine.api import urlfetch + from google.appengine.datastore import datastore_rpc + from google.appengine import runtime + from google.appengine.runtime import apiproxy_errors + from google.appengine.ext.ndb import eventloop + from google.appengine.ext.ndb import utils + + +_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError, + apiproxy_errors.Error) + +_thread_local_settings = threading.local() +_thread_local_settings.default_retry_params = None + + +def set_default_retry_params(retry_params): + """Set a default RetryParams for current thread current request.""" + _thread_local_settings.default_retry_params = copy.copy(retry_params) + + +def _get_default_retry_params(): + """Get default RetryParams for current request and current thread. + + Returns: + A new instance of the default RetryParams. 
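+    A copy is returned rather than the stored object itself, so callers
+    cannot accidentally mutate the shared per-thread default.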
+ """ + default = getattr(_thread_local_settings, 'default_retry_params', None) + if default is None or not default.belong_to_current_request(): + return RetryParams() + else: + return copy.copy(default) + + +def _quote_filename(filename): + """Quotes filename to use as a valid URI path. + + Args: + filename: user provided filename. /bucket/filename. + + Returns: + The filename properly quoted to use as URI's path component. + """ + return urllib.quote(filename) + + +def _unquote_filename(filename): + """Unquotes a valid URI path back to its filename. + + This is the opposite of _quote_filename. + + Args: + filename: a quoted filename. /bucket/some%20filename. + + Returns: + The filename unquoted. + """ + return urllib.unquote(filename) + + +def _should_retry(resp): + """Given a urlfetch response, decide whether to retry that request.""" + return (resp.status_code == httplib.REQUEST_TIMEOUT or + (resp.status_code >= 500 and + resp.status_code < 600)) + + +class RetryParams(object): + """Retry configuration parameters.""" + + @datastore_rpc._positional(1) + def __init__(self, + backoff_factor=2.0, + initial_delay=0.1, + max_delay=10.0, + min_retries=2, + max_retries=5, + max_retry_period=30.0, + urlfetch_timeout=None, + save_access_token=False): + """Init. + + This object is unique per request per thread. + + Library will retry according to this setting when App Engine Server + can't call urlfetch, urlfetch timed out, or urlfetch got a 408 or + 500-600 response. + + Args: + backoff_factor: exponential backoff multiplier. + initial_delay: seconds to delay for the first retry. + max_delay: max seconds to delay for every retry. + min_retries: min number of times to retry. This value is automatically + capped by max_retries. + max_retries: max number of times to retry. Set this to 0 for no retry. + max_retry_period: max total seconds spent on retry. Retry stops when + this period passed AND min_retries has been attempted. + urlfetch_timeout: timeout for urlfetch in seconds. Could be None, + in which case the value will be chosen by urlfetch module. + save_access_token: persist access token to datastore to avoid + excessive usage of GetAccessToken API. Usually the token is cached + in process and in memcache. In some cases, memcache isn't very + reliable. + """ + self.backoff_factor = self._check('backoff_factor', backoff_factor) + self.initial_delay = self._check('initial_delay', initial_delay) + self.max_delay = self._check('max_delay', max_delay) + self.max_retry_period = self._check('max_retry_period', max_retry_period) + self.max_retries = self._check('max_retries', max_retries, True, int) + self.min_retries = self._check('min_retries', min_retries, True, int) + if self.min_retries > self.max_retries: + self.min_retries = self.max_retries + + self.urlfetch_timeout = None + if urlfetch_timeout is not None: + self.urlfetch_timeout = self._check('urlfetch_timeout', urlfetch_timeout) + self.save_access_token = self._check('save_access_token', save_access_token, + True, bool) + + self._request_id = os.getenv('REQUEST_LOG_ID') + + def __eq__(self, other): + if not isinstance(other, self.__class__): + return False + return self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not self.__eq__(other) + + @classmethod + def _check(cls, name, val, can_be_zero=False, val_type=float): + """Check init arguments. + + Args: + name: name of the argument. For logging purpose. + val: value. Value has to be non negative number. + can_be_zero: whether value can be zero. 
+ val_type: Python type of the value. + + Returns: + The value. + + Raises: + ValueError: when invalid value is passed in. + TypeError: when invalid value type is passed in. + """ + valid_types = [val_type] + if val_type is float: + valid_types.append(int) + + if type(val) not in valid_types: + raise TypeError( + 'Expect type %s for parameter %s' % (val_type.__name__, name)) + if val < 0: + raise ValueError( + 'Value for parameter %s has to be greater than 0' % name) + if not can_be_zero and val == 0: + raise ValueError( + 'Value for parameter %s can not be 0' % name) + return val + + def belong_to_current_request(self): + return os.getenv('REQUEST_LOG_ID') == self._request_id + + def delay(self, n, start_time): + """Calculate delay before the next retry. + + Args: + n: the number of current attempt. The first attempt should be 1. + start_time: the time when retry started in unix time. + + Returns: + Number of seconds to wait before next retry. -1 if retry should give up. + """ + if (n > self.max_retries or + (n > self.min_retries and + time.time() - start_time > self.max_retry_period)): + return -1 + return min( + math.pow(self.backoff_factor, n-1) * self.initial_delay, + self.max_delay) + + +def _retry_fetch(url, retry_params, **kwds): + """A blocking fetch function similar to urlfetch.fetch. + + This function should be used when a urlfetch has timed out or the response + shows http request timeout. This function will put current thread to + sleep between retry backoffs. + + Args: + url: url to fetch. + retry_params: an instance of RetryParams. + **kwds: keyword arguments for urlfetch. If deadline is specified in kwds, + it precedes the one in RetryParams. If none is specified, it's up to + urlfetch to use its own default. + + Returns: + A urlfetch response from the last retry. None if no retry was attempted. + + Raises: + Whatever exception encountered during the last retry. + """ + n = 1 + start_time = time.time() + delay = retry_params.delay(n, start_time) + if delay <= 0: + return + + logging.info('Will retry request to %s.', url) + while delay > 0: + resp = None + try: + logging.info('Retry in %s seconds.', delay) + time.sleep(delay) + resp = urlfetch.fetch(url, **kwds) + except runtime.DeadlineExceededError: + logging.info( + 'Urlfetch retry %s will exceed request deadline ' + 'after %s seconds total', n, time.time() - start_time) + raise + except _RETRIABLE_EXCEPTIONS, e: + pass + + n += 1 + delay = retry_params.delay(n, start_time) + if resp and not _should_retry(resp): + break + elif resp: + logging.info( + 'Got status %s from GCS.', resp.status_code) + else: + logging.info( + 'Got exception "%r" while contacting GCS.', e) + + if resp: + return resp + + logging.info('Urlfetch failed after %s retries and %s seconds in total.', + n - 1, time.time() - start_time) + raise + + +def _run_until_rpc(): + """Eagerly evaluate tasklets until it is blocking on some RPC. + + Usually ndb eventloop el isn't run until some code calls future.get_result(). + + When an async tasklet is called, the tasklet wrapper evaluates the tasklet + code into a generator, enqueues a callback _help_tasklet_along onto + the el.current queue, and returns a future. + + _help_tasklet_along, when called by the el, will + get one yielded value from the generator. If the value if another future, + set up a callback _on_future_complete to invoke _help_tasklet_along + when the dependent future fulfills. If the value if a RPC, set up a + callback _on_rpc_complete to invoke _help_tasklet_along when the RPC fulfills. 
+ Thus _help_tasklet_along drills down + the chain of futures until some future is blocked by RPC. El runs + all callbacks and constantly check pending RPC status. + """ + el = eventloop.get_event_loop() + while el.current: + el.run0() + + +def _eager_tasklet(tasklet): + """Decorator to turn tasklet to run eagerly.""" + + @utils.wrapping(tasklet) + def eager_wrapper(*args, **kwds): + fut = tasklet(*args, **kwds) + _run_until_rpc() + return fut + + return eager_wrapper diff --git a/third_party/google_appengine_cloudstorage/cloudstorage/cloudstorage_api.py b/third_party/google_appengine_cloudstorage/cloudstorage/cloudstorage_api.py new file mode 100644 index 00000000000000..51b4c842b798d9 --- /dev/null +++ b/third_party/google_appengine_cloudstorage/cloudstorage/cloudstorage_api.py @@ -0,0 +1,448 @@ +# Copyright 2012 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""File Interface for Google Cloud Storage.""" + + + +from __future__ import with_statement + + + +__all__ = ['delete', + 'listbucket', + 'open', + 'stat', + ] + +import logging +import StringIO +import urllib +import xml.etree.cElementTree as ET +from . import api_utils +from . import common +from . import errors +from . import storage_api + + + +def open(filename, + mode='r', + content_type=None, + options=None, + read_buffer_size=storage_api.ReadBuffer.DEFAULT_BUFFER_SIZE, + retry_params=None, + _account_id=None): + """Opens a Google Cloud Storage file and returns it as a File-like object. + + Args: + filename: A Google Cloud Storage filename of form '/bucket/filename'. + mode: 'r' for reading mode. 'w' for writing mode. + In reading mode, the file must exist. In writing mode, a file will + be created or be overrode. + content_type: The MIME type of the file. str. Only valid in writing mode. + options: A str->basestring dict to specify additional headers to pass to + GCS e.g. {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}. + Supported options are x-goog-acl, x-goog-meta-, cache-control, + content-disposition, and content-encoding. + Only valid in writing mode. + See https://developers.google.com/storage/docs/reference-headers + for details. + read_buffer_size: The buffer size for read. Read keeps a buffer + and prefetches another one. To minimize blocking for large files, + always read by buffer size. To minimize number of RPC requests for + small files, set a large buffer size. Max is 30MB. + retry_params: An instance of api_utils.RetryParams for subsequent calls + to GCS from this file handle. If None, the default one is used. + _account_id: Internal-use only. + + Returns: + A reading or writing buffer that supports File-like interface. Buffer + must be closed after operations are done. + + Raises: + errors.AuthorizationError: if authorization failed. + errors.NotFoundError: if an object that's expected to exist doesn't. + ValueError: invalid open mode or if content_type or options are specified + in reading mode. 
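+
+  Example, a minimal read sketch (the bucket and object names are
+  illustrative):
+    with open('/my_bucket/data.txt') as gcs_file:
+      contents = gcs_file.read()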
+ """ + common.validate_file_path(filename) + api = storage_api._get_storage_api(retry_params=retry_params, + account_id=_account_id) + filename = api_utils._quote_filename(filename) + + if mode == 'w': + common.validate_options(options) + return storage_api.StreamingBuffer(api, filename, content_type, options) + elif mode == 'r': + if content_type or options: + raise ValueError('Options and content_type can only be specified ' + 'for writing mode.') + return storage_api.ReadBuffer(api, + filename, + buffer_size=read_buffer_size) + else: + raise ValueError('Invalid mode %s.' % mode) + + +def delete(filename, retry_params=None, _account_id=None): + """Delete a Google Cloud Storage file. + + Args: + filename: A Google Cloud Storage filename of form '/bucket/filename'. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + _account_id: Internal-use only. + + Raises: + errors.NotFoundError: if the file doesn't exist prior to deletion. + """ + api = storage_api._get_storage_api(retry_params=retry_params, + account_id=_account_id) + common.validate_file_path(filename) + filename = api_utils._quote_filename(filename) + status, resp_headers, _ = api.delete_object(filename) + errors.check_status(status, [204], filename, resp_headers=resp_headers) + + +def stat(filename, retry_params=None, _account_id=None): + """Get GCSFileStat of a Google Cloud storage file. + + Args: + filename: A Google Cloud Storage filename of form '/bucket/filename'. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + _account_id: Internal-use only. + + Returns: + a GCSFileStat object containing info about this file. + + Raises: + errors.AuthorizationError: if authorization failed. + errors.NotFoundError: if an object that's expected to exist doesn't. + """ + common.validate_file_path(filename) + api = storage_api._get_storage_api(retry_params=retry_params, + account_id=_account_id) + status, headers, _ = api.head_object(api_utils._quote_filename(filename)) + errors.check_status(status, [200], filename, resp_headers=headers) + file_stat = common.GCSFileStat( + filename=filename, + st_size=headers.get('content-length'), + st_ctime=common.http_time_to_posix(headers.get('last-modified')), + etag=headers.get('etag'), + content_type=headers.get('content-type'), + metadata=common.get_metadata(headers)) + + return file_stat + + +def _copy2(src, dst, metadata=None, retry_params=None): + """Copy the file content from src to dst. + + Internal use only! + + Args: + src: /bucket/filename + dst: /bucket/filename + metadata: a dict of metadata for this copy. If None, old metadata is copied. + For example, {'x-goog-meta-foo': 'bar'}. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + + Raises: + errors.AuthorizationError: if authorization failed. + errors.NotFoundError: if an object that's expected to exist doesn't. 
+ """ + common.validate_file_path(src) + common.validate_file_path(dst) + + if metadata is None: + metadata = {} + copy_meta = 'COPY' + else: + copy_meta = 'REPLACE' + metadata.update({'x-goog-copy-source': src, + 'x-goog-metadata-directive': copy_meta}) + + api = storage_api._get_storage_api(retry_params=retry_params) + status, resp_headers, _ = api.put_object( + api_utils._quote_filename(dst), headers=metadata) + errors.check_status(status, [200], src, metadata, resp_headers) + + +def listbucket(path_prefix, marker=None, prefix=None, max_keys=None, + delimiter=None, retry_params=None, _account_id=None): + """Returns a GCSFileStat iterator over a bucket. + + Optional arguments can limit the result to a subset of files under bucket. + + This function has two modes: + 1. List bucket mode: Lists all files in the bucket without any concept of + hierarchy. GCS doesn't have real directory hierarchies. + 2. Directory emulation mode: If you specify the 'delimiter' argument, + it is used as a path separator to emulate a hierarchy of directories. + In this mode, the "path_prefix" argument should end in the delimiter + specified (thus designates a logical directory). The logical directory's + contents, both files and subdirectories, are listed. The names of + subdirectories returned will end with the delimiter. So listbucket + can be called with the subdirectory name to list the subdirectory's + contents. + + Args: + path_prefix: A Google Cloud Storage path of format "/bucket" or + "/bucket/prefix". Only objects whose fullpath starts with the + path_prefix will be returned. + marker: Another path prefix. Only objects whose fullpath starts + lexicographically after marker will be returned (exclusive). + prefix: Deprecated. Use path_prefix. + max_keys: The limit on the number of objects to return. int. + For best performance, specify max_keys only if you know how many objects + you want. Otherwise, this method requests large batches and handles + pagination for you. + delimiter: Use to turn on directory mode. str of one or multiple chars + that your bucket uses as its directory separator. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + _account_id: Internal-use only. + + Examples: + For files "/bucket/a", + "/bucket/bar/1" + "/bucket/foo", + "/bucket/foo/1", "/bucket/foo/2/1", "/bucket/foo/3/1", + + Regular mode: + listbucket("/bucket/f", marker="/bucket/foo/1") + will match "/bucket/foo/2/1", "/bucket/foo/3/1". + + Directory mode: + listbucket("/bucket/", delimiter="/") + will match "/bucket/a, "/bucket/bar/" "/bucket/foo", "/bucket/foo/". + listbucket("/bucket/foo/", delimiter="/") + will match "/bucket/foo/1", "/bucket/foo/2/", "/bucket/foo/3/" + + Returns: + Regular mode: + A GCSFileStat iterator over matched files ordered by filename. + The iterator returns GCSFileStat objects. filename, etag, st_size, + st_ctime, and is_dir are set. + + Directory emulation mode: + A GCSFileStat iterator over matched files and directories ordered by + name. The iterator returns GCSFileStat objects. For directories, + only the filename and is_dir fields are set. + + The last name yielded can be used as next call's marker. 
+ """ + if prefix: + common.validate_bucket_path(path_prefix) + bucket = path_prefix + else: + bucket, prefix = common._process_path_prefix(path_prefix) + + if marker and marker.startswith(bucket): + marker = marker[len(bucket) + 1:] + + api = storage_api._get_storage_api(retry_params=retry_params, + account_id=_account_id) + options = {} + if marker: + options['marker'] = marker + if max_keys: + options['max-keys'] = max_keys + if prefix: + options['prefix'] = prefix + if delimiter: + options['delimiter'] = delimiter + + return _Bucket(api, bucket, options) + + +class _Bucket(object): + """A wrapper for a GCS bucket as the return value of listbucket.""" + + def __init__(self, api, path, options): + """Initialize. + + Args: + api: storage_api instance. + path: bucket path of form '/bucket'. + options: a dict of listbucket options. Please see listbucket doc. + """ + self._init(api, path, options) + + def _init(self, api, path, options): + self._api = api + self._path = path + self._options = options.copy() + self._get_bucket_fut = self._api.get_bucket_async( + self._path + '?' + urllib.urlencode(self._options)) + self._last_yield = None + self._new_max_keys = self._options.get('max-keys') + + def __getstate__(self): + options = self._options + if self._last_yield: + options['marker'] = self._last_yield.filename[len(self._path) + 1:] + if self._new_max_keys is not None: + options['max-keys'] = self._new_max_keys + return {'api': self._api, + 'path': self._path, + 'options': options} + + def __setstate__(self, state): + self._init(state['api'], state['path'], state['options']) + + def __iter__(self): + """Iter over the bucket. + + Yields: + GCSFileStat: a GCSFileStat for an object in the bucket. + They are ordered by GCSFileStat.filename. + """ + total = 0 + max_keys = self._options.get('max-keys') + + while self._get_bucket_fut: + status, resp_headers, content = self._get_bucket_fut.get_result() + errors.check_status(status, [200], self._path, resp_headers=resp_headers, + extras=self._options) + + if self._should_get_another_batch(content): + self._get_bucket_fut = self._api.get_bucket_async( + self._path + '?' + urllib.urlencode(self._options)) + else: + self._get_bucket_fut = None + + root = ET.fromstring(content) + dirs = self._next_dir_gen(root) + files = self._next_file_gen(root) + next_file = files.next() + next_dir = dirs.next() + + while ((max_keys is None or total < max_keys) and + not (next_file is None and next_dir is None)): + total += 1 + if next_file is None: + self._last_yield = next_dir + next_dir = dirs.next() + elif next_dir is None: + self._last_yield = next_file + next_file = files.next() + elif next_dir < next_file: + self._last_yield = next_dir + next_dir = dirs.next() + elif next_file < next_dir: + self._last_yield = next_file + next_file = files.next() + else: + logging.error( + 'Should never reach. next file is %r. next dir is %r.', + next_file, next_dir) + if self._new_max_keys: + self._new_max_keys -= 1 + yield self._last_yield + + def _next_file_gen(self, root): + """Generator for next file element in the document. + + Args: + root: root element of the XML tree. + + Yields: + GCSFileStat for the next file. 
+ """ + for e in root.getiterator(common._T_CONTENTS): + st_ctime, size, etag, key = None, None, None, None + for child in e.getiterator('*'): + if child.tag == common._T_LAST_MODIFIED: + st_ctime = common.dt_str_to_posix(child.text) + elif child.tag == common._T_ETAG: + etag = child.text + elif child.tag == common._T_SIZE: + size = child.text + elif child.tag == common._T_KEY: + key = child.text + yield common.GCSFileStat(self._path + '/' + key, + size, etag, st_ctime) + e.clear() + yield None + + def _next_dir_gen(self, root): + """Generator for next directory element in the document. + + Args: + root: root element in the XML tree. + + Yields: + GCSFileStat for the next directory. + """ + for e in root.getiterator(common._T_COMMON_PREFIXES): + yield common.GCSFileStat( + self._path + '/' + e.find(common._T_PREFIX).text, + st_size=None, etag=None, st_ctime=None, is_dir=True) + e.clear() + yield None + + def _should_get_another_batch(self, content): + """Whether to issue another GET bucket call. + + Args: + content: response XML. + + Returns: + True if should, also update self._options for the next request. + False otherwise. + """ + if ('max-keys' in self._options and + self._options['max-keys'] <= common._MAX_GET_BUCKET_RESULT): + return False + + elements = self._find_elements( + content, set([common._T_IS_TRUNCATED, + common._T_NEXT_MARKER])) + if elements.get(common._T_IS_TRUNCATED, 'false').lower() != 'true': + return False + + next_marker = elements.get(common._T_NEXT_MARKER) + if next_marker is None: + self._options.pop('marker', None) + return False + self._options['marker'] = next_marker + return True + + def _find_elements(self, result, elements): + """Find interesting elements from XML. + + This function tries to only look for specified elements + without parsing the entire XML. The specified elements is better + located near the beginning. + + Args: + result: response XML. + elements: a set of interesting element tags. + + Returns: + A dict from element tag to element value. + """ + element_mapping = {} + result = StringIO.StringIO(result) + for _, e in ET.iterparse(result, events=('end',)): + if not elements: + break + if e.tag in elements: + element_mapping[e.tag] = e.text + elements.remove(e.tag) + return element_mapping diff --git a/third_party/google_appengine_cloudstorage/cloudstorage/common.py b/third_party/google_appengine_cloudstorage/cloudstorage/common.py new file mode 100644 index 00000000000000..9976b919f7c981 --- /dev/null +++ b/third_party/google_appengine_cloudstorage/cloudstorage/common.py @@ -0,0 +1,409 @@ +# Copyright 2012 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+ +"""Helpers shared by cloudstorage_stub and cloudstorage_api.""" + + + + + +__all__ = ['CS_XML_NS', + 'CSFileStat', + 'dt_str_to_posix', + 'local_api_url', + 'LOCAL_GCS_ENDPOINT', + 'local_run', + 'get_access_token', + 'get_metadata', + 'GCSFileStat', + 'http_time_to_posix', + 'memory_usage', + 'posix_time_to_http', + 'posix_to_dt_str', + 'set_access_token', + 'validate_options', + 'validate_bucket_name', + 'validate_bucket_path', + 'validate_file_path', + ] + + +import calendar +import datetime +from email import utils as email_utils +import logging +import os +import re + +try: + from google.appengine.api import runtime +except ImportError: + from google.appengine.api import runtime + + +_GCS_BUCKET_REGEX_BASE = r'[a-z0-9\.\-_]{3,63}' +_GCS_BUCKET_REGEX = re.compile(_GCS_BUCKET_REGEX_BASE + r'$') +_GCS_BUCKET_PATH_REGEX = re.compile(r'/' + _GCS_BUCKET_REGEX_BASE + r'$') +_GCS_PATH_PREFIX_REGEX = re.compile(r'/' + _GCS_BUCKET_REGEX_BASE + r'.*') +_GCS_FULLPATH_REGEX = re.compile(r'/' + _GCS_BUCKET_REGEX_BASE + r'/.*') +_GCS_METADATA = ['x-goog-meta-', + 'content-disposition', + 'cache-control', + 'content-encoding'] +_GCS_OPTIONS = _GCS_METADATA + ['x-goog-acl'] +CS_XML_NS = 'http://doc.s3.amazonaws.com/2006-03-01' +LOCAL_GCS_ENDPOINT = '/_ah/gcs' +_access_token = '' + + +_MAX_GET_BUCKET_RESULT = 1000 + + +def set_access_token(access_token): + """Set the shared access token to authenticate with Google Cloud Storage. + + When set, the library will always attempt to communicate with the + real Google Cloud Storage with this token even when running on dev appserver. + Note the token could expire so it's up to you to renew it. + + When absent, the library will automatically request and refresh a token + on appserver, or when on dev appserver, talk to a Google Cloud Storage + stub. + + Args: + access_token: you can get one by run 'gsutil -d ls' and copy the + str after 'Bearer'. + """ + global _access_token + _access_token = access_token + + +def get_access_token(): + """Returns the shared access token.""" + return _access_token + + +class GCSFileStat(object): + """Container for GCS file stat.""" + + def __init__(self, + filename, + st_size, + etag, + st_ctime, + content_type=None, + metadata=None, + is_dir=False): + """Initialize. + + For files, the non optional arguments are always set. + For directories, only filename and is_dir is set. + + Args: + filename: a Google Cloud Storage filename of form '/bucket/filename'. + st_size: file size in bytes. long compatible. + etag: hex digest of the md5 hash of the file's content. str. + st_ctime: posix file creation time. float compatible. + content_type: content type. str. + metadata: a str->str dict of user specified options when creating + the file. Possible keys are x-goog-meta-, content-disposition, + content-encoding, and cache-control. + is_dir: True if this represents a directory. False if this is a real file. 
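+
+    Example (editorial sketch with illustrative values; the surrounding
+    quotes on etag are stripped by this constructor):
+
+      GCSFileStat('/my-bucket/data.txt', st_size=11,
+                  etag='"deadbeef0123456789abcdef01234567"',
+                  st_ctime=1365726147.0)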
+ """ + self.filename = filename + self.is_dir = is_dir + self.st_size = None + self.st_ctime = None + self.etag = None + self.content_type = content_type + self.metadata = metadata + + if not is_dir: + self.st_size = long(st_size) + self.st_ctime = float(st_ctime) + if etag[0] == '"' and etag[-1] == '"': + etag = etag[1:-1] + self.etag = etag + + def __repr__(self): + if self.is_dir: + return '(directory: %s)' % self.filename + + return ( + '(filename: %(filename)s, st_size: %(st_size)s, ' + 'st_ctime: %(st_ctime)s, etag: %(etag)s, ' + 'content_type: %(content_type)s, ' + 'metadata: %(metadata)s)' % + dict(filename=self.filename, + st_size=self.st_size, + st_ctime=self.st_ctime, + etag=self.etag, + content_type=self.content_type, + metadata=self.metadata)) + + def __cmp__(self, other): + if not isinstance(other, self.__class__): + raise ValueError('Argument to cmp must have the same type. ' + 'Expect %s, got %s', self.__class__.__name__, + other.__class__.__name__) + if self.filename > other.filename: + return 1 + elif self.filename < other.filename: + return -1 + return 0 + + def __hash__(self): + if self.etag: + return hash(self.etag) + return hash(self.filename) + + +CSFileStat = GCSFileStat + + +def get_metadata(headers): + """Get user defined options from HTTP response headers.""" + return dict((k, v) for k, v in headers.iteritems() + if any(k.lower().startswith(valid) for valid in _GCS_METADATA)) + + +def validate_bucket_name(name): + """Validate a Google Storage bucket name. + + Args: + name: a Google Storage bucket name with no prefix or suffix. + + Raises: + ValueError: if name is invalid. + """ + _validate_path(name) + if not _GCS_BUCKET_REGEX.match(name): + raise ValueError('Bucket should be 3-63 characters long using only a-z,' + '0-9, underscore, dash or dot but got %s' % name) + + +def validate_bucket_path(path): + """Validate a Google Cloud Storage bucket path. + + Args: + path: a Google Storage bucket path. It should have form '/bucket'. + + Raises: + ValueError: if path is invalid. + """ + _validate_path(path) + if not _GCS_BUCKET_PATH_REGEX.match(path): + raise ValueError('Bucket should have format /bucket ' + 'but got %s' % path) + + +def validate_file_path(path): + """Validate a Google Cloud Storage file path. + + Args: + path: a Google Storage file path. It should have form '/bucket/filename'. + + Raises: + ValueError: if path is invalid. + """ + _validate_path(path) + if not _GCS_FULLPATH_REGEX.match(path): + raise ValueError('Path should have format /bucket/filename ' + 'but got %s' % path) + + +def _process_path_prefix(path_prefix): + """Validate and process a Google Cloud Stoarge path prefix. + + Args: + path_prefix: a Google Cloud Storage path prefix of format '/bucket/prefix' + or '/bucket/' or '/bucket'. + + Raises: + ValueError: if path is invalid. + + Returns: + a tuple of /bucket and prefix. prefix can be None. + """ + _validate_path(path_prefix) + if not _GCS_PATH_PREFIX_REGEX.match(path_prefix): + raise ValueError('Path prefix should have format /bucket, /bucket/, ' + 'or /bucket/prefix but got %s.' % path_prefix) + bucket_name_end = path_prefix.find('/', 1) + bucket = path_prefix + prefix = None + if bucket_name_end != -1: + bucket = path_prefix[:bucket_name_end] + prefix = path_prefix[bucket_name_end + 1:] or None + return bucket, prefix + + +def _validate_path(path): + """Basic validation of Google Storage paths. + + Args: + path: a Google Storage path. It should have form '/bucket/filename' + or '/bucket'. + + Raises: + ValueError: if path is invalid. 
+ TypeError: if path is not of type basestring. + """ + if not path: + raise ValueError('Path is empty') + if not isinstance(path, basestring): + raise TypeError('Path should be a string but is %s (%s).' % + (path.__class__, path)) + + +def validate_options(options): + """Validate Google Cloud Storage options. + + Args: + options: a str->basestring dict of options to pass to Google Cloud Storage. + + Raises: + ValueError: if option is not supported. + TypeError: if option is not of type str or value of an option + is not of type basestring. + """ + if not options: + return + + for k, v in options.iteritems(): + if not isinstance(k, str): + raise TypeError('option %r should be a str.' % k) + if not any(k.lower().startswith(valid) for valid in _GCS_OPTIONS): + raise ValueError('option %s is not supported.' % k) + if not isinstance(v, basestring): + raise TypeError('value %r for option %s should be of type basestring.' % + (v, k)) + + +def http_time_to_posix(http_time): + """Convert HTTP time format to posix time. + + See http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.3.1 + for http time format. + + Args: + http_time: time in RFC 2616 format. e.g. + "Mon, 20 Nov 1995 19:12:08 GMT". + + Returns: + A float of secs from unix epoch. + """ + if http_time is not None: + return email_utils.mktime_tz(email_utils.parsedate_tz(http_time)) + + +def posix_time_to_http(posix_time): + """Convert posix time to HTML header time format. + + Args: + posix_time: unix time. + + Returns: + A datatime str in RFC 2616 format. + """ + if posix_time: + return email_utils.formatdate(posix_time, usegmt=True) + + +_DT_FORMAT = '%Y-%m-%dT%H:%M:%S' + + +def dt_str_to_posix(dt_str): + """format str to posix. + + datetime str is of format %Y-%m-%dT%H:%M:%S.%fZ, + e.g. 2013-04-12T00:22:27.978Z. According to ISO 8601, T is a separator + between date and time when they are on the same line. + Z indicates UTC (zero meridian). + + A pointer: http://www.cl.cam.ac.uk/~mgk25/iso-time.html + + This is used to parse LastModified node from GCS's GET bucket XML response. + + Args: + dt_str: A datetime str. + + Returns: + A float of secs from unix epoch. By posix definition, epoch is midnight + 1970/1/1 UTC. + """ + parsable, _ = dt_str.split('.') + dt = datetime.datetime.strptime(parsable, _DT_FORMAT) + return calendar.timegm(dt.utctimetuple()) + + +def posix_to_dt_str(posix): + """Reverse of str_to_datetime. + + This is used by GCS stub to generate GET bucket XML response. + + Args: + posix: A float of secs from unix epoch. + + Returns: + A datetime str. 
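+
+  Example (editorial; the stub always renders fractional seconds as .000):
+
+    posix_to_dt_str(1365726147) == '2013-04-12T00:22:27.000Z'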
+ """ + dt = datetime.datetime.utcfromtimestamp(posix) + dt_str = dt.strftime(_DT_FORMAT) + return dt_str + '.000Z' + + +def local_run(): + """Whether we should hit GCS dev appserver stub.""" + server_software = os.environ.get('SERVER_SOFTWARE') + if server_software is None: + return True + if 'remote_api' in server_software: + return False + if server_software.startswith(('Development', 'testutil')): + return True + return False + + +def local_api_url(): + """Return URL for GCS emulation on dev appserver.""" + return 'http://%s%s' % (os.environ.get('HTTP_HOST'), LOCAL_GCS_ENDPOINT) + + +def memory_usage(method): + """Log memory usage before and after a method.""" + def wrapper(*args, **kwargs): + logging.info('Memory before method %s is %s.', + method.__name__, runtime.memory_usage().current()) + result = method(*args, **kwargs) + logging.info('Memory after method %s is %s', + method.__name__, runtime.memory_usage().current()) + return result + return wrapper + + +def _add_ns(tagname): + return '{%(ns)s}%(tag)s' % {'ns': CS_XML_NS, + 'tag': tagname} + + +_T_CONTENTS = _add_ns('Contents') +_T_LAST_MODIFIED = _add_ns('LastModified') +_T_ETAG = _add_ns('ETag') +_T_KEY = _add_ns('Key') +_T_SIZE = _add_ns('Size') +_T_PREFIX = _add_ns('Prefix') +_T_COMMON_PREFIXES = _add_ns('CommonPrefixes') +_T_NEXT_MARKER = _add_ns('NextMarker') +_T_IS_TRUNCATED = _add_ns('IsTruncated') diff --git a/third_party/google_appengine_cloudstorage/cloudstorage/errors.py b/third_party/google_appengine_cloudstorage/cloudstorage/errors.py new file mode 100644 index 00000000000000..9f116ac86dd5cf --- /dev/null +++ b/third_party/google_appengine_cloudstorage/cloudstorage/errors.py @@ -0,0 +1,140 @@ +# Copyright 2012 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Google Cloud Storage specific Files API calls.""" + + + + + +__all__ = ['AuthorizationError', + 'check_status', + 'Error', + 'FatalError', + 'FileClosedError', + 'ForbiddenError', + 'NotFoundError', + 'ServerError', + 'TimeoutError', + 'TransientError', + ] + +import httplib + + +class Error(Exception): + """Base error for all gcs operations. + + Error can happen on GAE side or GCS server side. + For details on a particular GCS HTTP response code, see + https://developers.google.com/storage/docs/reference-status#standardcodes + """ + + +class TransientError(Error): + """TransientError could be retried.""" + + +class TimeoutError(TransientError): + """HTTP 408 timeout.""" + + +class FatalError(Error): + """FatalError shouldn't be retried.""" + + +class FileClosedError(FatalError): + """File is already closed. + + This can happen when the upload has finished but 'write' is called on + a stale upload handle. + """ + + +class NotFoundError(FatalError): + """HTTP 404 resource not found.""" + + +class ForbiddenError(FatalError): + """HTTP 403 Forbidden. + + While GCS replies with a 403 error for many reasons, the most common one + is due to bucket permission not correctly setup for your app to access. 
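+
+  A common fix (hypothetical app and bucket names) is granting the app's
+  service account read access to the bucket:
+
+    gsutil acl ch -u my-app@appspot.gserviceaccount.com:R gs://my-bucket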
+ """ + + +class AuthorizationError(FatalError): + """HTTP 401 authentication required. + + Unauthorized request has been received by GCS. + + This error is mostly handled by GCS client. GCS client will request + a new access token and retry the request. + """ + + +class InvalidRange(FatalError): + """HTTP 416 RequestRangeNotSatifiable.""" + + +class ServerError(TransientError): + """HTTP >= 500 server side error.""" + + +def check_status(status, expected, path, headers=None, + resp_headers=None, extras=None): + """Check HTTP response status is expected. + + Args: + status: HTTP response status. int. + expected: a list of expected statuses. A list of ints. + path: filename or a path prefix. + headers: HTTP request headers. + resp_headers: HTTP response headers. + extras: extra info to be logged verbatim if error occurs. + + Raises: + AuthorizationError: if authorization failed. + NotFoundError: if an object that's expected to exist doesn't. + TimeoutError: if HTTP request timed out. + ServerError: if server experienced some errors. + FatalError: if any other unexpected errors occurred. + """ + if status in expected: + return + + msg = ('Expect status %r from Google Storage. But got status %d.\n' + 'Path: %r.\n' + 'Request headers: %r.\n' + 'Response headers: %r.\n' + 'Extra info: %r.\n' % + (expected, status, path, headers, resp_headers, extras)) + + if status == httplib.UNAUTHORIZED: + raise AuthorizationError(msg) + elif status == httplib.FORBIDDEN: + raise ForbiddenError(msg) + elif status == httplib.NOT_FOUND: + raise NotFoundError(msg) + elif status == httplib.REQUEST_TIMEOUT: + raise TimeoutError(msg) + elif status == httplib.REQUESTED_RANGE_NOT_SATISFIABLE: + raise InvalidRange(msg) + elif (status == httplib.OK and 308 in expected and + httplib.OK not in expected): + raise FileClosedError(msg) + elif status >= 500: + raise ServerError(msg) + else: + raise FatalError(msg) diff --git a/third_party/google_appengine_cloudstorage/cloudstorage/rest_api.py b/third_party/google_appengine_cloudstorage/cloudstorage/rest_api.py new file mode 100644 index 00000000000000..ed0683c105a4c5 --- /dev/null +++ b/third_party/google_appengine_cloudstorage/cloudstorage/rest_api.py @@ -0,0 +1,246 @@ +# Copyright 2012 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Base and helper classes for Google RESTful APIs.""" + + + + + +__all__ = ['add_sync_methods'] + +import httplib +import random +import time + +from . import api_utils + +try: + from google.appengine.api import app_identity + from google.appengine.ext import ndb +except ImportError: + from google.appengine.api import app_identity + from google.appengine.ext import ndb + + +def _make_sync_method(name): + """Helper to synthesize a synchronous method from an async method name. + + Used by the @add_sync_methods class decorator below. + + Args: + name: The name of the synchronous method. 
+
+  Returns:
+    A method (with first argument 'self') that retrieves and calls
+    self.<name>, passing its own arguments, expects it to return a
+    Future, and then waits for and returns that Future's result.
+  """
+
+  def sync_wrapper(self, *args, **kwds):
+    method = getattr(self, name)
+    future = method(*args, **kwds)
+    return future.get_result()
+
+  return sync_wrapper
+
+
+def add_sync_methods(cls):
+  """Class decorator to add synchronous methods corresponding to async methods.
+
+  This modifies the class in place, adding additional methods to it.
+  If a synchronous method of a given name already exists it is not
+  replaced.
+
+  Args:
+    cls: A class.
+
+  Returns:
+    The same class, modified in place.
+  """
+  for name in cls.__dict__.keys():
+    if name.endswith('_async'):
+      sync_name = name[:-6]
+      if not hasattr(cls, sync_name):
+        setattr(cls, sync_name, _make_sync_method(name))
+  return cls
+
+
+class _AE_TokenStorage_(ndb.Model):
+  """Entity to store app_identity tokens in memcache."""
+
+  token = ndb.StringProperty()
+  expires = ndb.FloatProperty()
+
+
+@ndb.tasklet
+def _make_token_async(scopes, service_account_id):
+  """Get a fresh authentication token.
+
+  Args:
+    scopes: A list of scopes.
+    service_account_id: Internal-use only.
+
+  Returns:
+    A tuple (token, expiration_time) where expiration_time is
+    seconds since the epoch.
+  """
+  rpc = app_identity.create_rpc()
+  app_identity.make_get_access_token_call(rpc, scopes, service_account_id)
+  token, expires_at = yield rpc
+  raise ndb.Return((token, expires_at))
+
+
+class _RestApi(object):
+  """Base class for REST-based API wrapper classes.
+
+  This class manages authentication tokens and request retries. All
+  APIs are available as synchronous and async methods; synchronous
+  methods are synthesized from async ones by the add_sync_methods()
+  function in this module.
+
+  WARNING: Do NOT directly use this api. It's an implementation detail
+  and is subject to change at any release.
+  """
+
+  _TOKEN_EXPIRATION_HEADROOM = random.randint(60, 600)
+
+  def __init__(self, scopes, service_account_id=None, token_maker=None,
+               retry_params=None):
+    """Constructor.
+
+    Args:
+      scopes: A scope or a list of scopes.
+      token_maker: An asynchronous function of the form
+        (scopes, service_account_id) -> (token, expires).
+      retry_params: An instance of api_utils.RetryParams. If None, the
+        default for current thread will be used.
+      service_account_id: Internal use only.
+    """
+
+    if isinstance(scopes, basestring):
+      scopes = [scopes]
+    self.scopes = scopes
+    self.service_account_id = service_account_id
+    self.make_token_async = token_maker or _make_token_async
+    self.token = None
+    if not retry_params:
+      retry_params = api_utils._get_default_retry_params()
+    self.retry_params = retry_params
+
+  def __getstate__(self):
+    """Store state as part of serialization/pickling."""
+    return {'token': self.token,
+            'scopes': self.scopes,
+            'id': self.service_account_id,
+            'a_maker': None if self.make_token_async == _make_token_async
+            else self.make_token_async,
+            'retry_params': self.retry_params}
+
+  def __setstate__(self, state):
+    """Restore state as part of deserialization/unpickling."""
+    self.__init__(state['scopes'],
+                  service_account_id=state['id'],
+                  token_maker=state['a_maker'],
+                  retry_params=state['retry_params'])
+    self.token = state['token']
+
+  @ndb.tasklet
+  def do_request_async(self, url, method='GET', headers=None, payload=None,
+                       deadline=None, callback=None):
+    """Issue one HTTP request.
+
+    This is an async wrapper around urlfetch().
It adds an authentication + header and retries on a 401 status code. Upon other retriable errors, + it performs blocking retries. + """ + headers = {} if headers is None else dict(headers) + if self.token is None: + self.token = yield self.get_token_async() + headers['authorization'] = 'OAuth ' + self.token + + deadline = deadline or self.retry_params.urlfetch_timeout + + retry = False + resp = None + try: + resp = yield self.urlfetch_async(url, payload=payload, method=method, + headers=headers, follow_redirects=False, + deadline=deadline, callback=callback) + if resp.status_code == httplib.UNAUTHORIZED: + self.token = yield self.get_token_async(refresh=True) + headers['authorization'] = 'OAuth ' + self.token + resp = yield self.urlfetch_async( + url, payload=payload, method=method, headers=headers, + follow_redirects=False, deadline=deadline, callback=callback) + except api_utils._RETRIABLE_EXCEPTIONS: + retry = True + else: + retry = api_utils._should_retry(resp) + + if retry: + retry_resp = api_utils._retry_fetch( + url, retry_params=self.retry_params, payload=payload, method=method, + headers=headers, follow_redirects=False, deadline=deadline) + if retry_resp: + resp = retry_resp + elif not resp: + raise + + raise ndb.Return((resp.status_code, resp.headers, resp.content)) + + @ndb.tasklet + def get_token_async(self, refresh=False): + """Get an authentication token. + + The token is cached in memcache, keyed by the scopes argument. + + Args: + refresh: If True, ignore a cached token; default False. + + Returns: + An authentication token. + """ + if self.token is not None and not refresh: + raise ndb.Return(self.token) + key = '%s,%s' % (self.service_account_id, ','.join(self.scopes)) + ts = yield _AE_TokenStorage_.get_by_id_async( + key, use_cache=True, use_memcache=True, + use_datastore=self.retry_params.save_access_token) + if ts is None or ts.expires < (time.time() + + self._TOKEN_EXPIRATION_HEADROOM): + token, expires_at = yield self.make_token_async( + self.scopes, self.service_account_id) + timeout = int(expires_at - time.time()) + ts = _AE_TokenStorage_(id=key, token=token, expires=expires_at) + if timeout > 0: + yield ts.put_async(memcache_timeout=timeout, + use_datastore=self.retry_params.save_access_token, + use_cache=True, use_memcache=True) + self.token = ts.token + raise ndb.Return(self.token) + + def urlfetch_async(self, url, **kwds): + """Make an async urlfetch() call. + + This just passes the url and keyword arguments to NDB's async + urlfetch() wrapper in the current context. + + This returns a Future despite not being decorated with @ndb.tasklet! + """ + ctx = ndb.get_context() + return ctx.urlfetch(url, **kwds) + + +_RestApi = add_sync_methods(_RestApi) diff --git a/third_party/google_appengine_cloudstorage/cloudstorage/storage_api.py b/third_party/google_appengine_cloudstorage/cloudstorage/storage_api.py new file mode 100644 index 00000000000000..3971741dd704bf --- /dev/null +++ b/third_party/google_appengine_cloudstorage/cloudstorage/storage_api.py @@ -0,0 +1,865 @@ +# Copyright 2012 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. 
See the License for the specific +# language governing permissions and limitations under the License. + +"""Python wrappers for the Google Storage RESTful API.""" + + + + + +__all__ = ['ReadBuffer', + 'StreamingBuffer', + ] + +import collections +import logging +import os +import urlparse + +from . import api_utils +from . import common +from . import errors +from . import rest_api + +try: + from google.appengine.api import urlfetch + from google.appengine.ext import ndb +except ImportError: + from google.appengine.api import urlfetch + from google.appengine.ext import ndb + + + +def _get_storage_api(retry_params, account_id=None): + """Returns storage_api instance for API methods. + + Args: + retry_params: An instance of api_utils.RetryParams. If none, + thread's default will be used. + account_id: Internal-use only. + + Returns: + A storage_api instance to handle urlfetch work to GCS. + On dev appserver, this instance by default will talk to a local stub + unless common.ACCESS_TOKEN is set. That token will be used to talk + to the real GCS. + """ + + + api = _StorageApi(_StorageApi.full_control_scope, + service_account_id=account_id, + retry_params=retry_params) + if common.local_run() and not common.get_access_token(): + api.api_url = common.local_api_url() + if common.get_access_token(): + api.token = common.get_access_token() + return api + + +class _StorageApi(rest_api._RestApi): + """A simple wrapper for the Google Storage RESTful API. + + WARNING: Do NOT directly use this api. It's an implementation detail + and is subject to change at any release. + + All async methods have similar args and returns. + + Args: + path: The path to the Google Storage object or bucket, e.g. + '/mybucket/myfile' or '/mybucket'. + **kwd: Options for urlfetch. e.g. + headers={'content-type': 'text/plain'}, payload='blah'. + + Returns: + A ndb Future. When fulfilled, future.get_result() should return + a tuple of (status, headers, content) that represents a HTTP response + of Google Cloud Storage XML API. + """ + + api_url = 'https://storage.googleapis.com' + read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only' + read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write' + full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control' + + def __getstate__(self): + """Store state as part of serialization/pickling. + + Returns: + A tuple (of dictionaries) with the state of this object + """ + return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url}) + + def __setstate__(self, state): + """Restore state as part of deserialization/unpickling. + + Args: + state: the tuple from a __getstate__ call + """ + superstate, localstate = state + super(_StorageApi, self).__setstate__(superstate) + self.api_url = localstate['api_url'] + + @api_utils._eager_tasklet + @ndb.tasklet + def do_request_async(self, url, method='GET', headers=None, payload=None, + deadline=None, callback=None): + """Inherit docs. + + This method translates urlfetch exceptions to more service specific ones. 
+ """ + if headers is None: + headers = {} + if 'x-goog-api-version' not in headers: + headers['x-goog-api-version'] = '2' + headers['accept-encoding'] = 'gzip, *' + try: + resp_tuple = yield super(_StorageApi, self).do_request_async( + url, method=method, headers=headers, payload=payload, + deadline=deadline, callback=callback) + except urlfetch.DownloadError, e: + raise errors.TimeoutError( + 'Request to Google Cloud Storage timed out.', e) + + raise ndb.Return(resp_tuple) + + + def post_object_async(self, path, **kwds): + """POST to an object.""" + return self.do_request_async(self.api_url + path, 'POST', **kwds) + + def put_object_async(self, path, **kwds): + """PUT an object.""" + return self.do_request_async(self.api_url + path, 'PUT', **kwds) + + def get_object_async(self, path, **kwds): + """GET an object. + + Note: No payload argument is supported. + """ + return self.do_request_async(self.api_url + path, 'GET', **kwds) + + def delete_object_async(self, path, **kwds): + """DELETE an object. + + Note: No payload argument is supported. + """ + return self.do_request_async(self.api_url + path, 'DELETE', **kwds) + + def head_object_async(self, path, **kwds): + """HEAD an object. + + Depending on request headers, HEAD returns various object properties, + e.g. Content-Length, Last-Modified, and ETag. + + Note: No payload argument is supported. + """ + return self.do_request_async(self.api_url + path, 'HEAD', **kwds) + + def get_bucket_async(self, path, **kwds): + """GET a bucket.""" + return self.do_request_async(self.api_url + path, 'GET', **kwds) + + +_StorageApi = rest_api.add_sync_methods(_StorageApi) + + +class ReadBuffer(object): + """A class for reading Google storage files.""" + + DEFAULT_BUFFER_SIZE = 1024 * 1024 + MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE + + def __init__(self, + api, + path, + buffer_size=DEFAULT_BUFFER_SIZE, + max_request_size=MAX_REQUEST_SIZE): + """Constructor. + + Args: + api: A StorageApi instance. + path: Quoted/escaped path to the object, e.g. /mybucket/myfile + buffer_size: buffer size. The ReadBuffer keeps + one buffer. But there may be a pending future that contains + a second buffer. This size must be less than max_request_size. + max_request_size: Max bytes to request in one urlfetch. + """ + self._api = api + self._path = path + self.name = api_utils._unquote_filename(path) + self.closed = False + + assert buffer_size <= max_request_size + self._buffer_size = buffer_size + self._max_request_size = max_request_size + self._offset = 0 + self._buffer = _Buffer() + self._etag = None + + self._request_next_buffer() + + status, headers, _ = self._api.head_object(path) + errors.check_status(status, [200], path, resp_headers=headers) + self._file_size = long(headers['content-length']) + self._check_etag(headers.get('etag')) + if self._file_size == 0: + self._buffer_future = None + + def __getstate__(self): + """Store state as part of serialization/pickling. + + The contents of the read buffer are not stored, only the current offset for + data read by the client. A new read buffer is established at unpickling. + The head information for the object (file size and etag) are stored to + reduce startup and ensure the file has not changed. 
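+
+    Example (editorial sketch): this is what makes a reader usable across
+    task-queue retries:
+
+      import pickle
+      blob = pickle.dumps(reader)   # buffered bytes are dropped
+      reader = pickle.loads(blob)   # next buffer is re-fetched lazily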
+ + Returns: + A dictionary with the state of this object + """ + return {'api': self._api, + 'path': self._path, + 'buffer_size': self._buffer_size, + 'request_size': self._max_request_size, + 'etag': self._etag, + 'size': self._file_size, + 'offset': self._offset, + 'closed': self.closed} + + def __setstate__(self, state): + """Restore state as part of deserialization/unpickling. + + Args: + state: the dictionary from a __getstate__ call + + Along with restoring the state, pre-fetch the next read buffer. + """ + self._api = state['api'] + self._path = state['path'] + self.name = api_utils._unquote_filename(self._path) + self._buffer_size = state['buffer_size'] + self._max_request_size = state['request_size'] + self._etag = state['etag'] + self._file_size = state['size'] + self._offset = state['offset'] + self._buffer = _Buffer() + self.closed = state['closed'] + self._buffer_future = None + if self._remaining() and not self.closed: + self._request_next_buffer() + + def __iter__(self): + """Iterator interface. + + Note the ReadBuffer container itself is the iterator. It's + (quote PEP0234) + 'destructive: they consumes all the values and a second iterator + cannot easily be created that iterates independently over the same values. + You could open the file for the second time, or seek() to the beginning.' + + Returns: + Self. + """ + return self + + def next(self): + line = self.readline() + if not line: + raise StopIteration() + return line + + def readline(self, size=-1): + """Read one line delimited by '\n' from the file. + + A trailing newline character is kept in the string. It may be absent when a + file ends with an incomplete line. If the size argument is non-negative, + it specifies the maximum string size (counting the newline) to return. + A negative size is the same as unspecified. Empty string is returned + only when EOF is encountered immediately. + + Args: + size: Maximum number of bytes to read. If not specified, readline stops + only on '\n' or EOF. + + Returns: + The data read as a string. + + Raises: + IOError: When this buffer is closed. + """ + self._check_open() + if size == 0 or not self._remaining(): + return '' + + data_list = [] + newline_offset = self._buffer.find_newline(size) + while newline_offset < 0: + data = self._buffer.read(size) + size -= len(data) + self._offset += len(data) + data_list.append(data) + if size == 0 or not self._remaining(): + return ''.join(data_list) + self._buffer.reset(self._buffer_future.get_result()) + self._request_next_buffer() + newline_offset = self._buffer.find_newline(size) + + data = self._buffer.read_to_offset(newline_offset + 1) + self._offset += len(data) + data_list.append(data) + + return ''.join(data_list) + + def read(self, size=-1): + """Read data from RAW file. + + Args: + size: Number of bytes to read as integer. Actual number of bytes + read is always equal to size unless EOF is reached. If size is + negative or unspecified, read the entire file. + + Returns: + data read as str. + + Raises: + IOError: When this buffer is closed. 
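+
+    Example (editorial sketch; hypothetical object name; cloudstorage.open
+    returns a ReadBuffer for the default mode 'r'):
+
+      f = cloudstorage.open('/my-bucket/my-file')
+      head = f.read(1024)   # first KB
+      rest = f.read()       # remainder of the file
+      f.close()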
+ """ + self._check_open() + if not self._remaining(): + return '' + + data_list = [] + while True: + remaining = self._buffer.remaining() + if size >= 0 and size < remaining: + data_list.append(self._buffer.read(size)) + self._offset += size + break + else: + size -= remaining + self._offset += remaining + data_list.append(self._buffer.read()) + + if self._buffer_future is None: + if size < 0 or size >= self._remaining(): + needs = self._remaining() + else: + needs = size + data_list.extend(self._get_segments(self._offset, needs)) + self._offset += needs + break + + if self._buffer_future: + self._buffer.reset(self._buffer_future.get_result()) + self._buffer_future = None + + if self._buffer_future is None: + self._request_next_buffer() + return ''.join(data_list) + + def _remaining(self): + return self._file_size - self._offset + + def _request_next_buffer(self): + """Request next buffer. + + Requires self._offset and self._buffer are in consistent state + """ + self._buffer_future = None + next_offset = self._offset + self._buffer.remaining() + if not hasattr(self, '_file_size') or next_offset != self._file_size: + self._buffer_future = self._get_segment(next_offset, + self._buffer_size) + + def _get_segments(self, start, request_size): + """Get segments of the file from Google Storage as a list. + + A large request is broken into segments to avoid hitting urlfetch + response size limit. Each segment is returned from a separate urlfetch. + + Args: + start: start offset to request. Inclusive. Have to be within the + range of the file. + request_size: number of bytes to request. + + Returns: + A list of file segments in order + """ + if not request_size: + return [] + + end = start + request_size + futures = [] + + while request_size > self._max_request_size: + futures.append(self._get_segment(start, self._max_request_size)) + request_size -= self._max_request_size + start += self._max_request_size + if start < end: + futures.append(self._get_segment(start, end-start)) + return [fut.get_result() for fut in futures] + + @ndb.tasklet + def _get_segment(self, start, request_size): + """Get a segment of the file from Google Storage. + + Args: + start: start offset of the segment. Inclusive. Have to be within the + range of the file. + request_size: number of bytes to request. Have to be small enough + for a single urlfetch request. May go over the logical range of the + file. + + Yields: + a segment [start, start + request_size) of the file. + + Raises: + ValueError: if the file has changed while reading. + """ + end = start + request_size - 1 + content_range = '%d-%d' % (start, end) + headers = {'Range': 'bytes=' + content_range} + status, resp_headers, content = yield self._api.get_object_async( + self._path, headers=headers) + errors.check_status(status, [200, 206], self._path, headers, resp_headers) + self._check_etag(resp_headers.get('etag')) + raise ndb.Return(content) + + def _check_etag(self, etag): + """Check if etag is the same across requests to GCS. + + If self._etag is None, set it. If etag is set, check that the new + etag equals the old one. + + In the __init__ method, we fire one HEAD and one GET request using + ndb tasklet. One of them would return first and set the first value. + + Args: + etag: etag from a GCS HTTP response. None if etag is not part of the + response header. It could be None for example in the case of GCS + composite file. + + Raises: + ValueError: if two etags are not equal. 
+ """ + if etag is None: + return + elif self._etag is None: + self._etag = etag + elif self._etag != etag: + raise ValueError('File on GCS has changed while reading.') + + def close(self): + self.closed = True + self._buffer = None + self._buffer_future = None + + def __enter__(self): + return self + + def __exit__(self, atype, value, traceback): + self.close() + return False + + def seek(self, offset, whence=os.SEEK_SET): + """Set the file's current offset. + + Note if the new offset is out of bound, it is adjusted to either 0 or EOF. + + Args: + offset: seek offset as number. + whence: seek mode. Supported modes are os.SEEK_SET (absolute seek), + os.SEEK_CUR (seek relative to the current position), and os.SEEK_END + (seek relative to the end, offset should be negative). + + Raises: + IOError: When this buffer is closed. + ValueError: When whence is invalid. + """ + self._check_open() + + self._buffer.reset() + self._buffer_future = None + + if whence == os.SEEK_SET: + self._offset = offset + elif whence == os.SEEK_CUR: + self._offset += offset + elif whence == os.SEEK_END: + self._offset = self._file_size + offset + else: + raise ValueError('Whence mode %s is invalid.' % str(whence)) + + self._offset = min(self._offset, self._file_size) + self._offset = max(self._offset, 0) + if self._remaining(): + self._request_next_buffer() + + def tell(self): + """Tell the file's current offset. + + Returns: + current offset in reading this file. + + Raises: + IOError: When this buffer is closed. + """ + self._check_open() + return self._offset + + def _check_open(self): + if self.closed: + raise IOError('Buffer is closed.') + + def seekable(self): + return True + + def readable(self): + return True + + def writable(self): + return False + + +class _Buffer(object): + """In memory buffer.""" + + def __init__(self): + self.reset() + + def reset(self, content='', offset=0): + self._buffer = content + self._offset = offset + + def read(self, size=-1): + """Returns bytes from self._buffer and update related offsets. + + Args: + size: number of bytes to read starting from current offset. + Read the entire buffer if negative. + + Returns: + Requested bytes from buffer. + """ + if size < 0: + offset = len(self._buffer) + else: + offset = self._offset + size + return self.read_to_offset(offset) + + def read_to_offset(self, offset): + """Returns bytes from self._buffer and update related offsets. + + Args: + offset: read from current offset to this offset, exclusive. + + Returns: + Requested bytes from buffer. + """ + assert offset >= self._offset + result = self._buffer[self._offset: offset] + self._offset += len(result) + return result + + def remaining(self): + return len(self._buffer) - self._offset + + def find_newline(self, size=-1): + """Search for newline char in buffer starting from current offset. + + Args: + size: number of bytes to search. -1 means all. + + Returns: + offset of newline char in buffer. -1 if doesn't exist. + """ + if size < 0: + return self._buffer.find('\n', self._offset) + return self._buffer.find('\n', self._offset, self._offset + size) + + +class StreamingBuffer(object): + """A class for creating large objects using the 'resumable' API. + + The API is a subset of the Python writable stream API sufficient to + support writing zip files using the zipfile module. 
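+
+  Example (editorial sketch; hypothetical object name; cloudstorage.open
+  with mode 'w' returns a StreamingBuffer):
+
+    with cloudstorage.open('/my-bucket/archive.zip', 'w',
+                           content_type='application/zip') as f:
+      f.write(some_bytes)  # buffered and flushed in 256K blocks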
+ + The exact sequence of calls and use of headers is documented at + https://developers.google.com/storage/docs/developer-guide#unknownresumables + """ + + _blocksize = 256 * 1024 + + _maxrequestsize = 16 * _blocksize + + def __init__(self, + api, + path, + content_type=None, + gcs_headers=None): + """Constructor. + + Args: + api: A StorageApi instance. + path: Quoted/escaped path to the object, e.g. /mybucket/myfile + content_type: Optional content-type; Default value is + delegate to Google Cloud Storage. + gcs_headers: additional gs headers as a str->str dict, e.g + {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}. + """ + assert self._maxrequestsize > self._blocksize + assert self._maxrequestsize % self._blocksize == 0 + + self._api = api + self._path = path + self.name = api_utils._unquote_filename(path) + self.closed = False + + self._buffer = collections.deque() + self._buffered = 0 + self._written = 0 + self._offset = 0 + + headers = {'x-goog-resumable': 'start'} + if content_type: + headers['content-type'] = content_type + if gcs_headers: + headers.update(gcs_headers) + status, resp_headers, _ = self._api.post_object(path, headers=headers) + errors.check_status(status, [201], path, headers, resp_headers) + loc = resp_headers.get('location') + if not loc: + raise IOError('No location header found in 201 response') + parsed = urlparse.urlparse(loc) + self._path_with_token = '%s?%s' % (self._path, parsed.query) + + def __getstate__(self): + """Store state as part of serialization/pickling. + + The contents of the write buffer are stored. Writes to the underlying + storage are required to be on block boundaries (_blocksize) except for the + last write. In the worst case the pickled version of this object may be + slightly larger than the blocksize. + + Returns: + A dictionary with the state of this object + + """ + return {'api': self._api, + 'path': self._path, + 'path_token': self._path_with_token, + 'buffer': self._buffer, + 'buffered': self._buffered, + 'written': self._written, + 'offset': self._offset, + 'closed': self.closed} + + def __setstate__(self, state): + """Restore state as part of deserialization/unpickling. + + Args: + state: the dictionary from a __getstate__ call + """ + self._api = state['api'] + self._path_with_token = state['path_token'] + self._buffer = state['buffer'] + self._buffered = state['buffered'] + self._written = state['written'] + self._offset = state['offset'] + self.closed = state['closed'] + self._path = state['path'] + self.name = api_utils._unquote_filename(self._path) + + def write(self, data): + """Write some bytes. + + Args: + data: data to write. str. + + Raises: + TypeError: if data is not of type str. + """ + self._check_open() + if not isinstance(data, str): + raise TypeError('Expected str but got %s.' % type(data)) + if not data: + return + self._buffer.append(data) + self._buffered += len(data) + self._offset += len(data) + if self._buffered >= self._blocksize: + self._flush() + + def flush(self): + """Dummy API. + + This API is provided because the zipfile module uses it. It is a + no-op because Google Storage *requires* that all writes except for + the final one are multiples on 256K bytes aligned on 256K-byte + boundaries. + """ + self._check_open() + + def tell(self): + """Return the total number of bytes passed to write() so far. + + (There is no seek() method.) + """ + return self._offset + + def close(self): + """Flush the buffer and finalize the file. + + When this returns the new file is available for reading. 
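+
+    Prefer the context manager form (__enter__/__exit__ below); it invokes
+    close() even when a write raises.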
+ """ + if not self.closed: + self.closed = True + self._flush(finish=True) + self._buffer = None + + def __enter__(self): + return self + + def __exit__(self, atype, value, traceback): + self.close() + return False + + def _flush(self, finish=False): + """Internal API to flush. + + This is called only when the total amount of buffered data is at + least self._blocksize, or to flush the final (incomplete) block of + the file with finish=True. + """ + flush_len = 0 if finish else self._blocksize + + while self._buffered >= flush_len: + buffer = [] + buffered = 0 + + while self._buffer: + buf = self._buffer.popleft() + size = len(buf) + self._buffered -= size + buffer.append(buf) + buffered += size + if buffered >= self._maxrequestsize: + break + + if buffered > self._maxrequestsize: + excess = buffered - self._maxrequestsize + elif finish: + excess = 0 + else: + excess = buffered % self._blocksize + + if excess: + over = buffer.pop() + size = len(over) + assert size >= excess + buffered -= size + head, tail = over[:-excess], over[-excess:] + self._buffer.appendleft(tail) + self._buffered += len(tail) + if head: + buffer.append(head) + buffered += len(head) + + data = ''.join(buffer) + file_len = '*' + if finish and not self._buffered: + file_len = self._written + len(data) + self._send_data(data, self._written, file_len) + self._written += len(data) + if file_len != '*': + break + + def _send_data(self, data, start_offset, file_len): + """Send the block to the storage service. + + This is a utility method that does not modify self. + + Args: + data: data to send in str. + start_offset: start offset of the data in relation to the file. + file_len: an int if this is the last data to append to the file. + Otherwise '*'. + """ + headers = {} + end_offset = start_offset + len(data) - 1 + + if data: + headers['content-range'] = ('bytes %d-%d/%s' % + (start_offset, end_offset, file_len)) + else: + headers['content-range'] = ('bytes */%s' % file_len) + + status, response_headers, _ = self._api.put_object( + self._path_with_token, payload=data, headers=headers) + if file_len == '*': + expected = 308 + else: + expected = 200 + errors.check_status(status, [expected], self._path, headers, + response_headers, + {'upload_path': self._path_with_token}) + + def _get_offset_from_gcs(self): + """Get the last offset that has been written to GCS. + + This is a utility method that does not modify self. + + Returns: + an int of the last offset written to GCS by this upload, inclusive. + -1 means nothing has been written. + """ + headers = {'content-range': 'bytes */*'} + status, response_headers, _ = self._api.put_object( + self._path_with_token, headers=headers) + errors.check_status(status, [308], self._path, headers, + response_headers, + {'upload_path': self._path_with_token}) + val = response_headers.get('range') + if val is None: + return -1 + _, offset = val.rsplit('-', 1) + return int(offset) + + def _force_close(self, file_length=None): + """Close this buffer on file_length. + + Finalize this upload immediately on file_length. + Contents that are still in memory will not be uploaded. + + This is a utility method that does not modify self. + + Args: + file_length: file length. Must match what has been uploaded. If None, + it will be queried from GCS. 
+ """ + if file_length is None: + file_length = self._get_offset_from_gcs() + 1 + self._send_data('', 0, file_length) + + def _check_open(self): + if self.closed: + raise IOError('Buffer is closed.') + + def seekable(self): + return False + + def readable(self): + return False + + def writable(self): + return True diff --git a/third_party/google_appengine_cloudstorage/cloudstorage/test_utils.py b/third_party/google_appengine_cloudstorage/cloudstorage/test_utils.py new file mode 100644 index 00000000000000..e4d82477dce7d4 --- /dev/null +++ b/third_party/google_appengine_cloudstorage/cloudstorage/test_utils.py @@ -0,0 +1,25 @@ +# Copyright 2013 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Utils for testing.""" + + +class MockUrlFetchResult(object): + + def __init__(self, status, headers, body): + self.status_code = status + self.headers = headers + self.content = body + self.content_was_truncated = False + self.final_url = None
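+
+
+# Example (editorial sketch): tests can stub _RestApi.urlfetch_async with a
+# future resolving to a MockUrlFetchResult, e.g.:
+#
+#   from google.appengine.ext import ndb
+#   fut = ndb.Future()
+#   fut.set_result(MockUrlFetchResult(200, {'etag': '"abc"'}, 'payload'))
+#   api.urlfetch_async = lambda url, **kwds: fut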