Skip to content

Instantly share code, notes, and snippets.

@simonsj
Last active December 15, 2015 06:39
Show Gist options
  • Select an option

  • Save simonsj/5217839 to your computer and use it in GitHub Desktop.

Select an option

Save simonsj/5217839 to your computer and use it in GitHub Desktop.
Fun.
#!/bin/env ruby
#
# gen.rb --
s = '{ "app": "github", "error":"something bad happened" }'
require 'socket'
1000.times do |i|
puts i
socket = UNIXSocket.new("/tmp/needle.socket")
socket.puts(s)
socket.close
#sleep 0.01
end
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#define MSGMAX (1024)
#define MSGSIZE (1024)
#define SOCKNAME "/tmp/needle.socket"
typedef struct {
int fd;
int offset;
char buf[MSGSIZE];
} Message;
static Message Messages[MSGMAX];
static struct pollfd PollFds[MSGMAX + 1];
int bindFd;
nfds_t populate_pollfd() {
PollFds[0].fd = bindFd;
PollFds[0].events = POLLIN;
PollFds[0].revents = 0x0;
nfds_t p = 1; // index into PollFds
int i = 0;
for (i = 0; i < MSGMAX; i++) {
if (Messages[i].fd != 0) {
printf("found message[%d], setting pollfds (%d)\n", i, Messages[i].fd);
PollFds[p].fd = Messages[i].fd;
PollFds[p].events = POLLIN;
PollFds[p].revents = 0x0;
p += 1;
}
}
return p;
}
Message *find_message(int fd) {
int i = 0;
for (i = 0; i < MSGMAX; i++) {
if (Messages[i].fd == fd) {
return &Messages[i];
}
}
return NULL;
}
Message *init_message(int fd) {
int i = 0;
for (i = 0; i < MSGMAX; i++) {
if (Messages[i].fd == 0) {
printf("setting messages[%d]\n", i);
Messages[i].fd = fd;
Messages[i].offset = 0;
memset(Messages[i].buf, 0x0, sizeof(Messages[i].buf));
return &Messages[i];
}
}
return NULL;
}
void process_pollfd(int numFds) {
int f = 0;
if (PollFds[0].events & PollFds[0].revents) {
printf("0 events: 0x%x\n", PollFds[0].revents);
struct sockaddr_un clientAddr;
int clientLen;
int fd2 = accept(bindFd, (struct sockaddr *)&clientAddr, &clientLen);
if (fd2 < 0) {
//goto erraccept;
printf("accept failed\n");
}
int fd2Flags = fcntl(fd2, F_GETFL);
if (fcntl(fd2, F_SETFL, fd2Flags | O_NONBLOCK) == -1) {
//goto errsetnonblock;
printf("SETFL failed\n");
}
Message *m = init_message(fd2);
if (!m) {
printf("couldnt' init_message");
close(fd2);
}
}
int i = 1;
for (i = 1; i < MSGMAX; i++) {
if ((PollFds[i].fd != 0) &&
(PollFds[i].events & PollFds[i].revents)) {
printf("something happened for fd %d\n", i);
Message *m = find_message(PollFds[i].fd);
if (!m) {
printf("couldn't find message\n");
close(PollFds[i].fd);
memset(&PollFds[i], 0x0, sizeof(struct pollfd));
continue;
}
int r = read(PollFds[i].fd, m->buf + m->offset, MSGSIZE - m->offset - 1);
printf("read: %d\n", r);
if (r == -1) {
if (errno == EAGAIN) {
printf("eagain: %d\n", r);
} else {
printf("error reading\n");
close(PollFds[i].fd);
memset(&PollFds[i], 0x0, sizeof(struct pollfd));
}
} else if (r == 0) {
/* process */
m->buf[m->offset] = '\0';
printf("done: %s\n", m->buf);
close(PollFds[i].fd);
memset(&PollFds[i], 0x0, sizeof(struct pollfd));
memset(m, 0x0, sizeof(*m));
} else {
m->offset += r;
}
}
}
}
int main(int argc, char **argv) {
int ret = 0;
bindFd = socket(PF_UNIX, SOCK_STREAM, 0);
if (bindFd == -1) {
printf("couldn't open socket\n");
ret = -1;
goto errfd;
}
struct sockaddr_un sa;
sa.sun_family = AF_UNIX;
strcpy(sa.sun_path, SOCKNAME);
int len = SUN_LEN(&sa);
unlink(SOCKNAME);
if (0 != bind(bindFd, (struct sockaddr *)&sa, len)) {
printf("couldn't bind\n");
goto errbind;
}
if (0 != listen(bindFd, 128)) {
printf("couldn't listen\n");
goto errlisten;
}
nfds_t numFds = populate_pollfd();
int pollRet = 0;
while ((pollRet = poll(PollFds, numFds, -1)) >= 0) {
printf("pollRet: %d\n", pollRet);
/*
struct sockaddr_un clientAddr;
int clientLen;
int fd2 = accept(fd, (struct sockaddr *)&clientAddr, &clientLen);
if (fd2 < 0) {
goto erraccept;
}
int fd2Flags = fcntl(fd2, F_GETFL);
if (fcntl(fd2, F_SETFL, fd2Flags | O_NONBLOCK) == -1) {
goto errsetnonblock;
}
*/
process_pollfd(pollRet);
numFds = populate_pollfd();
}
if (pollRet == -1) {
printf("poll returned -1\n");
}
/*
char buf[1024];
memset(&buf, 0x0, sizeof(buf));
ret = read(fd2, &buf, sizeof(buf));
while (ret > 0) {
printf("%s", buf);
memset(&buf, 0x0, sizeof(buf));
ret = read(fd2, &buf, sizeof(buf));
}
*/
errsetnonblock:
//close(fd2);
errlisten:
erraccept:
errbind:
close(bindFd);
errfd:
return ret;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment