Logo Search packages:      
Sourcecode: ajaxterm version File versions  Download package

qweb.py

#!/usr/bin/python2.3
#
# vim:set et ts=4 fdc=0 fdn=2 fdl=0:
#
# There are no blank lines between blocks beacause i use folding from:
# http://www.vim.org/scripts/script.php?script_id=515
#

"""= QWeb Framework =

== What is QWeb ? ==

QWeb is a python based [http://www.python.org/doc/peps/pep-0333/ WSGI]
compatible web framework, it provides an infratructure to quickly build web
applications consisting of:

 * A lightweight request handler (QWebRequest)
 * An xml templating engine (QWebXml and QWebHtml)
 * A simple name based controler (qweb_control)
 * A standalone WSGI Server (QWebWSGIServer)
 * A cgi and fastcgi WSGI wrapper (taken from flup)
 * A startup function that starts cgi, factgi or standalone according to the
   evironement (qweb_autorun).

QWeb applications are runnable in standalone mode (from commandline), via
FastCGI, Regular CGI or by any python WSGI compliant server.

QWeb doesn't provide any database access but it integrates nicely with ORMs
such as SQLObject, SQLAlchemy or plain DB-API.

Written by Antony Lesuisse (email al AT udev.org)

Homepage: http://antony.lesuisse.org/qweb/trac/

Forum: [http://antony.lesuisse.org/qweb/forum/viewforum.php?id=1 Forum]

== Quick Start (for Linux, MacOS X and cygwin) ==

Make sure you have at least python 2.3 installed and run the following commands:

{{{
$ wget http://antony.lesuisse.org/qweb/files/QWeb-0.7.tar.gz
$ tar zxvf QWeb-0.7.tar.gz
$ cd QWeb-0.7/examples/blog
$ ./blog.py
}}}

And point your browser to http://localhost:8080/

You may also try AjaxTerm which uses qweb request handler.

== Download ==

 * Version 0.7:
   * Source [/qweb/files/QWeb-0.7.tar.gz QWeb-0.7.tar.gz]
   * Python 2.3 Egg [/qweb/files/QWeb-0.7-py2.3.egg QWeb-0.7-py2.3.egg]
   * Python 2.4 Egg [/qweb/files/QWeb-0.7-py2.4.egg QWeb-0.7-py2.4.egg]

 * [/qweb/trac/browser Browse the source repository]

== Documentation ==

 * [/qweb/trac/browser/trunk/README.txt?format=raw Read the included documentation] 
 * QwebTemplating

== Mailin-list ==

 * Forum: [http://antony.lesuisse.org/qweb/forum/viewforum.php?id=1 Forum]
 * No mailing-list exists yet, discussion should happen on: [http://mail.python.org/mailman/listinfo/web-sig web-sig] [http://mail.python.org/pipermail/web-sig/ archives]

QWeb Components:
----------------

QWeb also feature a simple components api, that enables developers to easily
produces reusable components.

Default qweb components:

    - qweb_static:
        A qweb component to serve static content from the filesystem or from
        zipfiles.

    - qweb_dbadmin:
        scaffolding for sqlobject

License
-------
qweb/fcgi.py wich is BSD-like from saddi.com.
Everything else is put in the public domain.


TODO
----
    Announce QWeb to python-announce-list@python.org web-sig@python.org
    qweb_core
        rename request methods into
            request_save_files
            response_404
            response_redirect
            response_download
        request callback_generator, callback_function ?
        wsgi callback_server_local
        xml tags explicitly call render_attributes(t_att)?
        priority form-checkbox over t-value (for t-option)

"""

import BaseHTTPServer,SocketServer,Cookie
import cgi,datetime,email,email.Message,errno,gzip,os,random,re,socket,sys,tempfile,time,types,urllib,urlparse,xml.dom
try:
    import cPickle as pickle
except ImportError:
    import pickle
try:
    import cStringIO as StringIO
except ImportError:
    import StringIO

#----------------------------------------------------------
# Qweb Xml t-raw t-esc t-if t-foreach t-set t-call t-trim
#----------------------------------------------------------
class QWebEval:
    def __init__(self,data):
        self.data=data
    def __getitem__(self,expr):
        if self.data.has_key(expr):
            return self.data[expr]
        r=None
        try:
            r=eval(expr,self.data)
        except NameError,e:
            pass
        except AttributeError,e:
            pass
        except Exception,e:
            print "qweb: expression error '%s' "%expr,e
        if self.data.has_key("__builtins__"):
            del self.data["__builtins__"]
        return r
    def eval_object(self,expr):
        return self[expr]
    def eval_str(self,expr):
        if expr=="0":
            return self.data[0]
        if isinstance(self[expr],unicode):
            return self[expr].encode("utf8")
        return str(self[expr])
    def eval_format(self,expr):
        try:
            return str(expr%self)
        except:
            return "qweb: format error '%s' "%expr
#       if isinstance(r,unicode):
#           return r.encode("utf8")
    def eval_bool(self,expr):
        if self.eval_object(expr):
            return 1
        else:
            return 0
