From 9b237b82280bf2f1e932573fcf34a259da7d8731 Mon Sep 17 00:00:00 2001 From: Patrick Uiterwijk Date: Oct 04 2018 09:01:03 +0000 Subject: Migrate user keys to SshKey Signed-off-by: Patrick Uiterwijk --- diff --git a/alembic/versions/f16ab75e4d32_sshkeys.py b/alembic/versions/f16ab75e4d32_sshkeys.py new file mode 100644 index 0000000..3376f64 --- /dev/null +++ b/alembic/versions/f16ab75e4d32_sshkeys.py @@ -0,0 +1,175 @@ +"""Migrate SSH keys to the new format. + +Revision ID: f16ab75e4d32 +Revises: 0a8f99c161e2 +Create Date: 2018-09-24 16:11:21.297620 + +""" + +# revision identifiers, used by Alembic. +revision = "f16ab75e4d32" +down_revision = "0a8f99c161e2" + +import datetime + +from alembic import op +import sqlalchemy as sa + +from pagure.lib import is_valid_ssh_key + + +def upgrade(): + """ Upgrade the database model for the way we store user's public ssh + keys. + + For this we leverage the existing ``deploykeys`` table. + It gets renamed to ``sshkeys``, we add the user_id foreign key as now + ssh keys stored in this table can be linked to an user. + Then we convert the existing ssh keys to this database model. + Finally, we drop the ``public_ssh_key`` column from the ``users`` table. + """ + users_table = sa.sql.table( + "users", + sa.sql.column("id", sa.Integer), + sa.sql.column("public_ssh_key", sa.TEXT()), + ) + sshkey_table = sa.sql.table( + "sshkeys", + sa.sql.column("id", sa.Integer), + sa.sql.column("user_id", sa.Integer), + sa.sql.column("public_ssh_key", sa.TEXT()), + sa.sql.column("ssh_short_key", sa.TEXT()), + sa.sql.column("ssh_search_key", sa.TEXT()), + sa.sql.column("creator_user_id", sa.Integer), + sa.sql.column("pushaccess", sa.Boolean), + sa.sql.column("date_created", sa.DateTime), + ) + + op.rename_table("deploykeys", "sshkeys") + + op.add_column("sshkeys", sa.Column("user_id", sa.Integer(), nullable=True)) + op.create_index( + op.f("ix_sshkeys_sshkeys_user_id"), + "sshkeys", + ["user_id"], + unique=False, + ) + op.create_foreign_key( + op.f("sshkeys_user_id_fkey"), + "sshkeys", + "users", + ["user_id"], + ["id"], + onupdate=u"CASCADE", + ) + + print("Convert existing ssh keys to the new format") + + conn = op.get_bind() + for key in conn.execute(sshkey_table.select()): + ssh_short_key = is_valid_ssh_key(key.public_ssh_key).strip() + ssh_search_key = ssh_short_key.split(" ")[1] + op.execute( + sshkey_table.update() + .where(sshkey_table.c.id == key.id) + .values({ + "ssh_short_key": ssh_short_key, + "ssh_search_key": ssh_search_key, + }) + ) + + data = [] + for user in conn.execute(users_table.select()): + if not user.public_ssh_key: + continue + for key in user.public_ssh_key.split("\n"): + if key in (None, False) or not key.strip(): + print("Skipping one key") + continue + ssh_short_key = is_valid_ssh_key(key).strip() + ssh_search_key = ssh_short_key.split(" ")[1] + print("Key: %s" % key) + print("Short: %s" % ssh_short_key) + print("Search: %s" % ssh_search_key) + tmp = {} + tmp["user_id"] = user.id + tmp["creator_user_id"] = user.id + tmp["public_ssh_key"] = key + tmp["ssh_search_key"] = ssh_search_key + tmp["ssh_short_key"] = ssh_short_key + tmp["pushaccess"] = True + tmp['date_created'] = datetime.datetime.utcnow() + data.append(tmp) + + op.bulk_insert(sshkey_table, data) + + op.drop_column("users", "public_ssh_key") + + +def downgrade(): + """ Downgrade the database model for the way we store user's public ssh + keys. + + For this we bring back the keys present in the ``sshkeys`` table and + put them back into the ``public_ssh_key`` column of the ``users`` table. + """ + + users_table = sa.sql.table( + "users", + sa.sql.column("id", sa.Integer), + sa.sql.column("public_ssh_key", sa.TEXT()), + ) + sshkey_table = sa.sql.table( + "sshkeys", + sa.sql.column("user_id", sa.Integer), + sa.sql.column("public_ssh_key", sa.TEXT()), + sa.sql.column("ssh_short_key", sa.TEXT()), + sa.sql.column("ssh_search_key", sa.TEXT()), + sa.sql.column("creator_user_id", sa.Integer), + sa.sql.column("pushaccess", sa.Boolean), + sa.sql.column("date_created", sa.DateTime), + ) + + op.add_column( + "users", sa.Column("public_ssh_key", sa.TEXT(), nullable=True) + ) + + print("Convert existing ssh keys to the old format") + + conn = op.get_bind() + data = [] + for key in conn.execute(sshkey_table.select()): + if not key.user_id: + continue + user = [ + u + for u in conn.execute( + users_table.select().where(users_table.c.id == key.user_id) + ) + ] + user = user[0] + ssh_key = "" + if user.public_ssh_key: + ssh_key = user.public_ssh_key + "\n" + ssh_key += key.public_ssh_key + + op.execute( + users_table.update() + .where(users_table.c.id == key.user_id) + .values({"public_ssh_key": ssh_key}) + ) + + print("Remove the keys associated with users since we moved them") + + op.execute( + sshkey_table.delete() + .where(sshkey_table.c.user_id != None) + ) + + op.drop_constraint( + op.f("sshkeys_user_id_fkey"), "sshkeys", type_="foreignkey" + ) + op.drop_index(op.f("ix_sshkeys_sshkeys_user_id"), table_name="sshkeys") + op.drop_column("sshkeys", "user_id") + + op.rename_table("sshkeys", "deploykeys") diff --git a/files/keyhelper.py b/files/keyhelper.py index 8d9448f..2a018cb 100644 --- a/files/keyhelper.py +++ b/files/keyhelper.py @@ -14,6 +14,8 @@ from __future__ import unicode_literals, print_function import sys import os +import requests + # Since this is run by sshd, we don't have a way to set environment # variables ahead of time if "PAGURE_CONFIG" not in os.environ and os.path.exists( @@ -22,10 +24,7 @@ if "PAGURE_CONFIG" not in os.environ and os.path.exists( os.environ["PAGURE_CONFIG"] = "/etc/pagure/pagure.cfg" # Here starts the code -import pagure -import pagure.lib from pagure.config import config as pagure_config -from pagure.lib.model import User, DeployKey # Get the arguments @@ -62,51 +61,24 @@ if not username_lookup: sys.exit(0) -session = pagure.lib.create_session(pagure_config["DB_URL"]) -if not session: - print("Unable to get database access") +url = "%s/pv/ssh/lookupkey/" % pagure_config["APP_URL"] +data = {"search_key": fingerprint} +if username_lookup: + data["username"] = username +resp = requests.post(url, data=data) +if not resp.status_code == 200: + print( + "Error during lookup request: status: %s" % resp.status_code, + file=sys.stderr, + ) sys.exit(1) +result = resp.json() -# First try to figure out if this is a deploykey. -# We can look those up very quickly, since those are already -# indexed by key fingerprint. -query = session.query(DeployKey).filter( - DeployKey.ssh_search_key == fingerprint -) -for dkey in query.all(): - keyenv = { - "username": "deploykey_%s_%s" - % (werkzeug.secure_filename(dkey.project.fullname), dkey.id) - } - print( - "%s %s" - % (pagure_config["SSH_KEYS_OPTIONS"] % keyenv, dkey.public_ssh_key) - ) +if not result["found"]: + # Everything OK, key just didn't exist. sys.exit(0) - -# Now look if it's a normal user -query = session.query(User) -if username_lookup: - query = query.filter(User.user == username) - -for user in query.all(): - for key in user.public_ssh_key.split("\n"): - # Make slightly more sane - key = key.strip() - # Check if this could even be a valid key - key = key.split(" ") - # Should be at the very least ["", "*', - [wtforms.validators.Required(), ssh_key_validator], - ) - - -class AddDeployKeyForm(PagureForm): - """ Form to add a deploy key to a project. """ +class AddSSHKeyForm(PagureForm): + """ Form to add a SSH key to a user. """ ssh_key = wtforms.TextField( 'SSH Key *', [wtforms.validators.Required()] # TODO: Add an ssh key validator? ) + + +class AddDeployKeyForm(AddSSHKeyForm): + """ Form to add a deploy key to a project. """ + pushaccess = wtforms.BooleanField( "Push access", [wtforms.validators.optional()], diff --git a/pagure/internal/__init__.py b/pagure/internal/__init__.py index 00b911c..67932c7 100644 --- a/pagure/internal/__init__.py +++ b/pagure/internal/__init__.py @@ -18,6 +18,7 @@ import os import flask import pygit2 +import werkzeug from functools import wraps from sqlalchemy.exc import SQLAlchemyError @@ -80,6 +81,32 @@ def localonly(function): return decorated_function +@PV.route("/ssh/lookupkey/", methods=["POST"]) +@localonly +def lookup_ssh_key(): + """ Looks up an SSH key by search_key for keyhelper.py """ + search_key = flask.request.form["search_key"] + username = flask.request.form.get("username") + key = pagure.lib.find_ssh_key(flask.g.session, search_key, username) + + if not key: + return flask.jsonify({"found": False}) + + result = {"found": True, "public_key": key.public_ssh_key} + + if key.user: + result["username"] = key.user.username + elif key.project: + result["username"] = "deploykey_%s_%s" % ( + werkzeug.secure_filename(key.project.fullname), + key.id, + ) + else: + return flask.jsonify({"found": False}) + + return flask.jsonify(result) + + @PV.route("/pull-request/comment/", methods=["PUT"]) @localonly def pull_request_add_comment(): diff --git a/pagure/lib/__init__.py b/pagure/lib/__init__.py index 8a3429f..f57dc37 100644 --- a/pagure/lib/__init__.py +++ b/pagure/lib/__init__.py @@ -228,21 +228,15 @@ def search_user(session, username=None, email=None, token=None, pattern=None): return output -_is_valid_ssh_key_force_md5 = None - - -def is_valid_ssh_key(key): +def is_valid_ssh_key(key, fp_hash="SHA256"): """ Validates the ssh key using ssh-keygen. """ - global _is_valid_ssh_key_force_md5 key = key.strip() if not key: return None with tempfile.TemporaryFile() as f: f.write(key.encode("utf-8")) f.seek(0) - cmd = ["/usr/bin/ssh-keygen", "-l", "-f", "/dev/stdin"] - if _is_valid_ssh_key_force_md5: - cmd.extend(["-E", "md5"]) + cmd = ["/usr/bin/ssh-keygen", "-l", "-f", "/dev/stdin", "-E", fp_hash] proc = subprocess.Popen( cmd, stdin=f, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) @@ -251,30 +245,6 @@ def is_valid_ssh_key(key): return False stdout = stdout.decode("utf-8") - if _is_valid_ssh_key_force_md5 is None: - # We grab the "key ID" portion of the very first key to verify the - # algorithm that's default on this system. - # We always want to use the MD5 hash method, to be consistent and a - # common lower denominator, so if we detect another method being - # default, set a variable so we know we will always need to call with - # -E md5. - # (Unfortunately, the older openSSH versions that had the default to - # MD5 also don't even support the -E argument...) - # Example line: - # with hash: 1024 SHA256:ztcRX... root@test (RSA) - # without : 1024 f9:a2:... key (RSA) - keyparts = stdout.split("\n")[0].split(" ")[1].split(":") - if len(keyparts) == 2 or keyparts[0].upper() in ("MD5", "SHA256"): - # This means that we get a keyid of HASH: rather than just - # , which indicates this is a system that supports multiple - # hash methods. Record this, and recall ourselves. - _is_valid_ssh_key_force_md5 = True - return is_valid_ssh_key(key) - else: - # This means this is a system that does not recognize the -E - # argument, thus we should never pass it. - _is_valid_ssh_key_force_md5 = False - return stdout @@ -285,6 +255,33 @@ def are_valid_ssh_keys(keys): ) +def find_ssh_key(session, search_key, username): + """ Finds and returns SSHKey matching the requested search_key. + + Args: + session: database session + search_key (string): The SSH fingerprint we are requested to look up + username (string or None): If this is provided, the key is looked up + to belong to the requested user. + """ + query = session.query(model.SSHKey).filter( + model.SSHKey.ssh_search_key == search_key + ) + + if username: + userowner = ( + session.query(model.User.id) + .filter(model.User.user == username) + .subquery() + ) + query = query.filter(model.SSHKey.user_id == userowner) + + try: + return query.one() + except sqlalchemy.orm.exc.NoResultFound: + return None + + def create_deploykeys_ssh_keys_on_disk(project, gitolite_keydir): """ Create the ssh keys for the projects' deploy keys on the key dir. @@ -355,22 +352,19 @@ def create_user_ssh_keys_on_disk(user, gitolite_keydir): gitolite_keydir, "keys_%i" % i, "%s.pub" % user.user ) - if not user.public_ssh_key: + if not user.sshkeys: return # Now let's create new keyfiles for the user - keys = user.public_ssh_key.split("\n") - for i in range(len(keys)): - if not keys[i]: - continue - if not is_valid_ssh_key(keys[i]): + for key in user.sshkeys: + if not is_valid_ssh_key(key.public_ssh_key): continue keyline_dir = os.path.join(gitolite_keydir, "keys_%i" % i) if not os.path.exists(keyline_dir): os.mkdir(keyline_dir) keyfile = os.path.join(keyline_dir, "%s.pub" % user.user) with open(keyfile, "w") as stream: - stream.write(keys[i].strip()) + stream.write(key.public_ssh_key.strip()) def add_issue_comment( @@ -1055,51 +1049,65 @@ def edit_issue_tags( return msgs -def add_deploykey_to_project(session, project, ssh_key, pushaccess, user): +def add_sshkey_to_project_or_user( + session, ssh_key, pushaccess, creator, project=None, user=None +): """ Add a deploy key to a specified project. """ + if project is None and user is None: + raise ValueError( + "SSH Keys need to be added to either a project or a user" + ) + if project is not None and user is not None: + raise ValueError("SSH Keys need to be assigned to at least one object") + ssh_key = ssh_key.strip() if "\n" in ssh_key: - raise pagure.exceptions.PagureException( - "Deploy key can only be single keys." - ) + raise pagure.exceptions.PagureException("Please add single SSH keys.") ssh_short_key = is_valid_ssh_key(ssh_key) if ssh_short_key in [None, False]: - raise pagure.exceptions.PagureException("Deploy key invalid.") + raise pagure.exceptions.PagureException("SSH key invalid.") # We are sure that this only contains a single key, but ssh-keygen still - # return a \n at the end - ssh_short_key = ssh_short_key.split("\n")[0] + # returns a \n at the end + ssh_short_key = ssh_short_key.strip() + if "\n" in ssh_key: + raise pagure.exceptions.PagureException( + "SSH has misbehaved when analyzing the SSH key" + ) - # Make sure that this key is not a deploy key in this or another project. - # If we dupe keys, gitolite might choke. + # Make sure that this key is not an SSH key for another project or user. + # If we dupe keys, we can't really know who this is for. ssh_search_key = ssh_short_key.split(" ")[1] if ( - session.query(model.DeployKey) - .filter(model.DeployKey.ssh_search_key == ssh_search_key) + session.query(model.SSHKey) + .filter(model.SSHKey.ssh_search_key == ssh_search_key) .count() != 0 ): - raise pagure.exceptions.PagureException("Deploy key already exists.") + raise pagure.exceptions.PagureException("SSH key already exists.") - user_obj = get_user(session, user) - new_key_obj = model.DeployKey( - project_id=project.id, + new_key_obj = model.SSHKey( pushaccess=pushaccess, public_ssh_key=ssh_key, ssh_short_key=ssh_short_key, ssh_search_key=ssh_search_key, - creator_user_id=user_obj.id, + creator_user_id=creator.id, ) + if project: + new_key_obj.project = project + if user: + new_key_obj.user_id = user.id + session.add(new_key_obj) # Make sure we won't have SQLAlchemy error before we continue session.flush() # We do not send any notifications on purpose - return "Deploy key added" + return "SSH key added" def add_user_to_project( @@ -3492,7 +3500,7 @@ def set_up_user( except pagure.exceptions.PagureException as err: _log.exception(err) - if ssh_key and not user.public_ssh_key: + if ssh_key: update_user_ssh(session, user, ssh_key, keydir) return user @@ -3535,8 +3543,21 @@ def update_user_ssh(session, user, ssh_key, keydir, update_only=False): if isinstance(user, six.string_types): user = get_user(session, user) - user.public_ssh_key = ssh_key - if keydir and user.public_ssh_key: + if ssh_key: + for key in user.sshkeys: + key.delete() + for key in ssh_key.strip().split("\n"): + key = key.strip() + pagure.lib.add_sshkey_to_project_or_user( + session=session, + ssh_key=key, + user=user, + pushaccess=True, + creator=user, + ) + session.commit() + + if keydir: create_user_ssh_keys_on_disk(user, keydir) if update_only: pagure.lib.tasks.gitolite_post_compile_only.delay() diff --git a/pagure/lib/model.py b/pagure/lib/model.py index 0759b96..cecd3c0 100644 --- a/pagure/lib/model.py +++ b/pagure/lib/model.py @@ -32,6 +32,7 @@ from sqlalchemy.orm import backref from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import scoped_session from sqlalchemy.orm import relation +from sqlalchemy.orm import validates import pagure.exceptions from pagure.config import config as pagure_config @@ -199,7 +200,6 @@ class User(BASE): id = sa.Column(sa.Integer, primary_key=True) user = sa.Column(sa.String(255), nullable=False, unique=True, index=True) fullname = sa.Column(sa.String(255), nullable=False, index=True) - public_ssh_key = sa.Column(sa.Text, nullable=True) default_email = sa.Column(sa.Text, nullable=False) _settings = sa.Column(sa.Text, nullable=True) @@ -1115,22 +1115,35 @@ class ProjectUser(BASE): user = relation("User", backref="user_projects") -class DeployKey(BASE): - """ Stores information about deployment keys. +class SSHKey(BASE): + """ Stores information about SSH keys. - Table -- deploykeys + Every instance needs to either have user_id set (SSH key for a specific + user) or project_id ("deploy key" for a specific project). + + Table -- sshkeys """ - __tablename__ = "deploykeys" + __tablename__ = "sshkeys" id = sa.Column(sa.Integer, primary_key=True) project_id = sa.Column( sa.Integer, sa.ForeignKey("projects.id", onupdate="CASCADE", ondelete="CASCADE"), + nullable=True, + index=True, ) pushaccess = sa.Column(sa.Boolean, nullable=False, default=False) + user_id = sa.Column( + sa.Integer, + sa.ForeignKey("users.id", onupdate="CASCADE"), + nullable=True, + index=True, + ) public_ssh_key = sa.Column(sa.Text, nullable=False) ssh_short_key = sa.Column(sa.Text, nullable=False) - ssh_search_key = sa.Column(sa.Text, nullable=False) + ssh_search_key = sa.Column( + sa.Text, nullable=False, index=True, unique=True + ) creator_user_id = sa.Column( sa.Integer, sa.ForeignKey("users.id", onupdate="CASCADE"), @@ -1141,6 +1154,23 @@ class DeployKey(BASE): sa.DateTime, nullable=False, default=datetime.datetime.utcnow ) + # Validations + # These two validators are intended to make sure an SSHKey is either + # assigned to a Project or a User, but not both. + @validates("project_id") + def validate_project_id(self, key, value): + """ Validates that user_id is not set. """ + if self.user_id is not None: + raise ValueError("SSHKey can't have both project and user") + return value + + @validates("user_id") + def validate_user_id(self, key, value): + """ Validates that project_id is not set. """ + if self.project_id is not None: + raise ValueError("SSHKey can't have both user and project") + return value + # Relations project = relation( "Project", @@ -1151,6 +1181,15 @@ class DeployKey(BASE): ), ) + user = relation( + "User", + foreign_keys=[user_id], + remote_side=[User.id], + backref=backref( + "sshkeys", cascade="delete, delete-orphan", single_parent=True + ), + ) + creator_user = relation( "User", foreign_keys=[creator_user_id], remote_side=[User.id] ) diff --git a/pagure/templates/add_sshkey.html b/pagure/templates/add_sshkey.html new file mode 100644 index 0000000..81b51d0 --- /dev/null +++ b/pagure/templates/add_sshkey.html @@ -0,0 +1,32 @@ +{% extends "master.html" %} + +{% set tag = "home" %} + +{% block title %}Add SSH key{% endblock %} + +{% block content %} +
+
+
+
+
+ Add SSH key +
+
+
+
+ + +
+

