using System;
using Unity.Burst;
using Unity.Jobs;
using Unity.Jobs.LowLevel.Unsafe;
using UnityEngine.Assertions;

namespace Unity.Collections.LowLevel.Unsafe
{
    [BurstCompatible]
    internal unsafe struct UnsafeStreamBlock
    {
        internal UnsafeStreamBlock* Next;
        internal fixed byte Data[1];
    }

    [BurstCompatible]
    internal unsafe struct UnsafeStreamRange
    {
        internal UnsafeStreamBlock* Block;
        internal int OffsetInFirstBlock;
        internal int ElementCount;

        /// One byte past the end of the last byte written
        internal int LastOffset;
        internal int NumberOfBlocks;
    }

    [BurstCompatible]
    internal unsafe struct UnsafeStreamBlockData
    {
        internal const int AllocationSize = 4 * 1024;
        internal AllocatorManager.AllocatorHandle Allocator;

        internal UnsafeStreamBlock** Blocks;
        internal int BlockCount;

        internal UnsafeStreamBlock* Free;

        internal UnsafeStreamRange* Ranges;
        internal int RangeCount;

        internal UnsafeStreamBlock* Allocate(UnsafeStreamBlock* oldBlock, int threadIndex)
        {
            Assert.IsTrue(threadIndex < BlockCount && threadIndex >= 0);

            UnsafeStreamBlock* block = (UnsafeStreamBlock*)Memory.Unmanaged.Allocate(AllocationSize, 16, Allocator);
            block->Next = null;

            if (oldBlock == null)
            {
                // Append our new block in front of the previous head.
                block->Next = Blocks[threadIndex];
                Blocks[threadIndex] = block;
            }
            else
            {
                oldBlock->Next = block;
            }

            return block;
        }
    }

