2009年9月21日月曜日

MapReduce を C# で 05

括弧の数減らすのは断念。Map 処理やってるところと Reduce 処理やってるところが一覧できたほうがいいと思ったため。Parallel な Foreach を導入。列挙と中身の処理が 1 スレッドだったのが複数スレッド対応に。ここまで設計できれば、Java や GAE へのポーティングもラクになるはず…。

// あ、IDisposable !…後でいいか。(^_^;)



/// <summary>
/// Enumerator that makes pipelined execution possible.
/// </summary>
/// <remarks>
/// IEnumerator does not allow its source to be modified after the enumerator has been
/// created, so this dedicated interface is provided instead.
/// </remarks>
/// <typeparam name="T">Type of the elements being enumerated.</typeparam>
public interface ParallelIterator<T> : IEnumerator<T>, IDisposable, IEnumerator
{
    /// <summary>
    /// Re-declared with <c>new</c>, hiding the Reset member inherited from IEnumerator.
    /// </summary>
    new void Reset();

    /// <summary>
    /// Tries to fetch the next element.
    /// </summary>
    /// <param name="result">Receives the fetched element.</param>
    /// <returns>
    /// Callers in this file keep iterating while this returns true and stop on false.
    /// NOTE(review): whether an implementation blocks while the source is still being
    /// filled is not visible here - confirm against the concrete implementation.
    /// </returns>
    bool TryNext(out T result);
}

/// <summary>
/// A sequence that can hand out a <c>ParallelIterator</c> in addition to the
/// ordinary enumerators it inherits from IEnumerable.
/// </summary>
/// <typeparam name="T">Type of the elements in the sequence.</typeparam>
public interface ParallelIterable<T> : IEnumerable<T>, IEnumerable
{
    /// <summary>
    /// Obtains an iterator over this sequence.
    /// </summary>
    /// <returns>An iterator exposing <c>TryNext</c>.</returns>
    ParallelIterator<T> GetIterator();
}

/// <summary>
/// Collection contract whose members mirror those of the generic ICollection
/// (Count, IsReadOnly, Add, Clear, Contains, CopyTo, Remove), extended with
/// <c>GetIterator</c> (via ParallelIterable) and a <see cref="Fixed"/> flag.
/// </summary>
/// <typeparam name="T">Type of the stored elements.</typeparam>
public interface IParallelCollection<T> : ParallelIterable<T>, IEnumerable<T>, IEnumerable
{
    /// <summary>Number of elements.</summary>
    int Count { get; }

    /// <summary>Whether the collection is read-only.</summary>
    bool IsReadOnly { get; }

    /// <summary>
    /// "Updates finished" marker. Code in this file sets it to true once a producer
    /// is done adding, and temporarily back to false when it needs to append again.
    /// NOTE(review): how iterators react to this flag is implementation-defined - confirm.
    /// </summary>
    bool Fixed { get; set; }

    /// <summary>Adds an element.</summary>
    /// <param name="item">Element to add.</param>
    void Add(T item);

    /// <summary>Removes an element.</summary>
    /// <param name="item">Element to remove.</param>
    /// <returns>Presumably true when the element was found and removed - confirm.</returns>
    bool Remove(T item);

    /// <summary>Removes all elements.</summary>
    void Clear();

    /// <summary>Tests for membership.</summary>
    /// <param name="item">Element to look for.</param>
    bool Contains(T item);

    /// <summary>Copies the elements into an array.</summary>
    /// <param name="array">Destination array.</param>
    /// <param name="arrayIndex">Index in <paramref name="array"/> at which copying starts.</param>
    void CopyTo(T[] array, int arrayIndex);
}

/// <summary>
/// Index-addressable variant of <c>IParallelCollection</c>; the added members
/// mirror those of the generic IList (indexer, IndexOf, Insert, RemoveAt).
/// </summary>
/// <typeparam name="T">Type of the stored elements.</typeparam>
public interface IParallelList<T>
    : IParallelCollection<T>, ParallelIterable<T>, IEnumerable<T>, IEnumerable
{
    /// <summary>Gets or sets the element at the given position.</summary>
    /// <param name="index">Position of the element.</param>
    T this[int index] { get; set; }

    /// <summary>Looks up the position of an element.</summary>
    /// <param name="item">Element to locate.</param>
    int IndexOf(T item);

    /// <summary>Inserts an element at the given position.</summary>
    /// <param name="index">Position at which to insert.</param>
    /// <param name="item">Element to insert.</param>
    void Insert(int index, T item);

    /// <summary>Removes the element at the given position.</summary>
    /// <param name="index">Position of the element to remove.</param>
    void RemoveAt(int index);
}

