Skip to content

障害時に起きること

BIシステムで障害が発生した際のシナリオを詳しく解説します。

シナリオ1: データベース接続プールの枯渇

Section titled “シナリオ1: データベース接続プールの枯渇”
時刻: 2024-01-01 10:00:00
状況: 複数のETLプロセスが同時実行中
10:00:00.000 - ETLプロセス1開始(接続取得: 1/10)
10:00:00.100 - ETLプロセス2開始(接続取得: 2/10)
10:00:00.200 - ETLプロセス3開始(接続取得: 3/10)
...
10:00:01.000 - ETLプロセス10開始(接続取得: 10/10)
10:00:01.100 - ETLプロセス11開始(接続取得待機)
10:00:01.200 - ETLプロセス12開始(接続取得待機)
...
10:00:30.000 - 接続が解放されず、すべてのプロセスが待機状態
10:00:30.100 - タイムアウトエラーが大量発生
10:00:30.200 - システム全体が応答不能

実際のコード:

# ❌ 問題のあるコード
def etl_process():
for source in data_sources:
conn = create_connection() # 接続が閉じられない
data = extract_data(conn, source)
transformed_data = transform_data(data)
load_data(conn, transformed_data)
# 問題: 接続が閉じられず、接続プールが枯渇する

障害の影響:

  1. 接続プールの枯渇: 接続が解放されず、接続プールが枯渇する
  2. プロセスの待機: 新しいプロセスが接続を取得できず、待機状態になる
  3. タイムアウトエラー: 接続の取得待ちがタイムアウトし、エラーが発生する
  4. システム全体の停止: 接続プールが枯渇し、システム全体が応答不能になる

解決策:

# ✅ 解決策: contextmanagerで接続を管理
from contextlib import contextmanager
@contextmanager
def get_connection():
"""データベース接続を取得し、確実に解放する"""
conn = connection_pool.get_connection()
try:
yield conn
finally:
connection_pool.return_connection(conn)
def etl_process():
"""ETLプロセスのメイン処理"""
for source in data_sources:
with get_connection() as conn:
data = extract_data(conn, source)
transformed_data = transform_data(data)
load_data(conn, transformed_data)
# 接続が確実に解放される

シナリオ2: メモリ不足によるクラッシュ

Section titled “シナリオ2: メモリ不足によるクラッシュ”
時刻: 2024-01-01 10:00:00
状況: 大量のデータ処理中
10:00:00.000 - ETLプロセス開始
10:00:01.000 - データ抽出開始(10GBのデータ)
10:00:30.000 - メモリ使用量: 8GB(メモリ不足の兆候)
10:01:00.000 - ガベージコレクションが頻発
10:01:30.000 - メモリ使用量: 10GB(メモリ不足)
10:02:00.000 - OutOfMemoryError発生
10:02:00.100 - プロセスがクラッシュ
10:02:00.200 - 処理が中途半端な状態で終了

実際のコード:

# ❌ 問題のあるコード
def etl_process():
# 10GBのデータを一度にメモリに読み込む
data = extract_all_data() # メモリ不足
transformed_data = transform_data(data)
load_data(transformed_data)

障害の影響:

  1. メモリ不足: 大量のデータをメモリに読み込むと、メモリ不足が発生する
  2. ガベージコレクションの頻発: メモリ不足により、ガベージコレクションが頻発する
  3. パフォーマンスの低下: ガベージコレクションにより、パフォーマンスが低下する
  4. プロセスのクラッシュ: メモリ不足により、プロセスがクラッシュする

解決策:

# ✅ 解決策: バッチ処理でデータを分割
def etl_process():
"""ETLプロセスのメイン処理(バッチ処理)"""
batch_size = 10000
offset = 0
while True:
# バッチでデータを取得
batch = extract_data_batch(offset, batch_size)
if len(batch) == 0:
break
# バッチを変換
transformed_batch = transform_data(batch)
# バッチを読み込み
load_data_batch(transformed_batch)
offset += batch_size
# メモリを解放
del batch
del transformed_batch

シナリオ3: データの整合性の破綻

Section titled “シナリオ3: データの整合性の破綻”
時刻: 2024-01-01 10:00:00
状況: 複数のデータソースからデータを取得中
10:00:00.000 - データソース1からデータ取得開始
10:00:01.000 - データソース2からデータ取得開始
10:00:02.000 - データソース1からデータ取得完了(UTC)
10:00:03.000 - データソース2からデータ取得完了(JST)
10:00:04.000 - データを結合(タイムゾーンが異なる)
10:00:05.000 - データを読み込み
10:00:06.000 - データウェアハウスに不正なデータが保存される
10:00:07.000 - ダッシュボードで不正なデータが表示される

