A fast, lightweight and simple STOMP compatible messaging server
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

server.c 8.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. /*
  2. * Copyright (C) 2011 Bernhard Froehlich <decke@bluelife.at>
  3. * All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions
  7. * are met:
  8. *
  9. * 1. Redistributions of source code must retain the above copyright
  10. * notice, this list of conditions and the following disclaimer.
  11. * 2. Author's name may not be used endorse or promote products derived
  12. * from this software without specific prior written permission.
  13. *
  14. * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
  15. * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  16. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  17. * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
  18. * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  19. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
  20. * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  21. * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
  22. * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
  23. * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  24. * POSSIBILITY OF SUCH DAMAGE.
  25. */
  26. #include <sys/types.h>
  27. #include <sys/socket.h>
  28. #include <netinet/in.h>
  29. #include <arpa/inet.h>
  30. #include <signal.h>
  31. #include <limits.h>
  32. /* Required by event.h. */
  33. #include <sys/time.h>
  34. #include <stdlib.h>
  35. #include <stdio.h>
  36. #include <string.h>
  37. #include <fcntl.h>
  38. #include <unistd.h>
  39. #include <errno.h>
  40. #include <err.h>
  41. #include <sys/stat.h>
  42. /* Libevent. */
  43. #include <event2/event.h>
  44. #include <event2/buffer.h>
  45. #include <event2/bufferevent.h>
  46. #include <event2/http.h>
  47. #include <event2/keyvalq_struct.h>
  48. #include "log.h"
  49. #include "util.h"
  50. #include "common.h"
  51. #include "server.h"
  52. #include "client.h"
  53. #include "stomp.h"
  54. #include "stomputil.h"
  55. #include "leveldb.h"
  56. struct event_base *base;
  57. void signal_handler(int sig) {
  58. switch(sig) {
  59. case SIGTERM:
  60. case SIGHUP:
  61. logclose();
  62. logopen(configget("logFile"));
  63. case SIGINT:
  64. event_base_loopbreak(base);
  65. break;
  66. default:
  67. logwarn("Unhandled signal (%d) %s", sig, strsignal(sig));
  68. break;
  69. }
  70. }
  71. /**
  72. * Set a socket to non-blocking mode.
  73. */
  74. int setnonblock(int fd)
  75. {
  76. int flags;
  77. flags = fcntl(fd, F_GETFL);
  78. if (flags < 0)
  79. return flags;
  80. flags |= O_NONBLOCK;
  81. if (fcntl(fd, F_SETFL, flags) < 0)
  82. return -1;
  83. return 0;
  84. }
  85. /**
  86. * Called by libevent when there is data to read.
  87. */
  88. void buffered_on_read(struct bufferevent *bev, void *arg)
  89. {
  90. /* Write back the read buffer. It is important to note that
  91. * bufferevent_write_buffer will drain the incoming data so it
  92. * is effectively gone after we call it. */
  93. struct client *client = (struct client *)arg;
  94. size_t read_len;
  95. client->rawrequest = evbuffer_readln(bufferevent_get_input(bev), &read_len, EVBUFFER_EOL_NUL);
  96. if(read_len >= MAXREQUESTLEN){
  97. client->response_cmd = STOMP_CMD_DISCONNECT;
  98. goto response;
  99. }
  100. if (client->rawrequest == NULL)
  101. goto error;
  102. client->request_headers = calloc(1, sizeof(struct evkeyvalq));
  103. if(client->request_headers == NULL)
  104. goto error;
  105. TAILQ_INIT(client->request_headers);
  106. client->response_buf = evbuffer_new();
  107. if(client->response_buf == NULL)
  108. goto error;
  109. client->response_headers = calloc(1, sizeof(struct evkeyvalq));
  110. if(client->response_headers == NULL)
  111. goto error;
  112. TAILQ_INIT(client->response_headers);
  113. client->request = client->rawrequest;
  114. /* skip leading whitespace */
  115. while(*client->request == '\r' || *client->request == '\n')
  116. *(client->request)++;
  117. if(strstr(client->request, "\r\n\r\n") != NULL){
  118. client->request_body = strstr(client->request, "\r\n\r\n")+4;
  119. }
  120. else if(strstr(client->request, "\n\n") != NULL){
  121. client->request_body = strstr(client->request, "\n\n")+2;
  122. }
  123. if(stomp_parse_headers(client->request_headers, client->request) != 0){
  124. client->response_cmd = STOMP_CMD_DISCONNECT;
  125. goto response;
  126. }
  127. stomp_handle_request(client);
  128. response:
  129. stomp_handle_response(client);
  130. error:
  131. client->request_cmd = STOMP_CMD_NONE;
  132. client->response_cmd = STOMP_CMD_NONE;
  133. client->request_body = NULL;
  134. client->request = NULL;
  135. if(client->response_headers){
  136. free(client->response_headers);
  137. client->response_headers = NULL;
  138. }
  139. if(client->response_buf){
  140. evbuffer_free(client->response_buf);
  141. client->response_buf = NULL;
  142. }
  143. if(client->request_headers){
  144. free(client->request_headers);
  145. client->request_headers = NULL;
  146. }
  147. if(client->rawrequest){
  148. free(client->rawrequest);
  149. client->rawrequest = NULL;
  150. }
  151. }
  152. /**
  153. * Called by libevent when the write buffer reaches 0. We only
  154. * provide this because libevent expects it, but we don't use it.
  155. */
  156. void buffered_on_write(struct bufferevent *bev, void *arg)
  157. {
  158. }
  159. /**
  160. * Called by libevent when there is an error on the underlying socket
  161. * descriptor.
  162. */
  163. void buffered_on_error(struct bufferevent *bev, short what, void *arg)
  164. {
  165. struct client *client = (struct client *)arg;
  166. if (what & BEV_EVENT_EOF) {
  167. loginfo("Client %d disconnected.", client->fd);
  168. }
  169. else {
  170. logwarn("Client %d socket error, disconnecting.", client->fd);
  171. }
  172. client->response_cmd = STOMP_CMD_DISCONNECT;
  173. stomp_free_client(client);
  174. }
  175. /**
  176. * This function will be called by libevent when there is a connection
  177. * ready to be accepted.
  178. */
  179. void on_accept(int fd, short ev, void *arg)
  180. {
  181. int client_fd;
  182. struct sockaddr_in client_addr;
  183. socklen_t client_len = sizeof(client_addr);
  184. struct client *client;
  185. client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
  186. if (client_fd < 0) {
  187. warn("accept failed");
  188. return;
  189. }
  190. /* Set the client socket to non-blocking mode. */
  191. if (setnonblock(client_fd) < 0)
  192. warn("failed to set client socket non-blocking");
  193. /* We've accepted a new client, create a client object. */
  194. client = calloc(1, sizeof(*client));
  195. if (client == NULL)
  196. err(1, "malloc failed");
  197. client->fd = client_fd;
  198. client->bev = bufferevent_socket_new(base, client_fd, BEV_OPT_CLOSE_ON_FREE);
  199. bufferevent_setcb(client->bev, buffered_on_read, buffered_on_write,
  200. buffered_on_error, client);
  201. /* We have to enable it before our callbacks will be
  202. * called. */
  203. bufferevent_enable(client->bev, EV_READ);
  204. }
  205. int main(int argc, char **argv)
  206. {
  207. char config[PATH_MAX] = CONF_FILE;
  208. int listen_fd, ch;
  209. int daemon = 0;
  210. struct sockaddr_in listen_addr;
  211. struct event *ev_accept;
  212. int reuseaddr_on;
  213. pid_t pid, sid;
  214. signal(SIGHUP, signal_handler);
  215. signal(SIGTERM, signal_handler);
  216. signal(SIGINT, signal_handler);
  217. signal(SIGQUIT, signal_handler);
  218. while ((ch = getopt(argc, argv, "d:")) != -1) {
  219. switch (ch) {
  220. case 'd':
  221. daemon = 1;
  222. break;
  223. }
  224. }
  225. if(configparse(config)){
  226. printf("Could not load config file %s\n", config);
  227. exit(EXIT_FAILURE);
  228. }
  229. if(logopen(configget("logFile")) != 0)
  230. exit(EXIT_FAILURE);
  231. logwrite(LOG_INFO, "-------------------------------");
  232. logwrite(LOG_INFO, "%s/%s started", DAEMON_NAME, REDQUEUE_VERSION);
  233. if (daemon) {
  234. pid = fork();
  235. if (pid < 0) {
  236. exit(EXIT_FAILURE);
  237. } else if (pid > 0) {
  238. exit(EXIT_SUCCESS);
  239. }
  240. umask(0);
  241. sid = setsid();
  242. if (sid < 0) {
  243. exit(EXIT_FAILURE);
  244. }
  245. }
  246. TAILQ_INIT(&clients);
  247. TAILQ_INIT(&queues);
  248. #ifdef WITH_LEVELDB
  249. /* Initialize LevelDB */
  250. if(leveldb_init() != 0)
  251. exit(EXIT_FAILURE);
  252. #endif
  253. /* Initialize libevent. */
  254. base = event_base_new();
  255. /* Create our listening socket. */
  256. listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  257. if (listen_fd < 0)
  258. err(1, "listen failed");
  259. memset(&listen_addr, 0, sizeof(listen_addr));
  260. listen_addr.sin_family = AF_INET;
  261. listen_addr.sin_addr.s_addr = INADDR_ANY;
  262. listen_addr.sin_port = htons(atoi(configget("listenPort")));
  263. if (bind(listen_fd, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) < 0)
  264. err(1, "bind failed");
  265. if (listen(listen_fd, 5) < 0)
  266. err(1, "listen failed");
  267. setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on));
  268. /* Set the socket to non-blocking, this is essential in event
  269. * based programming with libevent. */
  270. if (setnonblock(listen_fd) < 0)
  271. err(1, "failed to set server socket to non-blocking");
  272. /* We now have a listening socket, we create a read event to
  273. * be notified when a client connects. */
  274. ev_accept = event_new(base, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL);
  275. event_add(ev_accept, NULL);
  276. /* Start the event loop. */
  277. event_base_dispatch(base);
  278. shutdown(listen_fd, SHUT_RDWR);
  279. close(listen_fd);
  280. #ifdef WITH_LEVELDB
  281. leveldb_free();
  282. #endif
  283. logclose();
  284. return EXIT_SUCCESS;
  285. }