From 55f001f2dfcebdabdf82502b91b4c46a77e34f62 Mon Sep 17 00:00:00 2001 From: zhuofeng Date: Fri, 24 Jan 2025 11:56:41 +0800 Subject: [PATCH] add avg_block_io and ai_block_io --- config/plugins/ai_block_io.ini | 39 + config/plugins/avg_block_io.ini | 40 + config/tasks/ai_block_io.mod | 7 + config/tasks/avg_block_io.mod | 7 + selftest/test/test_ai_block_io.py | 165 ++++ src/python/sentryPlugins/__init__.py | 0 .../sentryPlugins/ai_block_io/README.md | 1 + .../sentryPlugins/ai_block_io/__init__.py | 0 .../sentryPlugins/ai_block_io/ai_block_io.py | 239 ++++++ .../sentryPlugins/ai_block_io/alarm_report.py | 80 ++ .../ai_block_io/config_parser.py | 742 ++++++++++++++++++ .../sentryPlugins/ai_block_io/data_access.py | 127 +++ .../sentryPlugins/ai_block_io/detector.py | 156 ++++ .../sentryPlugins/ai_block_io/io_data.py | 54 ++ .../ai_block_io/sliding_window.py | 129 +++ .../sentryPlugins/ai_block_io/threshold.py | 178 +++++ src/python/sentryPlugins/ai_block_io/utils.py | 73 ++ .../sentryPlugins/avg_block_io/__init__.py | 0 .../avg_block_io/avg_block_io.py | 189 +++++ .../sentryPlugins/avg_block_io/config.py | 208 +++++ .../sentryPlugins/avg_block_io/module_conn.py | 145 ++++ .../avg_block_io/stage_window.py | 55 ++ .../sentryPlugins/avg_block_io/utils.py | 140 ++++ 24 files changed, 2778 insertions(+), 1 deletion(-) create mode 100644 config/plugins/ai_block_io.ini create mode 100644 config/plugins/avg_block_io.ini create mode 100644 config/tasks/ai_block_io.mod create mode 100644 config/tasks/avg_block_io.mod create mode 100644 selftest/test/test_ai_block_io.py create mode 100644 src/python/sentryPlugins/__init__.py create mode 100644 src/python/sentryPlugins/ai_block_io/README.md create mode 100644 src/python/sentryPlugins/ai_block_io/__init__.py create mode 100644 src/python/sentryPlugins/ai_block_io/ai_block_io.py create mode 100644 src/python/sentryPlugins/ai_block_io/alarm_report.py create mode 100644 src/python/sentryPlugins/ai_block_io/config_parser.py create mode 
100644 src/python/sentryPlugins/ai_block_io/data_access.py create mode 100644 src/python/sentryPlugins/ai_block_io/detector.py create mode 100644 src/python/sentryPlugins/ai_block_io/io_data.py create mode 100644 src/python/sentryPlugins/ai_block_io/sliding_window.py create mode 100644 src/python/sentryPlugins/ai_block_io/threshold.py create mode 100644 src/python/sentryPlugins/ai_block_io/utils.py create mode 100644 src/python/sentryPlugins/avg_block_io/__init__.py create mode 100644 src/python/sentryPlugins/avg_block_io/avg_block_io.py create mode 100644 src/python/sentryPlugins/avg_block_io/config.py create mode 100644 src/python/sentryPlugins/avg_block_io/module_conn.py create mode 100644 src/python/sentryPlugins/avg_block_io/stage_window.py create mode 100644 src/python/sentryPlugins/avg_block_io/utils.py diff --git a/config/plugins/ai_block_io.ini b/config/plugins/ai_block_io.ini new file mode 100644 index 0000000..69f44ba --- /dev/null +++ b/config/plugins/ai_block_io.ini @@ -0,0 +1,39 @@ +[log] +level=info + +[common] +period_time=1 +disk=default +stage=default +iotype=read,write + +[algorithm] +train_data_duration=24 +train_update_duration=2 +algorithm_type=boxplot +boxplot_parameter=1.5 +win_type=not_continuous +win_size=30 +win_threshold=6 + +[latency_sata_ssd] +read_avg_lim=10000 +write_avg_lim=10000 +read_tot_lim=50000 +write_tot_lim=50000 + +[latency_nvme_ssd] +read_avg_lim=10000 +write_avg_lim=10000 +read_tot_lim=50000 +write_tot_lim=50000 + +[latency_sata_hdd] +read_avg_lim=15000 +write_avg_lim=15000 +read_tot_lim=50000 +write_tot_lim=50000 + +[iodump] +read_iodump_lim=0 +write_iodump_lim=0 \ No newline at end of file diff --git a/config/plugins/avg_block_io.ini b/config/plugins/avg_block_io.ini new file mode 100644 index 0000000..3b4ee33 --- /dev/null +++ b/config/plugins/avg_block_io.ini @@ -0,0 +1,40 @@ +[log] +level=info + +[common] +disk=default +stage=default +iotype=read,write +period_time=1 + +[algorithm] +win_size=30 +win_threshold=6 + 
+[latency_nvme_ssd] +read_avg_lim=10000 +write_avg_lim=10000 +read_avg_time=3 +write_avg_time=3 +read_tot_lim=50000 +write_tot_lim=50000 + +[latency_sata_ssd] +read_avg_lim=10000 +write_avg_lim=10000 +read_avg_time=3 +write_avg_time=3 +read_tot_lim=50000 +write_tot_lim=50000 + +[latency_sata_hdd] +read_avg_lim=15000 +write_avg_lim=15000 +read_avg_time=3 +write_avg_time=3 +read_tot_lim=50000 +write_tot_lim=50000 + +[iodump] +read_iodump_lim=0 +write_iodump_lim=0 diff --git a/config/tasks/ai_block_io.mod b/config/tasks/ai_block_io.mod new file mode 100644 index 0000000..82f4f0b --- /dev/null +++ b/config/tasks/ai_block_io.mod @@ -0,0 +1,7 @@ +[common] +enabled=yes +task_start=/usr/bin/python3 /usr/bin/ai_block_io +task_stop=pkill -f /usr/bin/ai_block_io +type=oneshot +alarm_id=1002 +alarm_clear_time=5 \ No newline at end of file diff --git a/config/tasks/avg_block_io.mod b/config/tasks/avg_block_io.mod new file mode 100644 index 0000000..bcd063b --- /dev/null +++ b/config/tasks/avg_block_io.mod @@ -0,0 +1,7 @@ +[common] +enabled=yes +task_start=/usr/bin/python3 /usr/bin/avg_block_io +task_stop=pkill -f /usr/bin/avg_block_io +type=oneshot +alarm_id=1002 +alarm_clear_time=5 diff --git a/selftest/test/test_ai_block_io.py b/selftest/test/test_ai_block_io.py new file mode 100644 index 0000000..c762c82 --- /dev/null +++ b/selftest/test/test_ai_block_io.py @@ -0,0 +1,165 @@ +# coding: utf-8 +# Copyright (c) 2024 Huawei Technologies Co., Ltd. +# sysSentry is licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. 
+ +import unittest +import numpy as np + +from sentryPlugins.ai_block_io.threshold import AbsoluteThreshold, BoxplotThreshold, NSigmaThreshold +from sentryPlugins.ai_block_io.sliding_window import (NotContinuousSlidingWindow, + ContinuousSlidingWindow, MedianSlidingWindow) + + +def _get_boxplot_threshold(data_list: list, parameter): + q1 = np.percentile(data_list, 25) + q3 = np.percentile(data_list, 75) + iqr = q3 - q1 + return q3 + parameter * iqr + + +def _get_n_sigma_threshold(data_list: list, parameter): + mean = np.mean(data_list) + std = np.std(data_list) + return mean + parameter * std + + +class Test(unittest.TestCase): + @classmethod + def setUpClass(cls): + print("UnitTest Begin...") + + @classmethod + def tearDownClass(cls): + print("UnitTest End...") + + def setUp(self): + print("Begin...") + + def tearDown(self): + print("End...") + + def test_absolute_threshold(self): + absolute = AbsoluteThreshold() + self.assertEqual(None, absolute.get_threshold()) + self.assertFalse(absolute.is_abnormal(5000)) + absolute.set_threshold(40) + self.assertEqual(40, absolute.get_threshold()) + self.assertTrue(absolute.is_abnormal(50)) + + def test_boxplot_threshold(self): + boxplot = BoxplotThreshold(1.5, 5, 1) + # 阶段1:尚未初始化 + self.assertEqual(None, boxplot.get_threshold()) + self.assertFalse(boxplot.is_abnormal(5000)) + # 往boxplot中插入5个元素后,会生成阈值 + data_list = [20, 20, 20, 30, 10] + for data in data_list: + boxplot.push_latest_data_to_queue(data) + # 阶段2:初始化 + boxplot_threshold = boxplot.get_threshold() + self.assertEqual(_get_boxplot_threshold(data_list, 1.5), boxplot_threshold) + self.assertTrue(boxplot.is_abnormal(5000)) + data_list.pop(0) + data_list.append(100) + boxplot.push_latest_data_to_queue(100) + # 阶段3:更新阈值 + boxplot_threshold = boxplot.get_threshold() + self.assertEqual(_get_boxplot_threshold(data_list, 1.5), boxplot_threshold) + + def test_n_sigma_threshold(self): + n_sigma = NSigmaThreshold(3, 5, 1) + self.assertEqual(None, n_sigma.get_threshold()) + 
self.assertFalse(n_sigma.is_abnormal(5000)) + data_list = [20, 20, 20, 30, 10] + for data in data_list: + n_sigma.push_latest_data_to_queue(data) + n_sigma_threshold = n_sigma.get_threshold() + self.assertEqual(_get_n_sigma_threshold(data_list, 3), n_sigma_threshold) + self.assertTrue(n_sigma.is_abnormal(5000)) + data_list.pop(0) + data_list.append(100) + n_sigma.push_latest_data_to_queue(100) + # 阶段3:更新阈值 + n_sigma_threshold = n_sigma.get_threshold() + self.assertEqual(_get_n_sigma_threshold(data_list, 3), n_sigma_threshold) + + def test_not_continuous_sliding_window(self): + not_continuous = NotContinuousSlidingWindow(5, 3) + boxplot_threshold = BoxplotThreshold(1.5, 10, 8) + boxplot_threshold.attach_observer(not_continuous) + data_list1 = [19, 20, 20, 20, 20, 20, 22, 24, 23, 20] + for data in data_list1: + boxplot_threshold.push_latest_data_to_queue(data) + result = not_continuous.is_slow_io_event(data) + self.assertFalse(result[0][0]) + self.assertEqual(23.75, boxplot_threshold.get_threshold()) + boxplot_threshold.push_latest_data_to_queue(24) + result = not_continuous.is_slow_io_event(24) + self.assertFalse(result[0][0]) + boxplot_threshold.push_latest_data_to_queue(25) + result = not_continuous.is_slow_io_event(25) + self.assertTrue(result[0][0]) + data_list2 = [20, 20, 20, 20, 20, 20] + for data in data_list2: + boxplot_threshold.push_latest_data_to_queue(data) + result = not_continuous.is_slow_io_event(data) + self.assertFalse(result[0][0]) + self.assertEqual(25.625, boxplot_threshold.get_threshold()) + + def test_continuous_sliding_window(self): + continuous = ContinuousSlidingWindow(5, 3) + boxplot_threshold = BoxplotThreshold(1.5, 10, 8) + boxplot_threshold.attach_observer(continuous) + data_list = [19, 20, 20, 20, 20, 20, 22, 24, 23, 20] + for data in data_list: + boxplot_threshold.push_latest_data_to_queue(data) + result = continuous.is_slow_io_event(data) + self.assertFalse(result[0][0]) + self.assertEqual(23.75, boxplot_threshold.get_threshold()) + # 
没有三个异常点 + self.assertFalse(continuous.is_slow_io_event(25)[0][0]) + # 不连续的三个异常点 + self.assertFalse(continuous.is_slow_io_event(25)[0][0]) + # 连续的三个异常点 + self.assertTrue(continuous.is_slow_io_event(25)[0][0]) + + def test_median_sliding_window(self): + median = MedianSlidingWindow(5, 3) + absolute_threshold = AbsoluteThreshold(10, 8) + absolute_threshold.attach_observer(median) + absolute_threshold.set_threshold(24.5) + data_list = [24, 24, 24, 25, 25] + for data in data_list: + self.assertFalse(median.is_slow_io_event(data)[0][0]) + self.assertTrue(median.is_slow_io_event(25)[0][0]) + + def test_parse_collect_data(self): + collect = { + "read": [1.0, 2.0, 3.0, 4.0], + "write": [5.0, 6.0, 7.0, 8.0], + "flush": [9.0, 10.0, 11.0, 12.0], + "discard": [13.0, 14.0, 15.0, 16.0], + } + from sentryPlugins.ai_block_io.io_data import BaseData + from sentryPlugins.ai_block_io.data_access import _get_io_stage_data + + io_data = _get_io_stage_data(collect) + self.assertEqual( + io_data.read, BaseData(latency=1.0, io_dump=2.0, io_length=3.0, iops=4.0) + ) + self.assertEqual( + io_data.write, BaseData(latency=5.0, io_dump=6.0, io_length=7.0, iops=8.0) + ) + self.assertEqual( + io_data.flush, BaseData(latency=9.0, io_dump=10.0, io_length=11.0, iops=12.0) + ) + self.assertEqual( + io_data.discard, BaseData(latency=13.0, io_dump=14.0, io_length=15.0, iops=16.0) + ) diff --git a/src/python/sentryPlugins/__init__.py b/src/python/sentryPlugins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/python/sentryPlugins/ai_block_io/README.md b/src/python/sentryPlugins/ai_block_io/README.md new file mode 100644 index 0000000..95c1111 --- /dev/null +++ b/src/python/sentryPlugins/ai_block_io/README.md @@ -0,0 +1 @@ +# slow_io_detection diff --git a/src/python/sentryPlugins/ai_block_io/__init__.py b/src/python/sentryPlugins/ai_block_io/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/python/sentryPlugins/ai_block_io/ai_block_io.py 
b/src/python/sentryPlugins/ai_block_io/ai_block_io.py new file mode 100644 index 0000000..8075f5f --- /dev/null +++ b/src/python/sentryPlugins/ai_block_io/ai_block_io.py @@ -0,0 +1,239 @@ +# coding: utf-8 +# Copyright (c) 2024 Huawei Technologies Co., Ltd. +# sysSentry is licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. + +import time +import signal +import logging +from collections import defaultdict + +from .detector import Detector, DiskDetector +from .threshold import ThresholdFactory, ThresholdType +from .sliding_window import SlidingWindowFactory +from .utils import get_data_queue_size_and_update_size +from .config_parser import ConfigParser +from .data_access import ( + get_io_data_from_collect_plug, + check_collect_valid, + get_disk_type, + check_disk_is_available +) +from .io_data import MetricName +from .alarm_report import Xalarm, Report + +CONFIG_FILE = "/etc/sysSentry/plugins/ai_block_io.ini" + + +def sig_handler(signum, frame): + Report.report_pass(f"receive signal: {signum}, exiting...") + logging.info("Finished ai_block_io plugin running.") + exit(signum) + + +class SlowIODetection: + _config_parser = None + _disk_list = [] + _detector_name_list = defaultdict(list) + _disk_detectors = {} + + def __init__(self, config_parser: ConfigParser): + self._config_parser = config_parser + self.__init_detector_name_list() + self.__init_detector() + + def __init_detector_name_list(self): + disks: list = self._config_parser.disks_to_detection + stages: list = self._config_parser.stage + iotypes: list = self._config_parser.iotype + + if disks is None: + 
logging.warning("you not specify any disk or use default, so ai_block_io will enable all available disk.") + all_available_disk_list = check_collect_valid(self._config_parser.period_time) + if all_available_disk_list is None: + Report.report_pass("get available disk error, please check if the collector plug is enable. exiting...") + logging.critical("get available disk error, please check if the collector plug is enable. exiting...") + exit(1) + if len(all_available_disk_list) == 0: + Report.report_pass("not found available disk. exiting...") + logging.critical("not found available disk. exiting...") + exit(1) + disks = all_available_disk_list + logging.info(f"available disk list is follow: {disks}.") + + for disk in disks: + tmp_disk = [disk] + ret = check_disk_is_available(self._config_parser.period_time, tmp_disk) + if not ret: + logging.warning(f"disk: {disk} is not available, it will be ignored.") + continue + + disk_type_result = get_disk_type(disk) + if disk_type_result["ret"] == 0 and disk_type_result["message"] in ( + '0', + '1', + '2', + ): + disk_type = int(disk_type_result["message"]) + else: + logging.warning( + "%s get disk type error, return %s, so it will be ignored.", + disk, + disk_type_result, + ) + continue + self._disk_list.append(disk) + for stage in stages: + for iotype in iotypes: + self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "latency")) + self._detector_name_list[disk].append(MetricName(disk, disk_type, stage, iotype, "io_dump")) + + if not self._detector_name_list: + Report.report_pass("the disks to detection is empty, ai_block_io will exit.") + logging.critical("the disks to detection is empty, ai_block_io will exit.") + exit(1) + + def __init_detector(self): + train_data_duration, train_update_duration = ( + self._config_parser.get_train_data_duration_and_train_update_duration() + ) + slow_io_detection_frequency = self._config_parser.period_time + threshold_type = self._config_parser.algorithm_type + 
data_queue_size, update_size = get_data_queue_size_and_update_size( + train_data_duration, train_update_duration, slow_io_detection_frequency + ) + sliding_window_type = self._config_parser.sliding_window_type + window_size, window_threshold = ( + self._config_parser.get_window_size_and_window_minimum_threshold() + ) + + for disk, metric_name_list in self._detector_name_list.items(): + disk_detector = DiskDetector(disk) + for metric_name in metric_name_list: + + if metric_name.metric_name == 'latency': + threshold = ThresholdFactory().get_threshold( + threshold_type, + boxplot_parameter=self._config_parser.boxplot_parameter, + n_sigma_paramter=self._config_parser.n_sigma_parameter, + data_queue_size=data_queue_size, + data_queue_update_size=update_size, + ) + tot_lim = self._config_parser.get_tot_lim( + metric_name.disk_type, metric_name.io_access_type_name + ) + avg_lim = self._config_parser.get_avg_lim( + metric_name.disk_type, metric_name.io_access_type_name + ) + if tot_lim is None: + logging.warning( + "disk %s, disk type %s, io type %s, get tot lim error, so it will be ignored.", + disk, + metric_name.disk_type, + metric_name.io_access_type_name, + ) + sliding_window = SlidingWindowFactory().get_sliding_window( + sliding_window_type, + queue_length=window_size, + threshold=window_threshold, + abs_threshold=tot_lim, + avg_lim=avg_lim + ) + detector = Detector(metric_name, threshold, sliding_window) + disk_detector.add_detector(detector) + continue + + elif metric_name.metric_name == 'io_dump': + threshold = ThresholdFactory().get_threshold(ThresholdType.AbsoluteThreshold) + abs_threshold = None + if metric_name.io_access_type_name == 'read': + abs_threshold = self._config_parser.read_iodump_lim + elif metric_name.io_access_type_name == 'write': + abs_threshold = self._config_parser.write_iodump_lim + sliding_window = SlidingWindowFactory().get_sliding_window( + sliding_window_type, + queue_length=window_size, + threshold=window_threshold + ) + detector = 
Detector(metric_name, threshold, sliding_window) + threshold.set_threshold(abs_threshold) + disk_detector.add_detector(detector) + + logging.info(f"disk: [{disk}] add detector:\n [{disk_detector}]") + self._disk_detectors[disk] = disk_detector + + def launch(self): + while True: + logging.debug("step0. AI threshold slow io event detection is looping.") + + # Step1:获取IO数据 + io_data_dict_with_disk_name = get_io_data_from_collect_plug( + self._config_parser.period_time, self._disk_list + ) + logging.debug(f"step1. Get io data: {str(io_data_dict_with_disk_name)}") + if io_data_dict_with_disk_name is None: + Report.report_pass( + "get io data error, please check if the collector plug is enable. exitting..." + ) + exit(1) + + # Step2:慢IO检测 + logging.debug("step2. Start to detection slow io event.") + slow_io_event_list = [] + for disk, disk_detector in self._disk_detectors.items(): + result = disk_detector.is_slow_io_event(io_data_dict_with_disk_name) + if result[0]: + slow_io_event_list.append(result) + logging.debug("step2. End to detection slow io event.") + + # Step3:慢IO事件上报 + logging.debug("step3. 
Report slow io event to sysSentry.") + for slow_io_event in slow_io_event_list: + alarm_content = { + "alarm_source": "ai_block_io", + "driver_name": slow_io_event[1], + "io_type": slow_io_event[4], + "reason": slow_io_event[2], + "block_stack": slow_io_event[3], + "alarm_type": slow_io_event[5], + "details": slow_io_event[6] + } + Xalarm.major(alarm_content) + tmp_alarm_content = alarm_content.copy() + del tmp_alarm_content["details"] + logging.warning("[SLOW IO] " + str(tmp_alarm_content)) + logging.warning(f'[SLOW IO] disk: {str(tmp_alarm_content.get("driver_name"))}, ' + f'stage: {str(tmp_alarm_content.get("block_stack"))}, ' + f'iotype: {str(tmp_alarm_content.get("io_type"))}, ' + f'type: {str(tmp_alarm_content.get("alarm_type"))}, ' + f'reason: {str(tmp_alarm_content.get("reason"))}') + logging.warning(f"latency: " + str(alarm_content.get("details").get("latency"))) + logging.warning(f"iodump: " + str(alarm_content.get("details").get("iodump"))) + + # Step4:等待检测时间 + logging.debug("step4. Wait to start next slow io event detection loop.") + time.sleep(self._config_parser.period_time) + + +def main(): + # Step1:注册消息处理函数 + signal.signal(signal.SIGINT, sig_handler) + signal.signal(signal.SIGTERM, sig_handler) + + # Step2:断点恢复 + # todo: + + # Step3:读取配置 + config_file_name = CONFIG_FILE + config = ConfigParser(config_file_name) + config.read_config_from_file() + + # Step4:启动慢IO检测 + slow_io_detection = SlowIODetection(config) + slow_io_detection.launch() diff --git a/src/python/sentryPlugins/ai_block_io/alarm_report.py b/src/python/sentryPlugins/ai_block_io/alarm_report.py new file mode 100644 index 0000000..61bb145 --- /dev/null +++ b/src/python/sentryPlugins/ai_block_io/alarm_report.py @@ -0,0 +1,80 @@ +# coding: utf-8 +# Copyright (c) 2024 Huawei Technologies Co., Ltd. +# sysSentry is licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. 
+# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. + +import logging +import json + +from xalarm.sentry_notify import ( + xalarm_report, + MINOR_ALM, + MAJOR_ALM, + CRITICAL_ALM, + ALARM_TYPE_OCCUR, + ALARM_TYPE_RECOVER, +) + +from syssentry.result import ResultLevel, report_result + + +class Report: + TASK_NAME = "ai_block_io" + + @staticmethod + def report_pass(info: str): + report_result(Report.TASK_NAME, ResultLevel.PASS, json.dumps({"msg": info})) + logging.debug(f'Report {Report.TASK_NAME} PASS: {info}') + + @staticmethod + def report_fail(info: str): + report_result(Report.TASK_NAME, ResultLevel.FAIL, json.dumps({"msg": info})) + logging.debug(f'Report {Report.TASK_NAME} FAIL: {info}') + + @staticmethod + def report_skip(info: str): + report_result(Report.TASK_NAME, ResultLevel.SKIP, json.dumps({"msg": info})) + logging.debug(f'Report {Report.TASK_NAME} SKIP: {info}') + + +class Xalarm: + ALARM_ID = 1002 + + @staticmethod + def minor(info: dict): + info_str = json.dumps(info) + xalarm_report(Xalarm.ALARM_ID, MINOR_ALM, ALARM_TYPE_OCCUR, info_str) + logging.debug(f"Report {Xalarm.ALARM_ID} MINOR_ALM: {info_str}") + + @staticmethod + def major(info: dict): + info_str = json.dumps(info) + xalarm_report(Xalarm.ALARM_ID, MAJOR_ALM, ALARM_TYPE_OCCUR, info_str) + logging.debug(f"Report {Xalarm.ALARM_ID} MAJOR_ALM: {info_str}") + + @staticmethod + def critical(info: dict): + info_str = json.dumps(info) + xalarm_report(Xalarm.ALARM_ID, CRITICAL_ALM, ALARM_TYPE_OCCUR, info_str) + logging.debug(f"Report {Xalarm.ALARM_ID} CRITICAL_ALM: {info_str}") + + def minor_recover(info: dict): + info_str = json.dumps(info) + xalarm_report(Xalarm.ALARM_ID, MINOR_ALM, ALARM_TYPE_RECOVER, info_str) + 
logging.debug(f"Report {Xalarm.ALARM_ID} MINOR_ALM Recover: {info_str}") + + def major_recover(info: dict): + info_str = json.dumps(info) + xalarm_report(Xalarm.ALARM_ID, MAJOR_ALM, ALARM_TYPE_RECOVER, info_str) + logging.debug(f"Report {Xalarm.ALARM_ID} MAJOR_ALM Recover: {info_str}") + + def critical_recover(info: dict): + info_str = json.dumps(info) + xalarm_report(Xalarm.ALARM_ID, CRITICAL_ALM, ALARM_TYPE_RECOVER, info_str) + logging.debug(f"Report {Xalarm.ALARM_ID} CRITICAL_ALM Recover: {info_str}") diff --git a/src/python/sentryPlugins/ai_block_io/config_parser.py b/src/python/sentryPlugins/ai_block_io/config_parser.py new file mode 100644 index 0000000..1bbb609 --- /dev/null +++ b/src/python/sentryPlugins/ai_block_io/config_parser.py @@ -0,0 +1,742 @@ +# coding: utf-8 +# Copyright (c) 2024 Huawei Technologies Co., Ltd. +# sysSentry is licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. 
+ +import os +import configparser +import logging + +from .alarm_report import Report +from .threshold import ThresholdType +from .utils import get_threshold_type_enum, get_sliding_window_type_enum, get_log_level +from .data_access import check_detect_frequency_is_valid + + +LOG_FORMAT = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s" + +ALL_STAGE_LIST = [ + "throtl", + "wbt", + "gettag", + "plug", + "deadline", + "hctx", + "requeue", + "rq_driver", + "bio", +] +ALL_IOTPYE_LIST = ["read", "write"] +DISK_TYPE_MAP = { + 0: "nvme_ssd", + 1: "sata_ssd", + 2: "sata_hdd", +} + + +def init_log_format(log_level: str): + logging.basicConfig(level=get_log_level(log_level.lower()), format=LOG_FORMAT) + if log_level.lower() not in ("info", "warning", "error", "debug"): + logging.warning( + "the log_level: %s you set is invalid, use default value: info.", log_level + ) + + +class ConfigParser: + DEFAULT_CONF = { + "log": {"level": "info"}, + "common": { + "period_time": 1, + "disk": None, + "stage": "throtl,wbt,gettag,plug,deadline,hctx,requeue,rq_driver,bio", + "iotype": "read,write", + }, + "algorithm": { + "train_data_duration": 24.0, + "train_update_duration": 2.0, + "algorithm_type": get_threshold_type_enum("boxplot"), + "boxplot_parameter": 1.5, + "n_sigma_parameter": 3.0, + "win_type": get_sliding_window_type_enum("not_continuous"), + "win_size": 30, + "win_threshold": 6, + }, + "latency_sata_ssd": { + "read_avg_lim": 10000, + "write_avg_lim": 10000, + "read_tot_lim": 50000, + "write_tot_lim": 50000 + }, + "latency_nvme_ssd": { + "read_avg_lim": 10000, + "write_avg_lim": 10000, + "read_tot_lim": 50000, + "write_tot_lim": 50000 + }, + "latency_sata_hdd": { + "read_avg_lim": 15000, + "write_avg_lim": 15000, + "read_tot_lim": 50000, + "write_tot_lim": 50000 + }, + "iodump": { + "read_iodump_lim": 0, + "write_iodump_lim": 0 + } + } + + def __init__(self, config_file_name): + self._conf = ConfigParser.DEFAULT_CONF + self._config_file_name = 
config_file_name + + def _get_config_value( + self, + config_items: dict, + key: str, + value_type, + default_value=None, + gt=None, + ge=None, + lt=None, + le=None, + section=None + ): + if section is not None: + print_key = section + "." + key + else: + print_key = key + value = config_items.get(key) + if value is None: + logging.warning( + "config of %s not found, the default value %s will be used.", + print_key, + default_value, + ) + value = default_value + if not value: + logging.critical( + "the value of %s is empty, ai_block_io plug will exit.", print_key + ) + Report.report_pass( + f"the value of {print_key} is empty, ai_block_io plug will exit." + ) + exit(1) + try: + value = value_type(value) + except ValueError: + logging.critical( + "the value of %s is not a valid %s, ai_block_io plug will exit.", + print_key, + value_type, + ) + Report.report_pass( + f"the value of {print_key} is not a valid {value_type}, ai_block_io plug will exit." + ) + exit(1) + if gt is not None and value <= gt: + logging.critical( + "the value of %s is not greater than %s, ai_block_io plug will exit.", + print_key, + gt, + ) + Report.report_pass( + f"the value of {print_key} is not greater than {gt}, ai_block_io plug will exit." + ) + exit(1) + if ge is not None and value < ge: + logging.critical( + "the value of %s is not greater than or equal to %s, ai_block_io plug will exit.", + print_key, + ge, + ) + Report.report_pass( + f"the value of {print_key} is not greater than or equal to {ge}, ai_block_io plug will exit." + ) + exit(1) + if lt is not None and value >= lt: + logging.critical( + "the value of %s is not less than %s, ai_block_io plug will exit.", + print_key, + lt, + ) + Report.report_pass( + f"the value of {print_key} is not less than {lt}, ai_block_io plug will exit." 
+ ) + exit(1) + if le is not None and value > le: + logging.critical( + "the value of %s is not less than or equal to %s, ai_block_io plug will exit.", + print_key, + le, + ) + Report.report_pass( + f"the value of {print_key} is not less than or equal to {le}, ai_block_io plug will exit." + ) + exit(1) + + return value + + def _read_period_time(self, items_common: dict): + self._conf["common"]["period_time"] = self._get_config_value( + items_common, + "period_time", + int, + self.DEFAULT_CONF["common"]["period_time"], + gt=0 + ) + frequency = self._conf["common"]["period_time"] + ret = check_detect_frequency_is_valid(frequency) + if ret is None: + log = f"period_time: {frequency} is invalid, "\ + f"Check whether the value range is too large or is not an "\ + f"integer multiple of period_time.. exiting..." + Report.report_pass(log) + logging.critical(log) + exit(1) + + def _read_disks_to_detect(self, items_common: dict): + disks_to_detection = items_common.get("disk") + if disks_to_detection is None: + logging.warning("config of disk not found, the default value will be used.") + self._conf["common"]["disk"] = None + return + disks_to_detection = disks_to_detection.strip() + disks_to_detection = disks_to_detection.lower() + if not disks_to_detection: + logging.critical("the value of disk is empty, ai_block_io plug will exit.") + Report.report_pass( + "the value of disk is empty, ai_block_io plug will exit." 
+ ) + exit(1) + disk_list = disks_to_detection.split(",") + disk_list = [disk.strip() for disk in disk_list] + if len(disk_list) == 1 and disk_list[0] == "default": + self._conf["common"]["disk"] = None + return + if len(disk_list) > 10: + ten_disk_list = disk_list[0:10] + other_disk_list = disk_list[10:] + logging.warning(f"disk only support maximum is 10, disks: {ten_disk_list} will be retained, other: {other_disk_list} will be ignored.") + else: + ten_disk_list = disk_list + set_ten_disk_list = set(ten_disk_list) + if len(ten_disk_list) > len(set_ten_disk_list): + tmp = ten_disk_list + ten_disk_list = list(set_ten_disk_list) + logging.warning(f"disk exist duplicate, it will be deduplicate, before: {tmp}, after: {ten_disk_list}") + self._conf["common"]["disk"] = ten_disk_list + + def _read_train_data_duration(self, items_algorithm: dict): + self._conf["algorithm"]["train_data_duration"] = self._get_config_value( + items_algorithm, + "train_data_duration", + float, + self.DEFAULT_CONF["algorithm"]["train_data_duration"], + gt=0, + le=720, + ) + + def _read_train_update_duration(self, items_algorithm: dict): + default_train_update_duration = self.DEFAULT_CONF["algorithm"][ + "train_update_duration" + ] + if default_train_update_duration > self._conf["algorithm"]["train_data_duration"]: + default_train_update_duration = ( + self._conf["algorithm"]["train_data_duration"] / 2 + ) + self._conf["algorithm"]["train_update_duration"] = self._get_config_value( + items_algorithm, + "train_update_duration", + float, + default_train_update_duration, + gt=0, + le=self._conf["algorithm"]["train_data_duration"], + ) + + def _read_algorithm_type_and_parameter(self, items_algorithm: dict): + algorithm_type = items_algorithm.get("algorithm_type") + if algorithm_type is None: + default_algorithm_type = self._conf["algorithm"]["algorithm_type"] + logging.warning(f"algorithm_type not found, it will be set default: {default_algorithm_type}") + else: + 
self._conf["algorithm"]["algorithm_type"] = get_threshold_type_enum(algorithm_type) + + if self._conf["algorithm"]["algorithm_type"] is None: + logging.critical( + "the algorithm_type: %s you set is invalid. ai_block_io plug will exit.", + algorithm_type, + ) + Report.report_pass( + f"the algorithm_type: {algorithm_type} you set is invalid. ai_block_io plug will exit." + ) + exit(1) + + elif self._conf["algorithm"]["algorithm_type"] == ThresholdType.NSigmaThreshold: + self._conf["algorithm"]["n_sigma_parameter"] = self._get_config_value( + items_algorithm, + "n_sigma_parameter", + float, + self.DEFAULT_CONF["algorithm"]["n_sigma_parameter"], + gt=0, + le=10, + ) + elif ( + self._conf["algorithm"]["algorithm_type"] == ThresholdType.BoxplotThreshold + ): + self._conf["algorithm"]["boxplot_parameter"] = self._get_config_value( + items_algorithm, + "boxplot_parameter", + float, + self.DEFAULT_CONF["algorithm"]["boxplot_parameter"], + gt=0, + le=10, + ) + + def _read_stage(self, items_algorithm: dict): + stage_str = items_algorithm.get("stage") + if stage_str is None: + stage_str = self.DEFAULT_CONF["common"]["stage"] + logging.warning(f"stage not found, it will be set default: {stage_str}") + else: + stage_str = stage_str.strip() + + stage_str = stage_str.lower() + stage_list = stage_str.split(",") + stage_list = [stage.strip() for stage in stage_list] + if len(stage_list) == 1 and stage_list[0] == "": + logging.critical("stage value not allow is empty, exiting...") + exit(1) + if len(stage_list) == 1 and stage_list[0] == "default": + logging.warning( + "stage will enable default value: %s", + self.DEFAULT_CONF["common"]["stage"], + ) + self._conf["common"]["stage"] = ALL_STAGE_LIST + return + for stage in stage_list: + if stage not in ALL_STAGE_LIST: + logging.critical( + "stage: %s is not valid stage, ai_block_io will exit...", stage + ) + exit(1) + dup_stage_list = set(stage_list) + if "bio" not in dup_stage_list: + logging.critical("stage must contains bio stage, 
exiting...") + exit(1) + self._conf["common"]["stage"] = dup_stage_list + + def _read_iotype(self, items_algorithm: dict): + iotype_str = items_algorithm.get("iotype") + if iotype_str is None: + iotype_str = self.DEFAULT_CONF["common"]["iotype"] + logging.warning(f"iotype not found, it will be set default: {iotype_str}") + else: + iotype_str = iotype_str.strip() + + iotype_str = iotype_str.lower() + iotype_list = iotype_str.split(",") + iotype_list = [iotype.strip() for iotype in iotype_list] + if len(iotype_list) == 1 and iotype_list[0] == "": + logging.critical("iotype value not allow is empty, exiting...") + exit(1) + if len(iotype_list) == 1 and iotype_list[0] == "default": + logging.warning( + "iotype will enable default value: %s", + self.DEFAULT_CONF["common"]["iotype"], + ) + self._conf["common"]["iotype"] = ALL_IOTPYE_LIST + return + for iotype in iotype_list: + if iotype not in ALL_IOTPYE_LIST: + logging.critical( + "iotype: %s is not valid iotype, ai_block_io will exit...", iotype + ) + exit(1) + dup_iotype_list = set(iotype_list) + self._conf["common"]["iotype"] = dup_iotype_list + + def _read_sliding_window_type(self, items_sliding_window: dict): + sliding_window_type = items_sliding_window.get("win_type") + + if sliding_window_type is None: + default_sliding_window_type = self._conf["algorithm"]["win_type"] + logging.warning(f"win_type not found, it will be set default: {default_sliding_window_type}") + return + + sliding_window_type = sliding_window_type.strip() + if sliding_window_type is not None: + self._conf["algorithm"]["win_type"] = ( + get_sliding_window_type_enum(sliding_window_type) + ) + if self._conf["algorithm"]["win_type"] is None: + logging.critical( + "the win_type: %s you set is invalid. ai_block_io plug will exit.", + sliding_window_type, + ) + Report.report_pass( + f"the win_type: {sliding_window_type} you set is invalid. ai_block_io plug will exit." 
+ ) + exit(1) + + def _read_window_size(self, items_sliding_window: dict): + self._conf["algorithm"]["win_size"] = self._get_config_value( + items_sliding_window, + "win_size", + int, + self.DEFAULT_CONF["algorithm"]["win_size"], + gt=0, + le=300, + ) + + def _read_window_minimum_threshold(self, items_sliding_window: dict): + default_window_minimum_threshold = self.DEFAULT_CONF["algorithm"]["win_threshold"] + self._conf["algorithm"]["win_threshold"] = ( + self._get_config_value( + items_sliding_window, + "win_threshold", + int, + default_window_minimum_threshold, + gt=0, + le=self._conf["algorithm"]["win_size"], + ) + ) + + def read_config_from_file(self): + if not os.path.exists(self._config_file_name): + init_log_format(self._conf["log"]["level"]) + logging.critical( + "config file %s not found, ai_block_io plug will exit.", + self._config_file_name, + ) + Report.report_pass( + f"config file {self._config_file_name} not found, ai_block_io plug will exit." + ) + exit(1) + + con = configparser.ConfigParser() + try: + con.read(self._config_file_name, encoding="utf-8") + except configparser.Error as e: + init_log_format(self._conf["log"]["level"]) + logging.critical( + "config file read error: %s, ai_block_io plug will exit.", e + ) + Report.report_pass( + f"config file read error: {e}, ai_block_io plug will exit." + ) + exit(1) + + if con.has_section("log"): + items_log = dict(con.items("log")) + # 情况一:没有log,则使用默认值 + # 情况二:有log,值为空或异常,使用默认值 + # 情况三:有log,值正常,则使用该值 + self._conf["log"]["level"] = items_log.get( + "level", self.DEFAULT_CONF["log"]["level"] + ) + init_log_format(self._conf["log"]["level"]) + else: + init_log_format(self._conf["log"]["level"]) + logging.warning( + "log section parameter not found, it will be set to default value." 
+ ) + + if con.has_section("common"): + items_common = dict(con.items("common")) + + self._read_period_time(items_common) + self._read_disks_to_detect(items_common) + self._read_stage(items_common) + self._read_iotype(items_common) + else: + Report.report_pass("not found common section. exiting...") + logging.critical("not found common section. exiting...") + exit(1) + + if con.has_section("algorithm"): + items_algorithm = dict(con.items("algorithm")) + self._read_train_data_duration(items_algorithm) + self._read_train_update_duration(items_algorithm) + self._read_algorithm_type_and_parameter(items_algorithm) + self._read_sliding_window_type(items_algorithm) + self._read_window_size(items_algorithm) + self._read_window_minimum_threshold(items_algorithm) + + if con.has_section("latency_sata_ssd"): + items_latency_sata_ssd = dict(con.items("latency_sata_ssd")) + self._conf["latency_sata_ssd"]["read_tot_lim"] = self._get_config_value( + items_latency_sata_ssd, + "read_tot_lim", + int, + self.DEFAULT_CONF["latency_sata_ssd"]["read_tot_lim"], + gt=0, + section="latency_sata_ssd" + ) + self._conf["latency_sata_ssd"]["write_tot_lim"] = self._get_config_value( + items_latency_sata_ssd, + "write_tot_lim", + int, + self.DEFAULT_CONF["latency_sata_ssd"]["write_tot_lim"], + gt=0, + section="latency_sata_ssd" + ) + self._conf["latency_sata_ssd"]["read_avg_lim"] = self._get_config_value( + items_latency_sata_ssd, + "read_avg_lim", + int, + self.DEFAULT_CONF["latency_sata_ssd"]["read_avg_lim"], + gt=0, + section="latency_sata_ssd" + ) + self._conf["latency_sata_ssd"]["write_avg_lim"] = self._get_config_value( + items_latency_sata_ssd, + "write_avg_lim", + int, + self.DEFAULT_CONF["latency_sata_ssd"]["write_avg_lim"], + gt=0, + section="latency_sata_ssd" + ) + if self._conf["latency_sata_ssd"]["read_avg_lim"] >= self._conf["latency_sata_ssd"]["read_tot_lim"]: + Report.report_pass("latency_sata_ssd.read_avg_lim must < latency_sata_ssd.read_tot_lim . 
exiting...") + logging.critical("latency_sata_ssd.read_avg_lim must < latency_sata_ssd.read_tot_lim . exiting...") + exit(1) + if self._conf["latency_sata_ssd"]["write_avg_lim"] >= self._conf["latency_sata_ssd"]["write_tot_lim"]: + Report.report_pass("latency_sata_ssd.write_avg_lim must < latency_sata_ssd.write_tot_lim . exiting...") + logging.critical("latency_sata_ssd.read_avg_lim must < latency_sata_ssd.read_tot_lim . exiting...") + exit(1) + else: + Report.report_pass("not found latency_sata_ssd section. exiting...") + logging.critical("not found latency_sata_ssd section. exiting...") + exit(1) + + if con.has_section("latency_nvme_ssd"): + items_latency_nvme_ssd = dict(con.items("latency_nvme_ssd")) + self._conf["latency_nvme_ssd"]["read_tot_lim"] = self._get_config_value( + items_latency_nvme_ssd, + "read_tot_lim", + int, + self.DEFAULT_CONF["latency_nvme_ssd"]["read_tot_lim"], + gt=0, + section="latency_nvme_ssd" + ) + self._conf["latency_nvme_ssd"]["write_tot_lim"] = self._get_config_value( + items_latency_nvme_ssd, + "write_tot_lim", + int, + self.DEFAULT_CONF["latency_nvme_ssd"]["write_tot_lim"], + gt=0, + section="latency_nvme_ssd" + ) + self._conf["latency_nvme_ssd"]["read_avg_lim"] = self._get_config_value( + items_latency_nvme_ssd, + "read_avg_lim", + int, + self.DEFAULT_CONF["latency_nvme_ssd"]["read_avg_lim"], + gt=0, + section="latency_nvme_ssd" + ) + self._conf["latency_nvme_ssd"]["write_avg_lim"] = self._get_config_value( + items_latency_nvme_ssd, + "write_avg_lim", + int, + self.DEFAULT_CONF["latency_nvme_ssd"]["write_avg_lim"], + gt=0, + section="latency_nvme_ssd" + ) + if self._conf["latency_nvme_ssd"]["read_avg_lim"] >= self._conf["latency_nvme_ssd"]["read_tot_lim"]: + Report.report_pass("latency_nvme_ssd.read_avg_lim must < latency_nvme_ssd.read_tot_lim . exiting...") + logging.critical("latency_nvme_ssd.read_avg_lim must < latency_nvme_ssd.read_tot_lim . 
exiting...") + exit(1) + if self._conf["latency_nvme_ssd"]["write_avg_lim"] >= self._conf["latency_nvme_ssd"]["write_tot_lim"]: + Report.report_pass("latency_nvme_ssd.write_avg_lim must < latency_nvme_ssd.write_tot_lim . exiting...") + logging.critical("latency_nvme_ssd.write_avg_lim must < latency_nvme_ssd.write_tot_lim . exiting...") + exit(1) + else: + Report.report_pass("not found latency_nvme_ssd section. exiting...") + logging.critical("not found latency_nvme_ssd section. exiting...") + exit(1) + + if con.has_section("latency_sata_hdd"): + items_latency_sata_hdd = dict(con.items("latency_sata_hdd")) + self._conf["latency_sata_hdd"]["read_tot_lim"] = self._get_config_value( + items_latency_sata_hdd, + "read_tot_lim", + int, + self.DEFAULT_CONF["latency_sata_hdd"]["read_tot_lim"], + gt=0, + section="latency_sata_hdd" + ) + self._conf["latency_sata_hdd"]["write_tot_lim"] = self._get_config_value( + items_latency_sata_hdd, + "write_tot_lim", + int, + self.DEFAULT_CONF["latency_sata_hdd"]["write_tot_lim"], + gt=0, + section="latency_sata_hdd" + ) + self._conf["latency_sata_hdd"]["read_avg_lim"] = self._get_config_value( + items_latency_sata_hdd, + "read_avg_lim", + int, + self.DEFAULT_CONF["latency_sata_hdd"]["read_avg_lim"], + gt=0, + section="latency_sata_hdd" + ) + self._conf["latency_sata_hdd"]["write_avg_lim"] = self._get_config_value( + items_latency_sata_hdd, + "write_avg_lim", + int, + self.DEFAULT_CONF["latency_sata_hdd"]["write_avg_lim"], + gt=0, + section="latency_sata_hdd" + ) + if self._conf["latency_sata_hdd"]["read_avg_lim"] >= self._conf["latency_sata_hdd"]["read_tot_lim"]: + Report.report_pass("latency_sata_hdd.read_avg_lim must < latency_sata_hdd.read_tot_lim . exiting...") + logging.critical("latency_sata_hdd.read_avg_lim must < latency_sata_hdd.read_tot_lim . 
exiting...") + exit(1) + if self._conf["latency_sata_hdd"]["write_avg_lim"] >= self._conf["latency_sata_hdd"]["write_tot_lim"]: + Report.report_pass("latency_sata_hdd.write_avg_lim must < latency_sata_hdd.write_tot_lim . exiting...") + logging.critical("latency_sata_hdd.write_avg_lim must < latency_sata_hdd.write_tot_lim . exiting...") + exit(1) + else: + Report.report_pass("not found latency_sata_hdd section. exiting...") + logging.critical("not found latency_sata_hdd section. exiting...") + exit(1) + + if con.has_section("iodump"): + items_iodump = dict(con.items("iodump")) + self._conf["iodump"]["read_iodump_lim"] = self._get_config_value( + items_iodump, + "read_iodump_lim", + int, + self.DEFAULT_CONF["iodump"]["read_iodump_lim"], + ge=0 + ) + self._conf["iodump"]["write_iodump_lim"] = self._get_config_value( + items_iodump, + "write_iodump_lim", + int, + self.DEFAULT_CONF["iodump"]["write_iodump_lim"], + ge=0 + ) + else: + Report.report_pass("not found iodump section. exiting...") + logging.critical("not found iodump section. 
exiting...") + exit(1) + + self.__print_all_config_value() + + def __repr__(self) -> str: + return str(self._conf) + + def __str__(self) -> str: + return str(self._conf) + + def __print_all_config_value(self): + logging.info("all config is follow:\n %s", self) + + def get_tot_lim(self, disk_type, io_type): + if io_type == "read": + return self._conf.get( + f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {} + ).get("read_tot_lim", None) + elif io_type == "write": + return self._conf.get( + f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {} + ).get("write_tot_lim", None) + else: + return None + + def get_avg_lim(self, disk_type, io_type): + if io_type == "read": + return self._conf.get( + f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {} + ).get("read_avg_lim", None) + elif io_type == "write": + return self._conf.get( + f"latency_{DISK_TYPE_MAP.get(disk_type, '')}", {} + ).get("write_avg_lim", None) + else: + return None + + def get_train_data_duration_and_train_update_duration(self): + return ( + self._conf["algorithm"]["train_data_duration"], + self._conf["algorithm"]["train_update_duration"], + ) + + def get_window_size_and_window_minimum_threshold(self): + return ( + self._conf["algorithm"]["win_size"], + self._conf["algorithm"]["win_threshold"], + ) + + @property + def period_time(self): + return self._conf["common"]["period_time"] + + @property + def algorithm_type(self): + return self._conf["algorithm"]["algorithm_type"] + + @property + def sliding_window_type(self): + return self._conf["algorithm"]["win_type"] + + @property + def train_data_duration(self): + return self._conf["algorithm"]["train_data_duration"] + + @property + def train_update_duration(self): + return self._conf["algorithm"]["train_update_duration"] + + @property + def window_size(self): + return self._conf["algorithm"]["win_size"] + + @property + def window_minimum_threshold(self): + return self._conf["algorithm"]["win_threshold"] + + @property + def absolute_threshold(self): + return 
self._conf["common"]["absolute_threshold"] + + @property + def log_level(self): + return self._conf["log"]["level"] + + @property + def disks_to_detection(self): + return self._conf["common"]["disk"] + + @property + def stage(self): + return self._conf["common"]["stage"] + + @property + def iotype(self): + return self._conf["common"]["iotype"] + + @property + def boxplot_parameter(self): + return self._conf["algorithm"]["boxplot_parameter"] + + @property + def n_sigma_parameter(self): + return self._conf["algorithm"]["n_sigma_parameter"] + + @property + def read_iodump_lim(self): + return self._conf["iodump"]["read_iodump_lim"] + + @property + def write_iodump_lim(self): + return self._conf["iodump"]["write_iodump_lim"] \ No newline at end of file diff --git a/src/python/sentryPlugins/ai_block_io/data_access.py b/src/python/sentryPlugins/ai_block_io/data_access.py new file mode 100644 index 0000000..2f2d607 --- /dev/null +++ b/src/python/sentryPlugins/ai_block_io/data_access.py @@ -0,0 +1,127 @@ +# coding: utf-8 +# Copyright (c) 2024 Huawei Technologies Co., Ltd. +# sysSentry is licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. 
import json
import logging

from sentryCollector.collect_plugin import (
    Result_Messages,
    get_io_data,
    is_iocollect_valid,
    get_disk_type
)


from .io_data import IOStageData, IOData

# Kernel IO stages whose metrics are requested for every disk.
COLLECT_STAGES = [
    "throtl",
    "wbt",
    "gettag",
    "plug",
    "bfq",
    "hctx",
    "requeue",
    "rq_driver",
    "bio",
    "iocost",
]


def _decode_valid_devices(data_raw, log_failure):
    """Decode an is_iocollect_valid() reply into a device-name list.

    Returns None on any failure (non-zero ret, undecodable message, empty
    payload); failures are logged only when log_failure is True.
    """
    if data_raw["ret"] != 0:
        if log_failure:
            logging.warning(f"get valid devices failed, return {data_raw}")
        return None
    try:
        data = json.loads(data_raw["message"])
    except Exception as e:
        if log_failure:
            logging.warning(f"get valid devices failed, occur exception: {e}")
        return None
    if not data:
        if log_failure:
            logging.warning(f"get valid devices failed, return {data_raw}")
        return None
    return list(data.keys())


def check_collect_valid(period):
    """Devices valid for collection at *period*, logging failures; None on error."""
    return _decode_valid_devices(is_iocollect_valid(period), log_failure=True)


def check_detect_frequency_is_valid(period):
    """Same query as check_collect_valid() but silent on failure."""
    return _decode_valid_devices(is_iocollect_valid(period), log_failure=False)


def check_disk_is_available(period_time, disk):
    """True when the collector reports non-empty data for *disk*."""
    return _decode_valid_devices(
        is_iocollect_valid(period_time, disk), log_failure=False
    ) is not None


def _get_raw_data(period, disk_list):
    """Fetch raw per-stage metrics for disk_list over the last *period*."""
    return get_io_data(
        period,
        disk_list,
        COLLECT_STAGES,
        ["read", "write", "flush", "discard"],
    )


def _get_io_stage_data(data):
    """Map one stage's raw [latency, io_dump, io_length, iops] lists into IOStageData."""
    io_stage_data = IOStageData()
    for data_type in ("read", "write", "flush", "discard"):
        if data_type in data:
            getattr(io_stage_data, data_type).latency = data[data_type][0]
            getattr(io_stage_data, data_type).io_dump = data[data_type][1]
            getattr(io_stage_data, data_type).io_length = data[data_type][2]
            getattr(io_stage_data, data_type).iops = data[data_type][3]
    return io_stage_data


def get_io_data_from_collect_plug(period, disk_list):
    """Return {disk_name: IOData} decoded from the collector, or None on failure."""
    data_raw = _get_raw_data(period, disk_list)
    if data_raw["ret"] != 0:
        logging.warning(f'get io data failed with message: {data_raw["message"]}')
        return None
    try:
        data = json.loads(data_raw["message"])
    except json.decoder.JSONDecodeError as e:
        logging.warning(f"get io data failed, {e}")
        return None
    ret = {}
    for disk, disk_data in data.items():
        disk_ret = IOData()
        for stage, stage_data in disk_data.items():
            try:
                # Only stages that IOData declares are kept; others are skipped.
                getattr(disk_ret, stage)
                setattr(disk_ret, stage, _get_io_stage_data(stage_data))
            except AttributeError:
                logging.debug(f"no attr {stage}")
                continue
        ret[disk] = disk_ret
    return ret
import logging
from datetime import datetime

from .io_data import MetricName
from .threshold import Threshold
from .sliding_window import SlidingWindow
from .utils import get_metric_value_from_io_data_dict_by_metric_name


class Detector:
    """Watches a single metric: feeds it to a threshold model and a sliding window."""

    def __init__(self, metric_name: MetricName, threshold: Threshold, sliding_window: SlidingWindow):
        self._metric_name = metric_name
        self._threshold = threshold
        # for when threshold update, it can print latest threshold with metric name
        self._threshold.set_metric_name(self._metric_name)
        self._slidingWindow = sliding_window
        self._threshold.attach_observer(self._slidingWindow)
        # Timestamp of the last periodic threshold log; None means "restart timer".
        self._count = None

    @property
    def metric_name(self):
        return self._metric_name

    def get_sliding_window_data(self):
        return self._slidingWindow.get_data()

    def is_slow_io_event(self, io_data_dict_with_disk_name: dict):
        """Push the latest sample and return the sliding-window detection tuple.

        Returns ((slow_io, abnormal_period), window, ai_threshold,
        abs_threshold, avg_lim); placeholder Nones when the metric is missing.
        """
        # Log the current AI threshold at most once a minute.
        if self._count is None:
            self._count = datetime.now()
        else:
            now_time = datetime.now()
            time_diff = (now_time - self._count).total_seconds()
            if time_diff >= 60:
                logging.info(f"({self._metric_name}) 's latest ai threshold is: {self._threshold.get_threshold()}.")
                self._count = None

        logging.debug(f'enter Detector: {self}')
        metric_value = get_metric_value_from_io_data_dict_by_metric_name(io_data_dict_with_disk_name, self._metric_name)
        if metric_value is None:
            logging.debug('not found metric value, so return None.')
            return (False, False), None, None, None, None
        logging.debug(f'input metric value: {str(metric_value)}')
        self._threshold.push_latest_data_to_queue(metric_value)
        detection_result = self._slidingWindow.is_slow_io_event(metric_value)
        # An abnormal period was detected; Detector logs it at info level.
        if detection_result[0][1]:
            # Fix: the AI threshold may still be None (abs-threshold-only hit);
            # round(None, 3) used to raise TypeError here.
            ai_threshold = detection_result[2]
            ai_threshold_display = round(ai_threshold, 3) if ai_threshold is not None else None
            logging.info(f'[abnormal_period]: disk: {self._metric_name.disk_name}, '
                         f'stage: {self._metric_name.stage_name}, '
                         f'iotype: {self._metric_name.io_access_type_name}, '
                         f'type: {self._metric_name.metric_name}, '
                         f'ai_threshold: {ai_threshold_display}, '
                         f'curr_val: {metric_value}')
        else:
            logging.debug(f'Detection result: {str(detection_result)}')
        logging.debug(f'exit Detector: {self}')
        return detection_result

    def __repr__(self):
        return (f'disk_name: {self._metric_name.disk_name}, stage_name: {self._metric_name.stage_name},'
                f' io_type_name: {self._metric_name.io_access_type_name},'
                f' metric_name: {self._metric_name.metric_name}, threshold_type: {self._threshold},'
                f' sliding_window_type: {self._slidingWindow}')


def set_to_str(parameter: set):
    """Render a set of strings as a comma-separated string (set order, as before)."""
    return ",".join(parameter)


class DiskDetector:
    """Aggregates every Detector of one disk and derives a root-cause verdict."""

    def __init__(self, disk_name: str):
        self._disk_name = disk_name
        self._detector_list = []

    def add_detector(self, detector: Detector):
        self._detector_list.append(detector)

    def get_detector_list_window(self):
        """Collect each detector's window, grouped by metric kind, io type, stage."""
        latency_wins = {"read": {}, "write": {}}
        iodump_wins = {"read": {}, "write": {}}
        for detector in self._detector_list:
            if detector.metric_name.metric_name == 'latency':
                latency_wins[detector.metric_name.io_access_type_name][detector.metric_name.stage_name] = detector.get_sliding_window_data()
            elif detector.metric_name.metric_name == 'io_dump':
                iodump_wins[detector.metric_name.io_access_type_name][detector.metric_name.stage_name] = detector.get_sliding_window_data()
        return latency_wins, iodump_wins

    def is_slow_io_event(self, io_data_dict_with_disk_name: dict):
        """Run all detectors and return
        (found, disk, reason, block_stacks, io_types, alarm_types, details)."""
        diagnosis_info = {"bio": [], "rq_driver": [], "kernel_stack": []}
        for detector in self._detector_list:
            # detector result: (slow_io?, abnormal_period?), window, ai_threshold, abs_threshold, avg_lim
            result = detector.is_slow_io_event(io_data_dict_with_disk_name)
            if result[0][0]:
                stage = detector.metric_name.stage_name
                if stage == "bio":
                    diagnosis_info["bio"].append(detector.metric_name)
                elif stage == "rq_driver":
                    diagnosis_info["rq_driver"].append(detector.metric_name)
                else:
                    diagnosis_info["kernel_stack"].append(detector.metric_name)

        # Slow IO is only reported when the top-level bio stage itself is slow.
        if len(diagnosis_info["bio"]) == 0:
            return False, None, None, None, None, None, None

        driver_name = self._disk_name
        reason = "unknown"
        block_stack = set()
        io_type = set()
        alarm_type = set()

        for metric_names in diagnosis_info.values():
            for metric_name in metric_names:
                block_stack.add(metric_name.stage_name)
                io_type.add(metric_name.io_access_type_name)
                alarm_type.add(metric_name.metric_name)

        latency_wins, iodump_wins = self.get_detector_list_window()
        details = {"latency": latency_wins, "iodump": iodump_wins}

        # Root-cause ranking: pressure-control stages, then driver, then kernel.
        io_press = {"throtl", "wbt", "iocost", "bfq"}
        driver_slow = {"rq_driver"}
        kernel_slow = {"gettag", "plug", "deadline", "hctx", "requeue"}

        if not io_press.isdisjoint(block_stack):
            reason = "io_press"
        elif not driver_slow.isdisjoint(block_stack):
            reason = "driver_slow"
        elif not kernel_slow.isdisjoint(block_stack):
            reason = "kernel_slow"

        return True, driver_name, reason, set_to_str(block_stack), set_to_str(io_type), set_to_str(alarm_type), details

    def __repr__(self):
        msg = f'disk: {self._disk_name}, '
        for detector in self._detector_list:
            msg += f'\n detector: [{detector}]'
        return msg
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class BaseData:
    """Metrics of one (stage, io-type) pair for a single collection period.

    None means "not reported by the collector in this period".
    """
    # Plain defaults replace the original `field(default_factory=lambda: None)`
    # anti-idiom; behavior is identical (None is immutable).
    latency: Optional[float] = None
    io_dump: Optional[int] = None
    io_length: Optional[int] = None
    iops: Optional[int] = None


@dataclass
class IOStageData:
    """Metrics of one kernel IO stage, split by access type."""
    # default_factory takes the class directly; no lambda wrapper needed.
    read: BaseData = field(default_factory=BaseData)
    write: BaseData = field(default_factory=BaseData)
    flush: BaseData = field(default_factory=BaseData)
    discard: BaseData = field(default_factory=BaseData)


@dataclass
class IOData:
    """One disk's metrics for every collected stage, plus the sample timestamp."""
    throtl: IOStageData = field(default_factory=IOStageData)
    wbt: IOStageData = field(default_factory=IOStageData)
    gettag: IOStageData = field(default_factory=IOStageData)
    iocost: IOStageData = field(default_factory=IOStageData)
    plug: IOStageData = field(default_factory=IOStageData)
    bfq: IOStageData = field(default_factory=IOStageData)
    hctx: IOStageData = field(default_factory=IOStageData)
    requeue: IOStageData = field(default_factory=IOStageData)
    rq_driver: IOStageData = field(default_factory=IOStageData)
    bio: IOStageData = field(default_factory=IOStageData)
    # Captured at object-construction time (lambda required: evaluated per instance).
    time_stamp: float = field(default_factory=lambda: datetime.now().timestamp())


@dataclass(frozen=True)
class MetricName:
    """Identity of one monitored metric; frozen, so usable as a dict/set key."""
    disk_name: str
    disk_type: int
    stage_name: str
    io_access_type_name: str
    metric_name: str
# coding: utf-8
# Copyright (c) 2024 Huawei Technologies Co., Ltd.
# sysSentry is licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.

from enum import Enum, unique
import numpy as np


@unique
class SlidingWindowType(Enum):
    NotContinuousSlidingWindow = 0
    ContinuousSlidingWindow = 1
    MedianSlidingWindow = 2


class SlidingWindow:
    """Base sliding window over the most recent metric samples.

    Keeps the last *queue_length* samples plus a parallel abnormal-tag list.
    A sample is abnormal when it exceeds the AI threshold or the absolute
    threshold, unless it is below avg_lim (a lower bound that suppresses noise).
    """

    def __init__(self, queue_length: int, threshold: int, abs_threshold: int = None, avg_lim: int = None):
        self._queue_length = queue_length
        self._queue_threshold = threshold
        self._ai_threshold = None
        self._abs_threshold = abs_threshold
        self._avg_lim = avg_lim
        self._io_data_queue = []
        self._io_data_queue_abnormal_tag = []

    def is_abnormal(self, data):
        """Tag one sample; below avg_lim is never abnormal."""
        if self._avg_lim is not None and data < self._avg_lim:
            return False
        if self._ai_threshold is not None and data > self._ai_threshold:
            return True
        if self._abs_threshold is not None and data > self._abs_threshold:
            return True
        # Fix: the original fell through and implicitly returned None, mixing
        # None/bool in the tag list; "normal" is now an explicit False.
        return False

    def push(self, data: float):
        """Append a sample (evicting the oldest when full); return its abnormal tag."""
        if len(self._io_data_queue) == self._queue_length:
            self._io_data_queue.pop(0)
            self._io_data_queue_abnormal_tag.pop(0)
        self._io_data_queue.append(data)
        tag = self.is_abnormal(data)
        self._io_data_queue_abnormal_tag.append(tag)
        return tag

    def update(self, threshold):
        """Observer callback: adopt a new AI threshold and re-tag the window."""
        if self._ai_threshold == threshold:
            return
        self._ai_threshold = threshold
        self._io_data_queue_abnormal_tag.clear()
        for data in self._io_data_queue:
            self._io_data_queue_abnormal_tag.append(self.is_abnormal(data))

    def is_slow_io_event(self, data):
        # Base class never reports; subclasses implement the actual policy.
        return False, None, None, None

    def get_data(self):
        return self._io_data_queue

    def __repr__(self):
        return "[SlidingWindow]"


class NotContinuousSlidingWindow(SlidingWindow):
    """Slow IO when at least *threshold* abnormal samples exist anywhere in the window."""

    def is_slow_io_event(self, data):
        is_abnormal_period = super().push(data)
        is_slow_io_event = False
        if len(self._io_data_queue) < self._queue_length or (self._ai_threshold is None and self._abs_threshold is None):
            # NOTE(review): this assignment is a no-op (the flag is already
            # False and the count check below still runs) — preserved as-is.
            is_slow_io_event = False
        if self._io_data_queue_abnormal_tag.count(True) >= self._queue_threshold:
            is_slow_io_event = True
        return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold, self._avg_lim

    def __repr__(self):
        return f"[NotContinuousSlidingWindow, window size: {self._queue_length}, threshold: {self._queue_threshold}]"


class ContinuousSlidingWindow(SlidingWindow):
    """Slow IO when *threshold* consecutive abnormal samples occur."""

    def is_slow_io_event(self, data):
        is_abnormal_period = super().push(data)
        is_slow_io_event = False
        if len(self._io_data_queue) < self._queue_length or (self._ai_threshold is None and self._abs_threshold is None):
            # NOTE(review): no-op guard kept from the original (see above).
            is_slow_io_event = False
        consecutive_count = 0
        for tag in self._io_data_queue_abnormal_tag:
            if tag:
                consecutive_count += 1
                if consecutive_count >= self._queue_threshold:
                    is_slow_io_event = True
                    break
            else:
                consecutive_count = 0
        return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold, self._avg_lim

    def __repr__(self):
        return f"[ContinuousSlidingWindow, window size: {self._queue_length}, threshold: {self._queue_threshold}]"


class MedianSlidingWindow(SlidingWindow):
    """Slow IO when the window's median exceeds either threshold."""

    def is_slow_io_event(self, data):
        is_abnormal_period = super().push(data)
        is_slow_io_event = False
        if len(self._io_data_queue) < self._queue_length or (self._ai_threshold is None and self._abs_threshold is None):
            # NOTE(review): no-op guard kept from the original (see above).
            is_slow_io_event = False
        median = np.median(self._io_data_queue)
        if (self._ai_threshold is not None and median > self._ai_threshold) or (self._abs_threshold is not None and median > self._abs_threshold):
            is_slow_io_event = True
        return (is_slow_io_event, is_abnormal_period), self._io_data_queue, self._ai_threshold, self._abs_threshold, self._avg_lim

    def __repr__(self):
        return f"[MedianSlidingWindow, window size: {self._queue_length}]"


class SlidingWindowFactory:
    """Maps a SlidingWindowType onto its implementation (default: NotContinuous)."""

    def get_sliding_window(self, sliding_window_type: SlidingWindowType, *args, **kwargs):
        if sliding_window_type == SlidingWindowType.NotContinuousSlidingWindow:
            return NotContinuousSlidingWindow(*args, **kwargs)
        elif sliding_window_type == SlidingWindowType.ContinuousSlidingWindow:
            return ContinuousSlidingWindow(*args, **kwargs)
        elif sliding_window_type == SlidingWindowType.MedianSlidingWindow:
            return MedianSlidingWindow(*args, **kwargs)
        else:
            return NotContinuousSlidingWindow(*args, **kwargs)
import logging
from enum import Enum
import queue
import numpy as np
import math

from .sliding_window import SlidingWindow


class ThresholdState(Enum):
    INIT = 0    # no threshold computed yet; get_threshold() returns None
    START = 1   # a threshold has been computed at least once


class Threshold:
    """Base threshold model with an observer hook to a SlidingWindow.

    Data-driven subclasses keep a bounded training queue and recompute their
    threshold once enough (new) samples have arrived.
    """

    def __init__(self, data_queue_size: int = 10000, data_queue_update_size: int = 1000):
        self._observer = None
        self.data_queue = queue.Queue(data_queue_size)
        self.data_queue_update_size = data_queue_update_size
        self.new_data_size = 0
        self.threshold_state = ThresholdState.INIT
        self.threshold = math.inf
        self.metric_name = None

    def set_threshold(self, threshold):
        self.threshold = threshold
        self.threshold_state = ThresholdState.START
        self.notify_observer()

    def set_metric_name(self, metric_name):
        self.metric_name = metric_name

    def get_threshold(self):
        if self.threshold_state == ThresholdState.INIT:
            return None
        return self.threshold

    def is_abnormal(self, data):
        if self.threshold_state == ThresholdState.INIT:
            return False
        return data >= self.threshold

    # Observer pattern: when the threshold updates, the sliding window's
    # cached threshold is refreshed automatically.
    def attach_observer(self, observer: SlidingWindow):
        self._observer = observer

    def notify_observer(self):
        if self._observer is not None:
            self._observer.update(self.threshold)

    def push_latest_data_to_queue(self, data):
        # Base/absolute thresholds ignore incoming data.
        pass

    def _push_data_and_maybe_update(self, data):
        """Shared train-queue bookkeeping for data-driven thresholds.

        Drops near-zero samples, keeps the queue bounded (evicting the oldest),
        and calls self._update_threshold() once the queue is full and either no
        threshold exists yet or data_queue_update_size new samples arrived.
        """
        if data < 1e-6:
            return
        try:
            self.data_queue.put(data, block=False)
        except queue.Full:
            self.data_queue.get()
            self.data_queue.put(data)
        self.new_data_size += 1
        if (self.data_queue.full() and (self.threshold_state == ThresholdState.INIT or
                                        (self.threshold_state == ThresholdState.START and
                                         self.new_data_size >= self.data_queue_update_size))):
            self._update_threshold()
            self.new_data_size = 0

    def _finish_threshold_update(self, old_threshold):
        """State transition, logging, and observer notification after an update."""
        if self.threshold_state == ThresholdState.INIT:
            self.threshold_state = ThresholdState.START
        logging.info(f"MetricName: [{self.metric_name}]'s threshold update, old is: {old_threshold} -> new is: {self.threshold}")
        self.notify_observer()

    def __repr__(self):
        return "Threshold"

    def __str__(self):
        return "Threshold"


class AbsoluteThreshold(Threshold):
    """Fixed threshold supplied via set_threshold(); never trains on data."""

    def __init__(self, data_queue_size: int = 10000, data_queue_update_size: int = 1000, **kwargs):
        super().__init__(data_queue_size, data_queue_update_size)

    def push_latest_data_to_queue(self, data):
        pass

    def __repr__(self):
        return "[AbsoluteThreshold]"

    def __str__(self):
        return "absolute"


class BoxplotThreshold(Threshold):
    """Tukey boxplot threshold: Q3 + parameter * IQR over the training queue."""

    def __init__(self, boxplot_parameter: float = 1.5, data_queue_size: int = 10000, data_queue_update_size: int = 1000, **kwargs):
        super().__init__(data_queue_size, data_queue_update_size)
        self.parameter = boxplot_parameter

    def _update_threshold(self):
        old_threshold = self.threshold
        data = list(self.data_queue.queue)
        q1 = np.percentile(data, 25)
        q3 = np.percentile(data, 75)
        iqr = q3 - q1
        self.threshold = q3 + self.parameter * iqr
        self._finish_threshold_update(old_threshold)

    def push_latest_data_to_queue(self, data):
        # Queue handling is shared with NSigmaThreshold via the base class.
        self._push_data_and_maybe_update(data)

    def __repr__(self):
        return f"[BoxplotThreshold, param is: {self.parameter}, train_size: {self.data_queue.maxsize}, update_size: {self.data_queue_update_size}]"

    def __str__(self):
        return "boxplot"


class NSigmaThreshold(Threshold):
    """Gaussian threshold: mean + parameter * std over the training queue."""

    def __init__(self, n_sigma_parameter: float = 3.0, data_queue_size: int = 10000, data_queue_update_size: int = 1000, **kwargs):
        super().__init__(data_queue_size, data_queue_update_size)
        self.parameter = n_sigma_parameter

    def _update_threshold(self):
        old_threshold = self.threshold
        data = list(self.data_queue.queue)
        mean = np.mean(data)
        std = np.std(data)
        self.threshold = mean + self.parameter * std
        self._finish_threshold_update(old_threshold)

    def push_latest_data_to_queue(self, data):
        self._push_data_and_maybe_update(data)

    def __repr__(self):
        return f"[NSigmaThreshold, param is: {self.parameter}, train_size: {self.data_queue.maxsize}, update_size: {self.data_queue_update_size}]"

    def __str__(self):
        return "n_sigma"


class ThresholdType(Enum):
    AbsoluteThreshold = 0
    BoxplotThreshold = 1
    NSigmaThreshold = 2


class ThresholdFactory:
    """Instantiates the Threshold implementation matching a ThresholdType."""

    def get_threshold(self, threshold_type: ThresholdType, *args, **kwargs):
        if threshold_type == ThresholdType.AbsoluteThreshold:
            return AbsoluteThreshold(*args, **kwargs)
        elif threshold_type == ThresholdType.BoxplotThreshold:
            return BoxplotThreshold(*args, **kwargs)
        elif threshold_type == ThresholdType.NSigmaThreshold:
            return NSigmaThreshold(*args, **kwargs)
        else:
            raise ValueError(f"Invalid threshold type: {threshold_type}")

import logging
from dataclasses import asdict


from .threshold import ThresholdType
from .sliding_window import SlidingWindowType
from .io_data import MetricName, IOData


def get_threshold_type_enum(algorithm_type: str):
    """Map a config algorithm name onto a ThresholdType member (None if unknown)."""
    mapping = {
        "boxplot": ThresholdType.BoxplotThreshold,
        "n_sigma": ThresholdType.NSigmaThreshold,
    }
    return mapping.get(algorithm_type.lower())


def get_sliding_window_type_enum(sliding_window_type: str):
    """Map a config window name onto a SlidingWindowType member (None if unknown)."""
    mapping = {
        "not_continuous": SlidingWindowType.NotContinuousSlidingWindow,
        "continuous": SlidingWindowType.ContinuousSlidingWindow,
        "median": SlidingWindowType.MedianSlidingWindow,
    }
    return mapping.get(sliding_window_type.lower())


def get_metric_value_from_io_data_dict_by_metric_name(
    io_data_dict: dict, metric_name: MetricName
):
    """Drill down io_data_dict[disk][stage][access_type][metric].

    Returns None when any level of the lookup is missing.
    """
    try:
        io_data: IOData = io_data_dict[metric_name.disk_name]
        stage_data = asdict(io_data)[metric_name.stage_name]
        access_data = stage_data[metric_name.io_access_type_name]
        return access_data[metric_name.metric_name]
    except KeyError:
        return None


def get_data_queue_size_and_update_size(
    training_data_duration: float,
    train_update_duration: float,
    slow_io_detect_frequency: int,
):
    """Convert durations (hours) into sample counts at the detection frequency (seconds)."""
    data_queue_size = int(training_data_duration * 60 * 60 / slow_io_detect_frequency)
    update_size = int(train_update_duration * 60 * 60 / slow_io_detect_frequency)
    return data_queue_size, update_size


def get_log_level(log_level: str):
    """Map a config log-level name onto a logging constant (INFO by default)."""
    levels = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }
    return levels.get(log_level.lower(), logging.INFO)