実際のコード:

# ❌ 問題のあるコード
def etl_process():
# 複数のデータソースからデータを取得
data1 = extract_from_source1() # UTC
data2 = extract_from_source2() # JST
# 問題: タイムゾーンが異なるため、データの整合性が保たれない
combined_data = data1 + data2
load_data(combined_data)

障害の影響:

  1. データの整合性の破綻: タイムゾーンが異なるため、データの整合性が保たれない
  2. 分析結果の誤り: 不正なデータにより、分析結果が誤る
  3. 意思決定の誤り: 誤った分析結果に基づいて、誤った意思決定が行われる

解決策:

# ✅ 解決策: タイムゾーンを統一
from datetime import datetime
import pytz
def etl_process():
"""ETLプロセスのメイン処理(タイムゾーン統一)"""
# UTCに統一
utc = pytz.UTC
# データソース1からデータ取得(UTC)
data1 = extract_from_source1()
for record in data1:
if record.get('created_at'):
record['created_at'] = utc.localize(record['created_at'])
# データソース2からデータ取得(JST → UTC)
data2 = extract_from_source2()
jst = pytz.timezone('Asia/Tokyo')
for record in data2:
if record.get('created_at'):
# JSTからUTCに変換
jst_time = jst.localize(record['created_at'])
record['created_at'] = jst_time.astimezone(utc)
# データを結合(タイムゾーンが統一されている)
combined_data = data1 + data2
load_data(combined_data)
時刻: 2024-01-01 10:00:00
状況: データ品質チェックなしでデータを読み込み中
10:00:00.000 - ETLプロセス開始
10:00:01.000 - データ抽出開始
10:00:30.000 - データ変換完了(不正なデータを含む)
10:01:00.000 - データ読み込み完了
10:01:01.000 - データウェアハウスに不正なデータが保存される
10:02:00.000 - ダッシュボードで不正なデータが表示される
10:03:00.000 - 意思決定者が誤ったデータに基づいて判断

実際のコード:

# ❌ 問題のあるコード
def etl_process():
data = extract_data()
transformed_data = transform_data(data)
load_data(transformed_data)
# 問題: データの品質をチェックしない
# 問題: 不正なデータが読み込まれる可能性がある

障害の影響:

  1. 不正なデータの読み込み: データの品質をチェックしないため、不正なデータが読み込まれる
  2. データ不整合: 不正なデータにより、データの整合性が保たれない
  3. 分析結果の誤り: 不正なデータにより、分析結果が誤る
  4. 意思決定の誤り: 誤った分析結果に基づいて、誤った意思決定が行われる

解決策:

# ✅ 解決策: データ品質チェックを実装
def etl_process():
"""ETLプロセスのメイン処理(データ品質チェック)"""
# データを抽出
data = extract_data()
# データ品質チェック
quality_result = data_quality_check(data)
if quality_result['errors']:
logger.error(f"Data quality check failed: {len(quality_result['errors'])} errors")
raise DataQualityError(f"Data quality check failed: {len(quality_result['errors'])} errors")
if quality_result['warnings']:
logger.warning(f"Data quality check found {len(quality_result['warnings'])} warnings")
# データを変換
transformed_data = transform_data(data)
# データを読み込み
load_data(transformed_data)
def data_quality_check(data):
"""データ品質チェック"""
errors = []
warnings = []
for record in data:
# 必須フィールドのチェック
if not record.get('id'):
errors.append({
'record': record,
'field': 'id',
'error': 'Missing required field',
})
continue
# データ型のチェック
if not isinstance(record.get('value'), (int, float)):
errors.append({
'record': record,
'field': 'value',
'error': 'Invalid data type',
})
continue
# 値の範囲チェック
if record.get('value') < 0:
warnings.append({
'record': record,
'field': 'value',
'warning': 'Negative value',
})
return {
'errors': errors,
'warnings': warnings,
}

BIシステムで障害が発生した際は、データベース接続プールの枯渇、メモリ不足、データの整合性の破綻、データ品質の問題が主な原因です。

重要なポイント:

  • 接続管理: contextmanagerで確実に接続を解放する
  • メモリ管理: バッチ処理でメモリ使用量を制限する
  • データ整合性: タイムゾーンを統一し、データの整合性を保証する
  • データ品質チェック: データの品質をチェックし、不正なデータを検出する

これらの対策を実装することで、障害に強いBIシステムを構築できます。