Browse Source

- Implement leveldb_load_queue()

- Add MAXQUEUELEN
decke 7 years ago
parent
commit
2b7cb8d042
5 changed files with 80 additions and 3 deletions
  1. 58
    2
      leveldb.c
  2. 1
    0
      leveldb.h
  3. 2
    0
      stomp.h
  4. 18
    1
      stomputil.c
  5. 1
    0
      stomputil.h

+ 58
- 2
leveldb.c View File

@@ -25,6 +25,7 @@
25 25
  */
26 26
 
27 27
 #include <stdio.h>
28
+#include <stdlib.h>
28 29
 #include <string.h>
29 30
 
30 31
 /* atomic_fetchadd */
@@ -36,6 +37,7 @@
36 37
 #include "log.h"
37 38
 #include "util.h"
38 39
 #include "client.h"
40
+#include "stomp.h"
39 41
 
40 42
 #define CheckNoError(err) \
41 43
     if ((err) != NULL) { \
@@ -99,12 +101,12 @@ int leveldb_free(void)
99 101
 int leveldb_add_message(struct queue *queue, char *message)
100 102
 {
101 103
     leveldb_writebatch_t *wb;
102
-    char key[256];
104
+    char key[MAXQUEUELEN+10];
103 105
     char value[16];
104 106
     char *error = NULL;
105 107
     int seq;
106 108
 
107
-    if(strlen(queue->queuename) > strlen(key)-10){
109
+    if(strlen(queue->queuename) >= MAXQUEUELEN){
108 110
         logerror("LevelDB add_message failed: Queuename too long");
109 111
         return 1;
110 112
     }
@@ -130,6 +132,60 @@ int leveldb_add_message(struct queue *queue, char *message)
130 132
         return 1;
131 133
     }
132 134
 
135
+    loginfo("Added message %d to %s: %.20s", queue->write, queue->queuename, message);
136
+
137
+    return 0;
138
+}
139
+
140
+int leveldb_load_queue(struct queue *queue)
141
+{
142
+    char key[MAXQUEUELEN+10];
143
+    char *value;
144
+    char *error = NULL;
145
+    size_t value_len;
146
+
147
+    if(strlen(queue->queuename) >= MAXQUEUELEN){
148
+        logerror("LevelDB add_message failed: Queuename too long");
149
+        return 1;
150
+    }
151
+
152
+    /* queuename.read */
153
+    snprintf(key, sizeof(key)-1, "%s.read", queue->queuename);
154
+    key[sizeof(key)-1] = '\0';
155
+
156
+    value = leveldb_get(db, roptions, key, strlen(key), &value_len, &error);
157
+    if(error != NULL){
158
+       logerror("LevelDB load_queue failed: %s", error);
159
+       return 1;
160
+    }
161
+
162
+    if(value != NULL){
163
+       queue->read = atoi(value)+1;
164
+       free(value);
165
+       value = NULL;
166
+    }
167
+    else
168
+       queue->read = 1;
169
+
170
+
171
+    /* queuename.write */
172
+    snprintf(key, sizeof(key)-1, "%s.write", queue->queuename);
173
+    key[sizeof(key)-1] = '\0';
174
+
175
+    value = leveldb_get(db, roptions, key, strlen(key), &value_len, &error);
176
+    if(error != NULL){
177
+       logerror("LevelDB load_queue failed: %s", error);
178
+       return 1;
179
+    }
180
+
181
+    if(value != NULL){
182
+       queue->write = atoi(value)+1;
183
+       free(value);
184
+       value = NULL;
185
+    }
186
+    else
187
+       queue->write = 1;
188
+
133 189
     return 0;
134 190
 }
135 191
 

+ 1
- 0
leveldb.h View File

@@ -33,5 +33,6 @@ extern int leveldb_free(void);
33 33
 extern int leveldb_add_message(struct queue *queue, char *message);
34 34
 extern char* leveldb_get_message(struct queue *queue);
35 35
 extern int leveldb_ack_message(struct queue *queue);
36
+extern int leveldb_load_queue(struct queue *queue);
36 37
 
37 38
 #endif /* _LEVELDB_H_ */

+ 2
- 0
stomp.h View File

@@ -30,6 +30,8 @@
30 30
 #include <event2/buffer.h>
31 31
 #include <event2/http.h>
32 32
 
33
+#define MAXQUEUELEN	128
34
+
33 35
 enum stomp_direction {
34 36
    STOMP_IN = 1,
35 37
    STOMP_OUT

+ 18
- 1
stomputil.c View File

@@ -39,13 +39,14 @@
39 39
 #include "log.h"
40 40
 #include "stomp.h"
41 41
 #include "stomputil.h"
42
+#include "leveldb.h"
42 43
 
43 44
 
44 45
 struct queue* stomp_add_queue(const char *queuename)
45 46
 {
46 47
    struct queue *entry;
47 48
    
48
-   if(queuename == NULL || strlen(queuename) > 512)
49
+   if(queuename == NULL || strlen(queuename) >= MAXQUEUELEN)
49 50
       return NULL;
50 51
          
51 52
    entry = malloc(sizeof(*entry));
@@ -54,6 +55,16 @@ struct queue* stomp_add_queue(const char *queuename)
54 55
  
55 56
    TAILQ_INIT(&entry->subscribers);
56 57
    TAILQ_INSERT_TAIL(&queues, entry, entries);
58
+
59
+#ifdef WITH_LEVELDB
60
+   if(leveldb_load_queue(entry) != 0){
61
+      stomp_free_queue(entry);
62
+      return NULL;
63
+   }
64
+#else
65
+   entry->read = 1;
66
+   entry->write = 1;
67
+#endif
57 68
        
58 69
    return entry;
59 70
 }  
@@ -71,6 +82,12 @@ struct queue* stomp_find_queue(const char *queuename)
71 82
    return NULL;
72 83
 }
73 84
 
85
+void stomp_free_queue(struct queue *queue)
86
+{
87
+   /* TODO: implement stomp_free_queue */
88
+   ;
89
+}
90
+
74 91
 void stomp_free_client(struct client *client)
75 92
 {        
76 93
    /* TODO: remove all subscriptions */

+ 1
- 0
stomputil.h View File

@@ -29,6 +29,7 @@
29 29
 
30 30
 extern struct queue* stomp_add_queue(const char *queuename);
31 31
 extern struct queue* stomp_find_queue(const char *queuename);
32
+extern void stomp_free_queue(struct queue *queue);
32 33
 
33 34
 extern int stomp_parse_headers(struct evkeyvalq *headers, char *request);
34 35
 extern void stomp_free_client(struct client *client);

Loading…
Cancel
Save