+ + + {{ form.csrf_token }} +

+
+
+
+
+
+
+{% endblock %} diff --git a/pagure/templates/user_settings.html b/pagure/templates/user_settings.html index c3655b7..2614557 100644 --- a/pagure/templates/user_settings.html +++ b/pagure/templates/user_settings.html @@ -165,11 +165,11 @@
{% for acl in token.acls_list %} - {% if acl == 'create_project' %} + {% if acl == 'create_project' %} - {% elif acl == 'fork_project' %} + {% elif acl == 'fork_project' %} - {% elif acl == 'modify_project' %} + {% elif acl == 'modify_project' %} {% endif %} {% endfor %} @@ -212,19 +212,37 @@ diff --git a/pagure/ui/app.py b/pagure/ui/app.py index b49c908..57c616b 100644 --- a/pagure/ui/app.py +++ b/pagure/ui/app.py @@ -1098,8 +1098,8 @@ def wait_task(taskid): ) -@UI_NS.route("/settings/", methods=("GET", "POST")) -@UI_NS.route("/settings", methods=("GET", "POST")) +@UI_NS.route("/settings/") +@UI_NS.route("/settings") @login_required def user_settings(): """ Update the user settings. @@ -1110,31 +1110,7 @@ def user_settings(): ) user = _get_user(username=flask.g.fas_user.username) - - form = pagure.forms.UserSettingsForm() - if form.validate_on_submit() and pagure_config.get("LOCAL_SSH_KEY", True): - ssh_key = form.ssh_key.data - - try: - message = "Nothing to update" - if user.public_ssh_key != ssh_key: - pagure.lib.update_user_ssh( - flask.g.session, - user=user, - ssh_key=ssh_key, - keydir=pagure_config.get("GITOLITE_KEYDIR", None), - update_only=True, - ) - flask.g.session.commit() - message = "Public ssh key updated" - flask.flash(message) - return flask.redirect(flask.url_for("ui_ns.user_settings")) - except SQLAlchemyError as err: # pragma: no cover - flask.g.session.rollback() - flask.flash(str(err), "error") - elif flask.request.method == "GET": - form.ssh_key.data = user.public_ssh_key - + form = pagure.forms.ConfirmationForm() return flask.render_template("user_settings.html", user=user, form=form) @@ -1177,6 +1153,94 @@ def update_user_settings(): return flask.redirect(flask.url_for("ui_ns.user_settings")) +@UI_NS.route("/settings/usersettings/addkey", methods=["GET", "POST"]) +@login_required +def add_user_sshkey(): + """ Add the specified SSH key to the user. + """ + if admin_session_timedout(): + if flask.request.method == "POST": + flask.flash("Action canceled, try it again", "error") + return flask.redirect( + flask.url_for("auth_login", next=flask.request.url) + ) + + form = pagure.forms.AddSSHKeyForm() + + if form.validate_on_submit(): + try: + msg = pagure.lib.add_sshkey_to_project_or_user( + flask.g.session, + ssh_key=form.ssh_key.data, + pushaccess=True, + creator=flask.g.fas_user, + user=flask.g.fas_user, + ) + flask.g.session.commit() + pagure.lib.create_user_ssh_keys_on_disk( + flask.g.fas_user, pagure_config.get("GITOLITE_KEYDIR", None) + ) + pagure.lib.tasks.gitolite_post_compile_only.delay() + flask.flash(msg) + return flask.redirect( + flask.url_for("ui_ns.user_settings") + "#nav-ssh-tab" + ) + except pagure.exceptions.PagureException as msg: + flask.g.session.rollback() + _log.debug(msg) + flask.flash(str(msg), "error") + except SQLAlchemyError as err: # pragma: no cover + flask.g.session.rollback() + _log.exception(err) + flask.flash("SSH key could not be added", "error") + + return flask.render_template("add_sshkey.html", form=form) + + +@UI_NS.route("/settings/usersettings/removekey/", methods=["POST"]) +@login_required +def remove_user_sshkey(keyid): + """ Removes an SSH key from the user. + """ + if admin_session_timedout(): + if flask.request.method == "POST": + flask.flash("Action canceled, try it again", "error") + return flask.redirect( + flask.url_for("auth_login", next=flask.request.url) + ) + form = pagure.forms.ConfirmationForm() + if form.validate_on_submit(): + found = False + user = pagure.lib.get_user(flask.g.session, flask.g.fas_user.username) + for key in user.sshkeys: + if key.id == keyid: + flask.g.session.delete(key) + found = True + break + + if not found: + flask.flash("SSH key does not exist in user.", "error") + return flask.redirect( + flask.url_for("ui_ns.user_settings") + "#nav-ssh-tab" + ) + + try: + flask.g.session.commit() + pagure.lib.create_user_ssh_keys_on_disk( + flask.g.fas_user, pagure_config.get("GITOLITE_KEYDIR", None) + ) + pagure.lib.tasks.gitolite_post_compile_only.delay() + flask.flash("SSH key removed") + except SQLAlchemyError as err: # pragma: no cover + flask.g.session.rollback() + _log.exception(err) + flask.flash("SSH key could not be removed", "error") + + return flask.redirect( + flask.url_for("ui_ns.user_settings") + "#nav-ssh-tab" + ) + + @UI_NS.route("/markdown/", methods=["POST"]) def markdown_preview(): """ Return the provided markdown text in html. diff --git a/pagure/ui/filters.py b/pagure/ui/filters.py index c8f56bd..8fff167 100644 --- a/pagure/ui/filters.py +++ b/pagure/ui/filters.py @@ -759,19 +759,18 @@ def join_prefix(values, num): @UI_NS.app_template_filter("user_can_clone_ssh") def user_can_clone_ssh(username): - ssh_keys = "" + has_ssh_keys = False if flask.g.authenticated: - ssh_keys = ( - pagure.lib.search_user( - flask.g.session, username=flask.g.fas_user.username - ).public_ssh_key - or "" + has_ssh_keys = ( + len( + pagure.lib.search_user( + flask.g.session, username=flask.g.fas_user.username + ).sshkeys + ) + != 0 ) - if not ( - pagure_config.get("ALWAYS_RENDER_SSH_CLONE_URL") or ssh_keys.strip() - ): - return False - return True + always_render = pagure_config.get("ALWAYS_RENDER_SSH_CLONE_URL") + return always_render or has_ssh_keys @UI_NS.app_template_filter("git_url_ssh") diff --git a/pagure/ui/repo.py b/pagure/ui/repo.py index e2635e4..b9fb770 100644 --- a/pagure/ui/repo.py +++ b/pagure/ui/repo.py @@ -1731,10 +1731,14 @@ def remove_deploykey(repo, keyid, username=None, namespace=None): form = pagure.forms.ConfirmationForm() if form.validate_on_submit(): - keyids = ["%s" % key.id for key in repo.deploykeys] - keyid = "%s" % keyid + found = False + for key in repo.deploykeys: + if key.id == keyid: + flask.g.session.delete(key) + found = True + break - if keyid not in keyids: + if not found: flask.flash("Deploy key does not exist in project.", "error") return flask.redirect( flask.url_for( @@ -1746,10 +1750,6 @@ def remove_deploykey(repo, keyid, username=None, namespace=None): + "#deploykeys-tab" ) - for key in repo.deploykeys: - if "%s" % key.id == keyid: - flask.g.session.delete(key) - break try: flask.g.session.commit() pagure.lib.create_deploykeys_ssh_keys_on_disk( @@ -1860,12 +1860,12 @@ def add_deploykey(repo, username=None, namespace=None): if form.validate_on_submit(): try: - msg = pagure.lib.add_deploykey_to_project( + msg = pagure.lib.add_sshkey_to_project_or_user( flask.g.session, - repo, ssh_key=form.ssh_key.data, + creator=flask.g.fas_user, + project=repo, pushaccess=form.pushaccess.data, - user=flask.g.fas_user.username, ) flask.g.session.commit() pagure.lib.create_deploykeys_ssh_keys_on_disk( diff --git a/tests/test_pagure_flask_internal.py b/tests/test_pagure_flask_internal.py index 338ef0d..402e3c4 100644 --- a/tests/test_pagure_flask_internal.py +++ b/tests/test_pagure_flask_internal.py @@ -2874,6 +2874,86 @@ class PagureFlaskInternaltests(tests.Modeltests): js_data = json.loads(output.get_data(as_text=True)) self.assertEqual(js_data, {u'results': u'Random error'}) + def test_lookup_ssh_key(self): + """ Test the mergeable_request_pull endpoint when the backend + raises an GitError exception. + """ + tests.create_projects(self.session) + + repo = pagure.lib.get_authorized_project(self.session, 'test') + project_key = 'ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIC4zmifEL8TLLZUZnjAuVL8495DAkpAAM2eBhwHwawBm' + project_key_fp = 'SHA256:ZSUQAqpPDWi90Fs6Ow8Epc8F3qiKVfU+H5ssvo7jiI0' + + pingou = pagure.lib.get_user(self.session, 'pingou') + user_key = 'ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIDTsfdTcXw4rlU1aQwOTbLOXqossLwpPIk27S/G17kUz' + user_key_fp = 'SHA256:jUJHzrq2Ct6Ubf7Y9rnB6tGnbHM9dMVsveyfPojm+i0' + + pagure.lib.add_sshkey_to_project_or_user( + self.session, + user_key, + pushaccess=True, + creator=pingou, + user=pingou, + ) + pagure.lib.add_sshkey_to_project_or_user( + self.session, + project_key, + pushaccess=True, + creator=pingou, + project=repo, + ) + self.session.commit() + + url = '/pv/ssh/lookupkey/' + + output = self.app.post( + url, + data={'search_key': 'asdf'}, + ) + self.assertEqual(output.status_code, 200) + result = json.loads(output.get_data(as_text=True)) + self.assertEqual(result['found'], False) + + output = self.app.post( + url, + data={'search_key': user_key_fp}, + ) + self.assertEqual(output.status_code, 200) + result = json.loads(output.get_data(as_text=True)) + self.assertEqual(result['found'], True) + self.assertEqual(result['username'], 'pingou') + self.assertEqual(result['public_key'], user_key) + + output = self.app.post( + url, + data={'search_key': user_key_fp, + 'username': 'pingou'}, + ) + self.assertEqual(output.status_code, 200) + result = json.loads(output.get_data(as_text=True)) + self.assertEqual(result['found'], True) + self.assertEqual(result['username'], 'pingou') + self.assertEqual(result['public_key'], user_key) + + output = self.app.post( + url, + data={'search_key': user_key_fp, + 'username': 'foo'}, + ) + self.assertEqual(output.status_code, 200) + result = json.loads(output.get_data(as_text=True)) + self.assertEqual(result['found'], False) + + output = self.app.post( + url, + data={'search_key': project_key_fp}, + ) + self.assertEqual(output.status_code, 200) + result = json.loads(output.get_data(as_text=True)) + self.assertEqual(result['found'], True) + self.assertEqual(result['username'], 'deploykey_test_2') + self.assertEqual(result['public_key'], project_key) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/tests/test_pagure_flask_ui_app.py b/tests/test_pagure_flask_ui_app.py index 830648e..1aca307 100644 --- a/tests/test_pagure_flask_ui_app.py +++ b/tests/test_pagure_flask_ui_app.py @@ -686,139 +686,137 @@ class PagureFlaskApptests(tests.Modeltests): output_text = output.get_data(as_text=True) self.assertIn( 'foo\'s settings - Pagure', output_text) - if self.get_wtforms_version() >= (2, 2): - self.assertIn( - '', - output_text) - else: - self.assertIn( - '', output_text) - csrf_token = self.get_csrf(output=output) + ast.return_value = True + output = self.app.get('/settings/') + self.assertEqual(output.status_code, 302) - data = { - 'ssh_key': 'blah' - } + @patch('pagure.decorators.admin_session_timedout') + def test_add_sshkey(self, ast): + """ Test the add_sshkey endpoint. """ + ast.return_value = False - output = self.app.post('/settings/', data=data) - self.assertEqual(output.status_code, 200) - output_text = output.get_data(as_text=True) - self.assertIn( - 'foo\'s settings - Pagure', output_text) + # User not logged in + output = self.app.get('/settings/usersettings/addkey') + self.assertEqual(output.status_code, 302) - data['csrf_token'] = csrf_token + ast.return_value = False - output = self.app.post( - '/settings/', data=data, follow_redirects=True) + user = tests.FakeUser(username='pingou') + with tests.user_set(self.app.application, user): + output = self.app.get('/settings/usersettings/addkey') self.assertEqual(output.status_code, 200) output_text = output.get_data(as_text=True) - self.assertIn('Invalid SSH keys', output_text) - self.assertIn( - 'foo\'s settings - Pagure', output_text) - self.assertIn('>blah', output_text) + self.assertIn('Add SSH key', output_text) csrf_token = output_text.split( 'name="csrf_token" type="hidden" value="')[1].split('">')[0] data = { - 'ssh_key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDUkub32fZnNI' - '1zJYs43vhhx3c6IcYo4yzhw1gQ37BLhrrNeS6x8l5PKX4J8ZP5' - '1XhViPaLbeOpl94Vm5VSCbLy0xtY9KwLhMkbKj7g6vvfxLm2sT' - 'Osb15j4jzIkUYYgIE7cHhZMCLWR6UA1c1HEzo6mewMDsvpQ9wk' - 'cDnAuXjK3Q==', - 'csrf_token': csrf_token + 'ssh_key': 'asdf', } + # No CSRF token + output = self.app.post('/settings/usersettings/addkey', data=data) + self.assertEqual(output.status_code, 200) + output_text = output.get_data(as_text=True) + self.assertIn('Add SSH key', output_text) + + data['csrf_token'] = csrf_token + + # First, invalid SSH key + output = self.app.post('/settings/usersettings/addkey', data=data) + self.assertEqual(output.status_code, 200) + output_text = output.get_data(as_text=True) + self.assertIn('Add SSH key', output_text) + self.assertIn('SSH key invalid', output_text) + + # Next up, multiple SSH keys + data['ssh_key'] = 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDAzBMSIlvPRaEiLOTVInErkRIw9CzQQcnslDekAn1jFnGf+SNa1acvbTiATbCX71AA03giKrPxPH79dxcC7aDXerc6zRcKjJs6MAL9PrCjnbyxCKXRNNZU5U9X/DLaaL1b3caB+WD6OoorhS3LTEtKPX8xyjOzhf3OQSzNjhJp5Q==\nssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDAzBMSIlvPRaEiLOTVInErkRIw9CzQQcnslDekAn1jFnGf+SNa1acvbTiATbCX71AA03giKrPxPH79dxcC7aDXerc6zRcKjJs6MAL9PrCjnbyxCKXRNNZU5U9X/DLaaL1b3caB+WD6OoorhS3LTEtKPX8xyjOzhf3OQSzNjhJp5Q==' output = self.app.post( - '/settings/', data=data, follow_redirects=True) + '/settings/usersettings/addkey', data=data, follow_redirects=True) self.assertEqual(output.status_code, 200) output_text = output.get_data(as_text=True) - self.assertIn('Public ssh key updated', output_text) - self.assertIn( - 'foo\'s settings - Pagure', output_text) - if self.get_wtforms_version() >= (2, 2): - self.assertIn( - '', - output_text) - else: - self.assertIn( - '', output_text) csrf_token = self.get_csrf(output=output) @@ -991,20 +980,11 @@ class PagureFlaskApptests(tests.Modeltests): user.username = 'foo' with tests.user_set(self.app.application, user): - output = self.app.post('/settings/') + output = self.app.get('/settings/') self.assertEqual(output.status_code, 200) output_text = output.get_data(as_text=True) self.assertIn( 'foo\'s settings - Pagure', output_text) - if self.get_wtforms_version() >= (2, 2): - self.assertIn( - '', - output_text) - else: - self.assertIn( - '', output_text) csrf_token = self.get_csrf(output=output) @@ -1024,20 +1004,11 @@ class PagureFlaskApptests(tests.Modeltests): user.username = 'pingou' with tests.user_set(self.app.application, user): - output = self.app.post('/settings/') + output = self.app.get('/settings/') self.assertEqual(output.status_code, 200) output_text = output.get_data(as_text=True) self.assertIn( 'pingou\'s settings - Pagure', output_text) - if self.get_wtforms_version() >= (2, 2): - self.assertIn( - '', - output_text) - else: - self.assertIn( - '', output_text) csrf_token = self.get_csrf(output=output) @@ -1245,15 +1216,6 @@ class PagureFlaskApptests(tests.Modeltests): output_text = output.get_data(as_text=True) self.assertIn( 'pingou\'s settings - Pagure', output_text) - if self.get_wtforms_version() >= (2, 2): - self.assertIn( - '', - output_text) - else: - self.assertIn( - '', output_text) csrf_token = self.get_csrf(output=output) @@ -1342,15 +1304,6 @@ class PagureFlaskApptests(tests.Modeltests): output_text = output.get_data(as_text=True) self.assertIn( 'pingou\'s settings - Pagure', output_text) - if self.get_wtforms_version() >= (2, 2): - self.assertIn( - '', - output_text) - else: - self.assertIn( - '', output_text) csrf_token = self.get_csrf(output=output) diff --git a/tests/test_pagure_flask_ui_issues.py b/tests/test_pagure_flask_ui_issues.py index 6977406..6746b3a 100644 --- a/tests/test_pagure_flask_ui_issues.py +++ b/tests/test_pagure_flask_ui_issues.py @@ -3225,7 +3225,14 @@ class PagureFlaskIssuestests(tests.Modeltests): repo = pagure.lib._get_project(self.session, 'test') pagure.lib.update_read_only_mode(self.session, repo, read_only=False) - pagure.lib.get_user(self.session, 'pingou').public_ssh_key = 'foo' + pingou = pagure.lib.get_user(self.session, 'pingou') + pagure.lib.add_sshkey_to_project_or_user( + session=self.session, + user=pingou, + ssh_key='ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDAzBMSIlvPRaEiLOTVInErkRIw9CzQQcnslDekAn1jFnGf+SNa1acvbTiATbCX71AA03giKrPxPH79dxcC7aDXerc6zRcKjJs6MAL9PrCjnbyxCKXRNNZU5U9X/DLaaL1b3caB+WD6OoorhS3LTEtKPX8xyjOzhf3OQSzNjhJp5Q==', + pushaccess=True, + creator=pingou, + ) self.session.commit() with tests.user_set(self.app.application, user): # Check that the git issue URL is present diff --git a/tests/test_pagure_flask_ui_repo.py b/tests/test_pagure_flask_ui_repo.py index 7c31536..12a4f39 100644 --- a/tests/test_pagure_flask_ui_repo.py +++ b/tests/test_pagure_flask_ui_repo.py @@ -167,7 +167,7 @@ class PagureFlaskRepotests(tests.Modeltests): self.assertEqual(output.status_code, 200) output_text = output.get_data(as_text=True) self.assertIn('Add deploy key to the', output_text) - self.assertIn('Deploy key invalid', output_text) + self.assertIn('SSH key invalid', output_text) # Next up, multiple SSH keys data['ssh_key'] = 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDAzBMSIlvPRaEiLOTVInErkRIw9CzQQcnslDekAn1jFnGf+SNa1acvbTiATbCX71AA03giKrPxPH79dxcC7aDXerc6zRcKjJs6MAL9PrCjnbyxCKXRNNZU5U9X/DLaaL1b3caB+WD6OoorhS3LTEtKPX8xyjOzhf3OQSzNjhJp5Q==\nssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDAzBMSIlvPRaEiLOTVInErkRIw9CzQQcnslDekAn1jFnGf+SNa1acvbTiATbCX71AA03giKrPxPH79dxcC7aDXerc6zRcKjJs6MAL9PrCjnbyxCKXRNNZU5U9X/DLaaL1b3caB+WD6OoorhS3LTEtKPX8xyjOzhf3OQSzNjhJp5Q==' @@ -175,7 +175,7 @@ class PagureFlaskRepotests(tests.Modeltests): '/test/adddeploykey', data=data, follow_redirects=True) self.assertEqual(output.status_code, 200) output_text = output.get_data(as_text=True) - self.assertIn('Deploy key can only be single keys.', output_text) + self.assertIn('Please add single SSH keys.', output_text) # Now, a valid SSH key data['ssh_key'] = 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDAzBMSIlvPRaEiLOTVInErkRIw9CzQQcnslDekAn1jFnGf+SNa1acvbTiATbCX71AA03giKrPxPH79dxcC7aDXerc6zRcKjJs6MAL9PrCjnbyxCKXRNNZU5U9X/DLaaL1b3caB+WD6OoorhS3LTEtKPX8xyjOzhf3OQSzNjhJp5Q==' @@ -186,7 +186,7 @@ class PagureFlaskRepotests(tests.Modeltests): self.assertIn( 'Settings - test - Pagure', output_text) self.assertIn('
Project Settings
', output_text) - self.assertIn('Deploy key added', output_text) + self.assertIn('SSH key added', output_text) self.assertNotIn('Push Access', output_text) # And now, adding the same key @@ -194,7 +194,7 @@ class PagureFlaskRepotests(tests.Modeltests): '/test/adddeploykey', data=data, follow_redirects=True) self.assertEqual(output.status_code, 200) output_text = output.get_data(as_text=True) - self.assertIn('Deploy key already exists', output_text) + self.assertIn('SSH key already exists', output_text) # And next, a key with push access data['ssh_key'] = 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC9Xwc2RDzPBhlEDARfHldGjudIVoa04tqT1JVKGQmyllTFz7Rb8CngQL3e7zyNzotnhwYKHdoiLlPkVEiDee4dWMUe48ilqId+FJZQGhyv8fu4BoFdE1AJUVylzmltbLg14VqG5gjTpXgtlrEva9arKwBMHJjRYc8ScaSn3OgyQw==' @@ -206,7 +206,7 @@ class PagureFlaskRepotests(tests.Modeltests): self.assertIn( 'Settings - test - Pagure', output_text) self.assertIn('
Project Settings
', output_text) - self.assertIn('Deploy key added', output_text) + self.assertIn('SSH key added', output_text) self.assertIn('Push Access', output_text) @patch('pagure.decorators.admin_session_timedout') @@ -616,15 +616,15 @@ class PagureFlaskRepotests(tests.Modeltests): # Add a deploy key to a project repo = pagure.lib.get_authorized_project(self.session, 'test') - msg = pagure.lib.add_deploykey_to_project( + msg = pagure.lib.add_sshkey_to_project_or_user( session=self.session, project=repo, ssh_key='ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDAzBMSIlvPRaEiLOTVInErkRIw9CzQQcnslDekAn1jFnGf+SNa1acvbTiATbCX71AA03giKrPxPH79dxcC7aDXerc6zRcKjJs6MAL9PrCjnbyxCKXRNNZU5U9X/DLaaL1b3caB+WD6OoorhS3LTEtKPX8xyjOzhf3OQSzNjhJp5Q==', pushaccess=True, - user='pingou', + creator=user, ) self.session.commit() - self.assertEqual(msg, 'Deploy key added') + self.assertEqual(msg, 'SSH key added') with tests.user_set(self.app.application, user): output = self.app.post('/test/dropdeploykey/1', follow_redirects=True) @@ -1544,7 +1544,15 @@ class PagureFlaskRepotests(tests.Modeltests): tests.create_projects(self.session) tests.create_projects_git(os.path.join(self.path, 'repos'), bare=True) - pagure.lib.get_user(self.session, 'pingou').public_ssh_key = 'foo' + pingou = pagure.lib.get_user(self.session, 'pingou') + pagure.lib.add_sshkey_to_project_or_user( + session=self.session, + user=pingou, + ssh_key='ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDAzBMSIlvPRaEiLOTVInErkRIw9CzQQcnslDekAn1jFnGf+SNa1acvbTiATbCX71AA03giKrPxPH79dxcC7aDXerc6zRcKjJs6MAL9PrCjnbyxCKXRNNZU5U9X/DLaaL1b3caB+WD6OoorhS3LTEtKPX8xyjOzhf3OQSzNjhJp5Q==', + pushaccess=True, + creator=pingou, + ) + self.session.commit() repo = pagure.lib._get_project(self.session, 'test') pagure.lib.update_read_only_mode(self.session, repo, read_only=False) self.session.commit() @@ -1613,7 +1621,14 @@ class PagureFlaskRepotests(tests.Modeltests): tests.create_projects_git(os.path.join(self.path, 'repos'), bare=True) repo = pagure.lib._get_project(self.session, 'test') pagure.lib.update_read_only_mode(self.session, repo, read_only=True) - pagure.lib.get_user(self.session, 'pingou').public_ssh_key = 'foo' + pingou = pagure.lib.get_user(self.session, 'pingou') + pagure.lib.add_sshkey_to_project_or_user( + session=self.session, + user=pingou, + ssh_key='ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDAzBMSIlvPRaEiLOTVInErkRIw9CzQQcnslDekAn1jFnGf+SNa1acvbTiATbCX71AA03giKrPxPH79dxcC7aDXerc6zRcKjJs6MAL9PrCjnbyxCKXRNNZU5U9X/DLaaL1b3caB+WD6OoorhS3LTEtKPX8xyjOzhf3OQSzNjhJp5Q==', + pushaccess=True, + creator=pingou, + ) self.session.commit() user = tests.FakeUser(username='pingou') diff --git a/tests/test_pagure_lib.py b/tests/test_pagure_lib.py index 0c0697f..875e6b7 100644 --- a/tests/test_pagure_lib.py +++ b/tests/test_pagure_lib.py @@ -2066,7 +2066,7 @@ class PagureLibtests(tests.Modeltests): fullname='Seth', default_email='skvidal@fp.o', keydir=pagure.config.config.get('GITOLITE_KEYDIR', None), - ssh_key='foo key', + ssh_key='ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDtgzSO9d1IrKdmyBFUvtAJPLgGOhp0lSySkWRSe+/+3KXYjSnsLnCJQlO5M7JfaXhtTHEow86rh4W9+FoJdzo5iocAwH5xPZ5ttHLy7VHgTzNMUeMgKpjy6bBOdPoGPPG4mo7QCMCRJdWBRDv4OSEMLU5jQAvC272YK2V8L918VQ== root@test', ) self.session.commit() @@ -2151,7 +2151,6 @@ class PagureLibtests(tests.Modeltests): fullname='Seth', default_email='skvidal@fp.o', keydir=pagure.config.config.get('GITOLITE_KEYDIR', None), - ssh_key='foo key', ) self.session.commit() @@ -2188,35 +2187,6 @@ class PagureLibtests(tests.Modeltests): sorted(['skvidal@fp.o', 'skvidal@example.c']), sorted([email.email for email in items[2].emails])) - def test_update_user_ssh(self): - """ Test the update_user_ssh of pagure.lib. """ - - # Before - user = pagure.lib.search_user(self.session, username='foo') - self.assertEqual(user.public_ssh_key, None) - - msg = pagure.lib.update_user_ssh(self.session, user, 'blah', keydir=None) - user = pagure.lib.search_user(self.session, username='foo') - self.assertEqual(user.public_ssh_key, 'blah') - - msg = pagure.lib.update_user_ssh(self.session, user, 'blah', keydir=None) - user = pagure.lib.search_user(self.session, username='foo') - self.assertEqual(user.public_ssh_key, 'blah') - - msg = pagure.lib.update_user_ssh(self.session, 'foo', None, keydir=None) - user = pagure.lib.search_user(self.session, username='foo') - self.assertEqual(user.public_ssh_key, None) - - @patch('pagure.lib.tasks.gitolite_post_compile_only.delay') - def test_update_user_ssh_update_only(self, gitolite_post_compile_only): - """ Test that update_user_ssh method called with update_only=True - calls the gitolite_post_compile_only method of helper. """ - user = pagure.lib.search_user(self.session, username='foo') - msg = pagure.lib.update_user_ssh( - self.session, user, 'blah', keydir='/tmp', update_only=True - ) - gitolite_post_compile_only.assert_called_once() - def avatar_url_from_email(self): """ Test the avatar_url_from_openid of pagure.lib. """ output = pagure.lib.avatar_url_from_email('pingou@fedoraproject.org') @@ -5160,11 +5130,8 @@ foo bar self.assertEqual(keys[1], '') key = keys[0].split(' ') self.assertEqual(key[0], '1024') - # We should always get the MD5 version - if key[1].startswith('MD5'): - self.assertEqual(key[1], 'MD5:f9:a2:14:97:a5:42:78:f7:16:f8:fb:73:ba:f0:f4:fe') - else: - self.assertEqual(key[1], 'f9:a2:14:97:a5:42:78:f7:16:f8:fb:73:ba:f0:f4:fe') + # We should always get the SHA256 version + self.assertEqual(key[1], 'SHA256:ztcRXIq7y/HNfwwscrexCyDF46/Pn/oHTkBZx87GwI8') self.assertEqual(key[3], '(RSA)') def test_create_deploykeys_ssh_keys_on_disk_empty(self): @@ -5197,14 +5164,15 @@ foo bar project = pagure.lib._get_project(self.session, 'test') # Add a deploy key to the project - msg = pagure.lib.add_deploykey_to_project( + pingou = pagure.lib.get_user(self.session, 'pingou') + msg = pagure.lib.add_sshkey_to_project_or_user( self.session, project=project, ssh_key='foo bar', pushaccess=False, - user='pingou' + creator=pingou, ) - self.assertEqual(msg, 'Deploy key added') + self.assertEqual(msg, 'SSH key added') self.assertIsNone( pagure.lib.create_deploykeys_ssh_keys_on_disk( @@ -5237,7 +5205,7 @@ foo bar project = pagure.lib._get_project(self.session, 'test') # Add a deploy key to the project - new_key_obj = pagure.lib.model.DeployKey( + new_key_obj = pagure.lib.model.SSHKey( project_id=project.id, pushaccess=False, public_ssh_key='\n foo bar', @@ -5265,7 +5233,7 @@ foo bar project = pagure.lib._get_project(self.session, 'test') # Add a deploy key to the project - new_key_obj = pagure.lib.model.DeployKey( + new_key_obj = pagure.lib.model.SSHKey( project_id=project.id, pushaccess=False, public_ssh_key='foo bar', @@ -5291,48 +5259,6 @@ foo bar self.assertIsNone( pagure.lib.create_user_ssh_keys_on_disk(None, None)) - def test_create_user_ssh_keys_on_disk_no_key(self): - """ Test the create_user_ssh_keys_on_disk function of pagure.lib. """ - user = pagure.lib.get_user(self.session, 'foo') - - self.assertIsNone( - pagure.lib.create_user_ssh_keys_on_disk(user, self.path)) - - def test_create_user_ssh_keys_on_disk_invalid_key(self): - """ Test the create_user_ssh_keys_on_disk function of pagure.lib. """ - user = pagure.lib.get_user(self.session, 'foo') - user.public_ssh_key = 'foo\n bar' - self.session.add(user) - self.session.commit() - - self.assertIsNone( - pagure.lib.create_user_ssh_keys_on_disk(user, self.path)) - - def test_create_user_ssh_keys_on_disk_empty_first_key(self): - """ Test the create_user_ssh_keys_on_disk function of pagure.lib. """ - user = pagure.lib.get_user(self.session, 'foo') - user.public_ssh_key = '\nbar' - self.session.add(user) - self.session.commit() - - self.assertIsNone( - pagure.lib.create_user_ssh_keys_on_disk(user, self.path)) - - @patch('pagure.lib.is_valid_ssh_key', MagicMock(return_value='foo bar')) - def test_create_user_ssh_keys_on_disk(self): - """ Test the create_user_ssh_keys_on_disk function of pagure.lib. """ - user = pagure.lib.get_user(self.session, 'foo') - user.public_ssh_key = 'foo bar' - self.session.add(user) - self.session.commit() - - self.assertIsNone( - pagure.lib.create_user_ssh_keys_on_disk(user, self.path)) - - # Re-generate the ssh keys on disk: - self.assertIsNone( - pagure.lib.create_user_ssh_keys_on_disk(user, self.path)) - def test_update_user_settings_invalid_user(self): """ Test the update_user_settings function of pagure.lib. """ self.assertRaises( diff --git a/tests/test_pagure_lib_git.py b/tests/test_pagure_lib_git.py index 693dbef..cf481ee 100644 --- a/tests/test_pagure_lib_git.py +++ b/tests/test_pagure_lib_git.py @@ -380,23 +380,24 @@ repo requests/somenamespace/test3 repo = pagure.lib.get_authorized_project(self.session, 'test') # Add two deploy keys (one readonly one push) - msg1 = pagure.lib.add_deploykey_to_project( + pingou = pagure.lib.get_user(self.session, 'pingou') + msg1 = pagure.lib.add_sshkey_to_project_or_user( session=self.session, project=repo, ssh_key='ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDAzBMSIlvPRaEiLOTVInErkRIw9CzQQcnslDekAn1jFnGf+SNa1acvbTiATbCX71AA03giKrPxPH79dxcC7aDXerc6zRcKjJs6MAL9PrCjnbyxCKXRNNZU5U9X/DLaaL1b3caB+WD6OoorhS3LTEtKPX8xyjOzhf3OQSzNjhJp5Q==', pushaccess=False, - user='pingou' + creator=pingou ) - msg2 = pagure.lib.add_deploykey_to_project( + msg2 = pagure.lib.add_sshkey_to_project_or_user( session=self.session, project=repo, ssh_key='ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC9Xwc2RDzPBhlEDARfHldGjudIVoa04tqT1JVKGQmyllTFz7Rb8CngQL3e7zyNzotnhwYKHdoiLlPkVEiDee4dWMUe48ilqId+FJZQGhyv8fu4BoFdE1AJUVylzmltbLg14VqG5gjTpXgtlrEva9arKwBMHJjRYc8ScaSn3OgyQw==', pushaccess=True, - user='pingou' + creator=pingou ) self.session.commit() - self.assertEqual(msg1, 'Deploy key added') - self.assertEqual(msg2, 'Deploy key added') + self.assertEqual(msg1, 'SSH key added') + self.assertEqual(msg2, 'SSH key added') # Add a forked project item = pagure.lib.model.Project( user_id=1, # pingou