2009年9月19日土曜日

MapReduce を C# で 01

インターフェースはこんな感じでいいのかな。


/// <summary>
/// 次の値が取得可能であれば取得するための列挙子。
/// </summary>
/// <remarks>IEnumerator は生成後のソース変更を許可しないため、このようなインターフェースを用意。</remarks>
/// <typeparam name="T"></typeparam>
public interface ParallelIterator<T>
: IEnumerator<T>, IDisposable, IEnumerator
{
bool TryNext(out T result);
new void Reset();
}

public interface ParallelIterable<T>
: IEnumerable<T>, IEnumerable
{
ParallelIterator<T> GetIterator();
}

public interface IParallelCollection<T>
: ParallelIterable<T>, IEnumerable<T>, IEnumerable
{
int Count { get; }
bool IsReadOnly { get; }
void Add(T item);
void Clear();
bool Contains(T item);
void CopyTo(T[] array, int arrayIndex);
bool Remove(T item);
}

public interface IParallelList<T>
: IParallelCollection<T>, ParallelIterable<T>, IEnumerable<T>, IEnumerable
{
T this[int index] { get; set; }
int IndexOf(T item);
void Insert(int index, T item);
void RemoveAt(int index);
}

public interface IParallelDictionary<TKey, TValue>
: IParallelCollection<KeyValuePair<TKey, TValue>>, ParallelIterable<KeyValuePair<TKey, TValue>>,
IEnumerable<KeyValuePair<TKey, TValue>>, IEnumerable
{
ICollection<TKey> Keys { get; }
ICollection<TValue> Values { get; }
TValue this[TKey key] { get; set; }
void Add(TKey key, TValue value);
bool ContainsKey(TKey key);
bool Remove(TKey key);
bool TryGetValue(TKey key, out TValue value);
}

/// <summary>
/// 列挙終了を外部から通知可能なコレクションのインターフェース。
/// </summary>
/// <remarks>
/// </remarks>
/// <typeparam name="T"></typeparam>
public interface IParallelEventCollection<T>
: IParallelCollection<T>, ParallelIterable<T>, IEnumerable<T>, IEnumerable
{
void EndIterationAsync();
bool EndPending { get; set; }
}

public interface IMapper<TInKey, TInValue, TOutKey, TIntermediateValue>
{
ParallelIterable<KeyValuePair<TOutKey, TIntermediateValue>> Map(TInKey inkey, TInValue inValue);
}

public interface IReducer<TOutKey, TIntermediateValue, TOutValue>
{
ParallelIterable<TOutValue> Reduce(TOutKey outKey, ParallelIterable<TIntermediateValue> intermediateValues);
}


public sealed class MapReduceManager<TInKey, TInValue, TOutKey, TIntermediateValue, TOutValue>
{
/// <summary>
/// 途中値を入れるストレージを生成する処理を取得・設定する。
/// </summary>
public Func<IParallelEventCollection<TIntermediateValue>> NewIntermediateValueStorage { get; set; }

/// <summary>
/// ストレージへの参照を管理するリポジトリを取得・設定する。
/// </summary>
/// <remarks>
/// 実際のストレージは IEventCollection&lt;TIntermediateValue&gt; が管理するため、
/// 特にメモリ上に展開しても問題ないと思われる。ストレージへの参照をポインタで持つか、ファイルパスで持つか、
/// はたまた URL で持つかは実装による。
/// </remarks>
public IParallelDictionary<TOutKey, IParallelEventCollection<TIntermediateValue>> Repository { get; set; }

/// <summary>
/// Map 処理を実装したオブジェクトを取得・設定する。
/// </summary>
public IMapper<TInKey, TInValue, TOutKey, TIntermediateValue> Mapper { get; set; }

/// <summary>
/// Reduce 処理を実装したオブジェクトを取得・設定する。
/// </summary>
public IReducer<TOutKey, TIntermediateValue, TOutValue> Reducer { get; set; }

/// <summary>
/// 非同期処理を取得・設定する。
/// </summary>
public Action<Action> AsyncInvoke { get; set; }

/// <summary>
/// 出力値を入れるストレージを生成する。
/// </summary>
public Func<IParallelEventCollection<TOutValue>> NewOutValueStorage { get; set; }


/// <summary>
/// 処理の実行。
/// </summary>
/// <param name="inKey">Map 処理に渡す In-Key。</param>
/// <param name="inValue">Map 処理に渡す In-Value 。</param>
/// <returns>Reduce 処理の出力結果をまとめたもの。</returns>
public IEnumerable<TOutValue> DoProcess(TInKey inKey, TInValue inValue)
{
var outValues = this.NewOutValueStorage();
foreach (var intermediate in this.Mapper.Map(inKey, inValue))
{
if (!this.Repository.ContainsKey(intermediate.Key))
{
this.AsyncInvoke(
new Action(
() =>
{
var intermediateValues = this.NewIntermediateValueStorage();
intermediateValues.Add(intermediate.Value);
this.Repository.Add(intermediate.Key, intermediateValues);
}
)
);
}
else
{
this.AsyncInvoke(
new Action(
() =>
{
foreach (var outValue in this.Reducer.Reduce(intermediate.Key,
this.Repository[intermediate.Key]))
{
this.AsyncInvoke(new Action(() => outValues.Add(outValue)));
}
}
)
);
}
}

return outValues;
}
}

3 件のコメント:

  1. あ、intermediateValues.EndIterationAsync() を Mapper の foreachが終わったら呼ばないといけないか・・・。

    返信削除
  2. Reducer 立ち上げるのは 1 回でいいか・・・!しまったそれ以前に this.Repository[intermediate.Key] に誰も intermediate.Valu 追加してない・・・orz

    返信削除
  3. this.Repository.Values も Mapper の foreach が終わった時点で EndIterationAsync() せなあかん。

    返信削除