Edge↔Cloud 통신 API
Caffeine Edge↔Cloud 통신 추상화 시스템의 인터페이스, 타입, 구현체 레퍼런스이다.
개요
Caffeine Bridge(Edge)와 Engine(Cloud) 간의 통신을 추상화하여, gRPC/MQTT/Hybrid 전송 프로토콜을 런타임에 교체할 수 있다. 네트워크 단절 시 IOfflineBuffer가 메시지를 디스크에 영속 저장하고, 재연결 시 FIFO 순서로 자동 복원한다.
Core 인터페이스
IEdgeCloudTransport
Edge↔Cloud 통합 전송 인터페이스. IAsyncDisposable을 구현한다.
위치: src/Caffeine.Core/Abstractions/Communication/IEdgeCloudTransport.cs
public interface IEdgeCloudTransport : IAsyncDisposable
{
/// 전송 프로토콜 타입 ("grpc", "mqtt", "hybrid")
string TransportType { get; }
/// 현재 연결 상태
bool IsConnected { get; }
/// 연결 수립
Task ConnectAsync(TransportOptions options, CancellationToken ct = default);
/// 연결 해제
Task DisconnectAsync(CancellationToken ct = default);
/// 데이터 전송 (Bridge → Engine)
Task<bool> SendAsync(TransportMessage message, CancellationToken ct = default);
/// 명령 수신 스트림 (Engine → Bridge)
IObservable<TransportMessage> IncomingMessages { get; }
/// 상태 변경 스트림
IObservable<TransportState> StateChanges { get; }
/// 전송 메트릭 조회
TransportMetrics GetMetrics();
}
System.Reactive 통합: IncomingMessages와 StateChanges는 IObservable<T>을 반환하여, Rx 파이프라인으로 메시지를 처리할 수 있다.
IOfflineBuffer
오프라인 메시지 버퍼 인터페이스. 네트워크 연결이 끊긴 동안 메시지를 영속적으로 저장하고, 재연결 시 FIFO 순서로 전송한다.
위치: src/Caffeine.Core/Abstractions/Communication/IOfflineBuffer.cs
public interface IOfflineBuffer : IAsyncDisposable
{
/// 메시지를 버퍼에 추가
Task EnqueueAsync(TransportMessage message, CancellationToken ct = default);
/// 버퍼의 모든 메시지를 전송 채널로 Flush (전송 성공 건수 반환)
Task<int> FlushAsync(IEdgeCloudTransport transport, CancellationToken ct = default);
/// 대기 중인 메시지 수
int PendingCount { get; }
/// 대기 중인 메시지의 총 바이트 크기
long PendingBytes { get; }
/// 현재 적용 중인 버퍼 설정
OfflineBufferOptions Options { get; }
}
OfflineBufferOptions
오프라인 버퍼 설정 sealed record. IOptions<OfflineBufferOptions> 주입 방식으로 DI 등록한다.
위치: src/Caffeine.Core/Abstractions/Communication/OfflineBufferOptions.cs
public sealed record OfflineBufferOptions
{
/// SQLite DB 파일 경로 (기본: "offline_buffer.db")
public string DbPath { get; init; } = "offline_buffer.db";
/// 최대 버퍼 크기 MB (기본: 100)
public int MaxSizeMB { get; init; } = 100;
/// 메시지 TTL 시간 (기본: 24시간)
public int MessageTtlHours { get; init; } = 24;
/// 오프라인 버퍼 활성화 여부 (기본: true)
public bool Enabled { get; init; } = true;
}
| 속성 | 기본값 | 설명 |
|---|---|---|
DbPath | "offline_buffer.db" | SQLite DB 파일 경로 |
MaxSizeMB | 100 | 최대 버퍼 크기 (MB), 초과 시 오래된 메시지 삭제 |
MessageTtlHours | 24 | 메시지 TTL (시간), 만료 시 자동 정리 |
Enabled | true | 오프라인 버퍼 활성화 여부 |
DI 등록 예시:
services.Configure<OfflineBufferOptions>(
configuration.GetSection("OfflineBuffer"));
services.AddSingleton<IOfflineBuffer, SqliteOfflineBuffer>();
타입 정의
TransportMessage
Edge↔Cloud 전송 메시지 record. 드라이버 데이터(Bridge→Engine)와 명령(Engine→Bridge)을 통합 표현한다.
위치: src/Caffeine.Core/Abstractions/Communication/TransportMessage.cs
public sealed record TransportMessage(
string DriverId,
string Topic,
ReadOnlyMemory<byte> Payload,
MessageDirection Direction,
DateTimeOffset Timestamp,
Dictionary<string, string>? Headers = null,
int Priority = 0,
string? DeviceId = null)
{
/// 데이터 전송용 팩토리 메서드 (Bridge → Engine)
static TransportMessage CreateOutbound(string driverId, string topic, ReadOnlyMemory<byte> payload);
/// 명령 수신용 팩토리 메서드 (Engine → Bridge)
static TransportMessage CreateInbound(string driverId, string topic, ReadOnlyMemory<byte> payload);
/// 우선순위 지정 Outbound 메시지
static TransportMessage CreateOutboundWithPriority(
string driverId, string topic, ReadOnlyMemory<byte> payload, int priority);
/// DeviceId 지정 Outbound 메시지 — MQTT 토픽 라우팅용
static TransportMessage CreateOutboundForDevice(
string driverId, string deviceId, string topic, ReadOnlyMemory<byte> payload);
}
MessagePriority
우선순위 정적 상수 클래스.
public static class MessagePriority
{
public const int Normal = 0; // 기본 우선순위
public const int High = 1; // 높은 우선순위
public const int Critical = 2; // 긴급 메시지
}
MessageDirection
public enum MessageDirection
{
Outbound, // Bridge → Engine (데이터 전송)
Inbound // Engine → Bridge (명령 수신)
}
TransportOptions
전송 채널 설정.
위치: src/Caffeine.Core/Abstractions/Communication/TransportOptions.cs
public class TransportOptions
{
/// Engine(서버) 주소
public string ServerUrl { get; set; } = "http://localhost:5000";
/// 인증 API 키
public string? ApiKey { get; set; }
/// TLS 사용 여부
public bool UseTls { get; set; }
/// 연결 타임아웃 (기본 30초)
public TimeSpan ConnectTimeout { get; set; } = TimeSpan.FromSeconds(30);
/// 최대 재시도 횟수 (기본 5회)
public int MaxRetryAttempts { get; set; } = 5;
/// 재시도 기본 대기 시간 (기본 2초, 지수 백오프 적용)
public TimeSpan RetryBaseDelay { get; set; } = TimeSpan.FromSeconds(2);
}
TransportMetrics
전송 채널 메트릭 record.
위치: src/Caffeine.Core/Abstractions/Communication/TransportMetrics.cs
public sealed record TransportMetrics(
long MessagesSent,
long MessagesReceived,
long BytesSent,
long BytesReceived,
long SendErrors,
int ReconnectCount,
DateTimeOffset? LastConnectedAt,
DateTimeOffset? LastDisconnectedAt,
double AvgLatencyMs = 0,
double P99LatencyMs = 0,
int QueueDepth = 0);
| 추가 속성 | 타입 | 설명 |
|---|---|---|
AvgLatencyMs | double | 평균 전송 레이턴시 (ms) |
P99LatencyMs | double | 99번째 백분위 레이턴시 (ms) |
QueueDepth | int | 현재 전송 큐 깊이 |
TransportState
전송 채널 상태 머신.
위치: src/Caffeine.Core/Abstractions/Communication/TransportState.cs
public enum TransportState
{
Disconnected, // 초기 상태, 연결 전
Connecting, // 연결 시도 중
Connected, // 연결됨, 정상 운영
Reconnecting, // 재연결 시도 중
Faulted // 오류로 인한 연결 실패
}
상태 전이:
구현체
GrpcEdgeCloudTransport
기존 gRPC NativeBridgeService를 IEdgeCloudTransport로 래핑한 구현체. LAN 환경에서 고성능 양방향 스트리밍을 제공한다.
위치: src/Caffeine.Bridge.Host/Transport/GrpcEdgeCloudTransport.cs
| 항목 | 값 |
|---|---|
| TransportType | "grpc" |
| 프로토콜 | gRPC h2c (HTTP/2 unencrypted) |
| 재시도 | Polly 지수 백오프 (2s, 4s, 8s, 16s, 32s) |
| 스트리밍 | AsyncDuplexStreamingCall (양방향) |
| 직렬화 | Protobuf (IoPacket, ControlCommand) |
수신 명령 처리:
ScanConfig: 드라이버 스캔 계획 업데이트 (IIOScheduler.UpdateScanPlan)WriteRequest: 드라이버 태그 쓰기 (IIOScheduler.ScheduleWriteAsync)
레이턴시 메트릭 수집 (R-3):
SendAsync()호출 시_totalLatencyMs,_latencySamples누적 →AvgLatencyMs계산_p99LatencyMs슬라이딩 윈도우 방식으로 추정_queueDepthChannel의 현재 메시지 수 반영
MqttEdgeCloudTransport
MQTTnet 기반 IEdgeCloudTransport 구현체. WAN 환경에서 MQTT 브로커를 통해 통신한다.
위치: src/Caffeine.Bridge.Host/Transport/MqttEdgeCloudTransport.cs
| 항목 | 값 |
|---|---|
| TransportType | "mqtt" |
| 라이브러리 | MQTTnet |
| QoS | AtLeastOnce (QoS 1) |
| LWT | caffeine/{tenantId}/{driverId}/status → "offline" |
| TLS | TransportOptions.UseTls로 제어 |
토픽 구조 (테넌트 인식):
| 토픽 패턴 | 방향 | 설명 |
|---|---|---|
caffeine/{tenantId}/{driverId}/data | Outbound | 드라이버 데이터 전송 (기본) |
caffeine/{tenantId}/devices/{deviceId}/data | Outbound | DeviceId 기반 장치별 라우팅 |
caffeine/{tenantId}/{driverId}/command | Inbound | Engine 명령 수신 (구독) |
caffeine/{tenantId}/{driverId}/status | Status | 온라인/오프라인 상태 (Retained) |
DeviceId 라우팅 로직: TransportMessage.DeviceId가 설정된 경우 devices/{deviceId} 서브토픽으로 자동 라우팅. 미설정 시 기존 {driverId}/data 토픽 사용.
SqliteOfflineBuffer
SQLite 기반 디스크 오프라인 버퍼. 네트워크 단절 시 메시지를 영속 저장하고, 재연결 시 FIFO 순서로 전송한다.
위치: src/Caffeine.Bridge.Host/Transport/SqliteOfflineBuffer.cs
| 항목 | 기본값 | 설명 |
|---|---|---|
| 최대 크기 | 100 MB | 초과 시 가장 오래된 메시지부터 삭제 |
| 메시지 TTL | 24시간 | 만료 시 자동 정리 |
| 정렬 | FIFO (Id ASC) | 순서 보장 전송 |
| 자동 정리 | EnqueueAsync 호출 시 | TTL 만료 + 크기 제한 동시 적용 |
SQLite 테이블 스키마 (R-1):
CREATE TABLE OfflineMessages (
Id INTEGER PRIMARY KEY AUTOINCREMENT,
DriverId TEXT NOT NULL,
Topic TEXT NOT NULL,
Payload BLOB NOT NULL,
Direction INTEGER NOT NULL,
Timestamp TEXT NOT NULL,
Headers TEXT,
PayloadSize INTEGER NOT NULL,
CreatedAt TEXT NOT NULL DEFAULT (datetime('now')),
Priority INTEGER NOT NULL DEFAULT 0,
DeviceId TEXT
);
자동 마이그레이션 (R-1):
SqliteOfflineBuffer초기화 시 기존 DB에Priority,DeviceId컬럼이 없으면ALTER TABLE로 자동 추가합니다. 기존 데이터 손실 없음.
Flush 동작:
- FIFO 순서로 메시지 조회
IEdgeCloudTransport.SendAsync()로 전송- 전송 성공한 메시지만 삭제
- 전송 실패 시 중단 (순서 보장)
설정 예시
driver_settings.json
{
"Transport": {
"Type": "grpc",
"ServerUrl": "http://engine-host:5050",
"ApiKey": "your-api-key",
"UseTls": false,
"ConnectTimeout": "00:00:30",
"MaxRetryAttempts": 5,
"RetryBaseDelay": "00:00:02"
},
"OfflineBuffer": {
"DbPath": "offline_buffer.db",
"MaxSizeMB": 100,
"MessageTtlHours": 24,
"Enabled": true
}
}
DI 등록 예시:
// Program.cs (Bridge.Host)
services.Configure<OfflineBufferOptions>(
configuration.GetSection("OfflineBuffer"));
services.AddSingleton<IOfflineBuffer, SqliteOfflineBuffer>();
사용 예시
Transport 교체
// gRPC → MQTT 교체: DI 등록만 변경
services.AddSingleton<IEdgeCloudTransport, MqttEdgeCloudTransport>();
// 또는
services.AddSingleton<IEdgeCloudTransport, GrpcEdgeCloudTransport>();
오프라인 버퍼 활용
// 전송 실패 시 버퍼링
if (!await transport.SendAsync(message, ct))
{
await offlineBuffer.EnqueueAsync(message, ct);
}
// 재연결 시 Flush
transport.StateChanges
.Where(s => s == TransportState.Connected)
.Subscribe(async _ => await offlineBuffer.FlushAsync(transport));
참고 자료
- 멀티테넌시 API — 테넌트 인식 통신 관련
- 아키텍처 가이드 — Edge↔Cloud 통신 아키텍처 섹션
- Changelog v2.4.0 — 통신 추상화 변경 이력
작성일: 2026-02-27 버전: 2.5