1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #ifndef FLB_CORO_H
22 #define FLB_CORO_H
23 
24 /* Required by OSX */
25 #ifndef _XOPEN_SOURCE
26 #define _XOPEN_SOURCE
27 #endif
28 
29 #ifndef _DEFAULT_SOURCE
30 #define _DEFAULT_SOURCE
31 #endif
32 
33 #include <fluent-bit/flb_info.h>
34 #include <fluent-bit/flb_macros.h>
35 #include <fluent-bit/flb_log.h>
36 #include <fluent-bit/flb_mem.h>
37 
38 #include <monkey/mk_core.h>
39 
40 #include <stdlib.h>
41 #include <limits.h>
42 #include <libco.h>
43 
44 #ifdef FLB_HAVE_VALGRIND
45 #include <valgrind/valgrind.h>
46 #endif
47 
48 #ifdef __cplusplus
49 extern "C" {
50 #endif
51 
52 struct flb_coro {
53 
54 #ifdef FLB_HAVE_VALGRIND
55     unsigned int valgrind_stack_id;
56 #endif
57 
58     /* libco 'contexts' */
59     cothread_t caller;
60     cothread_t callee;
61 
62     void *data;
63 };
64 
/*
 * Coroutine stack size, expressed in the same units as PTHREAD_STACK_MIN.
 *
 * A value already provided by the build (for example through
 * -DFLB_CORO_STACK_SIZE=<n>) is left untouched. Re-defining the macro in
 * terms of itself would turn it into a self-referential macro that expands
 * to a plain, undeclared identifier, so only the fallback is defined here.
 */
#ifndef FLB_CORO_STACK_SIZE
#define FLB_CORO_STACK_SIZE      ((6 * PTHREAD_STACK_MIN) / 2)
#endif
70 
/*
 * Address of the first byte located right after a struct flb_coro, as a
 * 'char *'. The argument is parenthesized so any pointer expression
 * (e.g. 'base + 1') is cast as a whole.
 *
 * NOTE(review): flb_coro_create() in this header allocates exactly
 * sizeof(struct flb_coro), so for those objects the result is a
 * one-past-the-end address and must not be dereferenced.
 */
#define FLB_CORO_DATA(coro)      (((char *) (coro)) + sizeof(struct flb_coro))
72 
flb_coro_yield(struct flb_coro * coro,int ended)73 static FLB_INLINE void flb_coro_yield(struct flb_coro *coro, int ended)
74 {
75     co_switch(coro->caller);
76 }
77 
flb_coro_destroy(struct flb_coro * coro)78 static FLB_INLINE void flb_coro_destroy(struct flb_coro *coro)
79 {
80     flb_trace("[coro] destroy coroutine=%p data=%p", coro,
81               FLB_CORO_DATA(coro));
82 
83 #ifdef FLB_HAVE_VALGRIND
84     VALGRIND_STACK_DEREGISTER(coro->valgrind_stack_id);
85 #endif
86 
87     if (coro->callee != NULL) {
88         co_delete(coro->callee);
89     }
90 
91     flb_free(coro);
92 }
93 
/*
 * Switch to the context stored in the 'caller' member of 'th'. 'th' may be
 * any pointer expression (struct flb_coro * in this header); it is
 * parenthesized so forms like '&arr[i]' or 'base + 1' expand correctly.
 */
#define flb_coro_return(th) co_switch((th)->caller)
95 
/*
 * Functions implemented outside this header. They are declared with '(void)'
 * so they are real prototypes in C: an empty '()' would leave the parameters
 * unspecified and silence argument checking.
 *
 * NOTE(review): presumably process-wide and per-thread setup respectively;
 * confirm against the implementation.
 */
void flb_coro_init(void);
void flb_coro_thread_init(void);

/*
 * Accessors for the "current" coroutine. flb_coro_resume() calls
 * flb_coro_set() right before switching into a coroutine.
 * NOTE(review): flb_coro_get() presumably returns the value last given to
 * flb_coro_set(); confirm against the implementation.
 */
struct flb_coro *flb_coro_get(void);
void flb_coro_set(struct flb_coro *coro);
101 
flb_coro_resume(struct flb_coro * coro)102 static FLB_INLINE void flb_coro_resume(struct flb_coro *coro)
103 {
104     flb_coro_set(coro);
105     coro->caller = co_active();
106     co_switch(coro->callee);
107 }
108 
flb_coro_create(void * data)109 static FLB_INLINE struct flb_coro *flb_coro_create(void *data)
110 {
111     struct flb_coro *coro;
112 
113     /* Create a thread context and initialize */
114     coro = (struct flb_coro *) flb_calloc(1, sizeof(struct flb_coro));
115     if (!coro) {
116         flb_errno();
117         return NULL;
118     }
119     coro->data = data;
120     return coro;
121 }
122 
123 #ifdef __cplusplus
124 }
125 #endif
126 
127 #endif /* !FLB_CORO_H */
128