Bugfix: partially fixed the cache lock

- now when the same file is opened twice, the fread() output is consistent.
This commit is contained in:
Fufu Fang 2019-09-01 11:39:47 +01:00
parent 20f30a0e38
commit ed5457c76f
No known key found for this signature in database
GPG Key ID: 0F6BB5EF6F8BB729
5 changed files with 145 additions and 110 deletions

View File

@ -354,9 +354,22 @@ static long Data_read(Cache *cf, uint8_t *buf, off_t len, off_t offset)
return -EINVAL;
}
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Data_read(): thread %lu: locking seek_lock;\n",
pthread_self());
#endif
PTHREAD_MUTEX_LOCK(&cf->seek_lock);
if (fseeko(cf->dfp, offset, SEEK_SET)) {
/* fseeko failed */
fprintf(stderr, "Data_read(): fseeko(): %s\n", strerror(errno));
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Data_read(): thread %lu: unlocking seek_lock;\n",
pthread_self());
#endif
PTHREAD_MUTEX_UNLOCK(&cf->seek_lock);
return -EIO;
}
@ -377,6 +390,12 @@ static long Data_read(Cache *cf, uint8_t *buf, off_t len, off_t offset)
}
}
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Data_read(): thread %lu: unlocking seek_lock;\n",
pthread_self());
#endif
PTHREAD_MUTEX_UNLOCK(&cf->seek_lock);
return byte_read;
}
@ -399,10 +418,21 @@ static long Data_write(Cache *cf, const uint8_t *buf, off_t len,
return -EINVAL;
}
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Data_write(): thread %lu: locking seek_lock;\n",
pthread_self());
#endif
PTHREAD_MUTEX_LOCK(&cf->seek_lock);
if (fseeko(cf->dfp, offset, SEEK_SET)) {
/* fseeko failed */
fprintf(stderr, "Data_write(): fseeko(): %s\n", strerror(errno));
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Data_write(): thread %lu: unlocking seek_lock;\n",
pthread_self());
#endif
PTHREAD_MUTEX_UNLOCK(&cf->seek_lock);
return -EIO;
}
@ -417,7 +447,11 @@ static long Data_write(Cache *cf, const uint8_t *buf, off_t len,
"Data_write(): fwrite(): encountered error!\n");
}
}
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Data_write(): thread %lu: unlocking seek_lock;\n",
pthread_self());
#endif
PTHREAD_MUTEX_UNLOCK(&cf->seek_lock);
return byte_written;
}
@ -450,18 +484,12 @@ static Cache *Cache_alloc()
exit(EXIT_FAILURE);
}
if (pthread_mutexattr_init(&cf->rw_lock_attr)) {
fprintf(stderr,
"Cache_alloc(): rw_lock_attr initialisation failed!\n");
if (pthread_mutex_init(&cf->seek_lock, NULL)) {
fprintf(stderr, "Cache_alloc(): seek_lock initialisation failed!\n");
}
if (pthread_mutexattr_setpshared(&cf->rw_lock_attr,
PTHREAD_PROCESS_SHARED)) {
fprintf(stderr, "Cache_alloc(): could not set rw_lock_attr!\n");
}
if (pthread_mutex_init(&cf->rw_lock, &cf->rw_lock_attr)) {
fprintf(stderr, "Cache_alloc(): rw_lock initialisation failed!\n");
if (pthread_mutex_init(&cf->w_lock, NULL)) {
fprintf(stderr, "Cache_alloc(): w_lock initialisation failed!\n");
}
if (pthread_mutexattr_init(&cf->bgt_lock_attr)) {
@ -486,12 +514,12 @@ static Cache *Cache_alloc()
*/
static void Cache_free(Cache *cf)
{
if (pthread_mutex_destroy(&cf->rw_lock)) {
fprintf(stderr, "Cache_free(): could not destroy rw_lock!\n");
if (pthread_mutex_destroy(&cf->seek_lock)) {
fprintf(stderr, "Cache_free(): could not destroy seek_lock!\n");
}
if (pthread_mutexattr_destroy(&cf->rw_lock_attr)) {
fprintf(stderr, "Cache_alloc(): could not destroy rw_lock_attr!\n");
if (pthread_mutex_destroy(&cf->w_lock)) {
fprintf(stderr, "Cache_free(): could not destroy w_lock!\n");
}
if (pthread_mutex_destroy(&cf->bgt_lock)) {
@ -720,26 +748,26 @@ Cache *Cache_open(const char *fn)
/*---------------- Cache_open() critical section -----------------*/
#ifdef CACHE_LOCK_DEBUG
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_open(): thread %lu: locking cf_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_LOCK(&cf_lock);
if (link->cache_opened) {
link->cache_opened++;
#ifdef CACHE_LOCK_DEBUG
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_open(): thread %lu: unlocking cf_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_UNLOCK(&cf_lock);
return link->cache_ptr;
}
#ifdef CACHE_LOCK_DEBUG
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_open(): thread %lu: unlocking cf_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_UNLOCK(&cf_lock);
/*----------------------------------------------------------------*/
@ -804,27 +832,27 @@ void Cache_close(Cache *cf)
{
/*--------------- Cache_close() critical section -----------------*/
#ifdef CACHE_LOCK_DEBUG
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_close(): thread %lu: locking cf_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_LOCK(&cf_lock);
cf->link->cache_opened--;
if (cf->link->cache_opened > 0) {
#ifdef CACHE_LOCK_DEBUG
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_close(): thread %lu: unlocking cf_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_UNLOCK(&cf_lock);
return;
}
#ifdef CACHE_LOCK_DEBUG
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_close(): thread %lu: unlocking cf_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_UNLOCK(&cf_lock);
/*----------------------------------------------------------------*/
@ -877,11 +905,11 @@ static void Seg_set(Cache *cf, off_t offset, int i)
static void *Cache_bgdl(void *arg)
{
Cache *cf = (Cache *) arg;
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_bgdl(): thread %lu: locking rw_lock;\n",
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_bgdl(): thread %lu: locking w_lock;\n",
pthread_self());
#endif
PTHREAD_MUTEX_LOCK(&cf->rw_lock);
#endif
PTHREAD_MUTEX_LOCK(&cf->w_lock);
uint8_t *recv_buf = calloc(cf->blksz, sizeof(uint8_t));
fprintf(stderr, "Cache_bgdl(): thread %lu:", pthread_self());
long recv = path_download(cf->path, (char *) recv_buf, cf->blksz,
@ -896,16 +924,16 @@ static void *Cache_bgdl(void *arg)
"Cache_bgdl(): received %ld, possible network error.\n", recv);
}
free(recv_buf);
#ifdef CACHE_LOCK_DEBUG
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_bgdl(): thread %lu: unlocking bgt_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_UNLOCK(&cf->bgt_lock);
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_bgdl(): thread %lu: unlocking rw_lock;\n",
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_bgdl(): thread %lu: unlocking w_lock;\n",
pthread_self());
#endif
PTHREAD_MUTEX_UNLOCK(&cf->rw_lock);
#endif
PTHREAD_MUTEX_UNLOCK(&cf->w_lock);
pthread_detach(pthread_self());
pthread_exit(NULL);
}
@ -919,14 +947,6 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset)
fprintf(stderr, "Cache_read(): thread %lu: %s, %s;\n", pthread_self(),
cf->path, range_str);
/* SIGFPE prevention, although this shouldn't happen in the first place! */
if (!cf->blksz) {
fprintf(stderr,
"Cache_read(): Warning: cf->blksz: %d, directly downloading",
cf->blksz);
return path_download(cf->path, output_buf, len, offset);
}
long send;
off_t dl_offset = offset / cf->blksz * cf->blksz;
@ -936,30 +956,21 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset)
goto bgdl;
} else {
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr,
"Cache_read(): thread %lu: locking and unlocking bgt_lock;\n",
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_read(): thread %lu: locking w_lock;\n",
pthread_self());
#endif
/* Wait for the background download thread to finish */
PTHREAD_MUTEX_LOCK(&cf->bgt_lock);
PTHREAD_MUTEX_UNLOCK(&cf->bgt_lock);
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_read(): thread %lu: locking rw_lock;\n",
pthread_self());
#endif
#endif
/* Wait for any other download thread to finish*/
PTHREAD_MUTEX_LOCK(&cf->rw_lock);
PTHREAD_MUTEX_LOCK(&cf->w_lock);
if (Seg_exist(cf, offset)) {
/* The segment now exists - it was downloaded by another
* download thread. Send it off and unlock the I/O */
send = Data_read(cf, (uint8_t *) output_buf, len, offset);
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_read(): thread %lu: unlocking rw_lock;\n",
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_read(): thread %lu: unlocking w_lock;\n",
pthread_self());
#endif
PTHREAD_MUTEX_UNLOCK(&cf->rw_lock);
#endif
PTHREAD_MUTEX_UNLOCK(&cf->w_lock);
goto bgdl;
}
}
@ -991,11 +1002,11 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset)
"Cache_read(): received %ld, possible network error.\n", recv);
}
free(recv_buf);
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_read(): thread %lu: unlocking rw_lock;\n",
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_read(): thread %lu: unlocking w_lock;\n",
pthread_self());
#endif
PTHREAD_MUTEX_UNLOCK(&cf->rw_lock);
#endif
PTHREAD_MUTEX_UNLOCK(&cf->w_lock);
/* -----------Download the next segment in background -------------------*/
bgdl:
@ -1005,10 +1016,10 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset)
cf->next_offset < cf->content_length ){
/* Stop the spawning of multiple background pthreads */
if(!pthread_mutex_trylock(&cf->bgt_lock)) {
#ifdef CACHE_LOCK_DEBUG
#ifdef CACHE_LOCK_DEBUG
fprintf(stderr, "Cache_read(): thread %lu: trylocked bgt_lock;\n",
pthread_self());
#endif
#endif
if (pthread_create(&cf->bgt, NULL, Cache_bgdl, cf)) {
fprintf(stderr,
"Cache_read(): Error creating background download thread\n"

View File

@ -27,25 +27,42 @@ typedef uint8_t Seg;
* \brief cache data type in-memory data structure
*/
struct Cache {
char *path; /**< the path to the file on the web server */
Link *link; /**< the Link associated with this cache data set */
long time; /**<the modified time of the file */
off_t content_length; /**<the size of the file */
/** \brief The FILE pointer for the data file*/
FILE *dfp;
/** \brief The FILE pointer for the metadata */
FILE *mfp;
/** \brief the path to the file on the web server */
char *path;
/** \brief the Link associated with this cache data set */
Link *link;
/** \brief the modified time of the file */
long time;
/** \brief the size of the file */
off_t content_length;
/** \brief the block size of the data file */
int blksz;
/** \brief segment array byte count */
long segbc;
/** \brief the detail of each segment */
Seg *seg;
pthread_t bgt; /**< background download pthread */
pthread_mutex_t bgt_lock; /**< mutex for the background download thread */
pthread_mutexattr_t bgt_lock_attr; /**< attributes for bgt_lock */
off_t next_offset; /**<the offset of the next segment to be
downloaded in background*/
/** \brief mutex lock for seek operation */
pthread_mutex_t seek_lock;
/** \brief mutex lock for write operation */
pthread_mutex_t w_lock;
pthread_mutex_t rw_lock; /**< mutex for read/write operation */
pthread_mutexattr_t rw_lock_attr; /**< attributes for rw_lock */
FILE *dfp; /**< The FILE pointer for the data file*/
FILE *mfp; /**< The FILE pointer for the metadata */
int blksz; /**<the block size of the data file */
long segbc; /**<segment array byte count */
Seg *seg; /**< the detail of each segment */
/** \brief background download pthread */
pthread_t bgt;
/**
* \brief mutex lock for the background download thread
* \note This lock is locked by the foreground thread, but unlocked by the
* background thread!
*/
pthread_mutex_t bgt_lock;
/** \brief mutex attributes for bgt_lock */
pthread_mutexattr_t bgt_lock_attr;
/** \brief the offset of the next segment to be downloaded in background*/
off_t next_offset;
};
/**

View File

@ -284,11 +284,11 @@ static void LinkTable_print(LinkTable *linktbl)
LinkTable *LinkTable_new(const char *url)
{
#ifdef LINK_LOCK_DEBUG
#ifdef LINK_LOCK_DEBUG
fprintf(stderr,
"LinkTable_new(): thread %lu: locking link_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_LOCK(&link_lock);
LinkTable *linktbl = calloc(1, sizeof(LinkTable));
if (!linktbl) {
@ -583,11 +583,11 @@ long path_download(const char *path, char *output_buf, size_t size,
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&buf);
curl_easy_setopt(curl, CURLOPT_RANGE, range_str);
#ifdef LINK_LOCK_DEBUG
#ifdef LINK_LOCK_DEBUG
fprintf(stderr,
"path_download(): thread %lu: locking and unlocking link_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_LOCK(&link_lock);
PTHREAD_MUTEX_UNLOCK(&link_lock);

View File

@ -33,15 +33,22 @@ typedef struct LinkTable LinkTable;
* \brief Link type data structure
*/
struct Link {
char linkname[MAX_FILENAME_LEN+1]; /**< The link name in the last level of
the URL */
char f_url[MAX_PATH_LEN+1]; /**< The full URL of the file */
LinkType type; /**< The type of the link */
size_t content_length; /**< CURLINFO_CONTENT_LENGTH_DOWNLOAD of the file */
LinkTable *next_table; /**< The next LinkTable level, if it is a LINK_DIR */
long time; /**< CURLINFO_FILETIME obtained from the server */
int cache_opened; /**< How many times associated cache has been opened */
Cache *cache_ptr; /**< The pointer associated with the cache file */
/** \brief The link name in the last level of the URL */
char linkname[MAX_FILENAME_LEN+1];
/** \brief The full URL of the file */
char f_url[MAX_PATH_LEN+1];
/** \brief The type of the link */
LinkType type;
/** \brief CURLINFO_CONTENT_LENGTH_DOWNLOAD of the file */
size_t content_length;
/** \brief The next LinkTable level, if it is a LINK_DIR */
LinkTable *next_table;
/** \brief CURLINFO_FILETIME obtained from the server */
long time;
/** \brief How many times associated cache has been opened */
int cache_opened;
/** \brief The pointer associated with the cache file */
Cache *cache_ptr;
};
struct LinkTable {

View File

@ -162,11 +162,11 @@ static void curl_process_msgs(CURLMsg *curl_msg, int n_running_curl,
*/
int curl_multi_perform_once()
{
#ifdef NETWORK_LOCK_DEBUG
#ifdef NETWORK_LOCK_DEBUG
fprintf(stderr,
"curl_multi_perform_once(): thread %lu: locking transfer_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_LOCK(&transfer_lock);
/* Get curl multi interface to perform pending tasks */
int n_running_curl;
@ -224,11 +224,11 @@ int curl_multi_perform_once()
while((curl_msg = curl_multi_info_read(curl_multi, &n_mesgs))) {
curl_process_msgs(curl_msg, n_running_curl, n_mesgs);
}
#ifdef NETWORK_LOCK_DEBUG
#ifdef NETWORK_LOCK_DEBUG
fprintf(stderr,
"curl_multi_perform_once(): thread %lu: unlocking transfer_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_UNLOCK(&transfer_lock);
return n_running_curl;
}
@ -338,18 +338,18 @@ void transfer_blocking(CURL *curl)
transfer.type = DATA;
transfer.transferring = 1;
curl_easy_setopt(curl, CURLOPT_PRIVATE, &transfer);
#ifdef NETWORK_LOCK_DEBUG
#ifdef NETWORK_LOCK_DEBUG
fprintf(stderr,
"transfer_blocking(): thread %lu: locking transfer_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_LOCK(&transfer_lock);
CURLMcode res = curl_multi_add_handle(curl_multi, curl);
#ifdef NETWORK_LOCK_DEBUG
#ifdef NETWORK_LOCK_DEBUG
fprintf(stderr,
"transfer_blocking(): thread %lu: unlocking transfer_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_UNLOCK(&transfer_lock);
if(res > 0) {
@ -365,18 +365,18 @@ void transfer_blocking(CURL *curl)
void transfer_nonblocking(CURL *curl)
{
#ifdef NETWORK_LOCK_DEBUG
#ifdef NETWORK_LOCK_DEBUG
fprintf(stderr,
"transfer_nonblocking(): thread %lu: locking transfer_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_LOCK(&transfer_lock);
CURLMcode res = curl_multi_add_handle(curl_multi, curl);
#ifdef NETWORK_LOCK_DEBUG
#ifdef NETWORK_LOCK_DEBUG
fprintf(stderr,
"transfer_nonblocking(): thread %lu: unlocking transfer_lock;\n",
pthread_self());
#endif
#endif
PTHREAD_MUTEX_UNLOCK(&transfer_lock);
if(res > 0) {