# coding: utf-8
# Copyright (c) 2024 Huawei Technologies Co., Ltd.
# sysSentry is licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#          http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.
import logging
import signal
import configparser
import time

from .config import read_config_log, read_config_common, read_config_algorithm, read_config_latency, read_config_iodump, read_config_stage
from .stage_window import IoWindow, IoDumpWindow
from .module_conn import avg_is_iocollect_valid, avg_get_io_data, report_alarm_fail, process_report_data, sig_handler, get_disk_type_by_name, check_disk_list_validation
from .utils import update_avg_and_check_abnormal

CONFIG_FILE = "/etc/sysSentry/plugins/avg_block_io.ini"


def init_io_win(io_dic, config, common_param):
    """Initialize latency/iodump windows and the running-average store.

    Returns (io_data, io_avg_value), both keyed [disk][stage][iotype].
    Window limits come from the per-stage section when present, otherwise from
    the per-disk-type (or iodump) common defaults in common_param.
    """
    iotype_list = io_dic["iotype_list"]
    io_data = {}
    io_avg_value = {}
    for disk_name in io_dic["disk_list"]:
        io_data[disk_name] = {}
        io_avg_value[disk_name] = {}
        curr_disk_type = get_disk_type_by_name(disk_name)
        for stage_name in io_dic["stage_list"]:
            io_data[disk_name][stage_name] = {}
            io_avg_value[disk_name][stage_name] = {}
            # Parse the optional per-stage config section (may be empty).
            curr_stage_param = read_config_stage(config, stage_name, iotype_list, curr_disk_type)
            for rw in iotype_list:
                io_data[disk_name][stage_name][rw] = {}
                # [running average, period count]
                io_avg_value[disk_name][stage_name][rw] = [0, 0]

                # One latency window and one iodump window per iotype.
                avg_lim_key = f"{rw}_avg_lim"
                avg_time_key = f"{rw}_avg_time"
                tot_lim_key = f"{rw}_tot_lim"
                iodump_lim_key = f"{rw}_iodump_lim"

                # Stage-specific value wins; otherwise fall back to the common defaults.
                avg_lim_value = curr_stage_param.get(avg_lim_key, common_param.get(curr_disk_type, {}).get(avg_lim_key))
                avg_time_value = curr_stage_param.get(avg_time_key, common_param.get(curr_disk_type, {}).get(avg_time_key))
                tot_lim_value = curr_stage_param.get(tot_lim_key, common_param.get(curr_disk_type, {}).get(tot_lim_key))
                iodump_lim_value = curr_stage_param.get(iodump_lim_key, common_param.get("iodump", {}).get(iodump_lim_key))

                if avg_lim_value and avg_time_value and tot_lim_value:
                    io_data[disk_name][stage_name][rw]["latency"] = IoWindow(
                        window_size=io_dic["win_size"], window_threshold=io_dic["win_threshold"],
                        abnormal_multiple=avg_time_value, abnormal_multiple_lim=avg_lim_value,
                        abnormal_time=tot_lim_value)
                    logging.debug("Successfully create %s-%s-%s-latency window", disk_name, stage_name, rw)

                if iodump_lim_value is not None:
                    io_data[disk_name][stage_name][rw]["iodump"] = IoDumpWindow(
                        window_size=io_dic["win_size"], window_threshold=io_dic["win_threshold"],
                        abnormal_time=iodump_lim_value)
                    logging.debug("Successfully create %s-%s-%s-iodump window", disk_name, stage_name, rw)
    return io_data, io_avg_value


