はじめに

Linuxの認証ログは、Red Hat系では/var/log/secure、Debian系では/var/log/auth.logに格納されています。形式はsyslog(RFC3164)であることが一般的でしょう。けっこう厄介な半構造データで、awkやgrepなどのシェル芸を駆使して調査することが多いのではないでしょうか。

今回、syslogをRを使ってパースしてみます。元データにはElasticのサンプルを使わせていただきました。

library(tidyverse)
library(lubridate)
#log_raw <- read_lines("https://raw.githubusercontent.com/elastic/examples/master/Machine%20Learning/Security%20Analytics%20Recipes/suspicious_login_activity/data/auth.log")
log_raw <- read_lines("data/auth.log") 
log_raw[1:5]
## [1] "Mar 27 13:06:56 ip-10-77-20-248 sshd[1291]: Server listening on 0.0.0.0 port 22."                                 
## [2] "Mar 27 13:06:56 ip-10-77-20-248 sshd[1291]: Server listening on :: port 22."                                      
## [3] "Mar 27 13:06:56 ip-10-77-20-248 systemd-logind[1118]: Watching system buttons on /dev/input/event0 (Power Button)"
## [4] "Mar 27 13:06:56 ip-10-77-20-248 systemd-logind[1118]: Watching system buttons on /dev/input/event1 (Sleep Button)"
## [5] "Mar 27 13:06:56 ip-10-77-20-248 systemd-logind[1118]: New seat seat0."

一番左はタイムスタンプですが、タイムゾーンはおろか「年」がありません。ホスト名に続きsshdなどのサービス名。括弧はPIDで、その次からメッセージが格納されます。メッセージはサービスが自由に記述しますから、分析したいサービスに合わせてパースする必要があります。

正規表現を使う

まずは大雑把にパースしましょう。fluentdの説明書に正規表現のパターンが掲載されているので、そちらを流用します。

ptn <- r"{^(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[^ :\[]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$}"
df_auth <- 
  log_raw |> 
  str_match(ptn) |> 
  as_tibble(.name_repair = "unique") |> 
  select(time, host, ident, message) |> 
  mutate(
    time = parse_date_time2(time, "b d H:M:S", tz = "UTC") |> update(year = 2021)
  )

df_auth
## # A tibble: 7,121 × 4
##    time                host            ident          message                   
##    <dttm>              <chr>           <chr>          <chr>                     
##  1 2021-03-27 13:06:56 ip-10-77-20-248 sshd           Server listening on 0.0.0…
##  2 2021-03-27 13:06:56 ip-10-77-20-248 sshd           Server listening on :: po…
##  3 2021-03-27 13:06:56 ip-10-77-20-248 systemd-logind Watching system buttons o…
##  4 2021-03-27 13:06:56 ip-10-77-20-248 systemd-logind Watching system buttons o…
##  5 2021-03-27 13:06:56 ip-10-77-20-248 systemd-logind New seat seat0.           
##  6 2021-03-27 13:08:09 ip-10-77-20-248 sshd           Accepted publickey for ub…
##  7 2021-03-27 13:08:09 ip-10-77-20-248 sshd           pam_unix(sshd:session): s…
##  8 2021-03-27 13:08:09 ip-10-77-20-248 systemd        pam_unix(systemd-user:ses…
##  9 2021-03-27 13:08:09 ip-10-77-20-248 systemd-logind New session 1 of user ubu…
## 10 2021-03-27 13:09:37 ip-10-77-20-248 sudo           ubuntu : TTY=pts/0 ; PWD=…
## # … with 7,111 more rows

パターン文字列の囲みが「r”{文字列}“」となっています。これはR 4.0から利用可能になったraw stringの書式です。バックスラッシュなどをエスケープしたくないので、利用しています。

正規表現処理にはstringrパッケージの関数を使っています。 str_match()は戻り値がmatrixなので、as_tibble()でデータフレームに変換しています。

最後に年を2021年(タイムゾーンをUTC)に設定し、timeフィールドを日時型に変換します。年をまたぐ可能性があるので危険ですが、今回のデータは年中の2か月分しかなかったので手を抜きました。

あとはmessageをidentに応じてパースしていけばよいでしょう。

Elastic Stackの力を借りる

……と言うは易く行うは難しで、個別サービスのパースも意外と手間なものです。たとえばsshだと、試行ユーザー名、成否、ソースIP、認証方式などなど、フィールドとして切り出せる候補が多々あります。加えて、ソースIPの情報をリッチ化しておきたいところです。

そこで、Elastic Stackの利用を考えます。filebeatsにパースさせて Elasticsearchにデータを取り込み、それをJSON形式でダンプします。あとはダンプしたデータをRに取り込んで分析すればよい、というわけです。

こうした目的に打ってつけなのが、SOF-ELKです。これはネットワークフォレンジックの用途に適したCentOS 7.xベースのディストロで、典型用途のログフォーマットをbeatsでパースしてKibanaで表示できるよう事前設定されています。詳細はRead Meをご確認ください。

SOF-ELKでsyslogを取り込む

syslogなら、/logstash/syslog以下に年のディレクトリ(2021、など)を作り、そこにauth.logファイルを放り込みます。するとfilebeatsが自動的にパースしてくれます。そして、logstash-YYYY-MM-DDという名前で、日付ごとにインデックスが作られます。結果は、sof-elk_clear.pyというコマンドで確認可能です。