00160 class QWebXml:
    """QWeb Xml templating engine
    
    The templating engine use a very simple syntax, "magic" xml attributes, to
    produce any kind of texutal output (even non-xml).
    
    QWebXml:
        the template engine core implements the basic magic attributes:
    
        t-att t-raw t-esc t-if t-foreach t-set t-call t-trim
    
    """
    def __init__(self,x=None,zipname=None):
        self.node=xml.dom.Node
        self._t={}
        self._render_tag={}
        prefix='render_tag_'
        for i in [j for j in dir(self) if j.startswith(prefix)]:
            name=i[len(prefix):].replace('_','-')
            self._render_tag[name]=getattr(self.__class__,i)

        self._render_att={}
        prefix='render_att_'
        for i in [j for j in dir(self) if j.startswith(prefix)]:
            name=i[len(prefix):].replace('_','-')
            self._render_att[name]=getattr(self.__class__,i)

        if x!=None:
            if zipname!=None:
                import zipfile
                zf=zipfile.ZipFile(zipname, 'r')
                self.add_template(zf.read(x))
            else:
                self.add_template(x)
    def register_tag(self,tag,func):
        self._render_tag[tag]=func
    def add_template(self,x):
        if hasattr(x,'documentElement'):
            dom=x
        elif x.startswith("<?xml"):
            import xml.dom.minidom
            dom=xml.dom.minidom.parseString(x)
        else:
            import xml.dom.minidom
            dom=xml.dom.minidom.parse(x)
        for n in dom.documentElement.childNodes:
            if n.nodeName=="t":
                self._t[str(n.getAttribute("t-name"))]=n
    def get_template(self,name):
        return self._t[name]

    def eval_object(self,expr,v):
        return QWebEval(v).eval_object(expr)
    def eval_str(self,expr,v):
        return QWebEval(v).eval_str(expr)
    def eval_format(self,expr,v):
        return QWebEval(v).eval_format(expr)
    def eval_bool(self,expr,v):
        return QWebEval(v).eval_bool(expr)

    def render(self,tname,v={},out=None):
        if self._t.has_key(tname):
            return self.render_node(self._t[tname],v)
        else:
            return 'qweb: template "%s" not found'%tname
    def render_node(self,e,v):
        r=""
        if e.nodeType==self.node.TEXT_NODE or e.nodeType==self.node.CDATA_SECTION_NODE:
            r=e.data.encode("utf8")
        elif e.nodeType==self.node.ELEMENT_NODE:
            pre=""
            g_att=""
            t_render=None
            t_att={}
            for (an,av) in e.attributes.items():
                an=str(an)
                if isinstance(av,types.UnicodeType):
                    av=av.encode("utf8")
                else:
                    av=av.nodeValue.encode("utf8")
                if an.startswith("t-"):
                    for i in self._render_att:
                        if an[2:].startswith(i):
                            g_att+=self._render_att[i](self,e,an,av,v)
                            break
                    else:
                        if self._render_tag.has_key(an[2:]):
                            t_render=an[2:]
                        t_att[an[2:]]=av
                else:
                    g_att+=' %s="%s"'%(an,cgi.escape(av,1));
            if t_render:
                if self._render_tag.has_key(t_render):
                    r=self._render_tag[t_render](self,e,t_att,g_att,v)
            else:
                r=self.render_element(e,g_att,v,pre,t_att.get("trim",0))
        return r
    def render_element(self,e,g_att,v,pre="",trim=0):
        g_inner=[]
        for n in e.childNodes:
            g_inner.append(self.render_node(n,v))
        name=str(e.nodeName)
        inner="".join(g_inner)
        if trim==0:
            pass
        elif trim=='left':
            inner=inner.lstrip()
        elif trim=='right':
            inner=inner.rstrip()
        elif trim=='both':
            inner=inner.strip()
        if name=="t":
            return inner
        elif len(inner):
            return "<%s%s>%s%s</%s>"%(name,g_att,pre,inner,name)
        else:
            return "<%s%s/>"%(name,g_att)

    # Attributes
    def render_att_att(self,e,an,av,v):
        if an.startswith("t-attf-"):
            att,val=an[7:],self.eval_format(av,v)
        elif an.startswith("t-att-"):
            att,val=(an[6:],self.eval_str(av,v))
        else:
            att,val=self.eval_object(av,v)
        return ' %s="%s"'%(att,cgi.escape(val,1))

    # Tags
    def render_tag_raw(self,e,t_att,g_att,v):
        return self.eval_str(t_att["raw"],v)
    def render_tag_rawf(self,e,t_att,g_att,v):
        return self.eval_format(t_att["rawf"],v)
    def render_tag_esc(self,e,t_att,g_att,v):
        return cgi.escape(self.eval_str(t_att["esc"],v))
    def render_tag_escf(self,e,t_att,g_att,v):
        return cgi.escape(self.eval_format(t_att["escf"],v))
    def render_tag_foreach(self,e,t_att,g_att,v):
        expr=t_att["foreach"]
        enum=self.eval_object(expr,v)
        if enum!=None:
            var=t_att.get('as',expr).replace('.','_')
            d=v.copy()
            size=-1
            if isinstance(enum,types.ListType):
                size=len(enum)
            elif isinstance(enum,types.TupleType):
                size=len(enum)
            elif hasattr(enum,'count'):
                size=enum.count()
            d["%s_size"%var]=size
            d["%s_all"%var]=enum
            index=0
            ru=[]
            for i in enum:
                d["%s_value"%var]=i
                d["%s_index"%var]=index
                d["%s_first"%var]=index==0
                d["%s_even"%var]=index%2
                d["%s_odd"%var]=(index+1)%2
                d["%s_last"%var]=index+1==size
                if index%2:
                    d["%s_parity"%var]='odd'
                else:
                    d["%s_parity"%var]='even'
                if isinstance(i,types.DictType):
                    d.update(i)
                else:
                    d[var]=i
                ru.append(self.render_element(e,g_att,d))
                index+=1
            return "".join(ru)
        else:
            return "qweb: t-foreach %s not found."%expr
    def render_tag_if(self,e,t_att,g_att,v):
        if self.eval_bool(t_att["if"],v):
            return self.render_element(e,g_att,v)
        else:
            return ""
    def render_tag_call(self,e,t_att,g_att,v):
        # TODO t-prefix
        if t_att.has_key("import"):
            d=v
        else:
            d=v.copy()
        d[0]=self.render_element(e,g_att,d)
        return self.render(t_att["call"],d)
    def render_tag_set(self,e,t_att,g_att,v):
        if t_att.has_key("eval"):
            v[t_att["set"]]=self.eval_object(t_att["eval"],v)
        else:
            v[t_att["set"]]=self.render_element(e,g_att,v)
        return ""

