Skip to content

安全に使えるユースケース

BIシステムで安全に実装できるユースケースと、その実装方法を詳しく解説します。

ユースケース:

複数のデータソースからデータを抽出し、変換してデータウェアハウスに読み込む。

# ✅ 良い例: エラーハンドリングとトランザクションを含む
import logging
from contextlib import contextmanager
logger = logging.getLogger(__name__)
@contextmanager
def get_connection():
"""データベース接続を取得し、確実に解放する"""
conn = connection_pool.get_connection()
try:
yield conn
finally:
connection_pool.return_connection(conn)
def etl_process():
"""ETLプロセスのメイン処理"""
try:
logger.info("ETL process started")
# 1. データを抽出
raw_data = extract_data()
logger.info(f"Extracted {len(raw_data)} records")
# 2. データを変換
transformed_data = transform_data(raw_data)
logger.info(f"Transformed {len(transformed_data)} records")
# 3. トランザクション内でデータを読み込み
with get_connection() as conn:
with conn.transaction():
# 既存のデータを削除
delete_existing_data(conn)
# 新しいデータを読み込み
load_data(conn, transformed_data)
logger.info(f"Loaded {len(transformed_data)} records")
logger.info("ETL process completed successfully")
except Exception as e:
logger.error(f"ETL process failed: {e}", exc_info=True)
# エラー通知を送信
send_error_notification(e)
raise
def extract_data():
"""データを抽出"""
data = []
for source in data_sources:
try:
with get_connection() as conn:
source_data = extract_from_source(conn, source)
data.extend(source_data)
except Exception as e:
logger.error(f"Failed to extract from {source}: {e}")
# 個別のエラーは記録するが、処理は続行
continue
return data
def transform_data(raw_data):
"""データを変換"""
transformed = []
for record in raw_data:
try:
transformed_record = {
'id': record['id'],
'name': record['name'].upper(),
'value': float(record['value']),
'created_at': parse_date(record['created_at']),
}
transformed.append(transformed_record)
except Exception as e:
logger.warning(f"Failed to transform record {record.get('id')}: {e}")
# 変換に失敗したレコードはスキップ
continue
return transformed
def load_data(conn, data):
"""データを読み込み"""
batch_size = 1000
for i in range(0, len(data), batch_size):
batch = data[i:i + batch_size]
conn.execute(
"INSERT INTO data_warehouse (id, name, value, created_at) VALUES (?, ?, ?, ?)",
[(r['id'], r['name'], r['value'], r['created_at']) for r in batch]
)

安全なポイント:

  • エラーハンドリング: try-catchでエラーを捕捉
  • トランザクション: トランザクション内でデータを読み込み
  • 接続管理: 接続を確実に解放
  • バッチ処理: 大量のデータをバッチで処理
  • ログ出力: 処理状況をログに記録

ユースケース:

複数のデータソースから並列にデータを取得し、処理する。

# ✅ 良い例: 非同期処理で並列実行
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def parallel_etl_process():
"""並列ETL処理"""
try:
logger.info("Parallel ETL process started")
# すべてのデータソースを並列に処理
tasks = [process_source_async(source) for source in data_sources]
results = await asyncio.gather(*tasks, return_exceptions=True)
# エラーをチェック
errors = []
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Source {data_sources[i]} processing failed: {result}")
errors.append((data_sources[i], result))
if errors:
# エラー通知を送信
send_error_notification(errors)
logger.info("Parallel ETL process completed")
except Exception as e:
logger.error(f"Parallel ETL process failed: {e}", exc_info=True)
send_error_notification(e)
raise
async def process_source_async(source):
"""データソースを非同期に処理"""
try:
# データを抽出
data = await extract_data_async(source)
# データを変換
transformed_data = transform_data(data)
# データを読み込み
await load_data_async(transformed_data)
logger.info(f"Source {source} processed successfully")
except Exception as e:
logger.error(f"Failed to process source {source}: {e}")
raise
async def extract_data_async(source):
"""データを非同期に抽出"""
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
data = await loop.run_in_executor(
executor,
extract_from_source_sync,
source
)
return data
def extract_from_source_sync(source):
"""データを同期に抽出(実際の実装)"""
with get_connection() as conn:
return extract_from_source(conn, source)

