-
Notifications
You must be signed in to change notification settings - Fork 1
/
red_cmd_pipeline_builder.cpp
129 lines (107 loc) · 2.75 KB
/
red_cmd_pipeline_builder.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#include "red_cmd_pipeline_builder.h"
#include <stdio.h>
#include <string.h>
#include <assert.h>
#define REDIS_COMMAND_PIPELINE_BUILDER_POOL_SIZE 4096
#define REDIS_COMMAND_PIPELINE_BUSY_MAX 1024
/*
 * Hand out a chain link that carries a command pipeline.
 *
 * pool  pool the link itself is carved from when nothing can be recycled
 * ll    head of the list that newly created links are appended to
 * free  head of the list of recycled links
 *
 * If *free is non-empty its head link is popped and returned with
 * next == NULL.  Otherwise a zeroed link is allocated from pool, given a
 * fresh pipeline via new_red_cmd_pipeline(), and appended to the tail of *ll.
 *
 * Returns the link, or NULL when the pool allocation fails.
 *
 * NOTE(review): a recycled link is returned without being appended to *ll,
 * unlike a newly created one -- confirm whether callers relink it.
 */
red_cmd_pipeline_chain_t *
alloc_red_cmd_pipeline_chain_link(nx_pool_t *pool, red_cmd_pipeline_chain_t **ll, red_cmd_pipeline_chain_t **free)
{
    red_cmd_pipeline_chain_t *link = *free;
    red_cmd_pipeline_chain_t **tail;

    /* recycle: pop the head of the free list */
    if (link != NULL) {
        *free = link->next;
        link->next = NULL;
        return link;
    }

    /* nothing to recycle: create a brand new link */
    link = (red_cmd_pipeline_chain_t *)nx_palloc(pool, sizeof(red_cmd_pipeline_chain_t));
    if (link == NULL) {
        return NULL;
    }

    nx_memzero(link, sizeof(red_cmd_pipeline_chain_t));
    link->elem = new_red_cmd_pipeline();
    link->next = NULL;

    /* walk to the terminating NULL slot of *ll and hang the new link there */
    tail = ll;
    while (*tail != NULL) {
        tail = &(*tail)->next;
    }
    *tail = link;

    return link;
}
/*
 * Create an empty pipeline builder.
 *
 * The builder structure is obtained with nx_alloc() and zero-filled, so its
 * lists and counters start out empty; its pool is created with
 * REDIS_COMMAND_PIPELINE_BUILDER_POOL_SIZE.
 *
 * Returns the new builder, or NULL when either allocation fails (assumes
 * nx_alloc() and nx_create_pool() report failure by returning NULL --
 * TODO confirm).  Release the builder with
 * destroy_red_cmd_pipeline_builder().
 */
red_cmd_pipeline_builder_t *
create_red_cmd_pipeline_builder()
{
    red_cmd_pipeline_builder_t *cpb =
        (red_cmd_pipeline_builder_t *)nx_alloc(sizeof(red_cmd_pipeline_builder_t));
    if (cpb == NULL) {
        /* do not zero-fill through a null pointer */
        return NULL;
    }
    nx_memzero(cpb, sizeof(red_cmd_pipeline_builder_t));

    cpb->pool = nx_create_pool(REDIS_COMMAND_PIPELINE_BUILDER_POOL_SIZE);
    if (cpb->pool == NULL) {
        /* never hand out a builder without a pool */
        nx_free(cpb);
        return NULL;
    }

    return cpb;
}
/*
 * Tear down a pipeline builder.
 *
 * cpb  builder to destroy; NULL is accepted and ignored
 *
 * Every link still on the cpl list is released to the free list, then the
 * pipeline owned by each free link is deleted, and finally the pool and the
 * builder itself are released.  The asserts check that the nbusy / nfree
 * counters match the number of links actually walked.
 */