/// <summary>
/// Key/value variant of <c>IParallelCollection</c>; the added members mirror those
/// of the generic IDictionary (Keys, Values, indexer, Add, ContainsKey, Remove,
/// TryGetValue).
/// </summary>
/// <typeparam name="TKey">Type of the keys.</typeparam>
/// <typeparam name="TValue">Type of the values.</typeparam>
public interface IParallelDictionary<TKey, TValue>
    : IParallelCollection<KeyValuePair<TKey, TValue>>,
      ParallelIterable<KeyValuePair<TKey, TValue>>,
      IEnumerable<KeyValuePair<TKey, TValue>>,
      IEnumerable
{
    // NOTE: when implementing Fixed, do not forget to also mark the contents of
    // Keys and Values as Fixed.
    // (The original note names ParallelIterator<T>, but Fixed is declared on
    // IParallelCollection<T> - presumably that is what was meant.)

    /// <summary>Collection of the keys.</summary>
    IParallelCollection<TKey> Keys { get; }

    /// <summary>Collection of the values.</summary>
    IParallelCollection<TValue> Values { get; }

    /// <summary>Gets or sets the value stored under a key.</summary>
    /// <param name="key">Key to look up or assign.</param>
    TValue this[TKey key] { get; set; }

    /// <summary>Adds a key/value pair.</summary>
    /// <param name="key">Key of the new entry.</param>
    /// <param name="value">Value of the new entry.</param>
    void Add(TKey key, TValue value);

    /// <summary>Tests whether a key is present.</summary>
    /// <param name="key">Key to look for.</param>
    bool ContainsKey(TKey key);

    /// <summary>Removes the entry stored under a key.</summary>
    /// <param name="key">Key of the entry to remove.</param>
    bool Remove(TKey key);

    /// <summary>Tries to read the value stored under a key.</summary>
    /// <param name="key">Key to look up.</param>
    /// <param name="value">Receives the value on success.</param>
    bool TryGetValue(TKey key, out TValue value);
}

/// <summary>
/// Map step: turns one input key/value into a sequence of intermediate key/value pairs.
/// </summary>
/// <typeparam name="TInKey">Type of the input key.</typeparam>
/// <typeparam name="TInValue">Type of the input value.</typeparam>
/// <typeparam name="TOutKey">Type of the key attached to each intermediate value.</typeparam>
/// <typeparam name="TIntermediateValue">Type of the intermediate values.</typeparam>
public interface IMapper<TInKey, TInValue, TOutKey, TIntermediateValue>
{
    /// <summary>
    /// Buffer size for the loop that consumes this mapper's output;
    /// MapReduceManager copies it into ParallelLoop.BufferSize.
    /// </summary>
    int BufferSize { get; set; }

    /// <summary>
    /// Flush timeout for the loop that consumes this mapper's output;
    /// MapReduceManager copies it into ParallelLoop.BufferFlushTimeout.
    /// </summary>
    TimeSpan BufferFlushTimeout { get; set; }

    /// <summary>Produces the intermediate pairs for one input.</summary>
    /// <param name="inkey">Input key.</param>
    /// <param name="inValue">Input value.</param>
    /// <returns>Sequence of intermediate key/value pairs.</returns>
    ParallelIterable<KeyValuePair<TOutKey, TIntermediateValue>> Map(TInKey inkey, TInValue inValue);
}

/// <summary>
/// Reduce step: turns the intermediate values collected under one key into output values.
/// </summary>
/// <typeparam name="TOutKey">Type of the key the intermediate values are grouped by.</typeparam>
/// <typeparam name="TIntermediateValue">Type of the intermediate values.</typeparam>
/// <typeparam name="TOutValue">Type of the produced output values.</typeparam>
public interface IReducer<TOutKey, TIntermediateValue, TOutValue>
{
    /// <summary>
    /// Buffer size for the loop that consumes this reducer's output;
    /// MapReduceManager copies it into ParallelLoop.BufferSize.
    /// </summary>
    int BufferSize { get; set; }

