diff --git a/src/cache.c b/src/cache.c index 0c86ae8..13d060d 100644 --- a/src/cache.c +++ b/src/cache.c @@ -354,9 +354,22 @@ static long Data_read(Cache *cf, uint8_t *buf, off_t len, off_t offset) return -EINVAL; } + #ifdef CACHE_LOCK_DEBUG + fprintf(stderr, "Data_read(): thread %lu: locking seek_lock;\n", + pthread_self()); + #endif + PTHREAD_MUTEX_LOCK(&cf->seek_lock); + if (fseeko(cf->dfp, offset, SEEK_SET)) { /* fseeko failed */ fprintf(stderr, "Data_read(): fseeko(): %s\n", strerror(errno)); + + #ifdef CACHE_LOCK_DEBUG + fprintf(stderr, "Data_read(): thread %lu: unlocking seek_lock;\n", + pthread_self()); + #endif + PTHREAD_MUTEX_UNLOCK(&cf->seek_lock); + return -EIO; } @@ -377,6 +390,12 @@ static long Data_read(Cache *cf, uint8_t *buf, off_t len, off_t offset) } } + #ifdef CACHE_LOCK_DEBUG + fprintf(stderr, "Data_read(): thread %lu: unlocking seek_lock;\n", + pthread_self()); + #endif + PTHREAD_MUTEX_UNLOCK(&cf->seek_lock); + return byte_read; } @@ -399,10 +418,21 @@ static long Data_write(Cache *cf, const uint8_t *buf, off_t len, return -EINVAL; } + #ifdef CACHE_LOCK_DEBUG + fprintf(stderr, "Data_write(): thread %lu: locking seek_lock;\n", + pthread_self()); + #endif + PTHREAD_MUTEX_LOCK(&cf->seek_lock); if (fseeko(cf->dfp, offset, SEEK_SET)) { /* fseeko failed */ fprintf(stderr, "Data_write(): fseeko(): %s\n", strerror(errno)); + + #ifdef CACHE_LOCK_DEBUG + fprintf(stderr, "Data_write(): thread %lu: unlocking seek_lock;\n", + pthread_self()); + #endif + PTHREAD_MUTEX_UNLOCK(&cf->seek_lock); return -EIO; } @@ -417,7 +447,11 @@ static long Data_write(Cache *cf, const uint8_t *buf, off_t len, "Data_write(): fwrite(): encountered error!\n"); } } - + #ifdef CACHE_LOCK_DEBUG + fprintf(stderr, "Data_write(): thread %lu: unlocking seek_lock;\n", + pthread_self()); + #endif + PTHREAD_MUTEX_UNLOCK(&cf->seek_lock); return byte_written; } @@ -450,18 +484,12 @@ static Cache *Cache_alloc() exit(EXIT_FAILURE); } - if (pthread_mutexattr_init(&cf->rw_lock_attr)) { - fprintf(stderr, - "Cache_alloc(): rw_lock_attr initialisation failed!\n"); + if (pthread_mutex_init(&cf->seek_lock, NULL)) { + fprintf(stderr, "Cache_alloc(): seek_lock initialisation failed!\n"); } - if (pthread_mutexattr_setpshared(&cf->rw_lock_attr, - PTHREAD_PROCESS_SHARED)) { - fprintf(stderr, "Cache_alloc(): could not set rw_lock_attr!\n"); - } - - if (pthread_mutex_init(&cf->rw_lock, &cf->rw_lock_attr)) { - fprintf(stderr, "Cache_alloc(): rw_lock initialisation failed!\n"); + if (pthread_mutex_init(&cf->w_lock, NULL)) { + fprintf(stderr, "Cache_alloc(): w_lock initialisation failed!\n"); } if (pthread_mutexattr_init(&cf->bgt_lock_attr)) { @@ -486,12 +514,12 @@ static Cache *Cache_alloc() */ static void Cache_free(Cache *cf) { - if (pthread_mutex_destroy(&cf->rw_lock)) { - fprintf(stderr, "Cache_free(): could not destroy rw_lock!\n"); + if (pthread_mutex_destroy(&cf->seek_lock)) { + fprintf(stderr, "Cache_free(): could not destroy seek_lock!\n"); } - if (pthread_mutexattr_destroy(&cf->rw_lock_attr)) { - fprintf(stderr, "Cache_alloc(): could not destroy rw_lock_attr!\n"); + if (pthread_mutex_destroy(&cf->w_lock)) { + fprintf(stderr, "Cache_free(): could not destroy w_lock!\n"); } if (pthread_mutex_destroy(&cf->bgt_lock)) { @@ -720,26 +748,26 @@ Cache *Cache_open(const char *fn) /*---------------- Cache_open() critical section -----------------*/ -#ifdef CACHE_LOCK_DEBUG + #ifdef CACHE_LOCK_DEBUG fprintf(stderr, "Cache_open(): thread %lu: locking cf_lock;\n", pthread_self()); -#endif + #endif PTHREAD_MUTEX_LOCK(&cf_lock); if (link->cache_opened) { link->cache_opened++; -#ifdef CACHE_LOCK_DEBUG + #ifdef CACHE_LOCK_DEBUG fprintf(stderr, "Cache_open(): thread %lu: unlocking cf_lock;\n", pthread_self()); -#endif + #endif PTHREAD_MUTEX_UNLOCK(&cf_lock); return link->cache_ptr; } -#ifdef CACHE_LOCK_DEBUG + #ifdef CACHE_LOCK_DEBUG fprintf(stderr, "Cache_open(): thread %lu: unlocking cf_lock;\n", pthread_self()); -#endif + #endif PTHREAD_MUTEX_UNLOCK(&cf_lock); /*----------------------------------------------------------------*/ @@ -804,27 +832,27 @@ void Cache_close(Cache *cf) { /*--------------- Cache_close() critical section -----------------*/ -#ifdef CACHE_LOCK_DEBUG + #ifdef CACHE_LOCK_DEBUG fprintf(stderr, "Cache_close(): thread %lu: locking cf_lock;\n", pthread_self()); -#endif + #endif PTHREAD_MUTEX_LOCK(&cf_lock); cf->link->cache_opened--; if (cf->link->cache_opened > 0) { -#ifdef CACHE_LOCK_DEBUG + #ifdef CACHE_LOCK_DEBUG fprintf(stderr, "Cache_close(): thread %lu: unlocking cf_lock;\n", pthread_self()); -#endif + #endif PTHREAD_MUTEX_UNLOCK(&cf_lock); return; } -#ifdef CACHE_LOCK_DEBUG + #ifdef CACHE_LOCK_DEBUG fprintf(stderr, "Cache_close(): thread %lu: unlocking cf_lock;\n", pthread_self()); -#endif + #endif PTHREAD_MUTEX_UNLOCK(&cf_lock); /*----------------------------------------------------------------*/ @@ -877,11 +905,11 @@ static void Seg_set(Cache *cf, off_t offset, int i) static void *Cache_bgdl(void *arg) { Cache *cf = (Cache *) arg; -#ifdef CACHE_LOCK_DEBUG - fprintf(stderr, "Cache_bgdl(): thread %lu: locking rw_lock;\n", + #ifdef CACHE_LOCK_DEBUG + fprintf(stderr, "Cache_bgdl(): thread %lu: locking w_lock;\n", pthread_self()); -#endif - PTHREAD_MUTEX_LOCK(&cf->rw_lock); + #endif + PTHREAD_MUTEX_LOCK(&cf->w_lock); uint8_t *recv_buf = calloc(cf->blksz, sizeof(uint8_t)); fprintf(stderr, "Cache_bgdl(): thread %lu:", pthread_self()); long recv = path_download(cf->path, (char *) recv_buf, cf->blksz, @@ -896,16 +924,16 @@ static void *Cache_bgdl(void *arg) "Cache_bgdl(): received %ld, possible network error.\n", recv); } free(recv_buf); -#ifdef CACHE_LOCK_DEBUG + #ifdef CACHE_LOCK_DEBUG fprintf(stderr, "Cache_bgdl(): thread %lu: unlocking bgt_lock;\n", pthread_self()); -#endif + #endif PTHREAD_MUTEX_UNLOCK(&cf->bgt_lock); -#ifdef CACHE_LOCK_DEBUG - fprintf(stderr, "Cache_bgdl(): thread %lu: unlocking rw_lock;\n", + #ifdef CACHE_LOCK_DEBUG + fprintf(stderr, "Cache_bgdl(): thread %lu: unlocking w_lock;\n", pthread_self()); -#endif - PTHREAD_MUTEX_UNLOCK(&cf->rw_lock); + #endif + PTHREAD_MUTEX_UNLOCK(&cf->w_lock); pthread_detach(pthread_self()); pthread_exit(NULL); } @@ -919,14 +947,6 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset) fprintf(stderr, "Cache_read(): thread %lu: %s, %s;\n", pthread_self(), cf->path, range_str); - /* SIGFPE prevention, although this shouldn't happen in the first place! */ - if (!cf->blksz) { - fprintf(stderr, - "Cache_read(): Warning: cf->blksz: %d, directly downloading", - cf->blksz); - return path_download(cf->path, output_buf, len, offset); - } - long send; off_t dl_offset = offset / cf->blksz * cf->blksz; @@ -936,30 +956,21 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset) goto bgdl; } else { -#ifdef CACHE_LOCK_DEBUG - fprintf(stderr, - "Cache_read(): thread %lu: locking and unlocking bgt_lock;\n", + #ifdef CACHE_LOCK_DEBUG + fprintf(stderr, "Cache_read(): thread %lu: locking w_lock;\n", pthread_self()); -#endif - /* Wait for the background download thread to finish */ - PTHREAD_MUTEX_LOCK(&cf->bgt_lock); - PTHREAD_MUTEX_UNLOCK(&cf->bgt_lock); - -#ifdef CACHE_LOCK_DEBUG - fprintf(stderr, "Cache_read(): thread %lu: locking rw_lock;\n", - pthread_self()); -#endif + #endif /* Wait for any other download thread to finish*/ - PTHREAD_MUTEX_LOCK(&cf->rw_lock); + PTHREAD_MUTEX_LOCK(&cf->w_lock); if (Seg_exist(cf, offset)) { /* The segment now exists - it was downloaded by another * download thread. Send it off and unlock the I/O */ send = Data_read(cf, (uint8_t *) output_buf, len, offset); -#ifdef CACHE_LOCK_DEBUG - fprintf(stderr, "Cache_read(): thread %lu: unlocking rw_lock;\n", + #ifdef CACHE_LOCK_DEBUG + fprintf(stderr, "Cache_read(): thread %lu: unlocking w_lock;\n", pthread_self()); -#endif - PTHREAD_MUTEX_UNLOCK(&cf->rw_lock); + #endif + PTHREAD_MUTEX_UNLOCK(&cf->w_lock); goto bgdl; } } @@ -991,11 +1002,11 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset) "Cache_read(): received %ld, possible network error.\n", recv); } free(recv_buf); -#ifdef CACHE_LOCK_DEBUG - fprintf(stderr, "Cache_read(): thread %lu: unlocking rw_lock;\n", + #ifdef CACHE_LOCK_DEBUG + fprintf(stderr, "Cache_read(): thread %lu: unlocking w_lock;\n", pthread_self()); -#endif - PTHREAD_MUTEX_UNLOCK(&cf->rw_lock); + #endif + PTHREAD_MUTEX_UNLOCK(&cf->w_lock); /* -----------Download the next segment in background -------------------*/ bgdl: @@ -1005,10 +1016,10 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset) cf->next_offset < cf->content_length ){ /* Stop the spawning of multiple background pthreads */ if(!pthread_mutex_trylock(&cf->bgt_lock)) { -#ifdef CACHE_LOCK_DEBUG + #ifdef CACHE_LOCK_DEBUG fprintf(stderr, "Cache_read(): thread %lu: trylocked bgt_lock;\n", pthread_self()); -#endif + #endif if (pthread_create(&cf->bgt, NULL, Cache_bgdl, cf)) { fprintf(stderr, "Cache_read(): Error creating background download thread\n" diff --git a/src/cache.h b/src/cache.h index 0e8f6a6..68860fe 100644 --- a/src/cache.h +++ b/src/cache.h @@ -27,25 +27,42 @@ typedef uint8_t Seg; * \brief cache data type in-memory data structure */ struct Cache { - char *path; /**< the path to the file on the web server */ - Link *link; /**< the Link associated with this cache data set */ - long time; /** 0) { @@ -365,18 +365,18 @@ void transfer_blocking(CURL *curl) void transfer_nonblocking(CURL *curl) { -#ifdef NETWORK_LOCK_DEBUG + #ifdef NETWORK_LOCK_DEBUG fprintf(stderr, "transfer_nonblocking(): thread %lu: locking transfer_lock;\n", pthread_self()); -#endif + #endif PTHREAD_MUTEX_LOCK(&transfer_lock); CURLMcode res = curl_multi_add_handle(curl_multi, curl); -#ifdef NETWORK_LOCK_DEBUG + #ifdef NETWORK_LOCK_DEBUG fprintf(stderr, "transfer_nonblocking(): thread %lu: unlocking transfer_lock;\n", pthread_self()); -#endif + #endif PTHREAD_MUTEX_UNLOCK(&transfer_lock); if(res > 0) {