library(reticulate)
library(tidyverse)
## ── Attaching core tidyverse packages ──────────────────────── tidyverse 2.0.0 ──
## ✔ dplyr 1.1.3 ✔ readr 2.1.4
## ✔ forcats 1.0.0 ✔ stringr 1.5.0
## ✔ ggplot2 3.4.3 ✔ tibble 3.2.1
## ✔ lubridate 1.9.2 ✔ tidyr 1.3.0
## ✔ purrr 1.0.2
## ── Conflicts ────────────────────────────────────────── tidyverse_conflicts() ──
## ✖ dplyr::filter() masks stats::filter()
## ✖ dplyr::lag() masks stats::lag()
## ℹ Use the conflicted package (<http://conflicted.r-lib.org/>) to force all conflicts to become errors
use_condaenv('ct-log-analyzer')
isKnitting <- isTRUE(getOption('knitr.in.progress'))
import json
import re
from contextlib import contextmanager
from datetime import timedelta
from typing import Any, Dict, Optional
from dateutil import parser as ts_parser
from elasticsearch import Elasticsearch
import jmespath
import pandas as pd
from pygments import highlight
from pygments.lexers import JsonLexer
from pygments.formatters import HtmlFormatter, TerminalFormatter
def json_format(a_dict: Dict[str, Any]) -> str:
    # Pretty-print JSON with syntax highlighting: HTML when knitting, ANSI in the console.
    return highlight(
        json.dumps(a_dict, indent=2),
        JsonLexer(),
        HtmlFormatter() if r.isKnitting else TerminalFormatter()
    )

def json_pp(a_dict: Dict[str, Any]):
    print(json_format(a_dict))
es = Elasticsearch('http://localhost:9200')
def query_run(index: str, body: Dict[str, Any], search: Optional[str] = None):
    # Run the query and optionally slice the raw response with a JMESPath expression.
    result = es.search(index=index, **body).body
    return (
        jmespath.search(expression=search, data=result)
        if search is not None else result
    )

def query_format(index: str, body: Dict[str, Any], search: Optional[str] = None):
    return json_format(query_run(index, body, search))

def query_pp(index: str, body: Dict[str, Any], search: Optional[str] = None):
    print(query_format(index, body, search))
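The optional search argument is a JMESPath expression applied to the raw response, which keeps the query snippets below short. A toy illustration (made-up data, not an actual ES response):
# Toy data: JMESPath expressions pick out nested pieces of JSON.
jmespath.search('hits.total.value', {'hits': {'total': {'value': 3}}})        # -> 3
jmespath.search('buckets[].key', {'buckets': [{'key': 'a'}, {'key': 'b'}]})   # -> ['a', 'b']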
indices = es.indices.get(index="*")
json_pp(list(indices.keys()))
[
"continuous-tests-pods-2023.09.25",
"continuous-tests-pods-2023.09.26",
"dist-tests-logs-2023.09.19",
"dist-tests-pods-2023.09.25",
"dist-tests-runner-2023.09.19",
"dist-tests-runners-2023.09.19",
"dist-tests-status-2023.08.31",
"dist-tests-status-2023.09.01",
"dist-tests-status-2023.09.04",
"dist-tests-status-2023.09.06",
"dist-tests-status-2023.09.11",
"dist-tests-status-2023.09.14",
"dist-tests-status-2023.09.19"
]
INDEX = 'continuous-tests-pods-2023.09.25'
query_pp(INDEX, {
'size': 0,
'aggs': {
'distinct_runids': {
'terms': {
'field': 'kubernetes.pod_labels.runid.keyword'
}
}
}
}, 'aggregations')
{
"distinct_runids": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "20230925-112739",
"doc_count": 18723380
}
]
}
}
Oops, I guess I screwed up setting the run ID (this run ID is Slava’s). But I do use a custom codex image, so maybe I can use that to identify my run. Which field is it stored in? Let’s have a look at a document.
query_pp(INDEX, {
'size': 1,
'query': {
'match_all': {}
}
}, 'hits.hits[0]')
{
"_index": "continuous-tests-pods-2023.09.25",
"_id": "oE1izIoB9lE2L5EEMZbY",
"_score": 1.0,
"_ignored": [
"message.keyword"
],
"_source": {
"file": "/var/log/pods/codex-continuous-tests-slava_codex4-6-fcdc4f4b9-7tgrk_168a95cf-c6b5-4171-8d57-bdca566c49de/codex4-6/0.log.20230925-124813",
"kubernetes": {
"container_id": "containerd://634fd8e604c30e41970320b98775ce0355e2e7ce38540c07174de14d5980d5c3",
"container_image": "codexstorage/nim-codex:latest-dist-tests",
"container_image_id": "docker.io/codexstorage/nim-codex@sha256:f9b21c2e563b797bc94950b6fb5acd3166282adf1c51032edf41482a7c16c39e",
"container_name": "codex4-6",
"pod_ip": "10.244.1.32",
"pod_labels": {
"app": "codex",
"codexid": "codexstorage-nim-codex-latest-dist-tests",
"pod-template-hash": "fcdc4f4b9",
"runid": "20230925-112739",
"testid": "envvar-testid-notset",
"tests-type": "continuous-tests"
},
"pod_name": "codex4-6-fcdc4f4b9-7tgrk",
"pod_namespace": "codex-continuous-tests-slava",
"pod_node_name": "pool-8vcpu-16gb-ypopd",
"pod_uid": "168a95cf-c6b5-4171-8d57-bdca566c49de"
},
"message": "TRC 2023-09-25 12:47:54.182+00:00 pushing data to channel topics=\"libp2p mplex\" tid=1 m=16U*iu7J5H:6511814f461d13b0bcaffda5 channel=16U*iu7J5H:6511814f461d13b0bcaffdaa:6511814ff7b02ba1773fd38a len=65535 msgType=MsgOut id=2 initiator=false size=65535",
"source_type": "kubernetes_logs",
"stream": "stdout",
"timestamp": "2023-09-25T12:47:54.182546549Z",
"timestamp_end": "2023-09-25T12:47:54.182546549Z"
}
}
It’s container_image; let’s check that it contains what I need:
query_pp(INDEX, {
'size': 0,
'aggs': {
'distinct_codex_images': {
'terms': {
'field': 'container_image.keyword'
}
}
},
}, 'aggregations')
{
"distinct_codex_images": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "docker.io/codexstorage/nim-codex:sha-9d735f9-dist-tests",
"doc_count": 77600129
},
{
"key": "codexstorage/dist-tests-geth:latest",
"doc_count": 214420
},
{
"key": "grafana/grafana-oss:10.0.3",
"doc_count": 1251
},
{
"key": "codexstorage/dist-tests-prometheus:latest",
"doc_count": 84
},
{
"key": "codexstorage/codex-contracts-eth:latest-dist-tests",
"doc_count": 13
}
]
}
}
OK, it works, so we filter on the image, namespace, and pod name:
DOCKER_IMAGE = 'docker.io/codexstorage/nim-codex:sha-9d735f9-dist-tests'
NAMESPACE = 'codex-continuous-tests'
POD_NAME = 'codex3-workflow3-ff476767d-98zx4'
VALIDATION_QUERY = {
'query': {
'bool': {
'filter': [
{
'term': {
'container_image.keyword': DOCKER_IMAGE
}
},
{
'term': {
'pod_namespace.keyword': NAMESPACE
}
},
{
'term': {
'pod_name.keyword': POD_NAME
}
}
]
}
},
'fields': [ '@timestamp', 'pod_name', 'message' ],
# sorting is expensive but makes it easier on our end.
'sort': [ { 'timestamp': { 'order': 'asc' } } ]
}
To keep the dataset manageable, let’s look at the time range these logs cover so we can cap it.
query_pp(
INDEX,
{
'size': 0,
'query': VALIDATION_QUERY['query'],
'aggs': {
'max_timestamp': { 'max': { 'field': '@timestamp' } },
'min_timestamp': { 'min': { 'field': '@timestamp' } }
}
},
'aggregations'
)
{
"min_timestamp": {
"value": 1695646943559.0,
"value_as_string": "2023-09-25T13:02:23.559Z"
},
"max_timestamp": {
"value": 1695681230750.0,
"value_as_string": "2023-09-25T22:33:50.750Z"
}
}
buckets = query_run(
INDEX,
{
'size': 0,
'query': VALIDATION_QUERY['query'],
'aggs': {
'hourly_counts': {
'date_histogram': {
'field': '@timestamp',
'fixed_interval': '30m'
}
}
}
},
'aggregations.hourly_counts.buckets | [][ key_as_string, doc_count ]'
)
json_pp(buckets)
[
[
"2023-09-25T13:00:00.000Z",
894971
],
[
"2023-09-25T13:30:00.000Z",
1506779
],
[
"2023-09-25T14:00:00.000Z",
1569353
],
[
"2023-09-25T14:30:00.000Z",
1142445
],
[
"2023-09-25T15:00:00.000Z",
1116724
],
[
"2023-09-25T15:30:00.000Z",
303417
],
[
"2023-09-25T16:00:00.000Z",
1053777
],
[
"2023-09-25T16:30:00.000Z",
767584
],
[
"2023-09-25T17:00:00.000Z",
1264705
],
[
"2023-09-25T17:30:00.000Z",
354873
],
[
"2023-09-25T18:00:00.000Z",
1121863
],
[
"2023-09-25T18:30:00.000Z",
433585
],
[
"2023-09-25T19:00:00.000Z",
1023348
],
[
"2023-09-25T19:30:00.000Z",
530973
],
[
"2023-09-25T20:00:00.000Z",
922793
],
[
"2023-09-25T20:30:00.000Z",
635274
],
[
"2023-09-25T21:00:00.000Z",
848572
],
[
"2023-09-25T21:30:00.000Z",
711271
],
[
"2023-09-25T22:00:00.000Z",
769870
],
[
"2023-09-25T22:30:00.000Z",
77480
]
]
We’ll start with the first \(30\)-min buckets: scroll through the index, extract the count= sequence number that appears at the end of Codex log messages, and check it for gaps.
COUNTER_RE = re.compile(r'count=(?P<counter>\d+)$')

def extract_counter(document) -> Optional[int]:
    # Pull the trailing count=<n> sequence number out of a log message, if present.
    message = document['message']
    counter = COUNTER_RE.search(message)
    if counter is None:
        return None
    return int(counter.groupdict()['counter'])

def extract(documents, n_rows: int):
    # Walk at most n_rows documents, keeping only those that carry a counter.
    for i, document in enumerate(documents):
        if i == n_rows:
            break
        content = document['_source']
        counter = extract_counter(content)
        if counter is None:
            continue
        yield {
            'timestamp': content['@timestamp'],
            'counter': counter
        }
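As a quick sanity check, here’s the extractor on a made-up log line (only the trailing count= matters to the regex):
# Hypothetical messages: only a trailing count=<n> is picked up by COUNTER_RE.
extract_counter({'message': 'TRC 2023-09-25 13:05:00.000+00:00 sending block ... count=1234'})  # -> 1234
extract_counter({'message': 'TRC 2023-09-25 13:05:00.000+00:00 pushing data to channel'})       # -> None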
@contextmanager
def scoped_scan(index, query, batch_size=10_000):
    # Scroll through all hits for `query`, making sure the scroll context is
    # cleared even if the consumer bails out early.
    initial = es.search(index=index, **query, scroll='2m', size=batch_size)
    scroll_id = initial['_scroll_id']

    def scroll_iterator(initial):
        nonlocal scroll_id
        results = initial
        while True:
            docs = results['hits']['hits']
            if not docs:
                break
            for doc in docs:
                yield doc
            results = es.scroll(scroll_id=scroll_id, scroll='2m')
            # The scroll id can change between calls; always use the latest one.
            scroll_id = results['_scroll_id']

    try:
        yield scroll_iterator(initial)
    finally:
        print('Clear scroll.')
        es.clear_scroll(scroll_id=scroll_id)
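The intended usage pattern, sketched here with a match-all query (the real call, with the constrained validation query, comes below):
# Sketch only: iterate a few hits and bail out early; the scroll is still cleared.
with scoped_scan(INDEX, {'query': {'match_all': {}}}, batch_size=100) as hits:
    for n, hit in enumerate(hits):
        if n >= 5:
            break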
import copy

def constrained_validation_query(until, since=None, drop=()):
    # Copy VALIDATION_QUERY, constrain it to a (since, until] time window, and
    # optionally drop top-level keys (e.g. 'fields', 'sort') along the way.
    query_copy = copy.deepcopy(VALIDATION_QUERY)
    range_clause = {'lte': until}
    if since is not None:
        range_clause['gt'] = since
    query_copy['query']['bool']['filter'].append({
        'range': {
            '@timestamp': range_clause
        }
    })
    for key in drop:
        query_copy.pop(key)
    return query_copy
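For instance, constraining to a hypothetical half-hour window and dropping fields and sort leaves just the bool filter plus a range clause:
# Hypothetical window boundaries, just to show the shape of the generated query.
json_pp(constrained_validation_query(
    until='2023-09-25T13:30:00.000Z',
    since='2023-09-25T13:00:00.000Z',
    drop=['fields', 'sort']
))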
Let’s double-check that the range clause captures the right window:
es.count(index=INDEX, **constrained_validation_query('2023-09-25T15:00:00.000Z', drop=['fields', 'sort']))
## ObjectApiResponse({'count': 5113548, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}})
The count matches the sum of the first four half-hour buckets (894,971 + 1,506,779 + 1,569,353 + 1,142,445 = 5,113,548), so the range clause behaves as expected. We’ll now download the logs and write them to local CSV files to make the rest of the analysis simpler.
from csv import DictWriter

def stream_to_disk(fname, until, since=None):
    # Scroll the constrained query and dump (timestamp, counter) rows to a CSV file.
    print(f'Stream from {since} to {until} using {fname}')
    with scoped_scan(
        index=INDEX,
        query=constrained_validation_query(until=until, since=since, drop=['fields', 'sort'])
    ) as log_entries, open(
        fname,
        mode='w',
        encoding='utf-8'
    ) as outfile:
        # Note: no header row is written; the R side names the columns when reading.
        writer = DictWriter(outfile, ['timestamp', 'counter'])
        i = -1
        for i, doc in enumerate(extract(log_entries, n_rows=100_000_000_000)):
            writer.writerow(doc)
    # Index of the last row written (-1 if nothing matched).
    return i
for i, (ts, count) in enumerate(buckets[4:]):
    # One CSV per 30-minute bucket, skipping the first four buckets.
    bucket_start = ts_parser.parse(ts)
    written = stream_to_disk(f'./logs/from_es/codexlogs-{i}.csv',
                             since=bucket_start.isoformat(),
                             until=(bucket_start + timedelta(minutes=30)).isoformat())
    print(f'Bucket size was {count}, written {written}.')
library(megautils)
codex3_logs <- purrr::map(list.files('./logs/from_es/', full.names = TRUE), function(path) {
content <- read_csv(path)
colnames(content) <- c('timestamp', 'counter')
content
}) |> bind_rows()
Have we downloaded all the data?
min(codex3_logs$timestamp)
## [1] "2023-09-25 13:02:23 UTC"
max(codex3_logs$timestamp)
## [1] "2023-09-25 18:30:00 UTC"
codex3_logs |> nrow()
## [1] 11096871
OK, we are missing \(2\) log messages, but let’s go ahead anyway.
codex3_logs |>
arrange(counter, timestamp) |>
select(counter, timestamp) |>
mutate(gap = counter - lag(counter)) |>
group_by(gap) |>
filter(gap > 1)
These size-\(2\) gaps are probably artifacts of the bucketed reads; most likely it’s the missing CSV headers, since DictWriter never writes a header row and read_csv then swallows the first data row of each file as column names, turning each swallowed row into a counter gap of \(2\). So there’s really no big log loss going on, at least in these logs, as far as I can see.