dotnetty 新的篇章- 开源


一、前言

      因为微服务引擎依赖于dotnetty组件,很多协议都是针对于dotnetty 进行扩展,然后对于老版本https://github.com/azure/dotnetty 停止更新后,本人下载源码进行修改更新,并且大家要求独立仓库进行开源,所以今天整理了代码开源至https://github.com/microsurging/DotNetty,  也希望大家一起贡献代码,让dotnetty 生态更强大。

HttpFlv:http://demo.kayakiot.cn:281/httpflv.html  (黑衣人)

 HttpFlv:http://demo.kayakiot.cn:281/httpflv1.html  (大红包)

HttpFlv:http://demo.kayakiot.cn:281/httpflv2.html  (鹿鼎记)

rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream2   (黑衣人)

rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream3   (大红包)

rtmp:rtmp://demo.kayakiot.cn:76/live1/livestream4(鹿鼎记)

注:测试服务器带宽只有8MB, httpflv  缓冲做的没有rtmp好,然后httpflv卡就多刷新几次

  凯亚 (Kayak) 是什么?

       凯亚(Kayak)是基于.NET8.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。

     凯亚物联网平台:http://demo.kayakiot.cn:3100(用户名:fanly  密码:123456)

    链路跟踪Skywalking V8:http://117.72.121.2:8080/

      surging 微服务引擎开源地址:https://github.com/fanliang11/surging(后面surging 会移动到microsurging进行维护)

二、ValueTask扩展支持

IValueTaskPromise:

    public interface IValueTaskPromise: IPromise
    {   
            bool IsVoid { get; }

            bool IsCompleted { get; }

            bool IsSuccess { get; }

            bool IsFaulted { get; }

            bool IsCanceled { get; }

            bool TryComplete();

            void Complete();

            bool TrySetException(Exception exception);

            bool TrySetException(IEnumerable<Exception> exceptions);

            void SetException(Exception exception);

            void SetException(IEnumerable<Exception> exceptions);

            bool TrySetCanceled();

            void SetCanceled();

            bool SetUncancellable();

             IPromise Unvoid();
        }

DefaultValueTaskPromise:

    public class DefaultValueTaskPromise: IValueTaskPromise
    {
        private readonly CancellationToken _token;
#if NET
        private readonly TaskCompletionSource _tcs;
#else
        private readonly ManualResetValueTaskSource<object> _tcs;
#endif

        private int v_uncancellable = SharedConstants.False;

        public DefaultValueTaskPromise()
        {
            _token = CancellationToken.None;
#if NET
            _tcs = new TaskCompletionSource();
#else
            _tcs = new ManualResetValueTaskSource<object>();
#endif
        }

        public DefaultValueTaskPromise(object state)
        {
#if NET
            _tcs = new TaskCompletionSource(state);
#else
            _tcs = new ManualResetValueTaskSource<object>(state);
#endif
        }

        public DefaultValueTaskPromise(CancellationToken cancellationToken)
        {
            _token= cancellationToken;
        }



        public ValueTask ValueTask
        {
            [MethodImpl(InlineMethod.AggressiveOptimization)]
            get => _tcs.AwaitVoid(_token);
        }

        public bool IsVoid => false;

        public bool IsSuccess => ValueTask.IsCompletedSuccessfully;

        public bool IsCompleted => ValueTask.IsCompleted;

        public bool IsFaulted => ValueTask.IsFaulted;

        public bool IsCanceled => ValueTask.IsCanceled;

       public  Task  Task => ValueTask.AsTask();

        public virtual bool TryComplete()
        {
#if NET
            return _tcs.TrySetResult();
#else
            return _tcs.SetResult(0);
#endif
        }

        public virtual void Complete()
        {
#if NET
            _tcs.SetResult();
#else
            _tcs.SetResult(0);
#endif
        }
        public virtual void SetCanceled()
        {
            if (SharedConstants.False < (uint)Volatile.Read(ref v_uncancellable)) { return; }
            _tcs.SetCanceled();
        }

        public virtual void SetException(Exception exception)
        {
            if (exception is AggregateException aggregateException)
            {
                SetException(aggregateException.InnerExceptions);
                return;
            }
            _tcs.SetException(exception);
        }

        public virtual void SetException(IEnumerable<Exception> exceptions)
        {
            _tcs.SetException(exceptions.FirstOrDefault());
        }

        public virtual bool TrySetCanceled()
        {
            if (SharedConstants.False < (uint)Volatile.Read(ref v_uncancellable)) { return false; }
              _tcs.SetCanceled();
            return true;
        }

        public virtual bool TrySetException(Exception exception)
        {
            if (exception is AggregateException aggregateException)
            {
                return TrySetException(aggregateException.InnerExceptions);
            }
              _tcs.SetException(exception);
            return true;
        }

        public virtual bool TrySetException(IEnumerable<Exception> exceptions)
        {
              _tcs.SetException(exceptions.FirstOrDefault());
            return true;
        }

        public bool SetUncancellable()
        {
            if (SharedConstants.False >= (uint)Interlocked.CompareExchange(ref v_uncancellable, SharedConstants.True, SharedConstants.False))
            {
                return true;
            }
            return !IsCompleted;
        }

        public override string ToString() => "TaskCompletionSource[status: " + ValueTask.AsTask().Status.ToString() + "]";

        public IPromise Unvoid() => this;
         
    }

