2005年12月02日

首先说一下一些小变化:

  • 在执行 Casing 模块的start(), start_thread(), start_sync_thread()时,可以输入关键字参数,那么这些参数就可以在运行绑定函数时将同名的关键字参数值进行替换。这样的话,你可以把生成 Casing 对象时传入的参数认为是缺省参数,然后在运行时改变它们的值。注意是,如果有多个绑定函数存在相同的关键字参数(一定要关键字参数的写法,不然不知道参数的对应关系),那么会一起替换掉。
  • Casing 增加了一个 copy() 方法,它可以生成一个克隆的对象,这个对象与原始对象一样。
  • Casing 增加了isactive() 方法可以判断 Casing 对象是否还在运行。
  • 再一个大些的变化就是增加了 MultiCasing 类,它可以实现 Casing 对象集的管理。

下面针对 MultiCasing 来说明。

from Casing import *
import time
import threading

def test_A(n, syncvar):
    for i in range(1, n+1):
        if syncvar:
            print threading.currentThread().getName(), i
            time.sleep(.5)
        else:
            break
  
def on_process(syncvar):
    print ‘on_process…’

def on_finish():
    print ‘finished’
   
def on_abort():
    print ‘abort’
   
control = MultiCasing()
control.onprocess(on_process)
control.onfinish(on_finish)
control.onabort(on_abort)

for i in range(0, 100):
    d = Casing(test_A, n=5)
    control.append(d)
control.start_sync_thread()
time.sleep(30)
print ’stop’

control是一个 MultiCasing 对象,它也象 Casing 对象一样可以有 onprocess, onfinish(Casing中为onsuccess), onabort的回调函数句柄。向它增加一个 Casing 很简单,调用 append() 方法即可。MultiCasing 有两个可传的构造参数,size表示并发线程的个数,缺省为10。need_manual_stop表示是否当所有线程执行完毕后自动停止线程调度,缺省为False,即自动停止。如果自动停止的话,使用append只是增加待处理 Casing 对象,并不进行线程调度。而值为 True 时,一旦运行,当append时,Casing 对象即参加线程调度。

上面的例子演示了加入许多的 Casing 对象,由 MultiCasing 对象来进行调度。上面的例子当所有线程执行完毕后,control就停止了。再添加也无效。如果想生成一个一直有效的对象,需要在构造时指定need_manual_stop=True。这样你可以随时向里面添加 Casing 对象。需要时手工执行 MultiCasing 对象的 stop_thread() 方法即可。请注意,目前 MultiCasing 对象不能重入,即一旦执行了 stop_thread() 之后,再 start_* 结果可能有问题。因此对于再想运行的话,请重新创建 MultiCasing ,并重新加入 Casing 对象。

2005年11月30日

Casing 还提供了一个叫start_sync_thread()的方法,它是用来进行同步控制。记得吗?上一篇为了实现处理线程与状态更新线程之间的信息交换,我们通过创建一个可变对象( 实例,列表,字典都可以)作为共同的参数,这样就可以了。但还是需要你自已去创建,同时不能保证线程安全(new_obj()只是一个空对象,没有线程安全的保护)。这样,为了解决线程同步和修改状态信息时的线程安全,Casing提供了一个叫 SyncVar 的对象。你可以单独使用它,也可以在调用start_sync_thread() 时由 Casing 来替你生成。

SyncVar 对象可以进行如下操作:

var = SyncVar()
print bool(var)
var.set()
print var.isset()
var.set(‘hello’)
print var.get()
var.clear()
print var.isset()
print bool(var)
if not var:print ‘False’
var.set(‘hello’)
print var == ‘hello’
print var != ‘hello’

创建一个新的 SyncVar 对象,它的缺省状态是 False 。执行set()时,如果没有参数,则状态改为True。它可以传入任何值的参数。通过isset()来得到状态值。执行clear()可以将状态清除为False。可以直接对 SyncVar 对象本身求bool()值,可以进行等于,不等于的比较。执行get()与isset()是一样的,只是方便理解,都是返回状态值。