[elk_user@sof-elk ~]$ sof-elk_clear.py -i list
The following indices are currently active in Elasticsearch:
- .async (0 documents)
- .ds (6 documents)
- .security (0 documents)
- logstash (7,121 documents)

取り込んだログはKibanaで閲覧・分析できます。もちろんKibanaで分析してもいいのですが、ここはRの話をする場所なので、先を続けましょう。

SOF-ELKにelasticdumpを入れる

Elasticsearchのインデックスをダンプするにはelasticdumpが便利ですが、 SOF-ELKにはインストールされていません。elasticdumpはnode.jsで動作しますので、そこから始めます。下に手順を簡単に示しました。

# リポジトリを登録する
$ sudo yum install centos-release-scl-rh

# node.jsをインストールする
$ sudo yum install rh-nodejs10

# bashにNode.js関係の環境変数を入れる
$ sudo scl enable rh-nodejs10 bash

# elasticdumpをインストールする
$ sudo npm install elasticdump -g

Cent OS 7.xにはデフォルトでNode.jsの6系が入っていますが、これだとelasticdumpが動作しません。ここではSoftware Collectionsの手によるNodeJS 10系を用いました。

elasticdumpでダンプする

elasticdumpは直感的に使えます。inputオプションにインデックス名を指定し、 outputオプションに出力ファイルを指定します。inputオプションではワイルドカードが使えるので、下のように2021年分を全部出す、という指定も可能です。

$ scl enable rh-nodejs10 bash
$ elasticdump --input=http://localhost:9200/logstash-2021* --output=logstash.json

これをR環境に持ってくる前に、JSONの十徳ナイフであるjqを使って整理しておきます。

$ cat logstash.json | jq ._source | jq -s > logstash.source.json

最初のjqは、Elasticsearchで実データが入っているフィールドが ._source以下だからで、少し無駄を省いただけです。

次のjqの-sオプションが重要です。API経由でJSONデータをバルクで取得した際、個々のJSONファイルの集合にはなっているものの、全体としては正規のJSONファイルになっていないケースを目にします。 elasticdumpも例外ではありませんでした。

jqの-sオプションでは、そうしたJSONファイルの集まりを配列でくるみ、正規のJSON形式に変換してくれるものです。

tidyrの関数群で矩形化する

以上でJSONファイルの準備ができましたから、これをRに取り込みます。

read_json()関数でリストとして取り込んだのち、必要なフィールドを hoist()関数で切り出していくのがいいのではないかと思います。下は一例で、sshdでフィルターして典型的なフィールドを抜き出したものです。

なお、Rにはjqrというパッケージが存在し、jqと同じコマンドが使えます。ただ、一定規模を超えるとスタックオーバーフローを引き起こすので、素直にCLIで前処理するか、どうしてもRスクリプトに入れておきたいなら system2()関数などで外部コマンドとして呼び出したほうがよいでしょう。

json_raw <- jsonlite::read_json("data/logstash.source.json")

df_logstash <- 
  json_raw %>% 
  enframe() %>% 
  hoist(value, "timestamp" = "@timestamp") %>% 
  mutate(timestamp = as_datetime(timestamp)) %>%
  hoist(value, "ident" = "syslog_program") %>% 
  filter(ident == "sshd") %>% 
  hoist(value, "user", 
        "method" = "login_method",
        "result" = "login_result", 
        "src_ip" = "source_ip",
        "asn" = list("source_geo", "asn"),
        "country" = list("source_geo", "country_name"),
        "message") %>% 
  select(!c("name", "value", "ident"))

df_logstash
## # A tibble: 4,095 × 8
##    timestamp           user    method    result   src_ip     asn country message
##    <dttm>              <chr>   <chr>     <chr>    <chr>    <int> <chr>   <chr>  
##  1 2021-03-27 17:08:36 <NA>    <NA>      <NA>     <NA>        NA <NA>    pam_un…
##  2 2021-03-27 14:02:16 <NA>    <NA>      <NA>     <NA>        NA <NA>    pam_un…
##  3 2021-03-27 17:08:23 <NA>    <NA>      <NA>     <NA>        NA <NA>    pam_un…
##  4 2021-03-27 18:27:20 support <NA>      <NA>     190.178… 22927 Argent… Invali…
##  5 2021-03-27 19:17:03 admin   <NA>      <NA>     201.43.… 27699 Brazil  Invali…
##  6 2021-03-27 22:02:41 ftpuser <NA>      <NA>     112.135…  9329 Sri La… Invali…
##  7 2021-03-27 15:48:29 <NA>    <NA>      <NA>     <NA>        NA <NA>    pam_un…
##  8 2021-03-27 20:29:31 <NA>    <NA>      <NA>     <NA>        NA <NA>    pam_un…
##  9 2021-03-27 16:16:06 <NA>    <NA>      <NA>     218.65.…  4134 China   Discon…
## 10 2021-03-27 17:08:36 ubuntu  publickey Accepted 85.245.…  3243 Portug… Accept…
## # … with 4,085 more rows

4000行ほどしかないので一通り目を通してみると、filebeatではパースできていない行が見つかったりもします。その点に留意したうえで、ggplot2を使って視覚化などするのが便利な使い方だと思います。

更新履歴

2022-12-10 stringrで名前付きキャプチャーが可能なことに気づいたので、そちらを使うようにしました。