Browse Source

- Introduce stomputil.c|h and add a few more queue functions

decke 6 years ago
parent
commit
74800232ef
6 changed files with 217 additions and 110 deletions
  1. 1
    1
      Makefile
  2. 1
    0
      server.c
  3. 35
    106
      stomp.c
  4. 0
    3
      stomp.h
  5. 144
    0
      stomputil.c
  6. 36
    0
      stomputil.h

+ 1
- 1
Makefile View File

@@ -20,7 +20,7 @@ SRC+=	leveldb.c
20 20
 CFLAGS+=-I${LOCALBASE}/include
21 21
 LDFLAGS+=-L${LOCALBASE}/lib/event2 -L${LOCALBASE}/lib
22 22
 
23
-SRC+=	log.c util.c server.c common.c stomp.c
23
+SRC+=	log.c util.c server.c common.c stomp.c stomputil.c
24 24
 OBJS=	${SRC:.c=.o}
25 25
 
26 26
 all:	redqd

+ 1
- 0
server.c View File

@@ -56,6 +56,7 @@
56 56
 #include "server.h"
57 57
 #include "client.h"
58 58
 #include "stomp.h"
59
+#include "stomputil.h"
59 60
 #include "leveldb.h"
60 61
 
61 62
 struct event_base *base;

+ 35
- 106
stomp.c View File

@@ -41,6 +41,8 @@
41 41
 #include "server.h"
42 42
 #include "client.h"
43 43
 #include "stomp.h"
44
+#include "stomputil.h"
45
+#include "leveldb.h"
44 46
 
45 47
 /* internal data structs */
46 48
 struct CommandHandler
@@ -216,7 +218,7 @@ int stomp_disconnect(struct client *client)
216 218
 
217 219
 int stomp_subscribe(struct client *client)
218 220
 {
219
-   struct queue *entry, *tmp_entry;
221
+   struct queue *entry;
220 222
    const char *queuename;
221 223
 
222 224
    client->response_cmd = STOMP_CMD_NONE;
@@ -228,24 +230,16 @@ int stomp_subscribe(struct client *client)
228 230
       return 1;
229 231
    }
230 232
          
231
-   for (entry = TAILQ_FIRST(&queues); entry != NULL; entry = tmp_entry) {
232
-      tmp_entry = TAILQ_NEXT(entry, entries);
233
-      if (strcmp(entry->queuename, queuename) == 0){
234
-         entry = tmp_entry;
235
-         break;
236
-      }
237
-   }
238
-
233
+   entry = stomp_find_queue(queuename);
239 234
    if (entry == NULL){
240
-      entry = malloc(sizeof(*entry));
241
-      entry->queuename = malloc(strlen(queuename)+1);
242
-      strcpy(entry->queuename, queuename);
243
-      TAILQ_INIT(&entry->subscribers);
244
-      TAILQ_INSERT_TAIL(&queues, entry, entries);
235
+      entry = stomp_add_queue(queuename);
236
+      if(entry == NULL){
237
+         client->response_cmd = STOMP_CMD_ERROR;
238
+         evhttp_add_header(client->response_headers, "message", "Could not create destination");
239
+         return 1;
240
+      }
245 241
    }
246 242
 
247
-   /* TODO: check if already subscribed */
248
-
249 243
    TAILQ_INSERT_TAIL(&entry->subscribers, client, entries);
250 244
 
251 245
    return 0;
@@ -264,107 +258,42 @@ int stomp_send(struct client *client)
264 258
       return 1;
265 259
    }
266 260
 
