`
CharlesCui
  • 浏览: 415021 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

epoll+消息队列-通过使用pthread_cond_signal的一个简单实现

阅读更多
第一次用epoll去实现一个服务器,
之前并不清楚epoll的用法,
了解之后才发现epoll服务器的主线程其实最好和处理业务的代码分开,
也就是说:
epoll响应外界的io请求,当epoll得到一个请求的时候,扔到一个消息队列中,然后epoll直接返回,再去等待io请求.而消息队列会通知多个线程去处理这些业务逻辑.

epoll第一次用,消息队列更是第一次用,开始一直在想,怎么写个阻塞的队列,而且要有主动通知的功能,想了一会儿发现pthread_cond_wait和pthread_cond_signal可以实现,于是就简单的试试,下面的代码已经可以实现我刚才想要得到的那个模型,细节就不管了.

对消息队列熟悉的同学请帮忙提点意见,并告诉我下还有哪些方法可以实现阻塞的消息队列.

对代码的解释和描述都写到注释中了.

/*
几个用到的类型定义以及全局变量bq
*/
char smtp_cmd_format;
struct epoll_event ev, events[MAX_EPOLL_SIZE];
int kdpfd,nfds;
struct block_queue
{
	int queue[THREADS_COUNT];
	long size;
	pthread_cond_t cond;
	pthread_mutex_t mutex;
}block_queue_t;
block_queue_t bq;
struct block_queue_param
{
	void* func;
	void* queue;
}block_queue_param_t;



void *block_queue(void * param)
{
	void(* func)(void* );
	int fd,i;
	/*
由于block_queue是pthread_create的回调方法,
所以block_queue的参数必须是void*类型
       */
	block_queue_t* bque = (block_queue_param_t*)param->queue;
        /*
param->func是block_queue解锁时需要调用的函数,
而这个函数的参数是一个int fd,
该fd是消息队列中刚刚插入的一个元素.
       */
	func = (block_queue_param_t*)param->func;
	
	for(;;)
	{
/*
lock->wait->unlock
这是经典的模式,
切记:
pthread_cond_wait的方法自带了先解锁,再等待,最后再加锁的代码
*/
		pthread_mutex_lock(bque->mutex);
/*
线程在pthread_cond_wait这里被block住了,
当听到pthread_cond_signal通知的时候,
内核会从阻塞队列里面通过先进先出的原则唤醒一个线程,
这个线程会执行pthread_cond_wait之后的代码.
*/
		pthread_cond_wait(bque->cond,bque->mutex);
		
		if(bque->size==0)
		{
/*
啥也不做
*/
		}else
		{
			fd = bque->queue[0];
/*
移动队列,
由于该队列是简单的用数组保存fd,
所以移动这个操作必不可少,但肯定性能比链表差很多,
这是懒惰的代价
*/
			for(i = 0; i < bque->size; ++i)
				bque->queue[i] = bque->queue[i+1];
				bque->queue[bque->size-1] = 0;
			bque->size--;
/*
执行被唤醒后的方法,参数是刚刚插入到队列中的一个fd
*/
                        func(fd);
		}
		
		pthread_mutex_unlock(bque->mutex);
	}
}

void insert_queue(struct block_queue *bque,int fd)
{
/*
加锁->通知->解锁

将元素插入队列之前需要先加锁
*/
	pthread_mutex_lock(bque->mutex);
/*
检查队列目前的大小,
检查1:
当大小已经达到定义的数组大小THREADS_COUNT时,
抛弃该fd,说明服务器忙不过来了,消息队列已经满了

检查2:
当大小超过数组定义的大小THREADS_COUNT时,
肯定发生了异常,那就直接退出服务吧.
*/
	if(bque->size == THREADS_COUNT)
		return;
/*
bque->size其实也是队列末尾位置指针,
当插入一个元素后,这个指针自然也要向后移动一位.
*/
	bque->queue[bque->size+1] = fd;
	if(++bque->size > THREADS_COUNT)
	{
		fprintf(stderr,"Queue size over folow.%d",bque->size);
		exit 1;
	}
/*
当元素插入bque队列时,
该通过pthread_cond_signal通知内核去调度wait的线程了
*/
	pthread_cond_signal(bque->cond);
	pthread_mutex_unlock(bque->mutex);
	
}

/*
init_threads代码是初始化线程组的,
随便写写的,大家知道怎么实现就行
*/
int init_threads()
{
	size_t i=0;
	block_queue_param_t bqp;
/*
smtp_echo是处理epoll扔进队列中的fd的函数,
该方法实现了整个模型的业务逻辑,
整体代码的IO处理+消息队列以及业务处理分的很清晰,
三个模块每个只有一处代码和其它模块通讯,没有多少耦合.
*/
	bqp.func = (void*)smtp_echo;
	bqp.queue = (void*)bq;
	pthread_cond_init(bqp.cond,NULL);
	pthread_mutex_init(bqp.mutex,NULL);
	for( i = 0; i < THREADS_COUNT; ++i)
	{
		pthread_t child_thread;
	    pthread_attr_t child_thread_attr;
	    pthread_attr_init(&child_thread_attr);
	    pthread_attr_setdetachstate(&child_thread_attr,PTHREAD_CREATE_DETACHED);
	    if( pthread_create(&child_thread,&child_thread_attr,block_queue, (void *)bqp) < 0 )
	    {
			printf("pthread_create Failed : %s\n",strerror(errno));
			return 1;
		}
		else
		{
			printf("pthread_create Success : %d\n",(int)child_thread);
			return 0;
		}
	}

}

/*
handler是主线程访问的方法,
主线程通过handler把一个fd扔到消息队列之后,
不再做任何事情就直接返回了.

在我的应用中,主线程是个epoll实现的服务器,
由于epoll被响应的时候会知道哪些fd已经就位,
于是直接把就位的fd扔到消息队列中就好了,
主线程在继续等待其它fd的响应,而不需要去关心fd如何被处理.
*/

int handler(void* fd)
{
	printf("handler:fd => %d\n",*(int *)(fd));
	insert_queue(&bq,fd);
	return 0;
}


/*
main函数是整个程序的入口点,
也是epoll服务器的实现,
epoll的思想很精髓,用法很简单,
只要把man 4 epoll_ctl的例子copy出来,就可用了,
不过那个例子语法有点问题,
而且events数组是用指针,应该用[]实现,因为指针没有分配空间.
*/
int main(int argc, char **argv)
{	
	int server_socket = init_smtp();
	int n;
	
	if(init_threads() == 0)
		printf("Success full init_threads.");
	
	smtp_cmd_format = "^([a-zA-Z0-9]) (.*)$";
	kdpfd = epoll_create(MAX_EPOLL_SIZE);
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = server_socket;
    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, server_socket, &ev) < 0) {
        fprintf(stderr, "epoll set insertion error: fd=%d < 0",
                server_socket);
        return -1;
    }
	
/*
epoll的使用看这里
*/
    for(;;) {
		struct sockaddr_in local;
		socklen_t length = sizeof(local);
		int client;
		
        nfds = epoll_wait(kdpfd, events, MAX_EPOLL_SIZE, -1);

/*
当没有事件要处理时,epoll会阻塞住,
否则,内核会填充events数组,里面的每一个events[n].data.fd就是发生io时间的文件句柄
*/

        for(n = 0; n < nfds; ++n) {
/*
这里要判断一下请求的来源,
if(events[n].data.fd == server_socket) {
这里是新建的连接,
因为io发生在server_socket上
}
else{
这里是已有的连接,
因为fd!= server_socket
那fd肯定是之前从server_socket接收到,
并且通过epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev)
加入到kdpfd所指向的内存空间中.
kdpfd其实是个文件句柄,在epoll_create(MAX_EPOLL_SIZE)时得到
}
*/
            if(events[n].data.fd == server_socket) {
                client = accept(server_socket, (struct sockaddr *) &local,&length);
                if(client < 0){
                    perror("accept");
                    continue;
                }
                setnonblocking(client);
				smtp_cmd("220",client);
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = client;
                if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) {
                    fprintf(stderr, "epoll set insertion error: fd=%d < 0",
                            client);
                    return -1;
                }
            }
            else
/*
当已有的fd发生io操作时,
执行如下代码.也就是把fd扔到消息队列中.
*/
                if(handler((void *)&events[n].data.fd) != 0)
                	perror("handler ret != 0");
        }
    }

    close(server_socket);
    return 0;
}
分享到:
评论
5 楼 CharlesCui 2011-01-09  
看我另一篇:http://charlescui.iteye.com/blog/726914
4 楼 LaoLiulaoliu 2011-01-09  
有几处不解,还望楼主指点。

1. bq.size 没有初始化(已修改)
2.block_queue 里面的bque实际指向全局变量bq,这样多个线程都指向这个全局的变量(修改参数传地址的错误)

3.setnonblocking(client)是否是伪代码,可以用下面表示:
fcntl(client, F_SETFL, O_NONBLOCK);

4.init_smtp() , smtp_echo(), smtp_cmd() 3个函数没有实现,这样就不明白楼主要干什么了。

5.如果我有许多的fd需要insert,那么每次insert完成后,都要队列的移动和执行func后才能释放锁,然后再次insert。这样就不能保证很快的响应其他的请求了。

下面我把我修改的代码也贴出来吧:
#include <pthread.h>

#define MAX_EPOLL_SIZE 0x40
#define THREADS_COUNT 0xa
/*
	几个用到的类型定义以及全局变量bq
*/
struct block_queue
{
	int queue[THREADS_COUNT];
	long size;
	pthread_cond_t cond;
	pthread_mutex_t mutex;
}block_queue_t;

struct block_queue_param
{
	void* func;
	void* queue;
}block_queue_param_t;

block_queue_t bq;

void *block_queue(void *param)
{
	void(* func)(void* );
	int fd, i;
	/*
		由于block_queue是pthread_create的回调方法,
		所以block_queue的参数必须是void*类型
		bque实际指向全局变量bq
    */
	block_queue_t *bque = (block_queue_param_t *)param->queue;
    /*
		param->func是block_queue解锁时需要调用的函数,
		而这个函数的参数是一个int fd,
		该fd是消息队列中刚刚插入的一个元素.
    */
	func = (block_queue_param_t*)param->func;
	
	for(;;)
	{
		/*
			lock->wait->unlock
			这是经典的模式,
			切记:
				pthread_cond_wait的方法自带了先解锁,再等待,最后再加锁的代码
				只有一个线程会抢到锁
		*/
		pthread_mutex_lock(&bque->mutex);
		/*
			线程在pthread_cond_wait这里被block住了,
			当听到pthread_cond_signal通知的时候,
			内核会从阻塞队列里面通过先进先出的原则唤醒一个线程,
			这个线程会执行pthread_cond_wait之后的代码.
		*/
		pthread_cond_wait(&bque->cond, &bque->mutex);
		
		if(bque->size==0)
		{
		}else
		{
			fd = bque->queue[0];
			/*
				移动队列,
				由于该队列是简单的用数组保存fd,
				所以移动这个操作必不可少,但肯定性能比链表差很多,
				这是懒惰的代价
			*/
			for(i = 0; i < bque->size - 1; ++i)
				bque->queue[i] = bque->queue[i+1];
			bque->queue[bque->size-1] = 0;
			bque->size--;
			/*
				执行被唤醒后的方法,参数是刚刚插入到队列中的一个fd
			*/
			func(fd);
		}
		
		pthread_mutex_unlock(&bque->mutex);
	}
}

void insert_queue(struct block_queue *bque,int fd)
{
	/*
		加锁->通知->解锁

		将元素插入队列之前需要先加锁
	*/
	pthread_mutex_lock(&bque->mutex);
	/*
		检查队列目前的大小,
		检查1:
		当大小已经达到定义的数组大小THREADS_COUNT时,
		抛弃该fd,说明服务器忙不过来了,消息队列已经满了

		检查2:
		当大小超过数组定义的大小THREADS_COUNT时,
		肯定发生了异常,那就直接退出服务吧.
	*/
	if(bque->size == THREADS_COUNT)
		return;
	/*
		bque->size其实也是队列末尾位置指针,
		当插入一个元素后,这个指针自然也要向后移动一位.
	*/
	bque->queue[bque->size] = fd;
	if(++bque->size > THREADS_COUNT)
	{
		fprintf(stderr,"Queue size over folow.%d",bque->size);
		exit 1;
	}
	/*
		当元素插入bque队列时,
		该通过pthread_cond_signal通知内核去调度wait的线程了
	*/
	pthread_cond_signal(&bque->cond);
	pthread_mutex_unlock(&bque->mutex);
	
}

/*
	init_threads代码是初始化线程组的
*/
int init_threads()
{
	size_t i=0;
	block_queue_param_t bqp;
	/*
		smtp_echo是处理epoll扔进队列中的fd的函数,
		该方法实现了整个模型的业务逻辑,
		整体代码的IO处理+消息队列以及业务处理分的很清晰,
		三个模块每个只有一处代码和其它模块通讯,没有多少耦合.
	*/
	bqp.func = (void*)smtp_echo;
	bqp.queue = (void*)&bq;
	if ( !pthread_mutex_init(bq.mutex, NULL) )
		return 1;
	if ( !pthread_cond_init(bq.cond, NULL) )
		return 1;
	for(i = 0; i < THREADS_COUNT; ++i)
	{
		pthread_t child_thread;
	    pthread_attr_t child_thread_attr;
	    if ( pthread_attr_init(&child_thread_attr) )
			continue;
	    pthread_attr_setdetachstate(&child_thread_attr,PTHREAD_CREATE_DETACHED);
	    if(pthread_create(&child_thread,&child_thread_attr,block_queue,(void *)&bqp) != 0)
	    {
			printf("pthread_create Failed : %s\n",strerror(errno));
			continue;
		}
		else
		{
			printf("pthread_create Success : %d\n",(int)child_thread);
		}
		pthread_attr_destroy(&child_thread_attr);
	}

	return 0;
}

/*
	handler是主线程访问的方法,
	主线程通过handler把一个fd扔到消息队列之后,
	不再做任何事情就直接返回了.

	在我的应用中,主线程是个epoll实现的服务器,
	由于epoll被响应的时候会知道哪些fd已经就位,
	于是直接把就位的fd扔到消息队列中就好了,
	主线程在继续等待其它fd的响应,而不需要去关心fd如何被处理.
*/

int handler(void* fd)
{
	printf("handler:fd => %d\n",*(int *)(fd));
	insert_queue(&bq,fd);
	return 0;
}


/*
	main函数是整个程序的入口点,
	也是epoll服务器的实现,
	epoll的思想很精髓,用法很简单,
	只要把man 4 epoll_ctl的例子copy出来,就可用了,
	不过那个例子语法有点问题,
	而且events数组是用指针,应该用[]实现,因为指针没有分配空间.
*/
int main(int argc, char **argv)
{	
	char smtp_cmd_format;
	int kdpfd, nfds;
	struct epoll_event ev, events[MAX_EPOLL_SIZE];

	int server_socket = init_smtp();
	int n;
	
	bq.size = 0;
	if(init_threads() == 0)
		printf("Success full init_threads.");
	
	smtp_cmd_format = "^([a-zA-Z0-9]) (.*)$";
	kdpfd = epoll_create(MAX_EPOLL_SIZE);
	//kdpfd = epoll_create1(0);
	if (kdpfd == -1) {
		perror("epoll_create1");
		return -1;
	}
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = server_socket;
    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, server_socket, &ev) < 0) {
        fprintf(stderr, "epoll set insertion error: fd=%d < 0",
                server_socket);
        return -1;
    }
	
	/*
		epoll的使用看这里
	*/
    for(;;) {
		struct sockaddr_in local;
		socklen_t length = sizeof(local);
		int client;
		
        nfds = epoll_wait(kdpfd, events, MAX_EPOLL_SIZE, -1);

		/*
			当没有事件要处理时,epoll会阻塞住,
			否则,内核会填充events数组,里面的每一个events[n].data.fd就是发生io时间的文件句柄
		*/

        for(n = 0; n < nfds; ++n) {
		/*
			这里要判断一下请求的来源,
			if(events[n].data.fd == server_socket) {
				这里是新建的连接,
				因为io发生在server_socket上
			}
			else{
				这里是已有的连接,
				因为fd!= server_socket
				那fd肯定是之前从server_socket接收到,
				并且通过epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev)
				加入到kdpfd所指向的内存空间中.
				kdpfd其实是个文件句柄,在epoll_create(MAX_EPOLL_SIZE)时得到
			}
		*/
            if(events[n].data.fd == server_socket) {
                client = accept(server_socket, (struct sockaddr *) &local,&length);
                if(client < 0){
                    perror("accept");
                    continue;
                }
                setnonblocking(client);
				smtp_cmd("220",client);
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = client;
                if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) {
                    fprintf(stderr, "epoll set insertion error: fd=%d < 0",
                            client);
                    return -1;
                }
            }
            else
			/*
				当已有的fd发生io操作时,
				执行如下代码.也就是把fd扔到消息队列中.
			*/
                if(handler((void *)&events[n].data.fd) != 0)
                	perror("handler ret != 0");
        }
    }

    close(server_socket);
    return 0;
}
3 楼 CharlesCui 2010-07-15  
sunzixun 写道
attr 没有destory 内存泄漏


:)谢谢提醒

这个代码是完全编译不通过的,仔细看结构体的定义都有问题,我对语法还不是很记得清,要是有同学想编译的话,我还有个可执行的版本可以提供.
2 楼 sunzixun 2010-07-14  
attr 没有destory 内存泄漏
1 楼 sunzixun 2010-07-14  
msgrcv (key, you_buffer, mesg_len, mesg_type, (m_iBlock ? MSG_NOERROR : IPC_NOWAIT|MSG_NOERROR));

相关推荐

Global site tag (gtag.js) - Google Analytics