{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# How to use the PyZeta AOP framework" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Imports" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# some standard library imports\n", "from os import remove\n", "from typing import Any, List, Protocol, runtime_checkable\n", "\n", "# the core imports for writing your custom logic and the plugin\n", "from pyzeta.framework.aop.analyzers.profiling_advice import ProfilingAdvice\n", "from pyzeta.framework.aop.analyzers.stats import StatsReader\n", "from pyzeta.framework.aop.point_cut import PointCut\n", "from pyzeta.framework.aop.rule import Rule\n", "from pyzeta.framework.aop.aspect import Aspect\n", "from pyzeta.framework.aop.advice import Advice\n", "from pyzeta.framework.initialization.initialization_handler import (\n", " PyZetaInitializationHandler,\n", ")\n", "from pyzeta.framework.ioc.container_provider import ContainerProvider\n", "\n", "PyZetaInitializationHandler.initPyZetaServices()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Prepare the Example Class" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "original method: PyZeta.MyClass!\n", "Hello World!\n" ] } ], "source": [ "class MyClass:\n", " \"Simple example class with two methods, one of which is to be profiled.\"\n", "\n", " def __init__(self, attr1: str, attr2: bool) -> None:\n", " \"Initialize the example class with example data.\"\n", " self.attr1 = attr1\n", " self.attr2 = attr2\n", "\n", " def method1(self, arg1: int) -> None:\n", " \"Print an instance attribute and a value calculated from an argument.\"\n", " counter = self._count(arg1)\n", " print(f\"original method: {self.attr2}, {counter}!\")\n", "\n", " def _count(self, limit: int) -> None:\n", " \"Count stuff to make profiles look more interesting.\"\n", " counter = 0\n", " for _ in range(limit):\n", " counter += 1\n", " return counter\n", "\n", " def method2(self) -> str:\n", " \"Return a constant string after printing an instance attribute.\"\n", " print(f\"original method: {self.attr1}!\")\n", " return \"Hello World!\"\n", "\n", " def method3(self) -> str:\n", " \"Actually the same as `method2` but duplicated for container example.\"\n", " print(f\"original method: {self.attr1}!\")\n", " return \"Bye World!\"\n", "\n", "\n", "# create an object for later demonstrations;\n", "# aspects apply globally, even to objects created before advice application!\n", "obj = MyClass(\"PyZeta.MyClass\", False)\n", "# verify the original functionality of MyClass.method2\n", "print(obj.method2())" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### First Example: Use the Pre-Defined Profiling Advice" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# create the advice and a point cut at which to apply it\n", "fileName = \"myclass_method1\"\n", "profilingAdvice: Advice[None, Any] = ProfilingAdvice(fileName)\n", "pointCut: PointCut = PointCut(\".*1\")\n", "# remove any previous statistics files\n", "try:\n", " remove(fileName + ProfilingAdvice.extension)\n", "except FileNotFoundError:\n", " print(\"no previous stats file to remove!\")\n", "# combine advice and point cut into a list of rules\n", "rules: List[Rule[None, Any]] = [Rule(pointCut, profilingAdvice)]\n", "# create the aspect from the list of rules\n", "aspect: Aspect[MyClass, None, Any] = Aspect(rules=rules)\n", "# apply the aspect to the example class - profiling of MyClass.method1 is now enabled!\n", "aspect(MyClass)\n" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "original method: False, 1000000!\n", "original method: PyZeta.MyClass!\n", "Hello World!\n", "--------------------------------------------------\n", "Wed Jul 26 11:41:34 2023 myclass_method1.cprofile\n", "\n", " 29 function calls in 0.103 seconds\n", "\n", " Ordered by: internal time\n", "\n", " ncalls tottime percall cumtime percall filename:lineno(function)\n", " 1 0.103 0.103 0.103 0.103 3516148056.py:14(_count)\n", " 1 0.000 0.000 0.000 0.000 socket.py:613(send)\n", " 2 0.000 0.000 0.000 0.000 iostream.py:535(write)\n", " 1 0.000 0.000 0.103 0.103 3516148056.py:9(method1)\n", " 1 0.000 0.000 0.000 0.000 {built-in method builtins.print}\n", " 1 0.000 0.000 0.000 0.000 iostream.py:203(schedule)\n", " 1 0.000 0.000 0.000 0.000 threading.py:1185(is_alive)\n", " 2 0.000 0.000 0.000 0.000 iostream.py:465(_schedule_flush)\n", " 2 0.000 0.000 0.000 0.000 iostream.py:444(_is_master_process)\n", " 1 0.000 0.000 0.000 0.000 profiling_advice.py:44(_stop)\n", " 1 0.000 0.000 0.000 0.000 threading.py:1118(_wait_for_tstate_lock)\n", " 1 0.000 0.000 0.000 0.000 iostream.py:90(_event_pipe)\n", " 2 0.000 0.000 0.000 0.000 {built-in method posix.getpid}\n", " 1 0.000 0.000 0.000 0.000 {method 'acquire' of '_thread.lock' objects}\n", " 2 0.000 0.000 0.000 0.000 {method 'write' of '_io.StringIO' objects}\n", " 2 0.000 0.000 0.000 0.000 {built-in method builtins.isinstance}\n", " 2 0.000 0.000 0.000 0.000 {method '__exit__' of '_thread.RLock' objects}\n", " 2 0.000 0.000 0.000 0.000 {built-in method builtins.len}\n", " 1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}\n", " 1 0.000 0.000 0.000 0.000 threading.py:568(is_set)\n", " 1 0.000 0.000 0.000 0.000 {method 'append' of 'collections.deque' objects}\n", "\n", "\n" ] } ], "source": [ "# run MyClass.method1 to record a profile\n", "obj.method1(arg1=1_000_000)\n", "# verify that MyClass.method2 was not affected\n", "print(obj.method2())\n", "# use the static helper to display the profile\n", "print(\"-\" * 50)\n", "StatsReader.printStats(filename=fileName + ProfilingAdvice.extension)\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Second Example: Define Custom Advice" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# define two pieces of advice for subsequent application\n", "advice1: Advice[str, Any] = Advice(\n", " lambda *args, **kwargs: print(f\"advice1 pre: {args=}, {kwargs=}\"),\n", " lambda returnArg, *args, **kwargs: (\n", " f\"advice1 post: {returnArg=}, {args=}, {kwargs=}\"\n", " ),\n", ")\n", "advice2: Advice[str, Any] = Advice(\n", " lambda *args, **kwargs: print(f\"advice2 pre: {args=}, {kwargs=}\"),\n", " lambda returnArg, *args, **kwargs: (\n", " f\"advice2 post: {returnArg=}, {args=}, {kwargs=}\"\n", " ),\n", ")\n", "# define a point cut that filters for MyClass.method2\n", "pointCut: PointCut = PointCut(\".*2\")\n", "# combine the pieces of advice and the point cut into a list of rules\n", "rules: List[Rule[str, Any]] = [\n", " Rule(pointCut, advice1),\n", " Rule(pointCut, advice2),\n", "]\n", "# create the aspect from the list of rules\n", "aspect: Aspect[MyClass, str, Any] = Aspect(rules=rules)\n", "# apply the aspect - MyClass.method2 is now wrapped with additional print statements!\n", "aspect(MyClass)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "advice2 pre: args=(<__main__.MyClass object at 0x7f63504053d0>,), kwargs={}\n", "advice1 pre: args=(<__main__.MyClass object at 0x7f63504053d0>,), kwargs={}\n", "original method: PyZeta.MyClass!\n", "advice2 post: returnArg=\"advice1 post: returnArg='Hello World!', args=(<__main__.MyClass object at 0x7f63504053d0>,), kwargs={}\", args=(<__main__.MyClass object at 0x7f63504053d0>,), kwargs={}\n" ] } ], "source": [ "# run MyClass.method2 to observe the logic added by the aspect\n", "print(obj.method2())\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Third Example: Use Advice with Containers" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "# use the same general setup as in the second example but for method3\n", "advice3: Advice[str, Any] = Advice(\n", " lambda *args, **kwargs: print(f\"advice3 pre: {args=}, {kwargs=}\"),\n", " lambda returnArg, *args, **kwargs: (\n", " f\"advice3 post: {returnArg=}, {args=}, {kwargs=}\"\n", " ),\n", ")\n", "advice4: Advice[str, Any] = Advice(\n", " lambda *args, **kwargs: print(f\"advice4 pre: {args=}, {kwargs=}\"),\n", " lambda returnArg, *args, **kwargs: (\n", " f\"advice4 post: {returnArg=}, {args=}, {kwargs=}\"\n", " ),\n", ")\n", "# define a point cut that filters for MyClass.method3\n", "pointCut: PointCut = PointCut(\".*3\")\n", "# combine the pieces of advice and the point cut into a list of rules\n", "rules: List[Rule[str, Any]] = [\n", " Rule(pointCut, advice3),\n", " Rule(pointCut, advice4),\n", "]\n", "# create the aspect from the list of rules\n", "aspect: Aspect[MyClass, str, Any] = Aspect(rules=rules)\n" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "# create and interface and register MyClass as its implementation\n", "@runtime_checkable\n", "class MyInterface(Protocol):\n", " def method3(self) -> str:\n", " ...\n", "\n", "\n", "ContainerProvider.getContainer().registerAsSingleton(MyInterface, obj)\n", "assert ContainerProvider.getContainer().tryResolve(MyInterface) is obj, \"ERROR\"" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "original method: PyZeta.MyClass!\n", "Bye World!\n", "advice4 pre: args=(<__main__.MyClass object at 0x7f63504053d0>,), kwargs={}\n", "advice3 pre: args=(<__main__.MyClass object at 0x7f63504053d0>,), kwargs={}\n", "original method: PyZeta.MyClass!\n", "advice4 post: returnArg=\"advice3 post: returnArg='Bye World!', args=(<__main__.MyClass object at 0x7f63504053d0>,), kwargs={}\", args=(<__main__.MyClass object at 0x7f63504053d0>,), kwargs={}\n" ] } ], "source": [ "# run MyClass.method3 to observe the logic before adding the aspect\n", "obj = ContainerProvider.getContainer().tryResolve(MyInterface)\n", "print(obj.method3())\n", "# add the aspect\n", "ContainerProvider.registerAspectGlobally(aspect, MyInterface)\n", "# run MyClass.method3 to observe the logic added by the aspect\n", "# note that the instance must be resolved from the container, else no aspect!\n", "obj = ContainerProvider.getContainer().tryResolve(MyInterface)\n", "print(obj.method3())\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.2" } }, "nbformat": 4, "nbformat_minor": 4 }