본문으로 건너뛰기

실시간 통신 API (gRPC & SignalR)

개요

Caffeine은 두 가지 실시간 통신 프로토콜을 제공합니다:

  • gRPC: Engine-Bridge 간 양방향 스트리밍 통신 (OT 데이터 수집 및 제어)
  • SignalR: Admin Dashboard 및 외부 클라이언트로의 실시간 Push 알림

아키텍처

Bridge (PLC/OT 장비)
↓ gRPC Streaming
Engine (데이터 처리 허브)
↓ SignalR Push
Admin Dashboard / 외부 클라이언트

gRPC 서비스

1. NativeBridgeService

Engine과 Bridge 간의 양방향 스트리밍 통신을 담당합니다.

엔드포인트: Engine의 gRPC 포트 (기본: 5001)

메서드

StreamIo (Legacy)

단건 IoPacket을 스트리밍합니다. (기존 호환성 유지)

요청 (Stream)

message IoPacket {
string driver_id = 1;
int64 timestamp = 2;
oneof payload {
bytes raw_bytes = 3;
}
int32 block_index = 4;
}

응답 (Stream)

message ControlCommand {
oneof command {
ScanConfig scan_config = 1;
WriteTagRequest write_request = 2;
}
}
StreamBatchIo (권장)

여러 태그 값을 배치로 묶어 전송하는 고성능 메서드입니다. 🚀

요청 (Stream)

message BatchIoPacket {
string driver_id = 1;
repeated BatchItem items = 2;
}

message BatchItem {
int64 timestamp = 1;
bytes raw_bytes = 2;
int32 block_index = 3;
}

응답 (Stream)

message ControlCommand {
oneof command {
ScanConfig scan_config = 1;
WriteTagRequest write_request = 2;
}
}

특징

  • 수천 개의 태그를 한 번에 전송 가능
  • 네트워크 오버헤드 최소화
  • 처리 성능 향상

제어 명령

ScanConfig: Bridge에게 스캔 대상 블록 전달

message ScanConfig {
repeated ScanBlock blocks = 1;
}

message ScanBlock {
string driver_id = 1;
int32 start_address = 2;
int32 length = 3;
}

WriteTagRequest: 태그 쓰기 명령

message WriteTagRequest {
string driver_id = 1;
string address = 2;
bytes data = 3;
}

2. CaffeineApi (REST 연동)

외부 클라이언트를 위한 gRPC 서비스입니다. (JSON Transcoding 지원)

엔드포인트: Engine의 gRPC 포트 (기본: 5001)

GetHealth

시스템 상태를 확인합니다.

요청

message Empty {}

응답

message HealthStatus {
string status = 1; // "Healthy"
string version = 2; // "2.0.0"
int64 uptime_seconds = 3; // 가동 시간 (초)
}

REST 경로

GET /api/v1/health

WriteTag

태그 값을 씁니다.

요청

message WriteRequest {
string driver_id = 1; // 드라이버 ID (예: "Sim-PLC-01")
string address = 2; // 태그 주소 (예: "D100")
string value = 3; // JSON 친화적 문자열 값
}

응답

message WriteResponse {
bool success = 1;
string message = 2;
}

REST 경로

POST /api/v1/tags/{driver_id}/{address}/write
Content-Type: application/json

{
"value": "123"
}

SignalR Hubs

1. AlarmHub

알람 실시간 브로드캐스팅을 담당합니다.

엔드포인트: /hubs/alarms

인증: 필요 ([Authorize])

서버 → 클라이언트 이벤트

ReceiveAlarm

알람이 발생할 때 모든 연결된 클라이언트에게 브로드캐스트됩니다.

파라미터

이름타입설명
messagestring알람 메시지
severitystring심각도 ("Info", "Warning", "Error")
timestampDateTime발생 시각

2. MonitoringHub

실시간 태그 값 스트리밍 및 설비 상태 모니터링을 담당합니다.

엔드포인트: /hubs/monitoring

인증: 필요 ([Authorize])

클라이언트 → 서버 메서드

SubscribeTagValues

태그 값 실시간 구독을 시작합니다.

파라미터

List<string> tagNames

예시

await connection.invoke("SubscribeTagValues", ["PLC01.D100", "PLC01.D200"]);
UnsubscribeTagValues

태그 값 구독을 해제합니다.

파라미터

List<string> tagNames
SubscribeEquipmentStatus

설비 상태 실시간 구독을 시작합니다.

파라미터

string equipmentId
UnsubscribeEquipmentStatus

설비 상태 구독을 해제합니다.

파라미터

string equipmentId
SubscribePredictiveMaintenance

예측 정비 알림을 구독합니다.

파라미터: 없음

UnsubscribePredictiveMaintenance

예측 정비 알림 구독을 해제합니다.

파라미터: 없음

GetSubscribedTags

현재 구독 중인 태그 목록을 조회합니다. (디버깅용)

반환값

HashSet<string>

