본문으로 건너뛰기

Caffeine.Infrastructure API

Caffeine Framework 인프라 서비스 API 레퍼런스입니다.

개요

Caffeine.Infrastructure는 데이터 저장, 메시징, 로깅 등 인프라 서비스를 제공하는 모듈입니다.

네임스페이스: Caffeine.Infrastructure
어셈블리: Caffeine.Infrastructure.dll
.NET 버전: .NET 10.0


주요 클래스

RedisRepository

Redis 기반 실시간 데이터 캐시 저장소입니다.

네임스페이스: Caffeine.Infrastructure.Repositories

생성자

public RedisRepository(IConnectionMultiplexer redis, ILogger<RedisRepository> logger)

매개변수:

  • redis (IConnectionMultiplexer): Redis 연결
  • logger (ILogger<RedisRepository>): 로거

주요 메서드

SetAsync()

Redis에 키-값 쌍을 비동기로 저장합니다.

Task<bool> SetAsync<T>(string key, T value, TimeSpan? expiry = null);

매개변수:

  • key (string): 키
  • value (T): 저장할 값
  • expiry (TimeSpan?): 만료 시간 (선택)

반환값: Task<bool> - 저장 성공 시 true

사용 예시:

var repository = new RedisRepository(redis, logger);

// 태그 값 저장 (1시간 TTL)
var tagValue = new TagValue("Equipment1.Temperature", 72.5);
await repository.SetAsync($"tags:{tagValue.TagName}", tagValue, TimeSpan.FromHours(1));
GetAsync()

Redis에서 값을 비동기로 가져옵니다.

Task<T?> GetAsync<T>(string key);

매개변수:

  • key (string): 키

반환값: Task<T?> - 값 또는 null

사용 예시:

var tagValue = await repository.GetAsync<TagValue>("tags:Equipment1.Temperature");
if (tagValue != null)
{
Console.WriteLine($"캐시된 온도: {tagValue.Value}°C");
}
DeleteAsync()

Redis에서 키를 삭제합니다.

Task<bool> DeleteAsync(string key);

사용 예시:

await repository.DeleteAsync("tags:Equipment1.Temperature");
PublishAsync()

Redis Pub/Sub으로 메시지를 발행합니다.

Task<long> PublishAsync<T>(string channel, T message);

사용 예시:

// 태그 변경 이벤트 발행
var tagValue = new TagValue("Equipment1.Temperature", 75.0);
await repository.PublishAsync("tag-changes", tagValue);
SubscribeAsync()

Redis Pub/Sub 채널을 구독합니다.

Task SubscribeAsync<T>(string channel, Action<T> handler);

사용 예시:

await repository.SubscribeAsync<TagValue>("tag-changes", (tagValue) =>
{
Console.WriteLine($"태그 변경: {tagValue.TagName} = {tagValue.Value}");
});

InfluxDbRepository

InfluxDB 기반 시계열 데이터 저장소입니다.

네임스페이스: Caffeine.Infrastructure.Repositories

생성자

public InfluxDbRepository(InfluxDBClient client, ILogger<InfluxDbRepository> logger)

주요 메서드

WriteTagAsync()

태그 값을 InfluxDB에 저장합니다.

Task WriteTagAsync(TagValue tagValue, string bucket = "caffeine");

매개변수:

  • tagValue (TagValue): 저장할 태그 값
  • bucket (string): InfluxDB 버킷 이름

사용 예시:

var repository = new InfluxDbRepository(influxClient, logger);

var tagValue = new TagValue("Equipment1.Temperature", 72.5);
await repository.WriteTagAsync(tagValue);
WriteTagsAsync()

여러 태그 값을 배치로 저장합니다.

Task WriteTagsAsync(IEnumerable<TagValue> tagValues, string bucket = "caffeine");

사용 예시:

var tagValues = new List<TagValue>
{
new("Equipment1.Temperature", 72.5),
new("Equipment1.Pressure", 101.3),
new("Equipment1.Flow", 45.2)
};

