namespace NModbus.Device { using NModbus; using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; /// /// Provides concurrency control across multiple Modbus readers/writers. /// public class ConcurrentModbusMaster : IConcurrentModbusMaster { private readonly IModbusMaster _master; private readonly TimeSpan _minInterval; private bool _isDisposed; private readonly Stopwatch _stopwatch = new Stopwatch(); private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); public ConcurrentModbusMaster(IModbusMaster master, TimeSpan minInterval) { _master = master ?? throw new ArgumentNullException(nameof(master)); _minInterval = minInterval; _stopwatch.Start(); } private Task WaitAsync(CancellationToken cancellationToken) { int difference = (int)(_minInterval - _stopwatch.Elapsed).TotalMilliseconds; if (difference > 0) { return Task.Delay(difference, cancellationToken); } #if NET45 return CompletedTask(); #else return Task.CompletedTask; #endif } #if NET45 private static readonly Task completedTask = Task.FromResult(false); public static Task CompletedTask() { return completedTask; } #endif private async Task PerformFuncAsync(Func> action, CancellationToken cancellationToken) { T? value = default(T); await PerformAsync(async () => value = await action(), cancellationToken); return value; } private async Task PerformAsync(Func action, CancellationToken cancellationToken) { await _semaphore.WaitAsync(cancellationToken); try { await WaitAsync(cancellationToken); await action(); } finally { _semaphore.Release(); _stopwatch.Restart(); } } public async Task ReadInputRegistersAsync(byte slaveAddress, ushort startAddress, ushort numberOfPoints, ushort blockSize, CancellationToken cancellationToken) { return await PerformFuncAsync(async () => { List registers = new List(numberOfPoints); int soFar = 0; int thisRead = blockSize; while (soFar < numberOfPoints) { //If we're _not_ on the first run through here, wait for the min time if (soFar > 0) { await Task.Delay(_minInterval, cancellationToken); } //Check to see if we've ben cancelled cancellationToken.ThrowIfCancellationRequested(); if (thisRead > (numberOfPoints - soFar)) { thisRead = numberOfPoints - soFar; } //Perform this operation ushort[] registersFromThisRead = await _master.ReadInputRegistersAsync(slaveAddress, (ushort)(startAddress + soFar), (ushort)thisRead); //Add these to the result registers.AddRange(registersFromThisRead); //Increment where we're at soFar += thisRead; } return registers.ToArray(); }, cancellationToken); } public Task ReadHoldingRegistersAsync(byte slaveAddress, ushort startAddress, ushort numberOfPoints, ushort blockSize, CancellationToken cancellationToken) { return PerformFuncAsync(async () => { List registers = new List(numberOfPoints); int soFar = 0; int thisRead = blockSize; while (soFar < numberOfPoints) { //If we're _not_ on the first run through here, wait for the min time if (soFar > 0) { await Task.Delay(_minInterval, cancellationToken); } //Check to see if we've ben cancelled cancellationToken.ThrowIfCancellationRequested(); if (thisRead > (numberOfPoints - soFar)) { thisRead = numberOfPoints - soFar; } //Perform this operation ushort[] registersFromThisRead = await _master.ReadHoldingRegistersAsync(slaveAddress, (ushort)(startAddress + soFar), (ushort)thisRead); //Add these to the result registers.AddRange(registersFromThisRead); //Increment where we're at soFar += thisRead; } return registers.ToArray(); }, cancellationToken); } public Task WriteMultipleRegistersAsync(byte slaveAddress, ushort startAddress, ushort[] data, ushort blockSize, CancellationToken cancellationToken) { return PerformAsync(async () => { int soFar = 0; int thisWrite = blockSize; while (soFar < data.Length) { //If we're _not_ on the first run through here, wait for the min time if (soFar > 0) { await Task.Delay(_minInterval, cancellationToken); } if (thisWrite > (data.Length - soFar)) { thisWrite = data.Length - soFar; } ushort[] registers = data.Skip(soFar).Take(thisWrite).ToArray(); await _master.WriteMultipleRegistersAsync(slaveAddress, (ushort) (startAddress + soFar), registers); soFar += thisWrite; } }, cancellationToken); } public Task WriteSingleRegisterAsync(byte slaveAddress, ushort address, ushort value, CancellationToken cancellationToken) { return PerformAsync(() => _master.WriteSingleRegisterAsync(slaveAddress, address, value), cancellationToken); } public Task WriteCoilsAsync(byte slaveAddress, ushort startAddress, bool[] data, CancellationToken cancellationToken) { return PerformAsync(() => _master.WriteMultipleCoilsAsync(slaveAddress, startAddress, data), cancellationToken); } public Task ReadCoilsAsync(byte slaveAddress, ushort startAddress, ushort number, CancellationToken cancellationToken) { return PerformFuncAsync(() => _master.ReadCoilsAsync(slaveAddress, startAddress, number), cancellationToken); } public Task ReadDiscretesAsync(byte slaveAddress, ushort startAddress, ushort number, CancellationToken cancellationToken) { return PerformFuncAsync(() => _master.ReadInputsAsync(slaveAddress, startAddress, number), cancellationToken); } public Task WriteSingleCoilAsync(byte slaveAddress, ushort coilAddress, bool value, CancellationToken cancellationToken) { return PerformAsync(() => _master.WriteSingleCoilAsync(slaveAddress, coilAddress, value), cancellationToken); } public void Dispose() { if (!_isDisposed) { _isDisposed = true; _master.Dispose(); } } } }