如何让AI帮你干活-娱乐(3)
背景

今天的话题会偏代码技巧一些,对于以前没有接触过代码的朋友或者接触代码开发经验较少的朋友会有些吃力。
上篇文章介绍了如何广视角的生成相对稳定的视频。昨天的实现相对简单,主要用的是UI界面来做生成。但是生成的效果其实也显而易见,不算太好。人物连贯性和动作连贯性是不够的。原因如下:
1.stablediffusion webui的batch 的image2image还无法真正做到image2image
2.控制其实是通过固定的文本prompt+多个controlnet来控制
然后如果希望画面能够稳定,其实image的控制信息和每张图用相对有差异的prompt生成的图片质量连贯性和稳定性会更好(image2image控制整体的风格和内容,controlnet控制细节,prompt可以控制一些内容差异)。
这篇文章不会跟大家分享,稳定图生成的具体细节。而是跟大家分享更重要的,如何用代码来实现webui的功能,如何用代码方式搭建更可控更高效的图生成链路。
内容
1.用代码搭建stablediffusion+controlnet生产流程
2.multi-control net生产流程搭建
3.diffuser没有的功能如何自己实现加入
用代码搭建stablediffusion+controlnet生产流程
安装diffuser包
pip install --upgrade diffusers accelerate transformers
#如果要安装最新diffuser可以执行下面指令
pip install git+https://github.com/huggingface/diffusers
! pip install controlnet_hinter==0.0.5搭建stablediffusion+controlnet脚本
from diffusers import StableDiffusionControlNetPipeline, ControlNetModel
from diffusers.utils import load_image
import torch
import numpy as np
from PIL import Image
import cv2#加载测试图片
original_image = load_image("https://huggingface.co/datasets/diffusers/test-arrays/resolve/main/stable_diffusion_imgvar/input_image_vermeer.png")
original_image#设置controlnet+stablefiddufion流水线
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny",cache_dir='./')
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()#抽取图片canny边界
canny_edged_image = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_canny_edged.png")
canny_edged_image#canny controlnet做图片生成
generator = torch.Generator(device="cpu").manual_seed(3)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",image=canny_edged_image,num_inference_steps=30, generator=generator).images[0]
image#设置canny编辑过滤阀值
control_image = np.array(original_image)
low_threshold = 10 # default=100
high_threshold = 150 # default=200
control_image = cv2.Canny(control_image, low_threshold, high_threshold)
control_image = control_image[:, :, None]
control_image = np.concatenate([control_image, control_image, control_image], axis=2)
control_image = Image.fromarray(control_image)
control_image#canny controlnet做图片生成
generator = torch.Generator(device="cpu").manual_seed(3)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",image=control_image,num_inference_steps=30, generator=generator).images[0]
image#pose的controlnet流水线
controlnet = None
pipe = Nonecontrolnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-openpose",cache_dir= './')
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()#pose的图片
pose_image = load_image('https://huggingface.co/takuma104/controlnet_dev/resolve/main/pose.png')
pose_image#pose流水线生产图
generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed, football, a boy", negative_prompt="lowres, bad anatomy, worst quality, low quality",image=pose_image, generator=generator,num_inference_steps=30).images[0]
image#用controlnet_hinter抽取pose控制特征
control_image = controlnet_hinter.hint_openpose(original_image)
control_image#pose流水线生产图
generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="lowres, bad anatomy, worst quality, low quality",image=control_image, generator=generator,num_inference_steps=30).images[0]
image#深度图
controlnet = None
pipe = Nonecontrolnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-depth")
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()control_image = controlnet_hinter.hint_depth(original_image)
control_imagegenerator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",image=control_image, generator=generator,num_inference_steps=30).images[0]
image#轮廓图
controlnet = None
pipe = Nonecontrolnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-scribble")
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None).to('cuda')
pipe.enable_xformers_memory_efficient_attention()scribble_image = load_image('https://github.com/lllyasviel/ControlNet/raw/main/test_imgs/user_1.png')
scribble_imagegenerator = torch.Generator(device="cpu").manual_seed(1)
image = pipe(prompt="a turtle, best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",image=scribble_image, generator=generator,num_inference_steps=30).images[0]
image#Segmentation
controlnet = None
pipe = Nonecontrolnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-seg")
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None).to('cuda')
pipe.enable_xformers_memory_efficient_attention()
control_image = controlnet_hinter.hint_segmentation(original_image)
control_imagegenerator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",image=control_image, generator=generator,num_inference_steps=30).images[0]
image#Hough 建筑、大场景里用的多
controlnet = None
pipe = Nonecontrolnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-mlsd")
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None).to('cuda')
pipe.enable_xformers_memory_efficient_attention()control_image = controlnet_hinter.hint_hough(original_image)
control_imagegenerator = torch.Generator(device="cpu").manual_seed(2)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",image=control_image, generator=generator,num_inference_steps=30).images[0]
imagemulti-control net生产流程搭建
diffuser包里面现在还没实现多control net控制图生成,需要使用multi-controlnet可以用以下代码。
# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.import inspect
from typing import Any, Callable, Dict, List, Optional, Union, Tupleimport numpy as np
import PIL.Image
import torch
from transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizerfrom diffusers import AutoencoderKL, ControlNetModel, UNet2DConditionModel
from diffusers.schedulers import KarrasDiffusionSchedulers
from diffusers.utils import (PIL_INTERPOLATION,is_accelerate_available,is_accelerate_version,logging,randn_tensor,replace_example_docstring,
)
from diffusers.pipeline_utils import DiffusionPipeline
from diffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput, StableDiffusionSafetyChecker
from diffusers.models.controlnet import ControlNetOutputlogger = logging.get_logger(__name__) # pylint: disable=invalid-nameclass ControlNetProcessor(object):def __init__(self,controlnet: ControlNetModel,image: Union[torch.FloatTensor, PIL.Image.Image, List[torch.FloatTensor], List[PIL.Image.Image]],conditioning_scale: float = 1.0,):self.controlnet = controlnetself.image = imageself.conditioning_scale = conditioning_scaledef _default_height_width(self, height, width, image):if isinstance(image, list):image = image[0]if height is None:if isinstance(image, PIL.Image.Image):height = image.heightelif isinstance(image, torch.Tensor):height = image.shape[3]height = (height // 8) * 8 # round down to nearest multiple of 8if width is None:if isinstance(image, PIL.Image.Image):width = image.widthelif isinstance(image, torch.Tensor):width = image.shape[2]width = (width // 8) * 8 # round down to nearest multiple of 8return height, widthdef default_height_width(self, height, width):return self._default_height_width(height, width, self.image)def _prepare_image(self, image, width, height, batch_size, num_images_per_prompt, device, dtype):if not isinstance(image, torch.Tensor):if isinstance(image, PIL.Image.Image):image = [image]if isinstance(image[0], PIL.Image.Image):image = [np.array(i.resize((width, height), resample=PIL_INTERPOLATION["lanczos"]))[None, :] for i in image]image = np.concatenate(image, axis=0)image = np.array(image).astype(np.float32) / 255.0image = image.transpose(0, 3, 1, 2)image = torch.from_numpy(image)elif isinstance(image[0], torch.Tensor):image = torch.cat(image, dim=0)image_batch_size = image.shape[0]if image_batch_size == 1:repeat_by = batch_sizeelse:# image batch size is the same as prompt batch sizerepeat_by = num_images_per_promptimage = image.repeat_interleave(repeat_by, dim=0)image = image.to(device=device, dtype=dtype)return imagedef _check_inputs(self, image, prompt, prompt_embeds):image_is_pil = isinstance(image, PIL.Image.Image)image_is_tensor = isinstance(image, torch.Tensor)image_is_pil_list = isinstance(image, list) and isinstance(image[0], PIL.Image.Image)image_is_tensor_list = isinstance(image, list) and isinstance(image[0], torch.Tensor)if not image_is_pil and not image_is_tensor and not image_is_pil_list and not image_is_tensor_list:raise TypeError("image must be passed and be one of PIL image, torch tensor, list of PIL images, or list of torch tensors")if image_is_pil:image_batch_size = 1elif image_is_tensor:image_batch_size = image.shape[0]elif image_is_pil_list:image_batch_size = len(image)elif image_is_tensor_list:image_batch_size = len(image)if prompt is not None and isinstance(prompt, str):prompt_batch_size = 1elif prompt is not None and isinstance(prompt, list):prompt_batch_size = len(prompt)elif prompt_embeds is not None:prompt_batch_size = prompt_embeds.shape[0]if image_batch_size != 1 and image_batch_size != prompt_batch_size:raise ValueError(f"If image batch size is not 1, image batch size must be same as prompt batch size. image batch size: {image_batch_size}, prompt batch size: {prompt_batch_size}")def check_inputs(self, prompt, prompt_embeds):self._check_inputs(self.image, prompt, prompt_embeds)def prepare_image(self, width, height, batch_size, num_images_per_prompt, device, do_classifier_free_guidance):self.image = self._prepare_image(self.image, width, height, batch_size, num_images_per_prompt, device, self.controlnet.dtype)if do_classifier_free_guidance:self.image = torch.cat([self.image] * 2)def __call__(self,sample: torch.FloatTensor,timestep: Union[torch.Tensor, float, int],encoder_hidden_states: torch.Tensor,class_labels: Optional[torch.Tensor] = None,timestep_cond: Optional[torch.Tensor] = None,attention_mask: Optional[torch.Tensor] = None,cross_attention_kwargs: Optional[Dict[str, Any]] = None,return_dict: bool = True,) -> Tuple:down_block_res_samples, mid_block_res_sample = self.controlnet(sample=sample,controlnet_cond=self.image,timestep=timestep,encoder_hidden_states=encoder_hidden_states,class_labels=class_labels,timestep_cond=timestep_cond,attention_mask=attention_mask,cross_attention_kwargs=cross_attention_kwargs,return_dict=False,)down_block_res_samples = [down_block_res_sample * self.conditioning_scale for down_block_res_sample in down_block_res_samples]mid_block_res_sample *= self.conditioning_scalereturn (down_block_res_samples, mid_block_res_sample)EXAMPLE_DOC_STRING = """Examples:```py>>> # !pip install opencv-python transformers accelerate>>> from diffusers import StableDiffusionControlNetPipeline, ControlNetModel, UniPCMultistepScheduler>>> from diffusers.utils import load_image>>> import numpy as np>>> import torch>>> import cv2>>> from PIL import Image>>> # download an image>>> image = load_image(... "https://hf.co/datasets/huggingface/documentation-images/resolve/main/diffusers/input_image_vermeer.png"... )>>> image = np.array(image)>>> # get canny image>>> image = cv2.Canny(image, 100, 200)>>> image = image[:, :, None]>>> image = np.concatenate([image, image, image], axis=2)>>> canny_image = Image.fromarray(image)>>> # load control net and stable diffusion v1-5>>> controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny", torch_dtype=torch.float16)>>> pipe = StableDiffusionControlNetPipeline.from_pretrained(... "runwayml/stable-diffusion-v1-5", controlnet=controlnet, torch_dtype=torch.float16... )>>> # speed up diffusion process with faster scheduler and memory optimization>>> pipe.scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config)>>> # remove following line if xformers is not installed>>> pipe.enable_xformers_memory_efficient_attention()>>> pipe.enable_model_cpu_offload()>>> # generate image>>> generator = torch.manual_seed(0)>>> image = pipe(... "futuristic-looking woman", num_inference_steps=20, generator=generator, image=canny_image... ).images[0]```
"""class StableDiffusionMultiControlNetPipeline(DiffusionPipeline):r"""Pipeline for text-to-image generation using Stable Diffusion with ControlNet guidance.This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods thelibrary implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)Args:vae ([`AutoencoderKL`]):Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.text_encoder ([`CLIPTextModel`]):Frozen text-encoder. Stable Diffusion uses the text portion of[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specificallythe [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.tokenizer (`CLIPTokenizer`):Tokenizer of class[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents.scheduler ([`SchedulerMixin`]):A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].safety_checker ([`StableDiffusionSafetyChecker`]):Classification module that estimates whether generated images could be considered offensive or harmful.Please, refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for details.feature_extractor ([`CLIPFeatureExtractor`]):Model that extracts features from generated images to be used as inputs for the `safety_checker`."""_optional_components = ["safety_checker", "feature_extractor"]def __init__(self,vae: AutoencoderKL,text_encoder: CLIPTextModel,tokenizer: CLIPTokenizer,unet: UNet2DConditionModel,scheduler: KarrasDiffusionSchedulers,safety_checker: StableDiffusionSafetyChecker,feature_extractor: CLIPFeatureExtractor,requires_safety_checker: bool = True,):super().__init__()if safety_checker is None and requires_safety_checker:logger.warning(f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"" that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"" results in services or applications open to the public. Both the diffusers team and Hugging Face"" strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"" it only for use-cases that involve analyzing network behavior or auditing its results. For more"" information, please have a look at https://github.com/huggingface/diffusers/pull/254 .")if safety_checker is not None and feature_extractor is None:raise ValueError("Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"" checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead.")self.register_modules(vae=vae,text_encoder=text_encoder,tokenizer=tokenizer,unet=unet,scheduler=scheduler,safety_checker=safety_checker,feature_extractor=feature_extractor,)self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)self.register_to_config(requires_safety_checker=requires_safety_checker)# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.enable_vae_slicingdef enable_vae_slicing(self):r"""Enable sliced VAE decoding.When this option is enabled, the VAE will split the input tensor in slices to compute decoding in severalsteps. This is useful to save some memory and allow larger batch sizes."""self.vae.enable_slicing()# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.disable_vae_slicingdef disable_vae_slicing(self):r"""Disable sliced VAE decoding. If `enable_vae_slicing` was previously invoked, this method will go back tocomputing decoding in one step."""self.vae.disable_slicing()def enable_sequential_cpu_offload(self, gpu_id=0):r"""Offloads all models to CPU using accelerate, significantly reducing memory usage. When called, unet,text_encoder, vae, controlnet, and safety checker have their state dicts saved to CPU and then are moved to a`torch.device('meta') and loaded to GPU only when their specific submodule has its `forward` method called.Note that offloading happens on a submodule basis. Memory savings are higher than with`enable_model_cpu_offload`, but performance is lower."""if is_accelerate_available():from accelerate import cpu_offloadelse:raise ImportError("Please install accelerate via `pip install accelerate`")device = torch.device(f"cuda:{gpu_id}")for cpu_offloaded_model in [self.unet, self.text_encoder, self.vae]:cpu_offload(cpu_offloaded_model, device)if self.safety_checker is not None:cpu_offload(self.safety_checker, execution_device=device, offload_buffers=True)def enable_model_cpu_offload(self, gpu_id=0):r"""Offloads all models to CPU using accelerate, reducing memory usage with a low impact on performance. Comparedto `enable_sequential_cpu_offload`, this method moves one whole model at a time to the GPU when its `forward`method is called, and the model remains in GPU until the next model runs. Memory savings are lower than with`enable_sequential_cpu_offload`, but performance is much better due to the iterative execution of the `unet`."""if is_accelerate_available() and is_accelerate_version(">=", "0.17.0.dev0"):from accelerate import cpu_offload_with_hookelse:raise ImportError("`enable_model_offload` requires `accelerate v0.17.0` or higher.")device = torch.device(f"cuda:{gpu_id}")hook = Nonefor cpu_offloaded_model in [self.text_encoder, self.unet, self.vae]:_, hook = cpu_offload_with_hook(cpu_offloaded_model, device, prev_module_hook=hook)if self.safety_checker is not None:# the safety checker can offload the vae again_, hook = cpu_offload_with_hook(self.safety_checker, device, prev_module_hook=hook)# control net hook has be manually offloaded as it alternates with unet# cpu_offload_with_hook(self.controlnet, device)# We'll offload the last model manually.self.final_offload_hook = hook@property# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._execution_devicedef _execution_device(self):r"""Returns the device on which the pipeline's models will be executed. After calling`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's modulehooks."""if not hasattr(self.unet, "_hf_hook"):return self.devicefor module in self.unet.modules():if (hasattr(module, "_hf_hook")and hasattr(module._hf_hook, "execution_device")and module._hf_hook.execution_device is not None):return torch.device(module._hf_hook.execution_device)return self.device# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_promptdef _encode_prompt(self,prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt=None,prompt_embeds: Optional[torch.FloatTensor] = None,negative_prompt_embeds: Optional[torch.FloatTensor] = None,):r"""Encodes the prompt into text encoder hidden states.Args:prompt (`str` or `List[str]`, *optional*):prompt to be encodeddevice: (`torch.device`):torch devicenum_images_per_prompt (`int`):number of images that should be generated per promptdo_classifier_free_guidance (`bool`):whether to use classifier free guidance or notnegative_prompt (`str` or `List[str]`, *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to pass`negative_prompt_embeds`. instead. If not defined, one has to pass `negative_prompt_embeds`. instead.Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`).prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from `prompt` input argument.negative_prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` inputargument."""if prompt is not None and isinstance(prompt, str):batch_size = 1elif prompt is not None and isinstance(prompt, list):batch_size = len(prompt)else:batch_size = prompt_embeds.shape[0]if prompt_embeds is None:text_inputs = self.tokenizer(prompt,padding="max_length",max_length=self.tokenizer.model_max_length,truncation=True,return_tensors="pt",)text_input_ids = text_inputs.input_idsuntruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_idsif untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):removed_text = self.tokenizer.batch_decode(untruncated_ids[:, self.tokenizer.model_max_length - 1 : -1])logger.warning("The following part of your input was truncated because CLIP can only handle sequences up to"f" {self.tokenizer.model_max_length} tokens: {removed_text}")if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:attention_mask = text_inputs.attention_mask.to(device)else:attention_mask = Noneprompt_embeds = self.text_encoder(text_input_ids.to(device),attention_mask=attention_mask,)prompt_embeds = prompt_embeds[0]prompt_embeds = prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)bs_embed, seq_len, _ = prompt_embeds.shape# duplicate text embeddings for each generation per prompt, using mps friendly methodprompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)prompt_embeds = prompt_embeds.view(bs_embed * num_images_per_prompt, seq_len, -1)# get unconditional embeddings for classifier free guidanceif do_classifier_free_guidance and negative_prompt_embeds is None:uncond_tokens: List[str]if negative_prompt is None:uncond_tokens = [""] * batch_sizeelif type(prompt) is not type(negative_prompt):raise TypeError(f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="f" {type(prompt)}.")elif isinstance(negative_prompt, str):uncond_tokens = [negative_prompt]elif batch_size != len(negative_prompt):raise ValueError(f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"" the batch size of `prompt`.")else:uncond_tokens = negative_promptmax_length = prompt_embeds.shape[1]uncond_input = self.tokenizer(uncond_tokens,padding="max_length",max_length=max_length,truncation=True,return_tensors="pt",)if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:attention_mask = uncond_input.attention_mask.to(device)else:attention_mask = Nonenegative_prompt_embeds = self.text_encoder(uncond_input.input_ids.to(device),attention_mask=attention_mask,)negative_prompt_embeds = negative_prompt_embeds[0]if do_classifier_free_guidance:# duplicate unconditional embeddings for each generation per prompt, using mps friendly methodseq_len = negative_prompt_embeds.shape[1]negative_prompt_embeds = negative_prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)negative_prompt_embeds = negative_prompt_embeds.repeat(1, num_images_per_prompt, 1)negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)# For classifier free guidance, we need to do two forward passes.# Here we concatenate the unconditional and text embeddings into a single batch# to avoid doing two forward passesprompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])return prompt_embeds# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checkerdef run_safety_checker(self, image, device, dtype):if self.safety_checker is not None:safety_checker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pt").to(device)image, has_nsfw_concept = self.safety_checker(images=image, clip_input=safety_checker_input.pixel_values.to(dtype))else:has_nsfw_concept = Nonereturn image, has_nsfw_concept# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latentsdef decode_latents(self, latents):latents = 1 / self.vae.config.scaling_factor * latentsimage = self.vae.decode(latents).sampleimage = (image / 2 + 0.5).clamp(0, 1)# we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16image = image.cpu().permute(0, 2, 3, 1).float().numpy()return image# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargsdef prepare_extra_step_kwargs(self, generator, eta):# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502# and should be between [0, 1]accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())extra_step_kwargs = {}if accepts_eta:extra_step_kwargs["eta"] = eta# check if the scheduler accepts generatoraccepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())if accepts_generator:extra_step_kwargs["generator"] = generatorreturn extra_step_kwargsdef check_inputs(self,prompt,height,width,callback_steps,negative_prompt=None,prompt_embeds=None,negative_prompt_embeds=None,):if height % 8 != 0 or width % 8 != 0:raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")if (callback_steps is None) or (callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)):raise ValueError(f"`callback_steps` has to be a positive integer but is {callback_steps} of type"f" {type(callback_steps)}.")if prompt is not None and prompt_embeds is not None:raise ValueError(f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"" only forward one of the two.")elif prompt is None and prompt_embeds is None:raise ValueError("Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined.")elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")if negative_prompt is not None and negative_prompt_embeds is not None:raise ValueError(f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"f" {negative_prompt_embeds}. Please make sure to only forward one of the two.")if prompt_embeds is not None and negative_prompt_embeds is not None:if prompt_embeds.shape != negative_prompt_embeds.shape:raise ValueError("`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"f" {negative_prompt_embeds.shape}.")# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latentsdef prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):shape = (batch_size, num_channels_latents, height // self.vae_scale_factor, width // self.vae_scale_factor)if isinstance(generator, list) and len(generator) != batch_size:raise ValueError(f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"f" size of {batch_size}. Make sure the batch size matches the length of the generators.")if latents is None:latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)else:latents = latents.to(device)# scale the initial noise by the standard deviation required by the schedulerlatents = latents * self.scheduler.init_noise_sigmareturn latents@torch.no_grad()@replace_example_docstring(EXAMPLE_DOC_STRING)def __call__(self,processors: List[ControlNetProcessor],prompt: Union[str, List[str]] = None,height: Optional[int] = None,width: Optional[int] = None,num_inference_steps: int = 50,guidance_scale: float = 7.5,negative_prompt: Optional[Union[str, List[str]]] = None,num_images_per_prompt: Optional[int] = 1,eta: float = 0.0,generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,latents: Optional[torch.FloatTensor] = None,prompt_embeds: Optional[torch.FloatTensor] = None,negative_prompt_embeds: Optional[torch.FloatTensor] = None,output_type: Optional[str] = "pil",return_dict: bool = True,callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,callback_steps: int = 1,cross_attention_kwargs: Optional[Dict[str, Any]] = None,):r"""Function invoked when calling the pipeline for generation.Args:prompt (`str` or `List[str]`, *optional*):The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`.instead.height (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):The height in pixels of the generated image.width (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):The width in pixels of the generated image.num_inference_steps (`int`, *optional*, defaults to 50):The number of denoising steps. More denoising steps usually lead to a higher quality image at theexpense of slower inference.guidance_scale (`float`, *optional*, defaults to 7.5):Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).`guidance_scale` is defined as `w` of equation 2. of [ImagenPaper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`,usually at the expense of lower image quality.negative_prompt (`str` or `List[str]`, *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to pass`negative_prompt_embeds`. instead. If not defined, one has to pass `negative_prompt_embeds`. instead.Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`).num_images_per_prompt (`int`, *optional*, defaults to 1):The number of images to generate per prompt.eta (`float`, *optional*, defaults to 0.0):Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to[`schedulers.DDIMScheduler`], will be ignored for others.generator (`torch.Generator` or `List[torch.Generator]`, *optional*):One or a list of [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)to make generation deterministic.latents (`torch.FloatTensor`, *optional*):Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for imagegeneration. Can be used to tweak the same generation with different prompts. If not provided, a latentstensor will ge generated by sampling using the supplied random `generator`.prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from `prompt` input argument.negative_prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` inputargument.output_type (`str`, *optional*, defaults to `"pil"`):The output format of the generate image. Choose between[PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.return_dict (`bool`, *optional*, defaults to `True`):Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of aplain tuple.callback (`Callable`, *optional*):A function that will be called every `callback_steps` steps during inference. The function will becalled with the following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`.callback_steps (`int`, *optional*, defaults to 1):The frequency at which the `callback` function will be called. If not specified, the callback will becalled at every step.cross_attention_kwargs (`dict`, *optional*):A kwargs dictionary that if specified is passed along to the `AttnProcessor` as defined under`self.processor` in[diffusers.cross_attention](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/cross_attention.py).Examples:Returns:[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple.When returning a tuple, the first element is a list with the generated images, and the second element is alist of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work"(nsfw) content, according to the `safety_checker`."""# 0. Default height and width to unetheight, width = processors[0].default_height_width(height, width)# 1. Check inputs. Raise error if not correctself.check_inputs(prompt, height, width, callback_steps, negative_prompt, prompt_embeds, negative_prompt_embeds)for processor in processors:processor.check_inputs(prompt, prompt_embeds)# 2. Define call parametersif prompt is not None and isinstance(prompt, str):batch_size = 1elif prompt is not None and isinstance(prompt, list):batch_size = len(prompt)else:batch_size = prompt_embeds.shape[0]device = self._execution_device# here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)# of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`# corresponds to doing no classifier free guidance.do_classifier_free_guidance = guidance_scale > 1.0# 3. Encode input promptprompt_embeds = self._encode_prompt(prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt,prompt_embeds=prompt_embeds,negative_prompt_embeds=negative_prompt_embeds,)# 4. Prepare imagefor processor in processors:processor.prepare_image(width=width,height=height,batch_size=batch_size * num_images_per_prompt,num_images_per_prompt=num_images_per_prompt,device=device,do_classifier_free_guidance=do_classifier_free_guidance,)# 5. Prepare timestepsself.scheduler.set_timesteps(num_inference_steps, device=device)timesteps = self.scheduler.timesteps# 6. Prepare latent variablesnum_channels_latents = self.unet.in_channelslatents = self.prepare_latents(batch_size * num_images_per_prompt,num_channels_latents,height,width,prompt_embeds.dtype,device,generator,latents,)# 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipelineextra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)# 8. Denoising loopnum_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.orderwith self.progress_bar(total=num_inference_steps) as progress_bar:for i, t in enumerate(timesteps):# expand the latents if we are doing classifier free guidancelatent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latentslatent_model_input = self.scheduler.scale_model_input(latent_model_input, t)# controlnet inferencefor i, processor in enumerate(processors):down_samples, mid_sample = processor(latent_model_input,t,encoder_hidden_states=prompt_embeds,return_dict=False,)if i == 0:down_block_res_samples, mid_block_res_sample = down_samples, mid_sampleelse:down_block_res_samples = [d_prev + d_curr for d_prev, d_curr in zip(down_block_res_samples, down_samples)]mid_block_res_sample = mid_block_res_sample + mid_sample# predict the noise residualnoise_pred = self.unet(latent_model_input,t,encoder_hidden_states=prompt_embeds,cross_attention_kwargs=cross_attention_kwargs,down_block_additional_residuals=down_block_res_samples,mid_block_additional_residual=mid_block_res_sample,).sample# perform guidanceif do_classifier_free_guidance:noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)# compute the previous noisy sample x_t -> x_t-1latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample# call the callback, if providedif i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):progress_bar.update()if callback is not None and i % callback_steps == 0:callback(i, t, latents)# If we do sequential model offloading, let's offload unet and controlnet# manually for max memory savingsif hasattr(self, "final_offload_hook") and self.final_offload_hook is not None:self.unet.to("cpu")torch.cuda.empty_cache()if output_type == "latent":image = latentshas_nsfw_concept = Noneelif output_type == "pil":# 8. Post-processingimage = self.decode_latents(latents)# 9. Run safety checkerimage, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)# 10. Convert to PILimage = self.numpy_to_pil(image)else:# 8. Post-processingimage = self.decode_latents(latents)# 9. Run safety checkerimage, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)# Offload last model to CPUif hasattr(self, "final_offload_hook") and self.final_offload_hook is not None:self.final_offload_hook.offload()if not return_dict:return (image, has_nsfw_concept)return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)# demo & simple test
def main():from diffusers.utils import load_imagepipe = StableDiffusionMultiControlNetPipeline.from_pretrained("./model", safety_checker=None, torch_dtype=torch.float16).to("cuda")pipe.enable_xformers_memory_efficient_attention()controlnet_canny = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny",cache_dir='./controlmodel', torch_dtype=torch.float16).to("cuda")controlnet_pose = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-openpose",cache_dir='./controlmodel', torch_dtype=torch.float16).to("cuda")canny_left = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_left.png")canny_right = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_right.png")pose_right = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/pose_right.png")image = pipe(prompt="best quality, extremely detailed",negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",processors=[ControlNetProcessor(controlnet_canny, canny_left),ControlNetProcessor(controlnet_canny, canny_right),],generator=torch.Generator(device="cpu").manual_seed(0),num_inference_steps=30,width=512,height=512,).images[0]image.save("./canny_left_right.png")image = pipe(prompt="best quality, extremely detailed",negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",processors=[ControlNetProcessor(controlnet_canny, canny_left,0.5),ControlNetProcessor(controlnet_pose, pose_right,1.6),],generator=torch.Generator(device="cpu").manual_seed(0),num_inference_steps=30,width=512,height=512,).images[0]image.save("./canny_left_pose_right.png")if __name__ == "__main__":main()diffuser没有的功能如何自己实现加入
假设我们要自己实现一个stablediffusion+controlnet+inpaint的功能,该如何实现。这个任务大部分是在生产流程上做串接,所以代码基本可以定位在pipline模块stablediffusion。
假设我们代码实现如下(代码在特定版本有效,这个后面会升级,大家可以不急着用。只是跟大家讲解diffuser的代码框架,以及改动一个模块该如何融入diffuser中供自己使用)
#代码文件名:pipeline_stable_diffusion_controlnet_inpaint.py
# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.import inspect
from typing import Any, Callable, Dict, List, Optional, Unionimport numpy as np
import PIL.Image
import torch
from transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizerfrom ...models import AutoencoderKL, UNet2DConditionModel
from ...schedulers import KarrasDiffusionSchedulers
from ...utils import is_accelerate_available, logging, randn_tensor, replace_example_docstring
from ..pipeline_utils import DiffusionPipeline
from . import StableDiffusionPipelineOutput
from .safety_checker import StableDiffusionSafetyCheckerlogger = logging.get_logger(__name__) # pylint: disable=invalid-nameEXAMPLE_DOC_STRING = """Examples:```py>>> from diffusers import StableDiffusionControlNetPipeline>>> from diffusers.utils import load_image>>> # Canny edged image for control>>> canny_edged_image = load_image(... "https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_canny_edged.png"... )>>> pipe = StableDiffusionControlNetPipeline.from_pretrained("takuma104/control_sd15_canny").to("cuda")>>> image = pipe(prompt="best quality, extremely detailed", controlnet_hint=canny_edged_image).images[0]```
"""def prepare_mask_and_masked_image(image, mask):"""Prepares a pair (image, mask) to be consumed by the Stable Diffusion pipeline. This means that those inputs will beconverted to ``torch.Tensor`` with shapes ``batch x channels x height x width`` where ``channels`` is ``3`` for the``image`` and ``1`` for the ``mask``.The ``image`` will be converted to ``torch.float32`` and normalized to be in ``[-1, 1]``. The ``mask`` will bebinarized (``mask > 0.5``) and cast to ``torch.float32`` too.Args:image (Union[np.array, PIL.Image, torch.Tensor]): The image to inpaint.It can be a ``PIL.Image``, or a ``height x width x 3`` ``np.array`` or a ``channels x height x width````torch.Tensor`` or a ``batch x channels x height x width`` ``torch.Tensor``.mask (_type_): The mask to apply to the image, i.e. regions to inpaint.It can be a ``PIL.Image``, or a ``height x width`` ``np.array`` or a ``1 x height x width````torch.Tensor`` or a ``batch x 1 x height x width`` ``torch.Tensor``.Raises:ValueError: ``torch.Tensor`` images should be in the ``[-1, 1]`` range. ValueError: ``torch.Tensor`` maskshould be in the ``[0, 1]`` range. ValueError: ``mask`` and ``image`` should have the same spatial dimensions.TypeError: ``mask`` is a ``torch.Tensor`` but ``image`` is not(ot the other way around).Returns:tuple[torch.Tensor]: The pair (mask, masked_image) as ``torch.Tensor`` with 4dimensions: ``batch x channels x height x width``."""if isinstance(image, torch.Tensor):if not isinstance(mask, torch.Tensor):raise TypeError(f"`image` is a torch.Tensor but `mask` (type: {type(mask)} is not")# Batch single imageif image.ndim == 3:assert image.shape[0] == 3, "Image outside a batch should be of shape (3, H, W)"image = image.unsqueeze(0)# Batch and add channel dim for single maskif mask.ndim == 2:mask = mask.unsqueeze(0).unsqueeze(0)# Batch single mask or add channel dimif mask.ndim == 3:# Single batched mask, no channel dim or single mask not batched but channel dimif mask.shape[0] == 1:mask = mask.unsqueeze(0)# Batched masks no channel dimelse:mask = mask.unsqueeze(1)assert image.ndim == 4 and mask.ndim == 4, "Image and Mask must have 4 dimensions"assert image.shape[-2:] == mask.shape[-2:], "Image and Mask must have the same spatial dimensions"assert image.shape[0] == mask.shape[0], "Image and Mask must have the same batch size"# Check image is in [-1, 1]if image.min() < -1 or image.max() > 1:raise ValueError("Image should be in [-1, 1] range")# Check mask is in [0, 1]if mask.min() < 0 or mask.max() > 1:raise ValueError("Mask should be in [0, 1] range")# Binarize maskmask[mask < 0.5] = 0mask[mask >= 0.5] = 1# Image as float32image = image.to(dtype=torch.float32)elif isinstance(mask, torch.Tensor):raise TypeError(f"`mask` is a torch.Tensor but `image` (type: {type(image)} is not")else:# preprocess imageif isinstance(image, (PIL.Image.Image, np.ndarray)):image = [image]if isinstance(image, list) and isinstance(image[0], PIL.Image.Image):image = [np.array(i.convert("RGB"))[None, :] for i in image]image = np.concatenate(image, axis=0)elif isinstance(image, list) and isinstance(image[0], np.ndarray):image = np.concatenate([i[None, :] for i in image], axis=0)image = image.transpose(0, 3, 1, 2)image = torch.from_numpy(image).to(dtype=torch.float32) / 127.5 - 1.0# preprocess maskif isinstance(mask, (PIL.Image.Image, np.ndarray)):mask = [mask]if isinstance(mask, list) and isinstance(mask[0], PIL.Image.Image):mask = np.concatenate([np.array(m.convert("L"))[None, None, :] for m in mask], axis=0)mask = mask.astype(np.float32) / 255.0elif isinstance(mask, list) and isinstance(mask[0], np.ndarray):mask = np.concatenate([m[None, None, :] for m in mask], axis=0)mask[mask < 0.5] = 0mask[mask >= 0.5] = 1mask = torch.from_numpy(mask)masked_image = image * (mask < 0.5)return mask, masked_imageclass StableDiffusionControlNetInpaintPipeline(DiffusionPipeline):r"""Pipeline for text-to-image generation using Stable Diffusion with ControlNet guidance.This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods thelibrary implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)Args:vae ([`AutoencoderKL`]):Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.text_encoder ([`CLIPTextModel`]):Frozen text-encoder. Stable Diffusion uses the text portion of[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specificallythe [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.tokenizer (`CLIPTokenizer`):Tokenizer of class[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents.controlnet ([`UNet2DConditionModel`]):[ControlNet](https://arxiv.org/abs/2302.05543) architecture to generate guidance.scheduler ([`SchedulerMixin`]):A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].safety_checker ([`StableDiffusionSafetyChecker`]):Classification module that estimates whether generated images could be considered offensive or harmful.Please, refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for details.feature_extractor ([`CLIPFeatureExtractor`]):Model that extracts features from generated images to be used as inputs for the `safety_checker`."""def __init__(self,vae: AutoencoderKL,text_encoder: CLIPTextModel,tokenizer: CLIPTokenizer,unet: UNet2DConditionModel,controlnet: UNet2DConditionModel,scheduler: KarrasDiffusionSchedulers,safety_checker: StableDiffusionSafetyChecker,feature_extractor: CLIPFeatureExtractor,requires_safety_checker: bool = True,):super().__init__()self.register_modules(vae=vae,text_encoder=text_encoder,tokenizer=tokenizer,unet=unet,controlnet=controlnet,scheduler=scheduler,safety_checker=safety_checker,feature_extractor=feature_extractor,)self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)self.register_to_config(requires_safety_checker=requires_safety_checker)def enable_vae_slicing(self):r"""Enable sliced VAE decoding.When this option is enabled, the VAE will split the input tensor in slices to compute decoding in severalsteps. This is useful to save some memory and allow larger batch sizes."""self.vae.enable_slicing()def disable_vae_slicing(self):r"""Disable sliced VAE decoding. If `enable_vae_slicing` was previously invoked, this method will go back tocomputing decoding in one step."""self.vae.disable_slicing()def enable_sequential_cpu_offload(self, gpu_id=0):r"""Offloads all models to CPU using accelerate, significantly reducing memory usage. When called, unet,text_encoder, vae and safety checker have their state dicts saved to CPU and then are moved to a`torch.device('meta') and loaded to GPU only when their specific submodule has its `forward` method called."""if is_accelerate_available():from accelerate import cpu_offloadelse:raise ImportError("Please install accelerate via `pip install accelerate`")device = torch.device(f"cuda:{gpu_id}")for cpu_offloaded_model in [self.unet, self.text_encoder, self.vae]:cpu_offload(cpu_offloaded_model, device)if self.safety_checker is not None:cpu_offload(self.safety_checker, execution_device=device, offload_buffers=True)@propertydef _execution_device(self):r"""Returns the device on which the pipeline's models will be executed. After calling`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's modulehooks."""if self.device != torch.device("meta") or not hasattr(self.unet, "_hf_hook"):return self.devicefor module in self.unet.modules():if (hasattr(module, "_hf_hook")and hasattr(module._hf_hook, "execution_device")and module._hf_hook.execution_device is not None):return torch.device(module._hf_hook.execution_device)return self.devicedef _encode_prompt(self,prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt=None,prompt_embeds: Optional[torch.FloatTensor] = None,negative_prompt_embeds: Optional[torch.FloatTensor] = None,):r"""Encodes the prompt into text encoder hidden states.Args:prompt (`str` or `List[str]`, *optional*):prompt to be encodeddevice: (`torch.device`):torch devicenum_images_per_prompt (`int`):number of images that should be generated per promptdo_classifier_free_guidance (`bool`):whether to use classifier free guidance or notnegative_prompt (`str` or `List[str]`, *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to pass`negative_prompt_embeds`. instead. If not defined, one has to pass `negative_prompt_embeds`. instead.Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`).prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from `prompt` input argument.negative_prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` inputargument."""if prompt is not None and isinstance(prompt, str):batch_size = 1elif prompt is not None and isinstance(prompt, list):batch_size = len(prompt)else:batch_size = prompt_embeds.shape[0]if prompt_embeds is None:text_inputs = self.tokenizer(prompt,padding="max_length",max_length=self.tokenizer.model_max_length,truncation=True,return_tensors="pt",)text_input_ids = text_inputs.input_idsuntruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_idsif untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):removed_text = self.tokenizer.batch_decode(untruncated_ids[:, self.tokenizer.model_max_length - 1 : -1])logger.warning("The following part of your input was truncated because CLIP can only handle sequences up to"f" {self.tokenizer.model_max_length} tokens: {removed_text}")if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:attention_mask = text_inputs.attention_mask.to(device)else:attention_mask = Noneprompt_embeds = self.text_encoder(text_input_ids.to(device),attention_mask=attention_mask,)prompt_embeds = prompt_embeds[0]prompt_embeds = prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)bs_embed, seq_len, _ = prompt_embeds.shape# duplicate text embeddings for each generation per prompt, using mps friendly methodprompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)prompt_embeds = prompt_embeds.view(bs_embed * num_images_per_prompt, seq_len, -1)# get unconditional embeddings for classifier free guidanceif do_classifier_free_guidance and negative_prompt_embeds is None:uncond_tokens: List[str]if negative_prompt is None:uncond_tokens = [""] * batch_sizeelif type(prompt) is not type(negative_prompt):raise TypeError(f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="f" {type(prompt)}.")elif isinstance(negative_prompt, str):uncond_tokens = [negative_prompt]elif batch_size != len(negative_prompt):raise ValueError(f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"" the batch size of `prompt`.")else:uncond_tokens = negative_promptmax_length = prompt_embeds.shape[1]uncond_input = self.tokenizer(uncond_tokens,padding="max_length",max_length=max_length,truncation=True,return_tensors="pt",)if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:attention_mask = uncond_input.attention_mask.to(device)else:attention_mask = Nonenegative_prompt_embeds = self.text_encoder(uncond_input.input_ids.to(device),attention_mask=attention_mask,)negative_prompt_embeds = negative_prompt_embeds[0]if do_classifier_free_guidance:# duplicate unconditional embeddings for each generation per prompt, using mps friendly methodseq_len = negative_prompt_embeds.shape[1]negative_prompt_embeds = negative_prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)negative_prompt_embeds = negative_prompt_embeds.repeat(1, num_images_per_prompt, 1)negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)# For classifier free guidance, we need to do two forward passes.# Here we concatenate the unconditional and text embeddings into a single batch# to avoid doing two forward passesprompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])return prompt_embedsdef run_safety_checker(self, image, device, dtype):if self.safety_checker is not None:safety_checker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pt").to(device)image, has_nsfw_concept = self.safety_checker(images=image, clip_input=safety_checker_input.pixel_values.to(dtype))else:has_nsfw_concept = Nonereturn image, has_nsfw_conceptdef decode_latents(self, latents):latents = 1 / self.vae.config.scaling_factor * latentsimage = self.vae.decode(latents).sampleimage = (image / 2 + 0.5).clamp(0, 1)# we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16image = image.cpu().permute(0, 2, 3, 1).float().numpy()return imagedef prepare_extra_step_kwargs(self, generator, eta):# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502# and should be between [0, 1]accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())extra_step_kwargs = {}if accepts_eta:extra_step_kwargs["eta"] = eta# check if the scheduler accepts generatoraccepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())if accepts_generator:extra_step_kwargs["generator"] = generatorreturn extra_step_kwargsdef decode_latents(self, latents):latents = 1 / self.vae.config.scaling_factor * latentsimage = self.vae.decode(latents).sampleimage = (image / 2 + 0.5).clamp(0, 1)# we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16image = image.cpu().permute(0, 2, 3, 1).float().numpy()return imagedef check_inputs(self,prompt,height,width,callback_steps,negative_prompt=None,prompt_embeds=None,negative_prompt_embeds=None,):if height % 8 != 0 or width % 8 != 0:raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")if (callback_steps is None) or (callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)):raise ValueError(f"`callback_steps` has to be a positive integer but is {callback_steps} of type"f" {type(callback_steps)}.")if prompt is not None and prompt_embeds is not None:raise ValueError(f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"" only forward one of the two.")elif prompt is None and prompt_embeds is None:raise ValueError("Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined.")elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")if negative_prompt is not None and negative_prompt_embeds is not None:raise ValueError(f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"f" {negative_prompt_embeds}. Please make sure to only forward one of the two.")if prompt_embeds is not None and negative_prompt_embeds is not None:if prompt_embeds.shape != negative_prompt_embeds.shape:raise ValueError("`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"f" {negative_prompt_embeds.shape}.")def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):shape = (batch_size, num_channels_latents, height // self.vae_scale_factor, width // self.vae_scale_factor)if isinstance(generator, list) and len(generator) != batch_size:raise ValueError(f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"f" size of {batch_size}. Make sure the batch size matches the length of the generators.")if latents is None:latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)else:latents = latents.to(device)# scale the initial noise by the standard deviation required by the schedulerlatents = latents * self.scheduler.init_noise_sigmareturn latentsdef controlnet_hint_conversion(self, controlnet_hint, height, width, num_images_per_prompt):channels = 3if isinstance(controlnet_hint, torch.Tensor):# torch.Tensor: acceptble shape are any of chw, bchw(b==1) or bchw(b==num_images_per_prompt)shape_chw = (channels, height, width)shape_bchw = (1, channels, height, width)shape_nchw = (num_images_per_prompt, channels, height, width)if controlnet_hint.shape in [shape_chw, shape_bchw, shape_nchw]:controlnet_hint = controlnet_hint.to(dtype=self.controlnet.dtype, device=self.controlnet.device)if controlnet_hint.shape != shape_nchw:controlnet_hint = controlnet_hint.repeat(num_images_per_prompt, 1, 1, 1)return controlnet_hintelse:raise ValueError(f"Acceptble shape of `controlnet_hint` are any of ({channels}, {height}, {width}),"+ f" (1, {channels}, {height}, {width}) or ({num_images_per_prompt}, "+ f"{channels}, {height}, {width}) but is {controlnet_hint.shape}")elif isinstance(controlnet_hint, np.ndarray):# np.ndarray: acceptable shape is any of hw, hwc, bhwc(b==1) or bhwc(b==num_images_per_promot)# hwc is opencv compatible image format. Color channel must be BGR Format.if controlnet_hint.shape == (height, width):controlnet_hint = np.repeat(controlnet_hint[:, :, np.newaxis], channels, axis=2) # hw -> hwc(c==3)shape_hwc = (height, width, channels)shape_bhwc = (1, height, width, channels)shape_nhwc = (num_images_per_prompt, height, width, channels)if controlnet_hint.shape in [shape_hwc, shape_bhwc, shape_nhwc]:controlnet_hint = torch.from_numpy(controlnet_hint.copy())controlnet_hint = controlnet_hint.to(dtype=self.controlnet.dtype, device=self.controlnet.device)controlnet_hint /= 255.0if controlnet_hint.shape != shape_nhwc:controlnet_hint = controlnet_hint.repeat(num_images_per_prompt, 1, 1, 1)controlnet_hint = controlnet_hint.permute(0, 3, 1, 2) # b h w c -> b c h wreturn controlnet_hintelse:raise ValueError(f"Acceptble shape of `controlnet_hint` are any of ({width}, {channels}), "+ f"({height}, {width}, {channels}), "+ f"(1, {height}, {width}, {channels}) or "+ f"({num_images_per_prompt}, {channels}, {height}, {width}) but is {controlnet_hint.shape}")elif isinstance(controlnet_hint, PIL.Image.Image):if controlnet_hint.size == (width, height):controlnet_hint = controlnet_hint.convert("RGB") # make sure 3 channel RGB formatcontrolnet_hint = np.array(controlnet_hint) # to numpycontrolnet_hint = controlnet_hint[:, :, ::-1] # RGB -> BGRreturn self.controlnet_hint_conversion(controlnet_hint, height, width, num_images_per_prompt)else:raise ValueError(f"Acceptable image size of `controlnet_hint` is ({width}, {height}) but is {controlnet_hint.size}")else:raise ValueError(f"Acceptable type of `controlnet_hint` are any of torch.Tensor, np.ndarray, PIL.Image.Image but is {type(controlnet_hint)}")def prepare_mask_latents(self, mask, masked_image, batch_size, height, width, dtype, device, generator, do_classifier_free_guidance):# resize the mask to latents shape as we concatenate the mask to the latents# we do that before converting to dtype to avoid breaking in case we're using cpu_offload# and half precisionmask = torch.nn.functional.interpolate(mask, size=(height // self.vae_scale_factor, width // self.vae_scale_factor))mask = mask.to(device=device, dtype=dtype)masked_image = masked_image.to(device=device, dtype=dtype)# encode the mask image into latents space so we can concatenate it to the latentsif isinstance(generator, list):masked_image_latents = [self.vae.encode(masked_image[i : i + 1]).latent_dist.sample(generator=generator[i])for i in range(batch_size)]masked_image_latents = torch.cat(masked_image_latents, dim=0)else:masked_image_latents = self.vae.encode(masked_image).latent_dist.sample(generator=generator)masked_image_latents = self.vae.config.scaling_factor * masked_image_latents# duplicate mask and masked_image_latents for each generation per prompt, using mps friendly methodif mask.shape[0] < batch_size:if not batch_size % mask.shape[0] == 0:raise ValueError("The passed mask and the required batch size don't match. Masks are supposed to be duplicated to"f" a total batch size of {batch_size}, but {mask.shape[0]} masks were passed. Make sure the number"" of masks that you pass is divisible by the total requested batch size.")mask = mask.repeat(batch_size // mask.shape[0], 1, 1, 1)if masked_image_latents.shape[0] < batch_size:if not batch_size % masked_image_latents.shape[0] == 0:raise ValueError("The passed images and the required batch size don't match. Images are supposed to be duplicated"f" to a total batch size of {batch_size}, but {masked_image_latents.shape[0]} images were passed."" Make sure the number of images that you pass is divisible by the total requested batch size.")masked_image_latents = masked_image_latents.repeat(batch_size // masked_image_latents.shape[0], 1, 1, 1)mask = torch.cat([mask] * 2) if do_classifier_free_guidance else maskmasked_image_latents = (torch.cat([masked_image_latents] * 2) if do_classifier_free_guidance else masked_image_latents)# aligning device to prevent device errors when concating it with the latent model inputmasked_image_latents = masked_image_latents.to(device=device, dtype=dtype)return mask, masked_image_latents@torch.no_grad()@replace_example_docstring(EXAMPLE_DOC_STRING)def __call__(self,prompt: Union[str, List[str]] = None,height: Optional[int] = None,width: Optional[int] = None,num_inference_steps: int = 50,guidance_scale: float = 7.5,negative_prompt: Optional[Union[str, List[str]]] = None,num_images_per_prompt: Optional[int] = 1,eta: float = 0.0,generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,latents: Optional[torch.FloatTensor] = None,prompt_embeds: Optional[torch.FloatTensor] = None,negative_prompt_embeds: Optional[torch.FloatTensor] = None,output_type: Optional[str] = "pil",return_dict: bool = True,callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,callback_steps: Optional[int] = 1,cross_attention_kwargs: Optional[Dict[str, Any]] = None,controlnet_hint: Optional[Union[torch.FloatTensor, np.ndarray, PIL.Image.Image]] = None,image: Union[torch.FloatTensor, PIL.Image.Image] = None,mask_image: Union[torch.FloatTensor, PIL.Image.Image] = None,):r"""Function invoked when calling the pipeline for generation.Args:prompt (`str` or `List[str]`, *optional*):The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`.instead.height (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):The height in pixels of the generated image.width (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):The width in pixels of the generated image.num_inference_steps (`int`, *optional*, defaults to 50):The number of denoising steps. More denoising steps usually lead to a higher quality image at theexpense of slower inference.guidance_scale (`float`, *optional*, defaults to 7.5):Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).`guidance_scale` is defined as `w` of equation 2. of [ImagenPaper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`,usually at the expense of lower image quality.negative_prompt (`str` or `List[str]`, *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to pass`negative_prompt_embeds`. instead. If not defined, one has to pass `negative_prompt_embeds`. instead.Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`).num_images_per_prompt (`int`, *optional*, defaults to 1):The number of images to generate per prompt.eta (`float`, *optional*, defaults to 0.0):Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to[`schedulers.DDIMScheduler`], will be ignored for others.generator (`torch.Generator` or `List[torch.Generator]`, *optional*):One or a list of [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)to make generation deterministic.latents (`torch.FloatTensor`, *optional*):Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for imagegeneration. Can be used to tweak the same generation with different prompts. If not provided, a latentstensor will ge generated by sampling using the supplied random `generator`.prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from `prompt` input argument.negative_prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` inputargument.output_type (`str`, *optional*, defaults to `"pil"`):The output format of the generate image. Choose between[PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.return_dict (`bool`, *optional*, defaults to `True`):Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of aplain tuple.callback (`Callable`, *optional*):A function that will be called every `callback_steps` steps during inference. The function will becalled with the following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`.callback_steps (`int`, *optional*, defaults to 1):The frequency at which the `callback` function will be called. If not specified, the callback will becalled at every step.cross_attention_kwargs (`dict`, *optional*):A kwargs dictionary that if specified is passed along to the `AttnProcessor` as defined under`self.processor` in[diffusers.cross_attention](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/cross_attention.py).controlnet_hint (`torch.FloatTensor`, `np.ndarray` or `PIL.Image.Image`, *optional*):ControlNet input embedding. ControlNet generates guidances using this input embedding. If the type isspecified as `torch.FloatTensor`, it is passed to ControlNet as is. If the type is `np.ndarray`, it isassumed to be an OpenCV compatible image format. PIL.Image.Image` can also be accepted as an image. Thesize of all these types must correspond to the output image size.Examples:Returns:[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple.When returning a tuple, the first element is a list with the generated images, and the second element is alist of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work"(nsfw) content, according to the `safety_checker`."""# 0. Default height and width to unetheight = height or self.unet.config.sample_size * self.vae_scale_factorwidth = width or self.unet.config.sample_size * self.vae_scale_factor# 1. Control Embedding check & conversionif controlnet_hint is not None:controlnet_hint = self.controlnet_hint_conversion(controlnet_hint, height, width, num_images_per_prompt)# 2. Check inputs. Raise error if not correctself.check_inputs(prompt, height, width, callback_steps, negative_prompt, prompt_embeds, negative_prompt_embeds)# 3. Define call parametersif prompt is not None and isinstance(prompt, str):batch_size = 1elif prompt is not None and isinstance(prompt, list):batch_size = len(prompt)else:batch_size = prompt_embeds.shape[0]device = self._execution_device# here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)# of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`# corresponds to doing no classifier free guidance.do_classifier_free_guidance = guidance_scale > 1.0# 4. Encode input promptprompt_embeds = self._encode_prompt(prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt,prompt_embeds=prompt_embeds,negative_prompt_embeds=negative_prompt_embeds,)mask, masked_image = prepare_mask_and_masked_image(image, mask_image)# 5. Prepare timestepsself.scheduler.set_timesteps(num_inference_steps, device=device)timesteps = self.scheduler.timesteps# 6. Prepare latent variablesnum_channels_latents = self.unet.in_channelslatents = self.prepare_latents(batch_size * num_images_per_prompt,num_channels_latents,height,width,prompt_embeds.dtype,device,generator,latents,)mask, masked_image_latents = self.prepare_mask_latents(mask,masked_image,batch_size * num_images_per_prompt,height,width,prompt_embeds.dtype,device,generator,do_classifier_free_guidance,)num_channels_mask = mask.shape[1]num_channels_masked_image = masked_image_latents.shape[1]# 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipelineextra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)# 8. Denoising loopnum_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.orderwith self.progress_bar(total=num_inference_steps) as progress_bar:for i, t in enumerate(timesteps):# expand the latents if we are doing classifier free guidancelatent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latentslatent_model_input = self.scheduler.scale_model_input(latent_model_input, t)if controlnet_hint is not None:# ControlNet predict the noise residualcontrol = self.controlnet(latent_model_input, t, encoder_hidden_states=prompt_embeds, controlnet_hint=controlnet_hint)control = [item for item in control]latent_model_input = torch.cat([latent_model_input, mask, masked_image_latents], dim=1)noise_pred = self.unet(latent_model_input,t,encoder_hidden_states=prompt_embeds,cross_attention_kwargs=cross_attention_kwargs,control=control,).sampleelse:# predict the noise residuallatent_model_input = torch.cat([latent_model_input, mask, masked_image_latents], dim=1)noise_pred = self.unet(latent_model_input,t,encoder_hidden_states=prompt_embeds,cross_attention_kwargs=cross_attention_kwargs,).sample# perform guidanceif do_classifier_free_guidance:noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)# compute the previous noisy sample x_t -> x_t-1latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample# call the callback, if providedif i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):progress_bar.update()if callback is not None and i % callback_steps == 0:callback(i, t, latents)if output_type == "latent":image = latentshas_nsfw_concept = Noneelif output_type == "pil":# 8. Post-processingimage = self.decode_latents(latents)# 9. Run safety checkerimage, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)# 10. Convert to PILimage = self.numpy_to_pil(image)else:# 8. Post-processingimage = self.decode_latents(latents)# 9. Run safety checkerimage, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)if not return_dict:return (image, has_nsfw_concept)return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)
把文件放置在:

以下项目下的每个__init__.py需要把新加的类添加进去

添加信息可以参考文件里面的写法,一般如下:

相关文章:
如何让AI帮你干活-娱乐(3)
背景今天的话题会偏代码技巧一些,对于以前没有接触过代码的朋友或者接触代码开发经验较少的朋友会有些吃力。上篇文章介绍了如何广视角的生成相对稳定的视频。昨天的实现相对简单,主要用的是UI界面来做生成。但是生成的效果其实也显而易见,不…...
webview的工作、内存泄漏、漏洞以及缓存机制原理原理+方案解决
分析一段appium的日志来分析webview的工作原理,文章尾部附有自动化脚本及完整日志: 解析: 获取上下文列表 服务端发送命令adb shell cat /proc/net/unix获取域套接字列表。那什么是域套接字呢? 域套接字:是unix系统里…...
BFD协议原理
BFD协议原理引入背景不使用BFD带来的问题OSPF感知慢VRRP产生次优路径BFD技术简介BFD会话建立方式和检测机制BFD会话建立过程BFD工作流程BFD的单臂回声BFD默认参数以及调整方法总结引入背景 随着网络应用的广泛部署,网络发生中断可能影响业务正常运行并造成重大损失…...
你把骑行当什么? 它就是你需要的
1.骑行是一种有活力的运动,尝试一下你一定会喜欢上它的!2.把骑行当作一种娱乐,让自己快乐地体验自然的美!3.骑行可以帮助你改变心态,让你的心情变得更加愉悦!4.让骑行成为你每天的计划,看看骑行…...
python基础系列 —— 迭代器与内置高阶函数
目录 一、迭代器 1、基本概念 2、如何定义一个迭代器 3、如果判断对象是否是迭代器 4、如何重置迭代器 5、如何调用迭代器 二、高阶函数 1、map函数 2、filter函数 3、reduce函数 4、sorted函数 一、迭代器 1、基本概念 迭代:是一个重复的过程,每次重复…...
MySQL面试题-日志
目录 1.MySQL 中常见的日志有哪些? 2.慢查询日志有什么用? 3.binlog 主要记录了什么? 4.Mysql的binlog有几种录入格式?分别有什么区别? 5.redo log 如何保证事务的持久性? 6.页修改之后为什么不直接刷…...
Android 10.0 去掉Launcher3默认给 icon增加的APK图标白边
1.概述 在10.0的系统产品开发中,Launcher3定制化开发中,发现在给第三方app的icon绘制图标的时候,会有白边第三方app的图标没有完全绘制出来,而系统app不存在这个问题,是完全绘制出来的,所以需要分析图标绘制类来解决这个问题 2.去掉Launcher3默认给 icon增加的APK图标白…...
E900V21C(S905L-armbian)安装armbian-Ubuntu(WiFi)
基本上是s905L芯片的刷机都是如此,包括Q7等 在网上寻找好多的教程关于e900v21c的刷机包和教程都少的可怜,唯一的就是这个:山东联通版创维E900V21C盒子刷入Armbiam并安装宝塔和Docker,但他是不能用WiFi和蓝牙的然后就是寻找s90l的…...
tpc协议的3次握手和4次挥手
建立连接的3次握手过程: A: 我想和你建立连接,你收到我的请求吗?(我想娶你) B: 好的,我收到了你的请求,我们可以建立连接,我同意。(好的,我愿意嫁给你) A: 好的,我收到了你的回应,我…...
YOLOv5害虫识别项目代码打包完整上传Gitee仓库(已开源)以及git上传速率限制踩坑记录
YOLOv5害虫识别项目代码打包完整上传Gitee仓库(已开源)以及git上传速率限制踩坑记录 ps: 最近很多小伙伴需要这个害虫识别项目的源码,由于文件过大,所以将代码完整上传至gitee,所有文件、教程、论文、以及代码模型…...
从零开始学习c语言|21、动态内存管理
一、malloc函数 1、什么是malloc函数 malloc是memery(内存)和allocate(分配)的缩写,顾名思义,malloc函数为动态分配内存的意思 2、malloc函数语句 int *p(int *)malloc(sizeof(int))malloc函数的形参为申请的内存空间大小,上述申请了一个i…...
swagger关闭/v2/api-docs仍然可以访问漏洞
今天接到安全团队的说swagger有未授权访问漏洞,即使在swagger关闭的情况下http://127.0.0.1:8086/agcloud/v2/api-docs?group%E7%94%A8%E6%88%B7%E5%85%B3%E8%81%94%E4%BF%A1%E6%81%AF%E6%A8%A1%E5%9D%97仍然还能访问。 看了下原来是有写一个拦截器 registry.addI…...
k8s pod调度总结
在Kubernetes平台上,我们很少会直接创建一个Pod,在大多数情况下会通过控制器完成对一组Pod副本的创建、调度 及全生命周期的自动控制任务,如:RC、Deployment、DaemonSet、Job 等。本文主要举例常见的Pod调度。1全自动调度功能&…...
28个案例问题分析---10---对生产环境的敬畏--生产环境
一:背景介绍 1:上午9:23,老师没有进行上课,但是却又很多的在线人员,并且在线人员的时间也不正确。 2:开发人员及时练习用户,查看用户上课情况。 3:10点整,询问项目组长发…...
视觉SLAM十四讲ch7-1视觉里程计笔记
视觉SLAM十四讲ch7-1 视觉里程计笔记本讲目标从本讲开始,开始介绍SLAM系统的重要算法特征点法ORB特征BRIEF实践特征提取与匹配2D-2D:对极几何八点法求E八点法的讨论从单应矩阵恢复R,t小结三角化。 符号读作*指向AA的指针(总在左侧)[]容纳AA的数组(总在左侧)()返…...
bash编程(马哥)
bash基础特性: 命令行展开:~,{} 命令别名:alias,unalias 命令历史:history 命令和路径补全:$PATH glob通配符:*,?,[],[^], 快捷键&am…...
搭建Gerrit环境Ubuntu
搭建Gerrit环境 1.安装apache sudo apt-get install apache2 注意:To run Gerrit behind an Apache server using mod_proxy, enable the necessary Apache2 modules: 执行:sudo a2enmod proxy_http 执行:sudo a2enmod ssl 使新的配置生效,需要执行如下命令:serv…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
3.3.1_1 检错编码(奇偶校验码)
从这节课开始,我们会探讨数据链路层的差错控制功能,差错控制功能的主要目标是要发现并且解决一个帧内部的位错误,我们需要使用特殊的编码技术去发现帧内部的位错误,当我们发现位错误之后,通常来说有两种解决方案。第一…...
12.找到字符串中所有字母异位词
🧠 题目解析 题目描述: 给定两个字符串 s 和 p,找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义: 若两个字符串包含的字符种类和出现次数完全相同,顺序无所谓,则互为…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...
代理篇12|深入理解 Vite中的Proxy接口代理配置
在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...
Vue ③-生命周期 || 脚手架
生命周期 思考:什么时候可以发送初始化渲染请求?(越早越好) 什么时候可以开始操作dom?(至少dom得渲染出来) Vue生命周期: 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...
Vue3中的computer和watch
computed的写法 在页面中 <div>{{ calcNumber }}</div>script中 写法1 常用 import { computed, ref } from vue; let price ref(100);const priceAdd () > { //函数方法 price 1price.value ; }//计算属性 let calcNumber computed(() > {return ${p…...