def get_valid_disk_stage_list(io_dic, config_disk, config_stage):
    """Intersect configured disks/stages with what sentryCollector reports.

    Exits via report_alarm_fail when no usable disk or stage remains, or when
    the config names a stage the collector does not know.
    """
    json_data = avg_is_iocollect_valid(io_dic, config_disk, config_stage)

    all_disk_set = json_data.keys()
    all_stage_set = set()
    for disk_stage_list in json_data.values():
        all_stage_set.update(disk_stage_list)

    disk_list = [key for key in all_disk_set if key in config_disk]
    not_in_disk_list = [key for key in config_disk if key not in all_disk_set]

    # No disks configured: fall back to everything the collector reports.
    if not config_disk and not not_in_disk_list:
        disk_list = list(all_disk_set)

    if not disk_list:
        report_alarm_fail("Cannot get valid disk name")

    disk_list = check_disk_list_validation(disk_list)

    # Monitor at most 10 disks (slicing is safe even when fewer remain).
    disk_list = disk_list[:10]

    if not config_disk:
        logging.info(f"Default common.disk using disk={disk_list}")
    elif sorted(disk_list) != sorted(config_disk):
        logging.warning(f"Set common.disk to {disk_list}")

    stage_list = [key for key in all_stage_set if key in config_stage]
    not_in_stage_list = [key for key in config_stage if key not in all_stage_set]

    if not_in_stage_list:
        report_alarm_fail(f"Invalid common.stage_list config, cannot set {not_in_stage_list}")

    # No stages configured: fall back to everything the collector reports.
    if not config_stage:
        stage_list = list(all_stage_set)

    if not stage_list:
        report_alarm_fail("Cannot get valid stage name.")

    if not config_stage:
        logging.info(f"Default common.stage using stage={stage_list}")

    return disk_list, stage_list


