SOF-ELKは Elasticスタックを用いたOSSのログ分析ツールであり、 SANS FOR572(Advanced Network Forensics and Analysis)や FOR509(Enterprise Cloud Forensics and Incident Response)の授業で採用されています。NetflowログからGCPログに至るまで、事前に定義されたディレクトリにファイルを格納するだけでインデックス化してくれる点が分析者にとって便利な点です。データの取り込みに関しては、非常に優秀だと言えます。
その反面、データ加工や可視化ツールとしては、SOF-ELKは快適ではありません。それはSOF-ELKというより、Kibanaの技術的制約です。 Kibanaのデフォルトのクエリ言語は表現力に乏しく(たとえばsummarizeできない)、データの把握には向いていません。また、多くの操作をGUIに依存しているので、手早い分析が困難なうえに、再利用がしづらいです。
そこで、MSTICPyのときと同様のアプローチを考えましょう。すなわちSOF-ELKのデータをRで分析できるようにするのです。
ElasticsearchのAPI(9200/tcp)にアクセスできる必要があります。 SOF-ELKの既定の設定では、APIアクセスはlocalhostに限定されています。たとえばSSHのローカルポート転送機能を使って、localhost:9200で Elasticsearchにアクセスできるようにしましょう。
私はいまFOR509を学習中なので、FOR509のSOF-ELKに接続しています。
$ ssh -L 9200:localhost:9200 elk_user@<IPADDRESS>
$ curl http://localhost:9200/
{
"name" : "sof-elk",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "_fZNncROSXCVh78fMMqjVg",
"version" : {
"number" : "8.7.1",
"build_flavor" : "default",
"build_type" : "rpm",
"build_hash" : "f229ed3f893a515d590d0f39b05f68913e2d9b53",
"build_date" : "2023-04-27T04:33:42.127815583Z",
"build_snapshot" : false,
"lucene_version" : "9.5.0",
"minimum_wire_compatibility_version" : "7.17.0",
"minimum_index_compatibility_version" : "7.0.0"
},
"tagline" : "You Know, for Search"
}
MSTICPyのときと同様に、 Reticulateパッケージを使って Pandasデータフレームを取得します。
Mambaforge(Miniforge3)が入っていることを前提に、以下のコマンドで「elastic」という名前のConda環境を作ります。 elandは Elasticが提供するPythonクライアントです。
mamba create -n elastic eland
RStudioでは、プロジェクトごとにPython実行環境を指定できます。〈Tools〉-〈Project Options…〉のPythonタブにて、先に作成したConda環境のPythonを選択します。
まず、インデックスの一覧を取得しましょう。 curlを使ってインデックス一覧を取得するときには、以下のようなリクエスト文字列を使うことが一般的に紹介されています。 SOF-ELKのM365モジュールは、月別にインデックスを生成するようですね。
$ curl -X GET "localhost:9200/_cat/indices?v"
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open microsoft365-2023.05 LaKC2SJhRiOc-0nm9A1FBg 1 0 531 0 1.5mb 1.5mb
green open microsoft365-2023.04 eNJpQW5XSY6DwJIQhFYKpw 1 0 121 0 568.1kb 568.1kb
しかしElasticsearchでは、 Elasticsearch SQLというSQLライクなクエリ言語が使えます。この機能を使ってインデックスの一覧を取得してみます。
$ curl -X POST "localhost:9200/_sql?format=txt" -H 'Content-Type: application/json' -d'
{
"query": "SHOW TABLES"
}'
catalog | name | type | kind
---------------+--------------------+---------------+---------------
elasticsearch |microsoft365-2023.04|TABLE |INDEX
elasticsearch |microsoft365-2023.05|TABLE |INDEX
内部動作では、Elasticsearch SQL → Query DSL → Lucene Queryのように変換されています。 Kibanaで使えるKQL(MicrosoftのKusutoとは異なる言語です)も、同じようにKQL → Query DSL → Lucene Queryの変換経路をたどります。
KibanaのDiscovery画面ではKQLかLuceneかの2択を迫られますが、使い慣れたSQLが使えることは、データ分析者にとっては非常に有用です。
では、microsoft365系のインデックスを対象に、 2023年4月22日から26日までの時間範囲で、 timestamp、user_name、operation、workloadの4つのフィールドを取得しましょう。さらに、画面上の都合で、5件だけを選択することにします。
まずpandasとelasticsearchをインポートして、次にSQL文を書きます。
import pandas as pd
from elasticsearch import Elasticsearch
sql = """
SELECT "@timestamp" AS timestamp, user_name, operation, workload
FROM "microsoft365-*"
WHERE timestamp BETWEEN '2023-04-22T00:00:00.000Z' AND '2023-04-26T00:00:00.000Z'
LIMIT 5
"""
KQLと違って、時刻範囲をクエリの中に直接書ける点が個人的には嬉しいです。上ではISO 8601形式で書いていますが、もっと緩く「2023-04-22」という日付単位でも動作します。
引用符の扱いに注意しましょう。SELECTやFROMの中で二重引用符を使っているのは、フィールド名のエスケープ文字としてSQL-92で定義されているからです。そして、WHERE句の中で一重引用符を使っているのは、文字列リテラルだからです。
では、このSQL文を本文に埋め込んで、Elasticに対してリクエストを投げます。 JSONを指定するヘッダーは必須です。
es = Elasticsearch(['http://localhost:9200'])
response = es.transport.perform_request(
'POST', '/_sql',
body={"query": sql},
headers={'Content-Type': 'application/json'}
)
response_body = response.body
最後に、レスポンスからデータフレームを起こします。
df_pd_sql = pd.DataFrame(response_body['rows'], columns=[col['name'] for col in response_body['columns']])
RStudioではPandasのデータフレームも直接Viewerで閲覧できますが、さらに加工するするにはtibbleに変換したいですね。
Reticulateパッケージを使うと、R・Python間でオブジェクトが相互参照できます。
RからPythonオブジェクトを呼べる変数がpy、逆がrです。
df_sql <- py$df_pd_sql |> as_tibble()
df_sql
| timestamp | user_name | operation | workload |
|---|---|---|---|
| 2023-04-24T21:44:20.000Z | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
| 2023-04-24T21:44:16.000Z | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
| 2023-04-24T21:44:03.000Z | Luis@pymtechlabs.com | UserLoginFailed | AzureActiveDirectory |
| 2023-04-24T21:43:59.000Z | Luis@pymtechlabs.com | UserLoginFailed | AzureActiveDirectory |
| 2023-04-24T21:42:49.000Z | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
timestampが文字列型になっています。これは不便なので、POSIXctになるようにしておきましょう。
df_sql <- py$df_pd_sql |> as_tibble() |> mutate(timestamp = as_datetime(timestamp))
df_sql
| timestamp | user_name | operation | workload |
|---|---|---|---|
| 2023-04-24 21:44:20 | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
| 2023-04-24 21:44:16 | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
| 2023-04-24 21:44:03 | Luis@pymtechlabs.com | UserLoginFailed | AzureActiveDirectory |
| 2023-04-24 21:43:59 | Luis@pymtechlabs.com | UserLoginFailed | AzureActiveDirectory |
| 2023-04-24 21:42:49 | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
次に、Eland Python Clientの利用を検討します。 Elandは Elkから連想された単語だと思うのですが、シカ科ではなくウシ科であることを、いま知りました。
天下り式ですが、先の4つのフィールドが含まれるデータフレームを取得するコードを記します。
import eland as ed
import pandas as pd
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
df_eld = ed.DataFrame(es_client=es,
es_index_pattern="microsoft365-*",
columns=["@timestamp", "user_name", "operation", "workload"]
)
df_eld.head(5)
| @timestamp | user_name | operation | workload | |
|---|---|---|---|---|
| UOArpo8Bf1qR3yD0uIPy | 2023-04-24 21:44:20+00:00 | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
| ZeArpo8Bf1qR3yD0uIJt | 2023-05-06 22:52:23+00:00 | admin@pymtechlabs.com | SharingSet | OneDrive |
| UeArpo8Bf1qR3yD0uIPy | 2023-04-24 21:44:16+00:00 | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
| ZuArpo8Bf1qR3yD0uIJt | 2023-05-06 22:52:23+00:00 | admin@pymtechlabs.com | SecureLinkCreated | OneDrive |
| UuArpo8Bf1qR3yD0uIPy | 2023-04-24 21:44:03+00:00 | Luis@pymtechlabs.com | UserLoginFailed | AzureActiveDirectory |
5 rows × 4 columns
このデータフレームはPandasに似せてはいますが、Elasticsearch内部のものです。言い方を変えると、このデータフレームに対する操作は、ローカルではなく Elasticsearchの中で行われます。
使えるメソッドがReferenceに掲載されています。時刻範囲を狭めてみましょう。
start_time = pd.to_datetime("2023-04-22T00:00:00.000Z")
end_time = pd.to_datetime("2023-04-26T00:00:00.000Z")
df_eld_filtered = df_eld[
(df_eld["@timestamp"] >= start_time) &
(df_eld["@timestamp"] < end_time)
]
Pandasの全メソッドが使えるわけではありません。私はフィールド名「@timestamp」を「timestamp」に変えたかったのですが、 rename()もassign()も使えず、よい方法を見つけられませんでした。 ElandのデータフレームをPandasデータフレームに変換して対処することになりそうです。
df_eld_pd = ed.eland_to_pandas(df_eld_filtered, show_progress=False)
df_eld_pd = df_eld_pd.rename(columns={"@timestamp": "timestamp"})
df_eld_pd.info()
## <class 'pandas.core.frame.DataFrame'>
## Index: 121 entries, UOArpo8Bf1qR3yD0uIPy to u-Arpo8Bf1qR3yD0WoGO
## Data columns (total 4 columns):
## # Column Non-Null Count Dtype
## --- ------ -------------- -----
## 0 timestamp 121 non-null datetime64[ns, UTC]
## 1 user_name 121 non-null object
## 2 operation 121 non-null object
## 3 workload 121 non-null object
## dtypes: datetime64[ns, UTC](1), object(3)
## memory usage: 4.7+ KB
この手法でPandasのデータフレームに変換すると、timestampの型がdatetime64なんですね。では、古いものから5行を抽出してみます。
df_eld_pd.sort_values('timestamp').head(5)
## timestamp ... workload
## beArpo8Bf1qR3yD0uIPy 2023-04-22 21:12:46+00:00 ... AzureActiveDirectory
## bOArpo8Bf1qR3yD0uIPy 2023-04-22 21:12:49+00:00 ... AzureActiveDirectory
## a-Arpo8Bf1qR3yD0uIPy 2023-04-22 21:14:16+00:00 ... AzureActiveDirectory
## auArpo8Bf1qR3yD0uIPy 2023-04-22 21:14:20+00:00 ... AzureActiveDirectory
## jOArpo8Bf1qR3yD0uIPy 2023-04-22 22:58:29+00:00 ... AzureActiveDirectory
##
## [5 rows x 4 columns]
df_eld_pdをRのデータフレームに持ってきます。
df_elad_r <- as_tibble(py$df_eld_pd)
df_elad_r |>
arrange(timestamp) |>
slice_head(n = 5)
| timestamp | user_name | operation | workload |
|---|---|---|---|
| 2023-04-23 06:12:46 | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
| 2023-04-23 06:12:49 | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
| 2023-04-23 06:14:16 | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
| 2023-04-23 06:14:20 | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
| 2023-04-23 07:58:29 | slang@pymtechlabs.com | UserLoginFailed | AzureActiveDirectory |
時刻が9時間ずれているのは、私の手元のRStudio環境のTZがAsia/Tokyoだからです。ログ分析をするときには、UTCに固定したほうがよいでしょう。
Sys.setenv(TZ = "UTC")
df_elad_r |>
arrange(timestamp) |>
slice_head(n = 5)
| timestamp | user_name | operation | workload |
|---|---|---|---|
| 2023-04-22 21:12:46 | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
| 2023-04-22 21:12:49 | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
| 2023-04-22 21:14:16 | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
| 2023-04-22 21:14:20 | Luis@pymtechlabs.com | UserLoggedIn | AzureActiveDirectory |
| 2023-04-22 22:58:29 | slang@pymtechlabs.com | UserLoginFailed | AzureActiveDirectory |
これでUTCで表示されるようになりました。
一見すると、Elasticsearch上で一定のデータ操作が行え、時刻の型が保持される Elandのほうが便利な気がします。ただ、現時点ではElandには互換性の課題があります。「Supports Python 3.8+ and Pandas 1.5」とあるとおり、Pandas 2系が使えないのです。このため、Pandas 2系を必要とする他のモジュールと同一仮想環境で共存させることができません。
以上、2つの方法でElasticsearchのインデックスをRのデータフレームとして取得する方法を見てきました。(使った言語はほとんどPythonでしたが。)
私は数日前からFOR509の勉強をはじめ、ひさしぶりにSOF-ELKを触ったのですが、 Lab 1.1で「これはつらい」という感想を抱きました。といってSplunkに移すのも芸がないので、SOF-ELKとR/Pythonとが共存できる手法を検討し、整理しておくことにしました。
私としては効果も効率も向上しましたが、資格試験で使用する端末では環境が固定されていてElaticsearch SQLもElandも使えないでしょうから、 SOF-ELKを使った分析にも慣れておく必要があります。