test.py SampleΒΆ

  1# This extension is used mainly for testing purposes - it is not
  2# designed to be a simple sample, but instead is a hotch-potch of things
  3# that attempts to exercise the framework.
  4
  5from isapi import isapicon
  6from isapi.simple import SimpleExtension
  7import sys, os, stat
  8
  9if hasattr(sys, "isapidllhandle"):
 10    import win32traceutil
 11
 12# We use the same reload support as 'advanced.py' demonstrates.
 13from isapi import InternalReloadException
 14import win32event, win32file, winerror, win32con, threading
 15
 16# A watcher thread that checks for __file__ changing.
 17# When it detects it, it simply sets "change_detected" to true.
 18class ReloadWatcherThread(threading.Thread):
 19    def __init__(self):
 20        self.change_detected = False
 21        self.filename = __file__
 22        if self.filename.endswith("c") or self.filename.endswith("o"):
 23            self.filename = self.filename[:-1]
 24        self.handle = win32file.FindFirstChangeNotification(
 25            os.path.dirname(self.filename),
 26            False,  # watch tree?
 27            win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
 28        )
 29        threading.Thread.__init__(self)
 30
 31    def run(self):
 32        last_time = os.stat(self.filename)[stat.ST_MTIME]
 33        while 1:
 34            try:
 35                rc = win32event.WaitForSingleObject(self.handle, win32event.INFINITE)
 36                win32file.FindNextChangeNotification(self.handle)
 37            except win32event.error as details:
 38                # handle closed - thread should terminate.
 39                if details.winerror != winerror.ERROR_INVALID_HANDLE:
 40                    raise
 41                break
 42            this_time = os.stat(self.filename)[stat.ST_MTIME]
 43            if this_time != last_time:
 44                print("Detected file change - flagging for reload.")
 45                self.change_detected = True
 46                last_time = this_time
 47
 48    def stop(self):
 49        win32file.FindCloseChangeNotification(self.handle)
 50
 51
 52def TransmitFileCallback(ecb, hFile, cbIO, errCode):
 53    print("Transmit complete!")
 54    ecb.close()
 55
 56
 57# The ISAPI extension - handles requests in our virtual dir, and sends the
 58# response to the client.
 59class Extension(SimpleExtension):
 60    "Python test Extension"
 61
 62    def __init__(self):
 63        self.reload_watcher = ReloadWatcherThread()
 64        self.reload_watcher.start()
 65
 66    def HttpExtensionProc(self, ecb):
 67        # NOTE: If you use a ThreadPoolExtension, you must still perform
 68        # this check in HttpExtensionProc - raising the exception from
 69        # The "Dispatch" method will just cause the exception to be
 70        # rendered to the browser.
 71        if self.reload_watcher.change_detected:
 72            print("Doing reload")
 73            raise InternalReloadException
 74
 75        if ecb.GetServerVariable("UNICODE_URL").endswith("test.py"):
 76            file_flags = (
 77                win32con.FILE_FLAG_SEQUENTIAL_SCAN | win32con.FILE_FLAG_OVERLAPPED
 78            )
 79            hfile = win32file.CreateFile(
 80                __file__,
 81                win32con.GENERIC_READ,
 82                0,
 83                None,
 84                win32con.OPEN_EXISTING,
 85                file_flags,
 86                None,
 87            )
 88            flags = (
 89                isapicon.HSE_IO_ASYNC
 90                | isapicon.HSE_IO_DISCONNECT_AFTER_SEND
 91                | isapicon.HSE_IO_SEND_HEADERS
 92            )
 93            # We pass hFile to the callback simply as a way of keeping it alive
 94            # for the duration of the transmission
 95            try:
 96                ecb.TransmitFile(
 97                    TransmitFileCallback,
 98                    hfile,
 99                    int(hfile),
100                    "200 OK",
101                    0,
102                    0,
103                    None,
104                    None,
105                    flags,
106                )
107            except:
108                # Errors keep this source file open!
109                hfile.Close()
110                raise
111        else:
112            # default response
113            ecb.SendResponseHeaders("200 OK", "Content-Type: text/html\r\n\r\n", 0)
114            print("<HTML><BODY>", file=ecb)
115            print("The root of this site is at", ecb.MapURLToPath("/"), file=ecb)
116            print("</BODY></HTML>", file=ecb)
117            ecb.close()
118        return isapicon.HSE_STATUS_SUCCESS
119
120    def TerminateExtension(self, status):
121        self.reload_watcher.stop()
122
123
124# The entry points for the ISAPI extension.
125def __ExtensionFactory__():
126    return Extension()
127
128
129# Our special command line customization.
130# Pre-install hook for our virtual directory.
131def PreInstallDirectory(params, options):
132    # If the user used our special '--description' option,
133    # then we override our default.
134    if options.description:
135        params.Description = options.description
136
137
138# Post install hook for our entire script
139def PostInstall(params, options):
140    print()
141    print("The sample has been installed.")
142    print("Point your browser to /PyISAPITest")
143
144
145# Handler for our custom 'status' argument.
146def status_handler(options, log, arg):
147    "Query the status of something"
148    print("Everything seems to be fine!")
149
150
151custom_arg_handlers = {"status": status_handler}
152
153if __name__ == "__main__":
154    # If run from the command-line, install ourselves.
155    from isapi.install import *
156
157    params = ISAPIParameters(PostInstall=PostInstall)
158    # Setup the virtual directories - this is a list of directories our
159    # extension uses - in this case only 1.
160    # Each extension has a "script map" - this is the mapping of ISAPI
161    # extensions.
162    sm = [ScriptMapParams(Extension="*", Flags=0)]
163    vd = VirtualDirParameters(
164        Name="PyISAPITest",
165        Description=Extension.__doc__,
166        ScriptMaps=sm,
167        ScriptMapUpdate="replace",
168        # specify the pre-install hook.
169        PreInstall=PreInstallDirectory,
170    )
171    params.VirtualDirs = [vd]
172    # Setup our custom option parser.
173    from optparse import OptionParser
174
175    parser = OptionParser("")  # blank usage, so isapi sets it.
176    parser.add_option(
177        "",
178        "--description",
179        action="store",
180        help="custom description to use for the virtual directory",
181    )
182
183    HandleCommandLine(
184        params, opt_parser=parser, custom_arg_handlers=custom_arg_handlers
185    )