await repository.WriteTagsAsync(tagValues);
QueryAsync()

Flux 쿼리로 데이터를 조회합니다.

Task<List<TagValue>> QueryAsync(string fluxQuery, string bucket = "caffeine");

사용 예시:

var query = @"
from(bucket: ""caffeine"")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == ""tags"")
|> filter(fn: (r) => r.tagName == ""Equipment1.Temperature"")
";

var results = await repository.QueryAsync(query);
foreach (var tagValue in results)
{
Console.WriteLine($"{tagValue.Timestamp}: {tagValue.Value}");
}
QueryRangeAsync()

시간 범위로 데이터를 조회합니다.

Task<List<TagValue>> QueryRangeAsync(string tagName, DateTime start, DateTime end, string bucket = "caffeine");

사용 예시:

var start = DateTime.UtcNow.AddHours(-24);
var end = DateTime.UtcNow;

var history = await repository.QueryRangeAsync("Equipment1.Temperature", start, end);
Console.WriteLine($"{history.Count}개 데이터 포인트 조회됨");

KafkaProducer

Kafka 메시지 프로듀서입니다.

네임스페이스: Caffeine.Infrastructure.Messaging

생성자

public KafkaProducer(ProducerConfig config, ILogger<KafkaProducer> logger)

주요 메서드

ProduceAsync()

Kafka 토픽에 메시지를 발행합니다.

Task<DeliveryResult<string, T>> ProduceAsync<T>(string topic, string key, T value);

매개변수:

  • topic (string): Kafka 토픽
  • key (string): 메시지 키
  • value (T): 메시지 값

사용 예시:

var producer = new KafkaProducer(config, logger);

var tagValue = new TagValue("Equipment1.Temperature", 72.5);
var result = await producer.ProduceAsync(
topic: "tag-events",
key: tagValue.TagName,
value: tagValue
);

Console.WriteLine($"메시지 발행: Partition {result.Partition}, Offset {result.Offset}");
ProduceBatchAsync()

여러 메시지를 배치로 발행합니다.

Task<List<DeliveryResult<string, T>>> ProduceBatchAsync<T>(string topic, Dictionary<string, T> messages);

사용 예시:

var messages = new Dictionary<string, TagValue>
{
["Equipment1.Temperature"] = new TagValue("Equipment1.Temperature", 72.5),
["Equipment1.Pressure"] = new TagValue("Equipment1.Pressure", 101.3)
};

var results = await producer.ProduceBatchAsync("tag-events", messages);
Console.WriteLine($"{results.Count}개 메시지 발행 완료");

예제

Redis 캐시 사용

using Caffeine.Infrastructure.Repositories;
using StackExchange.Redis;

public class TagCacheService
{
private readonly RedisRepository _redis;

public TagCacheService(IConnectionMultiplexer redis, ILogger<TagCacheService> logger)
{
_redis = new RedisRepository(redis, logger);
}

public async Task<TagValue?> GetOrFetchTagAsync(string tagName, Func<Task<TagValue>> fetchFunc)
{
// 1. 캐시 확인
var cached = await _redis.GetAsync<TagValue>($"tags:{tagName}");
if (cached != null)
{
return cached;
}

// 2. 캐시 미스 - 원본 데이터 가져오기
var tagValue = await fetchFunc();

// 3. 캐시에 저장 (1분 TTL)
await _redis.SetAsync($"tags:{tagName}", tagValue, TimeSpan.FromMinutes(1));

return tagValue;
}
}

InfluxDB 시계열 데이터 저장

using Caffeine.Infrastructure.Repositories;