#----------------------------------------------------------
# QWeb HTML (+deprecated QWebFORM and QWebOLD)
#----------------------------------------------------------
00357 class QWebURL:
    """ URL helper
    assert req.PATH_INFO== "/site/admin/page_edit"
    u = QWebURL(root_path="/site/",req_path=req.PATH_INFO)
    s=u.url2_href("user/login",{'a':'1'})
    assert s=="../user/login?a=1"
    
    """
    def __init__(self, root_path="/", req_path="/",defpath="",defparam={}):
        self.defpath=defpath
        self.defparam=defparam
        self.root_path=root_path
        self.req_path=req_path
        self.req_list=req_path.split("/")[:-1]
        self.req_len=len(self.req_list)
    def decode(self,s):
        h={}
        for k,v in cgi.parse_qsl(s,1):
            h[k]=v
        return h
    def encode(self,h):
        return urllib.urlencode(h.items())
    def request(self,req):
        return req.REQUEST
    def copy(self,path=None,param=None):
        npath=self.defpath
        if path:
            npath=path
        nparam=self.defparam.copy()
        if param:
            nparam.update(param)
        return QWebURL(self.root_path,self.req_path,npath,nparam)
    def path(self,path=''):
        if not path:
            path=self.defpath
        pl=(self.root_path+path).split('/')
        i=0
        for i in range(min(len(pl), self.req_len)):
            if pl[i]!=self.req_list[i]:
                break
        else:
            i+=1
        dd=self.req_len-i
        if dd<0:
            dd=0
        return '/'.join(['..']*dd+pl[i:])
    def href(self,path='',arg={}):
        p=self.path(path)
        tmp=self.defparam.copy()
        tmp.update(arg)
        s=self.encode(tmp)
        if len(s):
            return p+"?"+s
        else:
            return p
    def form(self,path='',arg={}):
        p=self.path(path)
        tmp=self.defparam.copy()
        tmp.update(arg)
        r=''.join(['<input type="hidden" name="%s" value="%s"/>'%(k,cgi.escape(str(v),1)) for k,v in tmp.items()])
        return (p,r)
class QWebField:
    def __init__(self,name=None,default="",check=None):
        self.name=name
        self.default=default
        self.check=check
        # optional attributes
        self.type=None
        self.trim=1
        self.required=1
        self.cssvalid="form_valid"
        self.cssinvalid="form_invalid"
        # set by addfield
        self.form=None
        # set by processing
        self.input=None
        self.css=None
        self.value=None
        self.valid=None
        self.invalid=None
        self.validate(1)
    def validate(self,val=1,update=1):
        if val:
            self.valid=1
            self.invalid=0
            self.css=self.cssvalid
        else:
            self.valid=0
            self.invalid=1
            self.css=self.cssinvalid
        if update and self.form:
            self.form.update()
    def invalidate(self,update=1):
        self.validate(0,update)
class QWebForm:
    class QWebFormF:
        pass
    def __init__(self,e=None,arg=None,default=None):
        self.fields={}
        # all fields have been submitted
        self.submitted=False
        self.missing=[]
        # at least one field is invalid or missing
        self.invalid=False
        self.error=[]
        # all fields have been submitted and are valid
        self.valid=False
        # fields under self.f for convenience
        self.f=self.QWebFormF()
        if e:
            self.add_template(e)
        # assume that the fields are done with the template
        if default:
            self.set_default(default,e==None)
        if arg!=None:
            self.process_input(arg)
    def __getitem__(self,k):
        return self.fields[k]
    def set_default(self,default,add_missing=1):
        for k,v in default.items():
            if self.fields.has_key(k):
                self.fields[k].default=str(v)
            elif add_missing:
                self.add_field(QWebField(k,v))
    def add_field(self,f):
        self.fields[f.name]=f
        f.form=self
        setattr(self.f,f.name,f)
    def add_template(self,e):
        att={}
        for (an,av) in e.attributes.items():
            an=str(an)
            if an.startswith("t-"):
                att[an[2:]]=av.encode("utf8")
        for i in ["form-text", "form-password", "form-radio", "form-checkbox", "form-select","form-textarea"]:
            if att.has_key(i):
                name=att[i].split(".")[-1]
                default=att.get("default","")
                check=att.get("check",None)
                f=QWebField(name,default,check)
                if i=="form-textarea":
                    f.type="textarea"
                    f.trim=0
                if i=="form-checkbox":
                    f.type="checkbox"
                    f.required=0
                self.add_field(f)
        for n in e.childNodes:
            if n.nodeType==n.ELEMENT_NODE:
                self.add_template(n)
    def process_input(self,arg):
        for f in self.fields.values():
            if arg.has_key(f.name):
                f.input=arg[f.name]
                f.value=f.input
                if f.trim:
                    f.input=f.input.strip()
                f.validate(1,False)
                if f.check==None:
                    continue
                elif callable(f.check):
                    pass
                elif isinstance(f.check,str):
                    v=f.check
                    if f.check=="email":
                        v=r"/^[^@#!& ]+@[A-Za-z0-9-][.A-Za-z0-9-]{0,64}\.[A-Za-z]{2,5}$/"
                    if f.check=="date":
                        v=r"/^(19|20)\d\d-(0[1-9]|1[012])-(0[1-9]|[12][0-9]|3[01])$/"
                    if not re.match(v[1:-1],f.input):
                        f.validate(0,False)
            else:
                f.value=f.default
        self.update()
    def validate_all(self,val=1):
        for f in self.fields.values():
            f.validate(val,0)
        self.update()
    def invalidate_all(self):
        self.validate_all(0)
    def update(self):
        self.submitted=True
        self.valid=True
        self.errors=[]
        for f in self.fields.values():
            if f.required and f.input==None:
                self.submitted=False
                self.valid=False
                self.missing.append(f.name)
            if f.invalid:
                self.valid=False
                self.error.append(f.name)
        # invalid have been submitted and 
        self.invalid=self.submitted and self.valid==False
    def collect(self):
        d={}
        for f in self.fields.values():
            d[f.name]=f.value
        return d
class QWebURLEval(QWebEval):
    def __init__(self,data):
        QWebEval.__init__(self,data)
    def __getitem__(self,expr):
        r=QWebEval.__getitem__(self,expr)
        if isinstance(r,str):
            return urllib.quote_plus(r)
        else:
            return r