def main_loop(io_dic, io_data, io_avg_value):
    """Main loop of avg_block_io: each period fetch data, update windows, report."""
    period_time = io_dic["period_time"]
    disk_list = io_dic["disk_list"]
    stage_list = io_dic["stage_list"]
    iotype_list = io_dic["iotype_list"]
    win_size = io_dic["win_size"]
    while True:
        # Wait one collection period.
        time.sleep(period_time)

        # Fetch this period's data from the collector.
        is_success, curr_period_data = avg_get_io_data(io_dic)
        if not is_success:
            logging.error(f"{curr_period_data['msg']}")
            continue

        # Feed the period data into averages and windows.
        reach_size = False
        for disk_name in disk_list:
            for stage_name in stage_list:
                for rw in iotype_list:
                    if disk_name in curr_period_data and stage_name in curr_period_data[disk_name] and rw in curr_period_data[disk_name][stage_name]:
                        io_key = (disk_name, stage_name, rw)
                        reach_size = update_avg_and_check_abnormal(curr_period_data, io_key, win_size, io_avg_value, io_data)

        # Skip alarm evaluation until win_size periods have been collected.
        if not reach_size:
            continue

        # Evaluate abnormal windows / scenarios and report.
        for disk_name in disk_list:
            for rw in iotype_list:
                process_report_data(disk_name, rw, io_data)