public class TimeSeriesDataService
{
private readonly InfluxDbRepository _influx;

public TimeSeriesDataService(InfluxDBClient client, ILogger<TimeSeriesDataService> logger)
{
_influx = new InfluxDbRepository(client, logger);
}

public async Task SaveTagHistoryAsync(string tagName, List<TagValue> values)
{
// 배치로 저장 (성능 최적화)
await _influx.WriteTagsAsync(values);
Console.WriteLine($"{values.Count}개 데이터 포인트 저장 완료");
}

public async Task<List<TagValue>> GetTagHistoryAsync(string tagName, TimeSpan duration)
{
var start = DateTime.UtcNow - duration;
var end = DateTime.UtcNow;

return await _influx.QueryRangeAsync(tagName, start, end);
}

public async Task<double> GetAverageAsync(string tagName, TimeSpan duration)
{
var query = $@"
from(bucket: ""caffeine"")
|> range(start: -{(int)duration.TotalHours}h)
|> filter(fn: (r) => r.tagName == ""{tagName}"")
|> mean()
";

var results = await _influx.QueryAsync(query);
return results.FirstOrDefault()?.Value as double? ?? 0.0;
}
}

Kafka 이벤트 스트리밍

using Caffeine.Infrastructure.Messaging;

public class EventStreamService
{
private readonly KafkaProducer _kafka;

public EventStreamService(ProducerConfig config, ILogger<EventStreamService> logger)
{
_kafka = new KafkaProducer(config, logger);
}

public async Task PublishTagChangeEventAsync(TagValue tagValue)
{
var result = await _kafka.ProduceAsync(
topic: "tag-changes",
key: tagValue.TagName,
value: tagValue
);

Console.WriteLine($"이벤트 발행: {tagValue.TagName} → Partition {result.Partition}");
}

public async Task PublishAlarmEventAsync(string tagName, string message, string severity)
{
var alarmEvent = new
{
TagName = tagName,
Message = message,
Severity = severity,
Timestamp = DateTime.UtcNow
};

await _kafka.ProduceAsync(
topic: "alarms",
key: tagName,
value: alarmEvent
);
}
}

통합 데이터 파이프라인

public class DataPipeline
{
private readonly RedisRepository _redis;
private readonly InfluxDbRepository _influx;
private readonly KafkaProducer _kafka;

public DataPipeline(
RedisRepository redis,
InfluxDbRepository influx,
KafkaProducer kafka)
{
_redis = redis;
_influx = influx;
_kafka = kafka;
}

public async Task ProcessTagValueAsync(TagValue tagValue)
{
// 1. Redis 캐시 업데이트 (실시간 조회용)
await _redis.SetAsync($"tags:{tagValue.TagName}", tagValue, TimeSpan.FromMinutes(5));

// 2. InfluxDB 저장 (장기 보관용)
await _influx.WriteTagAsync(tagValue);

// 3. Kafka 이벤트 발행 (외부 시스템 연동용)
await _kafka.ProduceAsync("tag-events", tagValue.TagName, tagValue);

// 4. Redis Pub/Sub 발행 (내부 구독자용)
await _redis.PublishAsync("tag-changes", tagValue);
}
}

설정

Redis 설정

var redis = ConnectionMultiplexer.Connect(new ConfigurationOptions
{
EndPoints = { "localhost:6379" },
Password = "your-password",
ConnectTimeout = 5000,
SyncTimeout = 5000,
AbortOnConnectFail = false
});

var repository = new RedisRepository(redis, logger);

InfluxDB 설정

var influxClient = new InfluxDBClient(new InfluxDBClientOptions
{
Url = "http://localhost:8086",
Token = "your-token",
Org = "nexcode",
Bucket = "caffeine"
});

var repository = new InfluxDbRepository(influxClient, logger);

Kafka 설정

var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
ClientId = "caffeine-producer",
Acks = Acks.All,
EnableIdempotence = true,
MaxInFlight = 5,
CompressionType = CompressionType.Snappy
};

var producer = new KafkaProducer(config, logger);

성능 최적화

Redis 파이프라인

public async Task SaveMultipleTagsAsync(List<TagValue> tagValues)
{
var batch = _redis.CreateBatch();
var tasks = tagValues.Select(tv =>
batch.SetAsync($"tags:{tv.TagName}", tv, TimeSpan.FromMinutes(5))
);

batch.Execute();
await Task.WhenAll(tasks);
}

