Compare commits

...

2 Commits

4 changed files with 202 additions and 7 deletions

View File

@ -170,9 +170,47 @@ namespace Infrastructure.Cache
return iDatabase.ListLength(key);
}
public IEnumerable<RedisKey> GetAllKeys(string pattern)
public IEnumerable<RedisKey> GetAllKeys(string pattern)
{
return server.Keys(pattern: new RedisValue(pattern));
}
/// <summary>
/// HSET - 异步设置哈希表中的多个字段。
/// </summary>
/// <param name="key"></param>
/// <param name="hashFields"></param>
/// <param name="flags"></param>
public void HashSetAsync(RedisKey key, HashEntry[] hashFields, CommandFlags flags = CommandFlags.None)
{
iDatabase.HashSetAsync(key, hashFields, CommandFlags.None);
}
/// <summary>
/// HGETALL - 异步获取哈希表中的所有字段和值。
/// </summary>
/// <param name="key"></param>
/// <param name="flags"></param>
/// <returns></returns>
public Task<HashEntry[]> HashGetAllAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
{
return iDatabase.HashGetAllAsync(key, CommandFlags.None);
}
/// <summary>
/// Returns all the members of the set value stored at key
/// </summary>
/// <param name="key"></param>
/// <param name="flags"></param>
/// <returns></returns>
public Task<RedisValue[]> SetMembersAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
{
return iDatabase.SetMembersAsync(key, CommandFlags.None);
}
public Task<bool> SetAddAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
{
return iDatabase.SetAddAsync(key, value, CommandFlags.None);
}
public Task<bool> SetRemoveAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
{
return iDatabase.SetRemoveAsync(key, value, CommandFlags.None);
}
}
}

View File

@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace OpenAuth.App.ServiceApp.Response
{
public class MqttClientResp
{
/// <summary>
/// 客户端ID
/// </summary>
public string ClientId { get; set; }
/// <summary>
/// 用户id
/// </summary>
public string UserId { get; set; }
/// <summary>
/// 用户名
/// </summary>
public string UserName { get; set; }
/// <summary>
/// 连接时间
/// </summary>
public DateTime ConnectTime { get; set; }
/// <summary>
/// 是否控制
/// </summary>
public bool IsLock { get; set; }
}
}

View File

