# -*- coding: utf-8 -*- # # Copyright (C) 2007-2009 Christopher Lenz # All rights reserved. # # This software is licensed as described in the file COPYING, which # you should have received as part of this distribution. """Python client API for CouchDB. >>> server = Server('http://localhost:5984/') >>> db = server.create('python-tests') >>> doc_id = db.create({'type': 'Person', 'name': 'John Doe'}) >>> doc = db[doc_id] >>> doc['type'] 'Person' >>> doc['name'] 'John Doe' >>> del db[doc.id] >>> doc.id in db False >>> del server['python-tests'] """ import httplib2 import mimetypes from urllib import quote, urlencode from types import FunctionType from inspect import getsource from textwrap import dedent import re import socket from couchdb import json __all__ = ['PreconditionFailed', 'ResourceNotFound', 'ResourceConflict', 'ServerError', 'Server', 'Database', 'Document', 'ViewResults', 'Row'] __docformat__ = 'restructuredtext en' DEFAULT_BASE_URI = 'http://localhost:5984/' class PreconditionFailed(Exception): """Exception raised when a 412 HTTP error is received in response to a request. """ class ResourceNotFound(Exception): """Exception raised when a 404 HTTP error is received in response to a request. """ class ResourceConflict(Exception): """Exception raised when a 409 HTTP error is received in response to a request. """ class ServerError(Exception): """Exception raised when an unexpected HTTP error is received in response to a request. """ class Server(object): """Representation of a CouchDB server. >>> server = Server('http://localhost:5984/') This class behaves like a dictionary of databases. For example, to get a list of database names on the server, you can simply iterate over the server object. New databases can be created using the `create` method: >>> db = server.create('python-tests') >>> db You can access existing databases using item access, specifying the database name as the key: >>> db = server['python-tests'] >>> db.name 'python-tests' Databases can be deleted using a ``del`` statement: >>> del server['python-tests'] """ def __init__(self, uri=DEFAULT_BASE_URI, cache=None, timeout=None): """Initialize the server object. :param uri: the URI of the server (for example ``http://localhost:5984/``) :param cache: either a cache directory path (as a string) or an object compatible with the ``httplib2.FileCache`` interface. If `None` (the default), no caching is performed. :param timeout: socket timeout in number of seconds, or `None` for no timeout """ http = httplib2.Http(cache=cache, timeout=timeout) http.force_exception_to_status_code = False self.resource = Resource(http, uri) def __contains__(self, name): """Return whether the server contains a database with the specified name. :param name: the database name :return: `True` if a database with the name exists, `False` otherwise """ try: self.resource.head(validate_dbname(name)) return True except ResourceNotFound: return False def __iter__(self): """Iterate over the names of all databases.""" resp, data = self.resource.get('_all_dbs') return iter(data) def __len__(self): """Return the number of databases.""" resp, data = self.resource.get('_all_dbs') return len(data) def __nonzero__(self): """Return whether the server is available.""" try: self.resource.head() return True except: return False def __repr__(self): return '<%s %r>' % (type(self).__name__, self.resource.uri) def __delitem__(self, name): """Remove the database with the specified name. :param name: the name of the database :raise ResourceNotFound: if no database with that name exists """ self.resource.delete(validate_dbname(name)) def __getitem__(self, name): """Return a `Database` object representing the database with the specified name. :param name: the name of the database :return: a `Database` object representing the database :rtype: `Database` :raise ResourceNotFound: if no database with that name exists """ db = Database(uri(self.resource.uri, name), validate_dbname(name), http=self.resource.http) db.resource.head() # actually make a request to the database return db @property def config(self): """The configuration of the CouchDB server. The configuration is represented as a nested dictionary of sections and options from the configuration files of the server, or the default values for options that are not explicitly configured. :type: `dict` """ resp, data = self.resource.get('_config') return data @property def version(self): """The version string of the CouchDB server. Note that this results in a request being made, and can also be used to check for the availability of the server. :type: `unicode`""" resp, data = self.resource.get() return data['version'] def stats(self): """Database statistics.""" resp, data = self.resource.get('_stats') return data def tasks(self): """A list of tasks currently active on the server.""" resp, data = self.resource.get('_active_tasks') return data def create(self, name): """Create a new database with the given name. :param name: the name of the database :return: a `Database` object representing the created database :rtype: `Database` :raise PreconditionFailed: if a database with that name already exists """ self.resource.put(validate_dbname(name)) return self[name] def delete(self, name): """Delete the database with the specified name. :param name: the name of the database :raise ResourceNotFound: if a database with that name does not exist :since: 0.6 """ del self[name] def replicate(self, source, target, **options): """Replicate changes from the source database to the target database. :param source: URL of the source database :param target: URL of the target database :param options: optional replication args, e.g. continuous=True """ data = {'source': source, 'target': target} data.update(options) resp, data = self.resource.post('_replicate', data) return data class Database(object): """Representation of a database on a CouchDB server. >>> server = Server('http://localhost:5984/') >>> db = server.create('python-tests') New documents can be added to the database using the `create()` method: >>> doc_id = db.create({'type': 'Person', 'name': 'John Doe'}) This class provides a dictionary-like interface to databases: documents are retrieved by their ID using item access >>> doc = db[doc_id] >>> doc #doctest: +ELLIPSIS Documents are represented as instances of the `Row` class, which is basically just a normal dictionary with the additional attributes ``id`` and ``rev``: >>> doc.id, doc.rev #doctest: +ELLIPSIS ('...', ...) >>> doc['type'] 'Person' >>> doc['name'] 'John Doe' To update an existing document, you use item access, too: >>> doc['name'] = 'Mary Jane' >>> db[doc.id] = doc The `create()` method creates a document with a random ID generated by CouchDB (which is not recommended). If you want to explicitly specify the ID, you'd use item access just as with updating: >>> db['JohnDoe'] = {'type': 'person', 'name': 'John Doe'} >>> 'JohnDoe' in db True >>> len(db) 2 >>> del server['python-tests'] """ def __init__(self, uri, name=None, http=None): self.resource = Resource(http, uri) self._name = name def __repr__(self): return '<%s %r>' % (type(self).__name__, self.name) def __contains__(self, id): """Return whether the database contains a document with the specified ID. :param id: the document ID :return: `True` if a document with the ID exists, `False` otherwise """ try: self.resource.head(id) return True except ResourceNotFound: return False def __iter__(self): """Return the IDs of all documents in the database.""" return iter([item.id for item in self.view('_all_docs')]) def __len__(self): """Return the number of documents in the database.""" resp, data = self.resource.get() return data['doc_count'] def __nonzero__(self): """Return whether the database is available.""" try: self.resource.head() return True except: return False def __delitem__(self, id): """Remove the document with the specified ID from the database. :param id: the document ID """ resp, data = self.resource.head(id) self.resource.delete(id, rev=resp['etag'].strip('"')) def __getitem__(self, id): """Return the document with the specified ID. :param id: the document ID :return: a `Row` object representing the requested document :rtype: `Document` """ resp, data = self.resource.get(id) return Document(data) def __setitem__(self, id, content): """Create or update a document with the specified ID. :param id: the document ID :param content: the document content; either a plain dictionary for new documents, or a `Row` object for existing documents """ resp, data = self.resource.put(id, content=content) content.update({'_id': data['id'], '_rev': data['rev']}) @property def name(self): """The name of the database. Note that this may require a request to the server unless the name has already been cached by the `info()` method. :type: basestring """ if self._name is None: self.info() return self._name def create(self, data): """Create a new document in the database with a random ID that is generated by the server. Note that it is generally better to avoid the `create()` method and instead generate document IDs on the client side. This is due to the fact that the underlying HTTP ``POST`` method is not idempotent, and an automatic retry due to a problem somewhere on the networking stack may cause multiple documents being created in the database. To avoid such problems you can generate a UUID on the client side. Python (since version 2.5) comes with a ``uuid`` module that can be used for this:: from uuid import uuid4 doc_id = uuid4().hex db[doc_id] = {'type': 'person', 'name': 'John Doe'} :param data: the data to store in the document :return: the ID of the created document :rtype: `unicode` """ resp, data = self.resource.post(content=data) return data['id'] def compact(self): """Compact the database. This will try to prune all revisions from the database. :return: a boolean to indicate whether the compaction was initiated successfully :rtype: `bool` """ resp, data = self.resource.post('_compact') return data['ok'] def copy(self, src, dest): """Copy the given document to create a new document. :param src: the ID of the document to copy, or a dictionary or `Document` object representing the source document. :param dest: either the destination document ID as string, or a dictionary or `Document` instance of the document that should be overwritten. :return: the new revision of the destination document :rtype: `str` :since: 0.6 """ if not isinstance(src, basestring): if not isinstance(src, dict): if hasattr(src, 'items'): src = src.items() else: raise TypeError('expected dict or string, got %s' % type(src)) src = src['_id'] if not isinstance(dest, basestring): if not isinstance(dest, dict): if hasattr(dest, 'items'): dest = dest.items() else: raise TypeError('expected dict or string, got %s' % type(dest)) if '_rev' in dest: dest = '%s?%s' % (unicode_quote(dest['_id']), unicode_urlencode({'rev': dest['_rev']})) else: dest = unicode_quote(dest['_id']) resp, data = self.resource._request('COPY', src, headers={'Destination': dest}) return data['rev'] def delete(self, doc): """Delete the given document from the database. Use this method in preference over ``__del__`` to ensure you're deleting the revision that you had previously retrieved. In the case the document has been updated since it was retrieved, this method will raise a `ResourceConflict` exception. >>> server = Server('http://localhost:5984/') >>> db = server.create('python-tests') >>> doc = dict(type='Person', name='John Doe') >>> db['johndoe'] = doc >>> doc2 = db['johndoe'] >>> doc2['age'] = 42 >>> db['johndoe'] = doc2 >>> db.delete(doc) Traceback (most recent call last): ... ResourceConflict: ('conflict', 'Document update conflict.') >>> del server['python-tests'] :param doc: a dictionary or `Document` object holding the document data :raise ResourceConflict: if the document was updated in the database :since: 0.4.1 """ self.resource.delete(doc['_id'], rev=doc['_rev']) def get(self, id, default=None, **options): """Return the document with the specified ID. :param id: the document ID :param default: the default value to return when the document is not found :return: a `Row` object representing the requested document, or `None` if no document with the ID was found :rtype: `Document` """ try: resp, data = self.resource.get(id, **options) except ResourceNotFound: return default else: return Document(data) def revisions(self, id, **options): """Return all available revisions of the given document. :param id: the document ID :return: an iterator over Document objects, each a different revision, in reverse chronological order, if any were found """ try: resp, data = self.resource.get(id, revs=True) except ResourceNotFound: return startrev = data['_revisions']['start'] for index, rev in enumerate(data['_revisions']['ids']): options['rev'] = '%d-%s' % (startrev - index, rev) revision = self.get(id, **options) if revision is None: return yield revision def info(self): """Return information about the database as a dictionary. The returned dictionary exactly corresponds to the JSON response to a ``GET`` request on the database URI. :return: a dictionary of database properties :rtype: ``dict`` :since: 0.4 """ resp, data = self.resource.get() self._name = data['db_name'] return data def delete_attachment(self, doc, filename): """Delete the specified attachment. Note that the provided `doc` is required to have a ``_rev`` field. Thus, if the `doc` is based on a view row, the view row would need to include the ``_rev`` field. :param doc: the dictionary or `Document` object representing the document that the attachment belongs to :param filename: the name of the attachment file :since: 0.4.1 """ resp, data = self.resource(doc['_id']).delete(filename, rev=doc['_rev']) doc['_rev'] = data['rev'] def get_attachment(self, id_or_doc, filename, default=None): """Return an attachment from the specified doc id and filename. :param id_or_doc: either a document ID or a dictionary or `Document` object representing the document that the attachment belongs to :param filename: the name of the attachment file :param default: default value to return when the document or attachment is not found :return: the content of the attachment as a string, or the value of the `default` argument if the attachment is not found :since: 0.4.1 """ if isinstance(id_or_doc, basestring): id = id_or_doc else: id = id_or_doc['_id'] try: resp, data = self.resource(id).get(filename) return data except ResourceNotFound: return default def put_attachment(self, doc, content, filename=None, content_type=None): """Create or replace an attachment. Note that the provided `doc` is required to have a ``_rev`` field. Thus, if the `doc` is based on a view row, the view row would need to include the ``_rev`` field. :param doc: the dictionary or `Document` object representing the document that the attachment should be added to :param content: the content to upload, either a file-like object or a string :param filename: the name of the attachment file; if omitted, this function tries to get the filename from the file-like object passed as the `content` argument value :param content_type: content type of the attachment; if omitted, the MIME type is guessed based on the file name extension :since: 0.4.1 """ if hasattr(content, 'read'): content = content.read() if filename is None: if hasattr(content, 'name'): filename = content.name else: raise ValueError('no filename specified for attachment') if content_type is None: content_type = ';'.join(filter(None, mimetypes.guess_type(filename))) resp, data = self.resource(doc['_id']).put(filename, content=content, headers={ 'Content-Type': content_type }, rev=doc['_rev']) doc['_rev'] = data['rev'] def query(self, map_fun, reduce_fun=None, language='javascript', wrapper=None, **options): """Execute an ad-hoc query (a "temp view") against the database. >>> server = Server('http://localhost:5984/') >>> db = server.create('python-tests') >>> db['johndoe'] = dict(type='Person', name='John Doe') >>> db['maryjane'] = dict(type='Person', name='Mary Jane') >>> db['gotham'] = dict(type='City', name='Gotham City') >>> map_fun = '''function(doc) { ... if (doc.type == 'Person') ... emit(doc.name, null); ... }''' >>> for row in db.query(map_fun): ... print row.key John Doe Mary Jane >>> for row in db.query(map_fun, descending=True): ... print row.key Mary Jane John Doe >>> for row in db.query(map_fun, key='John Doe'): ... print row.key John Doe >>> del server['python-tests'] :param map_fun: the code of the map function :param reduce_fun: the code of the reduce function (optional) :param language: the language of the functions, to determine which view server to use :param wrapper: an optional callable that should be used to wrap the result rows :param options: optional query string parameters :return: the view reults :rtype: `ViewResults` """ return TemporaryView(uri(self.resource.uri, '_temp_view'), map_fun, reduce_fun, language=language, wrapper=wrapper, http=self.resource.http)(**options) def update(self, documents, **options): """Perform a bulk update or insertion of the given documents using a single HTTP request. >>> server = Server('http://localhost:5984/') >>> db = server.create('python-tests') >>> for doc in db.update([ ... Document(type='Person', name='John Doe'), ... Document(type='Person', name='Mary Jane'), ... Document(type='City', name='Gotham City') ... ]): ... print repr(doc) #doctest: +ELLIPSIS (True, '...', '...') (True, '...', '...') (True, '...', '...') >>> del server['python-tests'] The return value of this method is a list containing a tuple for every element in the `documents` sequence. Each tuple is of the form ``(success, docid, rev_or_exc)``, where ``success`` is a boolean indicating whether the update succeeded, ``docid`` is the ID of the document, and ``rev_or_exc`` is either the new document revision, or an exception instance (e.g. `ResourceConflict`) if the update failed. If an object in the documents list is not a dictionary, this method looks for an ``items()`` method that can be used to convert the object to a dictionary. Effectively this means you can also use this method with `schema.Document` objects. :param documents: a sequence of dictionaries or `Document` objects, or objects providing a ``items()`` method that can be used to convert them to a dictionary :return: an iterable over the resulting documents :rtype: ``list`` :since: version 0.2 """ docs = [] for doc in documents: if isinstance(doc, dict): docs.append(doc) elif hasattr(doc, 'items'): docs.append(dict(doc.items())) else: raise TypeError('expected dict, got %s' % type(doc)) content = options content.update(docs=docs) resp, data = self.resource.post('_bulk_docs', content=content) results = [] for idx, result in enumerate(data): if 'error' in result: if result['error'] == 'conflict': exc_type = ResourceConflict else: # XXX: Any other error types mappable to exceptions here? exc_type = ServerError results.append((False, result['id'], exc_type(result['reason']))) else: doc = documents[idx] if isinstance(doc, dict): # XXX: Is this a good idea?? doc.update({'_id': result['id'], '_rev': result['rev']}) results.append((True, result['id'], result['rev'])) return results def view(self, name, wrapper=None, **options): """Execute a predefined view. >>> server = Server('http://localhost:5984/') >>> db = server.create('python-tests') >>> db['gotham'] = dict(type='City', name='Gotham City') >>> for row in db.view('_all_docs'): ... print row.id gotham >>> del server['python-tests'] :param name: the name of the view; for custom views, use the format ``design_docid/viewname``, that is, the document ID of the design document and the name of the view, separated by a slash :param wrapper: an optional callable that should be used to wrap the result rows :param options: optional query string parameters :return: the view results :rtype: `ViewResults` """ if not name.startswith('_'): design, name = name.split('/', 1) name = '/'.join(['_design', design, '_view', name]) return PermanentView(uri(self.resource.uri, *name.split('/')), name, wrapper=wrapper, http=self.resource.http)(**options) class Document(dict): """Representation of a document in the database. This is basically just a dictionary with the two additional properties `id` and `rev`, which contain the document ID and revision, respectively. """ def __repr__(self): return '<%s %r@%r %r>' % (type(self).__name__, self.id, self.rev, dict([(k,v) for k,v in self.items() if k not in ('_id', '_rev')])) @property def id(self): """The document ID. :type: basestring """ return self['_id'] @property def rev(self): """The document revision. :type: basestring """ return self['_rev'] class View(object): """Abstract representation of a view or query.""" def __init__(self, uri, wrapper=None, http=None): self.resource = Resource(http, uri) self.wrapper = wrapper def __call__(self, **options): return ViewResults(self, options) def __iter__(self): return self() def _encode_options(self, options): retval = {} for name, value in options.items(): if name in ('key', 'startkey', 'endkey') \ or not isinstance(value, basestring): value = json.encode(value) retval[name] = value return retval def _exec(self, options): raise NotImplementedError class PermanentView(View): """Representation of a permanent view on the server.""" def __init__(self, uri, name, wrapper=None, http=None): View.__init__(self, uri, wrapper=wrapper, http=http) self.name = name def __repr__(self): return '<%s %r>' % (type(self).__name__, self.name) def _exec(self, options): if 'keys' in options: options = options.copy() keys = {'keys': options.pop('keys')} resp, data = self.resource.post(content=keys, **self._encode_options(options)) else: resp, data = self.resource.get(**self._encode_options(options)) return data class TemporaryView(View): """Representation of a temporary view.""" def __init__(self, uri, map_fun, reduce_fun=None, language='javascript', wrapper=None, http=None): View.__init__(self, uri, wrapper=wrapper, http=http) if isinstance(map_fun, FunctionType): map_fun = getsource(map_fun).rstrip('\n\r') self.map_fun = dedent(map_fun.lstrip('\n\r')) if isinstance(reduce_fun, FunctionType): reduce_fun = getsource(reduce_fun).rstrip('\n\r') if reduce_fun: reduce_fun = dedent(reduce_fun.lstrip('\n\r')) self.reduce_fun = reduce_fun self.language = language def __repr__(self): return '<%s %r %r>' % (type(self).__name__, self.map_fun, self.reduce_fun) def _exec(self, options): body = {'map': self.map_fun, 'language': self.language} if self.reduce_fun: body['reduce'] = self.reduce_fun if 'keys' in options: options = options.copy() body['keys'] = options.pop('keys') content = json.encode(body).encode('utf-8') resp, data = self.resource.post(content=content, headers={ 'Content-Type': 'application/json' }, **self._encode_options(options)) return data class ViewResults(object): """Representation of a parameterized view (either permanent or temporary) and the results it produces. This class allows the specification of ``key``, ``startkey``, and ``endkey`` options using Python slice notation. >>> server = Server('http://localhost:5984/') >>> db = server.create('python-tests') >>> db['johndoe'] = dict(type='Person', name='John Doe') >>> db['maryjane'] = dict(type='Person', name='Mary Jane') >>> db['gotham'] = dict(type='City', name='Gotham City') >>> map_fun = '''function(doc) { ... emit([doc.type, doc.name], doc.name); ... }''' >>> results = db.query(map_fun) At this point, the view has not actually been accessed yet. It is accessed as soon as it is iterated over, its length is requested, or one of its `rows`, `total_rows`, or `offset` properties are accessed: >>> len(results) 3 You can use slices to apply ``startkey`` and/or ``endkey`` options to the view: >>> people = results[['Person']:['Person','ZZZZ']] >>> for person in people: ... print person.value John Doe Mary Jane >>> people.total_rows, people.offset (3, 1) Use plain indexed notation (without a slice) to apply the ``key`` option. Note that as CouchDB makes no claim that keys are unique in a view, this can still return multiple rows: >>> list(results[['City', 'Gotham City']]) [] >>> del server['python-tests'] """ def __init__(self, view, options): self.view = view self.options = options self._rows = self._total_rows = self._offset = None def __repr__(self): return '<%s %r %r>' % (type(self).__name__, self.view, self.options) def __getitem__(self, key): options = self.options.copy() if type(key) is slice: if key.start is not None: options['startkey'] = key.start if key.stop is not None: options['endkey'] = key.stop return ViewResults(self.view, options) else: options['key'] = key return ViewResults(self.view, options) def __iter__(self): wrapper = self.view.wrapper for row in self.rows: if wrapper is not None: yield wrapper(row) else: yield row def __len__(self): return len(self.rows) def _fetch(self): data = self.view._exec(self.options) self._rows = [Row(row) for row in data['rows']] self._total_rows = data.get('total_rows') self._offset = data.get('offset', 0) @property def rows(self): """The list of rows returned by the view. :type: `list` """ if self._rows is None: self._fetch() return self._rows @property def total_rows(self): """The total number of rows in this view. This value is `None` for reduce views. :type: `int` or ``NoneType`` for reduce views """ if self._rows is None: self._fetch() return self._total_rows @property def offset(self): """The offset of the results from the first row in the view. This value is 0 for reduce views. :type: `int` """ if self._rows is None: self._fetch() return self._offset class Row(dict): """Representation of a row as returned by database views.""" def __repr__(self): if self.id is None: return '<%s key=%r, value=%r>' % (type(self).__name__, self.key, self.value) return '<%s id=%r, key=%r, value=%r>' % (type(self).__name__, self.id, self.key, self.value) @property def id(self): """The associated Document ID if it exists. Returns `None` when it doesn't (reduce results). """ return self.get('id') @property def key(self): """The associated key.""" return self['key'] @property def value(self): """The associated value.""" return self['value'] @property def doc(self): """The associated document for the row. This is only present when the view was accessed with ``include_docs=True`` as a query parameter, otherwise this property will be `None`. """ doc = self.get('doc') if doc: return Document(doc) # Internals class Resource(object): def __init__(self, http, uri): if http is None: http = httplib2.Http() http.force_exception_to_status_code = False self.http = http self.uri = uri def __call__(self, path): return type(self)(self.http, uri(self.uri, path)) def delete(self, path=None, headers=None, **params): return self._request('DELETE', path, headers=headers, **params) def get(self, path=None, headers=None, **params): return self._request('GET', path, headers=headers, **params) def head(self, path=None, headers=None, **params): return self._request('HEAD', path, headers=headers, **params) def post(self, path=None, content=None, headers=None, **params): return self._request('POST', path, content=content, headers=headers, **params) def put(self, path=None, content=None, headers=None, **params): return self._request('PUT', path, content=content, headers=headers, **params) def _request(self, method, path=None, content=None, headers=None, **params): from couchdb import __version__ headers = headers or {} headers.setdefault('Accept', 'application/json') headers.setdefault('User-Agent', 'couchdb-python %s' % __version__) body = None if content is not None: if not isinstance(content, basestring): body = json.encode(content).encode('utf-8') headers.setdefault('Content-Type', 'application/json') else: body = content headers.setdefault('Content-Length', str(len(body))) def _make_request(retry=1): try: return self.http.request(uri(self.uri, path, **params), method, body=body, headers=headers) except socket.error, e: if retry > 0 and e.args[0] == 54: # reset by peer return _make_request(retry - 1) raise resp, data = _make_request() status_code = int(resp.status) if data and resp.get('content-type') == 'application/json': try: data = json.decode(data) except ValueError: pass if status_code >= 400: if type(data) is dict: error = (data.get('error'), data.get('reason')) else: error = data if status_code == 404: raise ResourceNotFound(error) elif status_code == 409: raise ResourceConflict(error) elif status_code == 412: raise PreconditionFailed(error) else: raise ServerError((status_code, error)) return resp, data def uri(base, *path, **query): """Assemble a uri based on a base, any number of path segments, and query string parameters. >>> uri('http://example.org', '_all_dbs') 'http://example.org/_all_dbs' A trailing slash on the uri base is handled gracefully: >>> uri('http://example.org/', '_all_dbs') 'http://example.org/_all_dbs' And multiple positional arguments become path parts: >>> uri('http://example.org/', 'foo', 'bar') 'http://example.org/foo/bar' All slashes within a path part are escaped: >>> uri('http://example.org/', 'foo/bar') 'http://example.org/foo%2Fbar' >>> uri('http://example.org/', 'foo', '/bar/') 'http://example.org/foo/%2Fbar%2F' """ if base and base.endswith('/'): base = base[:-1] retval = [base] # build the path path = '/'.join([''] + [unicode_quote(s) for s in path if s is not None]) if path: retval.append(path) # build the query string params = [] for name, value in query.items(): if type(value) in (list, tuple): params.extend([(name, i) for i in value if i is not None]) elif value is not None: if value is True: value = 'true' elif value is False: value = 'false' params.append((name, value)) if params: retval.extend(['?', unicode_urlencode(params)]) return ''.join(retval) def unicode_quote(string, safe=''): if isinstance(string, unicode): string = string.encode('utf-8') return quote(string, safe) def unicode_urlencode(data): if isinstance(data, dict): data = data.items() params = [] for name, value in data: if isinstance(value, unicode): value = value.encode('utf-8') params.append((name, value)) return urlencode(params) VALID_DB_NAME = re.compile(r'^[a-z][a-z0-9_$()+-/]*$') def validate_dbname(name): if not VALID_DB_NAME.match(name): raise ValueError('Invalid database name') return name