diff options
Diffstat (limited to 'ot_livesync.c')
| -rw-r--r-- | ot_livesync.c | 400 | 
1 files changed, 319 insertions, 81 deletions
| diff --git a/ot_livesync.c b/ot_livesync.c index 3cad121..47a371a 100644 --- a/ot_livesync.c +++ b/ot_livesync.c | |||
| @@ -9,59 +9,109 @@ | |||
| 9 | #include <string.h> | 9 | #include <string.h> | 
| 10 | #include <pthread.h> | 10 | #include <pthread.h> | 
| 11 | #include <unistd.h> | 11 | #include <unistd.h> | 
| 12 | #include <stdlib.h> | ||
| 12 | 13 | ||
| 13 | /* Libowfat */ | 14 | /* Libowfat */ | 
| 14 | #include "socket.h" | 15 | #include "socket.h" | 
| 15 | #include "ndelay.h" | 16 | #include "ndelay.h" | 
| 17 | #include "byte.h" | ||
| 16 | 18 | ||
| 17 | /* Opentracker */ | 19 | /* Opentracker */ | 
| 18 | #include "trackerlogic.h" | 20 | #include "trackerlogic.h" | 
| 19 | #include "ot_livesync.h" | 21 | #include "ot_livesync.h" | 
| 20 | #include "ot_accesslist.h" | 22 | #include "ot_accesslist.h" | 
| 21 | #include "ot_stats.h" | 23 | #include "ot_stats.h" | 
| 24 | #include "ot_mutex.h" | ||
| 22 | 25 | ||
| 23 | #ifdef WANT_SYNC_LIVE | 26 | #ifdef WANT_SYNC_LIVE | 
| 24 | 27 | ||
| 25 | char groupip_1[4] = { 224,0,23,42 }; | 28 | char groupip_1[4] = { 224,0,23,5 }; | 
| 26 | 29 | ||
| 27 | #define LIVESYNC_BUFFINSIZE (256*256) | 30 | #define LIVESYNC_INCOMING_BUFFSIZE (256*256) | 
| 28 | #define LIVESYNC_BUFFSIZE 1504 | ||
| 29 | #define LIVESYNC_BUFFWATER (sizeof(ot_peer)+sizeof(ot_hash)) | ||
| 30 | 31 | ||
| 31 | #define LIVESYNC_MAXDELAY 15 | 32 | #define LIVESYNC_OUTGOING_BUFFSIZE_PEERS 1504 | 
| 33 | #define LIVESYNC_OUTGOING_WATERMARK_PEERS (sizeof(ot_peer)+sizeof(ot_hash)) | ||
| 34 | |||
| 35 | #ifdef WANT_SYNC_SCRAPE | ||
| 36 | #define LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE 1504 | ||
| 37 | #define LIVESYNC_OUTGOING_WATERMARK_SCRAPE (sizeof(ot_hash)+sizeof(uint64_t)+sizeof(uint32_t)) | ||
| 38 | #define LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE 100 | ||
| 39 | |||
| 40 | #define LIVESYNC_FIRST_BEACON_DELAY (30*60) /* seconds */ | ||
| 41 | #define LIVESYNC_BEACON_INTERVAL 60 /* seconds */ | ||
| 42 | #define LIVESYNC_INQUIRE_THRESH 0.75 | ||
| 43 | #endif /* WANT_SYNC_SCRAPE */ | ||
| 44 | |||
| 45 | #define LIVESYNC_MAXDELAY 15 /* seconds */ | ||
| 46 | |||
| 47 | enum { OT_SYNC_PEER | ||
| 48 | #ifdef WANT_SYNC_SCRAPE | ||
| 49 | , OT_SYNC_SCRAPE_BEACON, OT_SYNC_SCRAPE_INQUIRE, OT_SYNC_SCRAPE_TELL | ||
| 50 | #endif | ||
| 51 | }; | ||
| 32 | 52 | ||
| 33 | /* Forward declaration */ | 53 | /* Forward declaration */ | 
| 34 | static void * livesync_worker( void * args ); | 54 | static void * livesync_worker( void * args ); | 
| 35 | 55 | ||
| 36 | /* For outgoing packets */ | 56 | /* For outgoing packets */ | 
| 37 | static int64 g_livesync_socket_in = -1; | 57 | static int64 g_socket_in = -1; | 
| 38 | 58 | ||
| 39 | /* For incoming packets */ | 59 | /* For incoming packets */ | 
| 40 | static int64 g_livesync_socket_out = -1; | 60 | static int64 g_socket_out = -1; | 
| 61 | static uint8_t g_inbuffer[LIVESYNC_INCOMING_BUFFSIZE]; | ||
| 41 | 62 | ||
| 42 | static uint8_t livesync_inbuffer[LIVESYNC_BUFFINSIZE]; | 63 | static uint8_t g_peerbuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_PEERS]; | 
| 43 | static uint8_t livesync_outbuffer_start[ LIVESYNC_BUFFSIZE ]; | 64 | static uint8_t *g_peerbuffer_pos; | 
| 44 | static uint8_t *livesync_outbuffer_pos; | 65 | static uint8_t *g_peerbuffer_highwater = g_peerbuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_PEERS - LIVESYNC_OUTGOING_WATERMARK_PEERS; | 
| 45 | static uint8_t *livesync_outbuffer_highwater = livesync_outbuffer_start + LIVESYNC_BUFFSIZE - LIVESYNC_BUFFWATER; | 66 | |
| 46 | static ot_time livesync_lastpacket_time; | 67 | static ot_time g_next_packet_time; | 
| 68 | |||
| 69 | #ifdef WANT_SYNC_SCRAPE | ||
| 70 | /* Live sync scrape buffers, states and timers */ | ||
| 71 | static ot_time g_next_beacon_time; | ||
| 72 | static ot_time g_next_inquire_time; | ||
| 73 | |||
| 74 | static uint8_t g_scrapebuffer_start[LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE]; | ||
| 75 | static uint8_t *g_scrapebuffer_pos; | ||
| 76 | static uint8_t *g_scrapebuffer_highwater = g_scrapebuffer_start + LIVESYNC_OUTGOING_BUFFSIZE_SCRAPE - LIVESYNC_OUTGOING_WATERMARK_SCRAPE; | ||
| 77 | |||
| 78 | static size_t g_inquire_remote_count; | ||
| 79 | static uint32_t g_inquire_remote_host; | ||
| 80 | static int g_inquire_inprogress; | ||
| 81 | static int g_inquire_bucket; | ||
| 82 | #endif /* WANT_SYNC_SCRAPE */ | ||
| 47 | 83 | ||
| 48 | static pthread_t thread_id; | 84 | static pthread_t thread_id; | 
| 49 | void livesync_init( ) { | 85 | void livesync_init( ) { | 
| 50 | if( g_livesync_socket_in == -1 ) | 86 | if( g_socket_in == -1 ) | 
| 51 | exerr( "No socket address for live sync specified." ); | 87 | exerr( "No socket address for live sync specified." ); | 
| 52 | livesync_outbuffer_pos = livesync_outbuffer_start; | 88 | |
| 53 | memmove( livesync_outbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); | 89 | /* Prepare outgoing peers buffer */ | 
| 54 | livesync_outbuffer_pos += sizeof( g_tracker_id ); | 90 | g_peerbuffer_pos = g_peerbuffer_start; | 
| 55 | livesync_lastpacket_time = g_now_seconds; | 91 | memmove( g_peerbuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); | 
| 92 | uint32_pack_big( (char*)g_peerbuffer_pos + sizeof( g_tracker_id ), OT_SYNC_PEER); | ||
| 93 | g_peerbuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); | ||
| 94 | |||
| 95 | #ifdef WANT_SYNC_SCRAPE | ||
| 96 | /* Prepare outgoing scrape buffer */ | ||
| 97 | g_scrapebuffer_pos = g_scrapebuffer_start; | ||
| 98 | memmove( g_scrapebuffer_pos, &g_tracker_id, sizeof( g_tracker_id ) ); | ||
| 99 | uint32_pack_big( (char*)g_scrapebuffer_pos + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_TELL); | ||
| 100 | g_scrapebuffer_pos += sizeof( g_tracker_id ) + sizeof( uint32_t); | ||
| 101 | |||
| 102 | /* Wind up timers for inquires */ | ||
| 103 | g_next_beacon_time = g_now_seconds + LIVESYNC_FIRST_BEACON_DELAY; | ||
| 104 | #endif /* WANT_SYNC_SCRAPE */ | ||
| 105 | g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; | ||
| 56 | 106 | ||
| 57 | pthread_create( &thread_id, NULL, livesync_worker, NULL ); | 107 | pthread_create( &thread_id, NULL, livesync_worker, NULL ); | 
| 58 | } | 108 | } | 
| 59 | 109 | ||
| 60 | void livesync_deinit() { | 110 | void livesync_deinit() { | 
| 61 | if( g_livesync_socket_in != -1 ) | 111 | if( g_socket_in != -1 ) | 
| 62 | close( g_livesync_socket_in ); | 112 | close( g_socket_in ); | 
| 63 | if( g_livesync_socket_out != -1 ) | 113 | if( g_socket_out != -1 ) | 
| 64 | close( g_livesync_socket_out ); | 114 | close( g_socket_out ); | 
| 65 | 115 | ||
| 66 | pthread_cancel( thread_id ); | 116 | pthread_cancel( thread_id ); | 
| 67 | } | 117 | } | 
| @@ -69,104 +119,292 @@ void livesync_deinit() { | |||
| 69 | void livesync_bind_mcast( char *ip, uint16_t port) { | 119 | void livesync_bind_mcast( char *ip, uint16_t port) { | 
| 70 | char tmpip[4] = {0,0,0,0}; | 120 | char tmpip[4] = {0,0,0,0}; | 
| 71 | 121 | ||
| 72 | if( g_livesync_socket_in != -1 ) | 122 | if( g_socket_in != -1 ) | 
| 73 | exerr("Error: Livesync listen ip specified twice."); | 123 | exerr("Error: Livesync listen ip specified twice."); | 
| 74 | 124 | ||
| 75 | if( ( g_livesync_socket_in = socket_udp4( )) < 0) | 125 | if( ( g_socket_in = socket_udp4( )) < 0) | 
| 76 | exerr("Error: Cant create live sync incoming socket." ); | 126 | exerr("Error: Cant create live sync incoming socket." ); | 
| 77 | ndelay_off(g_livesync_socket_in); | 127 | ndelay_off(g_socket_in); | 
| 78 | 128 | ||
| 79 | if( socket_bind4_reuse( g_livesync_socket_in, tmpip, port ) == -1 ) | 129 | if( socket_bind4_reuse( g_socket_in, tmpip, port ) == -1 ) | 
| 80 | exerr("Error: Cant bind live sync incoming socket." ); | 130 | exerr("Error: Cant bind live sync incoming socket." ); | 
| 81 | 131 | ||
| 82 | if( socket_mcjoin4( g_livesync_socket_in, groupip_1, ip ) ) | 132 | if( socket_mcjoin4( g_socket_in, groupip_1, ip ) ) | 
| 83 | exerr("Error: Cant make live sync incoming socket join mcast group."); | 133 | exerr("Error: Cant make live sync incoming socket join mcast group."); | 
| 84 | 134 | ||
| 85 | if( ( g_livesync_socket_out = socket_udp4()) < 0) | 135 | if( ( g_socket_out = socket_udp4()) < 0) | 
| 86 | exerr("Error: Cant create live sync outgoing socket." ); | 136 | exerr("Error: Cant create live sync outgoing socket." ); | 
| 87 | if( socket_bind4_reuse( g_livesync_socket_out, ip, port ) == -1 ) | 137 | if( socket_bind4_reuse( g_socket_out, ip, port ) == -1 ) | 
| 88 | exerr("Error: Cant bind live sync outgoing socket." ); | 138 | exerr("Error: Cant bind live sync outgoing socket." ); | 
| 89 | 139 | ||
| 90 | socket_mcttl4(g_livesync_socket_out, 1); | 140 | socket_mcttl4(g_socket_out, 1); | 
| 91 | socket_mcloop4(g_livesync_socket_out, 0); | 141 | socket_mcloop4(g_socket_out, 0); | 
| 92 | } | 142 | } | 
| 93 | 143 | ||
| 94 | static void livesync_issuepacket( ) { | 144 | static void livesync_issue_peersync( ) { | 
| 95 | socket_send4(g_livesync_socket_out, (char*)livesync_outbuffer_start, livesync_outbuffer_pos - livesync_outbuffer_start, | 145 | socket_send4(g_socket_out, (char*)g_peerbuffer_start, g_peerbuffer_pos - g_peerbuffer_start, | 
| 96 | groupip_1, LIVESYNC_PORT); | 146 | groupip_1, LIVESYNC_PORT); | 
| 97 | livesync_outbuffer_pos = livesync_outbuffer_start + sizeof( g_tracker_id ); | 147 | g_peerbuffer_pos = g_peerbuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t ); | 
| 98 | livesync_lastpacket_time = g_now_seconds; | 148 | g_next_packet_time = g_now_seconds + LIVESYNC_MAXDELAY; | 
| 99 | } | 149 | } | 
| 100 | 150 | ||
| 101 | /* Inform live sync about whats going on. */ | 151 | static void livesync_handle_peersync( ssize_t datalen ) { | 
| 102 | void livesync_tell( ot_hash * const info_hash, const ot_peer * const peer ) { | 152 | int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); | 
| 103 | int i; | 153 | |
| 104 | for(i=0;i<20;i+=4) WRITE32(livesync_outbuffer_pos,i,READ32(info_hash,i)); | 154 | /* Now basic sanity checks have been done on the live sync packet | 
| 105 | WRITE32(livesync_outbuffer_pos,20,READ32(peer,0)); | 155 | We might add more testing and logging. */ | 
| 106 | WRITE32(livesync_outbuffer_pos,24,READ32(peer,4)); | 156 | while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) { | 
| 107 | livesync_outbuffer_pos += 28; | 157 | ot_peer *peer = (ot_peer*)(g_inbuffer + off + sizeof(ot_hash)); | 
| 108 | 158 | ot_hash *hash = (ot_hash*)(g_inbuffer + off); | |
| 109 | if( livesync_outbuffer_pos >= livesync_outbuffer_highwater ) | 159 | |
| 110 | livesync_issuepacket(); | 160 | if( !g_opentracker_running ) return; | 
| 161 | |||
| 162 | if( OT_PEERFLAG(peer) & PEER_FLAG_STOPPED ) | ||
| 163 | remove_peer_from_torrent(hash, peer, NULL, FLAG_MCA ); | ||
| 164 | else | ||
| 165 | add_peer_to_torrent( hash, peer, FLAG_MCA ); | ||
| 166 | |||
| 167 | off += sizeof( ot_hash ) + sizeof( ot_peer ); | ||
| 168 | } | ||
| 169 | |||
| 170 | stats_issue_event(EVENT_SYNC, 0, datalen / ((ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ))); | ||
| 111 | } | 171 | } | 
| 112 | 172 | ||
| 173 | #ifdef WANT_SYNC_SCRAPE | ||
| 174 | void livesync_issue_beacon( ) { | ||
| 175 | size_t torrent_count = mutex_get_torrent_count(); | ||
| 176 | uint8_t beacon[ sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof( uint64_t ) ]; | ||
| 177 | |||
| 178 | memmove( beacon, &g_tracker_id, sizeof( g_tracker_id ) ); | ||
| 179 | uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_BEACON); | ||
| 180 | uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ) + sizeof(uint32_t), (uint32_t)((uint64_t)(torrent_count)>>32) ); | ||
| 181 | uint32_pack_big( (char*)beacon + sizeof( g_tracker_id ) + 2 * sizeof(uint32_t), (uint32_t)torrent_count ); | ||
| 182 | |||
| 183 | socket_send4(g_socket_out, (char*)beacon, sizeof(beacon), groupip_1, LIVESYNC_PORT); | ||
| 184 | } | ||
| 185 | |||
| 186 | void livesync_handle_beacon( ssize_t datalen ) { | ||
| 187 | size_t torrent_count_local, torrent_count_remote; | ||
| 188 | if( datalen != sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof( uint64_t ) ) | ||
| 189 | return; | ||
| 190 | torrent_count_local = mutex_get_torrent_count(); | ||
| 191 | torrent_count_remote = (size_t)(((uint64_t)uint32_read_big((char*)g_inbuffer+sizeof( g_tracker_id ) + sizeof(uint32_t))) << 32); | ||
| 192 | torrent_count_remote |= (size_t)uint32_read_big((char*)g_inbuffer+sizeof( g_tracker_id ) + 2 * sizeof(uint32_t)); | ||
| 193 | |||
| 194 | /* Empty tracker is useless */ | ||
| 195 | if( !torrent_count_remote ) return; | ||
| 196 | |||
| 197 | if( ((double)torrent_count_local ) / ((double)torrent_count_remote) < LIVESYNC_INQUIRE_THRESH) { | ||
| 198 | if( !g_next_inquire_time ) { | ||
| 199 | g_next_inquire_time = g_now_seconds + 2 * LIVESYNC_BEACON_INTERVAL; | ||
| 200 | g_inquire_remote_count = 0; | ||
| 201 | } | ||
| 202 | |||
| 203 | if( torrent_count_remote > g_inquire_remote_count ) { | ||
| 204 | g_inquire_remote_count = torrent_count_remote; | ||
| 205 | memmove( &g_inquire_remote_host, g_inbuffer, sizeof( g_tracker_id ) ); | ||
| 206 | } | ||
| 207 | } | ||
| 208 | } | ||
| 209 | |||
| 210 | void livesync_issue_inquire( ) { | ||
| 211 | uint8_t inquire[ sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof(g_tracker_id)]; | ||
| 212 | |||
| 213 | memmove( inquire, &g_tracker_id, sizeof( g_tracker_id ) ); | ||
| 214 | uint32_pack_big( (char*)inquire + sizeof( g_tracker_id ), OT_SYNC_SCRAPE_INQUIRE); | ||
| 215 | memmove( inquire + sizeof(g_tracker_id) + sizeof(uint32_t), &g_inquire_remote_host, sizeof( g_tracker_id ) ); | ||
| 216 | |||
| 217 | socket_send4(g_socket_out, (char*)inquire, sizeof(inquire), groupip_1, LIVESYNC_PORT); | ||
| 218 | } | ||
| 219 | |||
| 220 | void livesync_handle_inquire( ssize_t datalen ) { | ||
| 221 | if( datalen != sizeof(g_tracker_id) + sizeof(uint32_t) + sizeof(g_tracker_id) ) | ||
| 222 | return; | ||
| 223 | |||
| 224 | /* If it isn't us, they're inquiring, ignore inquiry */ | ||
| 225 | if( memcmp( &g_tracker_id, g_inbuffer, sizeof( g_tracker_id ) ) ) | ||
| 226 | return; | ||
| 227 | |||
| 228 | /* Start scrape tell on next ticker */ | ||
| 229 | if( !g_inquire_inprogress ) { | ||
| 230 | g_inquire_inprogress = 1; | ||
| 231 | g_inquire_bucket = 0; | ||
| 232 | } | ||
| 233 | } | ||
| 234 | |||
| 235 | void livesync_issue_tell( ) { | ||
| 236 | int packets_to_send = LIVESYNC_OUTGOING_MAXPACKETS_SCRAPE; | ||
| 237 | while( packets_to_send > 0 && g_inquire_bucket < OT_BUCKET_COUNT ) { | ||
| 238 | ot_vector *torrents_list = mutex_bucket_lock( g_inquire_bucket ); | ||
| 239 | unsigned int j; | ||
| 240 | for( j=0; j<torrents_list->size; ++j ) { | ||
| 241 | ot_torrent *torrent = (ot_torrent*)(torrents_list->data) + j; | ||
| 242 | memmove(g_scrapebuffer_pos, torrent->hash, sizeof(ot_hash)); | ||
| 243 | g_scrapebuffer_pos += sizeof(ot_hash); | ||
| 244 | uint32_pack_big( (char*)g_scrapebuffer_pos , (uint32_t)(g_now_minutes - torrent->peer_list->base )); | ||
| 245 | uint32_pack_big( (char*)g_scrapebuffer_pos + 4, (uint32_t)((uint64_t)(torrent->peer_list->down_count)>>32) ); | ||
| 246 | uint32_pack_big( (char*)g_scrapebuffer_pos + 8, (uint32_t)torrent->peer_list->down_count ); | ||
| 247 | g_scrapebuffer_pos += 12; | ||
| 248 | |||
| 249 | if( g_scrapebuffer_pos >= g_scrapebuffer_highwater ) { | ||
| 250 | socket_send4(g_socket_out, (char*)g_scrapebuffer_start, g_scrapebuffer_pos - g_scrapebuffer_start, groupip_1, LIVESYNC_PORT); | ||
| 251 | g_scrapebuffer_pos = g_scrapebuffer_start + sizeof( g_tracker_id ) + sizeof( uint32_t); | ||
| 252 | --packets_to_send; | ||
| 253 | } | ||
| 254 | } | ||
| 255 | mutex_bucket_unlock( g_inquire_bucket++, 0 ); | ||
| 256 | if( !g_opentracker_running ) | ||
| 257 | return; | ||
| 258 | } | ||
| 259 | if( g_inquire_bucket == OT_BUCKET_COUNT ) { | ||
| 260 | socket_send4(g_socket_out, (char*)g_scrapebuffer_start, g_scrapebuffer_pos - g_scrapebuffer_start, groupip_1, LIVESYNC_PORT); | ||
| 261 | g_inquire_inprogress = 0; | ||
| 262 | } | ||
| 263 | } | ||
| 264 | |||
| 265 | void livesync_handle_tell( ssize_t datalen ) { | ||
| 266 | int off = sizeof( g_tracker_id ) + sizeof( uint32_t ); | ||
| 267 | |||
| 268 | /* Some instance is in progress of telling. Our inquiry was successful. | ||
| 269 | Don't ask again until we see next beacon. */ | ||
| 270 | g_next_inquire_time = 0; | ||
| 271 | |||
| 272 | /* Don't cause any new inquiries during another tracker's tell */ | ||
| 273 | if( g_next_beacon_time - g_now_seconds < LIVESYNC_BEACON_INTERVAL ) | ||
| 274 | g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL; | ||
| 275 | |||
| 276 | while( off + sizeof(ot_hash) + 12 <= (size_t)datalen ) { | ||
| 277 | ot_hash *hash = (ot_hash*)(g_inbuffer+off); | ||
| 278 | ot_vector *torrents_list = mutex_bucket_lock_by_hash(hash); | ||
| 279 | size_t down_count_remote; | ||
| 280 | int exactmatch; | ||
| 281 | ot_torrent * torrent = vector_find_or_insert(torrents_list, hash, sizeof(ot_hash), OT_HASH_COMPARE_SIZE, &exactmatch); | ||
| 282 | if( !torrent ) { | ||
| 283 | mutex_bucket_unlock_by_hash( hash, 0 ); | ||
| 284 | continue; | ||
| 285 | } | ||
| 286 | |||
| 287 | if( !exactmatch ) { | ||
| 288 | /* Create a new torrent entry, then */ | ||
| 289 | int i; for(i=0;i<20;i+=4) WRITE32(&torrent->hash,i,READ32(hash,i)); | ||
| 290 | |||
| 291 | if( !( torrent->peer_list = malloc( sizeof (ot_peerlist) ) ) ) { | ||
| 292 | vector_remove_torrent( torrents_list, torrent ); | ||
| 293 | mutex_bucket_unlock_by_hash( hash, 0 ); | ||
| 294 | continue; | ||
| 295 | } | ||
| 296 | |||
| 297 | byte_zero( torrent->peer_list, sizeof( ot_peerlist ) ); | ||
| 298 | torrent->peer_list->base = g_now_minutes - uint32_read_big((char*)g_inbuffer+off+sizeof(ot_hash)); | ||
| 299 | } | ||
| 300 | |||
| 301 | down_count_remote = (size_t)(((uint64_t)uint32_read_big((char*)g_inbuffer+off+sizeof( ot_hash ) + sizeof(uint32_t))) << 32); | ||
| 302 | down_count_remote |= (size_t) uint32_read_big((char*)g_inbuffer+off+sizeof( ot_hash ) + 2 * sizeof(uint32_t)); | ||
| 303 | |||
| 304 | if( down_count_remote > torrent->peer_list->down_count ) | ||
| 305 | torrent->peer_list->down_count = down_count_remote; | ||
| 306 | /* else | ||
| 307 | We might think of sending a tell packet, if we have a much larger downloaded count | ||
| 308 | */ | ||
| 309 | |||
| 310 | mutex_bucket_unlock( g_inquire_bucket++, exactmatch?0:1 ); | ||
| 311 | if( !g_opentracker_running ) | ||
| 312 | return; | ||
| 313 | off += sizeof(ot_hash) + 12; | ||
| 314 | } | ||
| 315 | } | ||
| 316 | #endif /* WANT_SYNC_SCRAPE */ | ||
| 317 | |||
| 113 | /* Tickle the live sync module from time to time, so no events get | 318 | /* Tickle the live sync module from time to time, so no events get | 
| 114 | stuck when there's not enough traffic to fill udp packets fast | 319 | stuck when there's not enough traffic to fill udp packets fast | 
| 115 | enough */ | 320 | enough */ | 
| 116 | void livesync_ticker( ) { | 321 | void livesync_ticker( ) { | 
| 117 | if( ( g_now_seconds - livesync_lastpacket_time > LIVESYNC_MAXDELAY) && | 322 | |
| 118 | ( livesync_outbuffer_pos > livesync_outbuffer_start + sizeof( g_tracker_id ) ) ) | 323 | /* livesync_issue_peersync sets g_next_packet_time */ | 
| 119 | livesync_issuepacket(); | 324 | if( g_now_seconds > g_next_packet_time && | 
| 325 | g_peerbuffer_pos > g_peerbuffer_start + sizeof( g_tracker_id ) ) | ||
| 326 | livesync_issue_peersync(); | ||
| 327 | |||
| 328 | #ifdef WANT_SYNC_SCRAPE | ||
| 329 | /* Send first beacon after running at least LIVESYNC_FIRST_BEACON_DELAY | ||
| 330 | seconds and not more often than every LIVESYNC_BEACON_INTERVAL seconds */ | ||
| 331 | if( g_now_seconds > g_next_beacon_time ) { | ||
| 332 | livesync_issue_beacon( ); | ||
| 333 | g_next_beacon_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL; | ||
| 334 | } | ||
| 335 | |||
| 336 | /* If we're interested in an inquiry and waited long enough to see all | ||
| 337 | tracker's beacons, go ahead and inquire */ | ||
| 338 | if( g_next_inquire_time && g_now_seconds > g_next_inquire_time ) { | ||
| 339 | livesync_issue_inquire(); | ||
| 340 | |||
| 341 | /* If packet gets lost, ask again after LIVESYNC_BEACON_INTERVAL */ | ||
| 342 | g_next_inquire_time = g_now_seconds + LIVESYNC_BEACON_INTERVAL; | ||
| 343 | } | ||
| 344 | |||
| 345 | /* If we're in process of telling, let's tell. */ | ||
| 346 | if( g_inquire_inprogress ) | ||
| 347 | livesync_issue_tell( ); | ||
| 348 | |||
| 349 | #endif /* WANT_SYNC_SCRAPE */ | ||
| 350 | } | ||
| 351 | |||
| 352 | /* Inform live sync about whats going on. */ | ||
| 353 | void livesync_tell( ot_hash * const info_hash, const ot_peer * const peer ) { | ||
| 354 | unsigned int i; | ||
| 355 | for(i=0;i<sizeof(ot_hash)/4;i+=4) WRITE32(g_peerbuffer_pos,i,READ32(info_hash,i)); | ||
| 356 | |||
| 357 | WRITE32(g_peerbuffer_pos,sizeof(ot_hash) ,READ32(peer,0)); | ||
| 358 | WRITE32(g_peerbuffer_pos,sizeof(ot_hash)+4,READ32(peer,4)); | ||
| 359 | |||
| 360 | g_peerbuffer_pos += sizeof(ot_hash)+8; | ||
| 361 | |||
| 362 | if( g_peerbuffer_pos >= g_peerbuffer_highwater ) | ||
| 363 | livesync_issue_peersync(); | ||
| 120 | } | 364 | } | 
| 121 | 365 | ||
| 122 | static void * livesync_worker( void * args ) { | 366 | static void * livesync_worker( void * args ) { | 
| 123 | uint8_t in_ip[4]; uint16_t in_port; | 367 | uint8_t in_ip[4]; uint16_t in_port; | 
| 124 | ssize_t datalen; | 368 | ssize_t datalen; | 
| 125 | int off; | ||
| 126 | 369 | ||
| 127 | args = args; | 370 | (void)args; | 
| 128 | 371 | ||
| 129 | while( 1 ) { | 372 | while( 1 ) { | 
| 130 | datalen = socket_recv4(g_livesync_socket_in, (char*)livesync_inbuffer, LIVESYNC_BUFFINSIZE, (char*)in_ip, &in_port); | 373 | datalen = socket_recv4(g_socket_in, (char*)g_inbuffer, LIVESYNC_INCOMING_BUFFSIZE, (char*)in_ip, &in_port); | 
| 131 | off = 4; | ||
| 132 | 374 | ||
| 133 | if( datalen <= 0 ) | 375 | /* Expect at least tracker id and packet type */ | 
| 376 | if( datalen <= (ssize_t)(sizeof( g_tracker_id ) + sizeof( uint32_t )) ) | ||
| 134 | continue; | 377 | continue; | 
| 135 | 378 | if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC)) | |
| 136 | if( datalen < (ssize_t)(sizeof( g_tracker_id ) + sizeof( ot_hash ) + sizeof( ot_peer ) ) ) { | ||
| 137 | /* TODO: log invalid sync packet */ | ||
| 138 | continue; | 379 | continue; | 
| 139 | } | 380 | if( !memcmp( g_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { | 
| 140 | |||
| 141 | if( !accesslist_isblessed((char*)in_ip, OT_PERMISSION_MAY_LIVESYNC)) { | ||
| 142 | /* TODO: log invalid sync packet */ | ||
| 143 | continue; | ||
| 144 | } | ||
| 145 | |||
| 146 | if( !memcmp( livesync_inbuffer, &g_tracker_id, sizeof( g_tracker_id ) ) ) { | ||
| 147 | /* TODO: log packet coming from ourselves */ | 381 | /* TODO: log packet coming from ourselves */ | 
| 148 | continue; | 382 | continue; | 
| 149 | } | 383 | } | 
| 150 | 384 | ||
| 151 | /* Now basic sanity checks have been done on the live sync packet | 385 | switch( uint32_read_big( (char*)g_inbuffer ) ) { | 
| 152 | We might add more testing and logging. */ | 386 | case OT_SYNC_PEER: | 
| 153 | while( off + (ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ) <= datalen ) { | 387 | livesync_handle_peersync( datalen ); | 
| 154 | ot_peer *peer = (ot_peer*)(livesync_inbuffer + off + sizeof(ot_hash)); | 388 | break; | 
| 155 | ot_hash *hash = (ot_hash*)(livesync_inbuffer + off); | 389 | #ifdef WANT_SYNC_SCRAPE | 
| 156 | 390 | case OT_SYNC_SCRAPE_BEACON: | |
| 157 | if( !g_opentracker_running ) | 391 | livesync_handle_beacon( datalen ); | 
| 158 | return NULL; | 392 | break; | 
| 159 | 393 | case OT_SYNC_SCRAPE_INQUIRE: | |
| 160 | if( OT_PEERFLAG(peer) & PEER_FLAG_STOPPED ) | 394 | livesync_handle_inquire( datalen ); | 
| 161 | remove_peer_from_torrent(hash, peer, NULL, FLAG_MCA); | 395 | break; | 
| 162 | else | 396 | case OT_SYNC_SCRAPE_TELL: | 
| 163 | add_peer_to_torrent( hash, peer WANT_SYNC_PARAM(1)); | 397 | livesync_handle_tell( datalen ); | 
| 164 | 398 | break; | |
| 165 | off += sizeof( ot_hash ) + sizeof( ot_peer ); | 399 | #endif /* WANT_SYNC_SCRAPE */ | 
| 400 | default: | ||
| 401 | break; | ||
| 166 | } | 402 | } | 
| 167 | 403 | ||
| 168 | stats_issue_event(EVENT_SYNC, 0, datalen / ((ssize_t)sizeof( ot_hash ) + (ssize_t)sizeof( ot_peer ))); | 404 | /* Handle outstanding requests */ | 
| 405 | livesync_ticker( ); | ||
| 169 | } | 406 | } | 
| 407 | |||
| 170 | /* Never returns. */ | 408 | /* Never returns. */ | 
| 171 | return NULL; | 409 | return NULL; | 
| 172 | } | 410 | } | 