def main():
    """Entry point: read config, validate against the collector, build windows, loop."""
    # Register handlers for SIGINT (-2) / SIGTERM (-15).
    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    log_level = read_config_log(CONFIG_FILE)
    log_format = "%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s"
    logging.basicConfig(level=log_level, format=log_format)

    # Load the config file.
    config = configparser.ConfigParser(comment_prefixes=('#', ';'))
    try:
        config.read(CONFIG_FILE)
    except configparser.Error:
        report_alarm_fail("Failed to read config file")

    io_dic = {}

    # [common] section.
    io_dic["period_time"], disk, stage, io_dic["iotype_list"] = read_config_common(config)

    # Validate disks/stages with the collector (is_iocollect_valid).
    io_dic["disk_list"], io_dic["stage_list"] = get_valid_disk_stage_list(io_dic, disk, stage)

    logging.debug(f"disk={io_dic['disk_list']}, stage={io_dic['stage_list']}")

    if "bio" not in io_dic["stage_list"]:
        report_alarm_fail("Cannot run avg_block_io without bio stage")

    # Build windows from config, matching is_iocollect_valid's result.
    # step 1: common [algorithm] section.
    io_dic["win_size"], io_dic["win_threshold"] = read_config_algorithm(config)

    # step 2: common [latency_xxx] sections.
    common_param = read_config_latency(config)

    # step 3: common [iodump] section.
    common_param['iodump'] = read_config_iodump(config)

    # step 4: create the windows.
    io_data, io_avg_value = init_io_win(io_dic, config, common_param)

    main_loop(io_dic, io_data, io_avg_value)
