using System;
using System.IO;
using System.Linq;
using System.Net.WebSockets;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using SafeMobileLib.WebsocketClient.Models;
using SafeMobileLib.WebsocketClient.Threading;
using SafeMobileLib.WebsocketClient.Exceptions;
using SafeMobileLib.WebsocketClient.Validations;
using Newtonsoft.Json;

namespace SafeMobileLib.WebsocketClient
{
    /// <summary>
    /// A simple websocket client with built-in reconnection and error handling
    /// </summary>
    public partial class WebsocketClient : IWebsocketClient
    {
        private readonly WebsocketAsyncLock _locker = new WebsocketAsyncLock();
        private readonly Func<Uri, CancellationToken, Task<WebSocket>> _connectionFactory;

        private Uri _url;
        private Timer _lastChanceTimer;
        private DateTime _lastReceivedMsg = DateTime.UtcNow;

        private bool _disposing;
        private bool _reconnecting;
        private bool _stopping;
        private bool _isReconnectionEnabled = true;
        private WebSocket _client;
        private CancellationTokenSource _cancellation;
        private CancellationTokenSource _cancellationTotal;

        private readonly Subject<ResponseMessage> _messageReceivedSubject = new Subject<ResponseMessage>();
        private readonly Subject<ReconnectionInfo> _reconnectionSubject = new Subject<ReconnectionInfo>();
        private readonly Subject<DisconnectionInfo> _disconnectedSubject = new Subject<DisconnectionInfo>();

        /// <summary>
        /// A simple websocket client with built-in reconnection and error handling
        /// </summary>
        /// <param name="url">Target websocket url (wss://)</param>
        /// <param name="clientFactory">Optional factory for native ClientWebSocket, use it whenever you need some custom features (proxy, settings, etc)</param>
        public WebsocketClient(Uri url, Func<ClientWebSocket> clientFactory = null)
            : this(url, GetClientFactory(clientFactory))
        {
        }

        /// <summary>
        /// A simple websocket client with built-in reconnection and error handling
        /// </summary>
        /// <param name="url">Target websocket url (wss://)</param>
        /// <param name="connectionFactory">Optional factory for creating and connecting to a websocket.
        /// The method should return a <see cref="WebSocket"/> which is connected.
        /// Use it whenever you need some custom features (proxy, settings, etc).
        /// When null, a <see cref="ClientWebSocket"/> with a 5 minute keep-alive interval is created and connected.</param>
        public WebsocketClient(Uri url, Func<Uri, CancellationToken, Task<WebSocket>> connectionFactory)
        {
            Validations.Validations.ValidateInput(url, nameof(url));

            _url = url;
            _connectionFactory = connectionFactory ?? (async (uri, token) =>
            {
                // TimeSpan(days, hours, minutes, seconds) => 5 minutes
                var client = new ClientWebSocket
                {
                    Options = { KeepAliveInterval = new TimeSpan(0, 0, 5, 0) }
                };
                await client.ConnectAsync(uri, token).ConfigureAwait(false);
                return client;
            });
        }

        /// <inheritdoc />
        public Uri Url
        {
            get { return _url; }
            set
            {
                Validations.Validations.ValidateInput(value, nameof(Url));
                _url = value;
            }
        }

        /// <summary>
        /// Stream with received message (raw format)
        /// </summary>
        public IObservable<ResponseMessage> MessageReceived => _messageReceivedSubject.AsObservable();

        /// <summary>
        /// Stream for reconnection event (triggered after the new connection)
        /// </summary>
        public IObservable<ReconnectionInfo> ReconnectionHappened => _reconnectionSubject.AsObservable();

        /// <summary>
        /// Stream for disconnection event (triggered after the connection was lost)
        /// </summary>
        public IObservable<DisconnectionInfo> DisconnectionHappened => _disconnectedSubject.AsObservable();

        /// <summary>
        /// Time range, how long to wait before reconnecting if no message comes from server.
        /// Set null to disable this feature.
        /// Default: 1 minute
        /// </summary>
        public TimeSpan? ReconnectTimeout { get; set; } = TimeSpan.FromMinutes(1);