    /// <summary>
    /// A set of untyped, append-only buffers. Allows for concurrent reading and concurrent writing without synchronization.
    /// </summary>
    /// <remarks>
    /// As long as each individual buffer is written in one thread and read in one thread, multiple
    /// threads can read and write the stream concurrently, *e.g.*
    /// while thread *A* reads from buffer *X* of a stream, thread *B* can read from
    /// buffer *Y* of the same stream.
    ///
    /// Each buffer is stored as a chain of blocks. When a write exceeds a buffer's current capacity, another block
    /// is allocated and added to the end of the chain. Effectively, expanding the buffer never requires copying the existing
    /// data (unlike, for example, with <see cref="NativeList{T}"/>).
    ///
    /// **All writing to a stream should be completed before the stream is first read. Do not write to a stream after the first read.**
    ///
    /// Writing is done with <see cref="NativeStream.Writer"/>, and reading is done with <see cref="NativeStream.Reader"/>.
    /// An individual reader or writer cannot be used concurrently across threads. Each thread must use its own.
    ///
    /// The data written to an individual buffer can be heterogeneous in type, and the data written
    /// to different buffers of a stream can be entirely different in type, number, and order. Just make sure
    /// that the code reading from a particular buffer knows what to expect to read from it.
    /// </remarks>
    [BurstCompatible]
    public unsafe struct UnsafeStream
        : INativeDisposable
    {
        [NativeDisableUnsafePtrRestriction]
        internal UnsafeStreamBlockData* m_Block;
        internal AllocatorManager.AllocatorHandle m_Allocator;

        /// <summary>
        /// Initializes and returns an instance of UnsafeStream.
        /// </summary>
        /// <param name="bufferCount">The number of buffers to give the stream. You usually want
        /// one buffer for each thread that will read or write the stream.</param>
        /// <param name="allocator">The allocator to use.</param>
        public UnsafeStream(int bufferCount, AllocatorManager.AllocatorHandle allocator)
        {
            AllocateBlock(out this, allocator);
            AllocateForEach(bufferCount);
        }

        /// <summary>
        /// Creates and schedules a job to allocate a new stream.
        /// </summary>
        /// <remarks>The stream can be used on the main thread after completing the returned job or used in other jobs that depend upon the returned job.
        ///
        /// Using a job to allocate the buffers can be more efficient, particularly for a stream with many buffers.
        /// </remarks>
        /// <typeparam name="T">Ignored.</typeparam>
        /// <param name="stream">Outputs the new stream.</param>
        /// <param name="bufferCount">A list whose length determines the number of buffers in the stream.</param>
        /// <param name="dependency">A job handle. The new job will depend upon this handle.</param>
        /// <param name="allocator">The allocator to use.</param>
        /// <returns>The handle of the new job.</returns>
        [NotBurstCompatible /* This is not burst compatible because of IJob's use of a static IntPtr. Should switch to IJobBurstSchedulable in the future */]
        public static JobHandle ScheduleConstruct<T>(out UnsafeStream stream, NativeList<T> bufferCount, JobHandle dependency, AllocatorManager.AllocatorHandle allocator)
            where T : unmanaged
        {
            AllocateBlock(out stream, allocator);
            var jobData = new ConstructJobList { List = (UntypedUnsafeList*)bufferCount.GetUnsafeList(), Container = stream };
            return jobData.Schedule(dependency);
        }

        /// <summary>
        /// Creates and schedules a job to allocate a new stream.
        /// </summary>
        /// <remarks>The stream can be used on the main thread after completing the returned job or used in other jobs that depend upon the returned job.
        ///
        /// Allocating the buffers in a job can be more efficient, particularly for a stream with many buffers.
        /// </remarks>
        /// <param name="stream">Outputs the new stream.</param>
        /// <param name="bufferCount">An array whose value at index 0 determines the number of buffers in the stream.</param>
        /// <param name="dependency">A job handle. The new job will depend upon this handle.</param>
        /// <param name="allocator">The allocator to use.</param>
        /// <returns>The handle of the new job.</returns>
        [NotBurstCompatible /* This is not burst compatible because of IJob's use of a static IntPtr. Should switch to IJobBurstSchedulable in the future */]
        public static JobHandle ScheduleConstruct(out UnsafeStream stream, NativeArray<int> bufferCount, JobHandle dependency, AllocatorManager.AllocatorHandle allocator)
        {
            AllocateBlock(out stream, allocator);
            var jobData = new ConstructJob { Length = bufferCount, Container = stream };
            return jobData.Schedule(dependency);
        }

        internal static void AllocateBlock(out UnsafeStream stream, AllocatorManager.AllocatorHandle allocator)
        {
            int blockCount = JobsUtility.MaxJobThreadCount;

            int allocationSize = sizeof(UnsafeStreamBlockData) + sizeof(UnsafeStreamBlock*) * blockCount;
            byte* buffer = (byte*)Memory.Unmanaged.Allocate(allocationSize, 16, allocator);
            UnsafeUtility.MemClear(buffer, allocationSize);

            var block = (UnsafeStreamBlockData*)buffer;

            stream.m_Block = block;
            stream.m_Allocator = allocator;

            block->Allocator = allocator;
            block->BlockCount = blockCount;
            block->Blocks = (UnsafeStreamBlock**)(buffer + sizeof(UnsafeStreamBlockData));

            block->Ranges = null;
            block->RangeCount = 0;
        }

        internal void AllocateForEach(int forEachCount)
        {
            long allocationSize = sizeof(UnsafeStreamRange) * forEachCount;
            m_Block->Ranges = (UnsafeStreamRange*)Memory.Unmanaged.Allocate(allocationSize, 16, m_Allocator);
            m_Block->RangeCount = forEachCount;
            UnsafeUtility.MemClear(m_Block->Ranges, allocationSize);
        }

        /// <summary>
        /// Returns true if this stream is empty.
        /// </summary>
        /// <returns>True if this stream is empty or the stream has not been constructed.</returns>
        public bool IsEmpty()
        {
            if (!IsCreated)
            {
                return true;
            }

            for (int i = 0; i != m_Block->RangeCount; i++)
            {
                if (m_Block->Ranges[i].ElementCount > 0)
                {
                    return false;
                }
            }

            return true;
        }

        /// <summary>
        /// Whether this stream has been allocated (and not yet deallocated).
        /// </summary>
        /// <remarks>Does not necessarily reflect whether the buffers of the stream have themselves been allocated.</remarks>
        /// <value>True if this stream has been allocated (and not yet deallocated).</value>
        public bool IsCreated => m_Block != null;

        /// <summary>
        /// The number of buffers in this stream.
        /// </summary>
        /// <value>The number of buffers in this stream.</value>
        public int ForEachCount => m_Block->RangeCount;

        /// <summary>
        /// Returns a reader of this stream.
        /// </summary>
        /// <returns>A reader of this stream.</returns>
        public Reader AsReader()
        {
            return new Reader(ref this);
        }

        /// <summary>
        /// Returns a writer of this stream.
        /// </summary>
        /// <returns>A writer of this stream.</returns>
        public Writer AsWriter()
        {
            return new Writer(ref this);
        }

        /// <summary>
        /// Returns the total number of items in the buffers of this stream.
        /// </summary>
        /// <remarks>Each <see cref="Writer.Write{T}"/> and <see cref="Writer.Allocate"/> call increments this number.</remarks>
        /// <returns>The total number of items in the buffers of this stream.</returns>
        public int Count()
        {
            int itemCount = 0;

            for (int i = 0; i != m_Block->RangeCount; i++)
            {
                itemCount += m_Block->Ranges[i].ElementCount;
            }

            return itemCount;
        }

        /// <summary>
        /// Returns a new NativeArray copy of this stream's data.
        /// </summary>
        /// <remarks>The length of the array will equal the count of this stream.
        ///
        /// Each buffer of this stream is copied to the array, one after the other.
        /// </remarks>
        /// <typeparam name="T">The type of values in the array.</typeparam>
        /// <param name="allocator">The allocator to use.</param>
        /// <returns>A new NativeArray copy of this stream's data.</returns>
        [BurstCompatible(GenericTypeArguments = new[] { typeof(int) })]
        public NativeArray<T> ToNativeArray<T>(AllocatorManager.AllocatorHandle allocator) where T : struct
        {
            var array = CollectionHelper.CreateNativeArray<T>(Count(), allocator, NativeArrayOptions.UninitializedMemory);
            var reader = AsReader();

            int offset = 0;
            for (int i = 0; i != reader.ForEachCount; i++)
            {
                reader.BeginForEachIndex(i);
                int rangeItemCount = reader.RemainingItemCount;
                for (int j = 0; j < rangeItemCount; ++j)
                {
                    array[offset] = reader.Read<T>();
                    offset++;
                }
                reader.EndForEachIndex();
            }

            return array;
        }

        void Deallocate()
        {
            if (m_Block == null)
            {
                return;
            }

            for (int i = 0; i != m_Block->BlockCount; i++)
            {
                UnsafeStreamBlock* block = m_Block->Blocks[i];
                while (block != null)
                {
                    UnsafeStreamBlock* next = block->Next;
                    Memory.Unmanaged.Free(block, m_Allocator);
                    block = next;
                }
            }

            Memory.Unmanaged.Free(m_Block->Ranges, m_Allocator);
            Memory.Unmanaged.Free(m_Block, m_Allocator);
            m_Block = null;
            m_Allocator = Allocator.None;
        }

        /// <summary>
        /// Releases all resources (memory).
        /// </summary>
        public void Dispose()
        {
            Deallocate();
        }

        /// <summary>
        /// Creates and schedules a job that will release all resources (memory and safety handles) of this stream.
        /// </summary>
        /// <param name="inputDeps">A job handle which the newly scheduled job will depend upon.</param>
        /// <returns>The handle of a new job that will release all resources (memory and safety handles) of this stream.</returns>
        [NotBurstCompatible /* This is not burst compatible because of IJob's use of a static IntPtr. Should switch to IJobBurstSchedulable in the future */]
        public JobHandle Dispose(JobHandle inputDeps)
        {
            var jobHandle = new DisposeJob { Container = this }.Schedule(inputDeps);

            m_Block = null;

            return jobHandle;
        }

        [BurstCompile]
        struct DisposeJob : IJob
        {
            public UnsafeStream Container;

            public void Execute()
            {
                Container.Deallocate();
            }
        }

        [BurstCompile]
        struct ConstructJobList : IJob
        {
            public UnsafeStream Container;

            [ReadOnly]
            [NativeDisableUnsafePtrRestriction]
            public UntypedUnsafeList* List;

            public void Execute()
            {
                Container.AllocateForEach(List->m_length);
            }
        }

        [BurstCompile]
        struct ConstructJob : IJob
        {
            public UnsafeStream Container;

            [ReadOnly]
            public NativeArray<int> Length;

            public void Execute()
            {
                Container.AllocateForEach(Length[0]);
            }
        }

        /// <summary>
        /// Writes data into a buffer of an <see cref="UnsafeStream"/>.
        /// </summary>
        /// <remarks>An individual writer can only be used for one buffer of one stream.
        /// Do not create more than one writer for an individual buffer.</remarks>
        [BurstCompatible]
        public unsafe struct Writer
        {
            [NativeDisableUnsafePtrRestriction]
            internal UnsafeStreamBlockData* m_BlockStream;

            [NativeDisableUnsafePtrRestriction]
            UnsafeStreamBlock* m_CurrentBlock;

            [NativeDisableUnsafePtrRestriction]
            byte* m_CurrentPtr;

            [NativeDisableUnsafePtrRestriction]
            byte* m_CurrentBlockEnd;

            internal int m_ForeachIndex;
            int m_ElementCount;

            [NativeDisableUnsafePtrRestriction]
            UnsafeStreamBlock* m_FirstBlock;

            int m_FirstOffset;
            int m_NumberOfBlocks;

            [NativeSetThreadIndex]
            int m_ThreadIndex;

            internal Writer(ref UnsafeStream stream)
            {
                m_BlockStream = stream.m_Block;
                m_ForeachIndex = int.MinValue;
                m_ElementCount = -1;
                m_CurrentBlock = null;
                m_CurrentBlockEnd = null;
                m_CurrentPtr = null;
                m_FirstBlock = null;
                m_NumberOfBlocks = 0;
                m_FirstOffset = 0;
                m_ThreadIndex = 0;
            }

            /// <summary>
            /// The number of buffers in the stream of this writer.
            /// </summary>
            /// <value>The number of buffers in the stream of this writer.</value>
            public int ForEachCount => m_BlockStream->RangeCount;

            /// <summary>
            /// Readies this writer to write to a particular buffer of the stream.
            /// </summary>
            /// <remarks>Must be called before using this writer. For an individual writer, call this method only once.
            ///
            /// When done using this writer, you must call <see cref="EndForEachIndex"/>.</remarks>
            /// <param name="foreachIndex">The index of the buffer to write.</param>
            public void BeginForEachIndex(int foreachIndex)
            {
                m_ForeachIndex = foreachIndex;
                m_ElementCount = 0;
                m_NumberOfBlocks = 0;
                m_FirstBlock = m_CurrentBlock;
                m_FirstOffset = (int)(m_CurrentPtr - (byte*)m_CurrentBlock);
            }

            /// <summary>
            /// Readies the buffer written by this writer for reading.
            /// </summary>
            /// <remarks>Must be called before reading the buffer written by this writer.</remarks>
            public void EndForEachIndex()
            {
                m_BlockStream->Ranges[m_ForeachIndex].ElementCount = m_ElementCount;
                m_BlockStream->Ranges[m_ForeachIndex].OffsetInFirstBlock = m_FirstOffset;
                m_BlockStream->Ranges[m_ForeachIndex].Block = m_FirstBlock;

                m_BlockStream->Ranges[m_ForeachIndex].LastOffset = (int)(m_CurrentPtr - (byte*)m_CurrentBlock);
                m_BlockStream->Ranges[m_ForeachIndex].NumberOfBlocks = m_NumberOfBlocks;
            }

            /// <summary>
            /// Write a value to a buffer.
            /// </summary>
            /// <remarks>The value is written to the buffer which was specified
            /// with <see cref="BeginForEachIndex"/>.</remarks>
            /// <typeparam name="T">The type of value to write.</typeparam>
            /// <param name="value">The value to write.</param>
            [BurstCompatible(GenericTypeArguments = new[] { typeof(int) })]
            public void Write<T>(T value) where T : struct
            {
                ref T dst = ref Allocate<T>();
                dst = value;
            }

            /// <summary>
            /// Allocate space in a buffer.
            /// </summary>
            /// <remarks>The space is allocated in the buffer which was specified
            /// with <see cref="BeginForEachIndex"/>.</remarks>
            /// <typeparam name="T">The type of value to allocate space for.</typeparam>
            /// <returns>A reference to the allocation.</returns>
            [BurstCompatible(GenericTypeArguments = new[] { typeof(int) })]
            public ref T Allocate<T>() where T : struct
            {
                int size = UnsafeUtility.SizeOf<T>();
                return ref UnsafeUtility.AsRef<T>(Allocate(size));
            }

            /// <summary>
            /// Allocate space in a buffer.
            /// </summary>
            /// <remarks>The space is allocated in the buffer which was specified
            /// with <see cref="BeginForEachIndex"/>.</remarks>
            /// <param name="size">The number of bytes to allocate.</param>
            /// <returns>The allocation.</returns>
            public byte* Allocate(int size)
            {
                byte* ptr = m_CurrentPtr;
                m_CurrentPtr += size;

                if (m_CurrentPtr > m_CurrentBlockEnd)
                {
                    UnsafeStreamBlock* oldBlock = m_CurrentBlock;

                    m_CurrentBlock = m_BlockStream->Allocate(oldBlock, m_ThreadIndex);
                    m_CurrentPtr = m_CurrentBlock->Data;

                    if (m_FirstBlock == null)
                    {
                        m_FirstOffset = (int)(m_CurrentPtr - (byte*)m_CurrentBlock);
                        m_FirstBlock = m_CurrentBlock;
                    }
                    else
                    {
                        m_NumberOfBlocks++;
                    }

                    m_CurrentBlockEnd = (byte*)m_CurrentBlock + UnsafeStreamBlockData.AllocationSize;
                    ptr = m_CurrentPtr;
                    m_CurrentPtr += size;
                }

                m_ElementCount++;

                return ptr;
            }
        }

        /// <summary>
        /// Reads data from a buffer of an <see cref="UnsafeStream"/>.
        /// </summary>
        /// <remarks>An individual reader can only be used for one buffer of one stream.
        /// Do not create more than one reader for an individual buffer.</remarks>
        [BurstCompatible]
        public unsafe struct Reader
        {
            [NativeDisableUnsafePtrRestriction]
            internal UnsafeStreamBlockData* m_BlockStream;

            [NativeDisableUnsafePtrRestriction]
            internal UnsafeStreamBlock* m_CurrentBlock;

            [NativeDisableUnsafePtrRestriction]
            internal byte* m_CurrentPtr;

            [NativeDisableUnsafePtrRestriction]
            internal byte* m_CurrentBlockEnd;

            internal int m_RemainingItemCount;
            internal int m_LastBlockSize;

            internal Reader(ref UnsafeStream stream)
            {
                m_BlockStream = stream.m_Block;
                m_CurrentBlock = null;
                m_CurrentPtr = null;
                m_CurrentBlockEnd = null;
                m_RemainingItemCount = 0;
                m_LastBlockSize = 0;
            }

            /// <summary>
            /// Readies this reader to read a particular buffer of the stream.
            /// </summary>
            /// <remarks>Must be called before using this reader. For an individual reader, call this method only once.
            ///
            /// When done using this reader, you must call <see cref="EndForEachIndex"/>.</remarks>
            /// <param name="foreachIndex">The index of the buffer to read.</param>
            /// <returns>The number of remaining elements to read from the buffer.</returns>
            public int BeginForEachIndex(int foreachIndex)
            {
                m_RemainingItemCount = m_BlockStream->Ranges[foreachIndex].ElementCount;
                m_LastBlockSize = m_BlockStream->Ranges[foreachIndex].LastOffset;

                m_CurrentBlock = m_BlockStream->Ranges[foreachIndex].Block;
                m_CurrentPtr = (byte*)m_CurrentBlock + m_BlockStream->Ranges[foreachIndex].OffsetInFirstBlock;
                m_CurrentBlockEnd = (byte*)m_CurrentBlock + UnsafeStreamBlockData.AllocationSize;

                return m_RemainingItemCount;
            }

            /// <summary>
            /// Does nothing.
            /// </summary>
            /// <remarks>Included only for consistency with <see cref="NativeStream"/>.</remarks>
            public void EndForEachIndex()
            {
            }

            /// <summary>
            /// The number of buffers in the stream of this reader.
            /// </summary>
            /// <value>The number of buffers in the stream of this reader.</value>
            public int ForEachCount => m_BlockStream->RangeCount;

            /// <summary>
            /// The number of items not yet read from the buffer.
            /// </summary>
            /// <value>The number of items not yet read from the buffer.</value>
            public int RemainingItemCount => m_RemainingItemCount;

            /// <summary>
            /// Returns a pointer to the next position to read from the buffer. Advances the reader some number of bytes.
            /// </summary>
            /// <param name="size">The number of bytes to advance the reader.</param>
            /// <returns>A pointer to the next position to read from the buffer.</returns>
            /// <exception cref="System.ArgumentException">Thrown if the reader has been advanced past the end of the buffer.</exception>
            public byte* ReadUnsafePtr(int size)
            {
                m_RemainingItemCount--;

                byte* ptr = m_CurrentPtr;
                m_CurrentPtr += size;

                if (m_CurrentPtr > m_CurrentBlockEnd)
                {
                    m_CurrentBlock = m_CurrentBlock->Next;
                    m_CurrentPtr = m_CurrentBlock->Data;

                    m_CurrentBlockEnd = (byte*)m_CurrentBlock + UnsafeStreamBlockData.AllocationSize;

                    ptr = m_CurrentPtr;
                    m_CurrentPtr += size;
                }

                return ptr;
            }

            /// <summary>
            /// Reads the next value from the buffer.
            /// </summary>
            /// <remarks>Each read advances the reader to the next item in the buffer.</remarks>
            /// <typeparam name="T">The type of value to read.</typeparam>
            /// <returns>A reference to the next value from the buffer.</returns>
            [BurstCompatible(GenericTypeArguments = new[] { typeof(int) })]
            public ref T Read<T>() where T : struct
            {
                int size = UnsafeUtility.SizeOf<T>();
                return ref UnsafeUtility.AsRef<T>(ReadUnsafePtr(size));
            }

            /// <summary>
            /// Reads the next value from the buffer. Does not advance the reader.
            /// </summary>
            /// <typeparam name="T">The type of value to read.</typeparam>
            /// <returns>A reference to the next value from the buffer.</returns>
            [BurstCompatible(GenericTypeArguments = new[] { typeof(int) })]
            public ref T Peek<T>() where T : struct
            {
                int size = UnsafeUtility.SizeOf<T>();

                byte* ptr = m_CurrentPtr;
                if (ptr + size > m_CurrentBlockEnd)
                {
                    ptr = m_CurrentBlock->Next->Data;
                }

                return ref UnsafeUtility.AsRef<T>(ptr);
            }

            /// <summary>
            /// Returns the total number of items in the buffers of the stream.
            /// </summary>
            /// <returns>The total number of items in the buffers of the stream.</returns>
            public int Count()
            {
                int itemCount = 0;
                for (int i = 0; i != m_BlockStream->RangeCount; i++)
                {
                    itemCount += m_BlockStream->Ranges[i].ElementCount;
                }

                return itemCount;
            }
        }
    }
}