partition support and operation

This commit is contained in:
le 2023-03-16 19:54:59 +08:00
parent d28ceaeeb4
commit b1f0412063
7 changed files with 719 additions and 311 deletions

View File

@ -4,4 +4,6 @@ from .blockdevice import BlockDevice
from .filesystem import Filesystem, MBR, GPT
from .partition import *
from .user_guides import *
from .validators import *
from .validators import *
from .lvmpartition import *
from .vgpartition import *

View File

@ -0,0 +1,113 @@
from __future__ import annotations
import time
import logging
import json
import time
import pathlib
import random
from typing import Optional, Dict, Any,List, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .blockdevice import BlockDevice
_: Any
from .lvmpartition import LvmPartition
from .validators import valid_fs_type
from ..exceptions import DiskError, SysCallError
from ..general import SysCommand
from ..output import log
from ..storage import storage
from .dmcryptdev import DMCryptDev, MapperDev
from .lvmdev import LvmDev
def find_lvm(part):
if "children" in part:
for pp in part["children"]:
if pp["type"] == "lvm":
yield pp["name"]
for n in find_lvm(pp):
yield n
def find_allname(part):
names = []
names.append(part["name"])
if "children" in part:
for pp in part["children"]:
names += find_allname(pp)
return names
def all_pvnames():
pvlist = []
output2 = SysCommand(f'pvs --reportformat json').decode()
if output2:
data2 = json.loads(output2)
report = data2["report"][0]
if "pv" in report and len(report["pv"]) > 0:
pvlist += report["pv"]
return pvlist
def remove_lvm(devpath):
output = SysCommand(f'lsblk --json {devpath}').decode()
if output:
data = json.loads(output)
dev_tree = data["blockdevices"][0]
allnames = [f'/dev/{x}' for x in find_allname(dev_tree)]
# print("dbg3// all dev names:", allnames)
pvs = all_pvnames()
# print("dbg4// pvs:", pvs)
for pv in pvs:
if pv["vg_name"] and pv["pv_name"] in allnames:
# print("dbg5// remove group:", pv)
SysCommand(f'vgremove -f {pv["vg_name"]}', peak_output=True)
def lvm_layout(lvmdrives:List[Dict[str,Any]]) -> list:
# pv vg
for drive in lvmdrives:
if len(pvs := drive.pvs) > 0:
for pv in pvs:
SysCommand(f"mkfs.ext4 -F {pv}")
time.sleep(0.1)
exist_vg = drive.vg_name and drive.vg_name in [item['vg_name'] for item in all_pvnames()]
log(f"checking==vg name {drive.vg_name} has existed {exist_vg}")
if exist_vg:
drive.vg_name = drive.vg_name + 'n' + str(random.randint(1,1000))
SysCommand(f"pvcreate -y {' '.join(pvs)}")
SysCommand(f"vgcreate -y {drive.vg_name} {' '.join(pvs)}")
for drive in lvmdrives:
if (vg_name := drive.vg_name) is None:
continue
if len(partitions := drive.partitions) > 0:
for index,item in enumerate(partitions):
print("=======>>>>filesystem:",drive,"=====>>>",item.get("filesystem",{}))
filesystem = item.get("filesystem",{}).get("format","ext4")
lv_name = item.get('name','lv' + str(index))
print("lvcreate====>>",item,"=====",item.get('size',None))
# time.sleep(1000)
SysCommand(f"lvcreate -y -l {item.get('size',None)}VG -n {lv_name} {vg_name}")
lvmP = LvmPartition(f"/dev/{vg_name}/{lv_name}",pvs=drive.pvs,filesystem=filesystem)
lvmP.format()
partitions[index]["path"] = f"/dev/{vg_name}/{lv_name}"
partitions[index]["device_instance"] = lvmP
# partition = {
# "device_instance":lvmP,
# "wipe":False,
# "as_lvm":True,
# "filesystem": {
# "format":filesystem,
# "mount_options":[]
# },
# "mountpoint":item.get('mountpoint',None),
# "path":
# }
# if drive.get("partitions",None) is not None:
# drive["partitions"].append(partition)
# else:
# drive["partitions"] = [partition]
# print("\n=====dddd:",lvmdrives)
return lvmdrives

View File