        /// <summary>
        /// Time range, how long to wait before reconnecting if last reconnection failed.
        /// Set null to disable this feature.
        /// Default: 1 minute
        /// </summary>
        public TimeSpan? ErrorReconnectTimeout { get; set; } = TimeSpan.FromMinutes(1);

        /// <summary>
        /// Enable or disable reconnection functionality (enabled by default).
        /// When the client is already started, changing the value also activates
        /// or deactivates the "last chance" timer.
        /// </summary>
        public bool IsReconnectionEnabled
        {
            get { return _isReconnectionEnabled; }
            set
            {
                _isReconnectionEnabled = value;

                if (IsStarted)
                {
                    if (_isReconnectionEnabled)
                    {
                        ActivateLastChance();
                    }
                    else
                    {
                        DeactivateLastChance();
                    }
                }
            }
        }

        /// <summary>
        /// Get or set the name of the current websocket client instance.
        /// For logging purpose (in case you use more parallel websocket clients and want to distinguish between them)
        /// </summary>
        public string Name { get; set; }

        /// <summary>
        /// Returns true if Start() method was called at least once.
        /// False if not started, stopped or disposed
        /// </summary>
        public bool IsStarted { get; private set; }

        /// <summary>
        /// Returns true if client is running and connected to the server
        /// </summary>
        public bool IsRunning { get; private set; }

        /// <inheritdoc />
        public Encoding MessageEncoding { get; set; }

        /// <inheritdoc />
        public ClientWebSocket NativeClient => GetSpecificOrThrow(_client);

        /// <summary>
        /// Terminate the websocket connection and cleanup everything.
        /// Safe to call more than once; only the first call does any work.
        /// </summary>
        public void Dispose()
        {
            if (_disposing)
            {
                // Already disposed (or disposing): repeating the teardown would only
                // touch already-disposed objects and log a spurious failure.
                return;
            }

            _disposing = true;
            Utils.WriteLine(L("Disposing.."), ConsoleColor.Yellow);
            try
            {
                _messagesTextToSendQueue?.Writer.Complete();
                _messagesBinaryToSendQueue?.Writer.Complete();
                _lastChanceTimer?.Dispose();
                _cancellation?.Cancel();
                _cancellationTotal?.Cancel();
                _client?.Abort();
                _client?.Dispose();
                _cancellation?.Dispose();
                _cancellationTotal?.Dispose();
                _messageReceivedSubject.OnCompleted();
                _reconnectionSubject.OnCompleted();
            }
            catch (Exception e)
            {
                Utils.WriteLine($"Failed to dispose client, error: {e.Message}", ConsoleColor.Red);
            }

            IsRunning = false;
            IsStarted = false;

            _disconnectedSubject.OnNext(DisconnectionInfo.Create(DisconnectionType.Exit, _client, null));
            _disconnectedSubject.OnCompleted();
        }

        /// <summary>
        /// Start listening to the websocket stream on the background thread.
        /// In case of connection error it doesn't throw an exception.
        /// Only streams a message via 'DisconnectionHappened' and logs it.
        /// </summary>
        public Task Start()
        {
            return StartInternal(false);
        }

        /// <summary>
        /// Start listening to the websocket stream on the background thread.
        /// In case of connection error it throws an exception.
        /// Fail fast approach.
        /// </summary>
        public Task StartOrFail()
        {
            return StartInternal(true);
        }

        /// <summary>
        /// Stop/close websocket connection with custom close code.
        /// Errors raised while closing are only logged and the client is marked as closed.
        /// </summary>
        /// <param name="status">Close status sent to the server</param>
        /// <param name="statusDescription">Close description sent to the server</param>
        /// <returns>Returns true if close was initiated successfully</returns>
        /// <exception cref="WebsocketException">The client is already disposed</exception>
        public async Task<bool> Stop(WebSocketCloseStatus status, string statusDescription)
        {
            var result = await StopInternal(
                _client,
                status,
                statusDescription,
                null,
                false,
                false).ConfigureAwait(false);
            _disconnectedSubject.OnNext(DisconnectionInfo.Create(DisconnectionType.ByUser, _client, null));
            return result;
        }

