데이터 파이프라인 구축
실시간 데이터 수집, 처리, 저장 파이프라인을 구축합니다.
🎯 학습 목표
- 실시간 데이터 수집
- Redis 캐시 활용
- InfluxDB 시계열 저장
- Kafka 이벤트 스트리밍
예상 소요 시간: 45분
📋 사전 요구사항
- 첫 번째 드라이버 튜토리얼 완료
- Docker 및 Docker Compose
- Caffeine.Infrastructure 패키지
Step 1: 인프라 설정
docker-compose.yml 생성:
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
influxdb:
image: influxdb:2.7
ports:
- "8086:8086"
environment:
- DOCKER_INFLUXDB_INIT_MODE=setup
- DOCKER_INFLUXDB_INIT_USERNAME=admin
- DOCKER_INFLUXDB_INIT_PASSWORD=password123
- DOCKER_INFLUXDB_INIT_ORG=nexcode
- DOCKER_INFLUXDB_INIT_BUCKET=caffeine
volumes:
- influxdb-data:/var/lib/influxdb2
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
volumes:
redis-data:
influxdb-data:
# 인프라 시작
docker-compose up -d
# 상태 확인
docker-compose ps
Step 2: 데이터 파이프라인 구현
DataPipeline.cs:
using Caffeine.Core.Models;
using Caffeine.Infrastructure.Repositories;
using Caffeine.Infrastructure.Messaging;
using Microsoft.Extensions.Logging;
namespace DataPipelineTutorial;
public class DataPipeline
{
private readonly RedisRepository _redis;
private readonly InfluxDbRepository _influx;
private readonly KafkaProducer _kafka;
private readonly ILogger<DataPipeline> _logger;
public DataPipeline(
RedisRepository redis,
InfluxDbRepository influx,
KafkaProducer kafka,
ILogger<DataPipeline> logger)
{
_redis = redis;
_influx = influx;
_kafka = kafka;
_logger = logger;
}
public async Task ProcessTagValueAsync(TagValue tagValue)
{
_logger.LogInformation("처리 중: {TagName} = {Value}",
tagValue.TagName, tagValue.Value);
try
{
// 1. Redis 캐시 업데이트 (실시간 조회용)
await UpdateCacheAsync(tagValue);
// 2. InfluxDB 저장 (장기 보관용)
await SaveToTimeSeriesAsync(tagValue);
// 3. Kafka 이벤트 발행 (외부 시스템 연동용)
await PublishEventAsync(tagValue);
// 4. 알람 체크
await CheckAlarmsAsync(tagValue);
_logger.LogInformation("✅ 처리 완료: {TagName}", tagValue.TagName);
}
catch (Exception ex)
{
_logger.LogError(ex, "❌ 처리 실패: {TagName}", tagValue.TagName);
}
}
private async Task UpdateCacheAsync(TagValue tagValue)
{
// Redis에 최신 값 저장 (5분 TTL)
await _redis.SetAsync(
$"tags:{tagValue.TagName}",
tagValue,
TimeSpan.FromMinutes(5)
);
// Pub/Sub으로 실시간 구독자에게 알림
await _redis.PublishAsync("tag-changes", tagValue);
}
private async Task SaveToTimeSeriesAsync(TagValue tagValue)
{
// InfluxDB에 시계열 데이터 저장
await _influx.WriteTagAsync(tagValue, bucket: "caffeine");
}
private async Task PublishEventAsync(TagValue tagValue)
{
// Kafka 토픽에 이벤트 발행
await _kafka.ProduceAsync(
topic: "tag-events",
key: tagValue.TagName,
value: tagValue
);
}
private async Task CheckAlarmsAsync(TagValue tagValue)
{
// 알람 조건 체크
if (tagValue.TagName.Contains("Temperature") &&
tagValue.Value is double temp && temp > 80)
{
var alarm = new
{
TagName = tagValue.TagName,
Message = $"고온 경고: {temp}°C",
Severity = "High",
Timestamp = DateTime.UtcNow
};
await _kafka.ProduceAsync("alarms", tagValue.TagName, alarm);
_logger.LogWarning("⚠️ 알람 발생: {Message}", alarm.Message);
}
}
}
Step 3: 데이터 수집기 구현
DataCollector.cs:
using Caffeine.Core.Abstractions;
using Caffeine.Core.Models;
using Microsoft.Extensions.Logging;
namespace DataPipelineTutorial;
public class DataCollector
{
private readonly IDriver _driver;
private readonly DataPipeline _pipeline;
private readonly ILogger<DataCollector> _logger;
private readonly List<string> _tagNames;
private readonly int _scanInterval;
public DataCollector(
IDriver driver,
DataPipeline pipeline,
ILogger<DataCollector> logger,
List<string> tagNames,
int scanInterval = 1000)
{
_driver = driver;
_pipeline = pipeline;
_logger = logger;
_tagNames = tagNames;
_scanInterval = scanInterval;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("데이터 수집 시작");
// 드라이버 연결
await _driver.ConnectAsync(cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
try
{
// 모든 태그 읽기
foreach (var tagName in _tagNames)
{
var tagValue = await _driver.ReadTagAsync(tagName, cancellationToken);
if (tagValue != null && tagValue.Quality == TagQuality.Good)
{
// 파이프라인으로 전달
await _pipeline.ProcessTagValueAsync(tagValue);
}
}
// 스캔 주기 대기
await Task.Delay(_scanInterval, cancellationToken);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "데이터 수집 오류");
await Task.Delay(5000, cancellationToken); // 5초 대기 후 재시도
}
}
await _driver.DisconnectAsync();
_logger.LogInformation("데이터 수집 종료");
}
}
Step 4: 프로그램 구성
Program.cs:
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using InfluxDB.Client;
using Confluent.Kafka;
using DataPipelineTutorial;
// 서비스 컨테이너 구성
var services = new ServiceCollection();
// 로깅
services.AddLogging(builder => builder.AddConsole());
// Redis
var redis = ConnectionMultiplexer.Connect("localhost:6379");
services.AddSingleton(redis);
services.AddSingleton<RedisRepository>();
// InfluxDB
var influxClient = new InfluxDBClient(new InfluxDBClientOptions
{
Url = "http://localhost:8086",
Token = "your-token",
Org = "nexcode",
Bucket = "caffeine"
});
services.AddSingleton(influxClient);
services.AddSingleton<InfluxDbRepository>();
// Kafka
var kafkaConfig = new ProducerConfig
{
BootstrapServers = "localhost:9092"
};
services.AddSingleton(kafkaConfig);
services.AddSingleton<KafkaProducer>();
// 파이프라인
services.AddSingleton<DataPipeline>();
// 드라이버 (예: MyCustomDriver)
services.AddSingleton<IDriver, MyCustomDriver>();
// 수집기
services.AddSingleton<DataCollector>();
var provider = services.BuildServiceProvider();
// 실행
var collector = provider.GetRequiredService<DataCollector>();
var tagNames = new List<string>
{
"Equipment1.Temperature",
"Equipment1.Pressure",
"Equipment1.Flow"
};
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (s, e) =>
{
e.Cancel = true;
cts.Cancel();
};
await collector.StartAsync(cts.Token);
Step 5: 실행 및 검증
# 실행
dotnet run
예상 출력:
데이터 수집 시작
처리 중: Equipment1.Temperature = 72.5
✅ 처리 완료: Equipment1.Temperature
처리 중: Equipment1.Pressure = 101.3
✅ 처리 완료: Equipment1.Pressure
처리 중: Equipment1.Flow = 45.2
✅ 처리 완료: Equipment1.Flow
⚠️ 알람 발생: 고온 경고: 85.2°C
Redis 확인
# Redis CLI 접속
docker exec -it <redis-container> redis-cli
# 캐시된 데이터 확인
> GET tags:Equipment1.Temperature
> KEYS tags:*
InfluxDB 확인
# InfluxDB UI 접속
http://localhost:8086
# Flux 쿼리
from(bucket: "caffeine")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "tags")
Kafka 확인
# Kafka 토픽 확인
docker exec -it <kafka-container> kafka-topics --list --bootstrap-server localhost:9092
# 메시지 확인
docker exec -it <kafka-container> kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic tag-events \
--from-beginning
🎓 배운 내용
1. 다층 데이터 저장
2. 데이터 파이프라인 패턴
- Hot Path: Redis (실시간)
- Cold Path: InfluxDB (장기 보관)
- Event Stream: Kafka (이벤트 기반)
3. 성능 최적화
- 비동기 처리
- 배치 쓰기
- 캐시 활용
🚀 다음 단계
다음 튜토리얼:
추가 학습:
- Caffeine.Infrastructure API - 전체 API
- Docker 배포 가이드 - 컨테이너화
완료 시간: 약 45분
난이도: ⭐⭐⭐ 고급
이전: ← 첫 번째 드라이버 | 다음: 엣지 애플리케이션 →