Merge pull request #2594 from Sea-n/multithread-part1

Add Multi-thread Support
This commit is contained in:
Gerardo O 2023-12-20 17:09:21 -06:00 committed by GitHub
commit 1e116d04f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 378 additions and 289 deletions

3
.gitignore vendored
View File

@ -3,6 +3,7 @@ config.status
config.guess
config.sub
config.rpath
configure~
**/.deps/
/INSTALL
/Makefile
@ -103,6 +104,6 @@ Makefile.in
/resources/css/fa.min.css.tmp
/resources/js/app.js.tmp
/resources/js/charts.js.tmp
/resources/js/d3.v3.min.js.tmp
/resources/js/d3.v?.min.js.tmp
/resources/js/hogan.min.js.tmp
/resources/tpls.html.tmp

View File

@ -313,6 +313,18 @@ like to process all log files `access.log*`, we can do:
_Note_: On Mac OS X, use `gunzip -c` instead of `zcat`.
### Multi-thread Support ###
Use `--jobs=<count>` (or `-j`) to enable multi-thread parsing. For example:
# goaccess access.log -o report.html -j 4
Use `--chunk-size=<256-32768>` to adjust the chunk size; the default is 1024. For example:
# goaccess access.log -o report.html -j 4 --chunk-size=8192
### Real-time HTML outputs ###
GoAccess has the ability to output real-time data in the HTML report. You can

View File

