テラシュールブログ

旧テラシュールウェアブログUnity記事。主にUnityのTipsやAR・VR、ニコニコ動画についてのメモを残します。

【Unity】JobSystemが動作しているスレッドの番号を取得する

複数のジョブで処理を行う時、全てのジョブが同じバッファに格納しようとすると当然競合を起こします。コレを回避するために排他処理を行う訳ですが、それを行わず計算結果を格納する方法を考えてみます。

ジョブ毎に計算結果を格納する対象を切り替える

特に何も考えずに全てのジョブから特定の要素に書き込む場合、同時に書き込んだり、計算の前提となる状態が変わったりして問題になります。この挙動はJobSystemでは実行時にエラーになります。

f:id:tsubaki_t1:20191204225339g:plain
ジョブが同じ場所に書き込んでしまう

この問題は 異なるスレッドから同じ要素に書き込む事で起こるので、スレッド毎に読み書きする要素を決めて、処理を行う方法を考えてみます。

f:id:tsubaki_t1:20191204225632g:plain
ジョブ毎に異なる要素に書き込む

利用するAPI

まず重要なのが [NativeSetThreadIndex] int threadIndex; をジョブに定義することです。この記述でジョブが動作しているスレッドの番号が取得できます。注意として1~4といったWorkerThreadが動作している番号ではなく、0~128といったスレッドの内のドレかが入ります。
このthreadIndexを元に情報を格納したり、取得したりします。

この時、ジョブが読み書きするNativeArrayにはNativeDisableContainerSafetyRestriction を設定します。コレを設定しなければ、ジョブシステムは複数のスレッドから一つの要素を操作する可能性があるとしてエラーを出してしまいます。今回はそれは起こらない予定なので上記の設定が使用できます。

ジョブの実装はこんな感じです。全てのジョブが各々の担当する要素に+1しているだけです。

    [BurstCompile]
    struct CountJob : IJobParallelFor
    {
        [NativeSetThreadIndex]
        int threadIndex;

        [NativeDisableContainerSafetyRestriction]
        public NativeArray<int> array;

        public void Execute(int index)
        {
            var count = array[threadIndex];
            count = count + 1;
            array[threadIndex] = count;
        }
    }

あとはジョブを使用する側です。普通にNativeArrayを生成するのですが、要素数JobsUtility.MaxJobThreadCountを使用します。threadIndexが返す数が0~128なので、その要素のどれでも格納出来るようにする必要がある為、この分量の要素を生成しています。

void Update()
{
    handle.Complete();

    var array = new NativeArray<int>(JobsUtility.MaxJobThreadCount, Allocator.TempJob);
    handle = new CountJob { array = array }.Schedule(24576, 8);
    array.Dispose(handle);

    JobHandle.ScheduleBatchedJobs();
}

なお最終的な集計はJobsUtility.MaxJobThreadCountの数の要素を集計する感じになります。ここはイケてない気もしますが、数千の要素をジョブの数だけ分割計算し、最終的な集計は128の要素を舐めれば良い(しかもBurstによる最適化が効きやすい構造)なので、まぁ悪くはないかなと言う認識です。ワーカースレッドの番号を取得できればもうちょっと納得行く感じではありますが。

コード

using UnityEngine;
using Unity.Jobs;
using Unity.Jobs.LowLevel.Unsafe;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using Unity.Burst;

public class PositionUpdateSystem : MonoBehaviour
{
    JobHandle handle;

    void OnDestroy()
    {
        handle.Complete();
    }

    void Update()
    {
        handle.Complete();

        var array = new NativeArray<int>(JobsUtility.MaxJobThreadCount, Allocator.TempJob);
        var counter = new NativeArray<int>(1, Allocator.TempJob);

        handle = new CountJob { array = array }.Schedule(24576, 8);
        handle = new GatherJob { array = array, counter = counter }.Schedule(handle);
        handle = new LogJob { counter = counter }.Schedule(handle);

        array.Dispose(handle);
        counter.Dispose(handle);

        JobHandle.ScheduleBatchedJobs();
    }

    [BurstCompile]
    struct CountJob : IJobParallelFor
    {
        [NativeSetThreadIndex]
        int threadIndex;

        [NativeDisableContainerSafetyRestriction]
        public NativeArray<int> array;

        public void Execute(int index)
        {
            var count = array[threadIndex];
            count = count + 1;
            array[threadIndex] = count;
        }
    }

    [BurstCompile]
    struct GatherJob : IJob
    {
        [ReadOnly]
        public NativeArray<int> array;

        [WriteOnly]
        public NativeArray<int> counter;

        public void Execute()
        {
            var count = 0;
            for (int index = 0; index < array.Length; index++)
            {
                count = count + array[index];
            }
            counter[0] = count;
        }
    }

    struct LogJob : IJob
    {
        [ReadOnly]
        public NativeArray<int> counter;

        public void Execute()
        {
            Debug.Log(counter[0]);
        }
    }
}

感想

NativeQueueのNativeArray変換や、NativeListの並列書込が使えれば楽なんですが… UnsafeList君、君には期待しているよ(未検証