본문으로 건너뛰기

데이터 파이프라인 구축

실시간 데이터 수집, 처리, 저장 파이프라인을 구축합니다.

🎯 학습 목표

  • 실시간 데이터 수집
  • Redis 캐시 활용
  • InfluxDB 시계열 저장
  • Kafka 이벤트 스트리밍

예상 소요 시간: 45분


📋 사전 요구사항


Step 1: 인프라 설정

docker-compose.yml 생성:

version: '3.8'

services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data

influxdb:
image: influxdb:2.7
ports:
- "8086:8086"
environment:
- DOCKER_INFLUXDB_INIT_MODE=setup
- DOCKER_INFLUXDB_INIT_USERNAME=admin
- DOCKER_INFLUXDB_INIT_PASSWORD=password123
- DOCKER_INFLUXDB_INIT_ORG=nexcode
- DOCKER_INFLUXDB_INIT_BUCKET=caffeine
volumes:
- influxdb-data:/var/lib/influxdb2

kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181

volumes:
redis-data:
influxdb-data:
# 인프라 시작
docker-compose up -d

# 상태 확인
docker-compose ps

Step 2: 데이터 파이프라인 구현

DataPipeline.cs:

using Caffeine.Core.Models;
using Caffeine.Infrastructure.Repositories;
using Caffeine.Infrastructure.Messaging;
using Microsoft.Extensions.Logging;

namespace DataPipelineTutorial;

public class DataPipeline
{
private readonly RedisRepository _redis;
private readonly InfluxDbRepository _influx;
private readonly KafkaProducer _kafka;
private readonly ILogger<DataPipeline> _logger;

public DataPipeline(
RedisRepository redis,
InfluxDbRepository influx,
KafkaProducer kafka,
ILogger<DataPipeline> logger)
{
_redis = redis;
_influx = influx;
_kafka = kafka;
_logger = logger;
}

public async Task ProcessTagValueAsync(TagValue tagValue)
{
_logger.LogInformation("처리 중: {TagName} = {Value}",
tagValue.TagName, tagValue.Value);

try
{
// 1. Redis 캐시 업데이트 (실시간 조회용)
await UpdateCacheAsync(tagValue);

// 2. InfluxDB 저장 (장기 보관용)
await SaveToTimeSeriesAsync(tagValue);

// 3. Kafka 이벤트 발행 (외부 시스템 연동용)
await PublishEventAsync(tagValue);

// 4. 알람 체크
await CheckAlarmsAsync(tagValue);

_logger.LogInformation("✅ 처리 완료: {TagName}", tagValue.TagName);
}
catch (Exception ex)
{
_logger.LogError(ex, "❌ 처리 실패: {TagName}", tagValue.TagName);
}
}

private async Task UpdateCacheAsync(TagValue tagValue)
{
// Redis에 최신 값 저장 (5분 TTL)
await _redis.SetAsync(
$"tags:{tagValue.TagName}",
tagValue,
TimeSpan.FromMinutes(5)
);

// Pub/Sub으로 실시간 구독자에게 알림
await _redis.PublishAsync("tag-changes", tagValue);
}

private async Task SaveToTimeSeriesAsync(TagValue tagValue)
{
// InfluxDB에 시계열 데이터 저장
await _influx.WriteTagAsync(tagValue, bucket: "caffeine");
}

private async Task PublishEventAsync(TagValue tagValue)
{
// Kafka 토픽에 이벤트 발행
await _kafka.ProduceAsync(
topic: "tag-events",
key: tagValue.TagName,
value: tagValue
);
}

private async Task CheckAlarmsAsync(TagValue tagValue)
{
// 알람 조건 체크
if (tagValue.TagName.Contains("Temperature") &&
tagValue.Value is double temp && temp > 80)
{
var alarm = new
{
TagName = tagValue.TagName,
Message = $"고온 경고: {temp}°C",
Severity = "High",
Timestamp = DateTime.UtcNow
};

await _kafka.ProduceAsync("alarms", tagValue.TagName, alarm);
_logger.LogWarning("⚠️ 알람 발생: {Message}", alarm.Message);
}
}
}

Step 3: 데이터 수집기 구현

DataCollector.cs:

using Caffeine.Core.Abstractions;
using Caffeine.Core.Models;
using Microsoft.Extensions.Logging;

namespace DataPipelineTutorial;

