using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Convai.Scripts.Utils.LipSync;
using Grpc.Core;
using Service;
using UnityEngine;

namespace Convai.Scripts.Utils
{
    /// <summary>
    ///     Represents an NPC2NPCGRPCClient that can be used to communicate with the ConvaiService using gRPC.
    /// </summary>
    public class NPC2NPCGRPCClient : MonoBehaviour
    {
        private readonly CancellationTokenSource _cancellationTokenSource = new();
        private string _apiKey;
        private ConvaiService.ConvaiServiceClient _client;
        private NPCGroup _npcGroup;

        private void OnDestroy()
        {
            _cancellationTokenSource?.Cancel();
            _cancellationTokenSource?.Dispose();
        }

        /// <summary>
        ///     Initializes the NPC2NPCGRPCClient with the given API key, ConvaiService client and NPC group.
        /// </summary>
        /// <param name="apiKey">The API key to use for authentication.</param>
        /// <param name="client">The ConvaiService client to use for communication.</param>
        /// <param name="group">The NPC group whose current speaker receives the responses.</param>
        public void Initialize(string apiKey, ConvaiService.ConvaiServiceClient client, NPCGroup group)
        {
            _apiKey = apiKey;
            _client = client;
            _npcGroup = group;
        }

        /// <summary>
        ///     Creates an AsyncDuplexStreamingCall with the specified headers.
        /// </summary>
        /// <returns>An AsyncDuplexStreamingCall with the specified headers.</returns>
        private AsyncDuplexStreamingCall<GetResponseRequest, GetResponseResponse> GetAsyncDuplexStreamingCallOptions()
        {
            Metadata headers = new()
            {
                { "source", "Unity" },
                { "version", "3.0.0" }
            };
            CallOptions options = new(headers);
            return _client.GetResponse(options);
        }

        /// <summary>
        ///     Sends the specified user text to the server and receives a response.
        /// </summary>
        /// <param name="userText">The user text to send to the server.</param>
        /// <param name="characterID">The ID of the character to use for the request.</param>
        /// <param name="sessionID">The ID of the session to use for the request.</param>
        /// <param name="isLipSyncActive">Whether lip sync is active for the request.</param>
        /// <param name="faceModel">The face model to use for the request.</param>
        /// <returns>A task that represents the asynchronous operation.</returns>
        public async Task SendTextData(string userText, string characterID, string sessionID, bool isLipSyncActive, FaceModel faceModel)
        {
            AsyncDuplexStreamingCall<GetResponseRequest, GetResponseResponse> call = GetAsyncDuplexStreamingCallOptions();
            GetResponseRequest getResponseConfigRequest = CreateGetResponseRequest(characterID, sessionID, isLipSyncActive, faceModel, false, null);
            try
            {
                // The duplex stream expects the configuration message first, then the text payload.
                await call.RequestStream.WriteAsync(getResponseConfigRequest);
                await call.RequestStream.WriteAsync(new GetResponseRequest
                {
                    GetResponseData = new GetResponseRequest.Types.GetResponseData
                    {
                        TextData = userText
                    }
                });
                await call.RequestStream.CompleteAsync();

                Task receiveResultsTask = Task.Run(
                    async () => { await ReceiveResultFromServer(call, _cancellationTokenSource.Token); },
                    _cancellationTokenSource.Token);
                await receiveResultsTask.ConfigureAwait(false);
            }
            catch (Exception)
            {
                // ignored
            }
        }
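        // Example usage (a minimal sketch, not part of this class): "convaiClient",
        // "myGroup", "characterID" and "sessionID" are assumed placeholders supplied
        // by the caller; only Initialize and SendTextData are members of this file.
        //
        //     NPC2NPCGRPCClient npcClient = gameObject.AddComponent<NPC2NPCGRPCClient>();
        //     npcClient.Initialize(apiKey, convaiClient, myGroup);
        //     await npcClient.SendTextData("Hello there!", characterID, sessionID,
        //         isLipSyncActive: true, faceModel: FaceModel.OvrModelName);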
        /// <summary>
        ///     Creates a GetResponseRequest with the specified parameters.
        /// </summary>
        /// <param name="characterID">The ID of the character to use for the request.</param>
        /// <param name="sessionID">The ID of the session to use for the request.</param>
        /// <param name="isLipSyncActive">Whether lip sync is active for the request.</param>
        /// <param name="faceModel">The face model to use for the request.</param>
        /// <param name="isActionActive">Whether action is active for the request.</param>
        /// <param name="actionConfig">The action configuration to use for the request.</param>
        /// <returns>A GetResponseRequest with the specified parameters.</returns>
        private GetResponseRequest CreateGetResponseRequest(string characterID, string sessionID, bool isLipSyncActive, FaceModel faceModel, bool isActionActive, ActionConfig actionConfig)
        {
            GetResponseRequest getResponseConfigRequest = new()
            {
                GetResponseConfig = new GetResponseRequest.Types.GetResponseConfig
                {
                    CharacterId = characterID,
                    ApiKey = _apiKey,
                    SessionId = sessionID,
                    AudioConfig = new AudioConfig
                    {
                        EnableFacialData = isLipSyncActive,
                        FaceModel = faceModel
                    }
                }
            };
            if (isActionActive) getResponseConfigRequest.GetResponseConfig.ActionConfig = actionConfig;
            return getResponseConfigRequest;
        }
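        // A hedged sketch of an action-enabled request: SendTextData above always
        // passes isActionActive: false with a null config, so this path is unused in
        // this file. The "actionConfig" instance is an assumed placeholder configured
        // elsewhere; only the CreateGetResponseRequest signature is taken from this file.
        //
        //     ActionConfig actionConfig = new ActionConfig();
        //     GetResponseRequest request = CreateGetResponseRequest(
        //         characterID, sessionID, isLipSyncActive: true,
        //         FaceModel.OvrModelName, isActionActive: true, actionConfig);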
        /// <summary>
        ///     Receives a response from the server asynchronously.
        /// </summary>
        /// <param name="call">The AsyncDuplexStreamingCall to use for receiving the response.</param>
        /// <param name="cancellationToken">The cancellation token to use for cancelling the operation.</param>
        /// <returns>A task that represents the asynchronous operation.</returns>
        private async Task ReceiveResultFromServer(AsyncDuplexStreamingCall<GetResponseRequest, GetResponseResponse> call, CancellationToken cancellationToken)
        {
            Logger.Info("Receiving response from server", Logger.LogCategory.Character);
            ConvaiNPC convaiNPC = _npcGroup.CurrentSpeaker.ConvaiNPC;
            _npcGroup.CurrentSpeaker.CanRelayMessage = true;
            Queue<LipSyncBlendFrameData> lipSyncBlendFrameQueue = new();
            bool firstSilFound = false;

            while (!cancellationToken.IsCancellationRequested && await call.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false))
            {
                try
                {
                    GetResponseResponse result = call.ResponseStream.Current;
                    // Process the received response here.
                    if (result.AudioResponse != null)
                    {
                        if (result.AudioResponse.AudioData != null) // Add response to the list in the active NPC
                        {
                            // Skip packets too small to carry samples beyond the 44-byte WAV header.
                            if (result.AudioResponse.AudioData.ToByteArray().Length > 46)
                            {
                                byte[] wavBytes = result.AudioResponse.AudioData.ToByteArray();
                                // Will only work for WAV files.
                                WavHeaderParser parser = new(wavBytes);
                                if (convaiNPC.convaiLipSync == null)
                                {
                                    Logger.DebugLog($"Enqueuing responses: {result.AudioResponse.TextData}", Logger.LogCategory.LipSync);
                                    convaiNPC.EnqueueResponse(result);
                                }
                                else
                                {
                                    LipSyncBlendFrameData.FrameType frameType = convaiNPC.convaiLipSync.faceModel == FaceModel.OvrModelName
                                        ? LipSyncBlendFrameData.FrameType.Visemes
                                        : LipSyncBlendFrameData.FrameType.Blendshape;
                                    // Expected frame count = clip duration in seconds at 30 frames per second.
                                    lipSyncBlendFrameQueue.Enqueue(new LipSyncBlendFrameData(
                                        (int)(parser.CalculateDurationSeconds() * 30),
                                        result,
                                        frameType));
                                }
                            }

                            if (result.AudioResponse.VisemesData != null)
                                if (convaiNPC.convaiLipSync != null)
                                {
                                    //Logger.Info(result.AudioResponse.VisemesData, Logger.LogCategory.LipSync);
                                    // A "sil" value of -2 (or the end of the response) marks a frame boundary.
                                    if (result.AudioResponse.VisemesData.Visemes.Sil == -2 || result.AudioResponse.EndOfResponse)
                                    {
                                        if (firstSilFound) lipSyncBlendFrameQueue.Dequeue().Process(convaiNPC);
                                        firstSilFound = true;
                                    }
                                    else
                                    {
                                        lipSyncBlendFrameQueue.Peek().Enqueue(result.AudioResponse.VisemesData);
                                    }
                                }

                            if (result.AudioResponse.BlendshapesFrame != null)
                                if (convaiNPC.convaiLipSync != null)
                                {
                                    if (lipSyncBlendFrameQueue.Peek().CanProcess() || result.AudioResponse.EndOfResponse)
                                    {
                                        lipSyncBlendFrameQueue.Dequeue().Process(convaiNPC);
                                    }
                                    else
                                    {
                                        lipSyncBlendFrameQueue.Peek().Enqueue(result.AudioResponse.BlendshapesFrame);
                                        if (lipSyncBlendFrameQueue.Peek().CanPartiallyProcess())
                                            lipSyncBlendFrameQueue.Peek().ProcessPartially(convaiNPC);
                                    }
                                }

                            if (result.AudioResponse.EndOfResponse)
                                MainThreadDispatcher.Instance.RunOnMainThread(() => { _npcGroup.CurrentSpeaker.EndOfResponseReceived(); });
                        }
                    }
                }
                catch (RpcException rpcException)
                {
                    if (rpcException.StatusCode == StatusCode.Cancelled)
                        Logger.Error(rpcException, Logger.LogCategory.Character);
                    else
                        throw;
                }
                catch (Exception ex)
                {
                    Logger.DebugLog(ex, Logger.LogCategory.Character);
                }
            }
        }
    }
}
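/*
 * Background sketch: WavHeaderParser is defined elsewhere; the math below is an
 * assumption about what CalculateDurationSeconds computes for a canonical 44-byte
 * PCM WAV header (field offsets per the RIFF spec), not the parser's actual code.
 *
 *     int byteRate      = BitConverter.ToInt32(wavBytes, 28); // bytes per second
 *     int dataChunkSize = BitConverter.ToInt32(wavBytes, 40); // audio payload size
 *     double seconds    = (double)dataChunkSize / byteRate;
 *
 * This also motivates the `> 46` length check in ReceiveResultFromServer: packets
 * smaller than that cannot hold audio samples beyond the 44-byte header.
 */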