Gone from async queue

This commit is contained in:
Felix Weiß
2023-07-31 17:59:19 +02:00
parent 3f3e3abe15
commit aa8441fa33
16 changed files with 465 additions and 123 deletions

View File

@@ -55,9 +55,14 @@ public partial class ConnectView : UserControl {
App.ViewModel.Plc = Mewtocol.Ethernet(viewModel.SelectedIP, parsedInt) App.ViewModel.Plc = Mewtocol.Ethernet(viewModel.SelectedIP, parsedInt)
.WithPoller() .WithPoller()
.WithInterfaceSettings(s => {
s.TryReconnectAttempts = 30;
s.TryReconnectDelayMs = 2000;
})
.WithRegisters(b => { .WithRegisters(b => {
b.Struct<short>("DT0").Build(); b.Struct<short>("DT0").Build();
b.Struct<short>("DT0").AsArray(30).Build(); b.Struct<short>("DT0").AsArray(30).Build();
b.Struct<Word>("DT1002").Build();
}) })
.Build(); .Build();
@@ -67,6 +72,12 @@ public partial class ConnectView : UserControl {
App.MainWindow.mainContent.Content = new PlcDataView(); App.MainWindow.mainContent.Content = new PlcDataView();
//for (int i = 0; i < 300000; i++) {
// _ = Task.Run(async () => await App.ViewModel.Plc.SendCommandAsync("%EE#RT"));
//}
} }
viewModel.IsConnecting = false; viewModel.IsConnecting = false;

View File

@@ -79,6 +79,7 @@
<DataGridTextColumn Header="Address" Binding="{Binding PLCAddressName}"/> <DataGridTextColumn Header="Address" Binding="{Binding PLCAddressName}"/>
<DataGridTextColumn Header="Name" Binding="{Binding UnderlyingSystemType.Name}"/> <DataGridTextColumn Header="Name" Binding="{Binding UnderlyingSystemType.Name}"/>
<DataGridTextColumn Header="Value" Binding="{Binding ValueStr}"/> <DataGridTextColumn Header="Value" Binding="{Binding ValueStr}"/>
<!--<DataGridTextColumn Header="Value" Binding="{Binding PollLevel, Mode=OneWay}"/>-->
</DataGrid.Columns> </DataGrid.Columns>
</DataGrid> </DataGrid>

View File

