chore(ops): move archivist machine to ops and contrib
contrib/ gets the clickhouse patching, the bucket log parsing code and the awscli setup and shell. ops/ gets the machine config itself.

Change-Id: If8b8f8cce5ca9c2b4d19e17be9a8b895ac35e84a
Reviewed-on: https://cl.snix.dev/c/snix/+/30163
Autosubmit: Florian Klink <flokli@flokli.de>
Tested-by: besadii
Reviewed-by: Ryan Lahfa <masterancpp@gmail.com>
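The diff below only reproduces the ops/ side of the move. As a rough, hand-written sketch (not part of the commit), the cross-reference this split establishes looks like the following in the machine config, with the relocated tooling consumed through the depot attribute set:

  { depot, ... }:
  {
    # the bucket-log parser now lives under contrib/ and is pulled in from ops/
    environment.systemPackages = [ depot.contrib.archivist.parse-bucket-logs ];
    systemd.services.parse-bucket-logs.path = [ depot.contrib.archivist.parse-bucket-logs ];
  }

Both attribute paths appear verbatim in ops/machines/archivist-ec2/default.nix below; the surrounding module skeleton here is illustrative only.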
parent c3de9e21eb
commit ae4d967288

14 changed files with 21 additions and 40 deletions
@@ -1,6 +1,8 @@
 { depot, ... }:
 
 (with depot.ops.machines; [
+  # Archivist EC2 machine
+  archivist-ec2
   # Gerrit instance
   gerrit01
   # Public-facing services
ops/machines/archivist-ec2/OWNERS (new file, 1 line)
@@ -0,0 +1 @@
edef
ops/machines/archivist-ec2/default.nix (new file, 41 lines)
@@ -0,0 +1,41 @@
{ depot, pkgs, ... }: # readTree options
{ modulesPath, ... }: # passed by module system

let
  mod = name: depot.path.origSrc + ("/ops/modules/" + name);
in
{
  imports = [
    "${modulesPath}/virtualisation/amazon-image.nix"
    (mod "archivist.nix")
  ];

  nixpkgs.hostPlatform = "x86_64-linux";

  systemd.timers.parse-bucket-logs = {
    wantedBy = [ "multi-user.target" ];
    timerConfig.OnCalendar = "*-*-* 03:00:00 UTC";
  };

  systemd.services.parse-bucket-logs = {
    path = [ depot.contrib.archivist.parse-bucket-logs ];
    serviceConfig = {
      Type = "oneshot";
      ExecStart = (pkgs.writers.writePython3 "parse-bucket-logs-continuously"
        {
          libraries = [ pkgs.python3Packages.boto3 ];
        } ./parse-bucket-logs-continuously.py);
      DynamicUser = "yes";
      StateDirectory = "parse-bucket-logs";
    };
  };

  environment.systemPackages = [
    depot.contrib.archivist.parse-bucket-logs
  ];

  networking.hostName = "archivist-ec2";

  system.stateVersion = "23.05"; # Did you read the comment?
}
ops/machines/archivist-ec2/hardware-configuration.nix (new file, 36 lines)
@@ -0,0 +1,36 @@
{ lib, modulesPath, ... }:

{
  imports =
    [
      (modulesPath + "/profiles/qemu-guest.nix")
    ];

  boot.initrd.availableKernelModules = [ "ahci" "xhci_pci" "virtio_pci" "sr_mod" "virtio_blk" ];
  boot.initrd.kernelModules = [ ];
  boot.kernelModules = [ "kvm-amd" ];
  boot.extraModulePackages = [ ];

  fileSystems."/" =
    {
      device = "/dev/disk/by-partlabel/root";
      fsType = "xfs";
    };

  fileSystems."/boot" =
    {
      device = "/dev/disk/by-partlabel/boot";
      fsType = "vfat";
    };

  swapDevices = [ ];

  # Enables DHCP on each ethernet and wireless interface. In case of scripted networking
  # (the default) this is the recommended approach. When using systemd-networkd it's
  # still possible to use this option, but it's recommended to use it in conjunction
  # with explicit per-interface declarations with `networking.interfaces.<interface>.useDHCP`.
  networking.useDHCP = lib.mkDefault true;
  # networking.interfaces.enp1s0.useDHCP = lib.mkDefault true;

  nixpkgs.hostPlatform = lib.mkDefault "x86_64-linux";
}
ops/machines/archivist-ec2/parse-bucket-logs-continuously.py (new file, 62 lines)
@@ -0,0 +1,62 @@
import boto3
import datetime
import os
import re
import subprocess
import tempfile

s3 = boto3.resource('s3')
bucket_name = "nix-archeologist"
prefix = "nix-cache-bucket-logs/"

bucket = s3.Bucket(bucket_name)

key_pattern = re.compile(r'.*\/(?P<y>\d{4})-(?P<m>\d{2})-(?P<d>\d{2})\.parquet$')  # noqa: E501

# get a listing (which is sorted), grab the most recent key
last_elem = list(
    o for o in bucket.objects.filter(Prefix=prefix)
    if key_pattern.match(o.key)
).pop()

# extract the date of that key.
m = key_pattern.search(last_elem.key)
last_elem_date = datetime.date(int(m.group("y")), int(m.group("m")), int(m.group("d")))  # noqa: E501

# get the current date (UTC)
now = datetime.datetime.now(tz=datetime.UTC)
now_date = datetime.date(now.year, now.month, now.day)

while True:
    # Calculate what date would be processed next.
    next_elem_date = last_elem_date + datetime.timedelta(days=1)

    # If that's today, we don't want to process it.
    if next_elem_date == now_date:
        print("Caught up, would process data from today.")
        break

    # If we'd be processing data from yesterday, but it's right after midnight,
    # also don't process - data might still be flushed.
    if (next_elem_date + datetime.timedelta(days=1) == now_date) and now.hour == 0:  # noqa: E501
        print("Not processing data from previous day right after midnight")
        break

    src = f"http://nix-cache-log.s3.amazonaws.com/log/{next_elem_date.isoformat()}-*"  # noqa: E501

    # Invoke parse-bucket-logs script inside a tempdir and upload on success.
    with tempfile.TemporaryDirectory() as td:
        work_file_name = os.path.join(td, "output.parquet")
        args = ["archivist-parse-bucket-logs", src, work_file_name]
        subprocess.run(
            args,
            check=True  # throw exception if nonzero exit code
        )

        dest_key = f"{prefix}{next_elem_date.isoformat()}.parquet"

        # Upload the file
        print(f"uploading to s3://{bucket_name}/{dest_key}")
        bucket.upload_file(work_file_name, dest_key)

    last_elem_date = next_elem_date
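The service in default.nix wraps this script with pkgs.writers.writePython3 and puts depot.contrib.archivist.parse-bucket-logs on the service's path, which is what makes the archivist-parse-bucket-logs call above resolve. For ad-hoc runs outside the timer, a hypothetical wrapper (not part of the commit; it mirrors the writePython3 call in default.nix, using the writePython3Bin variant so the result lands in bin/) could look like:

  { pkgs, ... }:
  # build a standalone bin/parse-bucket-logs-continuously for manual testing;
  # archivist-parse-bucket-logs still has to be available on PATH at runtime.
  pkgs.writers.writePython3Bin "parse-bucket-logs-continuously"
    {
      libraries = [ pkgs.python3Packages.boto3 ];
    } ./parse-bucket-logs-continuously.py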