    /// <summary>
    /// Flush timeout for the loop that consumes this reducer's output;
    /// MapReduceManager copies it into ParallelLoop.BufferFlushTimeout.
    /// </summary>
    TimeSpan BufferFlushTimeout { get; set; }

    /// <summary>Produces the output values for one key.</summary>
    /// <param name="outKey">Key whose values are being reduced.</param>
    /// <param name="intermediateValues">Intermediate values collected for that key.</param>
    /// <returns>Sequence of output values.</returns>
    ParallelIterable<TOutValue> Reduce(TOutKey outKey, ParallelIterable<TIntermediateValue> intermediateValues);
}

/// <summary>
/// Handle to a task submitted through <c>ITaskQueue.AddTask</c> that can be waited on.
/// </summary>
public interface IOperation
{
    /// <summary>Waits without a timeout.</summary>
    void Wait();

    /// <summary>Waits with a timeout given as a time span.</summary>
    /// <param name="timeout">Maximum time to wait.</param>
    /// <returns>
    /// ParallelLoop treats a false result as "not finished yet" and re-queues the operation.
    /// </returns>
    bool Wait(TimeSpan timeout);

    /// <summary>Waits with a timeout given in milliseconds.</summary>
    /// <param name="millisecondsTimeout">Maximum time to wait, in milliseconds.</param>
    /// <returns>Presumably the same meaning as the TimeSpan overload - confirm.</returns>
    bool Wait(int millisecondsTimeout);
}

/// <summary>
/// Queue that accepts work items and returns a waitable handle for each.
/// </summary>
public interface ITaskQueue
{
    /// <summary>Submits a delegate to the queue.</summary>
    /// <param name="action">Delegate to run.</param>
    /// <param name="state">
    /// Optional arguments. NOTE(review): presumably passed to <paramref name="action"/>
    /// when it is invoked - confirm against the implementation.
    /// </param>
    /// <returns>Handle that can be used to wait on the submitted task.</returns>
    IOperation AddTask(Delegate action, params object[] state);
}

// TODO: implement IDisposable!
/// <summary>
/// Runs a pipelined MapReduce: intermediate values emitted by the mapper are grouped
/// by key in <see cref="Repository"/>, and a Reduce task is queued the first time a
/// key is seen, so reducing can start while mapping is still in progress.
/// </summary>
/// <typeparam name="TInKey">Type of the input key.</typeparam>
/// <typeparam name="TInValue">Type of the input value.</typeparam>
/// <typeparam name="TOutKey">Type of the key intermediate values are grouped by.</typeparam>
/// <typeparam name="TIntermediateValue">Type of the intermediate values.</typeparam>
/// <typeparam name="TOutValue">Type of the output values.</typeparam>
public sealed class MapReduceManager<TInKey, TInValue, TOutKey, TIntermediateValue, TOutValue>
{
    // Guards the "is this the first value for the key?" lookup-and-register step.
    // The map callback is executed by task-queue workers, so without this two values
    // carrying the same new key could both register a bucket and both queue a Reduce.
    private readonly object repositoryGate = new object();

    // Implementation-specific factories for the intermediate-value and output-value storage.
    public Func<IParallelCollection<TIntermediateValue>> NewIntermediateValueStorage { get; set; }
    public Func<IParallelCollection<TOutValue>> NewOutValueStorage { get; set; }

    // Implementation-specific Map / Reduce logic and the repository holding intermediate values.
    public IMapper<TInKey, TInValue, TOutKey, TIntermediateValue> Mapper { get; set; }
    public IReducer<TOutKey, TIntermediateValue, TOutValue> Reducer { get; set; }
    public IParallelDictionary<TOutKey, IParallelCollection<TIntermediateValue>> Repository { get; set; }

    // Task queue used for pipelined execution, and the factory for operation-handle collections.
    public ITaskQueue TaskQueue { get; set; }
    public Func<IParallelCollection<IOperation>> NewOperations { get; set; }

