13 #include <event2/event.h>
14 #include <event2/thread.h>
17 #include "evhtp/thread.h"
31 #ifdef EVTHR_SHARED_PIPE
52 #ifdef EVTHR_SHARED_PIPE
54 struct event * shared_pool_ev;
56 TAILQ_ENTRY(
evthr) next;
59 #define _evthr_read(thr, cmd, sock) \
60 (recv(sock, cmd, sizeof(evthr_cmd_t), 0) == sizeof(evthr_cmd_t)) ? 1 : 0
68 if (!(thread = (evthr_t *)
args)) {
78 (cmd.cb)(thread, cmd.args, thread->arg);
83 event_base_loopbreak(thread->evbase);
93 if (!(thread = (evthr_t *)
args)) {
97 if (thread == NULL || thread->thr == NULL) {
101 thread->evbase = event_base_new();
102 thread->event = event_new(thread->evbase, thread->rdr,
105 event_add(thread->event, NULL);
107 #ifdef EVTHR_SHARED_PIPE
108 if (thread->pool_rdr > 0) {
109 thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr,
111 event_add(thread->shared_pool_ev, NULL);
115 pthread_mutex_lock(&thread->lock);
116 if (thread->init_cb != NULL) {
117 (thread->init_cb)(thread, thread->arg);
119 pthread_mutex_unlock(&thread->lock);
121 event_base_loop(thread->evbase, 0);
123 pthread_mutex_lock(&thread->lock);
124 if (thread->exit_cb != NULL) {
125 (thread->exit_cb)(thread, thread->arg);
127 pthread_mutex_unlock(&thread->lock);
129 if (thread->err == 1) {
130 fprintf(stderr,
"FATAL ERROR!\n");
144 if (send(thread->wdr, &cmd,
sizeof(cmd), 0) <= 0) {
145 return EVTHR_RES_RETRY;
159 if (send(thread->wdr, &cmd,
sizeof(evthr_cmd_t), 0) < 0) {
160 return EVTHR_RES_RETRY;
163 pthread_join(*thread->thr, NULL);
169 return thr ? thr->evbase : NULL;
181 return thr ? thr->aux : NULL;
211 if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
215 evutil_make_socket_nonblocking(fds[0]);
216 evutil_make_socket_nonblocking(fds[1]);
218 if (!(thread = calloc(
sizeof(evthr_t), 1))) {
222 thread->thr = malloc(
sizeof(pthread_t));
224 thread->rdr = fds[0];
225 thread->wdr = fds[1];
230 if (pthread_mutex_init(&thread->lock, NULL)) {
250 if (thread == NULL || thread->thr == NULL) {
254 if (pthread_create(thread->thr, NULL,
_evthr_loop, (
void *)thread)) {
263 if (thread == NULL) {
267 if (thread->rdr > 0) {
271 if (thread->wdr > 0) {
280 event_free(thread->event);
283 if (thread->evbase) {
284 event_base_free(thread->evbase);
300 TAILQ_REMOVE(&pool->threads, thread, next);
314 return EVTHR_RES_FATAL;
326 #ifdef EVTHR_SHARED_PIPE
333 if (
evhtp_unlikely(send(pool->wdr, &cmd,
sizeof(cmd), 0) == -1)) {
334 return EVTHR_RES_RETRY;
339 evthr_t * thr = NULL;
342 return EVTHR_RES_FATAL;
346 return EVTHR_RES_NOCB;
349 thr = TAILQ_FIRST(&pool->threads);
351 TAILQ_REMOVE(&pool->threads, thr, next);
352 TAILQ_INSERT_TAIL(&pool->threads, thr, next);
359 static evthr_pool_t *
361 evthr_init_cb init_cb,
362 evthr_exit_cb exit_cb,
367 #ifdef EVTHR_SHARED_PIPE
375 if (!(pool = calloc(
sizeof(evthr_pool_t), 1))) {
379 pool->nthreads = nthreads;
380 TAILQ_INIT(&pool->threads);
382 #ifdef EVTHR_SHARED_PIPE
383 if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) {
387 evutil_make_socket_nonblocking(fds[0]);
388 evutil_make_socket_nonblocking(fds[1]);
394 for (i = 0; i < nthreads; i++) {
402 #ifdef EVTHR_SHARED_PIPE
403 thread->pool_rdr = fds[0];
406 TAILQ_INSERT_TAIL(&pool->threads, thread, next);
419 evthr_init_cb init_cb,
420 evthr_exit_cb exit_cb,
void * shared) {
426 evthr_t *
evthr = NULL;
432 TAILQ_FOREACH(
evthr, &pool->threads, next) {