00564 class QWebHtml(QWebXml):
    """QWebHtml
    QWebURL:
    QWebField:
    QWebForm:
    QWebHtml:
        an extended template engine, with a few utility class to easily produce
        HTML, handle URLs and process forms, it adds the following magic attributes:
    
        t-href t-action t-form-text t-form-password t-form-textarea t-form-radio
        t-form-checkbox t-form-select t-option t-selected t-checked t-pager
    
    # explication URL:
    # v['tableurl']=QWebUrl({p=afdmin,saar=,orderby=,des=,mlink;meta_active=})
    # t-href="tableurl?desc=1"
    #
    # explication FORM: t-if="form.valid()"
    # Foreach i
    #   email: <input type="text" t-esc-name="i" t-esc-value="form[i].value" t-esc-class="form[i].css"/>
    #   <input type="radio" name="spamtype" t-esc-value="i" t-selected="i==form.f.spamtype.value"/>
    #   <option t-esc-value="cc" t-selected="cc==form.f.country.value"><t t-esc="cname"></option>
    # Simple forms:
    #   <input t-form-text="form.email" t-check="email"/>
    #   <input t-form-password="form.email" t-check="email"/>
    #   <input t-form-radio="form.email" />
    #   <input t-form-checkbox="form.email" />
    #   <textarea t-form-textarea="form.email" t-check="email"/>
    #   <select t-form-select="form.email"/>
    #       <option t-value="1">
    #   <input t-form-radio="form.spamtype" t-value="1"/> Cars
    #   <input t-form-radio="form.spamtype" t-value="2"/> Sprt
    """
    # QWebForm from a template
    def form(self,tname,arg=None,default=None):
        form=QWebForm(self._t[tname],arg,default)
        return form

    # HTML Att
    def eval_url(self,av,v):
        s=QWebURLEval(v).eval_format(av)
        a=s.split('?',1)
        arg={}
        if len(a)>1:
            for k,v in cgi.parse_qsl(a[1],1):
                arg[k]=v
        b=a[0].split('/',1)
        path=''
        if len(b)>1:
            path=b[1]
        u=b[0]
        return u,path,arg
    def render_att_url_(self,e,an,av,v):
        u,path,arg=self.eval_url(av,v)
        if not isinstance(v.get(u,0),QWebURL):
            out='qweb: missing url %r %r %r'%(u,path,arg)
        else:
            out=v[u].href(path,arg)
        return ' %s="%s"'%(an[6:],cgi.escape(out,1))
    def render_att_href(self,e,an,av,v):
        return self.render_att_url_(e,"t-url-href",av,v)
    def render_att_checked(self,e,an,av,v):
        if self.eval_bool(av,v):
            return ' %s="%s"'%(an[2:],an[2:])
        else:
            return ''
    def render_att_selected(self,e,an,av,v):
        return self.render_att_checked(e,an,av,v)

    # HTML Tags forms
    def render_tag_rawurl(self,e,t_att,g_att,v):
        u,path,arg=self.eval_url(t_att["rawurl"],v)
        return v[u].href(path,arg)
    def render_tag_escurl(self,e,t_att,g_att,v):
        u,path,arg=self.eval_url(t_att["escurl"],v)
        return cgi.escape(v[u].href(path,arg))
    def render_tag_action(self,e,t_att,g_att,v):
        u,path,arg=self.eval_url(t_att["action"],v)
        if not isinstance(v.get(u,0),QWebURL):
            action,input=('qweb: missing url %r %r %r'%(u,path,arg),'')
        else:
            action,input=v[u].form(path,arg)
        g_att+=' action="%s"'%action
        return self.render_element(e,g_att,v,input)
    def render_tag_form_text(self,e,t_att,g_att,v):
        f=self.eval_object(t_att["form-text"],v)
        g_att+=' type="text" name="%s" value="%s" class="%s"'%(f.name,cgi.escape(f.value,1),f.css)
        return self.render_element(e,g_att,v)
    def render_tag_form_password(self,e,t_att,g_att,v):
        f=self.eval_object(t_att["form-password"],v)
        g_att+=' type="password" name="%s" value="%s" class="%s"'%(f.name,cgi.escape(f.value,1),f.css)
        return self.render_element(e,g_att,v)
    def render_tag_form_textarea(self,e,t_att,g_att,v):
        type="textarea"
        f=self.eval_object(t_att["form-textarea"],v)
        g_att+=' name="%s" class="%s"'%(f.name,f.css)
        r="<%s%s>%s</%s>"%(type,g_att,cgi.escape(f.value,1),type)
        return r
    def render_tag_form_radio(self,e,t_att,g_att,v):
        f=self.eval_object(t_att["form-radio"],v)
        val=t_att["value"]
        g_att+=' type="radio" name="%s" value="%s"'%(f.name,val)
        if f.value==val:
            g_att+=' checked="checked"'
        return self.render_element(e,g_att,v)
    def render_tag_form_checkbox(self,e,t_att,g_att,v):
        f=self.eval_object(t_att["form-checkbox"],v)
        val=t_att["value"]
        g_att+=' type="checkbox" name="%s" value="%s"'%(f.name,val)
        if f.value==val:
            g_att+=' checked="checked"'
        return self.render_element(e,g_att,v)
    def render_tag_form_select(self,e,t_att,g_att,v):
        f=self.eval_object(t_att["form-select"],v)
        g_att+=' name="%s" class="%s"'%(f.name,f.css)
        return self.render_element(e,g_att,v)
    def render_tag_option(self,e,t_att,g_att,v):
        f=self.eval_object(e.parentNode.getAttribute("t-form-select"),v)
        val=t_att["option"]
        g_att+=' value="%s"'%(val)
        if f.value==val:
            g_att+=' selected="selected"'
        return self.render_element(e,g_att,v)

    # HTML Tags others
    def render_tag_pager(self,e,t_att,g_att,v):
        pre=t_att["pager"]
        total=int(self.eval_str(t_att["total"],v))
        start=int(self.eval_str(t_att["start"],v))
        step=int(self.eval_str(t_att.get("step","100"),v))
        scope=int(self.eval_str(t_att.get("scope","5"),v))
        # Compute Pager
        p=pre+"_"
        d={}
        d[p+"tot_size"]=total
        d[p+"tot_page"]=tot_page=total/step
        d[p+"win_start0"]=total and start
        d[p+"win_start1"]=total and start+1
        d[p+"win_end0"]=max(0,min(start+step-1,total-1))
        d[p+"win_end1"]=min(start+step,total)
        d[p+"win_page0"]=win_page=start/step
        d[p+"win_page1"]=win_page+1
        d[p+"prev"]=(win_page!=0)
        d[p+"prev_start"]=(win_page-1)*step
        d[p+"next"]=(tot_page>=win_page+1)
        d[p+"next_start"]=(win_page+1)*step
        l=[]
        begin=win_page-scope
        end=win_page+scope
        if begin<0:
            end-=begin
        if end>tot_page:
            begin-=(end-tot_page)
        i=max(0,begin)
        while i<=min(end,tot_page) and total!=step:
            l.append( { p+"page0":i, p+"page1":i+1, p+"start":i*step, p+"sel":(win_page==i) })
            i+=1
        d[p+"active"]=len(l)>1
        d[p+"list"]=l
        # Update v
        v.update(d)
        return ""

