added Cache_background_download() to enable the download of the next block

This commit is contained in:
Fufu Fang 2019-04-24 23:58:26 +01:00
parent 12c19e3421
commit e442871899
5 changed files with 115 additions and 33 deletions

View File

@ -1,7 +1,9 @@
VERSION=1.1.0 VERSION=1.1.0
CFLAGS+= -g -O2 -Wall -Wextra -D_FILE_OFFSET_BITS=64 -DVERSION=\"$(VERSION)\" `pkg-config --cflags-only-I gumbo libcurl fuse` CFLAGS+= -g -O2 -Wall -Wextra -D_FILE_OFFSET_BITS=64 -DVERSION=\"$(VERSION)\" \
LDFLAGS+= -lgumbo -lcurl -lfuse -lcrypto `pkg-config --libs-only-L gumbo libcurl fuse` `pkg-config --cflags-only-I gumbo libcurl fuse`
LDFLAGS+= -pthread -lgumbo -lcurl -lfuse -lcrypto \
`pkg-config --libs-only-L gumbo libcurl fuse`
COBJS = main.o network.o fuse_local.o link.o cache.o util.o COBJS = main.o network.o fuse_local.o link.o cache.o util.o
prefix ?= /usr/local prefix ?= /usr/local

View File

@ -45,11 +45,6 @@ typedef enum {
int CACHE_SYSTEM_INIT = 0; int CACHE_SYSTEM_INIT = 0;
/**
* \brief the receive buffer
*/
static uint8_t RECV_BUF[DATA_BLK_SZ];
/** /**
* \brief The metadata directory * \brief The metadata directory
*/ */
@ -280,19 +275,13 @@ static long Data_size(const char *fn)
* - negative values on error, * - negative values on error,
* - otherwise, the number of bytes read. * - otherwise, the number of bytes read.
*/ */
static long Data_read(const Cache *cf, uint8_t *buf, off_t len, off_t offset) static long Data_read(Cache *cf, uint8_t *buf, off_t len, off_t offset)
{ {
if (len == 0) { if (len == 0) {
fprintf(stderr, "Data_read(): requested to read 0 byte!\n"); fprintf(stderr, "Data_read(): requested to read 0 byte!\n");
return -EINVAL; return -EINVAL;
} }
size_t start = offset;
size_t end = start + len;
char range_str[64];
snprintf(range_str, sizeof(range_str), "%lu-%lu", start, end);
fprintf(stderr, "Data_read(%s, %s);\n", cf->path, range_str);
long byte_read = -EIO; long byte_read = -EIO;
if (fseeko(cf->dfp, offset, SEEK_SET)) { if (fseeko(cf->dfp, offset, SEEK_SET)) {
@ -332,7 +321,7 @@ static long Data_read(const Cache *cf, uint8_t *buf, off_t len, off_t offset)
* - otherwise, the number of bytes written. * - otherwise, the number of bytes written.
*/ */
static long Data_write(const Cache *cf, const uint8_t *buf, off_t len, static long Data_write(Cache *cf, const uint8_t *buf, off_t len,
off_t offset) off_t offset)
{ {
if (len == 0) { if (len == 0) {
@ -340,12 +329,6 @@ static long Data_write(const Cache *cf, const uint8_t *buf, off_t len,
return -EINVAL; return -EINVAL;
} }
size_t start = offset;
size_t end = start + len;
char range_str[64];
snprintf(range_str, sizeof(range_str), "%lu-%lu", start, end);
fprintf(stderr, "Data_write(%s, %s);\n", cf->path, range_str);
long byte_written = -EIO; long byte_written = -EIO;
if (fseeko(cf->dfp, offset, SEEK_SET)) { if (fseeko(cf->dfp, offset, SEEK_SET)) {
@ -404,8 +387,26 @@ static Cache *Cache_alloc()
} }
if (pthread_mutex_init(&cf->rw_lock, NULL)) { if (pthread_mutex_init(&cf->rw_lock, NULL)) {
printf( fprintf(stderr, "Cache_alloc(): rw_lock initialisation failed!\n");
"Cache_alloc(): rw_lock initialisation failed!\n"); }
if (pthread_mutex_init(&cf->bgt_lock, NULL)) {
fprintf(stderr, "Cache_alloc(): seg_lock initialisation failed!\n");
}
if (pthread_mutexattr_init(&cf->bgt_lock_attr)) {
fprintf(stderr,
"Cache_alloc(): bgt_lock_attr initialisation failed!\n");
}
if (pthread_mutexattr_setpshared(&cf->bgt_lock_attr,
PTHREAD_PROCESS_SHARED)) {
fprintf(stderr, "Cache_alloc(): could not set bgt_lock_attr!\n");
}
if (pthread_mutex_init(&cf->bgt_lock, NULL)) {
fprintf(stderr, "Cache_alloc(): bgt_lock initialisation failed!\n");
} }
return cf; return cf;
@ -420,12 +421,21 @@ static void Cache_free(Cache *cf)
fprintf(stderr, "Cache_free(): could not destroy rw_lock!\n"); fprintf(stderr, "Cache_free(): could not destroy rw_lock!\n");
} }
if (pthread_mutex_destroy(&cf->bgt_lock)) {
fprintf(stderr, "Cache_free(): could not destroy bgt_lock!\n");
}
if (pthread_mutexattr_destroy(&cf->bgt_lock_attr)) {
fprintf(stderr, "Cache_alloc(): could not destroy bgt_lock_attr!\n");
}
if (cf->path) { if (cf->path) {
free(cf->path); free(cf->path);
} }
if (cf->seg) { if (cf->seg) {
free(cf->seg); free(cf->seg);
} }
free(cf); free(cf);
} }
@ -626,7 +636,6 @@ Cache *Cache_open(const char *fn)
{ {
/* Check if both metadata and data file exist */ /* Check if both metadata and data file exist */
if (!Cache_exist(fn)) { if (!Cache_exist(fn)) {
// fprintf(stderr, "dataset does not exist!\n");
return NULL; return NULL;
} }
@ -689,6 +698,10 @@ cf->content_length: %ld, Data_size(fn): %ld.\n", fn, cf->content_length,
void Cache_close(Cache *cf) void Cache_close(Cache *cf)
{ {
/* Must wait for the background download thread to stop */
pthread_mutex_lock(&cf->bgt_lock);
pthread_mutex_unlock(&cf->bgt_lock);
if (Meta_write(cf)) { if (Meta_write(cf)) {
fprintf(stderr, "Cache_close(): Meta_write() error."); fprintf(stderr, "Cache_close(): Meta_write() error.");
} }
@ -728,9 +741,41 @@ static void Seg_set(Cache *cf, off_t offset, int i)
cf->seg[byte] = i; cf->seg[byte] = i;
} }
/**
* \brief Background download function
* \details If we are requesting the data from the second half of the current
* segment, we can spawn a pthread using this function to download the next
* segment.
*/
static void *Cache_background_download(void *arg)
{
fprintf(stderr, "Starting Cache_background_download in its own thread.\n");
Cache *cf = (Cache *) arg;
uint8_t recv_buf[DATA_BLK_SZ];
long recv = path_download(cf->path, (char *) recv_buf, cf->blksz,
cf->next_offset);
if ( (recv == cf->blksz) ||
(cf->next_offset == (cf->content_length / cf->blksz * cf->blksz)) )
{
Data_write(cf, recv_buf, cf->blksz, cf->next_offset);
Seg_set(cf, cf->next_offset, 1);
} else {
fprintf(stderr,
"Cache_background_download(): recv (%ld) < cf->blksz! \
Possible network error?\n",
recv);
}
pthread_mutex_unlock(&cf->bgt_lock);
fprintf(stderr, "Exiting Cache_background_download thread.\n");
pthread_exit(NULL);
}
long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset) long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset)
{ {
long send; long send;
uint8_t recv_buf[DATA_BLK_SZ];
/* /*
* Quick fix for SIGFPE, * Quick fix for SIGFPE,
* this shouldn't happen in the first place! * this shouldn't happen in the first place!
@ -741,7 +786,10 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset)
cf->blksz); cf->blksz);
return path_download(cf->path, output_buf, len, offset); return path_download(cf->path, output_buf, len, offset);
} }
pthread_mutex_lock(&cf->rw_lock); pthread_mutex_lock(&cf->rw_lock);
/* Calculate the aligned offset */
off_t dl_offset = offset / cf->blksz * cf->blksz;
if (Seg_exist(cf, offset)) { if (Seg_exist(cf, offset)) {
/* /*
* The metadata shows the segment already exists. This part is easy, * The metadata shows the segment already exists. This part is easy,
@ -749,10 +797,8 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset)
*/ */
send = Data_read(cf, (uint8_t *) output_buf, len, offset); send = Data_read(cf, (uint8_t *) output_buf, len, offset);
} else { } else {
/* Calculate the aligned offset */
off_t dl_offset = offset / cf->blksz * cf->blksz;
/* Download the segment */ /* Download the segment */
long recv = path_download(cf->path, (char *) RECV_BUF, cf->blksz, long recv = path_download(cf->path, (char *) recv_buf, cf->blksz,
dl_offset); dl_offset);
/* /*
* check if we have received enough data * check if we have received enough data
@ -762,13 +808,14 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset)
* Condition 2: offset is the last segment * Condition 2: offset is the last segment
*/ */
if ( (recv == cf->blksz) || if ( (recv == cf->blksz) ||
(dl_offset == (cf->content_length / cf->blksz * cf->blksz)) ) { (dl_offset == (cf->content_length / cf->blksz * cf->blksz)) )
memmove(output_buf, RECV_BUF + (offset - dl_offset), len); {
memmove(output_buf, recv_buf + (offset - dl_offset), len);
send = len; send = len;
Data_write(cf, RECV_BUF, cf->blksz, dl_offset); Data_write(cf, recv_buf, cf->blksz, dl_offset);
Seg_set(cf, dl_offset, 1); Seg_set(cf, dl_offset, 1);
} else { } else {
memmove(output_buf, RECV_BUF + (offset - dl_offset), recv); memmove(output_buf, recv_buf + (offset - dl_offset), recv);
send = recv; send = recv;
fprintf(stderr, fprintf(stderr,
"Cache_read(): recv (%ld) < cf->blksz! Possible network error?\n", "Cache_read(): recv (%ld) < cf->blksz! Possible network error?\n",
@ -776,5 +823,19 @@ long Cache_read(Cache *cf, char *output_buf, off_t len, off_t offset)
} }
} }
pthread_mutex_unlock(&cf->rw_lock); pthread_mutex_unlock(&cf->rw_lock);
/* Download the next segment in background */
cf->next_offset = round_div(offset, cf->blksz) * cf->blksz;
if ( (cf->next_offset > dl_offset) && !Seg_exist(cf, cf->next_offset) ) {
/* Stop the spawning of multiple background pthreads */
if(!pthread_mutex_trylock(&cf->bgt_lock)) {
if (pthread_create(&cf->bgt, NULL, Cache_background_download, cf)) {
fprintf(stderr,
"Cache_read(): Error creating background download thread\n"
);
}
}
}
return send; return send;
} }

View File

@ -24,12 +24,19 @@ typedef uint8_t Seg;
*/ */
typedef struct { typedef struct {
char *path; /**< the path to the file on the web server */ char *path; /**< the path to the file on the web server */
Link *link; /**< The Link associated with this cache data set */
long time; /**<the modified time of the file */ long time; /**<the modified time of the file */
off_t content_length; /**<the size of the file */ off_t content_length; /**<the size of the file */
pthread_mutex_t rw_lock; /**< mutex for disk operation */
pthread_t bgt; /**< background pthread */
pthread_mutex_t bgt_lock; /**< mutex for spawning a background thread */
pthread_mutexattr_t bgt_lock_attr;
off_t next_offset; /**<the offset of the next segment to be
downloaded in background*/
pthread_mutex_t rw_lock; /**< mutex for read/write operation */
FILE *dfp; /**< The FILE pointer for the data file*/ FILE *dfp; /**< The FILE pointer for the data file*/
FILE *mfp; /**< The FILE pointer for the metadata */ FILE *mfp; /**< The FILE pointer for the metadata */
Link *link; /**< The Link associated with this cache data set */
int blksz; /**<the block size of the data file */ int blksz; /**<the block size of the data file */
long segbc; /**<segment array byte count */ long segbc; /**<segment array byte count */
Seg *seg; /**< the detail of each segment */ Seg *seg; /**< the detail of each segment */

View File

@ -28,3 +28,8 @@ char *strndupcat(const char *a, const char *b, int n)
c[nc-1] = '\0'; c[nc-1] = '\0';
return c; return c;
} }
int64_t round_div(int64_t a, int64_t b)
{
return (a + (b / 2)) / b;
}

View File

@ -1,6 +1,8 @@
#ifndef UTIL_H #ifndef UTIL_H
#define UTIL_H #define UTIL_H
#include <stdint.h>
/** /**
* \file util.h * \file util.h
* \brief utility functions * \brief utility functions
@ -16,4 +18,9 @@
*/ */
char *strndupcat(const char *a, const char *b, int n); char *strndupcat(const char *a, const char *b, int n);
/**
* \brief division, but rounded to the nearest integer rather than truncating
*/
int64_t round_div(int64_t a, int64_t b);
#endif #endif