public class DataCollector
{
private readonly IDriver _driver;
private readonly DataPipeline _pipeline;
private readonly ILogger<DataCollector> _logger;
private readonly List<string> _tagNames;
private readonly int _scanInterval;

public DataCollector(
IDriver driver,
DataPipeline pipeline,
ILogger<DataCollector> logger,
List<string> tagNames,
int scanInterval = 1000)
{
_driver = driver;
_pipeline = pipeline;
_logger = logger;
_tagNames = tagNames;
_scanInterval = scanInterval;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("데이터 수집 시작");

// 드라이버 연결
await _driver.ConnectAsync(cancellationToken);

while (!cancellationToken.IsCancellationRequested)
{
try
{
// 모든 태그 읽기
foreach (var tagName in _tagNames)
{
var tagValue = await _driver.ReadTagAsync(tagName, cancellationToken);

if (tagValue != null && tagValue.Quality == TagQuality.Good)
{
// 파이프라인으로 전달
await _pipeline.ProcessTagValueAsync(tagValue);
}
}

// 스캔 주기 대기
await Task.Delay(_scanInterval, cancellationToken);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "데이터 수집 오류");
await Task.Delay(5000, cancellationToken); // 5초 대기 후 재시도
}
}

await _driver.DisconnectAsync();
_logger.LogInformation("데이터 수집 종료");
}
}

Step 4: 프로그램 구성

Program.cs:

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using InfluxDB.Client;
using Confluent.Kafka;
using DataPipelineTutorial;

// 서비스 컨테이너 구성
var services = new ServiceCollection();

// 로깅
services.AddLogging(builder => builder.AddConsole());

// Redis
var redis = ConnectionMultiplexer.Connect("localhost:6379");
services.AddSingleton(redis);
services.AddSingleton<RedisRepository>();

// InfluxDB
var influxClient = new InfluxDBClient(new InfluxDBClientOptions
{
Url = "http://localhost:8086",
Token = "your-token",
Org = "nexcode",
Bucket = "caffeine"
});
services.AddSingleton(influxClient);
services.AddSingleton<InfluxDbRepository>();

// Kafka
var kafkaConfig = new ProducerConfig
{
BootstrapServers = "localhost:9092"
};
services.AddSingleton(kafkaConfig);
services.AddSingleton<KafkaProducer>();

// 파이프라인
services.AddSingleton<DataPipeline>();

// 드라이버 (예: MyCustomDriver)
services.AddSingleton<IDriver, MyCustomDriver>();

// 수집기
services.AddSingleton<DataCollector>();

var provider = services.BuildServiceProvider();

// 실행
var collector = provider.GetRequiredService<DataCollector>();
var tagNames = new List<string>
{
"Equipment1.Temperature",
"Equipment1.Pressure",
"Equipment1.Flow"
};

using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (s, e) =>
{
e.Cancel = true;
cts.Cancel();
};

await collector.StartAsync(cts.Token);

Step 5: 실행 및 검증

# 실행
dotnet run

예상 출력:

데이터 수집 시작
처리 중: Equipment1.Temperature = 72.5
✅ 처리 완료: Equipment1.Temperature
처리 중: Equipment1.Pressure = 101.3
✅ 처리 완료: Equipment1.Pressure
처리 중: Equipment1.Flow = 45.2
✅ 처리 완료: Equipment1.Flow
⚠️ 알람 발생: 고온 경고: 85.2°C

Redis 확인

# Redis CLI 접속
docker exec -it <redis-container> redis-cli

# 캐시된 데이터 확인
> GET tags:Equipment1.Temperature
> KEYS tags:*

InfluxDB 확인

# InfluxDB UI 접속
http://localhost:8086

# Flux 쿼리
from(bucket: "caffeine")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "tags")

Kafka 확인

# Kafka 토픽 확인
docker exec -it <kafka-container> kafka-topics --list --bootstrap-server localhost:9092

# 메시지 확인
docker exec -it <kafka-container> kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic tag-events \
--from-beginning

🎓 배운 내용

1. 다층 데이터 저장

2. 데이터 파이프라인 패턴

  • Hot Path: Redis (실시간)
  • Cold Path: InfluxDB (장기 보관)
  • Event Stream: Kafka (이벤트 기반)

3. 성능 최적화

  • 비동기 처리
  • 배치 쓰기
  • 캐시 활용

🚀 다음 단계

다음 튜토리얼:

추가 학습:


완료 시간: 약 45분
난이도: ⭐⭐⭐ 고급
이전: ← 첫 번째 드라이버 | 다음: 엣지 애플리케이션 →