Refactor SSE subscribers to use ConcurrentDictionary for thread safety and simplify response handling.

This commit is contained in:
2026-01-01 23:06:40 +03:30
parent 1d0b42654a
commit 365267a30a

View File

@@ -17,7 +17,7 @@ var app = builder.Build();
app.Urls.Clear();
app.Urls.Add("http://0.0.0.0:5030");
var subscribers = new ConcurrentDictionary<string, List<ChannelWriter<string>>>();
var subscribers = new ConcurrentDictionary<string, ConcurrentDictionary<ChannelWriter<string>, byte>>();
var cancellationSources =
new ConcurrentDictionary<string, CancellationTokenSource>(); // To manage cancellation per feedId
@@ -40,34 +40,23 @@ createApi.MapGet("/{feedId}", async (HttpContext context, string feedId) =>
{
var jsonQueryData = JsonSerializer.Serialize(dweet, AppJsonSerializerContext.Default.Dweet);
if (subscribers.TryGetValue(feedId, out var subscribersList))
foreach (var writer in subscribersList)
if (subscribers.TryGetValue(feedId, out var feedSubs))
foreach (var writer in feedSubs.Keys)
await writer.WriteAsync(jsonQueryData);
return Results.Ok(new AddDweetSucceededResponse
{ This = "succeeded", By = "dweeting", The = "dweet", With = dweet });
}
catch (Exception e)
{
var faultResponse = new AddDweetFailedResponse
{
This = "failed",
With = "WeMessedUp",
Because = "IDK, we couldnt dweet it. Report it at: https://github.com/mmahdium/HoolIt/issues"
};
var addFailedResponse =
JsonSerializer.Serialize(faultResponse, AppJsonSerializerContext.Default.AddDweetFailedResponse);
context.Response.StatusCode = 500;
context.Response.ContentType = "application/json";
await context.Response.WriteAsync(addFailedResponse);
await context.Response.CompleteAsync();
return Results.Json(
new AddDweetFailedResponse
{
This = "failed", With = "WeMessedUp",
Because = "IDK, we couldnt dweet it. Report it at: https://github.com/mmahdium/HoolIt/issues"
}, statusCode: 500);
}
var addSuccessResponse = new AddDweetSucceededResponse
{
This = "succeeded",
By = "dweeting",
The = "dweet",
With = dweet
};
return Results.Ok(addSuccessResponse);
});
var getLiveDataApi = app.MapGroup("/listen/for/dweets/from");
@@ -87,8 +76,10 @@ getLiveDataApi.MapGet("/{feedId}",
var channel = Channel.CreateUnbounded<string>();
var writer = channel.Writer;
subscribers.GetOrAdd(feedId, _ => new List<ChannelWriter<string>>()).Add(writer);
Console.WriteLine($"Added subscriber to feed {feedId}");
var feedSubs = subscribers.GetOrAdd(feedId, _ => new ConcurrentDictionary<ChannelWriter<string>, byte>());
feedSubs.TryAdd(writer, 0);
//Console.WriteLine($"Added subscriber to feed {feedId}");
try
{
@@ -104,14 +95,14 @@ getLiveDataApi.MapGet("/{feedId}",
}
catch (OperationCanceledException)
{
Console.WriteLine($"Cancellation requested for feed {feedId}");
//Console.WriteLine($"Cancellation requested for feed {feedId}");
}
finally
{
if (subscribers.TryGetValue(feedId, out var list))
{
list.Remove(writer);
Console.WriteLine($"Removed subscriber from feed {feedId}");
feedSubs.TryRemove(writer, out _);
//Console.WriteLine($"Removed subscriber from feed {feedId}");
if (list.Count == 0) subscribers.TryRemove(feedId, out _);
}
}