SafeDispatch/SafeMobileLIB_DLL/WebsocketClient/WebsocketClient.Sending.cs

227 lines
7.9 KiB
C#
Raw Normal View History

2024-02-22 16:43:59 +00:00
using System;
using System.Net.WebSockets;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace SafeMobileLib.WebsocketClient
{
public partial class WebsocketClient
{
private readonly Channel<string> _messagesTextToSendQueue = Channel.CreateUnbounded<string>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = false
});
private readonly Channel<byte[]> _messagesBinaryToSendQueue = Channel.CreateUnbounded<byte[]>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = false
});
/// <summary>
/// Send text message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Text message to be sent</param>
public void Send(string message)
{
Validations.Validations.ValidateInput(message, nameof(message));
_messagesTextToSendQueue.Writer.TryWrite(message);
}
/// <summary>
/// Send binary message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on an other thread
/// </summary>
/// <param name="message">Binary message to be sent</param>
public void Send(byte[] message)
{
Validations.Validations.ValidateInput(message, nameof(message));
_messagesBinaryToSendQueue.Writer.TryWrite(message);
}
/// <summary>
/// Send text message to the websocket channel.
/// It doesn't use a sending queue,
/// beware of issue while sending two messages in the exact same time
/// on the full .NET Framework platform
/// </summary>
/// <param name="message">Message to be sent</param>
public Task SendInstant(string message)
{
Validations.Validations.ValidateInput(message, nameof(message));
return SendInternalSynchronized(message);
}
/// <summary>
/// Send binary message to the websocket channel.
/// It doesn't use a sending queue,
/// beware of issue while sending two messages in the exact same time
/// on the full .NET Framework platform
/// </summary>
/// <param name="message">Message to be sent</param>
public Task SendInstant(byte[] message)
{
return SendInternalSynchronized(message);
}
/// <summary>
/// Stream/publish fake message (via 'MessageReceived' observable).
/// Use for testing purposes to simulate a server message.
/// </summary>
/// <param name="message">Message to be stream</param>
public void StreamFakeMessage(ResponseMessage message)
{
Validations.Validations.ValidateInput(message, nameof(message));
_messageReceivedSubject.OnNext(message);
}
private async Task SendTextFromQueue()
{
string message;
try
{
while (await _messagesTextToSendQueue.Reader.WaitToReadAsync())
{
while (_messagesTextToSendQueue.Reader.TryRead(out message))
{
try
{
await SendInternalSynchronized(message).ConfigureAwait(false);
}
catch (Exception e)
{
Utils.WriteLine($"Failed to send text message: '{message}'. Error: {e.Message}", ConsoleColor.Red);
}
}
}
}
catch (TaskCanceledException)
{
// task was canceled, ignore
}
catch (OperationCanceledException)
{
// operation was canceled, ignore
}
catch (Exception e)
{
if (_cancellationTotal.IsCancellationRequested || _disposing)
{
// disposing/canceling, do nothing and exit
return;
}
Utils.WriteLine($"Sending text thread failed, error: {e.Message}. Creating a new sending thread.",ConsoleColor.Red);
StartBackgroundThreadForSendingText();
}
}
private async Task SendBinaryFromQueue()
{
byte[] message;
try
{
while (await _messagesBinaryToSendQueue.Reader.WaitToReadAsync())
{
while (_messagesBinaryToSendQueue.Reader.TryRead(out message))
{
try
{
await SendInternalSynchronized(message).ConfigureAwait(false);
}
catch (Exception e)
{
Utils.WriteLine($"Failed to send binary message: '{message}'. Error: {e.Message}", ConsoleColor.Red);
}
}
}
}
catch (TaskCanceledException)
{
// task was canceled, ignore
}
catch (OperationCanceledException)
{
// operation was canceled, ignore
}
catch (Exception e)
{
if (_cancellationTotal.IsCancellationRequested || _disposing)
{
// disposing/canceling, do nothing and exit
return;
}
Utils.WriteLine($"Sending binary thread failed, error: {e.Message}. Creating a new sending thread.", ConsoleColor.Red);
StartBackgroundThreadForSendingBinary();
}
}
private void StartBackgroundThreadForSendingText()
{
_ = Task.Factory.StartNew(_ => SendTextFromQueue(), TaskCreationOptions.LongRunning, _cancellationTotal.Token);
}
private void StartBackgroundThreadForSendingBinary()
{
_ = Task.Factory.StartNew(_ => SendBinaryFromQueue(), TaskCreationOptions.LongRunning, _cancellationTotal.Token);
}
private async Task SendInternalSynchronized(string message)
{
using (await _locker.LockAsync())
{
await SendInternal(message);
}
}
private async Task SendInternal(string message)
{
if (!IsClientConnected())
{
Utils.WriteLine($"Client is not connected to server, cannot send: {message}" , ConsoleColor.Magenta);
return;
}
Utils.WriteLine($"Sending: {message}", ConsoleColor.Green);
var buffer = GetEncoding().GetBytes(message);
var messageSegment = new ArraySegment<byte>(buffer);
await _client
.SendAsync(messageSegment, WebSocketMessageType.Text, true, _cancellation.Token)
.ConfigureAwait(false);
}
private async Task SendInternalSynchronized(byte[] message)
{
using (await _locker.LockAsync())
{
await SendInternal(message);
}
}
private async Task SendInternal(byte[] message)
{
if (!IsClientConnected())
{
Utils.WriteLine($"Client is not connected to server, cannot send binary, length: {message.Length}", ConsoleColor.Magenta);
return;
}
Utils.WriteLine($"Sending binary, length: {message.Length}", ConsoleColor.Green);
await _client
.SendAsync(new ArraySegment<byte>(message), WebSocketMessageType.Binary, true, _cancellation.Token)
.ConfigureAwait(false);
}
}
}