{ "cells": [ { "cell_type": "markdown", "id": "6ca39b80-dd96-449e-91d2-42031c3a5e3f", "metadata": {}, "source": [ "# Tutorial about mutiprocessing using ray" ] }, { "cell_type": "markdown", "id": "5779d015-a270-4944-8159-6377635195b9", "metadata": {}, "source": [ "We will describe how to set up an analysis pipeline to process multiple datasets in parallel using the framework [ray](https://ray.io/)." ] }, { "cell_type": "code", "execution_count": null, "id": "930146a8-4017-4a6a-8b7d-ccf0037e9b37", "metadata": { "tags": [] }, "outputs": [], "source": [ "import sys\n", "import logging\n", "\n", "%matplotlib inline\n", "\n", "import numpy as np\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "import ray\n", "\n", "import locan as lc" ] }, { "cell_type": "code", "execution_count": null, "id": "1b0095f4-3817-4084-9993-9b3e642bfab1", "metadata": { "tags": [] }, "outputs": [], "source": [ "lc.show_versions(dependencies=False, verbose=False)" ] }, { "cell_type": "markdown", "id": "38ac0b33-738a-44ad-8282-2544153acdf1", "metadata": {}, "source": [ "## Activate logging" ] }, { "cell_type": "code", "execution_count": null, "id": "29bf9112-cb7b-4b9c-bd19-ec0224e4c8a1", "metadata": { "tags": [] }, "outputs": [], "source": [ "logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')\n", "logger = logging.getLogger()" ] }, { "cell_type": "markdown", "id": "94931283-67d0-4005-b153-8d1541225f37", "metadata": {}, "source": [ "For changing the configuration logging has to be reloaded or the kernel be restarted." ] }, { "cell_type": "markdown", "id": "c3d02767-bf67-47b3-8732-5f266210ffc3", "metadata": {}, "source": [ "## Synthetic data" ] }, { "cell_type": "markdown", "id": "c0a57554-5bab-4034-a697-90ead8a32bd5", "metadata": {}, "source": [ "Simulate 3 datasets of localization data that is homogeneously Poisson distributed and treat them as files." ] }, { "cell_type": "code", "execution_count": null, "id": "718aeae8-a4d3-48ca-bada-d0d4183237e7", "metadata": {}, "outputs": [], "source": [ "rng = np.random.default_rng(seed=1)" ] }, { "cell_type": "code", "execution_count": null, "id": "c2703702-24e3-446e-bd50-3a4cf9c3161a", "metadata": {}, "outputs": [], "source": [ "locdatas = [lc.simulate_Poisson(intensity=1e-3, region=((0,1000), (0,1000)), seed=rng) for _ in range(3)]\n", "files = locdatas\n", "\n", "print(\"Element_counts:\", [locdata.meta.element_count for locdata in locdatas])" ] }, { "cell_type": "markdown", "id": "5494729d-661d-4f44-b41d-919e12d57c6d", "metadata": {}, "source": [ "## Analysis pipeline" ] }, { "cell_type": "markdown", "id": "ef41d726-1d46-461b-90b8-8ff66c101ffd", "metadata": {}, "source": [ "Define an analysis pipeline. Typically a pipeline processes a single file, which in this example will be a an element of locdatas.\n", "\n", "Within the analysis procedure there will be more random number generation involved. Therefore a correctly generated seed has to be passed." ] }, { "cell_type": "code", "execution_count": null, "id": "b9bd707b-f167-4c54-981d-6cd0ee98d0f6", "metadata": {}, "outputs": [], "source": [ "def computation(self, file, seed):\n", " logging.basicConfig(level=logging.INFO)\n", " logger.info(f'computation started for file: {file}')\n", " \n", " rng = np.random.default_rng(seed=seed)\n", " \n", " other_locdata = lc.simulate_Poisson(intensity=1e-3, region=((0,1000), (0,1000)), seed=rng)\n", " self.nn = lc.NearestNeighborDistances().compute(locdata=file, other_locdata=other_locdata)\n", " \n", " return self" ] }, { "cell_type": "markdown", "id": "c33ba275-1139-45d9-a767-4ea4f4b67eb4", "metadata": {}, "source": [ "## Run analysis in parallel" ] }, { "cell_type": "code", "execution_count": null, "id": "51949da6-12bf-42b6-8d47-1c071cdea091", "metadata": {}, "outputs": [], "source": [ "ray.init()\n", "# ray.init(num_cpus = 4)" ] }, { "cell_type": "code", "execution_count": null, "id": "b6c0362e-2629-4dc4-921b-d6d0c5bf3d25", "metadata": {}, "outputs": [], "source": [ "%%time\n", "@ray.remote\n", "def worker(file, seed):\n", " pipe = lc.Pipeline(computation=computation, file=file, seed=seed).compute()\n", " return pipe\n", "\n", "n_processes = len(files)\n", "ss = np.random.SeedSequence()\n", "child_seeds = ss.spawn(n_processes)\n", "\n", "futures = [worker.remote(file=file, seed=seed) for file, seed in zip(locdatas, child_seeds)]\n", "pipes = ray.get(futures)" ] }, { "cell_type": "markdown", "id": "23b5d32d-f762-48a2-a467-aa2df6b5cdf0", "metadata": {}, "source": [ "## Visualize the combined results" ] }, { "cell_type": "code", "execution_count": null, "id": "53a13f7a-94cc-4e83-bfac-8971eaa9aeba", "metadata": { "tags": [] }, "outputs": [], "source": [ "[pipe.meta for pipe in pipes]" ] }, { "cell_type": "code", "execution_count": null, "id": "a9111063-d43d-4d18-9f55-08fc05f9f8b5", "metadata": { "tags": [] }, "outputs": [], "source": [ "fig, ax = plt.subplots(nrows=1, ncols=1)\n", "for pipe in pipes:\n", " pipe.nn.hist(ax=ax)\n", "plt.show()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.8" }, "widgets": { "application/vnd.jupyter.widget-state+json": { "state": {}, "version_major": 2, "version_minor": 0 } } }, "nbformat": 4, "nbformat_minor": 5 }