那么一旦你调用start_sync_thread()方法来执行的话,需要在加载的函数中增加"syncvar"参数。同时对于process函数也要增加这一参数。并且 start_sync_thread() 会将自动创建 SyncVar 对象同时将对象初始化为 True 的状态。同时在加载的函数和process函数中都应该对这个变量进行判断,一旦变为 False,则表示程序需要结束,那么这些函数都应该退出。同时 Casing 在函数切换时一旦发现 syncvar 的值为 False,则会停止后继的处理直接退出。同时Casing 还提供了 stop_thread()  的方法,用来将 syncvar 的值改为False,从而起到中断运行的效果。但这一些都基于这些约定。可能相对来说有些复杂。但如果你自已需要这样做,方法与此应该是相差不多。

from Casing import *
import time

def test_A(n, syncvar):
    for i in range(1, n+1):
        if syncvar:
            syncvar.set(i)
            print "=",i
            time.sleep(0.5)
        else:
            break
       
def process(syncvar):
    if syncvar:
        print ‘process…’, syncvar.get()
   
d = Casing(test_A, 10) + Casing(test_A, 20)
d.onprocess(process, timestep=2)
d.start_sync_thread()
time.sleep(20)
print ’stop’

这是一个使用了 SyncVar 约定的例子,需要注意:test_A和process都需要接收一个名为syncvar的变量,变量名不能错。但在定义Casing对象时并不需要传递这个参数,因为这是 Casing 自动为你生成的。

前面讲了这么多只是将函数串了起来,并没有什么,但却是最基础的功能。下面就要进入线程控制了。Casing 目前有三种启动方式:start(), start_thread(), start_sync_thread()。

start()是不带线程的方式。

start_thread()将启动线程,但在运行中不会对线程进行干预,用户可以自已想办法。

start_sync_thread() 将动线程,同时对调用的函数的参数有要求,它会传递一个同步对象给调用的方法,可以让方法使用这个同步对象,从而对线程的运行进行更精细的操作。

请注意,Casing 目前实现的线程控制是将多个函数串行,而不是并行(以后可以考虑实现多线程并行调度)。

例1:

from Casing import *
import time

def test_A(n):
    for i in range(1, n+1):
        print i
        time.sleep(.5)

d = Casing(test_A, n=10) + Casing(test_A, n=20)
d.start_thread()
time.sleep(20)
print ’stop’

这就是一个使用线程方式执行的例子。test_A会输入一系列的数字,每输出一个间隔0.5秒。d是一个Casing对象,它将两个test_A调用串起来。最后的time.sleep(10)是必须的,如果没有这一句,主线程一下子就执行完毕了,因此子线程将自动结束。而通过sleep()等待一会就可以让子线程运行完毕。

这时我们将其扩展一下,增加进度控制处理:

例2:

from Casing import *
import time

def test_A(n):
    for i in range(1, n+1):
        print i
        time.sleep(.5)
   
def process():
    print ‘process…’

d = Casing(test_A, n=10) + Casing(test_A, n=20)
d.onprocess(process, timestep=2)
d.start_thread()
time.sleep(20)
print ’stop’

可以看到上述的代码在一边打印数字的同时,一边显示’process…’字符串。但’process…’字符串的显示与打印是无关的,只是由 Casing 来进行调度的。特别是在挂接 onprocess 时,传入一个 timestep 的参数,它表示间隔多长就调用一次 process 的处理。但这个参数在真正调用 process() 时会被 Casing 去掉。这是需要注意的。

有时是需要这种处理的。比如你在下载,一边做下载,一边显示下载的进度。如果你不需要显示为百分比,那么只要不停地显示一种动画效果就好了。这样不用关心下载了多少之类的信息。因此完全可以象上面一样。如果转到 wxPython 下,那么可以在 process() 中进行界面的显示处理。此时可以使用 wx.CallAfter() 方面来刷新界面。要知道 process() 的处理是放在单独的一个线程中进行的。