서버 → 클라이언트 이벤트

ReceiveTagUpdate

구독한 태그의 값이 변경될 때 발생합니다.

파라미터

이름타입설명
driverIdstring드라이버 ID
addressstring태그 주소
valuestring변경된 값
timestampDateTime변경 시각

사용 예제

gRPC 클라이언트 (C#)

CaffeineClient SDK 사용

using Caffeine.Client;

// 클라이언트 생성
var client = new CaffeineClient();

// 연결
await client.ConnectAsync("https://localhost:5001");

// Health 체크
var health = await client.GetHealthAsync();
Console.WriteLine($"Status: {health.Status}, Version: {health.Version}");

// 태그 쓰기
bool success = await client.WriteTagAsync("Sim-PLC-01", "D100", 123);
Console.WriteLine($"Write Success: {success}");

// 리소스 정리
await client.DisposeAsync();

직접 gRPC 호출

using Grpc.Net.Client;
using Caffeine.IPC.Grpc;

var channel = GrpcChannel.ForAddress("https://localhost:5001");
var grpcClient = new CaffeineApi.CaffeineApiClient(channel);

var response = await grpcClient.WriteTagAsync(new WriteRequest
{
DriverId = "Sim-PLC-01",
Address = "D100",
Value = "456"
});

Console.WriteLine($"Success: {response.Success}, Message: {response.Message}");

SignalR 클라이언트 (C#)

AlarmHub 연결

using Microsoft.AspNetCore.SignalR.Client;

var connection = new HubConnectionBuilder()
.WithUrl("https://localhost:5001/hubs/alarms", options =>
{
options.AccessTokenProvider = () => Task.FromResult("YOUR_JWT_TOKEN");
})
.WithAutomaticReconnect()
.Build();

// 알람 수신 이벤트 등록
connection.On<string, string, DateTime>("ReceiveAlarm", (message, severity, timestamp) =>
{
Console.WriteLine($"[{severity}] {message} at {timestamp}");
});

// 연결
await connection.StartAsync();
Console.WriteLine("Connected to AlarmHub");

// 연결 유지...

// 종료
await connection.DisposeAsync();

MonitoringHub 연결 및 태그 구독

using Microsoft.AspNetCore.SignalR.Client;

var connection = new HubConnectionBuilder()
.WithUrl("https://localhost:5001/hubs/monitoring", options =>
{
options.AccessTokenProvider = () => Task.FromResult("YOUR_JWT_TOKEN");
})
.WithAutomaticReconnect()
.Build();

// 태그 값 변경 이벤트 등록
connection.On<string, string, string, DateTime>("ReceiveTagUpdate",
(driverId, address, value, timestamp) =>
{
Console.WriteLine($"{driverId}.{address} = {value} at {timestamp}");
});

// 연결
await connection.StartAsync();

// 태그 구독
await connection.InvokeAsync("SubscribeTagValues", new List<string>
{
"PLC01.D100",
"PLC01.D200"
});

// 설비 상태 구독
await connection.InvokeAsync("SubscribeEquipmentStatus", "MACHINE-001");

// 예측 정비 알림 구독
await connection.InvokeAsync("SubscribePredictiveMaintenance");

// 연결 유지...

// 구독 해제
await connection.InvokeAsync("UnsubscribeTagValues", new List<string> { "PLC01.D100" });

// 종료
await connection.DisposeAsync();

SignalR 클라이언트 (JavaScript)

AlarmHub 연결

import * as signalR from "@microsoft/signalr";

const connection = new signalR.HubConnectionBuilder()
.withUrl("https://localhost:5001/hubs/alarms", {
accessTokenFactory: () => "YOUR_JWT_TOKEN"
})
.withAutomaticReconnect()
.build();

// 알람 수신 이벤트 등록
connection.on("ReceiveAlarm", (message, severity, timestamp) => {
console.log(`[${severity}] ${message} at ${timestamp}`);
});

// 연결
await connection.start();
console.log("Connected to AlarmHub");

// 연결 유지...

// 종료
await connection.stop();

MonitoringHub 연결 및 태그 구독

import * as signalR from "@microsoft/signalr";

const connection = new signalR.HubConnectionBuilder()
.withUrl("https://localhost:5001/hubs/monitoring", {
accessTokenFactory: () => "YOUR_JWT_TOKEN"
})
.withAutomaticReconnect()
.build();

// 태그 값 변경 이벤트 등록
connection.on("ReceiveTagUpdate", (driverId, address, value, timestamp) => {
console.log(`${driverId}.${address} = ${value} at ${timestamp}`);
});

// 연결
await connection.start();

// 태그 구독
await connection.invoke("SubscribeTagValues", ["PLC01.D100", "PLC01.D200"]);

// 설비 상태 구독
await connection.invoke("SubscribeEquipmentStatus", "MACHINE-001");

// 예측 정비 알림 구독
await connection.invoke("SubscribePredictiveMaintenance");

// 연결 유지...

// 구독 해제
await connection.invoke("UnsubscribeTagValues", ["PLC01.D100"]);

// 종료
await connection.stop();

curl (REST API via gRPC JSON Transcoding)

Health 체크

curl -X GET "https://localhost:5001/api/v1/health"

응답

{
"status": "Healthy",
"version": "2.0.0",
"uptimeSeconds": 3600
}

태그 쓰기

curl -X POST "https://localhost:5001/api/v1/tags/Sim-PLC-01/D100/write" \
-H "Content-Type: application/json" \
-d '{"value": "789"}'

응답

{
"success": true,
"message": "Write Command Queued"
}

통합 클라이언트 (CaffeineClient)

Caffeine.Client 패키지는 gRPC, SignalR, GraphQL을 통합한 올인원 클라이언트입니다.

설치

dotnet add package NEXCODE.Caffeine.Client

통합 사용 예제

using Caffeine.Client;

var client = new CaffeineClient();

// 연결 (gRPC + SignalR 동시 연결)
await client.ConnectAsync("https://localhost:5001");

// 연결 상태 모니터링
client.ConnectionStateChanged += (sender, e) =>
{
Console.WriteLine($"Connection: {e.State} - {e.Reason}");
};

// 알람 수신
client.AlarmReceived += (sender, e) =>
{
Console.WriteLine($"[{e.Severity}] {e.Message}");
};

// 태그 값 변경 수신
client.TagValueChanged += (sender, e) =>
{
Console.WriteLine($"{e.DriverId}.{e.Address} = {e.Value}");
};

// 태그 구독 (SignalR)
await client.SubscribeTagAsync("Sim-PLC-01", "D100");

// 태그 읽기 (GraphQL)
var tagData = await client.ReadTagAsync("Sim-PLC-01", "D100");
Console.WriteLine($"Current Value: {tagData?.Value}");

// 태그 쓰기 (gRPC)
bool success = await client.WriteTagAsync("Sim-PLC-01", "D100", 999);

// Health 체크 (gRPC)
var health = await client.GetHealthAsync();
Console.WriteLine($"Status: {health.Status}, Uptime: {health.UptimeSeconds}s");

// 종료
await client.DisposeAsync();

응답 코드

gRPC 상태 코드

코드설명
OK요청 성공
CANCELLED요청 취소됨
INVALID_ARGUMENT잘못된 파라미터
DEADLINE_EXCEEDED타임아웃
NOT_FOUND드라이버 또는 태그를 찾을 수 없음
UNAVAILABLE서비스 연결 불가
UNAUTHENTICATED인증 실패

SignalR 연결 상태

상태설명
Connected연결됨
Connecting연결 시도 중
Reconnecting재연결 중
Disconnected연결 끊김

보안 및 인증

gRPC

gRPC 서비스는 기본적으로 TLS를 통한 암호화 통신을 사용합니다.

  • Engine ↔ Bridge: mTLS (상호 인증서 검증) 권장
  • 외부 클라이언트: JWT 토큰 기반 인증 (메타데이터 헤더)
var headers = new Metadata
{
{ "Authorization", $"Bearer {jwtToken}" }
};

var call = grpcClient.WriteTagAsync(request, headers);

SignalR

SignalR Hub는 [Authorize] 속성으로 보호됩니다.

  • JWT 토큰: AccessTokenProvider를 통해 전달
.WithUrl("https://localhost:5001/hubs/monitoring", options =>
{
options.AccessTokenProvider = () => Task.FromResult(GetJwtToken());
})

성능 최적화

gRPC Streaming

  • 배치 전송: StreamBatchIo 사용 권장 (수천 개 태그를 한 번에 전송)
  • 압축: WriteFlags.NoCompress 옵션으로 압축 비활성화 (저지연 요구사항)
  • 버퍼링: DisableBuffering() 호출로 즉시 플러시

SignalR

  • 그룹 구독: 불필요한 전체 브로드캐스트 방지
  • 자동 재연결: WithAutomaticReconnect() 활성화
  • 백프레셔: 클라이언트 처리 속도에 맞춰 서버 전송 제어

문제 해결

gRPC 연결 실패

증상: Grpc.Core.RpcException: Status(StatusCode="Unavailable")

해결:

  1. Engine 서비스가 실행 중인지 확인
  2. 방화벽에서 gRPC 포트 개방 확인
  3. TLS 인증서 유효성 확인

SignalR 재연결 실패

증상: HubConnection 상태가 Disconnected로 유지됨

해결:

  1. JWT 토큰 만료 확인 (갱신 필요)
  2. 서버 재시작 여부 확인
  3. 네트워크 연결 안정성 확인

태그 업데이트 수신 안 됨

증상: ReceiveTagUpdate 이벤트가 발생하지 않음

해결:

  1. SubscribeTagValues 호출 확인
  2. 태그 이름 정확성 확인 (대소문자 구분)
  3. Engine에서 해당 드라이버 활성화 상태 확인