        /// <summary>
        /// Stop/close websocket connection with custom close code.
        /// Method could throw exceptions, but client is marked as closed anyway.
        /// </summary>
        /// <param name="status">Close status sent to the server</param>
        /// <param name="statusDescription">Close description sent to the server</param>
        /// <returns>Returns true if close was initiated successfully</returns>
        /// <exception cref="WebsocketException">The client is already disposed, or closing failed</exception>
        public async Task<bool> StopOrFail(WebSocketCloseStatus status, string statusDescription)
        {
            var result = await StopInternal(
                _client,
                status,
                statusDescription,
                null,
                true,
                false).ConfigureAwait(false);
            _disconnectedSubject.OnNext(DisconnectionInfo.Create(DisconnectionType.ByUser, _client, null));
            return result;
        }

        /// <summary>
        /// Wraps a plain <see cref="ClientWebSocket"/> factory into a connection factory
        /// that also connects the created client. Returns null when no factory was given,
        /// so the constructor falls back to its default.
        /// </summary>
        private static Func<Uri, CancellationToken, Task<WebSocket>> GetClientFactory(Func<ClientWebSocket> clientFactory)
        {
            if (clientFactory == null)
                return null;

            return (async (uri, token) =>
            {
                var client = clientFactory();
                await client.ConnectAsync(uri, token).ConfigureAwait(false);
                return client;
            });
        }

        /// <summary>
        /// Creates fresh cancellation sources, connects the client and starts the sending loops.
        /// Does nothing (besides logging) when the client is already started.
        /// </summary>
        /// <param name="failFast">When true, a connection error is propagated instead of triggering a reconnection</param>
        /// <exception cref="WebsocketException">The client is already disposed</exception>
        private async Task StartInternal(bool failFast)
        {
            if (_disposing)
            {
                throw new WebsocketException(L("Client is already disposed, starting not possible"));
            }

            if (IsStarted)
            {
                Utils.WriteLine(("Client already started, ignoring.."), ConsoleColor.Yellow);
                return;
            }

            IsStarted = true;

            Utils.WriteLine(("Starting.."), ConsoleColor.Green);
            _cancellation = new CancellationTokenSource();
            _cancellationTotal = new CancellationTokenSource();

            await StartClient(_url, _cancellation.Token, ReconnectionType.Initial, failFast).ConfigureAwait(false);

            StartBackgroundThreadForSendingText();
            StartBackgroundThreadForSendingBinary();
        }

        /// <summary>
        /// Closes the given client and marks this instance as not started / not running.
        /// </summary>
        /// <param name="client">Client to close; when null only the state flags are reset and false is returned</param>
        /// <param name="status">Close status sent to the server</param>
        /// <param name="statusDescription">Close description sent to the server</param>
        /// <param name="cancellation">Optional token for the close operation (defaults to <see cref="CancellationToken.None"/>)</param>
        /// <param name="failFast">When true, a closing error is rethrown wrapped in <see cref="WebsocketException"/></param>
        /// <param name="byServer">When true only the output is closed (CloseOutputAsync), otherwise the full close handshake is performed (CloseAsync)</param>
        /// <returns>True when the close call completed without an exception</returns>
        /// <exception cref="WebsocketException">The client is already disposed, or closing failed and <paramref name="failFast"/> is set</exception>
        private async Task<bool> StopInternal(WebSocket client, WebSocketCloseStatus status, string statusDescription,
            CancellationToken? cancellation, bool failFast, bool byServer)
        {
            if (_disposing)
            {
                throw new WebsocketException(L("Client is already disposed, stopping not possible"));
            }

            var result = false;
            if (client == null)
            {
                IsStarted = false;
                IsRunning = false;
                return false;
            }

            DeactivateLastChance();

            try
            {
                var cancellationToken = cancellation ?? CancellationToken.None;
                _stopping = true;
                if (byServer)
                    await client.CloseOutputAsync(status, statusDescription, cancellationToken).ConfigureAwait(false);
                else
                    await client.CloseAsync(status, statusDescription, cancellationToken).ConfigureAwait(false);
                result = true;
            }
            catch (Exception e)
            {
                Utils.WriteLine($"Error while stopping client, message: '{e.Message}'", ConsoleColor.Red);

                if (failFast)
                {
                    // fail fast, propagate exception
                    throw new WebsocketException($"Failed to stop Websocket client, error: '{e.Message}'", e);
                }
            }
            finally
            {
                IsStarted = false;
                IsRunning = false;
                _stopping = false;
            }

            return result;
        }