InfluxDB 배치 쓰기

// 1000개씩 배치로 저장
const int batchSize = 1000;
for (int i = 0; i < tagValues.Count; i += batchSize)
{
var batch = tagValues.Skip(i).Take(batchSize);
await _influx.WriteTagsAsync(batch);
}

참고 항목


🆕 Phase 15 업데이트 (2026-02-05)

Polly v8 Resilience Pipeline

InfluxDB와 gRPC 통신에 현대적인 복원력 파이프라인이 적용되었습니다.

InfluxDbTraceWriter

using Polly;
using Polly.Retry;
using Polly.CircuitBreaker;

public class InfluxDbTraceWriter : ITraceWriter
{
private readonly ResiliencePipeline _resiliencePipeline;

public InfluxDbTraceWriter(ILogger<InfluxDbTraceWriter> logger, ...)
{
_resiliencePipeline = new ResiliencePipelineBuilder()
.AddRetry(new RetryStrategyOptions
{
MaxRetryAttempts = 3,
Delay = TimeSpan.FromSeconds(1),
BackoffType = DelayBackoffType.Exponential // 1s, 2s, 4s
})
.AddCircuitBreaker(new CircuitBreakerStrategyOptions
{
FailureRatio = 0.5,
SamplingDuration = TimeSpan.FromSeconds(30),
BreakDuration = TimeSpan.FromSeconds(30)
})
.Build();
}

public async Task WriteAsync(ProcessedValue[] values)
{
await _resiliencePipeline.ExecuteAsync(async ct =>
{
var points = values.Select(v => CreatePoint(v)).ToList();
await _writeApi.WritePointsAsync(points, _bucket, _org);
});
}
}

주요 기능:

  • 지수 백오프 재시도: 1초, 2초, 4초 (최대 3회)
  • Circuit Breaker: 5회 연속 실패 시 30초 차단
  • 구조화 로깅: ILogger 통합

🆕 v2.1.0 업데이트 (2026-03-07)

FIR-013: Resilience DI 표준화

AddCaffeineResilience() 내부 구현이 AddResiliencePipeline (표준 Polly.Extensions 패턴)으로 전환되어 ResiliencePipelineProvider<string>를 통한 표준 접근이 가능합니다.

등록 (DI)

// Program.cs / Startup.cs
builder.Services.AddCaffeineResilience();

7개 기본 파이프라인 키

서비스파이프라인 키정책
InfluxDB 시계열 DB"influxdb"Retry 3 + CircuitBreaker + Timeout 10s
Redis 캐시/스트림"redis"Retry 3 + CircuitBreaker + Timeout 5s
TypeDB 지식 그래프"typedb"Retry 3 + CircuitBreaker + Timeout 30s
Kafka 메시지 브로커"kafka"Retry 3 + Timeout 15s
MQTT Edge 전송"mqtt"Retry 3 + Timeout 30s
Email 알림"email"Retry 2 + Timeout 15s
Teams Webhook"teams"Retry 2 + Timeout 10s

사용 방법 비교

// v2.1.0 — 표준 방식 (ResiliencePipelineProvider)
public class InfluxDbRepository
{
private readonly ResiliencePipeline _pipeline;

public InfluxDbRepository(ResiliencePipelineProvider<string> provider)
{
_pipeline = provider.GetPipeline("influxdb");
}

public async Task WriteAsync(CancellationToken ct)
{
await _pipeline.ExecuteAsync(async ct => { /* 외부 호출 */ }, ct);
}
}

// v2.0.x — 키드 서비스 방식 (하위 호환 유지)
public class LegacyService
{
public LegacyService([FromKeyedServices("influxdb")] ResiliencePipeline pipeline) { }
}

FIR-014: AddCaffeineLicenseVerification()

검증 전용 라이선스 DI 확장 메서드로, 라이선스 생성·서명 기능 없이 검증에 필요한 서비스만 등록합니다.

