
一、前言
因为微服务引擎依赖于dotnetty组件,很多协议都是针对于dotnetty 进行扩展,然后对于老版本https://github.com/azure/dotnetty 停止更新后,本人下载源码进行修改更新,并且大家要求独立仓库进行开源,所以今天整理了代码开源至https://github.com/microsurging/DotNetty, 也希望大家一起贡献代码,让dotnetty 生态更强大。
HttpFlv:http://demo.kayakiot.cn:281/httpflv.html (黑衣人)
HttpFlv:http://demo.kayakiot.cn:281/httpflv1.html (大红包)
HttpFlv:http://demo.kayakiot.cn:281/httpflv2.html (鹿鼎记)
rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream2 (黑衣人)
rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream3 (大红包)
rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream4(鹿鼎记)
注:测试服务器带宽只有8MB, httpflv 缓冲做的没有rtmp好,然后httpflv卡就多刷新几次
凯亚 (Kayak) 是什么?
凯亚(Kayak)是基于.NET8.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。
凯亚物联网平台:http://demo.kayakiot.cn:3100(用户名:fanly 密码:123456)
链路跟踪Skywalking V8:http://117.72.121.2:8080/
surging 微服务引擎开源地址:https://github.com/fanliang11/surging(后面surging 会移动到microsurging进行维护)
二、ValueTask扩展支持
IValueTaskPromise:
public interface IValueTaskPromise: IPromise { bool IsVoid { get; } bool IsCompleted { get; } bool IsSuccess { get; } bool IsFaulted { get; } bool IsCanceled { get; } bool TryComplete(); void Complete(); bool TrySetException(Exception exception); bool TrySetException(IEnumerable<Exception> exceptions); void SetException(Exception exception); void SetException(IEnumerable<Exception> exceptions); bool TrySetCanceled(); void SetCanceled(); bool SetUncancellable(); IPromise Unvoid(); }
DefaultValueTaskPromise:
public class DefaultValueTaskPromise: IValueTaskPromise { private readonly CancellationToken _token; #if NET private readonly TaskCompletionSource _tcs; #else private readonly ManualResetValueTaskSource<object> _tcs; #endif private int v_uncancellable = SharedConstants.False; public DefaultValueTaskPromise() { _token = CancellationToken.None; #if NET _tcs = new TaskCompletionSource(); #else _tcs = new ManualResetValueTaskSource<object>(); #endif } public DefaultValueTaskPromise(object state) { #if NET _tcs = new TaskCompletionSource(state); #else _tcs = new ManualResetValueTaskSource<object>(state); #endif } public DefaultValueTaskPromise(CancellationToken cancellationToken) { _token= cancellationToken; } public ValueTask ValueTask { [MethodImpl(InlineMethod.AggressiveOptimization)] get => _tcs.AwaitVoid(_token); } public bool IsVoid => false; public bool IsSuccess => ValueTask.IsCompletedSuccessfully; public bool IsCompleted => ValueTask.IsCompleted; public bool IsFaulted => ValueTask.IsFaulted; public bool IsCanceled => ValueTask.IsCanceled; public Task Task => ValueTask.AsTask(); public virtual bool TryComplete() { #if NET return _tcs.TrySetResult(); #else return _tcs.SetResult(0); #endif } public virtual void Complete() { #if NET _tcs.SetResult(); #else _tcs.SetResult(0); #endif } public virtual void SetCanceled() { if (SharedConstants.False < (uint)Volatile.Read(ref v_uncancellable)) { return; } _tcs.SetCanceled(); } public virtual void SetException(Exception exception) { if (exception is AggregateException aggregateException) { SetException(aggregateException.InnerExceptions); return; } _tcs.SetException(exception); } public virtual void SetException(IEnumerable<Exception> exceptions) { _tcs.SetException(exceptions.FirstOrDefault()); } public virtual bool TrySetCanceled() { if (SharedConstants.False < (uint)Volatile.Read(ref v_uncancellable)) { return false; } _tcs.SetCanceled(); return true; } public virtual bool TrySetException(Exception exception) { if (exception is AggregateException aggregateException) { return TrySetException(aggregateException.InnerExceptions); } _tcs.SetException(exception); return true; } public virtual bool TrySetException(IEnumerable<Exception> exceptions) { _tcs.SetException(exceptions.FirstOrDefault()); return true; } public bool SetUncancellable() { if (SharedConstants.False >= (uint)Interlocked.CompareExchange(ref v_uncancellable, SharedConstants.True, SharedConstants.False)) { return true; } return !IsCompleted; } public override string ToString() => "TaskCompletionSource[status: " + ValueTask.AsTask().Status.ToString() + "]"; public IPromise Unvoid() => this; }
ManualResetValueTaskSource:
internal interface IStrongBox<T> { ref T Value { get; } bool RunContinuationsAsynchronously { get; set; } } public enum ContinuationOptions { None, ForceDefaultTaskScheduler } public class ManualResetValueTaskSource<T> : IStrongBox<ManualResetValueTaskSourceLogic<T>>, IValueTaskSource<T>, IValueTaskSource { private ManualResetValueTaskSourceLogic<T> _logic; private readonly Action _cancellationCallback; [MethodImpl(MethodImplOptions.AggressiveInlining)] public ManualResetValueTaskSource(ContinuationOptions options = ContinuationOptions.None) { _logic = new ManualResetValueTaskSourceLogic<T>(this, options,null); _cancellationCallback = SetCanceled; } public ManualResetValueTaskSource(object state, ContinuationOptions options = ContinuationOptions.None) { _logic = new ManualResetValueTaskSourceLogic<T>(this, options,state); _cancellationCallback = SetCanceled; } public short Version => _logic.Version; [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool SetResult(T result) { lock (_cancellationCallback) { if (_logic.Completed) { return false; } _logic.SetResult(result); return true; } } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void SetException(Exception error) { if (Monitor.TryEnter(_cancellationCallback)) { if (_logic.Completed) { Monitor.Exit(_cancellationCallback); return; } _logic.SetException(error); Monitor.Exit(_cancellationCallback); } } public void SetCanceled() => SetException(new TaskCanceledException()); public T GetResult(short token) => _logic.GetResult(token); void IValueTaskSource.GetResult(short token) => _logic.GetResult(token); public ValueTaskSourceStatus GetStatus(short token) => _logic.GetStatus(token); public bool RunContinuationsAsynchronously { get; set; } = true; public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _logic.OnCompleted(continuation, state, token, flags); ref ManualResetValueTaskSourceLogic<T> IStrongBox<ManualResetValueTaskSourceLogic<T>>.Value => ref _logic; [MethodImpl(MethodImplOptions.AggressiveInlining)] public ValueTask<T> AwaitValue(CancellationToken cancellation) { CancellationTokenRegistration? registration = cancellation == CancellationToken.None ? (CancellationTokenRegistration?)null : cancellation.Register(_cancellationCallback); return _logic.AwaitValue(this, registration); } public ValueTask AwaitVoid(CancellationToken cancellation) { CancellationTokenRegistration? registration = cancellation == CancellationToken.None ? (CancellationTokenRegistration?)null : cancellation.Register(_cancellationCallback); return _logic.AwaitVoid(this, registration); } public void Reset() => _logic.Reset(); } internal struct ManualResetValueTaskSourceLogic<TResult> { private static readonly Action<object> s_sentinel = s => throw new InvalidOperationException(); private readonly IStrongBox<ManualResetValueTaskSourceLogic<TResult>> _parent; private readonly ContinuationOptions _options; private Action<object> _continuation; private object _continuationState; private object _capturedContext; private ExecutionContext _executionContext; private bool _completed; private TResult _result; private ExceptionDispatchInfo _error; private CancellationTokenRegistration? _registration; public ManualResetValueTaskSourceLogic(IStrongBox<ManualResetValueTaskSourceLogic<TResult>> parent, ContinuationOptions options,object state) { _parent = parent ?? throw new ArgumentNullException(nameof(parent)); _options = options; _continuation = null; _continuationState = null; _capturedContext = null; _executionContext = null; _completed = state != null; _result =state==null? default(TResult): (TResult)state; _error = null; Version = 0; _registration = null; } public short Version { get; private set; } public bool Completed => _completed; private void ValidateToken(short token) { if (token != Version) { throw new InvalidOperationException(); } } public ValueTaskSourceStatus GetStatus(short token) { // ValidateToken(token); return !_completed ? ValueTaskSourceStatus.Pending : _error == null ? ValueTaskSourceStatus.Succeeded : _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : ValueTaskSourceStatus.Faulted; } public TResult GetResult(short token) { // ValidateToken(token); if (!_completed) { return _result; } TResult result = _result; ExceptionDispatchInfo error = _error; Reset(); error?.Throw(); return result; } public void Reset() { Version++; _registration?.Dispose(); _completed = false; _continuation = null; _continuationState = null; _result = default(TResult); _error = null; _executionContext = null; _capturedContext = null; _registration = null; } public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) { if (continuation == null) { throw new ArgumentNullException(nameof(continuation)); } ValidateToken(token); if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0) { _executionContext = ExecutionContext.Capture(); } if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0) { SynchronizationContext sc = SynchronizationContext.Current; if (sc != null && sc.GetType() != typeof(SynchronizationContext)) { _capturedContext = sc; } else { TaskScheduler ts = TaskScheduler.Current; if (ts != TaskScheduler.Default) { _capturedContext = ts; } } } _continuationState = state; if (Interlocked.CompareExchange(ref _continuation, continuation, null) != null) { _executionContext = null; object cc = _capturedContext; _capturedContext = null; switch (cc) { case null: Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); break; case SynchronizationContext sc: sc.Post(s => { var tuple = (Tuple<Action<object>, object>)s; tuple.Item1(tuple.Item2); }, Tuple.Create(continuation, state)); break; case TaskScheduler ts: Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); break; } } } public void SetResult(TResult result) { _result = result; SignalCompletion(); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void SetException(Exception error) { _error = ExceptionDispatchInfo.Capture(error); SignalCompletion(); } private void SignalCompletion() { if (_completed) { throw new InvalidOperationException("Double completion of completion source is prohibited"); } _completed = true; if (Interlocked.CompareExchange(ref _continuation, s_sentinel, null) != null) { if (_executionContext != null) { ExecutionContext.Run( _executionContext, s => ((IStrongBox<ManualResetValueTaskSourceLogic<TResult>>)s).Value.InvokeContinuation(), _parent ?? throw new InvalidOperationException()); } else { InvokeContinuation(); } } } private void InvokeContinuation() { object cc = _capturedContext; _capturedContext = null; if (_options == ContinuationOptions.ForceDefaultTaskScheduler) { cc = TaskScheduler.Default; } switch (cc) { case null: if (_parent.RunContinuationsAsynchronously) { var c = _continuation; if (_executionContext != null) { ThreadPool.QueueUserWorkItem(s => c(s), _continuationState); } else { ThreadPool.UnsafeQueueUserWorkItem(s => c(s), _continuationState); } } else { _continuation(_continuationState); } break; case SynchronizationContext sc: sc.Post(s => { ref ManualResetValueTaskSourceLogic<TResult> logicRef = ref ((IStrongBox<ManualResetValueTaskSourceLogic<TResult>>)s).Value; logicRef._continuation(logicRef._continuationState); }, _parent ?? throw new InvalidOperationException()); break; case TaskScheduler ts: Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); break; } } public ValueTask<T> AwaitValue<T>(IValueTaskSource<T> source, CancellationTokenRegistration? registration) { _registration = registration; return new ValueTask<T>(source, Version); } public ValueTask AwaitVoid(IValueTaskSource source, CancellationTokenRegistration? registration) { _registration = registration; return new ValueTask(source, Version); } }
然后把DefaultPromise 替换成DefaultValueTaskPromise,如下图所示
三、扩展支持rtmp编解码
如下图所示 :
四、demo展示
开启了三个通道进行推流,cpu,内存都比较稳定
在凯亚物联网平台你也可以创建rtmp组件
视频中心