sysSentry/add-pyxalarm-and-pySentryNotify-add-multi-users-supp.patch

737 lines
25 KiB
Diff
Raw Normal View History

From 600e20fb50f15b23252b556ca1fca806339cadfe Mon Sep 17 00:00:00 2001
From: PshySimon <caixiaomeng2@huawei.com>
Date: Thu, 26 Sep 2024 16:12:25 +0800
Subject: [PATCH] add pyxalarm and pySentryNotify, add multi-user support for
 xalarmd and adapt libxalarm

- fix xalarm_getdesc returning an invalid str
- increase the maximum xalarm alarm message length to 8192 bytes
- fix bugs in xalarmd
- unify xalarm log formatting
---
src/libso/xalarm/register_xalarm.c | 54 +++-----
src/libso/xalarm/register_xalarm.h | 10 +-
src/python/xalarm/register_xalarm.py | 196 +++++++++++++++++++++++++++
src/python/xalarm/sentry_notify.py | 72 ++++++++++
src/python/xalarm/xalarm_api.py | 18 ++-
src/python/xalarm/xalarm_server.py | 50 ++++++-
src/python/xalarm/xalarm_transfer.py | 104 ++++++++++++--
7 files changed, 446 insertions(+), 58 deletions(-)
create mode 100644 src/python/xalarm/register_xalarm.py
create mode 100644 src/python/xalarm/sentry_notify.py
diff --git a/src/libso/xalarm/register_xalarm.c b/src/libso/xalarm/register_xalarm.c
index 9eeed74..aa0645d 100644
--- a/src/libso/xalarm/register_xalarm.c
+++ b/src/libso/xalarm/register_xalarm.c
@@ -35,7 +35,7 @@
#define ALARM_SOCKET_PERMISSION 0700
#define TIME_UNIT_MILLISECONDS 1000
-#define MAX_PARAS_LEN 511
+#define MAX_PARAS_LEN 8191
#define MIN_ALARM_ID 1001
#define MAX_ALARM_ID (MIN_ALARM_ID + MAX_NUM_OF_ALARM_ID - 1)
@@ -91,7 +91,7 @@ static int create_unix_socket(const char *path)
return -1;
}
-    fd = socket(AF_UNIX, SOCK_DGRAM, 0);
+    fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0) {
printf("socket failed:%s\n", strerror(errno));
return -1;
@@ -103,14 +103,6 @@ static int create_unix_socket(const char *path)
goto release_socket;
}
-    if (access(PATH_REG_ALARM, F_OK) == 0) {
-        ret = unlink(PATH_REG_ALARM);
-        if (ret != 0) {
-            printf("unlink register socket file failed\n");
-            goto release_socket;
-        }
-    }
-
if (access(DIR_XALARM, F_OK) == -1) {
if (mkdir(DIR_XALARM, ALARM_DIR_PERMISSION) == -1) {
printf("mkdir %s failed\n", DIR_XALARM);
@@ -120,32 +112,22 @@ static int create_unix_socket(const char *path)
if (memset(&alarm_addr, 0, sizeof(alarm_addr)) == NULL) {
printf("create_unix_socket: memset alarm_addr failed, ret: %d\n", ret);
-        goto remove_dir;
+        goto release_socket;
}
alarm_addr.sun_family = AF_UNIX;
strncpy(alarm_addr.sun_path, path, sizeof(alarm_addr.sun_path) - 1);
-    if (bind(fd, (struct sockaddr *)&alarm_addr, sizeof(alarm_addr.sun_family) + strlen(alarm_addr.sun_path)) < 0) {
-        printf("bind socket failed:%s\n", strerror(errno));
-        goto remove_dir;
+    if (connect(fd, (struct sockaddr*)&alarm_addr, sizeof(alarm_addr)) == -1) {
+        printf("create_unix_socket: connect alarm_addr failed: %s\n", strerror(errno));
+        goto release_socket;
}
if (chmod(path, ALARM_SOCKET_PERMISSION) < 0) {
printf("chmod %s failed: %s\n", path, strerror(errno));
-        goto unlink_sockfile;
+        goto release_socket;
}
return fd;
-unlink_sockfile:
-    ret = unlink(PATH_REG_ALARM);
-    if (ret != 0) {
-        printf("unlink register socket file failed\n");
-    }
-remove_dir:
-    ret = rmdir(DIR_XALARM);
-    if (ret != 0) {
-        printf("rmdir %s failed: %s\n", path, strerror(errno));
-    }
release_socket:
(void)close(fd);
@@ -174,6 +156,10 @@ static void *alarm_recv(void *arg)
continue;
}
printf("recv error len:%d errno:%d\n", recvlen, errno);
+        } else if (recvlen == 0) {
+            printf("connection closed by xalarmd, maybe connections reach max num or service stopped.\n");
+            g_register_info.thread_should_stop = 1;
+            break;
}
}
return NULL;
@@ -229,6 +215,10 @@ bool xalarm_Upgrade(struct alarm_subscription_info id_filter, int client_id)
printf("%s: invalid args\n", __func__);
return false;
}
+    if (g_register_info.thread_should_stop) {
+        printf("%s: upgrade failed, alarm thread has stopped\n", __func__);
+        return false;
+    }
set_alarm_id(id_filter);
return true;
@@ -271,8 +261,6 @@ int xalarm_Register(alarm_callback_func callback, struct alarm_subscription_info
void xalarm_UnRegister(int client_id)
{
-    int ret;
-
if (!g_register_info.is_registered) {
printf("%s: alarm has not registered\n", __func__);
return;
@@ -292,10 +280,6 @@ void xalarm_UnRegister(int client_id)
if (g_register_info.register_fd != -1) {
(void)close(g_register_info.register_fd);
g_register_info.register_fd = -1;
-        ret = unlink(PATH_REG_ALARM);
-        if (ret != 0) {
-            printf("%s: unlink register socket file failed\n", __func__);
-        }
}
memset(g_register_info.alarm_enable_bitmap, 0, MAX_NUM_OF_ALARM_ID * sizeof(char));
@@ -357,7 +341,7 @@ int xalarm_Report(unsigned short usAlarmId, unsigned char ucAlarmLevel,
struct sockaddr_un alarm_addr;
if ((usAlarmId < MIN_ALARM_ID || usAlarmId > MAX_ALARM_ID) ||
-        (ucAlarmLevel < ALARM_LEVEL_FATAL || ucAlarmLevel > ALARM_LEVEL_DEBUG) ||
+        (ucAlarmLevel < MINOR_ALM || ucAlarmLevel > CRITICAL_ALM) ||
(ucAlarmType < ALARM_TYPE_OCCUR || ucAlarmType > ALARM_TYPE_RECOVER)) {
fprintf(stderr, "%s: alarm info invalid\n", __func__);
return -1;
@@ -368,6 +352,11 @@ int xalarm_Report(unsigned short usAlarmId, unsigned char ucAlarmLevel,
return -1;
}
+    if (pucParas == NULL || (int)strlen(pucParas) > MAX_PARAS_LEN) {
+        fprintf(stderr, "%s: alarm info invalid\n", __func__);
+        return -1;
+    }
+
if (memset(&info, 0, sizeof(struct alarm_info)) == NULL) {
fprintf(stderr, "%s: memset info failed, ret: %d\n", __func__, ret);
return -1;
@@ -671,3 +660,4 @@ int report_result(const char *task_name, enum RESULT_LEVEL result_level, const c
return RETURE_CODE_SUCCESS;
}
+
diff --git a/src/libso/xalarm/register_xalarm.h b/src/libso/xalarm/register_xalarm.h
index 1f26c6a..dcf4f03 100644
--- a/src/libso/xalarm/register_xalarm.h
+++ b/src/libso/xalarm/register_xalarm.h
@@ -11,7 +11,7 @@
#include <sys/time.h>
#include <stdbool.h>
-#define ALARM_INFO_MAX_PARAS_LEN 512
+#define ALARM_INFO_MAX_PARAS_LEN 8192 /* NOTE(review): presumably sizes the alarm message field; keep in sync with MAX_MSG_LEN in xalarm_api.py */
#define MAX_STRERROR_SIZE 1024
#define MAX_ALARM_TYEPS 1024
#define MIN_ALARM_ID 1001
@@ -19,11 +19,9 @@
#define MEMORY_ALARM_ID 1001
-#define ALARM_LEVEL_FATAL 1
-#define ALARM_LEVEL_ERROR 2
-#define ALARM_LEVEL_WARNING 3
-#define ALARM_LEVEL_INFO 4
-#define ALARM_LEVEL_DEBUG 5
+#define MINOR_ALM 1 /* replaces ALARM_LEVEL_*; lowest level accepted by xalarm_Report */
+#define MAJOR_ALM 2
+#define CRITICAL_ALM 3 /* highest level accepted by xalarm_Report */
#define ALARM_TYPE_OCCUR 1
#define ALARM_TYPE_RECOVER 2
diff --git a/src/python/xalarm/register_xalarm.py b/src/python/xalarm/register_xalarm.py
new file mode 100644
index 0000000..6ac1eb7
--- /dev/null
+++ b/src/python/xalarm/register_xalarm.py
@@ -0,0 +1,274 @@
+"""Python client for subscribing to alarms broadcast by xalarmd."""
+import os
+import sys
+import socket
+import logging
+import threading
+import time
+import fcntl
+import inspect
+from struct import error as StructParseError
+
+from .xalarm_api import Xalarm, alarm_bin2stu
+
+
+ALARM_REPORT_LEN = 8216
+MAX_NUM_OF_ALARM_ID = 128
+MIN_ALARM_ID = 1001
+MAX_ALARM_ID = (MIN_ALARM_ID + MAX_NUM_OF_ALARM_ID - 1)
+DIR_XALARM = "/var/run/xalarm"
+PATH_REG_ALARM = "/var/run/xalarm/alarm"
+PATH_REPORT_ALARM = "/var/run/xalarm/report"
+ALARM_DIR_PERMISSION = 0o0750
+ALARM_REG_SOCK_PERMISSION = 0o0700
+ALARM_SOCKET_PERMISSION = 0o0700
+TIME_UNIT_MILLISECONDS = 1000
+ALARM_REGISTER_INFO = None
+# seconds to sleep when the non-blocking socket has no data available
+RECV_POLL_INTERVAL = 0.1
+
+
+class AlarmRegister:
+    """Subscription to the alarms xalarmd broadcasts on PATH_REG_ALARM.
+
+    A daemon thread reads fixed-size (ALARM_REPORT_LEN) messages from the
+    stream socket and passes every alarm whose id is enabled in id_filter
+    to callback.
+    """
+
+    def __init__(self, id_filter: list, callback: callable):
+        self.id_filter = id_filter
+        self.callback = callback
+        self.socket = self.create_unix_socket()
+        self.is_registered = True
+        self.thread = threading.Thread(target=self.alarm_recv)
+        self.thread_should_stop = False
+
+    def check_params(self) -> bool:
+        """Return True if id_filter, callback and the socket are all usable."""
+        if self.id_filter is None or len(self.id_filter) != MAX_NUM_OF_ALARM_ID:
+            sys.stderr.write("check_params: invalid param id_filter\n")
+            return False
+
+        # inspect.signature raises TypeError for non-callables and ValueError
+        # when a callable's signature cannot be introspected
+        try:
+            sig = inspect.signature(self.callback)
+        except (TypeError, ValueError):
+            sig = None
+        if sig is None or len(sig.parameters) != 1:
+            sys.stderr.write("check_params: invalid param callback\n")
+            return False
+
+        if self.socket is None:
+            sys.stderr.write("check_params: socket create failed\n")
+            return False
+        return True
+
+    def set_id_filter(self, id_filter: list) -> bool:
+        """Replace the id filter; return False if it is missing or too long."""
+        if id_filter is None or len(id_filter) > MAX_NUM_OF_ALARM_ID:
+            sys.stderr.write("set_id_filter: invalid param id_filter\n")
+            return False
+        self.id_filter = id_filter
+        return True
+
+    def id_is_registered(self, alarm_id) -> bool:
+        """Return True if alarm_id is in range and enabled in id_filter."""
+        if alarm_id < MIN_ALARM_ID or alarm_id > MAX_ALARM_ID:
+            return False
+        index = alarm_id - MIN_ALARM_ID
+        # set_id_filter accepts filters shorter than MAX_NUM_OF_ALARM_ID;
+        # ids past the end of such a filter count as not registered
+        if index >= len(self.id_filter):
+            return False
+        return bool(self.id_filter[index])
+
+    def put_alarm_info(self, alarm_info: Xalarm) -> None:
+        """Invoke the callback for alarm_info if its id passes the filter."""
+        if not self.callback or not alarm_info:
+            return
+        if not self.id_is_registered(alarm_info.alarm_id):
+            return
+        self.callback(alarm_info)
+
+    def create_unix_socket(self) -> socket.socket:
+        """Connect to xalarmd's alarm socket; return it, or None on failure."""
+        sock = None
+        try:
+            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+
+            if not os.access(DIR_XALARM, os.F_OK):
+                os.makedirs(DIR_XALARM)
+                os.chmod(DIR_XALARM, ALARM_DIR_PERMISSION)
+
+            # connect while still blocking: a non-blocking AF_UNIX connect
+            # fails with EAGAIN instead of waiting when the backlog is full
+            sock.connect(PATH_REG_ALARM)
+            sock.setblocking(False)
+            return sock
+        except OSError as e:
+            # sock is still None if socket.socket() itself failed
+            if sock is not None:
+                sock.close()
+            sys.stderr.write(f"create_unix_socket: create socket error:{e}\n")
+            return None
+
+    def _dispatch_messages(self, pending: bytearray) -> None:
+        """Decode and dispatch every complete message at the head of pending.
+
+        Consumed bytes are removed from pending; a trailing partial message
+        stays there until the rest of it is received.
+        """
+        while not self.thread_should_stop and len(pending) >= ALARM_REPORT_LEN:
+            message = bytes(pending[:ALARM_REPORT_LEN])
+            del pending[:ALARM_REPORT_LEN]
+            try:
+                self.put_alarm_info(alarm_bin2stu(message))
+            except (ValueError, StructParseError) as e:
+                sys.stderr.write(f"{e}\n")
+
+    def alarm_recv(self):
+        """Receive loop run by the background thread.
+
+        The socket is a byte stream, so one recv() may return part of a
+        message, or the tail of one message plus the head of the next;
+        received bytes are buffered and split on ALARM_REPORT_LEN boundaries.
+        """
+        pending = bytearray()
+        while not self.thread_should_stop:
+            try:
+                data = self.socket.recv(ALARM_REPORT_LEN)
+                if not data:
+                    sys.stderr.write("connection closed by xalarmd, maybe connections reach max num or service stopped.\n")
+                    self.thread_should_stop = True
+                    break
+                pending.extend(data)
+                self._dispatch_messages(pending)
+            except BlockingIOError:
+                time.sleep(RECV_POLL_INTERVAL)
+            except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError):
+                sys.stderr.write("Connection closed by the server.\n")
+                self.thread_should_stop = True
+            except (ValueError, StructParseError, InterruptedError) as e:
+                sys.stderr.write(f"{e}\n")
+            except Exception as e:
+                sys.stderr.write(f"{e}\n")
+                self.thread_should_stop = True
+
+    def start_thread(self) -> None:
+        """Start the receive thread as a daemon so it never blocks interpreter exit."""
+        self.thread.daemon = True
+        self.thread.start()
+
+    def stop_thread(self) -> None:
+        """Stop the receive thread, if it is running, and close the socket."""
+        self.thread_should_stop = True
+        # join() raises if the thread was never started, or if a callback
+        # unregisters from inside the receive thread itself
+        if self.thread.is_alive() and self.thread is not threading.current_thread():
+            self.thread.join()
+        if self.socket is not None:
+            self.socket.close()
+            self.socket = None
+        self.is_registered = False
+
+
+def xalarm_register(callback: callable, id_filter: list) -> int:
+    """Subscribe callback to the alarms enabled in id_filter.
+
+    :param callback: callable taking a single Xalarm argument
+    :param id_filter: MAX_NUM_OF_ALARM_ID flags; entry i enables id MIN_ALARM_ID + i
+    :return: 0 (the client id) on success, -1 on failure
+    """
+    global ALARM_REGISTER_INFO
+
+    if ALARM_REGISTER_INFO is not None:
+        sys.stderr.write("xalarm_register: alarm has registered\n")
+        return -1
+
+    register_info = AlarmRegister(id_filter, callback)
+    if not register_info.check_params():
+        # release the socket and stay unregistered, so that a later call
+        # with valid arguments can still succeed
+        register_info.stop_thread()
+        return -1
+
+    ALARM_REGISTER_INFO = register_info
+    ALARM_REGISTER_INFO.start_thread()
+
+    return 0
+
+
+def xalarm_unregister(clientId: int) -> None:
+    """Stop receiving alarms and close the connection to xalarmd."""
+    global ALARM_REGISTER_INFO
+    if clientId < 0:
+        sys.stderr.write("xalarm_unregister: invalid client\n")
+        return
+
+    if ALARM_REGISTER_INFO is None:
+        sys.stderr.write("xalarm_unregister: alarm has not registered\n")
+        return
+
+    ALARM_REGISTER_INFO.stop_thread()
+    ALARM_REGISTER_INFO = None
+
+
+def xalarm_upgrade(clientId: int, id_filter: list) -> bool:
+    """Replace the id filter of the active registration; return success."""
+    global ALARM_REGISTER_INFO
+    if clientId < 0:
+        sys.stderr.write("xalarm_upgrade: invalid client\n")
+        return False
+    if ALARM_REGISTER_INFO is None:
+        sys.stderr.write("xalarm_upgrade: alarm has not registered\n")
+        return False
+    if ALARM_REGISTER_INFO.thread_should_stop:
+        sys.stderr.write("xalarm_upgrade: upgrade failed, alarm thread has stopped\n")
+        return False
+    # validate through set_id_filter so that a None or oversized filter
+    # cannot break the receive thread later
+    return ALARM_REGISTER_INFO.set_id_filter(id_filter)
+
+
+def xalarm_getid(alarm_info: Xalarm) -> int:
+    """Return the alarm id of alarm_info, or 0 if alarm_info is falsy (e.g. None)."""
+    if not alarm_info:
+        return 0
+    return alarm_info.alarm_id
+
+
+def xalarm_getlevel(alarm_info: Xalarm) -> int:
+    """Return the alarm level of alarm_info, or 0 if alarm_info is falsy (e.g. None)."""
+    if not alarm_info:
+        return 0
+    return alarm_info.alarm_level
+
+
+def xalarm_gettype(alarm_info: Xalarm) -> int:
+    """Return the alarm type of alarm_info, or 0 if alarm_info is falsy (e.g. None)."""
+    if not alarm_info:
+        return 0
+    return alarm_info.alarm_type
+
+
+def xalarm_gettime(alarm_info: Xalarm) -> int:
+    """Return the alarm timestamp in whole milliseconds, or 0 if alarm_info is falsy."""
+    if not alarm_info:
+        return 0
+    # floor division keeps the result an int, as the annotation promises
+    return (alarm_info.timetamp.tv_sec * TIME_UNIT_MILLISECONDS +
+            alarm_info.timetamp.tv_usec // TIME_UNIT_MILLISECONDS)
+
+
+def xalarm_getdesc(alarm_info: Xalarm) -> str:
+    """Return the alarm message decoded as UTF-8, or None if unavailable."""
+    if not alarm_info:
+        return None
+    try:
+        desc_str = alarm_info.msg1.rstrip(b'\x00').decode('utf-8')
+    except UnicodeError:
+        desc_str = None
+    return desc_str
+
diff --git a/src/python/xalarm/sentry_notify.py b/src/python/xalarm/sentry_notify.py
new file mode 100644
index 0000000..1e3fa76
--- /dev/null
+++ b/src/python/xalarm/sentry_notify.py
@@ -0,0 +1,91 @@
+import os
+import sys
+import time
+import socket
+import logging
+from struct import error as StructParseError
+
+from .xalarm_api import alarm_stu2bin, Xalarm
+
+MAX_NUM_OF_ALARM_ID = 128
+MIN_ALARM_ID = 1001
+MAX_ALARM_ID = (MIN_ALARM_ID + MAX_NUM_OF_ALARM_ID - 1)
+
+MINOR_ALM = 1
+MAJOR_ALM = 2
+CRITICAL_ALM = 3
+
+ALARM_TYPE_OCCUR = 1
+ALARM_TYPE_RECOVER = 2
+
+MAX_PUC_PARAS_LEN = 8192
+
+DIR_XALARM = "/var/run/xalarm"
+PATH_REPORT_ALARM = "/var/run/xalarm/report"
+ALARM_DIR_PERMISSION = 0o750
+ALARM_SOCKET_PERMISSION = 0o700
+
+
+def check_params(alarm_id, alarm_level, alarm_type, puc_paras) -> bool:
+    """Return True if xalarmd's report socket exists and all alarm fields are valid.
+
+    puc_paras must be a str whose UTF-8 encoding is shorter than
+    MAX_PUC_PARAS_LEN bytes.
+    """
+    if not os.path.exists(DIR_XALARM):
+        logging.error(f"check_params: {DIR_XALARM} not exist, failed")
+        return False
+
+    if not os.path.exists(PATH_REPORT_ALARM):
+        logging.error(f"check_params: {PATH_REPORT_ALARM} not exist, failed")
+        return False
+
+    if (alarm_id < MIN_ALARM_ID or alarm_id > MAX_ALARM_ID or
+            alarm_level < MINOR_ALM or alarm_level > CRITICAL_ALM or
+            alarm_type < ALARM_TYPE_OCCUR or alarm_type > ALARM_TYPE_RECOVER):
+        logging.error("check_params: alarm info invalid")
+        return False
+
+    if not isinstance(puc_paras, str):
+        logging.error("check_params: alarm msg should be a str")
+        return False
+
+    # limit the encoded size, not the character count: multi-byte UTF-8
+    # characters would otherwise pass the check and be truncated when packed
+    try:
+        msg_len = len(puc_paras.encode('utf-8'))
+    except UnicodeError:
+        logging.error("check_params: alarm msg is not encodable as utf-8")
+        return False
+    if msg_len >= MAX_PUC_PARAS_LEN:
+        logging.error(f"check_params: alarm msg should be less than {MAX_PUC_PARAS_LEN}")
+        return False
+
+    return True
+
+
+def xalarm_report(alarm_id, alarm_level, alarm_type, puc_paras) -> bool:
+    """Send one alarm to xalarmd through its datagram report socket.
+
+    :return: True if the alarm was sent, False if validation or sending failed
+    """
+    if not check_params(alarm_id, alarm_level, alarm_type, puc_paras):
+        return False
+
+    current_time = time.time()
+    current_time_seconds = int(current_time)
+    current_microseconds = int((current_time - current_time_seconds) * 1_000_000)
+
+    try:
+        alarm_info = Xalarm(alarm_id, alarm_type, alarm_level,
+                            current_time_seconds, current_microseconds, puc_paras)
+        # the with statement closes the socket on every path, including errors
+        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
+            sock.sendto(alarm_stu2bin(alarm_info), PATH_REPORT_ALARM)
+    except (FileNotFoundError, StructParseError, socket.error, OSError, UnicodeError, ValueError) as e:
+        # ValueError: Xalarm's msg1 setter rejects messages longer than it allows
+        logging.error(f"error occurs when sending msg: {e}")
+        return False
+
+    return True
+
diff --git a/src/python/xalarm/xalarm_api.py b/src/python/xalarm/xalarm_api.py
index 94d7638..c365019 100644
--- a/src/python/xalarm/xalarm_api.py
+++ b/src/python/xalarm/xalarm_api.py
@@ -23,6 +23,7 @@ ALARM_LEVELS = (1, 2, 3, 4, 5)
ALARM_SOCK_PATH = "/var/run/xalarm/report"
MIN_ALARM_ID = 1001
MAX_ALARM_ID = 1128
+MAX_MSG_LEN = 8192
@dataclasses.dataclass
@@ -98,7 +99,7 @@ class Xalarm:
"""msg1 setter
"""
-        if len(msg) > 512:
-            raise ValueError("msg1 length must below 255")
+        if len(msg) > MAX_MSG_LEN:
+            raise ValueError(f"msg1 length must not exceed {MAX_MSG_LEN}")
self._msg1 = msg
@@ -116,3 +117,25 @@ def alarm_bin2stu(bin_data):
alarm_info.msg1 = struct_data[5]
return alarm_info
+
+
+def alarm_stu2bin(alarm_info: Xalarm):
+    """Pack alarm_info into the C-struct layout '@HBBll' + MAX_MSG_LEN-byte message.
+
+    msg1 is UTF-8 encoded and, if needed, cut to MAX_MSG_LEN - 1 bytes on a
+    character boundary so the field always keeps a terminating NUL byte;
+    struct's "s" format pads the rest of the field with NUL bytes.
+    """
+    encoded_msg = alarm_info.msg1.encode('utf-8')
+    if len(encoded_msg) >= MAX_MSG_LEN:
+        # errors='ignore' drops a multi-byte character split by the cut
+        # instead of sending an invalid UTF-8 sequence
+        encoded_msg = encoded_msg[:MAX_MSG_LEN - 1].decode('utf-8', errors='ignore').encode('utf-8')
+    return struct.pack(
+        f'@HBBll{MAX_MSG_LEN}s',
+        alarm_info.alarm_id,
+        alarm_info.alarm_level,
+        alarm_info.alarm_type,
+        alarm_info.timetamp.tv_sec,
+        alarm_info.timetamp.tv_usec,
+        encoded_msg)
diff --git a/src/python/xalarm/xalarm_server.py b/src/python/xalarm/xalarm_server.py
index 84db273..f90a0e2 100644
--- a/src/python/xalarm/xalarm_server.py
+++ b/src/python/xalarm/xalarm_server.py
@@ -17,16 +17,25 @@ Create: 2023-11-02
import socket
import os
import logging
+import select
+import threading
from struct import error as StructParseError
from .xalarm_api import alarm_bin2stu
-from .xalarm_transfer import check_filter, transmit_alarm
+from .xalarm_transfer import (
+    check_filter,
+    transmit_alarm,
+    wait_for_connection,
+    peroid_task_to_cleanup_connections
+)
ALARM_DIR = "/var/run/xalarm"
+USER_RECV_SOCK = "/var/run/xalarm/alarm"
SOCK_FILE = "/var/run/xalarm/report"
-ALARM_REPORT_LEN = 536
+ALARM_REPORT_LEN = 8216
ALARM_DIR_PERMISSION = 0o750
+ALARM_LISTEN_QUEUE_LEN = 5
def clear_sock_path():
@@ -37,6 +46,8 @@ def clear_sock_path():
os.chmod(ALARM_DIR, ALARM_DIR_PERMISSION)
if os.path.exists(SOCK_FILE):
os.unlink(SOCK_FILE)
+    if os.path.exists(USER_RECV_SOCK):
+        os.unlink(USER_RECV_SOCK)
def server_loop(alarm_config):
@@ -49,6 +60,27 @@ def server_loop(alarm_config):
sock.bind(SOCK_FILE)
os.chmod(SOCK_FILE, 0o600)
+    alarm_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    alarm_sock.bind(USER_RECV_SOCK)
+    os.chmod(USER_RECV_SOCK, 0o600)
+    alarm_sock.listen(ALARM_LISTEN_QUEUE_LEN)
+    alarm_sock.setblocking(False)
+
+    epoll = select.epoll()
+    epoll.register(alarm_sock.fileno(), select.EPOLLIN)
+    fd_to_socket = {alarm_sock.fileno(): alarm_sock,}
+    # an Event rather than a bool: rebinding a local bool later would never
+    # be seen by the worker threads that received it as an argument
+    thread_should_stop = threading.Event()
+
+    conn_thread = threading.Thread(target=wait_for_connection, args=(alarm_sock, epoll, fd_to_socket, thread_should_stop))
+    conn_thread.daemon = True
+    conn_thread.start()
+
+    cleanup_thread = threading.Thread(target=peroid_task_to_cleanup_connections, args=(alarm_sock, epoll, fd_to_socket, thread_should_stop))
+    cleanup_thread.daemon = True
+    cleanup_thread.start()
+
while True:
try:
data, _ = sock.recvfrom(ALARM_REPORT_LEN)
@@ -58,14 +90,24 @@ def server_loop(alarm_config):
logging.debug("server receive report msg length wrong %d",
len(data))
continue
-
alarm_info = alarm_bin2stu(data)
logging.debug("server bin2stu msg")
if not check_filter(alarm_info, alarm_config):
continue
+            transmit_alarm(alarm_sock, epoll, fd_to_socket, data)
+        except Exception as e:
+            logging.error(f"Error server:{e}")
+
+    # NOTE(review): `while True` with `except Exception` appears to have no normal
+    # exit, so this cleanup may be unreachable; consider try/finally - confirm
+    thread_should_stop.set()
+    conn_thread.join()
+    cleanup_thread.join()
-            transmit_alarm(data)
-        except (ValueError, StructParseError):
-            pass
+    epoll.unregister(alarm_sock.fileno())
+    epoll.close()
+    alarm_sock.close()
+    os.unlink(USER_RECV_SOCK)
sock.close()
+
diff --git a/src/python/xalarm/xalarm_transfer.py b/src/python/xalarm/xalarm_transfer.py
index b590b43..010d2ee 100644
--- a/src/python/xalarm/xalarm_transfer.py
+++ b/src/python/xalarm/xalarm_transfer.py
@@ -16,10 +16,15 @@ Create: 2023-11-02
import socket
import logging
+import select
+import threading
+from time import sleep
-USER_RECV_SOCK = "/var/run/xalarm/alarm"
MIN_ID_NUMBER = 1001
MAX_ID_NUMBER = 1128
+MAX_CONNECTION_NUM = 100
+TEST_CONNECT_BUFFER_SIZE = 32
+PEROID_SCANN_TIME = 60
def check_filter(alarm_info, alarm_filter):
@@ -35,16 +40,157 @@ def check_filter(alarm_info, alarm_filter):
return True
-def transmit_alarm(bin_data):
-    """forward alarm message
+# fd_to_socket is shared by the accept thread, the periodic cleanup thread
+# and the server loop; every access in this module holds this lock. It is
+# re-entrant because _accept_connection calls cleanup_closed_connections
+# while already holding it.
+_CONNECTIONS_LOCK = threading.RLock()
+
+
+def _stop_requested(thread_should_stop):
+    """Return True once stop was requested through an Event or a truthy flag."""
+    if isinstance(thread_should_stop, threading.Event):
+        return thread_should_stop.is_set()
+    return bool(thread_should_stop)
+
+
+def _wait_for_stop(thread_should_stop, timeout):
+    """Sleep for timeout seconds, returning early if the Event is set."""
+    if isinstance(thread_should_stop, threading.Event):
+        thread_should_stop.wait(timeout)
+    else:
+        sleep(timeout)
+
+
+def _remove_connection(epoll, fd_to_socket, fileno):
+    """Unregister, close and forget one client connection; caller holds the lock."""
+    connection = fd_to_socket.pop(fileno, None)
+    if connection is None:
+        return
+    try:
+        epoll.unregister(fileno)
+    except (OSError, ValueError):
+        # fd no longer registered, or the epoll object is already closed
+        pass
+    connection.close()
+    logging.info(f"cleaned up connection {fileno} for client lost connection.")
+
+
+def cleanup_closed_connections(server_sock, epoll, fd_to_socket):
"""
-    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
-    try:
-        sock.sendto(bin_data, USER_RECV_SOCK)
-        logging.debug("transfer alarm success")
-    except ConnectionRefusedError:
-        logging.debug("transfer sendto failed")
-    except FileNotFoundError:
-        logging.debug("transfer sendto failed")
-    finally:
-        sock.close()
+    clean invalid client socket connections saved in 'fd_to_socket'
+    :param server_sock: server socket instance of alarm
+    :param epoll: epoll instance, used to unregister invalid client connections
+    :param fd_to_socket: dict instance, used to hold client connections and server connections
+    """
+    with _CONNECTIONS_LOCK:
+        to_remove = []
+        for fileno, connection in fd_to_socket.items():
+            if connection is server_sock:
+                continue
+            try:
+                # test whether connection still alive, use MSG_DONTWAIT to avoid blocking thread
+                # use MSG_PEEK to avoid consuming buffer data
+                data = connection.recv(TEST_CONNECT_BUFFER_SIZE, socket.MSG_DONTWAIT | socket.MSG_PEEK)
+                if not data:
+                    to_remove.append(fileno)
+            except BlockingIOError:
+                # nothing to read: the connection is idle but still open
+                pass
+            except OSError:
+                # reset, aborted, broken pipe or otherwise unusable
+                to_remove.append(fileno)
+
+        for fileno in to_remove:
+            _remove_connection(epoll, fd_to_socket, fileno)
+
+
+def peroid_task_to_cleanup_connections(server_sock, epoll, fd_to_socket, thread_should_stop):
+    """
+    every PEROID_SCANN_TIME seconds, remove client connections that were lost
+    :param thread_should_stop: threading.Event (or bool) that ends the loop once set
+    """
+    while not _stop_requested(thread_should_stop):
+        # waiting on the Event lets a stop request end the loop right away
+        # instead of after a full scan period
+        _wait_for_stop(thread_should_stop, PEROID_SCANN_TIME)
+        if _stop_requested(thread_should_stop):
+            break
+        cleanup_closed_connections(server_sock, epoll, fd_to_socket)
+
+
+def _accept_connection(server_sock, epoll, fd_to_socket):
+    """Accept one pending client and track it, unless MAX_CONNECTION_NUM is reached."""
+    connection, _ = server_sock.accept()
+    with _CONNECTIONS_LOCK:
+        # if reach max connection, cleanup closed connections
+        if len(fd_to_socket) - 1 >= MAX_CONNECTION_NUM:
+            cleanup_closed_connections(server_sock, epoll, fd_to_socket)
+        # if connections still reach max num, close this connection automatically
+        if len(fd_to_socket) - 1 >= MAX_CONNECTION_NUM:
+            logging.info(f"connection reach max num of {MAX_CONNECTION_NUM}, closed current connection!")
+            connection.close()
+            return
+        # watch for peer hang-up only: level-triggered EPOLLOUT is ready whenever
+        # the socket is writable and would make epoll.poll() return immediately
+        epoll.register(connection.fileno(), select.EPOLLRDHUP)
+        fd_to_socket[connection.fileno()] = connection
+
+
+def wait_for_connection(server_sock, epoll, fd_to_socket, thread_should_stop):
+    """
+    thread function for catch and save client connection
+    :param server_sock: server socket instance of alarm
+    :param epoll: epoll instance, used to unregister invalid client connections
+    :param fd_to_socket: dict instance, used to hold client connections and server connections
+    :param thread_should_stop: threading.Event (or bool) that ends the loop once set
+    """
+    while not _stop_requested(thread_should_stop):
+        try:
+            events = epoll.poll(1)
+
+            server_ready = False
+            for fileno, _ in events:
+                if fileno == server_sock.fileno():
+                    server_ready = True
+                    continue
+                # clients are only registered for hang-up, so any event on a
+                # client fd means the peer has gone away
+                with _CONNECTIONS_LOCK:
+                    _remove_connection(epoll, fd_to_socket, fileno)
+            # accept only after client events are handled, so an fd number
+            # reused by accept() cannot be matched by a stale hang-up event
+            if server_ready:
+                _accept_connection(server_sock, epoll, fd_to_socket)
+        except (BlockingIOError, InterruptedError, ConnectionAbortedError) as e:
+            # transient accept() failures must not end the accept thread
+            logging.debug("wait for connection failed %s", e)
+        except socket.error as e:
+            logging.debug("socket error, reason is %s", e)
+            break
+        except (KeyError, ValueError) as e:
+            logging.debug("wait for connection failed %s", e)
+
+
+def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data):
+    """
+    this function is to broadcast alarm data to client, if fail to send data, remove connections held by fd_to_socket
+    :param server_sock: server socket instance of alarm
+    :param epoll: epoll instance, used to unregister invalid client connections
+    :param fd_to_socket: dict instance, used to hold client connections and server connections
+    :param bin_data: binary instance, alarm info data in C-style struct format defined in xalarm_api.py
+    """
+    with _CONNECTIONS_LOCK:
+        to_remove = []
+        for fileno, connection in fd_to_socket.items():
+            if connection is server_sock:
+                continue
+            try:
+                # NOTE(review): accepted sockets are never made non-blocking, so a
+                # client that stops reading can stall this broadcast once its buffer fills
+                connection.sendall(bin_data)
+            except OSError:
+                # broken pipe, reset or otherwise unusable connection
+                to_remove.append(fileno)
+        for fileno in to_remove:
+            _remove_connection(epoll, fd_to_socket, fileno)
--
2.27.0