등록

// Program.cs
builder.Services.AddCaffeineLicenseVerification();

등록되는 서비스

인터페이스구현체생명주기
ILicenseServiceLicenseVerificationServiceSingleton
IHardwareInfoDefaultHardwareInfoSingleton

DefaultHardwareInfo 크로스 플랫폼 지원

// Windows: Win32 API 기반 HWID 수집
// Linux: /proc/cpuinfo + /etc/machine-id 기반
// macOS: system_profiler 기반 폴백
var hwInfo = new DefaultHardwareInfo();
string hwid = hwInfo.GetHardwareId();
보안 정책

AddCaffeineLicenseVerification()은 검증(Verification) 전용입니다. 라이선스 생성·서명 기능은 내부 전용 Caffeine.Licensing 어셈블리에만 포함됩니다.

RedisService with IDisposable

리소스 자동 정리를 위한 IDisposable 패턴 구현

public class RedisService : IRedisService, IDisposable
{
private readonly ConnectionMultiplexer _redis;

public void Dispose()
{
_redis?.Dispose();
GC.SuppressFinalize(this);
}
}

RedisCurrentValueProvider Async

Deadlock 방지를 위한 비동기 메서드 추가

public class RedisCurrentValueProvider : ICurrentValueProvider
{
// 새로 추가된 비동기 메서드
public async Task<object?> GetValueAsync(string tagId)
{
return await _redis.GetAsync<object?>(GetKey(tagId));
}

// 기존 동기 메서드 (하위 호환성)
public object? GetValue(string tagId)
{
return GetValueAsync(tagId).GetAwaiter().GetResult();
}
}

인터페이스:

public interface ICurrentValueProvider
{
object? GetValue(string tagId);
Task<object?> GetValueAsync(string tagId); // Phase 15 추가
}

RedisBulkService Bounded Channel

OOM 방지를 위한 Bounded Channel 전환

public class RedisBulkService : BackgroundService
{
private readonly Channel<RedisBulkMessage> _channel;

public RedisBulkService(...)
{
_channel = Channel.CreateBounded<RedisBulkMessage>(
new BoundedChannelOptions(100000)
{
FullMode = BoundedChannelFullMode.Wait
});
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var message in _channel.Reader.ReadAllAsync(stoppingToken))
{
// 지수 백오프 재시도 (최대 3회)
int retryCount = 0;
while (retryCount < 3)
{
try
{
await _redis.SetAsync(message.Key, message.Value);
break;
}
catch (Exception ex)
{
retryCount++;
var delay = TimeSpan.FromSeconds(Math.Min(Math.Pow(2, retryCount), 30));
await Task.Delay(delay, stoppingToken);
}
}
}
}
}

설정:

  • 용량: 100,000개
  • 전략: Wait (생산자 대기)
  • 재시도: 지수 백오프 (1s, 2s, 4s, 8s, 16s, 최대 30s)

InfluxDB 크리티컬 버그 수정

PointData 불변 객체 미반영 문제 해결:

// ❌ Before (Bug)
var point = PointData.Measurement("tag_standard_trace");
point.Field("value", convertedValue); // 반환값 무시!

// ✅ After (Fixed)
var point = PointData.Measurement("tag_standard_trace");
point = point.Field("value", convertedValue); // 재할당 필수!

영향: InfluxDB에 데이터가 전혀 기록되지 않던 문제 → Phase 15에서 수정 완료

검증 완료 사항

항목결과
타임스탬프 정밀도Nanosecond (9자리)
Float/Double 정밀도소수점 15자리
Int32 지원> 32767 값 저장 가능
전체 테스트175/175 통과 (100%)
InfluxDB 레코드4228+ 확인

네임스페이스: Caffeine.Infrastructure 어셈블리: Caffeine.Infrastructure.dll (v2.1.0) .NET 플랫폼: .NET 10.0

Phase 15 업데이트: 2026-02-05 변경 이력: Changelog