Lennard-Jones equation of state#
We illustrate the usage of pantarei
to manage the calculation of the equation of state of a Lennard-Jones fluid.
Setup#
Make sure you have the following packages installed
pantarei (from framagit)
atooms-database
atooms-dynamics
atooms-pp
matplotlib
Basic imports
import os
import random
import numpy
import matplotlib.pyplot as plt
Production#
Simulate the Lennard-Jones fluid along some isotherms. Tag this dataset isotherm
to distiguish it from others.
from pantarei import Task, Job
# Sequential
# job = Task(run_md, tag='isotherm')
# Parallel (if scheduler is available on the node)
job = Job(Task(run_md, tag='isotherm'))
for T in [2.0, 3.0]:
for rho in numpy.arange(0.3, 1.0, 0.1):
job(rho=round(rho, 12), T=T, N=256, seed=1, rmsd=None, nsteps=40000, timestep=0.002,
output_path='/tmp/eos/N{N}/T{T}/rho{rho}/seed{seed}')
Analysis#
Browse the pantarei cache and get the simulation data along the isotherms (syntax: function name - tag; can match wildcards; if no tag, just the function name). These are all the entries of the dataset.
from pantarei import pantarei
ds = pantarei.browse('run_md-isotherm')
ds.columns()
F_s(k,t)
F_s_1(k,t)
F_s_2(k,t)
N
P
T
TSP
_dirname
artifacts
backend
berendsen_tau
blocks
concentration
conserved_energy
density
equilibration_dynamics
fix_cm_period
job_end
job_node
job_queue
job_start
md5_hash
model
nsteps
output_path
parallel
path
postprocess
potential_energy
pressure
production_dynamics
production_rate
rho
rmsd
seed
steps
tau(k)
tau_1(k)
tau_2(k)
temperature
thermostat_mass
timestep
unique_seed
verbose
wall_time
x
Print thermodynamic data, sorting first by temperature, then density
ds.pprint(columns=['T', 'rho', 'pressure', 'potential_energy'], sort_by=('T', 'rho'))
T rho pressure potential_energy
----------------------------------------------
2.0 0.3 0.5898617854368878 -1.0069639862394693
2.0 0.4 0.8233723392708177 -1.3299896189775564
2.0 0.5 1.1131326522161213 -1.6685100047940105
2.0 0.6 1.5707431636408715 -1.9817391798883273
2.0 0.7 2.1742059563044656 -2.301718465392361
2.0 0.8 3.278132889435198 -2.5668502145490066
2.0 0.9 5.033272134058294 -2.7667899993267735
3.0 0.3 1.0088127633114634 -0.9108300375209232
3.0 0.4 1.4353803075126594 -1.2207187684255687
3.0 0.5 1.9958267838780035 -1.5150167078096914
3.0 0.6 2.6816024264639733 -1.817581849144513
3.0 0.7 3.783999748749068 -2.064617517694943
3.0 0.8 5.411608963091276 -2.258963353688038
3.0 0.9 7.905726096178788 -2.355425440614277
More control on formatting by looping over the data entries
fmt = ' | '.join(['{T}', '{rho:.2f}', '{pressure:.4f}'])
for data in ds.find(sort_by=('T', 'rho')):
print('| ' + fmt.format(**data) + ' |')
Temperature |
Density |
Pressure |
---|---|---|
2.0 |
0.3 |
0.5899 |
2.0 |
0.4 |
0.8234 |
2.0 |
0.5 |
1.1131 |
2.0 |
0.6 |
1.5707 |
2.0 |
0.7 |
2.1742 |
2.0 |
0.8 |
3.2781 |
2.0 |
0.9 |
5.0333 |
3.0 |
0.3 |
1.0088 |
3.0 |
0.4 |
1.4354 |
3.0 |
0.5 |
1.9958 |
3.0 |
0.6 |
2.6816 |
3.0 |
0.7 |
3.7840 |
3.0 |
0.8 |
5.4116 |
3.0 |
0.9 |
7.9057 |
Plot the equation of state. Sorting is a bit convoluted this way…
fig, ax = plt.subplots()
for T in ds['T'].unique():
mask = ds['T'] == T
rho, Z = ds['rho'], (ds['pressure'] * ds['density']) / ds['T']
rho, Z = rho[mask], Z[mask]
# Sort by density
rho, Z = zip(*sorted(zip(rho, Z), key=lambda _: _[0]))
ax.plot(rho, Z, '-o', label=f'T={T}')
ax.set_xlabel('Density')
ax.set_ylabel('Compressibility factor')
ax.legend()
fig.show()
fig.savefig('lj_eos.png')
It is simpler by using dataset sorting mechanism
from pantarei import query
fig, ax = plt.subplots()
for T in ds['T'].unique():
data = ds.find(query.T == T, sort_by='rho')
rho, Z = data['rho'], (data['pressure'] * data['density']) / data['T']
ax.plot(rho, Z, '-o', label=f'T={T}')
ax.set_xlabel('Density')
ax.set_ylabel('Compressibility factor')
ax.legend()
fig.show()
Alternatively collect the artifacts in a dataset. We have almost the same info (except information about jobs)
from pantarei import Dataset
ds = Dataset()
ds.insert('/tmp/eos/**/*.yaml')
ds.insert('/tmp/eos/**/*.fskt*')
print(ds.columns())
['F_s(k,t)', 'F_s_1(k,t)', 'F_s_2(k,t)', 'N', 'T', 'TSP', '_dirname', 'backend', 'block', 'block size', 'block steps', 'cell_side', 'cell_volume', 'cm_velocity_norm', 'composition', 'concentration', 'density', 'duration', 'frames', 'grandcanonical', 'md5_hash', 'mean_conserved energy per particle', 'mean_density', 'mean_potential energy per particle', 'mean_pressure', 'mean_temperature', 'mean_total energy per particle', 'model', 'number_of_particles', 'number_of_species', 'path', 'rho', 'rmsd', 'seed', 'size_dispersion', 'species', 'std_conserved energy per particle', 'std_density', 'std_potential energy per particle', 'std_pressure', 'std_temperature', 'std_total energy per particle', 'steps', 'timestep', 'wall_time']
Plot \(F_s(k,t)\). Note it is a 3-dimensional array: (k, t, Fs(k,t))
from pantarei import query
fig, ax = plt.subplots()
for data in ds.find(query.T == 2.0, sort_by='rho'):
ax.semilogx(*data['F_s_1(k,t)'][1:], label=fr'$\rho$={data["rho"]}')
ax.legend()
ax.set_xlabel('t')
ax.set_ylabel('$F_{s,1}(k,t)$') # TODO: double subscript in F_s_1 not allowed
ax.legend()
fig.show()
fig.savefig('lj_fskt.png')
Library#
Helper function to make a unique seed how to of
def _unique_seed(seed, **kwargs):
"""Initialize random numbers with an almost unique seed for each parameter combination"""
import random
import numpy
from hashlib import md5
kwargs['seed'] = seed
kwargs = {kwargs[key] for key in sorted(kwargs)}
md5_hash = md5(str(kwargs).encode()).hexdigest()
_seed = int(md5_hash, 16)
_seed = int(str(_seed)[:9])
random.seed(_seed)
numpy.random.seed(_seed)
return _seed
Rather elaborate driver for MD simulation. It melts a crystal, equilibrate it, make a production run and finally postprocess the trajectory to measure some correlation functions. The results are returned as a dict and optionally written to disk in the output_path
folder.
from atooms.system import System, Thermostat, Barostat
from atooms import database
from atooms.backends.f90 import Interaction, VerletList
from atooms.dynamics import Berendsen, VelocityVerlet, NosePoincare
from atooms.simulation import Simulation, Scheduler, store, write_trajectory, write_thermo, target_rmsd, target_steps
from atooms.trajectory import Trajectory, TrajectoryRam, TrajectoryXYZ
from atooms.simulation.reports import *
import atooms.postprocessing as pp
def run_md(N, T, P=None, rho=None, nsteps=1000000, rmsd=4.0,
x=0.8, model='lennard_jones', seed=1, output_path=None, parallel=False,
production_rate=2, equilibration_dynamics="nose-poincare",
thermostat_mass=5.0, berendsen_tau=100.0, timestep=0.004, postprocess=None,
fix_cm_period=20000, blocks=200, verbose=False):
"""
Run a simulation with atooms-dynamics store and return a dict of
post-processed data
"""
_locals = locals()
# Defaults
if postprocess is None: postprocess = ['fskt']
from atooms.core.utils import setup_logging
if verbose: setup_logging(level=20)
# Sanity check
assert not (P is None and rho is None), 'provide either P or rho'
if P and rho:
P = None
# Create directory to store artifacts
from atooms.core.utils import mkdir
if output_path is not None:
output_path = output_path.format(**_locals)
mkdir(output_path)
# Create system of N particles in contact with a thermostat
# The seed is parametrized with T so that every run is different
import numpy
unique_seed = _unique_seed(seed, P=P, rho=rho, T=T, N=N, x=x)
system = System(N=N)
system.composition = {'A': round(N*x), 'B': N - round(N*x)}
system.species_layout = 'F'
system.temperature = T
system.density = 1.0
if rho is not None:
system.density = rho
# Specify interaction
_model = database.model(model)
system.interaction = Interaction(_model, parallel=parallel, helpers='helpers_3d.f90')
system.interaction.neighbor_list = VerletList(binning=False, parallel=parallel,
helpers='helpers_3d.f90')
# Melt
system.thermostat = Thermostat(temperature=4.0, relaxation_time=berendsen_tau)
bck = Berendsen(system, timestep=timestep / 4)
sim = Simulation(bck, enable_speedometer=verbose)
sim.run(10000)
# Simulation backend for equilibration
assert equilibration_dynamics in ['berendsen', 'nose-poincare']
if equilibration_dynamics == 'nose-poincare': assert P is None
if equilibration_dynamics == 'berendsen':
system.thermostat = Thermostat(temperature=T, relaxation_time=berendsen_tau)
if P is not None:
system.barostat = Barostat(pressure=P, relaxation_time=1e4)
bck = Berendsen(system, timestep=timestep / 2)
if equilibration_dynamics == 'nose-poincare':
system.thermostat = Thermostat(temperature=T, mass=thermostat_mass)
bck = NosePoincare(system, timestep=timestep / 2)
sim = Simulation(bck, enable_speedometer=verbose)
# Simulation callbacks
sim.add(store, 200, ['density', 'pressure', 'temperature',
'potential energy per particle',
'total energy per particle'])
if output_path is not None:
sim.output_path = os.path.join(output_path, 'equilibration')
sim.add(write_thermo, 1000)
# Target rmsd, but stop at steps
if rmsd is not None:
sim.add(target_rmsd, 20000, rmsd)
# Fix the CM regularly, this is crucial for long runs with Berendsen
sim.add(lambda sim: sim.system.fix_momentum(), fix_cm_period)
sim.add(target_steps, nsteps, nsteps)
# Equilibration run
sim.run(nsteps)
if output_path is not None:
db = {}
db.update({'model': model})
db.update({'steps': sim.current_step})
db.update(report_system(system))
db.update(report_simulation(sim))
db.update(report_stats(sim.data))
import yaml
with open(os.path.join(output_path, 'equilibration.yaml'), 'w') as fh:
yaml.dump(db, fh, sort_keys=False)
# Production run
if P is not None:
# Set density to the average one during the thermalisation run (second half)
# This way we avoid small fluctuations due to finite size
half = len(sim.data['density']) // 2
density = numpy.mean(sim.data['density'][-half:])
system.density = density
# Make sure there are alyaws about blocks blocks
def optimal_block(target_n_blocks):
for exponent in range(5, 20):
n_blocks = sim.current_step // 2**exponent
if n_blocks < target_n_blocks:
break
return [0] + [2**i for i in range(exponent)]
scheduler_ram = Scheduler(block=optimal_block(blocks))
scheduler_xyz = Scheduler(block=optimal_block(blocks))
nsteps = int(production_rate * 200 * scheduler_xyz.block[-1])
system.thermostat = Thermostat(temperature=T, mass=thermostat_mass)
bck = NosePoincare(system, timestep=timestep)
sim = Simulation(bck, enable_speedometer=verbose)
# Store trajectories
from atooms.trajectory.ram import TrajectoryRamLight
th = TrajectoryRamLight()
th.timestep = bck.timestep
func = lambda sim: sim.backend.conserved_energy / len(sim.system.particle)
sim.add(write_trajectory, scheduler_ram, trajectory=th)
sim.add(store, Scheduler(calls=1000), ['density', 'pressure', 'temperature',
'potential energy per particle',
('conserved energy per particle', func)])
# Production run
sim.run(nsteps)
# Data to be returned
data = {
'unique_seed': unique_seed,
'concentration': sim.system.concentration,
'rmsd': sim.rmsd.item(),
'steps': sim.current_step,
'pressure': numpy.mean(sim.data['pressure']).item(),
'temperature': numpy.mean(sim.data['temperature']).item(),
'potential_energy': numpy.mean(sim.data['potential energy per particle']).item(),
'conserved_energy': numpy.mean(sim.data['conserved energy per particle']).item(),
'density': numpy.mean(sim.data['density']).item(),
'output_path': output_path
}
data.update(report_simulation(sim))
# Stats and logs
if output_path is not None:
import yaml
db = {}
db.update({'model': model})
db.update(report_system(system))
db.update(report_simulation(sim))
db.update(report_trajectory(th))
db.update(report_stats(sim.data))
with open(os.path.join(output_path, 'production.yaml'), 'w') as fh:
yaml.dump(db, fh, sort_keys=False)
# Post processing
path = None
if output_path is not None:
path = os.path.join(output_path, 'production.{symbol}.{tag}')
if 'fskt' in postprocess:
from atooms.postprocessing.helpers import logx_grid, filter_species
tgrid = [0.0] + logx_grid(th.timestep, th.total_time * 0.5, 60)
cf = pp.SelfIntermediateScattering(th, kgrid=[7.25], tgrid=tgrid)
cf.do(path)
data.update({**cf.results, **cf.analysis})
cf = pp.Partial(pp.SelfIntermediateScattering, th, kgrid=[7.25], tgrid=tgrid)
cf.do(path)
data.update({**cf.results, **cf.analysis})
if 'gr' in postprocess:
cf = pp.Partial(pp.RadialDistributionFunction, th, norigins=-1, dr=0.01)
cf.compute()
data.update(cf.results)
if 'vacf' in postprocess:
tgrid = [0.0] + logx_grid(th.timestep, 10.0, 160)
cf = pp.Partial(pp.VelocityAutocorrelation, th, tgrid=tgrid, norigins=-1)
cf.compute()
data.update(cf.results)
if 'msd' in postprocess:
tgrid = [0.0] + logx_grid(th.timestep, 10.0, 160)
cf = pp.Partial(pp.MeanSquareDisplacement, th, tgrid=tgrid, norigins=-1)
cf.compute()
data.update(cf.results)
# Store artifacts as the whole output_path
if output_path:
data['artifacts'] = output_path
from atooms.core.utils import clear_logging
clear_logging()
return data