A fast, lightweight and simple STOMP compatible messaging server
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

leveldb.c 5.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. /*
  2. * Copyright (C) 2011 Bernhard Froehlich <decke@bluelife.at>
  3. * All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions
  7. * are met:
  8. *
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Author's name may not be used endorse or promote products derived
  12. * from this software without specific prior written permission.
  13. *
  14. * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  15. * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  16. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  17. * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
  18. * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  19. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
  20. * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  21. * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
  22. * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
  23. * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  24. * POSSIBILITY OF SUCH DAMAGE.
  25. */
  26. #include <stdio.h>
  27. #include <stdlib.h>
  28. #include <string.h>
  29. /* atomic_fetchadd */
  30. #include <sys/types.h>
  31. #include <machine/atomic.h>
  32. #include <leveldb/c.h>
  33. #include "log.h"
  34. #include "util.h"
  35. #include "client.h"
  36. #include "stomp.h"
  37. #define CheckNoError(err) \
  38. if ((err) != NULL) { \
  39. logerror("%s:%d: %s\n", __FILE__, __LINE__, (err)); \
  40. abort(); \
  41. }
  42. leveldb_t* db;
  43. leveldb_cache_t* cache;
  44. leveldb_env_t* env;
  45. leveldb_options_t* options;
  46. leveldb_readoptions_t* roptions;
  47. leveldb_writeoptions_t* woptions;
  48. int leveldb_init(void)
  49. {
  50. char *error = NULL;
  51. /* Initialize LevelDB */
  52. env = leveldb_create_default_env();
  53. cache = leveldb_cache_create_lru(100000);
  54. options = leveldb_options_create();
  55. leveldb_options_set_cache(options, cache);
  56. leveldb_options_set_env(options, env);
  57. leveldb_options_set_create_if_missing(options, 1);
  58. leveldb_options_set_error_if_exists(options, 0);
  59. roptions = leveldb_readoptions_create();
  60. leveldb_readoptions_set_verify_checksums(roptions, 1);
  61. leveldb_readoptions_set_fill_cache(roptions, 0);
  62. woptions = leveldb_writeoptions_create();
  63. leveldb_writeoptions_set_sync(woptions, 1);
  64. db = leveldb_open(options, configget("dbFile"), &error);
  65. if(error != NULL){
  66. logerror("LevelDB Error: %s", error);
  67. return 1;
  68. }
  69. return 0;
  70. }
  71. int leveldb_free(void)
  72. {
  73. leveldb_close(db);
  74. leveldb_options_destroy(options);
  75. leveldb_readoptions_destroy(roptions);
  76. leveldb_writeoptions_destroy(woptions);
  77. leveldb_cache_destroy(cache);
  78. leveldb_env_destroy(env);
  79. return 0;
  80. }
  81. int leveldb_add_message(struct queue *queue, char *message)
  82. {
  83. leveldb_writebatch_t *wb;
  84. char key[MAXQUEUELEN+10];
  85. char value[16];
  86. char *error = NULL;
  87. int seq;
  88. if(strlen(queue->queuename) >= MAXQUEUELEN){
  89. logerror("LevelDB add_message failed: Queuename too long");
  90. return 1;
  91. }
  92. seq = atomic_fetchadd_int(&queue->write, 1);
  93. wb = leveldb_writebatch_create();
  94. snprintf(key, sizeof(key)-1, "%s.%d", queue->queuename, seq);
  95. key[sizeof(key)-1] = '\0';
  96. leveldb_writebatch_put(wb, key, strlen(key), message, strlen(message));
  97. snprintf(key, sizeof(key)-1, "%s.write", queue->queuename);
  98. key[sizeof(key)-1] = '\0';
  99. sprintf(value, "%d", seq);
  100. leveldb_writebatch_put(wb, key, strlen(key), value, strlen(value));
  101. leveldb_write(db, woptions, wb, &error);
  102. leveldb_writebatch_destroy(wb);
  103. if(error != NULL){
  104. logerror("LevelDB add_message failed: %s", error);
  105. return 1;
  106. }
  107. loginfo("Added message %d to %s: %.20s", queue->write, queue->queuename, message);
  108. return 0;
  109. }
  110. int leveldb_load_queue(struct queue *queue)
  111. {
  112. char key[MAXQUEUELEN+10];
  113. char *value;
  114. char *error = NULL;
  115. size_t value_len;
  116. if(strlen(queue->queuename) >= MAXQUEUELEN){
  117. logerror("LevelDB add_message failed: Queuename too long");
  118. return 1;
  119. }
  120. /* queuename.read */
  121. snprintf(key, sizeof(key)-1, "%s.read", queue->queuename);
  122. key[sizeof(key)-1] = '\0';
  123. value = leveldb_get(db, roptions, key, strlen(key), &value_len, &error);
  124. if(error != NULL){
  125. logerror("LevelDB load_queue failed: %s", error);
  126. return 1;
  127. }
  128. if(value != NULL){
  129. queue->read = atoi(value)+1;
  130. free(value);
  131. value = NULL;
  132. }
  133. else
  134. queue->read = 1;
  135. /* queuename.write */
  136. snprintf(key, sizeof(key)-1, "%s.write", queue->queuename);
  137. key[sizeof(key)-1] = '\0';
  138. value = leveldb_get(db, roptions, key, strlen(key), &value_len, &error);
  139. if(error != NULL){
  140. logerror("LevelDB load_queue failed: %s", error);
  141. return 1;
  142. }
  143. if(value != NULL){
  144. queue->write = atoi(value)+1;
  145. free(value);
  146. value = NULL;
  147. }
  148. else
  149. queue->write = 1;
  150. return 0;
  151. }
  152. char* leveldb_get_message(struct queue *queue)
  153. {
  154. return 0;
  155. }
  156. int leveldb_ack_message(struct queue *queue)
  157. {
  158. return 0;
  159. }