A fast, lightweight and simple STOMP compatible messaging server
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. /*
  2. * Copyright (C) 2011 Bernhard Froehlich <decke@bluelife.at>
  3. * All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions
  7. * are met:
  8. *
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Author's name may not be used endorse or promote products derived
  12. * from this software without specific prior written permission.
  13. *
  14. * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  15. * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  16. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  17. * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
  18. * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  19. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
  20. * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  21. * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
  22. * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
  23. * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  24. * POSSIBILITY OF SUCH DAMAGE.
  25. */
  26. #include <stdlib.h>
  27. #include <stdio.h>
  28. #include <string.h>
  29. #include <sys/queue.h>
  30. #include <unistd.h>
  31. #include <event2/event.h>
  32. #include <event2/buffer.h>
  33. #include <event2/bufferevent.h>
  34. #include <event2/http.h>
  35. #include <event2/keyvalq_struct.h>
  36. #include "log.h"
  37. #include "util.h"
  38. #include "server.h"
  39. #include "client.h"
  40. #include "stomp.h"
  41. #include "stomputil.h"
  42. #include "leveldb.h"
  43. /* internal data structs */
  44. struct CommandHandler
  45. {
  46. enum stomp_cmd cmd;
  47. char command[15];
  48. enum stomp_direction direction;
  49. int (*handler)(struct client *client);
  50. };
  51. struct CommandHandler commandreg[] = {
  52. { STOMP_CMD_NONE, "", STOMP_OUT, NULL },
  53. { STOMP_CMD_DISCONNECT, "", STOMP_OUT, NULL },
  54. { STOMP_CMD_CONNECT, "CONNECT", STOMP_IN, stomp_connect },
  55. { STOMP_CMD_CONNECTED, "CONNECTED", STOMP_OUT, NULL },
  56. { STOMP_CMD_SEND, "SEND", STOMP_IN, stomp_send },
  57. { STOMP_CMD_MESSAGE, "MESSAGE", STOMP_OUT, NULL },
  58. { STOMP_CMD_SUBSCRIBE, "SUBSCRIBE", STOMP_IN, stomp_subscribe },
  59. { STOMP_CMD_UNSUBSCRIBE, "UNSUBSCRIBE", STOMP_IN, NULL },
  60. { STOMP_CMD_ACK, "ACK", STOMP_IN, NULL },
  61. { STOMP_CMD_RECEIPT, "RECEIPT", STOMP_OUT, NULL },
  62. { STOMP_CMD_DISCONNECT, "DISCONNECT", STOMP_IN, stomp_disconnect },
  63. { STOMP_CMD_ERROR, "ERROR", STOMP_OUT, NULL },
  64. };
  65. int stomp_handle_request(struct client *client)
  66. {
  67. int i;
  68. for(i=0; i < sizeof(commandreg)/sizeof(struct CommandHandler); i++){
  69. if(commandreg[i].direction != STOMP_IN || commandreg[i].handler == NULL)
  70. continue;
  71. if(strncmp(client->request, commandreg[i].command, strlen(commandreg[i].command)) == 0){
  72. if(client->authenticated == 0){
  73. if(commandreg[i].cmd != STOMP_CMD_CONNECT && commandreg[i].cmd != STOMP_CMD_DISCONNECT){
  74. client->response_cmd = STOMP_CMD_ERROR;
  75. evhttp_add_header(client->response_headers, "message", "Authentication required");
  76. return 1;
  77. }
  78. }
  79. client->request_cmd = commandreg[i].cmd;
  80. return commandreg[i].handler(client);
  81. }
  82. }
  83. client->response_cmd = STOMP_CMD_ERROR;
  84. evhttp_add_header(client->response_headers, "message", "Unknown command");
  85. return 1;
  86. }
  87. int stomp_handle_response(struct client *client)
  88. {
  89. int i;
  90. int found;
  91. const char *receipt;
  92. struct evkeyval *header;
  93. if(client->response_buf == NULL)
  94. client->response_buf = evbuffer_new();
  95. if(client->request_headers){
  96. receipt = evhttp_find_header(client->request_headers, "receipt");
  97. if(receipt != NULL && client->response_cmd != STOMP_CMD_ERROR){
  98. evbuffer_add_printf(client->response_buf, "RECEIPT\n");
  99. evbuffer_add_printf(client->response_buf, "receipt:%s\n", receipt);
  100. evbuffer_add_printf(client->response_buf, "\n");
  101. evbuffer_add(client->response_buf, "\0", 1);
  102. evhttp_remove_header(client->request_headers, "receipt");
  103. }
  104. }
  105. for(i=0,found=0; i < sizeof(commandreg)/sizeof(struct CommandHandler); i++){
  106. if(commandreg[i].direction != STOMP_OUT)
  107. continue;
  108. if(commandreg[i].cmd == client->response_cmd){
  109. found = 1;
  110. if(commandreg[i].command[0] == '\0')
  111. break;
  112. evbuffer_add_printf(client->response_buf, "%s\n", commandreg[i].command);
  113. TAILQ_FOREACH(header, client->response_headers, next) {
  114. if(strcmp(header->key, "receipt") == 0)
  115. continue;
  116. evbuffer_add_printf(client->response_buf, "%s:%s\n", header->key, header->value);
  117. }
  118. evbuffer_add_printf(client->response_buf, "\n");
  119. if(client->response != NULL){
  120. evbuffer_add(client->response_buf, client->response, strlen(client->response));
  121. }
  122. evbuffer_add(client->response_buf, "\0", 1);
  123. break;
  124. }
  125. }
  126. if(found == 0){
  127. evbuffer_add_printf(client->response_buf, "ERROR\n");
  128. evbuffer_add_printf(client->response_buf, "message:Internal error\n\n");
  129. evbuffer_add(client->response_buf, "\0", 1);
  130. }
  131. bufferevent_write_buffer(client->bev, client->response_buf);
  132. bufferevent_flush(client->bev, EV_WRITE, BEV_FINISHED);
  133. if(client->response_cmd == STOMP_CMD_ERROR || client->response_cmd == STOMP_CMD_DISCONNECT){
  134. stomp_free_client(client);
  135. }
  136. return !found;
  137. }
  138. int stomp_connect(struct client *client)
  139. {
  140. const char *login;
  141. const char *passcode;
  142. if(strlen(configget("authUser")) > 0 && strlen(configget("authPass")) > 0){
  143. login = evhttp_find_header(client->request_headers, "login");
  144. if(login == NULL){
  145. client->response_cmd = STOMP_CMD_ERROR;
  146. evhttp_add_header(client->response_headers, "message", "Authentication failed");
  147. return 1;
  148. }
  149. passcode = evhttp_find_header(client->request_headers, "passcode");
  150. if(passcode == NULL){
  151. client->response_cmd = STOMP_CMD_ERROR;
  152. evhttp_add_header(client->response_headers, "message", "Authentication failed");
  153. return 1;
  154. }
  155. if(strcmp(login, configget("authUser")) != 0 || strcmp(passcode, configget("authPass")) != 0){
  156. client->response_cmd = STOMP_CMD_ERROR;
  157. evhttp_add_header(client->response_headers, "message", "Authentication failed");
  158. return 1;
  159. }
  160. }
  161. if(evhttp_find_header(client->request_headers, "receipt") != NULL){
  162. client->response_cmd = STOMP_CMD_ERROR;
  163. evhttp_add_header(client->response_headers, "message", "Receipt for connect not supported");
  164. return 1;
  165. }
  166. client->authenticated = 1;
  167. client->response_cmd = STOMP_CMD_CONNECTED;
  168. evhttp_add_header(client->response_headers, "session", "0");
  169. return 0;
  170. }
  171. int stomp_disconnect(struct client *client)
  172. {
  173. client->response_cmd = STOMP_CMD_DISCONNECT;
  174. return 0;
  175. }
  176. int stomp_subscribe(struct client *client)
  177. {
  178. struct queue *entry;
  179. const char *queuename;
  180. client->response_cmd = STOMP_CMD_NONE;
  181. queuename = evhttp_find_header(client->request_headers, "destination");
  182. if(queuename == NULL){
  183. client->response_cmd = STOMP_CMD_ERROR;
  184. evhttp_add_header(client->response_headers, "message", "Destination header missing");
  185. return 1;
  186. }
  187. entry = stomp_find_queue(queuename);
  188. if (entry == NULL){
  189. entry = stomp_add_queue(queuename);
  190. if(entry == NULL){
  191. client->response_cmd = STOMP_CMD_ERROR;
  192. evhttp_add_header(client->response_headers, "message", "Could not create destination");
  193. return 1;
  194. }
  195. }
  196. TAILQ_INSERT_TAIL(&entry->subscribers, client, entries);
  197. return 0;
  198. }
  199. int stomp_send(struct client *client)
  200. {
  201. struct client *subscriber;
  202. struct queue *queue;
  203. const char *queuename;
  204. queuename = evhttp_find_header(client->request_headers, "destination");
  205. if(queuename == NULL){
  206. client->response_cmd = STOMP_CMD_ERROR;
  207. evhttp_add_header(client->response_headers, "message", "Destination header missing");
  208. return 1;
  209. }
  210. queue = stomp_find_queue(queuename);
  211. if (queue == NULL){
  212. queue = stomp_add_queue(queuename);
  213. if(queue == NULL){
  214. client->response_cmd = STOMP_CMD_ERROR;
  215. evhttp_add_header(client->response_headers, "message", "Creating destination failed");
  216. return 1;
  217. }
  218. }
  219. #ifdef WITH_LEVELDB
  220. if(strncmp(queuename, "/topic/", 7) != 0){
  221. if(leveldb_add_message(queue, client->request) != 0){
  222. client->response_cmd = STOMP_CMD_ERROR;
  223. evhttp_add_header(client->response_headers, "message", "Storing message failed");
  224. return 1;
  225. }
  226. }
  227. #endif
  228. /* Send it out to the subscribers */
  229. TAILQ_FOREACH(subscriber, &queue->subscribers, entries){
  230. subscriber->response_cmd = STOMP_CMD_MESSAGE;
  231. subscriber->response_headers = client->request_headers;
  232. subscriber->response = client->request_body;
  233. stomp_handle_response(subscriber);
  234. subscriber->response_cmd = STOMP_CMD_NONE;
  235. subscriber->response_headers = NULL;
  236. subscriber->response = NULL;
  237. }
  238. client->response_cmd = STOMP_CMD_NONE;
  239. client->response = NULL;
  240. return 0;
  241. }