Consumer/Producer queue

נתקלתי לאחרונה בבעיה הבאה:

יש לי ספריה שמשתמשת בספריית WebSockets נחמדה (בשם WebSocketListener של vtortola).

בעזרת הספריה של vtortola אני כותב הודעות Json לClientים.
הבעיה: הספריה שלי יכולה לכתוב הודעות מכמה Threadים לClient, אבל vtortola לא מרשה לכתוב לClient ביותר מThread אחד בו זמנית.

בעיה זו ניתן לפתור בעזרת טכנולוגיות שונות המממשות את הPattern של Consumer/Producer. למשל בעזרת TPL Dataflow הפתרון נראה כך:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class MyConnection : IDisposable
{
private readonly TcpClient mClient;
private readonly ActionBlock<string> mActionBlock;
public MyConnection(TcpClient client)
{
mActionBlock = new ActionBlock<string>(x => InnerSend(x));
mClient = client;
}
private async Task InnerSend(string json)
{
try
{
StreamWriter streamWriter = new StreamWriter(mClient.GetStream());
await streamWriter.WriteAsync(json)
.ConfigureAwait(false);
}
catch (Exception ex)
{
// Log ex
}
}
public void Dispose()
{
mActionBlock.Complete();
mActionBlock.Completion.Wait();
}
}

הסבר קצר: ActionBlock היא מחלקה המקבלת בConstructor פונקציה שמקבלת T ומחזירה Task. עבור כל אובייקט שנכנס לActionBlock, מורצת הפונקציה. הActionBlock דואג לכמה פעמים יכולה הפונקציה לרוץ במקביל: באופן דיפולטי יכול לרוץ בו-זמנית רק Task בודד. (ניתן לשנות זאת ע”י העברת פרמטרים בConstructor)

המשמעות היא שבכל פעם שנכנס אובייקט לActionBlock, הוא נכנס לתור. כשמגיע תורו, נוצר Task (ע”י הפונקציה). רק לאחר שTask זה מסתיים, תרוץ הפונקציה עבור האובייקט הבא בתור.
זה מבטיח לנו שThread אחד בלבד פורק את ההודעות בו-זמנית.

נתקלתי בבעיה, כיוון שTPL Dataflow לא נתמך בFramework 4.0 ובMono.

שאלתי בStackOverflow, ומסתבר שניתן לממש ActionBlock בסיסי בעזרת rx. למי שלא מכיר את rx, זהו Framework גאוני לתכנות Event driven המאפשר לא מעט יכולות (ביניהן גם LINQ), שמאפשר להשתמש בהרבה עקרונות מתמטיים, כגון קומפוזיציה ועוד. הרצאתי על rx לא אחת, אני ממליץ לקרוא על זה באינטרנט ולראות הרצאות בנושא. (יש מימושים לrx בשפות הבאות: C#, C++, Java, Ruby, Python, Clojure, JavaSciprt ועוד!)

אני מסביר את הפתרון כי אני חושב שהוא מעניין. (אם אתם יכולים להשתמש בTPL Dataflow זה כמובן עדיף!)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ActionBlock<T>
{
private readonly ISubject<T> mSubject = new Subject<T>();
private readonly Task mCompletion;
public ActionBlock(Func<T, Task> action)
{
mCompletion =
mSubject.Select(x => Observable.FromAsync(() => action(x)))
.Concat()
.Count()
.ToTask();
}
public Task Completion
{
get
{
return mCompletion;
}
}
public void Post(T item)
{
mSubject.OnNext(item);
}
public void Complete()
{
mSubject.OnCompleted();
}
}

מה שאנחנו רואים זה את הדבר הבא: יש לנו במחלקה Member מסוג ISubject<T>, זהו אובייקט שניתן להפיץ אליו אירועים מצד אחד, ולהרשם לאירועים שלו מצד שני. אנחנו מפיצים אליו אירועים בפונקציה Post (ע”י קריאה לפונקציה OnNext).

בConstructor אנחנו יוצרים Task שמייצג את הסיום של הBlock (הTask ששמו Completion מסתיים אחרי שקוראים לComplete והסתיימה ריצת הTaskים עבור כל איברי התור).

אני אסביר כעת את היצירה:
השורה

1
mSubject.Select(x => Observable.FromAsync(() => action(x)))

יוצרת IObservable<IObservable<Unit>>. הפונקציה Observable.FromAsync היא פונקציה שמקבלת [פונקציה המייצרת Task]. היא יוצרת Observable קר (זהו Observable שמתחיל לייצר אירועים רק לאחר שנרשמו אליו) המסתיים לאחר שהTask הנוצר מהפונקציה הפנימית מסתיים.

השורה

1
.Concat()

היא קריאה לפונקציה Concat. הפונקציה Concat משטחת את הIObservable<IObservable<Unit>> לIObservable<Unit> באופן הבא: היא נרשמת ראשית לIObserable הראשון שחוזר מהIObservable<IObservable<Unit>>. לאחר מכן, לאחר שהIObservable הראשון מסתיים, היא נרשמת לIObservable השני עד שהוא מסתיים וכו’. בסופו של דבר הIObservable השטוח מפיץ את כל התוצאות הנ”ל, ומסתיים כאשר הIObservable הפנימי האחרון מסתיים.

השורה

1
.ToTask();

היא קריאה למתודה ToTask. מתודה זו יוצרת Task מIObservable נתון. Task זה מסתיים כאשר הIObservable מסתיים וערכו הוא הערך האחרון של הIObservable. אלא שפונקציה זו מחזירה Task שהוא Faulted במידה והIObservable לא הפיץ אף ערך. כדי לתקן זאת, אנחנו קוראים לCount:

1
.Count();

Count זהו אופרטור המחזיר IObservable המסתיים כאשר הIObservable המקורי מסתיים, ומפיץ ערך בודד: מספר האיברים שהופצו ע”י הIObservable המקורי.

זהו מימוש נחמד. מה אם נרצה לקבוע את מספר הThreadים המטפלים במקביל למספר אחר? נוכל לעשות זאת ע”י אופרטור אחר ששמו Merge. אופרטור זה הוא אופרטור נוסף המשטח IObservable<IObservable> לIObservable בודד המפיץ את ערכים של כל הIObservableים הפנימיים. לאופרטור זה overload המקבל int ששמו maxConcurrent - זהו מספר המאפשר להחליט לכמה IObservableים פנימיים של הIObservable הגדול ניתן להרשם בו-זמנית.

לכן הקוד יכול להראות כך:

1
2
3
4
5
6
7
8
public ActionBlock(Func<T, Task> action, int maxConcurrent)
{
mCompletion =
mSubject.Select(x => Observable.FromAsync(() => action(x)))
.Merge(maxConcurrent)
.Count()
.ToTask();
}

הערה: מסתבר שמימוש זה סובל מבעיה של StackOverflowException כאשר יש מספר רב של איברים בתור. ראו כאן. מומלץ לא להשתמש במימוש זה.

סופ”ש בלי בעיות סנכרון טוב

שתף