はじめに

SOF-ELKは Elasticスタックを用いたOSSのログ分析ツールであり、 SANS FOR572(Advanced Network Forensics and Analysis)や FOR509(Enterprise Cloud Forensics and Incident Response)の授業で採用されています。NetflowログからGCPログに至るまで、事前に定義されたディレクトリにファイルを格納するだけでインデックス化してくれる点が分析者にとって便利な点です。データの取り込みに関しては、非常に優秀だと言えます。

その反面、データ加工や可視化ツールとしては、SOF-ELKは快適ではありません。それはSOF-ELKというより、Kibanaの技術的制約です。 Kibanaのデフォルトのクエリ言語は表現力に乏しく(たとえばsummarizeできない)、データの把握には向いていません。また、多くの操作をGUIに依存しているので、手早い分析が困難なうえに、再利用がしづらいです。

そこで、MSTICPyのときと同様のアプローチを考えましょう。すなわちSOF-ELKのデータをRで分析できるようにするのです。

セットアップ

SOF-ELKの準備

ElasticsearchのAPI(9200/tcp)にアクセスできる必要があります。 SOF-ELKの既定の設定では、APIアクセスはlocalhostに限定されています。たとえばSSHのローカルポート転送機能を使って、localhost:9200で Elasticsearchにアクセスできるようにしましょう。

私はいまFOR509を学習中なので、FOR509のSOF-ELKに接続しています。

$ ssh -L 9200:localhost:9200 elk_user@<IPADDRESS>
$ curl http://localhost:9200/
{
  "name" : "sof-elk",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "_fZNncROSXCVh78fMMqjVg",
  "version" : {
    "number" : "8.7.1",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "f229ed3f893a515d590d0f39b05f68913e2d9b53",
    "build_date" : "2023-04-27T04:33:42.127815583Z",
    "build_snapshot" : false,
    "lucene_version" : "9.5.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

PythonおよびRProjectの準備

MSTICPyのときと同様に、 Reticulateパッケージを使って Pandasデータフレームを取得します。

Mambaforge(Miniforge3)が入っていることを前提に、以下のコマンドで「elastic」という名前のConda環境を作ります。 elandは Elasticが提供するPythonクライアントです。

mamba create -n elastic eland

RStudioでは、プロジェクトごとにPython実行環境を指定できます。〈Tools〉-〈Project Options…〉のPythonタブにて、先に作成したConda環境のPythonを選択します。

方法1: Elasticsearch SQL

インデックスの一覧を取得する

まず、インデックスの一覧を取得しましょう。 curlを使ってインデックス一覧を取得するときには、以下のようなリクエスト文字列を使うことが一般的に紹介されています。 SOF-ELKのM365モジュールは、月別にインデックスを生成するようですね。

$ curl -X GET "localhost:9200/_cat/indices?v"
health status index                uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   microsoft365-2023.05 LaKC2SJhRiOc-0nm9A1FBg   1   0        531            0      1.5mb          1.5mb
green  open   microsoft365-2023.04 eNJpQW5XSY6DwJIQhFYKpw   1   0        121            0    568.1kb        568.1kb

しかしElasticsearchでは、 Elasticsearch SQLというSQLライクなクエリ言語が使えます。この機能を使ってインデックスの一覧を取得してみます。

$ curl -X POST "localhost:9200/_sql?format=txt" -H 'Content-Type: application/json' -d'
{
  "query": "SHOW TABLES"
}'
    catalog    |        name        |     type      |     kind
---------------+--------------------+---------------+---------------
elasticsearch  |microsoft365-2023.04|TABLE          |INDEX
elasticsearch  |microsoft365-2023.05|TABLE          |INDEX

内部動作では、Elasticsearch SQL → Query DSL → Lucene Queryのように変換されています。 Kibanaで使えるKQL(MicrosoftのKusutoとは異なる言語です)も、同じようにKQL → Query DSL → Lucene Queryの変換経路をたどります。

KibanaのDiscovery画面ではKQLかLuceneかの2択を迫られますが、使い慣れたSQLが使えることは、データ分析者にとっては非常に有用です。

Pandasデータフレームを取得する

では、microsoft365系のインデックスを対象に、 2023年4月22日から26日までの時間範囲で、 timestamp、user_name、operation、workloadの4つのフィールドを取得しましょう。さらに、画面上の都合で、5件だけを選択することにします。

まずpandasとelasticsearchをインポートして、次にSQL文を書きます。

import pandas as pd
from elasticsearch import Elasticsearch

sql = """
  SELECT "@timestamp" AS timestamp, user_name, operation, workload 
  FROM "microsoft365-*" 
  WHERE timestamp BETWEEN '2023-04-22T00:00:00.000Z' AND '2023-04-26T00:00:00.000Z'
  LIMIT 5
"""

KQLと違って、時刻範囲をクエリの中に直接書ける点が個人的には嬉しいです。上ではISO 8601形式で書いていますが、もっと緩く「2023-04-22」という日付単位でも動作します。

引用符の扱いに注意しましょう。SELECTやFROMの中で二重引用符を使っているのは、フィールド名のエスケープ文字としてSQL-92で定義されているからです。そして、WHERE句の中で一重引用符を使っているのは、文字列リテラルだからです。

では、このSQL文を本文に埋め込んで、Elasticに対してリクエストを投げます。 JSONを指定するヘッダーは必須です。

es = Elasticsearch(['http://localhost:9200'])
response = es.transport.perform_request(
    'POST', '/_sql',
    body={"query": sql},
    headers={'Content-Type': 'application/json'}
)

response_body = response.body

最後に、レスポンスからデータフレームを起こします。

df_pd_sql = pd.DataFrame(response_body['rows'], columns=[col['name'] for col in response_body['columns']])

RStudioではPandasのデータフレームも直接Viewerで閲覧できますが、さらに加工するするにはtibbleに変換したいですね。

Reticulateパッケージを使うと、R・Python間でオブジェクトが相互参照できます。 RからPythonオブジェクトを呼べる変数がpy、逆がrです。

df_sql <- py$df_pd_sql |> as_tibble()
df_sql
timestamp user_name operation workload
2023-04-24T21:44:20.000Z UserLoggedIn AzureActiveDirectory
2023-04-24T21:44:16.000Z UserLoggedIn AzureActiveDirectory
2023-04-24T21:44:03.000Z UserLoginFailed AzureActiveDirectory
2023-04-24T21:43:59.000Z UserLoginFailed AzureActiveDirectory
2023-04-24T21:42:49.000Z UserLoggedIn AzureActiveDirectory

timestampが文字列型になっています。これは不便なので、POSIXctになるようにしておきましょう。

df_sql <- py$df_pd_sql |> as_tibble() |> mutate(timestamp = as_datetime(timestamp))
df_sql
timestamp user_name operation workload
2023-04-24 21:44:20 UserLoggedIn AzureActiveDirectory
2023-04-24 21:44:16 UserLoggedIn AzureActiveDirectory
2023-04-24 21:44:03 UserLoginFailed AzureActiveDirectory
2023-04-24 21:43:59 UserLoginFailed AzureActiveDirectory
2023-04-24 21:42:49 UserLoggedIn AzureActiveDirectory

方法2: Eland Python Client

次に、Eland Python Clientの利用を検討します。 ElandElkから連想された単語だと思うのですが、シカ科ではなくウシ科であることを、いま知りました。

天下り式ですが、先の4つのフィールドが含まれるデータフレームを取得するコードを記します。

import eland as ed
import pandas as pd
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])