#----------------------------------------------------------
# QWeb Simple Controller
#----------------------------------------------------------
def qweb_control(self,jump='main',p=[]):
    """ qweb_control(self,jump='main',p=[]):
    A simple function to handle the controler part of your application. It
    dispatch the control to the jump argument, while ensuring that prefix
    function have been called.

    qweb_control replace '/' to '_' and strip '_' from the jump argument.

    name1
    name1_name2
    name1_name2_name3

    """
    jump=jump.replace('/','_').strip('_')
    if not hasattr(self,jump):
        return 0
    done={}
    todo=[]
    while 1:
        if jump!=None:
            tmp=""
            todo=[]
            for i in jump.split("_"):
                tmp+=i+"_";
                if not done.has_key(tmp[:-1]):
                    todo.append(tmp[:-1])
            jump=None
        elif len(todo):
            i=todo.pop(0)
            done[i]=1
            if hasattr(self,i):
                f=getattr(self,i)
                r=f(*p)
                if isinstance(r,types.StringType):
                    jump=r
        else:
            break
    return 1

#----------------------------------------------------------
# QWeb WSGI Request handler
#----------------------------------------------------------
class QWebSession(dict):
    def __init__(self,environ,**kw):
        dict.__init__(self)
        default={
            "path" : tempfile.gettempdir(),
            "cookie_name" : "QWEBSID",
            "cookie_lifetime" : 0,
            "cookie_path" : '/',
            "cookie_domain" : '',
            "limit_cache" : 1,
            "probability" : 0.01,
            "maxlifetime" : 3600,
            "disable" : 0,
        }
        for k,v in default.items():
            setattr(self,'session_%s'%k,kw.get(k,v))
        # Try to find session
        self.session_found_cookie=0
        self.session_found_url=0
        self.session_found=0
        self.session_orig=""
        # Try cookie
        c=Cookie.SimpleCookie()
        c.load(environ.get('HTTP_COOKIE', ''))
        if c.has_key(self.session_cookie_name):
            sid=c[self.session_cookie_name].value[:64]
            if re.match('[a-f0-9]+$',sid) and self.session_load(sid):
                self.session_id=sid
                self.session_found_cookie=1
                self.session_found=1
        # Try URL
        if not self.session_found_cookie:
            mo=re.search('&%s=([a-f0-9]+)'%self.session_cookie_name,environ.get('QUERY_STRING',''))
            if mo and self.session_load(mo.group(1)):
                self.session_id=mo.group(1)
                self.session_found_url=1
                self.session_found=1
        # New session
        if not self.session_found:
            self.session_id='%032x'%random.randint(1,2**128)
        self.session_trans_sid="&amp;%s=%s"%(self.session_cookie_name,self.session_id)
        # Clean old session
        if random.random() < self.session_probability:
            self.session_clean()
    def session_get_headers(self):
        h=[]
        if (not self.session_disable) and (len(self) or len(self.session_orig)):
            self.session_save()
            if not self.session_found_cookie:
                c=Cookie.SimpleCookie()
                c[self.session_cookie_name] = self.session_id
                c[self.session_cookie_name]['path'] = self.session_cookie_path
                if self.session_cookie_domain:
                    c[self.session_cookie_name]['domain'] = self.session_cookie_domain
#               if self.session_cookie_lifetime:
#                   c[self.session_cookie_name]['expires'] = TODO date localtime or not, datetime.datetime(1970, 1, 1)
                h.append(("Set-Cookie", c[self.session_cookie_name].OutputString()))
            if self.session_limit_cache:
                h.append(('Cache-Control','no-store, no-cache, must-revalidate, post-check=0, pre-check=0'))
                h.append(('Expires','Thu, 19 Nov 1981 08:52:00 GMT'))
                h.append(('Pragma','no-cache'))
        return h
    def session_load(self,sid):
        fname=os.path.join(self.session_path,'qweb_sess_%s'%sid)
        try:
            orig=file(fname).read()
            d=pickle.loads(orig)
        except:
            return
        self.session_orig=orig
        self.update(d)
        return 1
    def session_save(self):
        if not os.path.isdir(self.session_path):
            os.makedirs(self.session_path)
        fname=os.path.join(self.session_path,'qweb_sess_%s'%self.session_id)
        try:
            oldtime=os.path.getmtime(fname)
        except OSError,IOError:
            oldtime=0
        dump=pickle.dumps(self.copy())
        if (dump != self.session_orig) or (time.time() > oldtime+self.session_maxlifetime/4):
            tmpname=os.path.join(self.session_path,'qweb_sess_%s_%x'%(self.session_id,random.randint(1,2**32)))
            f=file(tmpname,'wb')
            f.write(dump)
            f.close()
            if sys.platform=='win32' and os.path.isfile(fname):
                os.remove(fname)
            os.rename(tmpname,fname)
    def session_clean(self):
        t=time.time()
        try:
            for i in [os.path.join(self.session_path,i) for i in os.listdir(self.session_path) if i.startswith('qweb_sess_')]:
                if (t > os.path.getmtime(i)+self.session_maxlifetime):
                    os.unlink(i)
        except OSError,IOError:
            pass