import configparser
import logging
import os

from .module_conn import report_alarm_fail
from sentryCollector.collect_plugin import Disk_Type


CONF_LOG = 'log'
CONF_LOG_LEVEL = 'level'
LogLevel = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL
}

CONF_COMMON = 'common'
CONF_COMMON_DISK = 'disk'
CONF_COMMON_STAGE = 'stage'
CONF_COMMON_IOTYPE = 'iotype'
CONF_COMMON_PER_TIME = 'period_time'

CONF_ALGO = 'algorithm'
CONF_ALGO_SIZE = 'win_size'
CONF_ALGO_THRE = 'win_threshold'

CONF_LATENCY = 'latency_{}'
CONF_IODUMP = 'iodump'


# Fallback values used when an option (or section-specific value) is unset.
DEFAULT_PARAM = {
    CONF_LOG: {
        CONF_LOG_LEVEL: 'info'
    }, CONF_COMMON: {
        CONF_COMMON_DISK: 'default',
        CONF_COMMON_STAGE: 'default',
        CONF_COMMON_IOTYPE: 'read,write',
        CONF_COMMON_PER_TIME: 1
    }, CONF_ALGO: {
        CONF_ALGO_SIZE: 30,
        CONF_ALGO_THRE: 6
    }, 'latency_nvme_ssd': {
        'read_avg_lim': 10000,
        'write_avg_lim': 10000,
        'read_avg_time': 3,
        'write_avg_time': 3,
        'read_tot_lim': 50000,
        'write_tot_lim': 50000,
    }, 'latency_sata_ssd' : {
        'read_avg_lim': 10000,
        'write_avg_lim': 10000,
        'read_avg_time': 3,
        'write_avg_time': 3,
        'read_tot_lim': 50000,
        'write_tot_lim': 50000,
    }, 'latency_sata_hdd' : {
        'read_avg_lim': 15000,
        'write_avg_lim': 15000,
        'read_avg_time': 3,
        'write_avg_time': 3,
        'read_tot_lim': 50000,
        'write_tot_lim': 50000
    }, CONF_IODUMP: {
        'read_iodump_lim': 0,
        'write_iodump_lim': 0
    }
}


