본문으로 건너뛰기

Edge↔Cloud 통신 API

Caffeine Edge↔Cloud 통신 추상화 시스템의 인터페이스, 타입, 구현체 레퍼런스이다.

개요

Caffeine Bridge(Edge)와 Engine(Cloud) 간의 통신을 추상화하여, gRPC/MQTT/Hybrid 전송 프로토콜을 런타임에 교체할 수 있다. 네트워크 단절 시 IOfflineBuffer가 메시지를 디스크에 영속 저장하고, 재연결 시 FIFO 순서로 자동 복원한다.

Core 인터페이스

IEdgeCloudTransport

Edge↔Cloud 통합 전송 인터페이스. IAsyncDisposable을 구현한다.

위치: src/Caffeine.Core/Abstractions/Communication/IEdgeCloudTransport.cs

public interface IEdgeCloudTransport : IAsyncDisposable
{
/// 전송 프로토콜 타입 ("grpc", "mqtt", "hybrid")
string TransportType { get; }

/// 현재 연결 상태
bool IsConnected { get; }

/// 연결 수립
Task ConnectAsync(TransportOptions options, CancellationToken ct = default);

/// 연결 해제
Task DisconnectAsync(CancellationToken ct = default);

/// 데이터 전송 (Bridge → Engine)
Task<bool> SendAsync(TransportMessage message, CancellationToken ct = default);

/// 명령 수신 스트림 (Engine → Bridge)
IObservable<TransportMessage> IncomingMessages { get; }

/// 상태 변경 스트림
IObservable<TransportState> StateChanges { get; }

/// 전송 메트릭 조회
TransportMetrics GetMetrics();
}

System.Reactive 통합: IncomingMessagesStateChangesIObservable<T>을 반환하여, Rx 파이프라인으로 메시지를 처리할 수 있다.

IOfflineBuffer

오프라인 메시지 버퍼 인터페이스. 네트워크 연결이 끊긴 동안 메시지를 영속적으로 저장하고, 재연결 시 FIFO 순서로 전송한다.

위치: src/Caffeine.Core/Abstractions/Communication/IOfflineBuffer.cs

public interface IOfflineBuffer : IAsyncDisposable
{
/// 메시지를 버퍼에 추가
Task EnqueueAsync(TransportMessage message, CancellationToken ct = default);

/// 버퍼의 모든 메시지를 전송 채널로 Flush (전송 성공 건수 반환)
Task<int> FlushAsync(IEdgeCloudTransport transport, CancellationToken ct = default);

/// 대기 중인 메시지 수
int PendingCount { get; }

/// 대기 중인 메시지의 총 바이트 크기
long PendingBytes { get; }

/// 현재 적용 중인 버퍼 설정
OfflineBufferOptions Options { get; }
}

OfflineBufferOptions

오프라인 버퍼 설정 sealed record. IOptions<OfflineBufferOptions> 주입 방식으로 DI 등록한다.

위치: src/Caffeine.Core/Abstractions/Communication/OfflineBufferOptions.cs

public sealed record OfflineBufferOptions
{
/// SQLite DB 파일 경로 (기본: "offline_buffer.db")
public string DbPath { get; init; } = "offline_buffer.db";

/// 최대 버퍼 크기 MB (기본: 100)
public int MaxSizeMB { get; init; } = 100;

/// 메시지 TTL 시간 (기본: 24시간)
public int MessageTtlHours { get; init; } = 24;

/// 오프라인 버퍼 활성화 여부 (기본: true)
public bool Enabled { get; init; } = true;
}
속성기본값설명
DbPath"offline_buffer.db"SQLite DB 파일 경로
MaxSizeMB100최대 버퍼 크기 (MB), 초과 시 오래된 메시지 삭제
MessageTtlHours24메시지 TTL (시간), 만료 시 자동 정리
Enabledtrue오프라인 버퍼 활성화 여부

DI 등록 예시:

services.Configure<OfflineBufferOptions>(
configuration.GetSection("OfflineBuffer"));
services.AddSingleton<IOfflineBuffer, SqliteOfflineBuffer>();

타입 정의

TransportMessage

Edge↔Cloud 전송 메시지 record. 드라이버 데이터(Bridge→Engine)와 명령(Engine→Bridge)을 통합 표현한다.

위치: src/Caffeine.Core/Abstractions/Communication/TransportMessage.cs

