Tutorial about multiprocessing using Ray¶
This tutorial describes how to set up an analysis pipeline that processes multiple datasets in parallel using the Ray framework.
import sys
import logging
%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import ray
import locan as lc
lc.show_versions(dependencies=False, verbose=False)
Locan:
version: 0.22.0.dev32+g4bfc3ab8b
Python:
version: 3.11.14
System:
python-bits: 64
system: Linux
release: 7.0.0-1004-aws
version: #4-Ubuntu SMP PREEMPT Mon Apr 13 13:14:24 UTC 2026
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: C.UTF-8
LOCALE: {'language-code': 'en_US', 'encoding': 'UTF-8'}
Activate logging¶
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
To change the logging configuration afterwards, the logging module has to be reloaded or the kernel restarted.
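Once the root logger has a handler, further calls to logging.basicConfig are silently ignored, which is why the configuration appears to be frozen. As a minimal sketch, assuming Python 3.8 or later, the standard library's force=True argument replaces the existing root handlers without restarting the kernel:

```python
import logging
import sys

FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

# Without force=True this call would be ignored whenever the root logger
# already has handlers (e.g. after the earlier basicConfig call).
# force=True (Python 3.8+) first removes and closes the existing root
# handlers and then applies the new configuration.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format=FORMAT, force=True)

root = logging.getLogger()
print(logging.getLevelName(root.level))  # DEBUG
root.debug("debug messages are now shown")
```

This only affects the notebook process. Ray tasks run in separate worker processes with their own logging setup, which is presumably why computation calls logging.basicConfig itself.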
Synthetic data¶
Simulate three datasets of homogeneously Poisson-distributed localization data and treat them as files.
rng = np.random.default_rng(seed=1)
locdatas = [lc.simulate_Poisson(intensity=1e-3, region=((0,1000), (0,1000)), seed=rng) for _ in range(3)]
files = locdatas
print("Element_counts:", [locdata.meta.element_count for locdata in locdatas])
Element_counts: [1001, 1026, 935]
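To make explicit what homogeneously Poisson distributed means here, the following is a minimal NumPy sketch of a homogeneous Poisson point process in a rectangle. It is not locan's implementation of simulate_Poisson, and the function simulate_poisson_2d is hypothetical. The number of points is drawn from a Poisson distribution with mean intensity * area, and the points are then placed uniformly. With intensity 1e-3 and a 1000 x 1000 region the expected count is 1000, which is what the element counts printed above fluctuate around.

```python
import numpy as np


def simulate_poisson_2d(intensity, region, rng):
    """Hypothetical sketch of a homogeneous Poisson point process in a rectangle.

    intensity: expected number of points per unit area
    region: ((x_min, x_max), (y_min, y_max))
    """
    (x_min, x_max), (y_min, y_max) = region
    area = (x_max - x_min) * (y_max - y_min)
    # The number of points is Poisson distributed with mean intensity * area ...
    n_points = rng.poisson(lam=intensity * area)
    # ... and, given that number, the points are uniformly distributed.
    xs = rng.uniform(x_min, x_max, size=n_points)
    ys = rng.uniform(y_min, y_max, size=n_points)
    return np.column_stack([xs, ys])


sketch_rng = np.random.default_rng(seed=1)
datasets = [simulate_poisson_2d(1e-3, ((0, 1000), (0, 1000)), sketch_rng) for _ in range(3)]
# Each count fluctuates around 1e-3 * 1000 * 1000 = 1000.
print([len(points) for points in datasets])
```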
Analysis pipeline¶
Define an analysis pipeline. Typically, a pipeline processes a single file; in this example, a file is one element of locdatas.
The analysis procedure itself involves further random number generation. Therefore, each pipeline must be passed its own properly generated seed so that the parallel runs draw independent random numbers.
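Passing the same integer seed to every worker, or copies of one generator, would make all pipelines draw identical random numbers. NumPy's recommended approach is to spawn independent child seeds from a single SeedSequence, which is what the parallel run does with ss.spawn(n_processes). A minimal sketch, with an arbitrary fixed entropy value chosen for reproducibility:

```python
import numpy as np

# One parent SeedSequence; a fixed entropy value makes the whole run reproducible.
# (Without an argument, fresh entropy is drawn from the OS, as in the notebook.)
parent = np.random.SeedSequence(entropy=12345)

# spawn(n) derives n child SeedSequences with independent streams;
# each child carries a distinct spawn_key.
children = parent.spawn(3)
print([child.spawn_key for child in children])  # [(0,), (1,), (2,)]

# Each worker would build its own generator from its own child seed.
draws = [np.random.default_rng(child).random(3) for child in children]

# Re-creating the parent from the same entropy reproduces the same child streams.
children_again = np.random.SeedSequence(entropy=12345).spawn(3)
draws_again = [np.random.default_rng(child).random(3) for child in children_again]
```

The spawn keys (0,), (1,) and (2,) are the same ones that appear in the pipeline metadata further down.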
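def computation(self, file, seed):
    # self is the lc.Pipeline instance; results are stored as attributes on it.
    # Ray tasks run in separate worker processes, so logging is configured here as well.
    logging.basicConfig(level=logging.INFO)
    logger.info(f'computation started for file: {file}')
    # Each pipeline gets its own generator, derived from its own seed.
    rng = np.random.default_rng(seed=seed)
    other_locdata = lc.simulate_Poisson(intensity=1e-3, region=((0,1000), (0,1000)), seed=rng)
    # Nearest-neighbor distances between the localizations in file and in other_locdata.
    self.nn = lc.NearestNeighborDistances().compute(locdata=file, other_locdata=other_locdata)
    return self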
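The nearest-neighbor analysis in computation pairs each file with an independently simulated dataset. Assuming NearestNeighborDistances reports, for each localization in locdata, the distance to the closest localization in other_locdata, the quantity can be sketched by brute force in NumPy. The function nearest_neighbor_distances is hypothetical and not locan's implementation; for larger datasets a spatial index such as scipy.spatial.cKDTree would be the usual choice.

```python
import numpy as np


def nearest_neighbor_distances(points, other_points):
    """Hypothetical sketch: for every row in points, the distance to the closest row in other_points.

    Brute force with O(n * m) memory; fine for a few thousand points.
    """
    # Pairwise difference vectors with shape (n, m, 2) via broadcasting.
    diff = points[:, np.newaxis, :] - other_points[np.newaxis, :, :]
    distances = np.linalg.norm(diff, axis=-1)  # shape (n, m)
    return distances.min(axis=1)


points = np.array([[0.0, 0.0], [10.0, 0.0]])
other = np.array([[3.0, 4.0], [10.0, 1.0]])
print(nearest_neighbor_distances(points, other))  # [5. 1.]
```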
Run analysis in parallel¶
ray.init()
# ray.init(num_cpus = 4)
2026-04-30 08:38:12,964 WARNING services.py:2063 -- WARNING: The object store is using /tmp instead of /dev/shm because /dev/shm has only 67108864 bytes available. This will harm performance! You may be able to free up space by deleting files in /dev/shm. If you are inside a Docker container, you can increase /dev/shm size by passing '--shm-size=1.75gb' to 'docker run' (or add it to the run_options list in a Ray cluster config). Make sure to set this to more than 30% of available RAM.
2026-04-30 08:38:13,026 INFO worker.py:1832 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
%%time
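@ray.remote
def worker(file, seed):
    # Run the complete pipeline for one file inside a Ray worker process.
    pipe = lc.Pipeline(computation=computation, file=file, seed=seed).compute()
    return pipe
n_processes = len(files)
# Derive one independent child seed per file from a single parent SeedSequence.
ss = np.random.SeedSequence()
child_seeds = ss.spawn(n_processes)
# worker.remote() schedules the tasks and returns object references immediately;
# ray.get() blocks until all results are available.
futures = [worker.remote(file=file, seed=seed) for file, seed in zip(files, child_seeds)]
pipes = ray.get(futures)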
CPU times: user 777 ms, sys: 196 ms, total: 973 ms
Wall time: 4.34 s
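The parallel run follows a fan-out and gather pattern: worker.remote() schedules a task and returns an object reference immediately, and ray.get(futures) blocks until all tasks are finished and returns their results in the same order as the references. The same pattern can be sketched with the standard library's concurrent.futures. Note that this swaps Ray for a thread pool to keep the example self-contained; threads share one process, so it is not a drop-in replacement for Ray's multiprocessing. The functions analyze and run_all are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def analyze(points, seed):
    """Hypothetical stand-in for computation(): perturbs the points using its own generator."""
    rng = np.random.default_rng(seed)
    return points + rng.normal(scale=1.0, size=points.shape)


def run_all(files, entropy):
    """Fan out one task per file, then gather the results in submission order."""
    child_seeds = np.random.SeedSequence(entropy).spawn(len(files))
    with ThreadPoolExecutor(max_workers=len(files)) as executor:
        # submit() returns a Future immediately, much like worker.remote() returns a reference.
        futures = [executor.submit(analyze, points, seed) for points, seed in zip(files, child_seeds)]
        # result() blocks until each task is done; the list keeps the input order.
        return [future.result() for future in futures]


example_rng = np.random.default_rng(seed=1)
example_files = [example_rng.uniform(0, 1000, size=(n, 2)) for n in (100, 120, 90)]
results = run_all(example_files, entropy=2024)
print([r.shape for r in results])  # [(100, 2), (120, 2), (90, 2)]
```

Because every task derives its generator from its own spawned seed, the results do not depend on which thread or process happens to run which task.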
Visualize the combined results¶
[pipe.meta for pipe in pipes]
[identifier: "1"
method {
name: "Pipeline"
parameter: "{\'computation\': <function computation at 0x7a05f3f240e0>, \'file\': <locan.data.locdata.LocData object at 0x7a0519517590>, \'seed\': SeedSequence(\n entropy=211879663237729780593762805934173430752,\n spawn_key=(0,),\n)}"
}
creation_time {
seconds: 1777538297
nanos: 478579000
},
identifier: "1"
method {
name: "Pipeline"
parameter: "{\'computation\': <function computation at 0x7ddfc006c0e0>, \'file\': <locan.data.locdata.LocData object at 0x7ddee72830d0>, \'seed\': SeedSequence(\n entropy=211879663237729780593762805934173430752,\n spawn_key=(1,),\n)}"
}
creation_time {
seconds: 1777538297
nanos: 478571000
},
identifier: "1"
method {
name: "Pipeline"
parameter: "{\'computation\': <function computation at 0x7ddfc006c0e0>, \'file\': <locan.data.locdata.LocData object at 0x7ddfc2fcb590>, \'seed\': SeedSequence(\n entropy=211879663237729780593762805934173430752,\n spawn_key=(2,),\n)}"
}
creation_time {
seconds: 1777538297
nanos: 494677000
}]
fig, ax = plt.subplots(nrows=1, ncols=1)
for pipe in pipes:
    pipe.nn.hist(ax=ax)
plt.show()
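Because each other_locdata is simulated independently with the same intensity, the histograms should roughly follow the nearest-neighbor distance distribution of complete spatial randomness. For a homogeneous 2D Poisson process with intensity lambda, the distance from an arbitrary location to its nearest point has the density f(r) = 2 pi lambda r exp(-lambda pi r^2), with mean 1 / (2 sqrt(lambda)). The following sketch evaluates this curve, ignoring edge effects at the region boundary, which tend to lengthen distances near the border. The function nn_distance_pdf is hypothetical.

```python
import numpy as np


def nn_distance_pdf(r, intensity):
    """Density of the distance from an arbitrary location to the nearest point
    of a homogeneous 2D Poisson process (edge effects ignored)."""
    return 2 * np.pi * intensity * r * np.exp(-intensity * np.pi * r**2)


intensity = 1e-3  # points per unit area, as in the simulation
r = np.linspace(0, 100, 1001)
pdf = nn_distance_pdf(r, intensity)

# The mean nearest-neighbor distance is 1 / (2 * sqrt(intensity)).
expected_mean = 1 / (2 * np.sqrt(intensity))
print(round(expected_mean, 2))  # 15.81
```

To overlay this curve on the plot, the histograms would need to be normalized as densities; whether pipe.nn.hist does so by default is not shown here.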