df_eld = ed.DataFrame(es_client=es, 
                  es_index_pattern="microsoft365-*",
                  columns=["@timestamp", "user_name", "operation", "workload"]
                  )

df_eld.head(5)
@timestamp user_name operation workload
UOArpo8Bf1qR3yD0uIPy 2023-04-24 21:44:20+00:00 Luis@pymtechlabs.com UserLoggedIn AzureActiveDirectory
ZeArpo8Bf1qR3yD0uIJt 2023-05-06 22:52:23+00:00 admin@pymtechlabs.com SharingSet OneDrive
UeArpo8Bf1qR3yD0uIPy 2023-04-24 21:44:16+00:00 Luis@pymtechlabs.com UserLoggedIn AzureActiveDirectory
ZuArpo8Bf1qR3yD0uIJt 2023-05-06 22:52:23+00:00 admin@pymtechlabs.com SecureLinkCreated OneDrive
UuArpo8Bf1qR3yD0uIPy 2023-04-24 21:44:03+00:00 Luis@pymtechlabs.com UserLoginFailed AzureActiveDirectory

5 rows × 4 columns

このデータフレームはPandasに似せてはいますが、Elasticsearch内部のものです。言い方を変えると、このデータフレームに対する操作は、ローカルではなく Elasticsearchの中で行われます。

使えるメソッドがReferenceに掲載されています。時刻範囲を狭めてみましょう。

start_time = pd.to_datetime("2023-04-22T00:00:00.000Z")
end_time = pd.to_datetime("2023-04-26T00:00:00.000Z")

df_eld_filtered = df_eld[
    (df_eld["@timestamp"] >= start_time) & 
    (df_eld["@timestamp"] < end_time)
]