public sealed record TransportMessage(
string DriverId,
string Topic,
ReadOnlyMemory<byte> Payload,
MessageDirection Direction,
DateTimeOffset Timestamp,
Dictionary<string, string>? Headers = null,
int Priority = 0,
string? DeviceId = null)
{
/// 데이터 전송용 팩토리 메서드 (Bridge → Engine)
static TransportMessage CreateOutbound(string driverId, string topic, ReadOnlyMemory<byte> payload);

/// 명령 수신용 팩토리 메서드 (Engine → Bridge)
static TransportMessage CreateInbound(string driverId, string topic, ReadOnlyMemory<byte> payload);

/// 우선순위 지정 Outbound 메시지
static TransportMessage CreateOutboundWithPriority(
string driverId, string topic, ReadOnlyMemory<byte> payload, int priority);

/// DeviceId 지정 Outbound 메시지 — MQTT 토픽 라우팅용
static TransportMessage CreateOutboundForDevice(
string driverId, string deviceId, string topic, ReadOnlyMemory<byte> payload);
}

MessagePriority

우선순위 정적 상수 클래스.

public static class MessagePriority
{
public const int Normal = 0; // 기본 우선순위
public const int High = 1; // 높은 우선순위
public const int Critical = 2; // 긴급 메시지
}

MessageDirection

public enum MessageDirection
{
Outbound, // Bridge → Engine (데이터 전송)
Inbound // Engine → Bridge (명령 수신)
}

TransportOptions

전송 채널 설정.

위치: src/Caffeine.Core/Abstractions/Communication/TransportOptions.cs

public class TransportOptions
{
/// Engine(서버) 주소
public string ServerUrl { get; set; } = "http://localhost:5000";

/// 인증 API 키
public string? ApiKey { get; set; }

/// TLS 사용 여부
public bool UseTls { get; set; }

/// 연결 타임아웃 (기본 30초)
public TimeSpan ConnectTimeout { get; set; } = TimeSpan.FromSeconds(30);

/// 최대 재시도 횟수 (기본 5회)
public int MaxRetryAttempts { get; set; } = 5;

/// 재시도 기본 대기 시간 (기본 2초, 지수 백오프 적용)
public TimeSpan RetryBaseDelay { get; set; } = TimeSpan.FromSeconds(2);
}

TransportMetrics

전송 채널 메트릭 record.

위치: src/Caffeine.Core/Abstractions/Communication/TransportMetrics.cs

public sealed record TransportMetrics(
long MessagesSent,
long MessagesReceived,
long BytesSent,
long BytesReceived,
long SendErrors,
int ReconnectCount,
DateTimeOffset? LastConnectedAt,
DateTimeOffset? LastDisconnectedAt,
double AvgLatencyMs = 0,
double P99LatencyMs = 0,
int QueueDepth = 0);
추가 속성타입설명
AvgLatencyMsdouble평균 전송 레이턴시 (ms)
P99LatencyMsdouble99번째 백분위 레이턴시 (ms)
QueueDepthint현재 전송 큐 깊이

TransportState

전송 채널 상태 머신.

위치: src/Caffeine.Core/Abstractions/Communication/TransportState.cs

public enum TransportState
{
Disconnected, // 초기 상태, 연결 전
Connecting, // 연결 시도 중
Connected, // 연결됨, 정상 운영
Reconnecting, // 재연결 시도 중
Faulted // 오류로 인한 연결 실패
}

상태 전이:

구현체

GrpcEdgeCloudTransport

기존 gRPC NativeBridgeServiceIEdgeCloudTransport로 래핑한 구현체. LAN 환경에서 고성능 양방향 스트리밍을 제공한다.

위치: src/Caffeine.Bridge.Host/Transport/GrpcEdgeCloudTransport.cs

항목
TransportType"grpc"
프로토콜gRPC h2c (HTTP/2 unencrypted)
재시도Polly 지수 백오프 (2s, 4s, 8s, 16s, 32s)
스트리밍AsyncDuplexStreamingCall (양방향)
직렬화Protobuf (IoPacket, ControlCommand)

수신 명령 처리:

  • ScanConfig: 드라이버 스캔 계획 업데이트 (IIOScheduler.UpdateScanPlan)
  • WriteRequest: 드라이버 태그 쓰기 (IIOScheduler.ScheduleWriteAsync)

레이턴시 메트릭 수집 (R-3):

  • SendAsync() 호출 시 _totalLatencyMs, _latencySamples 누적 → AvgLatencyMs 계산
  • _p99LatencyMs 슬라이딩 윈도우 방식으로 추정
  • _queueDepth Channel의 현재 메시지 수 반영