def get_section_value(section_name, config):
    """Read every known key of one section as int, falling back to DEFAULT_PARAM.

    Exits via report_alarm_fail on a non-decimal value.
    """
    common_param = {}
    config_sec = config[section_name]
    for config_key in DEFAULT_PARAM[section_name]:
        if config_key in config_sec:
            if not config_sec[config_key].isdecimal():
                report_alarm_fail(f"Invalid {section_name}.{config_key} config.")
            common_param[config_key] = int(config_sec[config_key])
        else:
            common_param[config_key] = DEFAULT_PARAM[section_name][config_key]
            logging.warning(f"Unset {section_name}.{config_key} in config file, use {common_param[config_key]} as default")
    return common_param


def read_config_log(filename):
    """Read the [log] section and return the logging level (INFO by default)."""
    default_log_level = DEFAULT_PARAM[CONF_LOG][CONF_LOG_LEVEL]
    if not os.path.exists(filename):
        return LogLevel.get(default_log_level)

    config = configparser.ConfigParser()
    config.read(filename)

    log_level = config.get(CONF_LOG, CONF_LOG_LEVEL, fallback=default_log_level)
    if log_level.lower() in LogLevel:
        return LogLevel.get(log_level.lower())
    return LogLevel.get(default_log_level)


def read_config_common(config):
    """Read [common]: returns (period_time, disk_list, stage_list, iotype_list).

    disk/stage default to [] ("default" means let the collector decide);
    exits on invalid iotype or period_time.
    """
    if not config.has_section(CONF_COMMON):
        report_alarm_fail(f"Cannot find {CONF_COMMON} section in config file")

    try:
        disk_name = config.get(CONF_COMMON, CONF_COMMON_DISK).lower()
        disk = [] if disk_name == "default" else disk_name.split(",")
    except configparser.NoOptionError:
        disk = []
        logging.warning(f"Unset {CONF_COMMON}.{CONF_COMMON_DISK}, set to default")

    try:
        stage_name = config.get(CONF_COMMON, CONF_COMMON_STAGE).lower()
        stage = [] if stage_name == "default" else stage_name.split(",")
    except configparser.NoOptionError:
        stage = []
        logging.warning(f"Unset {CONF_COMMON}.{CONF_COMMON_STAGE}, set to default")

    if len(disk) > 10:
        logging.warning(f"Too many {CONF_COMMON}.disks, record only max 10 disks")
        disk = disk[:10]

    try:
        iotype_name = config.get(CONF_COMMON, CONF_COMMON_IOTYPE).lower().split(",")
        iotype_list = [rw.lower() for rw in iotype_name if rw.lower() in ['read', 'write']]
        err_iotype = [rw.lower() for rw in iotype_name if rw.lower() not in ['read', 'write']]

        if err_iotype:
            report_alarm_fail(f"Invalid {CONF_COMMON}.{CONF_COMMON_IOTYPE} config")

    except configparser.NoOptionError:
        # Bug fix: the default is stored as a comma-separated string; split it
        # so the fallback has the same list shape as the parsed value (the
        # original also referenced a misspelled name, iotupe_list).
        iotype_list = DEFAULT_PARAM[CONF_COMMON][CONF_COMMON_IOTYPE].split(",")
        logging.warning(f"Unset {CONF_COMMON}.{CONF_COMMON_IOTYPE}, use {iotype_list} as default")

    try:
        period_time = int(config.get(CONF_COMMON, CONF_COMMON_PER_TIME))
        if not (1 <= period_time <= 300):
            raise ValueError("Invalid period_time")
    except ValueError:
        report_alarm_fail(f"Invalid {CONF_COMMON}.{CONF_COMMON_PER_TIME}")
    except configparser.NoOptionError:
        period_time = DEFAULT_PARAM[CONF_COMMON][CONF_COMMON_PER_TIME]
        logging.warning(f"Unset {CONF_COMMON}.{CONF_COMMON_PER_TIME}, use {period_time} as default")

    return period_time, disk, stage, iotype_list


def read_config_algorithm(config):
    """Read [algorithm]: returns (win_size, win_threshold); exits on invalid values."""
    if not config.has_section(CONF_ALGO):
        report_alarm_fail(f"Cannot find {CONF_ALGO} section in config file")

    try:
        win_size = int(config.get(CONF_ALGO, CONF_ALGO_SIZE))
        if not (1 <= win_size <= 300):
            raise ValueError(f"Invalid {CONF_ALGO}.{CONF_ALGO_SIZE}")
    except ValueError:
        report_alarm_fail(f"Invalid {CONF_ALGO}.{CONF_ALGO_SIZE} config")
    except configparser.NoOptionError:
        win_size = DEFAULT_PARAM[CONF_ALGO][CONF_ALGO_SIZE]
        logging.warning(f"Unset {CONF_ALGO}.{CONF_ALGO_SIZE}, use {win_size} as default")

    try:
        win_threshold = int(config.get(CONF_ALGO, CONF_ALGO_THRE))
        # The threshold cannot exceed the window size.
        if win_threshold < 1 or win_threshold > 300 or win_threshold > win_size:
            raise ValueError(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE}")
    except ValueError:
        report_alarm_fail(f"Invalid {CONF_ALGO}.{CONF_ALGO_THRE} config")
    except configparser.NoOptionError:
        win_threshold = DEFAULT_PARAM[CONF_ALGO][CONF_ALGO_THRE]
        logging.warning(f"Unset {CONF_ALGO}.{CONF_ALGO_THRE}, use {win_threshold} as default")

    return win_size, win_threshold


def read_config_latency(config):
    """Read every [latency_<disk_type>] section; exits if one is missing."""
    common_param = {}
    for type_name in Disk_Type:
        section_name = CONF_LATENCY.format(Disk_Type[type_name])
        if not config.has_section(section_name):
            report_alarm_fail(f"Cannot find {section_name} section in config file")

        common_param[Disk_Type[type_name]] = get_section_value(section_name, config)
    return common_param


def read_config_iodump(config):
    """Read the [iodump] section; exits if it is missing."""
    if not config.has_section(CONF_IODUMP):
        report_alarm_fail(f"Cannot find {CONF_IODUMP} section in config file")

    return get_section_value(CONF_IODUMP, config)


def read_config_stage(config, stage, iotype_list, curr_disk_type):
    """Read the optional [<stage>_<disk_type>] section; return {} when absent.

    Only decimal values are kept (converted to int); others are ignored.
    """
    res = {}
    section_name = f"{stage}_{curr_disk_type}"
    if not config.has_section(section_name):
        return res

    for key in config[section_name]:
        # Bug fix: values must be read from the "<stage>_<disk_type>" section
        # we just located, not from a bare "<stage>" section (which normally
        # does not exist and raised KeyError).
        if config[section_name][key].isdecimal():
            res[key] = int(config[section_name][key])

    return res
import json
import logging
import sys
import time

from .utils import is_abnormal, get_win_data, log_slow_win
from sentryCollector.collect_plugin import is_iocollect_valid, get_io_data, Result_Messages, get_disk_type, Disk_Type
from syssentry.result import ResultLevel, report_result
from xalarm.sentry_notify import xalarm_report, MINOR_ALM, ALARM_TYPE_OCCUR


TASK_NAME = "avg_block_io"

# xalarm id used for slow-IO alarms raised by this plugin.
SLOW_IO_ALARM_ID = 1002


def sig_handler(signum, _f):
    """Stop avg_block_io: report PASS to sysSentry and exit."""
    report_result(TASK_NAME, ResultLevel.PASS, json.dumps({}))
    logging.info("Finished avg_block_io plugin running.")
    sys.exit(0)