@ -1,13 +1,18 @@
using DocumentFormat.OpenXml.Math;
using DocumentFormat.OpenXml.EMMA;
using DocumentFormat.OpenXml.Math;
using DocumentFormat.OpenXml.Spreadsheet;
using Infrastructure;
using Infrastructure.Cache;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using MQTTnet;
using OpenAuth.App.ServiceApp;
using OpenAuth.App.ServiceApp.Response;
using OpenAuth.Repository.Domain;
using StackExchange.Redis;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
namespace OpenAuth.WebApi.Controllers.ServiceControllers
{
@ -21,12 +26,14 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
private readonly AirportMaintenanceApp _app;
private readonly MqttClientManager _mqttClientManager;
private readonly MqttMessageCenter _mqttCenter;
private readonly RedisCacheContext _cache;
public AirportMaintenanceController(AirportMaintenanceApp app, MqttClientManager mqttClientManager, MqttMessageCenter mqttCenter)
public AirportMaintenanceController(AirportMaintenanceApp app, MqttClientManager mqttClientManager, MqttMessageCenter mqttCenter, RedisCacheContext cache)
{
_app = app;
_mqttClientManager = mqttClientManager;
_mqttCenter = mqttCenter;
_cache = cache;
}
/// <summary>
/// 机场注册 注册码生成
@ -389,5 +396,120 @@ namespace OpenAuth.WebApi.Controllers.ServiceControllers
return response;
}
#region redis 多用户控制
/// <summary>
/// 添加修改mqtt客户端信息
/// </summary>
/// <param name="info"></param>
/// <returns></returns>
[HttpPost]
[AllowAnonymous]
public async Task<Response<bool>> AddOrUpdateRedisUser(MqttClientResp info)
{
var result = new Response<bool>();
try
{
var clientKey = $"client:{info.UserId}";
string lockSetKey = "locked_users";
// 查询所有锁定用户
var existingLocked = await _cache.SetMembersAsync(lockSetKey);
// 如果有锁定用户,并且锁定的用户不是当前用户,则拒绝
if (existingLocked.Length > 0 && info.IsLock == true)
{
bool isCurrentUserLocked = existingLocked.Any(u => u == info.UserId);
if (!isCurrentUserLocked)
{
result.Code = 400;
result.Message = "已有其他用户处于锁定状态,不能添加新的锁定用户。";
result.Result = false;
return result;
}
}
// 存客户端信息
_cache.HashSetAsync(clientKey, new HashEntry[]
{
new("ClientId", info.ClientId),
new("UserId", info.UserId),
new("UserName", info.UserName),
new("ConnectTime", info.ConnectTime.ToString("O")),
new("IsLock", info.IsLock ? "true" : "false")
});
if (info.IsLock)
{
await _cache.SetAddAsync(lockSetKey, info.UserId);
}
else
{
await _cache.SetRemoveAsync(lockSetKey, info.UserId);
}
result.Result = true;
}
catch (Exception ex)
{
result.Code = 500;
result.Message = ex.Message;
}
return result;
}
/// <summary>
/// 获取当前用户mqtt客户端信息
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
[HttpGet]
[AllowAnonymous]
public async Task<Response<MqttClientResp>> GetRedisUser(string id)
{
var result = new Response<MqttClientResp>();
try
{
result.Result = ParseClient(await _cache.HashGetAllAsync($"client:{id}"));
}
catch (Exception ex)
{
result.Code = 500;
result.Message = ex.Message;
}
return result;
}
private MqttClientResp ParseClient(HashEntry[] entries)
{
var dict = entries.ToDictionary(e => e.Name.ToString(), e => e.Value.ToString());
return new MqttClientResp
{
ClientId = dict["ClientId"],
UserId = dict["UserId"],
UserName = dict["UserName"],
ConnectTime = DateTime.Parse(dict["ConnectTime"]),
IsLock = bool.Parse(dict["IsLock"])
};
}
/// <summary>
/// 获取所有锁定的用户客户端信息
/// </summary>
/// <returns></returns>
[HttpGet]
[AllowAnonymous]
public async Task<List<MqttClientResp>> GetLockedClients()
{
var userIds = await _cache.SetMembersAsync("locked_users");
var result = new List<MqttClientResp>();
foreach (var userId in userIds)
{
var entries = await _cache.HashGetAllAsync($"client:{userId}");
if (entries.Length > 0)
result.Add(ParseClient(entries));
}
return result;
}
#endregion
}
}

View File

@ -4,6 +4,7 @@ using Autofac.Extensions.DependencyInjection;
using ce.autofac.extension;
using IdentityServer4.AccessTokenValidation;
using Infrastructure;
using Infrastructure.Cache;
using Infrastructure.CloudSdk.minio;
using Infrastructure.CloudSdk.mqttmessagecenter;
using Infrastructure.Extensions.AutofacManager;
@ -217,6 +218,7 @@ namespace OpenAuth.WebApi
#region MemoryCache
services.AddMemoryCache();
services.AddSingleton<RedisCacheContext>();
#endregion
@ -327,10 +329,10 @@ namespace OpenAuth.WebApi
#region mqtt
services.AddSingleton<MqttMessageCenter>();
services.AddSingleton<IMqttMessageHandler, ThingRequestHandler>();
services.AddSingleton<IMqttMessageHandler, ThingServiceHandler>();
services.AddSingleton<IMqttMessageHandler, ThingOsdHandler>();
services.AddSingleton<IMqttMessageHandler, ThingEventHandler>();
//services.AddSingleton<IMqttMessageHandler, ThingRequestHandler>();
//services.AddSingleton<IMqttMessageHandler, ThingServiceHandler>();
//services.AddSingleton<IMqttMessageHandler, ThingOsdHandler>();
//services.AddSingleton<IMqttMessageHandler, ThingEventHandler>();
services.AddSingleton<IMqttMessageHandler, ThingDrcHandler>();
services.AddHostedService<MqttHostedService>();