Browse Source

- Implement leveldb_add_message()

decke 6 years ago
parent
commit
2f2238e270
3 changed files with 73 additions and 1 deletions
  1. 2
    0
      client.h
  2. 67
    1
      leveldb.c
  3. 4
    0
      leveldb.h

+ 2
- 0
client.h View File

@@ -81,6 +81,8 @@ TAILQ_HEAD(, client) clients;
81 81
 
82 82
 struct queue {
83 83
    char *queuename;
84
+   volatile int read;
85
+   volatile int write;
84 86
 
85 87
    TAILQ_HEAD(, client) subscribers;
86 88
    TAILQ_ENTRY(queue) entries;

+ 67
- 1
leveldb.c View File

@@ -24,11 +24,18 @@
24 24
  * POSSIBILITY OF SUCH DAMAGE.
25 25
  */
26 26
 
27
-/* LevelDB */
27
+#include <stdio.h>
28
+#include <string.h>
29
+
30
+/* atomic_fetchadd */
31
+#include <sys/types.h>
32
+#include <machine/atomic.h>
33
+
28 34
 #include <leveldb/c.h>
29 35
 
30 36
 #include "log.h"
31 37
 #include "util.h"
38
+#include "client.h"
32 39
 
33 40
 #define CheckNoError(err) \
34 41
     if ((err) != NULL) { \
@@ -41,6 +48,8 @@ leveldb_t* db;
41 48
 leveldb_cache_t* cache;
42 49
 leveldb_env_t* env;
43 50
 leveldb_options_t* options;
51
+leveldb_readoptions_t* roptions;
52
+leveldb_writeoptions_t* woptions;
44 53
 
45 54
 
46 55
 int leveldb_init(void)
@@ -57,6 +66,13 @@ int leveldb_init(void)
57 66
     leveldb_options_set_create_if_missing(options, 1);
58 67
     leveldb_options_set_error_if_exists(options, 0);
59 68
 
69
+    roptions = leveldb_readoptions_create();
70
+    leveldb_readoptions_set_verify_checksums(roptions, 1);
71
+    leveldb_readoptions_set_fill_cache(roptions, 0);
72
+
73
+    woptions = leveldb_writeoptions_create();
74
+    leveldb_writeoptions_set_sync(woptions, 1);
75
+
60 76
     db = leveldb_open(options, configget("dbFile"), &error);
61 77
     if(error != NULL){
62 78
        logerror("LevelDB Error: %s", error);
@@ -71,7 +87,57 @@ int leveldb_free(void)
71 87
 {
72 88
     leveldb_close(db);
73 89
     leveldb_options_destroy(options);
90
+    leveldb_readoptions_destroy(roptions);
91
+    leveldb_writeoptions_destroy(woptions);
74 92
     leveldb_cache_destroy(cache);
75 93
     leveldb_env_destroy(env);
76 94
 }
77 95
 
96
+
97
+int leveldb_add_message(struct queue *queue, char *message)
98
+{
99
+    leveldb_writebatch_t *wb;
100
+    char key[256];
101
+    char value[16];
102
+    char *error = NULL;
103
+    int seq;
104
+
105
+    if(strlen(queue->queuename) > strlen(key)-10){
106
+        logerror("LevelDB add_message failed: Queuename too long");
107
+        return 1;
108
+    }
109
+
110
+    seq = atomic_fetchadd_int(&queue->write, 1);
111
+    wb = leveldb_writebatch_create();
112
+
113
+    snprintf(key, sizeof(key)-1, "%s.%ld", queue->queuename, seq);
114
+    key[sizeof(key)-1] = '\0';
115
+    leveldb_writebatch_put(wb, key, strlen(key), message, strlen(message));
116
+
117
+    snprintf(key, sizeof(key)-1, "%s.write", queue->queuename);
118
+    key[sizeof(key)-1] = '\0';
119
+    
120
+    sprintf(value, "%s", seq);
121
+    leveldb_writebatch_put(wb, key, strlen(key), value, strlen(value));
122
+
123
+    leveldb_write(db, woptions, wb, &error);
124
+    leveldb_writebatch_destroy(wb);
125
+
126
+    if(error != NULL){
127
+        logerror("LevelDB add_message failed: %s", error);
128
+        return 1;
129
+    }
130
+
131
+    return 0;
132
+}
133
+
134
+char* leveldb_get_message(struct queue *queue)
135
+{
136
+    return 0;
137
+}
138
+
139
+int leveldb_ack_message(struct queue *queue)
140
+{
141
+    return 0;
142
+}
143
+

+ 4
- 0
leveldb.h View File

@@ -30,4 +30,8 @@
30 30
 extern int leveldb_init(void);
31 31
 extern int leveldb_free(void);
32 32
 
33
+extern int leveldb_add_message(struct queue *queue, char *message);
34
+extern char* leveldb_get_message(struct queue *queue);
35
+extern int leveldb_ack_message(struct queue *queue);
36
+
33 37
 #endif /* _LEVELDB_H_ */

Loading…
Cancel
Save