diff options
| -rw-r--r-- | proxy.c | 828 |
1 files changed, 434 insertions, 394 deletions
| @@ -4,33 +4,33 @@ | |||
| 4 | $Id$ */ | 4 | $Id$ */ |
| 5 | 5 | ||
| 6 | /* System */ | 6 | /* System */ |
| 7 | #include <arpa/inet.h> | ||
| 8 | #include <ctype.h> | ||
| 9 | #include <errno.h> | ||
| 10 | #include <pthread.h> | ||
| 11 | #include <pwd.h> | ||
| 12 | #include <signal.h> | ||
| 7 | #include <stdint.h> | 13 | #include <stdint.h> |
| 14 | #include <stdio.h> | ||
| 8 | #include <stdlib.h> | 15 | #include <stdlib.h> |
| 9 | #include <string.h> | 16 | #include <string.h> |
| 10 | #include <arpa/inet.h> | ||
| 11 | #include <sys/socket.h> | 17 | #include <sys/socket.h> |
| 12 | #include <unistd.h> | 18 | #include <unistd.h> |
| 13 | #include <errno.h> | ||
| 14 | #include <signal.h> | ||
| 15 | #include <stdio.h> | ||
| 16 | #include <pwd.h> | ||
| 17 | #include <ctype.h> | ||
| 18 | #include <pthread.h> | ||
| 19 | 19 | ||
| 20 | /* Libowfat */ | 20 | /* Libowfat */ |
| 21 | #include "socket.h" | 21 | #include "byte.h" |
| 22 | #include "io.h" | 22 | #include "io.h" |
| 23 | #include "iob.h" | 23 | #include "iob.h" |
| 24 | #include "byte.h" | ||
| 25 | #include "scan.h" | ||
| 26 | #include "ip6.h" | 24 | #include "ip6.h" |
| 27 | #include "ndelay.h" | 25 | #include "ndelay.h" |
| 26 | #include "scan.h" | ||
| 27 | #include "socket.h" | ||
| 28 | 28 | ||
| 29 | /* Opentracker */ | 29 | /* Opentracker */ |
| 30 | #include "trackerlogic.h" | ||
| 31 | #include "ot_vector.h" | ||
| 32 | #include "ot_mutex.h" | 30 | #include "ot_mutex.h" |
| 33 | #include "ot_stats.h" | 31 | #include "ot_stats.h" |
| 32 | #include "ot_vector.h" | ||
| 33 | #include "trackerlogic.h" | ||
| 34 | 34 | ||
| 35 | #ifndef WANT_SYNC_LIVE | 35 | #ifndef WANT_SYNC_LIVE |
| 36 | #define WANT_SYNC_LIVE | 36 | #define WANT_SYNC_LIVE |
| @@ -40,26 +40,26 @@ | |||
| 40 | ot_ip6 g_serverip; | 40 | ot_ip6 g_serverip; |
| 41 | uint16_t g_serverport = 9009; | 41 | uint16_t g_serverport = 9009; |
| 42 | uint32_t g_tracker_id; | 42 | uint32_t g_tracker_id; |
| 43 | char groupip_1[4] = { 224,0,23,5 }; | 43 | char groupip_1[4] = {224, 0, 23, 5}; |
| 44 | int g_self_pipe[2]; | 44 | int g_self_pipe[2]; |
| 45 | 45 | ||
| 46 | /* If you have more than 10 peers, don't use this proxy | 46 | /* If you have more than 10 peers, don't use this proxy |
| 47 | Use 20 slots for 10 peers to have room for 10 incoming connection slots | 47 | Use 20 slots for 10 peers to have room for 10 incoming connection slots |
| 48 | */ | 48 | */ |
| 49 | #define MAX_PEERS 20 | 49 | #define MAX_PEERS 20 |
| 50 | 50 | ||
| 51 | #define LIVESYNC_INCOMING_BUFFSIZE (256*256) | 51 | #define LIVESYNC_INCOMING_BUFFSIZE (256 * 256) |
| 52 | #define STREAMSYNC_OUTGOING_BUFFSIZE (256*256) | 52 | #define STREAMSYNC_OUTGOING_BUFFSIZE (256 * 256) |
| 53 | 53 | ||
| 54 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 | 54 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1480 |
| 55 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) | 55 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer) + sizeof(ot_hash)) |
| 56 | #define LIVESYNC_MAXDELAY 15 /* seconds */ | 56 | #define LIVESYNC_MAXDELAY 15 /* seconds */ |
| 57 | 57 | ||
| 58 | /* The amount of time a complete sync cycle should take */ | 58 | /* The amount of time a complete sync cycle should take */ |
| 59 | #define OT_SYNC_INTERVAL_MINUTES 2 | 59 | #define OT_SYNC_INTERVAL_MINUTES 2 |
| 60 | 60 | ||
| 61 | /* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */ | 61 | /* So after each bucket wait 1 / OT_BUCKET_COUNT intervals */ |
| 62 | #define OT_SYNC_SLEEP ( ( ( OT_SYNC_INTERVAL_MINUTES ) * 60 * 1000000 ) / ( OT_BUCKET_COUNT ) ) | 62 | #define OT_SYNC_SLEEP (((OT_SYNC_INTERVAL_MINUTES) * 60 * 1000000) / (OT_BUCKET_COUNT)) |
| 63 | 63 | ||
| 64 | enum { OT_SYNC_PEER4, OT_SYNC_PEER6 }; | 64 | enum { OT_SYNC_PEER4, OT_SYNC_PEER6 }; |
| 65 | enum { FLAG_SERVERSOCKET = 1 }; | 65 | enum { FLAG_SERVERSOCKET = 1 }; |
| @@ -75,151 +75,153 @@ static uint8_t *g_peerbuffer_pos; | |||
| 75 | static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; | 75 | static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; |
| 76 | static ot_time g_next_packet_time; | 76 | static ot_time g_next_packet_time; |
| 77 | 77 | ||
| 78 | static void * livesync_worker( void * args ); | 78 | static void *livesync_worker(void *args); |
| 79 | static void * streamsync_worker( void * args ); | 79 | static void *streamsync_worker(void *args); |
| 80 | static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ); | 80 | static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer); |
| 81 | 81 | ||
| 82 | void exerr( char * message ) { | 82 | void exerr(char *message) { |
| 83 | fprintf( stderr, "%s\n", message ); | 83 | fprintf(stderr, "%s\n", message); |
| 84 | exit( 111 ); | 84 | exit(111); |
| 85 | } | 85 | } |
| 86 | 86 | ||
| 87 | void stats_issue_event( ot_status_event event, PROTO_FLAG proto, uintptr_t event_data ) { | 87 | void stats_issue_event(ot_status_event event, PROTO_FLAG proto, uintptr_t event_data) { |
| 88 | (void) event; | 88 | (void)event; |
| 89 | (void) proto; | 89 | (void)proto; |
| 90 | (void) event_data; | 90 | (void)event_data; |
| 91 | } | 91 | } |
| 92 | 92 | ||
| 93 | void livesync_bind_mcast( ot_ip6 ip, uint16_t port) { | 93 | void livesync_bind_mcast(ot_ip6 ip, uint16_t port) { |
| 94 | char tmpip[4] = {0,0,0,0}; | 94 | char tmpip[4] = {0, 0, 0, 0}; |
| 95 | char *v4ip; | 95 | char *v4ip; |
| 96 | 96 | ||
| 97 | if( !ip6_isv4mapped(ip)) | 97 | if (!ip6_isv4mapped(ip)) |
| 98 | exerr("v6 mcast support not yet available."); | 98 | exerr("v6 mcast support not yet available."); |
| 99 | v4ip = ip+12; | 99 | v4ip = ip + 12; |
| 100 | 100 | ||
| 101 | if( g_socket_in != -1 ) | 101 | if (g_socket_in != -1) |
| 102 | exerr("Error: Livesync listen ip specified twice."); | 102 | exerr("Error: Livesync listen ip specified twice."); |
| 103 | 103 | ||
| 104 | if( ( g_socket_in = socket_udp4( )) < 0) | 104 | if ((g_socket_in = socket_udp4()) < 0) |
| 105 | exerr("Error: Cant create live sync incoming socket." ); | 105 | exerr("Error: Cant create live sync incoming socket."); |
| 106 | ndelay_off(g_socket_in); | 106 | ndelay_off(g_socket_in); |
| 107 | 107 | ||
| 108 | if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 ) | 108 | if (socket_bind4_reuse(g_socket_in, tmpip, port) == -1) |
| 109 | exerr("Error: Cant bind live sync incoming socket." ); | 109 | exerr("Error: Cant bind live sync incoming socket."); |
| 110 | 110 | ||
| 111 | if( socket_mcjoin4( g_socket_in, groupip_1, v4ip ) ) | 111 | if (socket_mcjoin4(g_socket_in, groupip_1, v4ip)) |
| 112 | exerr("Error: Cant make live sync incoming socket join mcast group."); | 112 | exerr("Error: Cant make live sync incoming socket join mcast group."); |
| 113 | 113 | ||
| 114 | if( ( g_socket_out = socket_udp4()) < 0) | 114 | if ((g_socket_out = socket_udp4()) < 0) |
| 115 | exerr("Error: Cant create live sync outgoing socket." ); | 115 | exerr("Error: Cant create live sync outgoing socket."); |
| 116 | if( socket_bind4_reuse( g_socket_out, v4ip, port ) == -1 ) | 116 | if (socket_bind4_reuse(g_socket_out, v4ip, port) == -1) |
| 117 | exerr("Error: Cant bind live sync outgoing socket." ); | 117 | exerr("Error: Cant bind live sync outgoing socket."); |
| 118 | 118 | ||
| 119 | socket_mcttl4(g_socket_out, 1); | 119 | socket_mcttl4(g_socket_out, 1); |
| 120 | socket_mcloop4(g_socket_out, 1); | 120 | socket_mcloop4(g_socket_out, 1); |
| 121 | } | 121 | } |
| 122 | 122 | ||
| 123 | size_t add_peer_to_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_size ) { | 123 | size_t add_peer_to_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) { |
| 124 | int exactmatch; | 124 | int exactmatch; |
| 125 | ot_torrent *torrent; | 125 | ot_torrent *torrent; |
| 126 | ot_peerlist *peer_list; | 126 | ot_peerlist *peer_list; |
| 127 | ot_peer *peer_dest; | 127 | ot_peer *peer_dest; |
| 128 | ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); | 128 | ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash); |
| 129 | size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size); | 129 | size_t compare_size = OT_PEER_COMPARE_SIZE_FROM_PEER_SIZE(peer_size); |
| 130 | 130 | ||
| 131 | torrent = vector_find_or_insert( torrents_list, (void*)hash, sizeof( ot_torrent ), compare_size, &exactmatch ); | 131 | torrent = vector_find_or_insert(torrents_list, (void *)hash, sizeof(ot_torrent), compare_size, &exactmatch); |
| 132 | if( !torrent ) | 132 | if (!torrent) |
| 133 | return -1; | 133 | return -1; |
| 134 | 134 | ||
| 135 | if( !exactmatch ) { | 135 | if (!exactmatch) { |
| 136 | /* Create a new torrent entry, then */ | 136 | /* Create a new torrent entry, then */ |
| 137 | memcpy( torrent->hash, hash, sizeof(ot_hash) ); | 137 | memcpy(torrent->hash, hash, sizeof(ot_hash)); |
| 138 | 138 | ||
| 139 | if( !( torrent->peer_list6 = malloc( sizeof (ot_peerlist) ) ) || | 139 | if (!(torrent->peer_list6 = malloc(sizeof(ot_peerlist))) || !(torrent->peer_list4 = malloc(sizeof(ot_peerlist)))) { |
| 140 | !( torrent->peer_list4 = malloc( sizeof (ot_peerlist) ) ) ) { | 140 | vector_remove_torrent(torrents_list, torrent); |
| 141 | vector_remove_torrent( torrents_list, torrent ); | 141 | mutex_bucket_unlock_by_hash(hash, 0); |
| 142 | mutex_bucket_unlock_by_hash( hash, 0 ); | ||
| 143 | return -1; | 142 | return -1; |
| 144 | } | 143 | } |
| 145 | 144 | ||
| 146 | byte_zero( torrent->peer_list6, sizeof( ot_peerlist ) ); | 145 | byte_zero(torrent->peer_list6, sizeof(ot_peerlist)); |
| 147 | byte_zero( torrent->peer_list4, sizeof( ot_peerlist ) ); | 146 | byte_zero(torrent->peer_list4, sizeof(ot_peerlist)); |
| 148 | } | 147 | } |
| 149 | 148 | ||
| 150 | peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; | 149 | peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; |
| 151 | 150 | ||
| 152 | /* Check for peer in torrent */ | 151 | /* Check for peer in torrent */ |
| 153 | peer_dest = vector_find_or_insert_peer( &(peer_list->peers), peer, peer_size, &exactmatch ); | 152 | peer_dest = vector_find_or_insert_peer(&(peer_list->peers), peer, peer_size, &exactmatch); |
| 154 | if( !peer_dest ) { | 153 | if (!peer_dest) { |
| 155 | mutex_bucket_unlock_by_hash( hash, 0 ); | 154 | mutex_bucket_unlock_by_hash(hash, 0); |
| 156 | return -1; | 155 | return -1; |
| 157 | } | 156 | } |
| 158 | /* Tell peer that it's fresh */ | 157 | /* Tell peer that it's fresh */ |
| 159 | OT_PEERTIME( peer, peer_size ) = 0; | 158 | OT_PEERTIME(peer, peer_size) = 0; |
| 160 | 159 | ||
| 161 | /* If we hadn't had a match create peer there */ | 160 | /* If we hadn't had a match create peer there */ |
| 162 | if( !exactmatch ) { | 161 | if (!exactmatch) { |
| 163 | peer_list->peer_count++; | 162 | peer_list->peer_count++; |
| 164 | if( OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING ) | 163 | if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_SEEDING) |
| 165 | peer_list->seed_count++; | 164 | peer_list->seed_count++; |
| 166 | } | 165 | } |
| 167 | memcpy( peer_dest, peer, peer_size ); | 166 | memcpy(peer_dest, peer, peer_size); |
| 168 | mutex_bucket_unlock_by_hash( hash, 0 ); | 167 | mutex_bucket_unlock_by_hash(hash, 0); |
| 169 | return 0; | 168 | return 0; |
| 170 | } | 169 | } |
| 171 | 170 | ||
| 172 | size_t remove_peer_from_torrent_proxy( ot_hash hash, ot_peer *peer, size_t peer_size ) { | 171 | size_t remove_peer_from_torrent_proxy(ot_hash hash, ot_peer *peer, size_t peer_size) { |
| 173 | int exactmatch; | 172 | int exactmatch; |
| 174 | ot_vector *torrents_list = mutex_bucket_lock_by_hash( hash ); | 173 | ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash); |
| 175 | ot_torrent *torrent = binary_search( hash, torrents_list->data, torrents_list->size, sizeof( ot_torrent ), OT_HASH_COMPARE_SIZE, &exactmatch ); | 174 | ot_torrent *torrent = binary_search(hash, torrents_list->data, torrents_list->size, sizeof(ot_torrent), OT_HASH_COMPARE_SIZE, &exactmatch); |
| 176 | 175 | ||
| 177 | if( exactmatch ) { | 176 | if (exactmatch) { |
| 178 | ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; | 177 | ot_peerlist *peer_list = peer_list = peer_size == OT_PEER_SIZE6 ? torrent->peer_list6 : torrent->peer_list4; |
| 179 | switch( vector_remove_peer( &peer_list->peers, peer, peer_size ) ) { | 178 | switch (vector_remove_peer(&peer_list->peers, peer, peer_size)) { |
| 180 | case 2: peer_list->seed_count--; /* Intentional fallthrough */ | 179 | case 2: |
| 181 | case 1: peer_list->peer_count--; /* Intentional fallthrough */ | 180 | peer_list->seed_count--; /* Intentional fallthrough */ |
| 182 | default: break; | 181 | case 1: |
| 182 | peer_list->peer_count--; /* Intentional fallthrough */ | ||
| 183 | default: | ||
| 184 | break; | ||
| 183 | } | 185 | } |
| 184 | } | 186 | } |
| 185 | 187 | ||
| 186 | mutex_bucket_unlock_by_hash( hash, 0 ); | 188 | mutex_bucket_unlock_by_hash(hash, 0); |
| 187 | return 0; | 189 | return 0; |
| 188 | } | 190 | } |
| 189 | 191 | ||
| 190 | void free_peerlist( ot_peerlist *peer_list ) { | 192 | void free_peerlist(ot_peerlist *peer_list) { |
| 191 | if( peer_list->peers.data ) { | 193 | if (peer_list->peers.data) { |
| 192 | if( OT_PEERLIST_HASBUCKETS( peer_list ) ) { | 194 | if (OT_PEERLIST_HASBUCKETS(peer_list)) { |
| 193 | ot_vector *bucket_list = (ot_vector*)(peer_list->peers.data); | 195 | ot_vector *bucket_list = (ot_vector *)(peer_list->peers.data); |
| 194 | 196 | ||
| 195 | while( peer_list->peers.size-- ) | 197 | while (peer_list->peers.size--) |
| 196 | free( bucket_list++->data ); | 198 | free(bucket_list++->data); |
| 197 | } | 199 | } |
| 198 | free( peer_list->peers.data ); | 200 | free(peer_list->peers.data); |
| 199 | } | 201 | } |
| 200 | free( peer_list ); | 202 | free(peer_list); |
| 201 | } | 203 | } |
| 202 | 204 | ||
| 203 | static void livesync_handle_peersync( ssize_t datalen, size_t peer_size ) { | 205 | static void livesync_handle_peersync(ssize_t datalen, size_t peer_size) { |
| 204 | int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); | 206 | int off = sizeof(g_tracker_id) + sizeof(uint32_t); |
| 205 | 207 | ||
| 206 | fprintf( stderr, "." ); | 208 | fprintf(stderr, "."); |
| 207 | 209 | ||
| 208 | while( (ssize_t)(off + sizeof( ot_hash ) + peer_size) <= datalen ) { | 210 | while ((ssize_t)(off + sizeof(ot_hash) + peer_size) <= datalen) { |
| 209 | ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash)); | 211 | ot_peer *peer = (ot_peer *)(g_inbuffer + off + sizeof(ot_hash)); |
| 210 | ot_hash *hash = (ot_hash*)(g_inbuffer + off); | 212 | ot_hash *hash = (ot_hash *)(g_inbuffer + off); |
| 211 | 213 | ||
| 212 | if( OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED ) | 214 | if (OT_PEERFLAG_D(peer, peer_size) & PEER_FLAG_STOPPED) |
| 213 | remove_peer_from_torrent_proxy( *hash, peer, peer_size ); | 215 | remove_peer_from_torrent_proxy(*hash, peer, peer_size); |
| 214 | else | 216 | else |
| 215 | add_peer_to_torrent_proxy( *hash, peer, peer_size ); | 217 | add_peer_to_torrent_proxy(*hash, peer, peer_size); |
| 216 | 218 | ||
| 217 | off += sizeof( ot_hash ) + peer_size; | 219 | off += sizeof(ot_hash) + peer_size; |
| 218 | } | 220 | } |
| 219 | } | 221 | } |
| 220 | 222 | ||
| 221 | int usage( char *self ) { | 223 | int usage(char *self) { |
| 222 | fprintf( stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self ); | 224 | fprintf(stderr, "Usage: %s -L <livesync_iface_ip> -l <listenip>:<listenport> -c <connectip>:<connectport>\n", self); |
| 223 | return 0; | 225 | return 0; |
| 224 | } | 226 | } |
| 225 | 227 | ||
| @@ -234,115 +236,115 @@ enum { | |||
| 234 | FLAG_MASK = 0x07 | 236 | FLAG_MASK = 0x07 |
| 235 | }; | 237 | }; |
| 236 | 238 | ||
| 237 | #define PROXYPEER_NEEDSCONNECT(flag) ((flag)==FLAG_OUTGOING) | 239 | #define PROXYPEER_NEEDSCONNECT(flag) ((flag) == FLAG_OUTGOING) |
| 238 | #define PROXYPEER_ISCONNECTED(flag) (((flag)&FLAG_MASK)==FLAG_CONNECTED) | 240 | #define PROXYPEER_ISCONNECTED(flag) (((flag) & FLAG_MASK) == FLAG_CONNECTED) |
| 239 | #define PROXYPEER_SETDISCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_DISCONNECTED) | 241 | #define PROXYPEER_SETDISCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_DISCONNECTED) |
| 240 | #define PROXYPEER_SETCONNECTING(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTING) | 242 | #define PROXYPEER_SETCONNECTING(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTING) |
| 241 | #define PROXYPEER_SETWAITTRACKERID(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_WAITTRACKERID) | 243 | #define PROXYPEER_SETWAITTRACKERID(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_WAITTRACKERID) |
| 242 | #define PROXYPEER_SETCONNECTED(flag) (flag)=(((flag)&FLAG_OUTGOING)|FLAG_CONNECTED) | 244 | #define PROXYPEER_SETCONNECTED(flag) (flag) = (((flag) & FLAG_OUTGOING) | FLAG_CONNECTED) |
| 243 | 245 | ||
| 244 | typedef struct { | 246 | typedef struct { |
| 245 | int state; /* Whether we want to connect, how far our handshake is, etc. */ | 247 | int state; /* Whether we want to connect, how far our handshake is, etc. */ |
| 246 | ot_ip6 ip; /* The peer to connect to */ | 248 | ot_ip6 ip; /* The peer to connect to */ |
| 247 | uint16_t port; /* The peers port */ | 249 | uint16_t port; /* The peers port */ |
| 248 | uint8_t indata[8192*16]; /* Any data not processed yet */ | 250 | uint8_t indata[8192 * 16]; /* Any data not processed yet */ |
| 249 | size_t indata_length; /* Length of unprocessed data */ | 251 | size_t indata_length; /* Length of unprocessed data */ |
| 250 | uint32_t tracker_id; /* How the other end greeted */ | 252 | uint32_t tracker_id; /* How the other end greeted */ |
| 251 | int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ | 253 | int64 fd; /* A file handle, if connected, <= 0 is disconnected (0 initially, -1 else) */ |
| 252 | io_batch outdata; /* The iobatch containing our sync data */ | 254 | io_batch outdata; /* The iobatch containing our sync data */ |
| 253 | 255 | ||
| 254 | size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ | 256 | size_t packet_tcount; /* Number of unprocessed torrents in packet we currently receive */ |
| 255 | uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */ | 257 | uint8_t packet_tprefix; /* Prefix byte for all torrents in current packet */ |
| 256 | uint8_t packet_type; /* Type of current packet */ | 258 | uint8_t packet_type; /* Type of current packet */ |
| 257 | uint32_t packet_tid; /* Tracker id for current packet */ | 259 | uint32_t packet_tid; /* Tracker id for current packet */ |
| 258 | 260 | ||
| 259 | } proxy_peer; | 261 | } proxy_peer; |
| 260 | static void process_indata( proxy_peer * peer ); | 262 | static void process_indata(proxy_peer *peer); |
| 261 | 263 | ||
| 262 | void reset_info_block( proxy_peer * peer ) { | 264 | void reset_info_block(proxy_peer *peer) { |
| 263 | peer->indata_length = 0; | 265 | peer->indata_length = 0; |
| 264 | peer->tracker_id = 0; | 266 | peer->tracker_id = 0; |
| 265 | peer->fd = -1; | 267 | peer->fd = -1; |
| 266 | peer->packet_tcount = 0; | 268 | peer->packet_tcount = 0; |
| 267 | iob_reset( &peer->outdata ); | 269 | iob_reset(&peer->outdata); |
| 268 | PROXYPEER_SETDISCONNECTED( peer->state ); | 270 | PROXYPEER_SETDISCONNECTED(peer->state); |
| 269 | } | 271 | } |
| 270 | 272 | ||
| 271 | /* Number of connections to peers | 273 | /* Number of connections to peers |
| 272 | * If a peer's IP is set, we try to reconnect, when the connection drops | 274 | * If a peer's IP is set, we try to reconnect, when the connection drops |
| 273 | * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it | 275 | * If we already have a connected tracker_id in our records for an _incoming_ connection, drop it |
| 274 | * Multiple connections to/from the same ip are okay, if tracker_id doesn't match | 276 | * Multiple connections to/from the same ip are okay, if tracker_id doesn't match |
| 275 | * Reconnect attempts occur only twice a minute | 277 | * Reconnect attempts occur only twice a minute |
| 276 | */ | 278 | */ |
| 277 | static int g_connection_count; | 279 | static int g_connection_count; |
| 278 | static ot_time g_connection_reconn; | 280 | static ot_time g_connection_reconn; |
| 279 | static proxy_peer g_connections[MAX_PEERS]; | 281 | static proxy_peer g_connections[MAX_PEERS]; |
| 280 | 282 | ||
| 281 | static void handle_reconnects( void ) { | 283 | static void handle_reconnects(void) { |
| 282 | int i; | 284 | int i; |
| 283 | for( i=0; i<g_connection_count; ++i ) | 285 | for (i = 0; i < g_connection_count; ++i) |
| 284 | if( PROXYPEER_NEEDSCONNECT( g_connections[i].state ) ) { | 286 | if (PROXYPEER_NEEDSCONNECT(g_connections[i].state)) { |
| 285 | int64 newfd = socket_tcp6( ); | 287 | int64 newfd = socket_tcp6(); |
| 286 | fprintf( stderr, "(Re)connecting to peer..." ); | 288 | fprintf(stderr, "(Re)connecting to peer..."); |
| 287 | if( newfd < 0 ) continue; /* No socket for you */ | 289 | if (newfd < 0) |
| 290 | continue; /* No socket for you */ | ||
| 288 | io_fd(newfd); | 291 | io_fd(newfd); |
| 289 | if( socket_bind6_reuse(newfd,g_serverip,g_serverport,0) ) { | 292 | if (socket_bind6_reuse(newfd, g_serverip, g_serverport, 0)) { |
| 290 | io_close( newfd ); | 293 | io_close(newfd); |
| 291 | continue; | 294 | continue; |
| 292 | } | 295 | } |
| 293 | if( socket_connect6(newfd,g_connections[i].ip,g_connections[i].port,0) == -1 && | 296 | if (socket_connect6(newfd, g_connections[i].ip, g_connections[i].port, 0) == -1 && errno != EINPROGRESS && errno != EWOULDBLOCK) { |
| 294 | errno != EINPROGRESS && errno != EWOULDBLOCK ) { | ||
| 295 | close(newfd); | 297 | close(newfd); |
| 296 | continue; | 298 | continue; |
| 297 | } | 299 | } |
| 298 | io_wantwrite(newfd); /* So we will be informed when it is connected */ | 300 | io_wantwrite(newfd); /* So we will be informed when it is connected */ |
| 299 | io_setcookie(newfd,g_connections+i); | 301 | io_setcookie(newfd, g_connections + i); |
| 300 | 302 | ||
| 301 | /* Prepare connection info block */ | 303 | /* Prepare connection info block */ |
| 302 | reset_info_block( g_connections+i ); | 304 | reset_info_block(g_connections + i); |
| 303 | g_connections[i].fd = newfd; | 305 | g_connections[i].fd = newfd; |
| 304 | PROXYPEER_SETCONNECTING( g_connections[i].state ); | 306 | PROXYPEER_SETCONNECTING(g_connections[i].state); |
| 305 | } | 307 | } |
| 306 | g_connection_reconn = time(NULL) + 30; | 308 | g_connection_reconn = time(NULL) + 30; |
| 307 | } | 309 | } |
| 308 | 310 | ||
| 309 | /* Handle incoming connection requests, check against whitelist */ | 311 | /* Handle incoming connection requests, check against whitelist */ |
| 310 | static void handle_accept( int64 serversocket ) { | 312 | static void handle_accept(int64 serversocket) { |
| 311 | int64 newfd; | 313 | int64 newfd; |
| 312 | ot_ip6 ip; | 314 | ot_ip6 ip; |
| 313 | uint16 port; | 315 | uint16 port; |
| 314 | 316 | ||
| 315 | while( ( newfd = socket_accept6( serversocket, ip, &port, NULL ) ) != -1 ) { | 317 | while ((newfd = socket_accept6(serversocket, ip, &port, NULL)) != -1) { |
| 316 | 318 | ||
| 317 | /* XXX some access control */ | 319 | /* XXX some access control */ |
| 318 | 320 | ||
| 319 | /* Put fd into a non-blocking mode */ | 321 | /* Put fd into a non-blocking mode */ |
| 320 | io_nonblock( newfd ); | 322 | io_nonblock(newfd); |
| 321 | 323 | ||
| 322 | if( !io_fd( newfd ) ) | 324 | if (!io_fd(newfd)) |
| 323 | io_close( newfd ); | 325 | io_close(newfd); |
| 324 | else { | 326 | else { |
| 325 | /* Find a new home for our incoming connection */ | 327 | /* Find a new home for our incoming connection */ |
| 326 | int i; | 328 | int i; |
| 327 | for( i=0; i<MAX_PEERS; ++i ) | 329 | for (i = 0; i < MAX_PEERS; ++i) |
| 328 | if( g_connections[i].state == FLAG_DISCONNECTED ) | 330 | if (g_connections[i].state == FLAG_DISCONNECTED) |
| 329 | break; | 331 | break; |
| 330 | if( i == MAX_PEERS ) { | 332 | if (i == MAX_PEERS) { |
| 331 | fprintf( stderr, "No room for incoming connection." ); | 333 | fprintf(stderr, "No room for incoming connection."); |
| 332 | close( newfd ); | 334 | close(newfd); |
| 333 | continue; | 335 | continue; |
| 334 | } | 336 | } |
| 335 | 337 | ||
| 336 | /* Prepare connection info block */ | 338 | /* Prepare connection info block */ |
| 337 | reset_info_block( g_connections+i ); | 339 | reset_info_block(g_connections + i); |
| 338 | PROXYPEER_SETCONNECTING( g_connections[i].state ); | 340 | PROXYPEER_SETCONNECTING(g_connections[i].state); |
| 339 | g_connections[i].port = port; | 341 | g_connections[i].port = port; |
| 340 | g_connections[i].fd = newfd; | 342 | g_connections[i].fd = newfd; |
| 341 | 343 | ||
| 342 | io_setcookie( newfd, g_connections + i ); | 344 | io_setcookie(newfd, g_connections + i); |
| 343 | 345 | ||
| 344 | /* We expect the connecting side to begin with its tracker_id */ | 346 | /* We expect the connecting side to begin with its tracker_id */ |
| 345 | io_wantread( newfd ); | 347 | io_wantread(newfd); |
| 346 | } | 348 | } |
| 347 | } | 349 | } |
| 348 | 350 | ||
| @@ -350,117 +352,116 @@ static void handle_accept( int64 serversocket ) { | |||
| 350 | } | 352 | } |
| 351 | 353 | ||
| 352 | /* New sync data on the stream */ | 354 | /* New sync data on the stream */ |
| 353 | static void handle_read( int64 peersocket ) { | 355 | static void handle_read(int64 peersocket) { |
| 354 | int i; | 356 | int i; |
| 355 | int64 datalen; | 357 | int64 datalen; |
| 356 | uint32_t tracker_id; | 358 | uint32_t tracker_id; |
| 357 | proxy_peer *peer = io_getcookie( peersocket ); | 359 | proxy_peer *peer = io_getcookie(peersocket); |
| 358 | 360 | ||
| 359 | if( !peer ) { | 361 | if (!peer) { |
| 360 | /* Can't happen ;) */ | 362 | /* Can't happen ;) */ |
| 361 | io_close( peersocket ); | 363 | io_close(peersocket); |
| 362 | return; | 364 | return; |
| 363 | } | 365 | } |
| 364 | switch( peer->state & FLAG_MASK ) { | 366 | switch (peer->state & FLAG_MASK) { |
| 365 | case FLAG_DISCONNECTED: | 367 | case FLAG_DISCONNECTED: |
| 366 | io_close( peersocket ); | 368 | io_close(peersocket); |
| 367 | break; /* Shouldnt happen */ | 369 | break; /* Shouldnt happen */ |
| 368 | case FLAG_CONNECTING: | 370 | case FLAG_CONNECTING: |
| 369 | case FLAG_WAITTRACKERID: | 371 | case FLAG_WAITTRACKERID: |
| 370 | /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) | 372 | /* We want at least the first four bytes to come at once, to avoid keeping extra states (for now) |
| 371 | This also catches 0 bytes reads == EOF and negative values, denoting connection errors */ | 373 | This also catches 0 bytes reads == EOF and negative values, denoting connection errors */ |
| 372 | if( io_tryread( peersocket, (void*)&tracker_id, sizeof( tracker_id ) ) != sizeof( tracker_id ) ) | 374 | if (io_tryread(peersocket, (void *)&tracker_id, sizeof(tracker_id)) != sizeof(tracker_id)) |
| 373 | goto close_socket; | 375 | goto close_socket; |
| 374 | 376 | ||
| 375 | /* See, if we already have a connection to that peer */ | 377 | /* See, if we already have a connection to that peer */ |
| 376 | for( i=0; i<MAX_PEERS; ++i ) | 378 | for (i = 0; i < MAX_PEERS; ++i) |
| 377 | if( ( g_connections[i].state & FLAG_MASK ) == FLAG_CONNECTED && | 379 | if ((g_connections[i].state & FLAG_MASK) == FLAG_CONNECTED && g_connections[i].tracker_id == tracker_id) { |
| 378 | g_connections[i].tracker_id == tracker_id ) { | 380 | fprintf(stderr, "Peer already connected. Closing connection.\n"); |
| 379 | fprintf( stderr, "Peer already connected. Closing connection.\n" ); | ||
| 380 | goto close_socket; | 381 | goto close_socket; |
| 381 | } | 382 | } |
| 382 | 383 | ||
| 383 | /* Also no need for soliloquy */ | 384 | /* Also no need for soliloquy */ |
| 384 | if( tracker_id == g_tracker_id ) | 385 | if (tracker_id == g_tracker_id) |
| 385 | goto close_socket; | 386 | goto close_socket; |
| 386 | 387 | ||
| 387 | /* The new connection is good, send our tracker_id on incoming connections */ | 388 | /* The new connection is good, send our tracker_id on incoming connections */ |
| 388 | if( peer->state == FLAG_CONNECTING ) | 389 | if (peer->state == FLAG_CONNECTING) |
| 389 | if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) != sizeof( g_tracker_id ) ) | 390 | if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) != sizeof(g_tracker_id)) |
| 390 | goto close_socket; | 391 | goto close_socket; |
| 391 | 392 | ||
| 392 | peer->tracker_id = tracker_id; | 393 | peer->tracker_id = tracker_id; |
| 393 | PROXYPEER_SETCONNECTED( peer->state ); | 394 | PROXYPEER_SETCONNECTED(peer->state); |
| 394 | 395 | ||
| 395 | if( peer->state & FLAG_OUTGOING ) | 396 | if (peer->state & FLAG_OUTGOING) |
| 396 | fprintf( stderr, "succeeded.\n" ); | 397 | fprintf(stderr, "succeeded.\n"); |
| 397 | else | 398 | else |
| 398 | fprintf( stderr, "Incoming connection successful.\n" ); | 399 | fprintf(stderr, "Incoming connection successful.\n"); |
| 399 | 400 | ||
| 400 | break; | 401 | break; |
| 401 | close_socket: | 402 | close_socket: |
| 402 | fprintf( stderr, "Handshake incomplete, closing socket\n" ); | 403 | fprintf(stderr, "Handshake incomplete, closing socket\n"); |
| 403 | io_close( peersocket ); | 404 | io_close(peersocket); |
| 404 | reset_info_block( peer ); | 405 | reset_info_block(peer); |
| 405 | break; | 406 | break; |
| 406 | case FLAG_CONNECTED: | 407 | case FLAG_CONNECTED: |
| 407 | /* Here we acutally expect data from peer | 408 | /* Here we acutally expect data from peer |
| 408 | indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ | 409 | indata_length should be less than 20+256*7 bytes, for incomplete torrent entries */ |
| 409 | datalen = io_tryread( peersocket, (void*)(peer->indata + peer->indata_length), sizeof( peer->indata ) - peer->indata_length ); | 410 | datalen = io_tryread(peersocket, (void *)(peer->indata + peer->indata_length), sizeof(peer->indata) - peer->indata_length); |
| 410 | if( !datalen || datalen < -1 ) { | 411 | if (!datalen || datalen < -1) { |
| 411 | fprintf( stderr, "Connection closed by remote peer.\n" ); | 412 | fprintf(stderr, "Connection closed by remote peer.\n"); |
| 412 | io_close( peersocket ); | 413 | io_close(peersocket); |
| 413 | reset_info_block( peer ); | 414 | reset_info_block(peer); |
| 414 | } else if( datalen > 0 ) { | 415 | } else if (datalen > 0) { |
| 415 | peer->indata_length += datalen; | 416 | peer->indata_length += datalen; |
| 416 | process_indata( peer ); | 417 | process_indata(peer); |
| 417 | } | 418 | } |
| 418 | break; | 419 | break; |
| 419 | } | 420 | } |
| 420 | } | 421 | } |
| 421 | 422 | ||
| 422 | /* Can write new sync data to the stream */ | 423 | /* Can write new sync data to the stream */ |
| 423 | static void handle_write( int64 peersocket ) { | 424 | static void handle_write(int64 peersocket) { |
| 424 | proxy_peer *peer = io_getcookie( peersocket ); | 425 | proxy_peer *peer = io_getcookie(peersocket); |
| 425 | 426 | ||
| 426 | if( !peer ) { | 427 | if (!peer) { |
| 427 | /* Can't happen ;) */ | 428 | /* Can't happen ;) */ |
| 428 | io_close( peersocket ); | 429 | io_close(peersocket); |
| 429 | return; | 430 | return; |
| 430 | } | 431 | } |
| 431 | 432 | ||
| 432 | switch( peer->state & FLAG_MASK ) { | 433 | switch (peer->state & FLAG_MASK) { |
| 433 | case FLAG_DISCONNECTED: | 434 | case FLAG_DISCONNECTED: |
| 434 | default: /* Should not happen */ | 435 | default: /* Should not happen */ |
| 435 | io_close( peersocket ); | 436 | io_close(peersocket); |
| 436 | break; | 437 | break; |
| 437 | case FLAG_CONNECTING: | 438 | case FLAG_CONNECTING: |
| 438 | /* Ensure that the connection is established and handle connection error */ | 439 | /* Ensure that the connection is established and handle connection error */ |
| 439 | if( peer->state & FLAG_OUTGOING && !socket_connected( peersocket ) ) { | 440 | if (peer->state & FLAG_OUTGOING && !socket_connected(peersocket)) { |
| 440 | fprintf( stderr, "failed\n" ); | 441 | fprintf(stderr, "failed\n"); |
| 441 | reset_info_block( peer ); | 442 | reset_info_block(peer); |
| 442 | io_close( peersocket ); | 443 | io_close(peersocket); |
| 443 | break; | 444 | break; |
| 444 | } | 445 | } |
| 445 | 446 | ||
| 446 | if( io_trywrite( peersocket, (void*)&g_tracker_id, sizeof( g_tracker_id ) ) == sizeof( g_tracker_id ) ) { | 447 | if (io_trywrite(peersocket, (void *)&g_tracker_id, sizeof(g_tracker_id)) == sizeof(g_tracker_id)) { |
| 447 | PROXYPEER_SETWAITTRACKERID( peer->state ); | 448 | PROXYPEER_SETWAITTRACKERID(peer->state); |
| 448 | io_dontwantwrite( peersocket ); | 449 | io_dontwantwrite(peersocket); |
| 449 | io_wantread( peersocket ); | 450 | io_wantread(peersocket); |
| 450 | } else { | 451 | } else { |
| 451 | fprintf( stderr, "Handshake incomplete, closing socket\n" ); | 452 | fprintf(stderr, "Handshake incomplete, closing socket\n"); |
| 452 | io_close( peersocket ); | 453 | io_close(peersocket); |
| 453 | reset_info_block( peer ); | 454 | reset_info_block(peer); |
| 454 | } | 455 | } |
| 455 | break; | 456 | break; |
| 456 | case FLAG_CONNECTED: | 457 | case FLAG_CONNECTED: |
| 457 | switch( iob_send( peersocket, &peer->outdata ) ) { | 458 | switch (iob_send(peersocket, &peer->outdata)) { |
| 458 | case 0: /* all data sent */ | 459 | case 0: /* all data sent */ |
| 459 | io_dontwantwrite( peersocket ); | 460 | io_dontwantwrite(peersocket); |
| 460 | break; | 461 | break; |
| 461 | case -3: /* an error occured */ | 462 | case -3: /* an error occured */ |
| 462 | io_close( peersocket ); | 463 | io_close(peersocket); |
| 463 | reset_info_block( peer ); | 464 | reset_info_block(peer); |
| 464 | break; | 465 | break; |
| 465 | default: /* Normal operation or eagain */ | 466 | default: /* Normal operation or eagain */ |
| 466 | break; | 467 | break; |
| @@ -475,289 +476,324 @@ static void server_mainloop() { | |||
| 475 | int64 sock; | 476 | int64 sock; |
| 476 | 477 | ||
| 477 | /* inlined livesync_init() */ | 478 | /* inlined livesync_init() */ |
| 478 | memset( g_peerbuffer_start, 0, sizeof( g_peerbuffer_start ) ); | 479 | memset(g_peerbuffer_start, 0, sizeof(g_peerbuffer_start)); |
| 479 | g_peerbuffer_pos = g_peerbuffer_start; | 480 | g_peerbuffer_pos = g_peerbuffer_start; |
| 480 | memcpy( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); | 481 | memcpy(g_peerbuffer_pos, &g_tracker_id, sizeof(g_tracker_id)); |
| 481 | uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); | 482 | uint32_pack_big((char *)g_peerbuffer_pos + sizeof(g_tracker_id), OT_SYNC_PEER); |
| 482 | g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); | 483 | g_peerbuffer_pos += sizeof(g_tracker_id) + sizeof(uint32_t); |
| 483 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; | 484 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; |
| 484 | 485 | ||
| 485 | while(1) { | 486 | while (1) { |
| 486 | /* See if we need to connect to anyone */ | 487 | /* See if we need to connect to anyone */ |
| 487 | if( time(NULL) > g_connection_reconn ) | 488 | if (time(NULL) > g_connection_reconn) |
| 488 | handle_reconnects( ); | 489 | handle_reconnects(); |
| 489 | 490 | ||
| 490 | /* Wait for io events until next approx reconn check time */ | 491 | /* Wait for io events until next approx reconn check time */ |
| 491 | io_waituntil2( 30*1000 ); | 492 | io_waituntil2(30 * 1000); |
| 492 | 493 | ||
| 493 | /* Loop over readable sockets */ | 494 | /* Loop over readable sockets */ |
| 494 | while( ( sock = io_canread( ) ) != -1 ) { | 495 | while ((sock = io_canread()) != -1) { |
| 495 | const void *cookie = io_getcookie( sock ); | 496 | const void *cookie = io_getcookie(sock); |
| 496 | if( (uintptr_t)cookie == FLAG_SERVERSOCKET ) | 497 | if ((uintptr_t)cookie == FLAG_SERVERSOCKET) |
| 497 | handle_accept( sock ); | 498 | handle_accept(sock); |
| 498 | else | 499 | else |
| 499 | handle_read( sock ); | 500 | handle_read(sock); |
| 500 | } | 501 | } |
| 501 | 502 | ||
| 502 | /* Loop over writable sockets */ | 503 | /* Loop over writable sockets */ |
| 503 | while( ( sock = io_canwrite( ) ) != -1 ) | 504 | while ((sock = io_canwrite()) != -1) |
| 504 | handle_write( sock ); | 505 | handle_write(sock); |
| 505 | 506 | ||
| 506 | livesync_ticker( ); | 507 | livesync_ticker(); |
| 507 | } | 508 | } |
| 508 | } | 509 | } |
| 509 | 510 | ||
| 510 | static void panic( const char *routine ) { | 511 | static void panic(const char *routine) { |
| 511 | fprintf( stderr, "%s: %s\n", routine, strerror(errno) ); | 512 | fprintf(stderr, "%s: %s\n", routine, strerror(errno)); |
| 512 | exit( 111 ); | 513 | exit(111); |
| 513 | } | 514 | } |
| 514 | 515 | ||
| 515 | static int64_t ot_try_bind( ot_ip6 ip, uint16_t port ) { | 516 | static int64_t ot_try_bind(ot_ip6 ip, uint16_t port) { |
| 516 | int64 sock = socket_tcp6( ); | 517 | int64 sock = socket_tcp6(); |
| 517 | 518 | ||
| 518 | if( socket_bind6_reuse( sock, ip, port, 0 ) == -1 ) | 519 | if (socket_bind6_reuse(sock, ip, port, 0) == -1) |
| 519 | panic( "socket_bind6_reuse" ); | 520 | panic("socket_bind6_reuse"); |
| 520 | 521 | ||
| 521 | if( socket_listen( sock, SOMAXCONN) == -1 ) | 522 | if (socket_listen(sock, SOMAXCONN) == -1) |
| 522 | panic( "socket_listen" ); | 523 | panic("socket_listen"); |
| 523 | 524 | ||
| 524 | if( !io_fd( sock ) ) | 525 | if (!io_fd(sock)) |
| 525 | panic( "io_fd" ); | 526 | panic("io_fd"); |
| 526 | 527 | ||
| 527 | io_setcookie( sock, (void*)FLAG_SERVERSOCKET ); | 528 | io_setcookie(sock, (void *)FLAG_SERVERSOCKET); |
| 528 | io_wantread( sock ); | 529 | io_wantread(sock); |
| 529 | return sock; | 530 | return sock; |
| 530 | } | 531 | } |
| 531 | 532 | ||
| 532 | 533 | static int scan_ip6_port(const char *src, ot_ip6 ip, uint16 *port) { | |
| 533 | static int scan_ip6_port( const char *src, ot_ip6 ip, uint16 *port ) { | ||
| 534 | const char *s = src; | 534 | const char *s = src; |
| 535 | int off, bracket = 0; | 535 | int off, bracket = 0; |
| 536 | while( isspace(*s) ) ++s; | 536 | while (isspace(*s)) |
| 537 | if( *s == '[' ) ++s, ++bracket; /* for v6 style notation */ | 537 | ++s; |
| 538 | if( !(off = scan_ip6( s, ip ) ) ) | 538 | if (*s == '[') |
| 539 | ++s, ++bracket; /* for v6 style notation */ | ||
| 540 | if (!(off = scan_ip6(s, ip))) | ||
| 539 | return 0; | 541 | return 0; |
| 540 | s += off; | 542 | s += off; |
| 541 | if( *s == 0 || isspace(*s)) return s-src; | 543 | if (*s == 0 || isspace(*s)) |
| 542 | if( *s == ']' && bracket ) ++s; | 544 | return s - src; |
| 543 | if( !ip6_isv4mapped(ip)){ | 545 | if (*s == ']' && bracket) |
| 544 | if( ( bracket && *(s) != ':' ) || ( *(s) != '.' ) ) return 0; | 546 | ++s; |
| 547 | if (!ip6_isv4mapped(ip)) { | ||
| 548 | if ((bracket && *(s) != ':') || (*(s) != '.')) | ||
| 549 | return 0; | ||
| 545 | s++; | 550 | s++; |
| 546 | } else { | 551 | } else { |
| 547 | if( *(s++) != ':' ) return 0; | 552 | if (*(s++) != ':') |
| 553 | return 0; | ||
| 548 | } | 554 | } |
| 549 | if( !(off = scan_ushort (s, port ) ) ) | 555 | if (!(off = scan_ushort(s, port))) |
| 550 | return 0; | 556 | return 0; |
| 551 | return off+s-src; | 557 | return off + s - src; |
| 552 | } | 558 | } |
| 553 | 559 | ||
| 554 | int main( int argc, char **argv ) { | 560 | int main(int argc, char **argv) { |
| 555 | static pthread_t sync_in_thread_id; | 561 | static pthread_t sync_in_thread_id; |
| 556 | static pthread_t sync_out_thread_id; | 562 | static pthread_t sync_out_thread_id; |
| 557 | ot_ip6 serverip; | 563 | ot_ip6 serverip; |
| 558 | uint16_t tmpport; | 564 | uint16_t tmpport; |
| 559 | int scanon = 1, lbound = 0, sbound = 0; | 565 | int scanon = 1, lbound = 0, sbound = 0; |
| 560 | 566 | ||
| 561 | srandom( time(NULL) ); | 567 | srandom(time(NULL)); |
| 562 | #ifdef WANT_ARC4RANDOM | 568 | #ifdef WANT_ARC4RANDOM |
| 563 | g_tracker_id = arc4random(); | 569 | g_tracker_id = arc4random(); |
| 564 | #else | 570 | #else |
| 565 | g_tracker_id = random(); | 571 | g_tracker_id = random(); |
| 566 | #endif | 572 | #endif |
| 567 | 573 | ||
| 568 | while( scanon ) { | 574 | while (scanon) { |
| 569 | switch( getopt( argc, argv, ":l:c:L:h" ) ) { | 575 | switch (getopt(argc, argv, ":l:c:L:h")) { |
| 570 | case -1: scanon = 0; break; | 576 | case -1: |
| 577 | scanon = 0; | ||
| 578 | break; | ||
| 571 | case 'l': | 579 | case 'l': |
| 572 | tmpport = 0; | 580 | tmpport = 0; |
| 573 | if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } | 581 | if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) { |
| 574 | ot_try_bind( serverip, tmpport ); | 582 | usage(argv[0]); |
| 583 | exit(1); | ||
| 584 | } | ||
| 585 | ot_try_bind(serverip, tmpport); | ||
| 575 | ++sbound; | 586 | ++sbound; |
| 576 | break; | 587 | break; |
| 577 | case 'c': | 588 | case 'c': |
| 578 | if( g_connection_count > MAX_PEERS / 2 ) exerr( "Connection limit exceeded.\n" ); | 589 | if (g_connection_count > MAX_PEERS / 2) |
| 590 | exerr("Connection limit exceeded.\n"); | ||
| 579 | tmpport = 0; | 591 | tmpport = 0; |
| 580 | if( !scan_ip6_port( optarg, | 592 | if (!scan_ip6_port(optarg, g_connections[g_connection_count].ip, &g_connections[g_connection_count].port) || !g_connections[g_connection_count].port) { |
| 581 | g_connections[g_connection_count].ip, | 593 | usage(argv[0]); |
| 582 | &g_connections[g_connection_count].port ) || | 594 | exit(1); |
| 583 | !g_connections[g_connection_count].port ) { usage( argv[0] ); exit( 1 ); } | 595 | } |
| 584 | g_connections[g_connection_count++].state = FLAG_OUTGOING; | 596 | g_connections[g_connection_count++].state = FLAG_OUTGOING; |
| 585 | break; | 597 | break; |
| 586 | case 'L': | 598 | case 'L': |
| 587 | tmpport = 9696; | 599 | tmpport = 9696; |
| 588 | if( !scan_ip6_port( optarg, serverip, &tmpport ) || !tmpport ) { usage( argv[0] ); exit( 1 ); } | 600 | if (!scan_ip6_port(optarg, serverip, &tmpport) || !tmpport) { |
| 589 | livesync_bind_mcast( serverip, tmpport); ++lbound; break; | 601 | usage(argv[0]); |
| 602 | exit(1); | ||
| 603 | } | ||
| 604 | livesync_bind_mcast(serverip, tmpport); | ||
| 605 | ++lbound; | ||
| 606 | break; | ||
| 590 | default: | 607 | default: |
| 591 | case '?': usage( argv[0] ); exit( 1 ); | 608 | case '?': |
| 609 | usage(argv[0]); | ||
| 610 | exit(1); | ||
| 592 | } | 611 | } |
| 593 | } | 612 | } |
| 594 | 613 | ||
| 595 | if( !lbound ) exerr( "No livesync port bound." ); | 614 | if (!lbound) |
| 596 | if( !g_connection_count && !sbound ) exerr( "No streamsync port bound." ); | 615 | exerr("No livesync port bound."); |
| 597 | pthread_create( &sync_in_thread_id, NULL, livesync_worker, NULL ); | 616 | if (!g_connection_count && !sbound) |
| 598 | pthread_create( &sync_out_thread_id, NULL, streamsync_worker, NULL ); | 617 | exerr("No streamsync port bound."); |
| 618 | pthread_create(&sync_in_thread_id, NULL, livesync_worker, NULL); | ||
| 619 | pthread_create(&sync_out_thread_id, NULL, streamsync_worker, NULL); | ||
| 599 | 620 | ||
| 600 | server_mainloop(); | 621 | server_mainloop(); |
| 601 | return 0; | 622 | return 0; |
| 602 | } | 623 | } |
| 603 | 624 | ||
| 604 | static void * streamsync_worker( void * args ) { | 625 | static void *streamsync_worker(void *args) { |
| 605 | (void)args; | 626 | (void)args; |
| 606 | while( 1 ) { | 627 | while (1) { |
| 607 | int bucket; | 628 | int bucket; |
| 608 | /* For each bucket... */ | 629 | /* For each bucket... */ |
| 609 | for( bucket=0; bucket<OT_BUCKET_COUNT; ++bucket ) { | 630 | for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { |
| 610 | /* Get exclusive access to that bucket */ | 631 | /* Get exclusive access to that bucket */ |
| 611 | ot_vector *torrents_list = mutex_bucket_lock( bucket ); | 632 | ot_vector *torrents_list = mutex_bucket_lock(bucket); |
| 612 | size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0; | 633 | size_t tor_offset, count_def = 0, count_one = 0, count_two = 0, count_peers = 0; |
| 613 | size_t mem, mem_a = 0, mem_b = 0; | 634 | size_t mem, mem_a = 0, mem_b = 0; |
| 614 | uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c; | 635 | uint8_t *ptr = 0, *ptr_a, *ptr_b, *ptr_c; |
| 615 | 636 | ||
| 616 | if( !torrents_list->size ) goto unlock_continue; | 637 | if (!torrents_list->size) |
| 638 | goto unlock_continue; | ||
| 617 | 639 | ||
| 618 | /* For each torrent in this bucket.. */ | 640 | /* For each torrent in this bucket.. */ |
| 619 | for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { | 641 | for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) { |
| 620 | /* Address torrents members */ | 642 | /* Address torrents members */ |
| 621 | ot_peerlist *peer_list = ( ((ot_torrent*)(torrents_list->data))[tor_offset] ).peer_list; | 643 | ot_peerlist *peer_list = (((ot_torrent *)(torrents_list->data))[tor_offset]).peer_list; |
| 622 | switch( peer_list->peer_count ) { | 644 | switch (peer_list->peer_count) { |
| 623 | case 2: count_two++; break; | 645 | case 2: |
| 624 | case 1: count_one++; break; | 646 | count_two++; |
| 625 | case 0: break; | 647 | break; |
| 626 | default: count_def++; | 648 | case 1: |
| 627 | count_peers += peer_list->peer_count; | 649 | count_one++; |
| 650 | break; | ||
| 651 | case 0: | ||
| 652 | break; | ||
| 653 | default: | ||
| 654 | count_def++; | ||
| 655 | count_peers += peer_list->peer_count; | ||
| 628 | } | 656 | } |
| 629 | } | 657 | } |
| 630 | 658 | ||
| 631 | /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */ | 659 | /* Maximal memory requirement: max 3 blocks, max torrents * 20 + max peers * 7 */ |
| 632 | mem = 3 * ( 1 + 1 + 2 ) + ( count_one + count_two ) * ( 19 + 1 ) + count_def * ( 19 + 8 ) + | 660 | mem = 3 * (1 + 1 + 2) + (count_one + count_two) * (19 + 1) + count_def * (19 + 8) + (count_one + 2 * count_two + count_peers) * 7; |
| 633 | ( count_one + 2 * count_two + count_peers ) * 7; | 661 | |
| 634 | 662 | fprintf(stderr, "Mem: %zd\n", mem); | |
| 635 | fprintf( stderr, "Mem: %zd\n", mem ); | 663 | |
| 636 | 664 | ptr = ptr_a = ptr_b = ptr_c = malloc(mem); | |
| 637 | ptr = ptr_a = ptr_b = ptr_c = malloc( mem ); | 665 | if (!ptr) |
| 638 | if( !ptr ) goto unlock_continue; | 666 | goto unlock_continue; |
| 639 | 667 | ||
| 640 | if( count_one > 4 || !count_def ) { | 668 | if (count_one > 4 || !count_def) { |
| 641 | mem_a = 1 + 1 + 2 + count_one * ( 19 + 7 ); | 669 | mem_a = 1 + 1 + 2 + count_one * (19 + 7); |
| 642 | ptr_b += mem_a; ptr_c += mem_a; | 670 | ptr_b += mem_a; |
| 643 | ptr_a[0] = 1; /* Offset 0: packet type 1 */ | 671 | ptr_c += mem_a; |
| 644 | ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ | 672 | ptr_a[0] = 1; /* Offset 0: packet type 1 */ |
| 645 | ptr_a[2] = count_one >> 8; | 673 | ptr_a[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ |
| 646 | ptr_a[3] = count_one & 255; | 674 | ptr_a[2] = count_one >> 8; |
| 647 | ptr_a += 4; | 675 | ptr_a[3] = count_one & 255; |
| 676 | ptr_a += 4; | ||
| 648 | } else | 677 | } else |
| 649 | count_def += count_one; | 678 | count_def += count_one; |
| 650 | 679 | ||
| 651 | if( count_two > 4 || !count_def ) { | 680 | if (count_two > 4 || !count_def) { |
| 652 | mem_b = 1 + 1 + 2 + count_two * ( 19 + 14 ); | 681 | mem_b = 1 + 1 + 2 + count_two * (19 + 14); |
| 653 | ptr_c += mem_b; | 682 | ptr_c += mem_b; |
| 654 | ptr_b[0] = 2; /* Offset 0: packet type 2 */ | 683 | ptr_b[0] = 2; /* Offset 0: packet type 2 */ |
| 655 | ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ | 684 | ptr_b[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ |
| 656 | ptr_b[2] = count_two >> 8; | 685 | ptr_b[2] = count_two >> 8; |
| 657 | ptr_b[3] = count_two & 255; | 686 | ptr_b[3] = count_two & 255; |
| 658 | ptr_b += 4; | 687 | ptr_b += 4; |
| 659 | } else | 688 | } else |
| 660 | count_def += count_two; | 689 | count_def += count_two; |
| 661 | 690 | ||
| 662 | if( count_def ) { | 691 | if (count_def) { |
| 663 | ptr_c[0] = 0; /* Offset 0: packet type 0 */ | 692 | ptr_c[0] = 0; /* Offset 0: packet type 0 */ |
| 664 | ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ | 693 | ptr_c[1] = (bucket << 8) >> OT_BUCKET_COUNT_BITS; /* Offset 1: the shared prefix */ |
| 665 | ptr_c[2] = count_def >> 8; | 694 | ptr_c[2] = count_def >> 8; |
| 666 | ptr_c[3] = count_def & 255; | 695 | ptr_c[3] = count_def & 255; |
| 667 | ptr_c += 4; | 696 | ptr_c += 4; |
| 668 | } | 697 | } |
| 669 | 698 | ||
| 670 | /* For each torrent in this bucket.. */ | 699 | /* For each torrent in this bucket.. */ |
| 671 | for( tor_offset=0; tor_offset<torrents_list->size; ++tor_offset ) { | 700 | for (tor_offset = 0; tor_offset < torrents_list->size; ++tor_offset) { |
| 672 | /* Address torrents members */ | 701 | /* Address torrents members */ |
| 673 | ot_torrent *torrent = ((ot_torrent*)(torrents_list->data)) + tor_offset; | 702 | ot_torrent *torrent = ((ot_torrent *)(torrents_list->data)) + tor_offset; |
| 674 | ot_peerlist *peer_list = torrent->peer_list; | 703 | ot_peerlist *peer_list = torrent->peer_list; |
| 675 | ot_peer *peers = (ot_peer*)(peer_list->peers.data); | 704 | ot_peer *peers = (ot_peer *)(peer_list->peers.data); |
| 676 | uint8_t **dst; | 705 | uint8_t **dst; |
| 677 | 706 | ||
| 678 | /* Determine destination slot */ | 707 | /* Determine destination slot */ |
| 679 | count_peers = peer_list->peer_count; | 708 | count_peers = peer_list->peer_count; |
| 680 | switch( count_peers ) { | 709 | switch (count_peers) { |
| 681 | case 0: continue; | 710 | case 0: |
| 682 | case 1: dst = mem_a ? &ptr_a : &ptr_c; break; | 711 | continue; |
| 683 | case 2: dst = mem_b ? &ptr_b : &ptr_c; break; | 712 | case 1: |
| 684 | default: dst = &ptr_c; break; | 713 | dst = mem_a ? &ptr_a : &ptr_c; |
| 714 | break; | ||
| 715 | case 2: | ||
| 716 | dst = mem_b ? &ptr_b : &ptr_c; | ||
| 717 | break; | ||
| 718 | default: | ||
| 719 | dst = &ptr_c; | ||
| 720 | break; | ||
| 685 | } | 721 | } |
| 686 | 722 | ||
| 687 | /* Copy tail of info_hash, advance pointer */ | 723 | /* Copy tail of info_hash, advance pointer */ |
| 688 | memcpy( *dst, ((uint8_t*)torrent->hash) + 1, sizeof( ot_hash ) - 1); | 724 | memcpy(*dst, ((uint8_t *)torrent->hash) + 1, sizeof(ot_hash) - 1); |
| 689 | *dst += sizeof( ot_hash ) - 1; | 725 | *dst += sizeof(ot_hash) - 1; |
| 690 | 726 | ||
| 691 | /* Encode peer count */ | 727 | /* Encode peer count */ |
| 692 | if( dst == &ptr_c ) | 728 | if (dst == &ptr_c) |
| 693 | while( count_peers ) { | 729 | while (count_peers) { |
| 694 | if( count_peers <= 0x7f ) | 730 | if (count_peers <= 0x7f) |
| 695 | *(*dst)++ = count_peers; | 731 | *(*dst)++ = count_peers; |
| 696 | else | 732 | else |
| 697 | *(*dst)++ = 0x80 | ( count_peers & 0x7f ); | 733 | *(*dst)++ = 0x80 | (count_peers & 0x7f); |
| 698 | count_peers >>= 7; | 734 | count_peers >>= 7; |
| 699 | } | 735 | } |
| 700 | 736 | ||
| 701 | /* Copy peers */ | 737 | /* Copy peers */ |
| 702 | count_peers = peer_list->peer_count; | 738 | count_peers = peer_list->peer_count; |
| 703 | while( count_peers-- ) { | 739 | while (count_peers--) { |
| 704 | memcpy( *dst, peers++, OT_IP_SIZE + 3 ); | 740 | memcpy(*dst, peers++, OT_IP_SIZE + 3); |
| 705 | *dst += OT_IP_SIZE + 3; | 741 | *dst += OT_IP_SIZE + 3; |
| 706 | } | 742 | } |
| 707 | free_peerlist(peer_list); | 743 | free_peerlist(peer_list); |
| 708 | } | 744 | } |
| 709 | 745 | ||
| 710 | free( torrents_list->data ); | 746 | free(torrents_list->data); |
| 711 | memset( torrents_list, 0, sizeof(*torrents_list ) ); | 747 | memset(torrents_list, 0, sizeof(*torrents_list)); |
| 712 | unlock_continue: | 748 | unlock_continue: |
| 713 | mutex_bucket_unlock( bucket, 0 ); | 749 | mutex_bucket_unlock(bucket, 0); |
| 714 | 750 | ||
| 715 | if( ptr ) { | 751 | if (ptr) { |
| 716 | int i; | 752 | int i; |
| 717 | 753 | ||
| 718 | if( ptr_b > ptr_c ) ptr_c = ptr_b; | 754 | if (ptr_b > ptr_c) |
| 719 | if( ptr_a > ptr_c ) ptr_c = ptr_a; | 755 | ptr_c = ptr_b; |
| 756 | if (ptr_a > ptr_c) | ||
| 757 | ptr_c = ptr_a; | ||
| 720 | mem = ptr_c - ptr; | 758 | mem = ptr_c - ptr; |
| 721 | 759 | ||
| 722 | for( i=0; i < MAX_PEERS; ++i ) { | 760 | for (i = 0; i < MAX_PEERS; ++i) { |
| 723 | if( PROXYPEER_ISCONNECTED(g_connections[i].state) ) { | 761 | if (PROXYPEER_ISCONNECTED(g_connections[i].state)) { |
| 724 | void *tmp = malloc( mem ); | 762 | void *tmp = malloc(mem); |
| 725 | if( tmp ) { | 763 | if (tmp) { |
| 726 | memcpy( tmp, ptr, mem ); | 764 | memcpy(tmp, ptr, mem); |
| 727 | iob_addbuf_free( &g_connections[i].outdata, tmp, mem ); | 765 | iob_addbuf_free(&g_connections[i].outdata, tmp, mem); |
| 728 | io_wantwrite( g_connections[i].fd ); | 766 | io_wantwrite(g_connections[i].fd); |
| 729 | } | 767 | } |
| 730 | } | 768 | } |
| 731 | } | 769 | } |
| 732 | 770 | ||
| 733 | free( ptr ); | 771 | free(ptr); |
| 734 | } | 772 | } |
| 735 | usleep( OT_SYNC_SLEEP ); | 773 | usleep(OT_SYNC_SLEEP); |
| 736 | } | 774 | } |
| 737 | } | 775 | } |
| 738 | return 0; | 776 | return 0; |
| 739 | } | 777 | } |
| 740 | 778 | ||
| 741 | static void livesync_issue_peersync( ) { | 779 | static void livesync_issue_peersync() { |
| 742 | socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, | 780 | socket_send4(g_socket_out, (char *)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, groupip_1, LIVESYNC_PORT); |
| 743 | groupip_1, LIVESYNC_PORT); | 781 | g_peerbuffer_pos = g_peerbuffer_start + sizeof(g_tracker_id) + sizeof(uint32_t); |
| 744 | g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t ); | ||
| 745 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; | 782 | g_next_packet_time = time(NULL) + LIVESYNC_MAXDELAY; |
| 746 | } | 783 | } |
| 747 | 784 | ||
| 748 | void livesync_ticker( ) { | 785 | void livesync_ticker() { |
| 749 | /* livesync_issue_peersync sets g_next_packet_time */ | 786 | /* livesync_issue_peersync sets g_next_packet_time */ |
| 750 | if( time(NULL) > g_next_packet_time && | 787 | if (time(NULL) > g_next_packet_time && g_peerbuffer_pos > g_peerbuffer_start + sizeof(g_tracker_id)) |
| 751 | g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) ) | ||
| 752 | livesync_issue_peersync(); | 788 | livesync_issue_peersync(); |
| 753 | } | 789 | } |
| 754 | 790 | ||
| 755 | static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *peer ) { | 791 | static void livesync_proxytell(uint8_t prefix, uint8_t *info_hash, uint8_t *peer) { |
| 756 | // unsigned int i; | 792 | // unsigned int i; |
| 757 | 793 | ||
| 758 | *g_peerbuffer_pos = prefix; | 794 | *g_peerbuffer_pos = prefix; |
| 759 | memcpy( g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1 ); | 795 | memcpy(g_peerbuffer_pos + 1, info_hash, sizeof(ot_hash) - 1); |
| 760 | memcpy( g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1 ); | 796 | memcpy(g_peerbuffer_pos + sizeof(ot_hash), peer, sizeof(ot_peer) - 1); |
| 761 | 797 | ||
| 762 | #if 0 | 798 | #if 0 |
| 763 | /* Dump info_hash */ | 799 | /* Dump info_hash */ |
| @@ -772,80 +808,84 @@ static void livesync_proxytell( uint8_t prefix, uint8_t *info_hash, uint8_t *pee | |||
| 772 | #endif | 808 | #endif |
| 773 | g_peerbuffer_pos += sizeof(ot_peer); | 809 | g_peerbuffer_pos += sizeof(ot_peer); |
| 774 | 810 | ||
| 775 | if( g_peerbuffer_pos >= g_peerbuffer_highwater ) | 811 | if (g_peerbuffer_pos >= g_peerbuffer_highwater) |
| 776 | livesync_issue_peersync(); | 812 | livesync_issue_peersync(); |
| 777 | } | 813 | } |
| 778 | 814 | ||
| 779 | static void process_indata( proxy_peer * peer ) { | 815 | static void process_indata(proxy_peer *peer) { |
| 780 | size_t consumed, peers; | 816 | size_t consumed, peers; |
| 781 | uint8_t *data = peer->indata, *hash; | 817 | uint8_t *data = peer->indata, *hash; |
| 782 | uint8_t *dataend = data + peer->indata_length; | 818 | uint8_t *dataend = data + peer->indata_length; |
| 783 | 819 | ||
| 784 | while( 1 ) { | 820 | while (1) { |
| 785 | /* If we're not inside of a packet, make a new one */ | 821 | /* If we're not inside of a packet, make a new one */ |
| 786 | if( !peer->packet_tcount ) { | 822 | if (!peer->packet_tcount) { |
| 787 | /* Ensure the header is complete or postpone processing */ | 823 | /* Ensure the header is complete or postpone processing */ |
| 788 | if( data + 4 > dataend ) break; | 824 | if (data + 4 > dataend) |
| 789 | peer->packet_type = data[0]; | 825 | break; |
| 790 | peer->packet_tprefix = data[1]; | 826 | peer->packet_type = data[0]; |
| 791 | peer->packet_tcount = data[2] * 256 + data[3]; | 827 | peer->packet_tprefix = data[1]; |
| 792 | data += 4; | 828 | peer->packet_tcount = data[2] * 256 + data[3]; |
| 793 | printf( "type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount ); | 829 | data += 4; |
| 830 | printf("type: %hhu, prefix: %02X, torrentcount: %zd\n", peer->packet_type, peer->packet_tprefix, peer->packet_tcount); | ||
| 794 | } | 831 | } |
| 795 | 832 | ||
| 796 | /* Ensure size for a minimal torrent block */ | 833 | /* Ensure size for a minimal torrent block */ |
| 797 | if( data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend ) break; | 834 | if (data + sizeof(ot_hash) + OT_IP_SIZE + 3 > dataend) |
| 835 | break; | ||
| 798 | 836 | ||
| 799 | /* Advance pointer to peer count or peers */ | 837 | /* Advance pointer to peer count or peers */ |
| 800 | hash = data; | 838 | hash = data; |
| 801 | data += sizeof(ot_hash) - 1; | 839 | data += sizeof(ot_hash) - 1; |
| 802 | 840 | ||
| 803 | /* Type 0 has peer count encoded before each peers */ | 841 | /* Type 0 has peer count encoded before each peers */ |
| 804 | peers = peer->packet_type; | 842 | peers = peer->packet_type; |
| 805 | if( !peers ) { | 843 | if (!peers) { |
| 806 | int shift = 0; | 844 | int shift = 0; |
| 807 | do peers |= ( 0x7f & *data ) << ( 7 * shift ); | 845 | do |
| 808 | while ( *(data++) & 0x80 && shift++ < 6 ); | 846 | peers |= (0x7f & *data) << (7 * shift); |
| 847 | while (*(data++) & 0x80 && shift++ < 6); | ||
| 809 | } | 848 | } |
| 810 | #if 0 | 849 | #if 0 |
| 811 | printf( "peers: %zd\n", peers ); | 850 | printf( "peers: %zd\n", peers ); |
| 812 | #endif | 851 | #endif |
| 813 | /* Ensure enough data being read to hold all peers */ | 852 | /* Ensure enough data being read to hold all peers */ |
| 814 | if( data + (OT_IP_SIZE + 3) * peers > dataend ) { | 853 | if (data + (OT_IP_SIZE + 3) * peers > dataend) { |
| 815 | data = hash; | 854 | data = hash; |
| 816 | break; | 855 | break; |
| 817 | } | 856 | } |
| 818 | while( peers-- ) { | 857 | while (peers--) { |
| 819 | livesync_proxytell( peer->packet_tprefix, hash, data ); | 858 | livesync_proxytell(peer->packet_tprefix, hash, data); |
| 820 | data += OT_IP_SIZE + 3; | 859 | data += OT_IP_SIZE + 3; |
| 821 | } | 860 | } |
| 822 | --peer->packet_tcount; | 861 | --peer->packet_tcount; |
| 823 | } | 862 | } |
| 824 | 863 | ||
| 825 | consumed = data - peer->indata; | 864 | consumed = data - peer->indata; |
| 826 | memmove( peer->indata, data, peer->indata_length - consumed ); | 865 | memmove(peer->indata, data, peer->indata_length - consumed); |
| 827 | peer->indata_length -= consumed; | 866 | peer->indata_length -= consumed; |
| 828 | } | 867 | } |
| 829 | 868 | ||
| 830 | static void * livesync_worker( void * args ) { | 869 | static void *livesync_worker(void *args) { |
| 831 | (void)args; | 870 | (void)args; |
| 832 | while( 1 ) { | 871 | while (1) { |
| 833 | ot_ip6 in_ip; uint16_t in_port; | 872 | ot_ip6 in_ip; |
| 834 | size_t datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12+(char*)in_ip, &in_port); | 873 | uint16_t in_port; |
| 874 | size_t datalen = socket_recv4(g_socket_in, (char *)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, 12 + (char *)in_ip, &in_port); | ||
| 835 | 875 | ||
| 836 | /* Expect at least tracker id and packet type */ | 876 | /* Expect at least tracker id and packet type */ |
| 837 | if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) | 877 | if (datalen <= (ssize_t)(sizeof(g_tracker_id) + sizeof(uint32_t))) |
| 838 | continue; | 878 | continue; |
| 839 | if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { | 879 | if (!memcmp(g_inbuffer, &g_tracker_id, sizeof(g_tracker_id))) { |
| 840 | /* drop packet coming from ourselves */ | 880 | /* drop packet coming from ourselves */ |
| 841 | continue; | 881 | continue; |
| 842 | } | 882 | } |
| 843 | switch( uint32_read_big( (char*)g_inbuffer + sizeof( g_tracker_id ) ) ) { | 883 | switch (uint32_read_big((char *)g_inbuffer + sizeof(g_tracker_id))) { |
| 844 | case OT_SYNC_PEER4: | 884 | case OT_SYNC_PEER4: |
| 845 | livesync_handle_peersync( datalen, OT_PEER_SIZE4 ); | 885 | livesync_handle_peersync(datalen, OT_PEER_SIZE4); |
| 846 | break; | 886 | break; |
| 847 | case OT_SYNC_PEER6: | 887 | case OT_SYNC_PEER6: |
| 848 | livesync_handle_peersync( datalen, OT_PEER_SIZE6 ); | 888 | livesync_handle_peersync(datalen, OT_PEER_SIZE6); |
| 849 | break; | 889 | break; |
| 850 | default: | 890 | default: |
| 851 | // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); | 891 | // fprintf( stderr, "Received an unknown live sync packet type %u.\n", uint32_read_big( sizeof( g_tracker_id ) + (char*)g_inbuffer ) ); |
