1 import os
2 import base64
3 import datetime
4 import functools
5 from functools import wraps, partial
6
7 from netaddr import IPAddress, IPNetwork
8 import re
9 import flask
10
11 from openid_teams.teams import TeamsRequest
12
13 from coprs import app
14 from coprs import db
15 from coprs import helpers
16 from coprs import models
17 from coprs import oid
18 from coprs.logic.complex_logic import ComplexLogic
19 from coprs.logic.users_logic import UsersLogic
20 from coprs.logic.coprs_logic import CoprsLogic
24 """
25 Create proper Fedora OpenID name from short name.
26
27 >>> fedoraoid == fed_openidize_name(user.name)
28 True
29 """
30
31 return "http://{0}.id.fedoraproject.org/".format(name)
32
48
51 return oidname.replace(".id.fedoraproject.org/", "") \
52 .replace("http://", "")
53
67
72
85
86
@app.errorhandler(404)
def page_not_found(message):
    """
    Application-wide handler for 404 errors.

    :param message: value forwarded to the template as ``message``
    :return: tuple of the rendered "404.html" page and the 404 status code
    """
    rendered_page = flask.render_template("404.html", message=message)
    return rendered_page, 404
90
95
98 """
99 :type message: str
100 :type err: CoprHttpException
101 """
102 return flask.render_template("_error.html",
103 message=message,
104 error_code=code,
105 error_title=title), code
106
107
# Specialise generic_error for each status code by pre-binding the code and
# the title shown on the error page.
server_error_handler = functools.partial(
    generic_error,
    code=500,
    title="Internal Server Error",
)
bad_request_handler = functools.partial(
    generic_error,
    code=400,
    title="Bad Request",
)

# Register the pre-bound handlers with the application.
app.errorhandler(500)(server_error_handler)
app.errorhandler(400)(bad_request_handler)

# Blueprint that the "misc" routes in this module are attached to.
misc = flask.Blueprint("misc", __name__)
115
116
117 @misc.route(app.config['KRB5_LOGIN_BASEURI'] + "<name>/", methods=["GET"])
119 """
120 Handle the Kerberos authentication.
121
122 Note that if we are able to get here, either the user is authenticated
123 correctly, or apache is mis-configured and it does not perform KRB
124 authentication at all. Note also, even if that can be considered ugly, we
125 are reusing oid's get_next_url feature with kerberos login.
126 """
127
128
129 if flask.g.user is not None:
130 return flask.redirect(oid.get_next_url())
131
132 krb_config = app.config['KRB5_LOGIN']
133
134 found = None
135 for key in krb_config.keys():
136 if krb_config[key]['URI'] == name:
137 found = key
138 break
139
140 if not found:
141
142 return flask.render_template("404.html"), 404
143
144 if app.config["DEBUG"] and 'TEST_REMOTE_USER' in os.environ:
145
146 flask.request.environ['REMOTE_USER'] = os.environ['TEST_REMOTE_USER']
147
148 if 'REMOTE_USER' not in flask.request.environ:
149 nocred = "Kerberos authentication failed (no credentials provided)"
150 return flask.render_template("403.html", message=nocred), 403
151
152 krb_username = flask.request.environ['REMOTE_USER']
153 app.logger.debug("krb5 login attempt: " + krb_username)
154 username = krb_straighten_username(krb_username)
155 if not username:
156 message = "invalid krb5 username: " + krb_username
157 return flask.render_template("403.html", message=message), 403
158
159 krb_login = (
160 models.Krb5Login.query
161 .filter(models.Krb5Login.config_name == key)
162 .filter(models.Krb5Login.primary == username)
163 .first()
164 )
165 if krb_login:
166 flask.g.user = krb_login.user
167 flask.session['krb5_login'] = krb_login.user.name
168 flask.flash(u"Welcome, {0}".format(flask.g.user.name))
169 return flask.redirect(oid.get_next_url())
170
171
172 user = models.User.query.filter(models.User.username == username).first()
173 if not user:
174
175 email = username + "@" + krb_config[key]['email_domain']
176 user = create_user_wrapper(username, email)
177 db.session.add(user)
178
179 krb_login = models.Krb5Login(user=user, primary=username, config_name=key)
180 db.session.add(krb_login)
181 db.session.commit()
182
183 flask.flash(u"Welcome, {0}".format(user.name))
184 flask.g.user = user
185 flask.session['krb5_login'] = user.name
186 return flask.redirect(oid.get_next_url())
187
188
@misc.route("/login/", methods=["GET"])
@oid.loginhandler
def login():
    """
    Entry point of the login flow.

    When the FAS_LOGIN option is off, the request is handed over to the
    Kerberos login redirect if KRB5_LOGIN is configured; otherwise an error
    is flashed and the user is sent to the "coprs_ns.coprs_show" page.

    When FAS_LOGIN is on, an already signed-in user is redirected to the
    "next" URL, and anybody else is sent through OpenID login against
    https://id.fedoraproject.org/, asking for e-mail, timezone and (via the
    teams extension) the "_FAS_ALL_GROUPS_" groups.
    """
    config = app.config

    if not config['FAS_LOGIN']:
        if not config['KRB5_LOGIN']:
            flask.flash("No auth method available", "error")
            return flask.redirect(flask.url_for("coprs_ns.coprs_show"))
        return krb5_login_redirect(next=oid.get_next_url())

    # Nothing to do for a user who is already logged in.
    if flask.g.user is not None:
        return flask.redirect(oid.get_next_url())

    groups_request = TeamsRequest(["_FAS_ALL_GROUPS_"])
    return oid.try_login(
        "https://id.fedoraproject.org/",
        ask_for=["email", "timezone"],
        extensions=[groups_request],
    )