@ -996,31 +996,28 @@ ins_u648 (khash_t (u648) *hash, uint64_t key, uint8_t value) {
}
/* Increase an uint32_t value given an uint32_t key.
* Note: If the key exists, its value is increased by the given inc.
*
* On error, 0 is returned.
* On success the inserted value is returned */
* On success the increased value is returned */
uint32_t
inc_ii32 (khash_t (ii32) *hash, uint32_t key, uint32_t inc) {
khint_t k;
int ret;
uint32_t value = inc;
if (!hash)
return 0;
k = kh_get (ii32, hash, key);
/* key found, increment current value by the given `inc` */
if (k != kh_end (hash))
value = kh_val (hash, k) + inc;
/* key not found, put a new hash with val=0 */
if (k == kh_end (hash)) {
k = kh_put (ii32, hash, key, &ret);
/* operation failed */
if (ret == -1)
return 0;
kh_val (hash, k) = 0;
}
k = kh_put (ii32, hash, key, &ret);
if (ret == -1)
return 0;
kh_val (hash, k) = value;
return value;
return __sync_add_and_fetch (&kh_val (hash, k), inc);
}
/* Increase a uint64_t value given a string key.
@ -1082,7 +1079,7 @@ inc_iu64 (khash_t (iu64) *hash, uint32_t key, uint64_t inc) {
return 0;
}
/* Increase a uint32_t value given a string key.
/* Increase an uint32_t value given a string key.
*
* On error, 0 is returned.
* On success the increased value is returned */
@ -1090,25 +1087,21 @@ static uint32_t
inc_si32 (khash_t (si32) *hash, const char *key, uint32_t inc) {
khint_t k;
int ret;
uint32_t value = inc;
if (!hash)
return 0;
k = kh_get (si32, hash, key);
/* key not found, set new value to the given `inc` */
/* key not found, put a new hash with val=0 */
if (k == kh_end (hash)) {
k = kh_put (si32, hash, key, &ret);
/* operation failed */
if (ret == -1)
return 0;
} else {
value = kh_val (hash, k) + inc;
kh_val (hash, k) = 0;
}
kh_val (hash, k) = value;
return value;
return __sync_add_and_fetch (&kh_val (hash, k), inc);
}
/* Insert a string key and auto increment int value.
@ -1200,7 +1193,7 @@ get_si32 (khash_t (si32) *hash, const char *key) {
k = kh_get (si32, hash, key);
/* key found, return current value */
if (k != kh_end (hash))
return kh_val (hash, k);
return __sync_add_and_fetch (&kh_val (hash, k), 0);
return 0;
}
@ -1291,15 +1284,14 @@ get_ss32 (khash_t (ss32) *hash, const char *key) {
uint32_t
get_ii32 (khash_t (ii32) *hash, uint32_t key) {
khint_t k;
uint32_t value = 0;
if (!hash)
return 0;
k = kh_get (ii32, hash, key);
/* key found, return current value */
if (k != kh_end (hash) && (value = kh_val (hash, k)))
return value;
if (k != kh_end (hash))
return __sync_add_and_fetch (&kh_val (hash, k), 0);
return 0;
}

View File

@ -80,7 +80,9 @@
GConf conf = {
.append_method = 1,
.append_protocol = 1,
.chunk_size = 1024,
.hl_header = 1,
.jobs = 1,
.num_tests = 10,
};
@ -767,6 +769,7 @@ read_client (void *ptr_data) {
/* Parse tailed lines */
static void
parse_tail_follow (GLog *glog, FILE *fp) {
GLogItem *logitem;
#ifdef WITH_GETLINE
char *buf = NULL;
#else
@ -780,7 +783,13 @@ parse_tail_follow (GLog *glog, FILE *fp) {
while (fgets (buf, LINE_BUFFER, fp) != NULL) {
#endif
pthread_mutex_lock (&gdns_thread.mutex);
pre_process_log (glog, buf, 0);
logitem = parse_line (glog, buf, 0);
if (logitem != NULL) {
if (logitem->errstr == NULL)
process_log (logitem);
count_process (glog);
free_glog (logitem);
}
pthread_mutex_unlock (&gdns_thread.mutex);
glog->bytes += strlen (buf);
#ifdef WITH_GETLINE

View File

@ -558,7 +558,7 @@ count_bw (int numdate, uint64_t resp_size) {
/* Keep track of all invalid log strings. */
static void
count_invalid (GLog *glog, const char *line) {
count_invalid (GLog *glog, GLogItem *logitem, const char *line) {
glog->invalid++;
ht_inc_cnt_overall ("failed_requests", 1);
@ -566,8 +566,8 @@ count_invalid (GLog *glog, const char *line) {
LOG_INVALID (("%s", line));
}
if (glog->items->errstr && glog->log_erridx < MAX_LOG_ERRORS) {
glog->errors[glog->log_erridx++] = xstrdup (glog->items->errstr);
if (logitem->errstr && glog->log_erridx < MAX_LOG_ERRORS) {
glog->errors[glog->log_erridx++] = xstrdup (logitem->errstr);
}
}
@ -610,16 +610,16 @@ count_valid (int numdate) {
/* Keep track of all valid and processed log strings. */
void
count_process (GLog *glog) {
__sync_add_and_fetch(&glog->processed, 1);
lock_spinner ();
glog->processed++;
ht_inc_cnt_overall ("total_requests", 1);
unlock_spinner ();
}
void
count_process_and_invalid (GLog *glog, const char *line) {
count_process_and_invalid (GLog *glog, GLogItem *logitem, const char *line) {
count_process (glog);
count_invalid (glog, line);
count_invalid (glog, logitem, line);
}
/* Keep track of all excluded log strings (IPs).
@ -1324,8 +1324,8 @@ gen_status_code_key (GKeyData *kdata, GLogItem *logitem) {
if (!logitem->status)
return 1;
type = verify_status_code_type (logitem->status);
status = verify_status_code (logitem->status);
type = verify_status_code_type (logitem->status);
get_kdata (kdata, status, status);
get_kroot (kdata, type, type);
@ -1390,7 +1390,7 @@ static int
include_uniq (GLogItem *logitem) {
  /* Decide whether this request counts towards unique visitors.
   *
   * Returns 1 when the entry should be included, 0 when it should not.
   * Entries without a status, and entries whose status is not 4xx, are
   * always included. 4xx entries are included only when
   * conf.client_err_to_unique_count is set. */
  int u = conf.client_err_to_unique_count;
  /* status is an int now, so the class is status / 100 compared against the
   * integer 4 -- not the character '4' (which is 52 and never matches a
   * valid status class). */
  int is_client_err = logitem->status && (logitem->status / 100) == 4;

  if (!is_client_err || u)
    return 1;
  return 0;
}

View File

@ -147,7 +147,7 @@ char *get_mtr_str (GSMetric metric);
int excluded_ip (GLogItem * logitem);
uint32_t *i322ptr (uint32_t val);
uint64_t *uint642ptr (uint64_t val);
void count_process_and_invalid (GLog * glog, const char *line);
void count_process_and_invalid (GLog * glog, GLogItem * logitem, const char *line);
void count_process (GLog * glog);
void free_gmetrics (GMetrics * metric);
void insert_methods_protocols (void);

View File

@ -50,11 +50,11 @@
#include "xmalloc.h"
static char short_options[] = "f:e:p:o:l:H:M:S:b:"
static char short_options[] = "b:e:f:j:l:o:p:H:M:S:"
#ifdef HAVE_LIBGEOIP
"g"
#endif
"acirmMhHqdsV";
"acdhimqrsV";
/* *INDENT-OFF* */
struct option long_opts[] = {
@ -71,6 +71,7 @@ struct option long_opts[] = {
{"hl-header" , no_argument , 0 , 'i' } ,
{"http-method" , required_argument , 0 , 'M' } ,
{"http-protocol" , required_argument , 0 , 'H' } ,
{"jobs" , required_argument , 0 , 'j' } ,
{"log-file" , required_argument , 0 , 'f' } ,
{"log-size" , required_argument , 0 , 'S' } ,
{"no-query-string" , no_argument , 0 , 'q' } ,
@ -90,6 +91,7 @@ struct option long_opts[] = {
{"color" , required_argument , 0 , 0 } ,
{"color-scheme" , required_argument , 0 , 0 } ,
{"crawlers-only" , no_argument , 0 , 0 } ,
{"chunk-size" , required_argument , 0 , 0 } ,
{"daemonize" , no_argument , 0 , 0 } ,
{"datetime-format" , required_argument , 0 , 0 } ,
{"date-format" , required_argument , 0 , 0 } ,
@ -261,6 +263,8 @@ cmd_help (void)
" -d --with-output-resolver - Enable IP resolver on HTML|JSON output.\n"
" -e --exclude-ip=<IP> - Exclude one or multiple IPv4/6. Allows IP\n"
" ranges. e.g., 192.168.0.1-192.168.0.10\n"
" -j --jobs=<1-6> - Threads count for parsing log. Default\n"
" is 1 thread. Use 4 threads is recommended.\n"
" -H --http-protocol=<yes|no> - Set/unset HTTP request protocol if found.\n"
" -M --http-method=<yes|no> - Set/unset HTTP request method if found.\n"
" -o --output=<format|filename> - Output to stdout or the specified file.\n"
@ -276,6 +280,7 @@ cmd_help (void)
" report.\n"
" --anonymize-level=<1|2|3> - Anonymization levels: 1 => default, 2 =>\n"
" strong, 3 => pedantic.\n"
" --chunk-size=<256-32768> - Chunk size for every threads. Default is 1024.\n"
" --crawlers-only - Parse and display only crawlers.\n"
" --date-spec=<date|hr|min> - Date specificity. Possible values: `date`\n"
" (default), `hr` or `min`.\n"
@ -578,6 +583,16 @@ parse_long_opt (const char *name, const char *oarg) {
if (!strcmp ("all-static-files", name))
conf.all_static_files = 1;
/* chunk size */
if (!strcmp ("chunk-size", name)) {
/* Recommended chunk size is 256 - 32768, hard limit is 32 - 1048576. */
conf.chunk_size = atoi (oarg);
if (conf.chunk_size < 32)
FATAL ("The hard lower limit of --chunk-size is 32.");
if (conf.chunk_size > 1048576)
FATAL ("The hard limit of --chunk-size is 1048576.");
}
/* crawlers only */
if (!strcmp ("crawlers-only", name))
conf.crawlers_only = 1;
@ -627,7 +642,8 @@ parse_long_opt (const char *name, const char *oarg) {
/* ignore status code */
if (!strcmp ("ignore-status", name))
set_array_opt (oarg, conf.ignore_status, &conf.ignore_status_idx, MAX_IGNORE_STATUS);
if (conf.ignore_status_idx < MAX_IGNORE_STATUS)
conf.ignore_status[conf.ignore_status_idx++] = atoi (oarg);
/* ignore static requests */
if (!strcmp ("ignore-statics", name)) {
@ -804,6 +820,12 @@ read_option_args (int argc, char **argv) {
case 'i':
conf.hl_header = 1;
break;
case 'j':
/* Recommended 4 threads, soft limit is 6, hard limit is 12. */
conf.jobs = atoi (optarg);
if (conf.jobs > 12)
FATAL ("The hard limit of --jobs is 12.");
break;
case 'q':
conf.ignore_qstr = 1;
break;

View File

@ -55,6 +55,7 @@
#include <unistd.h>
#include <fcntl.h>
#include <inttypes.h>
#include <pthread.h>
#include <libgen.h>
#include "gkhash.h"
@ -252,10 +253,8 @@ free_logs (Logs *logs) {
* On success, the new GLogItem instance is returned. */
GLogItem *
init_log_item (GLog *glog) {
time_t now = time (0);
GLogItem *logitem;
glog->items = xmalloc (sizeof (GLogItem));
logitem = glog->items;
logitem = xmalloc (sizeof (GLogItem));
memset (logitem, 0, sizeof *logitem);
logitem->agent = NULL;
@ -278,7 +277,7 @@ init_log_item (GLog *glog) {
logitem->req = NULL;
logitem->resp_size = 0LL;
logitem->serve_time = 0;
logitem->status = NULL;
logitem->status = 0;
logitem->time = NULL;
logitem->uniq_key = NULL;
logitem->vhost = NULL;
@ -293,13 +292,13 @@ init_log_item (GLog *glog) {
memset (logitem->site, 0, sizeof (logitem->site));
memset (logitem->agent_hex, 0, sizeof (logitem->agent_hex));
localtime_r (&now, &logitem->dt);
logitem->dt = glog->start_time;
return logitem;
}
/* Free all members of a GLogItem */
static void
void
free_glog (GLogItem *logitem) {
if (logitem->agent != NULL)
free (logitem->agent);
@ -337,8 +336,6 @@ free_glog (GLogItem *logitem) {
free (logitem->req_key);
if (logitem->req != NULL)
free (logitem->req);
if (logitem->status != NULL)
free (logitem->status);
if (logitem->time != NULL)
free (logitem->time);
if (logitem->uniq_key != NULL)
@ -549,14 +546,6 @@ extract_method (const char *token) {
return NULL;
}
/* Determine if time-served data was stored on-disk. */
static void
contains_usecs (void) {
if (conf.serve_usecs)
return;
conf.serve_usecs = 1; /* flag */
}
static int
is_cache_hit (const char *tkn) {
if (strcasecmp ("MISS", tkn) == 0)
@ -720,7 +709,7 @@ get_delim (char *dest, const char *p) {
*
* On success, the malloc'd token is returned. */
static char *
parsed_string (const char *pch, char **str, int move_ptr) {
parsed_string (const char *pch, const char **str, int move_ptr) {
char *p;
size_t len = (pch - *str + 1);
@ -738,9 +727,9 @@ parsed_string (const char *pch, char **str, int move_ptr) {
* On error, or unable to parse it, NULL is returned.
* On success, the malloc'd token is returned. */
static char *
parse_string (char **str, const char *delims, int cnt) {
parse_string (const char **str, const char *delims, int cnt) {
int idx = 0;
char *pch = *str, *p = NULL;
const char *pch = *str, *p = NULL;
char end;
if ((*delims != 0x0) && (p = strpbrk (*str, delims)) == NULL)
@ -763,15 +752,15 @@ parse_string (char **str, const char *delims, int cnt) {
}
char *
extract_by_delim (char **str, const char *end) {
extract_by_delim (const char **str, const char *end) {
return parse_string (&(*str), end, 1);
}
/* Move forward through the log string until a non-space (!isspace)
* char is found. */
static void
find_alpha (char **str) {
char *s = *str;
find_alpha (const char **str) {
const char *s = *str;
while (*s) {
if (isspace (*s))
s++;
@ -784,9 +773,9 @@ find_alpha (char **str) {
/* Move forward through the log string until a non-space (!isspace)
* char is found and returns the count. */
static int
find_alpha_count (char *str) {
find_alpha_count (const char *str) {
int cnt = 0;
char *s = str;
const char *s = str;
while (*s) {
if (isspace (*s))
s++, cnt++;
@ -901,22 +890,20 @@ set_agent_hash (GLogItem *logitem) {
* On error, or unable to parse it, 1 is returned.
* On success, the malloc'd token is assigned to a GLogItem member. */
static int
parse_specifier (GLogItem *logitem, char **str, const char *p, const char *end) {
parse_specifier (GLogItem *logitem, const char **str, const char *p, const char *end) {
struct tm tm;
time_t now = time (0);
const char *dfmt = conf.date_format;
const char *tfmt = conf.time_format;
char *pch, *sEnd, *bEnd, *tkn = NULL;
double serve_secs = 0.0;
uint64_t bandw = 0, serve_time = 0;
long status = 0L;
int dspc = 0, fmtspcs = 0;
errno = 0;
memset (&tm, 0, sizeof (tm));
tm.tm_isdst = -1;
localtime_r (&now, &tm);
tm = logitem->dt;
switch (*p) {
/* date */
@ -1119,19 +1106,14 @@ parse_specifier (GLogItem *logitem, char **str, const char *p, const char *end)
if (!(tkn = parse_string (&(*str), end, 1)))
return spec_err (logitem, ERR_SPEC_TOKN_NUL, *p, NULL);
/* do not validate HTTP status code */
if (conf.no_strict_status) {
logitem->status = tkn;
break;
}
status = strtol (tkn, &sEnd, 10);
if (tkn == sEnd || *sEnd != '\0' || errno == ERANGE || status < 100 || status > 599) {
logitem->status = strtol (tkn, &sEnd, 10);
if (tkn == sEnd || *sEnd != '\0' || errno == ERANGE ||
(!conf.no_strict_status && (logitem->status < 100 || logitem->status > 599))) {
spec_err (logitem, ERR_SPEC_TOKN_INV, *p, tkn);
free (tkn);
return 1;
}
logitem->status = tkn;
free (tkn);
break;
/* size of response in bytes - excluding HTTP headers */
case 'b':
@ -1144,7 +1126,7 @@ parse_specifier (GLogItem *logitem, char **str, const char *p, const char *end)
if (tkn == bEnd || *bEnd != '\0' || errno == ERANGE)
bandw = 0;
logitem->resp_size = bandw;
conf.bandwidth = 1;
__sync_bool_compare_and_swap (&conf.bandwidth, 0, 1); /* set flag */
free (tkn);
break;
/* referrer */
@ -1212,7 +1194,8 @@ parse_specifier (GLogItem *logitem, char **str, const char *p, const char *end)
/* convert it to microseconds */
logitem->serve_time = (serve_secs > 0) ? serve_secs * MILS : 0;
contains_usecs (); /* set flag */
/* Determine if time-served data was stored on-disk. */
__sync_bool_compare_and_swap (&conf.serve_usecs, 0, 1); /* set flag */
free (tkn);
break;
/* time taken to serve the request, in seconds with a milliseconds
@ -1234,7 +1217,8 @@ parse_specifier (GLogItem *logitem, char **str, const char *p, const char *end)
/* convert it to microseconds */
logitem->serve_time = (serve_secs > 0) ? serve_secs * SECS : 0;
contains_usecs (); /* set flag */
/* Determine if time-served data was stored on-disk. */
__sync_bool_compare_and_swap (&conf.serve_usecs, 0, 1); /* set flag */
free (tkn);
break;
/* time taken to serve the request, in microseconds */
@ -1250,7 +1234,8 @@ parse_specifier (GLogItem *logitem, char **str, const char *p, const char *end)
serve_time = 0;
logitem->serve_time = serve_time;
contains_usecs (); /* set flag */
/* Determine if time-served data was stored on-disk. */
__sync_bool_compare_and_swap (&conf.serve_usecs, 0, 1); /* set flag */
free (tkn);
break;
/* time taken to serve the request, in nanoseconds */
@ -1268,7 +1253,8 @@ parse_specifier (GLogItem *logitem, char **str, const char *p, const char *end)
/* convert it to microseconds */
logitem->serve_time = (serve_time > 0) ? serve_time / MILS : 0;
contains_usecs (); /* set flag */
/* Determine if time-served data was stored on-disk. */
__sync_bool_compare_and_swap (&conf.serve_usecs, 0, 1); /* set flag */
free (tkn);
break;
/* UMS: Krypto (TLS) "ECDHE-RSA-AES128-GCM-SHA256" */
@ -1335,8 +1321,9 @@ parse_specifier (GLogItem *logitem, char **str, const char *p, const char *end)
* If unable to find both curly braces (boundaries), NULL is returned.
* On success, the malloc'd reject set is returned. */
static char *
extract_braces (char **p) {
char *b1 = NULL, *b2 = NULL, *ret = NULL, *s = *p;
extract_braces (const char **p) {
const char *b1 = NULL, *b2 = NULL, *s = *p;
char *ret = NULL;
int esc = 0;
ptrdiff_t len = 0;
@ -1374,8 +1361,8 @@ extract_braces (char **p) {
* On success, the malloc'd token is assigned to a GLogItem->host and
* 0 is returned. */
static int
set_xff_host (GLogItem *logitem, char *str, char *skips, int out) {
char *ptr = NULL, *tkn = NULL;
set_xff_host (GLogItem *logitem, const char *str, const char *skips, int out) {
const char *ptr = NULL, *tkn = NULL;
int invalid_ip = 1, len = 0, type_ip = TYPE_IPINV;
int idx = 0, skips_len = 0;
@ -1399,14 +1386,14 @@ set_xff_host (GLogItem *logitem, char *str, char *skips, int out) {
invalid_ip = invalid_ipaddr (tkn, &type_ip);
/* done, already have IP and current token is not a host */
if (logitem->host && invalid_ip) {
free (tkn);
free ((void *) tkn);
break;
}
if (!logitem->host && !invalid_ip) {
logitem->host = xstrdup (tkn);
logitem->type_ip = type_ip;
}
free (tkn);
free ((void *) tkn);
idx = 0;
/* found the client IP, break then */
@ -1425,7 +1412,7 @@ set_xff_host (GLogItem *logitem, char *str, char *skips, int out) {
* If no IP is found, 1 is returned.
* On success, the malloc'd token is assigned to a GLogItem->host and 0 is returned. */
static int
find_xff_host (GLogItem *logitem, char **str, char **p) {
find_xff_host (GLogItem *logitem, const char **str, const char **p) {
char *skips = NULL, *extract = NULL;
char pch[2] = { 0 };
int res = 0;
@ -1460,7 +1447,7 @@ clean:
* On success, the malloc'd token is assigned to a GLogItem member and
* 0 is returned. */
static int
special_specifier (GLogItem *logitem, char **str, char **p) {
special_specifier (GLogItem *logitem, const char **str, const char **p) {
switch (**p) {
/* XFF remote hostname (IP only) */
case 'h':
@ -1478,9 +1465,9 @@ special_specifier (GLogItem *logitem, char **str, char **p) {
* On success, the malloc'd token is assigned to a GLogItem member and
* 0 is returned. */
static int
parse_format (GLogItem *logitem, char *str, char *lfmt) {
parse_format (GLogItem *logitem, const char *str, const char * lfmt) {
char end[2 + 1] = { 0 };
char *p = NULL, *last = NULL;
const char *p = NULL, *last = NULL;
int perc = 0, tilde = 0, ret = 0;
if (str == NULL || *str == '\0')
@ -1633,12 +1620,14 @@ is_static (const char *req) {
* If the status code is not within the ignore-array, 0 is returned.
* If the status code is within the ignore-array, 1 is returned. */
static int
ignore_status_code (const char *status) {
ignore_status_code (int status) {
if (!status || conf.ignore_status_idx == 0)
return 0;
if (str_inarray (status, conf.ignore_status, conf.ignore_status_idx) != -1)
return 1;
for (int i=0; i<conf.ignore_status_idx; i++)
if (status == conf.ignore_status[i])
return 1;
return 0;
}
@ -1659,13 +1648,11 @@ ignore_static (const char *req) {
* If the request is a 404, 1 is returned. */
static int
is_404 (GLogItem *logitem) {
if (!logitem->status || *logitem->status == '\0')
return 0;
/* is this a 404? */
if (!memcmp (logitem->status, "404", 3))
if (logitem->status == 404)
return 1;
/* treat 444 as 404? */
else if (!memcmp (logitem->status, "444", 3) && conf.code444_as_404)
else if (logitem->status == 444 && conf.code444_as_404)
return 1;
return 0;
}
@ -1804,7 +1791,7 @@ process_invalid (GLog *glog, GLogItem *logitem, const char *line) {
/* if not restoring from disk, then count entry as proceeded and invalid */
if (!conf.restore) {
count_process_and_invalid (glog, line);
count_process_and_invalid (glog, logitem, line);
return;
}
@ -1815,13 +1802,13 @@ process_invalid (GLog *glog, GLogItem *logitem, const char *line) {
if (glog->props.inode && is_likely_same_log (glog, &lp)) {
/* only count invalids if we're past the last parsed line */
if (glog->props.size > lp.size && glog->read >= lp.line)
count_process_and_invalid (glog, line);
count_process_and_invalid (glog, logitem, line);
return;
}
/* no timestamp to compare against, just count the invalid then */
if (!logitem->numdate) {
count_process_and_invalid (glog, line);
count_process_and_invalid (glog, logitem, line);
return;
}
@ -1833,7 +1820,7 @@ process_invalid (GLog *glog, GLogItem *logitem, const char *line) {
* then we simply don't count the entry as proceed & invalid to attempt over
* counting restored data */
if (should_restore_from_disk (glog) == 0)
count_process_and_invalid (glog, line);
count_process_and_invalid (glog, logitem, line);
}
static int
@ -1865,40 +1852,60 @@ parse_json_format (GLogItem *logitem, char *str) {
* account multiple parsing options prior to setting data into the
* corresponding data structure.
*
* On success, 0 is returned */
int
pre_process_log (GLog *glog, char *line, int dry_run) {
* On error, logitem->errstr will contain the error message. */
GLogItem *
parse_line (GLog *glog, char *line, int dry_run) {
int64_t oldts, newts;
GLogItem *logitem;
int ret = 0;
char *fmt = conf.log_format;
/* soft ignore these lines */
if (valid_line (line))
return -1;
logitem = init_log_item (glog);
/* soft ignore these lines */
if (valid_line (line)) {
logitem->errstr = xstrdup ("Invalid line");
return logitem;
}
/* Parse a line of log, and fill structure with appropriate values */
if (conf.is_json_log_format)
ret = parse_json_format (logitem, line);
else
ret = parse_format (logitem, line, fmt);
if (ret) {
if (logitem->errstr == NULL)
logitem->errstr = xstrdup ("Parse format error");
process_invalid (glog, logitem, line);
return logitem;
}
if (!glog->piping && conf.fname_as_vhost && glog->fname_as_vhost)
logitem->vhost = xstrdup (glog->fname_as_vhost);
if (ret || (ret = verify_missing_fields (logitem))) {
if (verify_missing_fields (logitem)) {
logitem->errstr = xstrdup ("Missing fields");
process_invalid (glog, logitem, line);
goto cleanup;
return logitem;
}
if ((glog->lp.ts = mktime (&logitem->dt)) == -1)
goto cleanup;
// glog->lp.ts = max(glog->lp.ts, logitem->dt)
newts = mktime (&logitem->dt);
for (;;) {
oldts = glog->lp.ts;
if (oldts >= newts) break;
if (__sync_bool_compare_and_swap (&glog->lp.ts, oldts, newts)) break;
}
if (newts == -1)
return logitem;
if (should_restore_from_disk (glog))
goto cleanup;
return logitem;
count_process (glog);
/* testing log only */
if (dry_run)
return logitem;
/* agent will be null in cases where %u is not specified */
if (logitem->agent == NULL) {
@ -1906,14 +1913,10 @@ pre_process_log (GLog *glog, char *line, int dry_run) {
set_agent_hash (logitem);
}
/* testing log only */
if (dry_run)
goto cleanup;
logitem->ignorelevel = ignore_line (logitem);
/* ignore line */
if (logitem->ignorelevel == IGNORE_LEVEL_PANEL)
goto cleanup;
return logitem;
if (is_404 (logitem))
logitem->is_404 = 1;
@ -1922,39 +1925,39 @@ pre_process_log (GLog *glog, char *line, int dry_run) {
logitem->uniq_key = get_uniq_visitor_key (logitem);
process_log (logitem);
cleanup:
free_glog (logitem);
return ret;
return logitem;
}
/* Entry point to process the given line from the log.
*
* On error, 1 is returned.
* On success or soft ignores, 0 is returned. */
static int
* On error, NULL is returned.
* On success or soft ignores, GLogItem is returned. */
static GLogItem *
read_line (GLog *glog, char *line, int *test, int *cnt, int dry_run) {
int ret = 0;
GLogItem *logitem;
/* start processing log line */
if ((ret = pre_process_log (glog, line, dry_run)) == 0 && *test)
*test = 0;
logitem = parse_line (glog, line, dry_run);
if (logitem != NULL) {
/* soft ignore */
if (logitem->errstr != NULL && strcmp (logitem->errstr, "Invalid line") == 0)
return logitem;
/* soft ignores */
if (ret == -1)
return 0;
if (logitem->errstr == NULL)
*test = 0;
}
/* reached num of lines to test and no valid records were found, log
* format is likely not matching */
if (conf.num_tests && ++(*cnt) == (int) conf.num_tests && *test) {
if (conf.num_tests && ++(*cnt) >= (int) conf.num_tests && *test) {
uncount_processed (glog);
uncount_invalid (glog);
return 1;
if (logitem != NULL)
free_glog (logitem);
return NULL;
}
return 0;
return logitem;
}
/* A replacement for GNU getline() to dynamically expand fgets buffer.
@ -2001,84 +2004,166 @@ fgetline (FILE *fp) {
return NULL;
}
/* Iterate over the log and read line by line (use GNU get_line to parse the
* whole line).
*
* On error, 1 is returned.
* On success, 0 is returned. */
/* Parse chunk of lines to logitems */
void *
read_lines_thread (void *arg) {
GJob *job = (GJob *) arg;
for (int i=0; i<job->p; i++) {
job->logitems[i] = read_line (job->glog, job->lines[i], &job->test, &job->cnt, job->dry_run);
#ifdef WITH_GETLINE
static int
read_lines (FILE *fp, GLog *glog, int dry_run) {
char *line = NULL;
int ret = 0, cnt = 0, test = conf.num_tests > 0 ? 1 : 0;
glog->bytes = 0;
while ((line = fgetline (fp)) != NULL) {
/* handle SIGINT */
if (conf.stop_processing)
goto out;
if ((ret = read_line (glog, line, &test, &cnt, dry_run)))
goto out;
if (dry_run && NUM_TESTS == cnt)
goto out;
glog->bytes += strlen (line);
free (line);
glog->read++;
}
/* if no data was available to read from (probably from a pipe) and
* still in test mode, we simply return until data becomes available */
if (!line && (errno == EAGAIN || errno == EWOULDBLOCK) && test)
return 0;
return (line && test) || ret || (!line && test && glog->processed);
out:
free (line);
/* fails if
- we're still reading the log but the test flag was still set
- ret flag is not 0, read_line failed
- reached the end of file, test flag was still set and we processed lines */
return test || ret || (test && glog->processed);
}
free (job->lines[i]);
#endif
}
return (void *) 0;
}
/* Iterate over the log and read line by line (uses a buffer of fixed size).
void *
process_lines_thread (void *arg) {
GJob *job = (GJob *) arg;
for (int i = 0; i < job->p; i++) {
if (job->logitems[i] == NULL)
break;
if (!job->dry_run && job->logitems[i]->errstr == NULL)
process_log (job->logitems[i]);
count_process (job->glog);
free_glog (job->logitems[i]);
}
return (void *) 0;
}
/* Iterate over the log and read line by line.
* With GETLINE: use GNU get_line to parse the whole line.
* Without GETLINE: uses a buffer of fixed size.
*
* On error, 1 is returned.
* On success, 0 is returned. */
#ifndef WITH_GETLINE
/* Read the log from `fp` in chunks of up to conf.chunk_size lines and parse
 * them through two alternating sets of jobs (index b = 0/1): while the
 * threads created for block b run read_lines_thread(), the calling thread
 * runs process_lines_thread() on the other block, then joins the threads.
 *
 * NOTE(review): this span is a rendered diff; lines from the pre-patch
 * read_lines() (second `cnt`/`test` declaration, the bare fgets loop, the
 * read_line()/ret handling, the old return expressions) appear to be
 * interleaved with the new code without +/- markers. */
static int
read_lines (FILE *fp, GLog *glog, int dry_run) {
int b, k, cnt = 0, test = conf.num_tests > 0 ? 1 : 0;
void *status;
/* NOTE(review): both arrays are sized by conf.jobs. The visible option
 * parsing only rejects values above 12, so a value below 1 would give a
 * zero/negative VLA size -- confirm a lower bound is enforced elsewhere. */
GJob jobs[2][conf.jobs];
pthread_t threads[conf.jobs];
#ifndef WITH_GETLINE
char *s = NULL;
char line[LINE_BUFFER] = { 0 };
int ret = 0, cnt = 0, test = conf.num_tests > 0 ? 1 : 0;
#endif
glog->bytes = 0;
while ((s = fgets (line, LINE_BUFFER, fp)) != NULL) {
/* Initialise every job slot 0..conf.jobs-1 in both blocks. */
for (b = 0; b < 2; b++) {
for (k = 0; k < conf.jobs; k++) {
jobs[b][k].p = 0;
jobs[b][k].cnt = 0;
jobs[b][k].glog = glog;
jobs[b][k].test = test;
jobs[b][k].dry_run = dry_run;
jobs[b][k].running = 0;
/* NOTE(review): logitems is a GLogItem ** (array of pointers), but the
 * element size used here is sizeof(GLogItem); sizeof(GLogItem *) looks
 * like what was intended. This over-allocates rather than overflows. */
jobs[b][k].logitems = xmalloc(conf.chunk_size * sizeof(GLogItem));
jobs[b][k].lines = xmalloc(conf.chunk_size * sizeof(char *));
#ifndef WITH_GETLINE
for (int i=0; i<conf.chunk_size; i++)
jobs[b][k].lines[i] = xmalloc(sizeof(char) * LINE_BUFFER);
#endif
}
}
b = 0;
while (!feof (fp)) { // b = 0 or 1
/* Fill one chunk of lines per job, starting at slot 1.
 * NOTE(review): when conf.jobs == 1 this condition lets k == 1 through, so
 * jobs[b][1] is used although the second dimension has a single element
 * (index 0) and slot 1 was never initialised above -- out-of-bounds access.
 * The same pattern recurs in the single-thread call and the processing
 * loop below. */
for (k = 1; k < conf.jobs || (conf.jobs == 1 && k == 1); k++) {
#ifdef WITH_GETLINE
while ((jobs[b][k].lines[jobs[b][k].p] = fgetline (fp)) != NULL) {
#else
while ((s = fgets (jobs[b][k].lines[jobs[b][k].p], LINE_BUFFER, fp)) != NULL) {
#endif
glog->bytes += strlen (jobs[b][k].lines[jobs[b][k].p]);
glog->read++;
if (++(jobs[b][k].p) >= conf.chunk_size)
break; // goto next chunk
}
} // for k = jobs
/* Single job: parse synchronously in the calling thread.
 * Otherwise: start one thread per slot 1..conf.jobs-1 for block b. */
if (conf.jobs == 1) {
read_lines_thread(&jobs[b][1]);
} else {
for (k = 1; k < conf.jobs; k++) {
jobs[b][k].running = 1;
pthread_create(&threads[k], NULL, read_lines_thread, (void *) &jobs[b][k]);
}
}
/* flip from block A/B to B/A */
if (conf.jobs > 1)
b = b ^ 1;
/* Process the other block (the same block when conf.jobs == 1) in the
 * calling thread and fold its counters back into cnt/test. */
for (k = 1; k < conf.jobs || (conf.jobs == 1 && k == 1); k++) {
process_lines_thread(&jobs[b][k]);
cnt += jobs[b][k].cnt;
jobs[b][k].cnt = 0;
test &= jobs[b][k].test;
jobs[b][k].p = 0;
}
/* flip from block B/A to A/B */
if (conf.jobs > 1)
b = b ^ 1;
/* Wait for the threads started above for this block. */
for (k = 1; k < conf.jobs; k++) {
if (jobs[b][k].running) {
pthread_join(threads[k], &status);
jobs[b][k].running = 0;
}
}
if (dry_run && cnt >= NUM_TESTS)
break;
/* handle SIGINT */
if (conf.stop_processing)
break;
if ((ret = read_line (glog, line, &test, &cnt, dry_run)))
break;
if (dry_run && NUM_TESTS == cnt)
break;
glog->bytes += strlen (line);
glog->read++;
/* flip from block A/B to B/A */
if (conf.jobs > 1)
b = b ^ 1;
} // while (!eof)
/* After eof, process last data */
for (b = 0; b < 2; b++) {
for (k = 1; k < conf.jobs; k++) {
if (jobs[b][k].running) {
pthread_join(threads[k], &status);
jobs[b][k].running = 0;
}
if (jobs[b][k].p) {
process_lines_thread(&jobs[b][k]);
cnt += jobs[b][k].cnt;
jobs[b][k].cnt = 0;
test &= jobs[b][k].test;
jobs[b][k].p = 0;
}
}
} // for b
/* Release the per-job buffers allocated at the top. */
for (b = 0; b < 2; b++) {
for (k = 0; k < conf.jobs; k++) {
#ifndef WITH_GETLINE
for (int i=0; i<conf.chunk_size; i++)
free (jobs[b][k].lines[i]);
#endif
free (jobs[b][k].logitems);
free (jobs[b][k].lines);
}
}
/* if no data was available to read from (probably from a pipe) and
 * still in test mode, we simply return until data becomes available */
if (!s && (errno == EAGAIN || errno == EWOULDBLOCK) && test)
/* NOTE(review): errno is tested without being reset in this function, so a
 * stale value from an earlier call could be observed -- confirm. */
if (errno == EAGAIN || errno == EWOULDBLOCK)
return 0;
/* fails if
- we're still reading the log but the test flag was still set
- ret flag is not 0, read_line failed
- reached the end of file, test flag was still set and we processed lines */
return (s && test) || ret || (!s && test && glog->processed);
return test;
}
#endif
/* Read the given log file and attempt to mmap a fixed number of bytes so we
* can compare its content on future runs.
@ -2088,6 +2173,7 @@ read_lines (FILE *fp, GLog *glog, int dry_run) {
int
set_initial_persisted_data (GLog *glog, FILE *fp, const char *fn) {
size_t len;
time_t now = time (0);
/* reset the snippet */
memset (glog->snippet, 0, sizeof (glog->snippet));
@ -2100,6 +2186,7 @@ set_initial_persisted_data (GLog *glog, FILE *fp, const char *fn) {
if ((fread (glog->snippet, len, 1, fp)) != 1 && ferror (fp))
FATAL ("Unable to fread the specified log file '%s'", fn);
glog->snippetlen = len;
localtime_r (&now, &glog->start_time);
fseek (fp, 0, SEEK_SET);

View File

@ -82,7 +82,7 @@ typedef struct GLogItem_ {
char *ref;
char *req;
char *req_key;
char *status;
int status;
char *time;
char *uniq_key;
char *vhost;
@ -136,9 +136,9 @@ typedef struct GLog_ {
uint16_t snippetlen;
char snippet[READ_BYTES + 1];
GLogItem *items;
GLastParse lp;
GLogProp props;
struct tm start_time;
char *fname_as_vhost;
char **errors;
@ -158,6 +158,14 @@ typedef struct Logs_ {
GLog *glog;
} Logs;
/* One unit of work for multi-thread parsing: a chunk of raw lines together
 * with the slots that receive the parsed items. */
typedef struct GJob_ {
  int p;                /* number of lines currently stored in `lines` */
  int cnt;              /* counter handed to read_line() as `cnt` */
  int test;             /* flag handed to read_line() as `test` */
  int dry_run;          /* forwarded to read_line(); gates process_log() */
  int running;          /* set before pthread_create(), cleared after pthread_join() */
  GLog *glog;           /* log the chunk belongs to */
  GLogItem **logitems;  /* one parsed item (or NULL) per line */
  char **lines;         /* raw lines of the chunk */
} GJob;
/* Raw data field type */
typedef enum {
U32,
@ -183,16 +191,19 @@ typedef struct GRawData_ {
} GRawData;
char *extract_by_delim (char **str, const char *end);
char *extract_by_delim (const char **str, const char *end);
char *fgetline (FILE * fp);
char **test_format (Logs * logs, int *len);
int parse_log (Logs * logs, int dry_run);
int pre_process_log (GLog * glog, char *line, int dry_run);
GLogItem *parse_line (GLog * glog, char *line, int dry_run);
void *read_lines_thread (void *arg);
void *process_lines_thread (void *arg);
int set_glog (Logs * logs, const char *filename);
int set_initial_persisted_data (GLog * glog, FILE * fp, const char *fn);
int set_log (Logs * logs, const char *value);
void free_logerrors (GLog * glog);
void free_logs (Logs * logs);
void free_glog (GLogItem *logitem);
void free_raw_data (GRawData * raw_data);
void output_logerrors (void);
void reset_struct (Logs * logs);

View File

@ -342,7 +342,7 @@ migrate_si32_to_ii32 (GSMetric metric, const char *path, int module) {
}
static char *
migrate_unique_key (char *key) {
migrate_unique_key (const char *key) {
char *nkey = NULL, *token = NULL, *ptr = NULL;
char agent_hex[64] = { 0 };
uint32_t delims = 0;

View File

@ -103,7 +103,7 @@ typedef struct GConf_
const char *ignore_ips[MAX_IGNORE_IPS]; /* array of ips to ignore */
const char *ignore_panels[TOTAL_MODULES]; /* array of panels to ignore */
const char *ignore_referers[MAX_IGNORE_REF]; /* referrers to ignore */
const char *ignore_status[MAX_IGNORE_STATUS]; /* status to ignore */
int ignore_status[MAX_IGNORE_STATUS]; /* status to ignore */
const char *output_formats[MAX_OUTFORMATS]; /* output format, e.g. , HTML */
const char *sort_panels[TOTAL_MODULES]; /* sorting options for each panel */
const char *static_files[MAX_EXTENSIONS]; /* static extensions */
@ -154,7 +154,8 @@ typedef struct GConf_
int client_err_to_unique_count; /* count 400s as visitors */
int code444_as_404; /* 444 as 404s? */
int color_scheme; /* color scheme */
int crawlers_only ; /* crawlers only */
int chunk_size; /* chunk size for each thread */
int crawlers_only; /* crawlers only */
int daemonize; /* run program as a Unix daemon */
const char *username; /* user to run program as */
int double_decode; /* need to double decode */
@ -166,6 +167,7 @@ typedef struct GConf_
int unknowns_as_crawlers; /* unknown OS and browsers are classified as crawlers */
int ignore_qstr; /* ignore query string */
int ignore_statics; /* ignore static files */
int jobs; /* multi-thread jobs count */
int json_pretty_print; /* pretty print JSON data */
int list_agents; /* show list of agents per host */
int load_conf_dlg; /* load curses config dialog */

View File

@ -57,80 +57,37 @@
#include "xmalloc.h"
/* HTTP status codes categories, indexed by the hundreds digit of the
 * status code (1 => 1xx ... 5 => 5xx). Slot 0 is intentionally left NULL
 * since there is no 0xx class. */
static const char *code_type[] = {
  [1] = STATUS_CODE_1XX,
  [2] = STATUS_CODE_2XX,
  [3] = STATUS_CODE_3XX,
  [4] = STATUS_CODE_4XX,
  [5] = STATUS_CODE_5XX,
};
/* HTTP status codes, indexed directly by the numeric status code so a
 * lookup is a single array access. Every slot that is not listed below is
 * implicitly NULL, i.e., the code has no description. */
static const char *codes[1000] = {
  [100] = STATUS_CODE_100, STATUS_CODE_101,
  [200] = STATUS_CODE_200, STATUS_CODE_201, STATUS_CODE_202, STATUS_CODE_203, STATUS_CODE_204,
  [205] = STATUS_CODE_205, STATUS_CODE_206, STATUS_CODE_207, STATUS_CODE_208,
  [300] = STATUS_CODE_300, STATUS_CODE_301, STATUS_CODE_302, STATUS_CODE_303, STATUS_CODE_304,
  [305] = STATUS_CODE_305,
  [307] = STATUS_CODE_307, STATUS_CODE_308,
  [400] = STATUS_CODE_400, STATUS_CODE_401, STATUS_CODE_402, STATUS_CODE_403, STATUS_CODE_404,
  [405] = STATUS_CODE_405, STATUS_CODE_406, STATUS_CODE_407, STATUS_CODE_408, STATUS_CODE_409,
  [410] = STATUS_CODE_410, STATUS_CODE_411, STATUS_CODE_412, STATUS_CODE_413, STATUS_CODE_414,
  [415] = STATUS_CODE_415, STATUS_CODE_416, STATUS_CODE_417, STATUS_CODE_418,
  [421] = STATUS_CODE_421, STATUS_CODE_422, STATUS_CODE_423, STATUS_CODE_424,
  [426] = STATUS_CODE_426,
  [428] = STATUS_CODE_428, STATUS_CODE_429,
  [431] = STATUS_CODE_431,
  [444] = STATUS_CODE_444,
  [451] = STATUS_CODE_451,
  [494] = STATUS_CODE_494,
  [495] = STATUS_CODE_495, STATUS_CODE_496, STATUS_CODE_497,
  [499] = STATUS_CODE_499,
  [500] = STATUS_CODE_500, STATUS_CODE_501, STATUS_CODE_502, STATUS_CODE_503, STATUS_CODE_504,
  [505] = STATUS_CODE_505,
  [520] = STATUS_CODE_520, STATUS_CODE_521, STATUS_CODE_522, STATUS_CODE_523, STATUS_CODE_524,
};
/* Return part of a string
@ -683,7 +640,7 @@ convert_date (char *res, const char *data, const char *from, const char *to, int
* On error, 1 is returned.
* On success, 0 is returned. */
int
invalid_ipaddr (char *str, int *ipvx) {
invalid_ipaddr (const char *str, int *ipvx) {
union {
struct sockaddr addr;
struct sockaddr_in6 addr6;
@ -745,13 +702,11 @@ file_size (const char *filename) {
* If not found, "Unknown" is returned.
* On success, the status code type/category is returned. */
const char *
verify_status_code_type (int code) {
  /* Only 100..599 map onto a category slot (code / 100 is 1..5); anything
   * else, or an empty slot, has no known category. */
  if (code < 100 || code > 599 || code_type[code / 100] == NULL)
    return "Unknown";

  /* The table is a static initializer and cannot be translated when it is
   * built, so the gettext lookup has to happen here at call time. */
  return _(code_type[code / 100]);
}
/* Determine if the given status code is within the list of status
@ -760,13 +715,11 @@ verify_status_code_type (const char *str) {
* If not found, "Unknown" is returned.
* On success, the status code is returned. */
const char *
verify_status_code (char *str) {
size_t i;
for (i = 0; i < ARRAY_SIZE (codes); i++)
if (strstr (str, codes[i][0]) != NULL)
return _(codes[i][1]);
verify_status_code (int code) {
if (code < 100 || code > 599 || codes[code] == NULL)
return "Unknown";
return "Unknown";
return codes[code];
}
/* Checks if the given string is within the given array.

View File

@ -84,15 +84,15 @@ char *u322str (uint32_t d, int width);
char *u642str (uint64_t d, int width);
char *unescape_str (const char *src);
char *usecs_to_str (unsigned long long usec);
const char *verify_status_code (char *str);
const char *verify_status_code_type (const char *str);
const char *verify_status_code (int code);
const char *verify_status_code_type (int code);
int convert_date (char *res, const char *data, const char *from, const char *to, int size);
int count_matches (const char *s1, char c);
int find_output_type (char **filename, const char *ext, int alloc);
int hide_referer (const char *host);
int ignore_referer (const char *host);
int intlen (uint64_t num);
int invalid_ipaddr (char *str, int *ipvx);
int invalid_ipaddr (const char *str, int *ipvx);
int ip_in_range (const char *ip);
int ptr2int (char *ptr);
int str2int (const char *date);