@@ -1,4 +1,6 @@
using System; using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace MewtocolNet.Helpers { namespace MewtocolNet.Helpers {
@@ -8,16 +10,46 @@ namespace MewtocolNet.Helpers {
readonly object _locker = new object(); readonly object _locker = new object();
readonly WeakReference<Task> _lastTask = new WeakReference<Task>(null); readonly WeakReference<Task> _lastTask = new WeakReference<Task>(null);
internal CancellationTokenSource tSource = new CancellationTokenSource();
private List<Task> queuedTasks = new List<Task>();
//internal Task<T> Enqueue<T>(Func<Task<T>> asyncFunction) {
// lock (_locker) {
// var token = tSource.Token;
// Task lastTask;
// Task<T> resultTask;
// if (_lastTask.TryGetTarget(out lastTask)) {
// resultTask = lastTask.ContinueWith(_ => asyncFunction(), token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current).Unwrap();
// } else {
// resultTask = Task.Run(asyncFunction, token);
// }
// _lastTask.SetTarget(resultTask);
// return resultTask;
// }
//}
internal Task<T> Enqueue<T>(Func<Task<T>> asyncFunction) { internal Task<T> Enqueue<T>(Func<Task<T>> asyncFunction) {
lock (_locker) { lock (_locker) {
var token = tSource.Token;
Task lastTask; Task lastTask;
Task<T> resultTask; Task<T> resultTask;
if (_lastTask.TryGetTarget(out lastTask)) { if (_lastTask.TryGetTarget(out lastTask)) {
resultTask = lastTask.ContinueWith(_ => asyncFunction(), TaskContinuationOptions.ExecuteSynchronously).Unwrap(); resultTask = lastTask.ContinueWith(_ => asyncFunction(), token, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current).Unwrap();
} else { } else {
resultTask = Task.Run(asyncFunction); resultTask = Task.Run(asyncFunction, token);
} }
_lastTask.SetTarget(resultTask); _lastTask.SetTarget(resultTask);
@@ -25,6 +57,15 @@ namespace MewtocolNet.Helpers {
return resultTask; return resultTask;
} }
}
internal void CancelAll () {
tSource.Cancel();
Console.WriteLine();
} }
} }

View File

@@ -95,7 +95,7 @@ namespace MewtocolNet {
/// <param name="withTerminator">Append the checksum and bcc automatically</param> /// <param name="withTerminator">Append the checksum and bcc automatically</param>
/// <param name="timeoutMs">Timout to wait for a response</param> /// <param name="timeoutMs">Timout to wait for a response</param>
/// <returns>Returns the result</returns> /// <returns>Returns the result</returns>
Task<MewtocolFrameResponse> SendCommandAsync(string _msg, bool withTerminator = true, int timeoutMs = -1, Action<double> onReceiveProgress = null); Task<MewtocolFrameResponse> SendCommandAsync(string _msg, Action<double> onReceiveProgress = null);
/// <summary> /// <summary>
/// Changes the PLCs operation mode to the given one /// Changes the PLCs operation mode to the given one

View File

@@ -9,7 +9,7 @@ namespace MewtocolNet.Logging {
public static class Logger { public static class Logger {
/// <summary> /// <summary>
/// Sets the loglevel for the logger module /// Sets the loglevel for the global logging module
/// </summary> /// </summary>
public static LogLevel LogLevel { get; set; } public static LogLevel LogLevel { get; set; }
@@ -54,13 +54,17 @@ namespace MewtocolNet.Logging {
} }
});
LogInvoked += (d, l, m) => {
if (DefaultTargets.HasFlag(LoggerTargets.Trace)) { if (DefaultTargets.HasFlag(LoggerTargets.Trace)) {
Trace.WriteLine($"{d:hh:mm:ss:ff} {m}"); Trace.WriteLine($"{d:hh:mm:ss:ff} {m}");
} }
}); };
} }
@@ -70,22 +74,26 @@ namespace MewtocolNet.Logging {
/// <summary> /// <summary>
/// Gets invoked whenever a new log message is ready /// Gets invoked whenever a new log message is ready
/// </summary> /// </summary>
public static void OnNewLogMessage(Action<DateTime, LogLevel, string> onMsg) { public static void OnNewLogMessage(Action<DateTime, LogLevel, string> onMsg, LogLevel? maxLevel = null) {
if (maxLevel == null) maxLevel = LogLevel;
LogInvoked += (t, l, m) => { LogInvoked += (t, l, m) => {
onMsg(t, l, m);
if ((int)l <= (int)maxLevel) {
onMsg(t, l, m);
}
}; };
} }
internal static void Log(string message, LogLevel loglevel, MewtocolInterface sender = null) { internal static void Log(string message, LogLevel loglevel, MewtocolInterface sender = null) {
if ((int)loglevel <= (int)LogLevel) { if (sender == null) {
if (sender == null) { LogInvoked?.Invoke(DateTime.Now, loglevel, message);
LogInvoked?.Invoke(DateTime.Now, loglevel, message); } else {
} else { LogInvoked?.Invoke(DateTime.Now, loglevel, $"[{sender.GetConnectionInfo()}] {message}");
LogInvoked?.Invoke(DateTime.Now, loglevel, $"[{sender.GetConnectionInfo()}] {message}");
}
} }
} }

View File

@@ -321,6 +321,9 @@ namespace MewtocolNet
imew.memoryManager.pollLevelOrMode = res.PollLevelOverwriteMode; imew.memoryManager.pollLevelOrMode = res.PollLevelOverwriteMode;
imew.maxDataBlocksPerWrite = res.MaxDataBlocksPerWrite; imew.maxDataBlocksPerWrite = res.MaxDataBlocksPerWrite;
imew.sendReceiveTimeoutMs = res.SendReceiveTimeoutMs;
imew.tryReconnectAttempts = res.TryReconnectAttempts;
imew.tryReconnectDelayMs = res.TryReconnectDelayMs;
} }

View File

