// ConcurrentModbusMaster.cs
  namespace NModbus.Device
  {
      using System;
      using System.Collections.Generic;
      using System.Diagnostics;
      using System.Linq;
      using System.Threading;
      using System.Threading.Tasks;
      using NModbus;

      /// <summary>
      /// Provides concurrency control across multiple Modbus readers/writers.
      /// Every operation is funnelled through a single-entry semaphore, and a minimum
      /// interval is kept between the end of one operation and the start of the next.
      /// </summary>
      public class ConcurrentModbusMaster : IConcurrentModbusMaster
      {
          private readonly IModbusMaster _master;
          private readonly TimeSpan _minInterval;
          private readonly Stopwatch _stopwatch = new Stopwatch();
          private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
          private bool _isDisposed;

          /// <summary>
          /// Creates a wrapper that serializes access to <paramref name="master"/>.
          /// </summary>
          /// <param name="master">The master every operation is forwarded to. It is disposed together with this instance.</param>
          /// <param name="minInterval">
          /// Minimum time between the end of one operation and the start of the next one.
          /// A zero or negative value disables the throttling.
          /// </param>
          /// <exception cref="ArgumentNullException"><paramref name="master"/> is null.</exception>
          public ConcurrentModbusMaster(IModbusMaster master, TimeSpan minInterval)
          {
              _master = master ?? throw new ArgumentNullException(nameof(master));
              _minInterval = minInterval;
              _stopwatch.Start();
          }

          /// <summary>
          /// Returns a task that completes once the minimum interval since the last
          /// operation has elapsed. Must only be called while the semaphore is held.
          /// </summary>
          private Task WaitAsync(CancellationToken cancellationToken)
          {
              // Work with TimeSpan directly: casting the millisecond count to int has an
              // unspecified result when the interval does not fit into an int.
              TimeSpan remaining = _minInterval - _stopwatch.Elapsed;

              if (remaining > TimeSpan.Zero)
              {
                  return Task.Delay(remaining, cancellationToken);
              }

  #if NET45
              return CompletedTask();
  #else
              return Task.CompletedTask;
  #endif
          }

  #if NET45
          private static readonly Task completedTask = Task.FromResult(false);

          /// <summary>
          /// Returns an already completed task (stand-in used by the NET45 build).
          /// </summary>
          public static Task CompletedTask()
          {
              return completedTask;
          }
  #endif

          /// <summary>
          /// Waits between two blocks of a multi-block operation. A non-positive
          /// interval means "no wait", matching <see cref="WaitAsync"/>; passing such a
          /// value straight to Task.Delay would throw or wait forever.
          /// </summary>
          private async Task DelayBetweenBlocksAsync(CancellationToken cancellationToken)
          {
              if (_minInterval > TimeSpan.Zero)
              {
                  await Task.Delay(_minInterval, cancellationToken).ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Runs <paramref name="action"/> exclusively (see <see cref="PerformAsync"/>) and returns its result.
          /// </summary>
          private async Task<T> PerformFuncAsync<T>(Func<Task<T>> action, CancellationToken cancellationToken)
          {
              T value = default(T);

              await PerformAsync(
                  async () => { value = await action().ConfigureAwait(false); },
                  cancellationToken).ConfigureAwait(false);

              return value;
          }

          /// <summary>
          /// Runs <paramref name="action"/> while holding the semaphore, after waiting
          /// out whatever is left of the minimum interval since the previous operation.
          /// </summary>
          private async Task PerformAsync(Func<Task> action, CancellationToken cancellationToken)
          {
              await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

              try
              {
                  await WaitAsync(cancellationToken).ConfigureAwait(false);
                  await action().ConfigureAwait(false);
              }
              finally
              {
                  // Restart the stopwatch BEFORE releasing the semaphore. Otherwise the next
                  // waiter can enter, read a stale Elapsed value and skip the minimum
                  // interval, and the stopwatch would be touched outside the exclusive section.
                  _stopwatch.Restart();
                  _semaphore.Release();
              }
          }

          /// <summary>
          /// Shared implementation of the block-wise register reads: reads
          /// <paramref name="numberOfPoints"/> registers in chunks of at most
          /// <paramref name="blockSize"/>, pausing for the minimum interval between chunks.
          /// </summary>
          /// <param name="readBlockAsync">Reads one chunk, given its start address and its number of points.</param>
          private async Task<ushort[]> ReadRegistersInBlocksAsync(
              Func<ushort, ushort, Task<ushort[]>> readBlockAsync,
              ushort startAddress,
              ushort numberOfPoints,
              ushort blockSize,
              CancellationToken cancellationToken)
          {
              // A zero block size can never make progress; fail before taking the semaphore.
              if (blockSize == 0 && numberOfPoints > 0)
              {
                  throw new ArgumentOutOfRangeException(nameof(blockSize), "Block size must be greater than zero.");
              }

              return await PerformFuncAsync(async () =>
              {
                  List<ushort> registers = new List<ushort>(numberOfPoints);
                  int soFar = 0;

                  while (soFar < numberOfPoints)
                  {
                      // If we're _not_ on the first run through here, wait for the min time
                      if (soFar > 0)
                      {
                          await DelayBetweenBlocksAsync(cancellationToken).ConfigureAwait(false);
                      }

                      // Check to see if we've been cancelled
                      cancellationToken.ThrowIfCancellationRequested();

                      // The last chunk may be shorter than the block size.
                      int thisRead = Math.Min(blockSize, numberOfPoints - soFar);

                      // NOTE(review): the address cast is not range-checked here; confirm callers
                      // never request a range that runs past address 65535.
                      ushort[] registersFromThisRead = await readBlockAsync(
                          (ushort)(startAddress + soFar),
                          (ushort)thisRead).ConfigureAwait(false);

                      registers.AddRange(registersFromThisRead);
                      soFar += thisRead;
                  }

                  return registers.ToArray();
              }, cancellationToken).ConfigureAwait(false);
          }

          /// <summary>
          /// Reads input registers in blocks of at most <paramref name="blockSize"/> registers.
          /// </summary>
          /// <param name="slaveAddress">Address of the device to read from.</param>
          /// <param name="startAddress">Address of the first register.</param>
          /// <param name="numberOfPoints">Total number of registers to read.</param>
          /// <param name="blockSize">Maximum number of registers per underlying read.</param>
          /// <param name="cancellationToken">Cancels waiting and stops between blocks.</param>
          /// <returns>All registers read, in address order.</returns>
          /// <exception cref="ArgumentOutOfRangeException"><paramref name="blockSize"/> is zero while registers were requested.</exception>
          /// <exception cref="OperationCanceledException">The token was cancelled.</exception>
          public Task<ushort[]> ReadInputRegistersAsync(byte slaveAddress, ushort startAddress, ushort numberOfPoints, ushort blockSize, CancellationToken cancellationToken)
          {
              return ReadRegistersInBlocksAsync(
                  (address, count) => _master.ReadInputRegistersAsync(slaveAddress, address, count),
                  startAddress,
                  numberOfPoints,
                  blockSize,
                  cancellationToken);
          }

          /// <summary>
          /// Reads holding registers in blocks of at most <paramref name="blockSize"/> registers.
          /// </summary>
          /// <param name="slaveAddress">Address of the device to read from.</param>
          /// <param name="startAddress">Address of the first register.</param>
          /// <param name="numberOfPoints">Total number of registers to read.</param>
          /// <param name="blockSize">Maximum number of registers per underlying read.</param>
          /// <param name="cancellationToken">Cancels waiting and stops between blocks.</param>
          /// <returns>All registers read, in address order.</returns>
          /// <exception cref="ArgumentOutOfRangeException"><paramref name="blockSize"/> is zero while registers were requested.</exception>
          /// <exception cref="OperationCanceledException">The token was cancelled.</exception>
          public Task<ushort[]> ReadHoldingRegistersAsync(byte slaveAddress, ushort startAddress, ushort numberOfPoints, ushort blockSize, CancellationToken cancellationToken)
          {
              return ReadRegistersInBlocksAsync(
                  (address, count) => _master.ReadHoldingRegistersAsync(slaveAddress, address, count),
                  startAddress,
                  numberOfPoints,
                  blockSize,
                  cancellationToken);
          }

          /// <summary>
          /// Writes <paramref name="data"/> to consecutive registers in blocks of at most
          /// <paramref name="blockSize"/> registers, pausing for the minimum interval between blocks.
          /// </summary>
          /// <param name="slaveAddress">Address of the device to write to.</param>
          /// <param name="startAddress">Address of the first register.</param>
          /// <param name="data">Values to write.</param>
          /// <param name="blockSize">Maximum number of registers per underlying write.</param>
          /// <param name="cancellationToken">Cancels waiting and stops between blocks.</param>
          /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
          /// <exception cref="ArgumentOutOfRangeException"><paramref name="blockSize"/> is zero while there is data to write.</exception>
          /// <exception cref="OperationCanceledException">The token was cancelled.</exception>
          public async Task WriteMultipleRegistersAsync(byte slaveAddress, ushort startAddress, ushort[] data, ushort blockSize, CancellationToken cancellationToken)
          {
              if (data == null)
              {
                  throw new ArgumentNullException(nameof(data));
              }

              // A zero block size can never make progress; fail before taking the semaphore.
              if (blockSize == 0 && data.Length > 0)
              {
                  throw new ArgumentOutOfRangeException(nameof(blockSize), "Block size must be greater than zero.");
              }

              await PerformAsync(async () =>
              {
                  int soFar = 0;

                  while (soFar < data.Length)
                  {
                      // If we're _not_ on the first run through here, wait for the min time
                      if (soFar > 0)
                      {
                          await DelayBetweenBlocksAsync(cancellationToken).ConfigureAwait(false);
                      }

                      // Same cancellation check as the block-wise reads.
                      cancellationToken.ThrowIfCancellationRequested();

                      // The last chunk may be shorter than the block size.
                      int thisWrite = Math.Min(blockSize, data.Length - soFar);

                      ushort[] registers = new ushort[thisWrite];
                      Array.Copy(data, soFar, registers, 0, thisWrite);

                      await _master.WriteMultipleRegistersAsync(
                          slaveAddress,
                          (ushort)(startAddress + soFar),
                          registers).ConfigureAwait(false);

                      soFar += thisWrite;
                  }
              }, cancellationToken).ConfigureAwait(false);
          }

          /// <summary>
          /// Writes a single register through the underlying master, exclusively and throttled.
          /// </summary>
          public Task WriteSingleRegisterAsync(byte slaveAddress, ushort address, ushort value, CancellationToken cancellationToken)
          {
              return PerformAsync(() => _master.WriteSingleRegisterAsync(slaveAddress, address, value), cancellationToken);
          }

          /// <summary>
          /// Writes multiple coils through the underlying master, exclusively and throttled.
          /// </summary>
          public Task WriteCoilsAsync(byte slaveAddress, ushort startAddress, bool[] data, CancellationToken cancellationToken)
          {
              return PerformAsync(() => _master.WriteMultipleCoilsAsync(slaveAddress, startAddress, data), cancellationToken);
          }

          /// <summary>
          /// Reads coils through the underlying master, exclusively and throttled.
          /// </summary>
          public Task<bool[]> ReadCoilsAsync(byte slaveAddress, ushort startAddress, ushort number, CancellationToken cancellationToken)
          {
              return PerformFuncAsync(() => _master.ReadCoilsAsync(slaveAddress, startAddress, number), cancellationToken);
          }

          /// <summary>
          /// Reads discrete inputs through the underlying master, exclusively and throttled.
          /// </summary>
          public Task<bool[]> ReadDiscretesAsync(byte slaveAddress, ushort startAddress, ushort number, CancellationToken cancellationToken)
          {
              return PerformFuncAsync(() => _master.ReadInputsAsync(slaveAddress, startAddress, number), cancellationToken);
          }

          /// <summary>
          /// Writes a single coil through the underlying master, exclusively and throttled.
          /// </summary>
          public Task WriteSingleCoilAsync(byte slaveAddress, ushort coilAddress, bool value, CancellationToken cancellationToken)
          {
              return PerformAsync(() => _master.WriteSingleCoilAsync(slaveAddress, coilAddress, value), cancellationToken);
          }

          /// <summary>
          /// Disposes the underlying master. Calling this more than once has no further effect.
          /// </summary>
          public void Dispose()
          {
              if (_isDisposed)
              {
                  return;
              }

              _isDisposed = true;

              // The semaphore is not disposed here so that an operation still in flight
              // can release it without throwing.
              _master.Dispose();
          }
      }
  }