ManualResetValueTaskSource:

 internal interface IStrongBox<T>
 {
     ref T Value { get; }

     bool RunContinuationsAsynchronously { get; set; }
 }

 public enum ContinuationOptions
 {
     None,

     ForceDefaultTaskScheduler
 }

 public class ManualResetValueTaskSource<T> : IStrongBox<ManualResetValueTaskSourceLogic<T>>, IValueTaskSource<T>, IValueTaskSource
 {
     private ManualResetValueTaskSourceLogic<T> _logic;
     private readonly Action _cancellationCallback;

     [MethodImpl(MethodImplOptions.AggressiveInlining)]
     public ManualResetValueTaskSource(ContinuationOptions options = ContinuationOptions.None)
     {
         _logic = new ManualResetValueTaskSourceLogic<T>(this, options,null);
         _cancellationCallback = SetCanceled;
     }

     public ManualResetValueTaskSource(object state, ContinuationOptions options = ContinuationOptions.None)
     {
         _logic = new ManualResetValueTaskSourceLogic<T>(this, options,state);
         _cancellationCallback = SetCanceled;
     }

     public short Version => _logic.Version;


     [MethodImpl(MethodImplOptions.AggressiveInlining)]
     public bool SetResult(T result)
     {
         lock (_cancellationCallback)
         {
             if (_logic.Completed)
             {
                 return false;
             }

             _logic.SetResult(result);
             return true;
         }
     }

     [MethodImpl(MethodImplOptions.AggressiveInlining)]
     public void SetException(Exception error)
     {
         if (Monitor.TryEnter(_cancellationCallback))
         {
             if (_logic.Completed)
             {
                 Monitor.Exit(_cancellationCallback);
                 return;
             }

             _logic.SetException(error);
             Monitor.Exit(_cancellationCallback);
         }
     }

     public void SetCanceled() => SetException(new TaskCanceledException());

     public T GetResult(short token) => _logic.GetResult(token);

     void IValueTaskSource.GetResult(short token) => _logic.GetResult(token);

     public ValueTaskSourceStatus GetStatus(short token) => _logic.GetStatus(token);

     public bool RunContinuationsAsynchronously { get; set; } = true;

     public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _logic.OnCompleted(continuation, state, token, flags);

     ref ManualResetValueTaskSourceLogic<T> IStrongBox<ManualResetValueTaskSourceLogic<T>>.Value => ref _logic;

     [MethodImpl(MethodImplOptions.AggressiveInlining)]
     public ValueTask<T> AwaitValue(CancellationToken cancellation)
     {
         CancellationTokenRegistration? registration = cancellation == CancellationToken.None
             ? (CancellationTokenRegistration?)null
             : cancellation.Register(_cancellationCallback);
         return _logic.AwaitValue(this, registration);
     }

     public ValueTask AwaitVoid(CancellationToken cancellation)
     {
         CancellationTokenRegistration? registration = cancellation == CancellationToken.None
             ? (CancellationTokenRegistration?)null
             : cancellation.Register(_cancellationCallback);
         return _logic.AwaitVoid(this, registration);
     }

     public void Reset() => _logic.Reset();
 }

 internal struct ManualResetValueTaskSourceLogic<TResult>
 {
     private static readonly Action<object> s_sentinel = s => throw new InvalidOperationException();

     private readonly IStrongBox<ManualResetValueTaskSourceLogic<TResult>> _parent;
     private readonly ContinuationOptions _options;
     private Action<object> _continuation;
     private object _continuationState;
     private object _capturedContext;
     private ExecutionContext _executionContext;
     private bool _completed;
     private TResult _result;
     private ExceptionDispatchInfo _error;
     private CancellationTokenRegistration? _registration;
      
     public ManualResetValueTaskSourceLogic(IStrongBox<ManualResetValueTaskSourceLogic<TResult>> parent, ContinuationOptions options,object state)
     {
         _parent = parent ?? throw new ArgumentNullException(nameof(parent));
         _options = options;
         _continuation = null;
         _continuationState = null;
         _capturedContext = null;
         _executionContext = null;
         _completed = state != null;
         _result =state==null? default(TResult): (TResult)state;
         _error = null;
         Version = 0;
         _registration = null;
     }

     public short Version { get; private set; }

     public bool Completed => _completed;

     private void ValidateToken(short token)
     {
         if (token != Version)
         { 
             throw new InvalidOperationException();
         }
     }

     public ValueTaskSourceStatus GetStatus(short token)
     {
        // ValidateToken(token);

         return
             !_completed ? ValueTaskSourceStatus.Pending :
             _error == null ? ValueTaskSourceStatus.Succeeded :
             _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
             ValueTaskSourceStatus.Faulted;
     }

     public TResult GetResult(short token)
     {
        // ValidateToken(token);

         if (!_completed)
         {
             return _result;
         }

         TResult result = _result;
         ExceptionDispatchInfo error = _error;
         Reset();

         error?.Throw();
         return result;
     }

     public void Reset()
     {
         Version++;

         _registration?.Dispose();

         _completed = false;
         _continuation = null;
         _continuationState = null;
         _result = default(TResult);
         _error = null;
         _executionContext = null;
         _capturedContext = null;
         _registration = null;
     }

     public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
     {
         if (continuation == null)
         {
             throw new ArgumentNullException(nameof(continuation));
         }

         ValidateToken(token);


         if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
         {
             _executionContext = ExecutionContext.Capture();
         }

         if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
         {
             SynchronizationContext sc = SynchronizationContext.Current;
             if (sc != null && sc.GetType() != typeof(SynchronizationContext))
             {
                 _capturedContext = sc;
             }
             else
             {
                 TaskScheduler ts = TaskScheduler.Current;
                 if (ts != TaskScheduler.Default)
                 {
                     _capturedContext = ts;
                 }
             }
         }

         _continuationState = state;
         if (Interlocked.CompareExchange(ref _continuation, continuation, null) != null)
         {
             _executionContext = null;

             object cc = _capturedContext;
             _capturedContext = null;

             switch (cc)
             {
                 case null:
                     Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
                     break;

                 case SynchronizationContext sc:
                     sc.Post(s =>
                     {
                         var tuple = (Tuple<Action<object>, object>)s;
                         tuple.Item1(tuple.Item2);
                     }, Tuple.Create(continuation, state));
                     break;

                 case TaskScheduler ts:
                     Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
                     break;
             }
         }
     }

     public void SetResult(TResult result)
     {
         _result = result;
         SignalCompletion();
     }

     [MethodImpl(MethodImplOptions.AggressiveInlining)]
     public void SetException(Exception error)
     {
         _error = ExceptionDispatchInfo.Capture(error);
         SignalCompletion();
     }

     private void SignalCompletion()
     {
         if (_completed)
         {
             throw new InvalidOperationException("Double completion of completion source is prohibited");
         }

         _completed = true;

         if (Interlocked.CompareExchange(ref _continuation, s_sentinel, null) != null)
         {
             if (_executionContext != null)
             {
                 ExecutionContext.Run(
                     _executionContext,
                     s => ((IStrongBox<ManualResetValueTaskSourceLogic<TResult>>)s).Value.InvokeContinuation(),
                     _parent ?? throw new InvalidOperationException());
             }
             else
             {
                 InvokeContinuation();
             }
         }
     }

     private void InvokeContinuation()
     {
         object cc = _capturedContext;
         _capturedContext = null;

         if (_options == ContinuationOptions.ForceDefaultTaskScheduler)
         {
             cc = TaskScheduler.Default;
         }

         switch (cc)
         {
             case null:
                 if (_parent.RunContinuationsAsynchronously)
                 {
                     var c = _continuation;
                     if (_executionContext != null)
                     {
                         ThreadPool.QueueUserWorkItem(s => c(s), _continuationState);
                     }
                     else
                     {
                         ThreadPool.UnsafeQueueUserWorkItem(s => c(s),  _continuationState);
                     }
                 }
                 else
                 {
                     _continuation(_continuationState);
                 }
                 break;

             case SynchronizationContext sc:
                 sc.Post(s =>
                 {
                     ref ManualResetValueTaskSourceLogic<TResult> logicRef = ref ((IStrongBox<ManualResetValueTaskSourceLogic<TResult>>)s).Value;
                     logicRef._continuation(logicRef._continuationState);
                 }, _parent ?? throw new InvalidOperationException());
                 break;

             case TaskScheduler ts:
                 Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
                 break;
         }
     }

     public ValueTask<T> AwaitValue<T>(IValueTaskSource<T> source, CancellationTokenRegistration? registration)
     {
         _registration = registration;
         return new ValueTask<T>(source, Version);
     }

     public ValueTask AwaitVoid(IValueTaskSource source, CancellationTokenRegistration? registration)
     {
         _registration = registration;
         return new ValueTask(source, Version);
     }
 }

然后把DefaultPromise 替换成DefaultValueTaskPromise,如下图所示

三、扩展支持rtmp编解码

如下图所示 :

四、demo展示

开启了三个通道进行推流,cpu,内存都比较稳定

 在凯亚物联网平台你也可以创建rtmp组件

 

视频中心