Browse Source

- First round of refactoring

master
decke 7 years ago
parent
commit
ed0fdae57b
7 changed files with 336 additions and 175 deletions
  1. 1
    1
      Makefile
  2. 67
    0
      client.h
  3. 51
    0
      common.c
  4. 32
    0
      common.h
  5. 30
    174
      server.c
  6. 37
    0
      server.h
  7. 118
    0
      stomp.c

+ 1
- 1
Makefile View File

@@ -7,7 +7,7 @@ LOCALBASE?=/usr/local
CPPFLAGS=-I${LOCALBASE}/include -g -Wall
LDFLAGS=-L${LOCALBASE}/lib -L${LOCALBASE}/lib/event2

SRC = server.c
SRC = server.c common.c stomp.c
OBJS = ${SRC:.c=.o}

all: redqueue

+ 67
- 0
client.h View File

@@ -0,0 +1,67 @@
/*
* Copyright (C) 2011 Bernhard Froehlich <decke@bluelife.at>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Author's name may not be used endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef _CLIENT_H_
#define _CLIENT_H_

#include <sys/queue.h>

/**
* A struct for client specific data, also includes
* pointer to create a list of clients.
*/
struct client {
/* The clients socket. */
int fd;

/* The bufferedevent for this client. */
struct bufferevent *buf_in;

/* The output buffer for this client. */
struct evbuffer *buf_out;

/* Plain request */
char *request;

/* Parsed Headers */
struct evkeyvalq *headers;

TAILQ_ENTRY(client) entries;
};

TAILQ_HEAD(, client) clients;


struct queue {
char *queuename;

TAILQ_HEAD(, client) subscribers;
TAILQ_ENTRY(queue) entries;
};

TAILQ_HEAD(, queue) queues;
#endif /* _CLIENT_H_ */

+ 51
- 0
common.c View File

@@ -0,0 +1,51 @@
/*
* Copyright (C) 2011 Bernhard Froehlich <decke@bluelife.at>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Author's name may not be used endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include <string.h>
#include <sys/queue.h>

#include <event2/buffer.h>

#include "common.h"
#include "client.h"

int find_readers(char *queuename, struct evbuffer *evb)
{
struct queue *entry, *tmp_entry;
char buf[MAX_BUF];

*buf = '\0';
for (entry = TAILQ_FIRST(&queues); entry != NULL; entry = tmp_entry) {
tmp_entry = TAILQ_NEXT(entry, entries);
if (strncmp(queuename, entry->queuename, strlen(entry->queuename)) == 0){
evbuffer_add_printf(evb, "queue: %s", queuename);
//TAILQ_REMOVE(&readers, entry, entries);
return 1;
}
}
return 0;
}


+ 32
- 0
common.h View File

@@ -0,0 +1,32 @@
/*
* Copyright (C) 2011 Bernhard Froehlich <decke@bluelife.at>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Author's name may not be used endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef _COMMON_H_
#define _COMMON_H_

#define MAX_BUF 16384
#endif /* _COMMON_H_ */

+ 30
- 174
server.c View File

@@ -41,7 +41,6 @@
#include <unistd.h>
#include <errno.h>
#include <err.h>
#include <sys/queue.h>
#include <sys/stat.h>

/* Libevent. */
@@ -54,30 +53,9 @@
/* LevelDB */
#include <leveldb/c.h>

#define SERVER_PORT 8080
#define MAX_BUF 16384

/**
* A struct for client specific data, also includes pointer to create
* a list of clients.
*/
struct client {
/* The clients socket. */
int fd;

/* The bufferedevent for this client. */
struct bufferevent *buf_ev;

TAILQ_ENTRY(client) entries;
};
TAILQ_HEAD(, client) clients;

struct queue {
char *queuename;
TAILQ_HEAD(, client) subscribers;
TAILQ_ENTRY(queue) entries;
};
TAILQ_HEAD(, queue) queues;
#include "common.h"
#include "server.h"
#include "client.h"

struct event_base *base;

@@ -119,64 +97,6 @@ int setnonblock(int fd)
return 0;
}

int find_readers(char *queuename, struct evbuffer *evb)
{
struct queue *entry, *tmp_entry;
char buf[MAX_BUF];

*buf = '\0';
for (entry = TAILQ_FIRST(&queues); entry != NULL; entry = tmp_entry) {
tmp_entry = TAILQ_NEXT(entry, entries);
if (strncmp(queuename, entry->queuename, strlen(entry->queuename)) == 0){
evbuffer_add_printf(evb, "queue: %s", queuename);
//TAILQ_REMOVE(&readers, entry, entries);
return 1;
}
}
return 0;
}