安全なポイント:

  • 並列処理: 複数のデータソースを並列に処理
  • エラーハンドリング: 個別のエラーが全体に影響しない
  • リソース管理: スレッドプールでリソースを管理
  • ログ出力: 処理状況をログに記録

ユースケース:

既存のデータを更新する際、増分データのみを処理する。

# ✅ 良い例: 増分データ更新
def incremental_update():
"""増分データ更新"""
try:
logger.info("Incremental update started")
# 最後の更新時刻を取得
last_update_time = get_last_update_time()
logger.info(f"Last update time: {last_update_time}")
# 更新されたデータのみを抽出
updated_data = extract_updated_data(last_update_time)
logger.info(f"Extracted {len(updated_data)} updated records")
if len(updated_data) == 0:
logger.info("No updates found")
return
# データを変換
transformed_data = transform_data(updated_data)
# データを更新(UPSERT)
with get_connection() as conn:
with conn.transaction():
upsert_data(conn, transformed_data)
# 更新時刻を記録
update_last_update_time(conn, datetime.now())
logger.info("Incremental update completed successfully")
except Exception as e:
logger.error(f"Incremental update failed: {e}", exc_info=True)
send_error_notification(e)
raise
def upsert_data(conn, data):
"""データをUPSERT(存在する場合は更新、存在しない場合は挿入)"""
batch_size = 1000
for i in range(0, len(data), batch_size):
batch = data[i:i + batch_size]
for record in batch:
conn.execute(
"""
INSERT INTO data_warehouse (id, name, value, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
name = excluded.name,
value = excluded.value,
updated_at = excluded.updated_at
""",
(record['id'], record['name'], record['value'], record['updated_at'])
)

安全なポイント:

  • 増分更新: 更新されたデータのみを処理
  • UPSERT: 存在する場合は更新、存在しない場合は挿入
  • トランザクション: トランザクション内でデータを更新
  • 更新時刻の記録: 最後の更新時刻を記録

ユースケース:

データの品質をチェックし、問題があるデータを検出する。

# ✅ 良い例: データ品質チェック
def data_quality_check(data):
"""データ品質チェック"""
errors = []
warnings = []
for record in data:
# 必須フィールドのチェック
if not record.get('id'):
errors.append({
'record': record,
'field': 'id',
'error': 'Missing required field',
})
continue
# データ型のチェック
if not isinstance(record.get('value'), (int, float)):
errors.append({
'record': record,
'field': 'value',
'error': 'Invalid data type',
})
continue
# 値の範囲チェック
if record.get('value') < 0:
warnings.append({
'record': record,
'field': 'value',
'warning': 'Negative value',
})
# 重複チェック
if is_duplicate(record['id']):
warnings.append({
'record': record,
'field': 'id',
'warning': 'Duplicate ID',
})
# エラーと警告をログに記録
if errors:
logger.error(f"Data quality check found {len(errors)} errors")
for error in errors:
logger.error(f"Error: {error}")
if warnings:
logger.warning(f"Data quality check found {len(warnings)} warnings")
for warning in warnings:
logger.warning(f"Warning: {warning}")
# エラーがある場合は例外を発生
if errors:
raise DataQualityError(f"Data quality check failed: {len(errors)} errors")
return {
'errors': errors,
'warnings': warnings,
}

安全なポイント:

  • データ品質チェック: データの品質をチェック
  • エラー検出: 問題があるデータを検出
  • 警告: 警告レベルの問題を記録
  • ログ出力: エラーと警告をログに記録

BIシステムで安全に実装できるユースケースは、エラーハンドリング、トランザクション管理、リソース管理を考慮した設計が重要です。

重要なポイント:

  • ETLパイプライン: エラーハンドリングとトランザクションを含む
  • 並列処理: 非同期処理でパフォーマンスを向上
  • 増分更新: 更新されたデータのみを処理
  • データ品質チェック: データの品質をチェック

これらのベストプラクティスを守ることで、堅牢で効率的なBIシステムを構築できます。