class QWebSessionMem(QWebSession):
    def session_load(self,sid):
        global _qweb_sessions
        if not "_qweb_sessions" in globals():
            _qweb_sessions={}
        if _qweb_sessions.has_key(sid):
            self.session_orig=_qweb_sessions[sid]
            self.update(self.session_orig)
            return 1
    def session_save(self):
        global _qweb_sessions
        if not "_qweb_sessions" in globals():
            _qweb_sessions={}
        _qweb_sessions[self.session_id]=self.copy()
class QWebSessionService:
    def __init__(self, wsgiapp, url_rewrite=0):
        self.wsgiapp=wsgiapp
        self.url_rewrite_tags="a=href,area=href,frame=src,form=,fieldset="
    def __call__(self, environ, start_response):
        # TODO
        # use QWebSession to provide environ["qweb.session"]
        return self.wsgiapp(environ,start_response)
class QWebDict(dict):
    def __init__(self,*p):
        dict.__init__(self,*p)
    def __getitem__(self,key):
        return self.get(key,"")
    def int(self,key):
        try:
            return int(self.get(key,"0"))
        except ValueError:
            return 0
class QWebListDict(dict):
    def __init__(self,*p):
        dict.__init__(self,*p)
    def __getitem__(self,key):
        return self.get(key,[])
    def appendlist(self,key,val):
        if self.has_key(key):
            self[key].append(val)
        else:
            self[key]=[val]
    def get_qwebdict(self):
        d=QWebDict()
        for k,v in self.items():
            d[k]=v[-1]
        return d