有时情况更复杂。我就是需要实现一个进度百分比的显示,那么我必须要知道处理线程执行到什么程度了,然后再通知状态更新线程去显示这个程度,那我该怎么办呢?

1.你可以通过一个可变参数,在启动线程前创建好,然后分别传给处理线程和状态更新线程。这样只要处理线程改变了这个可变参数的值,那么状态更新线程自然就知道值是什么了。

2. 如果你不想自已创建这个可变参数,Casing提供了new_obj()可以创建一个空对象。

例3:

from Casing import *
import time

def test_A(n, var):
    for i in range(1, n+1):
        print i
        var.number = i
        time.sleep(.5)

def process(var):
    print ‘process…’, var.number
   
var = new_obj()

d = Casing(test_A, n=10, var=var) + Casing(test_A, n=20, var=var)
d.onprocess(process, timestep=2, var=var)
d.start_thread()
time.sleep(20)
print ’stop’

这时在执行结果中可以看到process…的后面有一个数字了。这个数字就是test_A()函数修改的。注意上面的参数都使用了字典关键字,象n=10, var=var。python不允许前面使用字典参数,后面却不用。

上一篇只是讲了 Casing 将无参数的方法串起来调用,下面就是将有参数的方法串起来调用。举例为:

import Casing

def test_A(a):
    print a
   
def test_B(a, b):
    print a, b
   
def test_C(a, b, c):
    print a, b, c
   