MqttEdgeCloudTransport

MQTTnet 기반 IEdgeCloudTransport 구현체. WAN 환경에서 MQTT 브로커를 통해 통신한다.

위치: src/Caffeine.Bridge.Host/Transport/MqttEdgeCloudTransport.cs

항목
TransportType"mqtt"
라이브러리MQTTnet
QoSAtLeastOnce (QoS 1)
LWTcaffeine/{tenantId}/{driverId}/status"offline"
TLSTransportOptions.UseTls로 제어

토픽 구조 (테넌트 인식):

토픽 패턴방향설명
caffeine/{tenantId}/{driverId}/dataOutbound드라이버 데이터 전송 (기본)
caffeine/{tenantId}/devices/{deviceId}/dataOutboundDeviceId 기반 장치별 라우팅
caffeine/{tenantId}/{driverId}/commandInboundEngine 명령 수신 (구독)
caffeine/{tenantId}/{driverId}/statusStatus온라인/오프라인 상태 (Retained)

DeviceId 라우팅 로직: TransportMessage.DeviceId가 설정된 경우 devices/{deviceId} 서브토픽으로 자동 라우팅. 미설정 시 기존 {driverId}/data 토픽 사용.

SqliteOfflineBuffer

SQLite 기반 디스크 오프라인 버퍼. 네트워크 단절 시 메시지를 영속 저장하고, 재연결 시 FIFO 순서로 전송한다.

위치: src/Caffeine.Bridge.Host/Transport/SqliteOfflineBuffer.cs

항목기본값설명
최대 크기100 MB초과 시 가장 오래된 메시지부터 삭제
메시지 TTL24시간만료 시 자동 정리
정렬FIFO (Id ASC)순서 보장 전송
자동 정리EnqueueAsync 호출 시TTL 만료 + 크기 제한 동시 적용

SQLite 테이블 스키마 (R-1):

CREATE TABLE OfflineMessages (
Id INTEGER PRIMARY KEY AUTOINCREMENT,
DriverId TEXT NOT NULL,
Topic TEXT NOT NULL,
Payload BLOB NOT NULL,
Direction INTEGER NOT NULL,
Timestamp TEXT NOT NULL,
Headers TEXT,
PayloadSize INTEGER NOT NULL,
CreatedAt TEXT NOT NULL DEFAULT (datetime('now')),
Priority INTEGER NOT NULL DEFAULT 0,
DeviceId TEXT
);

자동 마이그레이션 (R-1): SqliteOfflineBuffer 초기화 시 기존 DB에 Priority, DeviceId 컬럼이 없으면 ALTER TABLE로 자동 추가합니다. 기존 데이터 손실 없음.

Flush 동작:

  1. FIFO 순서로 메시지 조회
  2. IEdgeCloudTransport.SendAsync()로 전송
  3. 전송 성공한 메시지만 삭제
  4. 전송 실패 시 중단 (순서 보장)

설정 예시

driver_settings.json

{
"Transport": {
"Type": "grpc",
"ServerUrl": "http://engine-host:5050",
"ApiKey": "your-api-key",
"UseTls": false,
"ConnectTimeout": "00:00:30",
"MaxRetryAttempts": 5,
"RetryBaseDelay": "00:00:02"
},
"OfflineBuffer": {
"DbPath": "offline_buffer.db",
"MaxSizeMB": 100,
"MessageTtlHours": 24,
"Enabled": true
}
}

DI 등록 예시:

// Program.cs (Bridge.Host)
services.Configure<OfflineBufferOptions>(
configuration.GetSection("OfflineBuffer"));
services.AddSingleton<IOfflineBuffer, SqliteOfflineBuffer>();

사용 예시

Transport 교체

// gRPC → MQTT 교체: DI 등록만 변경
services.AddSingleton<IEdgeCloudTransport, MqttEdgeCloudTransport>();
// 또는
services.AddSingleton<IEdgeCloudTransport, GrpcEdgeCloudTransport>();

오프라인 버퍼 활용

// 전송 실패 시 버퍼링
if (!await transport.SendAsync(message, ct))
{
await offlineBuffer.EnqueueAsync(message, ct);
}

// 재연결 시 Flush
transport.StateChanges
.Where(s => s == TransportState.Connected)
.Subscribe(async _ => await offlineBuffer.FlushAsync(transport));

참고 자료


작성일: 2026-02-27 버전: 2.5