@@ -17,6 +17,8 @@ namespace MewtocolNet
public static MewtocolFrameResponse NotIntialized => new MewtocolFrameResponse(405, "PLC was not initialized"); public static MewtocolFrameResponse NotIntialized => new MewtocolFrameResponse(405, "PLC was not initialized");
public static MewtocolFrameResponse Canceled => new MewtocolFrameResponse(500, "Op was canceled by the library");
public MewtocolFrameResponse(string response) { public MewtocolFrameResponse(string response) {
Success = true; Success = true;

View File

@@ -9,9 +9,11 @@ using System.ComponentModel;
using System.Diagnostics; using System.Diagnostics;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Net.Sockets;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Text; using System.Text;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace MewtocolNet { namespace MewtocolNet {
@@ -53,13 +55,18 @@ namespace MewtocolNet {
private protected int bytesPerSecondDownstream = 0; private protected int bytesPerSecondDownstream = 0;
private protected AsyncQueue queue = new AsyncQueue(); private protected AsyncQueue queue = new AsyncQueue();
private protected Stopwatch speedStopwatchUpstr; private protected Stopwatch speedStopwatchUpstr;
private protected Stopwatch speedStopwatchDownstr; private protected Stopwatch speedStopwatchDownstr;
private protected Task firstPollTask; private protected Task firstPollTask;
private Task reconnectTask;
private protected bool wasInitialStatusReceived; private protected bool wasInitialStatusReceived;
private protected MewtocolVersion mewtocolVersion; private protected MewtocolVersion mewtocolVersion;
//private protected string[] lastMsgs = new string[10];
private protected List<string> lastMsgs = new List<string>();
#endregion #endregion
#region Internal fields #region Internal fields
@@ -72,6 +79,10 @@ namespace MewtocolNet {
private volatile bool isReceiving; private volatile bool isReceiving;
private volatile bool isSending; private volatile bool isSending;
internal int sendReceiveTimeoutMs = 1000;
internal int tryReconnectAttempts = 0;
internal int tryReconnectDelayMs = 1000;
#endregion #endregion
#region Public Read Only Properties / Fields #region Public Read Only Properties / Fields
@@ -171,13 +182,20 @@ namespace MewtocolNet {
WatchPollerDemand(); WatchPollerDemand();
Connected += MewtocolInterface_Connected; Connected += MewtocolInterface_Connected;
Disconnected += MewtocolInterface_Disconnected;
RegisterChanged += OnRegisterChanged; RegisterChanged += OnRegisterChanged;
void MewtocolInterface_Connected(object sender, PlcConnectionArgs args) { }
IsConnected = true; private void MewtocolInterface_Connected(object sender, PlcConnectionArgs args) {
} IsConnected = true;
}
private void MewtocolInterface_Disconnected(object sender, PlcConnectionArgs e) {
Logger.LogVerbose("Disconnected", this);
} }
@@ -213,13 +231,13 @@ namespace MewtocolNet {
await memoryManager.OnPlcConnected(); await memoryManager.OnPlcConnected();
Logger.Log($"PLC: {PlcInfo.TypeName}", LogLevel.Verbose, this); Logger.Log($"PLC: {PlcInfo.TypeName}", LogLevel.Verbose, this);
Logger.Log($"TYPE CODE: {PlcInfo.TypeCode.ToString("X")}", LogLevel.Verbose, this); Logger.Log($"TYPE CODE: {PlcInfo.TypeCode.ToString("X")}", LogLevel.Verbose, this);
Logger.Log($"OP MODE: {PlcInfo.OperationMode}", LogLevel.Verbose, this); Logger.Log($"OP MODE: {PlcInfo.OperationMode}", LogLevel.Verbose, this);
Logger.Log($"PROG CAP: {PlcInfo.ProgramCapacity}k", LogLevel.Verbose, this); Logger.Log($"PROG CAP: {PlcInfo.ProgramCapacity}k", LogLevel.Verbose, this);
Logger.Log($"HW INFO: {PlcInfo.HardwareInformation}", LogLevel.Verbose, this); Logger.Log($"HW INFO: {PlcInfo.HardwareInformation}", LogLevel.Verbose, this);
Logger.Log($"DIAG ERR: {PlcInfo.SelfDiagnosticError}", LogLevel.Verbose, this); Logger.Log($"DIAG ERR: {PlcInfo.SelfDiagnosticError}", LogLevel.Verbose, this);
Logger.Log($"CPU VER: {PlcInfo.CpuVersion}", LogLevel.Verbose, this); Logger.Log($"CPU VER: {PlcInfo.CpuVersion}", LogLevel.Verbose, this);
Logger.Log($">> Intial connection end <<", LogLevel.Verbose, this); Logger.Log($">> Intial connection end <<", LogLevel.Verbose, this);
@@ -233,6 +251,9 @@ namespace MewtocolNet {
} }
/// <inheritdoc/>
protected virtual Task ReconnectAsync(int conTimeout) => throw new NotImplementedException();
/// <inheritdoc/> /// <inheritdoc/>
public async Task AwaitFirstDataCycleAsync() => await firstPollTask; public async Task AwaitFirstDataCycleAsync() => await firstPollTask;
@@ -271,8 +292,71 @@ namespace MewtocolNet {
/// <inheritdoc/> /// <inheritdoc/>
public virtual string GetConnectionInfo() => throw new NotImplementedException(); public virtual string GetConnectionInfo() => throw new NotImplementedException();
internal void StartReconnectTask() {
if (reconnectTask == null) {
reconnectTask = Task.Run(async () => {
int retryCount = 1;
if (!IsConnected) {
if(this is MewtocolInterfaceTcp tcpI) {
tcpI.client.Close();
tcpI.client = null;
}
while (!IsConnected && tryReconnectAttempts > 0 && retryCount < tryReconnectAttempts + 1) {
Logger.Log($"Reconnecting {retryCount}/{tryReconnectAttempts} ...", this);
//kill the poller
KillPoller();
//stop the heartbeat timer for the time of retries
StopHeartBeat();
await ReconnectAsync(tryReconnectDelayMs);
await Task.Delay(2000);
retryCount++;
}
//still not connected
if (!IsConnected) {
//invoke the dc evnt
OnMajorSocketExceptionAfterRetries();
}
}
});
}
}
internal async Task AwaitReconnectTaskAsync () {
if (reconnectTask != null && !reconnectTask.IsCompleted) await reconnectTask;
await Task.CompletedTask;
}
private Task<MewtocolFrameResponse> regularSendTask;
/// <inheritdoc/> /// <inheritdoc/>
public async Task<MewtocolFrameResponse> SendCommandAsync (string _msg, bool withTerminator = true, int timeoutMs = -1, Action<double> onReceiveProgress = null) { public async Task<MewtocolFrameResponse> SendCommandAsync(string _msg, Action<double> onReceiveProgress = null) {
await AwaitReconnectTaskAsync();
if (!IsConnected && !isConnectingStage) if (!IsConnected && !isConnectingStage)
throw new NotSupportedException("The device must be connected to send a message"); throw new NotSupportedException("The device must be connected to send a message");
@@ -280,32 +364,51 @@ namespace MewtocolNet {
//send request //send request
queuedMessages++; queuedMessages++;
var tempResponse = queue.Enqueue(async () => await SendFrameAsync(_msg, withTerminator, withTerminator, onReceiveProgress)); //wait for the last send task to complete
if (regularSendTask != null && !regularSendTask.IsCompleted) await regularSendTask;
regularSendTask = SendFrameAsync(_msg, onReceiveProgress);
if (await Task.WhenAny(regularSendTask, Task.Delay(2000)) != regularSendTask) {
if (await Task.WhenAny(tempResponse, Task.Delay(timeoutMs)) != tempResponse) {
// timeout logic // timeout logic
return MewtocolFrameResponse.Timeout; return MewtocolFrameResponse.Timeout;
} }
//canceled
if (regularSendTask.IsCanceled) return MewtocolFrameResponse.Canceled;
tcpMessagesSentThisCycle++; tcpMessagesSentThisCycle++;
queuedMessages--; queuedMessages--;
return tempResponse.Result; //success
if (regularSendTask.Result.Success) return regularSendTask.Result;
//no success
if(reconnectTask == null) StartReconnectTask();
//await the single reconnect task
await AwaitReconnectTaskAsync();
//re-send the command
if (IsConnected) {
return await SendCommandAsync(_msg, onReceiveProgress);
}
return MewtocolFrameResponse.Timeout;
} }
private protected async Task<MewtocolFrameResponse> SendFrameAsync(string frame, bool useBcc = true, bool useCr = true, Action<double> onReceiveProgress = null) { private protected async Task<MewtocolFrameResponse> SendFrameAsync(string frame, Action<double> onReceiveProgress = null) {
try { try {
if (stream == null) return MewtocolFrameResponse.NotIntialized; if (stream == null) return MewtocolFrameResponse.NotIntialized;
if (useBcc) frame = $"{frame.BCC_Mew()}\r";
frame = $"{frame.BCC_Mew()}";
if (useCr)
frame = $"{frame}\r";
SetUpstreamStopWatchStart(); SetUpstreamStopWatchStart();
@@ -334,8 +437,7 @@ namespace MewtocolNet {
//calc upstream speed //calc upstream speed
CalcUpstreamSpeed(writeBuffer.Length); CalcUpstreamSpeed(writeBuffer.Length);
Logger.Log($"[---------CMD START--------]", LogLevel.Critical, this); OnOutMsg(frame);
Logger.Log($"--> OUT MSG: {frame.Replace("\r", "(CR)")}", LogLevel.Critical, this);
var readResult = await ReadCommandAsync(wordsCountRequested, onReceiveProgress); var readResult = await ReadCommandAsync(wordsCountRequested, onReceiveProgress);
@@ -381,9 +483,7 @@ namespace MewtocolNet {
} }
Logger.Log($"<-- IN MSG: {resString.Replace("\r", "(CR)")}", LogLevel.Critical, this); OnInMsg(resString);
Logger.Log($"Total bytes parsed: {resString.Length}", LogLevel.Critical, this);
Logger.Log($"[---------CMD END----------]", LogLevel.Critical, this);
return new MewtocolFrameResponse(resString); return new MewtocolFrameResponse(resString);
@@ -415,7 +515,7 @@ namespace MewtocolNet {
byte[] buffer = new byte[RecBufferSize]; byte[] buffer = new byte[RecBufferSize];
IsReceiving = true; IsReceiving = true;
int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length); int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, queue.tSource.Token);
IsReceiving = false; IsReceiving = false;
CalcDownstreamSpeed(bytesRead); CalcDownstreamSpeed(bytesRead);
@@ -426,8 +526,7 @@ namespace MewtocolNet {
var commandRes = ParseBufferFrame(received); var commandRes = ParseBufferFrame(received);
needsRead = commandRes == CommandState.LineFeed || commandRes == CommandState.RequestedNextFrame; needsRead = commandRes == CommandState.LineFeed || commandRes == CommandState.RequestedNextFrame;
var tempMsg = Encoding.UTF8.GetString(received).Replace("\r", "(CR)"); OnInMsgPart(Encoding.UTF8.GetString(received));
Logger.Log($">> IN PART: {tempMsg}, Command state: {commandRes}", LogLevel.Critical, this);
//add complete response to collector without empty bytes //add complete response to collector without empty bytes
totalResponse.AddRange(received.Where(x => x != (byte)0x0)); totalResponse.AddRange(received.Where(x => x != (byte)0x0));
@@ -437,7 +536,7 @@ namespace MewtocolNet {
//request next frame //request next frame
var writeBuffer = Encoding.UTF8.GetBytes($"%{GetStationNumber()}**&\r"); var writeBuffer = Encoding.UTF8.GetBytes($"%{GetStationNumber()}**&\r");
IsSending = true; IsSending = true;
await stream.WriteAsync(writeBuffer, 0, writeBuffer.Length); await stream.WriteAsync(writeBuffer, 0, writeBuffer.Length, queue.tSource.Token);
IsSending = false; IsSending = false;
Logger.Log($">> Requested next frame", LogLevel.Critical, this); Logger.Log($">> Requested next frame", LogLevel.Critical, this);
wasMultiFramedResponse = true; wasMultiFramedResponse = true;
@@ -463,7 +562,16 @@ namespace MewtocolNet {
} while (needsRead); } while (needsRead);
} catch (OperationCanceledException) { } }
catch (OperationCanceledException) { }
catch (IOException ex) {
Logger.LogError($"Socket exception encountered: {ex.Message.ToString()}", this);
//socket io exception
OnSocketExceptionWhileConnected();
}
return (totalResponse.ToArray(), wasMultiFramedResponse); return (totalResponse.ToArray(), wasMultiFramedResponse);
@@ -508,10 +616,47 @@ namespace MewtocolNet {
} }
private protected void OnOutMsg(string outMsg) {
Logger.Log($"[---------CMD START--------]", LogLevel.Critical, this);
var formatted = $"S -> : {outMsg.Replace("\r", "(CR)")}";
AddToLastMsgs(formatted);
Logger.Log(formatted, LogLevel.Critical, this);
}
private protected void OnInMsgPart(string inPart) {
var formatted = $"<< IN PART: {inPart.Replace("\r", "(CR)")}";
AddToLastMsgs(formatted);
Logger.Log(formatted, LogLevel.Critical, this);
}
private protected void OnInMsg(string inMsg) {
var formatted = $"R <- : {inMsg.Replace("\r", "(CR)")}";
AddToLastMsgs(formatted);
Logger.Log(formatted, LogLevel.Critical, this);
Logger.Log($"[---------CMD END----------]", LogLevel.Critical, this);
}
private protected void AddToLastMsgs (string msgTxt) {
lastMsgs.Add(msgTxt);
if(lastMsgs.Count >= 51) {
lastMsgs.RemoveAt(0);
}
}
private protected void OnMajorSocketExceptionWhileConnecting() { private protected void OnMajorSocketExceptionWhileConnecting() {
if (IsConnected) { if (IsConnected) {
queue.CancelAll();
Logger.Log("The PLC connection timed out", LogLevel.Error, this); Logger.Log("The PLC connection timed out", LogLevel.Error, this);
OnDisconnect(); OnDisconnect();
@@ -523,6 +668,8 @@ namespace MewtocolNet {
if (IsConnected) { if (IsConnected) {
queue.CancelAll();
Logger.Log("The PLC connection was closed", LogLevel.Error, this); Logger.Log("The PLC connection was closed", LogLevel.Error, this);
OnDisconnect(); OnDisconnect();
@@ -530,6 +677,22 @@ namespace MewtocolNet {
} }
private protected void OnMajorSocketExceptionAfterRetries() {
Logger.LogError($"Failed to re-connect, closing PLC", this);
OnDisconnect();
}
private protected void OnSocketExceptionWhileConnected () {
queue.CancelAll();
IsConnected = false;
}
private protected virtual void OnConnected(PLCInfo plcinf) { private protected virtual void OnConnected(PLCInfo plcinf) {
Logger.Log("Connected to PLC", LogLevel.Info, this); Logger.Log("Connected to PLC", LogLevel.Info, this);

View File

@@ -41,7 +41,7 @@ namespace MewtocolNet {
} }
} }
private System.Timers.Timer heartBeatTimer = new System.Timers.Timer(); private System.Timers.Timer heartBeatTimer;
#region Register Polling #region Register Polling
@@ -53,17 +53,24 @@ namespace MewtocolNet {
Disconnected += (s, e) => { Disconnected += (s, e) => {
heartBeatTimer.Elapsed -= PollTimerTick; StopHeartBeat();
heartBeatTimer.Stop();
}; };
} }
private void StopHeartBeat () {
heartBeatTimer.Elapsed -= PollTimerTick;
heartBeatTimer.Dispose();
}
private void TestPollerStartNeeded () { private void TestPollerStartNeeded () {
if (!IsConnected) return; if (!IsConnected) return;
heartBeatTimer = new System.Timers.Timer();
heartBeatTimer.Interval = 3000; heartBeatTimer.Interval = 3000;
heartBeatTimer.Elapsed += PollTimerTick; heartBeatTimer.Elapsed += PollTimerTick;
heartBeatTimer.Start(); heartBeatTimer.Start();
@@ -102,10 +109,28 @@ namespace MewtocolNet {
private void PollTimerTick(object sender, System.Timers.ElapsedEventArgs e) { private void PollTimerTick(object sender, System.Timers.ElapsedEventArgs e) {
heartbeatTask = Task.Run(async () => { if(!IsConnected || isConnectingStage) return;
Logger.LogVerbose("Sending heartbeat", this);
await GetPLCInfoAsync(); heartbeatNeedsRun = true;
});
}
private bool heartbeatNeedsRun = false;
private async Task HeartbeatTickTask () {
if (regularSendTask != null && !regularSendTask.IsCompleted) await regularSendTask;
Logger.LogVerbose("Sending heartbeat", this);
if (await GetPLCInfoAsync(2000) == null) {
Logger.LogError("Heartbeat timed out", this);
OnSocketExceptionWhileConnected();
StartReconnectTask();
}
} }
@@ -125,6 +150,8 @@ namespace MewtocolNet {
pollCycleTask = OnMultiFrameCycle(); pollCycleTask = OnMultiFrameCycle();
await pollCycleTask; await pollCycleTask;
if (!memoryManager.HasCyclicPollableRegisters()) KillPoller();
return tcpMessagesSentThisCycle; return tcpMessagesSentThisCycle;
} }
@@ -144,6 +171,8 @@ namespace MewtocolNet {
pollCycleTask = OnMultiFrameCycle(); pollCycleTask = OnMultiFrameCycle();
await pollCycleTask; await pollCycleTask;
if (!memoryManager.HasCyclicPollableRegisters()) KillPoller();
InvokePolledCycleDone(); InvokePolledCycleDone();
if (!IsConnected) { if (!IsConnected) {
@@ -160,7 +189,12 @@ namespace MewtocolNet {
private async Task OnMultiFrameCycle() { private async Task OnMultiFrameCycle() {
//await the timed task before starting a new poller cycle //await the timed task before starting a new poller cycle
if (!heartbeatTask.IsCompleted) await heartbeatTask; if (heartbeatNeedsRun) {
await HeartbeatTickTask();
heartbeatNeedsRun = false;
}
var sw = Stopwatch.StartNew(); var sw = Stopwatch.StartNew();
@@ -169,8 +203,6 @@ namespace MewtocolNet {
sw.Stop(); sw.Stop();
PollerCycleDurationMs = (int)sw.ElapsedMilliseconds; PollerCycleDurationMs = (int)sw.ElapsedMilliseconds;
if (!memoryManager.HasCyclicPollableRegisters()) KillPoller();
} }
#endregion #endregion
@@ -417,8 +449,9 @@ namespace MewtocolNet {
for (int i = 0; i < internals.Count; i++) { for (int i = 0; i < internals.Count; i++) {
var reg = (Register)internals[i]; var reg = internals[i];
reg.ClearValue(); reg.ClearValue();
//reg.TriggerNotifyChange();
} }

View File

@@ -5,6 +5,7 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Net.Sockets; using System.Net.Sockets;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace MewtocolNet { namespace MewtocolNet {
@@ -15,6 +16,8 @@ namespace MewtocolNet {
internal int maxDataBlocksPerWrite = 8; internal int maxDataBlocksPerWrite = 8;
private CancellationTokenSource tTaskCancelSource = new CancellationTokenSource();
#region PLC info getters #region PLC info getters
/// <summary> /// <summary>
@@ -23,22 +26,15 @@ namespace MewtocolNet {
/// <returns>A PLCInfo class</returns> /// <returns>A PLCInfo class</returns>
public async Task<PLCInfo> GetPLCInfoAsync(int timeout = -1) { public async Task<PLCInfo> GetPLCInfoAsync(int timeout = -1) {
MewtocolFrameResponse resRT = await SendCommandAsync("%EE#RT", timeoutMs: timeout); MewtocolFrameResponse resRT = await SendCommandAsync("%EE#RT");
if (!resRT.Success) { if (!resRT.Success) return null;
//timeouts are ok and don't throw
if (resRT == MewtocolFrameResponse.Timeout) return null;
throw new Exception(resRT.Error);
}
MewtocolFrameResponse? resEXRT = null; MewtocolFrameResponse? resEXRT = null;
if(isConnectingStage) { if(isConnectingStage) {
resEXRT = await SendCommandAsync("%EE#EX00RT00", timeoutMs: timeout); resEXRT = await SendCommandAsync("%EE#EX00RT00");
} }

View File

@@ -3,6 +3,7 @@ using System;
using System.Linq; using System.Linq;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace MewtocolNet { namespace MewtocolNet {
@@ -68,7 +69,45 @@ namespace MewtocolNet {
} }
/// <inheritdoc/> /// <inheritdoc/>
public override async Task ConnectAsync(Func<Task> callBack = null) { public override async Task ConnectAsync(Func<Task> callBack = null) => await ConnectAsyncPriv(callBack);
private void BuildTcpClient () {
if (HostEndpoint != null) {
var hasEndpoint = Mewtocol
.GetSourceEndpoints()
.Any(x => x.Address.ToString() == HostEndpoint.Address.ToString());
if (!hasEndpoint)
throw new NotSupportedException($"The specified source endpoint: " +
$"{HostEndpoint}, doesn't exist on the device, " +
$"use 'Mewtocol.GetSourceEndpoints()' to find applicable ones");
client = new TcpClient(HostEndpoint) {
ReceiveBufferSize = RecBufferSize,
NoDelay = false,
ReceiveTimeout = sendReceiveTimeoutMs,
SendTimeout = sendReceiveTimeoutMs,
};
var ep = (IPEndPoint)client.Client.LocalEndPoint;
Logger.Log($"Connecting [MAN] endpoint: {ep.Address}:{ep.Port}", LogLevel.Info, this);
} else {
client = new TcpClient() {
ReceiveBufferSize = RecBufferSize,
NoDelay = false,
ReceiveTimeout = sendReceiveTimeoutMs,
SendTimeout = sendReceiveTimeoutMs,
};
}
}
private async Task ConnectAsyncPriv(Func<Task> callBack = null) {
try { try {
@@ -77,33 +116,7 @@ namespace MewtocolNet {
Logger.Log($">> Intial connection start <<", LogLevel.Verbose, this); Logger.Log($">> Intial connection start <<", LogLevel.Verbose, this);
isConnectingStage = true; isConnectingStage = true;
if (HostEndpoint != null) { BuildTcpClient();
var hasEndpoint = Mewtocol
.GetSourceEndpoints()
.Any(x => x.Address.ToString() == HostEndpoint.Address.ToString());
if (!hasEndpoint)
throw new NotSupportedException($"The specified source endpoint: " +
$"{HostEndpoint}, doesn't exist on the device, " +
$"use 'Mewtocol.GetSourceEndpoints()' to find applicable ones");
client = new TcpClient(HostEndpoint) {
ReceiveBufferSize = RecBufferSize,
NoDelay = false,
};
var ep = (IPEndPoint)client.Client.LocalEndPoint;
Logger.Log($"Connecting [MAN] endpoint: {ep.Address}:{ep.Port}", LogLevel.Info, this);
} else {
client = new TcpClient() {
ReceiveBufferSize = RecBufferSize,
NoDelay = false,
//ExclusiveAddressUse = true,
};
}
var result = client.BeginConnect(ipAddr, Port, null, null); var result = client.BeginConnect(ipAddr, Port, null, null);
var success = result.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(ConnectTimeout)); var success = result.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(ConnectTimeout));
@@ -125,7 +138,7 @@ namespace MewtocolNet {
stream.ReadTimeout = 1000; stream.ReadTimeout = 1000;
//get plc info //get plc info
var plcinf = await GetPLCInfoAsync(); var plcinf = await GetPLCInfoAsync(ConnectTimeout);
if (plcinf != null) { if (plcinf != null) {
@@ -151,6 +164,70 @@ namespace MewtocolNet {
} }
protected override async Task ReconnectAsync (int conTimeout) {
try {
firstPollTask = new Task(() => { });
Logger.Log($">> Reconnect start <<", LogLevel.Verbose, this);
isConnectingStage = true;
BuildTcpClient();
var result = client.BeginConnect(ipAddr, Port, null, null);
var success = result.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(conTimeout));
if (client.Connected)
Logger.LogVerbose("TCP/IP Client connected", this);
if (!success || !client.Connected) {
Logger.Log("The PLC connection timed out", LogLevel.Error, this);
OnMajorSocketExceptionWhileConnecting();
return;
}
if (HostEndpoint == null) {
var ep = (IPEndPoint)client.Client.LocalEndPoint;
Logger.Log($"Connecting [AUTO] endpoint: {ep.Address.MapToIPv4()}:{ep.Port}", LogLevel.Info, this);
}
//get the stream
stream = client.GetStream();
stream.ReadTimeout = 1000;
Logger.LogVerbose("Attached stream, getting PLC info", this);
//get plc info
var plcinf = await GetPLCInfoAsync(ConnectTimeout);
if (plcinf != null) {
IsConnected = true;
await base.ConnectAsync();
Logger.LogVerbose("Connection re-established", this);
OnConnected(plcinf);
} else {
Logger.Log("Initial connection failed", LogLevel.Error, this);
OnDisconnect();
}
await Task.CompletedTask;
} catch (Exception ex) {
Logger.LogError($"Reconnect exception: {ex.Message}");
}
}
/// <summary> /// <summary>
/// Gets the connection info string /// Gets the connection info string
/// </summary> /// </summary>
@@ -162,9 +239,9 @@ namespace MewtocolNet {
private protected override void OnDisconnect() { private protected override void OnDisconnect() {
if (IsConnected) { base.OnDisconnect();
base.OnDisconnect(); if (client != null && client.Connected) {
client.Close(); client.Close();

View File

@@ -210,34 +210,12 @@ namespace MewtocolNet {
}; };
/// <inheritdoc/>
public static bool operator ==(PLCInfo c1, PLCInfo c2) {
return c1.Equals(c2);
}
/// <inheritdoc/>
public static bool operator !=(PLCInfo c1, PLCInfo c2) {
return !c1.Equals(c2);
}
public override string ToString() { public override string ToString() {
return $"{TypeName}, OP: {OperationMode}"; return $"{TypeName}, OP: {OperationMode}";
} }
public override bool Equals(object obj) {
if ((obj == null) || !this.GetType().Equals(obj.GetType())) {
return false;
} else {
return (PLCInfo)obj == this;
}
}
public override int GetHashCode() => base.GetHashCode();
private protected void OnPropChange([CallerMemberName] string propertyName = null) { private protected void OnPropChange([CallerMemberName] string propertyName = null) {
PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(propertyName)); PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(propertyName));

View File

@@ -24,6 +24,11 @@ namespace MewtocolNet.Registers {
/// </summary> /// </summary>
string Name { get; } string Name { get; }
/// <summary>
/// The poll level this register is attached to
/// </summary>
int PollLevel { get; }
/// <summary> /// <summary>
/// Gets the register address name as in the plc /// Gets the register address name as in the plc
/// </summary> /// </summary>

View File

@@ -65,6 +65,9 @@ namespace MewtocolNet.Registers {
/// <inheritdoc/> /// <inheritdoc/>
public uint MemoryAddress => memoryAddress; public uint MemoryAddress => memoryAddress;
/// <inheritdoc/>
int IRegister.PollLevel => pollLevel;
#region Trigger update notify #region Trigger update notify
public event PropertyChangedEventHandler PropertyChanged; public event PropertyChangedEventHandler PropertyChanged;
@@ -82,7 +85,11 @@ namespace MewtocolNet.Registers {
internal virtual void UpdateHoldingValue(object val) { internal virtual void UpdateHoldingValue(object val) {
if (lastValue?.ToString() != val?.ToString()) { bool nullDiff = false;
if (val == null && lastValue != null) nullDiff = true;
if (val != null && lastValue == null) nullDiff = true;
if (lastValue?.ToString() != val?.ToString() || nullDiff) {
var beforeVal = lastValue; var beforeVal = lastValue;
var beforeValStr = GetValueString(); var beforeValStr = GetValueString();

View File

@@ -66,9 +66,9 @@ namespace MewtocolNet.Registers {
lastValue = null; lastValue = null;
} }
public IEnumerator<T> GetEnumerator() => ((Array)ValueObj).OfType<T>().GetEnumerator(); public IEnumerator<T> GetEnumerator() => ((Array)ValueObj)?.OfType<T>()?.GetEnumerator() ?? Enumerable.Empty<T>().GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => ((Array)ValueObj).OfType<T>().GetEnumerator(); IEnumerator IEnumerable.GetEnumerator() => ((Array)ValueObj)?.OfType<T>()?.GetEnumerator() ?? Enumerable.Empty<T>().GetEnumerator();
async Task<T[]> IArrayRegister<T>.ReadAsync() => (T[])(object)await ReadAsync(); async Task<T[]> IArrayRegister<T>.ReadAsync() => (T[])(object)await ReadAsync();

View File

@@ -1,4 +1,6 @@
namespace MewtocolNet.SetupClasses { using System;
namespace MewtocolNet.SetupClasses {
public class InterfaceSettings { public class InterfaceSettings {
@@ -34,6 +36,21 @@
/// </summary> /// </summary>
public int MaxDataBlocksPerWrite { get; set; } = 8; public int MaxDataBlocksPerWrite { get; set; } = 8;
/// <summary>
/// The send and receive timout for messages in milliseconds
/// </summary>
public int SendReceiveTimeoutMs { get; set; } = 1000;
/// <summary>
/// Number of attempts to try and reconnect to the plc, 0 for none
/// </summary>
public int TryReconnectAttempts { get; set; } = 5;
/// <summary>
/// The delay between reconnect trys
/// </summary>
public int TryReconnectDelayMs { get; set; } = 2000;
} }
} }