def avg_get_io_data(io_dic):
    """Fetch one period of IO data from sentryCollector; returns (ok, data_or_error)."""
    logging.debug(f"send to sentryCollector get_io_data: period={io_dic['period_time']}, "
                  f"disk={io_dic['disk_list']}, stage={io_dic['stage_list']}, iotype={io_dic['iotype_list']}")
    res = get_io_data(io_dic["period_time"], io_dic["disk_list"], io_dic["stage_list"], io_dic["iotype_list"])
    return check_result_validation(res, 'get io data')


def avg_is_iocollect_valid(io_dic, config_disk, config_stage):
    """Ask sentryCollector which configured disks/stages are collectable; exits on failure."""
    logging.debug(f"send to sentryCollector is_iocollect_valid: period={io_dic['period_time']}, "
                  f"disk={config_disk}, stage={config_stage}")
    res = is_iocollect_valid(io_dic["period_time"], config_disk, config_stage)
    is_success, data = check_result_validation(res, 'check config validation')
    if not is_success:
        report_alarm_fail(f"{data['msg']}")
    return data


def check_result_validation(res, reason):
    """Validate a sentryCollector reply; returns (ok, parsed_json_or_error_dict)."""
    if 'ret' not in res or 'message' not in res:
        return False, {'msg': f"Failed to {reason}: Cannot connect to sentryCollector"}
    if res['ret'] != 0:
        return False, {'msg': f"Failed to {reason}: {Result_Messages[res['ret']]}"}

    try:
        json_data = json.loads(res['message'])
    except json.JSONDecodeError:
        return False, {'msg': f"Failed to {reason}: invalid return message"}

    return True, json_data


def report_alarm_fail(alarm_info):
    """Report FAIL to sysSentry and terminate the plugin."""
    report_result(TASK_NAME, ResultLevel.FAIL, json.dumps({"msg": alarm_info}))
    logging.critical(alarm_info)
    sys.exit(1)


def _send_slow_io_alarm(msg, reason, block_stack, alarm_type):
    """Fill the diagnosis fields into msg, log the slow window and raise the xalarm."""
    msg["reason"] = reason
    msg["block_stack"] = block_stack
    msg["alarm_type"] = alarm_type
    log_slow_win(msg, reason)
    xalarm_report(SLOW_IO_ALARM_ID, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg))


def process_report_data(disk_name, rw, io_data):
    """Diagnose an abnormal bio window and report the most specific root cause.

    Check order: IO-pressure stages, then the driver stage, then kernel
    stages; fall back to "unknown" when no deeper stage is abnormal.
    """
    abnormal, abnormal_list = is_abnormal((disk_name, 'bio', rw), io_data)
    if not abnormal:
        return

    msg = {
        "alarm_source": TASK_NAME, "driver_name": disk_name, "io_type": rw,
        "reason": "unknown", "block_stack": "bio", "alarm_type": abnormal_list,
        "details": get_win_data(disk_name, rw, io_data)
    }

    # IO pressure: throttling/QoS stages.
    ctrl_stage = ['throtl', 'wbt', 'iocost', 'bfq']
    for stage_name in ctrl_stage:
        abnormal, abnormal_list = is_abnormal((disk_name, stage_name, rw), io_data)
        if not abnormal:
            continue
        _send_slow_io_alarm(msg, "IO press", f"bio,{stage_name}", abnormal_list)
        return

    # Driver slow.
    abnormal, abnormal_list = is_abnormal((disk_name, 'rq_driver', rw), io_data)
    if abnormal:
        _send_slow_io_alarm(msg, "driver slow", "bio,rq_driver", abnormal_list)
        return

    # Kernel slow.
    kernel_stage = ['gettag', 'plug', 'deadline', 'hctx', 'requeue']
    for stage_name in kernel_stage:
        abnormal, abnormal_list = is_abnormal((disk_name, stage_name, rw), io_data)
        if not abnormal:
            continue
        _send_slow_io_alarm(msg, "kernel slow", f"bio,{stage_name}", abnormal_list)
        return

    # No deeper stage abnormal: report with bio-level details only.
    log_slow_win(msg, "unknown")
    xalarm_report(SLOW_IO_ALARM_ID, MINOR_ALM, ALARM_TYPE_OCCUR, json.dumps(msg))


def check_disk_list_validation(disk_list):
    """Keep only disks whose type sentryCollector can resolve."""
    valid_disk_list = []
    for disk_name in disk_list:
        is_success, _ = check_result_validation(get_disk_type(disk_name), "")
        if not is_success:
            continue
        valid_disk_list.append(disk_name)
    return valid_disk_list


def get_disk_type_by_name(disk_name):
    """Resolve a disk's type via sentryCollector; exits on failure."""
    logging.debug(f"send to sentryCollector get_disk_type: disk_name={disk_name}")
    is_success, disk_type_str = check_result_validation(get_disk_type(disk_name), f'Invalid disk type {disk_name}')
    if not is_success:
        report_alarm_fail(f"{disk_type_str['msg']}")
    try:
        curr_disk_type = int(disk_type_str)
        if curr_disk_type not in Disk_Type:
            raise ValueError
    except ValueError:
        report_alarm_fail(f"Failed to get disk type for {disk_name}")

    return Disk_Type[curr_disk_type]

class AbnormalWindowBase:
    """Fixed-size sliding window over recent periods.

    Tracks both the raw per-period values and a per-period abnormality verdict;
    subclasses define is_abnormal_period().
    """

    def __init__(self, window_size=10, window_threshold=7):
        self.window_size = window_size
        self.window_threshold = window_threshold
        # Verdict slots start as "not abnormal"; data slots start as -1 (no data yet).
        self.abnormal_window = [False] * window_size
        self.window_data = [-1] * window_size

    def append_new_data(self, ab_res):
        # Slide the raw-data window: drop the oldest value, append the newest.
        del self.window_data[0]
        self.window_data.append(ab_res)

    def append_new_period(self, ab_res, avg_val=0):
        # Slide the verdict window with this period's abnormality result.
        del self.abnormal_window[0]
        verdict = True if self.is_abnormal_period(ab_res, avg_val) else False
        self.abnormal_window.append(verdict)

    def is_abnormal_window(self):
        # The window is abnormal once enough of its periods were abnormal.
        return self.abnormal_window.count(True) >= self.window_threshold

    def window_data_to_string(self):
        return ",".join(map(str, self.window_data))


class IoWindow(AbnormalWindowBase):
    """Latency window: a period is abnormal when its value exceeds the hard time
    limit, or is both a multiple of the average and above the absolute floor."""

    def __init__(self, window_size=10, window_threshold=7, abnormal_multiple=5,
                 abnormal_multiple_lim=30, abnormal_time=40):
        super().__init__(window_size, window_threshold)
        self.abnormal_multiple = abnormal_multiple
        self.abnormal_multiple_lim = abnormal_multiple_lim
        self.abnormal_time = abnormal_time

    def is_abnormal_period(self, value, avg_val):
        if value > self.abnormal_time:
            return True
        return value > avg_val * self.abnormal_multiple and value > self.abnormal_multiple_lim


class IoDumpWindow(AbnormalWindowBase):
    """Iodump window: a period is abnormal when the dump count passes the limit."""

    def __init__(self, window_size=10, window_threshold=7, abnormal_time=40):
        super().__init__(window_size, window_threshold)
        self.abnormal_time = abnormal_time

    def is_abnormal_period(self, value, avg_val=0):
        return value > self.abnormal_time
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#          http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.
import logging
import os

# Indices into the [value, count] running-average pair.
AVG_VALUE = 0
AVG_COUNT = 1


def get_nested_value(data, keys):
    """Walk nested dicts by the key tuple; return None when any level is missing."""
    for key in keys:
        if key in data:
            data = data[key]
        else:
            return None
    return data


def set_nested_value(data, keys, value):
    """Set a value in nested dicts by the key tuple; return False when the path is missing."""
    for key in keys[:-1]:
        if key in data:
            data = data[key]
        else:
            return False
    data[keys[-1]] = value
    return True


def get_win_data(disk_name, rw, io_data):
    """Collect the latency and iodump window contents as printable strings."""
    latency = ''
    iodump = ''
    for stage_name in io_data[disk_name]:
        if 'latency' in io_data[disk_name][stage_name][rw]:
            latency_list = io_data[disk_name][stage_name][rw]['latency'].window_data_to_string()
            latency += f'{stage_name}: [{latency_list}], '
        if 'iodump' in io_data[disk_name][stage_name][rw]:
            iodump_list = io_data[disk_name][stage_name][rw]['iodump'].window_data_to_string()
            iodump += f'{stage_name}: [{iodump_list}], '
    # Strip the trailing ", " (slicing is safe on an empty string).
    return {"latency": latency[:-2], "iodump": iodump[:-2]}


def is_abnormal(io_key, io_data):
    """Check whether the latency and/or iodump window at io_key is abnormal.

    Returns (abnormal, description) where description lists the abnormal
    window kinds ('latency', 'iodump').
    """
    abnormal_list = ''
    for key in ['latency', 'iodump']:
        all_keys = get_nested_value(io_data, io_key)
        if all_keys and key in all_keys:
            win = get_nested_value(io_data, io_key + (key,))
            if win and win.is_abnormal_window():
                abnormal_list += key + ', '
    if not abnormal_list:
        return False, abnormal_list
    return True, abnormal_list[:-2]


def update_io_avg(old_avg, period_value, win_size):
    """Update the [value, count] running average of the latency window.

    While count < win_size a plain cumulative mean is built; afterwards the
    oldest contribution is replaced by the new sample (sliding mean).
    """
    if old_avg[AVG_COUNT] < win_size:
        new_avg_count = old_avg[AVG_COUNT] + 1
        new_avg_value = (old_avg[AVG_VALUE] * old_avg[AVG_COUNT] + period_value[0]) / new_avg_count
    else:
        new_avg_count = old_avg[AVG_COUNT]
        new_avg_value = (old_avg[AVG_VALUE] * (old_avg[AVG_COUNT] - 1) + period_value[0]) / new_avg_count
    return [new_avg_value, new_avg_count]


def update_io_period(old_avg, period_value, io_data, io_key):
    """Feed one period into the latency/iodump windows (per-period verdicts)."""
    all_wins = get_nested_value(io_data, io_key)
    if all_wins and "latency" in all_wins:
        io_data[io_key[0]][io_key[1]][io_key[2]]["latency"].append_new_period(period_value[0], old_avg[AVG_VALUE])
    if all_wins and "iodump" in all_wins:
        io_data[io_key[0]][io_key[1]][io_key[2]]["iodump"].append_new_period(period_value[1])


def update_io_data(period_value, io_data, io_key):
    """Record one period's raw values in the latency/iodump windows."""
    all_wins = get_nested_value(io_data, io_key)
    if all_wins and "latency" in all_wins:
        io_data[io_key[0]][io_key[1]][io_key[2]]["latency"].append_new_data(period_value[0])
    if all_wins and "iodump" in all_wins:
        io_data[io_key[0]][io_key[1]][io_key[2]]["iodump"].append_new_data(period_value[1])


def log_abnormal_period(old_avg, period_value, io_data, io_key):
    """Log any abnormal period for later diagnosis."""
    all_wins = get_nested_value(io_data, io_key)
    if all_wins and "latency" in all_wins:
        if all_wins["latency"].is_abnormal_period(period_value[0], old_avg[AVG_VALUE]):
            logging.info(f"[abnormal_period] disk: {io_key[0]}, stage: {io_key[1]}, iotype: {io_key[2]}, "
                         f"type: latency, avg: {round(old_avg[AVG_VALUE], 3)}, curr_val: {period_value[0]}")
    if all_wins and "iodump" in all_wins:
        if all_wins["iodump"].is_abnormal_period(period_value[1]):
            logging.info(f"[abnormal_period] disk: {io_key[0]}, stage: {io_key[1]}, iotype: {io_key[2]}, "
                         f"type: iodump, curr_val: {period_value[1]}")


def log_slow_win(msg, reason):
    """Log a slow-IO window with its diagnosed reason and window contents."""
    logging.warning(f"[SLOW IO] disk: {msg['driver_name']}, stage: {msg['block_stack']}, "
                    f"iotype: {msg['io_type']}, type: {msg['alarm_type']}, reason: {reason}")
    logging.info(f"latency: {msg['details']['latency']}")
    logging.info(f"iodump: {msg['details']['iodump']}")


def update_avg_and_check_abnormal(data, io_key, win_size, io_avg_value, io_data):
    """Update average and windows; return True once win_size periods were collected.

    While warming up only the running average is accumulated. Afterwards the
    windows are fed; on an abnormal latency period the average is left frozen
    so the anomaly does not pollute it.
    """
    period_value = get_nested_value(data, io_key)
    old_avg = get_nested_value(io_avg_value, io_key)

    # Always record the raw period data.
    update_io_data(period_value, io_data, io_key)
    if old_avg[AVG_COUNT] < win_size:
        set_nested_value(io_avg_value, io_key, update_io_avg(old_avg, period_value, win_size))
        return False

    # Log abnormal periods for diagnosis.
    log_abnormal_period(old_avg, period_value, io_data, io_key)

    # Feed the windows (this performs the per-period abnormality judgement).
    update_io_period(old_avg, period_value, io_data, io_key)
    all_wins = get_nested_value(io_data, io_key)
    if not all_wins or 'latency' not in all_wins:
        return True
    period = get_nested_value(io_data, io_key + ("latency",))
    if period and period.is_abnormal_period(period_value[0], old_avg[AVG_VALUE]):
        # Abnormal latency period: keep the average unchanged.
        return True
    set_nested_value(io_avg_value, io_key, update_io_avg(old_avg, period_value, win_size))
    return True