Use a separate event structure to handle meta data writes.

Make meta socket events persistent.
This commit is contained in:
Guus Sliepen 2007-05-17 20:20:10 +00:00
parent 17c8033029
commit 0f6f54ff8a
5 changed files with 50 additions and 39 deletions

View file

@ -101,6 +101,7 @@ typedef struct connection_t {
int outbufstart; /* index of first meaningful byte in output buffer */ int outbufstart; /* index of first meaningful byte in output buffer */
int outbuflen; /* number of meaningful bytes in output buffer */ int outbuflen; /* number of meaningful bytes in output buffer */
int outbufsize; /* number of bytes allocated to output buffer */ int outbufsize; /* number of bytes allocated to output buffer */
struct event outev; /* events on this metadata connection */
time_t last_ping_time; /* last time we saw some activity from the other end or pinged them */ time_t last_ping_time; /* last time we saw some activity from the other end or pinged them */
time_t last_flushed_time; /* last time buffer was empty. Only meaningful if outbuflen > 0 */ time_t last_flushed_time; /* last time buffer was empty. Only meaningful if outbuflen > 0 */

View file

@ -44,8 +44,13 @@ bool send_meta(connection_t *c, const char *buffer, int length)
ifdebug(META) logger(LOG_DEBUG, _("Sending %d bytes of metadata to %s (%s)"), length, ifdebug(META) logger(LOG_DEBUG, _("Sending %d bytes of metadata to %s (%s)"), length,
c->name, c->hostname); c->name, c->hostname);
if(!c->outbuflen) if(!c->outbuflen) {
c->last_flushed_time = now; c->last_flushed_time = now;
if(event_add(&c->outev, NULL) < 0) {
logger(LOG_EMERG, _("event_add failed: %s"), strerror(errno));
abort();
}
}
/* Find room in connection's buffer */ /* Find room in connection's buffer */
if(length + c->outbuflen > c->outbufsize) { if(length + c->outbuflen > c->outbufsize) {
@ -79,8 +84,9 @@ bool send_meta(connection_t *c, const char *buffer, int length)
return true; return true;
} }
bool flush_meta(connection_t *c) void flush_meta(int fd, short events, void *data)
{ {
connection_t *c = data;
int result; int result;
ifdebug(META) logger(LOG_DEBUG, _("Flushing %d bytes to %s (%s)"), ifdebug(META) logger(LOG_DEBUG, _("Flushing %d bytes to %s (%s)"),
@ -98,22 +104,24 @@ bool flush_meta(connection_t *c)
} else if(errno == EWOULDBLOCK) { } else if(errno == EWOULDBLOCK) {
ifdebug(CONNECTIONS) logger(LOG_DEBUG, _("Flushing %d bytes to %s (%s) would block"), ifdebug(CONNECTIONS) logger(LOG_DEBUG, _("Flushing %d bytes to %s (%s) would block"),
c->outbuflen, c->name, c->hostname); c->outbuflen, c->name, c->hostname);
return true; return;
#endif #endif
} else { } else {
logger(LOG_ERR, _("Flushing meta data to %s (%s) failed: %s"), c->name, logger(LOG_ERR, _("Flushing meta data to %s (%s) failed: %s"), c->name,
c->hostname, strerror(errno)); c->hostname, strerror(errno));
} }
return false; terminate_connection(c, c->status.active);
return;
} }
c->outbufstart += result; c->outbufstart += result;
c->outbuflen -= result; c->outbuflen -= result;
} }
event_del(&c->outev);
c->outbufstart = 0; /* avoid unnecessary memmoves */ c->outbufstart = 0; /* avoid unnecessary memmoves */
return true;
} }
void broadcast_meta(connection_t *from, const char *buffer, int length) void broadcast_meta(connection_t *from, const char *buffer, int length)

View file

@ -27,7 +27,7 @@
extern bool send_meta(struct connection_t *, const char *, int); extern bool send_meta(struct connection_t *, const char *, int);
extern void broadcast_meta(struct connection_t *, const char *, int); extern void broadcast_meta(struct connection_t *, const char *, int);
extern bool flush_meta(struct connection_t *); extern void flush_meta(int fd, short events, void *data);
extern bool receive_meta(struct connection_t *); extern bool receive_meta(struct connection_t *);
#endif /* __TINC_META_H__ */ #endif /* __TINC_META_H__ */

