# Source code for backend.vm_manage.event_handle

# coding: utf-8
import json
from multiprocessing import Process
from setproctitle import setproctitle
from threading import Thread
import time

from backend.exceptions import VmDescriptorNotFound
from backend.helpers import get_redis_logger
from backend.vm_manage import VmStates, PUBSUB_MB, EventTopics


class Recycle(Thread):
    """
    Background thread that periodically cleans up VMM services.

    Currently the only cleanup performed is ``terminator.recycle()``, called
    once every ``recycle_period`` seconds until :meth:`terminate` is invoked.

    :param terminator: object exposing a ``recycle()`` method
    :param recycle_period: delay between recycle calls in seconds (coerced with ``int``)
    """

    def __init__(self, terminator, recycle_period, *args, **kwargs):
        from threading import Event

        self.terminator = terminator
        self.recycle_period = int(recycle_period)
        super(Recycle, self).__init__(*args, **kwargs)
        self._running = False
        # An Event (instead of a bare flag that run() would overwrite) means a
        # terminate() issued before run() starts is not lost, and a sleeping
        # thread is woken immediately instead of after a full period.
        self._stop_event = Event()

    def run(self):
        """Call ``terminator.recycle()`` every ``recycle_period`` seconds until stopped."""
        self._running = not self._stop_event.is_set()
        # wait() returns True as soon as the stop event is set, so a pending
        # termination exits without performing one more recycle.
        while not self._stop_event.wait(self.recycle_period):
            self.terminator.recycle()
        self._running = False

    def terminate(self):
        """Request the thread to stop; it exits promptly even while waiting."""
        self._running = False
        self._stop_event.set()
# KEYS[1]: VMD key on_health_check_success_lua = """ local old_state = redis.call("HGET", KEYS[1], "state") if old_state ~= "check_health" and old_state ~= "in_use" then return nil else redis.call("HSET", KEYS[1], "check_fails", 0) if old_state == "check_health" then redis.call("HSET", KEYS[1], "state", "{}") end end """.format(VmStates.READY) # KEYS[1]: VMD key record_failure_lua = """ local old_state = redis.call("HGET", KEYS[1], "state") if old_state ~= "check_health" and old_state ~= "in_use" and old_state ~= "check_health_failed" then return nil else redis.call("HINCRBY", KEYS[1], "check_fails", 1) if old_state == "check_health" then redis.call("HSET", KEYS[1], "state", "{}") end end """.format(VmStates.CHECK_HEALTH_FAILED)
class EventHandler(Process):
    """
    Process that dispatches VM-manager events received over redis pubsub.

    Each message is a JSON object whose ``topic`` field selects one of the
    ``on_*`` handlers via ``handlers_map``.  While running, the process also
    drives a :class:`Recycle` thread that periodically calls
    ``terminator.recycle()``.

    :type vmm: VmManager
    """

    def __init__(self, opts, vmm, terminator):
        super(EventHandler, self).__init__(name="EventHandler")
        self.opts = opts
        self.vmm = vmm
        self.terminator = terminator
        self.kill_received = False
        # Created by run(), i.e. only inside the child process; None elsewhere.
        self.do_recycle_proc = None
        self.handlers_map = {
            EventTopics.HEALTH_CHECK: self.on_health_check_result,
            EventTopics.VM_SPAWNED: self.on_vm_spawned,
            EventTopics.VM_TERMINATION_REQUEST: self.on_vm_termination_request,
            EventTopics.VM_TERMINATED: self.on_vm_termination_result,
        }
        self.lua_scripts = {}
        self.recycle_period = 60
        self.log = get_redis_logger(self.vmm.opts, "vmm.event_handler", "vmm")
        self.vmm.set_logger(self.log)

    def post_init(self):
        """Register the health-check Lua scripts with the VM manager's redis client."""
        # todo: move to manager.py, wrap call into VmManager methods
        self.lua_scripts["on_health_check_success"] = self.vmm.rc.register_script(on_health_check_success_lua)
        self.lua_scripts["record_failure"] = self.vmm.rc.register_script(record_failure_lua)

    def on_health_check_result(self, msg):
        """
        Record the outcome of a VM health check.

        :param msg: event dict with at least ``vm_name`` and ``result``
            (``"OK"`` means success, anything else is a failure)

        On failure, the VM is scheduled for termination once its
        ``check_fails`` counter exceeds the group's ``vm_max_check_fails``,
        unless it is currently in use.
        """
        try:
            vmd = self.vmm.get_vm_by_name(msg["vm_name"])
        except VmDescriptorNotFound:
            self.log.debug("VM record disappeared, ignoring health check results, msg: {}"
                           .format(msg))
            return

        if msg["result"] == "OK":
            self.lua_scripts["on_health_check_success"](keys=[vmd.vm_key], args=[time.time()])
            self.log.debug("recording success for ip:{} name:{}".format(vmd.vm_ip, vmd.vm_name))
            return

        self.log.debug("recording check fail: {}".format(msg))
        self.lua_scripts["record_failure"](keys=[vmd.vm_key])
        # Re-read after the script ran so the counter includes this failure.
        fails_count = int(vmd.get_field(self.vmm.rc, "check_fails") or 0)
        max_check_fails = self.opts.build_groups[vmd.group]["vm_max_check_fails"]
        if fails_count > max_check_fails and vmd.state != VmStates.IN_USE:
            self.log.info("check fail threshold reached: {}, terminating: {}"
                          .format(fails_count, msg))
            self.vmm.start_vm_termination(vmd.vm_name)

    def on_vm_spawned(self, msg):
        """Add a freshly spawned VM (``vm_ip``, ``vm_name``, ``group``) to the pool."""
        self.vmm.add_vm_to_pool(vm_ip=msg["vm_ip"], vm_name=msg["vm_name"], group=msg["group"])

    def on_vm_termination_request(self, msg):
        """Forward a termination request (``vm_ip``, ``vm_name``, ``group``) to the terminator."""
        self.terminator.terminate_vm(vm_ip=msg["vm_ip"], vm_name=msg["vm_name"], group=msg["group"])

    def on_vm_termination_result(self, msg):
        """
        Handle the outcome of a VM termination.

        A successful termination (``result == "OK"``) removes the VM from the
        pool.  Messages without ``vm_name``, and failed terminations, are only
        logged.  A missing ``result`` is treated as a failure.
        """
        if msg.get("result") == "OK" and "vm_name" in msg:
            self.log.debug("Vm terminated, removing from pool ip: {}, name: {}, msg: {}"
                           .format(msg.get("vm_ip"), msg.get("vm_name"), msg.get("msg")))
            self.vmm.remove_vm_from_pool(msg["vm_name"])
        elif "vm_name" not in msg:
            self.log.debug("Vm termination event missing vm name, msg: {}".format(msg))
        else:
            self.log.debug("Vm termination failed ip: {}, name: {}, msg: {}"
                           .format(msg.get("vm_ip"), msg.get("vm_name"), msg.get("msg")))

    def run(self):
        """Process entry point: start the recycle thread, then listen for events."""
        setproctitle("Event Handler")
        self.do_recycle_proc = Recycle(terminator=self.terminator, recycle_period=self.recycle_period)
        self.do_recycle_proc.start()
        self.start_listen()

    def terminate(self):
        """
        Stop the event handler.

        Inside the running child this flags the pubsub loop to stop and shuts
        down the recycle thread.  On a handle where run() never executed (e.g.
        the parent-side Process object) there is no recycle thread; if the
        child is alive, fall back to :meth:`multiprocessing.Process.terminate`
        instead of failing with AttributeError.
        """
        self.kill_received = True
        if self.do_recycle_proc is not None:
            self.do_recycle_proc.terminate()
            self.do_recycle_proc.join()
        elif self.is_alive():
            super(EventHandler, self).terminate()

    def start_listen(self):
        """
        Listen on redis pubsub and perform the requested actions.

        Message payload is JSON; its root must be a dictionary with the
        reserved field ``topic``, which is used for routing to a handler.
        Errors raised while handling one message are logged and do not stop
        the loop.  ``kill_received`` is checked each time a message arrives.
        """
        channel = self.vmm.rc.pubsub(ignore_subscribe_messages=True)
        channel.subscribe(PUBSUB_MB)
        # TODO: check subscribe success
        self.log.info("Spawned pubsub handler")
        for raw in channel.listen():
            if self.kill_received:
                break
            if raw is None or "type" not in raw or raw["type"] != "message" or "data" not in raw:
                continue
            try:
                msg = json.loads(raw["data"])
                if "topic" not in msg:
                    raise Exception("Handler received msg without `topic` field, msg: {}".format(msg))
                topic = msg["topic"]
                if topic not in self.handlers_map:
                    raise Exception("Handler received msg with unknown `topic` field, msg: {}".format(msg))
                self.handlers_map[topic](msg)
            except Exception as err:
                self.log.exception("Handler error: raw msg: {}, {}".format(raw, err))