Source code for backend.vm_manage.manager

# coding: utf-8

from __future__ import print_function
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import

from itertools import chain
import json
import time
import weakref
from cStringIO import StringIO
import datetime
from backend.exceptions import VmError, NoVmAvailable, VmDescriptorNotFound

from backend.helpers import get_redis_connection
from .models import VmDescriptor
from . import VmStates, KEY_VM_INSTANCE, KEY_VM_POOL, EventTopics, PUBSUB_MB, KEY_SERVER_INFO, \
    KEY_VM_POOL_INFO
from ..helpers import get_redis_logger

# KEYS[1]: VMD key
# ARGV[1]: current timestamp for `last_health_check`
set_checking_state_lua = """
local old_state = redis.call("HGET", KEYS[1], "state")
if old_state ~= "got_ip" and old_state ~= "ready" and old_state ~= "in_use" and old_state ~= "check_health_failed" then
    return nil
else
    if old_state ~= "in_use" then
        redis.call("HSET", KEYS[1], "state", "check_health")
    end
    redis.call("HSET", KEYS[1], "last_health_check", ARGV[1])
    return "OK"
end
"""

# KEYS[1]: VMD key
# KEYS[2]: server info key holding `server_start_timestamp`
# ARGV[1]: user to bind to
# ARGV[2]: pid of the builder process
# ARGV[3]: current timestamp for `in_use_since`
# ARGV[4]: task_id
# ARGV[5]: build_id
# ARGV[6]: chroot
acquire_vm_lua = """
local old_state = redis.call("HGET", KEYS[1], "state")
if old_state ~= "ready"  then
    return nil
else
    local last_health_check = tonumber(redis.call("HGET", KEYS[1], "last_health_check"))
    local server_restart_time = tonumber(redis.call("HGET", KEYS[2], "server_start_timestamp"))
    if last_health_check and server_restart_time and last_health_check > server_restart_time  then
        redis.call("HMSET", KEYS[1], "state", "in_use", "bound_to_user", ARGV[1],
                   "used_by_pid", ARGV[2], "in_use_since", ARGV[3],
                   "task_id",  ARGV[4], "build_id", ARGV[5], "chroot", ARGV[6])
        return "OK"
    else
        return nil
    end
end
"""

# KEYS[1]: VMD key
# ARGV[1]: current timestamp for `last_release`
release_vm_lua = """
local old_state = redis.call("HGET", KEYS[1], "state")
if old_state ~= "in_use" then
    return nil
else
    redis.call("HMSET", KEYS[1], "state", "ready", "last_release", ARGV[1])
    redis.call("HDEL", KEYS[1], "in_use_since", "used_by_pid", "task_id", "build_id", "chroot")
    redis.call("HINCRBY", KEYS[1], "builds_count", 1)

    local check_fails = tonumber(redis.call("HGET", KEYS[1], "check_fails"))
    if check_fails > 0 then
        redis.call("HSET", KEYS[1], "state", "check_health_failed")
    end

    return "OK"
end
"""

# KEYS[1]: VMD key
# ARGV[1]: allowed_pre_state
# ARGV[2]: timestamp for `terminating_since`
terminate_vm_lua = """
local old_state = redis.call("HGET", KEYS[1], "state")

if old_state == "in_use" and ARGV[1] ~= "in_use" then
    return "Termination of VM in in_use state are forbidden"
end

if ARGV[1] and ARGV[1] ~= "None" and old_state ~= ARGV[1] then
    return "Old state != `allowed_pre_state`"
elseif old_state == "terminating" and ARGV[1] ~= "terminating" then
    return "Already terminating"
else
    redis.call("HMSET", KEYS[1], "state", "terminating", "terminating_since", ARGV[2])
    return "OK"
end
"""

# KEYS[1]: VMD key
mark_vm_check_failed_lua = """
local old_state = redis.call("HGET", KEYS[1], "state")
if old_state == "check_health" then
    redis.call("HMSET", KEYS[1], "state", "check_health_failed")
    return "OK"
end
"""


class VmManager(object):
    """
    VM manager, used for two purposes:

    - Daemon which controls the VM lifecycle; requires the `spawner` and
      `terminator` params
    - Client to acquire and release a VM in the builder process

    :param opts: Global backend configuration
    :type opts: Munch

    :type logger: logging.Logger
    :param logger: Logger instance to use inside of the manager; if None, the
        manager creates a new logger using helpers.get_redis_logger
    """

    def __init__(self, opts, logger=None):
        self.opts = weakref.proxy(opts)
        self.lua_scripts = {}
        self.rc = None
        self.log = logger or get_redis_logger(self.opts, "vmm.lib", "vmm")

        self.rc = get_redis_connection(self.opts)
        self.lua_scripts["set_checking_state"] = self.rc.register_script(set_checking_state_lua)
        self.lua_scripts["acquire_vm"] = self.rc.register_script(acquire_vm_lua)
        self.lua_scripts["release_vm"] = self.rc.register_script(release_vm_lua)
        self.lua_scripts["terminate_vm"] = self.rc.register_script(terminate_vm_lua)
        self.lua_scripts["mark_vm_check_failed"] = self.rc.register_script(mark_vm_check_failed_lua)
    def set_logger(self, logger):
        """
        :param logger: Logger to be used by manager
        :type logger: logging.Logger
        """
        self.log = logger
    @property
    def vm_groups(self):
        """
        :return: VM build groups
        :rtype: list of int
        """
        return range(self.opts.build_groups_count)
    def add_vm_to_pool(self, vm_ip, vm_name, group):
        """
        Adds a newly spawned VM into the pool of available builders

        :param str vm_ip: IP
        :param str vm_name: VM name
        :param group: builder group
        :type group: int
        :rtype: VmDescriptor
        """
        if self.rc.sismember(KEY_VM_POOL.format(group=group), vm_name):
            raise VmError("Can't add VM `{}` to the pool, such name is already used".format(vm_name))

        vmd = VmDescriptor(vm_ip, vm_name, group, VmStates.GOT_IP)
        pipe = self.rc.pipeline()
        pipe.sadd(KEY_VM_POOL.format(group=group), vm_name)
        pipe.hmset(KEY_VM_INSTANCE.format(vm_name=vm_name), vmd.to_dict())
        pipe.execute()
        self.log.info("registered new VM: {} {}".format(vmd.vm_name, vmd.vm_ip))
        return vmd
    def lookup_vms_by_ip(self, vm_ip):
        """
        :param vm_ip: IP address to search for
        :return: List of found VMDs with the given IP
        :rtype: list of VmDescriptor
        """
        return [vmd for vmd in self.get_all_vm() if vmd.vm_ip == vm_ip]
    def mark_vm_check_failed(self, vm_name):
        vm_key = KEY_VM_INSTANCE.format(vm_name=vm_name)
        self.lua_scripts["mark_vm_check_failed"](keys=[vm_key])
    def mark_server_start(self):
        self.rc.hset(KEY_SERVER_INFO, "server_start_timestamp", time.time())
    def can_user_acquire_more_vm(self, username, group):
        """
        :return bool: True when the user is allowed to acquire more VMs
        """
        vmd_list = self.get_all_vm_in_group(group)
        vm_count_used_by_user = len([
            vmd for vmd in vmd_list
            if vmd.bound_to_user == username and vmd.state == VmStates.IN_USE
        ])
        self.log.debug("# vm by user: {}, limit: {}".format(
            vm_count_used_by_user, self.opts.build_groups[group]["max_vm_per_user"]
        ))
        if vm_count_used_by_user >= self.opts.build_groups[group]["max_vm_per_user"]:
            # TODO: this check isn't reliable; if two (or more) processes check the VM list
            # at roughly the same time, they could acquire more VMs than allowed.
            # Proper solution: do this check inside the lua script.
            self.log.debug("No VM is available, user `{}` already acquired {} VMs"
                           .format(username, vm_count_used_by_user))
            return False
        else:
            return True
    def acquire_vm(self, group, username, pid, task_id=None, build_id=None, chroot=None):
        """
        Try to acquire a VM from the pool

        :param group: builder group id, as defined in the config
        :type group: int
        :param username: build owner username; VMM prefers to reuse an existing VM
            which was previously used by the same user
        :param pid: builder pid, used to release the VM after an unhandled death
            of the build process
        :rtype: VmDescriptor
        :raises: NoVmAvailable when the manager couldn't find a suitable VM for the
            given group and user
        """
        vmd_list = self.get_all_vm_in_group(group)
        if not self.can_user_acquire_more_vm(username, group):
            raise NoVmAvailable("No VM is available, user `{}` already acquired too many VMs"
                                .format(username))

        ready_vmd_list = [vmd for vmd in vmd_list if vmd.state == VmStates.READY]
        # trying to find a VM previously used by this user
        dirtied_by_user = [vmd for vmd in ready_vmd_list if vmd.bound_to_user == username]
        clean_list = [vmd for vmd in ready_vmd_list if vmd.bound_to_user is None]
        all_vms = list(chain(dirtied_by_user, clean_list))

        for vmd in all_vms:
            if vmd.get_field(self.rc, "check_fails") != "0":
                self.log.debug("VM {} has check fails, skip acquire".format(vmd.vm_name))
                continue
            vm_key = KEY_VM_INSTANCE.format(vm_name=vmd.vm_name)
            if self.lua_scripts["acquire_vm"](keys=[vm_key, KEY_SERVER_INFO],
                                              args=[username, pid, time.time(),
                                                    task_id, build_id, chroot]) == "OK":
                self.log.info("Acquired VM: {} {} for pid: {}".format(vmd.vm_name, vmd.vm_ip, pid))
                return vmd

        raise NoVmAvailable("No VM is available, please wait in queue. Group: {}".format(group))
    def release_vm(self, vm_name):
        """
        Return VM into the pool.

        :return: True if successful
        :rtype: bool
        """
        # in_use -> ready
        self.log.info("Releasing VM {}".format(vm_name))
        vm_key = KEY_VM_INSTANCE.format(vm_name=vm_name)
        lua_result = self.lua_scripts["release_vm"](keys=[vm_key], args=[time.time()])
        self.log.debug("release vm result `{}`".format(lua_result))
        return lua_result == "OK"
    def start_vm_termination(self, vm_name, allowed_pre_state=None):
        """
        Initiate the VM termination process using redis publish.

        :param allowed_pre_state: When defined, enforce that the current state
            matches it before termination is requested.
        :type allowed_pre_state: str constant from VmState
        """
        vmd = self.get_vm_by_name(vm_name)
        lua_result = self.lua_scripts["terminate_vm"](keys=[vmd.vm_key],
                                                      args=[allowed_pre_state, time.time()])
        if lua_result == "OK":
            msg = {
                "group": vmd.group,
                "vm_ip": vmd.vm_ip,
                "vm_name": vmd.vm_name,
                "topic": EventTopics.VM_TERMINATION_REQUEST
            }
            self.rc.publish(PUBSUB_MB, json.dumps(msg))
            self.log.info("VM {} queued for termination".format(vmd.vm_name))

            # TODO: remove when build management is refactored
            # self.rc.publish(PUBSUB_INTERRUPT_BUILDER.format(vmd.vm_ip), "vm died")
        else:
            self.log.debug("VM termination `{}` skipped due to: {}".format(vm_name, lua_result))
    def remove_vm_from_pool(self, vm_name):
        """
        Backend forgets about VM after this method

        :raises VmError: if VM has wrong state
        """
        vmd = self.get_vm_by_name(vm_name)
        if vmd.get_field(self.rc, "state") != VmStates.TERMINATING:
            raise VmError("VM should have `terminating` state to be removable")
        pipe = self.rc.pipeline()
        pipe.srem(KEY_VM_POOL.format(group=vmd.group), vm_name)
        pipe.delete(KEY_VM_INSTANCE.format(vm_name=vm_name))
        pipe.execute()
        self.log.info("removed vm `{}` from pool".format(vm_name))
    def _load_multi_safe(self, vm_name_list):
        result = []
        for vm_name in vm_name_list:
            try:
                result.append(VmDescriptor.load(self.rc, vm_name))
            except VmDescriptorNotFound:
                self.log.debug("Failed to load VMD: {}".format(vm_name))
        return result
    def get_all_vm_in_group(self, group):
        """
        :rtype: list of VmDescriptor
        """
        vm_name_list = self.rc.smembers(KEY_VM_POOL.format(group=group))
        return self._load_multi_safe(vm_name_list)
    def get_all_vm(self):
        """
        :rtype: list of VmDescriptor
        """
        vmd_list = []
        for group in self.vm_groups:
            vmd_list.extend(self.get_all_vm_in_group(group))
        return vmd_list
    def get_vm_by_name(self, vm_name):
        """
        :rtype: VmDescriptor
        """
        return VmDescriptor.load(self.rc, vm_name)
    def get_vm_by_task_id(self, task_id):
        """
        :rtype: VmDescriptor
        """
        vmd_list = self.get_all_vm()
        for vmd in vmd_list:
            if vmd.task_id == task_id:
                return vmd
        return None
    def get_vm_by_group_and_state_list(self, group, state_list):
        """
        Select VMs for the given group and allowed states

        :param group: filter VMs by the build group; if ``group is None``,
            select VMs from all groups
        :param state_list: VM state should be in the ``state_list``
        :return: Filtered VMs
        :rtype: list of VmDescriptor
        """
        states = set(state_list)
        if group is None:
            vmd_list = self.get_all_vm()
        else:
            vmd_list = self.get_all_vm_in_group(group)
        return [vmd for vmd in vmd_list if vmd.state in states]
    def info(self):
        """
        Present information about all managed VMs in a human readable form.

        :rtype: str
        """
        dt_fields = {
            "last_health_check",
            "in_use_since",
        }
        buf = StringIO()
        for group_id in self.vm_groups:
            bg = self.opts.build_groups[group_id]
            buf.write("=" * 32)
            header = "\nVM group #{} {} archs: {}\n===\n".format(group_id, bg["name"], bg["archs"])
            buf.write(header)
            vmd_list = self.get_all_vm_in_group(group_id)
            for vmd in vmd_list:
                buf.write("\t VM {}, ip: {}\n".format(vmd.vm_name, vmd.vm_ip))
                for k, v in vmd.to_dict().items():
                    if k in ["vm_name", "vm_ip", "group"]:
                        continue
                    if k in dt_fields:
                        v = str(datetime.datetime.fromtimestamp(float(v)))
                    buf.write("\t\t{}: {}\n".format(k, v))
                buf.write("\n")
            buf.write("\n")
        return buf.getvalue()
    def write_vm_pool_info(self, group, key, value):
        self.rc.hset(KEY_VM_POOL_INFO.format(group=group), key, value)
    def read_vm_pool_info(self, group, key):
        return self.rc.hget(KEY_VM_POOL_INFO.format(group=group), key)
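
For illustration, a minimal sketch of the acquire/release cycle as a builder process might drive it; `opts`, `task`, and `run_build` are hypothetical stand-ins and not part of this module, and `acquire_vm` may raise NoVmAvailable:

import os


def build_on_vm(opts, group, username, task):
    """Hypothetical helper: acquire a VM, run one build, always release it."""
    vmm = VmManager(opts)
    vmd = vmm.acquire_vm(group, username, os.getpid(),
                         task_id=task["task_id"],
                         build_id=task["build_id"],
                         chroot=task["chroot"])
    try:
        run_build(vmd.vm_ip, task)  # hypothetical build step
    finally:
        # in_use -> ready (or check_health_failed if checks failed meanwhile)
        vmm.release_vm(vmd.vm_name)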