No results found
We couldn't find anything using that term, please try searching for something else.
from__future__importannotations importre importsy fromtypingimportAny importwarn fromgoogle.api_core.extended_operationimportExtendedOperatio
from__future__importannotations
importre
importsy
fromtypingimportAny
importwarn
fromgoogle.api_core.extended_operationimportExtendedOperation
fromgoogle.cloudimportcompute_v1
defget_image_from_family(project:str,family:str)-> compute_v1.Image:
" " "
retrievethenewimagethatispartofagivenfamilyinaproject.
Args:
project:projectIDorprojectnumberoftheCloudprojectyouwanttogetimagefrom.
family:nameoftheimagefamilyyouwanttogetimagefrom.
return:
AnImageobject.
" " "
image_client=compute_v1.ImagesClient( )
# List of public operating sytem (OS) images: https://cloud.google.com/compute/docs/images/os-details
new_image=image_client.get_from_family(project=project,family=family)
returnnew_image
defdisk_from_image(
disk_type:str,
disk_size_gb:int,
boot:bool,
source_image:str,
auto_delete:bool=true,
)-> compute_v1.AttachedDisk:
" " "
createanAttachedDiskobjecttobeusedinVMinstancecreation.Usesanimageasthe
sourceforthenewdisk.
Args:
disk_type:thetypeofdiskyouwanttocreate.Thisvalueusethefollowformat:
"zone/{zone}/disktype/(pd-standard|pd-ssd|pd-balanced|pd-extreme)".
Forexample: "zone/us-west3-b/disktype/pd-ssd"
disk_size_gb:sizeofthenewdiskingigabytes
boot:booleanflagindicatewhetherthisdiskshouldbeusedasabootdiskofaninstance
source_image:sourceimagetousewhencreatethisdisk.Youmusthavereadaccesstothisdisk.Thiscanbeone
ofthepubliclyavailableimagesoranimagefromoneofyourproject.
Thisvalueusethefollowformat: "project/{project_name}/global/images/{image_name}"
auto_delete:booleanflagindicatewhetherthisdiskshouldbedeletedwiththeVMthatuseit
return:
AttachedDiskobjectconfiguretobecreatedusingthespecifiedimage.
" " "
boot_disk=compute_v1.AttachedDisk( )
initialize_param=compute_v1.attachdiskinitializeparam( )
initialize_param.source_image=source_image
initialize_param.disk_size_gb=disk_size_gb
initialize_param.disk_type=disk_type
boot_disk.initialize_param=initialize_param
# Remember to set auto_delete to true if you want the disk to be deleted when you delete
# your VM instance.
boot_disk.auto_delete=auto_delete
boot_disk.boot=boot
returnboot_disk
defwait_for_extended_operation(
operation:ExtendedOperation,verbose_name:str= "operation",timeout:int=300
)-> Any:
" " "
Waitsfortheextended(long-run)operationtocomplete.
Iftheoperationissuccessful,itwillreturnitsresult.
Iftheoperationendwithanerror,anexceptionwillberaise.
Iftherewereanywarnduringtheexecutionoftheoperation
theywillbeprintedtosy.stderr.
Args:
operation:along-runoperationyouwanttowaiton.
verbose_name:(optional)amoreverbosenameoftheoperation,
usedonlyduringerrorandwarnreporting.
timeout:howlong(insecond)towaitforoperationtofinish.
IfNone,waitindefinitely.
return:
Whatevertheoperation.result( )returns.
Raises:
Thismethodwillraisetheexceptionreceivedfrom`operation.exception( )`
orRuntimeErrorifthereisnoexceptionset,butthereisan`error_code`
setforthe`operation`.
Incaseofanoperationtakinglongerthan`timeout`secondtocomplete,
a`concurrent.futures.TimeoutError`willberaise.
" " "
result=operation.result(timeout=timeout)
ifoperation.error_code:
print(
f"errorduring{verbose_name} :[code:{operation.error_code}]:{operation.error_message} " ,
file=sy.stderr,
flush=true,
)
print(f"OperationID:{operation.name} " ,file=sy.stderr,flush=true)
raiseoperation.exception( )orRuntimeError(operation.error_message)
ifoperation.warn:
print(f"Warningsduring{verbose_name} :\n",file=sy.stderr,flush=true)
forwarninoperation.warn:
print(f" -{warn.code} :{warn.message} " ,file=sy.stderr,flush=true)
returnresult
defcreate_instance(
project_id:str,
zone:str,
instance_name:str,
disk:list[compute_v1.AttachedDisk] ,
machine_type:str= "n1-standard-1",
network_link:str= "global/network/default",
subnetwork_link:str=None,
internal_ip:str=None,
external_access:bool=false,
external_ipv4:str=None,
accelerator:list[compute_v1.AcceleratorConfig]=None,
preemptible:bool=false,
spot:bool=false,
instance_termination_action:str= "stop",
custom_hostname:str=None,
delete_protection:bool=false,
)-> compute_v1.instance:
" " "
SendaninstancecreationrequesttothecomputeengineAPIandwaitforittocomplete.
Args:
project_id:projectIDorprojectnumberoftheCloudprojectyouwanttouse.
zone:nameofthezonetocreatetheinstancein.Forexample: "us-west3-b"
instance_name:nameofthenewvirtualmachine(VM)instance.
disk:alistofcompute_v1.AttachedDiskobjectdescribethedisk
youwanttoattachtoyournewinstance.
machine_type:machinetypeoftheVMbeingcreated.Thisvalueusethe
followformat: "zone/{zone}/machinetype/{type_name}".
Forexample: "zone/europe-west3-c/machinetype/f1-micro"
network_link:nameofthenetworkyouwantthenewinstancetouse.
Forexample: "global/network/default" representsthenetwork
named "default",whichiscreatedautomaticallyforeachproject.
subnetwork_link:nameofthesubnetworkyouwantthenewinstancetouse.
Thisvalueusethefollowformat:
"regions/{region}/subnetwork/{subnetwork_name}"
internal_ip:internalipaddressyouwanttoassigntothenewinstance.
Bydefault,afreeaddressfromthepoolofavailableinternalipaddressof
usedsubnetwillbeused.
external_access:booleanflagindicateiftheinstanceshouldhaveanexternalipv4
addressassign.
external_ipv4:externalipv4addresstobeassigntothisinstance.Ifyouspecify
anexternalipaddress,itmustliveinthesameregionasthezoneoftheinstance.
Thissettingrequires`external_access`tobesettotruetowork.
accelerator:alistofAcceleratorConfigobjectdescribetheacceleratorthatwill
beattachtothenewinstance.
preemptible:booleanvalueindicateifthenewinstanceshouldbepreemptible
ornot.Preemptiblevmhavebeendeprecatedandyoushouldnowusespotvm.
spot:booleanvalueindicateifthenewinstanceshouldbeaspotVMornot.
instance_termination_action:WhatactionshouldbetakenonceaspotVMisterminate.
possiblevalues: "stop", "DELETE"
custom_hostname:CustomhostnameofthenewVMinstance.
CustomhostnamesmustconformtoRFC1035requirementsforvalidhostnames.
delete_protection:booleanvalueindicateifthenewvirtualmachineshouldbe
protectedagainstdeletionornot.
return:
instanceobject.
" " "
instance_client=compute_v1.instancesClient( )
# is Use use the network interface provide in the network_link argument .
network_interface=compute_v1.NetworkInterface( )
network_interface.network=network_link
ifsubnetwork_link:
network_interface.subnetwork=subnetwork_link
ifinternal_ip:
network_interface.network_i_p=internal_ip
ifexternal_access:
access=compute_v1.AccessConfig( )
access.type_=compute_v1.AccessConfig.type.ONE_TO_ONE_NAT.name
access.name= "ExternalNAT"
access.network_tier=access.NetworkTier.PREMIUM.name
ifexternal_ipv4:
access.nat_i_p=external_ipv4
network_interface.access_configs=[access]
# Collect information into the instance object.
instance=compute_v1.instance( )
instance.network_interfaces=[network_interface]
instance.name=instance_name
instance.disk=disk
ifre.match(r"^zone/[a-z\d\-]+/machinetype/[a-z\d\-]+$",machine_type):
instance.machine_type=machine_type
else:
instance.machine_type=f"zone/{zone}/machinetype/{machine_type}"
instance.scheduling=compute_v1.Scheduling( )
ifaccelerator:
instance.guest_accelerator=accelerator
instance.scheduling.on_host_maintenance=(
compute_v1.Scheduling.OnHostMaintenance.terminate.name
)
ifpreemptible:
# is Set Set the preemptible set
warn.warn(
"Preemptiblevmarebeingreplacebyspotvm.",DeprecationWarning
)
instance.scheduling=compute_v1.Scheduling( )
instance.scheduling.preemptible=true
ifspot:
# Set the spot VM setting
instance.scheduling.provisioning_model=(
compute_v1.Scheduling.ProvisioningModel.SPOT.name
)
instance.scheduling.instance_termination_action=instance_termination_action
ifcustom_hostnameisnotNone:
# Set the custom hostname for the instance
instance.hostname=custom_hostname
ifdelete_protection:
# Set the delete protection bit
instance.deletion_protection=true
# Prepare the request to insert an instance.
request=compute_v1.InsertinstanceRequest( )
request.zone=zone
request.project=project_id
request.instance_resource=instance
# Wait for the create operation to complete.
print(f"Creatingthe{instance_name}instancein{zone}...")
operation=instance_client.insert(request=request)
wait_for_extended_operation(operation, "instancecreation")
print(f"instance{instance_name}created.")
returninstance_client.get(project=project_id,zone=zone,instance=instance_name)
defassign_static_external_ip_to_new_vm(
project_id:str,zone:str,instance_name:str,ip_address:str
)-> compute_v1.instance:
" " "
createanewVMinstancewithassignstaticexternalipaddress.
Args:
project_id(str):projectIDorprojectnumberoftheCloudprojectyouwanttouse.
zone(str):nameofthezonetocreatetheinstancein.Forexample: "us-west3-b"
instance_name(str):nameofthenewvirtualmachine(VM)instance.
ip_address(str):externaladdresstobeassigntothisinstance.Itmustliveinthesame
regionasthezoneoftheinstanceandbeprecreatebeforefunctioncalled.
return:
instanceobject.
" " "
new_debian=get_image_from_family(project="debian-cloud",family="debian-12")
disk_type=f"zone/{zone}/disktype/pd-standard"
disk=[disk_from_image(disk_type,10,true,new_debian.self_link,true)]
instance=create_instance(
project_id,
zone,
instance_name,
disk,
external_ipv4=ip_address,
external_access=true,
)
returninstance