Caffeine.Infrastructure API
Caffeine Framework 인프라 서비스 API 레퍼런스입니다.
개요
Caffeine.Infrastructure는 데이터 저장, 메시징, 로깅 등 인프라 서비스를 제공하는 모듈입니다.
네임스페이스: Caffeine.Infrastructure
어셈블리: Caffeine.Infrastructure.dll
.NET 버전: .NET 10.0
주요 클래스
RedisRepository
Redis 기반 실시간 데이터 캐시 저장소입니다.
네임스페이스: Caffeine.Infrastructure.Repositories
생성자
public RedisRepository(IConnectionMultiplexer redis, ILogger<RedisRepository> logger)
매개변수:
redis(IConnectionMultiplexer): Redis 연결logger(ILogger<RedisRepository>): 로거
주요 메서드
SetAsync()
Redis에 키-값 쌍을 비동기로 저장합니다.
Task<bool> SetAsync<T>(string key, T value, TimeSpan? expiry = null);
매개변수:
key(string): 키value(T): 저장할 값expiry(TimeSpan?): 만료 시간 (선택)
반환값: Task<bool> - 저장 성공 시 true
사용 예시:
var repository = new RedisRepository(redis, logger);
// 태그 값 저장 (1시간 TTL)
var tagValue = new TagValue("Equipment1.Temperature", 72.5);
await repository.SetAsync($"tags:{tagValue.TagName}", tagValue, TimeSpan.FromHours(1));
GetAsync()
Redis에서 값을 비동기로 가져옵니다.
Task<T?> GetAsync<T>(string key);
매개변수:
key(string): 키
반환값: Task<T?> - 값 또는 null
사용 예시:
var tagValue = await repository.GetAsync<TagValue>("tags:Equipment1.Temperature");
if (tagValue != null)
{
Console.WriteLine($"캐시된 온도: {tagValue.Value}°C");
}
DeleteAsync()
Redis에서 키를 삭제합니다.
Task<bool> DeleteAsync(string key);
사용 예시:
await repository.DeleteAsync("tags:Equipment1.Temperature");
PublishAsync()
Redis Pub/Sub으로 메시지를 발행합니다.
Task<long> PublishAsync<T>(string channel, T message);
사용 예시:
// 태그 변경 이벤트 발행
var tagValue = new TagValue("Equipment1.Temperature", 75.0);
await repository.PublishAsync("tag-changes", tagValue);
SubscribeAsync()
Redis Pub/Sub 채널을 구독합니다.
Task SubscribeAsync<T>(string channel, Action<T> handler);
사용 예시:
await repository.SubscribeAsync<TagValue>("tag-changes", (tagValue) =>
{
Console.WriteLine($"태그 변경: {tagValue.TagName} = {tagValue.Value}");
});
InfluxDbRepository
InfluxDB 기반 시계열 데이터 저장소입니다.
네임스페이스: Caffeine.Infrastructure.Repositories
생성자
public InfluxDbRepository(InfluxDBClient client, ILogger<InfluxDbRepository> logger)
주요 메서드
WriteTagAsync()
태그 값을 InfluxDB에 저장합니다.
Task WriteTagAsync(TagValue tagValue, string bucket = "caffeine");
매개변수:
tagValue(TagValue): 저장할 태그 값bucket(string): InfluxDB 버킷 이름
사용 예시:
var repository = new InfluxDbRepository(influxClient, logger);
var tagValue = new TagValue("Equipment1.Temperature", 72.5);
await repository.WriteTagAsync(tagValue);
WriteTagsAsync()
여러 태그 값을 배치로 저장합니다.
Task WriteTagsAsync(IEnumerable<TagValue> tagValues, string bucket = "caffeine");
사용 예시:
var tagValues = new List<TagValue>
{
new("Equipment1.Temperature", 72.5),
new("Equipment1.Pressure", 101.3),
new("Equipment1.Flow", 45.2)
};
await repository.WriteTagsAsync(tagValues);
QueryAsync()
Flux 쿼리로 데이터를 조회합니다.
Task<List<TagValue>> QueryAsync(string fluxQuery, string bucket = "caffeine");
사용 예시:
var query = @"
from(bucket: ""caffeine"")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == ""tags"")
|> filter(fn: (r) => r.tagName == ""Equipment1.Temperature"")
";
var results = await repository.QueryAsync(query);
foreach (var tagValue in results)
{
Console.WriteLine($"{tagValue.Timestamp}: {tagValue.Value}");
}
QueryRangeAsync()
시간 범위로 데이터를 조회합니다.
Task<List<TagValue>> QueryRangeAsync(string tagName, DateTime start, DateTime end, string bucket = "caffeine");
사용 예시:
var start = DateTime.UtcNow.AddHours(-24);
var end = DateTime.UtcNow;
var history = await repository.QueryRangeAsync("Equipment1.Temperature", start, end);
Console.WriteLine($"{history.Count}개 데이터 포인트 조회됨");
KafkaProducer
Kafka 메시지 프로듀서입니다.
네임스페이스: Caffeine.Infrastructure.Messaging
생성자
public KafkaProducer(ProducerConfig config, ILogger<KafkaProducer> logger)
주요 메서드
ProduceAsync()
Kafka 토픽에 메시지를 발행합니다.
Task<DeliveryResult<string, T>> ProduceAsync<T>(string topic, string key, T value);
매개변수:
topic(string): Kafka 토픽key(string): 메시지 키value(T): 메시지 값
사용 예시:
var producer = new KafkaProducer(config, logger);
var tagValue = new TagValue("Equipment1.Temperature", 72.5);
var result = await producer.ProduceAsync(
topic: "tag-events",
key: tagValue.TagName,
value: tagValue
);
Console.WriteLine($"메시지 발행: Partition {result.Partition}, Offset {result.Offset}");
ProduceBatchAsync()
여러 메시지를 배치로 발행합니다.
Task<List<DeliveryResult<string, T>>> ProduceBatchAsync<T>(string topic, Dictionary<string, T> messages);
사용 예시:
var messages = new Dictionary<string, TagValue>
{
["Equipment1.Temperature"] = new TagValue("Equipment1.Temperature", 72.5),
["Equipment1.Pressure"] = new TagValue("Equipment1.Pressure", 101.3)
};
var results = await producer.ProduceBatchAsync("tag-events", messages);
Console.WriteLine($"{results.Count}개 메시지 발행 완료");
예제
Redis 캐시 사용
using Caffeine.Infrastructure.Repositories;
using StackExchange.Redis;
public class TagCacheService
{
private readonly RedisRepository _redis;
public TagCacheService(IConnectionMultiplexer redis, ILogger<TagCacheService> logger)
{
_redis = new RedisRepository(redis, logger);
}
public async Task<TagValue?> GetOrFetchTagAsync(string tagName, Func<Task<TagValue>> fetchFunc)
{
// 1. 캐시 확인
var cached = await _redis.GetAsync<TagValue>($"tags:{tagName}");
if (cached != null)
{
return cached;
}
// 2. 캐시 미스 - 원본 데이터 가져오기
var tagValue = await fetchFunc();
// 3. 캐시에 저장 (1분 TTL)
await _redis.SetAsync($"tags:{tagName}", tagValue, TimeSpan.FromMinutes(1));
return tagValue;
}
}
InfluxDB 시계열 데이터 저장
using Caffeine.Infrastructure.Repositories;
public class TimeSeriesDataService
{
private readonly InfluxDbRepository _influx;
public TimeSeriesDataService(InfluxDBClient client, ILogger<TimeSeriesDataService> logger)
{
_influx = new InfluxDbRepository(client, logger);
}
public async Task SaveTagHistoryAsync(string tagName, List<TagValue> values)
{
// 배치로 저장 (성능 최적화)
await _influx.WriteTagsAsync(values);
Console.WriteLine($"{values.Count}개 데이터 포인트 저장 완료");
}
public async Task<List<TagValue>> GetTagHistoryAsync(string tagName, TimeSpan duration)
{
var start = DateTime.UtcNow - duration;
var end = DateTime.UtcNow;
return await _influx.QueryRangeAsync(tagName, start, end);
}
public async Task<double> GetAverageAsync(string tagName, TimeSpan duration)
{
var query = $@"
from(bucket: ""caffeine"")
|> range(start: -{(int)duration.TotalHours}h)
|> filter(fn: (r) => r.tagName == ""{tagName}"")
|> mean()
";
var results = await _influx.QueryAsync(query);
return results.FirstOrDefault()?.Value as double? ?? 0.0;
}
}
Kafka 이벤트 스트리밍
using Caffeine.Infrastructure.Messaging;
public class EventStreamService
{
private readonly KafkaProducer _kafka;
public EventStreamService(ProducerConfig config, ILogger<EventStreamService> logger)
{
_kafka = new KafkaProducer(config, logger);
}
public async Task PublishTagChangeEventAsync(TagValue tagValue)
{
var result = await _kafka.ProduceAsync(
topic: "tag-changes",
key: tagValue.TagName,
value: tagValue
);
Console.WriteLine($"이벤트 발행: {tagValue.TagName} → Partition {result.Partition}");
}
public async Task PublishAlarmEventAsync(string tagName, string message, string severity)
{
var alarmEvent = new
{
TagName = tagName,
Message = message,
Severity = severity,
Timestamp = DateTime.UtcNow
};
await _kafka.ProduceAsync(
topic: "alarms",
key: tagName,
value: alarmEvent
);
}
}
통합 데이터 파이프라인
public class DataPipeline
{
private readonly RedisRepository _redis;
private readonly InfluxDbRepository _influx;
private readonly KafkaProducer _kafka;
public DataPipeline(
RedisRepository redis,
InfluxDbRepository influx,
KafkaProducer kafka)
{
_redis = redis;
_influx = influx;
_kafka = kafka;
}
public async Task ProcessTagValueAsync(TagValue tagValue)
{
// 1. Redis 캐시 업데이트 (실시간 조회용)
await _redis.SetAsync($"tags:{tagValue.TagName}", tagValue, TimeSpan.FromMinutes(5));
// 2. InfluxDB 저장 (장기 보관용)
await _influx.WriteTagAsync(tagValue);
// 3. Kafka 이벤트 발행 (외부 시스템 연동용)
await _kafka.ProduceAsync("tag-events", tagValue.TagName, tagValue);
// 4. Redis Pub/Sub 발행 (내부 구독자용)
await _redis.PublishAsync("tag-changes", tagValue);
}
}
설정
Redis 설정
var redis = ConnectionMultiplexer.Connect(new ConfigurationOptions
{
EndPoints = { "localhost:6379" },
Password = "your-password",
ConnectTimeout = 5000,
SyncTimeout = 5000,
AbortOnConnectFail = false
});
var repository = new RedisRepository(redis, logger);
InfluxDB 설정
var influxClient = new InfluxDBClient(new InfluxDBClientOptions
{
Url = "http://localhost:8086",
Token = "your-token",
Org = "nexcode",
Bucket = "caffeine"
});
var repository = new InfluxDbRepository(influxClient, logger);
Kafka 설정
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
ClientId = "caffeine-producer",
Acks = Acks.All,
EnableIdempotence = true,
MaxInFlight = 5,
CompressionType = CompressionType.Snappy
};
var producer = new KafkaProducer(config, logger);
성능 최적화
Redis 파이프라인
public async Task SaveMultipleTagsAsync(List<TagValue> tagValues)
{
var batch = _redis.CreateBatch();
var tasks = tagValues.Select(tv =>
batch.SetAsync($"tags:{tv.TagName}", tv, TimeSpan.FromMinutes(5))
);
batch.Execute();
await Task.WhenAll(tasks);
}
InfluxDB 배치 쓰기
// 1000개씩 배치로 저장
const int batchSize = 1000;
for (int i = 0; i < tagValues.Count; i += batchSize)
{
var batch = tagValues.Skip(i).Take(batchSize);
await _influx.WriteTagsAsync(batch);
}
참고 항목
- Caffeine.Core API - 핵심 API
- Caffeine.Client API - 클라이언트 SDK
- Docker 배포 가이드
- 데이터 파이프라인 튜토리얼
🆕 Phase 15 업데이트 (2026-02-05)
Polly v8 Resilience Pipeline
InfluxDB와 gRPC 통신에 현대적인 복원력 파이프라인이 적용되었습니다.
InfluxDbTraceWriter
using Polly;
using Polly.Retry;
using Polly.CircuitBreaker;
public class InfluxDbTraceWriter : ITraceWriter
{
private readonly ResiliencePipeline _resiliencePipeline;
public InfluxDbTraceWriter(ILogger<InfluxDbTraceWriter> logger, ...)
{
_resiliencePipeline = new ResiliencePipelineBuilder()
.AddRetry(new RetryStrategyOptions
{
MaxRetryAttempts = 3,
Delay = TimeSpan.FromSeconds(1),
BackoffType = DelayBackoffType.Exponential // 1s, 2s, 4s
})
.AddCircuitBreaker(new CircuitBreakerStrategyOptions
{
FailureRatio = 0.5,
SamplingDuration = TimeSpan.FromSeconds(30),
BreakDuration = TimeSpan.FromSeconds(30)
})
.Build();
}
public async Task WriteAsync(ProcessedValue[] values)
{
await _resiliencePipeline.ExecuteAsync(async ct =>
{
var points = values.Select(v => CreatePoint(v)).ToList();
await _writeApi.WritePointsAsync(points, _bucket, _org);
});
}
}
주요 기능:
- 지수 백오프 재시도: 1초, 2초, 4초 (최대 3회)
- Circuit Breaker: 5회 연속 실패 시 30초 차단
- 구조화 로깅: ILogger 통합
🆕 v2.1.0 업데이트 (2026-03-07)
FIR-013: Resilience DI 표준화
AddCaffeineResilience() 내부 구현이 AddResiliencePipeline (표준 Polly.Extensions 패턴)으로 전환되어 ResiliencePipelineProvider<string>를 통한 표준 접근이 가능합니다.
등록 (DI)
// Program.cs / Startup.cs
builder.Services.AddCaffeineResilience();
7개 기본 파이프라인 키
| 서비스 | 파이프라인 키 | 정책 |
|---|---|---|
| InfluxDB 시계열 DB | "influxdb" | Retry 3 + CircuitBreaker + Timeout 10s |
| Redis 캐시/스트림 | "redis" | Retry 3 + CircuitBreaker + Timeout 5s |
| TypeDB 지식 그래프 | "typedb" | Retry 3 + CircuitBreaker + Timeout 30s |
| Kafka 메시지 브로커 | "kafka" | Retry 3 + Timeout 15s |
| MQTT Edge 전송 | "mqtt" | Retry 3 + Timeout 30s |
| Email 알림 | "email" | Retry 2 + Timeout 15s |
| Teams Webhook | "teams" | Retry 2 + Timeout 10s |
사용 방법 비교
// v2.1.0 — 표준 방식 (ResiliencePipelineProvider)
public class InfluxDbRepository
{
private readonly ResiliencePipeline _pipeline;
public InfluxDbRepository(ResiliencePipelineProvider<string> provider)
{
_pipeline = provider.GetPipeline("influxdb");
}
public async Task WriteAsync(CancellationToken ct)
{
await _pipeline.ExecuteAsync(async ct => { /* 외부 호출 */ }, ct);
}
}
// v2.0.x — 키드 서비스 방식 (하위 호환 유지)
public class LegacyService
{
public LegacyService([FromKeyedServices("influxdb")] ResiliencePipeline pipeline) { }
}
FIR-014: AddCaffeineLicenseVerification()
검증 전용 라이선스 DI 확장 메서드로, 라이선스 생성·서명 기능 없이 검증에 필요한 서비스만 등록합니다.
등록
// Program.cs
builder.Services.AddCaffeineLicenseVerification();
등록되는 서비스
| 인터페이스 | 구현체 | 생명주기 |
|---|---|---|
ILicenseService | LicenseVerificationService | Singleton |
IHardwareInfo | DefaultHardwareInfo | Singleton |
DefaultHardwareInfo 크로스 플랫폼 지원
// Windows: Win32 API 기반 HWID 수집
// Linux: /proc/cpuinfo + /etc/machine-id 기반
// macOS: system_profiler 기반 폴백
var hwInfo = new DefaultHardwareInfo();
string hwid = hwInfo.GetHardwareId();
AddCaffeineLicenseVerification()은 검증(Verification) 전용입니다. 라이선스 생성·서명 기능은 내부 전용 Caffeine.Licensing 어셈블리에만 포함됩니다.
RedisService with IDisposable
리소스 자동 정리를 위한 IDisposable 패턴 구현
public class RedisService : IRedisService, IDisposable
{
private readonly ConnectionMultiplexer _redis;
public void Dispose()
{
_redis?.Dispose();
GC.SuppressFinalize(this);
}
}
RedisCurrentValueProvider Async
Deadlock 방지를 위한 비동기 메서드 추가
public class RedisCurrentValueProvider : ICurrentValueProvider
{
// 새로 추가된 비동기 메서드
public async Task<object?> GetValueAsync(string tagId)
{
return await _redis.GetAsync<object?>(GetKey(tagId));
}
// 기존 동기 메서드 (하위 호환성)
public object? GetValue(string tagId)
{
return GetValueAsync(tagId).GetAwaiter().GetResult();
}
}
인터페이스:
public interface ICurrentValueProvider
{
object? GetValue(string tagId);
Task<object?> GetValueAsync(string tagId); // Phase 15 추가
}
RedisBulkService Bounded Channel
OOM 방지를 위한 Bounded Channel 전환
public class RedisBulkService : BackgroundService
{
private readonly Channel<RedisBulkMessage> _channel;
public RedisBulkService(...)
{
_channel = Channel.CreateBounded<RedisBulkMessage>(
new BoundedChannelOptions(100000)
{
FullMode = BoundedChannelFullMode.Wait
});
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var message in _channel.Reader.ReadAllAsync(stoppingToken))
{
// 지수 백오프 재시도 (최대 3회)
int retryCount = 0;
while (retryCount < 3)
{
try
{
await _redis.SetAsync(message.Key, message.Value);
break;
}
catch (Exception ex)
{
retryCount++;
var delay = TimeSpan.FromSeconds(Math.Min(Math.Pow(2, retryCount), 30));
await Task.Delay(delay, stoppingToken);
}
}
}
}
}
설정:
- 용량: 100,000개
- 전략: Wait (생산자 대기)
- 재시도: 지수 백오프 (1s, 2s, 4s, 8s, 16s, 최대 30s)
InfluxDB 크리티컬 버그 수정
PointData 불변 객체 미반영 문제 해결:
// ❌ Before (Bug)
var point = PointData.Measurement("tag_standard_trace");
point.Field("value", convertedValue); // 반환값 무시!
// ✅ After (Fixed)
var point = PointData.Measurement("tag_standard_trace");
point = point.Field("value", convertedValue); // 재할당 필수!
영향: InfluxDB에 데이터가 전혀 기록되지 않던 문제 → Phase 15에서 수정 완료
검증 완료 사항
| 항목 | 결과 |
|---|---|
| 타임스탬프 정밀도 | Nanosecond (9자리) |
| Float/Double 정밀도 | 소수점 15자리 |
| Int32 지원 | > 32767 값 저장 가능 |
| 전체 테스트 | 175/175 통과 (100%) |
| InfluxDB 레코드 | 4228+ 확인 |
네임스페이스: Caffeine.Infrastructure 어셈블리: Caffeine.Infrastructure.dll (v2.1.0) .NET 플랫폼: .NET 10.0
Phase 15 업데이트: 2026-02-05 변경 이력: Changelog