206
210 flask.session["openid"] = resp.identity_url
211 fasusername = resp.identity_url.replace(
212 ".id.fedoraproject.org/", "").replace("http://", "")
213
214
215 if fasusername and (
216 (
217 app.config["USE_ALLOWED_USERS"] and
218 fasusername in app.config["ALLOWED_USERS"]
219 ) or not app.config["USE_ALLOWED_USERS"]):
220
221 username = fed_raw_name(resp.identity_url)
222 user = models.User.query.filter(
223 models.User.username == username).first()
224 if not user:
225 user = create_user_wrapper(username, resp.email, resp.timezone)
226 else:
227 user.mail = resp.email
228 user.timezone = resp.timezone
229 if "lp" in resp.extensions:
230 team_resp = resp.extensions['lp']
231 user.openid_groups = {"fas_groups": team_resp.teams}
232
233 db.session.add(user)
234 db.session.commit()
235 flask.flash(u"Welcome, {0}".format(user.name))
236 flask.g.user = user
237
238 if flask.request.url_root == oid.get_next_url():
239 return flask.redirect(flask.url_for("coprs_ns.coprs_by_user",
240 username=user.name))
241 return flask.redirect(oid.get_next_url())
242 else:
243 flask.flash("User '{0}' is not allowed".format(fasusername))
244 return flask.redirect(oid.get_next_url())
245
246
@misc.route("/logout/")
def logout():
    """
    Sign the user out.

    Drops both login markers ("openid" and "krb5_login") from the session,
    whichever of them is present, flashes a confirmation and redirects to
    the "next" URL.
    """
    for session_key in ("openid", "krb5_login"):
        # A default of None makes a missing key a no-op instead of a KeyError.
        flask.session.pop(session_key, None)

    flask.flash(u"You were signed out")
    next_url = oid.get_next_url()
    return flask.redirect(next_url)
