# coding: utf-8
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
import json
from multiprocessing import Process
import time
from setproctitle import setproctitle
import traceback
import psutil
from ..vm_manage import VmStates
from ..exceptions import VmSpawnLimitReached
from ..helpers import get_redis_logger


class VmMaster(Process):
"""
    Spawns and terminates VMs for the builder processes.

:type vmm: backend.vm_manage.manager.VmManager
:type spawner: backend.vm_manage.spawn.Spawner
:type checker: backend.vm_manage.check.HealthChecker
"""
def __init__(self, opts, vmm, spawner, checker):
super(VmMaster, self).__init__(name="vm_master")
self.opts = opts
self.vmm = vmm
self.spawner = spawner
self.checker = checker
self.kill_received = False
self.log = get_redis_logger(self.opts, "vmm.vm_master", "vmm")
self.vmm.set_logger(self.log)

    def remove_old_dirty_vms(self):
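        """
        Terminate READY VMs that are bound to a user but have not been
        re-acquired within the group's `vm_dirty_terminating_timeout`.
        """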
        # terminate VMs bound to a user when time.time() - vm.last_release_time > threshold_keep_vm_for_user_timeout,
        # or add a field to VMD to override the common threshold
for vmd in self.vmm.get_vm_by_group_and_state_list(None, [VmStates.READY]):
if vmd.get_field(self.vmm.rc, "bound_to_user") is None:
continue
last_release = vmd.get_field(self.vmm.rc, "last_release")
if last_release is None:
continue
not_re_acquired_in = time.time() - float(last_release)
if not_re_acquired_in > self.opts.build_groups[vmd.group]["vm_dirty_terminating_timeout"]:
self.log.info("dirty VM `{}` not re-acquired in {}, terminating it"
.format(vmd.vm_name, not_re_acquired_in))
self.vmm.start_vm_termination(vmd.vm_name, allowed_pre_state=VmStates.READY)

    def check_one_vm_for_dead_builder(self, vmd):
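        """
        Check whether the builder process that acquired this VM is still alive;
        if it is not, request termination of the VM.
        """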
# TODO: builder should renew lease periodically
# and we should use that time instead of in_use_since and pid checks
in_use_since = vmd.get_field(self.vmm.rc, "in_use_since")
pid = vmd.get_field(self.vmm.rc, "used_by_pid")
if not in_use_since or not pid:
return
in_use_time_elapsed = time.time() - float(in_use_since)
        # give the worker a minute to store its pid and set its process title
if in_use_time_elapsed < 60 and str(pid) == "None":
return
pid = int(pid)
        # try:
        #     # here we can catch a race condition: the worker acquired the VM but hasn't set its process title yet
        #     if psutil.pid_exists(pid) and vmd.vm_name in psutil.Process(pid).cmdline[0]:
        #         return
        #
        #     self.log.info("Process `{}` no longer exists, trying a second time. VM data: {}"
        #                   .format(pid, vmd))
        #     # dirty hack: sleep and check again
        #     time.sleep(5)
        #     if psutil.pid_exists(pid) and vmd.vm_name in psutil.Process(pid).cmdline[0]:
        #         return
        # except Exception:
        #     self.log.exception("Failed to determine if process `{}` is still alive for VM: {}, assuming it is alive"
        #                        .format(pid, vmd))
        #     return

        # psutil changed Process().cmdline from a property to a method between f20 and f22,
        # so the more precise check above is disabled for now
        try:
            # here we can catch a race condition: the worker acquired the VM but hasn't set its process title yet
            if psutil.pid_exists(pid):
                return
            self.log.info("Process `{}` no longer exists, trying a second time. VM data: {}"
                          .format(pid, vmd))
            # dirty hack: sleep and check again
            time.sleep(5)
            if psutil.pid_exists(pid):
                return
        except Exception:
            self.log.exception("Failed to determine if process `{}` is still alive for VM: {}, assuming it is alive"
                               .format(pid, vmd))
            return

        self.log.info("Process `{}` no longer exists, terminating VM: {}".format(pid, vmd.vm_name))
self.vmm.start_vm_termination(vmd.vm_name, allowed_pre_state=VmStates.IN_USE)
# TODO: build rescheduling ?

    def remove_vm_with_dead_builder(self):
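        """
        Go through all VMs in the IN_USE state and terminate those whose
        builder process no longer exists (see `check_one_vm_for_dead_builder`).
        """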
        # TODO: rewrite build management at the backend and move this functionality there;
        # the VMM shouldn't do this
        # check that the process which acquired the VM still exists, otherwise terminate the VM
for vmd in self.vmm.get_vm_by_group_and_state_list(None, [VmStates.IN_USE]):
self.check_one_vm_for_dead_builder(vmd)

    def check_vms_health(self):
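        """
        Trigger a health check for every VM that is due for one.
        """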
        # check machines where time.time() - vm.last_health_check > vm_health_check_period
states_to_check = [VmStates.CHECK_HEALTH_FAILED, VmStates.READY,
VmStates.GOT_IP, VmStates.IN_USE]
for vmd in self.vmm.get_vm_by_group_and_state_list(None, states_to_check):
last_health_check = vmd.get_field(self.vmm.rc, "last_health_check")
check_period = self.opts.build_groups[vmd.group]["vm_health_check_period"]
if not last_health_check or time.time() - float(last_health_check) > check_period:
self.start_vm_check(vmd.vm_name)

    def start_vm_check(self, vm_name):
"""
        Start a VM health check sub-process if the current VM state allows it
"""
vmd = self.vmm.get_vm_by_name(vm_name)
orig_state = vmd.state
if self.vmm.lua_scripts["set_checking_state"](keys=[vmd.vm_key], args=[time.time()]) == "OK":
# can start
try:
self.checker.run_check_health(vmd.vm_name, vmd.vm_ip)
except Exception as err:
self.log.exception("Failed to start health check: {}".format(err))
if orig_state != VmStates.IN_USE:
vmd.store_field(self.vmm.rc, "state", orig_state)
else:
self.log.debug("Failed to start vm check, wrong state")
return False

    def _check_total_running_vm_limit(self, group):
""" Checks that number of VM in any state excluding Terminating plus
number of running spawn processes is less than
threshold defined by BackendConfig.build_group[group]["max_vm_total"]
"""
active_vmd_list = self.vmm.get_vm_by_group_and_state_list(
group, [VmStates.GOT_IP, VmStates.READY, VmStates.IN_USE,
VmStates.CHECK_HEALTH, VmStates.CHECK_HEALTH_FAILED])
total_vm_estimation = len(active_vmd_list) + self.spawner.get_proc_num_per_group(group)
if total_vm_estimation >= self.opts.build_groups[group]["max_vm_total"]:
raise VmSpawnLimitReached(
"Skip spawn for group {}: max total vm reached: vm count: {}, spawn process: {}"
.format(group, len(active_vmd_list), self.spawner.get_proc_num_per_group(group)))

    def _check_elapsed_time_after_spawn(self, group):
""" Checks that time elapsed since latest VM spawn attempt is greater than
threshold defined by BackendConfig.build_group[group]["vm_spawn_min_interval"]
"""
last_vm_spawn_start = self.vmm.read_vm_pool_info(group, "last_vm_spawn_start")
if last_vm_spawn_start:
time_elapsed = time.time() - float(last_vm_spawn_start)
if time_elapsed < self.opts.build_groups[group]["vm_spawn_min_interval"]:
raise VmSpawnLimitReached(
"Skip spawn for group {}: time after previous spawn attempt "
"< vm_spawn_min_interval: {}<{}"
.format(group, time_elapsed, self.opts.build_groups[group]["vm_spawn_min_interval"]))

    def _check_number_of_running_spawn_processes(self, group):
""" Check that number of running spawn processes is less than
threshold defined by BackendConfig.build_group[]["max_spawn_processes"]
"""
if self.spawner.get_proc_num_per_group(group) >= self.opts.build_groups[group]["max_spawn_processes"]:
raise VmSpawnLimitReached(
"Skip spawn for group {}: reached maximum number of spawning processes: {}"
.format(group, self.spawner.get_proc_num_per_group(group)))

    def _check_total_vm_limit(self, group):
""" Check that number of running spawn processes is less than
threshold defined by BackendConfig.build_group[]["max_spawn_processes"]
"""
count_all_vm = len(self.vmm.get_all_vm_in_group(group))
if count_all_vm >= 2 * self.opts.build_groups[group]["max_vm_total"]:
raise VmSpawnLimitReached(
"Skip spawn for group {}: #(ALL VM) >= 2 * max_vm_total reached: {}"
.format(group, count_all_vm))

    def try_spawn_one(self, group):
"""
        Start the spawning process if all conditions are satisfied
"""
        # TODO: add a setting "max_vm_in_ready_state"; when that number is reached, do not spawn more VMs, min value = 1
try:
self._check_total_running_vm_limit(group)
self._check_elapsed_time_after_spawn(group)
self._check_number_of_running_spawn_processes(group)
self._check_total_vm_limit(group)
except VmSpawnLimitReached as err:
self.log.debug(err.msg)
return
self.log.info("Start spawning new VM for group: {}".format(self.opts.build_groups[group]["name"]))
self.vmm.write_vm_pool_info(group, "last_vm_spawn_start", time.time())
try:
self.spawner.start_spawn(group)
except Exception as error:
self.log.exception("Error during spawn attempt: {}".format(error))

    def start_spawn_if_required(self):
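        """
        Try to spawn one new VM in each configured build group.
        """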
for group in self.vmm.vm_groups:
self.try_spawn_one(group)

    def do_cycle(self):
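        """
        One iteration of the main loop: clean up dirty and orphaned VMs,
        run health checks, spawn new VMs if required, finalize stuck health
        checks, and retry terminations.
        """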
self.log.debug("starting do_cycle")
# TODO: each check should be executed in threads ... and finish with join?
self.remove_old_dirty_vms()
self.check_vms_health()
self.start_spawn_if_required()
self.remove_vm_with_dead_builder()
self.finalize_long_health_checks()
self.terminate_again()
self.spawner.recycle()
        # TODO: self.terminate_excessive_vms() -- for the case when the config is changed at runtime

    def run(self):
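        """
        Main process loop: sleep for `vm_cycle_timeout` seconds, run `do_cycle`,
        and repeat until termination is requested.
        """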
if any(x is None for x in [self.spawner, self.checker]):
raise RuntimeError("provide Spawner and HealthChecker "
"to run VmManager daemon")
setproctitle("VM master")
self.vmm.mark_server_start()
self.kill_received = False
self.log.info("VM master process started")
while not self.kill_received:
time.sleep(self.opts.vm_cycle_timeout)
try:
self.do_cycle()
except Exception as err:
self.log.error("Unhandled error: {}, {}".format(err, traceback.format_exc()))

    def terminate(self):
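        """
        Ask the main loop to stop and terminate the spawner and health checker
        sub-processes.
        """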
self.kill_received = True
if self.spawner is not None:
self.spawner.terminate()
if self.checker is not None:
self.checker.terminate()

    def finalize_long_health_checks(self):
"""
After server crash it's possible that some VM's will remain in `check_health` state
Here we are looking for such records and mark them with `check_health_failed` state
"""
for vmd in self.vmm.get_vm_by_group_and_state_list(None, [VmStates.CHECK_HEALTH]):
time_elapsed = time.time() - float(vmd.get_field(self.vmm.rc, "last_health_check") or 0)
if time_elapsed > self.opts.build_groups[vmd.group]["vm_health_check_max_time"]:
self.log.info("VM marked with check fail state, "
"VM stayed too long in health check state, elapsed: {} VM: {}"
.format(time_elapsed, str(vmd)))
self.vmm.mark_vm_check_failed(vmd.vm_name)

    def terminate_again(self):
"""
If we failed to terminate instance request termination once more.
Non-terminated instance detected as vm in the `terminating` state with
time.time() - `terminating since` > Threshold
It's possible, that VM was terminated but termination process doesn't receive confirmation from VM provider,
but we have already got a new VM with the same IP => it's safe to remove old vm from pool
"""
for vmd in self.vmm.get_vm_by_group_and_state_list(None, [VmStates.TERMINATING]):
time_elapsed = time.time() - float(vmd.get_field(self.vmm.rc, "terminating_since") or 0)
if time_elapsed > self.opts.build_groups[vmd.group]["vm_terminating_timeout"]:
if len(self.vmm.lookup_vms_by_ip(vmd.vm_ip)) > 1:
                    self.log.info(
                        "Removing VM record: {}. There are other VMs with the same IP, "
                        "it's safe to remove the current one from the VM pool".format(vmd.vm_name))
self.vmm.remove_vm_from_pool(vmd.vm_name)
else:
self.log.info("Sent VM {} for termination again".format(vmd.vm_name))
self.vmm.start_vm_termination(vmd.vm_name, allowed_pre_state=VmStates.TERMINATING)