    /// <summary>
    /// Maps one input, reduces the grouped intermediate values, and returns the
    /// collection that receives the reducer output.
    /// </summary>
    /// <param name="inKey">Input key handed to the mapper.</param>
    /// <param name="inValue">Input value handed to the mapper.</param>
    /// <returns>
    /// The output storage created by <see cref="NewOutValueStorage"/>. It is returned
    /// before all Reduce tasks have necessarily finished; a queued task marks it
    /// Fixed once every Reduce operation has been waited on.
    /// </returns>
    public IEnumerable<TOutValue> DoProcess(TInKey inKey, TInValue inValue)
    {
        var outValues = this.NewOutValueStorage();
        var reducingOperations = this.NewOperations();

        // Map phase
        var mapperParallelLoop = this.NewMapperParallelLoop();
        mapperParallelLoop.Foreach(
            this.Mapper.Map(inKey, inValue),
            intermediate =>
            {
                bool isFirstValueForKey;
                IParallelCollection<TIntermediateValue> bucket;

                // Lookup and registration must be atomic with respect to the other
                // map callbacks of this manager (see repositoryGate).
                lock (this.repositoryGate)
                {
                    isFirstValueForKey = !this.Repository.TryGetValue(intermediate.Key, out bucket);
                    if (isFirstValueForKey)
                    {
                        bucket = this.NewIntermediateValueStorage();
                        bucket.Add(intermediate.Value);
                        this.Repository.Add(intermediate.Key, bucket);
                    }
                }

                if (!isFirstValueForKey)
                {
                    // Key already known: just append to its bucket.
                    bucket.Add(intermediate.Value);
                    return;
                }

                // Reduce phase: queued once per key, the first time the key shows up.
                reducingOperations.Add(
                    this.TaskQueue.AddTask(
                        new Action(
                            () =>
                            {
                                var reducerParallelLoop = this.NewReducerParallelLoop();

                                reducerParallelLoop.Foreach(
                                    this.Reducer.Reduce(intermediate.Key, this.Repository[intermediate.Key]),
                                    outValue =>
                                    {
                                        outValues.Add(outValue);
                                    });

                                reducerParallelLoop.WaitAll();
                            })));
            });

        // Once the Map phase has run to completion, allow enumeration of the output to finish.
        // NOTE(review): this task blocks on other queued tasks; on a bounded thread pool
        // that can starve the pool (the author notes this problem in a follow-up comment).
        this.TaskQueue.AddTask(
            new Action(
                () =>
                {
                    mapperParallelLoop.WaitAll();
                    this.Repository.Fixed = true;
                    reducingOperations.Fixed = true;
                    foreach (var operation in reducingOperations)
                    {
                        operation.Wait();
                    }
                    outValues.Fixed = true;
                }));

        return outValues;
    }

    /// <summary>
    /// Creates the loop that consumes the mapper output, configured with this
    /// manager's task queue and the mapper's buffer settings.
    /// </summary>
    private ParallelLoop<KeyValuePair<TOutKey, TIntermediateValue>> NewMapperParallelLoop()
    {
        return new ParallelLoop<KeyValuePair<TOutKey, TIntermediateValue>>
        {
            TaskQueue = this.TaskQueue,
            NewOperations = this.NewOperations,
            BufferSize = this.Mapper.BufferSize,
            BufferFlushTimeout = this.Mapper.BufferFlushTimeout
        };
    }

    /// <summary>
    /// Creates the loop that consumes the reducer output, configured with this
    /// manager's task queue and the reducer's buffer settings.
    /// </summary>
    private ParallelLoop<TOutValue> NewReducerParallelLoop()
    {
        return new ParallelLoop<TOutValue>
        {
            TaskQueue = this.TaskQueue,
            NewOperations = this.NewOperations,
            BufferSize = this.Reducer.BufferSize,
            BufferFlushTimeout = this.Reducer.BufferFlushTimeout
        };
    }
}

/// <summary>
/// Parallel "foreach": both pulling elements from the source and running the body
/// for each element are submitted to a task queue, so the two are pipelined.
/// </summary>
/// <typeparam name="T">Type of the elements being iterated.</typeparam>
public class ParallelLoop<T>
{
    /// <summary>Queue that receives the enumeration tasks and the per-element tasks.</summary>
    public ITaskQueue TaskQueue { get; set; }

    /// <summary>Factory for the collections that track queued operations.</summary>
    public Func<IParallelCollection<IOperation>> NewOperations { get; set; }

    /// <summary>
    /// Queued enumeration tasks are flushed once their count exceeds this value.
    /// Defaults to 1.
    /// </summary>
    public int BufferSize { get; set; }