void
destroy_red_cmd_pipeline_builder(red_cmd_pipeline_builder_t *cpb)
{
    red_cmd_pipeline_chain_t *ln;

    if (cpb == NULL) {
        return;
    }

    /*
     * Detach each link from cpl BEFORE releasing it: the free list is
     * threaded through the same `next` field, so reading ln->next after the
     * release would follow the free list instead of the remaining cpl links.
     */
    while ((ln = cpb->cpl) != NULL) {
        cpb->cpl = ln->next;
        FREE_RED_CMD_PIPELINE_CHAIN_LINK(ln, &cpb->freel);
        --cpb->nbusy;
        ++cpb->nfree;
    }
    assert(0 == cpb->nbusy);

    /* the links live in the pool; only the pipelines need deleting here */
    for (ln = cpb->freel; ln; ln = ln->next) {
        delete_red_cmd_pipeline(ln->elem);
        ln->elem = NULL;
        --cpb->nfree;
    }
    assert(0 == cpb->nfree);

    nx_destroy_pool(cpb->pool);
    nx_free(cpb);
}
/*
 * Recycle finished pipelines once the builder is saturated.
 *
 * cpb  builder to update
 *
 * Does nothing while nbusy is below REDIS_COMMAND_PIPELINE_BUSY_MAX.
 * Otherwise links are taken from the head of cpl and released to the free
 * list for as long as their pipeline is in state PROCESS_OVER; the first
 * link in any other state stops the sweep.  nbusy / nfree are adjusted for
 * every link released.
 *
 * NOTE(review): alloc_red_cmd_pipeline_chain_link() does not append a
 * recycled link to the list it is given -- confirm that callers relink such
 * links, otherwise this sweep will never see them again.
 */
void
red_cmd_pipeline_builder_update(red_cmd_pipeline_builder_t *cpb)
{
    red_cmd_pipeline_chain_t *ln;

    if (cpb->nbusy < REDIS_COMMAND_PIPELINE_BUSY_MAX) {
        return;
    }

    while ((ln = cpb->cpl) != NULL) {
        if (red_cmd_pipeline_t::PROCESS_OVER != ln->elem->_state) {
            break;
        }

        /*
         * Unlink from cpl before releasing: the free list is threaded
         * through the same `next` field, so the link must not stay reachable
         * from cpl, and ln->next must be read before the release.
         */
        cpb->cpl = ln->next;
        FREE_RED_CMD_PIPELINE_CHAIN_LINK(ln, &cpb->freel);
        --cpb->nbusy;
        ++cpb->nfree;
    }
}
/*
 * Heap-allocate a command pipeline and put it into its initial state via
 * reset_red_cmd_pipeline().
 *
 * Returns the new pipeline; release it with delete_red_cmd_pipeline().
 */
red_cmd_pipeline_t *
new_red_cmd_pipeline()
{
    red_cmd_pipeline_t *const pipeline = new red_cmd_pipeline_t;

    reset_red_cmd_pipeline(pipeline);

    return pipeline;
}
/*
 * Destroy a pipeline obtained from new_red_cmd_pipeline().
 *
 * cp  pipeline to delete; a null pointer is a no-op, as with any
 *     delete-expression
 */
void
delete_red_cmd_pipeline(red_cmd_pipeline_t *cp)
{
    red_cmd_pipeline_t *const doomed = cp;

    delete doomed;
}
/*
 * Return a pipeline to its initial, empty state.
 *
 * cp  pipeline to reset; must not be NULL
 *
 * Clears the serial number and both progress counters, drops both callbacks,
 * empties the command container and marks the pipeline IDLE.
 */
void
reset_red_cmd_pipeline(red_cmd_pipeline_t *cp)
{
    red_cmd_pipeline_t &pipeline = *cp;

    /* bookkeeping */
    pipeline._sn = 0;

    /* empty the command container after requesting room for 1024 entries
     * (presumably a std::vector-like container -- TODO confirm) */
    pipeline._commands.reserve(1024);
    pipeline._commands.resize(0);

    /* progress counters */
    pipeline._built_num = 0;
    pipeline._processed_num = 0;

    /* callbacks */
    pipeline._reply_cb = NULL;
    pipeline._dispose_cb = NULL;

    pipeline._state = red_cmd_pipeline_t::IDLE;
}