C#高级编程五十八天
对于并行任务,与其相关紧密的就是对一些共享资源,数据结构的并行訪问.常常要做的就是对一些队列进行加锁-解锁,然后运行类似插入,删除等等相互排斥操作. .NET4提供了一些封装好的支持并行操作数据容器,能够降低并行编程的复杂程度.
并行集合的命名空间:System.Collections.Concurrent
并行容器:
ConcurrentQueue
ConcurrentStack
ConcurrentBag: 一个无序的数据结构集,当不考虑顺序时很实用.
BlockingCollection:与经典的堵塞队列数据结构类似
ConcurrentDictoinary
以上这些集合在某种程度上使用了无锁技术(CAS和内存屏蔽),与加相互排斥锁相比获得了性能的提升.可是在串行程序中,最好不要使用这些集合,他们必定会影响性能.
ConcurrentQueue使用方法与实例
其全然无锁,但当CAS面临资源竞争失败时可能会陷入自旋并重试操作.
Enqueue:在队尾插入元素
TryDequeue:尝试删除对头元素,并通过out參数返回
TryPeek:尝试将对头元素通过out參数返回,但不删除元素
案例:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Concurrent;
namespace 并行结合Queue
{
class Program
{
internal static ConcurrentQueue<int> _TestQueue;
class ThreadWork1 //生产者
{
public ThreadWork1()
{ }
public void run()
{
Console.WriteLine("ThreadWork1 run { ");
for (int i = 0; i < 100; i++)
{
Console.WriteLine("ThreadWork1 producer: "+i);
_TestQueue.Enqueue(i);
}
Console.WriteLine("ThreadWork1 run } ");
}
}
class ThreadWork2 //consumer
{
public ThreadWork2()
{ }
public void run()
{
int i = 0;
bool IsDequeue = false;
Console.WriteLine("ThreadWork2 run { ");
for (; ; )
{
IsDequeue = _TestQueue.TryDequeue(out i);
if (IsDequeue)
{
System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " =====");
}
if (i==99)
{
break;
}
}
Console.WriteLine("ThreadWork2 run } ");
}
}
static void StartT1()
{
ThreadWork1 work1 = new ThreadWork1();
work1.run();
}
static void StartT2()
{
ThreadWork2 work2 = new ThreadWork2();
work2.run();
}
static void Main(string[] args)
{
Task t1 = new Task(() => StartT1());
Task t2 = new Task(() => StartT2());
_TestQueue = new ConcurrentQueue<int>();
Console.WriteLine("Sample 3-1 Main {");
Console.WriteLine("Main t1 t2 started {");
t1.Start();
t2.Start();
Console.WriteLine("Main t1 t2 started }");
Console.WriteLine("Main wait t1 t2 end {");
Task.WaitAll(t1, t2);
Console.WriteLine("Main wait t1 t2 end }");
Console.WriteLine("Sample 3-1 Main }");
Console.ReadKey();
}
}
}
ConcurrentStact
其全然无锁,但当CAS面临资源竞争失败时可能会陷入自旋并重试操作.
Push:向栈顶插入元素
TryPop:从栈顶弹出元素,而且通过out參数返回
TryPeek:返回栈顶元素,但不弹出
案例:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Concurrent;
namespace 并行结合Queue
{
class Program
{
internal static ConcurrentStack<int> _TestStack;
class ThreadWork1 //生产者
{
public ThreadWork1()
{ }
public void run()
{
Console.WriteLine("ThreadWork1 run { ");
for (int i = 0; i < 100; i++)
{
Console.WriteLine("ThreadWork1 producer: "+i);
_TestStack.Push(i);
}
Console.WriteLine("ThreadWork1 run } ");
}
}
class ThreadWork2 //consumer
{
public ThreadWork2()
{ }
public void run()
{
int i = 0;
bool IsDequeue = false;
Console.WriteLine("ThreadWork2 run { ");
for (; ; )
{
IsDequeue = _TestStack.TryPop(out i);
if (IsDequeue)
{
System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " =====" + i);
}
if (i==99)
{
break;
}
}
Console.WriteLine("ThreadWork2 run } ");
}
}
static void StartT1()
{
ThreadWork1 work1 = new ThreadWork1();
work1.run();
}
static void StartT2()
{
ThreadWork2 work2 = new ThreadWork2();
work2.run();
}
static void Main(string[] args)
{
Task t1 = new Task(() => StartT1());
Task t2 = new Task(() => StartT2());
_TestStack = new ConcurrentStack<int>();
Console.WriteLine("Sample 4-1 Main {");
Console.WriteLine("Main t1 t2 started {");
t1.Start();
t2.Start();
Console.WriteLine("Main t1 t2 started }");
Console.WriteLine("Main wait t1 t2 end {");
Task.WaitAll(t1, t2);
Console.WriteLine("Main wait t1 t2 end }");
Console.WriteLine("Sample 4-1 Main }");
Console.ReadKey();
}
}
}
ConcurrentBag
一个无序的集合,程序能够向当中插入元素,或删除元素.
在同一个线程中向集合插入,删除元素效率非常高.
Add:向集合中插入元素
TryTake:从集合中取出元素并删除
TryPeek:从集合中取出元素,但不删除元素
案例:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace 并行集合ConcurrentBag
{
class Program
{
internal static ConcurrentBag<int> _TestBag;
class ThreadWork1 //producer
{
public ThreadWork1()
{ }
public void run()
{
Console.WriteLine("Threadwork1 run { ");
for (int i = 0; i < 100; i++)
{
Console.WriteLine("ThreadWork1 producer: "+i);
_TestBag.Add(i);
}
Console.WriteLine("ThreadWork1 run } ");
}
}
class ThreadWork2//consumer
{
public ThreadWork2()
{ }
public void run()
{
bool IsDequeue = false;
Console.WriteLine("ThreadWork2 run { ");
for (int i = 0; i < 100; i++)
{
IsDequeue = _TestBag.TryTake(out i);
if (IsDequeue)
{
Console.WriteLine("ThreadWork2 consumer: " + i * i + "======" + i);
}
}
Console.WriteLine("ThreadWork2 run } ");
}
}
static void Start1()
{
ThreadWork1 work1 = new ThreadWork1();
work1.run();
}
static void Start2()
{
ThreadWork2 work2 = new ThreadWork2();
work2.run();
}
static void Main(string[] args)
{
Task t1 = new Task(() => Start1());
Task t2 = new Task(() => Start2());
_TestBag = new ConcurrentBag<int>();
t1.Start();
t2.Start();
Console.WriteLine("Main t1 t2 started }");
Console.WriteLine("Main wait t1 t2 end {");
Task.WaitAll(t1, t2);
Console.WriteLine("Main wait t1 t2 end }");
Console.WriteLine("Sample 4-3 Main }");
Console.ReadKey();
}
}
}
BlockingCollection
一个支持界限和堵塞的容器
Add:向容器中插入元素
TryTake:从容器中取出元素并删除
TryPeek:从容器中取出元素,但不删除
CompleteAdding:告诉容器,加入元素完毕.此时假设还想继续加入会发生异常.
IsCompleted:告诉消费者线程,产生者线程还在继续执行中,任务还未完毕.
案例:
温馨提示: 本文由Jm博客推荐,转载请保留链接: https://www.jmwww.net/file/40526.html