dtm_flow_master
2016-04-05 16:08:22 0 举报
dtm_flow_master是一个用于管理数据流程的工具,它可以帮助用户更好地控制和管理数据流。通过使用dtm_flow_master,用户可以创建、修改和删除数据流程,以及监控数据流程的运行状态。此外,dtm_flow_master还提供了一些高级功能,如错误处理、日志记录和性能优化等,以帮助用户更好地管理和优化他们的数据流程。总之,dtm_flow_master是一个非常实用的工具,它能够帮助用户更好地管理和优化他们的数据流程,从而提高数据处理的效率和准确性。
作者其他创作
大纲/内容
network_socket_pool* network_socket_pool_create()
/* 初始化连接池 */if (NULL == (g_srv-client_socket_pool = network_socket_pool_create())) {if (NULL == (g_srv-server_socket_pool = network_socket_pool_create())) {g_srv-client_socket_pool-srv = g_srv;g_srv-server_socket_pool-srv = g_srv;
return 0;
if (g_srv && g_srv-process_type == WORKER_PROCESS) {g_gracefully_shutdown = 1;
如果有子进程挂了if (epid 0)
if ( pool ) { s = network_socket_pool_get(pool);
Y
这个挂掉的子进程,挂掉的原因是不是因为被master kill的if(proc_info[i].graceful_shutdown == 1)
if ( g_srv ) {g_gracefully_shutdown = g_gracefully_reload = 1;
if (0 == pool-sockets-length)return NULL;
s-srv = pool-srv; return s;
int set_fd_flags(int fd)
if (0 != record_path()) {
p_cur_worker = &cur_worker;p_accepted_worker = &accepted_worker;dtm_shmtx_enable(&dtm_accept_mutex);
return s;
p_cur_worker-remove(pid); p_accepted_worker-remove(pid);
byte_array* byte_array_sized_new(size_t capacity){
g_gracefully_loadconf = 0; config_t *tmp_config = new config_t();if (0 != tmp_config-load_config_from_file(tmp_config-basic_config.config_file_name)) {delete g_srv-config;g_srv-config = tmp_config;
init_signal_handlers();
//初始化共享内存 if (0 != init_dtm_shm())
//如果收到reload/SIGWINCH信号就重启if (g_gracefully_reload == 1 && ! reload_dtm())
if( NULL == ( arr = (byte_array *)malloc(sizeof(byte_array)))){
if (0 network_server_start()) {
//遍历所有自己的子进程,看看是哪个for (unsigned int i = 0; i process_pool_size; i++){if (proc_info[i].pid != -1 && epid == proc_info[i].pid){
//如果是新进程,则exec新program//如果是老进程,就函数返回if (pid == 0)
/* 加载配置 */ if (NULL == (g_srv-config = new config_t())) {
if (change_cwd_to_install_path() 0) {
network_socket* create_network_socket(int is_client)
network_socket *s = NULL;s = (network_socket *)g_queue_pop_head(pool-sockets);s-use_times++;
void init_signal_handlers()
inline int get_host_ip()
if ( g_srv-config-basic_config.max_threads MAX_NUM_OF_PROCESS) {
N
if (NULL == g_srv) {
if ( NULL == s) { s = create_network_socket(is_client);
ab_config_file
while (g_gracefully_shutdown == 0)
static void dtm_unlock_mutexes(int pid)
logger-werror_log_inode = st.st_ino;logger-werror_fd = fd;logger-log_inode = st.st_ino;logger-fd = fd;
void notify_gracefully_loadconf()
一个主循环结束
//创建的时候,默认设置会话和事务均为自动提交 s-is_auto_commit = 1; s-bind_server = 0; s-is_session_auto_commit = 1;
if(NULL == (arr-data = (unsigned char *)malloc(capacity))){
if ( g_srv ) {g_gracefully_shutdown = 1;
//判断是不是被某种信号干掉的,并打印日志if (WIFSIGNALED(statloc))
//将当前所有工作中的worker标记为“即将关闭”for (unsigned int i = 0; i process_pool_size; i++){if (proc_info[i].pid != -1&& proc_info[i].process_type == WORKER_PROCESS&& proc_info[i].graceful_shutdown == 0){/* 标注为即将优雅退出的进程 */proc_info[i].graceful_shutdown = 1;
network_server* create_network_server(char *config_file_name)
close(s-fd);s-fd = g_reload_socket;
int record_path()
void notify_gracefully_reload()
//如果cur为空,将accpted与cur互换 if (p_cur_worker-empty()){list* temp_worker_ptr;temp_worker_ptr = p_cur_worker;p_cur_worker = p_accepted_worker;p_accepted_worker = temp_worker_ptr;
return true;
void del_accept_worker(unsigned int pid)
int reset_client_attr(network_client_attr *client_attr)
int network_server_start()
init_accept_manager();
if (0 != get_host_ip()) {
s-send_buf = byte_array_sized_new(SEND_BUF_DEFAULT_SIZE);s-self_buf = byte_array_sized_new(SELF_BUF_DEFAULT_SIZE);s-result.fields_array = g_ptr_array_new();s-result.rows_array = g_ptr_array_new();
network_client_attr *client_attr = (network_client_attr *)s-other_attr;client_attr-cache_cmds = g_ptr_array_new();client_attr-server_array = g_ptr_array_new();client_attr-pvi = new view_info();client_attr-pvs = new view_for_save(\"\");client_attr-p_vec_last_query_servers = new vector();reset_client_attr(client_attr);
if (1 == g_gracefully_loadconf) {
dtm_unlock_mutexes(epid);
network_socket* network_socket_pool_get(network_socket_pool *pool)
del_accept_worker(epid);
/* 从isis产生的cache文件加载详细配置 */g_srv-config-check_config_update(); //初始化basic_config.conf_modify_time if ( 0 != g_srv-config-load_config_from_file(g_srv-config-basic_config.config_file_name) ) {
p_cur_worker-push_back(pid);
int dtm_shm_alloc(dtm_shm_t *shm)
static void dtm_shmtx_wakeup(dtm_shmtx_t *mtx)
//没问题,回收标识这个进程的结构,不重起一个proc_info[i].pid = -1;proc_info[i].process_type = (process_type_t)-1;proc_info[i].process_num = 0;proc_info[i].graceful_shutdown = 0;
if (is_client)
if (g_reload_socket != -1)
g_srv-log_path[0] = '\\0';g_srv-conf_path[0] = '\\0';/** 初始化随机数种子 */srand(time(NULL) ^ getpid());return g_srv;
if (g_srv) {g_gracefully_loadconf = 1;
void notify_gracefully_restart()
void init_accept_manager()
/* 初始化: 加载配置、初始化日志、初始化连接池 */ if ( NULL == create_network_server(ab_config_file) )
pid_t pid = fork();
void add_accept_worker(unsigned int pid)
//无论如何,因为不是被master主动kill的(因为在master里面没有标识为shutdow),所以要重启这个进程//现在判断是不是worker子进程if (proc_info[i].process_type == WORKER_PROCESS)
static int init_dtm_shm()
void notify_gracefully_shutdown()
bool reload_dtm()
unsigned int pnum = 0;//generate child process. unsigned int process_pool_size = 2 * MAX_NUM_OF_PROCESS;process_info_t proc_info[process_pool_size];for (pnum = 0; pnum process_pool_size; pnum ++){proc_info[pnum].pid = -1;proc_info[pnum].process_num = 0;proc_info[pnum].process_type = (process_type_t)-1;proc_info[pnum].graceful_shutdown = 0;
void accept_worker_manager()
dtm_accept_mutex_ptr = shm.addr; dtm_log_mutex_ptr = shm.addr + 1 * cl; dtm_binlog_mutex_ptr = shm.addr + 2 * cl;
network_server_attr *server_attr = (network_server_attr *)s-other_attr;server_attr-last_send_query = new string();server_attr-p_vec_self_buf = new vector();server_attr-p_vec_multi_query = new vector();reset_server_attr(server_attr);
//如果两个队列都空,则返回if (p_cur_worker-empty() && p_accepted_worker-empty())return;
int reset_server_attr(network_server_attr *server_attr)
dtm_shm_t shm; shm.size = size; if (dtm_shm_alloc(&shm) != 0) {
0 条评论
回复 删除
下一页