程序员社区

Java线程间通信方式

一、通过synchronized关键字

public class test {

    public static void main(String[] args) {
        MyObject object=new MyObject();
        ThreadA threadA=new ThreadA(object);
        ThreadB threadB=new ThreadB(object);
        threadA.start();
        threadB.start();
    }

}

class MyObject{
    synchronized public void methodA(){
        //do something
    }

    synchronized public void methodB(){
        //do something
    }
}

class ThreadA extends Thread{
    private MyObject object;

    public ThreadA(MyObject object){
        this.object=object;
    }

    @Override
    public void run(){
        super.run();
        object.methodA();
    }
}

class ThreadB extends Thread{
    private MyObject object;

    public ThreadB(MyObject object){
        this.object=object;
    }

    @Override
    public void run(){
        super.run();
        object.methodB();
    }
}

二、wait和notify

import org.omg.Messaging.SYNC_WITH_TRANSPORT;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

public class test {

    public static void main(String[] args) {
        try {
            Object lock=new Object();

            ThreadA a=new ThreadA(lock);
            a.start();

            Thread.sleep(1000);

            ThreadB b=new ThreadB(lock);
            b.start();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }

}

class MyList{
    private static List<String> list=new ArrayList<>();

    public static void add(){
        list.add("test");
    }

    public static int size(){
        return list.size();
    }
}

class ThreadA extends Thread{
    private Object lock;

    public ThreadA(Object lock){
        super();
        this.lock=lock;
    }

    @Override
    public void run(){
        try{
            synchronized (lock){
                if(MyList.size()!=5){
                    System.out.println("wait begin:"+System.currentTimeMillis());
                    lock.wait();
                    System.out.println("wait end:"+System.currentTimeMillis());
                }
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

class ThreadB extends Thread{
    private Object lock;

    public ThreadB(Object lock){
        super();
        this.lock=lock;
    }

    @Override
    public void run(){
        try{
            synchronized (lock){
                for(int i=0;i<10;i++){
                    MyList.add();
                    if(MyList.size()==5){
                        lock.notifyAll();
                        System.out.println("已经发出了通知");
                    }
                    System.out.println("添加了"+(i+1)+"个元素");
                    Thread.sleep(1000);
                }
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

结果:

wait begin:1602418383158
添加了1个元素
添加了2个元素
添加了3个元素
添加了4个元素
已经发出了通知
添加了5个元素
添加了6个元素
添加了7个元素
添加了8个元素
添加了9个元素
添加了10个元素
wait end:1602418394166

线程A要等待某个条件满足时(list.size()==5),才执行操作。线程B则向list中添加元素,改变list 的size。

A,B之间如何通信的呢?也就是说,线程A如何知道 list.size() 已经为5了呢?

这里用到了Object类的 wait() 和 notify() 方法。

当条件未满足时(list.size() !=5),线程A调用wait() 放弃CPU,并进入阻塞状态。

当条件满足时,线程B调用 notify()通知 线程A,所谓通知线程A,就是唤醒线程A,并让它进入可运行状态。

这种方式的一个好处就是CPU的利用率提高了。

但是也有一些缺点:比如,线程B先执行,一下子添加了5个元素并调用了notify()发送了通知,而此时线程A还执行;当线程A执行并调用wait()时,那它永远就不可能被唤醒了。因为,线程B已经发了通知了,以后不再发通知了。这说明:通知过早,会打乱程序的执行逻辑。

三、while轮询

import org.omg.Messaging.SYNC_WITH_TRANSPORT;

import java.lang.annotation.Native;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

public class test {

    public static void main(String[] args) {
        MyList list=new MyList();

        ThreadA a=new ThreadA(list);
        a.setName("A");
        a.start();

        ThreadB b=new ThreadB(list);
        b.setName("B");
        b.start();
    }

}


class MyList{
    private List<String> list=new ArrayList<>();

    public void add(){
        list.add("elements");
    }

    public int size(){
        return list.size();
    }
}


class ThreadA extends Thread{
    private MyList list;

    public ThreadA(MyList list){
        super();
        this.list=list;
    }

    @Override
    public void run(){
        try{
            for(int i=0;i<10;i++){
                list.add();
                System.out.println("添加了:"+(i+1)+"个元素");
                Thread.sleep(1000);
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

class ThreadB extends Thread{
    private MyList list;

    public ThreadB(MyList list){
        super();
        this.list=list;
    }

    @Override
    public void run(){
        try{
            while(true){
                if(list.size()==5){
                    System.out.println("线程B准备退出");
                    throw new InterruptedException();
                }
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

在这种方式下,线程A不断改变条件,线程B不断通过while语句检测这个条件(list.size()==5)是否成立,从而实现了线程间的通信。但是这种方式会浪费CPU资源。

同时这种方式还存在另外一个问题,这就是轮询条件的可见性问题:
线程都是先把变量读取到本地线程栈空间,然后再去修改的本地变量。因此,如果线程B每次都在取本地的条件变量,那么尽管另外一个线程已经改变了轮询的条件,它也察觉不到,这样会造成死循环。

25行对list加上volatile限制后,这时程序会正常退出,不会陷入死循环。

private volatile List<String> list=new ArrayList<>();

四、管道通信

利用java.io.PipedInputStream和java.io.PipedOutputStream进行通信。

生产者每5s提供5个产品,放入管道
消费者每0.5s从管道中取一件产品,并打印剩余产品数量,打印产品信息(以数字替代)

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class test{

    public static void main(String[] args) throws InterruptedException {
        PipedOutputStream pos=new PipedOutputStream();
        PipedInputStream pis=new PipedInputStream();
        try {
            pis.connect(pos);
        }catch (IOException e){
            e.printStackTrace();
        }

        new MyProducer(pos).start();
        new MyConsumer(pis).start();
    }

}

class MyProducer extends Thread{
    private PipedOutputStream outputStream;

    private int index=0;

    public MyProducer(PipedOutputStream outputStream){
        this.outputStream=outputStream;
    }

    @Override
    public void run(){
        while(true){
            try{
                for(int i=0;i<5;i++){
                    outputStream.write(index++);
                }
            }catch (IOException e){
                e.printStackTrace();
            }

            try {
                Thread.sleep(1000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}

class MyConsumer extends Thread{

    private PipedInputStream inputStream;

    public MyConsumer(PipedInputStream inputStream){
        this.inputStream=inputStream;
    }

    @Override
    public void run(){
        while (true){
            try {
                Thread.sleep(500);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            try {
                int count=inputStream.available();
                if(count>0){
                    System.out.println("rest product cout"+count);
                    System.out.println("get product"+inputStream.read());
                }
            }catch (IOException e1){
                e1.printStackTrace();
            }
        }
    }
}

https://blog.csdn.net/Hadwin1991/article/details/73527835

赞(0) 打赏
未经允许不得转载:IDEA激活码 » Java线程间通信方式

一个分享Java & Python知识的社区