From d1521504f2ee1688b3c1eacf1458ec9c2064ccd5 Mon Sep 17 00:00:00 2001 From: Patrick Uiterwijk Date: Feb 21 2017 19:43:20 +0000 Subject: Implement Etcd-based data store Signed-off-by: Patrick Uiterwijk Reviewed-by: Randy Barlow Reviewed-by: Howard Johnson --- diff --git a/ipsilon/util/data.py b/ipsilon/util/data.py index 9713cb1..ef2b25b 100644 --- a/ipsilon/util/data.py +++ b/ipsilon/util/data.py @@ -10,8 +10,14 @@ from sqlalchemy.schema import (PrimaryKeyConstraint, Index, AddConstraint, CreateIndex) from sqlalchemy.sql import select, and_ import ConfigParser +try: + import etcd +except ImportError: + etcd = None import os +import json import uuid +from urlparse import urlparse import logging import time @@ -319,6 +325,284 @@ class FileQuery(Log): raise NotImplementedError +class EtcdStore(BaseStore): + """Etcd-based storage + + Example URI: etcd://server/rootpath?port=2379&scheme=https + The rootpath indicates at what point in the etcd key-space we will insert + our keys. + The parts after the ? are passed as key-value to the etcd client. + """ + + def __init__(self, uri): + if etcd is None: + raise NotImplementedError('Etcd client not available') + url = urlparse(uri) + self.rootpath = url.path + config = dict([cfg.split('=', 1) for cfg in url.query.split('&')]) + + if 'port' in config: + config['port'] = int(config['port']) + + self.debug('Etcd host: %s, rootpath: %s, config: %s' % + (url.netloc, url.path, config)) + + self.client = etcd.Client(host=url.netloc, **config) + + # We ignore the value, but this is a connection test + self.client.leader # pylint: disable=pointless-statement + + self.is_readonly = False + + def add_constraint(self, table): + raise NotImplementedError() + + def add_index(self, index): + raise NotImplementedError() + + def close(self): + # No-op + return + + +class EtcdQuery(Log): + """ + Class to store stuff in Etcd key-value stores. + + A row is stored in the etcd store under + /////.../ + Where rootpath is configurable,
is the name of the name of the + table, and pk_1, pk_2, ..., pk_n are the first, second and nth components + of the primary key of that table. + + This means that tables using etcd require a primary key. + + The value stored at those keys is a json document with all of the keys and + values for that object, including the primary keys. + + Cleanup of objects in etcd we leave to Etcd: when the object gets created, + we store the TTL in the key. + """ + + def __init__(self, store, table, table_def, trans=True): + """Query class initialization. + + store is a handle to a connected EtcdStore object. + table is the name of the "table" (key space) we are querying. + table_def is the table definition, look at OPTIONS_TABLE and + UNIQUE_DATA_TABLE for examples. + trans is accepted for compatibility with other Query types, but + ignored. + """ + if etcd is None: + raise NotImplementedError('Etcd client not available') + # We don't have indexes in a EtcdQuery, so drop that info + if isinstance(table_def, dict) and 'primary_key' in table_def: + columns = table_def['columns'] + if isinstance(columns[0], tuple): + columns = [column[0] for column in columns] + self._primary_key = tuple(table_def['primary_key']) + else: + # This is a custom plugin that uses tables that are incompatible + # with etcd. + raise ValueError('Etcd requires primary key') + self._table = table + self._table_def = table_def + self._store = store + self._section = table + self._columns = columns + self._con = store + + @property + def _table_dir(self): + """This returns the full path to the table key.""" + return '%s/%s' % (self._store.rootpath, self._table) + + def _get_most_specific_dir(self, kvfilter, del_kv=True, update=False): + """Get the most specific dir in which we can find stuff. + + Return a tuple with path and then the number of path levels not used. + + kvfilter is a dict with the keys we want to filter for. + del_kv is a boolean that indicates whether or not to remove used + filters from the kvfilter dict. + update is a boolean that indicates whether this is for an insert/update + operation. Those require a fully specified object path. + """ + path = self._table_dir + + if kvfilter is None: + kvfilter = {} + + pkeys_used = 0 + # We try to use as much of the primary key as we are able to to + # generate the most specific path possible. + for pkey in self._primary_key: + if pkey in kvfilter: + pkeys_used += 1 + path = os.path.join(path, kvfilter[pkey].replace(' ', '_')) + if del_kv: + del kvfilter[pkey] + else: + # Seems this next primary key value was not part of the filter + break + + levels_unused = len(self._primary_key) - pkeys_used + + if levels_unused != 0 and update: + raise Exception('Fully qualified object required for updates') + + return path, levels_unused + + def rollback(self): + """Rollback is ignored because etcd doesn't have transactions.""" + return + + def commit(self): + """Commit is ignored because etcd doesn't have transactions.""" + return + + def create(self): + """Create a directory to store the current table in.""" + try: + self._store.client.write(self._table_dir, None, dir=True) + except etcd.EtcdNotFile: + # This means that this key already contained a directory. In which + # case, we are done. + pass + + def drop(self): + """Drop the current table and everything under it.""" + self._store.client.delete(self._table_dir, recursive=True, dir=True) + + def _select_objects(self, kvfilter): + """ + Select all the objects that satisfy the kvfilter parts that are in the + primary key for this table. + """ + path, levels_unused = self._get_most_specific_dir(kvfilter) + try: + res = self._store.client.read(path, recursive=levels_unused != 0) + except etcd.EtcdKeyNotFound: + return None + + if levels_unused == 0: + # This was a fully qualified object, let's use the object + if res.dir: + return [] + else: + return [res] + else: + # This was not fully qualified. Given we used recursive=True, we + # know that "children" is the final objects. + return [cld for cld in res.children if not cld.dir] + + def _select_filter(self, kvfilter, res): + """ + Filters all objects from res that do not satisfy the non-primary + kvfilter entries. + """ + for obj in res: + result = json.loads(obj.value) + + pick_object = True + for key in kvfilter: + if key not in result: + pick_object = False + break + if result[key] != kvfilter[key]: + pick_object = False + break + if pick_object: + yield result + + def select(self, kvfilter=None, columns=None): + """Select specific objects from the store. + + kvfilter is a dict indicating which keys should be matched for. + columns is a list of columns to return, and their order. + Returns a list of column value lists. + """ + if columns is None: + columns = self._columns + + res = self._select_objects(kvfilter) + if res is None: + return [] + results = self._select_filter(kvfilter, res) + + rows = [] + for obj in results: + row = [] + for column in columns: + row.append(obj[column]) + rows.append(tuple(row)) + + return rows + + def insert(self, value_row, ttl=None): + """Insert a new object into the store. + + value_row is a list of column values. + ttl is the time for which the object is supposed to be kept. + """ + value_row = list(value_row) + + values = {} + for column in self._columns: + values[column] = value_row.pop(0) + + path, _ = self._get_most_specific_dir(values, False, update=True) + self._store.client.write(path, json.dumps(values), ttl=ttl) + + def update(self, values, kvfilter): + """Updates an item in the store. + + Requires a single object, thus the kvfilter must be specific to match + a single object. + + kvfilter is the dict of key-values that find a specific object. + values is the dict with key-values that we want to update to. + """ + path, _ = self._get_most_specific_dir(kvfilter, update=True) + for key in values: + if key in self._primary_key: + raise ValueError('Unable to update primary key values') + + current = json.loads(self._store.client.read(path).value) + for key in values: + current[key] = values[key] + self._store.client.write(path, json.dumps(current)) + + def delete(self, kvfilter): + """Deletes an item from the store. + + Requires a single object, thus the kvfilter must be specific to match + a single object. + + kvfilter is the dict of key-values that find a specific object. + """ + path, levels_unused = self._get_most_specific_dir(kvfilter) + if levels_unused == 0 or len(kvfilter) == 0: + try: + current = json.loads(self._store.client.read(path).value) + except etcd.EtcdKeyNotFound: + return + for key in kvfilter: + if current[key] != kvfilter[key]: + # We had 0 levels unused, meaning we are at a qualified + # object, and it doesn't match the kvfilter. We are done. + return + try: + self._store.client.delete(path, recursive=True, dir=True) + except etcd.EtcdKeyNotFound: + pass + else: + # This was not a fully specified object, we need to get all fully + # qualified objects + raise NotImplementedError() + + class Store(Log): # Static, Store-level variables _is_upgrade = False @@ -345,6 +629,9 @@ class Store(Log): _, filename = name.split('://') self._db = FileStore(filename) self._query = FileQuery + elif name.startswith('etcd://'): + self._db = EtcdStore(name) + self._query = EtcdQuery else: self._db = SqlStore.get_connection(name) self._query = SqlQuery