library(reticulate)
library(tidyverse)
## ── Attaching core tidyverse packages ──────────────────────── tidyverse 2.0.0 ──
## ✔ dplyr     1.1.3     ✔ readr     2.1.4
## ✔ forcats   1.0.0     ✔ stringr   1.5.0
## ✔ ggplot2   3.4.3     ✔ tibble    3.2.1
## ✔ lubridate 1.9.2     ✔ tidyr     1.3.0
## ✔ purrr     1.0.2     
## ── Conflicts ────────────────────────────────────────── tidyverse_conflicts() ──
## ✖ dplyr::filter() masks stats::filter()
## ✖ dplyr::lag()    masks stats::lag()
## ℹ Use the conflicted package (<http://conflicted.r-lib.org/>) to force all conflicts to become errors
use_condaenv('ct-log-analyzer')
# Lets the Python chunks know whether we're knitting (HTML output) or running
# interactively; reticulate exposes this to Python as `r.isKnitting`.
isKnitting <- isTRUE(getOption('knitr.in.progress'))
import json
import re
from contextlib import contextmanager
from datetime import timedelta
from typing import Any, Dict, Optional

from dateutil import parser as ts_parser
from elasticsearch import Elasticsearch
import jmespath
import pandas as pd
from pygments import highlight
from pygments.lexers import JsonLexer
from pygments.formatters import HtmlFormatter, TerminalFormatter
def json_format(a_dict: Dict[str, Any]) -> str:
  return highlight(
    json.dumps(a_dict, indent=2),
    JsonLexer(),
    HtmlFormatter() if r.isKnitting else TerminalFormatter()
  )

def json_pp(a_dict: Dict[str, Any]):
  print(json_format(a_dict))
es = Elasticsearch('http://localhost:9200')
def query_run(index: str, body: Dict[str, Any], search: Optional[str] = None):
  result = es.search(index=index, **body).body
  return (
    jmespath.search(expression=search, data=result) 
    if search is not None else result
  )

def query_format(index: str, body: Dict[str, Any], search: Optional[str] = None):
  return json_format(query_run(index, body, search))

def query_pp(index: str, body: Dict[str, Any], search: Optional[str] = None):
  print(query_format(index, body, search))
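
The search parameter is just a jmespath expression applied to the raw response. A toy example of what it does (standalone, no cluster needed):

# This is the same plucking query_run performs when `search` is given.
jmespath.search('hits.total.value', {'hits': {'total': {'value': 42}}})  # -> 42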

Available Indices

indices = es.indices.get(index="*")
json_pp(list(indices.keys()))
[
  "continuous-tests-pods-2023.09.25",
  "continuous-tests-pods-2023.09.26",
  "dist-tests-logs-2023.09.19",
  "dist-tests-pods-2023.09.25",
  "dist-tests-runner-2023.09.19",
  "dist-tests-runners-2023.09.19",
  "dist-tests-status-2023.08.31",
  "dist-tests-status-2023.09.01",
  "dist-tests-status-2023.09.04",
  "dist-tests-status-2023.09.06",
  "dist-tests-status-2023.09.11",
  "dist-tests-status-2023.09.14",
  "dist-tests-status-2023.09.19"
]
INDEX = 'continuous-tests-pods-2023.09.25'

Available Run IDs

query_pp(INDEX, {
  'size': 0,
  'aggs': {
    'distinct_runids': {
      'terms': {
        'field': 'kubernetes.pod_labels.runid.keyword'
      }  
    }
  }
}, 'aggregations')
{
  "distinct_runids": {
    "doc_count_error_upper_bound": 0,
    "sum_other_doc_count": 0,
    "buckets": [
      {
        "key": "20230925-112739",
        "doc_count": 18723380
      }
    ]
  }
}

Oops, I guess I screwed up setting the run ID (this run ID is Slava’s). But I do use a custom Codex image, so maybe I can use that to identify my run. Which field is it stored in? Let’s have a look at a document.

query_pp(INDEX, {
  'size': 1,
  'query': {
   'match_all': {}
  }
}, 'hits.hits[0]')
{
  "_index": "continuous-tests-pods-2023.09.25",
  "_id": "oE1izIoB9lE2L5EEMZbY",
  "_score": 1.0,
  "_ignored": [
    "message.keyword"
  ],
  "_source": {
    "file": "/var/log/pods/codex-continuous-tests-slava_codex4-6-fcdc4f4b9-7tgrk_168a95cf-c6b5-4171-8d57-bdca566c49de/codex4-6/0.log.20230925-124813",
    "kubernetes": {
      "container_id": "containerd://634fd8e604c30e41970320b98775ce0355e2e7ce38540c07174de14d5980d5c3",
      "container_image": "codexstorage/nim-codex:latest-dist-tests",
      "container_image_id": "docker.io/codexstorage/nim-codex@sha256:f9b21c2e563b797bc94950b6fb5acd3166282adf1c51032edf41482a7c16c39e",
      "container_name": "codex4-6",
      "pod_ip": "10.244.1.32",
      "pod_labels": {
        "app": "codex",
        "codexid": "codexstorage-nim-codex-latest-dist-tests",
        "pod-template-hash": "fcdc4f4b9",
        "runid": "20230925-112739",
        "testid": "envvar-testid-notset",
        "tests-type": "continuous-tests"
      },
      "pod_name": "codex4-6-fcdc4f4b9-7tgrk",
      "pod_namespace": "codex-continuous-tests-slava",
      "pod_node_name": "pool-8vcpu-16gb-ypopd",
      "pod_uid": "168a95cf-c6b5-4171-8d57-bdca566c49de"
    },
    "message": "TRC 2023-09-25 12:47:54.182+00:00 pushing data to channel                    topics=\"libp2p mplex\" tid=1 m=16U*iu7J5H:6511814f461d13b0bcaffda5 channel=16U*iu7J5H:6511814f461d13b0bcaffdaa:6511814ff7b02ba1773fd38a len=65535 msgType=MsgOut id=2 initiator=false size=65535",
    "source_type": "kubernetes_logs",
    "stream": "stdout",
    "timestamp": "2023-09-25T12:47:54.182546549Z",
    "timestamp_end": "2023-09-25T12:47:54.182546549Z"
  }
}

It’s container_image. Let’s check that it contains what I need:

query_pp(INDEX, {
  'size': 0,
  'aggs': {
    'distinct_codex_images': {
      'terms': {
        'field': 'container_image.keyword'
      }
    }
  },
}, 'aggregations')
{
  "distinct_codex_images": {
    "doc_count_error_upper_bound": 0,
    "sum_other_doc_count": 0,
    "buckets": [
      {
        "key": "docker.io/codexstorage/nim-codex:sha-9d735f9-dist-tests",
        "doc_count": 77600129
      },
      {
        "key": "codexstorage/dist-tests-geth:latest",
        "doc_count": 214420
      },
      {
        "key": "grafana/grafana-oss:10.0.3",
        "doc_count": 1251
      },
      {
        "key": "codexstorage/dist-tests-prometheus:latest",
        "doc_count": 84
      },
      {
        "key": "codexstorage/codex-contracts-eth:latest-dist-tests",
        "doc_count": 13
      }
    ]
  }
}

OK, it works, so we filter by:

DOCKER_IMAGE = 'docker.io/codexstorage/nim-codex:sha-9d735f9-dist-tests'
NAMESPACE = 'codex-continuous-tests'
POD_NAME = 'codex3-workflow3-ff476767d-98zx4'

Log Validation

VALIDATION_QUERY = {
  'query': {
    'bool': {
      'filter': [
          {
            'term': {
              'container_image.keyword': DOCKER_IMAGE
            }
          },
          {
            'term': {
              'pod_namespace.keyword': NAMESPACE  
            }
          },
          {
            'term': {
              'pod_name.keyword': POD_NAME
            }
          }
      ]
    }
  },
  'fields': [ '@timestamp', 'pod_name', 'message' ],
  # sorting is expensive but makes it easier on our end.
  'sort': [ { 'timestamp': { 'order': 'asc' } } ]
}

To keep the dataset manageable, we’ll cap the time range. First, let’s see what range the logs actually span, and how they’re distributed within it.

query_pp(
  INDEX,
  {
    'size': 0,
    'query': VALIDATION_QUERY['query'],
    'aggs': {
      'max_timestamp': { 'max': { 'field': '@timestamp' } },
      'min_timestamp': { 'min': { 'field': '@timestamp' } }
    }
  },
  'aggregations'
)
{
  "min_timestamp": {
    "value": 1695646943559.0,
    "value_as_string": "2023-09-25T13:02:23.559Z"
  },
  "max_timestamp": {
    "value": 1695681230750.0,
    "value_as_string": "2023-09-25T22:33:50.750Z"
  }
}
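
That’s about nine and a half hours of logs. Just to double check the arithmetic on the endpoints above:

span = ts_parser.parse('2023-09-25T22:33:50.750Z') - ts_parser.parse('2023-09-25T13:02:23.559Z')
print(span)  # -> 9:31:27.191000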
buckets = query_run(
  INDEX,
  { 
    'size': 0,
    'query': VALIDATION_QUERY['query'],
    'aggs': {
      'half_hourly_counts': {
        'date_histogram': {
          'field': '@timestamp',
          'fixed_interval': '30m'
        }
      }
    }
  },
  'aggregations.half_hourly_counts.buckets | [][ key_as_string, doc_count ]'
)

json_pp(buckets)
[
  [
    "2023-09-25T13:00:00.000Z",
    894971
  ],
  [
    "2023-09-25T13:30:00.000Z",
    1506779
  ],
  [
    "2023-09-25T14:00:00.000Z",
    1569353
  ],
  [
    "2023-09-25T14:30:00.000Z",
    1142445
  ],
  [
    "2023-09-25T15:00:00.000Z",
    1116724
  ],
  [
    "2023-09-25T15:30:00.000Z",
    303417
  ],
  [
    "2023-09-25T16:00:00.000Z",
    1053777
  ],
  [
    "2023-09-25T16:30:00.000Z",
    767584
  ],
  [
    "2023-09-25T17:00:00.000Z",
    1264705
  ],
  [
    "2023-09-25T17:30:00.000Z",
    354873
  ],
  [
    "2023-09-25T18:00:00.000Z",
    1121863
  ],
  [
    "2023-09-25T18:30:00.000Z",
    433585
  ],
  [
    "2023-09-25T19:00:00.000Z",
    1023348
  ],
  [
    "2023-09-25T19:30:00.000Z",
    530973
  ],
  [
    "2023-09-25T20:00:00.000Z",
    922793
  ],
  [
    "2023-09-25T20:30:00.000Z",
    635274
  ],
  [
    "2023-09-25T21:00:00.000Z",
    848572
  ],
  [
    "2023-09-25T21:30:00.000Z",
    711271
  ],
  [
    "2023-09-25T22:00:00.000Z",
    769870
  ],
  [
    "2023-09-25T22:30:00.000Z",
    77480
  ]
]

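Not strictly needed for what follows, but a quick bar plot makes the distribution easier to eyeball. A minimal sketch, assuming matplotlib is available in the ct-log-analyzer env:

import matplotlib.pyplot as plt

half_hours = pd.DataFrame(buckets, columns=['timestamp', 'doc_count'])
half_hours['timestamp'] = pd.to_datetime(half_hours['timestamp'])
half_hours.plot(x='timestamp', y='doc_count', kind='bar', legend=False)
plt.tight_layout()
plt.show()
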
We’ll start with the first \(30\)-min bucket by scrolling through the index and checking for gaps.
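
The check itself is simple: these log lines end in a count=<n> counter which presumably increases by one per message, so once the counters are extracted and sorted, any jump larger than \(1\) between consecutive values means messages were dropped. A throwaway sketch of the idea, on made-up counters:

def find_gaps(counters):
  # Pairs (previous, current) where the counter jumps by more than one.
  ordered = sorted(counters)
  return [
    (prev, cur) for prev, cur in zip(ordered, ordered[1:])
    if cur - prev > 1
  ]

find_gaps([1, 2, 3, 7, 8])  # -> [(3, 7)]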

COUNTER_RE = re.compile(r'count=(?P<counter>\d+)$')


def extract_counter(document) -> Optional[int]:
  message = document['message']
  counter = COUNTER_RE.search(message)
  if counter is None:
    return None
  
  return int(counter.groupdict()['counter'])


def extract(documents, n_rows: int):
  for i, document in enumerate(documents):
    if i == n_rows:
      break
    
    content = document['_source']
    counter = extract_counter(content)
    if counter is None:
      continue
    
    yield {
      'timestamp': content['@timestamp'],
      'counter': counter
    }
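
Note the anchored $ in COUNTER_RE: only messages that end in count=<n> get picked up; everything else yields None and is skipped by extract. For instance, on made-up messages:

extract_counter({'message': 'TRC 2023-09-25 ... sent message count=123'})  # -> 123
extract_counter({'message': 'a line with no trailing counter'})            # -> None
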
@contextmanager
def scoped_scan(index, query, batch_size=10_000):
  initial = es.search(index=index, **query, scroll='2m', size=batch_size)
  scroll_id = initial['_scroll_id']

  def scroll_iterator(initial):
    nonlocal scroll_id
    results = initial
    while True:
      docs = results['hits']['hits']
      if not docs:
        break

      for doc in docs:
        yield doc

      results = es.scroll(scroll_id=scroll_id, scroll='2m')
      # The server may hand back a new scroll ID; track it so cleanup
      # always targets the current one.
      scroll_id = results['_scroll_id']

  try:
    yield scroll_iterator(initial)
  finally:
    # Release the server-side scroll context even if the consumer stops early.
    print('Clear scroll.')
    es.clear_scroll(scroll_id=scroll_id)
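
An aside on the hand-rolled scroll: elasticsearch-py also ships a helpers.scan iterator which, as far as I know, does the same bookkeeping (pagination plus clearing the scroll context) for you. Something like this sketch should be roughly equivalent, minus the context manager:

from elasticsearch.helpers import scan

def scan_documents(index, query, batch_size=10_000):
  # scan() lazily yields hits and clears the scroll context when exhausted.
  yield from scan(es, index=index, query=query, size=batch_size, scroll='2m')

I’m keeping the explicit version above since it makes the clear-scroll step visible.
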
import copy 

def constrained_validation_query(until, since = None, drop = []):
  query_copy = copy.deepcopy(VALIDATION_QUERY)
  range_clause = { 'lte': until }
  
  if since is not None:
    range_clause['gt'] = since

  query_copy['query']['bool']['filter'].append({
    'range': {
      '@timestamp': range_clause
    }
  })

  for key in drop:
    query_copy.pop(key)
  
  return query_copy

Let’s double check we’re capping the right range: the count should come out to the sum of the first four half-hour buckets, \(894971 + 1506779 + 1569353 + 1142445 = 5113548\) (we drop fields and sort because the count API doesn’t accept them):

es.count(index=INDEX, **constrained_validation_query('2023-09-25T15:00:00.000Z', drop=['fields', 'sort']))
## ObjectApiResponse({'count': 5113548, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}})

To make the analysis simpler, we’ll download the logs and write them to local CSV files.

from csv import DictWriter

def stream_to_disk(fname, until, since = None):
  print(f'Stream from {since} to {until} into {fname}')
  with scoped_scan(
        index=INDEX,
        query=constrained_validation_query(until=until, since=since, drop=['fields', 'sort'])
      ) as log_entries, open(
        fname,
        mode='w',
        encoding='utf-8',
        newline=''
      ) as outfile:
    # No header row: the columns get named when the CSVs are read back in R.
    writer = DictWriter(outfile, ['timestamp', 'counter'])
    written = 0
    for doc in extract(log_entries, n_rows=100_000_000_000):
      writer.writerow(doc)
      written += 1

    return written
for i, (ts, count) in enumerate(buckets[4:]):
  bucket_start = ts_parser.parse(ts)
  written = stream_to_disk(
    f'./logs/from_es/codexlogs-{i}.csv',
    since=bucket_start.isoformat(),
    until=(bucket_start + timedelta(minutes=30)).isoformat()
  )

  # written will come out below the bucket's doc_count: only lines carrying
  # a trailing counter make it into the CSV.
  print(f'Bucket size was {count}, written {written}.')
library(megautils)

codex3_logs <- purrr::map(list.files('./logs/from_es/', full.names = TRUE), function(path) {
  # The CSVs have no header row, so name the columns explicitly -- otherwise
  # read_csv would swallow the first record of each file as a header.
  read_csv(path, col_names = c('timestamp', 'counter'))
}) |> bind_rows()

Have we downloaded all data?

min(codex3_logs$timestamp)
## [1] "2023-09-25 13:02:23 UTC"
max(codex3_logs$timestamp)
## [1] "2023-09-25 18:30:00 UTC"
codex3_logs |> nrow()
## [1] 11096871

OK, we are missing \(2\) log messages, but let’s go ahead anyway.

codex3_logs |>
  arrange(counter, timestamp) |>
  select(counter, timestamp) |>
  mutate(gap = counter - lag(counter)) |>
  filter(gap > 1)

These size-\(2\) gaps are probably artifacts of the bucketed reads. So there’s really no huge loss going on, at least in these logs, as far as I can see.