dtm_flow_master
2016-04-05 16:08:22 0 举报
dtm_flow_master是一个用于管理数据流程的系统。它能够对数据流进行监控、控制和优化,以确保数据的高效传输和处理。通过使用dtm_flow_master,用户可以轻松地创建和管理复杂的数据流程,包括数据采集、转换、存储和分析等环节。此外,dtm_flow_master还提供了丰富的可视化工具,帮助用户直观地了解数据流程的运行状态和性能指标。总之,dtm_flow_master是一个强大而灵活的数据流程管理工具,能够帮助用户更好地应对日益复杂的数据处理需求。
作者其他创作
大纲/内容
network_socket_pool* network_socket_pool_create()
/* 初始化连接池 */if (NULL == (g_srv->client_socket_pool = network_socket_pool_create())) {if (NULL == (g_srv->server_socket_pool = network_socket_pool_create())) {g_srv->client_socket_pool->srv = g_srv;g_srv->server_socket_pool->srv = g_srv;
void notify_gracefully_reload()
int record_path()
if (g_srv && g_srv->process_type == WORKER_PROCESS) {g_gracefully_shutdown = 1;
int reset_client_attr(network_client_attr *client_attr)
int network_server_start()
if ( pool ) { s = network_socket_pool_get(pool);
if (0 == pool->sockets->length)return NULL;
if ( g_srv ) {g_gracefully_shutdown = g_gracefully_reload = 1;
if (0 != get_host_ip()) {
s->send_buf = byte_array_sized_new(SEND_BUF_DEFAULT_SIZE);s->self_buf = byte_array_sized_new(SELF_BUF_DEFAULT_SIZE);s->result.fields_array = g_ptr_array_new();s->result.rows_array = g_ptr_array_new();
s->srv = pool->srv; return s;
if (0 != record_path()) {
byte_array* byte_array_sized_new(size_t capacity){
network_client_attr *client_attr = (network_client_attr *)s->other_attr;client_attr->cache_cmds = g_ptr_array_new();client_attr->server_array = g_ptr_array_new();client_attr->pvi = new view_info();client_attr->pvs = new view_for_save("");client_attr->p_vec_last_query_servers = new vector();reset_client_attr(client_attr);
network_socket* network_socket_pool_get(network_socket_pool *pool)
init_signal_handlers();
//初始化共享内存 if (0 != init_dtm_shm())
if( NULL == ( arr = (byte_array *)malloc(sizeof(byte_array)))){
/* 从isis产生的cache文件加载详细配置 */g_srv->config->check_config_update(); //初始化basic_config.conf_modify_time if ( 0 != g_srv->config->load_config_from_file(g_srv->config->basic_config.config_file_name) ) {
if (0 > network_server_start()) {
/* 加载配置 */ if (NULL == (g_srv->config = new config_t())) {
int dtm_shm_alloc(dtm_shm_t *shm)
if (change_cwd_to_install_path() < 0) {
network_socket* create_network_socket(int is_client)
network_socket *s = NULL;s = (network_socket *)g_queue_pop_head(pool->sockets);s->use_times++;
void init_signal_handlers()
if (is_client)
inline int get_host_ip()
if ( g_srv->config->basic_config.max_threads > MAX_NUM_OF_PROCESS) {
if ( NULL == s) { s = create_network_socket(is_client);
if (NULL == g_srv) {
g_srv->log_path[0] = '\0';g_srv->conf_path[0] = '\0';/** 初始化随机数种子 */srand(time(NULL) ^ getpid());return g_srv;
if (g_srv) {g_gracefully_loadconf = 1;
ab_config_file
void notify_gracefully_restart()
/* 初始化: 加载配置、初始化日志、初始化连接池 */ if ( NULL == create_network_server(ab_config_file) )
logger->werror_log_inode = st.st_ino;logger->werror_fd = fd;logger->log_inode = st.st_ino;logger->fd = fd;
static int init_dtm_shm()
void notify_gracefully_loadconf()
void notify_gracefully_shutdown()
//创建的时候,默认设置会话和事务均为自动提交 s->is_auto_commit = 1; s->bind_server = 0; s->is_session_auto_commit = 1;
if(NULL == (arr->data = (unsigned char *)malloc(capacity))){
if ( g_srv ) {g_gracefully_shutdown = 1;
network_server* create_network_server(char *config_file_name)
dtm_accept_mutex_ptr = shm.addr; dtm_log_mutex_ptr = shm.addr + 1 * cl; dtm_binlog_mutex_ptr = shm.addr + 2 * cl;
network_server_attr *server_attr = (network_server_attr *)s->other_attr;server_attr->last_send_query = new string();server_attr->p_vec_self_buf = new vector();server_attr->p_vec_multi_query = new vector();reset_server_attr(server_attr);
int reset_server_attr(network_server_attr *server_attr)
dtm_shm_t shm; shm.size = size; if (dtm_shm_alloc(&shm) != 0) {
0 条评论
回复 删除
下一页