d = Casing.Casing(test_A, 1) + Casing.Casing(test_B, ‘a’, b=2) + Casing.Casing(test_C, 1, b=2, c=’c')
d.start()

不用太多的解释。可以使用tuple参数,也可以使用字典参数,只要对应上就好。执行结果为:

1
a 2
1 2 c

象前面的问题,如果调用的函数之间有关联怎么办呢?那么可以使用可变参数。为了方便 Casing 提供了一个 new_obj() 的方法,它其实就是创建一个空对象。你可以在上面添加属性,然后用它作为参数来传递。这个就不再演示了。大家自已去试吧。

那么在处理过程中可能会发生一些事件,比如说:抛出异常,执行完毕,这些我如何知道呢?

Casing 允许你注册一些特殊事件的回调函数来进行处理。如:

  • onsuccess()
    当所有函数执行完毕后将调用
  • onexception()
    当出现异常时将调用,如果没有定义则向标准输出打印异常信息
  • onabort()
    当某一函数抛出特殊的AbortException时调时,调用onabort回调函数并退出
  • onprocess()
    用在线程执行时处理进度状态

举例为:

def test_E():
    raise Casing.AbortException

def on_abort():
    print ‘abort…’

d = Casing.Casing(test_A, 1) + Casing.Casing(test_B, ‘a’, b=2) + Casing.Casing(test_E)
d += Casing.Casing(test_C, 1, b=2, c=’c')
d.onabort(on_abort)
d.start()

输出结果为:

1
a 2
abort…

首先演示一下 Casing 将函数串起来执行的能力,同时演示了如何创建 Casing 对象和向这个对象增加新的方法。首先看一段代码:

import Casing

def test_A():
    print ‘A’
   
def test_B():
    print ‘B’
   
def test_C():
    print ‘C’
   
d = Casing.Casing(test_A) + Casing.Casing(test_B) + Casing.Casing(test_C)
d.start()

上面很简单。有三个函数,分别打印出’A', ‘B’, ‘C’。然后通过Casing将它们分别包装起来,创建了三个 Casing 对象。然后将这个三个对象加起来生成最终的一个 Casing 对象。然后调用 Casing 对象的 start() 方法去执行。可以看到结果为:

A
B
C

那么 Casing 的生成还可以在创建了第一个对象后使用 += 的方法加追加函数,如:

d = Casing.Casing(test_A)
d += Casing.Casing(test_B)
d += Casing.Casing(test_C)
d.start()

执行效果同上面。

还可以通过 Casing 的 append 方法来追加函数,如:

d = Casing.Casing(test_A)
d.append(test_B)
d.append(test_C)
d.start()

更新的 Casing 版本还可以在允许产生一个空的 Casing  对象,如:

d = Casing.Casing()
d.append(test_A)
d.append(test_B)
d.append(test_C)
d.start()

刚开始时 d 中没有添加任何函数。

通过上面的执行可以看到 Casing 中的函数是按照添加的顺序来执行的。要注意 Casing 在调用其中的函数时不会将上一个函数的返回值传给下一个函数。因此如果想在函数间传递参数的话,可以使用可变参数来传递。这一点我们在后面会讲到。

Casing 是我写的一个模块,它将用于 NewEdit 中,主要的用途是为了实现多线程处理。那么在开发这一模块时我面临的需求是:

  • 一个完整的处理是由几个分离的步骤完成,我需要按顺序依次对它们进行调用
  • 我需要将这个完整处理放在一个线程中运行
  • 在运行同时我需要有一个进度处理的线程表示它正在工作
  • 处理线程结束时需要通知进度处理线程退出,即需要一定的同步功能

wxPython 做线程处理我已经摸索了一段时间了。说复杂也复杂,说简单也简单。从一般的编程思维方式转到线程编程可能有些不适应。象我们习惯在一段程序中加上一些信息的提示,表示过程的执行状态。如果只是向stdout输出,没有什么问题。但如果在 GUI 中更新GUI,在线程中可能就会有问题,可能造成系统挂起。GUI执行的线程一般称为主线程,其它线程为子线程。为了在子线程中更新GUI界面,需要转到主线程中去执行。这样,在 wxPython 中,或者使用它提供的方便的 wx.CallAfter() 方法,或者自已定义一个事件处理,然后通过发送自定义消息的方法来转到主线程去执行界面更新的操作。这些在我以前的 Blog 中也讲过了。

那么线程知道该如何办了,是不是就容易处理了呢?还不是。如果协调子线程与主线程,这是很麻烦的事。可以在需要更新界面时使用 wx.CallAfter() 来方便处理,但就是最基本的。创建子线程这也是一块工作。一般可以从 threading.Thread 派生自已的线程类。如果再有处理进度的功能,那么如果去通知它的状态也是一个问题。状态显示,一种是由子线程主动发出,一种可以是创建另一个并行的子线程专门用来显示状态,它只是显示运行中的状态,然后与处理子线程之间进行同步,当处理子线程执行完毕,需要通知状态更新线程结束。再有就是由处理子线程改变状态,由状态更新线程来反映出来。总之方式很多。那么这些也都需要编码。再有,一个处理分为几个函数调用。当然可以写成一个通一的调用函数来处理。但有没有简单的方法可以不写这个统一的调用方法而是实现函数调用的组合呢?这些想法就构成了我上面的需求。

于是我创建了这个名为 Casing 的模块,它由几个线程类和调度类组成。原来不叫这个名字,叫Deffer,但查字典没这个单词。后来才知道是 Defer ,原来是我拼写错误。Defer 在 Twisted中有,表示延迟执行。在我新学的 MochiKit 中也有。但后来想我写的这个模块并不是为了延迟执行,而是为了将多个函数进行组合与封装,创建成线程方式,因此这个名字并不合适。于是查了查字典就叫了 Casing 了。

关于它的功能和使用,我会不断地写出来。如果有兴趣可以下载使用。我也不知道有没有类似的其它 Python 模块具有它的功能。

代码已经放在 NewEdit 中去了。modules/Casing.py 就是。你可以从 svn 中更新。如果不想下载或更新 NewEdit ,那么可以从这里下载

这个模块刚开发出来,不能保证不做修改。