aops-zeus/0012-conf-trace-info-and-conf-sync-optimize.patch
smjiao e4d8a6109b conf sync optimization and add file trace interface
(cherry picked from commit f6ec2c621812ce176182c812dc2d514ac0b524d7)
2024-07-02 11:38:20 +08:00

3044 lines
114 KiB
Diff
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

diff --git a/ansible_task/inventory/hosts.yml b/ansible_task/inventory/hosts.yml
new file mode 100644
index 0000000..ca2b55b
--- /dev/null
+++ b/ansible_task/inventory/hosts.yml
@@ -0,0 +1,4 @@
+all:
+ children:
+ sync:
+ hosts: {}
\ No newline at end of file
diff --git a/ansible_task/playbook_entries/conf_trace.yml b/ansible_task/playbook_entries/conf_trace.yml
new file mode 100644
index 0000000..60ea52f
--- /dev/null
+++ b/ansible_task/playbook_entries/conf_trace.yml
@@ -0,0 +1,14 @@
+# Playbook entry that applies the conf_trace role to every inventory host.
+# ip, port, conf_list_str and domain_name are supplied with `ansible-playbook -e`;
+# host_id is read from the per-host variables of the generated inventory.
+# NOTE(review): the play name below is the same as in sync_config.yml although this
+# play manages ragdoll-filetrace - confirm whether that is intended.
+- name: sync config to host
+ hosts: all
+ remote_user: root
+ gather_facts: no
+ max_fail_percentage: 30
+ strategy: free
+ vars:
+ - ip: "{{ ip }}"
+ - port: "{{ port }}"
+ - conf_list_str: "{{ conf_list_str }}"
+ - domain_name: "{{ domain_name }}"
+ - host_id: "{{ hostvars[inventory_hostname]['host_id'] }}"
+ roles:
+ - ../roles/conf_trace
\ No newline at end of file
diff --git a/ansible_task/playbook_entries/sync_config.yml b/ansible_task/playbook_entries/sync_config.yml
new file mode 100644
index 0000000..7819723
--- /dev/null
+++ b/ansible_task/playbook_entries/sync_config.yml
@@ -0,0 +1,9 @@
+# Playbook entry that applies the sync_domain_config role to every inventory host.
+# serial_count (batch size) and file_path_infos (used by the role) are supplied
+# with `ansible-playbook -e`.
+- name: sync config to host
+ hosts: all
+ remote_user: root
+ gather_facts: no
+ max_fail_percentage: 30
+ serial: "{{ serial_count }}"
+ strategy: free
+ roles:
+ - ../roles/sync_domain_config
\ No newline at end of file
diff --git a/ansible_task/roles/conf_trace/tasks/main.yml b/ansible_task/roles/conf_trace/tasks/main.yml
new file mode 100644
index 0000000..446bbfc
--- /dev/null
+++ b/ansible_task/roles/conf_trace/tasks/main.yml
@@ -0,0 +1,49 @@
+---
+# Deploys and controls the ragdoll-filetrace service.
+# `action` (start / update / stop) selects which of the tasks below run.
+- name: install dependency
+  dnf:
+    name: python3-libselinux
+    state: present
+  when: action == "start" or action == "update"
+- name: copy ragdoll-filetrace bin
+  copy:
+    src: /usr/bin/ragdoll-filetrace
+    dest: /usr/bin/ragdoll-filetrace
+    owner: root
+    group: root
+    mode: '0755'
+  when: action == "start" or action == "update"
+# Unit files are data, not executables: systemd warns about units marked executable.
+- name: copy ragdoll-filetrace systemctl service config
+  copy:
+    src: /usr/lib/systemd/system/ragdoll-filetrace.service
+    dest: /usr/lib/systemd/system/ragdoll-filetrace.service
+    owner: root
+    group: root
+    mode: '0644'
+  when: action == "start" or action == "update"
+- name: reload systemctl service config
+  command: systemctl daemon-reload
+  when: action == "start" or action == "update"
+- name: enable ragdoll-filetrace systemd
+  command: systemctl enable ragdoll-filetrace
+  when: action == "start" or action == "update"
+- name: dependency install
+  shell: yum install python3-psutil kernel-devel-$(uname -r) bcc-tools bcc python3-bpfcc python3-requests llvm-12.0.1-4.iss22 llvm-libs-12.0.1-4.iss22 -y
+  when: action == "start"
+- name: Ensure /etc/ragdoll-filetrace directory exists
+  file:
+    path: /etc/ragdoll-filetrace
+    state: directory
+    mode: '0755'
+  when: action == "update" or action == "start"
+# The rendered config is a plain JSON file, so it does not need the execute bit.
+- name: update ragdoll-filetrace config
+  template:
+    src: agith.config.j2
+    dest: /etc/ragdoll-filetrace/ragdoll-filetrace.conf
+    mode: '0644'
+  when: action == "update" or action == "start"
+# Runs for both "update" (restart with the new config) and "stop".
+- name: stop ragdoll-filetrace when action is update
+  command: systemctl stop ragdoll-filetrace
+  when: action == "update" or action == "stop"
+- name: start ragdoll-filetrace systemd
+  command: systemctl start ragdoll-filetrace
+  when: action == "update" or action == "start"
diff --git a/ansible_task/roles/conf_trace/templates/agith.config.j2 b/ansible_task/roles/conf_trace/templates/agith.config.j2
new file mode 100644
index 0000000..c7a1476
--- /dev/null
+++ b/ansible_task/roles/conf_trace/templates/agith.config.j2
@@ -0,0 +1,18 @@
+{
+ "Repository": {
+ "concern_syscalls": [
+ "write",
+ "clone",
+ "unlinkat",
+ "unlink",
+ "connect",
+ "sendto",
+ "recvfrom",
+ "mkdir"
+ ],
+ "aops_zeus": "http://{{ ip }}:{{ port }}/conftrace/data",
+ "conf_list": "{{ conf_list_str }}",
+ "domain_name": "{{ domain_name }}",
+ "host_id": "{{ host_id }}"
+ }
+}
\ No newline at end of file
diff --git a/ansible_task/roles/sync_domain_config/tasks/main.yml b/ansible_task/roles/sync_domain_config/tasks/main.yml
new file mode 100644
index 0000000..29b8857
--- /dev/null
+++ b/ansible_task/roles/sync_domain_config/tasks/main.yml
@@ -0,0 +1,18 @@
+---
+# Makes sure python3-libselinux is present, then copies every entry of
+# file_path_infos (local source path -> destination path) to the host.
+- name: Check if software is installed
+  command: rpm -qi python3-libselinux
+  register: result
+  ignore_errors: yes
+  # a query never changes the host
+  changed_when: false
+- name: install dependency
+  dnf:
+    name: python3-libselinux
+    state: present
+  # rely on rpm's exit status rather than on the wording of its message
+  when: result.rc != 0
+- name: sync config to host
+  copy:
+    src: "{{ item.key }}"
+    dest: "{{ item.value }}"
+    owner: root
+    group: root
+    mode: '0644'
+  with_dict: "{{ file_path_infos }}"
\ No newline at end of file
diff --git a/aops-zeus.spec b/aops-zeus.spec
index 0d62cd6..badc290 100644
--- a/aops-zeus.spec
+++ b/aops-zeus.spec
@@ -37,6 +37,7 @@ cp -r database %{buildroot}/opt/aops/
%files
%doc README.*
%attr(0644,root,root) %{_sysconfdir}/aops/zeus.ini
+%attr(0644,root,root) %{_sysconfdir}/aops/zeus_crontab.yml
%attr(0755,root,root) %{_bindir}/aops-zeus
%attr(0755,root,root) %{_unitdir}/aops-zeus.service
%{python3_sitelib}/aops_zeus*.egg-info
diff --git a/conf/zeus.ini b/conf/zeus.ini
index 945d6b4..c87110f 100644
--- a/conf/zeus.ini
+++ b/conf/zeus.ini
@@ -36,4 +36,11 @@ port=11112
[apollo]
ip=127.0.0.1
-port=11116
\ No newline at end of file
+port=11116
+
+[serial]
+serial_count=10
+
+[update_sync_status]
+update_sync_status_address = "http://127.0.0.1"
+update_sync_status_port = 11114
\ No newline at end of file
diff --git a/conf/zeus_crontab.yml b/conf/zeus_crontab.yml
new file mode 100644
index 0000000..d60d306
--- /dev/null
+++ b/conf/zeus_crontab.yml
@@ -0,0 +1,31 @@
+# Timed task configuration file specification (YAML):
+
+# Name of a scheduled task; the name should be unique, e.g.
+# task: update config sync status
+
+# Task type; only 'update_config_sync_status_task' is supported
+# type: update_config_sync_status_task
+
+# Whether scheduled tasks are allowed to run
+# enable: true
+
+# meta info for the task, it's customised for user
+# meta:
+# cvrf_url: https://repo.openeuler.org/security/data/cvrf
+
+# Timed config, set the scheduled time and polling policy
+# timed:
+# value between 0-6, for example, 0 means Monday, 0-6 means everyday
+# day_of_week: 0-6
+# value between 0-23, for example, 2 means 2:00 in a day
+# hour: 3
+# Polling strategy, The value can only be 'cron' 'date' 'interval', default value is 'cron'
+# trigger: cron
+
+- task: update config sync status
+ type: update_config_sync_status_task
+ enable: true
+ timed:
+ day_of_week: 0-6
+ hour: 3
+ trigger: cron
\ No newline at end of file
diff --git a/database/zeus.sql b/database/zeus.sql
index b54e931..6d9a722 100644
--- a/database/zeus.sql
+++ b/database/zeus.sql
@@ -54,4 +54,23 @@ CREATE TABLE IF NOT EXISTS `host` (
CONSTRAINT `host_ibfk_2` FOREIGN KEY (`host_group_id`) REFERENCES `host_group` (`host_group_id`) ON DELETE RESTRICT ON UPDATE RESTRICT
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;
+-- Sync status per host and domain: one row per (host_id, domain_name) pair,
+-- with a secondary index on sync_status.
+CREATE TABLE IF NOT EXISTS `host_conf_sync_status` (
+ `host_id` int(11) NOT NULL,
+ `host_ip` varchar(16) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
+ `domain_name` varchar(16) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
+ `sync_status` int(1) unsigned zerofill NULL DEFAULT NULL,
+ CONSTRAINT hd_host_sync PRIMARY KEY (host_id,domain_name),
+ INDEX `sync_status`(`sync_status`) USING BTREE
+) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;
+
+-- Conf trace records, keyed by UUID; each row stores the domain, host, config
+-- name, the trace text (`info`) and an optional creation time.
+CREATE TABLE IF NOT EXISTS `conf_trace_info` (
+ `UUID` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
+ `domain_name` varchar(16) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
+ `host_id` int(11) NOT NULL,
+ `conf_name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
+ `info` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
+ `create_time` datetime DEFAULT NULL,
+ PRIMARY KEY (`UUID`) USING BTREE
+) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;
+
INSERT INTO `user` (`username`, `password`) VALUE('admin', 'pbkdf2:sha256:150000$h1oaTY7K$5b1ff300a896f6f373928294fd8bac8ed6d2a1d6a7c5ea2d2ccd2075e6177896') ON DUPLICATE KEY UPDATE username= 'admin',password='pbkdf2:sha256:150000$h1oaTY7K$5b1ff300a896f6f373928294fd8bac8ed6d2a1d6a7c5ea2d2ccd2075e6177896';
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 1b45b43..d62ee0c 100644
--- a/setup.py
+++ b/setup.py
@@ -26,6 +26,7 @@ setup(
author='cmd-lsw-yyy-zyc',
data_files=[
('/etc/aops', ['conf/zeus.ini']),
+ ('/etc/aops', ['conf/zeus_crontab.yml']),
('/usr/lib/systemd/system', ['aops-zeus.service']),
("/opt/aops/database", ["database/zeus.sql"]),
],
diff --git a/zeus/conf/constant.py b/zeus/conf/constant.py
index 27aef66..a0e0a98 100644
--- a/zeus/conf/constant.py
+++ b/zeus/conf/constant.py
@@ -57,9 +57,15 @@ ADD_GROUP = "/manage/host/group/add"
DELETE_GROUP = "/manage/host/group/delete"
GET_GROUP = "/manage/host/group/get"
+ADD_HOST_SYNC_STATUS = "/manage/host/sync/status/add"
+DELETE_HOST_SYNC_STATUS = "/manage/host/sync/status/delete"
+DELETE_ALL_HOST_SYNC_STATUS = "/manage/all/host/sync/status/delete"
+GET_HOST_SYNC_STATUS = "/manage/host/sync/status/get"
+
COLLECT_CONFIG = '/manage/config/collect'
SYNC_CONFIG = '/manage/config/sync'
OBJECT_FILE_CONFIG = '/manage/config/objectfile'
+BATCH_SYNC_CONFIG = '/manage/config/batch/sync'
USER_LOGIN = "/manage/account/login"
LOGOUT = "/manage/account/logout"
@@ -94,6 +100,17 @@ VUL_TASK_CVE_SCAN_NOTICE = "/vulnerability/task/callback/cve/scan/notice"
CHECK_IDENTIFY_SCENE = "/check/scene/identify"
CHECK_WORKFLOW_HOST_EXIST = '/check/workflow/host/exist'
+# ragdoll
+DOMAIN_LIST_API = "/domain/queryDomain"
+EXPECTED_CONFS_API = "/confs/queryExpectedConfs"
+DOMAIN_CONF_DIFF_API = "/confs/domain/diff"
+
+# conf trace
+CONF_TRACE_MGMT = "/conftrace/mgmt"
+CONF_TRACE_QUERY = "/conftrace/query"
+CONF_TRACE_DATA = "/conftrace/data"
+CONF_TRACE_DELETE = "/conftrace/delete"
+
# host template file content
HOST_TEMPLATE_FILE_CONTENT = """host_ip,ssh_port,ssh_user,password,ssh_pkey,host_name,host_group_name,management
127.0.0.1,22,root,password,private key,test_host,test_host_group,FALSE
@@ -106,6 +123,20 @@ HOST_TEMPLATE_FILE_CONTENT = """host_ip,ssh_port,ssh_user,password,ssh_pkey,host
"4. 上传本文件前,请删除此部分提示内容",,,,,,,
"""
+# ansible sync config
+PARENT_DIRECTORY = "/opt/aops/ansible_task/"
+HOST_PATH_FILE = "/opt/aops/ansible_task/inventory/"
+SYNC_CONFIG_YML = "/opt/aops/ansible_task/playbook_entries/sync_config.yml"
+CONF_TRACE_YML = "/opt/aops/ansible_task/playbook_entries/conf_trace.yml"
+SYNC_LOG_PATH = "/var/log/aops/sync/"
+CONF_TRACE_LOG_PATH = "/var/log/aops/conftrace/"
+KEY_FILE_PREFIX = "/tmp/"
+KEY_FILE_SUFFIX = "_id_dsa"
+IP_START_PATTERN = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}"
+
+DIRECTORY_FILE_PATH_LIST = ["/etc/pam.d"]
+
+TIMED_TASK_CONFIG_PATH = "/etc/aops/zeus_crontab.yml"
# cve task status
class CveTaskStatus:
diff --git a/zeus/conf/default_config.py b/zeus/conf/default_config.py
index 5e05f64..1e713ae 100644
--- a/zeus/conf/default_config.py
+++ b/zeus/conf/default_config.py
@@ -32,7 +32,10 @@ apollo = {"IP": "127.0.0.1", "PORT": 11116}
redis = {"IP": "127.0.0.1", "PORT": 6379}
-
prometheus = {"IP": "127.0.0.1", "PORT": 9090, "QUERY_RANGE_STEP": "15s"}
agent = {"DEFAULT_INSTANCE_PORT": 8888}
+
+serial = {"SERIAL_COUNT": 10}
+
+update_sync_status = {"UPDATE_SYNC_STATUS_ADDRESS": "http://127.0.0.1", "UPDATE_SYNC_STATUS_PORT": 11114}
diff --git a/zeus/config_manager/view.py b/zeus/config_manager/view.py
index b012c62..28c9d32 100644
--- a/zeus/config_manager/view.py
+++ b/zeus/config_manager/view.py
@@ -15,17 +15,29 @@ Time:
Author:
Description: Restful APIs for host
"""
+import glob
import json
import os
+import queue
+import subprocess
+import threading
+import time
+from configparser import RawConfigParser
from typing import List, Dict
+import yaml
+from vulcanus import LOGGER
from vulcanus.multi_thread_handler import MultiThreadHandler
from vulcanus.restful.resp import state
from vulcanus.restful.response import BaseResponse
-from zeus.conf.constant import CERES_COLLECT_FILE, CERES_SYNC_CONF, CERES_OBJECT_FILE_CONF
+
+from zeus.conf import configuration
+from zeus.conf.constant import CERES_COLLECT_FILE, CERES_SYNC_CONF, CERES_OBJECT_FILE_CONF, SYNC_LOG_PATH, \
+ HOST_PATH_FILE, SYNC_CONFIG_YML, PARENT_DIRECTORY, IP_START_PATTERN, KEY_FILE_PREFIX, KEY_FILE_SUFFIX
from zeus.database.proxy.host import HostProxy
from zeus.function.model import ClientConnectArgs
-from zeus.function.verify.config import CollectConfigSchema, SyncConfigSchema, ObjectFileConfigSchema
+from zeus.function.verify.config import CollectConfigSchema, SyncConfigSchema, ObjectFileConfigSchema, \
+ BatchSyncConfigSchema
from zeus.host_manager.ssh import execute_command_and_parse_its_result, execute_command_sftp_result
@@ -147,7 +159,7 @@ class CollectConfig(BaseResponse):
return file_content
- @BaseResponse.handle(schema=CollectConfigSchema, token=False)
+ @BaseResponse.handle(schema=CollectConfigSchema, token=True)
def post(self, **param):
"""
Get config
@@ -201,16 +213,16 @@ class CollectConfig(BaseResponse):
file_content = self.convert_host_id_to_failed_data_format(
list(host_id_with_config_file.keys()), host_id_with_config_file
)
- return self.response(code=state.DATABASE_CONNECT_ERROR, data={"resp": file_content})
+ return self.response(code=state.DATABASE_CONNECT_ERROR, data=file_content)
status, host_list = proxy.get_host_info(
- {"username": "admin", "host_list": list(host_id_with_config_file.keys())}, True
+ {"username": param.get("username"), "host_list": list(host_id_with_config_file.keys())}, True
)
if status != state.SUCCEED:
file_content = self.convert_host_id_to_failed_data_format(
list(host_id_with_config_file.keys()), host_id_with_config_file
)
- return self.response(code=status, data={"resp": file_content})
+ return self.response(code=status, data=file_content)
# Get file content
tasks = [(host, host_id_with_config_file[host["host_id"]]) for host in host_list]
multi_thread = MultiThreadHandler(lambda data: self.get_file_content(*data), tasks, None)
@@ -262,7 +274,7 @@ class SyncConfig(BaseResponse):
host_info.get("ssh_user"), host_info.get("pkey")), command)
return status
- @BaseResponse.handle(schema=SyncConfigSchema, token=False)
+ @BaseResponse.handle(schema=SyncConfigSchema, token=True)
def put(self, **params):
sync_config_info = dict()
@@ -277,19 +289,19 @@ class SyncConfig(BaseResponse):
# Query host address from database
proxy = HostProxy()
if not proxy.connect():
- return self.response(code=state.DATABASE_CONNECT_ERROR, data={"resp": sync_result})
+ return self.response(code=state.DATABASE_CONNECT_ERROR, data=sync_result)
status, host_list = proxy.get_host_info(
- {"username": "admin", "host_list": [params.get('host_id')]}, True)
+ {"username": params.get("username"), "host_list": [params.get('host_id')]}, True)
if status != state.SUCCEED:
- return self.response(code=status, data={"resp": sync_result})
+ return self.response(code=status, data=sync_result)
host_info = host_list[0]
status = self.sync_config_content(host_info, sync_config_info)
if status == state.SUCCEED:
sync_result['sync_result'] = True
- return self.response(code=state.SUCCEED, data={"resp": sync_result})
- return self.response(code=state.UNKNOWN_ERROR, data={"resp": sync_result})
+ return self.response(code=state.SUCCEED, data=sync_result)
+ return self.response(code=state.UNKNOWN_ERROR, data=sync_result)
class ObjectFileConfig(BaseResponse):
@@ -302,7 +314,7 @@ class ObjectFileConfig(BaseResponse):
host_info.get("ssh_user"), host_info.get("pkey")), command)
return status, content
- @BaseResponse.handle(schema=ObjectFileConfigSchema, token=False)
+ @BaseResponse.handle(schema=ObjectFileConfigSchema, token=True)
def post(self, **params):
object_file_result = {
"object_file_paths": list(),
@@ -311,12 +323,12 @@ class ObjectFileConfig(BaseResponse):
# Query host address from database
proxy = HostProxy()
if not proxy.connect():
- return self.response(code=state.DATABASE_CONNECT_ERROR, data={"resp": object_file_result})
+ return self.response(code=state.DATABASE_CONNECT_ERROR, data=object_file_result)
status, host_list = proxy.get_host_info(
- {"username": "admin", "host_list": [params.get('host_id')]}, True)
+ {"username": params.get("username"), "host_list": [params.get('host_id')]}, True)
if status != state.SUCCEED:
- return self.response(code=status, data={"resp": object_file_result})
+ return self.response(code=status, data=object_file_result)
host_info = host_list[0]
status, content = self.object_file_config_content(host_info, params.get('file_directory'))
@@ -326,5 +338,206 @@ class ObjectFileConfig(BaseResponse):
if content_res.get("resp"):
resp = content_res.get("resp")
object_file_result['object_file_paths'] = resp
- return self.response(code=state.SUCCEED, data={"resp": object_file_result})
- return self.response(code=state.UNKNOWN_ERROR, data={"resp": object_file_result})
+ return self.response(code=state.SUCCEED, data=object_file_result)
+ return self.response(code=state.UNKNOWN_ERROR, data=object_file_result)
+
+
+class BatchSyncConfig(BaseResponse):
+    @staticmethod
+    def run_subprocess(cmd, result_queue):
+        """
+        Run a shell command from the ansible task directory and queue its outcome.
+
+        Args:
+            cmd (str): command line, executed with shell=True
+            result_queue (queue.Queue): receives the CompletedProcess, or the
+                CalledProcessError if one is raised
+        """
+        # NOTE(review): subprocess.run is called without check=True, so a non-zero
+        # exit status is reported through returncode rather than as an exception.
+        try:
+            outcome = subprocess.run(cmd, cwd=PARENT_DIRECTORY, shell=True, capture_output=True, text=True)
+        except subprocess.CalledProcessError as ex:
+            outcome = ex
+        result_queue.put(outcome)
+
+    @staticmethod
+    def ansible_handler(now_time, ansible_forks, extra_vars, HOST_FILE):
+        """
+        Run the sync_config playbook and return what it printed.
+
+        Args:
+            now_time (str): suffix of the log file name
+            ansible_forks (int): value passed to ansible-playbook -f
+            extra_vars (str): value passed to ansible-playbook -e
+            HOST_FILE (str): inventory file path
+
+        Returns:
+            str: stdout of the command when the shell exits with 0, otherwise None
+        """
+        import shlex
+
+        os.makedirs(SYNC_LOG_PATH, exist_ok=True)
+
+        SYNC_LOG = SYNC_LOG_PATH + "sync_config_" + now_time + ".log"
+        # The command goes through a shell and extra_vars carries request data,
+        # so every interpolated value is quoted.
+        # NOTE(review): the pipeline ends in `tee`, so the exit status checked below
+        # is presumably tee's rather than ansible-playbook's - confirm before relying on it.
+        cmd = f"ansible-playbook -f {ansible_forks} -e {shlex.quote(extra_vars)} " \
+              f"-i {shlex.quote(HOST_FILE)} {shlex.quote(SYNC_CONFIG_YML)} |tee {shlex.quote(SYNC_LOG)} "
+        result_queue = queue.Queue()
+        thread = threading.Thread(target=BatchSyncConfig.run_subprocess, args=(cmd, result_queue))
+        thread.start()
+
+        thread.join()
+        try:
+            completed_process = result_queue.get(block=False)
+        except queue.Empty:
+            LOGGER.error("ansible subprocess nothing result")
+            return None
+        if isinstance(completed_process, subprocess.CalledProcessError) or completed_process.returncode != 0:
+            # "%s" is required: logging treats the extra argument as a format argument
+            LOGGER.error("ansible subprocess error: %s", completed_process)
+            return None
+        return completed_process.stdout
+
+    @staticmethod
+    def ansible_sync_domain_config_content(host_list: list, file_path_infos: list):
+        """
+        Push config files to hosts through the sync_config playbook and collect per-file results.
+
+        Args:
+            host_list (list): host dicts; host_ip, ssh_port and pkey are read
+            file_path_infos (list): dicts with "file_path" (destination on the host)
+                and "content" (text written there)
+
+        Returns:
+            dict: "<host_ip>_<ssh_port>" -> list of {"filePath": ..., "result": "SUCCESS" | "FAIL"}
+        """
+        import tempfile
+
+        now_time = str(int(time.time()))
+        host_ip_sync_result = {}
+        BatchSyncConfig.generate_config(host_list, host_ip_sync_result, now_time)
+        HOST_FILE = HOST_PATH_FILE + "hosts_" + now_time + ".yml"
+
+        ansible_forks = len(host_list)
+        # batch size comes from the loaded service configuration
+        serial_count = configuration.serial.get("SERIAL_COUNT")
+        path_infos = {}
+        try:
+            for file_info in file_path_infos:
+                file_path = file_info.get("file_path")
+                file_content = file_info.get("content")
+                # A unique temporary name per entry: two destinations that share a
+                # basename must not overwrite each other's source file.
+                tmp_fd, src_file_path = tempfile.mkstemp(prefix=os.path.basename(file_path) + "_", dir="/tmp")
+                path_infos[src_file_path] = file_path
+                with os.fdopen(tmp_fd, "w", encoding="UTF-8") as f:
+                    f.write(file_content)
+
+            extra_vars = json.dumps({"serial_count": serial_count, "file_path_infos": path_infos})
+            try:
+                result = BatchSyncConfig.ansible_handler(now_time, ansible_forks, extra_vars, HOST_FILE)
+            except Exception as ex:
+                LOGGER.error("ansible playbook execute error: %s", ex)
+                return host_ip_sync_result
+
+            # ansible_handler returns None on failure; treat that as empty output
+            for line in (result or "").splitlines():
+                if 'item=' not in line:
+                    continue
+                # host label sits between the first "[" and the following "]"
+                start_index = line.find("[") + 1
+                end_index = line.find("]", start_index)
+                sync_results = host_ip_sync_result.get(line[start_index:end_index])
+                if sync_results is None:
+                    LOGGER.error("unexpected host in ansible output: %s", line)
+                    continue
+
+                # the looped item is printed as a dict literal that ends before ")"
+                start_index1 = line.find("{")
+                end_index1 = line.find(")", start_index1)
+                try:
+                    file_path = json.loads(line[start_index1:end_index1].replace("'", "\"")).get("value")
+                except ValueError as ex:
+                    LOGGER.error("can not parse ansible output line %s: %s", line, ex)
+                    continue
+                succeeded = line.startswith(("ok:", "changed:"))
+                sync_results.append({"filePath": file_path, "result": "SUCCESS" if succeeded else "FAIL"})
+        finally:
+            # Always remove the intermediate files, including on the early returns,
+            # so private keys are not left behind.
+            try:
+                # key files under KEY_FILE_PREFIX whose name ends with id_dsa
+                # NOTE(review): this pattern also matches key files written by other
+                # requests that may still be running - confirm that is acceptable.
+                for tmp_file_path in glob.glob(os.path.join(KEY_FILE_PREFIX, "*id_dsa")):
+                    os.remove(tmp_file_path)
+
+                # temporary source files written above
+                for path in path_infos:
+                    os.remove(path)
+
+                # temporary inventory file
+                os.remove(HOST_FILE)
+            except OSError as ex:
+                LOGGER.error("remove file error: %s", ex)
+        return host_ip_sync_result
+
+    @staticmethod
+    def generate_config(host_list, host_ip_sync_result, now_time):
+        """
+        Write the per-host SSH key files and the temporary ansible inventory for one run.
+
+        Args:
+            host_list (list): host dicts; host_ip, ssh_port and pkey are read
+            host_ip_sync_result (dict): filled in place with "<host_ip>_<ssh_port>" -> []
+            now_time (str): suffix of the inventory file name
+        """
+        hosts = {"all": {"children": {"sync": {"hosts": {}}}}}
+
+        for host in host_list:
+            host_ip = host['host_ip']
+            host_label = host_ip + "_" + str(host['ssh_port'])
+            # Temporary private key file that ansible uses to reach the host.
+            # It is created with mode 0600 from the start, so the key is never
+            # readable by other users.
+            # NOTE(review): the name depends on host_ip only, so two hosts sharing an
+            # IP on different ports would share this file - confirm.
+            key_file_path = KEY_FILE_PREFIX + host_ip + KEY_FILE_SUFFIX
+            key_fd = os.open(key_file_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
+            with os.fdopen(key_fd, 'w', encoding="UTF-8") as keyfile:
+                # the mode given to os.open only applies to new files
+                os.fchmod(keyfile.fileno(), 0o600)
+                keyfile.write(host['pkey'])
+
+            hosts['all']['children']['sync']['hosts'][host_label] = {
+                "ansible_host": host_ip,
+                "ansible_ssh_user": "root",
+                "ansible_ssh_private_key_file": key_file_path,
+                "ansible_ssh_port": host['ssh_port'],
+                "ansible_python_interpreter": "/usr/bin/python3",
+                "host_key_checking": False,
+                "interpreter_python": "auto_legacy_silent",
+                "become": True,
+                "become_method": "sudo",
+                "become_user": "root",
+                "become_ask_pass": False,
+                "ssh_args": "-C -o ControlMaster=auto -o ControlPersist=60s StrictHostKeyChecking=no"
+            }
+            # every host starts with an empty result list
+            host_ip_sync_result[host_label] = list()
+
+        HOST_FILE = HOST_PATH_FILE + "hosts_" + now_time + ".yml"
+        with open(HOST_FILE, 'w') as outfile:
+            yaml.dump(hosts, outfile, default_flow_style=False)
+
+    @staticmethod
+    def ini2json(ini_path):
+        """
+        Read an ini file into a nested dict.
+
+        Args:
+            ini_path (str): path of the ini file
+
+        Returns:
+            dict: section name -> {option: value}
+        """
+        parser = RawConfigParser()
+        parser.read(ini_path)
+        return {section: dict(parser.items(section)) for section in parser.sections()}
+
+    @BaseResponse.handle(schema=BatchSyncConfigSchema, proxy=HostProxy, token=True)
+    def put(self, callback: HostProxy, **params):
+        """
+        Sync config files to several hosts and report the result per host id.
+
+        Args:
+            callback (HostProxy): proxy used to look up the hosts
+            **params: file_path_infos, host_ids and username are read
+
+        Returns:
+            response whose data is a list of {"host_id": ..., "syncResult": [...]}
+        """
+        sync_result = list()
+        # Query host address from database
+        if not callback.connect():
+            return self.response(code=state.DATABASE_CONNECT_ERROR, data=sync_result)
+
+        # hosts are looked up for the user name found in the request params
+        host_query = {"username": params.get("username"), "host_list": params.get('host_ids')}
+        status, host_list = callback.get_host_info(host_query, True)
+        if status != state.SUCCEED:
+            return self.response(code=status, data=sync_result)
+
+        # map "<host_ip>_<ssh_port>" back to the host id
+        host_id_ip_dict = dict()
+        for host in host_list or ():
+            host_id_ip_dict[host['host_ip'] + "_" + str(host['ssh_port'])] = host['host_id']
+
+        host_ip_sync_result = self.ansible_sync_domain_config_content(host_list, params.get('file_path_infos'))
+        if not host_ip_sync_result:
+            return self.response(code=state.EXECUTE_COMMAND_ERROR, data=sync_result)
+
+        # re-key the results by host id
+        sync_result.extend(
+            {"host_id": host_id_ip_dict.get(label), "syncResult": file_results}
+            for label, file_results in host_ip_sync_result.items()
+        )
+        return self.response(code=state.SUCCEED, data=sync_result)
diff --git a/zeus/conftrace_manage/__init__.py b/zeus/conftrace_manage/__init__.py
new file mode 100644
index 0000000..0470fef
--- /dev/null
+++ b/zeus/conftrace_manage/__init__.py
@@ -0,0 +1,18 @@
+#!/usr/bin/python3
+# ******************************************************************************
+# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved.
+# licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+# ******************************************************************************/
+"""
+@FileName: __init__.py
+@Time: 2024/4/19 9:21
+@Author: JiaoSiMao
+Description:
+"""
diff --git a/zeus/conftrace_manage/view.py b/zeus/conftrace_manage/view.py
new file mode 100644
index 0000000..a1faffc
--- /dev/null
+++ b/zeus/conftrace_manage/view.py
@@ -0,0 +1,279 @@
+#!/usr/bin/python3
+# ******************************************************************************
+# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved.
+# licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+# ******************************************************************************/
+"""
+Time:
+Author:
+Description: Restful APIs for conf trace
+"""
+import glob
+import json
+import os
+import queue
+import subprocess
+import threading
+import time
+
+import yaml
+from vulcanus import LOGGER
+from vulcanus.restful.resp import state
+from vulcanus.restful.resp.state import SUCCEED, SERVER_ERROR
+from vulcanus.restful.response import BaseResponse
+
+from zeus.conf import configuration
+from zeus.conf.constant import KEY_FILE_PREFIX, KEY_FILE_SUFFIX, HOST_PATH_FILE, CONF_TRACE_LOG_PATH, \
+ PARENT_DIRECTORY, CONF_TRACE_YML
+from zeus.database.proxy.conf_trace import ConfTraceProxy
+from zeus.database.proxy.host import HostProxy
+from zeus.function.verify.conf_trace import ConfTraceMgmtSchema, ConfTraceDataSchema, ConfTraceQuerySchema, \
+ ConfTraceDataDeleteSchema
+
+
+class ConfTraceMgmt(BaseResponse):
+ """
+ Interface for managing ragdoll-filetrace (start/update/stop) on hosts.
+ Restful API: put
+ """
+
+    @staticmethod
+    def parse_result(action, result, host_ip_trace_result, HOST_FILE):
+        """
+        Read the ansible output and record, per host, whether the action succeeded.
+
+        Args:
+            action (str): requested action, only used in the returned message
+            result (str): stdout of the ansible run; None is treated as empty output
+            host_ip_trace_result (dict): updated in place, "<host_ip>_<ssh_port>" -> bool
+            HOST_FILE (str): temporary inventory file, removed before returning
+
+        Returns:
+            tuple: status code, message
+        """
+        code_num = SUCCEED
+        code_string = f"{action} ragdoll-filetrace succeed"
+        # ansible_handler returns None when the run failed; treat that as "no summary lines"
+        recap_lines = [item for item in (result or "").splitlines() if 'unreachable=' in item]
+        if not recap_lines:
+            code_num = SERVER_ERROR
+            code_string = f"{action} ragdoll-filetrace error, no result"
+        for line in recap_lines:
+            # text before the first ":" is the host label, the rest holds the counters
+            ip_port, _, result_str = line.partition(":")
+            host_ip_trace_result[ip_port.strip()] = "unreachable=0" in result_str and "failed=0" in result_str
+
+        # remove the intermediate files
+        try:
+            # key files under KEY_FILE_PREFIX whose name ends with id_dsa
+            for dsa_tmp_file_path in glob.glob(os.path.join(KEY_FILE_PREFIX, "*id_dsa")):
+                os.remove(dsa_tmp_file_path)
+
+            # temporary inventory file
+            os.remove(HOST_FILE)
+        except OSError as ex:
+            LOGGER.error("remove file error: %s", ex)
+        return code_num, code_string
+
+    @staticmethod
+    def run_subprocess(cmd, result_queue):
+        """
+        Run a shell command from the ansible task directory and queue its outcome.
+
+        Args:
+            cmd (str): command line, executed with shell=True
+            result_queue (queue.Queue): receives the CompletedProcess, or the
+                CalledProcessError if one is raised
+        """
+        # NOTE(review): subprocess.run is called without check=True, so a non-zero
+        # exit status is reported through returncode rather than as an exception.
+        try:
+            outcome = subprocess.run(cmd, cwd=PARENT_DIRECTORY, shell=True, capture_output=True, text=True)
+        except subprocess.CalledProcessError as ex:
+            outcome = ex
+        result_queue.put(outcome)
+
+    @staticmethod
+    def ansible_handler(now_time, ansible_forks, extra_vars, HOST_FILE):
+        """
+        Run the conf_trace playbook and return what it printed.
+
+        Args:
+            now_time (str): suffix of the log file name
+            ansible_forks (int): value passed to ansible-playbook -f
+            extra_vars (str): value passed to ansible-playbook -e
+            HOST_FILE (str): inventory file path
+
+        Returns:
+            str: stdout of the command when the shell exits with 0, otherwise None
+        """
+        import shlex
+
+        os.makedirs(CONF_TRACE_LOG_PATH, exist_ok=True)
+
+        CONF_TRACE_LOG = CONF_TRACE_LOG_PATH + "conf_trace_" + now_time + ".log"
+
+        # The command goes through a shell and extra_vars carries request data
+        # (domain name, config file names), so every interpolated value is quoted.
+        # NOTE(review): the pipeline ends in `tee`, so the exit status checked below
+        # is presumably tee's rather than ansible-playbook's - confirm before relying on it.
+        cmd = f"ansible-playbook -f {ansible_forks} -e {shlex.quote(extra_vars)} " \
+              f"-i {shlex.quote(HOST_FILE)} {shlex.quote(CONF_TRACE_YML)} |tee {shlex.quote(CONF_TRACE_LOG)} "
+        result_queue = queue.Queue()
+        thread = threading.Thread(target=ConfTraceMgmt.run_subprocess, args=(cmd, result_queue))
+        thread.start()
+
+        thread.join()
+        try:
+            completed_process = result_queue.get(block=False)
+        except queue.Empty:
+            LOGGER.error("ansible subprocess nothing result")
+            return None
+        if isinstance(completed_process, subprocess.CalledProcessError) or completed_process.returncode != 0:
+            # "%s" is required: logging treats the extra argument as a format argument
+            LOGGER.error("ansible subprocess error: %s", completed_process)
+            return None
+        return completed_process.stdout
+
+    @staticmethod
+    def generate_config(host_list, now_time, conf_files, host_ip_trace_result, domain_name):
+        """
+        Write the per-host SSH key files and the temporary ansible inventory for one run.
+
+        Args:
+            host_list (list): host dicts; host_ip, ssh_port, pkey and host_id are read
+            now_time (str): suffix of the inventory file name
+            conf_files (list): not used here
+            host_ip_trace_result (dict): filled in place with "<host_ip>_<ssh_port>" -> True
+            domain_name (str): not used here
+        """
+        hosts = {"all": {"children": {"sync": {"hosts": {}}}}}
+
+        for host in host_list:
+            host_ip = host['host_ip']
+            host_label = host_ip + "_" + str(host['ssh_port'])
+            # Temporary private key file that ansible uses to reach the host.
+            # It is created with mode 0600 from the start, so the key is never
+            # readable by other users.
+            # NOTE(review): the name depends on host_ip only, so two hosts sharing an
+            # IP on different ports would share this file - confirm.
+            key_file_path = KEY_FILE_PREFIX + host_ip + KEY_FILE_SUFFIX
+            key_fd = os.open(key_file_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
+            with os.fdopen(key_fd, 'w', encoding="UTF-8") as keyfile:
+                # the mode given to os.open only applies to new files
+                os.fchmod(keyfile.fileno(), 0o600)
+                keyfile.write(host['pkey'])
+
+            hosts['all']['children']['sync']['hosts'][host_label] = {
+                "ansible_host": host_ip,
+                "ansible_ssh_user": "root",
+                "ansible_ssh_private_key_file": key_file_path,
+                "ansible_ssh_port": host['ssh_port'],
+                "ansible_python_interpreter": "/usr/bin/python3",
+                "host_key_checking": False,
+                "interpreter_python": "auto_legacy_silent",
+                "become": True,
+                "become_method": "sudo",
+                "become_user": "root",
+                "become_ask_pass": False,
+                "ssh_args": "-C -o ControlMaster=auto -o ControlPersist=60s StrictHostKeyChecking=no",
+                "host_id": host['host_id']
+            }
+            # every host starts as successful until the output says otherwise
+            host_ip_trace_result[host_label] = True
+
+        HOST_FILE = HOST_PATH_FILE + "hosts_" + now_time + ".yml"
+        with open(HOST_FILE, 'w') as outfile:
+            yaml.dump(hosts, outfile, default_flow_style=False)
+
+    @staticmethod
+    def ansible_conf_trace_mgmt(host_list: list, action: str, conf_files: list, domain_name: str):
+        """
+        Run the conf_trace playbook for the given hosts and summarise the outcome.
+
+        Args:
+            host_list (list): host dicts passed on to generate_config
+            action (str): action handed to the playbook
+            conf_files (list): config file paths, joined with "," for the playbook; may be empty
+            domain_name (str): domain name handed to the playbook
+
+        Returns:
+            tuple: status code, message, {"<host_ip>_<ssh_port>": bool}
+        """
+        now_time = str(int(time.time()))
+        host_ip_trace_result = {}
+        ConfTraceMgmt.generate_config(host_list, now_time, conf_files, host_ip_trace_result, domain_name)
+        ansible_forks = len(host_list)
+        # Extra vars are passed as one JSON document, so values containing spaces
+        # (a domain name, a file path) stay intact. Everything is sent as a string,
+        # as the key=value form did.
+        extra_vars = json.dumps({
+            "action": str(action),
+            "ip": str(configuration.zeus.get('IP')),
+            "port": str(configuration.zeus.get("PORT")),
+            "conf_list_str": ",".join(conf_files) if conf_files else "",
+            "domain_name": str(domain_name),
+        })
+        HOST_FILE = HOST_PATH_FILE + "hosts_" + now_time + ".yml"
+        try:
+            result = ConfTraceMgmt.ansible_handler(now_time, ansible_forks, extra_vars, HOST_FILE)
+        except Exception as ex:
+            LOGGER.error("ansible playbook execute error: %s", ex)
+            conf_trace_mgmt_result = "ragdoll-filetrace ansible playbook execute error"
+            return SERVER_ERROR, conf_trace_mgmt_result, host_ip_trace_result
+        # ansible_handler returns None when the run failed; hand over empty output
+        # so that parse_result reports "no result" and still removes the temporary files
+        if result is None:
+            result = ""
+        code_num, code_string = ConfTraceMgmt.parse_result(action, result, host_ip_trace_result, HOST_FILE)
+        return code_num, code_string, host_ip_trace_result
+
+    @BaseResponse.handle(schema=ConfTraceMgmtSchema, proxy=HostProxy, token=True)
+    def put(self, callback: HostProxy, **params):
+        """
+        Apply a ragdoll-filetrace action to the requested hosts.
+
+        Args:
+            callback (HostProxy): proxy used to look up the hosts
+            **params: host_ids, action, conf_files, domain_name and username are read
+
+        Returns:
+            response whose data maps "<host_ip>_<ssh_port>" to a bool
+        """
+        # Query host address from database
+        if not callback.connect():
+            return self.response(code=state.DATABASE_CONNECT_ERROR, message="database connect error")
+
+        # hosts are looked up for the user name found in the request params
+        host_query = {"username": params.get("username"), "host_list": params.get("host_ids")}
+        status, host_list = callback.get_host_info(host_query, True)
+        if status != state.SUCCEED:
+            return self.response(code=status, message="get host info error")
+
+        code_num, code_string, host_ip_trace_result = self.ansible_conf_trace_mgmt(
+            host_list, params.get("action"), params.get("conf_files"), params.get("domain_name"))
+        return self.response(code=code_num, message=code_string, data=host_ip_trace_result)
+
+
+class ConfTraceData(BaseResponse):
+    """Endpoint that stores conf trace info uploaded for a host."""
+
+    @staticmethod
+    def validate_conf_trace_info(params: dict):
+        """
+        Look up the host named in an uploaded conf trace record.
+
+        Args:
+            params (dict): e.g
+                {
+                    "domain_name": "aops",
+                    "host_id": 1,
+                    "conf_name": "/etc/hostname",
+                    "info": ""
+                }
+
+        Returns:
+            tuple:
+                status code, query result (empty when the lookup failed or found nothing)
+        """
+        proxy = HostProxy()
+        if not proxy.connect():
+            LOGGER.error("Connect to database error")
+            return state.DATABASE_CONNECT_ERROR, {}
+
+        code_num, result_list = proxy.get_host_info_by_host_id({"host_list": [params.get("host_id")]})
+        if code_num != SUCCEED:
+            LOGGER.error("query host info error")
+            return state.DATABASE_QUERY_ERROR, {}
+        if len(result_list) == 0:
+            return state.NO_DATA, []
+        return code_num, result_list
+
+    @BaseResponse.handle(schema=ConfTraceDataSchema, proxy=ConfTraceProxy, token=False)
+    def post(self, callback: ConfTraceProxy, **params):
+        """Store one conf trace record after checking that its host id exists."""
+        code_num, result_list = self.validate_conf_trace_info(params)
+        if code_num != SUCCEED or len(result_list) == 0:
+            return self.response(code=SERVER_ERROR, message="request param host id does not exist")
+
+        if callback.add_conf_trace_info(params) != state.SUCCEED:
+            return self.response(code=SERVER_ERROR, message="Failed to upload data, service error")
+        return self.response(code=SUCCEED, message="Succeed to upload conf trace info data")
+
+
+class ConfTraceQuery(BaseResponse):
+    """Endpoint that queries stored conf trace info."""
+
+    @BaseResponse.handle(schema=ConfTraceQuerySchema, proxy=ConfTraceProxy, token=True)
+    def post(self, callback: ConfTraceProxy, **params):
+        """Return the conf trace info selected by the request params."""
+        status_code, result = callback.query_conf_trace_info(params)
+        if status_code == SUCCEED:
+            return self.response(code=SUCCEED, message="Succeed to query conf trace info data", data=result)
+        return self.response(code=SERVER_ERROR, message="Failed to query data, service error")
+
+
+class ConfTraceDataDelete(BaseResponse):
+    """Endpoint that deletes stored conf trace info."""
+
+    @BaseResponse.handle(schema=ConfTraceDataDeleteSchema, proxy=ConfTraceProxy, token=True)
+    def post(self, callback: ConfTraceProxy, **params):
+        """Delete the conf trace info selected by the request params."""
+        if callback.delete_conf_trace_info(params) == state.SUCCEED:
+            return self.response(code=SUCCEED, message="Succeed to delete conf trace info data")
+        return self.response(code=SERVER_ERROR, message="Failed to delete data, service error")
diff --git a/zeus/cron/__init__.py b/zeus/cron/__init__.py
new file mode 100644
index 0000000..377a23d
--- /dev/null
+++ b/zeus/cron/__init__.py
@@ -0,0 +1,21 @@
+#!/usr/bin/python3
+# ******************************************************************************
+# Copyright (c) Huawei Technologies Co., Ltd. 2021-2022. All rights reserved.
+# licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+# ******************************************************************************/
+"""
+@FileName: __init__.py
+@Time: 2024/3/5 16:55
+@Author: JiaoSiMao
+Description:
+"""
+from zeus.cron.update_config_sync_status_task import UpdateConfigSyncStatusTask
+
+# timed task classes exported by this package, keyed by task name
+task_meta = {"update_config_sync_status_task": UpdateConfigSyncStatusTask}
diff --git a/zeus/cron/update_config_sync_status_task.py b/zeus/cron/update_config_sync_status_task.py
new file mode 100644
index 0000000..c836291
--- /dev/null
+++ b/zeus/cron/update_config_sync_status_task.py
@@ -0,0 +1,267 @@
+#!/usr/bin/python3
+# ******************************************************************************
+# Copyright (c) Huawei Technologies Co., Ltd. 2021-2022. All rights reserved.
+# licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+# ******************************************************************************/
+"""
+@FileName: update_config_sync_status_task.py
+@Time: 2024/3/5 16:56
+@Author: JiaoSiMao
+Description:
+"""
+import json
+
+import requests
+from vulcanus.log.log import LOGGER
+from vulcanus.timed import TimedTask
+from vulcanus.database.helper import make_mysql_engine_url, create_database_engine
+from vulcanus.database.proxy import MysqlProxy
+from vulcanus.restful.resp import state
+from vulcanus.restful.resp.state import SUCCEED
+
+from zeus.conf import configuration
+from zeus.conf.constant import DIRECTORY_FILE_PATH_LIST
+from zeus.config_manager.view import ObjectFileConfig, CollectConfig
+from zeus.database.proxy.host import HostProxy
+from zeus.database.proxy.host_sync_status import HostSyncProxy
+from zeus.utils.conf_tools import ConfTools
+
+
+class UpdateConfigSyncStatusTask(TimedTask):
+    @staticmethod
+    def get_domain_files(domain_paths: dict, expected_confs_resp: list):
+        """
+        Fill domain_paths with the file paths configured for every domain.
+
+        Args:
+            domain_paths(dict): output, updated in place with {domain name: [file path, ...]}
+            expected_confs_resp(list): per-domain dicts read through their "domainName" and
+                "confBaseInfos" keys; every conf base info is read through "filePath"
+        """
+        for domain_entry in expected_confs_resp:
+            base_infos = domain_entry.get("confBaseInfos") or []
+            domain_paths[domain_entry.get("domainName")] = [item.get("filePath") for item in base_infos]
+
+    @staticmethod
+    def deal_pam_d_config(host_info, directory_path):
+        """
+        List the files found under a directory (e.g. /etc/pam.d) on one host.
+
+        Args:
+            host_info: host info handed to ObjectFileConfig.object_file_config_content
+            directory_path(str): directory to list
+
+        Returns:
+            the "resp" value of the decoded content when the lookup succeeds, otherwise []
+        """
+        status, content = ObjectFileConfig.object_file_config_content(host_info, directory_path)
+        if status != state.SUCCEED:
+            return []
+        return json.loads(content).get("resp")
+
+    @staticmethod
+    def deal_host_file_content(domain_result, host_file_content_result):
+        """
+        Convert the files collected from one host into the records stored in domain_result.
+
+        A file whose path contains an entry of DIRECTORY_FILE_PATH_LIST is merged into one
+        record per directory ("contents" is a JSON string of {file path: content}); every
+        other file becomes a record of its own. Each file is classified exactly once, so a
+        plain file is emitted once however many directories are configured.
+
+        Args:
+            domain_result(dict): output, updated in place with {str(host_id): [record, ...]}
+            host_file_content_result(dict): read through its "host_id" and "infos" keys;
+                every info is read through "path" and "content"
+        """
+        host_id = host_file_content_result.get("host_id")
+        infos = host_file_content_result.get("infos") or []
+        file_content_list = []
+        directory_files = {directory: {} for directory in DIRECTORY_FILE_PATH_LIST}
+        has_directory_file = False
+        for info in infos:
+            info_path = str(info.get("path"))
+            matched = [directory for directory in DIRECTORY_FILE_PATH_LIST if directory in info_path]
+            if not matched:
+                file_content_list.append({"filePath": info.get("path"), "contents": info.get("content")})
+                continue
+            has_directory_file = True
+            for directory in matched:
+                directory_files[directory][info_path] = info.get("content")
+        # output shape kept as before: once any directory file was collected, every
+        # configured directory gets a record, even an empty one
+        if has_directory_file:
+            for directory, contents in directory_files.items():
+                file_content_list.append({"filePath": directory, "contents": json.dumps(contents)})
+        if file_content_list:
+            domain_result[str(host_id)] = file_content_list
+
+    def collect_file_infos(self, param, host_infos_result):
+        """
+        Collect the real content of the managed files from every host listed in param.
+
+        Args:
+            param(dict): e.g. {"infos": [{"host_id": 1, "config_list": ["/etc/hostname"]}]}
+            host_infos_result(dict): host info keyed by host id
+
+        Returns:
+            dict: filled by deal_host_file_content, {str(host_id): [file record, ...]}
+        """
+        domain_result = {}
+        # Work on a copy of every config_list: a caller may hand the same list object to
+        # several hosts, and expanding a directory in place for one host would leak that
+        # host's file names into all the others.
+        host_id_with_config_file = {
+            host.get("host_id"): list(host.get("config_list")) for host in param.get("infos")
+        }
+
+        for host_id, file_list in host_id_with_config_file.items():
+            host_info = host_infos_result.get(host_id)
+            # replace each directory entry (e.g. /etc/pam.d) by the files found inside it
+            for file_path in DIRECTORY_FILE_PATH_LIST:
+                if file_path in file_list:
+                    file_list.remove(file_path)
+                    object_file_paths = self.deal_pam_d_config(host_info, file_path)
+                    if object_file_paths:
+                        file_list.extend(object_file_paths)
+            host_file_content_result = CollectConfig.get_file_content(host_info, file_list)
+            self.deal_host_file_content(domain_result, host_file_content_result)
+        return domain_result
+
+    @staticmethod
+    def make_database_engine():
+        """Create the MySQL engine from the zeus configuration and store it on MysqlProxy.engine."""
+        url = make_mysql_engine_url(configuration)
+        pool_size = configuration.mysql.get("POOL_SIZE")
+        pool_recycle = configuration.mysql.get("POOL_RECYCLE")
+        MysqlProxy.engine = create_database_engine(url, pool_size, pool_recycle)
+
+    @staticmethod
+    def get_domain_host_ids(domain_list_resp, host_sync_proxy):
+        """
+        Map every domain to the host ids recorded for it in the host sync status table.
+
+        Args:
+            domain_list_resp(list): domain dicts, each read through its "domainName" key
+            host_sync_proxy(HostSyncProxy): connected proxy used for the lookups
+
+        Returns:
+            dict: {domain name: [host id, ...]}; domains whose lookup fails or finds no
+            record are left out
+        """
+        domain_host_id_dict = {}
+        for domain in domain_list_resp:
+            name = domain["domainName"]
+            status, sync_rows = host_sync_proxy.get_domain_host_sync_status(name)
+            if status == SUCCEED and sync_rows:
+                domain_host_id_dict[name] = [row["host_id"] for row in sync_rows]
+        return domain_host_id_dict
+
+    @staticmethod
+    def get_all_host_infos():
+        """
+        Query the hosts of user "admin" and index them by host id.
+
+        Returns:
+            dict: {host id: host info dict}, or {} when the query does not succeed
+        """
+        proxy = HostProxy()
+        proxy.connect()
+        query = {"username": "admin", "host_list": []}
+        status, host_list = proxy.get_host_info(query, True)
+        if status != state.SUCCEED:
+            return {}
+        return {host["host_id"]: host for host in host_list}
+
+    @staticmethod
+    def compare_conf(expected_confs_resp, domain_result):
+        """
+        Ask ragdoll to compare the expected confs with the content collected from the hosts.
+
+        Args:
+            expected_confs_resp(list): "data" of the expected confs response
+            domain_result(dict): collected file content keyed by domain name
+
+        Returns:
+            the "data" value of the diff response, or [] when it is empty, the request
+            fails or the response body is not valid JSON
+        """
+        headers = {"Content-Type": "application/json"}
+        domain_conf_diff_url = ConfTools.load_url_by_conf().get("domain_conf_diff_url")
+        request_data = {
+            "expectedConfsResp": expected_confs_resp,
+            "domainResult": domain_result,
+        }
+        try:
+            domain_diff_response = requests.post(
+                domain_conf_diff_url, data=json.dumps(request_data), headers=headers
+            )
+            domain_diff_resp = json.loads(domain_diff_response.text)
+        except requests.exceptions.RequestException as connect_ex:
+            LOGGER.error(f"Failed to compare domain conf, an error occurred: {connect_ex}")
+            return []
+        except ValueError as decode_ex:
+            # json.JSONDecodeError is a ValueError: raised when the body is not JSON
+            LOGGER.error(f"Failed to parse domain conf diff response, an error occurred: {decode_ex}")
+            return []
+        return domain_diff_resp.get("data") or []
+
+    @staticmethod
+    def update_sync_status_for_db(domain_diff_resp, host_sync_proxy):
+        """
+        Write the compared sync status back to the database and log the outcome.
+
+        Args:
+            domain_diff_resp(list): diff records handed to update_domain_host_sync_status
+            host_sync_proxy(HostSyncProxy): connected proxy used for the update
+        """
+        if not domain_diff_resp:
+            LOGGER.info("no host sync status data need to update")
+            return
+
+        status, save_ids = host_sync_proxy.update_domain_host_sync_status(domain_diff_resp)
+        # save_ids holds one updated-row count per diff record
+        update_result = sum(save_ids)
+        if status != SUCCEED or update_result == 0:
+            LOGGER.error("failed update host sync status data")
+        if update_result > 0:
+            LOGGER.info("update %s host sync status basic info succeed", update_result)
+        return
+
+    @staticmethod
+    def _post_for_json(url, payload, failure_message):
+        """
+        POST payload as a JSON body and decode the JSON response.
+
+        Returns:
+            the decoded response body, or None when the request fails or the body is not JSON
+        """
+        headers = {"Content-Type": "application/json"}
+        try:
+            response = requests.post(url, data=json.dumps(payload), headers=headers)
+            return json.loads(response.text)
+        except (requests.exceptions.RequestException, ValueError) as request_ex:
+            # json.JSONDecodeError is a ValueError, so a non-JSON body ends up here as well
+            LOGGER.error(f"{failure_message}, an error occurred: {request_ex}")
+            return None
+
+    def execute(self):
+        """
+        Refresh the sync status of the hosts recorded in host_conf_sync_status.
+
+        Steps: query all domains, query their expected confs, read the host ids of every
+        domain from the database, collect the real file content from the hosts, let
+        ragdoll compare both sides and store the result. The run stops early, after
+        logging, as soon as one step yields no data.
+        """
+        # query all domains
+        domain_list_url = ConfTools.load_url_by_conf().get("domain_list_url")
+        domain_list_resp = self._post_for_json(domain_list_url, {}, "Failed to get domain list")
+        if domain_list_resp is None:
+            return
+        if not domain_list_resp.get("data"):
+            LOGGER.error(
+                "Failed to get all domain, please check interface /domain/queryDomain"
+            )
+            return
+
+        # query the baseline conf content of all domains from ragdoll
+        expected_confs_url = ConfTools.load_url_by_conf().get("expected_confs_url")
+        domain_names = {"domainNames": domain_list_resp.get("data")}
+        expected_confs_resp = self._post_for_json(
+            expected_confs_url, domain_names, "Failed to get all domain expected conf list"
+        )
+        if expected_confs_resp is None:
+            return
+        if not expected_confs_resp.get("data"):
+            LOGGER.error(
+                "Failed to get all domain confs, please check interface /confs/queryExpectedConfs"
+            )
+            return
+
+        # read the host ids of every domain from table host_conf_sync_status
+        self.make_database_engine()
+        host_sync_proxy = HostSyncProxy()
+        host_sync_proxy.connect()
+        domain_host_id_dict = self.get_domain_host_ids(
+            domain_list_resp.get("data"), host_sync_proxy
+        )
+        if not domain_host_id_dict:
+            LOGGER.info("no host sync status data need to update")
+            return
+        # host info of all hosts owned by admin
+        host_infos_result = self.get_all_host_infos()
+        if not host_infos_result:
+            LOGGER.info("no host sync status data need to update")
+            return
+
+        # collect the real file content of every host, domain by domain
+        domain_paths = {}
+        self.get_domain_files(domain_paths, expected_confs_resp.get("data"))
+
+        domain_result = {}
+        for domain_name, host_id_list in domain_host_id_dict.items():
+            file_paths = domain_paths.get(domain_name)
+            if not file_paths:
+                continue
+            # every host gets its own copy of the path list, it is modified while collecting
+            data = {
+                "infos": [{"host_id": host_id, "config_list": list(file_paths)} for host_id in host_id_list]
+            }
+            if data["infos"]:
+                domain_result[domain_name] = self.collect_file_infos(data, host_infos_result)
+        # let ragdoll compare, then store the result
+        domain_diff_resp = self.compare_conf(expected_confs_resp.get("data"), domain_result)
+        self.update_sync_status_for_db(domain_diff_resp, host_sync_proxy)
diff --git a/zeus/database/__init__.py b/zeus/database/__init__.py
index 2b8a163..2077be7 100644
--- a/zeus/database/__init__.py
+++ b/zeus/database/__init__.py
@@ -15,7 +15,6 @@ Time:
Author:
Description:
"""
-from flask import g
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.scoping import scoped_session
from vulcanus.database.helper import make_mysql_engine_url
diff --git a/zeus/database/proxy/conf_trace.py b/zeus/database/proxy/conf_trace.py
new file mode 100644
index 0000000..e0e032f
--- /dev/null
+++ b/zeus/database/proxy/conf_trace.py
@@ -0,0 +1,238 @@
+#!/usr/bin/python3
+# ******************************************************************************
+# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved.
+# licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+# ******************************************************************************/
+"""
+@FileName: conf_trace.py
+@Time: 2024/4/23 14:24
+@Author: JiaoSiMao
+Description:
+"""
+import datetime
+import json
+import math
+import uuid
+
+import sqlalchemy
+from sqlalchemy import desc, asc, func
+
+from vulcanus.database.proxy import MysqlProxy
+from vulcanus.log.log import LOGGER
+from vulcanus.restful.resp.state import (
+ DATABASE_INSERT_ERROR,
+ SUCCEED, DATABASE_QUERY_ERROR, DATABASE_DELETE_ERROR,
+)
+
+from zeus.database.table import ConfTraceInfo
+
+
+class ConfTraceProxy(MysqlProxy):
+ """
+ Conf trace related table operation
+ """
+
+    def add_conf_trace_info(self, data):
+        """
+        add conf trace info to table
+
+        Args:
+            data: parameter, e.g.
+                {
+                    "domain_name": "aops",
+                    "host_id": 1,
+                    "file": "/etc/hostname",
+                    ...
+                }
+                "file" is stored as conf_name; the whole dict is stored as JSON in info.
+
+        Returns:
+            SUCCEED or DATABASE_INSERT_ERROR
+        """
+        domain_name = data.get('domain_name')
+        host_id = int(data.get('host_id'))
+        conf_name = data.get('file')
+        info = json.dumps(data)
+        conf_trace_info = ConfTraceInfo(UUID=str(uuid.uuid4()), domain_name=domain_name, host_id=host_id,
+                                        conf_name=conf_name, info=info, create_time=datetime.datetime.now())
+        try:
+            self.session.add(conf_trace_info)
+            self.session.commit()
+            LOGGER.info(
+                f"add {conf_trace_info.domain_name} {conf_trace_info.host_id} {conf_trace_info.conf_name} conf trace "
+                f"info succeed")
+            return SUCCEED
+        except sqlalchemy.exc.SQLAlchemyError as error:
+            LOGGER.error(error)
+            # log the local values: ConfTraceInfo has no host_ip column, and the handler
+            # must not raise before the rollback below
+            LOGGER.error(f"add {domain_name} {host_id} {conf_name} conf trace info fail")
+            self.session.rollback()
+            return DATABASE_INSERT_ERROR
+
+    def query_conf_trace_info(self, data):
+        """
+        query conf trace info from table
+
+        Args:
+            data: parameter, e.g.
+                {
+                    "domain_name": "aops",
+                    "host_id": 1,
+                    "conf_name": "/etc/hostname",
+                }
+
+        Returns:
+            tuple: (SUCCEED, query result) or (DATABASE_QUERY_ERROR, result obtained so far)
+        """
+        trace_result = {}
+        try:
+            trace_result = self._sort_trace_info_by_column(data)
+            self.session.commit()
+        except sqlalchemy.exc.SQLAlchemyError as error:
+            LOGGER.error(error)
+            LOGGER.error("query conf trace info fail")
+            return DATABASE_QUERY_ERROR, trace_result
+        LOGGER.debug("query conf trace info succeed")
+        return SUCCEED, trace_result
+
+    def delete_conf_trace_info(self, data):
+        """
+        delete conf trace info from table
+
+        Args:
+            data: parameter, e.g.
+                {
+                    "domain_name": "aops",
+                    "host_ids": [1]
+                }
+                "host_ids" may be missing or empty; then every record of the domain
+                is deleted.
+
+        Returns:
+            SUCCEED or DATABASE_DELETE_ERROR
+        """
+        domain_name = data['domain_name']
+        # optional in ConfTraceDataDeleteSchema, so it may be absent from data
+        host_ids = data.get('host_ids')
+        try:
+            conf_trace_filters = {ConfTraceInfo.domain_name == domain_name}
+            if host_ids:
+                conf_trace_filters.add(ConfTraceInfo.host_id.in_(host_ids))
+            matched_infos = self.session.query(ConfTraceInfo).filter(*conf_trace_filters).all()
+            for matched_info in matched_infos:
+                self.session.delete(matched_info)
+            self.session.commit()
+            return SUCCEED
+        except sqlalchemy.exc.SQLAlchemyError as error:
+            LOGGER.error(error)
+            LOGGER.error("delete conf trace info fail")
+            self.session.rollback()
+            return DATABASE_DELETE_ERROR
+
+    @staticmethod
+    def _get_conf_trace_filters(data):
+        """
+        Build the query filters for conf trace records.
+
+        Args:
+            data(dict): may hold "domain_name", "host_id" and "conf_name"; a value that
+                is missing or falsy adds no filter
+
+        Returns:
+            set: filter expressions, always containing ConfTraceInfo.host_id > 0
+        """
+        filters = {ConfTraceInfo.host_id > 0}
+        optional_conditions = (
+            (ConfTraceInfo.domain_name, data.get('domain_name')),
+            (ConfTraceInfo.host_id, data.get('host_id')),
+            (ConfTraceInfo.conf_name, data.get('conf_name')),
+        )
+        for column, wanted in optional_conditions:
+            if wanted:
+                filters.add(column == wanted)
+        return filters
+
+    def _get_conf_trace_count(self, filters):
+        """
+        Count the conf trace records matching filters.
+
+        Args:
+            filters(set): query filters
+
+        Returns:
+            int
+        """
+        count_query = self.session.query(func.count(ConfTraceInfo.UUID))
+        return count_query.filter(*filters).scalar()
+
+    def _sort_trace_info_by_column(self, data):
+        """
+        Query conf trace info, optionally sorted and paged
+
+        Args:
+            data(dict): filter values (see _get_conf_trace_filters) plus the optional keys
+                "sort" (column name), "direction" ("desc", anything else means ascending),
+                "page" and "per_page" (paging is applied only when both are set)
+
+        Returns:
+            dict: {"total_count": int, "total_page": int, "conf_trace_infos": [dict, ...]}
+        """
+        result = {"total_count": 0, "total_page": 0, "conf_trace_infos": []}
+        sort = data.get('sort')
+        direction = desc if data.get('direction') == 'desc' else asc
+        page = data.get('page')
+        per_page = data.get('per_page')
+        total_page = 1
+        filters = self._get_conf_trace_filters(data)
+        total_count = self._get_conf_trace_count(filters)
+        if total_count == 0:
+            return result
+
+        # one query, extended step by step, covers all sort/paging combinations
+        query = self.session.query(ConfTraceInfo).filter(*filters)
+        if sort:
+            query = query.order_by(direction(getattr(ConfTraceInfo, sort)))
+        if page and per_page:
+            total_page = math.ceil(total_count / per_page)
+            query = query.offset((page - 1) * per_page).limit(per_page)
+        conf_trace_infos = query.all()
+
+        # diagnostic output only, so it is logged at debug level
+        LOGGER.debug("conf_trace_infos is %s", conf_trace_infos)
+        for trace_row in conf_trace_infos:
+            info_dict = json.loads(trace_row.info)
+            info_str = f"进程:{info_dict.get('cmd')} 修改了文件:{info_dict.get('file')}"
+            # NOTE(review): the join separator is "=> " while the prefix below uses " => ";
+            # kept as is, confirm which form the consumer expects
+            ptrace_data = "=> ".join(f"{item['cmd']}:{item['pid']}" for item in info_dict.get('ptrace') or [])
+            ptrace = f"{info_dict.get('cmd')} => {ptrace_data}"
+            result["conf_trace_infos"].append(
+                {
+                    "UUID": trace_row.UUID,
+                    "domain_name": trace_row.domain_name,
+                    "host_id": trace_row.host_id,
+                    "conf_name": trace_row.conf_name,
+                    "info": info_str,
+                    "create_time": str(trace_row.create_time),
+                    "ptrace": ptrace,
+                }
+            )
+
+        result["total_page"] = total_page
+        result["total_count"] = total_count
+        LOGGER.debug("result is %s", result)
+        return result
diff --git a/zeus/database/proxy/host.py b/zeus/database/proxy/host.py
index 2e4a6ce..1dad1fd 100644
--- a/zeus/database/proxy/host.py
+++ b/zeus/database/proxy/host.py
@@ -278,6 +278,7 @@ class HostProxy(MysqlProxy):
"management": host.management,
"scene": host.scene,
"os_version": host.os_version,
+ "status": host.status,
"ssh_port": host.ssh_port,
}
result['host_infos'].append(host_info)
@@ -346,6 +347,64 @@ class HostProxy(MysqlProxy):
LOGGER.error("query host %s basic info fail", host_list)
return DATABASE_QUERY_ERROR, result
+    def get_host_info_by_host_id(self, data):
+        """
+        Get host basic info according to host id from table
+
+        Args:
+            data(dict): parameter, e.g.
+                {
+                    "host_list": ["id1", "id2"]
+                }
+                A missing or empty "host_list" applies no host id filter.
+
+        Returns:
+            status code: SUCCEED or DATABASE_QUERY_ERROR
+            list: one dict per matched host
+        """
+        host_list = data.get('host_list')
+        result = []
+        query_fields = [
+            Host.host_id,
+            Host.host_name,
+            Host.host_ip,
+            Host.os_version,
+            Host.ssh_port,
+            Host.host_group_name,
+            Host.management,
+            Host.status,
+            Host.scene,
+            Host.pkey,
+            Host.ssh_user,
+        ]
+        # keys of every returned dict, each read from the row attribute of the same name
+        info_keys = (
+            "host_id",
+            "host_group_name",
+            "host_name",
+            "host_ip",
+            "management",
+            "status",
+            "scene",
+            "os_version",
+            "ssh_port",
+            "pkey",
+            "ssh_user",
+        )
+        filters = {Host.host_id.in_(host_list)} if host_list else set()
+        try:
+            for host in self.session.query(*query_fields).filter(*filters).all():
+                result.append({key: getattr(host, key) for key in info_keys})
+            LOGGER.debug("query host %s basic info succeed", host_list)
+            return SUCCEED, result
+        except sqlalchemy.exc.SQLAlchemyError as error:
+            LOGGER.error(error)
+            LOGGER.error("query host %s basic info fail", host_list)
+            return DATABASE_QUERY_ERROR, result
+
def get_host_ssh_info(self, data):
"""
Get host ssh info according to host id from table
diff --git a/zeus/database/proxy/host_sync_status.py b/zeus/database/proxy/host_sync_status.py
new file mode 100644
index 0000000..7f4e165
--- /dev/null
+++ b/zeus/database/proxy/host_sync_status.py
@@ -0,0 +1,208 @@
+#!/usr/bin/python3
+# ******************************************************************************
+# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved.
+# licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+# ******************************************************************************/
+"""
+Time:
+Author:
+Description: Host table operation
+"""
+from typing import Tuple
+
+import sqlalchemy
+from vulcanus.database.proxy import MysqlProxy
+from vulcanus.log.log import LOGGER
+from vulcanus.restful.resp.state import (
+ DATABASE_DELETE_ERROR,
+ DATABASE_INSERT_ERROR,
+ SUCCEED, DATABASE_QUERY_ERROR,
+)
+from zeus.database.table import HostSyncStatus
+
+
+class HostSyncProxy(MysqlProxy):
+ """
+ Host related table operation
+ """
+
+    def add_host_sync_status(self, data) -> int:
+        """
+        add one host sync status record to table
+
+        Args:
+            data: parameter, e.g.
+                {
+                    "host_id": 1,
+                    "host_ip": "192.168.1.1",
+                    "domain_name": "aops",
+                    "sync_status": 0
+                }
+
+        Returns:
+            SUCCEED or DATABASE_INSERT_ERROR
+        """
+        host_sync_status = HostSyncStatus(
+            host_id=data.get('host_id'),
+            host_ip=str(data.get('host_ip')),
+            domain_name=data.get('domain_name'),
+            sync_status=data.get('sync_status'),
+        )
+        try:
+            self.session.add(host_sync_status)
+            self.session.commit()
+            LOGGER.info(f"add {host_sync_status.domain_name} {host_sync_status.host_ip} host sync status succeed")
+            return SUCCEED
+        except sqlalchemy.exc.SQLAlchemyError as error:
+            LOGGER.error(error)
+            LOGGER.error(f"add {host_sync_status.domain_name} {host_sync_status.host_ip} host sync status fail")
+            self.session.rollback()
+            return DATABASE_INSERT_ERROR
+
+    def add_host_sync_status_batch(self, host_sync_list: list) -> str:
+        """
+        Add host sync status records to the table in one batch
+
+        Args:
+            host_sync_list(list): list of host sync status object
+
+        Returns:
+            SUCCEED or DATABASE_INSERT_ERROR
+        """
+        try:
+            self.session.bulk_save_objects(host_sync_list)
+            self.session.commit()
+            LOGGER.info(f"add host {[item.host_ip for item in host_sync_list]} succeed")
+            return SUCCEED
+        except sqlalchemy.exc.SQLAlchemyError as error:
+            LOGGER.error(error)
+            self.session.rollback()
+            return DATABASE_INSERT_ERROR
+
+    def delete_host_sync_status(self, data):
+        """
+        Delete the sync status records of one host in one domain
+
+        Args:
+            data(dict): parameter, e.g.
+                {
+                    "host_id": 1,
+                    "domain_name": "aops",
+                }
+
+        Returns:
+            SUCCEED or DATABASE_DELETE_ERROR
+        """
+        host_id = data['host_id']
+        domain_name = data['domain_name']
+        try:
+            matched_rows = (
+                self.session.query(HostSyncStatus)
+                .filter(HostSyncStatus.host_id == host_id, HostSyncStatus.domain_name == domain_name)
+                .all()
+            )
+            for matched_row in matched_rows:
+                self.session.delete(matched_row)
+            self.session.commit()
+            LOGGER.info(f"delete {domain_name} {host_id} host sync status succeed")
+            return SUCCEED
+        except sqlalchemy.exc.SQLAlchemyError as error:
+            LOGGER.error(error)
+            LOGGER.error("delete host sync status fail")
+            self.session.rollback()
+            return DATABASE_DELETE_ERROR
+
+    def delete_all_host_sync_status(self, data):
+        """
+        Delete the sync status records of several hosts, or of a whole domain
+
+        Args:
+            data(dict): parameter, e.g.
+                {
+                    "host_ids": [1],
+                    "domain_name": "aops",
+                }
+                "host_ids" may be missing or empty; then every record of the domain
+                is deleted.
+
+        Returns:
+            SUCCEED or DATABASE_DELETE_ERROR
+        """
+        # optional in DeleteAllHostSyncStatusSchema, so it may be absent from data
+        host_ids = data.get('host_ids')
+        domain_name = data['domain_name']
+        try:
+            host_conf_sync_filters = {HostSyncStatus.domain_name == domain_name}
+            if host_ids:
+                host_conf_sync_filters.add(HostSyncStatus.host_id.in_(host_ids))
+            matched_rows = self.session.query(HostSyncStatus).filter(*host_conf_sync_filters).all()
+            for matched_row in matched_rows:
+                self.session.delete(matched_row)
+            self.session.commit()
+            LOGGER.info(f"delete {domain_name} {host_ids} host sync status succeed")
+            return SUCCEED
+        except sqlalchemy.exc.SQLAlchemyError as error:
+            LOGGER.error(error)
+            LOGGER.error("delete host sync status fail")
+            self.session.rollback()
+            return DATABASE_DELETE_ERROR
+
+    def get_host_sync_status(self, data) -> Tuple[int, dict]:
+        """
+        Get the sync status record of one host in one domain
+
+        Args:
+            data(dict): read through its "host_id" and "domain_name" keys
+
+        Returns:
+            tuple: (SUCCEED, result of one_or_none() for the matching query) or
+            (DATABASE_QUERY_ERROR, {})
+        """
+        host_id = data['host_id']
+        domain_name = data['domain_name']
+        try:
+            host_sync_status = self.session.query(HostSyncStatus). \
+                filter(HostSyncStatus.host_id == host_id). \
+                filter(HostSyncStatus.domain_name == domain_name).one_or_none()
+            return SUCCEED, host_sync_status
+        except sqlalchemy.exc.SQLAlchemyError as error:
+            LOGGER.error(error)
+            return DATABASE_QUERY_ERROR, {}
+
+    def get_domain_host_sync_status(self, domain_name: str):
+        """
+        Get the sync status records of every host in one domain
+
+        Args:
+            domain_name(str): domain to query
+
+        Returns:
+            tuple: (SUCCEED, list of dicts with the keys host_id, host_ip, domain_name and
+            sync_status) or (DATABASE_QUERY_ERROR, [])
+        """
+        try:
+            rows = self.session.query(HostSyncStatus).filter(HostSyncStatus.domain_name == domain_name).all()
+            result = [
+                {
+                    "host_id": row.host_id,
+                    "host_ip": row.host_ip,
+                    "domain_name": row.domain_name,
+                    "sync_status": row.sync_status,
+                }
+                for row in rows
+            ]
+            self.session.commit()
+            LOGGER.debug("query host sync status %s basic info succeed", result)
+            return SUCCEED, result
+        except sqlalchemy.exc.SQLAlchemyError as error:
+            LOGGER.error(error)
+            return DATABASE_QUERY_ERROR, []
+
+    def update_domain_host_sync_status(self, domain_diff_resp: list):
+        """
+        Update the sync status records described by domain_diff_resp
+
+        All updates are committed together; when one of them fails the session is rolled
+        back, so nothing is stored and the returned list is empty.
+
+        Args:
+            domain_diff_resp(list): dicts read through "host_id" and "domain_name" to find
+                the record; the whole dict is passed to update() as the new column values
+
+        Returns:
+            tuple: (SUCCEED, [updated row count per dict]) or (DATABASE_QUERY_ERROR, [])
+            when domain_diff_resp is empty or the update fails
+        """
+        try:
+            saved_ids = []
+            for domain_diff in domain_diff_resp:
+                update_count = self.session.query(HostSyncStatus).filter(
+                    HostSyncStatus.host_id == domain_diff.get("host_id")). \
+                    filter(HostSyncStatus.domain_name == domain_diff.get("domain_name")).update(domain_diff)
+                saved_ids.append(update_count)
+                # logged inside the loop: domain_diff is undefined when the list is empty
+                LOGGER.debug("update host sync status { %s, %s }basic info succeed", domain_diff.get("host_id"),
+                             domain_diff.get("domain_name"))
+            self.session.commit()
+            if saved_ids:
+                return SUCCEED, saved_ids
+            return DATABASE_QUERY_ERROR, []
+        except sqlalchemy.exc.SQLAlchemyError as error:
+            LOGGER.error(error)
+            # leave the session usable for the next call
+            self.session.rollback()
+            return DATABASE_QUERY_ERROR, []
diff --git a/zeus/database/table.py b/zeus/database/table.py
index 9cf604b..e9c20ec 100644
--- a/zeus/database/table.py
+++ b/zeus/database/table.py
@@ -15,7 +15,9 @@ Time:
Author:
Description: mysql tables
"""
-from sqlalchemy import Column, ForeignKey
+import datetime
+
+from sqlalchemy import Column, ForeignKey, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.sql.sqltypes import Boolean, Integer, String
@@ -132,6 +134,33 @@ class Auth(Base, MyBase):
username = Column(String(40), ForeignKey('user.username'))
+class HostSyncStatus(Base, MyBase):
+    """
+    HostSyncStatus table
+
+    One row per (host_id, domain_name) pair; both columns form the primary key.
+    """
+
+    __tablename__ = "host_conf_sync_status"
+
+    host_id = Column(Integer, primary_key=True)
+    # NOTE(review): 16 characters fit a dotted IPv4 address only, while
+    # AddHostSyncStatusSchema validates host_ip with fields.IP - confirm IPv6 is not expected
+    host_ip = Column(String(16), nullable=False)
+    domain_name = Column(String(16), primary_key=True)
+    # integer status code, 0 when not given; meaning of the values is not defined here
+    sync_status = Column(Integer, default=0)
+
+
+class ConfTraceInfo(Base, MyBase):
+    """
+    ConfTraceInfo table
+
+    One row per reported conf trace record, identified by a UUID string.
+    """
+
+    __tablename__ = "conf_trace_info"
+
+    UUID = Column(String(36), primary_key=True)
+    domain_name = Column(String(16))
+    host_id = Column(Integer)
+    conf_name = Column(String(100))
+    # free text; ConfTraceProxy stores the reported record here as JSON
+    info = Column(Text)
+    # the default has to be a callable returning the value: the datetime.datetime class
+    # itself cannot be called without arguments, datetime.datetime.now can
+    create_time = Column(DateTime, default=datetime.datetime.now)
+
+
def create_utils_tables(base, engine):
"""
Create basic database tables, e.g. user, host, hostgroup
@@ -142,6 +171,6 @@ def create_utils_tables(base, engine):
engine (instance): _engine.Engine instance
"""
# pay attention, the sequence of list is important. Base table need to be listed first.
- tables = [User, HostGroup, Host, Auth]
+ tables = [User, HostGroup, Host, Auth, HostSyncStatus, ConfTraceInfo]
tables_objects = [base.metadata.tables[table.__tablename__] for table in tables]
create_tables(base, engine, tables=tables_objects)
diff --git a/zeus/function/verify/conf_trace.py b/zeus/function/verify/conf_trace.py
new file mode 100644
index 0000000..932c369
--- /dev/null
+++ b/zeus/function/verify/conf_trace.py
@@ -0,0 +1,70 @@
+#!/usr/bin/python3
+# ******************************************************************************
+# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved.
+# licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+# ******************************************************************************/
+"""
+@FileName: conf_trace.py
+@Time: 2024/4/19 10:23
+@Author: JiaoSiMao
+Description:
+"""
+from marshmallow import Schema, fields, validate
+
+
+class ConfTraceMgmtSchema(Schema):
+    """
+    validators for parameter of /conftrace/mgmt
+    """
+
+    # required list of integer host ids
+    host_ids = fields.List(fields.Integer(), required=True)
+    # required, one of 'stop', 'start', 'update'
+    action = fields.Str(required=True, validate=validate.OneOf(['stop', 'start', 'update']))
+    # optional list of strings; presumably the conf files to trace - confirm with the view
+    conf_files = fields.List(fields.String(), required=False)
+    # required, must not be empty
+    domain_name = fields.String(required=True, validate=lambda s: len(s) > 0)
+
+
+class PtraceSchema(Schema):
+    """
+    validators for one element of the "ptrace" list of ConfTraceDataSchema
+    """
+
+    # required, must not be empty
+    cmd = fields.String(required=True, validate=lambda s: len(s) > 0)
+    # required integer
+    pid = fields.Integer(required=True)
+
+
+class ConfTraceDataSchema(Schema):
+    """
+    validators for parameter of /conftrace/data
+
+    Every field is required.
+    """
+
+    # must be greater than 0
+    host_id = fields.Integer(required=True, validate=lambda s: s > 0)
+    # must not be empty
+    domain_name = fields.String(required=True, validate=lambda s: len(s) > 0)
+    # must not be empty
+    file = fields.String(required=True, validate=lambda s: len(s) > 0)
+    syscall = fields.String(required=True)
+    # must be greater than 0
+    pid = fields.Integer(required=True, validate=lambda s: s > 0)
+    inode = fields.Integer(required=True)
+    # must not be empty
+    cmd = fields.String(required=True, validate=lambda s: len(s) > 0)
+    # list of {cmd, pid} items, each checked by PtraceSchema
+    ptrace = fields.List(fields.Nested(PtraceSchema()), required=True)
+    flag = fields.Integer(required=True)
+
+
+class ConfTraceQuerySchema(Schema):
+    """
+    validators for parameter of /conftrace/query
+    """
+
+    # required, must not be empty
+    domain_name = fields.String(required=True, validate=lambda s: len(s) > 0)
+    # required, must be greater than 0
+    host_id = fields.Integer(required=True, validate=lambda s: s > 0)
+    # required, must not be empty
+    conf_name = fields.String(required=True, validate=lambda s: len(s) > 0)
+    # optional, one of "create_time", "host_id" or the empty string
+    sort = fields.String(required=False, validate=validate.OneOf(["create_time", "host_id", ""]))
+    # optional, "desc" or "asc"
+    direction = fields.String(required=False, validate=validate.OneOf(["desc", "asc"]))
+    # optional, must be greater than 0
+    page = fields.Integer(required=False, validate=lambda s: s > 0)
+    # optional, must satisfy 0 < per_page < 50 (50 itself is rejected)
+    per_page = fields.Integer(required=False, validate=lambda s: 50 > s > 0)
+
+
+class ConfTraceDataDeleteSchema(Schema):
+    """
+    validators for parameter of /conftrace/delete
+    """
+
+    # required, must not be empty
+    domain_name = fields.String(required=True, validate=lambda s: len(s) > 0)
+    # optional list of integer host ids
+    host_ids = fields.List(fields.Integer(), required=False)
diff --git a/zeus/function/verify/config.py b/zeus/function/verify/config.py
index 1ef7b97..021a45f 100644
--- a/zeus/function/verify/config.py
+++ b/zeus/function/verify/config.py
@@ -53,3 +53,18 @@ class ObjectFileConfigSchema(Schema):
"""
host_id = fields.Integer(required=True, validate=lambda s: s > 0)
file_directory = fields.String(required=True, validate=lambda s: len(s) > 0)
+
+
+class SingleSyncConfig(Schema):
+    """
+    validators for one element of "file_path_infos" of BatchSyncConfigSchema
+    """
+
+    # required, must not be empty
+    file_path = fields.String(required=True, validate=lambda s: len(s) > 0)
+    # required, must not be empty
+    content = fields.String(required=True, validate=lambda s: len(s) > 0)
+
+
+class BatchSyncConfigSchema(Schema):
+    """
+    validators for the batch sync config request
+    """
+
+    # required, non-empty list of host ids, each greater than 0
+    host_ids = fields.List(fields.Integer(required=True, validate=lambda s: s > 0), required=True,
+                           validate=lambda s: len(s) > 0)
+    # required, non-empty list of {file_path, content} items checked by SingleSyncConfig
+    file_path_infos = fields.List(fields.Nested(SingleSyncConfig(), required=True), required=True,
+                                  validate=lambda s: len(s) > 0)
diff --git a/zeus/function/verify/host.py b/zeus/function/verify/host.py
index 7dedfee..48c434c 100644
--- a/zeus/function/verify/host.py
+++ b/zeus/function/verify/host.py
@@ -149,3 +149,39 @@ class UpdateHostSchema(Schema):
host_group_name = fields.String(required=False, validate=lambda s: 20 >= len(s) > 0)
management = fields.Boolean(required=False, truthy={True}, falsy={False})
ssh_pkey = fields.String(required=False, validate=lambda s: 4096 >= len(s) >= 0)
+
+
+class AddHostSyncStatusSchema(Schema):
+    """
+    validators for parameter of /manage/host/sync/status/add: host id and ip, domain name and a non-negative status
+    """
+
+    host_id = fields.Integer(required=True, validate=lambda hid: 0 < hid)
+    host_ip = fields.IP(required=True)
+    domain_name = fields.String(required=True, validate=lambda name: 0 < len(name))
+    sync_status = fields.Integer(required=True, validate=lambda status: 0 <= status)
+
+
+class DeleteHostSyncStatusSchema(Schema):
+    """
+    validators for parameter of /manage/host/sync/status/delete: a positive host_id and a non-empty domain_name
+    """
+
+    host_id = fields.Integer(required=True, validate=lambda hid: 0 < hid)
+    domain_name = fields.String(required=True, validate=lambda name: 0 < len(name))
+
+
+class DeleteAllHostSyncStatusSchema(Schema):
+    """
+    validators for parameter of the batch host sync status delete interface (used by the DeleteAllHostSyncStatus view)
+    """
+
+    host_ids = fields.List(fields.Integer(), required=False)
+    domain_name = fields.String(required=True, validate=lambda name: 0 < len(name))
+
+
+class GetHostSyncStatusSchema(Schema):
+    """
+    validators for parameter of /manage/host/sync/status/get: only a required domain_name
+    """
+    domain_name = fields.String(required=True)  # NOTE(review): empty string passes here, sibling schemas reject it
diff --git a/zeus/host_manager/ssh.py b/zeus/host_manager/ssh.py
index a4e7628..fa6d2c0 100644
--- a/zeus/host_manager/ssh.py
+++ b/zeus/host_manager/ssh.py
@@ -12,14 +12,14 @@
# ******************************************************************************/
import socket
from io import StringIO
-from typing import Tuple
+from typing import Tuple, Union
import paramiko
from vulcanus.log.log import LOGGER
from vulcanus.restful.resp import state
-__all__ = ["SSH", "generate_key", "execute_command_and_parse_its_result"]
+__all__ = ["SSH", "InteroperableSSH", "generate_key", "execute_command_and_parse_its_result"]
from zeus.function.model import ClientConnectArgs
@@ -57,13 +57,7 @@ class SSH:
"""
def __init__(self, ip, username, port, password=None, pkey=None):
- self._client_args = {
- 'hostname': ip,
- 'username': username,
- 'port': port,
- "password": password,
- "pkey": pkey
- }
+ self._client_args = {'hostname': ip, 'username': username, 'port': port, "password": password, "pkey": pkey}
self._client = self.client()
def client(self):
@@ -77,15 +71,15 @@ class SSH:
def execute_command(self, command: str, timeout: float = None) -> tuple:
"""
- create a ssh client, execute command and parse result
+ create a ssh client, execute command and parse result
- Args:
- command(str): shell command
- timeout(float): the maximum time to wait for the result of command execution
+ Args:
+ command(str): shell command
+ timeout(float): the maximum time to wait for the result of command execution
- Returns:
- tuple:
- status, result, error message
+ Returns:
+ tuple:
+ status, result, error message
"""
open_channel = self._client.get_transport().open_session(timeout=timeout)
open_channel.set_combine_stderr(False)
@@ -102,6 +96,94 @@ class SSH:
self._client.close()
+class InteroperableSSH:
+    """
+    An interactive SSH client that keeps a PTY shell channel open on a remote node
+
+    Attributes:
+        ip(str): host ip address, passed as hostname to paramiko.SSHClient.connect()
+        username(str): remote login user
+        port(int or str): remote login port
+        password(str): login password, may be None when pkey is given
+        pkey(str): RSA private key text, may be None/empty for password login
+        channel_timeout: timeout value applied to the shell channel
+        recv_buffer(int): maximum number of bytes read by one recv() call
+
+    Notes:
+        The constructor connects immediately; if connecting or opening the shell
+        fails, the client is closed again before the error is re-raised.
+    """
+
+    def __init__(self, ip: str, port: Union[int, str], username: str = 'root', password: str = None,
+                 pkey: str = None, channel_timeout=1000, recv_buffer: int = 4096) -> None:
+        # Only parse the key when one is supplied: from_private_key() raises on empty input,
+        # which used to make password-only logins (pkey=None, the default) impossible.
+        private_key = paramiko.RSAKey.from_private_key(StringIO(pkey)) if pkey else None
+        self.buffer = recv_buffer
+        self.__client = paramiko.SSHClient()
+        self.__client.load_system_host_keys()
+        # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
+        self.__client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        try:
+            self.__client.connect(
+                hostname=ip, port=int(port), username=username, password=password, pkey=private_key, timeout=5
+            )
+            # Open an SSH channel and start a shell
+            self.__chan = self.__client.get_transport().open_session()
+            self.__chan.get_pty()
+            self.__chan.invoke_shell()
+            self.__chan.settimeout(channel_timeout)
+        except Exception:
+            # do not leak a half-open connection when connecting or shell setup fails
+            self.__client.close()
+            raise
+
+    @property
+    def is_active(self):
+        """
+        Returns the current status of the SSH connection.
+
+        Returns True if the connection is active, False otherwise (including
+        when the client no longer has a transport, e.g. after close()).
+        """
+        transport = self.__client.get_transport()
+        return transport is not None and transport.is_active()
+
+    def send(self, cmd: str):
+        """
+        Sends a command to the SSH channel.
+
+        cmd: The command to send, e.g., 'ls -l'.
+        """
+        self.__chan.send(cmd)
+
+    def recv(self) -> str:
+        """
+        Receives data from the SSH channel and decodes it into a UTF-8 string.
+
+        Bytes that are not valid UTF-8 (e.g. a multi-byte character cut at the
+        buffer boundary) are replaced instead of raising UnicodeDecodeError.
+
+        Returns: The received data, e.g., 'file1\nfile2\n'.
+        """
+        return self.__chan.recv(self.buffer).decode("utf-8", errors="replace")
+
+    def close(self):
+        """
+        close the SSH client (and with it the underlying transport)
+        """
+        self.__client.close()
+
+    def resize(self, cols: int, rows: int):
+        """
+        Resizes the terminal size of the SSH channel.
+
+        cols: The number of columns for the terminal.
+        rows: The number of rows for the terminal.
+        """
+        self.__chan.resize_pty(width=cols, height=rows)
+
+
def execute_command_and_parse_its_result(connect_args: ClientConnectArgs, command: str) -> tuple:
"""
create a ssh client, execute command and parse result
@@ -116,14 +198,13 @@ def execute_command_and_parse_its_result(connect_args: ClientConnectArgs, comman
status, result
"""
if not connect_args.pkey:
- return state.SSH_AUTHENTICATION_ERROR, f"ssh authentication failed when connect host " \
- f"{connect_args.host_ip}"
+ return state.SSH_AUTHENTICATION_ERROR, f"ssh authentication failed when connect host " f"{connect_args.host_ip}"
try:
client = SSH(
ip=connect_args.host_ip,
username=connect_args.ssh_user,
port=connect_args.ssh_port,
- pkey=paramiko.RSAKey.from_private_key(StringIO(connect_args.pkey))
+ pkey=paramiko.RSAKey.from_private_key(StringIO(connect_args.pkey)),
)
exit_status, stdout, stderr = client.execute_command(command, connect_args.timeout)
except socket.error as error:
@@ -155,14 +236,13 @@ def execute_command_sftp_result(connect_args: ClientConnectArgs, local_path=None
"""
global sftp_client, client
if not connect_args.pkey:
- return state.SSH_AUTHENTICATION_ERROR, f"ssh authentication failed when connect host " \
- f"{connect_args.host_ip}"
+ return state.SSH_AUTHENTICATION_ERROR, f"ssh authentication failed when connect host " f"{connect_args.host_ip}"
try:
client = SSH(
ip=connect_args.host_ip,
username=connect_args.ssh_user,
port=connect_args.ssh_port,
- pkey=paramiko.RSAKey.from_private_key(StringIO(connect_args.pkey))
+ pkey=paramiko.RSAKey.from_private_key(StringIO(connect_args.pkey)),
)
sftp_client = client.client().open_sftp()
diff --git a/zeus/host_manager/terminal.py b/zeus/host_manager/terminal.py
new file mode 100644
index 0000000..e9ce452
--- /dev/null
+++ b/zeus/host_manager/terminal.py
@@ -0,0 +1,250 @@
+#!/usr/bin/python3
+# ******************************************************************************
+# Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved.
+# licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+# ******************************************************************************/
+import sqlalchemy
+from flask import request
+from flask_socketio import SocketIO, Namespace, join_room, leave_room
+from vulcanus.log.log import LOGGER
+from vulcanus.exceptions import DatabaseConnectionFailed
+from zeus.database.proxy.host import HostProxy
+from zeus.host_manager.utils.sockets import XtermRoom
+from zeus.database.table import Host
+from zeus.host_manager.ssh import InteroperableSSH
+
+
+socketio = SocketIO(
+ cors_allowed_origins="*",
+ async_mode="gevent",
+)
+
+# init singleton xterm rooms in global properties
+# to avoid duplicated initializing in different sessions.
+socket_room = XtermRoom(sio=socketio)
+
+
+class TerminalNamspace(Namespace):
+    """
+    Socket.IO namespace that forwards browser terminal events to the SSH shells kept in socket_room
+    """
+
+    def on_open(self, event: dict):
+        """
+        Handle Terminal open event: create the room's SSH shell, or join it when it already exists
+
+        Args
+            event:
+                ssh_info:
+                    type: object
+                    example: {
+                        host_id(int): 12
+                    }
+                room:
+                    type: string
+                    example: abc
+
+        Returns: None
+        """
+        room_id = event.get("room")
+        ssh_info = event.get("ssh_info")
+        if not room_id or not ssh_info:
+            self._handle_error("lack of room or ssh information, fail to establish ssh connection")
+            return  # without this, ssh_info.get() below fails on None
+
+        host_info = self._get_host_info(ssh_info.get('host_id'))
+        joined, ssh_client = False, None
+        try:
+            # only open a new SSH connection when the room does not exist yet, otherwise
+            # join() ignores room_sock and the freshly opened connection would leak
+            if not socket_room.has(room_id):
+                ssh_client = InteroperableSSH(
+                    ip=host_info.get('host_ip', '0.0.0.0'),
+                    port=host_info.get('ssh_port', 22),
+                    username=host_info.get('ssh_user', 'root'),
+                    pkey=host_info.get('pkey'),
+                )
+            joined = socket_room.join(room_id=room_id, namespace=self.namespace, room_sock=ssh_client)
+            if not joined:
+                raise RuntimeError(f"could not create socket_room[{room_id}]")
+            join_room(room=room_id)
+        except Exception as error:
+            LOGGER.error(error)
+            if joined:
+                socket_room.leave(room_id)
+            elif ssh_client is not None:
+                ssh_client.close()
+            leave_room(room_id)
+
+    def on_join(self, event: dict):
+        """
+        Handle join event: attach this session to a room that is already open
+
+        Args:
+            event:
+                room:
+                    type: string
+                    example: abc
+
+        Returns: None
+        """
+        room = event.get("room")
+        if not room:
+            LOGGER.error("lack of room token, fail to join in.")
+            return
+
+        try:
+            # join() returns False when the room does not exist (no socket is passed here)
+            if socket_room.join(room):
+                join_room(room)
+        except Exception as error:
+            LOGGER.error(error)
+            socket_room.leave(room)
+            leave_room(room)
+
+    def on_stdin(self, event: dict):
+        """
+        Handle stdin event: forward the received data to the room's SSH shell
+
+        Args:
+            event:
+                room:
+                    type: string
+                    .e.g: abc
+                data:
+                    type: string
+                    .e.g: 'ls -a'
+        Returns: None
+        """
+        room = event.get("room")
+        data = event.get("data")
+        if not room or not data:
+            return
+
+        if not socket_room.has(room):
+            self._handle_error(f"socket_room['{room}'] does not exist")
+            leave_room(room=room)
+            return  # nothing to send to; avoids reporting the same failure twice
+
+        # send() returns False only when the room vanished after the check above
+        if not socket_room.send(room_id=room, data=data):
+            self._handle_error(f"socket_room['{room}'] does not exist, could not send data to it.")
+
+    def on_leave(self, event: dict):
+        """
+        Handle leave room event
+
+        Args:
+            event:
+                room:
+                    type: string
+                    .e.g: abc
+
+        Returns: None
+        """
+        room = event.get("room")
+        if not room or not socket_room.has(room):
+            return
+
+        # XtermRoom.leave() deletes the room once its last connection has left
+        socket_room.leave(room_id=room)
+        leave_room(room)
+
+    def on_resize(self, event: dict):
+        """
+        Handle resize event
+
+        Args:
+            event:
+                room:
+                    type: string
+                    .e.g: abc
+                data:
+                    type: dict
+                    cols:
+                        type: number
+                        .e.g: 30
+                    rows:
+                        type: number
+                        .e.g: 30
+
+        Returns: None
+        """
+        room = event.get("room")
+        data = event.get("data")
+        if not room or not data or not isinstance(data, dict):
+            return
+
+        if not socket_room.has(room):
+            self._handle_error(f"socket_room[{room}] does not exist")
+            leave_room(room)
+            return  # nothing to resize; avoids reporting the same failure twice
+
+        # resize() returns False only when the room vanished after the check above
+        if not socket_room.resize(room, data.get("cols"), data.get("rows")):
+            self._handle_error(f"socket_room[{room}] does not exist, could not resize it.")
+
+    def _get_host_info(self, host_id: int):
+        """
+        select host_ip, ssh_port, ssh_user, pkey from host table by host id
+
+        Args:
+            host_id: int e.g. 3
+
+        Returns: host_info, an empty dict when the host is unknown or the query fails
+            dict: e.g.
+                {
+                    "host_ip": "127.0.0.1",
+                    "ssh_port": 22,
+                    "ssh_user": "root",
+                    "pkey": "xxxxxxxxxxxxxxxx"
+                }
+        """
+        query_fields = [Host.host_ip, Host.ssh_port, Host.pkey, Host.ssh_user]
+        try:
+            with HostProxy() as db_proxy:
+                host = db_proxy.session.query(*query_fields).filter(Host.host_id == host_id).first()
+                if host is None:
+                    # first() yields None when no row matches the host id
+                    LOGGER.error("query host info failed, host %s does not exist", host_id)
+                    return {}
+                # the private key is deliberately kept out of the log
+                LOGGER.debug("query host info of host %s succeed", host_id)
+                return {
+                    "host_ip": host.host_ip,
+                    "ssh_port": host.ssh_port,
+                    "pkey": host.pkey,
+                    "ssh_user": host.ssh_user,
+                }
+        except DatabaseConnectionFailed as connect_error:
+            LOGGER.error('connect database failed, %s', connect_error)
+            return {}
+        except sqlalchemy.exc.SQLAlchemyError as query_error:
+            LOGGER.error("query host info failed %s", query_error)
+            return {}
+
+    def _handle_error(self, err: str):
+        """
+        unified handling of exceptions: log the failure and report it to the requesting session only
+        """
+        LOGGER.error(
+            "session[ %s ] connects testbox terminal, failed: { %s }",
+            request.sid,
+            str(err),
+        )
+        socketio.emit(
+            "error",
+            f"connect failed: {str(err)}",
+            namespace=self.namespace,
+            to=request.sid,  # without a target the event goes to every client of the namespace
+        )
+
+
+socketio.on_namespace(TerminalNamspace("/terminal"))
diff --git a/zeus/host_manager/utils/__init__.py b/zeus/host_manager/utils/__init__.py
new file mode 100644
index 0000000..cb8be16
--- /dev/null
+++ b/zeus/host_manager/utils/__init__.py
@@ -0,0 +1,18 @@
+#!/usr/bin/python3
+# ******************************************************************************
+# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved.
+# licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+# ******************************************************************************/
+"""
+@FileName: __init__.py
+@Time: 2024/5/29 10:13
+@Author: JiaoSiMao
+Description:
+"""
diff --git a/zeus/host_manager/utils/sockets.py b/zeus/host_manager/utils/sockets.py
new file mode 100644
index 0000000..5dfaa75
--- /dev/null
+++ b/zeus/host_manager/utils/sockets.py
@@ -0,0 +1,198 @@
+#!/usr/bin/python3
+# ******************************************************************************
+# Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved.
+# licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+# ******************************************************************************/
+import threading
+from flask_socketio import SocketIO
+from vulcanus.log.log import LOGGER
+from zeus.host_manager.ssh import InteroperableSSH
+
+
+class XtermRoom:
+    """
+    This class represents a collection of Xterm rooms.
+    Each room is a unique SSH connection.
+    """
+
+    # The rooms dictionary stores all the active rooms.
+    # Note: This implementation is only suitable for a single-process server.
+    # If you need to deploy a multi-process server, consider using a database,
+    # middleware, or a separate service to manage the rooms.
+    rooms = {}
+
+    def __init__(self, sio: SocketIO) -> None:
+        """
+        Initialize the XtermRooms instance.
+
+        sio: The SocketIO instance used for communication.
+        """
+        self.sio = sio
+        self.stop_event = threading.Event()
+
+    def has(self, room_id: str) -> bool:
+        """
+        Check if a room with the given ID exists and is active.
+
+        room_id: The ID of the room to check.
+
+        Returns: True if the room exists and is active, False otherwise.
+        """
+        room_info = self.rooms.get(room_id)
+
+        if (
+            not room_info
+            or not room_info["socket"].is_active
+            or room_info["conns"] < 1
+            or not room_info["thread"].is_alive()
+        ):
+            # a stale room is removed as a side effect of the check
+            self._del(room_id)
+            return False
+
+        return True
+
+    def send(self, room_id: str, data: str) -> bool:
+        """
+        Sends data to the room with the given ID.
+
+        room_id: The ID of the room to send data to.
+        data: The data to send.
+
+        Returns: True if the operation is successful, False otherwise.
+        """
+
+        if not self.rooms.get(room_id):
+            return False
+
+        self.rooms[room_id]["socket"].send(data)
+        return True
+
+    def join(self, room_id: str, namespace: str = None, room_sock=None) -> bool:
+        """
+        Join a room with the given ID. If the room does not exist,
+        create it.
+
+        room_id: The ID of the room to join.
+        room_sock: The socket of the room to join. It is only used when the
+                   room has to be created; an existing room keeps its socket.
+
+        Returns: True if the operation is successful, False otherwise.
+        """
+        if not self.rooms.get(room_id):
+            return self._add(room_id, namespace, room_sock)
+
+        self.rooms[room_id]["conns"] += 1
+        self.rooms[room_id]["socket"].send("")
+        return True
+
+    def leave(self, room_id: str) -> bool:
+        """
+        Leave a room with the given ID. If the room is empty after leaving,
+        delete it.
+
+        room_id: The ID of the room to leave.
+
+        Returns: True if the operation is successful, False otherwise.
+        """
+        if not self.rooms.get(room_id) or self.rooms[room_id]["conns"] < 1:
+            return False
+
+        self.rooms[room_id]["conns"] -= 1
+        if self.rooms[room_id]["conns"] == 0:
+            return self._del(room_id)
+
+        return True
+
+    def resize(self, room_id: str, cols: int, rows: int) -> bool:
+        """
+        Resizes the terminal size of the room with the given ID.
+
+        room_id: The ID of the room to resize.
+        cols: The number of columns for the terminal.
+        rows: The number of rows for the terminal.
+
+        Returns: True if the operation is successful, False otherwise.
+        """
+        if not self.rooms.get(room_id):
+            return False
+
+        self.rooms[room_id]["socket"].resize(cols, rows)
+        return True
+
+    def _add(self, room_id: str, namespace: str, room_sock=None) -> bool:
+        """
+        Add a new room with the given ID and socket.
+
+        room_id: The ID of the room to add.
+        namespace: The namespace of the room to add.
+        room_sock: The socket of the room to add.
+
+        Returns: True if the operation is successful, False otherwise.
+        """
+
+        if self.rooms.get(room_id):
+            return False
+
+        if not isinstance(room_sock, InteroperableSSH) or not room_sock.is_active:
+            return False
+
+        self.rooms[room_id] = {
+            "socket": room_sock,
+            "conns": 1,
+            "thread": threading.Thread(target=self._bg_recv, args=(room_id, namespace)),
+        }
+
+        self.rooms[room_id]["thread"].start()
+
+        return True
+
+    def _del(self, room_id: str) -> bool:
+        """
+        Delete a room with the given ID and close its socket.
+
+        room_id: The ID of the room to delete.
+
+        Returns: True if the operation is successful, False otherwise.
+        """
+        room_info = self.rooms.get(room_id)
+        if not room_info:
+            return False
+
+        try:
+            room_info["socket"].close()  # closed unconditionally, also when no longer active
+        except Exception as error:
+            LOGGER.error("Error while closing socket: %s", error)
+        self.rooms.pop(room_id, None)
+        return True
+
+    def _bg_recv(self, room_id: str, namespace: str):
+        """
+        Continuously receive data from the room's socket in the background and
+        emit it to the room, until the room is deleted or its channel is closed.
+
+        room_id: The ID of the room to receive data from.
+        namespace: The Socket.IO namespace the data is emitted in.
+        """
+        room_info = self.rooms.get(room_id)
+        sock = room_info["socket"] if room_info else None
+        # stop as soon as the room is gone or has been re-created with another socket
+        while sock is not None and self.rooms.get(room_id, {}).get("socket") is sock:
+            try:
+                if not sock.is_active:
+                    break
+                data = sock.recv()
+            except Exception as error:
+                # e.g. channel timeout, or the socket was closed by _del() while waiting
+                LOGGER.error("Error while receiving from socket_room[%s]: %s", room_id, error)
+                break
+            if not data:
+                # recv() yields an empty string once the channel stream is closed
+                break
+            self.sio.emit("message", data, namespace=namespace, to=room_id)
diff --git a/zeus/host_manager/view.py b/zeus/host_manager/view.py
index d13868c..f1cc399 100644
--- a/zeus/host_manager/view.py
+++ b/zeus/host_manager/view.py
@@ -35,7 +35,8 @@ from vulcanus.restful.response import BaseResponse
from vulcanus.restful.serialize.validate import validate
from zeus.conf.constant import CERES_HOST_INFO, HOST_TEMPLATE_FILE_CONTENT, HostStatus
from zeus.database.proxy.host import HostProxy
-from zeus.database.table import Host
+from zeus.database.proxy.host_sync_status import HostSyncProxy
+from zeus.database.table import Host, HostSyncStatus
from zeus.function.model import ClientConnectArgs
from zeus.function.verify.host import (
AddHostBatchSchema,
@@ -47,7 +48,8 @@ from zeus.function.verify.host import (
GetHostInfoSchema,
GetHostSchema,
GetHostStatusSchema,
- UpdateHostSchema,
+ UpdateHostSchema, AddHostSyncStatusSchema, DeleteHostSyncStatusSchema, GetHostSyncStatusSchema,
+ DeleteAllHostSyncStatusSchema
)
from zeus.host_manager.ssh import SSH, execute_command_and_parse_its_result, generate_key
@@ -965,3 +967,109 @@ class UpdateHost(BaseResponse):
return self.response(code=state.PARAM_ERROR, message="please update password or authentication key.")
return self.response(callback.update_host_info(params.pop("host_id"), params))
+
+
+class AddHostSyncStatus(BaseResponse):
+    """
+    Interface for add host sync status.
+    Restful API: POST
+    """
+
+    def validate_host_sync_info(self, host_sync_info: dict) -> Tuple[int, dict]:
+        """
+        look up the host sync status record described by host_sync_info and
+        report whether it may be added
+
+        Args:
+            host_sync_info (dict): e.g
+                {
+                    "host_id": 1,
+                    "host_ip":"192.168.1.1",
+                    "domain_name": "aops",
+                    "sync_status": 0
+                }
+
+        Returns:
+            tuple:
+                status code, host sync status object
+        """
+        query_code, existing = self.proxy.get_host_sync_status(host_sync_info)
+        if query_code != state.SUCCEED:
+            return query_code, HostSyncStatus()
+        # no record yet -> free to add; otherwise hand back the existing record
+        if existing is None:
+            return state.SUCCEED, {}
+        return state.DATA_EXIST, existing
+
+    @BaseResponse.handle(schema=AddHostSyncStatusSchema, proxy=HostSyncProxy, token=False)
+    def post(self, callback: HostSyncProxy, **params):
+        """
+        add host sync status, unless validate_host_sync_info reports a failure or an existing record
+
+        Args:
+            host_id (int): host id
+            host_ip (str): host ip
+            domain_name (str): domain name
+            sync_status (int): sync status
+
+        Returns:
+            dict: response body
+        """
+        self.proxy = callback
+
+        result_code, _ = self.validate_host_sync_info(params)
+        if result_code == state.SUCCEED:
+            # validation passed, so the record can be inserted
+            result_code = self.proxy.add_host_sync_status(params)
+
+        return self.response(code=result_code)
+
+
+class DeleteHostSyncStatus(BaseResponse):
+    @BaseResponse.handle(schema=DeleteHostSyncStatusSchema, proxy=HostSyncProxy, token=False)
+    def post(self, callback: HostSyncProxy, **params):
+        """
+        Delete the sync status of one host in a domain
+        Restful API: POST
+
+        Args:
+            host_id (int): host id
+            domain_name (str): domain name
+
+        Returns:
+            dict: response body
+        """
+        return self.response(code=callback.delete_host_sync_status(params))
+
+
+class DeleteAllHostSyncStatus(BaseResponse):
+    @BaseResponse.handle(schema=DeleteAllHostSyncStatusSchema, proxy=HostSyncProxy, token=False)
+    def post(self, callback: HostSyncProxy, **params):
+        """
+        Delete host sync status records of a domain in batch
+        Restful API: POST
+
+        Args:
+            host_ids (list): host id list, optional
+            domain_name (str): domain name
+
+        Returns:
+            dict: response body
+        """
+        return self.response(code=callback.delete_all_host_sync_status(params))
+
+
+class GetHostSyncStatus(BaseResponse):
+    @BaseResponse.handle(schema=GetHostSyncStatusSchema, proxy=HostSyncProxy, token=False)
+    def post(self, callback: HostSyncProxy, **params):
+        """
+        query the host sync status of one domain
+
+        Args:
+            domain_name (str): domain name
+
+        Returns:
+            dict: response body carrying the status code and the query result
+        """
+        query_code, sync_status = callback.get_domain_host_sync_status(params.get("domain_name"))
+        return self.response(code=query_code, data=sync_status)
diff --git a/zeus/manage.py b/zeus/manage.py
index 7aab56d..222cd3c 100644
--- a/zeus/manage.py
+++ b/zeus/manage.py
@@ -15,6 +15,7 @@ Time:
Author:
Description: Manager that start aops-zeus
"""
+
try:
from gevent import monkey
@@ -22,12 +23,51 @@ try:
except:
pass
-from vulcanus import init_application
+from vulcanus import init_application, LOGGER
+from vulcanus.timed import TimedTaskManager
from zeus.conf import configuration
from zeus.url import URLS
+from zeus.conf.constant import TIMED_TASK_CONFIG_PATH
+from zeus.cron import task_meta
+from zeus.host_manager.terminal import socketio
+
+
+def _init_timed_task(application):
+    """
+    Build the timed task manager, register the configured tasks and start it
+
+    Args:
+        application:flask.Application
+    """
+    scheduler = TimedTaskManager(app=application, config_path=TIMED_TASK_CONFIG_PATH)
+    configured_tasks = scheduler.timed_config
+    if not configured_tasks:
+        # nothing configured: the manager is not started at all
+        LOGGER.warning("If you want to start a scheduled task, please add a timed config.")
+        return
+
+    for task_config in configured_tasks.values():
+        task_kind = task_config.get("type")
+        # only task types registered in task_meta can be scheduled
+        if task_kind in task_meta:
+            task_class = task_meta[task_kind]
+            scheduler.add_job(task_class(timed_config=task_config))
+
+    scheduler.start()
+
-app = init_application(name="zeus", settings=configuration, register_urls=URLS)
+def main():  # build the zeus application, bind socketio to it and set up the timed tasks
+    application = init_application(name="zeus", settings=configuration, register_urls=URLS)
+    socketio.init_app(app=application)
+    _init_timed_task(application=application)
+    return application
+app = main()
 if __name__ == "__main__":
-    app.run(host=configuration.zeus.get('IP'), port=configuration.zeus.get('PORT'))
+    # socketio.run() replaces app.run(); calling app.run() first would block and socketio.run() would never serve.
+    socketio.run(
+        app,
+        host=configuration.zeus.get("IP"),
+        port=configuration.zeus.get("PORT"),
+    )
diff --git a/zeus/url.py b/zeus/url.py
index 5f00ef9..099b6b5 100644
--- a/zeus/url.py
+++ b/zeus/url.py
@@ -53,11 +53,21 @@ from zeus.conf.constant import (
SYNC_CONFIG,
OBJECT_FILE_CONFIG,
GET_HOST_STATUS,
+ BATCH_SYNC_CONFIG,
+ ADD_HOST_SYNC_STATUS,
+ DELETE_HOST_SYNC_STATUS,
+ GET_HOST_SYNC_STATUS,
+ CONF_TRACE_MGMT,
+ CONF_TRACE_DATA,
+ CONF_TRACE_QUERY,
+ CONF_TRACE_DELETE,
+ DELETE_ALL_HOST_SYNC_STATUS
)
from zeus.config_manager import view as config_view
from zeus.host_manager import view as host_view
from zeus.metric_manager import view as metric_view
from zeus.vulnerability_manage import view as vulnerability_view
+from zeus.conftrace_manage import view as conf_trace_view
URLS = []
@@ -82,6 +92,10 @@ SPECIFIC_URLS = {
(host_view.GetHostInfo, QUERY_HOST_DETAIL),
(host_view.GetHostCount, GET_HOST_COUNT),
(host_view.GetHostTemplateFile, GET_HOST_TEMPLATE_FILE),
+ (host_view.AddHostSyncStatus, ADD_HOST_SYNC_STATUS),
+ (host_view.DeleteHostSyncStatus, DELETE_HOST_SYNC_STATUS),
+ (host_view.DeleteAllHostSyncStatus, DELETE_ALL_HOST_SYNC_STATUS),
+ (host_view.GetHostSyncStatus, GET_HOST_SYNC_STATUS)
],
"HOST_GROUP_URLS": [
(host_view.AddHostGroup, ADD_GROUP),
@@ -92,6 +106,7 @@ SPECIFIC_URLS = {
(config_view.CollectConfig, COLLECT_CONFIG),
(config_view.SyncConfig, SYNC_CONFIG),
(config_view.ObjectFileConfig, OBJECT_FILE_CONFIG),
+ (config_view.BatchSyncConfig, BATCH_SYNC_CONFIG)
],
'AGENT_URLS': [
(agent_view.AgentPluginInfo, AGENT_PLUGIN_INFO),
@@ -111,6 +126,12 @@ SPECIFIC_URLS = {
(metric_view.QueryHostMetricData, QUERY_METRIC_DATA),
(metric_view.QueryHostMetricList, QUERY_METRIC_LIST),
],
+ 'CONF_TRACE_URLS': [
+ (conf_trace_view.ConfTraceMgmt, CONF_TRACE_MGMT),
+ (conf_trace_view.ConfTraceData, CONF_TRACE_DATA),
+ (conf_trace_view.ConfTraceQuery, CONF_TRACE_QUERY),
+ (conf_trace_view.ConfTraceDataDelete, CONF_TRACE_DELETE),
+ ]
}
for _, value in SPECIFIC_URLS.items():
diff --git a/zeus/utils/__init__.py b/zeus/utils/__init__.py
new file mode 100644
index 0000000..4b94fcf
--- /dev/null
+++ b/zeus/utils/__init__.py
@@ -0,0 +1,18 @@
+#!/usr/bin/python3
+# ******************************************************************************
+# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved.
+# licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+# ******************************************************************************/
+"""
+@FileName: __init__.py
+@Time: 2024/1/22 11:42
+@Author: JiaoSiMao
+Description:
+"""
diff --git a/zeus/utils/conf_tools.py b/zeus/utils/conf_tools.py
new file mode 100644
index 0000000..4b9d073
--- /dev/null
+++ b/zeus/utils/conf_tools.py
@@ -0,0 +1,55 @@
+#!/usr/bin/python3
+# ******************************************************************************
+# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved.
+# licensed under the Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+# PURPOSE.
+# See the Mulan PSL v2 for more details.
+# ******************************************************************************/
+"""
+@FileName: conf_tools.py
+@Time: 2024/1/22 13:38
+@Author: JiaoSiMao
+Description:
+"""
+import ast
+import configparser
+import os
+
+from zeus.conf import MANAGER_CONFIG_PATH
+from zeus.conf.constant import DOMAIN_LIST_API, EXPECTED_CONFS_API, DOMAIN_CONF_DIFF_API
+
+
+class ConfTools(object):
+
+    @staticmethod
+    def load_url_by_conf():
+        """
+        desc: get the url of sync conf
+
+        Returns: dict with the keys domain_list_url, expected_confs_url and domain_conf_diff_url
+        """
+        config_file = MANAGER_CONFIG_PATH
+        if not os.path.exists(config_file):
+            # fall back to the zeus.ini located relative to this module
+            module_dir = os.path.dirname(os.path.realpath(__file__))
+            config_file = os.path.join(module_dir, "../config/zeus.ini")
+        parser = configparser.ConfigParser()
+        parser.read(config_file, encoding="utf-8")
+
+        section = "update_sync_status"
+        # the address option is evaluated as a Python literal (presumably a quoted string)
+        address = ast.literal_eval(parser.get(section, "update_sync_status_address"))
+        port = str(parser.get(section, "update_sync_status_port"))
+        base_url = "{address}:{port}".format(address=address, port=port)
+
+        apis = {
+            "domain_list_url": DOMAIN_LIST_API,
+            "expected_confs_url": EXPECTED_CONFS_API,
+            "domain_conf_diff_url": DOMAIN_CONF_DIFF_API,
+        }
+        return {url_name: "{base}{api}".format(base=base_url, api=api) for url_name, api in apis.items()}