From 51895598ccade164d955d2de3ef4a535809f7831 Mon Sep 17 00:00:00 2001 From: Tymur Boiko Date: Tue, 3 Dec 2024 12:57:34 +0200 Subject: [PATCH] decode: added ability to decoder to resuse device in multisession decode mode --- common/libs/VkCodecUtils/ProgramConfig.h | 33 ++- common/libs/VkCodecUtils/poll_manager.h | 64 +++++ common/libs/VkCodecUtils/poll_usoc.cpp | 244 ++++++++++++++++++ common/libs/VkShell/ShellWin32.cpp | 2 +- .../demos/vk-video-dec/CMakeLists.txt | 4 +- vk_video_decoder/demos/vk-video-dec/Main.cpp | 130 ++++++++-- 6 files changed, 445 insertions(+), 32 deletions(-) create mode 100644 common/libs/VkCodecUtils/poll_manager.h create mode 100644 common/libs/VkCodecUtils/poll_usoc.cpp diff --git a/common/libs/VkCodecUtils/ProgramConfig.h b/common/libs/VkCodecUtils/ProgramConfig.h index 04e0e573..43fe17f9 100644 --- a/common/libs/VkCodecUtils/ProgramConfig.h +++ b/common/libs/VkCodecUtils/ProgramConfig.h @@ -80,6 +80,9 @@ struct ProgramConfig { outputcrcPerFrame = false; outputcrc = false; crcOutputFile = nullptr; + numberOfDecodeWorkers = 0; + enableWorkerProcessesPoll = false; + ipcType = 0; } using ProgramArgs = std::vector; @@ -187,8 +190,7 @@ struct ProgramConfig { {"--input", "-i", 1, "Input filename to decode", [this](const char **args, const ProgramArgs &a) { videoFileName = args[0]; - std::ifstream validVideoFileStream(videoFileName, std::ifstream::in); - return (bool)validVideoFileStream; + return true; }}, {"--output", "-o", 1, "Output filename to dump raw video to", [this](const char **args, const ProgramArgs &a) { @@ -322,6 +324,17 @@ struct ProgramConfig { crcInitValue = crcInitValueTemp; return true; }}, + {"--poll-of-processes", nullptr, 1, "Use poll of worker processes and specify number of workers.", + [this](const char **args, const ProgramArgs &a) { + enableWorkerProcessesPoll = true; + numberOfDecodeWorkers = std::atoi(args[0]); + return true; + }}, + {"--files-to-decode", nullptr, 1, "Specify a file location where command lines for the poll of worker processes are saved.", + [this](const char **args, const ProgramArgs &a) { + fileListIpc = args[0]; + return true; + }}, }; for (int i = 1; i < argc; i++) { @@ -391,6 +404,18 @@ struct ProgramConfig { crcOutputFile = stdout; } } + + if (!enableWorkerProcessesPoll) { + if (videoFileName.length() == 0) { + std::cerr << "Input file should be specified" << std::endl; + exit(EXIT_FAILURE); + } + std::ifstream validVideoFileStream(videoFileName, std::ifstream::in); + if (!(bool)validVideoFileStream) { + std::cerr << "Can't open input file: invalid file name" << std::endl; + exit(EXIT_FAILURE); + } + } } // Assuming we have the length as a parameter: @@ -461,6 +486,7 @@ struct ProgramConfig { uint32_t decoderQueueSize; int32_t enablePostProcessFilter; uint32_t *crcOutput; + uint32_t numberOfDecodeWorkers; uint32_t enableStreamDemuxing : 1; uint32_t directMode : 1; uint32_t vsync : 1; @@ -474,6 +500,9 @@ struct ProgramConfig { uint32_t outputy4m : 1; uint32_t outputcrc : 1; uint32_t outputcrcPerFrame : 1; + uint32_t enableWorkerProcessesPoll : 1; + uint32_t ipcType : 1; + std::string fileListIpc; }; #endif /* _PROGRAMSETTINGS_H_ */ diff --git a/common/libs/VkCodecUtils/poll_manager.h b/common/libs/VkCodecUtils/poll_manager.h new file mode 100644 index 00000000..2220dbe5 --- /dev/null +++ b/common/libs/VkCodecUtils/poll_manager.h @@ -0,0 +1,64 @@ +#include +#include +#include +#include + +enum IPC_TYPE { UNIX_DOMAIN_SOCKETS = 0 }; +constexpr int DEFAULT_BUFLEN = 512; + +int usoc_manager(int isNoPresent, std::string& inputCmdsList); +int clientConnectServer(std::string& recvbuf, const char* usocfilename = NULL); + +#ifdef _WIN32 + +static int cloneTheProcess(int argc, const char** argv, PROCESS_INFORMATION& pi, STARTUPINFO& si) { + ZeroMemory(&si, sizeof(si)); + si.cb = sizeof(si); + ZeroMemory(&pi, sizeof(pi)); + std::string argsToPass; + for (int i = 0; i < argc; i++) { + argsToPass += argv[i]; + argsToPass += " "; + } + argsToPass += "spawn"; + if (!CreateProcess(NULL, (LPTSTR)argsToPass.c_str(), NULL, NULL, FALSE, 0, NULL, NULL, &si, &pi)) { + printf("CreateProcess failed (%d).\n", GetLastError()); + return -1; + } + return 0; +} +#endif + +static int parseCharArray(std::vector& w, const char* messageString, int& argc, const char** argv) { + std::stringstream ss(messageString); + std::string word; + argc = 0; + std::cout << std::endl; + while (ss >> w[argc]) { + if (w[argc][0] == '~') { + w[argc] = getenv("HOME") + w[argc].substr(1); + } + if (w[argc].substr(0, 6) == "finish") { + printf("Received a request to finish this decode worker. The worker process is terminated (completed).\n"); + return 0; + } + if (w[argc].substr(0, 6) == "nodata") { + printf("Received a request to wait for a data...\n"); + return 0; + } + argv[argc] = w[argc].c_str(); + argc++; + } + return argc >= 1; +} + +static int receiveNewBitstream(IPC_TYPE ipcType, bool enableWorkerProcessesPoll, std::string& receivedMessage) { + if (!enableWorkerProcessesPoll) { + return 0; + } + int isDataReceived = 0; + if (ipcType == IPC_TYPE::UNIX_DOMAIN_SOCKETS) { + isDataReceived = clientConnectServer(receivedMessage); + } + return isDataReceived; +} \ No newline at end of file diff --git a/common/libs/VkCodecUtils/poll_usoc.cpp b/common/libs/VkCodecUtils/poll_usoc.cpp new file mode 100644 index 00000000..5fe4b624 --- /dev/null +++ b/common/libs/VkCodecUtils/poll_usoc.cpp @@ -0,0 +1,244 @@ +// The MIT License (MIT) + +// Copyright (c) Microsoft Corporation + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Portions of this repo are provided under the SIL Open Font License. +// See the LICENSE file in individual samples for additional details. + +#include +#include +#include +#include +#include +#include + +#ifndef _WIN32 +#include +#include +#include +#include +#include +#include +#include +#else +#include +#include +#include +#include +#endif + +constexpr char socket_path[14]{"tmpsoc"}; + +#ifndef _WIN32 +constexpr int INVALID_SOCKET = -1; +constexpr int SOCKET_ERROR = -1; +using SOCKET = int; +static inline int WSAGetLastError() { + return errno; +} +#else +static inline const int poll(LPWSAPOLLFD fdArray, ULONG fds, INT timeout) { + return WSAPoll(fdArray, fds, timeout); +} +static inline const int close(SOCKET ConnectSocket) { + int result = closesocket(ConnectSocket); + WSACleanup(); + return result; +} +#endif + +static int readDataFromFile(std::string inputCmdsList, std::vector& filenames) { + std::ifstream inputf; + inputf.open(inputCmdsList); + if (inputf.is_open()) { + std::string line; + while (getline(inputf, line)) { + filenames.push_back(line); + } + filenames.push_back("finish"); + inputf.close(); + return 0; + } + std::cout << "Error opening file"; + return -1; +} + +int usoc_manager(int isNoPresent, std::string& inputCmdsList) { + sockaddr addr; + std::vector filenames; + if (readDataFromFile(inputCmdsList, filenames) == -1) { + return 1; + } + + SOCKET listen_sd; + if ((listen_sd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { + perror("socket error"); + exit(-1); + } + + unlink(socket_path); + + memset(&addr, 0, sizeof(addr)); + addr.sa_family = AF_UNIX; + strncpy(addr.sa_data, socket_path, sizeof(addr.sa_data) - 1); + + if (bind(listen_sd, (struct sockaddr*)&addr, sizeof(addr)) == -1) { + perror("bind error"); + exit(-1); + } + + if (listen(listen_sd, 256) == -1) { + perror("listen error"); + exit(-1); + } + + pollfd fdarray; + constexpr int DEFAULT_WAIT = 30000; + int ret; + SOCKET lsock = INVALID_SOCKET, asock = INVALID_SOCKET; + int stop_server = 0; + for (int i = 0; !stop_server;) { + fdarray.fd = listen_sd; + fdarray.events = POLLIN | POLLOUT; + int data_sent = 0; + + data_sent = 0; + printf("Manager: poll is waiting for incoming events (timeout %d s)\n", DEFAULT_WAIT / 1000); + if (SOCKET_ERROR == (ret = poll(&fdarray, 1, DEFAULT_WAIT))) { + printf("Main: poll operation failed: %d\n", WSAGetLastError()); + return 1; + } + + if (ret == 0) { + stop_server = 1; + } + + if (ret) { + if (fdarray.revents & POLLIN) { + printf("Manager: Connection established.\n"); + + if (INVALID_SOCKET == (asock = accept(listen_sd, NULL, NULL))) { + WSAGetLastError(); + return 1; + } + char buf[512] = {0}; + if (SOCKET_ERROR == (ret = recv(asock, buf, sizeof(buf), 0))) { + WSAGetLastError(); + return 1; + } else + printf("Manager: recvd %d bytes\n", ret); + if (ret) { + i = std::min(i, (int)filenames.size() - 1); + if (SOCKET_ERROR == (ret = send(asock, filenames[i].c_str(), (int)filenames[i].length() + 1, 0))) { + printf("Manager: send socket failed %d \n", WSAGetLastError()); + return 1; + } + printf("Manager: sent %d bytes\n", ret); + data_sent = 1; + } + if (ret) { + if (SOCKET_ERROR == (ret = recv(asock, buf, sizeof(buf), 0))) { + WSAGetLastError(); + return 1; + } else if (ret >= 8) { + printf("Manager: recvd confirm %d bytes\n", ret); + if (strncmp("received", buf, 8) == 0) { + i++; + } + } + } + } + } + } + + close(fdarray.fd); + return 0; +} + +constexpr int DEFAULT_BUFLEN = 512; + +// The following function is a further modification of the main function from the file at the link below: +// https://learn.microsoft.com/en-us/windows/win32/winsock/complete-client-code +int clientConnectServer(std::string& recvbuf, const char* usocfilename) { + int iResult; +#if defined(_WIN32) + WSADATA wsaData; + iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (iResult != 0) { + printf("WSAStartup failed with error: %d\n", iResult); + return 1; + } +#endif + sockaddr saddr{saddr.sa_family = AF_UNIX}; + strncpy(saddr.sa_data, socket_path, sizeof(socket_path)); + SOCKET ConnectSocket = socket(AF_UNIX, SOCK_STREAM, IPPROTO_IP); + if (ConnectSocket == INVALID_SOCKET) { + printf("socket failed with error: %d\n", WSAGetLastError()); + return -1; + } + do { + iResult = connect(ConnectSocket, &saddr, sizeof(saddr) + (int)strlen(saddr.sa_data)); + if (iResult == -1) { +#if defined(_WIN32) + closesocket(ConnectSocket); +#else + close(ConnectSocket); +#endif + ConnectSocket = INVALID_SOCKET; + } + } while (iResult == -1); + + if (ConnectSocket == INVALID_SOCKET) { + printf("Unable to connect to server! %d\n", WSAGetLastError()); +#if defined(_WIN32) + WSACleanup(); +#endif + return -1; + } + int hasNewBitstreamReceived = 0; + int recvbuflen = DEFAULT_BUFLEN; + std::string sendbuf{"data request"}; + iResult = send(ConnectSocket, sendbuf.c_str(), (int)sendbuf.length(), 0); + if (iResult == SOCKET_ERROR) { + printf("send failed with error: %d\n", WSAGetLastError()); + close(ConnectSocket); + return -1; + } + printf("bytes Sent: %d (pid %d)\n", iResult, getpid()); + iResult = recv(ConnectSocket, (char*)recvbuf.c_str(), recvbuflen, 0); + if (iResult == SOCKET_ERROR) { + printf("recv failed with error: %d\n", WSAGetLastError()); + close(ConnectSocket); + return -1; + } + if (iResult > 0) { + hasNewBitstreamReceived = 1; + } + sendbuf = {"received"}; + iResult = send(ConnectSocket, (char*)sendbuf.c_str(), (int)sendbuf.length(), 0); + if (iResult == SOCKET_ERROR) { + printf("send failed with error: %d\n", WSAGetLastError()); + close(ConnectSocket); + return -1; + } + close(ConnectSocket); + return hasNewBitstreamReceived; +} \ No newline at end of file diff --git a/common/libs/VkShell/ShellWin32.cpp b/common/libs/VkShell/ShellWin32.cpp index dae2ef45..e122eced 100644 --- a/common/libs/VkShell/ShellWin32.cpp +++ b/common/libs/VkShell/ShellWin32.cpp @@ -130,7 +130,7 @@ LRESULT ShellWin32::HandleMessage(UINT msg, WPARAM wparam, LPARAM lparam) { } break; case WM_DESTROY: - QuitLoop(); + SendMessage(m_hwnd, WM_QUIT, 0, 0); break; default: return DefWindowProc(m_hwnd, msg, wparam, lparam); diff --git a/vk_video_decoder/demos/vk-video-dec/CMakeLists.txt b/vk_video_decoder/demos/vk-video-dec/CMakeLists.txt index 0a61f859..c2ce1067 100644 --- a/vk_video_decoder/demos/vk-video-dec/CMakeLists.txt +++ b/vk_video_decoder/demos/vk-video-dec/CMakeLists.txt @@ -74,6 +74,8 @@ set(sources ${VK_VIDEO_COMMON_LIBS_SOURCE_ROOT}/VkCodecUtils/VulkanBistreamBufferImpl.h ${VK_VIDEO_COMMON_LIBS_SOURCE_ROOT}/VkCodecUtils/VulkanBistreamBufferImpl.cpp ${VK_VIDEO_COMMON_LIBS_SOURCE_ROOT}/VkCodecUtils/crcgenerator.cpp + ${VK_VIDEO_COMMON_LIBS_SOURCE_ROOT}/VkCodecUtils/poll_manager.h + ${VK_VIDEO_COMMON_LIBS_SOURCE_ROOT}/VkCodecUtils/poll_usoc.cpp ${VK_VIDEO_DECODER_LIBS_SOURCE_ROOT}/VkDecoderUtils/FFmpegDemuxer.cpp ${VK_VIDEO_DECODER_LIBS_SOURCE_ROOT}/VkDecoderUtils/VideoStreamDemuxer.cpp ${VK_VIDEO_DECODER_LIBS_SOURCE_ROOT}/VkDecoderUtils/VideoStreamDemuxer.h @@ -108,7 +110,7 @@ link_directories( ) if(WIN32) - list(APPEND libraries PRIVATE ${AVCODEC_LIB} ${AVFORMAT_LIB} ${AVUTIL_LIB} ${VULKAN_VIDEO_PARSER_LIB} ${GLSLANG_LIBRARIES}) + list(APPEND libraries PRIVATE ${AVCODEC_LIB} ${AVFORMAT_LIB} ${AVUTIL_LIB} ${VULKAN_VIDEO_PARSER_LIB} ${GLSLANG_LIBRARIES} Ws2_32.lib user32.lib wsock32.lib) else() list(APPEND libraries PRIVATE -lX11) list(APPEND libraries PRIVATE -lavcodec -lavutil -lavformat) diff --git a/vk_video_decoder/demos/vk-video-dec/Main.cpp b/vk_video_decoder/demos/vk-video-dec/Main.cpp index 42783722..5aeba56d 100644 --- a/vk_video_decoder/demos/vk-video-dec/Main.cpp +++ b/vk_video_decoder/demos/vk-video-dec/Main.cpp @@ -20,18 +20,63 @@ #include #include #include +#ifndef _WIN32 +#include +#include +#else +#include +#include +#endif #include "VkCodecUtils/VulkanDeviceContext.h" #include "VkCodecUtils/ProgramConfig.h" #include "VkCodecUtils/VulkanVideoProcessor.h" #include "VkCodecUtils/VulkanDecoderFrameProcessor.h" +#include "VkCodecUtils/poll_manager.h" #include "VkShell/Shell.h" int main(int argc, const char **argv) { + int spawn = 0; +#ifdef _WIN32 + PROCESS_INFORMATION pi {}; + STARTUPINFO si {}; + if (strncmp(argv[argc-1],"spawn",5)==0) { + spawn = 1; + argc--; + } +#endif + ProgramConfig programConfig(argv[0]); programConfig.ParseArgs(argc, argv); + int pid = getpid(); + if (spawn == 0) { + for (int n = 0; n < (int)programConfig.numberOfDecodeWorkers; n++) { +#ifdef _WIN32 + cloneTheProcess(argc, argv, pi, si); +#else + if(fork() == 0) { + break; + } +#endif + } + } +#ifdef _WIN32 + if (spawn == 0) +#else + if (pid == getpid()) +#endif + { + if (programConfig.enableWorkerProcessesPoll == 1) { + int result = 1; + if (programConfig.ipcType == IPC_TYPE::UNIX_DOMAIN_SOCKETS) { + result = usoc_manager(programConfig.noPresent, programConfig.fileListIpc); + } + return result; + } + } + // In the regular application usecase the CRC output variables are allocated here and also output as part of main. // In the library case it is up to the caller of the library to allocate the values and initialize them. std::vector crcAllocation; @@ -154,17 +199,36 @@ int main(int argc, const char **argv) { } VkSharedBaseObj vulkanVideoProcessor; - result = VulkanVideoProcessor::Create(programConfig, &vkDevCtxt, vulkanVideoProcessor); - if (result != VK_SUCCESS) { - return -1; - } - - VkSharedBaseObj> videoQueue(vulkanVideoProcessor); VkSharedBaseObj frameProcessor; - result = CreateDecoderFrameProcessor(&vkDevCtxt, videoQueue, frameProcessor); - if (result != VK_SUCCESS) { - return -1; - } + std::vector messageBuffer(128); + auto processInputFile = [&] (const int i) -> bool { + int res = 0; + if (programConfig.enableWorkerProcessesPoll) { + std::string receivedMessage; + receivedMessage.resize(DEFAULT_BUFLEN); + res = receiveNewBitstream(static_cast(programConfig.ipcType), programConfig.enableWorkerProcessesPoll, receivedMessage); + if (res) { + receivedMessage.resize(DEFAULT_BUFLEN); + res = parseCharArray(messageBuffer, receivedMessage.c_str(), argc, argv); + if (0) { // if(parseAllCmdline) { // we can pass full cmdline as well in case we need to change some decoding parameters + programConfig.ParseArgs(argc, argv); + } else if (res) { + programConfig.videoFileName = argv[0]; + std::cout << argv[0] << std::endl; + } else { + return 0; + } + } + } + result = VulkanVideoProcessor::Create(programConfig, &vkDevCtxt, vulkanVideoProcessor); + vulkanVideoProcessor->Initialize(&vkDevCtxt, programConfig); + VkSharedBaseObj> videoQueue(vulkanVideoProcessor); + result = CreateDecoderFrameProcessor(&vkDevCtxt, videoQueue, frameProcessor); + if (!programConfig.enableWorkerProcessesPoll) { + return i == 0; + } + return res; + }; VkVideoCodecOperationFlagsKHR videoDecodeCodecs = (VK_VIDEO_CODEC_OPERATION_DECODE_H264_BIT_KHR | VK_VIDEO_CODEC_OPERATION_DECODE_H265_BIT_KHR | @@ -179,7 +243,6 @@ int main(int argc, const char **argv) { if (supportsDisplay && !programConfig.noPresent) { - const Shell::Configuration configuration(programConfig.appName.c_str(), programConfig.backBufferCount, programConfig.directMode); @@ -220,10 +283,11 @@ int main(int argc, const char **argv) { true, // createDisplayQueue requestVideoComputeQueueMask != 0 // createComputeQueue ); - vulkanVideoProcessor->Initialize(&vkDevCtxt, programConfig); - - - displayShell->RunLoop(); + int i0 = 0; + while (processInputFile(i0++)) { + Shell::Create(&vkDevCtxt, configuration, frameProcessor, displayShell); + displayShell->RunLoop(); + } } else { @@ -257,19 +321,21 @@ int main(int argc, const char **argv) { return -1; } - vulkanVideoProcessor->Initialize(&vkDevCtxt, programConfig); - - const int numberOfFrames = programConfig.decoderQueueSize; - int ret = frameProcessor->CreateFrameData(numberOfFrames); - assert(ret == numberOfFrames); - if (ret != numberOfFrames) { - return -1; + int i1 = 0; + while (processInputFile(i1++)) + { + const int numberOfFrames = programConfig.decoderQueueSize; + int ret = frameProcessor->CreateFrameData(numberOfFrames); + assert(ret == numberOfFrames); + if (ret != numberOfFrames) { + return -1; + } + bool continueLoop = true; + do { + continueLoop = frameProcessor->OnFrame(0); + } while (continueLoop); + frameProcessor->DestroyFrameData(); } - bool continueLoop = true; - do { - continueLoop = frameProcessor->OnFrame(0); - } while (continueLoop); - frameProcessor->DestroyFrameData(); } if (programConfig.outputcrc != 0) { @@ -284,6 +350,14 @@ int main(int argc, const char **argv) { programConfig.crcOutputFile = stdout; } } - +#ifdef _WIN32 + WaitForSingleObject( pi.hProcess, INFINITE ); + CloseHandle( pi.hProcess ); + CloseHandle( pi.hThread ); +#else + int status = 1; + wait(&status); + exit(status); +#endif return 0; }