harmony 鸿蒙多级Worker间高性能消息通信

  • 2025-06-12
  • 浏览 (4)

多级Worker间高性能消息通信

多级Worker(即通过父Worker创建子Worker的机制形成层级线程关系)间通信是一种常见的需求,由于Worker线程生命周期由用户自行管理,因此需要注意多级Worker生命周期的正确管理,建议开发者确保销毁父Worker前先销毁所有子Worker。

本文介绍如何在多级Worker间实现高性能消息通信,高性能消息通信的关键在于Sendable对象,结合Worker的postMessageWithSharedSendable接口,可以实现线程间高性能的对象传递。以数据克隆场景为例,假设有三个Worker,一个父Worker和两个子Worker,父Worker负责创建子Worker,并向子Worker发送数据克隆任务,子Worker负责接收任务并执行数据克隆操作,完成后将克隆结果返回给父Worker。

  1. 准备一个Sendable类CopyEntry,用于封装克隆任务数据。
   // CopyEntry.ets
   @Sendable
   export class CopyEntry {
     // 克隆类型
     type: string;
     // 文件路径
     filePath: string;
     constructor(type: string, filePath: string) {
       this.type = type;
       this.filePath = filePath;
     }
   }
  1. 准备两个Worker文件,父Worker文件为ParentWorker.ets,子Worker文件为ChildWorker.ets。父Worker负责分发克隆任务并判断任务全部完成后关闭子Worker与父Worker;子Worker负责接收任务并执行数据克隆操作,并在任务完成后通知父Worker。
   // ParentWorker.ets
   import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker, collections, ArkTSUtils } from '@kit.ArkTS'
   import { CopyEntry } from './CopyEntry'

   const workerPort: ThreadWorkerGlobalScope = worker.workerPort;

   // 计算worker1的任务数量
   let count1 = 0;
   // 计算worker2的任务数量
   let count2 = 0;
   // 计算总任务数量
   let sum = 0;
   // 异步锁
   const asyncLock = new ArkTSUtils.locks.AsyncLock();
   // 创建子Worker
   const copyWorker1 = new worker.ThreadWorker('entry/ets/pages/ChildWorker');
   const copyWorker2 = new worker.ThreadWorker('entry/ets/pages/ChildWorker');

   workerPort.onmessage = (e : MessageEvents) => {
     let array = e.data as collections.Array<CopyEntry>;
     sum = array.length;
     for (let i = 0; i < array.length; i++) {
       let entry = array[i];
       if (entry.type === 'copy1') {
         count1++;
         // 如果是copy1类型,则将数据传递给 copyWorker1
         copyWorker1.postMessageWithSharedSendable(entry);
       } else if (entry.type === 'copy2') {
         count2++;
         // 如果是copy2类型,则将数据传递给 copyWorker2
         copyWorker2.postMessageWithSharedSendable(entry);
       }
     }
   }

   copyWorker1.onmessage = async (e : MessageEvents) => {
     console.info('copyWorker1 onmessage:' + e.data);
     await asyncLock.lockAsync(() => {
       count1--;
       if (count1 == 0) {
         // 如果copyWorker1的任务全部完成,则关闭copyWorker1
         console.info('copyWorker1 close');
         copyWorker1.terminate();
       }
       sum--;
       if (sum == 0) {
         // 如果所有任务全部完成,则关闭父Worker
         workerPort.close();
       }
     })
   }

   copyWorker2.onmessage = async (e : MessageEvents) => {
     console.info('copyWorker2 onmessage:' + e.data);
     await asyncLock.lockAsync(() => {
       count2--;
       sum--;
       if (count2 == 0) {
         // 如果copyWorker2的任务全部完成,则关闭copyWorker2
         console.info('copyWorker2 close')
         copyWorker2.terminate();
       }
       if (sum == 0) {
         // 如果所有任务全部完成,则关闭父Worker
         workerPort.close();
       }
     })
   }

   workerPort.onmessageerror = (e : MessageEvents) => {
     console.info('onmessageerror:' + e.data);
   }

   workerPort.onerror = (e : ErrorEvent) => {
     console.info('onerror:' + e.message);
   }
   // ChildWorker.ets
   import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker} from '@kit.ArkTS'
   import { CopyEntry } from './CopyEntry'

   const workerPort: ThreadWorkerGlobalScope = worker.workerPort;

   workerPort.onmessage = (e : MessageEvents) => {
     let data = e.data as CopyEntry;
     // 中间copy操作省略
     console.info(data.filePath);
     workerPort.postMessageWithSharedSendable("done");
   }

   workerPort.onmessageerror = (e : MessageEvents) => {
     console.info('onmessageerror:' + e.data);
   }

   workerPort.onerror = (e : ErrorEvent) => {
     console.info('onerror:' + e.message);
   }
  1. 在UI主进程页面,创建父Worker并准备克隆任务所需的数据,准备完成后将数据发送给父Worker。
   // Index.ets
   import { worker, collections } from '@kit.ArkTS';
   import { BusinessError } from '@kit.BasicServicesKit';
   import { CopyEntry } from './CopyEntry'

   function promiseCase() {
     let p: Promise<void> = new Promise<void>((resolve: Function, reject: Function) => {
       setTimeout(() => {
         resolve(1);
       }, 100)
     }).then(undefined, (error: BusinessError) => {
     })
     return p;
   }

   async function postMessageTest() {
     let ss = new worker.ThreadWorker("entry/ets/pages/ParentWorker");
     let isTerminate = false;
     ss.onexit = () => {
       isTerminate = true;
     }
     let array = new collections.Array<CopyEntry>();
     // 准备数据
     for (let i = 0; i < 4; i++) {
       if (i % 2 == 0) {
         array.push(new CopyEntry("copy1", "file://copy1.txt"));
       } else {
         array.push(new CopyEntry("copy2", "file://copy2.txt"));
       }
     }
     // 给Worker线程发送消息
     ss.postMessageWithSharedSendable(array);
     while (!isTerminate) {
       await promiseCase();
     }
     console.info("Worker线程已退出");
   }

   @Entry
   @Component
   struct Index {
     @State message: string = 'Hello World';
     build() {
       Row() {
         Column() {
           Text(this.message)
             .fontSize(50)
             .fontWeight(FontWeight.Bold)
             .onClick(() => {
               postMessageTest();
             })
         }
         .width('100%')
       }
       .height('100%')
     }
   }

你可能感兴趣的鸿蒙文章

harmony 鸿蒙ArkTS(方舟编程语言)

harmony 鸿蒙在build-profile.json5中配置arkOptions

harmony 鸿蒙异步锁

harmony 鸿蒙方舟字节码文件格式

harmony 鸿蒙方舟字节码函数命名规则

harmony 鸿蒙方舟字节码基本原理

harmony 鸿蒙方舟字节码概述

harmony 鸿蒙共享容器

harmony 鸿蒙异步等待

harmony 鸿蒙ArkTS跨语言交互

0  赞