@ -57,6 +57,11 @@ class LvmPartition:
# def _fetch_information(self, blkinfo) -> PartitionInfo:
# return PartitionInfo()
def init_lvm_partiton(self,path) -> bool:
umount_flag = self.unmount()
format_flag = self.format('ext4',path)
return umount_flag and format_flag
def parted_mklabel(self, disk_label: str) -> bool:
log(f"Creating a new partition label on {self._path}", level=logging.INFO, fg="yellow")
# Try to unmount devices before attempting to run mklabel
@ -107,17 +112,18 @@ class LvmPartition:
except SysCallError as err:
raise err
return True
# def unmount(self) -> bool:
# worker = SysCommand(f"umount {self._path}")
# exit_code = worker.exit_code
# # Without to much research, it seams that low error codes are errors.
# # And above 8k is indicators such as "/dev/x not mounted.".
# # So anything in between 0 and 8k are errors (?).
# if exit_code and 0 < exit_code < 8000:
# raise SysCallError(f"Could not unmount {self._path} properly: {worker}", exit_code=exit_code)
def unmount(self) -> bool:
worker = SysCommand(f"umount {self._path}")
exit_code = worker.exit_code
# return True
# Without to much research, it seams that low error codes are errors.
# And above 8k is indicators such as "/dev/x not mounted.".
# So anything in between 0 and 8k are errors (?).
if exit_code and 0 < exit_code < 8000:
raise SysCallError(f"Could not unmount {self._path} properly: {worker}", exit_code=exit_code)
return True
def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool:
"""

View File

@ -5,11 +5,11 @@ from .save_conf import save_config
from .manage_users_conf import ask_for_additional_users
from .backwards_compatible_conf import generic_select, generic_multi_select
from .locale_conf import select_locale_lang, select_locale_enc
from .system_conf import select_kernel, select_harddrives, select_driver, ask_for_bootloader, ask_for_swap
from .system_conf import select_kernel, select_harddrives, select_driver, ask_for_bootloader, ask_for_swap,select_lvmdrives
from .network_conf import ask_to_configure_network
from .partitioning_conf import select_partition, select_encrypted_partitions
from .general_conf import (ask_ntp, ask_for_a_timezone, ask_for_audio_selection, select_language, select_mirror_regions,
select_profile, select_archinstall_language, ask_additional_packages_to_install,
select_additional_repositories, ask_hostname, add_number_of_parrallel_downloads, select_base_image)
from .disk_conf import ask_for_main_filesystem_format, select_individual_blockdevice_usage, select_disk_layout, select_disk
from .disk_conf import ask_for_main_filesystem_format, select_individual_blockdevice_usage, select_disk_layout, select_disk,select_lvm_layout,select_individual_lvmdevice_usage,ask_for_lvm_partition_format
from .utils import get_password, do_countdown

View File

@ -2,399 +2,676 @@ from __future__ import annotations
import copy
from typing import List, Any, Dict, Union, TYPE_CHECKING, Callable, Optional
import time
from ..menu import Menu
from ..menu.menu import MenuSelectionType
from ..output import log, FormattedOutput
from ..disk.validators import fs_types
from ..disk.validators import fs_types,lvm_fs_types
if TYPE_CHECKING:
from ..disk import BlockDevice
from ..disk.partition import Partition
_: Any
from ..disk import BlockDevice
from ..disk.partition import Partition
_: Any
def partition_overlap(partitions: list, start: str, end: str) -> bool:
# TODO: Implement sanity check
return False
# TODO: Implement sanity check
return False
def current_partition_layout(partitions: List[Dict[str, Any]], with_idx: bool = False, with_title: bool = True) -> str:
def do_padding(name: str, max_len: int):
spaces = abs(len(str(name)) - max_len) + 2
pad_left = int(spaces / 2)
pad_right = spaces - pad_left
return f'{pad_right * " "}{name}{pad_left * " "}|'
def do_padding(name: str, max_len: int):
spaces = abs(len(str(name)) - max_len) + 2
pad_left = int(spaces / 2)
pad_right = spaces - pad_left
return f'{pad_right * " "}{name}{pad_left * " "}|'
def flatten_data(data: Dict[str, Any]) -> Dict[str, Any]:
flattened = {}
for k, v in data.items():
if k == 'filesystem':
flat = flatten_data(v)
flattened.update(flat)
elif k == 'btrfs':
# we're going to create a separate table for the btrfs subvolumes
pass
else:
flattened[k] = v
return flattened
def flatten_data(data: Dict[str, Any]) -> Dict[str, Any]:
flattened = {}
for k, v in data.items():
if k == 'filesystem':
flat = flatten_data(v)
flattened.update(flat)
elif k == 'btrfs':
# we're going to create a separate table for the btrfs subvolumes
pass
else:
flattened[k] = v
return flattened
display_data: List[Dict[str, Any]] = [flatten_data(entry) for entry in partitions]
display_data: List[Dict[str, Any]] = [flatten_data(entry) for entry in partitions]
column_names = {}
column_names = {}
# this will add an initial index to the table for each partition
if with_idx:
column_names['index'] = max([len(str(len(display_data))), len('index')])
# this will add an initial index to the table for each partition
if with_idx:
column_names['index'] = max([len(str(len(display_data))), len('index')])
# determine all attribute names and the max length
# of the value among all display_data to know the width
# of the table cells
for p in display_data:
for attribute, value in p.items():
if attribute in column_names.keys():
column_names[attribute] = max([column_names[attribute], len(str(value)), len(attribute)])
else:
column_names[attribute] = max([len(str(value)), len(attribute)])
# determine all attribute names and the max length
# of the value among all display_data to know the width
# of the table cells
for p in display_data:
for attribute, value in p.items():
if attribute in column_names.keys():
column_names[attribute] = max([column_names[attribute], len(str(value)), len(attribute)])
else:
column_names[attribute] = max([len(str(value)), len(attribute)])
current_layout = ''
for name, max_len in column_names.items():
current_layout += do_padding(name, max_len)
current_layout = ''
for name, max_len in column_names.items():
current_layout += do_padding(name, max_len)
current_layout = f'{current_layout[:-1]}\n{"-" * len(current_layout)}\n'
current_layout = f'{current_layout[:-1]}\n{"-" * len(current_layout)}\n'
for idx, p in enumerate(display_data):
row = ''
for name, max_len in column_names.items():
if name == 'index':
row += do_padding(str(idx), max_len)
elif name in p:
row += do_padding(p[name], max_len)
else:
row += ' ' * (max_len + 2) + '|'
for idx, p in enumerate(display_data):
row = ''
for name, max_len in column_names.items():
if name == 'index':
row += do_padding(str(idx), max_len)
elif name in p:
row += do_padding(p[name], max_len)
else:
row += ' ' * (max_len + 2) + '|'
current_layout += f'{row[:-1]}\n'
current_layout += f'{row[:-1]}\n'
# we'll create a separate table for the btrfs subvolumes
btrfs_subvolumes = [partition['btrfs']['subvolumes'] for partition in partitions if partition.get('btrfs', None)]
if len(btrfs_subvolumes) > 0:
for subvolumes in btrfs_subvolumes:
output = FormattedOutput.as_table(subvolumes)
current_layout += f'\n{output}'
# we'll create a separate table for the btrfs subvolumes
btrfs_subvolumes = [partition['btrfs']['subvolumes'] for partition in partitions if partition.get('btrfs', None)]
if len(btrfs_subvolumes) > 0:
for subvolumes in btrfs_subvolumes:
output = FormattedOutput.as_table(subvolumes)
current_layout += f'\n{output}'
if with_title:
title = str(_('Current partition layout'))
return f'\n\n{title}:\n\n{current_layout}'
if with_title:
title = str(_('Current partition layout'))
return f'\n\n{title}:\n\n{current_layout}'
return current_layout
return current_layout
def current_lvm_layout(partitions: List[Dict[str, Any]], with_idx: bool = False, with_title: bool = True) -> str:
def do_padding(name: str, max_len: int):
spaces = abs(len(str(name)) - max_len) + 2
pad_left = int(spaces / 2)
pad_right = spaces - pad_left
return f'{pad_right * " "}{name}{pad_left * " "}|'
def flatten_data(data: Dict[str, Any]) -> Dict[str, Any]:
flattened = {}
for k, v in data.items():
if k == 'filesystem':
flat = flatten_data(v)
flattened.update(flat)
elif k == 'btrfs':
# we're going to create a separate table for the btrfs subvolumes
pass
else:
flattened[k] = v
return flattened
display_data: List[Dict[str, Any]] = [flatten_data(entry) for entry in partitions]
column_names = {}
# this will add an initial index to the table for each partition
if with_idx:
column_names['index'] = max([len(str(len(display_data))), len('index')])
# determine all attribute names and the max length
# of the value among all display_data to know the width
# of the table cells
for p in display_data:
for attribute, value in p.items():
if attribute in column_names.keys():
column_names[attribute] = max([column_names[attribute], len(str(value)), len(attribute)])
else:
column_names[attribute] = max([len(str(value)), len(attribute)])
current_layout = ''
for name, max_len in column_names.items():
current_layout += do_padding(name, max_len)
current_layout = f'{current_layout[:-1]}\n{"-" * len(current_layout)}\n'
for idx, p in enumerate(display_data):
row = ''
for name, max_len in column_names.items():
if name == 'index':
row += do_padding(str(idx), max_len)
elif name in p:
row += do_padding(p[name], max_len)
else:
row += ' ' * (max_len + 2) + '|'
current_layout += f'{row[:-1]}\n'
# we'll create a separate table for the btrfs subvolumes
btrfs_subvolumes = [partition['btrfs']['subvolumes'] for partition in partitions if partition.get('btrfs', None)]
if len(btrfs_subvolumes) > 0:
for subvolumes in btrfs_subvolumes:
output = FormattedOutput.as_table(subvolumes)
current_layout += f'\n{output}'
if with_title:
title = str(_('Current partition layout'))
return f'\n\n{title}:\n\n{current_layout}'
return current_layout
def _get_partitions(partitions :List[Partition], filter_ :Callable = None) -> List[str]:
"""
filter allows to filter out the indexes once they are set. Should return True if element is to be included
"""
partition_indexes = []
for i in range(len(partitions)):
if filter_:
if filter_(partitions[i]):
partition_indexes.append(str(i))
else:
partition_indexes.append(str(i))
"""
filter allows to filter out the indexes once they are set. Should return True if element is to be included
"""
partition_indexes = []
for i in range(len(partitions)):
if filter_:
if filter_(partitions[i]):
partition_indexes.append(str(i))
else:
partition_indexes.append(str(i))
return partition_indexes
return partition_indexes
def select_partition(
title :str,
partitions :List[Partition],
multiple :bool = False,
filter_ :Callable = None
title :str,
partitions :List[Partition],
multiple :bool = False,
filter_ :Callable = None
) -> Optional[int, List[int]]:
partition_indexes = _get_partitions(partitions, filter_)
partition_indexes = _get_partitions(partitions, filter_)
if len(partition_indexes) == 0:
return None
if len(partition_indexes) == 0:
return None
choice = Menu(title, partition_indexes, multi=multiple).run()
choice = Menu(title, partition_indexes, multi=multiple).run()
if choice.type_ == MenuSelectionType.Esc:
return None
if choice.type_ == MenuSelectionType.Esc:
return None
if isinstance(choice.value, list):
return [int(p) for p in choice.value]
else:
return int(choice.value)
if isinstance(choice.value, list):
return [int(p) for p in choice.value]
else:
return int(choice.value)
def get_default_partition_layout(
block_devices: Union['BlockDevice', List['BlockDevice']],
advanced_options: bool = False
block_devices: Union['BlockDevice', List['BlockDevice']],
advanced_options: bool = False
) -> Optional[Dict[str, Any]]:
from ..disk import suggest_single_disk_layout, suggest_multi_disk_layout
from ..disk import suggest_single_disk_layout, suggest_multi_disk_layout
if len(block_devices) == 1:
return suggest_single_disk_layout(block_devices[0], advanced_options=advanced_options)
else:
return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options)
if len(block_devices) == 1:
return suggest_single_disk_layout(block_devices[0], advanced_options=advanced_options)
else:
return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options)
def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, Any]: # noqa: max-complexity: 50
block_device_struct = {"partitions": [partition.__dump__() for partition in block_device.partitions.values()]}
original_layout = copy.deepcopy(block_device_struct)
block_device_struct = {"partitions": [partition.__dump__() for partition in block_device.partitions.values()]}
vg_name_orig = 'vg0'
block_device_struct['device'] = {
"as_lvm": False,
"is_lvm": block_device.info.get("TYPE",None) == "LVM2_member",
"has_lvm":False,
"vg_size":block_device.size,
"partitions":[]
}
for part in block_device_struct["partitions"]:
print("part==========>>>>",part)
part['device'] = {
"as_lvm": False,
"is_lvm": False,
"has_lvm":False,
"vg_size":part['size'],
"partitions":[]
}
original_layout = copy.deepcopy(block_device_struct)
new_partition = str(_('Create a new partition'))
suggest_partition_layout = str(_('Suggest partition layout'))
delete_partition = str(_('Delete a partition'))
delete_all_partitions = str(_('Clear/Delete all partitions'))
assign_mount_point = str(_('Assign mount-point for a partition'))
mark_formatted = str(_('Mark/Unmark a partition to be formatted (wipes data)'))
mark_encrypted = str(_('Mark/Unmark a partition as encrypted'))
mark_compressed = str(_('Mark/Unmark a partition as compressed (btrfs only)'))
mark_bootable = str(_('Mark/Unmark a partition as bootable (automatic for /boot)'))
set_filesystem_partition = str(_('Set desired filesystem for a partition'))
set_btrfs_subvolumes = str(_('Set desired subvolumes on a btrfs partition'))
save_and_exit = str(_('Save and exit'))
cancel = str(_('Cancel'))
new_partition = str(_('Create a new partition'))
suggest_partition_layout = str(_('Suggest partition layout'))
delete_partition = str(_('Delete a partition'))
delete_all_partitions = str(_('Clear/Delete all partitions'))
assign_mount_point = str(_('Assign mount-point for a partition'))
mark_formatted = str(_('Mark/Unmark a partition to be formatted (wipes data)'))
mark_encrypted = str(_('Mark/Unmark a partition as encrypted'))
mark_compressed = str(_('Mark/Unmark a partition as compressed (btrfs only)'))
mark_bootable = str(_('Mark/Unmark a partition as bootable (automatic for /boot)'))
set_filesystem_partition = str(_('Set desired filesystem for a partition'))
set_btrfs_subvolumes = str(_('Set desired subvolumes on a btrfs partition'))
mark_full_disk_as_lvm = str(_('Mark/Unmark full disk as lvm partition'))
mark_partition_disk_as_lvm = str(_('Mark/Unmark a partition as lvm partition'))
save_and_exit = str(_('Save and exit'))
cancel = str(_('Cancel'))
while True:
modes = [new_partition, suggest_partition_layout]
while True:
modes = [new_partition, suggest_partition_layout,mark_full_disk_as_lvm]
if len(block_device_struct['partitions']) > 0:
modes += [
delete_partition,
delete_all_partitions,
assign_mount_point,
mark_formatted,
mark_encrypted,
mark_bootable,
mark_compressed,
set_filesystem_partition,
]
if len(block_device_struct['partitions']) > 0:
modes.remove(mark_full_disk_as_lvm)
modes += [
delete_partition,
delete_all_partitions,
assign_mount_point,
mark_formatted,
mark_encrypted,
mark_bootable,
mark_compressed,
set_filesystem_partition,
mark_partition_disk_as_lvm
]
indexes = _get_partitions(
block_device_struct["partitions"],
filter_=lambda x: True if x.get('filesystem', {}).get('format') == 'btrfs' else False
)
indexes = _get_partitions(
block_device_struct["partitions"],
filter_=lambda x: True if x.get('filesystem', {}).get('format') == 'btrfs' else False
)
if len(indexes) > 0:
modes += [set_btrfs_subvolumes]
if len(indexes) > 0:
modes += [set_btrfs_subvolumes]
title = _('Select what to do with\n{}').format(block_device)
title = _('Select what to do with\n{}').format(block_device)
# show current partition layout:
if len(block_device_struct["partitions"]):
title += current_partition_layout(block_device_struct['partitions']) + '\n'
# show current partition layout:
if len(block_device_struct["partitions"]):
title += current_partition_layout(block_device_struct['partitions']) + '\n'
modes += [save_and_exit, cancel]
modes += [save_and_exit, cancel]
task = Menu(title, modes, sort=False, skip=False).run()
task = task.value
task = Menu(title, modes, sort=False, skip=False).run()
task = task.value
if task == cancel:
return original_layout
elif task == save_and_exit:
break
if task == cancel:
return original_layout
elif task == save_and_exit:
break
if task == new_partition:
from ..disk import valid_parted_position
if task == new_partition:
from ..disk import valid_parted_position
# if partition_type == 'gpt':
# # https://www.gnu.org/software/parted/manual/html_node/mkpart.html
# # https://www.gnu.org/software/parted/manual/html_node/mklabel.html
# name = input("Enter a desired name for the partition: ").strip()
# if partition_type == 'gpt':
# # https://www.gnu.org/software/parted/manual/html_node/mkpart.html
# # https://www.gnu.org/software/parted/manual/html_node/mklabel.html
# name = input("Enter a desired name for the partition: ").strip()
fs_choice = Menu(_('Enter a desired filesystem type for the partition'), fs_types()).run()
fs_choice = Menu(_('Enter a desired filesystem type for the partition'), fs_types()).run()
if fs_choice.type_ == MenuSelectionType.Esc:
continue
if fs_choice.type_ == MenuSelectionType.Esc:
continue
prompt = str(_('Enter the start sector (percentage or block number, default: {}): ')).format(
block_device.first_free_sector
)
start = input(prompt).strip()
prompt = str(_('Enter the start sector (percentage or block number, default: {}): ')).format(
block_device.first_free_sector
)
start = input(prompt).strip()
if not start.strip():
start = block_device.first_free_sector
end_suggested = block_device.first_end_sector
else:
end_suggested = '100%'
if not start.strip():
start = block_device.first_free_sector
end_suggested = block_device.first_end_sector
else:
end_suggested = '100%'
prompt = str(_('Enter the end sector of the partition (percentage or block number, ex: {}): ')).format(
end_suggested
)
end = input(prompt).strip()
prompt = str(_('Enter the end sector of the partition (percentage or block number, ex: {}): ')).format(
end_suggested
)
end = input(prompt).strip()
if not end.strip():
end = end_suggested
if not end.strip():
end = end_suggested
if valid_parted_position(start) and valid_parted_position(end):
if partition_overlap(block_device_struct["partitions"], start, end):
log(f"This partition overlaps with other partitions on the drive! Ignoring this partition creation.",
fg="red")
continue
if valid_parted_position(start) and valid_parted_position(end):
if partition_overlap(block_device_struct["partitions"], start, end):
log(f"This partition overlaps with other partitions on the drive! Ignoring this partition creation.",
fg="red")
continue
block_device_struct["partitions"].append({
"type": "primary", # Strictly only allowed under MS-DOS, but GPT accepts it so it's "safe" to inject
"start": start,
"size": end,
"mountpoint": None,
"wipe": True,
"filesystem": {
"format": fs_choice.value
}
})
else:
log(f"Invalid start ({valid_parted_position(start)}) or end ({valid_parted_position(end)}) for this partition. Ignoring this partition creation.",
fg="red")
continue
elif task == suggest_partition_layout:
from ..disk import suggest_single_disk_layout
block_device_struct["partitions"].append({
"type": "primary", # Strictly only allowed under MS-DOS, but GPT accepts it so it's "safe" to inject
"start": start,
"size": end,
"mountpoint": None,
"wipe": True,
"filesystem": {
"format": fs_choice.value
}
})
else:
log(f"Invalid start ({valid_parted_position(start)}) or end ({valid_parted_position(end)}) for this partition. Ignoring this partition creation.",
fg="red")
continue
elif task == suggest_partition_layout:
from ..disk import suggest_single_disk_layout
if len(block_device_struct["partitions"]):
prompt = _('{}\ncontains queued partitions, this will remove those, are you sure?').format(block_device)
choice = Menu(prompt, Menu.yes_no(), default_option=Menu.no(), skip=False).run()
if len(block_device_struct["partitions"]):
prompt = _('{}\ncontains queued partitions, this will remove those, are you sure?').format(block_device)
choice = Menu(prompt, Menu.yes_no(), default_option=Menu.no(), skip=False).run()
if choice.value == Menu.no():
continue
if choice.value == Menu.no():
continue
block_device_struct.update(suggest_single_disk_layout(block_device)[block_device.path])
else:
current_layout = current_partition_layout(block_device_struct['partitions'], with_idx=True)
block_device_struct.update(suggest_single_disk_layout(block_device,lvm_options=False)[block_device.path])
elif task == mark_full_disk_as_lvm:
prompt = str(_('Enter lvm partition vg name (default: {}): ')).format(
vg_name_orig
)
vg_name_input = input(prompt).strip()
if not vg_name_input.strip():
vg_name_input = vg_name_orig
block_device_struct['device']['as_lvm'] = not block_device_struct['device']['as_lvm']
if task == delete_partition:
title = _('{}\n\nSelect by index which partitions to delete').format(current_layout)
to_delete = select_partition(title, block_device_struct["partitions"], multiple=True)
if block_device_struct['device']['as_lvm']:
block_device_struct['device']['vg_name'] = vg_name_input
else:
block_device_struct['device']['vg_name'] = None
else:
current_layout = current_partition_layout(block_device_struct['partitions'], with_idx=True)
if task == delete_partition:
title = _('{}\n\nSelect by index which partitions to delete').format(current_layout)
to_delete = select_partition(title, block_device_struct["partitions"], multiple=True)
if to_delete:
block_device_struct['partitions'] = [
p for idx, p in enumerate(block_device_struct['partitions']) if idx not in to_delete
]
elif task == mark_compressed:
title = _('{}\n\nSelect which partition to mark as bootable').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
if to_delete:
block_device_struct['partitions'] = [
p for idx, p in enumerate(block_device_struct['partitions']) if idx not in to_delete
]
elif task == mark_partition_disk_as_lvm:
title = _('{}\n\nSelect which partition to mark as lvm partiton').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
if partition is not None:
# ask vg name
block_device_struct["partitions"][partition]['device']['as_lvm'] = not block_device_struct["partitions"][partition]['device']['as_lvm']
prompt = str(_('Enter lvm partition vg name (default: {}): ')).format(
vg_name_orig
)
vg_name_input = input(prompt).strip()
if not vg_name_input.strip():
vg_name_input = vg_name_orig
block_device_struct["partitions"][partition]['device']['as_lvm'] = not block_device_struct['device']['as_lvm']
block_device_struct['device']['has_lvm'] = block_device_struct["partitions"][partition]['device']['as_lvm']
if partition is not None:
if "filesystem" not in block_device_struct["partitions"][partition]:
block_device_struct["partitions"][partition]["filesystem"] = {}
if "mount_options" not in block_device_struct["partitions"][partition]["filesystem"]:
block_device_struct["partitions"][partition]["filesystem"]["mount_options"] = []
if block_device_struct["partitions"][partition]['device']['as_lvm']:
block_device_struct["partitions"][partition]['device']['vg_name'] = vg_name_input
else:
block_device_struct["partitions"][partition]['device']['vg_name'] = None
if "compress=zstd" not in block_device_struct["partitions"][partition]["filesystem"]["mount_options"]:
block_device_struct["partitions"][partition]["filesystem"]["mount_options"].append("compress=zstd")
elif task == delete_all_partitions:
block_device_struct["partitions"] = []
block_device_struct["wipe"] = True
elif task == assign_mount_point:
title = _('{}\n\nSelect by index which partition to mount where').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
elif task == mark_compressed:
title = _('{}\n\nSelect which partition to mark as bootable').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
if partition is not None:
print(_(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.'))
mountpoint = input(_('Select where to mount partition (leave blank to remove mountpoint): ')).strip()
if partition is not None:
if "filesystem" not in block_device_struct["partitions"][partition]:
block_device_struct["partitions"][partition]["filesystem"] = {}
if "mount_options" not in block_device_struct["partitions"][partition]["filesystem"]:
block_device_struct["partitions"][partition]["filesystem"]["mount_options"] = []
if len(mountpoint):
block_device_struct["partitions"][partition]['mountpoint'] = mountpoint
if mountpoint == '/boot':
log(f"Marked partition as bootable because mountpoint was set to /boot.", fg="yellow")
block_device_struct["partitions"][partition]['boot'] = True
else:
del (block_device_struct["partitions"][partition]['mountpoint'])
if "compress=zstd" not in block_device_struct["partitions"][partition]["filesystem"]["mount_options"]:
block_device_struct["partitions"][partition]["filesystem"]["mount_options"].append("compress=zstd")
elif task == delete_all_partitions:
block_device_struct["partitions"] = []
block_device_struct["wipe"] = True
elif task == assign_mount_point:
title = _('{}\n\nSelect by index which partition to mount where').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
elif task == mark_formatted:
title = _('{}\n\nSelect which partition to mask for formatting').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
if partition is not None:
print(_(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.'))
mountpoint = input(_('Select where to mount partition (leave blank to remove mountpoint): ')).strip()
if partition is not None:
# If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really
# without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set,
# it's safe to change the filesystem for this partition.
if block_device_struct["partitions"][partition].get('filesystem',{}).get('format', 'crypto_LUKS') == 'crypto_LUKS':
if not block_device_struct["partitions"][partition].get('filesystem', None):
block_device_struct["partitions"][partition]['filesystem'] = {}
if len(mountpoint):
block_device_struct["partitions"][partition]['mountpoint'] = mountpoint
if mountpoint == '/boot':
log(f"Marked partition as bootable because mountpoint was set to /boot.", fg="yellow")
block_device_struct["partitions"][partition]['boot'] = True
else:
del (block_device_struct["partitions"][partition]['mountpoint'])
fs_choice = Menu(_('Enter a desired filesystem type for the partition'), fs_types()).run()
elif task == mark_formatted:
title = _('{}\n\nSelect which partition to mask for formatting').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
if fs_choice.type_ == MenuSelectionType.Selection:
block_device_struct["partitions"][partition]['filesystem']['format'] = fs_choice.value
if partition is not None:
# If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really
# without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set,
# it's safe to change the filesystem for this partition.
if block_device_struct["partitions"][partition].get('filesystem',{}).get('format', 'crypto_LUKS') == 'crypto_LUKS':
if not block_device_struct["partitions"][partition].get('filesystem', None):
block_device_struct["partitions"][partition]['filesystem'] = {}
# Negate the current wipe marking
block_device_struct["partitions"][partition]['wipe'] = not block_device_struct["partitions"][partition].get('wipe', False)
fs_choice = Menu(_('Enter a desired filesystem type for the partition'), fs_types()).run()
elif task == mark_encrypted:
title = _('{}\n\nSelect which partition to mark as encrypted').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
if fs_choice.type_ == MenuSelectionType.Selection:
block_device_struct["partitions"][partition]['filesystem']['format'] = fs_choice.value
if partition is not None:
# Negate the current encryption marking
block_device_struct["partitions"][partition]['encrypted'] = \
not block_device_struct["partitions"][partition].get('encrypted', False)
# Negate the current wipe marking
block_device_struct["partitions"][partition]['wipe'] = not block_device_struct["partitions"][partition].get('wipe', False)
elif task == mark_bootable:
title = _('{}\n\nSelect which partition to mark as bootable').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
elif task == mark_encrypted:
title = _('{}\n\nSelect which partition to mark as encrypted').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
if partition is not None:
block_device_struct["partitions"][partition]['boot'] = \
not block_device_struct["partitions"][partition].get('boot', False)
if partition is not None:
# Negate the current encryption marking
block_device_struct["partitions"][partition]['encrypted'] = \
not block_device_struct["partitions"][partition].get('encrypted', False)
elif task == set_filesystem_partition:
title = _('{}\n\nSelect which partition to set a filesystem on').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
elif task == mark_bootable:
title = _('{}\n\nSelect which partition to mark as bootable').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
if partition is not None:
if not block_device_struct["partitions"][partition].get('filesystem', None):
block_device_struct["partitions"][partition]['filesystem'] = {}
if partition is not None:
block_device_struct["partitions"][partition]['boot'] = \
not block_device_struct["partitions"][partition].get('boot', False)
fstype_title = _('Enter a desired filesystem type for the partition: ')
fs_choice = Menu(fstype_title, fs_types()).run()
elif task == set_filesystem_partition:
title = _('{}\n\nSelect which partition to set a filesystem on').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
if fs_choice.type_ == MenuSelectionType.Selection:
block_device_struct["partitions"][partition]['filesystem']['format'] = fs_choice.value
if partition is not None:
if not block_device_struct["partitions"][partition].get('filesystem', None):
block_device_struct["partitions"][partition]['filesystem'] = {}
elif task == set_btrfs_subvolumes:
from .subvolume_config import SubvolumeList
fstype_title = _('Enter a desired filesystem type for the partition: ')
fs_choice = Menu(fstype_title, fs_types()).run()
# TODO get preexisting partitions
title = _('{}\n\nSelect which partition to set subvolumes on').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"],filter_=lambda x:True if x.get('filesystem',{}).get('format') == 'btrfs' else False)
if fs_choice.type_ == MenuSelectionType.Selection:
block_device_struct["partitions"][partition]['filesystem']['format'] = fs_choice.value
if partition is not None:
if not block_device_struct["partitions"][partition].get('btrfs', {}):
block_device_struct["partitions"][partition]['btrfs'] = {}
if not block_device_struct["partitions"][partition]['btrfs'].get('subvolumes', []):
block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = []
elif task == set_btrfs_subvolumes:
from .subvolume_config import SubvolumeList
prev = block_device_struct["partitions"][partition]['btrfs']['subvolumes']
result = SubvolumeList(_("Manage btrfs subvolumes for current partition"), prev).run()
block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = result
# TODO get preexisting partitions
title = _('{}\n\nSelect which partition to set subvolumes on').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"],filter_=lambda x:True if x.get('filesystem',{}).get('format') == 'btrfs' else False)
return block_device_struct
if partition is not None:
if not block_device_struct["partitions"][partition].get('btrfs', {}):
block_device_struct["partitions"][partition]['btrfs'] = {}
if not block_device_struct["partitions"][partition]['btrfs'].get('subvolumes', []):
block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = []
prev = block_device_struct["partitions"][partition]['btrfs']['subvolumes']
result = SubvolumeList(_("Manage btrfs subvolumes for current partition"), prev).run()
block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = result
return block_device_struct
def manage_new_and_existing_lvm_partitions(block_device: Dict[str,Any]) -> Dict[str, Any]: # noqa: max-complexity: 50
block_device_struct = {"partitions": [partition for partition in block_device.partitions]}
# block_device_struct = {"partitions": block_device.get("partitions",[])}
original_layout = copy.deepcopy(block_device_struct)
new_partition = str(_('Create a new partition'))
# suggest_partition_layout = str(_('Suggest partition layout'))
delete_partition = str(_('Delete a partition'))
delete_all_partitions = str(_('Clear/Delete all partitions'))
assign_mount_point = str(_('Assign mount-point for a partition'))
mark_formatted = str(_('Mark/Unmark a partition to be formatted (wipes data)'))
# mark_encrypted = str(_('Mark/Unmark a partition as encrypted'))
# mark_compressed = str(_('Mark/Unmark a partition as compressed (btrfs only)'))
# mark_bootable = str(_('Mark/Unmark a partition as bootable (automatic for /boot)'))
set_filesystem_partition = str(_('Set desired filesystem for a partition'))
# set_btrfs_subvolumes = str(_('Set desired subvolumes on a btrfs partition'))
save_and_exit = str(_('Save and exit'))
cancel = str(_('Cancel'))
while True:
modes = [new_partition]
if len(block_device_struct['partitions']) > 0:
modes += [
delete_partition,
delete_all_partitions,
assign_mount_point,
set_filesystem_partition
# mark_formatted,
]
# indexes = _get_partitions(
# block_device_struct["partitions"],
# filter_=lambda x: True if x.get('filesystem', {}).get('format') == 'btrfs' else False
# )
# if len(indexes) > 0:
# modes += [set_btrfs_subvolumes]
title = _('Select what to do with\n{}').format(block_device)
# show current partition layout:
if len(block_device_struct["partitions"]):
title += current_partition_layout(block_device_struct['partitions']) + '\n'
modes += [save_and_exit, cancel]
task = Menu(title, modes, sort=False, skip=False).run()
task = task.value
if task == cancel:
return original_layout
elif task == save_and_exit:
break
if task == new_partition:
from ..disk import valid_parted_position
# if partition_type == 'gpt':
# # https://www.gnu.org/software/parted/manual/html_node/mkpart.html
# # https://www.gnu.org/software/parted/manual/html_node/mklabel.html
# name = input("Enter a desired name for the partition: ").strip()
fs_choice = Menu(_('Enter a desired filesystem type for the partition'), lvm_fs_types()).run()
if fs_choice.type_ == MenuSelectionType.Esc:
continue
prompt = str(_('Enter the start sector (percentage or block number, default: {}): ')).format(
0
)
start = input(prompt).strip()
if not start.strip():
start = '0'
end_suggested = '100%'
else:
end_suggested = '100%'
prompt = str(_('Enter the end sector of the partition (percentage or block number, ex: {}): ')).format(
end_suggested
)
end = input(prompt).strip()
if not end.strip():
end = end_suggested
if valid_parted_position(start) and valid_parted_position(end):
if partition_overlap(block_device_struct["partitions"], start, end):
log(f"This partition overlaps with other partitions on the drive! Ignoring this partition creation.",
fg="red")
continue
block_device_struct["partitions"].append({
# "type": "primary", # Strictly only allowed under MS-DOS, but GPT accepts it so it's "safe" to inject
"start": start,
"size": end,
"mountpoint": None,
"wipe": True,
"filesystem": {
"format": fs_choice.value
}
})
else:
log(f"Invalid start ({valid_parted_position(start)}) or end ({valid_parted_position(end)}) for this partition. Ignoring this partition creation.",
fg="red")
continue
else:
current_layout = current_partition_layout(block_device_struct['partitions'], with_idx=True)
if task == delete_partition:
title = _('{}\n\nSelect by index which partitions to delete').format(current_layout)
to_delete = select_partition(title, block_device_struct["partitions"], multiple=True)
if to_delete:
block_device_struct['partitions'] = [
p for idx, p in enumerate(block_device_struct['partitions']) if idx not in to_delete
]
elif task == delete_all_partitions:
block_device_struct["partitions"] = []
block_device_struct["wipe"] = True
elif task == set_filesystem_partition:
title = _('{}\n\nSelect which partition to set a filesystem on').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
if partition is not None:
if not block_device_struct["partitions"][partition].get('filesystem', None):
block_device_struct["partitions"][partition]['filesystem'] = {}
fstype_title = _('Enter a desired filesystem type for the partition: ')
fs_choice = Menu(fstype_title, lvm_fs_types()).run()
if fs_choice.type_ == MenuSelectionType.Selection:
block_device_struct["partitions"][partition]['filesystem']['format'] = fs_choice.value
elif task == assign_mount_point:
title = _('{}\n\nSelect by index which partition to mount where').format(current_layout)
partition = select_partition(title, block_device_struct["partitions"])
if partition is not None:
print(_(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.'))
mountpoint = input(_('Select where to mount partition (leave blank to remove mountpoint): ')).strip()
if len(mountpoint) and mountpoint != '/boot' and mountpoint != '/boot/efi':
block_device_struct["partitions"][partition]['mountpoint'] = mountpoint
else:
print(_(' * LVM Partition mount-points can not be /boot or /boot/efi.'))
del (block_device_struct["partitions"][partition]['mountpoint'])
return block_device_struct
def select_encrypted_partitions(
title :str,
partitions :List[Partition],
multiple :bool = True,
filter_ :Callable = None
title :str,
partitions :List[Partition],
multiple :bool = True,
filter_ :Callable = None
) -> Optional[int, List[int]]:
partition_indexes = _get_partitions(partitions, filter_)
partition_indexes = _get_partitions(partitions, filter_)
if len(partition_indexes) == 0:
return None
if len(partition_indexes) == 0:
return None
# show current partition layout:
if len(partitions):
title += current_partition_layout(partitions, with_idx=True) + '\n'
# show current partition layout:
if len(partitions):
title += current_partition_layout(partitions, with_idx=True) + '\n'
choice = Menu(title, partition_indexes, multi=multiple).run()
choice = Menu(title, partition_indexes, multi=multiple).run()
if choice.type_ == MenuSelectionType.Esc:
return None
if choice.type_ == MenuSelectionType.Esc:
return None
if isinstance(choice.value, list):
for partition_index in choice.value:
yield int(partition_index)
else:
yield (partition_index)
if isinstance(choice.value, list):
for partition_index in choice.value:
yield int(partition_index)
else:
yield (partition_index)

View File

@ -1,2 +1,8 @@
#/sbin/bin
# ssh-keygen -t rsa -f /.ssh/id_rsa -P ""
# ssh-copy-id ubuntu@10.10.20.149
scp -r ubuntu@10.10.20.149:/data/project/os-installer/archinstall /

View File

@ -4,7 +4,11 @@ local-install nic
local-install vim.tiny
hw-detect
local-install ssh
ip link set $1 up
ip addr add $2 dev $1
ip route add default via $3 dev $1
eth0=`ip a |grep mtu|grep -v LOOP|grep 'ens[0-9]*' -o`
echo $eth0
echo "ip link set $eth0 up"
echo "ip addr add 10.10.20.$1/24 dev $eth0"
echo "ip route add default via 10.10.20.201 dev $eth0"
ip link set $eth0 up
ip addr add 10.10.20.$1/24 dev $eth0
ip route add default via 10.10.20.201 dev $eth0