253
@functools.wraps(f)
def decorated_function(*args, **kwargs):
    """
    Run the wrapped view only for requests carrying a valid API token.

    The credentials are read from the "Authorization" header: its second
    whitespace-separated field must be the base64 encoding of
    "<api_login>:<api_token>". The request is accepted when a user with
    that API login exists, the token matches and it has not expired.

    If the authenticated user has ``proxy`` set and the form contains
    "username", the request is executed on behalf of that user instead.

    On success ``flask.g.user`` is set and the wrapped view's result is
    returned. Otherwise a JSON error document with status code 500 is
    returned. A missing, empty or malformed header is treated as failed
    authentication rather than raising.
    """
    token = None
    api_login = None
    auth_header = flask.request.headers.get("Authorization")
    if auth_header:
        try:
            encoded = auth_header.split()[1].strip()
            decoded = base64.b64decode(encoded).decode("utf-8")
            # Split on the first colon only, so that a colon inside the
            # token part does not break the unpacking.
            api_login, token = decoded.split(":", 1)
        except (IndexError, TypeError, ValueError):
            # Header without a second field, invalid base64, non-UTF-8
            # payload or no colon at all: fall through to the regular
            # "login invalid" response instead of crashing the request.
            api_login = token = None

    token_auth = False
    if token and api_login:
        user = UsersLogic.get_by_api_login(api_login).first()
        if (user and user.api_token == token and
                user.api_token_expiration >= datetime.date.today()):

            if user.proxy and "username" in flask.request.form:
                # Proxy accounts may act on behalf of the named user.
                user = UsersLogic.get(flask.request.form["username"]).first()

            # The proxied lookup may come back empty; never treat the
            # request as authenticated without an actual user object.
            if user is not None:
                token_auth = True
                flask.g.user = user

    if not token_auth:
        url = 'https://' + app.config["PUBLIC_COPR_HOSTNAME"]
        url = helpers.fix_protocol_for_frontend(url)

        output = {
            "output": "notok",
            "error": "Login invalid/expired. Please visit {0}/api to get or renew your API token.".format(url),
        }
        jsonout = flask.jsonify(output)
        # NOTE(review): 401 would be the conventional status here; 500 is
        # kept because existing clients may depend on it.
        jsonout.status_code = 500
        return jsonout
    return f(*args, **kwargs)
288 return decorated_function
289
292 krbc = app.config['KRB5_LOGIN']
293 for key in krbc:
294
295 return flask.redirect(flask.url_for("misc.krb5_login",
296 name=krbc[key]['URI'],
297 next=next))
298 flask.flash("Unable to pick krb5 login page", "error")
299 return flask.redirect(flask.url_for("coprs_ns.coprs_show"))
300
def view_wrapper(f):
    """
    Wrap view ``f`` so that it only runs for a logged-in user.

    Anonymous visitors are redirected to the "misc.login" endpoint with the
    current URL passed as ``next``. When the enclosing ``role`` equals the
    "admin" role, users without the ``admin`` flag get a flash message and
    are redirected to "coprs_ns.coprs_show".
    """
    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        current_user = flask.g.user
        if current_user is None:
            login_url = flask.url_for("misc.login", next=flask.request.url)
            return flask.redirect(login_url)

        admin_required = role == helpers.RoleEnum("admin")
        if admin_required and not current_user.admin:
            flask.flash("You are not allowed to access admin section.")
            landing_url = flask.url_for("coprs_ns.coprs_show")
            return flask.redirect(landing_url)

        return f(*args, **kwargs)

    return decorated_function
316
317
318
319
320
321 if callable(role):
322 return view_wrapper(role)
323 else:
324 return view_wrapper
325
@functools.wraps(f)
def decorated_function(*args, **kwargs):
    """
    Run the wrapped view only when the request's HTTP authorization
    carries the password configured as BACKEND_PASSWORD.

    :return: the wrapped view's result, or a plain-text message with
             status 401 when credentials are missing or the password differs
    """
    credentials = flask.request.authorization
    if credentials and credentials.password == app.config["BACKEND_PASSWORD"]:
        return f(*args, **kwargs)

    return "You have to provide the correct password\n", 401
336 return decorated_function
337
@functools.wraps(f)
def decorated_function(*args, **kwargs):
    """
    Run the wrapped view only for clients whose address lies in one of
    the configured INTRANET_IPS entries or is 127.0.0.1.

    :return: the wrapped view's result, or a plain-text message with
             status 403 when the client address matches no accepted range
    """
    remote_addr = flask.request.remote_addr
    client_ip = IPAddress(remote_addr)

    # Localhost is always accepted in addition to the configured entries.
    accept_ranges = set(app.config.get("INTRANET_IPS", []))
    accept_ranges.add("127.0.0.1")

    for addr_or_net in accept_ranges:
        if client_ip in IPNetwork(addr_or_net):
            break
    else:
        # No accepted address/network contains the client.
        refusal = ("Stats can be update only from intranet hosts, "
                   "not {}, check config\n".format(remote_addr))
        return refusal, 403

    return f(*args, **kwargs)
350 return decorated_function
351
364 return wrapper
365
366
367 @misc.route("/migration-report/")
368 @misc.route("/migration-report/<username>")
369 -def coprs_migration_report(username=None):
381
388
395