Pandasの全メソッドが使えるわけではありません。私はフィールド名「@timestamp」を「timestamp」に変えたかったのですが、 rename()もassign()も使えず、よい方法を見つけられませんでした。 ElandのデータフレームをPandasデータフレームに変換して対処することになりそうです。

df_eld_pd = ed.eland_to_pandas(df_eld_filtered, show_progress=False)
df_eld_pd = df_eld_pd.rename(columns={"@timestamp": "timestamp"})
df_eld_pd.info()
## <class 'pandas.core.frame.DataFrame'>
## Index: 121 entries, UOArpo8Bf1qR3yD0uIPy to u-Arpo8Bf1qR3yD0WoGO
## Data columns (total 4 columns):
##  #   Column     Non-Null Count  Dtype              
## ---  ------     --------------  -----              
##  0   timestamp  121 non-null    datetime64[ns, UTC]
##  1   user_name  121 non-null    object             
##  2   operation  121 non-null    object             
##  3   workload   121 non-null    object             
## dtypes: datetime64[ns, UTC](1), object(3)
## memory usage: 4.7+ KB

この手法でPandasのデータフレームに変換すると、timestampの型がdatetime64なんですね。では、古いものから5行を抽出してみます。

df_eld_pd.sort_values('timestamp').head(5)
##                                      timestamp  ...              workload
## beArpo8Bf1qR3yD0uIPy 2023-04-22 21:12:46+00:00  ...  AzureActiveDirectory
## bOArpo8Bf1qR3yD0uIPy 2023-04-22 21:12:49+00:00  ...  AzureActiveDirectory
## a-Arpo8Bf1qR3yD0uIPy 2023-04-22 21:14:16+00:00  ...  AzureActiveDirectory
## auArpo8Bf1qR3yD0uIPy 2023-04-22 21:14:20+00:00  ...  AzureActiveDirectory
## jOArpo8Bf1qR3yD0uIPy 2023-04-22 22:58:29+00:00  ...  AzureActiveDirectory
## 
## [5 rows x 4 columns]

df_eld_pdをRのデータフレームに持ってきます。

df_elad_r <- as_tibble(py$df_eld_pd)
df_elad_r |> 
  arrange(timestamp) |> 
  slice_head(n = 5)
timestamp user_name operation workload
2023-04-23 06:12:46 UserLoggedIn AzureActiveDirectory
2023-04-23 06:12:49 UserLoggedIn AzureActiveDirectory
2023-04-23 06:14:16 UserLoggedIn AzureActiveDirectory
2023-04-23 06:14:20 UserLoggedIn AzureActiveDirectory
2023-04-23 07:58:29 UserLoginFailed AzureActiveDirectory

時刻が9時間ずれているのは、私の手元のRStudio環境のTZがAsia/Tokyoだからです。ログ分析をするときには、UTCに固定したほうがよいでしょう。

Sys.setenv(TZ = "UTC")
df_elad_r |> 
  arrange(timestamp) |> 
  slice_head(n = 5)
timestamp user_name operation workload
2023-04-22 21:12:46 UserLoggedIn AzureActiveDirectory
2023-04-22 21:12:49 UserLoggedIn AzureActiveDirectory
2023-04-22 21:14:16 UserLoggedIn AzureActiveDirectory
2023-04-22 21:14:20 UserLoggedIn AzureActiveDirectory
2023-04-22 22:58:29 UserLoginFailed AzureActiveDirectory

これでUTCで表示されるようになりました。

Elandの制約

一見すると、Elasticsearch上で一定のデータ操作が行え、時刻の型が保持される Elandのほうが便利な気がします。ただ、現時点ではElandには互換性の課題があります。「Supports Python 3.8+ and Pandas 1.5」とあるとおり、Pandas 2系が使えないのです。このため、Pandas 2系を必要とする他のモジュールと同一仮想環境で共存させることができません。

おわりに

以上、2つの方法でElasticsearchのインデックスをRのデータフレームとして取得する方法を見てきました。(使った言語はほとんどPythonでしたが。)

私は数日前からFOR509の勉強をはじめ、ひさしぶりにSOF-ELKを触ったのですが、 Lab 1.1で「これはつらい」という感想を抱きました。といってSplunkに移すのも芸がないので、SOF-ELKとR/Pythonとが共存できる手法を検討し、整理しておくことにしました。

私としては効果も効率も向上しましたが、資格試験で使用する端末では環境が固定されていてElaticsearch SQLもElandも使えないでしょうから、 SOF-ELKを使った分析にも慣れておく必要があります。