commit
13d78b87a6
67
src/cache.c
67
src/cache.c
|
@ -13,18 +13,18 @@
|
|||
|
||||
/**
|
||||
* \brief Data file block size
|
||||
* \details We set it to 1024*1024*8 = 8MiB
|
||||
* \details We set it to 2*1024*1024 = 8MiB
|
||||
*/
|
||||
|
||||
#define DEFAULT_DATA_BLK_SZ 8*1024*1024
|
||||
#define DEFAULT_DATA_BLK_SZ 2*1024*1024
|
||||
|
||||
/**
|
||||
* \brief Maximum segment block count
|
||||
* \details This is set to 128*1024 blocks, which uses 128KB. By default,
|
||||
* this allows the user to store (128*1024)*(8*1024*1024) = 1TB of data
|
||||
* \details This is set to 512*1024 blocks, which uses 128KB. By default,
|
||||
* this allows the user to store (512*1024)*(2*1024*1024) = 1TB of data
|
||||
*/
|
||||
|
||||
#define DEFAULT_MAX_SEGBC 128*1024
|
||||
#define DEFAULT_MAX_SEGBC 512*1024
|
||||
|
||||
/**
|
||||
* \brief error associated with metadata
|
||||
|
@ -756,8 +756,18 @@ cf->content_length: %ld, Data_size(fn): %ld.\n", fn, cf->content_length,
|
|||
void Cache_close(Cache *cf)
|
||||
{
|
||||
/* Must wait for the background download thread to stop */
|
||||
fprintf(stderr, "Cache_close(): locking bgt_lock;\n");
|
||||
fflush(stderr);
|
||||
pthread_mutex_lock(&cf->bgt_lock);
|
||||
fprintf(stderr, "Cache_close(): unlocking bgt_lock;\n");
|
||||
fflush(stderr);
|
||||
pthread_mutex_unlock(&cf->bgt_lock);
|
||||
fprintf(stderr, "Cache_close(): locking rw_lock;\n");
|
||||
fflush(stderr);
|
||||
pthread_mutex_lock(&cf->rw_lock);
|
||||
fprintf(stderr, "Cache_close(): unlocking rw_lock;\n");
|
||||
fflush(stderr);
|
||||
pthread_mutex_unlock(&cf->rw_lock);
|
||||
|
||||
if (Meta_write(cf)) {
|
||||
fprintf(stderr, "Cache_close(): Meta_write() error.");
|
||||
|
@ -807,9 +817,12 @@ static void Seg_set(Cache *cf, off_t offset, int i)
|
|||
static void *Cache_bgdl(void *arg)
|
||||
{
|
||||
Cache *cf = (Cache *) arg;
|
||||
fprintf(stderr, "Cache_bgdl(): thread %lu: locking rw_lock;\n",
|
||||
pthread_self());
|
||||
fflush(stderr);
|
||||
pthread_mutex_lock(&cf->rw_lock);
|
||||
uint8_t *recv_buf = calloc(cf->blksz, sizeof(uint8_t));
|
||||
fprintf(stderr, "Cache_bgdl(): ");
|
||||
fprintf(stderr, "Cache_bgdl(): thread %lu:", pthread_self());
|
||||
long recv = path_download(cf->path, (char *) recv_buf, cf->blksz,
|
||||
cf->next_offset);
|
||||
if ( (recv == cf->blksz) ||
|
||||
|
@ -823,18 +836,25 @@ static void *Cache_bgdl(void *arg)
|
|||
cf->next_offset);
|
||||
}
|
||||
free(recv_buf);
|
||||
fprintf(stderr, "Cache_bgdl(): thread %lu: unlocking bgt_lock;\n",
|
||||
pthread_self());
|
||||
fflush(stderr);
|
||||
pthread_mutex_unlock(&cf->bgt_lock);
|
||||
fprintf(stderr, "Cache_bgdl(): thread %lu: unlocking rw_lock;\n",
|
||||
pthread_self());
|
||||
fflush(stderr);
|
||||
pthread_mutex_unlock(&cf->rw_lock);
|
||||
pthread_exit(NULL);
|
||||
}
|
||||
|
||||
long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset)
|
||||
{
|
||||
// size_t start = offset;
|
||||
// size_t end = start + len;
|
||||
// char range_str[64];
|
||||
// snprintf(range_str, sizeof(range_str), "%lu-%lu", start, end);
|
||||
// fprintf(stderr, "Cache_read(%s, %s);\n", cf->path, range_str);
|
||||
size_t start = offset;
|
||||
size_t end = start + len;
|
||||
char range_str[64];
|
||||
snprintf(range_str, sizeof(range_str), "%lu-%lu", start, end);
|
||||
fprintf(stderr, "Cache_read(): thread %lu: %s, %s;\n", pthread_self(),
|
||||
cf->path, range_str);
|
||||
|
||||
/* SIGFPE prevention, although this shouldn't happen in the first place! */
|
||||
if (!cf->blksz) {
|
||||
|
@ -852,14 +872,27 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset)
|
|||
send = Data_read(cf, (uint8_t *) output_buf, len, offset);
|
||||
goto bgdl;
|
||||
} else {
|
||||
/* Wait for the other I/O threads to finish, then lock */
|
||||
pthread_mutex_lock(&cf->rw_lock);
|
||||
/* Wait for the background download thread to finish */
|
||||
fprintf(stderr, "Cache_read(): thread %lu: locking bgt_lock;\n",
|
||||
pthread_self());
|
||||
fflush(stderr);
|
||||
pthread_mutex_lock(&cf->bgt_lock);
|
||||
fprintf(stderr, "Cache_read(): thread %lu: unlocking bgt_lock;\n",
|
||||
pthread_self());
|
||||
fflush(stderr);
|
||||
pthread_mutex_unlock(&cf->bgt_lock);
|
||||
/* Wait for any other download thread to finish*/
|
||||
fprintf(stderr, "Cache_read(): thread %lu: locking rw_lock;\n",
|
||||
pthread_self());
|
||||
fflush(stderr);
|
||||
pthread_mutex_lock(&cf->rw_lock);
|
||||
if (Seg_exist(cf, offset)) {
|
||||
/* The segment already exists, send it off the unlock the I/O */
|
||||
/* The segment already exists - it was downloaded by other
|
||||
* download thread. Send it off and unlock the I/O */
|
||||
send = Data_read(cf, (uint8_t *) output_buf, len, offset);
|
||||
fprintf(stderr, "Cache_read(): thread %lu: unlocking rw_lock;\n",
|
||||
pthread_self());
|
||||
fflush(stderr);
|
||||
pthread_mutex_unlock(&cf->rw_lock);
|
||||
goto bgdl;
|
||||
}
|
||||
|
@ -893,6 +926,9 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset)
|
|||
recv);
|
||||
}
|
||||
free(recv_buf);
|
||||
fprintf(stderr, "Cache_read(): thread %lu: unlocking rw_lock;\n",
|
||||
pthread_self());
|
||||
fflush(stderr);
|
||||
pthread_mutex_unlock(&cf->rw_lock);
|
||||
|
||||
/* -----------Download the next segment in background -------------------*/
|
||||
|
@ -903,6 +939,9 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset)
|
|||
cf->next_offset < cf->content_length ){
|
||||
/* Stop the spawning of multiple background pthreads */
|
||||
if(!pthread_mutex_trylock(&cf->bgt_lock)) {
|
||||
fprintf(stderr, "Cache_read(): thread %lu: trylocked bgt_lock;\n",
|
||||
pthread_self());
|
||||
fflush(stderr);
|
||||
if (pthread_create(&cf->bgt, NULL, Cache_bgdl, cf)) {
|
||||
fprintf(stderr,
|
||||
"Cache_read(): Error creating background download thread\n"
|
||||
|
|
Loading…
Reference in New Issue