    /// <summary>
    /// Maximum time to wait on a single operation during a flush. Defaults to one minute.
    /// </summary>
    public TimeSpan BufferFlushTimeout { get; set; }

    // Enumeration tasks (each one pulls a single element), the iterator used to wait
    // on them, and the per-element body tasks they spawn.
    private IParallelCollection<IOperation> operations;
    private ParallelIterator<IOperation> operationsIterator;
    private IParallelCollection<IOperation> innerOperations;

    public ParallelLoop()
    {
        this.BufferSize = 1;
        this.BufferFlushTimeout = new TimeSpan(TimeSpan.TicksPerMinute);
    }

    /// <summary>
    /// Queues tasks that pull elements from <paramref name="iterable"/> and, for each
    /// element obtained, queue <paramref name="action"/> on it. Returns once a pull
    /// has reported that the source is exhausted; the queued work may still be
    /// running, so call <see cref="WaitAll"/> to wait for it.
    /// </summary>
    /// <param name="iterable">Source of the elements.</param>
    /// <param name="action">Body to run for each element.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="iterable"/> or <paramref name="action"/> is null.
    /// </exception>
    public void Foreach(ParallelIterable<T> iterable, Action<T> action)
    {
        if (iterable == null)
        {
            throw new ArgumentNullException("iterable");
        }

        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        this.operations = this.NewOperations();
        this.operationsIterator = this.operations.GetIterator();
        this.innerOperations = this.NewOperations();

        var iterator = iterable.GetIterator();

        // Shared between this loop and the queued tasks. It is only ever cleared,
        // never set back to true, so one task's result cannot overwrite another's.
        // NOTE(review): written by worker tasks and read here without explicit
        // synchronization; visibility relies on the task queue implementation.
        bool hasNext = true;

        int current = 0;
        while (hasNext)
        {
            // Pipeline the enumeration and the processing of each enumerated value.
            this.operations.Add(
                this.TaskQueue.AddTask(
                    new Action(
                        () =>
                        {
                            // Branch on this task's own TryNext result. Reading the
                            // shared flag back could see another task's outcome and
                            // drop the value or run the body on default(T).
                            T value;
                            if (iterator.TryNext(out value))
                            {
                                this.innerOperations.Add(this.TaskQueue.AddTask(new Action(() => action(value))));
                            }
                            else
                            {
                                hasNext = false;
                            }
                        })));

            // Flush every BufferSize-th iteration so that unnecessary operations do not pile up.
            if (this.BufferSize < ++current)
            {
                this.FlushOperations();
                current = 0;
            }
        }

        this.operations.Fixed = true;
    }

    /// <summary>
    /// Waits for the enumeration tasks that have not been waited on yet, then for
    /// every per-element task.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// <see cref="Foreach"/> has not been called on this instance.
    /// </exception>
    public void WaitAll()
    {
        if (this.operationsIterator == null || this.innerOperations == null)
        {
            throw new InvalidOperationException("Foreach must be called before WaitAll.");
        }

        IOperation operation = null;
        while (this.operationsIterator.TryNext(out operation))
        {
            operation.Wait();
        }

        // No enumeration task is left running, so no more body tasks will be added.
        this.innerOperations.Fixed = true;
        var innerOperationsIterator = this.innerOperations.GetIterator();
        while (innerOperationsIterator.TryNext(out operation))
        {
            operation.Wait();
        }
    }

    // Waits (with timeout) on the enumeration tasks queued so far. An operation that
    // times out is appended again so a later flush or WaitAll still waits on it.
    private void FlushOperations()
    {
        this.operations.Fixed = true;

        IOperation operation = null;
        while (this.operationsIterator.TryNext(out operation))
        {
            if (!operation.Wait(this.BufferFlushTimeout))
            {
                // Re-open the collection just long enough to re-queue the operation.
                this.operations.Fixed = false;
                this.operations.Add(operation);
                this.operations.Fixed = true;

                break;
            }
        }

        // Re-open so the enumeration loop can keep adding operations.
        this.operations.Fixed = false;
    }
}

1 件のコメント:

  1. スレッドプール使った場合、他のタスクを同期して待つタスクをキューにいれてはまずかった。あぁ・・・。

    返信削除