실시간 통신 API (gRPC & SignalR)
개요
Caffeine은 두 가지 실시간 통신 프로토콜을 제공합니다:
- gRPC: Engine-Bridge 간 양방향 스트리밍 통신 (OT 데이터 수집 및 제어)
- SignalR: Admin Dashboard 및 외부 클라이언트로의 실시간 Push 알림
아키텍처
Bridge (PLC/OT 장비)
↓ gRPC Streaming
Engine (데이터 처리 허브)
↓ SignalR Push
Admin Dashboard / 외부 클라이언트
gRPC 서비스
1. NativeBridgeService
Engine과 Bridge 간의 양방향 스트리밍 통신을 담당합니다.
엔드포인트: Engine의 gRPC 포트 (기본: 5001)
메서드
StreamIo (Legacy)
단건 IoPacket을 스트리밍합니다. (기존 호환성 유지)
요청 (Stream)
message IoPacket {
string driver_id = 1;
int64 timestamp = 2;
oneof payload {
bytes raw_bytes = 3;
}
int32 block_index = 4;
}
응답 (Stream)
message ControlCommand {
oneof command {
ScanConfig scan_config = 1;
WriteTagRequest write_request = 2;
}
}
StreamBatchIo (권장)
여러 태그 값을 배치로 묶어 전송하는 고성능 메서드입니다. 🚀
요청 (Stream)
message BatchIoPacket {
string driver_id = 1;
repeated BatchItem items = 2;
}
message BatchItem {
int64 timestamp = 1;
bytes raw_bytes = 2;
int32 block_index = 3;
}
응답 (Stream)
message ControlCommand {
oneof command {
ScanConfig scan_config = 1;
WriteTagRequest write_request = 2;
}
}
특징
- 수천 개의 태그를 한 번에 전송 가능
- 네트워크 오버헤드 최소화
- 처리 성능 향상
제어 명령
ScanConfig: Bridge에게 스캔 대상 블록 전달
message ScanConfig {
repeated ScanBlock blocks = 1;
}
message ScanBlock {
string driver_id = 1;
int32 start_address = 2;
int32 length = 3;
}
WriteTagRequest: 태그 쓰기 명령
message WriteTagRequest {
string driver_id = 1;
string address = 2;
bytes data = 3;
}
2. CaffeineApi (REST 연동)
외부 클라이언트를 위한 gRPC 서비스입니다. (JSON Transcoding 지원)
엔드포인트: Engine의 gRPC 포트 (기본: 5001)
GetHealth
시스템 상태를 확인합니다.
요청
message Empty {}
응답
message HealthStatus {
string status = 1; // "Healthy"
string version = 2; // "2.0.0"
int64 uptime_seconds = 3; // 가동 시간 (초)
}
REST 경로
GET /api/v1/health
WriteTag
태그 값을 씁니다.
요청
message WriteRequest {
string driver_id = 1; // 드라이버 ID (예: "Sim-PLC-01")
string address = 2; // 태그 주소 (예: "D100")
string value = 3; // JSON 친화적 문자열 값
}
응답
message WriteResponse {
bool success = 1;
string message = 2;
}
REST 경로
POST /api/v1/tags/{driver_id}/{address}/write
Content-Type: application/json
{
"value": "123"
}
SignalR Hubs
1. AlarmHub
알람 실시간 브로드캐스팅을 담당합니다.
엔드포인트: /hubs/alarms
인증: 필요 ([Authorize])
서버 → 클라이언트 이벤트
ReceiveAlarm
알람이 발생할 때 모든 연결된 클라이언트에게 브로드캐스트됩니다.
파라미터
| 이름 | 타입 | 설명 |
|---|---|---|
| message | string | 알람 메시지 |
| severity | string | 심각도 ("Info", "Warning", "Error") |
| timestamp | DateTime | 발생 시각 |
2. MonitoringHub
실시간 태그 값 스트리밍 및 설비 상태 모니터링을 담당합니다.
엔드포인트: /hubs/monitoring
인증: 필요 ([Authorize])
클라이언트 → 서버 메서드
SubscribeTagValues
태그 값 실시간 구독을 시작합니다.
파라미터
List<string> tagNames
예시
await connection.invoke("SubscribeTagValues", ["PLC01.D100", "PLC01.D200"]);
UnsubscribeTagValues
태그 값 구독을 해제합니다.
파라미터
List<string> tagNames
SubscribeEquipmentStatus
설비 상태 실시간 구독을 시작합니다.
파라미터
string equipmentId
UnsubscribeEquipmentStatus
설비 상태 구독을 해제합니다.
파라미터
string equipmentId
SubscribePredictiveMaintenance
예측 정비 알림을 구독합니다.
파라미터: 없음
UnsubscribePredictiveMaintenance
예측 정비 알림 구독을 해제합니다.
파라미터: 없음
GetSubscribedTags
현재 구독 중인 태그 목록을 조회합니다. (디버깅용)
반환값
HashSet<string>
서버 → 클라이언트 이벤트
ReceiveTagUpdate
구독한 태그의 값이 변경될 때 발생합니다.
파라미터
| 이름 | 타입 | 설명 |
|---|---|---|
| driverId | string | 드라이버 ID |
| address | string | 태그 주소 |
| value | string | 변경된 값 |
| timestamp | DateTime | 변경 시각 |
사용 예제
gRPC 클라이언트 (C#)
CaffeineClient SDK 사용
using Caffeine.Client;
// 클라이언트 생성
var client = new CaffeineClient();
// 연결
await client.ConnectAsync("https://localhost:5001");
// Health 체크
var health = await client.GetHealthAsync();
Console.WriteLine($"Status: {health.Status}, Version: {health.Version}");
// 태그 쓰기
bool success = await client.WriteTagAsync("Sim-PLC-01", "D100", 123);
Console.WriteLine($"Write Success: {success}");
// 리소스 정리
await client.DisposeAsync();
직접 gRPC 호출
using Grpc.Net.Client;
using Caffeine.IPC.Grpc;
var channel = GrpcChannel.ForAddress("https://localhost:5001");
var grpcClient = new CaffeineApi.CaffeineApiClient(channel);
var response = await grpcClient.WriteTagAsync(new WriteRequest
{
DriverId = "Sim-PLC-01",
Address = "D100",
Value = "456"
});
Console.WriteLine($"Success: {response.Success}, Message: {response.Message}");
SignalR 클라이언트 (C#)
AlarmHub 연결
using Microsoft.AspNetCore.SignalR.Client;
var connection = new HubConnectionBuilder()
.WithUrl("https://localhost:5001/hubs/alarms", options =>
{
options.AccessTokenProvider = () => Task.FromResult("YOUR_JWT_TOKEN");
})
.WithAutomaticReconnect()
.Build();
// 알람 수신 이벤트 등록
connection.On<string, string, DateTime>("ReceiveAlarm", (message, severity, timestamp) =>
{
Console.WriteLine($"[{severity}] {message} at {timestamp}");
});
// 연결
await connection.StartAsync();
Console.WriteLine("Connected to AlarmHub");
// 연결 유지...
// 종료
await connection.DisposeAsync();
MonitoringHub 연결 및 태그 구독
using Microsoft.AspNetCore.SignalR.Client;
var connection = new HubConnectionBuilder()
.WithUrl("https://localhost:5001/hubs/monitoring", options =>
{
options.AccessTokenProvider = () => Task.FromResult("YOUR_JWT_TOKEN");
})
.WithAutomaticReconnect()
.Build();
// 태그 값 변경 이벤트 등록
connection.On<string, string, string, DateTime>("ReceiveTagUpdate",
(driverId, address, value, timestamp) =>
{
Console.WriteLine($"{driverId}.{address} = {value} at {timestamp}");
});
// 연결
await connection.StartAsync();
// 태그 구독
await connection.InvokeAsync("SubscribeTagValues", new List<string>
{
"PLC01.D100",
"PLC01.D200"
});
// 설비 상태 구독
await connection.InvokeAsync("SubscribeEquipmentStatus", "MACHINE-001");
// 예측 정비 알림 구독
await connection.InvokeAsync("SubscribePredictiveMaintenance");
// 연결 유지...
// 구독 해제
await connection.InvokeAsync("UnsubscribeTagValues", new List<string> { "PLC01.D100" });
// 종료
await connection.DisposeAsync();
SignalR 클라이언트 (JavaScript)
AlarmHub 연결
import * as signalR from "@microsoft/signalr";
const connection = new signalR.HubConnectionBuilder()
.withUrl("https://localhost:5001/hubs/alarms", {
accessTokenFactory: () => "YOUR_JWT_TOKEN"
})
.withAutomaticReconnect()
.build();
// 알람 수신 이벤트 등록
connection.on("ReceiveAlarm", (message, severity, timestamp) => {
console.log(`[${severity}] ${message} at ${timestamp}`);
});
// 연결
await connection.start();
console.log("Connected to AlarmHub");
// 연결 유지...
// 종료
await connection.stop();
MonitoringHub 연결 및 태그 구독
import * as signalR from "@microsoft/signalr";
const connection = new signalR.HubConnectionBuilder()
.withUrl("https://localhost:5001/hubs/monitoring", {
accessTokenFactory: () => "YOUR_JWT_TOKEN"
})
.withAutomaticReconnect()
.build();
// 태그 값 변경 이벤트 등록
connection.on("ReceiveTagUpdate", (driverId, address, value, timestamp) => {
console.log(`${driverId}.${address} = ${value} at ${timestamp}`);
});
// 연결
await connection.start();
// 태그 구독
await connection.invoke("SubscribeTagValues", ["PLC01.D100", "PLC01.D200"]);
// 설비 상태 구독
await connection.invoke("SubscribeEquipmentStatus", "MACHINE-001");
// 예측 정비 알림 구독
await connection.invoke("SubscribePredictiveMaintenance");
// 연결 유지...
// 구독 해제
await connection.invoke("UnsubscribeTagValues", ["PLC01.D100"]);
// 종료
await connection.stop();
curl (REST API via gRPC JSON Transcoding)
Health 체크
curl -X GET "https://localhost:5001/api/v1/health"
응답
{
"status": "Healthy",
"version": "2.0.0",
"uptimeSeconds": 3600
}
태그 쓰기
curl -X POST "https://localhost:5001/api/v1/tags/Sim-PLC-01/D100/write" \
-H "Content-Type: application/json" \
-d '{"value": "789"}'
응답
{
"success": true,
"message": "Write Command Queued"
}
통합 클라이언트 (CaffeineClient)
Caffeine.Client 패키지는 gRPC, SignalR, GraphQL을 통합한 올인원 클라이언트입니다.
설치
dotnet add package NEXCODE.Caffeine.Client
통합 사용 예제
using Caffeine.Client;
var client = new CaffeineClient();
// 연결 (gRPC + SignalR 동시 연결)
await client.ConnectAsync("https://localhost:5001");
// 연결 상태 모니터링
client.ConnectionStateChanged += (sender, e) =>
{
Console.WriteLine($"Connection: {e.State} - {e.Reason}");
};
// 알람 수신
client.AlarmReceived += (sender, e) =>
{
Console.WriteLine($"[{e.Severity}] {e.Message}");
};
// 태그 값 변경 수신
client.TagValueChanged += (sender, e) =>
{
Console.WriteLine($"{e.DriverId}.{e.Address} = {e.Value}");
};
// 태그 구독 (SignalR)
await client.SubscribeTagAsync("Sim-PLC-01", "D100");
// 태그 읽기 (GraphQL)
var tagData = await client.ReadTagAsync("Sim-PLC-01", "D100");
Console.WriteLine($"Current Value: {tagData?.Value}");
// 태그 쓰기 (gRPC)
bool success = await client.WriteTagAsync("Sim-PLC-01", "D100", 999);
// Health 체크 (gRPC)
var health = await client.GetHealthAsync();
Console.WriteLine($"Status: {health.Status}, Uptime: {health.UptimeSeconds}s");
// 종료
await client.DisposeAsync();
응답 코드
gRPC 상태 코드
| 코드 | 설명 |
|---|---|
| OK | 요청 성공 |
| CANCELLED | 요청 취소됨 |
| INVALID_ARGUMENT | 잘못된 파라미터 |
| DEADLINE_EXCEEDED | 타임아웃 |
| NOT_FOUND | 드라이버 또는 태그를 찾을 수 없음 |
| UNAVAILABLE | 서비스 연결 불가 |
| UNAUTHENTICATED | 인증 실패 |
SignalR 연결 상태
| 상태 | 설명 |
|---|---|
| Connected | 연결됨 |
| Connecting | 연결 시도 중 |
| Reconnecting | 재연결 중 |
| Disconnected | 연결 끊김 |
보안 및 인증
gRPC
gRPC 서비스는 기본적으로 TLS를 통한 암호화 통신을 사용합니다.
- Engine ↔ Bridge: mTLS (상호 인증서 검증) 권장
- 외부 클라이언트: JWT 토큰 기반 인증 (메타데이터 헤더)
var headers = new Metadata
{
{ "Authorization", $"Bearer {jwtToken}" }
};
var call = grpcClient.WriteTagAsync(request, headers);
SignalR
SignalR Hub는 [Authorize] 속성으로 보호됩니다.
- JWT 토큰:
AccessTokenProvider를 통해 전달
.WithUrl("https://localhost:5001/hubs/monitoring", options =>
{
options.AccessTokenProvider = () => Task.FromResult(GetJwtToken());
})
성능 최적화
gRPC Streaming
- 배치 전송:
StreamBatchIo사용 권장 (수천 개 태그를 한 번에 전송) - 압축:
WriteFlags.NoCompress옵션으로 압축 비활성화 (저지연 요구사항) - 버퍼링:
DisableBuffering()호출로 즉시 플러시
SignalR
- 그룹 구독: 불필요한 전체 브로드캐스트 방지
- 자동 재연결:
WithAutomaticReconnect()활성화 - 백프레셔: 클라이언트 처리 속도에 맞춰 서버 전송 제어
문제 해결
gRPC 연결 실패
증상: Grpc.Core.RpcException: Status(StatusCode="Unavailable")
해결:
- Engine 서비스가 실행 중인지 확인
- 방화벽에서 gRPC 포트 개방 확인
- TLS 인증서 유효성 확인
SignalR 재연결 실패
증상: HubConnection 상태가 Disconnected로 유지됨
해결:
- JWT 토큰 만료 확인 (갱신 필요)
- 서버 재시작 여부 확인
- 네트워크 연결 안정성 확인
태그 업데이트 수신 안 됨
증상: ReceiveTagUpdate 이벤트가 발생하지 않음
해결:
SubscribeTagValues호출 확인- 태그 이름 정확성 확인 (대소문자 구분)
- Engine에서 해당 드라이버 활성화 상태 확인