#!/usr/bin/env python
# coding: utf-8
"""
Wrapper for /bin/sign from obs-sign package
"""
from subprocess import Popen, PIPE
import json
import os
from requests import request
from .exceptions import CoprSignError, CoprSignNoKeyError, \
CoprKeygenRequestError
SIGN_BINARY = "/bin/sign"
DOMAIN = "fedorahosted.org"
[docs]def create_gpg_email(username, projectname):
"""
Creates canonical name_email to identify gpg key
"""
return "{}#{}@copr.{}".format(username, projectname, DOMAIN)
[docs]def get_pubkey(username, projectname, outfile=None):
"""
Retrieves public key for user/project from signer host.
:param outfile: [optional] file to write obtained key
:return: public keys
:raises CoprSignError: failed to retrieve key, see error message
:raises CoprSignNoKeyError: if there are no such user in keyring
"""
usermail = create_gpg_email(username, projectname)
cmd = ["sudo", SIGN_BINARY, "-u", usermail, "-p"]
try:
handle = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = handle.communicate()
except Exception as e:
raise CoprSignError("Failed to get user pubkey"
" due to: {}".format(e))
if handle.returncode != 0:
if "unknown key:" in stderr:
raise CoprSignNoKeyError(
"There are no gpg keys for user {} in keyring".format(username),
return_code=handle.returncode,
cmd=cmd, stdout=stdout, stderr=stderr)
raise CoprSignError(
msg="Failed to get user pubkey\n"
"sign stdout: {}\n sign stderr: {}\n".format(stdout, stderr),
return_code=handle.returncode,
cmd=cmd, stdout=stdout, stderr=stderr)
if outfile:
with open(outfile, "w") as handle:
handle.write(stdout)
return stdout
[docs]def _sign_one(path, email):
cmd = ["sudo", SIGN_BINARY, "-u", email, "-r", path]
try:
handle = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = handle.communicate()
except Exception as e:
err = CoprSignError(
msg="Failed to invoke sign {} by user {} with error {}"
.format(path, email, e, cmd=None, stdout=None, stderr=None))
raise err
if handle.returncode != 0:
err = CoprSignError(
msg="Failed to sign {} by user {}".format(path, email),
return_code=handle.returncode,
cmd=cmd, stdout=stdout, stderr=stderr)
raise err
return stdout, stderr
[docs]def sign_rpms_in_dir(username, projectname, path, opts, log):
"""
Signs rpms using obs-signd.
If some some pkgs failed to sign, entire build marked as failed,
but we continue to try sign other pkgs.
:param username: copr username
:param projectname: copr projectname
:param path: directory with rpms to be signed
:param Munch opts: backend config
:type log: logging.Logger
:raises: :py:class:`backend.exceptions.CoprSignError` failed to sign at least one package
"""
rpm_list = [
os.path.join(path, filename)
for filename in os.listdir(path)
if filename.endswith(".rpm")
]
if not rpm_list:
return
try:
get_pubkey(username, projectname)
except CoprSignNoKeyError:
create_user_keys(username, projectname, opts)
errors = [] # tuples (rpm_filepath, exception)
for rpm in rpm_list:
try:
_sign_one(rpm, create_gpg_email(username, projectname))
log.info("signed rpm: {}".format(rpm))
except CoprSignError as e:
log.exception("failed to sign rpm: {}".format(rpm))
errors.append((rpm, e))
if errors:
raise CoprSignError("Rpm sign failed, affected rpms: {}"
.format([err[0] for err in errors]))
[docs]def create_user_keys(username, projectname, opts):
"""
Generate a new key-pair at sign host
:param username:
:param projectname:
:param opts: backend config
:return: None
"""
data = json.dumps({
"name_real": "{}_{}".format(username, projectname),
"name_email": create_gpg_email(username, projectname)
})
keygen_url = "http://{}/gen_key".format(opts.keygen_host)
query = dict(url=keygen_url, data=data, method="post")
try:
response = request(**query)
except Exception as e:
raise CoprKeygenRequestError(
msg="Failed to create key-pair for user: {},"
" project:{} with error: {}"
.format(username, projectname, e), request=query)
if response.status_code >= 400:
raise CoprKeygenRequestError(
msg="Failed to create key-pair for user: {}, project:{}, status_code: {}, response: {}"
.format(username, projectname, response.status_code, response.text),
request=query, response=response)
[docs]def _unsign_one(path):
# Requires rpm-sign package
cmd = ["sudo", "/usr/bin/rpm", "--delsign", path]
handle = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = handle.communicate()
if handle.returncode != 0:
err = CoprSignError(
msg="Failed to unsign ".format(path),
return_code=handle.returncode,
cmd=cmd, stdout=stdout, stderr=stderr)
raise err
return stdout, stderr
[docs]def unsign_rpms_in_dir(path, opts, log):
"""
:param path: directory with rpms to be signed
:param Munch opts: backend config
:type log: logging.Logger
:raises: :py:class:`backend.exceptions.CoprSignError` failed to sign at least one package
"""
rpm_list = [
os.path.join(path, filename)
for filename in os.listdir(path)
if filename.endswith(".rpm")
]
if not rpm_list:
return
errors = [] # tuples (rpm_filepath, exception)
for rpm in rpm_list:
try:
_unsign_one(rpm)
log.info("unsigned rpm: {}".format(rpm))
except CoprSignError as e:
log.exception("failed to unsign rpm: {}".format(rpm))
errors.append((rpm, e))
if errors:
raise CoprSignError("Rpm unsign failed, affected rpms: {}"
.format([err[0] for err in errors]))