int stomp_parse_headers(struct evkeyvalq* headers, struct evbuffer* buffer)
{
char *line;
size_t line_length;
char *skey, *svalue;

TAILQ_INIT(headers);

while ((line = evbuffer_readln(buffer, &line_length, EVBUFFER_EOL_CRLF)) != NULL) {
skey = NULL;
svalue = NULL;

if (*line == '\0') {
free(line);
return 0;
}

/* Processing of header lines */
svalue = line;
skey = strsep(&svalue, ":");
if (svalue == NULL){
free(line);
return 1;
}

svalue += strspn(svalue, " ");

printf("HEADER: <%s> <%s>\n", skey, svalue);

if (evhttp_add_header(headers, skey, svalue) == -1){
free(line);
return 2;
}

free(line);
}

return 0;
}


/**
* Called by libevent when there is data to read.
*/
@@ -185,80 +105,45 @@ void buffered_on_read(struct bufferevent *bev, void *arg)
/* Write back the read buffer. It is important to note that
* bufferevent_write_buffer will drain the incoming data so it
* is effectively gone after we call it. */
struct client *cli = (struct client *)arg;
struct queue *entry, *tmp_entry;
struct evbuffer *evb, *evb2;
char *request, *header_begin;
struct evkeyvalq *headers;
const char *queuename;
struct client *client = (struct client *)arg;
struct evbuffer *evb;
char *header_begin;
request = evbuffer_readln(bufferevent_get_input(bev), NULL, EVBUFFER_EOL_NUL);
if (request == NULL) {
client->request = evbuffer_readln(bufferevent_get_input(bev), NULL, EVBUFFER_EOL_NUL);
if (client->request == NULL) {
return;
}
header_begin = strstr(request, "\r\n");
header_begin = strstr(client->request, "\r\n");
if(header_begin == NULL){
free(request);
free(client->request);
return;
}

client->buf_out = evbuffer_new();
evb = evbuffer_new();
evb2 = evbuffer_new();

evbuffer_prepend(evb2, header_begin+2, strlen(header_begin+2));
evbuffer_prepend(evb, header_begin+2, strlen(header_begin+2));

headers = calloc(1, sizeof(struct evkeyvalq));
if(headers == NULL){
goto error;
if(stomp_parse_headers(client->headers, evb) != 0){
evbuffer_add_printf(client->buf_out, "Invalid Request\n");
}

if(stomp_parse_headers(headers, evb2) != 0){
evbuffer_add_printf(evb, "Invalid Request\n");
}
else if (strncmp(request, "SUBSCRIBE", 9) == 0) {
queuename = evhttp_find_header(headers, "destination");
if(queuename == NULL){
evbuffer_add_printf(evb, "Destination header missing\n");
goto error;
}

for (entry = TAILQ_FIRST(&queues); entry != NULL; entry = tmp_entry) {
tmp_entry = TAILQ_NEXT(entry, entries);
if (strcmp(entry->queuename, queuename) == 0){
evbuffer_add_printf(evb, "queue %s found\n", queuename);
entry = tmp_entry;
break;
}
}

if (entry == NULL){
entry = malloc(sizeof(*entry));
entry->queuename = malloc(strlen(queuename)+1);
strcpy(entry->queuename, queuename);
TAILQ_INIT(&entry->subscribers);
TAILQ_INSERT_TAIL(&queues, entry, entries);
evbuffer_add_printf(evb, "queue %s created\n", queuename);
}

/* TODO: check if already subscribed */

TAILQ_INSERT_TAIL(&entry->subscribers, cli, entries);
}
else if (strncmp(request, "exit", 4) == 0 || strncmp(request, "quit", 4) == 0) {
evbuffer_add_printf(evb, "ok bye\n");
shutdown(cli->fd, SHUT_RDWR);
else if (strncmp(client->request, "SUBSCRIBE", 9) == 0) {
/* TODO: improve interface API for handlers */
}
else if (strncmp(client->request, "exit", 4) == 0 || strncmp(client->request, "quit", 4) == 0) {
evbuffer_add_printf(client->buf_out, "ok bye\n");
shutdown(client->fd, SHUT_RDWR);
}
else {
evbuffer_add_printf(evb, "error unknown command\n");
evbuffer_add_printf(client->buf_out, "error unknown command\n");
}

error:
bufferevent_write_buffer(bev, evb);
evbuffer_free(evb2);
bufferevent_write_buffer(bev, client->buf_out);
evbuffer_free(evb);
free(request);
free(headers);
evbuffer_free(client->buf_out);
free(client->request);
free(client->headers);
}

/**
@@ -297,7 +182,7 @@ void buffered_on_error(struct bufferevent *bev, short what, void *arg)

/* TODO: remove from subscribers */

bufferevent_free(client->buf_ev);
bufferevent_free(client->buf_in);
close(client->fd);
free(client);
}
@@ -328,42 +213,13 @@ void on_accept(int fd, short ev, void *arg)
if (client == NULL)
err(1, "malloc failed");
client->fd = client_fd;
/* Create the buffered event.
*
* The first argument is the file descriptor that will trigger
* the events, in this case the clients socket.
*
* The second argument is the callback that will be called
* when data has been read from the socket and is available to
* the application.
*
* The third argument is a callback to a function that will be
* called when the write buffer has reached a low watermark.
* That usually means that when the write buffer is 0 length,
* this callback will be called. It must be defined, but you
* don't actually have to do anything in this callback.
*
* The fourth argument is a callback that will be called when
* there is a socket error. This is where you will detect
* that the client disconnected or other socket errors.
*
* The fifth and final argument is to store an argument in
* that will be passed to the callbacks. We store the client
* object here.
*/
/*
client->buf_ev = bufferevent_new(client_fd, buffered_on_read,
buffered_on_write, buffered_on_error, client);
*/

client->buf_ev = bufferevent_socket_new(base, client_fd, 0);
bufferevent_setcb(client->buf_ev, buffered_on_read, buffered_on_write,
client->buf_in = bufferevent_socket_new(base, client_fd, 0);
bufferevent_setcb(client->buf_in, buffered_on_read, buffered_on_write,
buffered_on_error, client);

/* We have to enable it before our callbacks will be
* called. */
bufferevent_enable(client->buf_ev, EV_READ);
bufferevent_enable(client->buf_in, EV_READ);
}

int main(int argc, char **argv)

+ 37
- 0
server.h View File

@@ -0,0 +1,37 @@
/*
* Copyright (C) 2011 Bernhard Froehlich <decke@bluelife.at>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Author's name may not be used endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef _SERVER_H_
#define _SERVER_H_

#define REDQUEUE_VERSION "0.0.1"
#define DAEMON_NAME "redqd"
#define CONF_FILE "redqd.conf"
#define PID_FILE "/var/run/redqd.pid"

#define SERVER_PORT 8080
#endif /* _SERVER_H_ */

+ 118
- 0
stomp.c View File

@@ -0,0 +1,118 @@
/*
* Copyright (C) 2011 Bernhard Froehlich <decke@bluelife.at>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Author's name may not be used endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include <stdlib.h>
#include <string.h>
#include <sys/queue.h>

#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/http.h>
#include <event2/keyvalq_struct.h>

#include "client.h"


int stomp_subscribe(struct client *client)
{
struct queue *entry, *tmp_entry;
const char *queuename;

queuename = evhttp_find_header(client->headers, "destination");
if(queuename == NULL){
evbuffer_add_printf(client->buf_out, "Destination header missing\n");
return 1;
}
for (entry = TAILQ_FIRST(&queues); entry != NULL; entry = tmp_entry) {
tmp_entry = TAILQ_NEXT(entry, entries);
if (strcmp(entry->queuename, queuename) == 0){
evbuffer_add_printf(client->buf_out, "queue %s found\n", queuename);
entry = tmp_entry;
break;
}
}

if (entry == NULL){
entry = malloc(sizeof(*entry));
entry->queuename = malloc(strlen(queuename)+1);
strcpy(entry->queuename, queuename);
TAILQ_INIT(&entry->subscribers);
TAILQ_INSERT_TAIL(&queues, entry, entries);
evbuffer_add_printf(client->buf_out, "queue %s created\n", queuename);
}

/* TODO: check if already subscribed */

TAILQ_INSERT_TAIL(&entry->subscribers, client, entries);

return 0;
}


int stomp_parse_headers(struct evkeyvalq* headers, struct evbuffer* buffer)
{
char *line;
size_t line_length;
char *skey, *svalue;

headers = calloc(1, sizeof(struct evkeyvalq));
if(headers == NULL){
return 1;
}

TAILQ_INIT(headers);

while ((line = evbuffer_readln(buffer, &line_length, EVBUFFER_EOL_CRLF)) != NULL) {
skey = NULL;
svalue = NULL;

if (*line == '\0') {
free(line);
return 0;
}

/* Processing of header lines */
svalue = line;
skey = strsep(&svalue, ":");
if (svalue == NULL){
free(line);
return 2;
}

svalue += strspn(svalue, " ");

if (evhttp_add_header(headers, skey, svalue) == -1){
free(line);
return 1;
}

free(line);
}

return 0;
}


Loading…
Cancel
Save