View file

@ -128,17 +128,9 @@ static int build_fdset(void)
connection_del(c); connection_del(c);
if(!connection_tree->head) if(!connection_tree->head)
purge(); purge();
} else {
short events = EV_READ;
if(c->outbuflen > 0)
events |= EV_WRITE;
event_del(&c->ev);
event_set(&c->ev, c->socket, events,
handle_meta_connection_data, c);
if (event_add(&c->ev, NULL) < 0)
return -1;
} }
} }
return 0; return 0;
} }
@ -168,6 +160,8 @@ void terminate_connection(connection_t *c, bool report)
if(c->socket) if(c->socket)
closesocket(c->socket); closesocket(c->socket);
event_del(&c->ev);
if(c->edge) { if(c->edge) {
if(report && !tunnelserver) if(report && !tunnelserver)
send_del_edge(broadcast, c->edge); send_del_edge(broadcast, c->edge);
@ -274,33 +268,25 @@ void handle_meta_connection_data(int fd, short events, void *data)
if (c->status.remove) if (c->status.remove)
return; return;
if (events & EV_READ) { if(c->status.connecting) {
if(c->status.connecting) { c->status.connecting = false;
c->status.connecting = false; getsockopt(c->socket, SOL_SOCKET, SO_ERROR, &result, &len);
getsockopt(c->socket, SOL_SOCKET, SO_ERROR, &result, &len);
if(!result) if(!result)
finish_connecting(c); finish_connecting(c);
else { else {
ifdebug(CONNECTIONS) logger(LOG_DEBUG, ifdebug(CONNECTIONS) logger(LOG_DEBUG,
_("Error while connecting to %s (%s): %s"), _("Error while connecting to %s (%s): %s"),
c->name, c->hostname, strerror(result)); c->name, c->hostname, strerror(result));
closesocket(c->socket); closesocket(c->socket);
do_outgoing_connection(c); do_outgoing_connection(c);
return;
}
}
if (!receive_meta(c)) {
terminate_connection(c, c->status.active);
return; return;
} }
} }
if (events & EV_WRITE) { if (!receive_meta(c)) {
if(!flush_meta(c)) { terminate_connection(c, c->status.active);
terminate_connection(c, c->status.active); return;
}
} }
} }

View file

@ -395,6 +395,14 @@ void setup_outgoing_connection(outgoing_t *outgoing)
c->outgoing = outgoing; c->outgoing = outgoing;
c->last_ping_time = now; c->last_ping_time = now;
event_set(&c->ev, c->socket, EV_READ | EV_PERSIST, handle_meta_connection_data, c);
event_set(&c->outev, c->socket, EV_WRITE | EV_PERSIST, flush_meta, c);
if(event_add(&c->ev, NULL) < 0) {
logger(LOG_ERR, _("event_add failed: %s"), strerror(errno));
connection_del(c);
return;
}
connection_add(c); connection_add(c);
do_outgoing_connection(c); do_outgoing_connection(c);
@ -416,8 +424,8 @@ void handle_new_meta_connection(int sock, short events, void *data)
fd = accept(sock, &sa.sa, &len); fd = accept(sock, &sa.sa, &len);
if(fd < 0) { if(fd < 0) {
logger(LOG_ERR, _("Accepting a new connection failed: %s"), logger(LOG_ERR, _("Accepting a new connection failed: %s"), strerror(errno));
strerror(errno)); return;
} }
sockaddrunmap(&sa); sockaddrunmap(&sa);
@ -436,6 +444,14 @@ void handle_new_meta_connection(int sock, short events, void *data)
ifdebug(CONNECTIONS) logger(LOG_NOTICE, _("Connection from %s"), c->hostname); ifdebug(CONNECTIONS) logger(LOG_NOTICE, _("Connection from %s"), c->hostname);
event_set(&c->ev, c->socket, EV_READ | EV_PERSIST, handle_meta_connection_data, c);
event_set(&c->outev, c->socket, EV_WRITE | EV_PERSIST, flush_meta, c);
if(event_add(&c->ev, NULL) < 0) {
logger(LOG_ERR, _("event_add failed: %s"), strerror(errno));
connection_del(c);
return;
}
configure_tcp(c); configure_tcp(c);
connection_add(c); connection_add(c);