|
|
|
|
@@ -1,7 +1,9 @@
|
|
|
|
|
using System.Collections.Concurrent;
|
|
|
|
|
using System.Runtime.CompilerServices;
|
|
|
|
|
using System.Text;
|
|
|
|
|
using System.Text.Json;
|
|
|
|
|
using System.Text.Json.Serialization;
|
|
|
|
|
using System.Threading.Channels;
|
|
|
|
|
using HoolIt.Models;
|
|
|
|
|
|
|
|
|
|
var builder = WebApplication.CreateSlimBuilder(args);
|
|
|
|
|
@@ -15,7 +17,7 @@ var app = builder.Build();
|
|
|
|
|
app.Urls.Clear();
|
|
|
|
|
app.Urls.Add("http://0.0.0.0:5030");
|
|
|
|
|
|
|
|
|
|
var subscribers = new ConcurrentDictionary<string, List<StreamWriter>>();
|
|
|
|
|
var subscribers = new ConcurrentDictionary<string, ConcurrentDictionary<ChannelWriter<string>, byte>>();
|
|
|
|
|
var cancellationSources =
|
|
|
|
|
new ConcurrentDictionary<string, CancellationTokenSource>(); // To manage cancellation per feedId
|
|
|
|
|
|
|
|
|
|
@@ -36,40 +38,25 @@ createApi.MapGet("/{feedId}", async (HttpContext context, string feedId) =>
|
|
|
|
|
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
var chunkedQueryData = JsonSerializer.Serialize(dweet, AppJsonSerializerContext.Default.Dweet);
|
|
|
|
|
var jsonQueryData = JsonSerializer.Serialize(dweet, AppJsonSerializerContext.Default.Dweet);
|
|
|
|
|
|
|
|
|
|
if (subscribers.TryGetValue(feedId, out var subscribersList))
|
|
|
|
|
foreach (var writer in subscribersList)
|
|
|
|
|
{
|
|
|
|
|
await writer.WriteLineAsync(chunkedQueryData);
|
|
|
|
|
await writer.FlushAsync();
|
|
|
|
|
}
|
|
|
|
|
if (subscribers.TryGetValue(feedId, out var feedSubs))
|
|
|
|
|
foreach (var writer in feedSubs.Keys)
|
|
|
|
|
await writer.WriteAsync(jsonQueryData);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return Results.Ok(new AddDweetSucceededResponse
|
|
|
|
|
{ This = "succeeded", By = "dweeting", The = "dweet", With = dweet });
|
|
|
|
|
}
|
|
|
|
|
catch (Exception e)
|
|
|
|
|
{
|
|
|
|
|
var faultResponse = new AddDweetFailedResponse()
|
|
|
|
|
{
|
|
|
|
|
This = "failed",
|
|
|
|
|
With = "WeMessedUp",
|
|
|
|
|
Because = "IDK, we couldnt dweet it. Report it at: https://github.com/mmahdium/HoolIt/issues"
|
|
|
|
|
};
|
|
|
|
|
var addFailedResponse =
|
|
|
|
|
JsonSerializer.Serialize(faultResponse, AppJsonSerializerContext.Default.AddDweetFailedResponse);
|
|
|
|
|
context.Response.StatusCode = 500;
|
|
|
|
|
context.Response.ContentType = "application/json";
|
|
|
|
|
await context.Response.WriteAsync(addFailedResponse);
|
|
|
|
|
await context.Response.CompleteAsync();
|
|
|
|
|
|
|
|
|
|
return Results.Json(
|
|
|
|
|
new AddDweetFailedResponse
|
|
|
|
|
{
|
|
|
|
|
This = "failed", With = "WeMessedUp",
|
|
|
|
|
Because = "IDK, we couldnt dweet it. Report it at: https://github.com/mmahdium/HoolIt/issues"
|
|
|
|
|
}, statusCode: 500);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var addSuccessResponse = new AddDweetSucceededResponse
|
|
|
|
|
{
|
|
|
|
|
This = "succeeded",
|
|
|
|
|
By = "dweeting",
|
|
|
|
|
The = "dweet",
|
|
|
|
|
With = dweet
|
|
|
|
|
};
|
|
|
|
|
return Results.Ok(addSuccessResponse);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
var getLiveDataApi = app.MapGroup("/listen/for/dweets/from");
|
|
|
|
|
@@ -77,54 +64,46 @@ getLiveDataApi.MapGet("/{feedId}",
|
|
|
|
|
async (HttpContext context, string feedId, IHostApplicationLifetime appLifetime,
|
|
|
|
|
CancellationToken reqCancellationToken) =>
|
|
|
|
|
{
|
|
|
|
|
context.Response.Headers.ContentType = "text/event-stream";
|
|
|
|
|
context.Response.StatusCode = 200;
|
|
|
|
|
context.Response.Headers.ContentType = "text/plain";
|
|
|
|
|
context.Response.Headers.CacheControl = "no-cache";
|
|
|
|
|
context.Response.Headers["X-Content-Type-Options"] = "nosniff";
|
|
|
|
|
|
|
|
|
|
var writer = new StreamWriter(context.Response.Body, Encoding.UTF8);
|
|
|
|
|
subscribers.GetOrAdd(feedId, _ => new List<StreamWriter>()).Add(writer);
|
|
|
|
|
// Disable response buffering
|
|
|
|
|
context.Features.Get<Microsoft.AspNetCore.Http.Features.IHttpResponseBodyFeature>()?
|
|
|
|
|
.DisableBuffering();
|
|
|
|
|
|
|
|
|
|
// How this cancellation token mess works:
|
|
|
|
|
// - reqCancellationToken is the cancellation token from the client request, it is used when a client closes the request.
|
|
|
|
|
// - feedCts is the cancellation token from the feedId, it is used when the app is shutting down.
|
|
|
|
|
// - linkedCts is a linked token source that combines both reqCancellationToken and feedCts and gets canceled when either of them does.
|
|
|
|
|
// When the app is shutting down, the feedCts token source is canceled which means everything gets canceled altogether (Even that date you have been planning for the past few months; c'mon, you are probably a computer science student with no friends who barely touches grass).
|
|
|
|
|
var channel = Channel.CreateUnbounded<string>();
|
|
|
|
|
var writer = channel.Writer;
|
|
|
|
|
|
|
|
|
|
var feedCts = cancellationSources.GetOrAdd(feedId, _ => new CancellationTokenSource());
|
|
|
|
|
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(reqCancellationToken, feedCts.Token);
|
|
|
|
|
var linkedToken = linkedCts.Token;
|
|
|
|
|
var feedSubs = subscribers.GetOrAdd(feedId, _ => new ConcurrentDictionary<ChannelWriter<string>, byte>());
|
|
|
|
|
feedSubs.TryAdd(writer, 0);
|
|
|
|
|
|
|
|
|
|
appLifetime.ApplicationStopping.Register(() =>
|
|
|
|
|
{
|
|
|
|
|
if (cancellationSources.TryGetValue(feedId, out var existingFeedCts) &&
|
|
|
|
|
!existingFeedCts.IsCancellationRequested)
|
|
|
|
|
{
|
|
|
|
|
// It cancels
|
|
|
|
|
existingFeedCts.Cancel();
|
|
|
|
|
Console.WriteLine($"Cancellation signaled for feedId: {feedId} due to app shutdown.");
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
//Console.WriteLine($"Added subscriber to feed {feedId}");
|
|
|
|
|
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
while (!linkedToken.IsCancellationRequested) await Task.Delay(Timeout.Infinite, linkedToken);
|
|
|
|
|
var reader = channel.Reader;
|
|
|
|
|
|
|
|
|
|
while (!reqCancellationToken.IsCancellationRequested &&
|
|
|
|
|
await reader.WaitToReadAsync(reqCancellationToken))
|
|
|
|
|
while (reader.TryRead(out var msg))
|
|
|
|
|
{
|
|
|
|
|
await context.Response.WriteAsync(msg + "\n", reqCancellationToken);
|
|
|
|
|
await context.Response.Body.FlushAsync(reqCancellationToken);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (OperationCanceledException)
|
|
|
|
|
{
|
|
|
|
|
Console.WriteLine($"SSE connection for feedId: {feedId} canceled.");
|
|
|
|
|
//Console.WriteLine($"Cancellation requested for feed {feedId}");
|
|
|
|
|
}
|
|
|
|
|
finally
|
|
|
|
|
{
|
|
|
|
|
if (subscribers.TryGetValue(feedId, out var subscribersList))
|
|
|
|
|
if (subscribers.TryGetValue(feedId, out var list))
|
|
|
|
|
{
|
|
|
|
|
subscribersList.Remove(writer);
|
|
|
|
|
if (subscribersList.Count == 0)
|
|
|
|
|
{
|
|
|
|
|
subscribers.TryRemove(feedId, out _);
|
|
|
|
|
cancellationSources.TryRemove(feedId, out _);
|
|
|
|
|
Console.WriteLine($"No more subscribers for feedId: {feedId}. CTS removed.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
await writer.DisposeAsync();
|
|
|
|
|
Console.WriteLine("Removed subscriber from feed " + feedId);
|
|
|
|
|
feedSubs.TryRemove(writer, out _);
|
|
|
|
|
//Console.WriteLine($"Removed subscriber from feed {feedId}");
|
|
|
|
|
if (list.Count == 0) subscribers.TryRemove(feedId, out _);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|