        /// <summary>
        /// Connects via the connection factory, publishes the reconnection event and starts
        /// the listening loop. On failure publishes a disconnection event and, unless
        /// cancelled / fail-fast / disabled, waits <see cref="ErrorReconnectTimeout"/> and reconnects.
        /// </summary>
        /// <param name="uri">Url to connect to</param>
        /// <param name="token">Token passed to the factory, the listening loop and the retry delay</param>
        /// <param name="type">Reconnection type published via 'ReconnectionHappened'</param>
        /// <param name="failFast">When true, a connection error is rethrown wrapped in <see cref="WebsocketException"/></param>
        private async Task StartClient(Uri uri, CancellationToken token, ReconnectionType type, bool failFast)
        {
            DeactivateLastChance();

            try
            {
                _client = await _connectionFactory(uri, token).ConfigureAwait(false);
                IsRunning = true;
                IsStarted = true;
                _reconnectionSubject.OnNext(ReconnectionInfo.Create(type));
                // fire and forget, the loop handles its own failures
                _ = Listen(_client, token);
                _lastReceivedMsg = DateTime.UtcNow;
                ActivateLastChance();
            }
            catch (Exception e)
            {
                var info = DisconnectionInfo.Create(DisconnectionType.Error, _client, e);
                _disconnectedSubject.OnNext(info);

                if (info.CancelReconnection)
                {
                    // reconnection canceled by user, do nothing
                    Utils.WriteLine($"Exception while connecting. " +
                                    $"Reconnecting canceled by user, exiting. Error: '{e.Message}'", ConsoleColor.Magenta);
                    return;
                }

                if (failFast)
                {
                    // fail fast, propagate exception
                    // do not reconnect
                    throw new WebsocketException($"Failed to start Websocket client, error: '{e.Message}'", e);
                }

                if (ErrorReconnectTimeout == null)
                {
                    Utils.WriteLine($"Exception while connecting. " +
                                    $"Reconnecting disable, exiting. Error: '{e.Message}'", ConsoleColor.Magenta);
                    return;
                }

                var timeout = ErrorReconnectTimeout.Value;
                Utils.WriteLine($"Exception while connecting. " +
                                $"Waiting {timeout.TotalSeconds} sec before next reconnection try. Error: '{e.Message}'", ConsoleColor.Yellow);
                await Task.Delay(timeout, token).ConfigureAwait(false);
                await Reconnect(ReconnectionType.Error, false, e).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Returns true when a client exists and its state is <see cref="WebSocketState.Open"/>.
        /// </summary>
        private bool IsClientConnected()
        {
            // _client is null until the first successful connection
            return _client?.State == WebSocketState.Open;
        }

        /// <summary>
        /// Receiving loop: reads frames from <paramref name="client"/>, assembles them into
        /// complete messages and publishes them via 'MessageReceived'. Handles close frames and,
        /// when the loop ends unexpectedly, triggers a reconnection.
        /// </summary>
        /// <param name="client">Client to read from</param>
        /// <param name="token">Token that cancels the receive operations and ends the loop</param>
        private async Task Listen(WebSocket client, CancellationToken token)
        {
            Exception causedException = null;
            try
            {
                // define buffer here and reuse, to avoid more allocation
                const int chunkSize = 1024 * 4;
                var buffer = new ArraySegment<byte>(new byte[chunkSize]);

                do
                {
                    WebSocketReceiveResult result;
                    byte[] resultArrayWithTrailing = null;
                    var resultArraySize = 0;
                    var isResultArrayCloned = false;
                    MemoryStream ms = null;

                    try
                    {
                        while (true)
                        {
                            result = await client.ReceiveAsync(buffer, token).ConfigureAwait(false);
                            var currentChunk = buffer.Array;
                            var currentChunkSize = result.Count;

                            var isFirstChunk = resultArrayWithTrailing == null;
                            if (isFirstChunk)
                            {
                                // first chunk, use buffer as reference, do not allocate anything
                                resultArraySize += currentChunkSize;
                                resultArrayWithTrailing = currentChunk;
                                isResultArrayCloned = false;
                            }
                            else if (currentChunk == null)
                            {
                                // weird chunk, do nothing
                            }
                            else
                            {
                                // received more chunks, lets merge them via memory stream
                                if (ms == null)
                                {
                                    // create memory stream and insert first chunk
                                    ms = new MemoryStream();
                                    ms.Write(resultArrayWithTrailing, 0, resultArraySize);
                                }

                                // insert current chunk
                                ms.Write(currentChunk, buffer.Offset, currentChunkSize);
                            }

                            if (result.EndOfMessage)
                            {
                                break;
                            }

                            if (isResultArrayCloned)
                                continue;

                            // we got more chunks incoming, need to clone first chunk
                            resultArrayWithTrailing = resultArrayWithTrailing?.ToArray();
                            isResultArrayCloned = true;
                        }

                        ms?.Seek(0, SeekOrigin.Begin);

                        ResponseMessage message;
                        if (result.MessageType == WebSocketMessageType.Text)
                        {
                            var data = ms != null
                                ? GetEncoding().GetString(ms.ToArray())
                                : resultArrayWithTrailing != null
                                    ? GetEncoding().GetString(resultArrayWithTrailing, 0, resultArraySize)
                                    : null;

                            message = ResponseMessage.TextMessage(data);
                        }
                        else if (result.MessageType == WebSocketMessageType.Close)
                        {
                            Utils.WriteLine($"Received close message", ConsoleColor.Yellow);

                            var info = DisconnectionInfo.Create(DisconnectionType.ByServer, client, null);
                            _disconnectedSubject.OnNext(info);

                            if (info.CancelClosing)
                            {
                                // closing canceled, reconnect if enabled
                                if (IsReconnectionEnabled)
                                {
                                    throw new OperationCanceledException("Websocket connection was closed by server");
                                }

                                continue;
                            }

                            await StopInternal(client, WebSocketCloseStatus.NormalClosure, "Closing",
                                token, false, true).ConfigureAwait(false);

                            // reconnect if enabled
                            if (IsReconnectionEnabled && !ShouldIgnoreReconnection(client))
                            {
                                _ = ReconnectSynchronized(ReconnectionType.Lost, false, null);
                            }

                            return;
                        }
                        else
                        {
                            byte[] payload;
                            if (ms != null)
                            {
                                payload = ms.ToArray();
                            }
                            else
                            {
                                // Always copy: for a single-frame message the data still lives in the
                                // shared receive buffer. Array.Resize is a no-op when the message length
                                // equals the buffer length, which would hand the reused buffer to
                                // subscribers and let the next receive overwrite their payload.
                                payload = new byte[resultArraySize];
                                if (resultArrayWithTrailing != null)
                                {
                                    Buffer.BlockCopy(resultArrayWithTrailing, 0, payload, 0, resultArraySize);
                                }
                            }

                            message = ResponseMessage.BinaryMessage(payload);
                        }

                        Utils.WriteLine($"Received: {message}", ConsoleColor.Green);
                        _lastReceivedMsg = DateTime.UtcNow;
                        _messageReceivedSubject.OnNext(message);
                    }
                    finally
                    {
                        // released on every exit path (normal, continue, return, exception)
                        ms?.Dispose();
                    }

                } while (client.State == WebSocketState.Open && !token.IsCancellationRequested);
            }
            catch (TaskCanceledException e)
            {
                // task was canceled, ignore
                causedException = e;
            }
            catch (OperationCanceledException e)
            {
                // operation was canceled, ignore
                causedException = e;
            }
            catch (ObjectDisposedException e)
            {
                // client was disposed, ignore
                causedException = e;
            }
            catch (Exception e)
            {
                Utils.WriteLine($"Error while listening to websocket stream, error: '{e.Message}'", ConsoleColor.Red);
                causedException = e;
            }

            if (ShouldIgnoreReconnection(client) || !IsStarted)
            {
                // reconnection already in progress or client stopped/disposed, do nothing
                return;
            }

            // listening thread is lost, we have to reconnect
            _ = ReconnectSynchronized(ReconnectionType.Lost, false, causedException);
        }

        /// <summary>
        /// Returns true when a reconnection must not be started for <paramref name="client"/>:
        /// the instance is disposing, reconnecting or stopping, or the given client is no longer the current one.
        /// </summary>
        private bool ShouldIgnoreReconnection(WebSocket client)
        {
            // reconnection already in progress or client stopped/ disposed,
            var inProgress = _disposing || _reconnecting || _stopping;

            // already reconnected
            var differentClient = client != _client;

            return inProgress || differentClient;
        }

        /// <summary>
        /// Returns <see cref="MessageEncoding"/>, initializing it to UTF-8 when it has not been set.
        /// </summary>
        private Encoding GetEncoding()
        {
            if (MessageEncoding == null)
                MessageEncoding = Encoding.UTF8;
            return MessageEncoding;
        }

        /// <summary>
        /// Casts the given client to <see cref="ClientWebSocket"/>.
        /// Returns null for a null client.
        /// </summary>
        /// <exception cref="WebsocketException">The client is not a <see cref="ClientWebSocket"/></exception>
        private ClientWebSocket GetSpecificOrThrow(WebSocket client)
        {
            if (client == null)
                return null;
            var specific = client as ClientWebSocket;
            if (specific == null)
                throw new WebsocketException("Cannot cast 'WebSocket' client to 'ClientWebSocket', " +
                                             "provide correct type via factory or don't use this property at all.");
            return specific;
        }

        /// <summary>
        /// Prefixes a log message with the client name ("CLIENT" when <see cref="Name"/> is null).
        /// </summary>
        private string L(string msg)
        {
            var name = Name ?? "CLIENT";
            return $"[WEBSOCKET {name}] {msg}";
        }

        /// <summary>
        /// Converts a reconnection type to a disconnection type by casting the underlying value.
        /// </summary>
        private DisconnectionType TranslateTypeToDisconnection(ReconnectionType type)
        {
            // beware enum indexes must correspond to each other
            return (DisconnectionType)type;
        }
    }

    /// <summary>
    /// Data record for a unit; each field is (de)serialized under the JSON name
    /// given by its <see cref="JsonPropertyAttribute"/>.
    /// </summary>
    public class UnitDataToSend
    {
        /// <summary>JSON "radio_id".</summary>
        [JsonProperty("radio_id")]
        public string radio_id;

        /// <summary>JSON "lat" (presumably latitude - confirm with the consumer).</summary>
        [JsonProperty("lat")]
        public double lat;

        /// <summary>JSON "lng" (presumably longitude - confirm with the consumer).</summary>
        [JsonProperty("lng")]
        public double lng;

        /// <summary>JSON "speed" (unit not visible here - confirm).</summary>
        [JsonProperty("speed")]
        public int speed;

        /// <summary>JSON "unix_time" (presumably seconds since the Unix epoch - confirm).</summary>
        [JsonProperty("unix_time")]
        public uint unix_time;

        /// <summary>JSON "group_name".</summary>
        [JsonProperty("group_name")]
        public string GroupName;

        /// <summary>JSON "gps_state".</summary>
        [JsonProperty("gps_state")]
        public string GpsState;

        /// <summary>JSON "heading".</summary>
        [JsonProperty("heading")]
        public string Heading;

        /// <summary>Creates an empty instance; fields keep their default values.</summary>
        public UnitDataToSend()
        {
        }
    }
}