Skip to content

Commit

Permalink
Reimplemented looping + memory optimizations.
Browse files Browse the repository at this point in the history
  • Loading branch information
F0903 committed Mar 23, 2024
1 parent 25a79eb commit 5c4636d
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 73 deletions.
22 changes: 13 additions & 9 deletions Melodica/Services/Audio/FFmpegProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
using System.Diagnostics;
using System.Reflection.Metadata.Ecma335;
using Melodica.Services.Caching;
using System.Buffers;
using System.Diagnostics;
using Melodica.Services.Media;

namespace Melodica.Services.Audio;

public class FFmpegProcessor : IAsyncMediaProcessor
{
readonly static MemoryPool<byte> memory = MemoryPool<byte>.Shared;

Process? proc;

Stream? processInput;
Expand Down Expand Up @@ -60,7 +61,7 @@ public void SetPause(bool value)
else pauseWaiter.Set();
}

public async Task ProcessMediaAsync(PlayableMediaStream media, Stream output, Action? beforeInterruptionCallback, CancellationToken token)
public async Task ProcessMediaAsync(PlayableMediaStream media, Stream output, Action? onHalt, Action? onResume, CancellationToken token)
{
if (proc is null || proc.HasExited)
{
Expand All @@ -73,8 +74,9 @@ public async Task ProcessMediaAsync(PlayableMediaStream media, Stream output, Ac
void HandlePause()
{
if (!paused) return;
beforeInterruptionCallback?.Invoke();
onHalt?.Invoke();
pauseWaiter.WaitOne();
onResume?.Invoke();
}

const int bufferSize = 8 * 1024;
Expand All @@ -84,27 +86,29 @@ void HandlePause()
var inputTask = Task.Run(async () =>
{
int read = 0;
Memory<byte> buf = new byte[bufferSize];
using var mem = memory.Rent(bufferSize);
var buf = mem.Memory;
try
{
while ((read = await media.ReadAsync(buf, token)) != 0)
{
HandlePause();
await processInput!.WriteAsync(buf[..read], token);
}
}
}
finally
{
await processInput!.FlushAsync(token);
processInput!.Close();
}
}, token);

var outputTask = Task.Run(async () =>
{
int read = 0;
Memory<byte> buf = new byte[bufferSize];
using var mem = memory.Rent(bufferSize);
var buf = mem.Memory;
while ((read = await processOutput!.ReadAsync(buf, token)) != 0)
{
HandlePause();
Expand Down
2 changes: 1 addition & 1 deletion Melodica/Services/Audio/IAsyncMediaProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ namespace Melodica.Services.Audio;
public interface IAsyncMediaProcessor : IDisposable
{
public void SetPause(bool value);
public Task ProcessMediaAsync(PlayableMediaStream media, Stream output, Action? beforeInterruptionCallback, CancellationToken token);
public Task ProcessMediaAsync(PlayableMediaStream media, Stream output, Action? onHalt, Action? onResume, CancellationToken token);
}
6 changes: 4 additions & 2 deletions Melodica/Services/Caching/CachedMediaInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public CachedMediaInfo() : base("")
}

[NonSerialized]
private string cacheRoot;
private readonly string cacheRoot;

[NonSerialized]
public const string MetaFileExtension = ".meta";
Expand All @@ -32,7 +32,9 @@ public CachedMediaInfo() : base("")

public string MediaPath { get; init; }

public bool IsMediaComplete { get; set; }
public bool IsComplete { get; set; }

public bool IsWriting { get; set; }

public static async ValueTask<CachedMediaInfo> LoadFromDisk(string path)
{
Expand Down
2 changes: 1 addition & 1 deletion Melodica/Services/Caching/CachingModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ public sealed class CachingModule : DependencyModule
{
public override IServiceCollection Load() =>
new ServiceCollection()
.AddSingleton<MediaFileCache>();
.AddSingleton<IMediaCache, MediaFileCache>();
}
30 changes: 21 additions & 9 deletions Melodica/Services/Caching/MediaFileCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,26 @@ record struct CacheInfo(CachedMediaInfo CachedMediaInfo, long AccessCount);

private async Task LoadPreexistingFilesAsync()
{
foreach (var metaFile in Directory.EnumerateFileSystemEntries(cacheLocation, $"*{CachedMediaInfo.MetaFileExtension}", SearchOption.AllDirectories).Convert(x => new FileInfo(x)))
var dirEnumerator = Directory.EnumerateFileSystemEntries(cacheLocation, $"*{CachedMediaInfo.MetaFileExtension}", SearchOption.AllDirectories).Convert(x => new FileInfo(x));
await Parallel.ForEachAsync(dirEnumerator, async (metaFile, cancel) =>
{
try
{
var info = await CachedMediaInfo.LoadFromDisk(metaFile.FullName);
if (!info.IsComplete)
{
DeleteMedia(metaFile);
return;
}
info.IsWriting = false;
var id = info.Id ?? throw new Exception("Id was null.");
cache.Add(id, new(info, 0));
}
catch
{
DeleteMedia(metaFile);
}
}
});
}

static bool DeleteMedia(FileInfo file)
Expand Down Expand Up @@ -187,7 +194,7 @@ public Task<long> GetCacheSizeAsync()
{
var mediaInfo = cacheInfo.CachedMediaInfo;

if (!mediaInfo.IsMediaComplete)
if (!mediaInfo.IsComplete && !mediaInfo.IsWriting)
{
try
{
Expand All @@ -198,11 +205,15 @@ public Task<long> GetCacheSizeAsync()
return default;
}

cache[id] = cacheInfo with { AccessCount = cacheInfo.AccessCount + 1 };
var media = new PlayableMediaStream(File.OpenRead(mediaInfo.MediaPath), mediaInfo, null, null);
return media.WrapValueTask<PlayableMediaStream?>();
try
{
cache[id] = cacheInfo with { AccessCount = cacheInfo.AccessCount + 1 };
var fs = File.Open(mediaInfo.MediaPath, FileMode.Open, FileAccess.Read, FileShare.Read); // Stream closing will be handled later.
var media = new PlayableMediaStream(fs, mediaInfo, null, null);
return media.WrapValueTask<PlayableMediaStream?>();
}
catch { }
}

return default;
}

Expand All @@ -224,11 +235,12 @@ public async ValueTask<Stream> InitStreamableCache(MediaInfo info, bool pruneCac

var fileLegalId = id.ReplaceIllegalCharacters();
var mediaLocation = Path.Combine(cacheLocation, fileLegalId);
var file = File.OpenWrite(mediaLocation);
var file = File.Open(mediaLocation, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read);

var cachedMediaInfo = new CachedMediaInfo(mediaLocation, cacheLocation, info)
{
IsMediaComplete = false,
IsComplete = false,
IsWriting = true,
};
await cachedMediaInfo.WriteToDisk();

Expand Down
46 changes: 28 additions & 18 deletions Melodica/Services/Media/PlayableMediaStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,51 +22,61 @@ public class PlayableMediaStream(AsyncParameterizedLazyGetter<Stream, MediaInfo>

private Stream? cachingStream;

private bool cachingFinished;

public override bool CanRead { get; } = true;
public override bool CanSeek { get; } = false;
public override bool CanSeek => GetData().CanSeek || cachingFinished;
public override bool CanWrite { get; } = false;
public override long Length => GetData().Length;
public override long Position { get => GetData().Position; set => GetData().Position = value; }

public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
if (cachingFinished) // This path will only be used when looping an uncached song.
{
return await cachingStream!.ReadAsync(buffer, cancellationToken);
}

var data = await GetDataAsync();
var info = await GetInfoAsync();
var read = await data.ReadAsync(buffer, cancellationToken);

if (cachingProvider is not null)
{
if (cachingStream is not null)
{
await cachingStream.WriteAsync(buffer[..read], cancellationToken);

if (read == 0)
{
await cachingProvider.TryEditCacheInfo(info.Id, x =>
{
x.IsMediaComplete = true;
return x;
});
}
}
else
if (read == 0)
{
cachingStream = await cachingProvider.InitStreamableCache(await GetInfoAsync());
await cachingProvider.TryEditCacheInfo(info.Id, x =>
{
x.IsMediaComplete = false;
x.IsComplete = true;
x.IsWriting = false;
return x;
});
cachingFinished = true;
return 0;
}

cachingStream ??= await cachingProvider.InitStreamableCache(await GetInfoAsync());
await cachingStream.WriteAsync(buffer[..read], cancellationToken);
}

return read;
}

public override void Flush() => GetData().Flush();

public override async Task FlushAsync(CancellationToken cancellationToken) => await (await GetDataAsync()).FlushAsync(cancellationToken);

public override long Seek(long offset, SeekOrigin origin)
{
if (cachingFinished)
{
return cachingStream!.Seek(offset, origin);
}

return GetData().Seek(offset, origin);
}

public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException("Use async read.");
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

Expand Down
56 changes: 31 additions & 25 deletions Melodica/Services/Playback/Jukebox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public enum PlayResult

public bool Playing => !playLock.IsSet;

public bool Loop => Queue.Loop;
public bool Loop { get; private set; }

public bool Shuffle => Queue.Shuffle;

Expand Down Expand Up @@ -64,7 +64,7 @@ public async Task SetPausedAsync(bool value)

public async Task SetLoopAsync(bool value)
{
Queue.Loop = value;
Loop = value;
if (currentPlayer is not null)
{
await currentPlayer.SetButtonStateAsync(JukeboxInterfaceButton.Loop, value);
Expand Down Expand Up @@ -102,18 +102,28 @@ Task ResetState()
async Task SendDataAsync(PlayableMediaStream media, OpusEncodeStream output, CancellationToken token)
{
const int frameBytes = 3840;

try
{
using var memHandle = memory.Rent(frameBytes);
var buffer = memHandle.Memory;
durationTimer.Start();

await mediaProcessor.ProcessMediaAsync(media, output, async () =>
{
await output.WriteSilentFramesAsync();
await output.FlushAsync();
}, token);
await mediaProcessor.ProcessMediaAsync(
media,
output,
async () =>
{
durationTimer.Stop();
await output.WriteSilentFramesAsync();
await output.FlushAsync();
},
() =>
{
durationTimer.Start();
},
token
);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
Expand All @@ -124,9 +134,6 @@ await mediaProcessor.ProcessMediaAsync(media, output, async () =>
Log.Debug("Finished sending data... Flushing...");
await output.WriteSilentFramesAsync();
await output.FlushAsync(token);

media.Close();

durationTimer.Reset();
}
}
Expand Down Expand Up @@ -199,20 +206,29 @@ async Task ConnectAsync(IAudioChannel audioChannel)

async Task PlayNextAsync(IAudioChannel channel, OpusEncodeStream output)
{
if (Queue.IsEmpty)
{
await DisconnectAsync();
return;
}

var media = await Queue.DequeueAsync();
CurrentSong = await media.GetInfoAsync();

await currentPlayer!.SetSongEmbedAsync(CurrentSong, null); //TODO: Consider reimplementing collectionInfo / playlist info again.

var donePlaying = false;
try
{
stopper = new();
var stopToken = stopper.Token;
Log.Debug("Starting sending data..");
await SendDataAsync(media, output, stopToken);
do
{
await SendDataAsync(media, output, stopToken);
if (Loop && media.CanSeek) media.Seek(0, SeekOrigin.Begin);
} while (Loop);
}
catch (OperationCanceledException)
catch (OperationCanceledException)
{
Log.Debug("Caught operation cancelled exception in PlayNext.");
}
Expand All @@ -226,18 +242,8 @@ async Task PlayNextAsync(IAudioChannel channel, OpusEncodeStream output)
{
stopper?.Dispose();
stopper = null;
if (!Loop)
{
await media.DisposeAsync();
if (Queue.IsEmpty)
{
await DisconnectAsync();
donePlaying = true;
}
}
await media.DisposeAsync();
}
if (donePlaying)
return;

await PlayNextAsync(channel, output);
}
Expand Down
Loading

0 comments on commit 5c4636d

Please sign in to comment.