00915 class QWebRequest:
    """QWebRequest a WSGI request handler.

    QWebRequest is a WSGI request handler that feature GET, POST and POST
    multipart methods, handles cookies and headers and provide a dict-like
    SESSION Object (either on the filesystem or in memory).

    It is constructed with the environ and start_response WSGI arguments:
    
      req=qweb.QWebRequest(environ, start_response)
    
    req has the folowing attributes :
    
      req.environ standard WSGI dict (CGI and wsgi ones)
    
    Some CGI vars as attributes from environ for convenience: 
    
      req.SCRIPT_NAME
      req.PATH_INFO
      req.REQUEST_URI
    
    Some computed value (also for convenience)
    
      req.FULL_URL full URL recontructed (http://host/query)
      req.FULL_PATH (URL path before ?querystring)
    
    Dict constructed from querystring and POST datas, PHP-like.
    
      req.GET contains GET vars
      req.POST contains POST vars
      req.REQUEST contains merge of GET and POST
      req.FILES contains uploaded files
      req.GET_LIST req.POST_LIST req.REQUEST_LIST req.FILES_LIST multiple arguments versions
      req.debug() returns an HTML dump of those vars
    
    A dict-like session object.
    
      req.SESSION the session start when the dict is not empty.
    
    Attribute for handling the response
    
      req.response_headers dict-like to set headers
      req.response_cookies a SimpleCookie to set cookies
      req.response_status a string to set the status like '200 OK'
    
      req.write() to write to the buffer
    
    req itselfs is an iterable object with the buffer, it will also also call
    start_response automatically before returning anything via the iterator.
    
    To make it short, it means that you may use
    
      return req
    
    at the end of your request handling to return the reponse to any WSGI
    application server.
    """
    #
    # This class contains part ripped from colubrid (with the permission of
    # mitsuhiko) see http://wsgiarea.pocoo.org/colubrid/
    #
    # - the class HttpHeaders
    # - the method load_post_data (tuned version)
    #
    class HttpHeaders(object):
        def __init__(self):
            self.data = [('Content-Type', 'text/html')]
        def __setitem__(self, key, value):
            self.set(key, value)
        def __delitem__(self, key):
            self.remove(key)
        def __contains__(self, key):
            key = key.lower()
            for k, v in self.data:
                if k.lower() == key:
                    return True
            return False
        def add(self, key, value):
            self.data.append((key, value))
        def remove(self, key, count=-1):
            removed = 0
            data = []
            for _key, _value in self.data:
                if _key.lower() != key.lower():
                    if count > -1:
                        if removed >= count:
                            break
                        else:
                            removed += 1
                    data.append((_key, _value))
            self.data = data
        def clear(self):
            self.data = []
        def set(self, key, value):
            self.remove(key)
            self.add(key, value)
        def get(self, key=False, httpformat=False):
            if not key:
                result = self.data
            else:
                result = []
                for _key, _value in self.data:
                    if _key.lower() == key.lower():
                        result.append((_key, _value))
            if httpformat:
                return '\n'.join(['%s: %s' % item for item in result])
            return result
    def load_post_data(self,environ,POST,FILES):
        length = int(environ['CONTENT_LENGTH'])
        DATA = environ['wsgi.input'].read(length)
        if environ.get('CONTENT_TYPE', '').startswith('multipart'):
            lines = ['Content-Type: %s' % environ.get('CONTENT_TYPE', '')]
            for key, value in environ.items():
                if key.startswith('HTTP_'):
                    lines.append('%s: %s' % (key, value))
            raw = '\r\n'.join(lines) + '\r\n\r\n' + DATA
            msg = email.message_from_string(raw)
            for sub in msg.get_payload():
                if not isinstance(sub, email.Message.Message):
                    continue
                name_dict = cgi.parse_header(sub['Content-Disposition'])[1]
                if 'filename' in name_dict:
                    # Nested MIME Messages are not supported'
                    if type([]) == type(sub.get_payload()):
                        continue
                    if not name_dict['filename'].strip():
                        continue
                    filename = name_dict['filename']
                    # why not keep all the filename? because IE always send 'C:\documents and settings\blub\blub.png'
                    filename = filename[filename.rfind('\\') + 1:]
                    if 'Content-Type' in sub:
                        content_type = sub['Content-Type']
                    else:
                        content_type = None
                    s = { "name":filename, "type":content_type, "data":sub.get_payload() }
                    FILES.appendlist(name_dict['name'], s)
                else:
                    POST.appendlist(name_dict['name'], sub.get_payload())
        else:
            POST.update(cgi.parse_qs(DATA,keep_blank_values=1))
        return DATA

    def __init__(self,environ,start_response,session=QWebSession):
        self.environ=environ
        self.start_response=start_response
        self.buffer=[]

        self.SCRIPT_NAME = environ.get('SCRIPT_NAME', '')
        self.PATH_INFO = environ.get('PATH_INFO', '')
        # extensions:
        self.FULL_URL = environ['FULL_URL'] = self.get_full_url(environ)
        # REQUEST_URI is optional, fake it if absent
        if not environ.has_key("REQUEST_URI"):
            environ["REQUEST_URI"]=urllib.quote(self.SCRIPT_NAME+self.PATH_INFO)
            if environ.get('QUERY_STRING'):
                environ["REQUEST_URI"]+='?'+environ['QUERY_STRING']
        self.REQUEST_URI = environ["REQUEST_URI"]
        # full quote url path before the ?
        self.FULL_PATH = environ['FULL_PATH'] = self.REQUEST_URI.split('?')[0]

        self.request_cookies=Cookie.SimpleCookie()
        self.request_cookies.load(environ.get('HTTP_COOKIE', ''))

        self.response_started=False
        self.response_gzencode=False
        self.response_cookies=Cookie.SimpleCookie()
        # to delete a cookie use: c[key]['expires'] = datetime.datetime(1970, 1, 1)
        self.response_headers=self.HttpHeaders()
        self.response_status="200 OK"

        self.php=None
        if self.environ.has_key("php"):
            self.php=environ["php"]
            self.SESSION=self.php._SESSION
            self.GET=self.php._GET
            self.POST=self.php._POST
            self.REQUEST=self.php._ARG
            self.FILES=self.php._FILES
        else:
            if isinstance(session,QWebSession):
                self.SESSION=session
            elif session:
                self.SESSION=session(environ)
            else:
                self.SESSION=None
            self.GET_LIST=QWebListDict(cgi.parse_qs(environ.get('QUERY_STRING', ''),keep_blank_values=1))
            self.POST_LIST=QWebListDict()
            self.FILES_LIST=QWebListDict()
            self.REQUEST_LIST=QWebListDict(self.GET_LIST)
            if environ['REQUEST_METHOD'] == 'POST':
                self.DATA=self.load_post_data(environ,self.POST_LIST,self.FILES_LIST)
                self.REQUEST_LIST.update(self.POST_LIST)
            self.GET=self.GET_LIST.get_qwebdict()
            self.POST=self.POST_LIST.get_qwebdict()
            self.FILES=self.FILES_LIST.get_qwebdict()
            self.REQUEST=self.REQUEST_LIST.get_qwebdict()
    def get_full_url(environ):
        # taken from PEP 333
        if 'FULL_URL' in environ:
            return environ['FULL_URL']
        url = environ['wsgi.url_scheme']+'://'
        if environ.get('HTTP_HOST'):
            url += environ['HTTP_HOST']
        else:
            url += environ['SERVER_NAME']
            if environ['wsgi.url_scheme'] == 'https':
                if environ['SERVER_PORT'] != '443':
                    url += ':' + environ['SERVER_PORT']
            else:
                if environ['SERVER_PORT'] != '80':
                    url += ':' + environ['SERVER_PORT']
        if environ.has_key('REQUEST_URI'):
            url += environ['REQUEST_URI']
        else:
            url += urllib.quote(environ.get('SCRIPT_NAME', ''))
            url += urllib.quote(environ.get('PATH_INFO', ''))
            if environ.get('QUERY_STRING'):
                url += '?' + environ['QUERY_STRING']
        return url
    get_full_url=staticmethod(get_full_url)
    def save_files(self):
        for k,v in self.FILES.items():
            if not v.has_key("tmp_file"):
                f=tempfile.NamedTemporaryFile()
                f.write(v["data"])
                f.flush()
                v["tmp_file"]=f
                v["tmp_name"]=f.name
    def debug(self):
        body=''
        for name,d in [
            ("GET",self.GET), ("POST",self.POST), ("REQUEST",self.REQUEST), ("FILES",self.FILES),
            ("GET_LIST",self.GET_LIST), ("POST_LIST",self.POST_LIST), ("REQUEST_LIST",self.REQUEST_LIST), ("FILES_LIST",self.FILES_LIST),
            ("SESSION",self.SESSION), ("environ",self.environ),
        ]:
            body+='<table border="1" width="100%" align="center">\n'
            body+='<tr><th colspan="2" align="center">%s</th></tr>\n'%name
            keys=d.keys()
            keys.sort()
            body+=''.join(['<tr><td>%s</td><td>%s</td></tr>\n'%(k,cgi.escape(repr(d[k]))) for k in keys])
            body+='</table><br><br>\n\n'
        return body
    def write(self,s):
        self.buffer.append(s)
    def echo(self,*s):
        self.buffer.extend([str(i) for i in s])
    def response(self):
        if not self.response_started:
            if not self.php:
                for k,v in self.FILES.items():
                    if v.has_key("tmp_file"):
                        try:
                            v["tmp_file"].close()
                        except OSError:
                            pass
                if self.response_gzencode and self.environ.get('HTTP_ACCEPT_ENCODING','').find('gzip')!=-1:
                    zbuf=StringIO.StringIO()
                    zfile=gzip.GzipFile(mode='wb', fileobj=zbuf)
                    zfile.write(''.join(self.buffer))
                    zfile.close()
                    zbuf=zbuf.getvalue()
                    self.buffer=[zbuf]
                    self.response_headers['Content-Encoding']="gzip"
                    self.response_headers['Content-Length']=str(len(zbuf))
                headers = self.response_headers.get()
                if isinstance(self.SESSION, QWebSession):
                    headers.extend(self.SESSION.session_get_headers())
                headers.extend([('Set-Cookie', self.response_cookies[i].OutputString()) for i in self.response_cookies])
                self.start_response(self.response_status, headers)
            self.response_started=True
        return self.buffer
    def __iter__(self):
        return self.response().__iter__()
    def http_redirect(self,url,permanent=1):
        if permanent:
            self.response_status="301 Moved Permanently"
        else:
            self.response_status="302 Found"
        self.response_headers["Location"]=url
    def http_404(self,msg="<h1>404 Not Found</h1>"):
        self.response_status="404 Not Found"
        if msg:
            self.write(msg)
    def http_download(self,fname,fstr,partial=0):