267
-   TAILQ_FOREACH(queue, &queues, entries) {
268
-      if(strcmp(queue->queuename, queuename) == 0)
269
-         break;
270
-   }
271
-
272
-   if(queue != NULL){
273
-      /* Send it out to the subscribers */
274
-      TAILQ_FOREACH(subscriber, &queue->subscribers, entries){
275
-         subscriber->response_cmd = STOMP_CMD_MESSAGE;
276
-         subscriber->response_headers = client->request_headers;
277
-         subscriber->response = client->request_body;
278
-
279
-         stomp_handle_response(subscriber);
280
-
281
-         subscriber->response_cmd = STOMP_CMD_NONE;
282
-         subscriber->response_headers = NULL;
283
-         subscriber->response = NULL;
261
+   queue = stomp_find_queue(queuename);
262
+   if (queue == NULL){
263
+      queue = stomp_add_queue(queuename);
264
+      if(queue == NULL){
265
+         client->response_cmd = STOMP_CMD_ERROR;
266
+         evhttp_add_header(client->response_headers, "message", "Creating destination failed");
267
+         return 1;
284 268
       }
285 269
    }
286
-   else if(strncmp(queuename, "/topic/", 7) != 0){
287
-      /* TODO: Store message in LevelDB */
288
-
289
-      loginfo("No active subscribers on that queue");
290
-   }
291
-
292
-   client->response_cmd = STOMP_CMD_NONE;
293
-   client->response = NULL;
294
-
295
-   return 0;
296
-}
297
-
298 270
 
299
-int stomp_parse_headers(struct evkeyvalq *headers, char *request)
300
-{
301
-   char *line;
302
-   size_t line_length;
303
-   char *skey, *svalue;
304
-   struct evbuffer *buffer;
305
-
306
-   buffer = evbuffer_new();
307
-
308
-   evbuffer_add(buffer, request, strlen(request));
309
-
310
-   while ((line = evbuffer_readln(buffer, &line_length, EVBUFFER_EOL_CRLF)) != NULL) {
311
-      skey = NULL;
312
-      svalue = NULL;
313
-
314
-      if(strchr(line, ':') == NULL){
315
-         continue;
316
-      }
317
-
318
-      /* Processing of header lines */
319
-      svalue = line;
320
-      skey = strsep(&svalue, ":");
321
-      if (svalue == NULL){
322
-         free(line);
323
-         evbuffer_free(buffer);
324
-         return 2;
325
-      }
326
-
327
-      svalue += strspn(svalue, " ");
328
-
329
-      /* TODO: check if header with same name already parsed */
330
-
331
-      if (evhttp_add_header(headers, skey, svalue) == -1){
332
-         free(line);
333
-         evbuffer_free(buffer);
271
+#ifdef WITH_LEVELDB
272
+   if(strncmp(queuename, "/topic/", 7) != 0){
273
+      if(leveldb_add_message(queue, client->request) != 0){
274
+         client->response_cmd = STOMP_CMD_ERROR;
275
+         evhttp_add_header(client->response_headers, "message", "Storing message failed");
334 276
          return 1;
335 277
       }
336
-
337
-      free(line);
338 278
    }
279
+#endif
339 280
 
340
-   evbuffer_free(buffer);
341
-
342
-   return 0;
343
-}
344
-
345
-
346
-void stomp_free_client(struct client *client)
347
-{
348
-   /* TODO: remove all subscriptions */
349
-   /* TODO: free all allocated memory */
281
+   /* Send it out to the subscribers */
282
+   TAILQ_FOREACH(subscriber, &queue->subscribers, entries){
283
+      subscriber->response_cmd = STOMP_CMD_MESSAGE;
284
+      subscriber->response_headers = client->request_headers;
285
+      subscriber->response = client->request_body;
350 286
 
351
-   struct client *entry, *tmp_entry;
287
+      stomp_handle_response(subscriber);
352 288
 
353
-   for (entry = TAILQ_FIRST(&clients); entry != NULL; entry = tmp_entry) {
354
-      tmp_entry = TAILQ_NEXT(entry, entries);
355
-      if ((void *)tmp_entry != NULL && client->fd == tmp_entry->fd) {
356
-         TAILQ_REMOVE(&clients, entry, entries);
357
-         free(entry);
358
-      }
289
+      subscriber->response_cmd = STOMP_CMD_NONE;
290
+      subscriber->response_headers = NULL;
291
+      subscriber->response = NULL;
359 292
    }
360 293
 
361
-   client->authenticated = 0;
362
-   logwarn("Free client %d", client->fd);
294
+   client->response_cmd = STOMP_CMD_NONE;
295
+   client->response = NULL;
363 296
 
364
-   if(client->response_cmd == STOMP_CMD_DISCONNECT){
365
-      bufferevent_free(client->bev);
366
-      close(client->fd);
367
-      free(client);
368
-   }
297
+   return 0;
369 298
 }
370 299
 

+ 0
- 3
stomp.h View File

@@ -56,8 +56,5 @@ extern int stomp_send(struct client *client);
56 56
 
57 57
 extern int stomp_handle_request(struct client *client);
58 58
 extern int stomp_handle_response(struct client *client);
59
-extern int stomp_parse_headers(struct evkeyvalq *headers, char *request);
60
-extern void stomp_free_client(struct client *client);
61
-
62 59
  
63 60
 #endif /* _STOMP_H_ */

+ 144
- 0
stomputil.c View File

@@ -0,0 +1,144 @@
1
+/*
2
+ * Copyright (C) 2011 Bernhard Froehlich <decke@bluelife.at>
3
+ * All rights reserved.
4
+ *
5
+ * Redistribution and use in source and binary forms, with or without
6
+ * modification, are permitted provided that the following conditions
7
+ * are met:
8
+ *
9
+ * 1. Redistributions of source code must retain the above copyright
10
+ *    notice, this list of conditions and the following disclaimer.
11
+ * 2. Author's name may not be used endorse or promote products derived
12
+ *    from this software without specific prior written permission.
13
+ *
14
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
15
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
+ * DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
18
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
19
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
20
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
22
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
23
+ * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
24
+ * POSSIBILITY OF SUCH DAMAGE.
25
+ */
26
+
27
+#include <stdlib.h>
28
+#include <string.h>
29
+#include <sys/queue.h>
30
+#include <unistd.h>
31
+
32
+#include <event2/event.h>
33
+#include <event2/buffer.h>
34
+#include <event2/bufferevent.h>
35
+#include <event2/http.h>
36
+#include <event2/keyvalq_struct.h>
37
+
38
+#include "client.h"
39
+#include "log.h"
40
+#include "stomp.h"
41
+#include "stomputil.h"
42
+
43
+
44
+struct queue* stomp_add_queue(const char *queuename)
45
+{
46
+   struct queue *entry;
47
+   
48
+   if(queuename == NULL || strlen(queuename) > 512)
49
+      return NULL;
50
+         
51
+   entry = malloc(sizeof(*entry));
52
+   entry->queuename = malloc(strlen(queuename)+1);
53
+   strcpy(entry->queuename, queuename);
54
+ 
55
+   TAILQ_INIT(&entry->subscribers);
56
+   TAILQ_INSERT_TAIL(&queues, entry, entries);
57
+       
58
+   return entry;
59
+}  
60
+   
61
+struct queue* stomp_find_queue(const char *queuename)
62
+{
63
+   struct queue *queue;
64
+
65
+   TAILQ_FOREACH(queue, &queues, entries) {
66
+      if(strcmp(queue->queuename, queuename) == 0){
67
+         return queue;
68
+      }
69
+   }
70
+
71
+   return NULL;
72
+}
73
+
74
+void stomp_free_client(struct client *client)
75
+{        
76
+   /* TODO: remove all subscriptions */
77
+   /* TODO: free all allocated memory */
78
+
79
+   struct client *entry, *tmp_entry;
80
+         
81
+   for (entry = TAILQ_FIRST(&clients); entry != NULL; entry = tmp_entry) {
82
+      tmp_entry = TAILQ_NEXT(entry, entries);
83
+      if ((void *)tmp_entry != NULL && client->fd == tmp_entry->fd) {
84
+         TAILQ_REMOVE(&clients, entry, entries);
85
+         free(entry);
86
+      }
87
+   }
88
+
89
+   client->authenticated = 0;
90
+   logwarn("Free client %d", client->fd);
91
+
92
+   if(client->response_cmd == STOMP_CMD_DISCONNECT){
93
+      bufferevent_free(client->bev);
94
+      close(client->fd);
95
+      free(client);
96
+   }
97
+}
98
+
99
+int stomp_parse_headers(struct evkeyvalq *headers, char *request)
100
+{
101
+   char *line;
102
+   size_t line_length;
103
+   char *skey, *svalue;
104
+   struct evbuffer *buffer;
105
+
106
+   buffer = evbuffer_new();
107
+
108
+   evbuffer_add(buffer, request, strlen(request));
109
+
110
+   while ((line = evbuffer_readln(buffer, &line_length, EVBUFFER_EOL_CRLF)) != NULL) {
111
+      skey = NULL;
112
+      svalue = NULL;
113
+
114
+      if(strchr(line, ':') == NULL){
115
+         continue;
116
+      }
117
+
118
+      /* Processing of header lines */
119
+      svalue = line;
120
+      skey = strsep(&svalue, ":");
121
+      if (svalue == NULL){
122
+         free(line);
123
+         evbuffer_free(buffer);
124
+         return 2;
125
+      }
126
+
127
+      svalue += strspn(svalue, " ");
128
+
129
+      /* TODO: check if header with same name already parsed */
130
+
131
+      if (evhttp_add_header(headers, skey, svalue) == -1){
132
+         free(line);
133
+         evbuffer_free(buffer);
134
+         return 1;
135
+      }
136
+
137
+      free(line);
138
+   }
139
+
140
+   evbuffer_free(buffer);
141
+
142
+   return 0;
143
+}
144
+

+ 36
- 0
stomputil.h View File

@@ -0,0 +1,36 @@
1
+/*
2
+ * Copyright (C) 2011 Bernhard Froehlich <decke@bluelife.at>
3
+ * All rights reserved.
4
+ *
5
+ * Redistribution and use in source and binary forms, with or without
6
+ * modification, are permitted provided that the following conditions
7
+ * are met:
8
+ *
9
+ * 1. Redistributions of source code must retain the above copyright
10
+ *    notice, this list of conditions and the following disclaimer.
11
+ * 2. Author's name may not be used endorse or promote products derived
12
+ *    from this software without specific prior written permission.
13
+ *
14
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
15
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
+ * DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
18
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
19
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
20
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
22
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
23
+ * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
24
+ * POSSIBILITY OF SUCH DAMAGE.
25
+ */
26
+
27
+#ifndef _STOMPUTIL_H_
28
+#define _STOMPUTIL_H_
29
+
30
+extern struct queue* stomp_add_queue(const char *queuename);
31
+extern struct queue* stomp_find_queue(const char *queuename);
32
+
33
+extern int stomp_parse_headers(struct evkeyvalq *headers, char *request);
34
+extern void stomp_free_client(struct client *client);
35
+
36
+#endif /* _STOMPUTIL_H_ */

Loading…
Cancel
Save