1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33#include <linux/kernel.h>
34#include <linux/random.h>
35#include <linux/export.h>
36
37#include "rds.h"
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/*
 * Workqueue shared by the RDS worker functions in this file.  It is created
 * by rds_threads_init() as a single-threaded queue named "krdsd", torn down
 * by rds_threads_exit(), and exported (GPL-only) for use by other modules.
 */
struct workqueue_struct *rds_wq;
EXPORT_SYMBOL_GPL(rds_wq);
73
74void rds_connect_complete(struct rds_connection *conn)
75{
76 if (!rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_UP)) {
77 printk(KERN_WARNING "%s: Cannot transition to state UP, "
78 "current state is %d\n",
79 __func__,
80 atomic_read(&conn->c_state));
81 atomic_set(&conn->c_state, RDS_CONN_ERROR);
82 queue_work(rds_wq, &conn->c_down_w);
83 return;
84 }
85
86 rdsdebug("conn %p for %pI4 to %pI4 complete\n",
87 conn, &conn->c_laddr, &conn->c_faddr);
88
89 conn->c_reconnect_jiffies = 0;
90 set_bit(0, &conn->c_map_queued);
91 queue_delayed_work(rds_wq, &conn->c_send_w, 0);
92 queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
93}
94EXPORT_SYMBOL_GPL(rds_connect_complete);
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114void rds_queue_reconnect(struct rds_connection *conn)
115{
116 unsigned long rand;
117
118 rdsdebug("conn %p for %pI4 to %pI4 reconnect jiffies %lu\n",
119 conn, &conn->c_laddr, &conn->c_faddr,
120 conn->c_reconnect_jiffies);
121
122 set_bit(RDS_RECONNECT_PENDING, &conn->c_flags);
123 if (conn->c_reconnect_jiffies == 0) {
124 conn->c_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
125 queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
126 return;
127 }
128
129 get_random_bytes(&rand, sizeof(rand));
130 rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n",
131 rand % conn->c_reconnect_jiffies, conn->c_reconnect_jiffies,
132 conn, &conn->c_laddr, &conn->c_faddr);
133 queue_delayed_work(rds_wq, &conn->c_conn_w,
134 rand % conn->c_reconnect_jiffies);
135
136 conn->c_reconnect_jiffies = min(conn->c_reconnect_jiffies * 2,
137 rds_sysctl_reconnect_max_jiffies);
138}
139
140void rds_connect_worker(struct work_struct *work)
141{
142 struct rds_connection *conn = container_of(work, struct rds_connection, c_conn_w.work);
143 int ret;
144
145 clear_bit(RDS_RECONNECT_PENDING, &conn->c_flags);
146 if (rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
147 ret = conn->c_trans->conn_connect(conn);
148 rdsdebug("conn %p for %pI4 to %pI4 dispatched, ret %d\n",
149 conn, &conn->c_laddr, &conn->c_faddr, ret);
150
151 if (ret) {
152 if (rds_conn_transition(conn, RDS_CONN_CONNECTING, RDS_CONN_DOWN))
153 rds_queue_reconnect(conn);
154 else
155 rds_conn_error(conn, "RDS: connect failed\n");
156 }
157 }
158}
159
160void rds_send_worker(struct work_struct *work)
161{
162 struct rds_connection *conn = container_of(work, struct rds_connection, c_send_w.work);
163 int ret;
164
165 if (rds_conn_state(conn) == RDS_CONN_UP) {
166 ret = rds_send_xmit(conn);
167 rdsdebug("conn %p ret %d\n", conn, ret);
168 switch (ret) {
169 case -EAGAIN:
170 rds_stats_inc(s_send_immediate_retry);
171 queue_delayed_work(rds_wq, &conn->c_send_w, 0);
172 break;
173 case -ENOMEM:
174 rds_stats_inc(s_send_delayed_retry);
175 queue_delayed_work(rds_wq, &conn->c_send_w, 2);
176 default:
177 break;
178 }
179 }
180}
181
182void rds_recv_worker(struct work_struct *work)
183{
184 struct rds_connection *conn = container_of(work, struct rds_connection, c_recv_w.work);
185 int ret;
186
187 if (rds_conn_state(conn) == RDS_CONN_UP) {
188 ret = conn->c_trans->recv(conn);
189 rdsdebug("conn %p ret %d\n", conn, ret);
190 switch (ret) {
191 case -EAGAIN:
192 rds_stats_inc(s_recv_immediate_retry);
193 queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
194 break;
195 case -ENOMEM:
196 rds_stats_inc(s_recv_delayed_retry);
197 queue_delayed_work(rds_wq, &conn->c_recv_w, 2);
198 default:
199 break;
200 }
201 }
202}
203
204void rds_shutdown_worker(struct work_struct *work)
205{
206 struct rds_connection *conn = container_of(work, struct rds_connection, c_down_w);
207
208 rds_conn_shutdown(conn);
209}
210
211void rds_threads_exit(void)
212{
213 destroy_workqueue(rds_wq);
214}
215
216int rds_threads_init(void)
217{
218 rds_wq = create_singlethread_workqueue("krdsd");
219 if (!rds_wq)
220 return -ENOMEM;
221
222 return 0;
223}
224