#       allow fstr to be a file-like object
#       if parital:
#           say accept ranages
#           parse range headers...
#           if range:
#               header("HTTP/1.1 206 Partial Content");
#               header("Content-Range: bytes $offset-".($fsize-1)."/".$fsize);
#               header("Content-Length: ".($fsize-$offset));
#               fseek($fd,$offset);
#           else:
        self.response_headers["Content-Type"]="application/octet-stream"
        self.response_headers["Content-Disposition"]="attachment; filename=\"%s\""%fname
        self.response_headers["Content-Transfer-Encoding"]="binary"
        self.response_headers["Content-Length"]="%d"%len(fstr)
        self.write(fstr)

#----------------------------------------------------------
# QWeb WSGI HTTP Server to run any WSGI app
# autorun, run an app as FCGI or CGI otherwise launch the server
#----------------------------------------------------------
class QWebWSGIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    def log_message(self,*p):
        if self.server.log:
            return BaseHTTPServer.BaseHTTPRequestHandler.log_message(self,*p)
    def address_string(self):
        return self.client_address[0]
    def start_response(self,status,headers):
        l=status.split(' ',1)
        self.send_response(int(l[0]),l[1])
        ctype_sent=0
        for i in headers:
            if i[0].lower()=="content-type":
                ctype_sent=1
            self.send_header(*i)
        if not ctype_sent:
            self.send_header("Content-type", "text/html")
        self.end_headers()
        return self.write
    def write(self,data):
        try:
            self.wfile.write(data)
        except (socket.error, socket.timeout),e:
            print e
    def bufferon(self):
        if not getattr(self,'wfile_buf',0):
            self.wfile_buf=1
            self.wfile_bak=self.wfile
            self.wfile=StringIO.StringIO()
    def bufferoff(self):
        if self.wfile_buf:
            buf=self.wfile
            self.wfile=self.wfile_bak
            self.write(buf.getvalue())
            self.wfile_buf=0
    def serve(self,type):
        path_info, parameters, query = urlparse.urlparse(self.path)[2:5]
        environ = {
            'wsgi.version':         (1,0),
            'wsgi.url_scheme':      'http',
            'wsgi.input':           self.rfile,
            'wsgi.errors':          sys.stderr,
            'wsgi.multithread':     0,
            'wsgi.multiprocess':    0,
            'wsgi.run_once':        0,
            'REQUEST_METHOD':       self.command,
            'SCRIPT_NAME':          '',
            'QUERY_STRING':         query,
            'CONTENT_TYPE':         self.headers.get('Content-Type', ''),
            'CONTENT_LENGTH':       self.headers.get('Content-Length', ''),
            'REMOTE_ADDR':          self.client_address[0],
            'REMOTE_PORT':          str(self.client_address[1]),
            'SERVER_NAME':          self.server.server_address[0],
            'SERVER_PORT':          str(self.server.server_address[1]),
            'SERVER_PROTOCOL':      self.request_version,
            # extention
            'FULL_PATH':            self.path,
            'qweb.mode':            'standalone',
        }
        if path_info:
            environ['PATH_INFO'] = urllib.unquote(path_info)
        for key, value in self.headers.items():
            environ['HTTP_' + key.upper().replace('-', '_')] = value
        # Hack to avoid may TCP packets
        self.bufferon()
        appiter=self.server.wsgiapp(environ, self.start_response)
        for data in appiter:
            self.write(data)
            self.bufferoff()
        self.bufferoff()
    def do_GET(self):
        self.serve('GET')
    def do_POST(self):
        self.serve('GET')
01292 class QWebWSGIServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    """ QWebWSGIServer
        qweb_wsgi_autorun(wsgiapp,ip='127.0.0.1',port=8080,threaded=1)
        A WSGI HTTP server threaded or not and a function to automatically run your
        app according to the environement (either standalone, CGI or FastCGI).

        This feature is called QWeb autorun. If you want to  To use it on your
        application use the following lines at the end of the main application
        python file:

        if __name__ == '__main__':
            qweb.qweb_wsgi_autorun(your_wsgi_app)

        this function will select the approriate running mode according to the
        calling environement (http-server, FastCGI or CGI).
    """
    def __init__(self, wsgiapp, ip, port, threaded=1, log=1):
        BaseHTTPServer.HTTPServer.__init__(self, (ip, port), QWebWSGIHandler)
        self.wsgiapp = wsgiapp
        self.threaded = threaded
        self.log = log
    def process_request(self,*p):
        if self.threaded:
            return SocketServer.ThreadingMixIn.process_request(self,*p)
        else:
            return BaseHTTPServer.HTTPServer.process_request(self,*p)
def qweb_wsgi_autorun(wsgiapp,ip='127.0.0.1',port=8080,threaded=1,log=1,callback_ready=None):
    if sys.platform=='win32':
        fcgi=0
    else:
        fcgi=1
        sock = socket.fromfd(0, socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.getpeername()
        except socket.error, e:
            if e[0] == errno.ENOTSOCK:
                fcgi=0
    if fcgi or os.environ.has_key('REQUEST_METHOD'):
        import fcgi
        fcgi.WSGIServer(wsgiapp,multithreaded=False).run()
    else:
        if log:
            print 'Serving on %s:%d'%(ip,port)
        s=QWebWSGIServer(wsgiapp,ip=ip,port=port,threaded=threaded,log=log)
        if callback_ready:
            callback_ready()
        try:
            s.serve_forever()
        except KeyboardInterrupt,e:
            sys.excepthook(*sys.exc_info())

#----------------------------------------------------------
# Qweb Documentation
#----------------------------------------------------------
def qweb_doc():
    body=__doc__
    for i in [QWebXml ,QWebHtml ,QWebForm ,QWebURL ,qweb_control ,QWebRequest ,QWebSession ,QWebWSGIServer ,qweb_wsgi_autorun]:
        n=i.__name__
        d=i.__doc__
        body+='\n\n%s\n%s\n\n%s'%(n,'-'*len(n),d)
    return body

    print qweb_doc()

#

Generated by  Doxygen 1.6.0   Back to index